Advanced

Introduction

Making HTTP requests is a fundamental task in web development and data collection. However, when you need to fetch data from multiple endpoints simultaneously, traditional blocking requests become a bottleneck. If you want to retrieve data from 100 different APIs, making sequential requests could take minutes. This is where concurrency comes in, allowing you to send multiple requests at the same time and dramatically speed up your application.

aiohttp is a Python library that enables asynchronous HTTP client and server functionality. Built on top of asyncio, aiohttp allows you to handle hundreds or thousands of concurrent requests without creating a thread for each one. This makes it ideal for web scraping, working with REST APIs, and building high-performance applications that need to juggle multiple I/O operations.

In this tutorial, we’ll explore how to use aiohttp to make concurrent HTTP requests, manage sessions efficiently, handle errors gracefully, and implement best practices like rate limiting and timeout management. By the end, you’ll understand how to build scalable applications that can fetch data from multiple sources simultaneously with minimal resource usage.

Quick Example: Fetching 5 URLs Concurrently

Let’s start with a simple example that demonstrates the power of concurrent requests. This script fetches data from five endpoints at the same time:

# concurrent_fetch_example.py
import asyncio
import aiohttp

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.json()

async def main():
    urls = [
        'https://httpbin.org/delay/2',
        'https://httpbin.org/delay/2',
        'https://httpbin.org/delay/2',
        'https://httpbin.org/delay/2',
        'https://httpbin.org/delay/2',
    ]

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        print(f"Fetched {len(results)} responses")

asyncio.run(main())

Output:

Fetched 5 responses

Notice how all five requests are sent immediately and processed in parallel. With traditional blocking requests, this would take 10 seconds (2 seconds per request). With aiohttp and asyncio, it completes in roughly 2 seconds because the requests happen concurrently.

aiohttp-intro
aiohttp.ClientSession: one connection pool to rule them all.

What is aiohttp?

aiohttp is a Python async HTTP client and server library. It’s built on top of asyncio, Python’s standard asynchronous I/O framework, which means it uses coroutines and event loops instead of threading to handle multiple operations concurrently. This approach is more efficient than threading because it avoids the overhead of context switching and thread management.

Key features of aiohttp include:

  • Asynchronous HTTP requests and responses
  • Connection pooling and session management
  • Automatic handling of redirects and cookies
  • Support for streaming and multipart uploads
  • Built-in timeout and error handling
  • WebSocket support
  • Both client and server functionality

The library is essential for building modern Python web applications that need to handle I/O efficiently. Whether you’re scraping data, calling APIs, or building a backend service that needs to communicate with multiple external services, aiohttp provides the tools you need.

Installing aiohttp

aiohttp is available on PyPI and can be installed with pip:

# terminal
pip install aiohttp

You can verify the installation by checking the version:

# test_installation.py
import aiohttp
print(f"aiohttp version: {aiohttp.__version__}")

Output:

aiohttp version: 3.9.1

That’s all you need! aiohttp will automatically install its dependencies, including yarl and multidict.

aiohttp-concurrent
asyncio.Semaphore: a bouncer for your API client.

Making GET Requests with aiohttp

A GET request retrieves data from a server. Here’s how to make a simple GET request with aiohttp:

# simple_get_request.py
import asyncio
import aiohttp

async def get_example():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://httpbin.org/get') as response:
            data = await response.json()
            print(f"Status: {response.status}")
            print(f"Headers: {response.headers}")
            print(f"Data: {data}")

asyncio.run(get_example())

Output:

Status: 200
Headers: 
Data: {'args': {}, 'headers': {...}, 'url': 'https://httpbin.org/get'}

The key parts of this code are: a ClientSession manages connections and cookies, and the async with statement ensures the connection is properly closed. Notice we use await to wait for the response without blocking other operations.

Making POST Requests

POST requests send data to a server. Here’s how to create a POST request with aiohttp:

# post_request_example.py
import asyncio
import aiohttp
import json

