Intermediate

You’re building a web scraper that fetches URLs in parallel and processes the results. Or a data pipeline where one thread downloads files while another parses them. Or a task processor where a main thread receives jobs and worker threads execute them. In all of these cases you need multiple threads to communicate safely without corrupting shared state. Using a plain list for this is a race condition waiting to happen. Python’s queue module is the right tool.

The queue module provides Queue, LifoQueue, and PriorityQueue — thread-safe data structures that use internal locks to guarantee that concurrent put() and get() calls don’t corrupt data or create race conditions. They also handle blocking automatically: a thread calling get() on an empty queue waits until an item is available, which is the foundation of the producer-consumer pattern. The module is in the standard library, no installation required.

In this article we’ll cover the three queue types, the basic producer-consumer pattern, how to signal workers to stop using sentinel values, bounded queues for backpressure, priority queues for ordered processing, and a complete multi-threaded downloader example. We’ll also show common mistakes that cause threads to hang forever.

queue Module Quick Example

Here’s the minimal producer-consumer pattern — one thread puts items into a queue, another takes them out and processes them:

# quick_queue.py
import queue
import threading
import time

q = queue.Queue()

def producer():
    for i in range(5):
        q.put(f"item_{i}")
        time.sleep(0.1)
    q.put(None)  # sentinel to signal "done"

def consumer():
    while True:
        item = q.get()
        if item is None:
            print("Consumer: done.")
            q.task_done()
            break
        print(f"Consumer: processed {item}")
        q.task_done()

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start()
t2.start()
t1.join()
t2.join()

Output:

Consumer: processed item_0
Consumer: processed item_1
Consumer: processed item_2
Consumer: processed item_3
Consumer: processed item_4
Consumer: done.

There are three patterns here that matter. First, the queue handles all locking automatically — no threading.Lock() needed. Second, the consumer calls q.task_done() after processing each item — this is required if you use q.join() elsewhere to wait for all items to finish. Third, the None sentinel is the standard signal to tell a worker thread to stop. We’ll revisit all of these in detail below.
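
To make the q.join() side of that contract concrete, here is a minimal sketch of the alternative shutdown style: a daemon worker plus q.join(), with no sentinel at all. Because the worker is a daemon thread, it dies with the program instead of blocking exit.

# join_pattern.py
import queue
import threading

q = queue.Queue()

def worker():
    while True:
        item = q.get()   # blocks until an item is available
        print(f"processed {item}")
        q.task_done()    # decrements the unfinished-task counter

# Daemon thread: it won't keep the program alive, so no sentinel is needed
threading.Thread(target=worker, daemon=True).start()

for i in range(3):
    q.put(i)

q.join()  # blocks until task_done() has been called once per put()
print("All items processed.")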

The Three Queue Types

Python provides three queue implementations with the same API but different ordering semantics:

Class               | Ordering                                | Use case
queue.Queue         | FIFO (first in, first out)              | Task queues, pipelines, most cases
queue.LifoQueue     | LIFO (last in, first out, i.e. a stack) | Depth-first search, undo stacks
queue.PriorityQueue | Lowest priority number first            | Task scheduling, event processing by urgency

All three share the same core methods: put(item), get(), task_done(), join(), qsize(), empty(), and full(). They also all accept a maxsize parameter to cap the queue size. The choice between them is entirely about ordering — the thread-safety guarantees are identical.
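
A quick way to see the ordering difference is to push the same items into each type and compare what comes out:

# ordering_demo.py
import queue

for cls in (queue.Queue, queue.LifoQueue, queue.PriorityQueue):
    q = cls()
    for item in (3, 1, 2):
        q.put(item)
    print(f"{cls.__name__}: {[q.get() for _ in range(3)]}")

Output:

Queue: [3, 1, 2]
LifoQueue: [2, 1, 3]
PriorityQueue: [1, 2, 3]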

Producer-consumer pattern
A list with a lock. A contract between threads. A queue.

Producer-Consumer Pattern

The producer-consumer pattern is the fundamental building block for threaded pipelines. Producers put work items in the queue; consumers get them and do the work. The queue is the communication channel.

Multiple Workers

For CPU or I/O-heavy tasks, you want multiple consumer workers. The cleanest way to stop multiple workers is to put one None sentinel per worker:

# multi_worker.py
import queue
import threading
import time
import random

NUM_WORKERS = 3
task_queue = queue.Queue()
results = []
results_lock = threading.Lock()


def worker(worker_id: int):
    """Process tasks from the queue until a None sentinel is received."""
    while True:
        task = task_queue.get()
        if task is None:
            print(f"Worker {worker_id}: stopping.")
            task_queue.task_done()
            return
        # Simulate work
        duration = random.uniform(0.1, 0.4)
        time.sleep(duration)
        result = f"task_{task}_done_by_worker_{worker_id}"
        with results_lock:
            results.append(result)
        task_queue.task_done()
        print(f"Worker {worker_id}: completed task {task} in {duration:.2f}s")


# Start workers
workers = []
for i in range(NUM_WORKERS):
    t = threading.Thread(target=worker, args=(i,))
    t.start()
    workers.append(t)

# Submit 10 tasks
for task_id in range(10):
    task_queue.put(task_id)

# Send one sentinel per worker to shut them down
for _ in range(NUM_WORKERS):
    task_queue.put(None)

# Wait for all tasks to finish
task_queue.join()
for t in workers:
    t.join()

print(f"\nCompleted {len(results)} tasks total.")

Output (order varies by timing):

Worker 0: completed task 0 in 0.12s
Worker 1: completed task 1 in 0.19s
Worker 2: completed task 2 in 0.11s
Worker 0: completed task 3 in 0.38s
...
Worker 0: stopping.
Worker 1: stopping.
Worker 2: stopping.

Completed 10 tasks total.

The critical pattern here: put exactly NUM_WORKERS sentinels after all the real tasks. Each worker stops when it receives one sentinel. If you put too few, some workers wait forever. If the workers share mutable state (like the results list), protect it with a separate lock — the queue only protects the queue itself, not your application data.

Bounded Queues for Backpressure

By default, queue.Queue is unbounded — a producer can put items faster than consumers process them, causing unbounded memory growth. Set maxsize to apply backpressure: when the queue is full, put() blocks until a consumer takes an item out.

# bounded_queue.py
import queue
import threading
import time

# Queue holds at most 3 items -- producer will block when full
MAXSIZE = 3
q = queue.Queue(maxsize=MAXSIZE)


def fast_producer():
    for i in range(10):
        print(f"Producer: putting item {i} (queue size: {q.qsize()})")
        q.put(i)  # blocks if queue is full
    q.put(None)  # sentinel


def slow_consumer():
    while True:
        item = q.get()
        if item is None:
            q.task_done()
            print("Consumer: done.")
            return
        time.sleep(0.3)  # simulate slow processing
        print(f"Consumer: processed item {item}")
        q.task_done()


t1 = threading.Thread(target=fast_producer)
t2 = threading.Thread(target=slow_consumer)
t1.start()
t2.start()
t1.join()
t2.join()

Output (shows natural backpressure):

Producer: putting item 0 (queue size: 0)
Producer: putting item 1 (queue size: 1)
Producer: putting item 2 (queue size: 2)
Producer: putting item 3 (queue size: 3)   <-- blocks here until consumer processes item 0
Consumer: processed item 0
Producer: putting item 4 (queue size: 3)
...

Bounded queues are essential in production pipelines where producers are faster than consumers. Without a bound, you’d accumulate millions of items in memory before a consumer crash or slowdown becomes visible. The blocking behavior of put() when the queue is full is exactly what you want — it slows down the producer to match the consumer’s pace.
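
Blocking is not the only option, though. If the producer should drop items or give up after a deadline instead of waiting indefinitely, put() accepts block=False or a timeout, and both raise queue.Full instead of waiting:

# put_nonblocking.py
import queue

q = queue.Queue(maxsize=2)
q.put("a")
q.put("b")  # queue is now full

try:
    q.put("c", block=False)  # raises queue.Full immediately
except queue.Full:
    print("Queue full, dropping item.")

try:
    q.put("c", timeout=0.5)  # waits up to 0.5 seconds, then raises queue.Full
except queue.Full:
    print("Queue still full after 0.5s.")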

Queue maxsize limit
maxsize=3 is the speed bump that keeps your RAM from exploding at 3am.

PriorityQueue for Ordered Processing

When tasks have different urgency levels, PriorityQueue processes the lowest-priority-number item first. Items are tuples of (priority_number, data) — Python compares tuples element by element, so the number determines order:

# priority_queue.py
import queue

pq = queue.PriorityQueue()

# Lower number = higher priority
pq.put((1, "CRITICAL: Payment service down"))
pq.put((3, "LOW: Update help docs"))
pq.put((2, "HIGH: Slow API response"))
pq.put((1, "CRITICAL: Database unreachable"))
pq.put((2, "HIGH: Memory usage spike"))

print("Processing in priority order:")
while not pq.empty():
    priority, task = pq.get()
    label = {1: "CRITICAL", 2: "HIGH    ", 3: "LOW     "}.get(priority, "UNKNOWN ")
    print(f"  [{label}] {task}")
    pq.task_done()

Output:

Processing in priority order:
  [CRITICAL] CRITICAL: Payment service down
  [CRITICAL] CRITICAL: Database unreachable
  [HIGH    ] HIGH: Slow API response
  [HIGH    ] HIGH: Memory usage spike
  [LOW     ] LOW: Update help docs

Within the same priority level, ordering is not guaranteed: PriorityQueue is backed by a heap, and FIFO ordering is only a property of the standard Queue. If you need stable ordering within a priority level, add a counter as a tiebreaker, as in the sketch below: (priority, counter, data). Because the counter is unique, two tasks with the same priority are processed in insertion order, and Python never has to compare the data items themselves (which would raise TypeError for non-comparable payloads).
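
A minimal sketch of that tiebreaker, using itertools.count() to generate the counter:

# priority_tiebreaker.py
import itertools
import queue

pq = queue.PriorityQueue()
counter = itertools.count()  # unique, monotonically increasing tiebreaker

for priority, task in [(2, "first HIGH"), (1, "CRITICAL"), (2, "second HIGH")]:
    pq.put((priority, next(counter), task))

while not pq.empty():
    priority, _, task = pq.get()
    print(priority, task)
    pq.task_done()

Output:

1 CRITICAL
2 first HIGH
2 second HIGH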

Real-Life Example: Threaded File Downloader

Here’s a complete multi-threaded downloader that processes a list of URLs using a worker pool, collects results, and reports errors — all coordinated through queues:

# threaded_downloader.py
"""Multi-threaded URL fetcher using queue.Queue for work distribution."""
import queue
import threading
import time
import random
from dataclasses import dataclass
from typing import Optional


@dataclass
class DownloadResult:
    url: str
    success: bool
    size_bytes: int = 0
    error: Optional[str] = None
    worker_id: int = 0


def simulate_download(url: str) -> tuple[bool, int]:
    """Simulate downloading a URL. Returns (success, bytes_downloaded)."""
    time.sleep(random.uniform(0.1, 0.5))
    if random.random() < 0.1:  # 10% failure rate
        return False, 0
    return True, random.randint(1000, 500000)


def download_worker(worker_id: int, work_queue: queue.Queue, result_queue: queue.Queue):
    """Worker thread: pull URLs from work_queue, put results in result_queue."""
    while True:
        url = work_queue.get()
        if url is None:
            work_queue.task_done()
            return

        success, size = simulate_download(url)
        result = DownloadResult(
            url=url,
            success=success,
            size_bytes=size,
            error=None if success else "Connection timeout",
            worker_id=worker_id,
        )
        result_queue.put(result)
        work_queue.task_done()


def run_downloader(urls: list[str], num_workers: int = 4) -> list[DownloadResult]:
    work_queue = queue.Queue()
    result_queue = queue.Queue()

    # Start workers
    workers = []
    for i in range(num_workers):
        t = threading.Thread(target=download_worker, args=(i, work_queue, result_queue))
        t.daemon = True
        t.start()
        workers.append(t)

    # Enqueue all URLs
    for url in urls:
        work_queue.put(url)

    # Send sentinels
    for _ in range(num_workers):
        work_queue.put(None)

    # Wait for all work to finish
    work_queue.join()

    # Collect results
    results = []
    while not result_queue.empty():
        results.append(result_queue.get())

    return results


# Run it
test_urls = [f"https://example.com/file_{i:03d}.dat" for i in range(20)]
results = run_downloader(test_urls, num_workers=4)

succeeded = [r for r in results if r.success]
failed = [r for r in results if not r.success]
total_bytes = sum(r.size_bytes for r in succeeded)

print(f"Downloaded {len(succeeded)}/{len(results)} files successfully")
print(f"Failed: {len(failed)}")
print(f"Total data: {total_bytes:,} bytes")
if failed:
    for r in failed:
        print(f"  FAILED: {r.url} -- {r.error}")

Output (varies by random failures):

Downloaded 18/20 files successfully
Failed: 2
Total data: 4,712,384 bytes
  FAILED: https://example.com/file_007.dat -- Connection timeout
  FAILED: https://example.com/file_015.dat -- Connection timeout

This pattern — work queue in, result queue out, sentinel per worker — is the template for any multi-threaded pipeline. The two-queue design keeps work distribution separate from result collection, and using t.daemon = True ensures worker threads don’t block program exit if something goes wrong. To adapt this to real downloads, replace simulate_download() with a real fetch, as in the sketch below.
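
For real downloads, a drop-in replacement for simulate_download() might look like this sketch, assuming the third-party requests package is installed (pip install requests):

# real_download.py (sketch -- requires the requests package)
import requests

def real_download(url: str) -> tuple[bool, int]:
    """Fetch a URL and return (success, bytes_downloaded)."""
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()  # treat HTTP 4xx/5xx as failures
        return True, len(response.content)
    except requests.RequestException:
        return False, 0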

Frequently Asked Questions

Why does my program hang after the queue is empty?

Most queue hangs are caused by one of two things: the sentinel was never sent (so workers are still blocking on q.get()), or you called q.join() without calling q.task_done() after every get(). The join() method waits until the internal task counter reaches zero, and that counter only decreases via task_done(). If any item is ever retrieved without a matching task_done() call, join() hangs forever. Make sure every code path through your consumer calls task_done() — including after processing the sentinel.
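
If you want a worker that cannot hang forever even when a sentinel goes missing, a get() with a timeout is a defensive variant. A sketch, where process() is a placeholder for your real handler:

# defensive_consumer.py (sketch -- process() is a placeholder)
import queue

def consumer(q: queue.Queue):
    while True:
        try:
            item = q.get(timeout=5)  # raises queue.Empty after 5s of silence
        except queue.Empty:
            print("No work for 5 seconds, shutting down.")
            return
        if item is None:  # sentinel
            q.task_done()
            return
        process(item)     # placeholder for your real processing function
        q.task_done()     # every get() needs a matching task_done()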

Is it safe to call q.empty() before q.get()?

In a multi-threaded context, no. q.empty() can return True and then another thread puts an item in before your get() runs — or vice versa, it returns False and another thread takes the item before you do. Don’t use empty() as a guard before get() in multi-threaded code. Instead, use get(block=False) and catch queue.Empty, or use the sentinel pattern. The empty() check is only reliable when you know no other threads are accessing the queue.
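
The non-blocking pattern looks like this: attempt the get() and handle the exception instead of checking first:

# nonblocking_get.py
import queue

def drain(q: queue.Queue) -> list:
    """Take every item currently in the queue without blocking."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())  # same as q.get(block=False)
        except queue.Empty:
            return items

After join() has returned and no producers are running, either approach is safe; while other threads are still producing, only the exception-based form is reliable.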

Why not just use a list with a lock?

You can, but it’s more work for the same result. A threading.Lock() around a list gives you mutual exclusion, but you still need to write the blocking logic for an empty list (sleep + poll), the bounded-size logic, and the condition variable coordination for waking sleeping threads. queue.Queue does all of this correctly and efficiently using threading.Condition internally. The only reason to use a raw list is if you need random access to items — queues only support FIFO/LIFO/priority access, not arbitrary indexing.
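
To see roughly how much you'd be signing up for, here is a minimal sketch of the blocking logic queue.Queue implements for you, built from a deque and a threading.Condition (unbounded, FIFO only, no task_done()):

# tiny_queue.py (simplified sketch of the idea -- not a queue.Queue replacement)
import threading
from collections import deque

class TinyQueue:
    def __init__(self):
        self._items = deque()
        self._cond = threading.Condition()  # a lock plus wait/notify

    def put(self, item):
        with self._cond:
            self._items.append(item)
            self._cond.notify()  # wake one thread waiting in get()

    def get(self):
        with self._cond:
            while not self._items:  # loop guards against spurious wakeups
                self._cond.wait()   # releases the lock while sleeping
            return self._items.popleft()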

Should I use queue.Queue with asyncio?

No — queue.Queue is for threads, not coroutines. For async code, use asyncio.Queue instead. It has the same API (put(), get(), task_done(), join()) but works with await. Mixing queue.Queue with async code causes the event loop to block on the synchronous get() call. If you need to bridge between threads and asyncio (for example, putting items in a queue from a thread and consuming them in a coroutine), use asyncio.Queue with loop.call_soon_threadsafe().
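
The same producer-consumer shape in async code looks like this:

# async_queue.py
import asyncio

async def producer(q: asyncio.Queue):
    for i in range(3):
        await q.put(f"item_{i}")
    await q.put(None)  # sentinel

async def consumer(q: asyncio.Queue):
    while True:
        item = await q.get()  # suspends this coroutine; never blocks the event loop
        if item is None:
            return
        print(f"processed {item}")

async def main():
    q = asyncio.Queue()
    await asyncio.gather(producer(q), consumer(q))

asyncio.run(main())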

Does queue.Queue work with multiprocessing?

No — queue.Queue is in-process only. For inter-process communication, use multiprocessing.Queue, which pickles items and passes them between processes over pipes and OS-level IPC mechanisms. The core put()/get() API is the same, but multiprocessing.Queue has no task_done() or join(); for those, use multiprocessing.JoinableQueue. For most parallelism needs, concurrent.futures.ProcessPoolExecutor provides a higher-level interface that handles the queue plumbing for you.
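
A minimal sketch of the cross-process equivalent; note the __main__ guard, which multiprocessing requires on platforms that spawn child processes:

# mp_queue.py
import multiprocessing

def worker(q):
    q.put("hello from the child process")

if __name__ == "__main__":
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(q,))
    p.start()
    print(q.get())  # blocks until the child puts an item
    p.join()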

Conclusion

The queue module gives you safe, blocking, thread-aware communication channels without writing any locking code yourself. We covered the three queue types (Queue, LifoQueue, PriorityQueue), the producer-consumer pattern with sentinel shutdown, multiple worker pools, bounded queues for backpressure, and a complete multi-threaded downloader example. The sentinel pattern — one None per worker — is the idiom to memorize above all others.

The natural next step is to combine queue.Queue with threading.Thread and concurrent.futures.ThreadPoolExecutor for real-world I/O-bound workloads. If your bottleneck is CPU-bound rather than I/O-bound, switch to multiprocessing.Queue to bypass the GIL. For async workloads, use asyncio.Queue directly.

For the full reference, see the official queue module documentation.