
Introduction

Making HTTP requests is a fundamental task in web development and data collection. However, when you need to fetch data from multiple endpoints simultaneously, traditional blocking requests become a bottleneck. If you want to retrieve data from 100 different APIs, making sequential requests could take minutes. This is where concurrency comes in, allowing you to send multiple requests at the same time and dramatically speed up your application.

aiohttp is a Python library that enables asynchronous HTTP client and server functionality. Built on top of asyncio, aiohttp allows you to handle hundreds or thousands of concurrent requests without creating a thread for each one. This makes it ideal for web scraping, working with REST APIs, and building high-performance applications that need to juggle multiple I/O operations.

In this tutorial, we’ll explore how to use aiohttp to make concurrent HTTP requests, manage sessions efficiently, handle errors gracefully, and implement best practices like rate limiting and timeout management. By the end, you’ll understand how to build scalable applications that can fetch data from multiple sources simultaneously with minimal resource usage.

Quick Example: Fetching 5 URLs Concurrently

Let’s start with a simple example that demonstrates the power of concurrent requests. This script fetches data from five endpoints at the same time:

# concurrent_fetch_example.py
import asyncio
import aiohttp

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.json()

async def main():
    urls = [
        'https://httpbin.org/delay/2',
        'https://httpbin.org/delay/2',
        'https://httpbin.org/delay/2',
        'https://httpbin.org/delay/2',
        'https://httpbin.org/delay/2',
    ]

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        print(f"Fetched {len(results)} responses")

asyncio.run(main())

Output:

Fetched 5 responses

Notice how all five requests are sent immediately and processed concurrently. With traditional blocking requests, this would take about 10 seconds (2 seconds per request, one after another). With aiohttp and asyncio, it completes in roughly 2 seconds because the requests overlap.

[Image: aiohttp.ClientSession: one connection pool to rule them all.]

What is aiohttp?

aiohttp is a Python async HTTP client and server library. It’s built on top of asyncio, Python’s standard asynchronous I/O framework, which means it uses coroutines and event loops instead of threading to handle multiple operations concurrently. This approach is more efficient than threading because it avoids the overhead of context switching and thread management.

Key features of aiohttp include:

  • Asynchronous HTTP requests and responses
  • Connection pooling and session management
  • Automatic handling of redirects and cookies
  • Support for streaming and multipart uploads
  • Built-in timeout and error handling
  • WebSocket support
  • Both client and server functionality

The library is essential for building modern Python web applications that need to handle I/O efficiently. Whether you’re scraping data, calling APIs, or building a backend service that needs to communicate with multiple external services, aiohttp provides the tools you need.

Installing aiohttp

aiohttp is available on PyPI and can be installed with pip:

# terminal
pip install aiohttp

You can verify the installation by checking the version:

# test_installation.py
import aiohttp
print(f"aiohttp version: {aiohttp.__version__}")

Output:

aiohttp version: 3.9.1

That’s all you need! aiohttp will automatically install its dependencies, including yarl and multidict.

[Image: asyncio.Semaphore: a bouncer for your API client.]

Making GET Requests with aiohttp

A GET request retrieves data from a server. Here’s how to make a simple GET request with aiohttp:

# simple_get_request.py
import asyncio
import aiohttp

async def get_example():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://httpbin.org/get') as response:
            data = await response.json()
            print(f"Status: {response.status}")
            print(f"Headers: {response.headers}")
            print(f"Data: {data}")

asyncio.run(get_example())

Output:

Status: 200
Headers: <CIMultiDictProxy('Date': '...', 'Content-Type': 'application/json', ...)>
Data: {'args': {}, 'headers': {...}, 'url': 'https://httpbin.org/get'}

Two parts of this code are key: the ClientSession manages connections and cookies, and the async with statements ensure the session and the response are properly closed. Notice we use await to wait for the response without blocking other operations.

Making POST Requests

POST requests send data to a server. Here’s how to create a POST request with aiohttp:

# post_request_example.py
import asyncio
import aiohttp

async def post_example():
    async with aiohttp.ClientSession() as session:
        payload = {'name': 'Alice', 'age': 30}
        async with session.post('https://httpbin.org/post', json=payload) as response:
            result = await response.json()
            print(f"Status: {response.status}")
            print(f"Sent data: {result['json']}")

asyncio.run(post_example())

Output:

Status: 200
Sent data: {'name': 'Alice', 'age': 30}

The json parameter automatically serializes your dictionary and sets the correct Content-Type header. You can also use data for form-encoded data or files for multipart uploads.
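For comparison, here is a minimal sketch of a form-encoded POST using the data parameter; it assumes the same httpbin.org echo endpoint used above, which returns submitted form fields under the 'form' key:

```python
# form_post_example.py
import asyncio
import aiohttp

async def post_form(session, url):
    # data= sends the payload as application/x-www-form-urlencoded
    form = {'username': 'alice', 'score': '42'}
    async with session.post(url, data=form) as response:
        result = await response.json()
        return result['form']  # httpbin echoes form fields under 'form'

async def main():
    async with aiohttp.ClientSession() as session:
        echoed = await post_form(session, 'https://httpbin.org/post')
        print(f"Echoed form data: {echoed}")

if __name__ == '__main__':
    asyncio.run(main())
```

Use data= for form bodies and json= for JSON payloads; aiohttp sets the matching Content-Type header in each case.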

[Image: 400 requests. One event loop. Zero blocking.]

Making Concurrent Requests with asyncio.gather()

The real power of aiohttp comes from running multiple requests concurrently. The asyncio.gather() function is the key tool for this:

# concurrent_requests.py
import asyncio
import aiohttp
import time

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.json()

async def fetch_multiple(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

async def main():
    urls = [
        'https://jsonplaceholder.typicode.com/posts/1',
        'https://jsonplaceholder.typicode.com/posts/2',
        'https://jsonplaceholder.typicode.com/posts/3',
    ]

    start = time.time()
    results = await fetch_multiple(urls)
    elapsed = time.time() - start

    print(f"Fetched {len(results)} posts in {elapsed:.2f} seconds")

asyncio.run(main())

Output:

Fetched 3 posts in 0.85 seconds

The pattern here is crucial: build a list of coroutines, then pass them to asyncio.gather(), which schedules them all concurrently and waits for every one to complete. If you need to handle errors individually, you can pass return_exceptions=True to gather().
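Here is a sketch of that individual-error pattern: with return_exceptions=True, failed coroutines show up as exception objects in the results list instead of aborting the whole gather. raise_for_status() is used so HTTP error codes surface as exceptions too; the httpbin.org URLs are just stand-ins:

```python
# gather_exceptions_example.py
import asyncio
import aiohttp

async def fetch(session, url):
    async with session.get(url) as response:
        response.raise_for_status()  # raise ClientResponseError on 4xx/5xx
        return await response.json()

async def main():
    urls = [
        'https://httpbin.org/get',
        'https://httpbin.org/status/404',  # will raise
    ]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        # Exceptions are returned in the results list instead of propagating
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for url, result in zip(urls, results):
            if isinstance(result, Exception):
                print(f"{url} failed: {result!r}")
            else:
                print(f"{url} succeeded")

if __name__ == '__main__':
    asyncio.run(main())
```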

[Image: ClientTimeout exists. Your 30-second silence has been noticed.]

Session Management Best Practices

A ClientSession is a container for making HTTP requests and managing connections. It’s important to reuse the same session for multiple requests because it maintains a connection pool, which significantly improves performance. Here’s the right way to manage sessions:

# session_management.py
import asyncio
import aiohttp

async def fetch_with_session(session, url):
    async with session.get(url) as response:
        return await response.json()

async def main():
    # Create session once
    async with aiohttp.ClientSession() as session:
        urls = [
            'https://httpbin.org/get?id=1',
            'https://httpbin.org/get?id=2',
            'https://httpbin.org/get?id=3',
        ]

        # Reuse the same session for all requests
        tasks = [fetch_with_session(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

        print(f"Successfully fetched {len(results)} responses")

asyncio.run(main())

Output:

Successfully fetched 3 responses

Never create a new session for each request. Creating sessions is expensive because they initialize connection pools and other resources. Instead, create one session and reuse it for all your requests in a given scope.

Error Handling and Timeouts

Network requests can fail for various reasons. aiohttp provides built-in mechanisms to handle errors and set timeouts:

# error_handling.py
import asyncio
import aiohttp

async def fetch_with_error_handling(session, url):
    try:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as response:
            if response.status == 200:
                return await response.json()
            else:
                print(f"Error: {response.status} for {url}")
                return None
    except asyncio.TimeoutError:
        print(f"Timeout fetching {url}")
        return None
    except aiohttp.ClientError as e:
        print(f"Request failed for {url}: {e}")
        return None

async def main():
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/status/500',
        'https://httpbin.org/delay/10',  # Will timeout
    ]

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_error_handling(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        successful = len([r for r in results if r is not None])
        print(f"Successfully fetched {successful} out of {len(urls)} responses")

asyncio.run(main())

Output:

Error: 500 for https://httpbin.org/status/500
Timeout fetching https://httpbin.org/delay/10
Successfully fetched 1 out of 3 responses

The ClientTimeout object lets you set different timeout durations. You can set a total timeout, or separate timeouts for connection, reading, and writing. Always wrap requests in try-except blocks to handle network errors gracefully.
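As a quick illustration of those granular durations, ClientTimeout accepts separate fields such as total, connect, and sock_read:

```python
# granular_timeouts.py
import aiohttp

# total: cap on the entire request, from connection to last byte read
# connect: cap on acquiring a connection (pool wait plus TCP handshake)
# sock_read: cap on each individual socket read while streaming the body
timeout = aiohttp.ClientTimeout(total=30, connect=5, sock_read=10)
print(timeout)

# Passing it at session creation applies it to every request:
#     session = aiohttp.ClientSession(timeout=timeout)
# A per-request timeout= argument overrides the session-wide value.
```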

Implementing Rate Limiting

When making many concurrent requests, you may need to respect rate limits imposed by the server. Here’s how to implement basic rate limiting with asyncio:

# rate_limiting.py
import asyncio
import aiohttp
import time

class RateLimiter:
    def __init__(self, max_requests, time_period):
        self.max_requests = max_requests
        self.time_period = time_period
        self.requests = []

    async def acquire(self):
        now = time.time()
        # Remove requests older than the time period
        self.requests = [req_time for req_time in self.requests
                        if now - req_time < self.time_period]

        if len(self.requests) >= self.max_requests:
            sleep_time = self.time_period - (now - self.requests[0])
            await asyncio.sleep(sleep_time)
            await self.acquire()
        else:
            self.requests.append(time.time())

async def fetch_with_limit(session, url, limiter):
    await limiter.acquire()
    async with session.get(url) as response:
        return await response.json()

async def main():
    limiter = RateLimiter(max_requests=2, time_period=1.0)

    urls = [
        'https://httpbin.org/get?id=1',
        'https://httpbin.org/get?id=2',
        'https://httpbin.org/get?id=3',
        'https://httpbin.org/get?id=4',
    ]

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_limit(session, url, limiter) for url in urls]
        start = time.time()
        results = await asyncio.gather(*tasks)
        elapsed = time.time() - start
        print(f"Fetched {len(results)} responses in {elapsed:.2f} seconds")

asyncio.run(main())

Output:

Fetched 4 responses in 2.05 seconds

This rate limiter ensures no more than 2 requests happen within any 1-second window. You can adjust max_requests and time_period to match your API’s limits.
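If you only need to cap how many requests are in flight at once, rather than how many start per second, asyncio.Semaphore is a simpler alternative. This sketch reuses the httpbin.org endpoints from above:

```python
# semaphore_limit.py
import asyncio
import aiohttp

async def fetch_limited(semaphore, session, url):
    # At most N coroutines can hold the semaphore at once, so at most
    # N requests are ever in flight simultaneously
    async with semaphore:
        async with session.get(url) as response:
            return await response.json()

async def main():
    semaphore = asyncio.Semaphore(2)  # at most 2 concurrent requests
    urls = [f'https://httpbin.org/get?id={i}' for i in range(6)]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_limited(semaphore, session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        print(f"Fetched {len(results)} responses")

if __name__ == '__main__':
    asyncio.run(main())
```

Note the distinction: a semaphore bounds concurrency, not rate. If a server enforces a requests-per-second limit, you still need the time-window approach above.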

Working with Headers and Authentication

Many APIs require custom headers or authentication tokens. Here’s how to handle them with aiohttp:

# headers_auth.py
import asyncio
import aiohttp

async def fetch_with_auth(session, url, token):
    headers = {
        'Authorization': f'Bearer {token}',
        'User-Agent': 'MyApp/1.0',
        'Accept': 'application/json',
    }

    async with session.get(url, headers=headers) as response:
        return await response.json()

async def main():
    token = 'your_api_token_here'
    urls = [
        'https://jsonplaceholder.typicode.com/posts/1',
        'https://jsonplaceholder.typicode.com/posts/2',
    ]

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_auth(session, url, token) for url in urls]
        results = await asyncio.gather(*tasks)
        print(f"Fetched {len(results)} authenticated responses")

asyncio.run(main())

Output:

Fetched 2 authenticated responses

You can also set default headers for all requests in a session by passing them during session creation. For basic authentication, use auth=aiohttp.BasicAuth('user', 'pass').
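Here is a small sketch of both options together: default headers set at session creation plus BasicAuth, verified against httpbin.org's basic-auth echo endpoint (the user/pass credentials are placeholders):

```python
# session_defaults.py
import asyncio
import aiohttp

async def main():
    # Headers passed here are sent with every request made by the session
    default_headers = {'User-Agent': 'MyApp/1.0', 'Accept': 'application/json'}
    auth = aiohttp.BasicAuth('user', 'pass')

    async with aiohttp.ClientSession(headers=default_headers, auth=auth) as session:
        # httpbin checks the credentials embedded in the URL path
        async with session.get('https://httpbin.org/basic-auth/user/pass') as response:
            print(f"Status: {response.status}")

if __name__ == '__main__':
    asyncio.run(main())
```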

Real-Life Example: Concurrent API Data Collector

Let’s build a practical example that fetches data from multiple endpoints, implements error handling, rate limiting, and timeout management:

# api_data_collector.py
import asyncio
import aiohttp
import time
from typing import List, Dict, Any

class APICollector:
    def __init__(self, requests_per_second=2, timeout=10):
        self.requests_per_second = requests_per_second
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.request_times = []

    async def rate_limit(self):
        now = time.time()
        # Remove old timestamps
        self.request_times = [t for t in self.request_times
                             if now - t < 1.0]

        if len(self.request_times) >= self.requests_per_second:
            sleep_time = 1.0 - (now - self.request_times[0])
            await asyncio.sleep(sleep_time)
            await self.rate_limit()
        else:
            self.request_times.append(time.time())

    async def fetch(self, session, url: str) -> Dict[str, Any]:
        await self.rate_limit()
        try:
            async with session.get(url, timeout=self.timeout) as response:
                if response.status == 200:
                    return {
                        'url': url,
                        'status': 'success',
                        'data': await response.json(),
                    }
                else:
                    return {
                        'url': url,
                        'status': 'error',
                        'code': response.status,
                    }
        except asyncio.TimeoutError:
            return {
                'url': url,
                'status': 'timeout',
            }
        except Exception as e:
            return {
                'url': url,
                'status': 'error',
                'error': str(e),
            }

    async def collect(self, urls: List[str]) -> List[Dict[str, Any]]:
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch(session, url) for url in urls]
            return await asyncio.gather(*tasks)

async def main():
    collector = APICollector(requests_per_second=3, timeout=5)

    urls = [
        'https://jsonplaceholder.typicode.com/posts/1',
        'https://jsonplaceholder.typicode.com/posts/2',
        'https://jsonplaceholder.typicode.com/posts/3',
        'https://jsonplaceholder.typicode.com/users/1',
        'https://jsonplaceholder.typicode.com/comments/1',
    ]

    start = time.time()
    results = await collector.collect(urls)
    elapsed = time.time() - start

    successful = sum(1 for r in results if r['status'] == 'success')
    print(f"Collected {successful}/{len(urls)} responses in {elapsed:.2f}s")

    for result in results:
        print(f"  {result['url'].split('/')[-1]}: {result['status']}")

asyncio.run(main())

Output:

Collected 5/5 responses in 1.85s
  1: success
  2: success
  3: success
  1: success
  1: success

This APICollector class demonstrates a production-ready pattern for making concurrent requests with all the best practices: rate limiting, timeout handling, error recovery, and clean result reporting. You can extend it further with retry logic, exponential backoff, or caching.
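As one possible extension, here is a hedged sketch of retry logic with exponential backoff; fetch_with_retry and backoff_delays are illustrative helpers, not part of aiohttp:

```python
# retry_backoff.py
import asyncio
import aiohttp

def backoff_delays(retries, base=1.0, factor=2.0):
    """Exponential backoff schedule: base, base*factor, base*factor^2, ..."""
    return [base * factor ** attempt for attempt in range(retries)]

async def fetch_with_retry(session, url, retries=3):
    delays = backoff_delays(retries)
    for attempt, delay in enumerate(delays):
        try:
            async with session.get(url) as response:
                response.raise_for_status()
                return await response.json()
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            if attempt == retries - 1:
                raise  # out of retries, propagate the last error
            print(f"Attempt {attempt + 1} failed ({e!r}), retrying in {delay}s")
            await asyncio.sleep(delay)

async def main():
    async with aiohttp.ClientSession() as session:
        data = await fetch_with_retry(session, 'https://httpbin.org/get')
        print(f"Fetched: {data['url']}")

if __name__ == '__main__':
    asyncio.run(main())
```

Doubling the delay between attempts gives a struggling server breathing room instead of hammering it at a fixed interval.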

Frequently Asked Questions

What’s the difference between aiohttp and requests?

The requests library is synchronous and blocks while waiting for responses, making it suitable for simple scripts. aiohttp is asynchronous and allows thousands of concurrent requests without blocking, making it essential for high-performance applications. Use requests for simple scripts and aiohttp for anything that needs concurrency.

Should I create a new session for each request?

No, absolutely not. Creating a new session is expensive because it initializes connection pooling and other resources. Always create one session and reuse it for all requests within a given scope. When you’re done with all requests, close the session with a context manager or by calling await session.close().

How do I limit the number of concurrent connections?

You can set connection limits when creating a ClientSession using the connector parameter: connector = aiohttp.TCPConnector(limit=100, limit_per_host=30). The limit parameter sets the total number of connections, while limit_per_host limits connections to a single host.
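A minimal sketch of wiring those limits into a session:

```python
# connection_limits.py
import asyncio
import aiohttp

async def main():
    # limit: total simultaneous connections across all hosts
    # limit_per_host: simultaneous connections to any single host
    connector = aiohttp.TCPConnector(limit=100, limit_per_host=30)
    async with aiohttp.ClientSession(connector=connector) as session:
        async with session.get('https://httpbin.org/get') as response:
            print(f"Status: {response.status}")

if __name__ == '__main__':
    asyncio.run(main())
```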

How do I handle large file downloads?

For large files, read the response in chunks instead of all at once: async for chunk in response.content.iter_chunked(8192): file.write(chunk). This prevents loading the entire file into memory at once.
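A fuller sketch of that pattern; the URL is httpbin.org's random-bytes endpoint, used here as a stand-in for a large file:

```python
# chunked_download.py
import asyncio
import aiohttp

async def download_file(url, dest_path, chunk_size=8192):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            response.raise_for_status()
            with open(dest_path, 'wb') as f:
                # Stream the body in fixed-size chunks instead of
                # buffering the whole response in memory
                async for chunk in response.content.iter_chunked(chunk_size):
                    f.write(chunk)

if __name__ == '__main__':
    asyncio.run(download_file('https://httpbin.org/bytes/102400', 'payload.bin'))
```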

Does aiohttp support WebSockets?

Yes, aiohttp has built-in WebSocket support for both client and server use cases. You can establish WebSocket connections with async with session.ws_connect(url) as ws: ... and exchange messages bidirectionally.
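A minimal client sketch; wss://echo.websocket.org is a public echo server whose availability varies, so treat the URL as a placeholder:

```python
# websocket_client.py
import asyncio
import aiohttp

async def echo_once(url, message):
    async with aiohttp.ClientSession() as session:
        async with session.ws_connect(url) as ws:
            await ws.send_str(message)
            msg = await ws.receive()
            if msg.type == aiohttp.WSMsgType.TEXT:
                return msg.data

if __name__ == '__main__':
    # Send one message and print whatever the server echoes back
    print(asyncio.run(echo_once('wss://echo.websocket.org', 'hello')))
```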

What exceptions should I catch?

The main exceptions to catch are asyncio.TimeoutError, aiohttp.ClientError (and its subclasses like ClientConnectionError, ClientSSLError), and asyncio.CancelledError for cancelled tasks. Always catch the more specific exceptions before the general ones.
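A sketch of that ordering, with the specific ClientConnectionError handled before the more general ClientError:

```python
# exception_order.py
import asyncio
import aiohttp

async def fetch_carefully(session, url):
    try:
        async with session.get(url) as response:
            return await response.json()
    except aiohttp.ClientConnectionError as e:
        # Specific subclass first: connection refused, DNS failure, etc.
        print(f"Connection problem: {e}")
    except aiohttp.ClientError as e:
        # Broader aiohttp client errors caught after the specific ones
        print(f"Client error: {e}")
    except asyncio.TimeoutError:
        print("Request timed out")
    return None
```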

Conclusion

aiohttp is the go-to library for making concurrent HTTP requests in Python. By leveraging asyncio, it allows you to handle dozens, hundreds, or even thousands of concurrent connections efficiently without the overhead of threading. The key takeaways are: create one session and reuse it, use asyncio.gather() for concurrency, always implement proper error handling and timeouts, and respect server rate limits.

Whether you’re building a web scraper, integrating multiple APIs, or creating a high-performance backend service, mastering aiohttp will significantly improve your application’s responsiveness and efficiency. The patterns and best practices shown in this tutorial will serve you well in production environments.

For more information, consult the official aiohttp documentation and the asyncio documentation.