async def post_example():
    async with aiohttp.ClientSession() as session:
        payload = {'name': 'Alice', 'age': 30}
        async with session.post('https://httpbin.org/post', json=payload) as response:
            result = await response.json()
            print(f"Status: {response.status}")
            print(f"Sent data: {result['json']}")

asyncio.run(post_example())

Output:

Status: 200
Sent data: {'name': 'Alice', 'age': 30}

The json parameter automatically serializes your dictionary and sets the correct Content-Type header. You can also use data for form-encoded data or files for multipart uploads.

aiohttp-rate-limiting-1
400 requests. One event loop. Zero blocking.

Making Concurrent Requests with asyncio.gather()

The real power of aiohttp comes from running multiple requests concurrently. The asyncio.gather() function is the key tool for this:

# concurrent_requests.py
import asyncio
import aiohttp
import time

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.json()

async def fetch_multiple(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

async def main():
    urls = [
        'https://jsonplaceholder.typicode.com/posts/1',
        'https://jsonplaceholder.typicode.com/posts/2',
        'https://jsonplaceholder.typicode.com/posts/3',
    ]

    start = time.time()
    results = await fetch_multiple(urls)
    elapsed = time.time() - start

    print(f"Fetched {len(results)} posts in {elapsed:.2f} seconds")

asyncio.run(main())

Output:

Fetched 3 posts in 0.85 seconds

The pattern here is crucial: create a list of coroutines (tasks), then pass them to asyncio.gather(). This sends all requests immediately and waits for all of them to complete. If you need to handle errors individually, you can pass return_exceptions=True to gather().

aiohttp-real-life-1
ClientTimeout exists. Your 30-second silence has been noticed.

Session Management Best Practices

A ClientSession is a container for making HTTP requests and managing connections. It’s important to reuse the same session for multiple requests because it maintains a connection pool, which significantly improves performance. Here’s the right way to manage sessions:

# session_management.py
import asyncio
import aiohttp

async def fetch_with_session(session, url):
    async with session.get(url) as response:
        return await response.json()

async def main():
    # Create session once
    async with aiohttp.ClientSession() as session:
        urls = [
            'https://httpbin.org/get?id=1',
            'https://httpbin.org/get?id=2',
            'https://httpbin.org/get?id=3',
        ]

        # Reuse the same session for all requests
        tasks = [fetch_with_session(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

        print(f"Successfully fetched {len(results)} responses")

asyncio.run(main())

Output:

Successfully fetched 3 responses

Never create a new session for each request. Creating sessions is expensive because they initialize connection pools and other resources. Instead, create one session and reuse it for all your requests in a given scope.

Error Handling and Timeouts

Network requests can fail for various reasons. aiohttp provides built-in mechanisms to handle errors and set timeouts:

# error_handling.py
import asyncio
import aiohttp

async def fetch_with_error_handling(session, url):
    try:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as response:
            if response.status == 200:
                return await response.json()
            else:
                print(f"Error: {response.status} for {url}")
                return None
    except asyncio.TimeoutError:
        print(f"Timeout fetching {url}")
        return None
    except aiohttp.ClientError as e:
        print(f"Request failed for {url}: {e}")
        return None

async def main():
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/status/500',
        'https://httpbin.org/delay/10',  # Will timeout
    ]

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_error_handling(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        successful = len([r for r in results if r is not None])
        print(f"Successfully fetched {successful} out of {len(urls)} responses")

asyncio.run(main())

Output:

Error: 500 for https://httpbin.org/status/500
Timeout fetching https://httpbin.org/delay/10
Successfully fetched 1 out of 3 responses

The ClientTimeout object lets you set different timeout durations. You can set a total timeout, or separate timeouts for connection, reading, and writing. Always wrap requests in try-except blocks to handle network errors gracefully.

Implementing Rate Limiting

When making many concurrent requests, you may need to respect rate limits imposed by the server. Here’s how to implement basic rate limiting with asyncio:

# rate_limiting.py
import asyncio
import aiohttp
import time

class RateLimiter:
    def __init__(self, max_requests, time_period):
        self.max_requests = max_requests
        self.time_period = time_period
        self.requests = []

    async def acquire(self):
        now = time.time()
        # Remove requests older than the time period
        self.requests = [req_time for req_time in self.requests
                        if now - req_time < self.time_period]

        if len(self.requests) >= self.max_requests:
            sleep_time = self.time_period - (now - self.requests[0])
            await asyncio.sleep(sleep_time)
            await self.acquire()
        else:
            self.requests.append(time.time())

async def fetch_with_limit(session, url, limiter):
    await limiter.acquire()
    async with session.get(url) as response:
        return await response.json()

async def main():
    limiter = RateLimiter(max_requests=2, time_period=1.0)

    urls = [
        'https://httpbin.org/get?id=1',
        'https://httpbin.org/get?id=2',
        'https://httpbin.org/get?id=3',
        'https://httpbin.org/get?id=4',
    ]

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_limit(session, url, limiter) for url in urls]
        start = time.time()
        results = await asyncio.gather(*tasks)
        elapsed = time.time() - start
        print(f"Fetched {len(results)} responses in {elapsed:.2f} seconds")

asyncio.run(main())

Output:

Fetched 4 responses in 2.05 seconds

This rate limiter ensures no more than 2 requests happen within any 1-second window. You can adjust max_requests and time_period to match your API’s limits.

Three HTTP requests at once. Total time = max, not sum.
Three HTTP requests at once. Total time = max, not sum.

Working with Headers and Authentication

Many APIs require custom headers or authentication tokens. Here’s how to handle them with aiohttp:

# headers_auth.py
import asyncio
import aiohttp

async def fetch_with_auth(session, url, token):
    headers = {
        'Authorization': f'Bearer {token}',
        'User-Agent': 'MyApp/1.0',
        'Accept': 'application/json',
    }

    async with session.get(url, headers=headers) as response:
        return await response.json()

async def main():
    token = 'your_api_token_here'
    urls = [
        'https://jsonplaceholder.typicode.com/posts/1',
        'https://jsonplaceholder.typicode.com/posts/2',
    ]

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_auth(session, url, token) for url in urls]
        results = await asyncio.gather(*tasks)
        print(f"Fetched {len(results)} authenticated responses")

asyncio.run(main())

Output:

Fetched 2 authenticated responses

You can also set default headers for all requests in a session by passing them during session creation. For basic authentication, use auth=aiohttp.BasicAuth('user', 'pass').

Real-Life Example: Concurrent API Data Collector

Let’s build a practical example that fetches data from multiple endpoints, implements error handling, rate limiting, and timeout management:

# api_data_collector.py
import asyncio
import aiohttp
import time
from typing import List, Dict, Any

class APICollector:
    def __init__(self, requests_per_second=2, timeout=10):
        self.requests_per_second = requests_per_second
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.request_times = []

    async def rate_limit(self):
        now = time.time()
        # Remove old timestamps
        self.request_times = [t for t in self.request_times
                             if now - t < 1.0]

        if len(self.request_times) >= self.requests_per_second:
            sleep_time = 1.0 - (now - self.request_times[0])
            await asyncio.sleep(sleep_time)
            await self.rate_limit()
        else:
            self.request_times.append(time.time())

    async def fetch(self, session, url: str) -> Dict[str, Any]:
        await self.rate_limit()
        try:
            async with session.get(url, timeout=self.timeout) as response:
                if response.status == 200:
                    return {
                        'url': url,
                        'status': 'success',
                        'data': await response.json(),
                    }
                else:
                    return {
                        'url': url,
                        'status': 'error',
                        'code': response.status,
                    }
        except asyncio.TimeoutError:
            return {
                'url': url,
                'status': 'timeout',
            }
        except Exception as e:
            return {
                'url': url,
                'status': 'error',
                'error': str(e),
            }

    async def collect(self, urls: List[str]) -> List[Dict[str, Any]]:
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch(session, url) for url in urls]
            return await asyncio.gather(*tasks)

