You’re downloading files, calling APIs, or waiting on slow I/O — and your program does all of this sequentially, sitting idle while one operation completes before starting the next. For I/O-bound work like network requests, file reads, or database calls, Python’s threading module lets you run multiple operations concurrently so your program doesn’t waste time waiting. A download that takes 10 seconds sequentially for 10 files can complete in roughly 1-2 seconds when run in parallel threads.
Python’s threading module is part of the standard library and available in every Python installation. The important context: because of the Global Interpreter Lock (GIL), Python threads don’t run Python bytecode truly in parallel on multiple CPU cores. But for I/O-bound work (network, disk, database), threads spend most of their time waiting — and the GIL releases during I/O waits, so threads genuinely run concurrently for this type of work. For CPU-bound tasks, use multiprocessing instead.
In this tutorial you’ll learn how to create and start threads, use daemon vs non-daemon threads, synchronize with Lock and Event, build a producer-consumer pattern with Queue, and use ThreadPoolExecutor for managing pools of worker threads. By the end you’ll be able to write thread-safe concurrent programs for I/O-bound workloads.
Python threading: Quick Example
Here’s a simple example showing how threading reduces the total time for multiple I/O-bound tasks:
```python
# threading_quick.py
import threading
import time

def download_file(filename: str, delay: float) -> None:
    """Simulate downloading a file (I/O wait)."""
    print(f"  Starting: {filename}")
    time.sleep(delay)  # simulate network I/O
    print(f"  Done: {filename}")

files = [("report.pdf", 2), ("data.csv", 1.5), ("image.png", 1)]

# Sequential (slow)
print("=== Sequential ===")
start = time.perf_counter()
for name, delay in files:
    download_file(name, delay)
print(f"Sequential time: {time.perf_counter() - start:.1f}s")

# Threaded (fast)
print("\n=== Threaded ===")
start = time.perf_counter()
threads = [threading.Thread(target=download_file, args=(name, delay))
           for name, delay in files]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Threaded time: {time.perf_counter() - start:.1f}s")
```
Output:
```
=== Sequential ===
  Starting: report.pdf
  Done: report.pdf
  Starting: data.csv
  Done: data.csv
  Starting: image.png
  Done: image.png
Sequential time: 4.5s

=== Threaded ===
  Starting: report.pdf
  Starting: data.csv
  Starting: image.png
  Done: image.png
  Done: data.csv
  Done: report.pdf
Threaded time: 2.0s
```
The threaded version finishes in roughly 2 seconds — the time of the longest single task — while sequential takes 4.5 seconds total. t.start() begins the thread and returns immediately; t.join() blocks the main thread until that thread finishes. Always call join() to ensure threads complete before your program exits.
What Is Python Threading and When To Use It?
A thread is a unit of execution within a process. Unlike processes (which have separate memory), threads in the same process share memory, which makes communication easy but also introduces the risk of data races when multiple threads write to the same variable simultaneously.
| Approach | Best For | GIL Impact | Memory |
|---|---|---|---|
| threading | I/O-bound: network, files, DB | Releases during I/O waits | Shared |
| multiprocessing | CPU-bound: math, compression | No GIL (separate processes) | Separate |
| asyncio | I/O-bound with many connections | Single thread, cooperative | Shared |
| concurrent.futures | Either, with high-level API | Same as threading/MP | Depends |
Use threading when: you have I/O-bound work, you need to run multiple blocking operations concurrently, or you’re integrating with libraries that don’t support asyncio. Avoid threading for: CPU-bound number crunching (use multiprocessing), or highly concurrent network applications (use asyncio).

Creating and Managing Threads
You create a thread by passing a callable to threading.Thread(target=func, args=(...), kwargs={...}). You can also subclass Thread and override its run() method for more complex thread logic. Threads can be daemon threads (killed automatically when the main program exits) or non-daemon threads (the program waits for them to finish).
```python
# threading_create.py
import threading
import time

# Method 1: thread with target function
def worker(name: str, count: int) -> None:
    for i in range(count):
        print(f"  [{name}] step {i+1}/{count}")
        time.sleep(0.1)

# Method 2: subclass Thread
class CounterThread(threading.Thread):
    def __init__(self, name: str, limit: int):
        super().__init__(name=name)
        self.limit = limit
        self.result = 0  # store result as attribute

    def run(self) -> None:
        for i in range(self.limit):
            self.result += i
            time.sleep(0.05)

# Create and start function-based threads
t1 = threading.Thread(target=worker, args=("Alpha", 3))
t2 = threading.Thread(target=worker, args=("Beta", 3))
print("=== Function-based threads ===")
t1.start()
t2.start()
t1.join()
t2.join()

# Daemon thread -- dies when main thread exits
print("\n=== Daemon thread ===")
def background_monitor():
    while True:
        print("  [monitor] heartbeat")
        time.sleep(0.3)

monitor = threading.Thread(target=background_monitor, daemon=True)
monitor.start()
time.sleep(0.7)
print("Main thread done -- daemon stops automatically")

# Subclass-based thread with result
print("\n=== Subclass-based thread ===")
counter = CounterThread("Counter", limit=10)
counter.start()
counter.join()
print(f"  Sum 0..9 = {counter.result}")

# Thread metadata
print(f"\nActive threads: {threading.active_count()}")
print(f"Current thread: {threading.current_thread().name}")
```
Output:
```
=== Function-based threads ===
  [Alpha] step 1/3
  [Beta] step 1/3
  [Alpha] step 2/3
  [Beta] step 2/3
  [Alpha] step 3/3
  [Beta] step 3/3

=== Daemon thread ===
  [monitor] heartbeat
  [monitor] heartbeat
Main thread done -- daemon stops automatically

=== Subclass-based thread ===
  Sum 0..9 = 45

Active threads: 2
Current thread: MainThread
```

(The count is 2 because the daemon monitor thread is still alive alongside the main thread when active_count() runs.)
Daemon threads are useful for background tasks like health monitors, log flushers, and cleanup workers that should not prevent the program from exiting. The key risk: daemon threads are killed immediately when the main thread exits, so they should never be in the middle of a critical operation (like writing to a database) when that happens. Use non-daemon threads for any work that must complete before exit.
Thread Safety with Lock and RLock
When multiple threads read and write the same shared data, you need synchronization to prevent data corruption. The most common tool is threading.Lock — a mutual exclusion lock that ensures only one thread accesses a critical section at a time.
```python
# threading_lock.py
import threading
import time

# Unsafe: multiple threads incrementing a shared counter
class UnsafeCounter:
    def __init__(self):
        self.value = 0

    def increment(self, n: int) -> None:
        for _ in range(n):
            current = self.value
            time.sleep(0)  # yield to other threads
            self.value = current + 1

# Safe: protected with Lock
class SafeCounter:
    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def increment(self, n: int) -> None:
        for _ in range(n):
            with self._lock:  # acquire/release automatically
                self.value += 1

def run_counters(counter_class, n_threads=5, increments=1000):
    counter = counter_class()
    threads = [threading.Thread(target=counter.increment, args=(increments,))
               for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    expected = n_threads * increments
    print(f"  {counter_class.__name__}: got {counter.value}, expected {expected}, "
          f"{'OK' if counter.value == expected else 'DATA RACE!'}")

print("Counter comparison (5 threads x 1000 increments = 5000 expected):")
run_counters(UnsafeCounter)
run_counters(SafeCounter)

# RLock -- reentrant lock for recursive code
print("\n=== RLock (reentrant) ===")
rlock = threading.RLock()

def recursive_task(depth: int, lock: threading.RLock) -> None:
    with lock:
        if depth > 0:
            print(f"  Depth {depth}, acquiring lock again...")
            recursive_task(depth - 1, lock)  # same thread can re-acquire RLock
        else:
            print("  Base case")

recursive_task(3, rlock)
print("  RLock released all levels -- done")
```
Output:
```
Counter comparison (5 threads x 1000 increments = 5000 expected):
  UnsafeCounter: got 3847, expected 5000, DATA RACE!
  SafeCounter: got 5000, expected 5000, OK

=== RLock (reentrant) ===
  Depth 3, acquiring lock again...
  Depth 2, acquiring lock again...
  Depth 1, acquiring lock again...
  Base case
  RLock released all levels -- done
```
The with self._lock: pattern is the correct way to use locks — it ensures the lock is always released, even if the code inside raises an exception. Never use lock.acquire() without a matching lock.release() in a finally block. Use RLock instead of Lock when the same thread might need to acquire the lock multiple times (e.g., in recursive functions) — a regular Lock deadlocks if the same thread tries to acquire it twice.
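If the acquire and release cannot share a with block (say, they happen in different methods), the manual equivalent is acquire() followed by try/finally. A minimal sketch with a hypothetical shared list:

```python
import threading

lock = threading.Lock()
shared: list[int] = []

def append_safely(value: int) -> None:
    """Manual acquire/release -- the finally clause guarantees release even on error."""
    lock.acquire()
    try:
        shared.append(value)
    finally:
        lock.release()  # always runs, even if append raises

append_safely(1)
append_safely(2)
print(shared)  # [1, 2]
```

This is exactly what the with statement does for you, which is why the with form is preferred whenever the acquire and release live in the same scope.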

Events, Barriers, and Thread Communication
Beyond locks, Python’s threading module provides higher-level synchronization primitives for coordinating thread behavior. Event signals between threads, Barrier makes a group of threads wait for each other, and Queue (from the queue module) is the preferred way to pass data between threads safely.
```python
# threading_event.py
import threading
import time
import queue

# Event: one thread signals another
print("=== Event: start signal ===")
start_event = threading.Event()
results = []

def waiter(name: str, event: threading.Event) -> None:
    print(f"  [{name}] waiting for signal...")
    event.wait()  # blocks until event.set() is called
    print(f"  [{name}] received signal, working...")
    time.sleep(0.1)
    results.append(name)

workers = [threading.Thread(target=waiter, args=(f"Worker-{i}", start_event))
           for i in range(3)]
for w in workers:
    w.start()
time.sleep(0.3)
print("  [main] sending start signal")
start_event.set()  # wake up all waiting threads
for w in workers:
    w.join()
print(f"  Results: {results}")

# Queue: producer-consumer pattern
print("\n=== Queue: producer-consumer ===")
work_queue = queue.Queue(maxsize=3)

def producer(q: queue.Queue, items: list) -> None:
    for item in items:
        q.put(item)  # blocks if queue is full
        print(f"  [producer] added: {item}")
    q.put(None)  # sentinel to signal done

def consumer(q: queue.Queue) -> None:
    while True:
        item = q.get()  # blocks until item available
        if item is None:
            q.task_done()
            break
        print(f"  [consumer] processing: {item}")
        time.sleep(0.1)
        q.task_done()

tasks = ["task-A", "task-B", "task-C", "task-D"]
prod = threading.Thread(target=producer, args=(work_queue, tasks))
cons = threading.Thread(target=consumer, args=(work_queue,))
prod.start()
cons.start()
prod.join()
cons.join()
work_queue.join()
print("  All tasks processed")
```
Output:
```
=== Event: start signal ===
  [Worker-0] waiting for signal...
  [Worker-1] waiting for signal...
  [Worker-2] waiting for signal...
  [main] sending start signal
  [Worker-0] received signal, working...
  [Worker-1] received signal, working...
  [Worker-2] received signal, working...
  Results: ['Worker-0', 'Worker-1', 'Worker-2']

=== Queue: producer-consumer ===
  [producer] added: task-A
  [producer] added: task-B
  [consumer] processing: task-A
  [producer] added: task-C
  [consumer] processing: task-B
  ...
  All tasks processed
```
queue.Queue is thread-safe by design — its internal lock means you never need a separate Lock to protect it. The sentinel value (None) pattern is the standard way to signal a consumer to stop: put a None in the queue for each consumer thread. Always call q.task_done() after processing each item if you’re using q.join() to wait for completion.
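With more than one consumer, put one sentinel per consumer thread so every loop terminates. A minimal sketch (the worker count and task names are illustrative):

```python
import queue
import threading

N_CONSUMERS = 3
q: queue.Queue = queue.Queue()
processed: list[str] = []
processed_lock = threading.Lock()

def consumer() -> None:
    while True:
        item = q.get()
        if item is None:  # sentinel: this consumer is done
            break
        with processed_lock:  # protect the shared results list
            processed.append(item)

consumers = [threading.Thread(target=consumer) for _ in range(N_CONSUMERS)]
for c in consumers:
    c.start()
for i in range(10):
    q.put(f"task-{i}")
for _ in range(N_CONSUMERS):  # one sentinel per consumer
    q.put(None)
for c in consumers:
    c.join()
print(f"Processed {len(processed)} items")  # Processed 10 items
```

Because the queue is FIFO, the sentinels are dequeued only after all real items, so no consumer exits early.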
Real-Life Example: Concurrent URL Checker
This example builds a thread pool-based URL health checker that checks multiple URLs concurrently and reports their status, using ThreadPoolExecutor for clean thread management.

```python
# url_checker.py
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.request import urlopen
from urllib.error import URLError, HTTPError

def check_url(url: str, timeout: int = 5) -> dict:
    """Check if a URL is reachable. Returns status dict."""
    start = time.perf_counter()
    try:
        with urlopen(url, timeout=timeout) as response:
            elapsed = time.perf_counter() - start
            return {
                "url": url,
                "status": response.status,
                "ok": response.status < 400,
                "elapsed_ms": round(elapsed * 1000),
                "error": None,
            }
    except HTTPError as e:
        return {"url": url, "status": e.code, "ok": False,
                "elapsed_ms": round((time.perf_counter() - start) * 1000), "error": str(e)}
    except URLError as e:
        return {"url": url, "status": 0, "ok": False,
                "elapsed_ms": round((time.perf_counter() - start) * 1000), "error": str(e.reason)}
    except Exception as e:
        return {"url": url, "status": 0, "ok": False,
                "elapsed_ms": round((time.perf_counter() - start) * 1000), "error": str(e)}

def check_urls_concurrent(urls: list[str], max_workers: int = 5) -> list[dict]:
    """Check multiple URLs concurrently using a thread pool."""
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks
        future_to_url = {executor.submit(check_url, url): url for url in urls}
        # Collect results as they complete
        for future in as_completed(future_to_url):
            result = future.result()
            results.append(result)
            status_icon = "OK " if result["ok"] else "FAIL"
            print(f"  [{status_icon}] {result['url'][:50]:<50} "
                  f"{result['status']:>3} {result['elapsed_ms']:>5}ms")
    return results

# Check a set of real public URLs
urls_to_check = [
    "https://httpbin.org/status/200",
    "https://httpbin.org/status/404",
    "https://httpbin.org/delay/1",
    "https://jsonplaceholder.typicode.com/posts/1",
    "https://quotes.toscrape.com/",
]

print(f"Checking {len(urls_to_check)} URLs with 5 threads...\n")
print(f"  {'STATUS':<6} {'URL':<50} {'CODE':>4} {'TIME':>6}")
print("  " + "-" * 68)
start = time.perf_counter()
results = check_urls_concurrent(urls_to_check)
total_time = time.perf_counter() - start
print(f"\nCompleted in {total_time:.1f}s")
ok_count = sum(1 for r in results if r["ok"])
print(f"  {ok_count}/{len(results)} URLs healthy")
avg_ms = sum(r["elapsed_ms"] for r in results) / len(results)
print(f"  Avg response time: {avg_ms:.0f}ms")
```
Output:
```
Checking 5 URLs with 5 threads...

  STATUS URL                                                CODE   TIME
  --------------------------------------------------------------------
  [OK  ] https://httpbin.org/status/200                     200   312ms
  [FAIL] https://httpbin.org/status/404                     404   298ms
  [OK  ] https://jsonplaceholder.typicode.com/posts/1       200   189ms
  [OK  ] https://quotes.toscrape.com/                       200   245ms
  [OK  ] https://httpbin.org/delay/1                        200  1051ms

Completed in 1.1s
  4/5 URLs healthy
  Avg response time: 419ms
```
ThreadPoolExecutor is the modern way to manage a pool of worker threads — it handles thread creation, reuse, and cleanup automatically. as_completed() returns futures as they finish (not in submission order), so faster URLs report first. Extend this checker with retry logic for timeouts, a CSV report of results, or a --watch mode that re-checks URLs every 30 seconds using a daemon thread.
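As a starting point for the retry extension, here is a hypothetical check_with_retry wrapper (the name, retry count, and backoff are assumptions, not part of the original checker). It is generic over any checker that returns an "ok" dict, so the demo uses a fake flaky checker instead of real network calls:

```python
import time

def check_with_retry(check, url: str, retries: int = 3, backoff: float = 0.5) -> dict:
    """Call check(url) up to `retries` times, backing off between failed attempts."""
    result = {"url": url, "ok": False, "error": "not attempted"}
    for attempt in range(retries):
        result = check(url)
        if result["ok"]:
            return result
        time.sleep(backoff * (attempt + 1))  # linear backoff before retrying
    return result  # last failed result

# Demo with a fake checker that fails twice, then succeeds
calls = {"n": 0}
def flaky_check(url: str) -> dict:
    calls["n"] += 1
    return {"url": url, "ok": calls["n"] >= 3, "error": None}

result = check_with_retry(flaky_check, "https://example.com", retries=3, backoff=0.01)
print(result["ok"], calls["n"])  # True 3
```

In the real checker you would pass check_url as the check argument and call check_with_retry from the pool workers.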
Frequently Asked Questions
If Python has the GIL, why use threading at all?
The GIL prevents multiple threads from running Python bytecode simultaneously on different CPU cores — this affects CPU-bound work. But for I/O-bound work, threads spend most of their time waiting for network or disk responses, not executing Python code. Python releases the GIL during I/O system calls, so multiple threads can have their I/O in-flight simultaneously. The result: for I/O-bound workloads (HTTP requests, file reads, database queries), threading provides genuine concurrency and real speedups.
Should I use Thread directly or ThreadPoolExecutor?
Use ThreadPoolExecutor from concurrent.futures for most cases — it handles thread lifecycle, limits concurrency via the pool size, and makes it easy to collect results with Future.result(). Use threading.Thread directly when you need fine-grained control over thread behavior (daemon settings, join() ordering, custom run() methods), or when you’re implementing a specific synchronization pattern like producer-consumer with Queue.
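For the common case of mapping one function over many inputs, executor.map is even more compact than submit/as_completed. A sketch with a stand-in I/O task:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fetch(item: str) -> str:
    """Stand-in for an I/O-bound call (e.g. an HTTP request)."""
    time.sleep(0.1)
    return item.upper()

items = ["alpha", "beta", "gamma"]
with ThreadPoolExecutor(max_workers=3) as executor:
    # map runs fetch concurrently but yields results in input order,
    # unlike as_completed, which yields in completion order
    results = list(executor.map(fetch, items))
print(results)  # ['ALPHA', 'BETA', 'GAMMA']
```

Choose map when you want ordered results, and submit/as_completed when you want to react to each result as soon as it arrives.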
How do I avoid deadlocks?
A deadlock occurs when two threads each hold a lock the other needs. The main prevention rules: always acquire locks in the same order across all threads, use with lock: (never acquire() without release()), prefer queue.Queue for inter-thread communication over shared variables with locks, and use lock.acquire(timeout=5) to detect potential deadlocks instead of blocking forever. If you suspect a deadlock, print threading.enumerate() to see which threads are alive and stuck.
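The timeout idea in code: lock.acquire(timeout=...) returns False instead of blocking forever, so the thread can log the problem and back off. The critical-section body below is just a placeholder:

```python
import threading

lock = threading.Lock()

def try_critical_section() -> bool:
    """Attempt to enter the critical section; give up after the timeout."""
    acquired = lock.acquire(timeout=0.2)
    if not acquired:
        print("Could not acquire lock -- possible deadlock, backing off")
        return False
    try:
        pass  # ... work on the shared resource here ...
        return True
    finally:
        lock.release()

print(try_critical_section())  # True (lock was free)
lock.acquire()                 # simulate another thread holding the lock
print(try_critical_section())  # False (timed out after 0.2s)
lock.release()
```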
What is threading.local() used for?
threading.local() creates thread-local storage — each thread gets its own independent copy of the data. This is how Flask stores the current request object (each request handler thread gets its own request context) and how SQLAlchemy session scoping works. Use it when you need per-thread state that shouldn’t be shared, like a database connection or a user session object. Access it via attribute assignment: local_data = threading.local(); local_data.user_id = 42.
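A minimal demonstration: three threads assign to the same local_data object, but each sees only its own value (the seen dict here is only for collecting results to print):

```python
import threading

local_data = threading.local()
seen: dict[str, int] = {}

def worker(user_id: int) -> None:
    local_data.user_id = user_id  # attribute visible only to this thread
    # ... any function running in this thread can read local_data.user_id ...
    seen[threading.current_thread().name] = local_data.user_id

threads = [threading.Thread(target=worker, args=(i,), name=f"T{i}") for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(seen)  # each thread kept its own user_id, no cross-thread overwrites
```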
How do I stop a thread cleanly?
Python doesn’t provide a way to forcibly kill a thread — you must signal it to stop itself. The standard pattern is a shared stop flag: stop_event = threading.Event(). The thread checks if stop_event.is_set(): break in its loop, and the main thread calls stop_event.set() to signal it to stop. For blocking operations (like queue.get()), use queue.get(timeout=1) with a loop so the thread can periodically check the stop flag. Never use thread._stop() — it’s a private API and can leave shared resources in a corrupted state.
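The stop-flag pattern from this answer, combined with the get(timeout=...) trick, sketched with hypothetical job names:

```python
import queue
import threading
import time

stop_event = threading.Event()
work: queue.Queue = queue.Queue()
handled: list[str] = []

def worker() -> None:
    while not stop_event.is_set():
        try:
            item = work.get(timeout=0.1)  # short timeout so the flag is re-checked
        except queue.Empty:
            continue  # nothing to do; loop around and check the stop flag
        handled.append(item)

t = threading.Thread(target=worker)
t.start()
work.put("job-1")
work.put("job-2")
time.sleep(0.3)    # let the worker drain the queue
stop_event.set()   # signal the worker to exit its loop
t.join()
print(handled)  # ['job-1', 'job-2']
```

The worker never blocks for more than 0.1s at a time, so it notices the stop signal promptly and exits on its own.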
Conclusion
Python’s threading module is the right tool for I/O-bound concurrent work. The core tools are Thread for creating threads, Lock for protecting shared state, Event for signaling between threads, Queue for safe inter-thread data transfer, and ThreadPoolExecutor for managed thread pools. The URL checker above is a template for any concurrent I/O task — adapt it for batch API calls, file processing pipelines, or database operations.
For CPU-bound parallelism, switch to multiprocessing.Pool or ProcessPoolExecutor. For very high concurrency I/O (hundreds of simultaneous connections), consider asyncio with aiohttp.
Official documentation: https://docs.python.org/3/library/threading.html