Intermediate

You’re downloading files, calling APIs, or waiting on slow I/O — and your program does all of this sequentially, sitting idle while one operation completes before starting the next. For I/O-bound work like network requests, file reads, or database calls, Python’s threading module lets you run multiple operations concurrently so your program doesn’t waste time waiting. A download that takes 10 seconds sequentially for 10 files can complete in roughly 1-2 seconds when the downloads run concurrently in threads.

Python’s threading module is part of the standard library and available in every Python installation. The important context: because of the Global Interpreter Lock (GIL), Python threads don’t run Python bytecode truly in parallel on multiple CPU cores. But for I/O-bound work (network, disk, database), threads spend most of their time waiting — and the GIL releases during I/O waits, so threads genuinely run concurrently for this type of work. For CPU-bound tasks, use multiprocessing instead.

In this tutorial you’ll learn how to create and start threads, use daemon vs non-daemon threads, synchronize with Lock and Event, build a producer-consumer pattern with Queue, and use ThreadPoolExecutor for managing pools of worker threads. By the end you’ll be able to write thread-safe concurrent programs for I/O-bound workloads.

Python threading: Quick Example

Here’s a simple example showing how threading reduces the total time for multiple I/O-bound tasks:

# threading_quick.py
import threading
import time

def download_file(filename: str, delay: float) -> None:
    """Simulate downloading a file (I/O wait)."""
    print(f"  Starting: {filename}")
    time.sleep(delay)  # simulate network I/O
    print(f"  Done:     {filename}")

files = [("report.pdf", 2), ("data.csv", 1.5), ("image.png", 1)]

# Sequential (slow)
print("=== Sequential ===")
start = time.perf_counter()
for name, delay in files:
    download_file(name, delay)
print(f"Sequential time: {time.perf_counter() - start:.1f}s")

# Threaded (fast)
print("\n=== Threaded ===")
start = time.perf_counter()
threads = [threading.Thread(target=download_file, args=(name, delay)) for name, delay in files]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Threaded time:   {time.perf_counter() - start:.1f}s")

Output:

=== Sequential ===
  Starting: report.pdf
  Done:     report.pdf
  Starting: data.csv
  Done:     data.csv
  Starting: image.png
  Done:     image.png
Sequential time: 4.5s

=== Threaded ===
  Starting: report.pdf
  Starting: data.csv
  Starting: image.png
  Done:     image.png
  Done:     data.csv
  Done:     report.pdf
Threaded time:   2.0s

The threaded version finishes in roughly 2 seconds — the time of the longest single task — while sequential takes 4.5 seconds total. t.start() begins the thread and returns immediately; t.join() blocks the main thread until that thread finishes. Always call join() to ensure threads complete before your program exits.

What Is Python Threading and When To Use It?

A thread is a unit of execution within a process. Unlike processes (which have separate memory), threads in the same process share memory, which makes communication easy but also introduces the risk of data races when multiple threads write to the same variable simultaneously.

Approach             Best For                          GIL Impact                    Memory
-------------------  --------------------------------  ----------------------------  --------
threading            I/O-bound: network, files, DB     Releases during I/O waits     Shared
multiprocessing      CPU-bound: math, compression      No GIL (separate processes)   Separate
asyncio              I/O-bound with many connections   Single thread, cooperative    Shared
concurrent.futures   Either, with a high-level API     Same as threading/MP          Depends

Use threading when: you have I/O-bound work, you need to run multiple blocking operations concurrently, or you’re integrating with libraries that don’t support asyncio. Avoid threading for: CPU-bound number crunching (use multiprocessing), or highly concurrent network applications (use asyncio).

[Image: Stack Trace Steve with a padlock key. Caption: "I/O-bound threads: the GIL releases while they wait. CPU-bound threads: not so lucky."]

Creating and Managing Threads

You create a thread by passing a callable to threading.Thread(target=func, args=(...), kwargs={...}). You can also subclass Thread and override its run() method for more complex thread logic. Threads can be daemon threads (killed automatically when the main program exits) or non-daemon threads (the program waits for them to finish).

# threading_create.py
import threading
import time

# Method 1: thread with target function
def worker(name: str, count: int) -> None:
    for i in range(count):
        print(f"  [{name}] step {i+1}/{count}")
        time.sleep(0.1)

# Method 2: subclass Thread
class CounterThread(threading.Thread):
    def __init__(self, name: str, limit: int):
        super().__init__(name=name)
        self.limit = limit
        self.result = 0  # store result as attribute

    def run(self) -> None:
        for i in range(self.limit):
            self.result += i
            time.sleep(0.05)