async def main():
    collector = APICollector(requests_per_second=3, timeout=5)

    urls = [
        'https://jsonplaceholder.typicode.com/posts/1',
        'https://jsonplaceholder.typicode.com/posts/2',
        'https://jsonplaceholder.typicode.com/posts/3',
        'https://jsonplaceholder.typicode.com/users/1',
        'https://jsonplaceholder.typicode.com/comments/1',
    ]

    start = time.time()
    results = await collector.collect(urls)
    elapsed = time.time() - start

    successful = sum(1 for r in results if r['status'] == 'success')
    print(f"Collected {successful}/{len(urls)} responses in {elapsed:.2f}s")

    for result in results:
        print(f"  {result['url'].split('/')[-1]}: {result['status']}")

asyncio.run(main())

Output:

Collected 5/5 responses in 1.85s
  1: success
  2: success
  3: success
  1: success
  1: success

This APICollector class demonstrates a production-ready pattern for making concurrent requests with all the best practices: rate limiting, timeout handling, error recovery, and clean result reporting. You can extend it further with retry logic, exponential backoff, or caching.

Frequently Asked Questions

What’s the difference between aiohttp and requests?

The requests library is synchronous and blocks while waiting for responses, making it suitable for simple scripts. aiohttp is asynchronous and allows thousands of concurrent requests without blocking, making it essential for high-performance applications. Use requests for simple scripts and aiohttp for anything that needs concurrency.

