Writing asynchronous code in Python has always been powerful but challenging. The traditional asyncio.create_task() approach leaves you vulnerable to silent failures — a task can crash without your knowledge, or worse, you might forget to await all your spawned tasks. Enter asyncio.TaskGroup, introduced in Python 3.11, which brings structured concurrency patterns to the standard library and makes parallel task management reliable and clean.
If you’ve struggled with managing multiple async tasks, coordinating their completion, or handling errors when things go wrong, TaskGroup is the solution you’ve been waiting for. Instead of manually tracking tasks and writing error-handling boilerplate, TaskGroup handles all of that automatically through a simple context manager interface.
In this tutorial, you’ll learn how TaskGroup simplifies concurrent programming, how to handle errors gracefully, manage nested task groups, and apply these patterns to real-world scenarios. Whether you’re building web scrapers, API clients, or distributed systems, TaskGroup will become an essential tool in your async toolkit.
Quick Example
Before diving deep, here’s a taste of what TaskGroup looks like in action:
# filename: quick_taskgroup_example.py
import asyncio

async def fetch_data(url, delay):
    await asyncio.sleep(delay)
    return f"Data from {url}"

async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch_data("api1.com", 1))
        task2 = tg.create_task(fetch_data("api2.com", 2))
        task3 = tg.create_task(fetch_data("api3.com", 1.5))
    print(f"Result 1: {task1.result()}")
    print(f"Result 2: {task2.result()}")
    print(f"Result 3: {task3.result()}")

asyncio.run(main())
Output:
Result 1: Data from api1.com
Result 2: Data from api2.com
Result 3: Data from api3.com
Three tasks run in parallel, and after the async with block exits, all results are guaranteed to be ready. No fire-and-forget bugs. No manual cancellation. Just clean, structured concurrency.
What Is asyncio.TaskGroup and Why Use It?
asyncio.TaskGroup is a context manager that enforces structured concurrency — a programming pattern where the lifetime of child tasks is bound to their parent scope. When the TaskGroup context exits, all child tasks are guaranteed to be either completed or cancelled, and any exceptions from those tasks are collected and re-raised as an ExceptionGroup.
This is fundamentally different from the older asyncio.create_task() pattern, where tasks exist independently and require manual tracking. Let’s compare:
| Feature | asyncio.create_task() | asyncio.TaskGroup |
|---|---|---|
| Task lifetime tracking | Manual (you must track and await each task) | Automatic (bound to context manager scope) |
| Error handling | Individual task.result() calls can fail silently | All exceptions collected in ExceptionGroup |
| Cancellation on error | Must implement manually | Automatic — remaining tasks cancelled on first failure |
| Fire-and-forget bugs | Common — tasks can be forgotten | Prevented — all tasks must be awaited |
| Syntax clarity | Verbose — multiple await statements | Clean — single context block |
| Python version | 3.7+ | 3.11+ |
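To make the table's left column concrete, here is a minimal sketch of the manual pattern TaskGroup replaces (the `fetch` coroutine is a made-up stand-in): every task must be tracked by hand, and cleanup on failure is your responsibility.

```python
import asyncio

async def fetch(name, delay):
    await asyncio.sleep(delay)
    return f"{name} done"

async def main():
    # Manual tracking: forget to append one task and it is silently orphaned.
    tasks = [
        asyncio.create_task(fetch("a", 0.1)),
        asyncio.create_task(fetch("b", 0.2)),
    ]
    try:
        results = await asyncio.gather(*tasks)
    except Exception:
        # Manual cleanup: cancel whatever is still running before bailing out.
        for t in tasks:
            t.cancel()
        raise
    return results

print(asyncio.run(main()))
```

Every line of that try/except bookkeeping disappears with TaskGroup, which is the point of the comparison above.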
Creating and Running Task Groups
The most basic pattern for using TaskGroup is simple: create a context using async with asyncio.TaskGroup() and spawn tasks using the create_task() method. The context manager automatically waits for all spawned tasks to complete before exiting.
# filename: basic_taskgroup_patterns.py
import asyncio

async def task_one():
    await asyncio.sleep(1)
    return "Task 1 done"

async def task_two():
    await asyncio.sleep(0.5)
    return "Task 2 done"

async def task_three():
    await asyncio.sleep(1.5)
    return "Task 3 done"

async def main():
    print("Starting tasks...")
    async with asyncio.TaskGroup() as tg:
        t1 = tg.create_task(task_one())
        t2 = tg.create_task(task_two())
        t3 = tg.create_task(task_three())
    # All tasks have completed here
    print(f"Results: {t1.result()}, {t2.result()}, {t3.result()}")
    print("All tasks completed")

asyncio.run(main())
Output:
Starting tasks...
Results: Task 1 done, Task 2 done, Task 3 done
All tasks completed
Key observations: when the async with block exits, TaskGroup waits for all pending tasks, so you can safely call task.result() after the block. The total elapsed time is approximately 1.5 seconds (the longest task), not 3 seconds (the sum), demonstrating that the tasks overlap rather than run one after another.
Spawning Tasks with TaskGroup.create_task()
The create_task() method on TaskGroup returns a standard asyncio.Task object, just like asyncio.create_task(). The difference is that the task is automatically tracked and must be completed before the context exits.
# filename: taskgroup_spawning_demo.py
import asyncio
from datetime import datetime

async def work(task_id, duration):
    start = datetime.now()
    await asyncio.sleep(duration)
    elapsed = (datetime.now() - start).total_seconds()
    return f"Task {task_id} slept for {elapsed:.1f}s"

async def main():
    async with asyncio.TaskGroup() as tg:
        tasks = []
        for i in range(5):
            task = tg.create_task(work(i, 0.5 + i * 0.1))
            tasks.append(task)
    for task in tasks:
        print(task.result())

asyncio.run(main())
Output:
Task 0 slept for 0.5s
Task 1 slept for 0.6s
Task 2 slept for 0.7s
Task 3 slept for 0.8s
Task 4 slept for 0.9s
This loop creates five tasks concurrently. All tasks run in parallel, and the context manager ensures all are complete before proceeding.
Error Handling with ExceptionGroup
When a task within a TaskGroup raises an exception, TaskGroup doesn’t immediately propagate it. Instead, it cancels all remaining tasks and collects all exceptions into an ExceptionGroup. This gives you a chance to handle multiple failures at once.
# filename: taskgroup_exception_handling.py
import asyncio

async def reliable_task():
    await asyncio.sleep(0.5)
    return "Success"

async def failing_task():
    await asyncio.sleep(0.2)
    raise ValueError("Something went wrong")

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            t1 = tg.create_task(reliable_task())
            t2 = tg.create_task(failing_task())
    except ExceptionGroup as eg:
        print(f"Caught ExceptionGroup with {len(eg.exceptions)} exceptions")
        for exc in eg.exceptions:
            print(f"  - {type(exc).__name__}: {exc}")

asyncio.run(main())
Output:
Caught ExceptionGroup with 1 exceptions
- ValueError: Something went wrong
When failing_task raises a ValueError at 0.2 seconds, TaskGroup catches it and cancels the remaining tasks — reliable_task is still sleeping at that point, so it is cancelled rather than allowed to finish — and raises an ExceptionGroup containing that ValueError.
Handling Multiple Exceptions
If multiple tasks fail before the group has a chance to cancel them, all of their exceptions are collected:
# filename: taskgroup_multiple_exceptions.py
import asyncio

async def failing_task(task_id, delay):
    await asyncio.sleep(delay)
    raise RuntimeError(f"Task {task_id} failed")

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            # Equal delays: all three raise on the same event-loop tick,
            # before the group can cancel any of them.
            tg.create_task(failing_task(1, 0.1))
            tg.create_task(failing_task(2, 0.1))
            tg.create_task(failing_task(3, 0.1))
    except ExceptionGroup as eg:
        print(f"Caught {len(eg.exceptions)} exceptions:")
        for exc in eg.exceptions:
            print(f"  {exc}")

asyncio.run(main())
Output:
Caught 3 exceptions:
Task 1 failed
Task 2 failed
Task 3 failed
All three failures are collected and re-raised together as a single ExceptionGroup, which you can inspect and handle holistically. Note that timing matters here: a task that is still sleeping when the first failure occurs is cancelled instead, and cancellations triggered by the group itself do not appear in the ExceptionGroup.
Selective Exception Handling with except*
Python 3.11 introduces the except* syntax specifically for ExceptionGroup, allowing you to handle different exception types separately:
# filename: taskgroup_except_star.py
import asyncio

async def task_raises_value_error():
    await asyncio.sleep(0.2)
    raise ValueError("Invalid value")

async def task_raises_type_error():
    # Same delay as the ValueError task, so both exceptions are raised
    # on the same tick and both end up in the ExceptionGroup.
    await asyncio.sleep(0.2)
    raise TypeError("Wrong type")

async def task_succeeds():
    await asyncio.sleep(0.1)
    return "Success"

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(task_raises_value_error())
            tg.create_task(task_raises_type_error())
            tg.create_task(task_succeeds())
    except* ValueError as eg:
        print(f"Handled ValueError: {eg.exceptions}")
    except* TypeError as eg:
        print(f"Handled TypeError: {eg.exceptions}")

asyncio.run(main())
Output:
Handled ValueError: (ValueError('Invalid value'),)
Handled TypeError: (TypeError('Wrong type'),)
The except* syntax filters and separates exceptions by type, making selective error handling clean and Pythonic.
Nested Task Groups
TaskGroup supports nesting — you can create child TaskGroups within parent TaskGroups. This enables hierarchical task organization and selective error handling at different levels.
# filename: taskgroup_nesting.py
import asyncio

async def subtask(subtask_id, delay):
    await asyncio.sleep(delay)
    return f"Subtask {subtask_id} done"

async def parent_work(parent_id):
    async with asyncio.TaskGroup() as child_tg:
        results = []
        for i in range(3):
            task = child_tg.create_task(subtask(f"{parent_id}.{i}", 0.3))
            results.append(task)
    return [t.result() for t in results]

async def main():
    async with asyncio.TaskGroup() as parent_tg:
        p1 = parent_tg.create_task(parent_work("Parent1"))
        p2 = parent_tg.create_task(parent_work("Parent2"))
    print("Parent 1 results:", p1.result())
    print("Parent 2 results:", p2.result())

asyncio.run(main())
Output:
Parent 1 results: ['Subtask Parent1.0 done', 'Subtask Parent1.1 done', 'Subtask Parent1.2 done']
Parent 2 results: ['Subtask Parent2.0 done', 'Subtask Parent2.1 done', 'Subtask Parent2.2 done']
Here, two parent tasks each spawn their own child TaskGroup with three subtasks. All subtasks run in parallel, and errors can be handled at the appropriate nesting level.
Error Propagation in Nested Groups
When a child TaskGroup raises an ExceptionGroup, it propagates up to the parent:
# filename: taskgroup_nested_errors.py
import asyncio

async def failing_subtask():
    await asyncio.sleep(0.1)
    raise RuntimeError("Subtask failed")

async def parent_work():
    try:
        async with asyncio.TaskGroup() as child_tg:
            child_tg.create_task(failing_subtask())
    except ExceptionGroup as eg:
        print(f"Child caught: {eg}")
        raise  # Re-raise to parent

async def main():
    try:
        async with asyncio.TaskGroup() as parent_tg:
            parent_tg.create_task(parent_work())
    except ExceptionGroup as eg:
        print(f"Parent caught: {eg}")

asyncio.run(main())
Output:
Child caught: unhandled errors in a TaskGroup (1 sub-exception)
Parent caught: unhandled errors in a TaskGroup (1 sub-exception)
Exceptions bubble up through nested TaskGroups, allowing you to handle them at the appropriate level or let them propagate to the top.
Timeouts and Cancellation with TaskGroup
You can apply timeouts to a TaskGroup using asyncio.timeout() (Python 3.11+) or asyncio.wait_for(). If a timeout occurs, all tasks in the group are cancelled.
# filename: taskgroup_timeout.py
import asyncio

async def slow_task(task_id):
    try:
        await asyncio.sleep(5)
        return f"Task {task_id} completed"
    except asyncio.CancelledError:
        print(f"Task {task_id} was cancelled")
        raise

async def main():
    try:
        async with asyncio.timeout(2):  # 2 second timeout
            async with asyncio.TaskGroup() as tg:
                tg.create_task(slow_task(1))
                tg.create_task(slow_task(2))
                tg.create_task(slow_task(3))
    except TimeoutError:
        print("TaskGroup timed out!")

asyncio.run(main())
Output:
Task 1 was cancelled
Task 2 was cancelled
Task 3 was cancelled
TaskGroup timed out!
The asyncio.timeout() context manager applies a deadline to the TaskGroup. When the timeout expires, all pending tasks receive a CancelledError.
Manual Cancellation
You can also cancel tasks manually by keeping references to them and calling cancel(). One important subtlety: tasks cancelled from outside the group are not reported as errors — the group simply waits for them and then exits cleanly, without raising an ExceptionGroup.
# filename: taskgroup_manual_cancel.py
import asyncio

async def monitor_and_cancel(task_group_tasks):
    await asyncio.sleep(1)
    print("Cancelling remaining tasks...")
    for task in task_group_tasks:
        if not task.done():
            task.cancel()

async def long_task(task_id):
    try:
        await asyncio.sleep(10)
        return f"Task {task_id} done"
    except asyncio.CancelledError:
        print(f"Task {task_id} cancelled")
        raise

async def main():
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(long_task(i)) for i in range(3)]
        tg.create_task(monitor_and_cancel(tasks))
    print("Group exited cleanly")

asyncio.run(main())
Output:
Cancelling remaining tasks...
Task 0 cancelled
Task 1 cancelled
Task 2 cancelled
Group exited cleanly
Real-Life Example: Parallel API Fetcher
Let’s build a realistic example that fetches data from multiple API endpoints in parallel and handles errors gracefully:
# filename: parallel_api_fetcher.py
import asyncio
import json
from urllib.request import urlopen
from urllib.error import URLError

async def fetch_json_data(url):
    """Fetch JSON from a URL without blocking the event loop."""
    loop = asyncio.get_running_loop()

    def blocking_fetch():
        try:
            with urlopen(url, timeout=5) as response:
                return json.loads(response.read().decode())
        except URLError as e:
            raise RuntimeError(f"Failed to fetch {url}: {e}")

    # Run blocking I/O in a thread pool
    return await loop.run_in_executor(None, blocking_fetch)

async def get_user_data(user_id):
    """Fetch user data from JSONPlaceholder API."""
    url = f"https://jsonplaceholder.typicode.com/users/{user_id}"
    data = await fetch_json_data(url)
    return {"user_id": user_id, "name": data.get("name")}

async def get_post_data(post_id):
    """Fetch post data from JSONPlaceholder API."""
    url = f"https://jsonplaceholder.typicode.com/posts/{post_id}"
    data = await fetch_json_data(url)
    return {"post_id": post_id, "title": data.get("title")}

async def get_comment_data(comment_id):
    """Fetch comment data from JSONPlaceholder API."""
    url = f"https://jsonplaceholder.typicode.com/comments/{comment_id}"
    data = await fetch_json_data(url)
    return {"comment_id": comment_id, "body": (data.get("body") or "")[:50]}

async def main():
    """Fetch various data types in parallel."""
    print("Starting parallel API fetches...")
    try:
        async with asyncio.TaskGroup() as tg:
            # Fetch users
            user_tasks = [
                tg.create_task(get_user_data(i))
                for i in range(1, 4)
            ]
            # Fetch posts
            post_tasks = [
                tg.create_task(get_post_data(i))
                for i in range(1, 4)
            ]
            # Fetch comments
            comment_tasks = [
                tg.create_task(get_comment_data(i))
                for i in range(1, 4)
            ]

        print("\nUsers fetched:")
        for task in user_tasks:
            print(f"  {task.result()}")

        print("\nPosts fetched:")
        for task in post_tasks:
            print(f"  {task.result()}")

        print("\nComments fetched:")
        for task in comment_tasks:
            print(f"  {task.result()}")
    except ExceptionGroup as eg:
        print("Errors occurred during fetching:")
        for exc in eg.exceptions:
            print(f"  {exc}")

if __name__ == "__main__":
    asyncio.run(main())
Output:
Starting parallel API fetches...
Users fetched:
{'user_id': 1, 'name': 'Leanne Graham'}
{'user_id': 2, 'name': 'Ervin Howell'}
{'user_id': 3, 'name': 'Clementine Bauch'}
Posts fetched:
{'post_id': 1, 'title': 'sunt aut facere repellat provident...'}
{'post_id': 2, 'title': 'qui est esse'}
{'post_id': 3, 'title': 'ea molestias quasi exercitationem...'}
Comments fetched:
{'comment_id': 1, 'body': 'laudantium enim quasi est quidem magn'}
{'comment_id': 2, 'body': 'est nisi doloremque illum quis sequi u'}
{'comment_id': 3, 'body': 'quia et suscipit suscipit recusandae c'}
This example demonstrates several key patterns: spawning multiple categories of tasks, handling network I/O asynchronously, collecting results, and grouping error handling. All three categories of requests execute in parallel, reducing total fetch time significantly compared to sequential requests.
Frequently Asked Questions
How does TaskGroup compare to asyncio.gather()?
asyncio.gather() awaits a fixed set of awaitables and returns their results as a list. TaskGroup is stricter: it enforces structured concurrency, automatically cancels remaining tasks on the first failure, and collects all exceptions into an ExceptionGroup. Use TaskGroup when you want those guarantees; gather() (optionally with return_exceptions=True) is fine for simple result collection.
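For comparison, here is a gather() version of the quick example from the top of the article (the `fetch` coroutine is a stand-in); note that by default a failure in one awaitable does not cancel the others:

```python
import asyncio

async def fetch(name, delay):
    await asyncio.sleep(delay)
    return f"Data from {name}"

async def main():
    # gather() returns results in argument order, regardless of which
    # coroutine finishes first.
    results = await asyncio.gather(
        fetch("api1.com", 0.1),
        fetch("api2.com", 0.2),
    )
    return results

print(asyncio.run(main()))
```

The ordered list is convenient, but you give up the automatic cancel-on-failure and ExceptionGroup behavior that TaskGroup provides.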
What happens if a task raises an exception in TaskGroup?
TaskGroup immediately cancels all remaining tasks and collects the real exceptions into an ExceptionGroup; the CancelledError raised inside tasks the group itself cancelled is not included. You can catch this group with except ExceptionGroup or use except* for selective handling.
Can I nest TaskGroups and handle exceptions at different levels?
Yes. Each TaskGroup can have its own exception handler. Exceptions from child groups propagate to parent groups, allowing hierarchical error handling. You can catch and re-raise at any level.
How do I check if a task completed successfully in a TaskGroup?
After the TaskGroup context exits, all tasks are done. Use task.result() to get the return value or task.exception() to check for exceptions. Tasks that were cancelled will raise CancelledError when you call result().
What Python versions support TaskGroup?
TaskGroup is available in Python 3.11 and later. For older versions, use asyncio.gather(), asyncio.create_task(), or third-party libraries like anyio.
How do I return and access results from TaskGroup tasks?
Store references to tasks returned by create_task(). After the TaskGroup context exits, call task.result() to get the return value. If the task raised an exception, result() re-raises it (or it’s in the ExceptionGroup).
Conclusion
asyncio.TaskGroup is a powerful addition to Python’s async toolkit, bringing structured concurrency patterns to the standard library. By enforcing that tasks complete or are cancelled when their parent scope exits, TaskGroup eliminates entire classes of bugs — forgotten tasks, orphaned coroutines, and unhandled exceptions. The automatic error collection in ExceptionGroup makes it easy to detect and respond to failures in complex concurrent systems.
Whether you’re fetching data from multiple APIs, processing files in parallel, or coordinating distributed system operations, TaskGroup provides a clean, Pythonic way to write reliable async code. Combined with error handling via except* and support for timeouts and cancellation, TaskGroup should be your default choice for managing concurrent tasks in Python 3.11+.
Start using TaskGroup in your async projects today, and you’ll quickly find it becomes as indispensable as async/await itself.
Related Articles
- OpenAI API Python Tutorial — Learn to integrate OpenAI’s API with async code for intelligent applications.
- Build a RAG System in Python with LangChain — Create retrieval-augmented generation pipelines using async patterns.
- Python Free-Threaded (No GIL) — Explore the future of Python concurrency beyond asyncio.