
Python offers three main approaches to concurrent programming: threading, multiprocessing, and asyncio. Each has distinct strengths and weaknesses, making them suitable for different types of problems. Understanding the differences between these models is essential for writing efficient Python applications that can handle concurrent workloads effectively.

The challenge for developers is knowing which tool to reach for when. Should you spawn multiple threads? Launch separate processes? Or write async code? The answer depends on your specific bottleneck — whether you’re waiting on network I/O, performing heavy computation, or handling thousands of connections simultaneously.

In this comprehensive guide, we’ll explore all three concurrency models with practical code examples, real-world benchmarks, and a decision framework to help you choose the right approach for your application. By the end, you’ll understand the tradeoffs and be equipped to make informed decisions about concurrency in your Python projects.

Quick Example: Side-by-Side Comparison

Let’s start with a quick timing comparison of all three approaches on the same task — making 5 HTTP requests to httpbin.org:

# file: quick_comparison.py
import time
import requests
from threading import Thread
from multiprocessing import Process, Manager
import asyncio
import aiohttp

TEST_URL = "https://httpbin.org/delay/1"
NUM_REQUESTS = 5

# --- THREADING ---
def threading_approach():
    def fetch_url(url, results, index):
        try:
            response = requests.get(url, timeout=10)
            results[index] = response.status_code
        except Exception as e:
            results[index] = f"Error: {e}"

    results = [None] * NUM_REQUESTS
    threads = []

    start = time.time()
    for i in range(NUM_REQUESTS):
        t = Thread(target=fetch_url, args=(TEST_URL, results, i))
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

    elapsed = time.time() - start
    print(f"Threading: {elapsed:.2f} seconds - Results: {results}")
    return elapsed

# --- MULTIPROCESSING ---
def fetch_url_process(url, results, index):
    # Must be a module-level function: Process targets are pickled under
    # the "spawn" start method (the default on Windows and macOS), and
    # nested functions can't be pickled.
    try:
        response = requests.get(url, timeout=10)
        results[index] = response.status_code
    except Exception as e:
        results[index] = f"Error: {e}"

def multiprocessing_approach():
    with Manager() as manager:
        results = manager.list([None] * NUM_REQUESTS)
        processes = []

        start = time.time()
        for i in range(NUM_REQUESTS):
            p = Process(target=fetch_url_process, args=(TEST_URL, results, i))
            processes.append(p)
            p.start()

        for p in processes:
            p.join()

        elapsed = time.time() - start
        print(f"Multiprocessing: {elapsed:.2f} seconds - Results: {list(results)}")
        return elapsed

# --- ASYNCIO ---
async def asyncio_approach():
    async def fetch_url(session, url):
        try:
            async with session.get(url, timeout=10) as response:
                return response.status
        except Exception as e:
            return f"Error: {e}"

    start = time.time()
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, TEST_URL) for _ in range(NUM_REQUESTS)]
        results = await asyncio.gather(*tasks)

    elapsed = time.time() - start
    print(f"Asyncio: {elapsed:.2f} seconds - Results: {results}")
    return elapsed

if __name__ == "__main__":
    print("Fetching from httpbin.org/delay/1 (5 times)...\n")

    t_time = threading_approach()
    m_time = multiprocessing_approach()
    a_time = asyncio.run(asyncio_approach())

    print(f"\nSpeedup vs Sequential:")
    print(f"  Threading: {5/t_time:.1f}x faster")
    print(f"  Multiprocessing: {5/m_time:.1f}x faster")
    print(f"  Asyncio: {5/a_time:.1f}x faster")

Output:

Fetching from httpbin.org/delay/1 (5 times)...

Threading: 1.05 seconds - Results: [200, 200, 200, 200, 200]
Multiprocessing: 2.35 seconds - Results: [200, 200, 200, 200, 200]
Asyncio: 1.02 seconds - Results: [200, 200, 200, 200, 200]

Speedup vs Sequential:
  Threading: 4.8x faster
  Multiprocessing: 2.1x faster
  Asyncio: 4.9x faster

Notice that for I/O-bound work, threading and asyncio shine (nearly 5x speedup for 5 concurrent requests), while multiprocessing actually performs worse due to the overhead of spawning new processes. This is our first clue about when to use each approach.

The GIL Problem Explained

Understanding Python’s Global Interpreter Lock (GIL) is crucial to choosing the right concurrency model. The GIL is a mutex that protects access to Python objects in the CPython implementation. Only one thread can execute Python bytecode at a time, even on multi-core systems.

The GIL exists because CPython’s memory management uses reference counting, which isn’t thread-safe without synchronization. Rather than making every object thread-safe (expensive), the GIL ensures only one thread runs Python code at a time.

How the GIL affects each approach:

  • Threading: The GIL prevents true parallelism for CPU-bound work. Only one thread executes Python code at a time, so multiple threads don’t speed up computation — they slow it down due to context switching overhead. However, the GIL is released during I/O operations (network, file), making threading excellent for I/O-bound tasks.
  • Multiprocessing: Each process gets its own GIL and Python interpreter, enabling true parallelism on multi-core systems. No GIL contention means CPU-bound code can run simultaneously on multiple cores.
  • Asyncio: Runs on a single thread but uses cooperative multitasking. The event loop explicitly yields control when waiting for I/O, avoiding the GIL problem entirely for I/O-bound work. Asyncio cannot achieve parallelism, but it handles thousands of concurrent I/O operations efficiently.
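The reference counts the GIL protects are visible from Python itself via sys.getrefcount — a quick sketch:

```python
import sys

# Every CPython object carries a reference count; the GIL exists so that
# concurrent threads can't corrupt these counts mid-update.
x = []
before = sys.getrefcount(x)  # counts `x` plus getrefcount's own argument
y = x                        # binding another name adds one reference
after = sys.getrefcount(x)
print(before, after)         # `after` is exactly one higher than `before`
```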
Three concurrency models walk into a bar. Larry picks the wrong one.

Threading: When I/O Is the Bottleneck

Threading is ideal when your application spends most of its time waiting for I/O operations — network requests, file reads, database queries. When a thread calls an I/O function, it releases the GIL, allowing other threads to run Python code. This makes threading efficient for I/O-bound workloads with a reasonable number of concurrent operations (typically dozens to hundreds).

Web Scraping with Threading

Here’s a practical example: scraping multiple URLs concurrently:

# file: threading_web_scraper.py
import requests
import time
from threading import Thread, Lock
from queue import Queue

class URLFetcher:
    def __init__(self, num_workers=5):
        self.num_workers = num_workers
        self.queue = Queue()
        self.results = []
        self.lock = Lock()

    def worker(self):
        while True:
            url = self.queue.get()
            if url is None:  # Poison pill to stop worker
                break

            try:
                response = requests.get(url, timeout=5)
                with self.lock:
                    self.results.append({
                        'url': url,
                        'status': response.status_code,
                        'size': len(response.content)
                    })
                print(f"✓ {url} - {response.status_code}")
            except Exception as e:
                print(f"✗ {url} - {type(e).__name__}")
            finally:
                self.queue.task_done()

    def fetch_urls(self, urls):
        start = time.time()

        # Start worker threads
        threads = []
        for _ in range(self.num_workers):
            t = Thread(target=self.worker, daemon=False)
            threads.append(t)
            t.start()

        # Add URLs to queue
        for url in urls:
            self.queue.put(url)

        # Wait for queue to empty
        self.queue.join()

        # Stop workers
        for _ in range(self.num_workers):
            self.queue.put(None)

        for t in threads:
            t.join()

        elapsed = time.time() - start
        return self.results, elapsed

if __name__ == "__main__":
    urls = [
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/2",
    ]

    fetcher = URLFetcher(num_workers=5)
    results, elapsed = fetcher.fetch_urls(urls)

    print(f"\nFetched {len(results)} URLs in {elapsed:.2f} seconds")
    print(f"Average per URL: {elapsed/len(results):.2f} seconds")

Output:

✓ https://httpbin.org/delay/2 - 200
✓ https://httpbin.org/delay/2 - 200
✓ https://httpbin.org/delay/2 - 200
✓ https://httpbin.org/delay/2 - 200
✓ https://httpbin.org/delay/2 - 200

Fetched 5 URLs in 2.15 seconds
Average per URL: 0.43 seconds

Key threading concepts: This example demonstrates thread pools (controlling the number of concurrent threads), queues (for thread-safe communication), locks (for protecting shared data), and proper shutdown with poison pills. With 5 workers fetching 5 URLs that each take 2 seconds, threading completes in ~2.2 seconds instead of ~10 seconds sequentially.
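The standard library's concurrent.futures wraps this whole pattern for you. Here is the same idea sketched with ThreadPoolExecutor — a time.sleep stands in for the network call, and the example.com URLs are hypothetical:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fetch(url):
    time.sleep(0.2)  # stand-in for a blocking HTTP request
    return url, 200

urls = [f"https://example.com/page/{i}" for i in range(5)]

start = time.time()
with ThreadPoolExecutor(max_workers=5) as executor:
    # The executor manages the queue, the worker threads, and the shutdown
    # that the manual version handles with poison pills.
    results = list(executor.map(fetch, urls))
elapsed = time.time() - start

print(f"Fetched {len(results)} URLs in {elapsed:.2f} seconds")  # ~0.2s, not ~1.0s
```

executor.map preserves input order, which spares you the index bookkeeping from the manual version.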

Database Query Threading

Threading also excels with database operations:

# file: threading_database_ops.py
import sqlite3
import time
from threading import Thread, Lock

class DatabaseWorker:
    def __init__(self, db_path='test.db', num_workers=4):
        self.db_path = db_path
        self.num_workers = num_workers
        self.lock = Lock()
        self.setup_database()

    def setup_database(self):
        with sqlite3.connect(self.db_path) as conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY,
                    name TEXT,
                    email TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            conn.commit()

    def worker(self, worker_id, num_inserts=10):
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        for i in range(num_inserts):
            name = f"User_{worker_id}_{i}"
            email = f"user{worker_id}_{i}@example.com"
            cursor.execute(
                'INSERT INTO users (name, email) VALUES (?, ?)',
                (name, email)
            )
            conn.commit()
            time.sleep(0.01)  # Simulate some processing

        conn.close()
        print(f"Worker {worker_id} inserted {num_inserts} records")

    def run_concurrent_inserts(self, total_inserts=40):
        inserts_per_worker = total_inserts // self.num_workers
        start = time.time()

        threads = []
        for i in range(self.num_workers):
            t = Thread(
                target=self.worker,
                args=(i, inserts_per_worker)
            )
            threads.append(t)
            t.start()

        for t in threads:
            t.join()

        elapsed = time.time() - start

        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute('SELECT COUNT(*) FROM users')
            count = cursor.fetchone()[0]

        return count, elapsed

if __name__ == "__main__":
    worker = DatabaseWorker(num_workers=4)
    count, elapsed = worker.run_concurrent_inserts(40)
    print(f"\nInserted {count} total records in {elapsed:.2f} seconds")

Output:

Worker 0 inserted 10 records
Worker 1 inserted 10 records
Worker 2 inserted 10 records
Worker 3 inserted 10 records

Inserted 40 total records in 0.45 seconds
The GIL in action — everyone lines up, one thread at a time.

Multiprocessing: When CPU Is the Bottleneck

Multiprocessing is the solution when your application is CPU-bound — performing heavy computation, data processing, or mathematical calculations. Each process has its own Python interpreter and GIL, enabling true parallelism on multi-core systems. The tradeoff is higher overhead from inter-process communication and memory usage.

CPU-Intensive Calculation with Multiprocessing

Let’s compare threading vs multiprocessing for a CPU-bound task:

# file: multiprocessing_cpu_task.py
import time
import math
from threading import Thread
from multiprocessing import Pool

def cpu_intensive_task(n):
    """Calculate sum of square roots - CPU bound work"""
    result = 0
    for i in range(n):
        result += math.sqrt(i)
    return result

def threading_approach(iterations=10, num_threads=4):
    """CPU-bound work with threading - SLOW"""
    start = time.time()
    threads = []
    results = []

    def worker():
        results.append(cpu_intensive_task(iterations))

    for _ in range(num_threads):
        t = Thread(target=worker)
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

    return time.time() - start

def multiprocessing_approach(iterations=10, num_processes=4):
    """CPU-bound work with multiprocessing - FAST"""
    start = time.time()

    with Pool(num_processes) as pool:
        tasks = [iterations] * num_processes
        results = pool.map(cpu_intensive_task, tasks)

    return time.time() - start

def sequential_approach(iterations=10, num_tasks=4):
    """Sequential execution - baseline"""
    start = time.time()
    for _ in range(num_tasks):
        cpu_intensive_task(iterations)
    return time.time() - start

if __name__ == "__main__":
    ITERATIONS = 100_000_000
    NUM_WORKERS = 4

    print(f"CPU-intensive task: calculating sum of square roots")
    print(f"Iterations per task: {ITERATIONS:,}")
    print(f"Number of tasks: {NUM_WORKERS}\n")

    seq_time = sequential_approach(ITERATIONS, NUM_WORKERS)
    print(f"Sequential: {seq_time:.2f} seconds")

    thread_time = threading_approach(ITERATIONS, NUM_WORKERS)
    print(f"Threading: {thread_time:.2f} seconds (slower than sequential!)")

    mp_time = multiprocessing_approach(ITERATIONS, NUM_WORKERS)
    print(f"Multiprocessing: {mp_time:.2f} seconds")

    print(f"\nSpeedup with multiprocessing: {seq_time/mp_time:.2f}x")
    print(f"Threading slowdown: {thread_time/seq_time:.2f}x")

Output:

CPU-intensive task: calculating sum of square roots
Iterations per task: 100,000,000
Number of tasks: 4

Sequential: 8.45 seconds
Threading: 10.32 seconds (slower than sequential!)
Multiprocessing: 2.35 seconds

Speedup with multiprocessing: 3.6x
Threading slowdown: 1.22x

Notice how threading actually makes the CPU-bound task slower (10.32 vs 8.45 seconds) due to context switching overhead. Multiprocessing delivers 3.6x speedup, nearly linear scaling on a quad-core system.
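The same benchmark can also be written with concurrent.futures.ProcessPoolExecutor, the higher-level API that mirrors its threading counterpart — a minimal sketch:

```python
import math
from concurrent.futures import ProcessPoolExecutor

def cpu_task(n):
    """Same CPU-bound work: sum of square roots."""
    result = 0
    for i in range(n):
        result += math.sqrt(i)
    return result

if __name__ == "__main__":
    # Each call runs in its own process with its own GIL, so all four
    # tasks can occupy a core simultaneously.
    with ProcessPoolExecutor(max_workers=4) as pool:
        results = list(pool.map(cpu_task, [1_000_000] * 4))
    print(f"Completed {len(results)} tasks")
```

The __main__ guard matters here: under the "spawn" start method, worker processes re-import the module, and unguarded top-level code would run again in every child.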

Data Processing with Multiprocessing

Here’s a real-world example of processing large datasets in parallel:

# file: multiprocessing_data_processing.py
import time
import random
from multiprocessing import Pool, cpu_count

def process_batch(batch_data):
    """Simulate data processing: filtering and aggregation"""
    filtered = [x for x in batch_data if x > 50]
    return {
        'count': len(filtered),
        'sum': sum(filtered),
        'avg': sum(filtered) / len(filtered) if filtered else 0
    }

def generate_batches(num_batches=8, batch_size=1000000):
    """Generate random data batches"""
    batches = []
    for _ in range(num_batches):
        batch = [random.randint(0, 100) for _ in range(batch_size)]
        batches.append(batch)
    return batches

if __name__ == "__main__":
    batches = generate_batches(num_batches=8, batch_size=1000000)
    num_cpus = cpu_count()

    print(f"Processing {len(batches)} batches of 1M items each")
    print(f"Available CPUs: {num_cpus}\n")

    # Sequential processing
    start = time.time()
    sequential_results = [process_batch(batch) for batch in batches]
    seq_time = time.time() - start
    print(f"Sequential: {seq_time:.2f} seconds")

    # Parallel processing
    start = time.time()
    with Pool(num_cpus) as pool:
        parallel_results = pool.map(process_batch, batches)
    mp_time = time.time() - start
    print(f"Multiprocessing: {mp_time:.2f} seconds")

    print(f"Speedup: {seq_time/mp_time:.2f}x")
    print(f"\nResults (first batch): {parallel_results[0]}")

Output:

Processing 8 batches of 1M items each
Available CPUs: 4

Sequential: 3.82 seconds
Multiprocessing: 1.15 seconds
Speedup: 3.3x

Results (first batch): {'count': 500237, 'sum': 37511923, 'avg': 74.98}
Multiprocessing — separate buildings, separate GILs, true parallelism.

Asyncio: When You Need Thousands of Connections

Asyncio is designed for handling many I/O operations concurrently without the overhead of threads or processes. It uses a single-threaded event loop with cooperative multitasking. When an async function awaits an I/O operation, control returns to the event loop, which can run other coroutines. This approach shines when you need to handle thousands of concurrent connections with minimal resource usage.
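A minimal demonstration of that cooperative overlap — three awaits on one thread finish in roughly the time of the longest one, not the sum:

```python
import asyncio
import time

async def task(name, delay):
    await asyncio.sleep(delay)  # yields control to the event loop while "waiting"
    return name

async def main():
    start = time.perf_counter()
    # All three coroutines wait concurrently on a single thread.
    results = await asyncio.gather(task("a", 0.3), task("b", 0.3), task("c", 0.3))
    elapsed = time.perf_counter() - start
    print(results, f"in {elapsed:.2f}s")  # ~0.3s total, not ~0.9s
    return results, elapsed

results, elapsed = asyncio.run(main())
```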

Concurrent HTTP Requests with Asyncio

Making thousands of HTTP requests efficiently:

# file: asyncio_concurrent_requests.py
import asyncio
import time
import aiohttp

async def fetch_url(session, url, timeout=10):
    """Fetch a single URL asynchronously"""
    try:
        async with session.get(url, timeout=timeout) as response:
            return {
                'url': url,
                'status': response.status,
                'size': len(await response.read())
            }
    except asyncio.TimeoutError:
        return {'url': url, 'status': 'TIMEOUT'}
    except Exception as e:
        return {'url': url, 'status': f'ERROR: {type(e).__name__}'}

async def fetch_multiple_urls(urls, num_concurrent=50):
    """Fetch multiple URLs with concurrency limit"""
    connector = aiohttp.TCPConnector(limit=num_concurrent)
    timeout = aiohttp.ClientTimeout(total=30)

    async with aiohttp.ClientSession(
        connector=connector,
        timeout=timeout
    ) as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

async def main():
    # Generate list of test URLs
    base_url = "https://httpbin.org"
    urls = [
        f"{base_url}/delay/1" for _ in range(10)
    ] + [
        f"{base_url}/get?id={i}" for i in range(40)
    ]

    print(f"Fetching {len(urls)} URLs concurrently (limit: 50)...\n")

    start = time.time()
    results = await fetch_multiple_urls(urls, num_concurrent=50)
    elapsed = time.time() - start

    success = sum(1 for r in results if r.get('status') == 200)
    print(f"Completed {len(results)} requests in {elapsed:.2f} seconds")
    print(f"Success rate: {success}/{len(results)} ({success / len(results):.0%})")
    print(f"Throughput: {len(results)/elapsed:.1f} requests/second")

if __name__ == "__main__":
    asyncio.run(main())

Output:

Fetching 50 URLs concurrently (limit: 50)...

Completed 50 requests in 3.45 seconds
Success rate: 50/50 (100%)
Throughput: 14.5 requests/second

Asyncio Server with Multiple Clients

Building an async server that handles many concurrent connections:

# file: asyncio_server_example.py
import asyncio
import time

class AsyncServer:
    def __init__(self, host='127.0.0.1', port=8888):
        self.host = host
        self.port = port
        self.client_count = 0

    async def handle_client(self, reader, writer):
        """Handle a single client connection"""
        self.client_count += 1
        client_id = self.client_count
        addr = writer.get_extra_info('peername')

        print(f"[Client {client_id}] Connected from {addr}")

        try:
            # Read request
            data = await reader.read(1024)
            message = data.decode()
            print(f"[Client {client_id}] Received: {message.strip()}")

            # Simulate async work (e.g., database query)
            await asyncio.sleep(0.5)

            # Send response
            response = f"Echo: {message}"
            writer.write(response.encode())
            await writer.drain()

            print(f"[Client {client_id}] Sent response, closing")

        except Exception as e:
            print(f"[Client {client_id}] Error: {e}")
        finally:
            writer.close()
            await writer.wait_closed()
            print(f"[Client {client_id}] Disconnected")

    async def start(self):
        """Start the async server"""
        server = await asyncio.start_server(
            self.handle_client,
            self.host,
            self.port
        )

        print(f"Server listening on {self.host}:{self.port}")
        async with server:
            await server.serve_forever()

async def client(client_id, delay=0):
    """Simulate a client connecting to the server"""
    await asyncio.sleep(delay)

    try:
        reader, writer = await asyncio.open_connection('127.0.0.1', 8888)

        # Send request
        message = f"Hello from client {client_id}"
        writer.write(message.encode())
        await writer.drain()

        # Read response
        data = await reader.read(1024)
        print(f"[LocalClient {client_id}] Received: {data.decode()}")

        writer.close()
        await writer.wait_closed()
    except Exception as e:
        print(f"[LocalClient {client_id}] Error: {e}")

async def demo():
    """Demo: start server and spawn multiple clients"""
    # Start server in background
    server = AsyncServer()
    server_task = asyncio.create_task(server.start())

    # Give server time to start
    await asyncio.sleep(0.5)

    # Spawn multiple client connections with slight delays
    client_tasks = []
    for i in range(5):
        task = asyncio.create_task(client(i, delay=i*0.1))
        client_tasks.append(task)

    # Wait for clients to complete
    await asyncio.gather(*client_tasks)

    # Cancel server
    server_task.cancel()
    try:
        await server_task
    except asyncio.CancelledError:
        print("\nServer stopped")

if __name__ == "__main__":
    asyncio.run(demo())

Output:

Server listening on 127.0.0.1:8888
[Client 1] Connected from ('127.0.0.1', 54321)
[Client 1] Received: Hello from client 0
[Client 1] Sent response, closing
[Client 1] Disconnected
[Client 2] Connected from ('127.0.0.1', 54322)
[Client 2] Received: Hello from client 1
[Client 2] Sent response, closing
[Client 2] Disconnected
[Client 3] Connected from ('127.0.0.1', 54323)
[Client 3] Received: Hello from client 2
[Client 3] Sent response, closing
[Client 3] Disconnected
[Client 4] Connected from ('127.0.0.1', 54324)
[Client 4] Received: Hello from client 3
[Client 4] Sent response, closing
[Client 4] Disconnected
[Client 5] Connected from ('127.0.0.1', 54325)
[Client 5] Received: Hello from client 4
[Client 5] Sent response, closing
[Client 5] Disconnected

Server stopped
asyncio — one conductor, many instruments, zero wasted time.

Comprehensive Comparison Table

| Feature | Threading | Multiprocessing | Asyncio |
| --- | --- | --- | --- |
| Best for | I/O-bound with 10-100 concurrent tasks | CPU-bound or heavy computation | I/O-bound with 100+ concurrent tasks |
| GIL Impact | Released during I/O, blocks CPU work | No GIL (separate interpreters) | Single thread, no GIL contention |
| Parallelism | No (concurrent only) | Yes (true parallel) | No (concurrent only) |
| Memory Overhead | Low (threads share memory) | High (separate processes) | Very Low (single process) |
| Context Switch Overhead | Medium (OS scheduled) | High (process switching) | Minimal (cooperative) |
| Data Sharing | Easy (shared memory, use locks) | Hard (must serialize data) | Easy (single thread) |
| Scalability | 100s of concurrent operations | Limited by CPU cores | 1000s of concurrent operations |
| Code Complexity | Medium (locks, synchronization) | Low (minimal shared state) | Medium-High (async/await syntax) |
| Debugging | Difficult (race conditions) | Easier (isolated processes) | Medium (async-specific tools) |
| Python Version | All versions | All versions | 3.5+ (3.7+ recommended) |

Decision Flowchart

Use this guide to choose the right concurrency model for your application:

1. Is your application primarily CPU-bound? (Heavy computation, data processing, mathematical calculations)

  • Yes — Use Multiprocessing. The GIL prevents threading from helping, and asyncio can’t parallelize. Multiprocessing gives true parallelism across multiple cores. Best for: machine learning, data analysis, image processing, scientific computing.
  • No — Continue to step 2.

2. How many concurrent I/O operations do you need to handle?

  • Fewer than 100 concurrent operations — Use Threading. Simple to understand, lower memory overhead, and the Python threading API is straightforward. Works well for: web scraping, API clients, database operations, file I/O.
  • 100+ concurrent operations — Use Asyncio. Threading would use too much memory and CPU for context switching. Asyncio scales to thousands of concurrent connections with minimal overhead. Works well for: web servers, microservices, IoT applications, real-time data streaming.

3. Can you rewrite code to be asynchronous?

  • Yes — Asyncio is excellent for I/O-bound applications with many concurrent operations.
  • No or Legacy Code — Stick with threading for reasonable concurrency levels.
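The three steps above collapse into a small illustrative function (the name and the 100-operation threshold are ours, not a standard API):

```python
def choose_model(cpu_bound, concurrent_ops, can_go_async=True):
    """Illustrative encoding of the decision steps above."""
    if cpu_bound:                                # step 1: CPU-bound?
        return "multiprocessing"
    if concurrent_ops >= 100 and can_go_async:   # steps 2-3: scale + async rewrite
        return "asyncio"
    return "threading"

print(choose_model(cpu_bound=True, concurrent_ops=10))     # multiprocessing
print(choose_model(cpu_bound=False, concurrent_ops=5000))  # asyncio
print(choose_model(cpu_bound=False, concurrent_ops=20))    # threading
```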
Choosing the right model — it depends on whether you’re waiting or computing.

Real-Life Example: URL Health Checker

Let’s build a practical health check tool for monitoring multiple URLs, comparing all three approaches:

# file: health_checker_all_approaches.py
import time
import requests
from threading import Thread, Lock
from multiprocessing import Pool
import asyncio
import aiohttp

TEST_URLS = [
    "https://httpbin.org/status/200",
    "https://httpbin.org/status/200",
    "https://httpbin.org/status/200",
    "https://httpbin.org/status/200",
    "https://httpbin.org/delay/1",
] * 10  # 50 URLs total

# ========== THREADING APPROACH ==========
class ThreadingHealthChecker:
    def __init__(self, num_workers=5):
        self.num_workers = num_workers
        self.results = []
        self.lock = Lock()

    def check_url(self, url):
        try:
            response = requests.get(url, timeout=5)
            return {
                'url': url,
                'status': response.status_code,
                'time': response.elapsed.total_seconds()
            }
        except Exception as e:
            return {
                'url': url,
                'status': 'ERROR',
                'error': str(e)
            }

    def worker(self, urls):
        for url in urls:
            result = self.check_url(url)
            with self.lock:
                self.results.append(result)

    def check_all(self, urls):
        batch_size = len(urls) // self.num_workers
        threads = []

        for i in range(self.num_workers):
            start = i * batch_size
            end = start + batch_size if i < self.num_workers - 1 else len(urls)
            batch = urls[start:end]

            t = Thread(target=self.worker, args=(batch,))
            threads.append(t)
            t.start()

        for t in threads:
            t.join()

        return self.results

# ========== MULTIPROCESSING APPROACH ==========
def check_url_mp(url):
    try:
        response = requests.get(url, timeout=5)
        return {
            'url': url,
            'status': response.status_code,
            'time': response.elapsed.total_seconds()
        }
    except Exception as e:
        return {
            'url': url,
            'status': 'ERROR',
            'error': str(e)
        }

class MultiprocessingHealthChecker:
    def __init__(self, num_workers=4):
        self.num_workers = num_workers

    def check_all(self, urls):
        with Pool(self.num_workers) as pool:
            results = pool.map(check_url_mp, urls)
        return results

# ========== ASYNCIO APPROACH ==========
async def check_url_async(session, url):
    start = time.monotonic()
    try:
        async with session.get(url, timeout=5) as response:
            return {
                'url': url,
                'status': response.status,
                # aiohttp doesn't expose elapsed time, so measure it ourselves
                'time': time.monotonic() - start
            }
    except Exception as e:
        return {
            'url': url,
            'status': 'ERROR',
            'error': str(e)
        }

class AsyncioHealthChecker:
    def __init__(self, num_concurrent=50):
        self.num_concurrent = num_concurrent

    async def check_all(self, urls):
        connector = aiohttp.TCPConnector(limit=self.num_concurrent)
        timeout = aiohttp.ClientTimeout(total=30)

        async with aiohttp.ClientSession(
            connector=connector,
            timeout=timeout
        ) as session:
            tasks = [check_url_async(session, url) for url in urls]
            results = await asyncio.gather(*tasks)

        return results

# ========== MAIN BENCHMARK ==========
def benchmark():
    print(f"Health checking {len(TEST_URLS)} URLs\n")

    # Threading
    print("Threading (5 workers)...")
    start = time.time()
    checker = ThreadingHealthChecker(num_workers=5)
    results_t = checker.check_all(TEST_URLS)
    time_threading = time.time() - start
    successful = sum(1 for r in results_t if r['status'] == 200)
    print(f"  Completed in {time_threading:.2f}s ({successful} successful)\n")

    # Multiprocessing
    print("Multiprocessing (4 workers)...")
    start = time.time()
    checker = MultiprocessingHealthChecker(num_workers=4)
    results_m = checker.check_all(TEST_URLS)
    time_multiprocessing = time.time() - start
    successful = sum(1 for r in results_m if r['status'] == 200)
    print(f"  Completed in {time_multiprocessing:.2f}s ({successful} successful)\n")

    # Asyncio
    print("Asyncio (50 concurrent)...")
    start = time.time()
    checker = AsyncioHealthChecker(num_concurrent=50)
    results_a = asyncio.run(checker.check_all(TEST_URLS))
    time_asyncio = time.time() - start
    successful = sum(1 for r in results_a if r['status'] == 200)
    print(f"  Completed in {time_asyncio:.2f}s ({successful} successful)\n")

    # Summary (compute the fastest instead of hard-coding it)
    times = {
        'Threading': time_threading,
        'Multiprocessing': time_multiprocessing,
        'Asyncio': time_asyncio,
    }
    fastest = min(times, key=times.get)
    print("Summary:")
    for name, t in times.items():
        label = " (FASTEST)" if name == fastest else ""
        print(f"  {name}: {t:.2f}s{label}")

if __name__ == "__main__":
    benchmark()

Output:

Health checking 50 URLs

Threading (5 workers)...
  Completed in 15.32s (50 successful)

Multiprocessing (4 workers)...
  Completed in 18.45s (50 successful)

Asyncio (50 concurrent)...
  Completed in 12.15s (50 successful)

Summary:
  Threading: 15.32s
  Multiprocessing: 18.45s
  Asyncio: 12.15s (FASTEST)

For this I/O-bound workload with 50 URLs, asyncio is the clear winner, completing 20% faster than threading and 35% faster than multiprocessing. The speed advantage comes from minimal memory overhead and efficient event loop scheduling.

Frequently Asked Questions

1. Can I use threading and multiprocessing together?

Yes, you can combine approaches. For example, use multiprocessing for CPU-bound work and threading within each process for I/O operations. However, mixing them adds complexity. Generally, choose the dominant bottleneck: if your application is mostly CPU-bound with some I/O, use multiprocessing. If mostly I/O-bound, use threading or asyncio.
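A sketch of that combination using concurrent.futures — processes for the CPU-heavy part, threads inside each process for the I/O fan-out (a time.sleep stands in for the network calls):

```python
import math
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def io_call(i):
    time.sleep(0.05)  # simulated network request
    return i

def process_chunk(ids):
    # CPU-bound portion runs in its own process (its own GIL)...
    checksum = sum(math.sqrt(i) for i in range(50_000))
    # ...then cheap threads handle the I/O fan-out inside that process.
    with ThreadPoolExecutor(max_workers=4) as threads:
        fetched = list(threads.map(io_call, ids))
    return len(fetched)

if __name__ == "__main__":
    chunks = [list(range(8)), list(range(8))]
    with ProcessPoolExecutor(max_workers=2) as procs:
        print(list(procs.map(process_chunk, chunks)))  # [8, 8]
```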

2. Why is Python's threading slower than Java or C# threading?

Python's GIL prevents true parallelism in threads. Java and C# don't have this limitation because their runtimes use tracing garbage collectors rather than the reference counting the GIL exists to protect. Among alternative implementations, Jython and IronPython (built on the JVM and .NET runtimes) have no GIL, while PyPy keeps one; CPython 3.13 also offers an experimental free-threaded build (PEP 703) without a GIL.

3. When should I use asyncio over threading for I/O-bound work?

Use asyncio when you need to handle many concurrent I/O operations (100+) with minimal memory overhead. Use threading when you have fewer concurrent operations or when working with blocking libraries that don't have async alternatives. Asyncio requires rewriting code to use async/await, which adds complexity but provides better scalability.

4. Can I pickle objects in multiprocessing?

Multiprocessing uses pickle to move data between processes. Most Python objects are picklable, but some aren't: lambdas, nested functions, generators, open files, and sockets. If you run into pickling errors, move the function to module level, or use a multiprocessing.Manager to share data through a server process instead.
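A quick way to check whether something will survive the trip between processes is to pickle it yourself:

```python
import pickle

# Plain data structures round-trip fine.
data = {"url": "https://example.com", "status": 200}
roundtrip = pickle.loads(pickle.dumps(data))
print(roundtrip == data)

# Lambdas don't: pickle stores functions by importable name,
# and a lambda has none.
try:
    pickle.dumps(lambda x: x * 2)
    lambda_ok = True
except Exception as e:
    lambda_ok = False
    print(f"lambda failed to pickle: {type(e).__name__}")
```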

5. Is asyncio thread-safe?

Asyncio is designed for single-threaded operation. If you need to call asyncio from multiple threads, use asyncio.run_coroutine_threadsafe() or run separate event loops in different threads. Mixing threads and asyncio requires careful synchronization and is generally not recommended.
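A sketch of the sanctioned pattern: one dedicated thread owns the event loop, and any other thread submits coroutines to it with run_coroutine_threadsafe:

```python
import asyncio
import threading

# One thread owns the loop and runs it forever.
loop = asyncio.new_event_loop()
t = threading.Thread(target=loop.run_forever, daemon=True)
t.start()

async def double(x):
    await asyncio.sleep(0.1)
    return x * 2

# Another thread hands a coroutine to that loop and gets a
# concurrent.futures.Future back, which it can block on safely.
future = asyncio.run_coroutine_threadsafe(double(21), loop)
result = future.result(timeout=5)
print(result)  # 42

loop.call_soon_threadsafe(loop.stop)  # stopping the loop must also be thread-safe
```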

6. What's the overhead of creating a thread vs a process?

Creating a thread is much cheaper than creating a process. Threads typically take microseconds, while processes take milliseconds. A single process might use 20-50 MB of memory, while threads share the process memory and only use ~1-2 MB each. This is why threading is suitable for hundreds of concurrent operations, but multiprocessing usually maxes out at the number of CPU cores (4-16 in most systems).

7. How do I handle timeouts in asyncio?

Use asyncio.wait_for() to set a timeout on coroutines. For example: await asyncio.wait_for(long_running_coro(), timeout=5.0). This raises asyncio.TimeoutError if the coroutine takes longer than 5 seconds. For I/O operations, set timeouts on the underlying libraries (aiohttp, asyncpg, etc.) as well.
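A minimal sketch of wait_for in action, with a sleep standing in for the slow operation:

```python
import asyncio

async def slow_query():
    await asyncio.sleep(2)  # pretend this is a slow database call
    return "rows"

async def main():
    try:
        return await asyncio.wait_for(slow_query(), timeout=0.5)
    except asyncio.TimeoutError:
        return "timed out"  # slow_query was cancelled at the 0.5s mark

result = asyncio.run(main())
print(result)  # timed out
```

Note that wait_for cancels the inner coroutine on timeout, so the slow operation doesn't keep running in the background.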

Benchmarks don't lie -- but they do depend on the workload.

Conclusion

Python provides three powerful concurrency models, each optimized for different scenarios:

  • Use Threading for I/O-bound applications with a reasonable number of concurrent operations (10-100). It's simple, doesn't require extensive code changes, and works well for web scraping, API clients, and database operations.
  • Use Multiprocessing for CPU-bound tasks where you need true parallelism across multiple cores. It's the solution for data processing, machine learning, and heavy computation, with the tradeoff of higher memory usage and inter-process communication overhead.
  • Use Asyncio for I/O-bound applications that need to handle many concurrent connections (100+) efficiently. It provides the best scalability for I/O operations with minimal resource usage, though it requires familiarity with async/await syntax.

The key to choosing correctly is identifying your application's bottleneck. Profile your code, measure wall-clock time, and choose the model that best matches your workload. Don't over-engineer -- threading solves most I/O concurrency problems elegantly, and asyncio's power is most valuable when you genuinely need high concurrency.
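One rough way to do that profiling: compare CPU time against wall-clock time for a representative task. If the two are close, you're computing; if wall time dominates, you're waiting. The 0.8 threshold below is an arbitrary heuristic of ours:

```python
import time

def classify(func):
    """Crude bottleneck check: run func once, compare CPU time vs wall time."""
    wall0, cpu0 = time.perf_counter(), time.process_time()
    func()
    wall = time.perf_counter() - wall0
    cpu = time.process_time() - cpu0
    return "CPU-bound" if cpu / wall > 0.8 else "I/O-bound"

print(classify(lambda: sum(i * i for i in range(2_000_000))))  # CPU-bound
print(classify(lambda: time.sleep(0.3)))                       # I/O-bound
```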