Should I create a new session for each request?

No, absolutely not. Creating a new session is expensive because it initializes connection pooling and other resources. Always create one session and reuse it for all requests within a given scope. When you’re done with all requests, close the session using a context manager or the await session.close() method.

How do I limit the number of concurrent connections?

You can set connection limits when creating a ClientSession using the connector parameter: connector = aiohttp.TCPConnector(limit=100, limit_per_host=30). The limit parameter sets the total number of connections, while limit_per_host limits connections to a single host.

How do I handle large file downloads?

For large files, read the response in chunks instead of all at once: async for chunk in response.content.iter_chunked(8192): file.write(chunk). This prevents loading the entire file into memory at once.

Does aiohttp support WebSockets?

Yes, aiohttp has built-in WebSocket support for both client and server use cases. You can establish WebSocket connections with async with session.ws_connect(url) as ws: ... and exchange messages bidirectionally.

What exceptions should I catch?

The main exceptions to catch are asyncio.TimeoutError, aiohttp.ClientError (and its subclasses like ClientConnectionError, ClientSSLError), and asyncio.CancelledError for cancelled tasks. Always catch the more specific exceptions before the general ones.

Conclusion

aiohttp is the go-to library for making concurrent HTTP requests in Python. By leveraging asyncio, it allows you to handle dozens, hundreds, or even thousands of concurrent connections efficiently without the overhead of threading. The key takeaways are: create one session and reuse it, use asyncio.gather() for concurrency, always implement proper error handling and timeouts, and respect server rate limits.

Whether you’re building a web scraper, integrating multiple APIs, or creating a high-performance backend service, mastering aiohttp will significantly improve your application’s responsiveness and efficiency. The patterns and best practices shown in this tutorial will serve you well in production environments.

For more information, consult the official aiohttp documentation and the asyncio documentation.

The asyncio + aiohttp Pattern

import asyncio
import aiohttp

async def fetch(session, url):
    async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as r:
        r.raise_for_status()
        return await r.text()

