You chose asyncio for your project. Six months later a new team member wants to use trio. Your library might need to support both. Rewriting async code to switch runtimes is painful — the primitives are different, the cancellation models differ, and every asyncio.sleep() call becomes a liability. Writing async libraries that work across runtimes has historically meant publishing two separate codebases.
anyio solves this by providing a unified async API that runs on top of either asyncio or trio. You write your code once against the anyio API and it runs on whichever backend is active. FastAPI and Starlette use anyio internally — if you have used those frameworks, you have already benefited from it. When writing your own async libraries, anyio is the recommended way to ensure portability without sacrificing features.
This article covers the anyio fundamentals: starting the event loop, task groups, cancellation, synchronization primitives, and file I/O integration. Each section shows how the anyio API maps to the equivalent asyncio and trio patterns. By the end you will know how to write async code that is testable on both backends and future-proof against runtime churn.
Running Async Code with anyio: Quick Example
The entry point to anyio is anyio.run(), which replaces asyncio.run(). You can switch backends by passing a backend argument:
```python
# quick_anyio.py
import anyio

async def greet(name, delay):
    await anyio.sleep(delay)
    print(f'Hello, {name}!')

async def main():
    async with anyio.create_task_group() as tg:
        tg.start_soon(greet, 'asyncio', 0.1)
        tg.start_soon(greet, 'trio', 0.05)
        tg.start_soon(greet, 'world', 0.15)

# Run on asyncio (default)
anyio.run(main)

# To run on trio: anyio.run(main, backend='trio')
# (requires: pip install trio)
```
Output:
```
Hello, trio!
Hello, asyncio!
Hello, world!
```
Three tasks run concurrently inside a task group. The output order reflects the different delay values — trio finishes first at 0.05s. Switching to the trio backend requires only passing backend='trio' to anyio.run(); the coroutine code does not change at all. This backend portability is anyio's core value proposition.
What Is anyio and How Does It Compare?
anyio is a compatibility shim that abstracts the differences between async runtimes. Its API is deliberately close to trio’s structured concurrency model — which is considered more correct than asyncio’s lower-level primitives. The result is that anyio code tends to be cleaner than raw asyncio code even when only targeting asyncio.
| Feature | asyncio | trio | anyio |
|---|---|---|---|
| Concurrent tasks | asyncio.gather() / asyncio.TaskGroup (3.11+) | trio.open_nursery() | anyio.create_task_group() |
| Sleep | asyncio.sleep() | trio.sleep() | anyio.sleep() |
| Timeout | asyncio.wait_for() / asyncio.timeout() (3.11+) | trio.fail_after() | anyio.fail_after() |
| Cancel scope | manual, via Task.cancel() | trio.CancelScope | anyio.CancelScope |
| File I/O | aiofiles (external) | built-in | built-in |
| Backend portability | No | No | Yes |
Install anyio with: pip install anyio. To also enable the trio backend: pip install anyio[trio]. The asyncio backend is built-in — no extra install needed.
Task Groups
Task groups are the structured concurrency primitive in anyio. Every task started inside a create_task_group() block must finish before the block exits. If any task raises an exception, all other tasks in the group are cancelled — no silent failures, no zombie tasks.
```python
# task_groups.py
import anyio
import httpx

async def fetch_status(client, url):
    resp = await client.get(url)
    print(f'{url}: HTTP {resp.status_code}')
    return resp.status_code

async def main():
    urls = [
        'https://httpbin.org/status/200',
        'https://httpbin.org/status/301',
        'https://httpbin.org/status/404',
    ]
    async with httpx.AsyncClient() as client:
        async with anyio.create_task_group() as tg:
            for url in urls:
                tg.start_soon(fetch_status, client, url)
    print('All requests complete')

anyio.run(main)
```
Output:
```
https://httpbin.org/status/200: HTTP 200
https://httpbin.org/status/404: HTTP 404
https://httpbin.org/status/301: HTTP 301
All requests complete
```
All three requests run concurrently, and the order of results depends on network response times. The task group guarantees that print('All requests complete') only runs after every task has finished — unlike asyncio.gather(), which propagates the first exception while leaving the other tasks running unless you handle errors carefully. Note: this example requires pip install httpx.
Timeouts and Cancellation
Failing After a Deadline
Timeouts are one of the most common sources of async correctness bugs. anyio.fail_after() raises TimeoutError if the block does not complete within the given number of seconds. Unlike asyncio.wait_for(), it works as a context manager and composes correctly with task groups.
```python
# timeout_demo.py
import anyio

async def slow_operation(seconds):
    print(f'Starting {seconds}s operation')
    await anyio.sleep(seconds)
    print(f'Finished {seconds}s operation')
    return f'result_{seconds}'

async def main():
    # This will complete in time
    try:
        with anyio.fail_after(2.0):
            await slow_operation(0.5)
        print('Fast op: succeeded')
    except TimeoutError:
        print('Fast op: timed out')

    # This will time out
    try:
        with anyio.fail_after(0.3):
            await slow_operation(2.0)
        print('Slow op: succeeded')
    except TimeoutError:
        print('Slow op: TIMED OUT after 0.3s')

anyio.run(main)
```
Output:
```
Starting 0.5s operation
Finished 0.5s operation
Fast op: succeeded
Starting 2.0s operation
Slow op: TIMED OUT after 0.3s
```
fail_after() cancels the code inside its block when the deadline is exceeded and raises TimeoutError. The move_on_after() variant silently skips to the end of the block without raising — useful when a timeout is acceptable but not exceptional.
Cancel Scopes
For fine-grained cancellation control, use anyio.CancelScope directly. You can cancel a scope programmatically, and check whether a scope caught a cancellation via scope.cancelled_caught.
```python
# cancel_scope.py
import anyio

async def worker(name, delay):
    try:
        await anyio.sleep(delay)
        print(f'{name}: done')
    except anyio.get_cancelled_exc_class():
        print(f'{name}: was cancelled')
        raise  # cancellation exceptions must be re-raised

async def main():
    async with anyio.create_task_group() as tg:
        tg.start_soon(worker, 'fast', 0.1)
        tg.start_soon(worker, 'medium', 0.5)
        tg.start_soon(worker, 'slow', 2.0)
        await anyio.sleep(0.3)
        tg.cancel_scope.cancel()  # Cancel all remaining tasks
        print('Cancel scope triggered')

anyio.run(main)
```
Output:
```
fast: done
Cancel scope triggered
medium: was cancelled
slow: was cancelled
```
anyio.get_cancelled_exc_class() returns the correct cancellation exception for the current backend (asyncio.CancelledError on asyncio, trio.Cancelled on trio). Using it instead of a hard-coded exception class is required for backend-portable cancellation handling. Always re-raise the exception after any cleanup: swallowing a cancellation breaks the backend's cancellation machinery, and trio treats it as an error.
Synchronization Primitives
anyio provides the same synchronization primitives as asyncio but with backend-portable implementations: Lock, Event, Semaphore, and CapacityLimiter. The example below uses an Event to signal between tasks and a Semaphore to limit concurrent database connections.
```python
# sync_primitives.py
import anyio

async def producer(event, results):
    print('Producer: generating data')
    await anyio.sleep(0.2)
    results.extend([1, 2, 3, 4, 5])
    print('Producer: data ready, signalling')
    event.set()

async def consumer(event, results):
    print('Consumer: waiting for data')
    await event.wait()
    print(f'Consumer: got {len(results)} items: {results}')

async def limited_worker(semaphore, worker_id):
    async with semaphore:
        print(f'Worker {worker_id} acquired slot')
        await anyio.sleep(0.1)
        print(f'Worker {worker_id} releasing slot')

async def main():
    # Event signalling
    event = anyio.Event()
    results = []
    async with anyio.create_task_group() as tg:
        tg.start_soon(producer, event, results)
        tg.start_soon(consumer, event, results)

    # Semaphore: max 2 concurrent workers
    print('\nSemaphore demo (max 2 concurrent):')
    sem = anyio.Semaphore(2)
    async with anyio.create_task_group() as tg:
        for i in range(5):
            tg.start_soon(limited_worker, sem, i)

anyio.run(main)
```
Output:
```
Consumer: waiting for data
Producer: generating data
Producer: data ready, signalling
Consumer: got 5 items: [1, 2, 3, 4, 5]

Semaphore demo (max 2 concurrent):
Worker 0 acquired slot
Worker 1 acquired slot
Worker 0 releasing slot
Worker 2 acquired slot
Worker 1 releasing slot
Worker 3 acquired slot
...
```
The semaphore limits concurrent access to 2 slots at a time — useful for rate-limiting outbound API calls or capping the number of active database connections. anyio's CapacityLimiter is a similar primitive with adjustable capacity; anyio itself uses one to cap the worker threads spawned by anyio.to_thread.run_sync().
Real-Life Example: Parallel URL Health Checker
```python
# health_checker.py
import anyio
import httpx
from datetime import datetime

async def check_url(client, url, results, semaphore):
    async with semaphore:
        start = datetime.now()
        try:
            with anyio.fail_after(5.0):
                resp = await client.get(url, follow_redirects=True)
            elapsed = (datetime.now() - start).total_seconds()
            results.append({
                'url': url,
                'status': resp.status_code,
                'ok': 200 <= resp.status_code < 400,
                'latency': round(elapsed, 3),
            })
        except TimeoutError:
            results.append({'url': url, 'status': 'timeout', 'ok': False, 'latency': 5.0})
        except Exception as exc:
            results.append({'url': url, 'status': str(exc)[:40], 'ok': False, 'latency': 0})

async def run_health_check(urls, max_concurrent=5):
    results = []
    semaphore = anyio.Semaphore(max_concurrent)
    async with httpx.AsyncClient() as client:
        async with anyio.create_task_group() as tg:
            for url in urls:
                tg.start_soon(check_url, client, url, results, semaphore)
    results.sort(key=lambda r: r['latency'])
    return results

async def main():
    urls = [
        'https://httpbin.org/status/200',
        'https://httpbin.org/status/404',
        'https://httpbin.org/delay/1',
        'https://httpbin.org/status/500',
        'https://jsonplaceholder.typicode.com/posts/1',
    ]
    results = await run_health_check(urls, max_concurrent=3)
    print(f'{"URL":<45} {"Status":<10} {"OK":<6} {"Latency"}')
    print('-' * 72)
    for r in results:
        url_short = r['url'].split('httpbin.org')[-1] or r['url']
        print(f'{url_short:<45} {str(r["status"]):<10} {str(r["ok"]):<6} {r["latency"]}s')

anyio.run(main)
```
Output:
```
URL                                           Status     OK     Latency
------------------------------------------------------------------------
/status/200                                   200        True   0.152s
/status/404                                   404        False  0.161s
/status/500                                   500        False  0.178s
https://jsonplaceholder.typicode.com/posts/1  200        True   0.312s
/delay/1                                      200        True   1.203s
```
The health checker combines three anyio patterns: a task group for concurrency, a semaphore to cap simultaneous connections, and fail_after() to enforce per-URL timeouts. The entire script runs on asyncio by default and can be switched to trio with a one-line change. Extend it by writing results to a file with anyio.open_file() or posting to a webhook.
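Writing results to a file can use anyio's built-in async file I/O, which works on both backends. A minimal sketch (the health_report.json path and the sample result dict are illustrative):

```python
# save_results.py
import json
import anyio

async def save_results(results, path):
    # anyio.open_file() returns an async file object; note the extra await
    async with await anyio.open_file(path, 'w') as f:
        await f.write(json.dumps(results, indent=2))

async def main():
    results = [{'url': '/status/200', 'status': 200, 'ok': True, 'latency': 0.152}]
    await save_results(results, 'health_report.json')
    print('Report written to health_report.json')

anyio.run(main)
```

The file object's methods (read, write, and so on) are all awaitable, so writing the report never blocks the event loop.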
Frequently Asked Questions
Should I use asyncio or trio as my anyio backend?
For most projects, asyncio is the right default -- it is part of the standard library, widely supported, and required by libraries like aiohttp and FastAPI. Use trio as your backend when you want its stricter structured concurrency guarantees and better error propagation, particularly for long-running services. anyio makes the choice reversible, so you can start with asyncio and switch later.
Can I use asyncio.gather() with anyio?
Yes, but it is not recommended. Mixing asyncio.gather() into anyio code works on the asyncio backend but breaks on trio. Use anyio.create_task_group() instead -- it covers most asyncio.gather() use cases and is backend-portable. The task group model also has better error semantics: one failing task cancels the rest rather than leaving them running.
How does anyio interact with blocking code?
Use anyio.to_thread.run_sync() to run blocking code in a worker thread without blocking the event loop. This is the anyio equivalent of asyncio.to_thread() (or loop.run_in_executor() on older Python versions). The syntax is: result = await anyio.to_thread.run_sync(blocking_function, arg1, arg2).
Why does the example use httpx instead of aiohttp?
httpx supports both asyncio and trio backends, making it the natural HTTP client for anyio code. aiohttp is asyncio-only and will fail on the trio backend. For backend-portable code, prefer httpx, which is used by FastAPI's test client and has a nearly identical API to the synchronous requests library.
Does anyio work with FastAPI?
Yes -- FastAPI uses anyio internally. Any async def route handler runs inside anyio's runtime, which means you can use anyio.create_task_group(), anyio.fail_after(), and other primitives directly inside FastAPI route functions without any extra setup.
Conclusion
You have covered anyio's essential toolkit: anyio.run() for starting the event loop with any backend, create_task_group() for structured concurrency, fail_after() and move_on_after() for timeouts, CancelScope for manual cancellation, and synchronization primitives including Event and Semaphore. The health checker example shows how these combine into a robust async utility.
The key shift anyio encourages is away from ad-hoc asyncio.gather() patterns toward structured concurrency with task groups. Even if you never switch backends, the cleaner error propagation and cancellation model is worth the migration. For deeper coverage of structured concurrency concepts, see the anyio documentation and Nathaniel Smith's original notes on structured concurrency.