
Python’s asyncio is powerful but has a well-known problem: it is easy to accidentally “fire and forget” tasks that run in the background with no guarantee they will be waited for, no guarantee their errors will be caught, and no easy way to cancel them when something goes wrong. Tasks can leak, exceptions can silently disappear, and cancellation can leave your program in an inconsistent state. These are not just theoretical problems — they cause real bugs in production async code.

trio is an alternative async library built around a concept called structured concurrency. In trio, all concurrent tasks are managed through a nursery — a scope that guarantees all tasks it spawns will be finished before the nursery exits. No task can outlive the nursery that created it. Errors always propagate to the right place. Cancellation is clean and predictable. The result is async code that is much easier to reason about and debug.

This article covers how to install and run trio programs, how to use nurseries for concurrent tasks, how to handle errors and cancellation, how trio’s memory channels replace asyncio queues, and how trio compares to asyncio. By the end you will understand structured concurrency and be able to write trio programs that handle concurrency correctly from day one.

Concurrent Tasks with trio: Quick Example

Here is the simplest trio program that runs two tasks concurrently using a nursery:

# quick_trio.py
import trio

async def task_a():
    print("Task A: starting")
    await trio.sleep(1)
    print("Task A: done after 1 second")

async def task_b():
    print("Task B: starting")
    await trio.sleep(0.5)
    print("Task B: done after 0.5 seconds")

async def main():
    async with trio.open_nursery() as nursery:
        nursery.start_soon(task_a)
        nursery.start_soon(task_b)
    print("Both tasks complete!")

trio.run(main)

Output:

Task A: starting
Task B: starting
Task B: done after 0.5 seconds
Task A: done after 1 second
Both tasks complete!

trio.open_nursery() creates a scope where both tasks run concurrently. The async with block does not exit until both tasks are done. This is the core guarantee of structured concurrency: the nursery always waits for its children. The sections below go deeper into error handling, cancellation, channels, and real-world patterns.

What Is trio and Why Use It?

trio is a Python async library designed around the principle that concurrent code should be structured the same way sequential code is: with clear entry and exit points, predictable control flow, and reliable error propagation. It was created as a response to the implicit complexity in asyncio task management.

Feature             asyncio                      trio
------------------  ---------------------------  ---------------------------------------
Concurrent tasks    asyncio.create_task()        nursery.start_soon()
Task lifetime       Can outlive their creator    Always bounded by nursery
Error propagation   May be silently dropped      Always propagated to nursery
Cancellation        Complex, error-prone         Clean, scope-based
Communication       Queue, Event, Condition      Memory channels (send/receive)
Timeout             asyncio.wait_for()           trio.move_on_after(), trio.fail_after()

Install with pip:

# pip install trio

import trio
print(trio.__version__)
0.25.0

Understanding Nurseries

A nursery is trio’s central concept. It is a context manager that owns a group of concurrent tasks. When you enter the nursery block, you can spawn tasks. When the block exits (the body of the async with finishes), trio waits for all spawned tasks to finish before continuing. If any task raises an exception, the nursery cancels all remaining tasks and re-raises the exception.

# trio_nurseries.py
import trio

async def fetch_data(url, delay):
    """Simulate fetching data from a URL."""
    print(f"Fetching {url}...")
    await trio.sleep(delay)  # simulate network delay
    print(f"Done: {url}")
    return f"data from {url}"

async def main():
    results = []

    async with trio.open_nursery() as nursery:
        # Spawn three concurrent "fetches"
        nursery.start_soon(fetch_data, "https://api.example.com/users", 1.0)
        nursery.start_soon(fetch_data, "https://api.example.com/posts", 0.5)
        nursery.start_soon(fetch_data, "https://api.example.com/comments", 0.8)

    # Code here runs AFTER all three tasks finish
    print("All fetches complete. Continuing with results.")

trio.run(main)

Output:

Fetching https://api.example.com/users...
Fetching https://api.example.com/posts...
Fetching https://api.example.com/comments...
Done: https://api.example.com/posts
Done: https://api.example.com/comments
Done: https://api.example.com/users
All fetches complete. Continuing with results.

All three fetches start immediately and run concurrently. They complete in order of their delay, not the order they were started. The “All fetches complete” line only prints after the slowest task (users, 1.0s) finishes. This guarantee — that the nursery always waits for all children — is what makes trio programs safe to reason about.

Error Handling in Nurseries

In asyncio, an exception in a background task can be silently lost if you do not explicitly await the task and check for errors. In trio, any exception in a child task immediately cancels all sibling tasks and propagates to the nursery scope. You cannot accidentally swallow errors.
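To see what "silently lost" looks like in practice, here is a minimal asyncio sketch of the fire-and-forget problem (the task and message are invented for illustration):

```python
# asyncio_lost_error.py -- the fire-and-forget problem trio prevents
import asyncio

async def failing_task():
    raise ValueError("nobody awaited me")

async def main():
    # Fire and forget: nothing awaits the task or inspects its result.
    asyncio.create_task(failing_task())
    await asyncio.sleep(0.1)
    print("main() finished without noticing the error")

asyncio.run(main())
```

main() runs to completion as if nothing happened; the ValueError is only reported, if at all, when the garbage collector notices the unretrieved task, long after the code that could have handled it has moved on.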

# trio_errors.py
import trio

async def good_task():
    print("Good task: running")
    await trio.sleep(2)
    print("Good task: done")

async def failing_task():
    print("Failing task: about to fail")
    await trio.sleep(0.5)
    raise ValueError("Something went wrong in the task!")

async def main():
    try:
        async with trio.open_nursery() as nursery:
            nursery.start_soon(good_task)
            nursery.start_soon(failing_task)
    except* ValueError as eg:
        print(f"Caught error group: {eg.exceptions}")

trio.run(main)

Output:

Good task: running
Failing task: about to fail
Caught error group: (ValueError('Something went wrong in the task!'),)

When failing_task raises ValueError, trio immediately cancels good_task (which is why "Good task: done" never prints) and collects all exceptions into an ExceptionGroup. The except* syntax (Python 3.11+) handles exception groups; on older Python versions, trio relies on the exceptiongroup backport package, whose exceptiongroup.catch() helper plays the same role. The key insight is that no exception disappears silently: trio ensures every error is seen and handled.


Timeouts and Cancellation Scopes

Cancellation in trio is handled through cancellation scopes. Every nursery is itself a cancellation scope. You can also create explicit scopes with trio.move_on_after() (continue after timeout) or trio.fail_after() (raise exception after timeout).

# trio_cancellation.py
import trio

async def slow_operation():
    print("Starting slow operation...")
    await trio.sleep(10)  # Would take 10 seconds
    print("This line will never print if cancelled")

async def main():
    # move_on_after: cancel the block after N seconds, then continue
    print("-- move_on_after example --")
    with trio.move_on_after(2) as cancel_scope:
        await slow_operation()

    if cancel_scope.cancelled_caught:
        print("Operation timed out -- continuing with partial result")

    # fail_after: cancel and raise TooSlowError after N seconds
    print("\n-- fail_after example --")
    try:
        with trio.fail_after(1):
            await slow_operation()
    except trio.TooSlowError:
        print("Operation failed: took too long")

trio.run(main)

Output:

-- move_on_after example --
Starting slow operation...
Operation timed out -- continuing with partial result

-- fail_after example --
Starting slow operation...
Operation failed: took too long

Use move_on_after when a timeout is acceptable — for example, fetching optional metadata that you will skip if it is slow. Use fail_after when the operation is required and a timeout means something is wrong. The cancel_scope.cancelled_caught attribute tells you whether the timeout actually fired, so you can distinguish a normal exit from a cancelled exit.

Memory Channels for Task Communication

Tasks in a nursery often need to pass data to each other. trio provides memory channels as the safe, built-in way to do this. A channel has a send end and a receive end. send() waits when the channel's buffer is full; receive() waits when it is empty. This gives you backpressure for free: a fast producer cannot outrun a slow consumer.

# trio_channels.py
import trio

async def producer(send_channel, items):
    """Produce items and send them through the channel."""
    async with send_channel:
        for item in items:
            print(f"Producing: {item}")
            await send_channel.send(item)
            await trio.sleep(0.1)  # simulate work

async def consumer(receive_channel, name):
    """Receive and process items from the channel."""
    async with receive_channel:
        async for item in receive_channel:
            print(f"Consumer {name} processing: {item}")
            await trio.sleep(0.2)  # simulate processing

async def main():
    send_channel, receive_channel = trio.open_memory_channel(max_buffer_size=5)

    async with trio.open_nursery() as nursery:
        nursery.start_soon(producer, send_channel, range(6))
        nursery.start_soon(consumer, receive_channel, "A")

    print("All items processed!")

trio.run(main)

Output:

Producing: 0
Consumer A processing: 0
Producing: 1
Producing: 2
Consumer A processing: 1
Producing: 3
Producing: 4
Producing: 5
Consumer A processing: 2
Consumer A processing: 3
Consumer A processing: 4
Consumer A processing: 5
All items processed!

The async with send_channel and async with receive_channel context managers ensure the channel is properly closed when the task finishes. When the send end is closed, the receiver’s async for loop exits cleanly. Use trio.open_memory_channel(0) for a rendezvous channel (send blocks until a receiver is ready) or a positive integer for a buffered channel.


Real-Life Example: Concurrent URL Health Checker

# trio_url_checker.py
import trio
import urllib.request
import urllib.error
from dataclasses import dataclass
from typing import List

@dataclass
class HealthResult:
    url: str
    status: int = 0
    ok: bool = False
    error: str = ""
    latency_ms: float = 0.0

async def check_url(url: str, results: list, timeout: float = 5.0):
    """Check a single URL and record the result."""
    start = trio.current_time()
    try:
        # trio has no built-in HTTP client; run the blocking call in a thread
        def _get_status():
            with urllib.request.urlopen(url, timeout=timeout) as response:
                return response.getcode()

        response_code = await trio.to_thread.run_sync(_get_status)
        latency = (trio.current_time() - start) * 1000
        results.append(HealthResult(
            url=url, status=response_code,
            ok=(200 <= response_code < 300), latency_ms=latency
        ))
    except urllib.error.HTTPError as e:
        results.append(HealthResult(url=url, status=e.code, ok=False, error=str(e)))
    except Exception as e:
        results.append(HealthResult(url=url, ok=False, error=str(e)))

async def check_all(urls: List[str], concurrency: int = 5) -> List[HealthResult]:
    """Check all URLs concurrently with a limit on parallel requests."""
    results = []
    limiter = trio.CapacityLimiter(concurrency)

    async def bounded_check(url):
        async with limiter:
            await check_url(url, results)

    async with trio.open_nursery() as nursery:
        for url in urls:
            nursery.start_soon(bounded_check, url)

    return sorted(results, key=lambda r: r.url)

async def main():
    urls = [
        "https://httpbin.org/status/200",
        "https://httpbin.org/status/404",
        "https://httpbin.org/delay/1",
        "https://jsonplaceholder.typicode.com/posts/1",
        "https://jsonplaceholder.typicode.com/users/1",
    ]

    print(f"Checking {len(urls)} URLs...\n")
    with trio.fail_after(15):
        results = await check_all(urls, concurrency=3)

    print(f"{'URL':<50} {'Status':>8} {'OK':>5} {'Latency':>10}")
    print("-" * 78)
    for r in results:
        status = r.status if r.status else "ERR"
        latency = f"{r.latency_ms:.0f}ms" if r.ok else r.error[:15]
        print(f"{r.url:<50} {str(status):>8} {'Yes' if r.ok else 'No':>5} {latency:>10}")

trio.run(main)

Output:

Checking 5 URLs...

URL                                                Status    OK    Latency
------------------------------------------------------------------------------
https://httpbin.org/delay/1                           200   Yes      1043ms
https://httpbin.org/status/200                        200   Yes        89ms
https://httpbin.org/status/404                        404    No  HTTP Error
https://jsonplaceholder.typicode.com/posts/1          200   Yes       134ms
https://jsonplaceholder.typicode.com/users/1          200   Yes       128ms

This checker runs all URL checks concurrently, limited to 3 at a time by trio.CapacityLimiter. The entire batch fails with TooSlowError if it takes more than 15 seconds. The trio.to_thread.run_sync() call offloads the blocking HTTP call to a thread without blocking the trio event loop. You could extend this to send Slack alerts, write results to a database, or retry failed URLs with backoff.

Frequently Asked Questions

Should I use trio or asyncio?

For new projects where you want the cleanest possible async code and do not need compatibility with existing asyncio libraries, trio is excellent. For projects that use FastAPI, aiohttp, or other asyncio-based frameworks, stick with asyncio — trio is not compatible with the asyncio event loop. The anyio library provides an abstraction that works on both trio and asyncio backends if you need portability.

How do I make HTTP requests in trio?

trio does not include an HTTP client. Use httpx, which supports trio as an async backend: install httpx alongside trio and use httpx.AsyncClient() inside your trio program. For simple cases, trio.to_thread.run_sync() offloads any blocking HTTP call to a thread without blocking the event loop, as shown in the real-life example above.

How do I return values from nursery tasks?

trio tasks cannot directly return values to the nursery (unlike asyncio’s gather() which collects return values). The idiomatic approach is to pass a shared list or use a memory channel. Tasks append results to a shared list (as in the URL checker example), and the caller reads from the list after the nursery exits. Alternatively, use a send channel inside tasks and a receive loop outside the nursery.

What does “cancel-safe” mean and why does it matter?

A function is cancel-safe if it behaves correctly even when cancelled mid-execution. trio can cancel a task at any await point. If your code holds a lock, writes to a file, or modifies shared state across multiple awaits, cancellation mid-way can leave things in an inconsistent state. trio's built-in primitives (channels, locks, events) are cancel-safe by design. When writing your own code, avoid spreading one logical operation across multiple awaits without proper cleanup: use try/finally, and shield cleanup that must itself await with trio.CancelScope(shield=True).

Can I use trio with regular threading?

Yes. trio.to_thread.run_sync(func) runs a blocking function in a worker thread without blocking the event loop, and trio.from_thread.run(async_func) lets code running in that thread call back into trio's async world (trio.from_thread.run_sync is the counterpart for sync functions). These bridge the sync/async boundary cleanly and are the recommended way to use blocking libraries (like database drivers or legacy HTTP clients) inside trio programs.

Conclusion

The trio library brings structured concurrency to Python async programming. You learned how nurseries guarantee task lifetime and error propagation, how cancellation scopes handle timeouts cleanly, how memory channels enable safe producer-consumer patterns, and how CapacityLimiter controls concurrency. The URL health checker showed all these concepts working together in a realistic scenario.

The structured concurrency model takes some getting used to, but the payoff is async code that behaves predictably even in error and cancellation scenarios. The next step is to convert one small asyncio program to trio and observe how the error handling and task lifetime guarantees change your debugging experience. The trio documentation is exceptionally detailed and includes explanations of the design decisions behind each API choice.