async def main(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
    return results

urls = ["https://example.com", "https://python.org", "https://github.com"]
results = asyncio.run(main(urls))
for url, result in zip(urls, results):
    if isinstance(result, Exception):
        print(url, "FAILED:", result)
    else:
        print(url, "got", len(result), "bytes")

One ClientSession per program (or per long-lived context) — re-creating sessions is expensive. gather(return_exceptions=True) lets one failure not cancel the rest.

Concurrency Limit

import asyncio
import aiohttp

async def fetch_with_sem(sem, session, url):
    async with sem:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as r:
            return await r.text()

async def main(urls, concurrency=20):
    sem = asyncio.Semaphore(concurrency)
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            *[fetch_with_sem(sem, session, url) for url in urls],
            return_exceptions=True,
        )
    return results

# Now scraping 10,000 URLs maxes out at 20 in flight
results = asyncio.run(main(big_url_list, concurrency=20))

Without a semaphore, asyncio.gather() launches ALL tasks at once. 10,000 simultaneous connections = errors, throttling, OS file-handle exhaustion. The semaphore caps how many run concurrently.

POST, Headers, and JSON

async with aiohttp.ClientSession() as session:
    # POST with JSON body
    async with session.post(
        "https://api.example.com/users",
        json={"name": "Alice", "email": "alice@example.com"},
        headers={"Authorization": "Bearer " + token},
    ) as resp:
        result = await resp.json()
    
    # POST form data
    async with session.post(url, data={"key": "value"}) as resp:
        text = await resp.text()
    
    # File upload (multipart)
    with open("photo.jpg", "rb") as f:
        async with session.post(url, data={"file": f}) as resp:
            ...

Retries with tenacity

from tenacity import retry, stop_after_attempt, wait_random_exponential
import aiohttp

@retry(
    stop=stop_after_attempt(5),
    wait=wait_random_exponential(multiplier=1, max=30),
)
async def fetch_with_retry(session, url):
    async with session.get(url) as r:
        r.raise_for_status()
        return await r.text()

tenacity supports async functions natively. Retries respect the event loop; jitter prevents thundering-herd retry storms.

Streaming Large Responses

async with aiohttp.ClientSession() as session:
    async with session.get(big_file_url) as resp:
        with open("output.bin", "wb") as f:
            async for chunk in resp.content.iter_chunked(8192):
                f.write(chunk)
                # Process or write each 8KB chunk; memory stays constant

Common Pitfalls

  • Creating a Session per request. A new ClientSession allocates a new connection pool. Open once, reuse for all requests.
  • Forgetting to close the session. Use async with. Forgetting leaks connections.
  • No timeout. Default timeout is unlimited. A hanging request blocks forever. Always pass timeout=aiohttp.ClientTimeout(total=N).
  • Using synchronous requests inside async. requests.get blocks the event loop. Switch to aiohttp or wrap in asyncio.to_thread.
  • SSL errors. If you see “SSL certificate verify failed”, DON’T disable verification in production. Update certifi, check the actual cert, or use ssl.SSLContext explicitly.

FAQ

Q: aiohttp or httpx?
A: aiohttp is older and battle-tested; httpx supports sync AND async, with a requests-like API. httpx for new code; aiohttp when you have existing aiohttp code or need server features.

Q: How fast is concurrent vs sequential?
A: For I/O-bound work, near-linear speedup until you saturate network or target server. 100 sequential 100ms requests = 10 seconds; 100 concurrent = ~150ms.

Q: How do I handle rate limits?
A: Honor the Retry-After header from 429 responses. Use a semaphore for per-second limits. Tenacity wraps both.

Q: Connection pooling settings?
A: aiohttp’s TCPConnector has limit (total) and limit_per_host. Defaults are 100/30, usually fine. Tune higher for many-host scrapers, lower for single-host hammering.

Q: Streaming POST?
A: session.post(url, data=async_iterator) — pass an async generator that yields chunks. aiohttp streams them as they come.

Wrapping Up

Concurrent HTTP in Python is asyncio + aiohttp + a semaphore. One Session, scoped concurrency limit, timeouts on every request, retries via tenacity. That four-piece combo handles everything from small concurrent fetches to massive web crawlers without unraveling.