Introduction
Making HTTP requests is a fundamental task in web development and data collection. However, when you need to fetch data from multiple endpoints simultaneously, traditional blocking requests become a bottleneck. If you want to retrieve data from 100 different APIs, making sequential requests could take minutes. This is where concurrency comes in, allowing you to send multiple requests at the same time and dramatically speed up your application.
aiohttp is a Python library that enables asynchronous HTTP client and server functionality. Built on top of asyncio, aiohttp allows you to handle hundreds or thousands of concurrent requests without creating a thread for each one. This makes it ideal for web scraping, working with REST APIs, and building high-performance applications that need to juggle multiple I/O operations.
In this tutorial, we’ll explore how to use aiohttp to make concurrent HTTP requests, manage sessions efficiently, handle errors gracefully, and implement best practices like rate limiting and timeout management. By the end, you’ll understand how to build scalable applications that can fetch data from multiple sources simultaneously with minimal resource usage.
Quick Example: Fetching 5 URLs Concurrently
Let’s start with a simple example that demonstrates the power of concurrent requests. This script fetches data from five endpoints at the same time:
# concurrent_fetch_example.py
import asyncio
import aiohttp

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.json()

async def main():
    urls = [
        'https://httpbin.org/delay/2',
        'https://httpbin.org/delay/2',
        'https://httpbin.org/delay/2',
        'https://httpbin.org/delay/2',
        'https://httpbin.org/delay/2',
    ]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        print(f"Fetched {len(results)} responses")

asyncio.run(main())
Output:
Fetched 5 responses
Notice how all five requests are sent immediately and processed in parallel. With traditional blocking requests, this would take 10 seconds (2 seconds per request). With aiohttp and asyncio, it completes in roughly 2 seconds because the requests happen concurrently.
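You can see the same effect without touching the network. In the sketch below, a hypothetical fake_fetch helper stands in for an HTTP request by awaiting asyncio.sleep, and the sequential and concurrent versions are timed side by side (small 0.1-second delays keep the demo quick):

```python
import asyncio
import time

async def fake_fetch(delay):
    # Stand-in for an HTTP request: sleeping yields control to the event
    # loop, just as awaiting a network response does.
    await asyncio.sleep(delay)
    return delay

async def sequential(delays):
    # Awaits each "request" one after another.
    return [await fake_fetch(d) for d in delays]

async def concurrent(delays):
    # Starts all "requests" at once and waits for them together.
    return await asyncio.gather(*(fake_fetch(d) for d in delays))

def timed(coro):
    start = time.perf_counter()
    result = asyncio.run(coro)
    return result, time.perf_counter() - start

delays = [0.1] * 5
_, seq_time = timed(sequential(delays))
_, con_time = timed(concurrent(delays))
print(f"sequential: {seq_time:.2f}s, concurrent: {con_time:.2f}s")
```

The sequential run takes roughly the sum of the delays, while the concurrent run takes roughly the longest single delay.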
What is aiohttp?
aiohttp is a Python async HTTP client and server library. It’s built on top of asyncio, Python’s standard asynchronous I/O framework, which means it uses coroutines and event loops instead of threading to handle multiple operations concurrently. This approach is more efficient than threading because it avoids the overhead of context switching and thread management.
Key features of aiohttp include:
- Asynchronous HTTP requests and responses
- Connection pooling and session management
- Automatic handling of redirects and cookies
- Support for streaming and multipart uploads
- Built-in timeout and error handling
- WebSocket support
- Both client and server functionality
The library is essential for building modern Python web applications that need to handle I/O efficiently. Whether you’re scraping data, calling APIs, or building a backend service that needs to communicate with multiple external services, aiohttp provides the tools you need.
Installing aiohttp
aiohttp is available on PyPI and can be installed with pip:
# terminal
pip install aiohttp
You can verify the installation by checking the version:
# test_installation.py
import aiohttp
print(f"aiohttp version: {aiohttp.__version__}")
Output:
aiohttp version: 3.9.1
That’s all you need! aiohttp will automatically install its dependencies, including yarl and multidict.
Making GET Requests with aiohttp
A GET request retrieves data from a server. Here’s how to make a simple GET request with aiohttp:
# simple_get_request.py
import asyncio
import aiohttp

async def get_example():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://httpbin.org/get') as response:
            data = await response.json()
            print(f"Status: {response.status}")
            print(f"Headers: {response.headers}")
            print(f"Data: {data}")

asyncio.run(get_example())
Output:
Status: 200
Headers: <CIMultiDictProxy('Content-Type': 'application/json', ...)>
Data: {'args': {}, 'headers': {...}, 'url': 'https://httpbin.org/get'}
Two parts of this code are key: the ClientSession manages connections and cookies, and the async with statements ensure that both the session and the response are properly closed. Notice that we use await to wait for the response without blocking other operations.
Making POST Requests
POST requests send data to a server. Here’s how to create a POST request with aiohttp:
# post_request_example.py
import asyncio
import aiohttp

async def post_example():
    async with aiohttp.ClientSession() as session:
        payload = {'name': 'Alice', 'age': 30}
        async with session.post('https://httpbin.org/post', json=payload) as response:
            result = await response.json()
            print(f"Status: {response.status}")
            print(f"Sent data: {result['json']}")

asyncio.run(post_example())
Output:
Status: 200
Sent data: {'name': 'Alice', 'age': 30}
The json parameter automatically serializes your dictionary and sets the correct Content-Type header. You can also use data for form-encoded data or files for multipart uploads.
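As a sketch of the data= alternative against the same httpbin endpoint (post_form is a hypothetical helper; note that form values arrive at the server as strings):

```python
import asyncio
import aiohttp

async def post_form(session, url):
    # Passing a dict via data= sends it as application/x-www-form-urlencoded,
    # whereas json= serializes to JSON and sets the matching Content-Type.
    form = {'name': 'Alice', 'age': '30'}
    async with session.post(url, data=form) as response:
        result = await response.json()
        return result['form']  # httpbin echoes the form fields back

async def main():
    async with aiohttp.ClientSession() as session:
        print(await post_form(session, 'https://httpbin.org/post'))

# asyncio.run(main())  # uncomment to run against httpbin.org
```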
Making Concurrent Requests with asyncio.gather()
The real power of aiohttp comes from running multiple requests concurrently. The asyncio.gather() function is the key tool for this:
# concurrent_requests.py
import asyncio
import aiohttp
import time

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.json()

async def fetch_multiple(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

async def main():
    urls = [
        'https://jsonplaceholder.typicode.com/posts/1',
        'https://jsonplaceholder.typicode.com/posts/2',
        'https://jsonplaceholder.typicode.com/posts/3',
    ]
    start = time.time()
    results = await fetch_multiple(urls)
    elapsed = time.time() - start
    print(f"Fetched {len(results)} posts in {elapsed:.2f} seconds")

asyncio.run(main())
Output:
Fetched 3 posts in 0.85 seconds
The pattern here is crucial: create a list of coroutines (tasks), then pass them to asyncio.gather(). This sends all requests immediately and waits for all of them to complete. If you need to handle errors individually, you can pass return_exceptions=True to gather().
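The effect of return_exceptions=True is easy to observe without any network: in the sketch below (ok and boom are hypothetical stand-ins for requests), one coroutine raises while the others succeed, and gather() reports all three outcomes instead of aborting on the first failure.

```python
import asyncio

async def ok(value):
    # Stand-in for a request that succeeds.
    return value

async def boom():
    # Stand-in for a request that fails.
    raise ValueError("simulated request failure")

async def main():
    # Without return_exceptions=True, the ValueError would propagate out of
    # gather() and the successful results would be lost. With it, exceptions
    # are returned in place, positionally matched to their coroutines.
    return await asyncio.gather(ok(1), boom(), ok(3), return_exceptions=True)

results = asyncio.run(main())
for r in results:
    label = 'error' if isinstance(r, Exception) else 'ok'
    print(label, r)
```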
Session Management Best Practices
A ClientSession is a container for making HTTP requests and managing connections. It’s important to reuse the same session for multiple requests because it maintains a connection pool, which significantly improves performance. Here’s the right way to manage sessions:
# session_management.py
import asyncio
import aiohttp

async def fetch_with_session(session, url):
    async with session.get(url) as response:
        return await response.json()

async def main():
    # Create session once
    async with aiohttp.ClientSession() as session:
        urls = [
            'https://httpbin.org/get?id=1',
            'https://httpbin.org/get?id=2',
            'https://httpbin.org/get?id=3',
        ]
        # Reuse the same session for all requests
        tasks = [fetch_with_session(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        print(f"Successfully fetched {len(results)} responses")

asyncio.run(main())
Output:
Successfully fetched 3 responses
Never create a new session for each request. Creating sessions is expensive because they initialize connection pools and other resources. Instead, create one session and reuse it for all your requests in a given scope.
Error Handling and Timeouts
Network requests can fail for various reasons. aiohttp provides built-in mechanisms to handle errors and set timeouts:
# error_handling.py
import asyncio
import aiohttp

async def fetch_with_error_handling(session, url):
    try:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as response:
            if response.status == 200:
                return await response.json()
            else:
                print(f"Error: {response.status} for {url}")
                return None
    except asyncio.TimeoutError:
        print(f"Timeout fetching {url}")
        return None
    except aiohttp.ClientError as e:
        print(f"Request failed for {url}: {e}")
        return None

async def main():
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/status/500',
        'https://httpbin.org/delay/10',  # Will timeout
    ]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_error_handling(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        successful = len([r for r in results if r is not None])
        print(f"Successfully fetched {successful} out of {len(urls)} responses")

asyncio.run(main())
Output:
Error: 500 for https://httpbin.org/status/500
Timeout fetching https://httpbin.org/delay/10
Successfully fetched 1 out of 3 responses
The ClientTimeout object lets you set different timeout durations. You can set a total timeout, or separate timeouts for connection, reading, and writing. Always wrap requests in try-except blocks to handle network errors gracefully.
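A sketch of those options (the field names connect, sock_connect, and sock_read come from aiohttp's ClientTimeout):

```python
import aiohttp

# One overall deadline for the entire request:
simple = aiohttp.ClientTimeout(total=5)

# Or separate limits: connect= caps acquiring a connection from the pool
# plus the handshake, sock_connect= caps the TCP connect itself, and
# sock_read= caps the gap between received chunks. Leaving total=None
# allows long streaming downloads that would exceed any single deadline.
granular = aiohttp.ClientTimeout(total=None, connect=3, sock_connect=3, sock_read=10)

# Either object can be passed per request (session.get(url, timeout=...))
# or once for the whole session (aiohttp.ClientSession(timeout=...)).
print(simple.total, granular.sock_read)
```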
Implementing Rate Limiting
When making many concurrent requests, you may need to respect rate limits imposed by the server. Here’s how to implement basic rate limiting with asyncio:
# rate_limiting.py
import asyncio
import aiohttp
import time

class RateLimiter:
    def __init__(self, max_requests, time_period):
        self.max_requests = max_requests
        self.time_period = time_period
        self.requests = []

    async def acquire(self):
        now = time.time()
        # Remove requests older than the time period
        self.requests = [req_time for req_time in self.requests
                         if now - req_time < self.time_period]
        if len(self.requests) >= self.max_requests:
            sleep_time = self.time_period - (now - self.requests[0])
            await asyncio.sleep(sleep_time)
            await self.acquire()
        else:
            self.requests.append(time.time())

async def fetch_with_limit(session, url, limiter):
    await limiter.acquire()
    async with session.get(url) as response:
        return await response.json()

async def main():
    limiter = RateLimiter(max_requests=2, time_period=1.0)
    urls = [
        'https://httpbin.org/get?id=1',
        'https://httpbin.org/get?id=2',
        'https://httpbin.org/get?id=3',
        'https://httpbin.org/get?id=4',
    ]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_limit(session, url, limiter) for url in urls]
        start = time.time()
        results = await asyncio.gather(*tasks)
        elapsed = time.time() - start
        print(f"Fetched {len(results)} responses in {elapsed:.2f} seconds")

asyncio.run(main())
Output:
Fetched 4 responses in 2.05 seconds
This rate limiter ensures no more than 2 requests happen within any 1-second window. You can adjust max_requests and time_period to match your API’s limits.
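If your goal is simply to cap the number of in-flight requests rather than requests per second, asyncio.Semaphore is a lighter-weight alternative. The network-free sketch below uses asyncio.sleep in place of a real request and tracks peak concurrency in a hypothetical counter dict to show the cap working:

```python
import asyncio

async def fetch_limited(semaphore, counter, delay=0.05):
    # Only as many coroutines as the semaphore's value can be inside this
    # block at once; a real version would call session.get(url) here.
    async with semaphore:
        counter['active'] += 1
        counter['peak'] = max(counter['peak'], counter['active'])
        await asyncio.sleep(delay)  # simulated request latency
        counter['active'] -= 1

async def main():
    semaphore = asyncio.Semaphore(2)   # at most 2 requests in flight
    counter = {'active': 0, 'peak': 0}
    await asyncio.gather(*(fetch_limited(semaphore, counter) for _ in range(6)))
    return counter['peak']

peak = asyncio.run(main())
print(f"peak concurrency: {peak}")
```

Six tasks are launched, but the semaphore ensures no more than two ever run their request body at the same time.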
Working with Headers and Authentication
Many APIs require custom headers or authentication tokens. Here’s how to handle them with aiohttp:
# headers_auth.py
import asyncio
import aiohttp

async def fetch_with_auth(session, url, token):
    headers = {
        'Authorization': f'Bearer {token}',
        'User-Agent': 'MyApp/1.0',
        'Accept': 'application/json',
    }
    async with session.get(url, headers=headers) as response:
        return await response.json()

async def main():
    token = 'your_api_token_here'
    urls = [
        'https://jsonplaceholder.typicode.com/posts/1',
        'https://jsonplaceholder.typicode.com/posts/2',
    ]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_auth(session, url, token) for url in urls]
        results = await asyncio.gather(*tasks)
        print(f"Fetched {len(results)} authenticated responses")

asyncio.run(main())
Output:
Fetched 2 authenticated responses
You can also set default headers for all requests in a session by passing them during session creation. For basic authentication, use auth=aiohttp.BasicAuth('user', 'pass').
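A sketch of both options together, using placeholder credentials:

```python
import asyncio
import aiohttp

async def main():
    default_headers = {'User-Agent': 'MyApp/1.0', 'Accept': 'application/json'}
    auth = aiohttp.BasicAuth('user', 'pass')  # placeholder credentials

    # headers= and auth= given here apply to every request made with this
    # session; per-request headers= still overrides the defaults key by key.
    async with aiohttp.ClientSession(headers=default_headers, auth=auth) as session:
        async with session.get('https://httpbin.org/basic-auth/user/pass') as response:
            print(response.status)

# asyncio.run(main())  # uncomment to run against httpbin.org
```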
Real-Life Example: Concurrent API Data Collector
Let’s build a practical example that fetches data from multiple endpoints, implements error handling, rate limiting, and timeout management:
# api_data_collector.py
import asyncio
import aiohttp
import time
from typing import List, Dict, Any

class APICollector:
    def __init__(self, requests_per_second=2, timeout=10):
        self.requests_per_second = requests_per_second
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.request_times = []

    async def rate_limit(self):
        now = time.time()
        # Remove old timestamps
        self.request_times = [t for t in self.request_times
                              if now - t < 1.0]
        if len(self.request_times) >= self.requests_per_second:
            sleep_time = 1.0 - (now - self.request_times[0])
            await asyncio.sleep(sleep_time)
            await self.rate_limit()
        else:
            self.request_times.append(time.time())

    async def fetch(self, session, url: str) -> Dict[str, Any]:
        await self.rate_limit()
        try:
            async with session.get(url, timeout=self.timeout) as response:
                if response.status == 200:
                    return {
                        'url': url,
                        'status': 'success',
                        'data': await response.json(),
                    }
                else:
                    return {
                        'url': url,
                        'status': 'error',
                        'code': response.status,
                    }
        except asyncio.TimeoutError:
            return {
                'url': url,
                'status': 'timeout',
            }
        except Exception as e:
            return {
                'url': url,
                'status': 'error',
                'error': str(e),
            }

    async def collect(self, urls: List[str]) -> List[Dict[str, Any]]:
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch(session, url) for url in urls]
            return await asyncio.gather(*tasks)

async def main():
    collector = APICollector(requests_per_second=3, timeout=5)
    urls = [
        'https://jsonplaceholder.typicode.com/posts/1',
        'https://jsonplaceholder.typicode.com/posts/2',
        'https://jsonplaceholder.typicode.com/posts/3',
        'https://jsonplaceholder.typicode.com/users/1',
        'https://jsonplaceholder.typicode.com/comments/1',
    ]
    start = time.time()
    results = await collector.collect(urls)
    elapsed = time.time() - start
    successful = sum(1 for r in results if r['status'] == 'success')
    print(f"Collected {successful}/{len(urls)} responses in {elapsed:.2f}s")
    for result in results:
        print(f"  {result['url'].split('/')[-1]}: {result['status']}")

asyncio.run(main())
Output:
Collected 5/5 responses in 1.85s
1: success
2: success
3: success
1: success
1: success
This APICollector class demonstrates a production-ready pattern for making concurrent requests with all the best practices: rate limiting, timeout handling, error recovery, and clean result reporting. You can extend it further with retry logic, exponential backoff, or caching.
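As one possible extension, here is a network-free sketch of retry logic with exponential backoff. retry_with_backoff and flaky are hypothetical helpers; a real version would catch aiohttp.ClientError and asyncio.TimeoutError rather than bare Exception, and use a longer base delay.

```python
import asyncio

async def retry_with_backoff(coro_factory, retries=3, base_delay=0.05):
    # Retries a coroutine-producing callable, doubling the delay after each
    # failure: base_delay, 2*base_delay, 4*base_delay, ...
    for attempt in range(retries):
        try:
            return await coro_factory()
        except Exception:
            if attempt == retries - 1:
                raise  # out of retries: let the last error propagate
            await asyncio.sleep(base_delay * (2 ** attempt))

attempts = {'count': 0}

async def flaky():
    # Simulated request that fails twice before succeeding.
    attempts['count'] += 1
    if attempts['count'] < 3:
        raise ConnectionError("simulated failure")
    return 'ok'

result = asyncio.run(retry_with_backoff(flaky))
print(result, attempts['count'])
```

Taking a factory (a callable that builds a fresh coroutine) rather than a coroutine object matters here: a coroutine can only be awaited once, so each retry needs a new one.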
Frequently Asked Questions
What’s the difference between aiohttp and requests?
The requests library is synchronous and blocks while waiting for responses, making it suitable for simple scripts. aiohttp is asynchronous and allows thousands of concurrent requests without blocking, making it essential for high-performance applications. Use requests for simple scripts and aiohttp for anything that needs concurrency.
Should I create a new session for each request?
No, absolutely not. Creating a new session is expensive because it initializes connection pooling and other resources. Always create one session and reuse it for all requests within a given scope. When you’re done with all requests, close the session with a context manager or by calling await session.close().
How do I limit the number of concurrent connections?
You can set connection limits when creating a ClientSession using the connector parameter: connector = aiohttp.TCPConnector(limit=100, limit_per_host=30). The limit parameter sets the total number of connections, while limit_per_host limits connections to a single host.
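A minimal sketch of wiring a connector into a session:

```python
import asyncio
import aiohttp

async def main():
    # limit= caps the total number of pooled connections; limit_per_host=
    # caps connections to any single host (0 means no per-host limit).
    connector = aiohttp.TCPConnector(limit=100, limit_per_host=30)
    async with aiohttp.ClientSession(connector=connector) as session:
        async with session.get('https://httpbin.org/get') as response:
            print(response.status)

# asyncio.run(main())  # uncomment to run against httpbin.org
```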
How do I handle large file downloads?
For large files, read the response in chunks instead of all at once: async for chunk in response.content.iter_chunked(8192): file.write(chunk). This prevents loading the entire file into memory at once.
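A sketch of a chunked download helper (the download name and destination path are illustrative):

```python
import asyncio
import aiohttp

async def download(url, dest_path, chunk_size=8192):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            response.raise_for_status()
            with open(dest_path, 'wb') as f:
                # iter_chunked yields the body piece by piece, so only one
                # chunk is held in memory at a time.
                async for chunk in response.content.iter_chunked(chunk_size):
                    f.write(chunk)

# asyncio.run(download('https://httpbin.org/bytes/102400', 'payload.bin'))
```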
Does aiohttp support WebSockets?
Yes, aiohttp has built-in WebSocket support for both client and server use cases. You can establish WebSocket connections with async with session.ws_connect(url) as ws: ... and exchange messages bidirectionally.
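A client-side sketch (ws_echo is a hypothetical helper; substitute a WebSocket echo endpoint you control):

```python
import asyncio
import aiohttp

async def ws_echo(url):
    async with aiohttp.ClientSession() as session:
        # ws_connect upgrades the HTTP connection to a WebSocket.
        async with session.ws_connect(url) as ws:
            await ws.send_str('hello')
            msg = await ws.receive()
            if msg.type == aiohttp.WSMsgType.TEXT:
                return msg.data  # an echo server sends 'hello' back

# asyncio.run(ws_echo('wss://example.com/echo'))  # hypothetical endpoint
```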
What exceptions should I catch?
The main exceptions to catch are asyncio.TimeoutError, aiohttp.ClientError (and its subclasses like ClientConnectionError, ClientSSLError), and asyncio.CancelledError for cancelled tasks. Always catch the more specific exceptions before the general ones.
Conclusion
aiohttp is the go-to library for making concurrent HTTP requests in Python. By leveraging asyncio, it allows you to handle dozens, hundreds, or even thousands of concurrent connections efficiently without the overhead of threading. The key takeaways are: create one session and reuse it, use asyncio.gather() for concurrency, always implement proper error handling and timeouts, and respect server rate limits.
Whether you’re building a web scraper, integrating multiple APIs, or creating a high-performance backend service, mastering aiohttp will significantly improve your application’s responsiveness and efficiency. The patterns and best practices shown in this tutorial will serve you well in production environments.
For more information, consult the official aiohttp documentation and the asyncio documentation.