# Create and start function-based threads
t1 = threading.Thread(target=worker, args=("Alpha", 3))
t2 = threading.Thread(target=worker, args=("Beta", 3))

print("=== Function-based threads ===")
t1.start()
t2.start()
t1.join()
t2.join()

# Subclass-based thread with result
print("\n=== Subclass-based thread ===")
counter = CounterThread("Counter", limit=10)
counter.start()
counter.join()
print(f"  Sum 0..9 = {counter.result}")

# Thread metadata
print(f"\nActive threads: {threading.active_count()}")
print(f"Current thread: {threading.current_thread().name}")

# Daemon thread -- killed automatically when the main thread exits.
# It runs last: its infinite loop would otherwise keep printing
# heartbeats over the sections above.
print("\n=== Daemon thread ===")
def background_monitor():
    while True:
        print("  [monitor] heartbeat")
        time.sleep(0.3)

monitor = threading.Thread(target=background_monitor, daemon=True)
monitor.start()
time.sleep(0.7)
print("Main thread done -- daemon stops automatically")

Output:

=== Function-based threads ===
  [Alpha] step 1/3
  [Beta] step 1/3
  [Alpha] step 2/3
  [Beta] step 2/3
  [Alpha] step 3/3
  [Beta] step 3/3
=== Subclass-based thread ===
  Sum 0..9 = 45
Active threads: 1
Current thread: MainThread
=== Daemon thread ===
  [monitor] heartbeat
  [monitor] heartbeat
  [monitor] heartbeat
Main thread done -- daemon stops automatically

Daemon threads are useful for background tasks like health monitors, log flushers, and cleanup workers that should not prevent the program from exiting. The key risk: daemon threads are killed immediately when the main thread exits, so they should never be in the middle of a critical operation (like writing to a database) when that happens. Use non-daemon threads for any work that must complete before exit.

Thread Safety with Lock and RLock

When multiple threads read and write the same shared data, you need synchronization to prevent data corruption. The most common tool is threading.Lock — a mutual exclusion lock that ensures only one thread accesses a critical section at a time.

# threading_lock.py
import threading
import time

# Unsafe: multiple threads incrementing a shared counter
class UnsafeCounter:
    def __init__(self):
        self.value = 0

    def increment(self, n: int) -> None:
        for _ in range(n):
            current = self.value
            time.sleep(0)  # yield to other threads
            self.value = current + 1

# Safe: protected with Lock
class SafeCounter:
    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def increment(self, n: int) -> None:
        for _ in range(n):
            with self._lock:   # acquire/release automatically
                self.value += 1

