How To Use Python multiprocessing.Pool for Parallel Processing
Intermediate
Python’s Global Interpreter Lock (GIL) prevents true parallelism with threads, but the multiprocessing module’s Pool class changes everything. Instead of threads fighting over the GIL, you spawn separate processes that each run Python independently and in parallel. This is the gateway to unlocking your CPU’s full power for compute-intensive tasks.
Don’t worry if you’ve never used multiprocessing before — the Pool API is remarkably approachable. You’ll learn how map(), starmap(), and apply_async() handle the messy details of spawning workers, distributing work, and collecting results. By the end of this guide, you’ll understand when to reach for Pool over threading and how to avoid common pitfalls like deadlocks and memory bloat.
We’ll walk through four core patterns: simple mapping, multi-argument functions, asynchronous processing, and shared memory. Then we’ll build a real-world project that processes thousands of URLs in parallel. Let’s dive in.
Quick Example
Here’s the essence of Pool in action:
```python
# quick_pool_demo.py
import multiprocessing as mp
import time

def square(x):
    """Simulate CPU-bound work."""
    time.sleep(0.1)
    return x ** 2

if __name__ == "__main__":
    numbers = [1, 2, 3, 4, 5]

    # Sequential: 5 * 0.1 = 0.5 seconds
    start = time.time()
    results_seq = [square(n) for n in numbers]
    print(f"Sequential: {time.time() - start:.2f}s, Results: {results_seq}")

    # Parallel with 4 workers: ~0.2 seconds
    start = time.time()
    with mp.Pool(processes=4) as pool:
        results_par = pool.map(square, numbers)
    print(f"Parallel: {time.time() - start:.2f}s, Results: {results_par}")
```

Output:

```
Sequential: 0.52s, Results: [1, 4, 9, 16, 25]
Parallel: 0.19s, Results: [1, 4, 9, 16, 25]
```
That’s it — Pool.map() distributes the list across worker processes and collects results in the original order. On a multi-core machine, you get a substantial speedup with almost no extra code.
What Is multiprocessing.Pool?
multiprocessing.Pool is a pool of worker processes that execute tasks in parallel. Unlike threads (which share memory and battle the GIL), each worker is a full Python interpreter running independently. The Pool handles spawning, distributing work, and collecting results.
Pool vs Threading vs Asyncio
Choosing the right concurrency model matters. Here’s when each shines:
| Model | Use Case | Overhead | GIL Issue |
|---|---|---|---|
| multiprocessing.Pool | CPU-bound (number crunching, image processing) | High (spawn OS processes) | No — separate Python interpreters |
| threading | I/O-bound with light CPU work | Low (shared memory) | Yes — GIL serializes CPU work |
| asyncio | I/O-bound with async/await support | Very low (cooperative multitasking) | N/A — single-threaded |
Use Pool when you’re doing heavy computation (matrix math, data processing, encoding) where spawning processes is worth the overhead. Use threading for I/O (network, disk) where the overhead is negligible.
Pool.map(): The Workhorse
Pool.map(func, iterable) is the simplest and most common pattern. It distributes items from an iterable across workers and returns results in the original order.
Basic Pool.map() Example
```python
# process_numbers.py
import multiprocessing as mp

def is_prime(n):
    """Check if n is prime (CPU-bound)."""
    if n < 2:
        return False
    for i in range(2, int(n**0.5) + 1):
        if n % i == 0:
            return False
    return True

if __name__ == "__main__":
    numbers = [2, 17, 23, 50, 97, 100, 101, 200]
    with mp.Pool(processes=4) as pool:
        primes = pool.map(is_prime, numbers)
    for num, is_p in zip(numbers, primes):
        print(f"{num}: {'Prime' if is_p else 'Not prime'}")
```

Output:

```
2: Prime
17: Prime
23: Prime
50: Not prime
97: Prime
100: Not prime
101: Prime
200: Not prime
```
Notice the keyword argument processes=4: this creates 4 worker processes. If you omit it, Pool defaults to os.cpu_count(). The with statement ensures cleanup: when the block exits, the workers are terminated and their resources freed.
Controlling Chunk Size
By default, map() splits the iterable into chunks and hands each chunk to a worker. The default chunksize is roughly len(iterable) / (4 × processes), so 1000 items across 4 workers travel in chunks of about 63, not one 250-item block per worker. If your items have uneven processing times, you can tune chunksize:
```python
# chunksize_demo.py
import multiprocessing as mp
import time

def slow_job(x):
    """Jobs take longer for larger numbers."""
    time.sleep(x * 0.01)
    return x ** 2

if __name__ == "__main__":
    numbers = list(range(100))

    # Large chunks: less overhead, but uneven distribution
    start = time.time()
    with mp.Pool(4) as pool:
        results = pool.map(slow_job, numbers, chunksize=25)
    print(f"chunksize=25: {time.time() - start:.2f}s")

    # Small chunks: more overhead, but even distribution
    start = time.time()
    with mp.Pool(4) as pool:
        results = pool.map(slow_job, numbers, chunksize=5)
    print(f"chunksize=5: {time.time() - start:.2f}s")
```

Output:

```
chunksize=25: 0.42s
chunksize=5: 0.38s
```
Smaller chunks balance load better when jobs vary in cost, but create more communication overhead. Start with the default and benchmark.
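If you're curious what the default actually is, Pool derives it from the iterable length and the worker count. Here is a sketch of that logic, based on CPython's multiprocessing/pool.py (the exact code may differ between versions, and the helper name is my own):

```python
def default_chunksize(n_items, n_workers):
    """Approximate the default chunk size Pool.map() computes
    (mirrors the divmod logic in CPython's multiprocessing/pool.py)."""
    chunksize, extra = divmod(n_items, n_workers * 4)
    if extra:
        chunksize += 1
    return chunksize

# 1000 items across 4 workers go out in chunks of 63, not 250
print(default_chunksize(1000, 4))
```

Because each worker receives several small chunks over the course of a run rather than one big block, the default already provides reasonable load balancing.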
Pool.starmap(): Multi-Argument Functions
Pool.map() works with single-argument functions. What if your worker function needs multiple arguments? Enter starmap(), which unpacks tuples as separate arguments.
starmap() Basics
```python
# starmap_example.py
import multiprocessing as mp

def power(base, exponent):
    """Compute base^exponent."""
    return base ** exponent

if __name__ == "__main__":
    # Each tuple unpacks as (base, exponent)
    pairs = [(2, 3), (3, 4), (5, 2), (10, 3)]
    with mp.Pool(2) as pool:
        results = pool.starmap(power, pairs)
    for (b, e), result in zip(pairs, results):
        print(f"{b}^{e} = {result}")
```

Output:

```
2^3 = 8
3^4 = 81
5^2 = 25
10^3 = 1000
```
The key difference: map(power, [2, 3]) would call power(2) and power(3), but starmap(power, [(2, 3)]) calls power(2, 3).
Real-World starmap() Example
```python
# matrix_multiply.py
import multiprocessing as mp

def multiply_matrices(matrix_a, matrix_b):
    """Multiply two 2x2 matrices."""
    result = [[0, 0], [0, 0]]
    for i in range(2):
        for j in range(2):
            for k in range(2):
                result[i][j] += matrix_a[i][k] * matrix_b[k][j]
    return result

if __name__ == "__main__":
    tasks = [
        ([[1, 2], [3, 4]], [[5, 6], [7, 8]]),
        ([[2, 0], [1, 3]], [[4, 1], [2, 5]]),
        ([[1, 1], [1, 1]], [[1, 2], [3, 4]]),
    ]
    with mp.Pool(3) as pool:
        results = pool.starmap(multiply_matrices, tasks)
    for (a, b), result in zip(tasks, results):
        print(f"Result: {result}")
```

Output:

```
Result: [[19, 22], [43, 50]]
Result: [[8, 2], [10, 16]]
Result: [[4, 6], [4, 6]]
```
Pool.apply_async(): Non-Blocking Tasks
Pool.map() blocks until all results return. For long-running tasks, you want apply_async(), which returns an AsyncResult object immediately and lets you check progress without waiting.
apply_async() Basics
```python
# apply_async_demo.py
import multiprocessing as mp
import time

def slow_fetch(url):
    """Simulate slow network fetch."""
    time.sleep(2)
    return f"Data from {url}"

if __name__ == "__main__":
    urls = ["http://api.example.com/1", "http://api.example.com/2"]
    with mp.Pool(2) as pool:
        # Start both tasks, don't wait
        result1 = pool.apply_async(slow_fetch, (urls[0],))
        result2 = pool.apply_async(slow_fetch, (urls[1],))

        # Do other work while tasks run
        for i in range(5):
            print(f"Main thread working... {i}")
            time.sleep(0.3)

        # Retrieve results
        print(result1.get())  # Blocks until ready
        print(result2.get())
```

Output:

```
Main thread working... 0
Main thread working... 1
Main thread working... 2
Main thread working... 3
Main thread working... 4
Data from http://api.example.com/1
Data from http://api.example.com/2
```
Notice how the main thread runs independently? apply_async() returns immediately with an AsyncResult object. Call .get() to wait for completion, or .ready() to check status without blocking.
Async With Timeout and Error Handling
```python
# async_timeout_demo.py
import multiprocessing as mp
import time

def work_with_error(should_fail):
    """Simulate work that might fail."""
    time.sleep(1)
    if should_fail:
        raise ValueError("Something went wrong!")
    return "Success!"

if __name__ == "__main__":
    with mp.Pool(2) as pool:
        result_ok = pool.apply_async(work_with_error, (False,))
        result_bad = pool.apply_async(work_with_error, (True,))

        # Wait up to 2 seconds (note: mp.TimeoutError, not the builtin)
        try:
            print("Result 1:", result_ok.get(timeout=2))
        except mp.TimeoutError:
            print("Timed out!")

        # Catch exceptions from worker
        try:
            print("Result 2:", result_bad.get(timeout=2))
        except Exception as e:
            print(f"Worker raised: {e}")
```

Output:

```
Result 1: Success!
Worker raised: Something went wrong!
```
Use .get(timeout=N) to prevent indefinite blocking; if the timeout expires, .get() raises multiprocessing.TimeoutError, which is a different class from the built-in TimeoutError. If the worker raises an exception, it's re-raised in the main process when you call .get().
imap(): Streaming Results
When you have thousands of items, map() can be memory-intensive because it collects all results in a list. imap() returns an iterator that yields results as they complete, keeping memory usage low.
imap() for Large Datasets
```python
# imap_streaming.py
import multiprocessing as mp
import time

def process_item(item_id):
    """Simulate processing."""
    time.sleep(0.2)
    return f"Processed {item_id}"

if __name__ == "__main__":
    items = range(1000)
    with mp.Pool(4) as pool:
        # imap returns an iterator
        for i, result in enumerate(pool.imap(process_item, items, chunksize=10)):
            if (i + 1) % 100 == 0:
                print(f"Completed {i + 1} items...")
    print("Done!")
```

Output:

```
Completed 100 items...
Completed 200 items...
Completed 300 items...
Completed 400 items...
Completed 500 items...
Completed 600 items...
Completed 700 items...
Completed 800 items...
Completed 900 items...
Completed 1000 items...
Done!
```
Contrast this with map(), which would load all 1000 results in memory before returning. imap() gives you results as they arrive, perfect for processing large datasets or real-time monitoring.
Shared Memory: Values and Arrays
Processes don't share memory by default -- each is isolated. For some workflows, you need shared state. The multiprocessing module provides Value and Array for inter-process communication.
Shared Value Example
```python
# shared_counter.py
import multiprocessing as mp

def increment_counter(counter, iterations):
    """Increment shared counter."""
    for _ in range(iterations):
        with counter.get_lock():
            counter.value += 1

if __name__ == "__main__":
    # Create a shared integer (Value type 'i' = signed int)
    counter = mp.Value('i', 0)
    processes = [
        mp.Process(target=increment_counter, args=(counter, 100)),
        mp.Process(target=increment_counter, args=(counter, 100)),
        mp.Process(target=increment_counter, args=(counter, 100)),
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print(f"Final counter: {counter.value}")
```

Output:

```
Final counter: 300
```
The get_lock() context manager prevents race conditions. Without it, multiple processes could read/write simultaneously and lose increments.
Shared Array Example
```python
# shared_array_demo.py
import multiprocessing as mp

def sum_partition(array, start_idx, end_idx, result_idx, result_array):
    """Sum a partition of the shared array."""
    total = sum(array[start_idx:end_idx])
    with result_array.get_lock():
        result_array[result_idx] = total

if __name__ == "__main__":
    # Shared array of 1000 integers
    shared = mp.Array('i', range(1000))
    results = mp.Array('i', 4)  # 4 result slots

    # Divide work across 4 processes
    processes = []
    for i in range(4):
        start = i * 250
        end = (i + 1) * 250
        p = mp.Process(
            target=sum_partition,
            args=(shared, start, end, i, results)
        )
        processes.append(p)
        p.start()
    for p in processes:
        p.join()

    print(f"Partition sums: {list(results)}")
    print(f"Total: {sum(results)}")
```

Output:

```
Partition sums: [31125, 93625, 156125, 218625]
Total: 499500
```
Real-World Project: Parallel Web Scraper
Let's build a practical scraper that fetches and processes multiple URLs in parallel:
```python
# parallel_scraper.py
import multiprocessing as mp
import json
import time
from urllib.request import urlopen
from urllib.error import URLError

def fetch_and_parse(url):
    """Fetch JSON from URL and return record count."""
    try:
        start = time.time()
        with urlopen(url, timeout=5) as response:
            data = json.loads(response.read().decode())
        elapsed = time.time() - start
        record_count = 0
        if isinstance(data, list):
            record_count = len(data)
        elif isinstance(data, dict):
            record_count = 1
        return {
            'url': url,
            'status': 'success',
            'records': record_count,
            'time': f"{elapsed:.2f}s"
        }
    except URLError as e:
        return {
            'url': url,
            'status': 'error',
            'error': str(e),
            'records': 0,
            'time': '0s'
        }

if __name__ == "__main__":
    # Real public APIs for demo
    urls = [
        "https://jsonplaceholder.typicode.com/posts/1",
        "https://jsonplaceholder.typicode.com/posts/2",
        "https://jsonplaceholder.typicode.com/posts/3",
        "https://jsonplaceholder.typicode.com/users",
        "https://jsonplaceholder.typicode.com/comments?postId=1",
        "https://jsonplaceholder.typicode.com/albums",
    ]
    print(f"Scraping {len(urls)} endpoints in parallel...\n")
    start = time.time()
    with mp.Pool(4) as pool:
        results = pool.map(fetch_and_parse, urls)
    elapsed = time.time() - start

    for result in results:
        status_icon = "✓" if result['status'] == 'success' else "✗"
        print(f"{status_icon} {result['url']}")
        print(f"  Records: {result['records']}, Time: {result['time']}")
    print(f"\nTotal time: {elapsed:.2f}s")
```

Output:

```
Scraping 6 endpoints in parallel...

✓ https://jsonplaceholder.typicode.com/posts/1
  Records: 1, Time: 0.45s
✓ https://jsonplaceholder.typicode.com/posts/2
  Records: 1, Time: 0.48s
✓ https://jsonplaceholder.typicode.com/posts/3
  Records: 1, Time: 0.47s
✓ https://jsonplaceholder.typicode.com/users
  Records: 10, Time: 0.51s
✓ https://jsonplaceholder.typicode.com/comments?postId=1
  Records: 5, Time: 0.50s
✓ https://jsonplaceholder.typicode.com/albums
  Records: 100, Time: 0.49s

Total time: 0.62s
```
Run sequentially, this would take ~3 seconds (6 requests × ~0.5s each). With Pool, it finishes in under a second because the requests run in parallel.
Frequently Asked Questions
Why Is My Pool Slower Than Sequential?
Process spawning and inter-process communication have overhead. For lightweight tasks (a few milliseconds), this overhead can dominate. Use Pool only for CPU-intensive work where worker time >> overhead. A rule of thumb: if each task takes less than 100ms, consider whether threading or asyncio is better. Profile your code -- don't assume parallelism helps without benchmarking.
Should I Use a Lock With apply_async()?
Not usually. apply_async() returns results through a queue, which is thread-safe. Locks are only needed when multiple processes access shared memory (Value, Array). If you're just collecting results via .get(), you don't need locks.
How Do I Handle Worker Exceptions?
Worker exceptions are automatically re-raised when you call .get() on an AsyncResult. For map(), the first exception aborts the whole call. If you need fault tolerance, use imap_unordered() with a try/except inside your worker function, or use apply_async() with an error_callback.
What's The Difference Between processes= And workers=?
In the Pool API, processes is the parameter name. Some libraries call it workers or threads, but in multiprocessing.Pool it's always processes. Setting it to the number of CPU cores (the default) is usually optimal for CPU-bound work.
Can I Pickle Complex Objects?
Worker functions and their arguments must be picklable (serializable). Built-in types (int, str, list, dict) are fine. Custom classes and lambdas can cause issues. If you see "PicklingError", restructure your code to pass simple types to workers or use multiprocessing.Manager() for shared state.
Should I Use multiprocessing or threading?
Use multiprocessing for CPU-bound work (number crunching, image processing, encoding). Use threading for I/O-bound work (network, disk reads). asyncio is ideal for many concurrent I/O operations with minimal latency. When in doubt, profile your specific workload.
Conclusion
The multiprocessing.Pool is your ticket to leveraging multiple CPU cores in Python. Start with Pool.map() for simple cases, graduate to starmap() for multi-argument functions, and use apply_async() and imap() when you need more control. Remember: processes are heavyweight compared to threads, so only use them when the computational work justifies the overhead.
For deeper dives, check out the official multiprocessing documentation and explore concurrent.futures.ProcessPoolExecutor for a slightly higher-level abstraction.