You have a Python script that needs to fetch data from five different APIs, and right now it calls them one after another. Each call takes about two seconds, so the whole thing crawls along for ten seconds total. The frustrating part is that those API calls are completely independent — there is no reason your program should sit idle waiting for one response before sending the next request. This is exactly the problem that Python’s asyncio module solves, and once you understand it, you will never look at I/O-bound code the same way again.

The good news is that asyncio is part of Python’s standard library, so there is nothing extra to install. It has been available since Python 3.4 and has matured significantly — Python 3.11 introduced TaskGroup for structured concurrency, and Python 3.12 refined the event loop internals for better performance. All you need is Python 3.11 or later to use every feature covered in this article, though most examples work on Python 3.7 and above.

In this article we will cover everything you need to know to write concurrent Python code with asyncio. We will start with the fundamentals of async and await, then explore coroutines and the event loop. From there we will dive into running multiple tasks concurrently with asyncio.gather(), handling errors gracefully, and using the modern TaskGroup API for structured concurrency. We will also cover asyncio.wait(), timeouts, semaphores for rate limiting, and finish with a real-life project that fetches data from multiple URLs concurrently. By the end, you will be writing async Python code with confidence.

Python Asyncio: Quick Example

Before diving deep, here is a taste of what asyncio can do. This example runs three simulated tasks concurrently instead of sequentially, cutting the total time from six seconds down to about two.

# quick_example.py
import asyncio
import time

async def fetch_data(name, delay):
    """Simulate an API call that takes 'delay' seconds."""
    print(f"Starting {name}...")
    await asyncio.sleep(delay)  # Non-blocking sleep
    print(f"Finished {name}!")
    return f"{name}: {delay}s of data"

async def main():
    start = time.perf_counter()
    # Run all three tasks concurrently
    results = await asyncio.gather(
        fetch_data("Users API", 2),
        fetch_data("Orders API", 2),
        fetch_data("Products API", 2),
    )
    elapsed = time.perf_counter() - start
    print(f"\nAll done in {elapsed:.2f} seconds")
    for r in results:
        print(f"  {r}")

asyncio.run(main())

Output:

Starting Users API...
Starting Orders API...
Starting Products API...
Finished Users API!
Finished Orders API!
Finished Products API!

All done in 2.00 seconds
  Users API: 2s of data
  Orders API: 2s of data
  Products API: 2s of data

Notice that all three tasks started immediately and finished at roughly the same time, even though each one waited two seconds. If we had run them sequentially with regular time.sleep(), the total would have been six seconds. The magic here is asyncio.gather() — it schedules all three coroutines to run on the event loop and waits until they all complete. The await asyncio.sleep() call is the key: it tells the event loop “I am done for now, go run something else while I wait.”

Want to go deeper? Below we cover how the event loop works under the hood, explore gather() in detail, learn about TaskGroup for safer error handling, and build a real concurrent URL fetcher you can use in your own projects.

What Is Asyncio and Why Use It?

At its core, asyncio is Python’s framework for writing concurrent code using a single thread. Instead of creating multiple threads or processes, asyncio uses an event loop — a central coordinator that switches between tasks whenever one of them is waiting for something (like a network response or a file read). Think of it like a chef in a kitchen who starts boiling water, then chops vegetables while waiting for the water to boil, then checks the oven — one person doing many things by never standing idle.

This approach is called cooperative multitasking because each task voluntarily gives up control when it hits an await expression. The event loop then picks up another task that is ready to run. This is fundamentally different from threads, where the operating system forcibly switches between them. The cooperative model is simpler to reason about because you know exactly where your code can be interrupted — only at await points.

The question most beginners ask is: when should you use asyncio versus threads versus multiprocessing? Here is a comparison table to help you decide.

| Feature | asyncio | threading | multiprocessing |
|---|---|---|---|
| Best for | I/O-bound tasks (network, file, database) | I/O-bound tasks with blocking libraries | CPU-bound tasks (math, image processing) |
| Concurrency model | Single thread, event loop | Multiple threads, OS-scheduled | Multiple processes, separate memory |
| GIL impact | Not affected (single thread) | Limited by GIL for CPU work | No GIL limitation |
| Memory overhead | Very low (coroutines are lightweight) | Moderate (each thread has a stack) | High (each process has its own memory) |
| Complexity | Moderate (async/await syntax) | High (race conditions, locks) | Moderate (serialization overhead) |
| Scalability | Thousands of concurrent tasks easily | Hundreds of threads at most | Limited by CPU cores |

The takeaway is simple: if your code spends most of its time waiting for external resources — API calls, database queries, file downloads — asyncio is usually the best choice. It can handle thousands of concurrent connections with minimal memory, which is why frameworks like FastAPI and aiohttp are built on top of it. Now let us look at the building blocks.

Understanding async and await

The two keywords that make asyncio work are async and await. When you put async before a function definition, it becomes a coroutine function. Calling it does not run the function immediately — it returns a coroutine object that needs to be scheduled on the event loop. The await keyword is how you actually run a coroutine and get its result, while also telling the event loop it can switch to other tasks.

# async_basics.py
import asyncio

async def greet(name, delay):
    """A coroutine that waits, then returns a greeting."""
    await asyncio.sleep(delay)
    return f"Hello, {name}!"

async def main():
    # Calling greet() returns a coroutine object, not the result
    coro = greet("Alice", 1)
    print(f"Type of coro: {type(coro)}")
    coro.close()  # Discard the unawaited coroutine to avoid a RuntimeWarning

    # To actually run it, we await it
    result = await greet("Alice", 1)
    print(result)

asyncio.run(main())

Output:

Type of coro: <class 'coroutine'>
Hello, Alice!

The important thing to understand here is the difference between calling a coroutine function and awaiting it. When we wrote coro = greet("Alice", 1), nothing happened — the function body did not execute. Only when we used await greet("Alice", 1) did the code inside actually run. This is a common source of bugs for beginners: forgetting to await a coroutine means it silently does nothing, and Python will even warn you about it.

The asyncio.run() function is the entry point that creates an event loop, runs your main() coroutine, and shuts everything down cleanly when it finishes. You should call asyncio.run() exactly once at the top level of your program — never from inside another coroutine.

The event loop is a single-threaded traffic controller. Block it and everything stops.

The Event Loop Explained

The event loop is the engine that drives all of asyncio. It runs in a single thread and continuously cycles through a queue of tasks, executing each one until it hits an await, then moving on to the next ready task. Understanding this cycle helps you write better async code because you know exactly when your code runs and when it yields control.

# event_loop_demo.py
import asyncio

async def task_a():
    print("Task A: step 1")
    await asyncio.sleep(0)  # Yield control to the event loop
    print("Task A: step 2")
    await asyncio.sleep(0)
    print("Task A: step 3")

async def task_b():
    print("Task B: step 1")
    await asyncio.sleep(0)
    print("Task B: step 2")
    await asyncio.sleep(0)
    print("Task B: step 3")

async def main():
    # Schedule both tasks to run concurrently
    await asyncio.gather(task_a(), task_b())

asyncio.run(main())

Output:

Task A: step 1
Task B: step 1
Task A: step 2
Task B: step 2
Task A: step 3
Task B: step 3

See how the tasks interleave? Each await asyncio.sleep(0) is a zero-second pause that simply says “let others run.” The event loop picks up Task B after Task A yields, then switches back. This is cooperative multitasking in action — the tasks voluntarily take turns. If Task A had a long CPU-bound computation without any await, it would block the entire event loop and prevent Task B from running at all. That is why asyncio is designed for I/O-bound work, not number crunching.
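To see that blocking problem for yourself, here is a minimal sketch — ticker is a made-up coroutine, and time.sleep() stands in for a long CPU-bound stretch of code with no await points:

```python
import asyncio
import time

async def ticker(n):
    """Makes progress only when the event loop is free to schedule it."""
    count = 0
    for _ in range(n):
        await asyncio.sleep(0.05)
        count += 1
    return count

async def main():
    t = asyncio.create_task(ticker(4))
    # A synchronous call like this blocks the whole loop: the ticker gets
    # zero turns during these 0.3 seconds, even though it only needs 0.2.
    time.sleep(0.3)
    print(f"Ticker finished during the blocking call? {t.done()}")  # False
    return await t

asyncio.run(main())
```

Despite 0.3 seconds passing inside time.sleep(), the ticker has made no progress at all when the blocking call returns — it only starts ticking once main yields control again.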

Running Tasks Concurrently With asyncio.gather()

asyncio.gather() is the workhorse function for running multiple coroutines concurrently. You pass it any number of awaitables (coroutines, tasks, or futures), and it schedules them all to run on the event loop simultaneously. It returns a list of results in the same order you passed the coroutines, regardless of which one finishes first.

# gather_example.py
import asyncio
import time

async def fetch_user(user_id):
    """Simulate fetching a user from a database."""
    await asyncio.sleep(1.5)  # Simulated DB query
    return {"id": user_id, "name": f"User_{user_id}", "active": True}

async def fetch_orders(user_id):
    """Simulate fetching orders for a user."""
    await asyncio.sleep(2.0)  # Simulated API call
    return [{"order_id": 101, "amount": 29.99}, {"order_id": 102, "amount": 59.99}]

async def fetch_preferences(user_id):
    """Simulate fetching user preferences."""
    await asyncio.sleep(1.0)  # Simulated config lookup
    return {"theme": "dark", "language": "en", "notifications": True}

async def main():
    user_id = 42
    start = time.perf_counter()

    # Fetch all three pieces of data concurrently
    user, orders, prefs = await asyncio.gather(
        fetch_user(user_id),
        fetch_orders(user_id),
        fetch_preferences(user_id),
    )

    elapsed = time.perf_counter() - start
    print(f"Fetched everything in {elapsed:.2f} seconds\n")
    print(f"User: {user}")
    print(f"Orders: {orders}")
    print(f"Preferences: {prefs}")

asyncio.run(main())

Output:

Fetched everything in 2.00 seconds

User: {'id': 42, 'name': 'User_42', 'active': True}
Orders: [{'order_id': 101, 'amount': 29.99}, {'order_id': 102, 'amount': 59.99}]
Preferences: {'theme': 'dark', 'language': 'en', 'notifications': True}

The total time was about two seconds — the duration of the slowest task (fetch_orders) — instead of 4.5 seconds if we had called them sequentially. The results list preserves the order we passed to gather(), so we can unpack them directly into variables. This pattern is incredibly common in web applications where a single page might need data from multiple microservices.

Handling Errors in gather()

By default, if any coroutine passed to gather() raises an exception, the entire gather() call raises that exception immediately — but the other tasks are not cancelled and keep running in the background. You can change this behavior with the return_exceptions=True parameter, which makes gather() return exception objects in the results list instead of raising them.

# gather_errors.py
import asyncio

async def safe_task(name, delay):
    await asyncio.sleep(delay)
    return f"{name} completed"

async def failing_task():
    await asyncio.sleep(0.5)
    raise ValueError("Something went wrong in the API!")

async def main():
    # With return_exceptions=True, errors become results
    results = await asyncio.gather(
        safe_task("Task A", 1),
        failing_task(),
        safe_task("Task C", 1.5),
        return_exceptions=True,  # Don't let one failure crash everything
    )

    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i}: FAILED - {type(result).__name__}: {result}")
        else:
            print(f"Task {i}: {result}")

asyncio.run(main())

Output:

Task 0: Task A completed
Task 1: FAILED - ValueError: Something went wrong in the API!
Task 2: Task C completed

This is a powerful pattern for building resilient applications. Instead of letting one failed API call crash your entire data-fetching pipeline, you collect all results and handle failures individually. The key line is return_exceptions=True — without it, the ValueError from the failing task would propagate up and you would lose the results from the other two tasks that completed successfully.

Race conditions in async code are subtle. asyncio.Lock() is not subtle. Use it.

Structured Concurrency With TaskGroup

asyncio.TaskGroup was introduced in Python 3.11 as a safer alternative to gather(). The main difference is how it handles errors: when any task in a TaskGroup fails, it automatically cancels all remaining tasks and raises an ExceptionGroup containing all the errors. This “fail fast” behavior prevents orphaned tasks from running in the background after something has already gone wrong.

# taskgroup_example.py
import asyncio

async def download_file(filename, size_mb, delay):
    """Simulate downloading a file."""
    print(f"Downloading {filename} ({size_mb}MB)...")
    await asyncio.sleep(delay)
    print(f"Finished {filename}")
    return {"file": filename, "size_mb": size_mb, "status": "complete"}

async def main():
    results = []

    async with asyncio.TaskGroup() as tg:
        # create_task schedules coroutines within the group
        task1 = tg.create_task(download_file("report.pdf", 5, 2))
        task2 = tg.create_task(download_file("data.csv", 12, 3))
        task3 = tg.create_task(download_file("image.png", 2, 1))

    # If we get here, ALL tasks succeeded
    results = [task1.result(), task2.result(), task3.result()]
    print("\nAll downloads complete:")
    for r in results:
        print(f"  {r['file']}: {r['size_mb']}MB - {r['status']}")

asyncio.run(main())

Output:

Downloading report.pdf (5MB)...
Downloading data.csv (12MB)...
Downloading image.png (2MB)...
Finished image.png
Finished report.pdf
Finished data.csv

All downloads complete:
  report.pdf: 5MB - complete
  data.csv: 12MB - complete
  image.png: 2MB - complete

The async with asyncio.TaskGroup() as tg context manager creates a scope for your concurrent tasks. You add tasks using tg.create_task(), and when the async with block exits, it waits for all tasks to complete — similar to gather(). The critical difference shows up when errors occur: TaskGroup cancels sibling tasks immediately instead of letting them run to completion with unknown state. This is what the asyncio community calls “structured concurrency” and it prevents a whole class of subtle bugs.

TaskGroup Error Handling

When a task inside a TaskGroup raises an exception, the group cancels all other running tasks and collects the exceptions into an ExceptionGroup. You catch this with the except* syntax (also new in Python 3.11), which lets you handle different exception types selectively.

# taskgroup_errors.py
import asyncio

async def reliable_task(name, delay):
    await asyncio.sleep(delay)
    return f"{name} done"

async def flaky_api_call():
    await asyncio.sleep(0.5)
    raise ConnectionError("API server is down")

async def bad_data_task():
    await asyncio.sleep(0.8)
    raise ValueError("Invalid response format")

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(reliable_task("Backup", 2))
            tg.create_task(flaky_api_call())
            tg.create_task(bad_data_task())
    except* ConnectionError as eg:
        for exc in eg.exceptions:
            print(f"Connection error: {exc}")
    except* ValueError as eg:
        for exc in eg.exceptions:
            print(f"Value error: {exc}")

    print("Program continues after handling errors")

asyncio.run(main())

Output:

Connection error: API server is down
Program continues after handling errors

Notice that only the ConnectionError was reported. When flaky_api_call raised at the half-second mark, the group immediately cancelled the “Backup” task and bad_data_task — the latter was still sleeping, so its ValueError never had a chance to be raised. Cancelled siblings do not add exceptions to the group; only tasks that actually fail do. If several tasks fail at effectively the same moment, the ExceptionGroup carries all of them, and each except* branch handles its own exception type — which is much cleaner than manually iterating through results looking for errors. If you need tasks to continue even when siblings fail, use gather(return_exceptions=True) instead.

gather() vs TaskGroup: When To Use Which

Now that you have seen both approaches, here is a direct comparison to help you choose the right tool for each situation.

| Feature | asyncio.gather() | asyncio.TaskGroup |
|---|---|---|
| Python version | 3.4+ | 3.11+ |
| Error behavior (default) | First exception propagates, others may still run | All tasks cancelled on first error |
| return_exceptions option | Yes — collects errors as results | No — always cancels on error |
| Error handling syntax | Check isinstance() on results | except* ExceptionGroup |
| Task cancellation | Manual | Automatic on error |
| Best for | Independent tasks where partial results are OK | Related tasks that should all succeed or all fail |

Use gather() when your tasks are independent and you want best-effort results — for example, fetching data from five APIs where getting four out of five is still useful. Use TaskGroup when your tasks are related and a partial result is meaningless — for example, a multi-step transaction where all steps must succeed. In practice, many developers use gather(return_exceptions=True) for resilient data fetching and TaskGroup for transactional workflows.

Creating and Managing Individual Tasks

Sometimes you need more control than gather() or TaskGroup provide. The asyncio.create_task() function lets you schedule a coroutine to run in the background without immediately waiting for its result. This is useful when you want to start something, do other work, and check on it later.

# create_task_demo.py
import asyncio

async def background_sync(data):
    """Simulate syncing data to a remote server."""
    print(f"Syncing {len(data)} records in background...")
    await asyncio.sleep(3)
    print("Sync complete!")
    return len(data)

async def process_request(request_id):
    """Simulate processing an incoming request."""
    await asyncio.sleep(0.5)
    return f"Request {request_id} processed"

async def main():
    # Start background sync — don't wait for it yet
    sync_task = asyncio.create_task(background_sync(["user1", "user2", "user3"]))

    # Process requests while sync runs in background
    for i in range(1, 4):
        result = await process_request(i)
        print(result)

    # Now wait for the sync to finish
    synced_count = await sync_task
    print(f"\nBackground sync finished: {synced_count} records synced")

asyncio.run(main())

Output:

Syncing 3 records in background...
Request 1 processed
Request 2 processed
Request 3 processed
Sync complete!

Background sync finished: 3 records synced

The key here is that asyncio.create_task() returns a Task object immediately without blocking. The sync coroutine starts running in the background while we process requests in the foreground. When we finally await sync_task, it either returns the result instantly (if it already finished) or waits until it completes. This pattern is perfect for fire-and-forget operations like logging, caching, or background data synchronization.
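One caveat for fire-and-forget tasks: the event loop holds only weak references to tasks, so a task you create and never store anywhere can be garbage-collected before it finishes. The pattern recommended in the official asyncio docs is to keep a strong reference yourself — a minimal sketch, where log_event is a hypothetical background job:

```python
import asyncio

background_tasks = set()

async def log_event(msg):
    """Hypothetical fire-and-forget job, e.g. writing an audit log entry."""
    await asyncio.sleep(0.1)
    return f"logged: {msg}"

async def main():
    task = asyncio.create_task(log_event("user signed in"))
    # Hold a strong reference until the task finishes, then drop it
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
    await asyncio.sleep(0.2)  # Do other work; the log task completes meanwhile
    print(f"pending background tasks: {len(background_tasks)}")

asyncio.run(main())
```

The add_done_callback(background_tasks.discard) line removes the reference automatically once the task completes, so the set never grows without bound.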

TaskGroup: spawn ten coroutines, guarantee all ten get cleaned up. Even if one explodes.

Fine-Grained Control With asyncio.wait()

While gather() waits for all tasks to complete, asyncio.wait() gives you more flexibility. You can wait for the first task to finish, wait until any task raises an exception, or set a timeout. It returns two sets: done (completed tasks) and pending (still running tasks).

# wait_example.py
import asyncio

async def fetch_from_mirror(mirror_name, delay):
    """Simulate fetching from different mirror servers."""
    await asyncio.sleep(delay)
    return f"Data from {mirror_name}"

async def main():
    tasks = [
        asyncio.create_task(fetch_from_mirror("US-East", 3)),
        asyncio.create_task(fetch_from_mirror("EU-West", 1)),
        asyncio.create_task(fetch_from_mirror("Asia-Pacific", 2)),
    ]

    # Wait for the FIRST task to complete
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

    # Use the fastest result
    for task in done:
        print(f"First result: {task.result()}")

    # Cancel the rest — we already have what we need
    print(f"Cancelling {len(pending)} remaining tasks...")
    for task in pending:
        task.cancel()

asyncio.run(main())

Output:

First result: Data from EU-West
Cancelling 2 remaining tasks...

This pattern is called “first response wins” and it is incredibly useful for redundant requests. If you have multiple mirror servers or backup APIs, you can query all of them simultaneously and use whichever responds first, then cancel the rest. The return_when parameter accepts three values: FIRST_COMPLETED (return when any task finishes), FIRST_EXCEPTION (return when any task raises), and ALL_COMPLETED (wait for everything, the default).
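asyncio.wait() also accepts a timeout parameter, which pairs naturally with the done/pending split: unlike wait_for(), it does not raise on timeout and does not cancel anything — it simply returns whatever has finished so far. A small sketch, with an illustrative job coroutine:

```python
import asyncio

async def job(name, delay):
    """Illustrative worker that finishes after 'delay' seconds."""
    await asyncio.sleep(delay)
    return name

async def main():
    tasks = [
        asyncio.create_task(job("fast", 0.1)),
        asyncio.create_task(job("slow", 5)),
    ]
    # With timeout=, wait() returns whatever finished in time — it does
    # not raise TimeoutError and does not cancel the stragglers.
    done, pending = await asyncio.wait(tasks, timeout=0.5)
    for task in pending:
        task.cancel()  # Cleaning up leftovers is the caller's job
    return len(done), len(pending)

asyncio.run(main())
```

Here the fast job lands in done and the slow one in pending; remember that cancelling the stragglers is your responsibility.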

Setting Timeouts With asyncio.wait_for()

When calling external services, you should always set a timeout so your program does not hang indefinitely. The asyncio.wait_for() function wraps any awaitable with a timeout — if it does not complete in time, it raises asyncio.TimeoutError and cancels the task.

# timeout_example.py
import asyncio

async def slow_database_query():
    """Simulate a database query that takes too long."""
    print("Running complex query...")
    await asyncio.sleep(10)  # This takes way too long
    return "query results"

async def main():
    try:
        # Give it 3 seconds max
        result = await asyncio.wait_for(slow_database_query(), timeout=3.0)
        print(f"Got result: {result}")
    except asyncio.TimeoutError:
        print("Query timed out after 3 seconds!")
        print("Falling back to cached data...")
        result = "cached results"

    print(f"Using: {result}")

asyncio.run(main())

Output:

Running complex query...
Query timed out after 3 seconds!
Falling back to cached data...
Using: cached results

The timeout mechanism is essential for production code. Without it, a single unresponsive service can bring down your entire application. The asyncio.wait_for() function cancels the underlying coroutine when the timeout expires, so you do not end up with zombie tasks consuming resources in the background. A good practice is to combine timeouts with a fallback strategy — cached data, default values, or a retry with exponential backoff.
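Here is one way that timeout-plus-fallback strategy might look with retries and exponential backoff — a sketch with hypothetical names, where flaky_call hangs twice and succeeds on its third attempt:

```python
import asyncio

async def flaky_call(tracker):
    """Hypothetical service call: hangs twice, succeeds on the third try."""
    tracker["attempts"] += 1
    if tracker["attempts"] < 3:
        await asyncio.sleep(10)  # Simulate a hung request
    return "fresh results"

async def fetch_with_retries(retries=3, timeout=0.2):
    tracker = {"attempts": 0}
    for attempt in range(retries):
        try:
            return await asyncio.wait_for(flaky_call(tracker), timeout=timeout)
        except asyncio.TimeoutError:
            backoff = 0.1 * (2 ** attempt)  # 0.1s, 0.2s, 0.4s, ...
            print(f"Attempt {attempt + 1} timed out; retrying in {backoff}s")
            await asyncio.sleep(backoff)
    return "cached results"  # Fallback after exhausting all retries

print(asyncio.run(fetch_with_retries()))
```

Each failed attempt doubles the wait before the next try, which gives a struggling service room to recover instead of hammering it at a fixed interval.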

Rate Limiting With Semaphores

When you have hundreds of tasks to run, launching them all at once can overwhelm the target server or hit API rate limits. An asyncio.Semaphore acts as a bouncer — it limits how many coroutines can run a particular section of code at the same time. This is essential for being a good citizen when working with external APIs.

# semaphore_example.py
import asyncio
import time

async def fetch_page(session_semaphore, page_num):
    """Fetch a page, but respect the concurrency limit."""
    async with session_semaphore:
        # Only N tasks can be inside this block at once
        print(f"  Fetching page {page_num}...")
        await asyncio.sleep(1)  # Simulated HTTP request
        return f"Page {page_num} content"

async def main():
    semaphore = asyncio.Semaphore(3)  # Max 3 concurrent requests
    start = time.perf_counter()

    # Launch 9 tasks, but only 3 run at a time
    tasks = [fetch_page(semaphore, i) for i in range(1, 10)]
    results = await asyncio.gather(*tasks)

    elapsed = time.perf_counter() - start
    print(f"\nFetched {len(results)} pages in {elapsed:.2f} seconds")
    print(f"(3 at a time, ~1 second each = ~3 batches = ~3 seconds)")

asyncio.run(main())

Output:

  Fetching page 1...
  Fetching page 2...
  Fetching page 3...
  Fetching page 4...
  Fetching page 5...
  Fetching page 6...
  Fetching page 7...
  Fetching page 8...
  Fetching page 9...

Fetched 9 pages in 3.00 seconds
(3 at a time, ~1 second each = ~3 batches = ~3 seconds)

The async with session_semaphore context manager blocks when three tasks are already inside it, making the fourth task wait until one finishes. This creates a natural batching effect: pages 1-3 run first, then 4-6, then 7-9. Without the semaphore, all nine would fire at once, which could trigger rate limiting or connection errors. A good rule of thumb is to set the semaphore value to match the API’s rate limit — if an API allows 5 requests per second, use Semaphore(5) with a one-second delay inside the critical section.
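That rule of thumb — a semaphore sized to the rate limit, with the delay held inside the critical section — can be sketched like this (the limit of 2 requests per window is a made-up example):

```python
import asyncio

async def limited_request(sem, i, window=0.5):
    """Hold a semaphore slot for a full rate window before releasing it."""
    async with sem:
        # ...the real request would go here...
        await asyncio.sleep(window)  # Occupy the slot for the whole window
        return i

async def main():
    sem = asyncio.Semaphore(2)  # Hypothetical API limit: 2 requests per window
    # Six requests proceed in three paced batches of two
    return await asyncio.gather(*(limited_request(sem, i) for i in range(6)))

print(asyncio.run(main()))
```

Because each coroutine keeps its slot for the full window, releases happen at most two per half-second, which caps the request rate rather than just the concurrency.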

asyncio.gather() turns sequential API calls into concurrent ones. Same result, fraction of the time.

Real-Life Example: Concurrent URL Health Checker

Let us put everything together into a practical project. This health checker takes a list of URLs, pings them all concurrently (with a semaphore to limit concurrency), measures response times, and produces a status report. The network layer is simulated so the script runs with no external dependencies, but the structure maps directly onto aiohttp, and it demonstrates error handling, timeouts, and semaphores working together.

# url_health_checker.py
import asyncio
import time

# Using asyncio-compatible HTTP simulation
# In production, replace with aiohttp:
# import aiohttp

async def check_url(semaphore, url, timeout_seconds=5):
    """Check a single URL's health with concurrency limiting."""
    async with semaphore:
        start = time.perf_counter()
        try:
            # Simulate HTTP GET with varying response times
            # In production: async with aiohttp.ClientSession() as session:
            #                    async with session.get(url, timeout=...) as resp:
            simulated_delays = {
                "https://httpbin.org/get": 0.3,
                "https://jsonplaceholder.typicode.com/posts/1": 0.5,
                "https://httpbin.org/delay/2": 2.0,
                "https://httpbin.org/status/500": 0.2,
                "https://nonexistent.invalid/api": None,  # Will "fail"
            }
            delay = simulated_delays.get(url, 1.0)

            if delay is None:
                raise ConnectionError("Cannot resolve host")

            await asyncio.wait_for(
                asyncio.sleep(delay),  # Simulates network I/O
                timeout=timeout_seconds,
            )

            elapsed = time.perf_counter() - start
            status = 500 if "status/500" in url else 200
            return {
                "url": url,
                "status": status,
                "response_time": round(elapsed, 3),
                "healthy": 200 <= status < 400,
            }
        except asyncio.TimeoutError:
            elapsed = time.perf_counter() - start
            return {
                "url": url,
                "status": "TIMEOUT",
                "response_time": round(elapsed, 3),
                "healthy": False,
            }
        except Exception as e:
            elapsed = time.perf_counter() - start
            return {
                "url": url,
                "status": f"ERROR: {e}",
                "response_time": round(elapsed, 3),
                "healthy": False,
            }

def print_report(results, total_time):
    """Print a formatted health check report."""
    print("\n" + "=" * 65)
    print("  URL HEALTH CHECK REPORT")
    print("=" * 65)

    healthy = [r for r in results if r["healthy"]]
    unhealthy = [r for r in results if not r["healthy"]]

    for r in results:
        icon = "[OK]" if r["healthy"] else "[FAIL]"
        print(f"  {icon} {r['url']}")
        print(f"       Status: {r['status']}  |  Time: {r['response_time']}s")

    print("-" * 65)
    print(f"  Total: {len(results)} URLs checked in {total_time:.2f}s")
    print(f"  Healthy: {len(healthy)}  |  Unhealthy: {len(unhealthy)}")
    print("=" * 65)

async def main():
    urls = [
        "https://httpbin.org/get",
        "https://jsonplaceholder.typicode.com/posts/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/status/500",
        "https://nonexistent.invalid/api",
    ]

    semaphore = asyncio.Semaphore(3)  # Max 3 concurrent checks
    start = time.perf_counter()

    # Check all URLs concurrently with rate limiting
    tasks = [check_url(semaphore, url, timeout_seconds=5) for url in urls]
    results = await asyncio.gather(*tasks)

    total_time = time.perf_counter() - start
    print_report(results, total_time)

asyncio.run(main())

Output:

=================================================================
  URL HEALTH CHECK REPORT
=================================================================
  [OK] https://httpbin.org/get
       Status: 200  |  Time: 0.301s
  [OK] https://jsonplaceholder.typicode.com/posts/1
       Status: 200  |  Time: 0.501s
  [OK] https://httpbin.org/delay/2
       Status: 200  |  Time: 2.001s
  [FAIL] https://httpbin.org/status/500
       Status: 500  |  Time: 0.201s
  [FAIL] https://nonexistent.invalid/api
       Status: ERROR: Cannot resolve host  |  Time: 0.0s
-----------------------------------------------------------------
  Total: 5 URLs checked in 2.00s
  Healthy: 3  |  Unhealthy: 2
=================================================================

This project demonstrates the key asyncio patterns we covered: gather() runs all checks concurrently, the semaphore limits us to three concurrent requests so we do not overwhelm any server, and wait_for() ensures no single check hangs forever. The error handling inside check_url catches both timeouts and connection errors, returning a structured result either way. To use this with real HTTP requests, install aiohttp with pip install aiohttp and replace the simulated delays with actual session.get() calls — the async structure stays exactly the same.

Frequently Asked Questions

Can I call a regular (synchronous) function from an async function?

Yes, you can call synchronous functions directly from async code, but be careful. If the synchronous function is fast (like a quick calculation or string manipulation), just call it normally. If it blocks for a long time (like time.sleep() or a synchronous HTTP request), it will freeze the entire event loop. For blocking operations, use await asyncio.to_thread(blocking_function, args) to run them in a separate thread without blocking the loop. This was added in Python 3.9 and is the recommended way to bridge sync and async code.
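As a concrete sketch of that bridge — blocking_lookup is a made-up stand-in for any slow synchronous function:

```python
import asyncio
import time

def blocking_lookup(x):
    """Stand-in for a slow synchronous call, e.g. requests.get()."""
    time.sleep(0.2)  # This would freeze the loop if called directly
    return x * 2

async def main():
    # Each call runs in a worker thread; the event loop stays responsive
    # and the three blocking calls overlap instead of running back to back.
    return await asyncio.gather(
        asyncio.to_thread(blocking_lookup, 1),
        asyncio.to_thread(blocking_lookup, 2),
        asyncio.to_thread(blocking_lookup, 3),
    )

print(asyncio.run(main()))  # [2, 4, 6]
```

The three calls take about 0.2 seconds total rather than 0.6, because each one occupies its own thread while the event loop keeps running.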

Can I use the requests library with asyncio?

The requests library is synchronous and will block the event loop if used directly inside an async function. You have two options: use aiohttp as a drop-in async replacement (it has a similar API with session.get() and session.post()), or wrap requests calls with await asyncio.to_thread(requests.get, url) to run them in a thread pool. The aiohttp approach is more efficient because it uses the event loop natively, while the thread approach adds thread-switching overhead.

What is the difference between asyncio.run() and get_event_loop()?

The asyncio.run() function (Python 3.7+) is the modern, recommended way to start your async program. It creates a new event loop, runs your coroutine, and cleans up afterward. The older asyncio.get_event_loop() pattern requires more manual management and is deprecated for most use cases since Python 3.10. Always use asyncio.run(main()) at the top level of your program unless you are integrating with a framework like Jupyter that manages its own event loop.

Why should I not use asyncio for CPU-bound tasks?

The asyncio event loop runs in a single thread, so CPU-intensive work blocks it completely. While one coroutine is crunching numbers, no other coroutine can run — there are no await points to yield control. For CPU-bound work like image processing, scientific computation, or data transformation, use multiprocessing or concurrent.futures.ProcessPoolExecutor. You can even combine them with asyncio using loop.run_in_executor() to run CPU work in a process pool while keeping your I/O code async.

How do I debug asyncio code?

Asyncio has a built-in debug mode you can enable by setting the environment variable PYTHONASYNCIODEBUG=1 or by passing debug=True to asyncio.run(). Debug mode warns you about common mistakes like coroutines that were never awaited, callbacks that take too long, and tasks that are destroyed while still pending. You can also use the asyncio logger with logging.getLogger('asyncio').setLevel(logging.DEBUG) to see detailed event loop activity.

What are async for and async with?

These are async versions of regular for loops and context managers. async for iterates over an asynchronous iterator — useful for streaming data from a database cursor or a websocket connection. async with enters and exits an asynchronous context manager — used heavily in aiohttp for managing HTTP sessions and connections. Both allow the event loop to run other tasks between iterations or during setup and teardown, which keeps your program responsive even when working with slow data sources.
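A tiny example of async for over an async generator — stream_rows is a made-up stand-in for a database cursor or websocket feed:

```python
import asyncio

async def stream_rows(n):
    """Async generator: yields items as they become available."""
    for i in range(n):
        await asyncio.sleep(0.05)  # Wait for the next chunk without blocking
        yield {"row": i}

async def main():
    rows = []
    # async for awaits each item, so other tasks can run between iterations
    async for row in stream_rows(3):
        rows.append(row)
    return rows

print(asyncio.run(main()))
```

Each pass through the loop suspends at the generator's internal await, so a slow data source never starves the rest of the program.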

Conclusion

You now have a solid understanding of Python's asyncio module and its key tools for concurrent programming. We covered the async/await syntax for defining coroutines, the event loop that coordinates everything, asyncio.gather() for running multiple tasks concurrently, TaskGroup for structured concurrency with automatic cancellation, asyncio.wait() for fine-grained control, timeouts with wait_for(), and semaphores for rate limiting. Each of these tools solves a specific problem, and knowing when to reach for which one is what separates a beginner from an effective async programmer.

Try extending the URL health checker project we built — add real HTTP requests with aiohttp, save results to a JSON file, or schedule periodic checks with asyncio.sleep() in a loop. You could also build an async web scraper that respects rate limits, or a chat application using websockets. The patterns you learned here apply directly to all of these projects.

For the complete reference, the official Python documentation for asyncio is excellent: https://docs.python.org/3/library/asyncio.html. The aiohttp documentation is also worth reading if you plan to make real HTTP requests in your async code.