def run_counters(counter_class, n_threads=5, increments=1000):
    counter = counter_class()
    threads = [threading.Thread(target=counter.increment, args=(increments,))
               for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    expected = n_threads * increments
    print(f"  {counter_class.__name__}: got {counter.value}, expected {expected}, "
          f"{'OK' if counter.value == expected else 'DATA RACE!'}")

print("Counter comparison (5 threads x 1000 increments = 5000 expected):")
run_counters(UnsafeCounter)
run_counters(SafeCounter)

# RLock -- reentrant lock for recursive code
print("\n=== RLock (reentrant) ===")
rlock = threading.RLock()

def recursive_task(depth: int, lock: threading.RLock) -> None:
    with lock:
        if depth > 0:
            print(f"  Depth {depth}, acquiring lock again...")
            recursive_task(depth - 1, lock)   # same thread can re-acquire RLock
        else:
            print("  Base case")

recursive_task(3, rlock)
print("  RLock released all levels -- done")

Output:

Counter comparison (5 threads x 1000 increments = 5000 expected):
  UnsafeCounter: got 3847, expected 5000, DATA RACE!
  SafeCounter: got 5000, expected 5000, OK
=== RLock (reentrant) ===
  Depth 3, acquiring lock again...
  Depth 2, acquiring lock again...
  Depth 1, acquiring lock again...
  Base case
  RLock released all levels -- done

The with self._lock: pattern is the correct way to use locks — it guarantees the lock is released even if the code inside raises an exception. If you ever call lock.acquire() manually, put the matching lock.release() in a finally block. Use RLock instead of Lock when the same thread might need to acquire the lock multiple times (e.g., in recursive functions) — a regular Lock deadlocks if the same thread tries to acquire it twice.

[Image: Cache Katie at mission control. Caption: "threading.Lock() — one thread in, everyone else waits. That’s the deal."]

Events, Barriers, and Thread Communication

Beyond locks, Python’s threading module provides higher-level synchronization primitives for coordinating thread behavior. Event signals between threads, Barrier makes a group of threads wait for each other, and Queue (from the queue module) is the preferred way to pass data between threads safely.

# threading_event.py
import threading
import time
import queue

# Event: one thread signals another
print("=== Event: start signal ===")
start_event = threading.Event()
results = []

def waiter(name: str, event: threading.Event) -> None:
    print(f"  [{name}] waiting for signal...")
    event.wait()   # blocks until event.set() is called
    print(f"  [{name}] received signal, working...")
    time.sleep(0.1)
    results.append(name)

workers = [threading.Thread(target=waiter, args=(f"Worker-{i}", start_event)) for i in range(3)]
for w in workers:
    w.start()

time.sleep(0.3)
print("  [main] sending start signal")
start_event.set()  # wake up all waiting threads

for w in workers:
    w.join()
print(f"  Results: {results}")

# Queue: producer-consumer pattern
print("\n=== Queue: producer-consumer ===")
work_queue = queue.Queue(maxsize=3)

def producer(q: queue.Queue, items: list) -> None:
    for item in items:
        q.put(item)   # blocks if queue is full
        print(f"  [producer] added: {item}")
    q.put(None)  # sentinel to signal done

def consumer(q: queue.Queue) -> None:
    while True:
        item = q.get()   # blocks until item available
        if item is None:
            q.task_done()
            break
        print(f"  [consumer] processing: {item}")
        time.sleep(0.1)
        q.task_done()

tasks = ["task-A", "task-B", "task-C", "task-D"]
prod = threading.Thread(target=producer, args=(work_queue, tasks))
cons = threading.Thread(target=consumer, args=(work_queue,))

prod.start()
cons.start()
prod.join()
cons.join()
work_queue.join()
print("  All tasks processed")

Output:

=== Event: start signal ===
  [Worker-0] waiting for signal...
  [Worker-1] waiting for signal...
  [Worker-2] waiting for signal...
  [main] sending start signal
  [Worker-0] received signal, working...
  [Worker-1] received signal, working...
  [Worker-2] received signal, working...
  Results: ['Worker-0', 'Worker-1', 'Worker-2']
=== Queue: producer-consumer ===
  [producer] added: task-A
  [producer] added: task-B
  [consumer] processing: task-A
  [producer] added: task-C
  [consumer] processing: task-B
  ...
  All tasks processed

queue.Queue is thread-safe by design — its internal lock means you never need a separate Lock to protect it. The sentinel value (None) pattern is the standard way to signal a consumer to stop: put a None in the queue for each consumer thread. Always call q.task_done() after processing each item if you’re using q.join() to wait for completion.

Real-Life Example: Concurrent URL Checker

A thread pool-based URL health checker that checks multiple URLs concurrently and reports their status, using ThreadPoolExecutor for clean thread management.

[Image: Pyro Pete typing with thread dials. Caption: "ThreadPoolExecutor manages the pool. You manage the panic when URLs go red."]

# url_checker.py
import threading
import time
import queue
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.request import urlopen
from urllib.error import URLError, HTTPError

def check_url(url: str, timeout: int = 5) -> dict:
    """Check if a URL is reachable. Returns status dict."""
    start = time.perf_counter()
    try:
        response = urlopen(url, timeout=timeout)
        elapsed = time.perf_counter() - start
        return {
            "url": url,
            "status": response.status,
            "ok": response.status < 400,
            "elapsed_ms": round(elapsed * 1000),
            "error": None
        }
    except HTTPError as e:
        return {"url": url, "status": e.code, "ok": False,
                "elapsed_ms": round((time.perf_counter() - start) * 1000), "error": str(e)}
    except URLError as e:
        return {"url": url, "status": 0, "ok": False,
                "elapsed_ms": round((time.perf_counter() - start) * 1000), "error": str(e.reason)}
    except Exception as e:
        return {"url": url, "status": 0, "ok": False,
                "elapsed_ms": round((time.perf_counter() - start) * 1000), "error": str(e)}

def check_urls_concurrent(urls: list[str], max_workers: int = 5) -> list[dict]:
    """Check multiple URLs concurrently using a thread pool."""
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks
        future_to_url = {executor.submit(check_url, url): url for url in urls}
        # Collect results as they complete
        for future in as_completed(future_to_url):
            result = future.result()
            results.append(result)
            status_icon = "OK " if result["ok"] else "FAIL"
            print(f"  [{status_icon}] {result['url'][:50]:<50} "
                  f"{result['status']:>3}  {result['elapsed_ms']:>5}ms")
    return results

# Check a set of real public URLs
urls_to_check = [
    "https://httpbin.org/status/200",
    "https://httpbin.org/status/404",
    "https://httpbin.org/delay/1",
    "https://jsonplaceholder.typicode.com/posts/1",
    "https://quotes.toscrape.com/",
]

print(f"Checking {len(urls_to_check)} URLs with 5 threads...\n")
print(f"  {'STATUS':<6} {'URL':<50} {'CODE':>4} {'TIME':>6}")
print("  " + "-" * 68)

start = time.perf_counter()
results = check_urls_concurrent(urls_to_check)
total_time = time.perf_counter() - start

print(f"\nCompleted in {total_time:.1f}s")
ok_count = sum(1 for r in results if r["ok"])
print(f"  {ok_count}/{len(results)} URLs healthy")
avg_ms = sum(r["elapsed_ms"] for r in results) / len(results)
print(f"  Avg response time: {avg_ms:.0f}ms")

Output:

Checking 5 URLs with 5 threads...

  STATUS URL                                                CODE   TIME
  --------------------------------------------------------------------
  [OK ] https://httpbin.org/status/200                      200    312ms
  [FAIL] https://httpbin.org/status/404                     404    298ms
  [OK ] https://jsonplaceholder.typicode.com/posts/1        200    189ms
  [OK ] https://quotes.toscrape.com/                        200    245ms
  [OK ] https://httpbin.org/delay/1                         200   1051ms

Completed in 1.1s
  4/5 URLs healthy
  Avg response time: 419ms

ThreadPoolExecutor is the modern way to manage a pool of worker threads — it handles thread creation, reuse, and cleanup automatically. as_completed() returns futures as they finish (not in submission order), so faster URLs report first. Extend this checker with retry logic for timeouts, a CSV report of results, or a --watch mode that re-checks URLs every 30 seconds using a daemon thread.

Frequently Asked Questions

If Python has the GIL, why use threading at all?

The GIL prevents multiple threads from running Python bytecode simultaneously on different CPU cores — this affects CPU-bound work. But for I/O-bound work, threads spend most of their time waiting for network or disk responses, not executing Python code. Python releases the GIL during I/O system calls, so multiple threads can have their I/O in-flight simultaneously. The result: for I/O-bound workloads (HTTP requests, file reads, database queries), threading provides genuine concurrency and real speedups.

Should I use Thread directly or ThreadPoolExecutor?

Use ThreadPoolExecutor from concurrent.futures for most cases — it handles thread lifecycle, limits concurrency via the pool size, and makes it easy to collect results with Future.result(). Use threading.Thread directly when you need fine-grained control over thread behavior (daemon settings, join() ordering, custom run() methods), or when you’re implementing a specific synchronization pattern like producer-consumer with Queue.

How do I avoid deadlocks?

A deadlock occurs when two threads each hold a lock the other needs. The main prevention rules: always acquire locks in the same order across all threads, use with lock: (never a bare acquire() without a guaranteed release()), prefer queue.Queue for inter-thread communication over shared variables with locks, and use lock.acquire(timeout=5), which returns False on timeout instead of blocking forever, to turn a potential deadlock into a detectable failure. If you suspect a deadlock, print threading.enumerate() to see which threads are alive and stuck.

What is threading.local() used for?

threading.local() creates thread-local storage — each thread gets its own independent copy of the data. This is how Flask stores the current request object (each request handler thread gets its own request context) and how SQLAlchemy session scoping works. Use it when you need per-thread state that shouldn’t be shared, like a database connection or a user session object. Access it via attribute assignment: local_data = threading.local(); local_data.user_id = 42.

How do I stop a thread cleanly?

Python doesn’t provide a way to forcibly kill a thread — you must signal it to stop itself. The standard pattern is a shared stop flag: stop_event = threading.Event(). The thread checks if stop_event.is_set(): break in its loop, and the main thread calls stop_event.set() to signal it to stop. For blocking operations (like queue.get()), use queue.get(timeout=1) with a loop so the thread can periodically check the stop flag. Never call the private Thread._stop() method: it doesn’t actually terminate the thread, it only marks it as stopped, which leaves the threading module’s internal bookkeeping inconsistent.

Conclusion

Python’s threading module is the right tool for I/O-bound concurrent work. The core tools are Thread for creating threads, Lock for protecting shared state, Event for signaling between threads, Queue for safe inter-thread data transfer, and ThreadPoolExecutor for managed thread pools. The URL checker above is a template for any concurrent I/O task — adapt it for batch API calls, file processing pipelines, or database operations.

For CPU-bound parallelism, switch to multiprocessing.Pool or ProcessPoolExecutor. For very high concurrency I/O (hundreds of simultaneous connections), consider asyncio with aiohttp.

Official documentation: https://docs.python.org/3/library/threading.html