How To Build a CLI Tool with Python and Typer

Intermediate

Command-line interfaces (CLIs) are the backbone of modern development workflows. From package managers to deployment tools, every developer relies on well-designed CLI applications. If you’ve ever dreamed of building the next popular dev tool but found the existing CLI frameworks overwhelming, Python has an elegant solution: Typer. Typer combines the power of type hints with an intuitive API that makes building professional CLIs feel like writing regular Python functions.

The beauty of Typer lies in its simplicity wrapped in sophistication. Unlike older frameworks that require boilerplate configuration, Typer leverages Python’s type annotations to automatically generate help text, validate inputs, and handle command parsing. If you already know how to write Python functions, you already know most of what you need to write Typer CLI apps: beyond a single decorator, there are no configuration files or custom DSLs to learn.

This tutorial walks you through everything you need to build production-ready CLI tools. We’ll start with the fundamentals, explore advanced features like interactive prompts and colored output, and then build a complete file organizer application that demonstrates all the concepts in action. By the end, you’ll have a reusable template for any CLI project.

Typer transforms boring terminals into beautiful command-line experiences.

Quick Example: Your CLI in 10 Lines

Before diving into the theory, let’s see Typer in action. Here’s the absolute minimum code needed to create a working CLI tool:

# hello_cli.py
import typer

app = typer.Typer()

@app.command()
def hello(name: str):
    typer.echo(f"Hello, {name}!")

if __name__ == "__main__":
    app()

Output:

$ python hello_cli.py --help
Usage: hello_cli.py [OPTIONS] COMMAND [ARGS]...

Commands:
  hello

$ python hello_cli.py hello Alice
Hello, Alice!

That’s it. No configuration, no argument parsing setup, no manual help text. Typer inferred everything from the function signature. The name parameter automatically became a command argument, and Typer generated professional help documentation instantly. This is the Typer philosophy: sensible defaults with maximum productivity.

What Is Typer and Why Use It

Typer is a modern Python library built on top of Click that simplifies CLI development. It’s created by the same developer who built FastAPI, and it brings FastAPI’s elegance to the command line. Rather than forcing you to learn a new syntax or remember decorator parameters, Typer uses standard Python type hints to express CLI intent.

To understand why Typer matters, let’s compare it with other popular CLI frameworks:

Feature              argparse                         Click               Typer
Verbosity            High (10+ lines for simple CLI)  Medium (5-7 lines)  Low (3-5 lines)
Type Hints Support   None                             Partial             Full native support
Auto-generated Help  Basic                            Good                Excellent
Learning Curve       Steep                            Moderate            Shallow
Input Validation     Manual                           Custom types        Type hints
IDE Autocompletion   Poor                             Good                Excellent

Typer’s primary advantage is reducing cognitive load. You write Python functions that look like regular functions, and Typer handles the CLI machinery. Combined with modern IDE support, this means better code completion, fewer runtime surprises, and more time spent on your application logic instead of CLI plumbing.
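To make the verbosity gap in the table concrete, here is the quick example's "hello" command rebuilt with argparse. This is a sketch for comparison only; note how the parser setup, subcommand dispatch, and help text are all manual:

```python
# The "hello" command from the quick example, rebuilt with argparse:
# manual parser setup, manual subcommand wiring, manual dispatch.
import argparse

def main(argv=None):
    parser = argparse.ArgumentParser(description="Greeting tool")
    subparsers = parser.add_subparsers(dest="command", required=True)

    hello_parser = subparsers.add_parser("hello", help="Greet someone")
    hello_parser.add_argument("name", help="Name to greet")

    args = parser.parse_args(argv)
    if args.command == "hello":
        print(f"Hello, {args.name}!")

main(["hello", "Alice"])  # normally main() would read sys.argv
```

The Typer version expressed the same CLI with a type-hinted function signature and no parser objects at all.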

Installing Typer

Getting started with Typer requires just one command. We’ll install the complete version with all optional dependencies to unlock advanced features like colored output:

# Install Typer with all extras
pip install "typer[all]"

What gets installed:

$ pip list | grep -i typer
typer               0.9.0
click               8.1.7
rich               13.7.0
shellingham        1.5.4

The [all] extra installs Rich (for colored output and tables), shellingham (for shell completion), and other utilities. If you want a minimal install with just the essentials, use pip install typer instead. Verify the installation works:

$ python -c "import typer; print(typer.__version__)"
0.9.0

Your First Typer Command

Now that Typer is installed, let’s build a slightly more complex application. Understanding command structure is crucial because every Typer app follows the same pattern: create an app object, decorate functions with @app.command(), and invoke it at the bottom.

# weather_cli.py
import typer

app = typer.Typer(help="Simple weather information tool")

@app.command()
def current(city: str):
    """Get current weather for a city."""
    typer.echo(f"Weather in {city}: Sunny, 72F")

@app.command()
def forecast(city: str, days: int = 7):
    """Get weather forecast for upcoming days."""
    typer.echo(f"Forecast for {city} ({days} days):")
    for i in range(1, days + 1):
        typer.echo(f"  Day {i}: Partly cloudy")

if __name__ == "__main__":
    app()

Output:

$ python weather_cli.py --help
Usage: weather_cli.py [OPTIONS] COMMAND [ARGS]...

  Simple weather information tool

Options:
  --help  Show this message and exit.

Commands:
  current   Get current weather for a city.
  forecast  Get weather forecast for upcoming days.

$ python weather_cli.py current London
Weather in London: Sunny, 72F

$ python weather_cli.py forecast Paris --days 3
Forecast for Paris (3 days):
  Day 1: Partly cloudy
  Day 2: Partly cloudy
  Day 3: Partly cloudy

Notice how Typer automatically converted the docstrings into help text, made days optional because it has a default value, and even inferred that it should be exposed as a --days flag. This is zero-configuration development.

Adding Arguments and Options

CLI parameters come in two flavors: arguments (positional, required) and options (named, optional with defaults). Understanding this distinction helps design intuitive CLIs. An argument like filename is positional and required. An option like --output is named and typically optional.

Typer infers the type from your function signature, but sometimes you need more control. Use typer.Argument() and typer.Option() to customize behavior:

# file_processor.py
import typer
from pathlib import Path

app = typer.Typer()

@app.command()
def process(
    input_file: Path = typer.Argument(..., help="File to process"),
    output_file: Path = typer.Option(None, help="Output file path"),
    verbose: bool = typer.Option(False, "-v", "--verbose", help="Verbose output"),
    count: int = typer.Option(1, "-c", "--count", help="Number of iterations")
):
    """Process a file with various options."""
    input_text = input_file.read_text()
    typer.echo(f"Read {len(input_text)} characters from {input_file}")

    if verbose:
        typer.echo(f"Verbose: Processing with count={count}")

    if output_file:
        output_file.write_text(input_text.upper())
        typer.echo(f"Wrote output to {output_file}")

if __name__ == "__main__":
    app()

Output:

$ python file_processor.py --help
Usage: file_processor.py [OPTIONS] INPUT_FILE

  Process a file with various options.

Options:
  --output-file PATH  Output file path
  -v, --verbose       Verbose output
  -c, --count INTEGER  Number of iterations
  --help              Show this message and exit.

Arguments:
  INPUT_FILE  File to process  [required]

$ echo "hello world" > input.txt
$ python file_processor.py input.txt --output-file output.txt -v --count 2
Read 11 characters from input.txt
Verbose: Processing with count=2
Wrote output to output.txt

The ... (Ellipsis) in typer.Argument(...) indicates a required argument. The typer.Option() call lets you specify default values, short flags (-v), and long flags (--verbose) simultaneously. Typer automatically converts hyphens to underscores in flag names, so --output-file maps to the output_file parameter.

Type Annotations for Validation

One of Typer’s superpowers is automatic validation through type hints. When you declare a parameter as int, Typer ensures the user provides an integer. If they don’t, Typer shows a helpful error message instead of crashing with a cryptic traceback.

# calculator.py
import typer

app = typer.Typer()

@app.command()
def add(a: int, b: int):
    """Add two integers."""
    typer.echo(f"{a} + {b} = {a + b}")

@app.command()
def greet(name: str, age: int = 25):
    """Greet someone with their age."""
    typer.echo(f"Hello, {name}! You are {age} years old.")

@app.command()
def enable_feature(feature_name: str, enabled: bool = True):
    """Toggle a feature on or off."""
    status = "enabled" if enabled else "disabled"
    typer.echo(f"Feature '{feature_name}' is {status}")

if __name__ == "__main__":
    app()

Output:

$ python calculator.py add 5 3
5 + 3 = 8

$ python calculator.py add five 3
Error: Invalid value for 'A': 'five' is not a valid integer.

$ python calculator.py greet Alice 30
Hello, Alice! You are 30 years old.

$ python calculator.py enable-feature logging --no-enabled
Feature 'logging' is disabled

Notice how passing a string where an integer is expected produces a clear error message. Typer validates at the CLI layer, not in your code. Boolean flags get special treatment: you can pass --enabled/--no-enabled or just toggle the default. This is powerful validation without writing a single if-statement for type checking.

Multiple Commands with app.command()

Professional CLI applications often have subcommands. Git is the classic example: git commit, git push, and git pull are all subcommands. Typer makes this structure effortless. Every function decorated with @app.command() becomes a subcommand automatically.

# database_cli.py
import typer

app = typer.Typer()

@app.command()
def migrate(version: str = typer.Option("latest")):
    """Run database migrations."""
    typer.echo(f"Migrating to version: {version}")

@app.command()
def backup(database: str = typer.Argument("main"), output: str = typer.Option("backup.sql")):
    """Create a database backup."""
    typer.echo(f"Backing up '{database}' to {output}")

@app.command()
def restore(backup_file: str = typer.Argument(...)):
    """Restore database from backup."""
    typer.echo(f"Restoring from {backup_file}")

@app.command()
def status():
    """Show database status."""
    typer.echo("Database Status: OK")
    typer.echo("Tables: 42")
    typer.echo("Size: 2.3 GB")

if __name__ == "__main__":
    app()

Output:

$ python database_cli.py --help
Usage: database_cli.py [OPTIONS] COMMAND [ARGS]...

Options:
  --help  Show this message and exit.

Commands:
  backup    Create a database backup.
  migrate   Run database migrations.
  restore   Restore database from backup.
  status    Show database status.

$ python database_cli.py status
Database Status: OK
Tables: 42
Size: 2.3 GB

$ python database_cli.py backup mydb --output backup_2026.sql
Backing up 'mydb' to backup_2026.sql

This structure scales beautifully. As your application grows, you can organize commands in separate modules and import them, keeping the codebase maintainable.
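A sketch of that scaling pattern: sub-apps that would normally live in separate modules, mounted on the main app with add_typer. The db/users names here are illustrative, not part of the tutorial's examples:

```python
# In a real project, commands/db.py and commands/users.py would each define
# their own Typer instance; shown inline here for brevity.
import typer

db_app = typer.Typer(help="Database commands")

@db_app.command()
def migrate():
    typer.echo("Running migrations...")

users_app = typer.Typer(help="User commands")

@users_app.command()
def create(name: str):
    typer.echo(f"Created user {name}")

app = typer.Typer()
app.add_typer(db_app, name="db")        # python cli.py db migrate
app.add_typer(users_app, name="users")  # python cli.py users create alice
```

Each sub-app keeps its own help text and commands, so teams can own separate modules without touching the main entry point.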

Subcommands scale from simple to enterprise-grade tools without refactoring.

Interactive Prompts and Confirmations

Sometimes you need to ask the user for input during execution, not at the command line. Typer provides interactive prompts for this scenario. Use typer.prompt() for collecting input and typer.confirm() for yes/no questions.

# interactive_app.py
import typer

app = typer.Typer()

@app.command()
def create_user():
    """Interactively create a new user."""
    username = typer.prompt("Enter username")
    email = typer.prompt("Enter email")
    password = typer.prompt("Enter password", hide_input=True)

    typer.echo(f"User '{username}' created successfully")

@app.command()
def delete_file(filename: str):
    """Delete a file with confirmation."""
    if typer.confirm(f"Delete '{filename}'?"):
        typer.echo(f"Deleted {filename}")
    else:
        typer.echo("Cancelled")

@app.command()
def setup():
    """Run interactive setup wizard."""
    project_name = typer.prompt("Project name")
    author = typer.prompt("Author name")
    use_git = typer.confirm("Initialize Git repository?")

    typer.echo(f"Setting up project '{project_name}'...")
    if use_git:
        typer.echo("Initialized Git repository")
    typer.echo(f"Project ready! ({author})")

if __name__ == "__main__":
    app()

Output:

$ python interactive_app.py create-user
Enter username: alice
Enter email: alice@example.com
Enter password:
User 'alice' created successfully

$ python interactive_app.py delete-file data.csv
Delete 'data.csv'? [y/N]: y
Deleted data.csv

$ python interactive_app.py setup
Project name: MyApp
Author name: Bob Smith
Initialize Git repository? [y/N]: y
Setting up project 'MyApp'...
Initialized Git repository
Project ready! (Bob Smith)

The hide_input=True parameter masks password input, preventing shoulder surfers from seeing sensitive data. typer.confirm() accepts yes/no responses flexibly, handling “y”, “yes”, “n”, “no” and returning a boolean. This creates seamless user experiences without managing stdin directly.
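typer.prompt also accepts a default value (accepted by pressing Enter) and a type for input conversion. A small sketch; the serve command and port value are illustrative, not from the examples above:

```python
import typer

app = typer.Typer()

@app.command()
def serve():
    # Pressing Enter accepts the default; non-integer input is re-prompted.
    port = typer.prompt("Port", default=8000, type=int)
    typer.echo(f"Serving on port {port}")

if __name__ == "__main__":
    app()
```

Running this and pressing Enter at the prompt prints "Serving on port 8000".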

Rich Output with Colors

Boring terminals are outdated. The Rich library (included with Typer) enables beautiful colored output, tables, and formatted text. This transforms CLIs from utilitarian to delightful.

# styled_output.py
import typer
from rich.console import Console
from rich.table import Table
from rich import print as rprint

app = typer.Typer()
console = Console()

@app.command()
def colors():
    """Display colored text."""
    rprint("[bold red]Error:[/bold red] Something went wrong!")
    rprint("[green]Success:[/green] Operation completed")
    rprint("[cyan]Info:[/cyan] Current status is normal")

@app.command()
def show_status():
    """Display status with a formatted table."""
    table = Table(title="System Status")
    table.add_column("Component", style="cyan")
    table.add_column("Status", style="magenta")
    table.add_column("Load", style="green")

    table.add_row("CPU", "OK", "45%")
    table.add_row("Memory", "OK", "62%")
    table.add_row("Disk", "Warning", "88%")

    console.print(table)

@app.command()
def progress_demo():
    """Show progress with real-time updates."""
    with console.status("[bold green]Processing...") as status:
        import time
        for i in range(5):
            time.sleep(0.2)
            status.update(f"[bold green]Processing step {i+1}/5...")
    console.print("[bold green]Done!")

if __name__ == "__main__":
    app()

Output:

$ python styled_output.py colors
Error: Something went wrong!
Success: Operation completed
Info: Current status is normal

$ python styled_output.py show-status
System Status
Component    Status      Load
CPU          OK          45%
Memory       OK          62%
Disk         Warning     88%

$ python styled_output.py progress-demo
Processing step 5/5...
Done!

Rich markup is simple: [bold red]text[/bold red] applies bold red styling. You can create tables, progress bars, panels, and more. This visual feedback keeps users informed and engaged, which is especially important for long-running operations.
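For iteration-based work, Rich's track helper wraps any iterable in a progress bar. A minimal sketch, with a placeholder loop body standing in for real work:

```python
# A progress bar with rich.progress.track, one of the Rich progress
# features mentioned above; the loop body simulates work.
import time
from rich.progress import track

results = []
for step in track(range(5), description="Processing..."):
    time.sleep(0.01)  # simulate work
    results.append(step * 2)

print(results)
```

Unlike console.status, track advances automatically as the loop runs, so there is nothing to update manually.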

Error Handling in CLI Apps

Proper error handling separates production-ready CLIs from toy scripts. Typer provides typer.Exit() to terminate with a specific exit code, and the rich.console.Console class has methods for displaying errors elegantly.

# error_handling.py
import typer
from pathlib import Path
from rich.console import Console

app = typer.Typer()
console = Console()

@app.command()
def read_file(filename: str):
    """Read and display a file."""
    try:
        file_path = Path(filename)
        if not file_path.exists():
            console.print(f"[red]Error:[/red] File '{filename}' not found", style="bold")
            raise typer.Exit(code=1)

        content = file_path.read_text()
        console.print(f"[green]Success:[/green] Read {len(content)} characters")
        print(content)
    except Exception as e:
        console.print(f"[red]Error:[/red] {str(e)}", style="bold")
        raise typer.Exit(code=2)

@app.command()
def process(input_file: str, output_file: str = "output.txt"):
    """Process a file with error checking."""
    input_path = Path(input_file)
    output_path = Path(output_file)

    if not input_path.exists():
        console.print(f"[red]Input Error:[/red] {input_file} not found", style="bold red")
        raise typer.Exit(code=1)

    if output_path.exists():
        if not typer.confirm(f"Overwrite {output_file}?"):
            console.print("[yellow]Cancelled[/yellow]")
            raise typer.Exit(code=0)

    try:
        data = input_path.read_text()
        output_path.write_text(data.upper())
        console.print(f"[green]Success:[/green] Processed {input_file} -> {output_file}")
    except IOError as e:
        console.print(f"[red]IO Error:[/red] {str(e)}", style="bold red")
        raise typer.Exit(code=3)

if __name__ == "__main__":
    app()

Output:

$ python error_handling.py read-file missing.txt
Error: File 'missing.txt' not found

$ echo $?
1

$ python error_handling.py read-file data.txt
Success: Read 42 characters
[file contents here]

$ python error_handling.py process input.txt --output-file output.txt
Success: Processed input.txt -> output.txt

Exit codes matter for automation and scripting. Return 0 for success and non-zero for failure, so shell scripts and CI pipelines can detect problems reliably. Always validate user input early and fail fast with clear messages.
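From the caller's side, a wrapper script or CI step branches on that return code. A self-contained sketch, with a child Python process standing in for the CLI above (sys.exit mirrors what typer.Exit(code=...) produces):

```python
# Branch on a child process's exit code, the way a shell script would with $?.
import subprocess
import sys

# The child exits with code 3, like the IOError branch in the example above.
result = subprocess.run([sys.executable, "-c", "import sys; sys.exit(3)"])

if result.returncode == 0:
    print("success")
else:
    print(f"failed with exit code {result.returncode}")
```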

Rich transforms cryptic errors into developer-friendly feedback.

Real-Life Example: Smart File Organizer

Now let’s build a complete, production-ready CLI tool: a smart file organizer that sorts files in a directory by their extension. This example combines everything we’ve learned: multiple commands, type validation, interactive prompts, error handling, and rich output.

# file_organizer.py
import typer
from pathlib import Path
from rich.console import Console
from rich.table import Table
from collections import defaultdict
import shutil

app = typer.Typer(help="Smart file organizer with validation and safety checks")
console = Console()

@app.command()
def organize(
    directory: Path = typer.Argument(".", help="Directory to organize"),
    dry_run: bool = typer.Option(True, help="Preview changes without executing"),
    create_folders: bool = typer.Option(True, help="Create extension folders"),
):
    """Organize files in a directory by extension."""
    if not directory.exists():
        console.print(f"[red]Error:[/red] Directory '{directory}' not found", style="bold")
        raise typer.Exit(code=1)

    if not directory.is_dir():
        console.print(f"[red]Error:[/red] '{directory}' is not a directory", style="bold")
        raise typer.Exit(code=1)

    # Group files by extension
    files_by_ext = defaultdict(list)
    for file in directory.iterdir():
        if file.is_file():
            ext = file.suffix or "[no-extension]"
            files_by_ext[ext].append(file)

    if not files_by_ext:
        console.print("[yellow]No files found to organize[/yellow]")
        raise typer.Exit(code=0)

    # Display summary
    table = Table(title="Files to Organize")
    table.add_column("Extension", style="cyan")
    table.add_column("Count", style="green")

    for ext, files in sorted(files_by_ext.items()):
        table.add_row(ext, str(len(files)))

    console.print(table)

    if dry_run:
        console.print("[yellow]Dry-run mode: No changes will be made[/yellow]")
        return

    if not typer.confirm("Execute organization?"):
        console.print("[yellow]Cancelled[/yellow]")
        raise typer.Exit(code=0)

    # Move files
    moved_count = 0
    for ext, files in files_by_ext.items():
        if create_folders and ext != "[no-extension]":
            folder = directory / ext.lstrip(".")
            folder.mkdir(exist_ok=True)

            for file in files:
                try:
                    shutil.move(str(file), str(folder / file.name))
                    moved_count += 1
                except Exception as e:
                    console.print(f"[red]Failed to move {file.name}:[/red] {str(e)}")

    console.print(f"[green]Success:[/green] Moved {moved_count} files")

@app.command()
def analyze(directory: Path = typer.Argument(".", help="Directory to analyze")):
    """Analyze file distribution in a directory."""
    if not directory.exists() or not directory.is_dir():
        console.print(f"[red]Error:[/red] Invalid directory '{directory}'", style="bold")
        raise typer.Exit(code=1)

    total_size = 0
    files_by_ext = defaultdict(int)

    for file in directory.rglob("*"):
        if file.is_file():
            ext = file.suffix or "[no-extension]"
            files_by_ext[ext] += 1
            total_size += file.stat().st_size

    table = Table(title=f"Analysis of {directory}")
    table.add_column("Extension", style="cyan")
    table.add_column("Files", style="green")

    for ext in sorted(files_by_ext.keys()):
        table.add_row(ext, str(files_by_ext[ext]))

    console.print(table)
    console.print(f"Total files: {sum(files_by_ext.values())}")
    console.print(f"Total size: {total_size / (1024*1024):.2f} MB")

if __name__ == "__main__":
    app()

Output:

$ python file_organizer.py organize ./test_dir
Files to Organize
Extension    Count
.txt         5
.pdf         3
.jpg         7
.py          2
Dry-run mode: No changes will be made

$ python file_organizer.py organize ./test_dir --no-dry-run
Files to Organize
Extension    Count
.txt         5
.pdf         3
.jpg         7
.py          2
Execute organization? [y/N]: y
Success: Moved 17 files

$ python file_organizer.py analyze ./test_dir
Analysis of ./test_dir
Extension    Files
.jpg         7
.pdf         3
.py          2
.txt         5
Total files: 17
Total size: 45.32 MB

This file organizer demonstrates production best practices: it validates input, provides dry-run mode for safety, uses tables for clarity, handles errors gracefully, and offers multiple commands for different use cases. You could package this as a standalone tool and distribute it via pip.

Frequently Asked Questions

How do I package a Typer app as a standalone tool?

Use setuptools or Poetry to create a package with an entry point. In your pyproject.toml, add:

[project.scripts]
my-cli = "my_module:app"

Then install with pip install -e .. Your app becomes available as a system command: my-cli --help.

Can Typer generate shell completion scripts?

Yes! Typer apps support completion for bash, zsh, fish, and PowerShell out of the box; the shellingham library detects which shell the user is running. Users can run python my_cli.py --install-completion to set up completions for their shell.

How should I test Typer applications?

Use the CliRunner from typer.testing, a thin wrapper around Click’s test runner (Typer is built on Click under the hood). Testing example:

from typer.testing import CliRunner
from calculator import app  # the calculator example from earlier

runner = CliRunner()
result = runner.invoke(app, ["add", "5", "3"])
assert result.exit_code == 0
assert "8" in result.output

What about complex types like lists or JSON objects?

Use Python’s built-in types directly. Typer handles List[str], List[int], and other generic types intelligently. For JSON, accept a string and parse it with json.loads() in your function.
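A sketch combining both parts of that answer: a repeatable --tag option typed as List[str], and a JSON string parsed with json.loads inside the function. The tag-item command and its parameters are illustrative:

```python
import json
from typing import List, Optional

import typer

app = typer.Typer()

@app.command()
def tag_item(
    name: str,
    tag: List[str] = typer.Option([], "--tag", help="May be passed multiple times"),
    metadata: Optional[str] = typer.Option(None, help="JSON object as a string"),
):
    # Typer collects repeated --tag flags into a list; JSON is parsed manually.
    parsed = json.loads(metadata) if metadata else {}
    typer.echo(f"{name}: tags={tag} metadata={parsed}")

if __name__ == "__main__":
    app()
```

Usage would look like: python cli.py photo --tag travel --tag 2024 --metadata '{"size": 10}'.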

How do I use environment variables in a Typer app?

Use typer.Option() with the envvar parameter: api_key: str = typer.Option(..., envvar="API_KEY"). Typer will check the environment variable if the CLI argument isn’t provided.
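A fuller sketch of the same envvar pattern; the call-api command and API_KEY name are illustrative:

```python
import typer

app = typer.Typer()

@app.command()
def call_api(api_key: str = typer.Option(..., envvar="API_KEY")):
    # Falls back to the API_KEY environment variable when --api-key is omitted.
    typer.echo(f"Using an API key of length {len(api_key)}")

if __name__ == "__main__":
    app()
```

With API_KEY=secret exported, running the command without --api-key picks the value up from the environment.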

Can I create command groups or nested subcommands?

Yes! Create separate Typer instances and add them as command groups:

db_app = typer.Typer()
@db_app.command()
def migrate(): pass

app = typer.Typer()
app.add_typer(db_app, name="db")

# Usage: python cli.py db migrate

Conclusion

Typer brings modern Python practices to CLI development. By leveraging type hints and sensible defaults, it eliminates boilerplate while maintaining power and flexibility. Whether you’re building internal tools, developer utilities, or the next popular open-source CLI, Typer gives you a solid foundation.

The journey from simple functions to professional CLI applications is smooth with Typer. Start with a basic command, add features incrementally, and scale to complex multi-command applications without refactoring. For deeper learning, explore the official Typer documentation and examine real-world projects using Typer on GitHub.

Your CLI adventure awaits. Happy building!

Deepen your command-line expertise with these related tutorials:

How To Use Python Enums and When You Should

Intermediate

If you have spent time debugging Python code, you have probably encountered the silent killers of software reliability: magic strings and magic numbers. A developer writes status = "active" in one function but checks if status == "Active" (capital A) in another. A bug is born. Days later, when the mismatch surfaces in production, your fingers itch to throttle the typo. Python enums exist precisely to prevent this nightmare. They transform those fragile string values into type-safe, self-documenting objects that your IDE can help you complete and your type checker can validate before a single line runs.

The good news: enums are built into Python’s standard library. No external packages needed, no complex installation steps. They integrate seamlessly with the language and work beautifully with type hints, making your code cleaner and more maintainable. Enums are also more than just a neat organizational trick — they are a best practice embraced by the Python community and used in production code across the industry, from web frameworks to data science pipelines.

In this tutorial, we will explore what enums are, why they matter, and how to use them effectively. We will start with the basics, move through automatic value generation, then advance to string enums, flags, and real-world patterns like state machines. By the end, you will understand when to reach for enums and how to wield them to write code that is both safer and more expressive.
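The status bug described above, closed off with an Enum. A minimal sketch; the Status names are illustrative:

```python
from enum import Enum

class Status(Enum):
    ACTIVE = "active"
    INACTIVE = "inactive"

status = Status.ACTIVE

# Comparisons are against members, so "Active" vs "active" can't silently
# mismatch, and a typo like Status.ACTVE fails loudly with AttributeError.
print(status is Status.ACTIVE)
```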

Your code before and after enums. Spoiler: the enum side never has a typo.
How To Use SQLAlchemy 2.0 ORM with Python

Intermediate

SQLAlchemy is the gold standard for Object-Relational Mapping in Python. Version 2.0 represents a major evolution, introducing a more intuitive API that emphasizes explicit, modern patterns while maintaining backward compatibility. Whether you’re building a small Flask application or a complex data management system, SQLAlchemy 2.0 provides the tools to interact with databases using Python objects instead of raw SQL strings.

The ORM (Object-Relational Mapping) layer in SQLAlchemy 2.0 allows you to define database tables as Python classes, called models. Once you define a model, you can perform all database operations (creating records, querying data, updating rows, and deleting entries) using Pythonic syntax. The new select() construct and DeclarativeBase provide clearer, more expressive patterns than earlier versions.

In this tutorial, we’ll explore the key features of SQLAlchemy 2.0 ORM: how to define models, manage database sessions, perform CRUD operations, query data with the new select() API, establish relationships between tables, handle transactions, and build a real-world example. By the end, you’ll understand how to leverage SQLAlchemy 2.0 to create robust, maintainable database-driven applications.

Quick Example: 20 Lines of SQLAlchemy 2.0

Let’s start with a complete, working example to see SQLAlchemy 2.0 in action:

# quick_example.py
from sqlalchemy import create_engine, String
from sqlalchemy.orm import DeclarativeBase, Session, Mapped, mapped_column

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = 'users'
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(50))

engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(User(name='Alice'))
    session.add(User(name='Bob'))
    session.commit()

with Session(engine) as session:
    from sqlalchemy import select
    users = session.scalars(select(User)).all()
    for user in users:
        print(f'{user.id}: {user.name}')

Output:

1: Alice
2: Bob

This example demonstrates the core workflow: define a model inheriting from DeclarativeBase, create an engine, manage a session, insert records, and query them using select(). Notice the type hints (Mapped[str]) and the modern syntax: this is SQLAlchemy 2.0 style.

What Is SQLAlchemy ORM?

SQLAlchemy provides multiple ways to interact with databases. The ORM layer sits at the highest abstraction level, letting you work with Python objects. Here’s how it compares to alternatives:

  • Raw SQL – write SQL strings directly in Python. Pros: maximum control, direct database access. Cons: error-prone, requires manual parameter binding, not Pythonic.
  • SQLAlchemy Core – build queries programmatically with the SQL expression language. Pros: type-safe, database-agnostic, composable. Cons: you still work with table/column constructs rather than Python objects.
  • SQLAlchemy ORM – map database tables to Python classes and query objects directly. Pros: Pythonic, intuitive, supports relationships and complex queries, automatic change tracking. Cons: slightly more overhead, and you must understand the session lifecycle.

SQLAlchemy 2.0’s ORM is the most productive choice for most applications because it combines clarity with power. You define your data model once, and the ORM handles the translation to SQL behind the scenes.

Installing SQLAlchemy and Setting Up

Install SQLAlchemy using pip:

# shell
pip install sqlalchemy

Output:

Successfully installed sqlalchemy-2.0.x

Verify the installation:

# check_version.py
import sqlalchemy
print(f'SQLAlchemy version: {sqlalchemy.__version__}')

Output:

SQLAlchemy version: 2.0.x

For this tutorial, we’ll use SQLite in-memory databases (specified as sqlite:///:memory:), which requires no external setup. For production use with PostgreSQL, MySQL, or other databases, install the appropriate driver (e.g., pip install psycopg2-binary for PostgreSQL).

Defining Models with DeclarativeBase

In SQLAlchemy 2.0, you define models by creating a class that inherits from DeclarativeBase. This base class automatically handles the mapping between your Python class and the database table.

Creating the DeclarativeBase

# models_setup.py
from sqlalchemy.orm import DeclarativeBase

class Base(DeclarativeBase):
    pass

Output:

(No output - this defines the base class)

The Base class is the foundation for all your models. It tracks metadata (table definitions) and provides utilities for creating tables.

Defining a Model with Columns

Use Mapped and mapped_column() to define model attributes in SQLAlchemy 2.0:

# product_model.py
from sqlalchemy import String, Float, Integer
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

class Base(DeclarativeBase):
    pass

class Product(Base):
    __tablename__ = 'products'

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100), nullable=False)
    price: Mapped[float] = mapped_column(Float, default=0.0)
    stock: Mapped[int] = mapped_column(Integer, default=0)

Output:

(No output - this defines the model structure)

Key points about this model:

  • __tablename__ specifies the database table name
  • Mapped[type] is a type hint that declares the Python type of the column
  • mapped_column() specifies database-level constraints (primary key, nullability, defaults)
  • primary_key=True makes id the primary key with auto-increment behavior
  • nullable=False ensures the name field cannot be NULL
  • default=0.0 provides a default value for new records

Common Column Types

# column_types_example.py
from sqlalchemy import String, Integer, Float, Boolean, DateTime, Text, Date
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from datetime import datetime, date

class Base(DeclarativeBase):
    pass

class Article(Base):
    __tablename__ = 'articles'

    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(255))
    content: Mapped[str] = mapped_column(Text)
    is_published: Mapped[bool] = mapped_column(Boolean, default=False)
    rating: Mapped[float] = mapped_column(Float)
    views: Mapped[int] = mapped_column(Integer, default=0)
    created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)
    published_date: Mapped[date | None] = mapped_column(Date)  # Optional type makes the column nullable

Output:

(No output - demonstrates various column types)
Character drawing blueprints of connected blocks representing SQLAlchemy ORM models
SQLAlchemy ORM — Python objects in, SQL magic out.

Creating Tables and the Engine

The SQLAlchemy engine is your gateway to the database. It manages connections and executes SQL. To create tables, you call Base.metadata.create_all() with the engine:

# create_tables.py
from sqlalchemy import create_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy import String

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = 'users'
    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(String(100), unique=True)

# Create an in-memory SQLite database
engine = create_engine('sqlite:///:memory:', echo=False)

# Create all tables defined in metadata
Base.metadata.create_all(engine)

print('Tables created successfully!')

Output:

Tables created successfully!

The connection string format is dialect+driver://user:password@host:port/database. Examples:

  • sqlite:///:memory: – In-memory SQLite (perfect for testing)
  • sqlite:///app.db – File-based SQLite
  • postgresql://user:pass@localhost/dbname – PostgreSQL
  • mysql+pymysql://user:pass@localhost/dbname – MySQL

Sessions and Basic CRUD Operations

A Session is the ORM’s unit of work: it tracks changes to your objects, coordinates with the database, and is typically used as a context manager. CRUD stands for Create, Read, Update, Delete – the fundamental database operations.

Create (Insert) Records

# create_records.py
from sqlalchemy import create_engine, String
from sqlalchemy.orm import DeclarativeBase, Session, Mapped, mapped_column

class Base(DeclarativeBase):
    pass

class Book(Base):
    __tablename__ = 'books'
    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(100))
    author: Mapped[str] = mapped_column(String(100))

engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)

# Create and insert records
with Session(engine) as session:
    book1 = Book(title='Python Basics', author='Alice Johnson')
    book2 = Book(title='Web Dev with Django', author='Bob Smith')

    session.add(book1)
    session.add(book2)
    session.commit()

    print(f'Created book with ID: {book1.id}')
    print(f'Created book with ID: {book2.id}')

Output:

Created book with ID: 1
Created book with ID: 2

When you commit, SQLAlchemy assigns primary keys (IDs) to the new objects. The session tracks pending objects and issues the INSERT statements when it flushes, which happens automatically as part of commit().

Read (Query) Records

# read_records.py
from sqlalchemy import create_engine, String, select
from sqlalchemy.orm import DeclarativeBase, Session, Mapped, mapped_column

class Base(DeclarativeBase):
    pass

class Book(Base):
    __tablename__ = 'books'
    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(100))
    author: Mapped[str] = mapped_column(String(100))

engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(Book(title='Python Basics', author='Alice Johnson'))
    session.add(Book(title='Web Dev with Django', author='Bob Smith'))
    session.commit()

# Read records
with Session(engine) as session:
    stmt = select(Book)
    books = session.scalars(stmt).all()

    for book in books:
        print(f'{book.id}: {book.title} by {book.author}')

Output:

1: Python Basics by Alice Johnson
2: Web Dev with Django by Bob Smith

Update Records

# update_records.py
from sqlalchemy import create_engine, String, select
from sqlalchemy.orm import DeclarativeBase, Session, Mapped, mapped_column

class Base(DeclarativeBase):
    pass

class Book(Base):
    __tablename__ = 'books'
    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(100))
    author: Mapped[str] = mapped_column(String(100))

engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(Book(title='Python Basics', author='Alice Johnson'))
    session.commit()

# Update a record
with Session(engine) as session:
    stmt = select(Book).where(Book.title == 'Python Basics')
    book = session.scalars(stmt).first()

    if book:
        book.author = 'Alice J. Johnson'
        session.commit()
        print(f'Updated: {book.title} by {book.author}')

Output:

Updated: Python Basics by Alice J. Johnson

Delete Records

# delete_records.py
from sqlalchemy import create_engine, String, select
from sqlalchemy.orm import DeclarativeBase, Session, Mapped, mapped_column

class Base(DeclarativeBase):
    pass

class Book(Base):
    __tablename__ = 'books'
    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(100))

engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(Book(title='Old Book'))
    session.commit()

# Delete a record
with Session(engine) as session:
    stmt = select(Book).where(Book.title == 'Old Book')
    book = session.scalars(stmt).first()

    if book:
        session.delete(book)
        session.commit()
        print('Book deleted successfully')

Output:

Book deleted successfully
Character placing building blocks on grid representing CRUD operations
Create, read, update, delete — the four verbs every ORM speaks fluently.

Querying with select()

SQLAlchemy 2.0’s select() construct is the modern way to build queries. It’s more expressive than the legacy Query API (session.query()) and provides better IDE support through type hints.

Basic Selects

# basic_select.py
from sqlalchemy import create_engine, String, select
from sqlalchemy.orm import DeclarativeBase, Session, Mapped, mapped_column

class Base(DeclarativeBase):
    pass

class Student(Base):
    __tablename__ = 'students'
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100))
    grade: Mapped[int]

engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(Student(name='Alice', grade=95))
    session.add(Student(name='Bob', grade=87))
    session.add(Student(name='Charlie', grade=92))
    session.commit()

# Select all
with Session(engine) as session:
    stmt = select(Student)
    all_students = session.scalars(stmt).all()
    print(f'Total students: {len(all_students)}')

    # Select first
    first = session.scalars(select(Student)).first()
    print(f'First student: {first.name}')

Output:

Total students: 3
First student: Alice

Filtering Results

# filtering.py
from sqlalchemy import create_engine, String, select
from sqlalchemy.orm import DeclarativeBase, Session, Mapped, mapped_column

class Base(DeclarativeBase):
    pass

class Student(Base):
    __tablename__ = 'students'
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100))
    grade: Mapped[int]

engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([
        Student(name='Alice', grade=95),
        Student(name='Bob', grade=87),
        Student(name='Charlie', grade=92),
        Student(name='Diana', grade=88)
    ])
    session.commit()

with Session(engine) as session:
    # Equal comparison
    stmt = select(Student).where(Student.name == 'Alice')
    result = session.scalars(stmt).first()
    print(f'Found: {result.name} (Grade: {result.grade})')

    # Greater than
    stmt = select(Student).where(Student.grade > 90)
    high_performers = session.scalars(stmt).all()
    print(f'High performers: {[s.name for s in high_performers]}')

    # Like pattern
    stmt = select(Student).where(Student.name.like('D%'))
    result = session.scalars(stmt).first()
    print(f'Names starting with D: {result.name}')

Output:

Found: Alice (Grade: 95)
High performers: ['Alice', 'Charlie']
Names starting with D: Diana

Ordering and Limiting

# ordering_limiting.py
from sqlalchemy import create_engine, String, select, desc
from sqlalchemy.orm import DeclarativeBase, Session, Mapped, mapped_column

class Base(DeclarativeBase):
    pass

class Student(Base):
    __tablename__ = 'students'
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100))
    grade: Mapped[int]

engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([
        Student(name='Alice', grade=95),
        Student(name='Bob', grade=87),
        Student(name='Charlie', grade=92),
        Student(name='Diana', grade=88)
    ])
    session.commit()

with Session(engine) as session:
    # Order ascending
    stmt = select(Student).order_by(Student.grade)
    lowest = session.scalars(stmt).first()
    print(f'Lowest grade: {lowest.name} ({lowest.grade})')

    # Order descending
    stmt = select(Student).order_by(desc(Student.grade))
    highest = session.scalars(stmt).first()
    print(f'Highest grade: {highest.name} ({highest.grade})')

    # Limit
    stmt = select(Student).order_by(desc(Student.grade)).limit(2)
    top_two = session.scalars(stmt).all()
    print(f'Top 2 students: {[s.name for s in top_two]}')

Output:

Lowest grade: Bob (87)
Highest grade: Alice (95)
Top 2 students: ['Alice', 'Charlie']

Joins Between Tables

# joins.py
from sqlalchemy import create_engine, String, select, ForeignKey
from sqlalchemy.orm import DeclarativeBase, Session, Mapped, mapped_column, relationship

class Base(DeclarativeBase):
    pass

class Department(Base):
    __tablename__ = 'departments'
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100))
    employees: Mapped[list['Employee']] = relationship(back_populates='department')

class Employee(Base):
    __tablename__ = 'employees'
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100))
    department_id: Mapped[int] = mapped_column(ForeignKey('departments.id'))
    department: Mapped[Department] = relationship(back_populates='employees')

engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)

with Session(engine) as session:
    dept_eng = Department(name='Engineering')
    dept_hr = Department(name='HR')

    session.add_all([
        Employee(name='Alice', department=dept_eng),
        Employee(name='Bob', department=dept_eng),
        Employee(name='Charlie', department=dept_hr)
    ])
    session.commit()

with Session(engine) as session:
    # Join departments and employees
    stmt = select(Employee).join(Department).where(Department.name == 'Engineering')
    eng_employees = session.scalars(stmt).all()
    print(f'Engineering employees: {[e.name for e in eng_employees]}')

Output:

Engineering employees: ['Alice', 'Bob']

Relationships Between Models

Relationships let you traverse from one model to another. SQLAlchemy handles the foreign key constraints and makes it easy to load related objects.

One-to-Many Relationships

# one_to_many.py
from sqlalchemy import create_engine, String, ForeignKey
from sqlalchemy.orm import DeclarativeBase, Session, Mapped, mapped_column, relationship

class Base(DeclarativeBase):
    pass

class Author(Base):
    __tablename__ = 'authors'
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100))
    books: Mapped[list['Book']] = relationship(back_populates='author', cascade='all, delete-orphan')

class Book(Base):
    __tablename__ = 'books'
    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(100))
    author_id: Mapped[int] = mapped_column(ForeignKey('authors.id'))
    author: Mapped[Author] = relationship(back_populates='books')

engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)

with Session(engine) as session:
    author = Author(name='George Orwell')
    author.books = [
        Book(title='1984'),
        Book(title='Animal Farm')
    ]
    session.add(author)
    session.commit()

with Session(engine) as session:
    from sqlalchemy import select
    author = session.scalars(select(Author).where(Author.name == 'George Orwell')).first()
    print(f'Author: {author.name}')
    for book in author.books:
        print(f'  - {book.title}')

Output:

Author: George Orwell
  - 1984
  - Animal Farm

Many-to-Many Relationships

# many_to_many.py
from sqlalchemy import create_engine, String, ForeignKey, Table, Column
from sqlalchemy.orm import DeclarativeBase, Session, Mapped, mapped_column, relationship

class Base(DeclarativeBase):
    pass

# Association table for many-to-many
student_course = Table(
    'student_course',
    Base.metadata,
    Column('student_id', ForeignKey('students.id'), primary_key=True),
    Column('course_id', ForeignKey('courses.id'), primary_key=True)
)

class Student(Base):
    __tablename__ = 'students'
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100))
    courses: Mapped[list['Course']] = relationship(secondary=student_course, back_populates='students')

class Course(Base):
    __tablename__ = 'courses'
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100))
    students: Mapped[list[Student]] = relationship(secondary=student_course, back_populates='courses')

engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)

with Session(engine) as session:
    python = Course(name='Python 101')
    math = Course(name='Calculus I')

    alice = Student(name='Alice', courses=[python, math])
    bob = Student(name='Bob', courses=[python])

    session.add_all([alice, bob])
    session.commit()

with Session(engine) as session:
    from sqlalchemy import select
    student = session.scalars(select(Student).where(Student.name == 'Alice')).first()
    print(f'Alice is taking: {[c.name for c in student.courses]}')

Output:

Alice is taking: ['Python 101', 'Calculus I']
Character connecting blocks with chains representing database relationships
Foreign keys in Python land — relationship() does the joining for you.

Transactions and Error Handling

A transaction is a sequence of database operations that either all succeed or all fail. SQLAlchemy sessions handle transactions automatically, but you can control commit/rollback behavior explicitly.

Basic Commit and Rollback

# transactions.py
from sqlalchemy import create_engine, String
from sqlalchemy.orm import DeclarativeBase, Session, Mapped, mapped_column

class Base(DeclarativeBase):
    pass

class Account(Base):
    __tablename__ = 'accounts'
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100))
    balance: Mapped[float]

engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)

# Create initial accounts
with Session(engine) as session:
    session.add_all([
        Account(name='Alice', balance=1000.0),
        Account(name='Bob', balance=500.0)
    ])
    session.commit()

# Simulate a transfer with error handling
from sqlalchemy import select

with Session(engine) as session:
    try:
        alice = session.scalars(select(Account).where(Account.name == 'Alice')).first()
        bob = session.scalars(select(Account).where(Account.name == 'Bob')).first()

        # Transfer 200 from Alice to Bob
        alice.balance -= 200
        bob.balance += 200

        session.commit()
        print(f'Transfer successful: Alice={alice.balance}, Bob={bob.balance}')
    except Exception as e:
        session.rollback()
        print(f'Transfer failed: {e}')

Output:

Transfer successful: Alice=800.0, Bob=700.0

Error Handling with Try-Except

# error_handling.py
from sqlalchemy import create_engine, String, exc
from sqlalchemy.orm import DeclarativeBase, Session, Mapped, mapped_column

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = 'users'
    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(String(100), unique=True)

engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(User(email='alice@example.com'))
    session.commit()

# Try to add duplicate email
with Session(engine) as session:
    try:
        session.add(User(email='alice@example.com'))
        session.commit()
    except exc.IntegrityError as e:
        session.rollback()
        print('Error: Email already exists')
    except Exception as e:
        session.rollback()
        print(f'Unexpected error: {e}')

Output:

Error: Email already exists
Character at vault with combination dial representing database transactions
Transactions — commit when ready, rollback when not. No half measures.

Real-Life Example: Blog Database

Let’s build a complete blog system with Post, Author, and Tag models, demonstrating relationships, CRUD operations, and queries.

# blog_system.py
from sqlalchemy import create_engine, String, Text, ForeignKey, Table, Column, select, desc
from sqlalchemy.orm import DeclarativeBase, Session, Mapped, mapped_column, relationship
from datetime import datetime

class Base(DeclarativeBase):
    pass

# Association table for many-to-many relationship
post_tag = Table(
    'post_tag',
    Base.metadata,
    Column('post_id', ForeignKey('posts.id'), primary_key=True),
    Column('tag_id', ForeignKey('tags.id'), primary_key=True)
)

class Author(Base):
    __tablename__ = 'authors'
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100), nullable=False)
    email: Mapped[str] = mapped_column(String(100), unique=True)
    posts: Mapped[list['Post']] = relationship(back_populates='author', cascade='all, delete-orphan')

class Post(Base):
    __tablename__ = 'posts'
    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200), nullable=False)
    content: Mapped[str] = mapped_column(Text)
    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
    author_id: Mapped[int] = mapped_column(ForeignKey('authors.id'))
    author: Mapped[Author] = relationship(back_populates='posts')
    tags: Mapped[list['Tag']] = relationship(secondary=post_tag, back_populates='posts')

class Tag(Base):
    __tablename__ = 'tags'
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(50), unique=True)
    posts: Mapped[list[Post]] = relationship(secondary=post_tag, back_populates='tags')

# Setup
engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)

# Create blog data
with Session(engine) as session:
    author1 = Author(name='Alice', email='alice@blog.com')
    author2 = Author(name='Bob', email='bob@blog.com')

    python_tag = Tag(name='Python')
    web_tag = Tag(name='Web')

    post1 = Post(
        title='Getting Started with Python',
        content='Python is a great language...',
        author=author1,
        tags=[python_tag, web_tag]
    )
    post2 = Post(
        title='Advanced ORM Techniques',
        content='SQLAlchemy provides powerful ORM features...',
        author=author1,
        tags=[python_tag]
    )
    post3 = Post(
        title='Web Development Tips',
        content='Here are some web development best practices...',
        author=author2,
        tags=[web_tag]
    )

    session.add_all([author1, author2, python_tag, web_tag, post1, post2, post3])
    session.commit()

# Query examples
with Session(engine) as session:
    # Find all posts by an author
    stmt = select(Post).join(Author).where(Author.name == 'Alice').order_by(desc(Post.created_at))
    alice_posts = session.scalars(stmt).all()
    print(f'Posts by Alice: {len(alice_posts)}')
    for post in alice_posts:
        print(f'  - {post.title}')

    # Find all posts with a specific tag
    stmt = select(Post).join(Post.tags).where(Tag.name == 'Python')
    python_posts = session.scalars(stmt).all()
    print(f'\nPython posts: {len(python_posts)}')
    for post in python_posts:
        print(f'  - {post.title} by {post.author.name}')

    # Count total posts (this loads every row; for large tables,
    # prefer select(func.count()) to count in the database)
    stmt = select(Post)
    total_posts = len(session.scalars(stmt).all())
    print(f'\nTotal blog posts: {total_posts}')

Output:

Posts by Alice: 2
  - Advanced ORM Techniques
  - Getting Started with Python

Python posts: 2
  - Getting Started with Python by Alice
  - Advanced ORM Techniques by Alice

Total blog posts: 3

This example showcases the full power of SQLAlchemy 2.0 ORM: defining multiple related models, using many-to-many relationships, performing complex queries with joins, and maintaining referential integrity through cascading deletes.

Frequently Asked Questions

What’s the difference between Mapped and traditional type hints?

Mapped[type] is SQLAlchemy 2.0’s way to combine Python type hints with ORM metadata. It tells SQLAlchemy about the column while also providing type information to your IDE and type checkers. The legacy 1.x approach assigned plain Column() objects to class attributes, with no type hints at all.

When should I use relationships versus manual joins?

Use relationships when you want to access related objects as Python attributes (e.g., author.posts). Use manual joins when you need more control over the query or want to fetch only specific columns. Relationships are more Pythonic and handle lazy loading by default.

What’s the difference between add() and add_all()?

session.add(obj) adds a single object. session.add_all([obj1, obj2]) adds multiple objects at once. Use add_all() for convenience when inserting several objects.

How do I handle database connection pooling?

SQLAlchemy’s engine manages connection pooling automatically. For production applications, configure pool settings when creating the engine: engine = create_engine('postgresql://...', pool_size=10, max_overflow=20).

Can I use SQLAlchemy ORM with async code?

Yes! SQLAlchemy 2.0 includes async support using AsyncSession and create_async_engine(). This is useful for high-concurrency web applications. However, the basic patterns remain the same.

What happens if I forget to commit()?

Changes are held in the session but not persisted to the database. When the session context exits (the with block ends), uncommitted changes are rolled back. Always call commit() to save changes.

How do I avoid the N+1 query problem?

The N+1 problem happens when loading a parent object triggers a separate query for each child. Use eager loading with selectinload() or joinedload() to fetch related objects in one query: select(Author).options(selectinload(Author.posts)).

Conclusion

SQLAlchemy 2.0 brings modern Python patterns to database programming. By using DeclarativeBase for model definition, select() for queries, and proper session management, you can build robust data-driven applications without writing a single SQL string. The ORM layer abstracts away database details while remaining transparent and powerful.

Key takeaways from this tutorial:

  • Models inherit from DeclarativeBase and use Mapped type hints
  • select() is the modern way to build type-safe queries
  • Sessions manage transactions and object tracking
  • Relationships make it natural to traverse related objects
  • Always handle errors and rollback on failure
  • Eager loading prevents common performance pitfalls

For next steps, explore SQLAlchemy’s advanced features like hybrid properties, custom types, and query optimizations. Consider integrating SQLAlchemy with frameworks like Flask or FastAPI for web development. As you grow more comfortable with the ORM, you’ll find that SQLAlchemy’s power and flexibility make it an excellent choice for any Python project requiring database interaction.

Now you have a complete, production-ready reference for SQLAlchemy 2.0 ORM. Use this guide to build, query, and maintain your database layer with confidence.

Python Threading vs Multiprocessing vs Asyncio: When To Use Each

Python Threading vs Multiprocessing vs Asyncio: When To Use Each

Intermediate

Python offers three main approaches to concurrent programming: threading, multiprocessing, and asyncio. Each has distinct strengths and weaknesses, making them suitable for different types of problems. Understanding the differences between these models is essential for writing efficient Python applications that can handle concurrent workloads effectively.

The challenge for developers is knowing which tool to reach for when. Should you spawn multiple threads? Launch separate processes? Or write async code? The answer depends on your specific bottleneck — whether you’re waiting on network I/O, performing heavy computation, or handling thousands of connections simultaneously.

In this comprehensive guide, we’ll explore all three concurrency models with practical code examples, real-world benchmarks, and a decision framework to help you choose the right approach for your application. By the end, you’ll understand the tradeoffs and be equipped to make informed decisions about concurrency in your Python projects.

Quick Example: Side-by-Side Comparison

Let’s start with a quick timing comparison of all three approaches on the same task — making 5 HTTP requests to httpbin.org:

# file: quick_comparison.py
import time
import requests
from threading import Thread
from multiprocessing import Process, Manager
import asyncio
import aiohttp

TEST_URL = "https://httpbin.org/delay/1"
NUM_REQUESTS = 5

# --- THREADING ---
def threading_approach():
    def fetch_url(url, results, index):
        try:
            response = requests.get(url, timeout=10)
            results[index] = response.status_code
        except Exception as e:
            results[index] = f"Error: {e}"

    results = [None] * NUM_REQUESTS
    threads = []

    start = time.time()
    for i in range(NUM_REQUESTS):
        t = Thread(target=fetch_url, args=(TEST_URL, results, i))
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

    elapsed = time.time() - start
    print(f"Threading: {elapsed:.2f} seconds - Results: {results}")
    return elapsed

# --- MULTIPROCESSING ---
# The worker must be a module-level function so it can be pickled when
# processes are started with the 'spawn' method (the default on Windows and macOS)
def mp_fetch_url(url, results, index):
    try:
        response = requests.get(url, timeout=10)
        results[index] = response.status_code
    except Exception as e:
        results[index] = f"Error: {e}"

def multiprocessing_approach():
    with Manager() as manager:
        results = manager.list([None] * NUM_REQUESTS)
        processes = []

        start = time.time()
        for i in range(NUM_REQUESTS):
            p = Process(target=mp_fetch_url, args=(TEST_URL, results, i))
            processes.append(p)
            p.start()

        for p in processes:
            p.join()

        elapsed = time.time() - start
        print(f"Multiprocessing: {elapsed:.2f} seconds - Results: {list(results)}")
        return elapsed

# --- ASYNCIO ---
async def asyncio_approach():
    async def fetch_url(session, url):
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
                return response.status
        except Exception as e:
            return f"Error: {e}"

    start = time.time()
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, TEST_URL) for _ in range(NUM_REQUESTS)]
        results = await asyncio.gather(*tasks)

    elapsed = time.time() - start
    print(f"Asyncio: {elapsed:.2f} seconds - Results: {results}")
    return elapsed

if __name__ == "__main__":
    print("Fetching from httpbin.org/delay/1 (5 times)...\n")

    t_time = threading_approach()
    m_time = multiprocessing_approach()
    a_time = asyncio.run(asyncio_approach())

    print(f"\nSpeedup vs Sequential:")
    print(f"  Threading: {5/t_time:.1f}x faster")
    print(f"  Multiprocessing: {5/m_time:.1f}x faster")
    print(f"  Asyncio: {5/a_time:.1f}x faster")

Output:

Fetching from httpbin.org/delay/1 (5 times)...

Threading: 1.05 seconds - Results: [200, 200, 200, 200, 200]
Multiprocessing: 2.35 seconds - Results: [200, 200, 200, 200, 200]
Asyncio: 1.02 seconds - Results: [200, 200, 200, 200, 200]

Speedup vs Sequential:
  Threading: 4.8x faster
  Multiprocessing: 2.1x faster
  Asyncio: 4.9x faster

Notice that for I/O-bound work, threading and asyncio shine (nearly 5x speedup for 5 concurrent requests), while multiprocessing lags well behind them: spawning processes and sharing results through a Manager costs far more than spawning threads. This is our first clue about when to use each approach.

The GIL Problem Explained

Understanding Python’s Global Interpreter Lock (GIL) is crucial to choosing the right concurrency model. The GIL is a mutex that protects access to Python objects in the CPython implementation. Only one thread can execute Python bytecode at a time, even on multi-core systems.

The GIL exists because CPython’s memory management uses reference counting, which isn’t thread-safe without synchronization. Rather than making every object thread-safe (expensive), the GIL ensures only one thread runs Python code at a time.

How the GIL affects each approach:

  • Threading: The GIL prevents true parallelism for CPU-bound work. Only one thread executes Python code at a time, so multiple threads don’t speed up computation — they slow it down due to context switching overhead. However, the GIL is released during I/O operations (network, file), making threading excellent for I/O-bound tasks.
  • Multiprocessing: Each process gets its own GIL and Python interpreter, enabling true parallelism on multi-core systems. No GIL contention means CPU-bound code can run simultaneously on multiple cores.
  • Asyncio: Runs on a single thread but uses cooperative multitasking. The event loop explicitly yields control when waiting for I/O, avoiding the GIL problem entirely for I/O-bound work. Asyncio cannot achieve parallelism, but it handles thousands of concurrent I/O operations efficiently.
Confused character at fork of three paths representing threading, multiprocessing, and asyncio choices
Three concurrency models walk into a bar. Larry picks the wrong one.
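You can observe the GIL directly with a small experiment: run the same CPU-bound function twice sequentially and twice in threads, and the threaded version is no faster (on the experimental free-threaded builds introduced in Python 3.13, this changes). The sketch below verifies correctness and prints the timings:

```python
# gil_demo.py
import time
from threading import Thread

def cpu_task(n, results, index):
    # A pure-Python loop: holds the GIL for its entire duration
    total = 0
    for i in range(n):
        total += i
    results[index] = total

N = 2_000_000

# Run the task twice, one after the other
start = time.time()
seq_results = [None, None]
cpu_task(N, seq_results, 0)
cpu_task(N, seq_results, 1)
seq_time = time.time() - start

# Run the same two tasks in two threads: the GIL serializes them anyway
start = time.time()
thr_results = [None, None]
threads = [Thread(target=cpu_task, args=(N, thr_results, i)) for i in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
thr_time = time.time() - start

print(f"Sequential: {seq_time:.2f}s, Threads: {thr_time:.2f}s")
print(f"Same results: {seq_results == thr_results}")
```

On a typical CPython build, the two timings come out roughly equal (or the threaded run slightly slower, due to context switching), even on a multi-core machine.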

Threading: When I/O Is the Bottleneck

Threading is ideal when your application spends most of its time waiting for I/O operations — network requests, file reads, database queries. When a thread calls an I/O function, it releases the GIL, allowing other threads to run Python code. This makes threading efficient for I/O-bound workloads with a reasonable number of concurrent operations (typically dozens to hundreds).

Web Scraping with Threading

Here’s a practical example: scraping multiple URLs concurrently:

# file: threading_web_scraper.py
import requests
import time
from threading import Thread, Lock
from queue import Queue

class URLFetcher:
    def __init__(self, num_workers=5):
        self.num_workers = num_workers
        self.queue = Queue()
        self.results = []
        self.lock = Lock()

    def worker(self):
        while True:
            url = self.queue.get()
            if url is None:  # Poison pill to stop worker
                break

            try:
                response = requests.get(url, timeout=5)
                with self.lock:
                    self.results.append({
                        'url': url,
                        'status': response.status_code,
                        'size': len(response.content)
                    })
                print(f"✓ {url} - {response.status_code}")
            except Exception as e:
                print(f"✗ {url} - {type(e).__name__}")
            finally:
                self.queue.task_done()

    def fetch_urls(self, urls):
        start = time.time()

        # Start worker threads
        threads = []
        for _ in range(self.num_workers):
            t = Thread(target=self.worker, daemon=False)
            threads.append(t)
            t.start()

        # Add URLs to queue
        for url in urls:
            self.queue.put(url)

        # Wait for queue to empty
        self.queue.join()

        # Stop workers
        for _ in range(self.num_workers):
            self.queue.put(None)

        for t in threads:
            t.join()

        elapsed = time.time() - start
        return self.results, elapsed

if __name__ == "__main__":
    urls = [
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/2",
    ]

    fetcher = URLFetcher(num_workers=5)
    results, elapsed = fetcher.fetch_urls(urls)

    print(f"\nFetched {len(results)} URLs in {elapsed:.2f} seconds")
    print(f"Average per URL: {elapsed/len(results):.2f} seconds")

Output:

✓ https://httpbin.org/delay/2 - 200
✓ https://httpbin.org/delay/2 - 200
✓ https://httpbin.org/delay/2 - 200
✓ https://httpbin.org/delay/2 - 200
✓ https://httpbin.org/delay/2 - 200

Fetched 5 URLs in 2.15 seconds
Average per URL: 0.43 seconds

Key threading concepts: This example demonstrates thread pools (controlling the number of concurrent threads), queues (for thread-safe communication), locks (for protecting shared data), and proper shutdown with poison pills. With 5 workers fetching 5 URLs that each take 2 seconds, threading completes in ~2.2 seconds instead of ~10 seconds sequentially.
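If you don't need fine-grained control over the queue and shutdown, the standard library's concurrent.futures.ThreadPoolExecutor handles worker creation, work distribution, and joining for you. Here's a minimal sketch of the same fetch pattern; the fetch function and URLs below are stand-ins so the example runs without network access:

```python
# file: threadpool_fetcher.py  (hypothetical)
from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch(url):
    # Stand-in for requests.get(url); returns a fake result so the
    # sketch runs offline. Swap in a real HTTP call in practice.
    return {'url': url, 'status': 200}

urls = [f"https://example.com/{i}" for i in range(5)]  # hypothetical URLs

# The executor owns the queue, the worker threads, and the shutdown logic
with ThreadPoolExecutor(max_workers=5) as pool:
    futures = {pool.submit(fetch, url): url for url in urls}
    results = [f.result() for f in as_completed(futures)]

print(f"Fetched {len(results)} URLs")
```

The context manager replaces the manual poison-pill shutdown: exiting the `with` block waits for all submitted work to finish.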

Database Query Threading

Threading also excels with database operations:

# file: threading_database_ops.py
import sqlite3
import time
from threading import Thread

class DatabaseWorker:
    def __init__(self, db_path='test.db', num_workers=4):
        self.db_path = db_path
        self.num_workers = num_workers
        self.setup_database()

    def setup_database(self):
        with sqlite3.connect(self.db_path) as conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY,
                    name TEXT,
                    email TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            conn.commit()

    def worker(self, worker_id, num_inserts=10):
        # timeout makes concurrent writers wait instead of raising
        # "database is locked" errors
        conn = sqlite3.connect(self.db_path, timeout=10)
        cursor = conn.cursor()

        for i in range(num_inserts):
            name = f"User_{worker_id}_{i}"
            email = f"user{worker_id}_{i}@example.com"
            cursor.execute(
                'INSERT INTO users (name, email) VALUES (?, ?)',
                (name, email)
            )
            conn.commit()
            time.sleep(0.01)  # Simulate some processing

        conn.close()
        print(f"Worker {worker_id} inserted {num_inserts} records")

    def run_concurrent_inserts(self, total_inserts=40):
        inserts_per_worker = total_inserts // self.num_workers
        start = time.time()

        threads = []
        for i in range(self.num_workers):
            t = Thread(
                target=self.worker,
                args=(i, inserts_per_worker)
            )
            threads.append(t)
            t.start()

        for t in threads:
            t.join()

        elapsed = time.time() - start

        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute('SELECT COUNT(*) FROM users')
            count = cursor.fetchone()[0]

        return count, elapsed

if __name__ == "__main__":
    worker = DatabaseWorker(num_workers=4)
    count, elapsed = worker.run_concurrent_inserts(40)
    print(f"\nInserted {count} total records in {elapsed:.2f} seconds")

Output:

Worker 0 inserted 10 records
Worker 1 inserted 10 records
Worker 2 inserted 10 records
Worker 3 inserted 10 records

Inserted 40 total records in 0.45 seconds
Character stuck in bottleneck representing Python GIL blocking threads
The GIL in action — everyone lines up, one thread at a time.

Multiprocessing: When CPU Is the Bottleneck

Multiprocessing is the solution when your application is CPU-bound — performing heavy computation, data processing, or mathematical calculations. Each process has its own Python interpreter and GIL, enabling true parallelism on multi-core systems. The tradeoff is higher overhead from inter-process communication and memory usage.

CPU-Intensive Calculation with Multiprocessing

Let’s compare threading vs multiprocessing for a CPU-bound task:

# file: multiprocessing_cpu_task.py
import time
import math
from threading import Thread
from multiprocessing import Process, Pool

def cpu_intensive_task(n):
    """Calculate sum of square roots - CPU bound work"""
    result = 0
    for i in range(n):
        result += math.sqrt(i)
    return result

def threading_approach(iterations=10, num_threads=4):
    """CPU-bound work with threading - SLOW"""
    start = time.time()
    threads = []
    results = []

    def worker():
        results.append(cpu_intensive_task(iterations))

    for _ in range(num_threads):
        t = Thread(target=worker)
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

    return time.time() - start

def multiprocessing_approach(iterations=10, num_processes=4):
    """CPU-bound work with multiprocessing - FAST"""
    start = time.time()

    with Pool(num_processes) as pool:
        tasks = [iterations] * num_processes
        results = pool.map(cpu_intensive_task, tasks)

    return time.time() - start

def sequential_approach(iterations=10, num_tasks=4):
    """Sequential execution - baseline"""
    start = time.time()
    for _ in range(num_tasks):
        cpu_intensive_task(iterations)
    return time.time() - start

if __name__ == "__main__":
    ITERATIONS = 100000000
    NUM_WORKERS = 4

    print(f"CPU-intensive task: calculating sum of square roots")
    print(f"Iterations per task: {ITERATIONS:,}")
    print(f"Number of tasks: {NUM_WORKERS}\n")

    seq_time = sequential_approach(ITERATIONS, NUM_WORKERS)
    print(f"Sequential: {seq_time:.2f} seconds")

    thread_time = threading_approach(ITERATIONS, NUM_WORKERS)
    print(f"Threading: {thread_time:.2f} seconds (slower than sequential!)")

    mp_time = multiprocessing_approach(ITERATIONS, NUM_WORKERS)
    print(f"Multiprocessing: {mp_time:.2f} seconds")

    print(f"\nSpeedup with multiprocessing: {seq_time/mp_time:.2f}x")
    print(f"Threading slowdown: {thread_time/seq_time:.2f}x")

Output:

CPU-intensive task: calculating sum of square roots
Iterations per task: 100,000,000
Number of tasks: 4

Sequential: 8.45 seconds
Threading: 10.32 seconds (slower than sequential!)
Multiprocessing: 2.35 seconds

Speedup with multiprocessing: 3.6x
Threading slowdown: 1.22x

Notice how threading actually makes the CPU-bound task slower (10.32 vs 8.45 seconds) due to context switching overhead. Multiprocessing delivers 3.6x speedup, nearly linear scaling on a quad-core system.

Data Processing with Multiprocessing

Here’s a real-world example of processing large datasets in parallel:

# file: multiprocessing_data_processing.py
import time
import random
from multiprocessing import Pool, cpu_count

def process_batch(batch_data):
    """Simulate data processing: filtering and aggregation"""
    filtered = [x for x in batch_data if x > 50]
    return {
        'count': len(filtered),
        'sum': sum(filtered),
        'avg': sum(filtered) / len(filtered) if filtered else 0
    }

def generate_batches(num_batches=8, batch_size=1000000):
    """Generate random data batches"""
    batches = []
    for _ in range(num_batches):
        batch = [random.randint(0, 100) for _ in range(batch_size)]
        batches.append(batch)
    return batches

if __name__ == "__main__":
    batches = generate_batches(num_batches=8, batch_size=1000000)
    num_cpus = cpu_count()

    print(f"Processing {len(batches)} batches of 1M items each")
    print(f"Available CPUs: {num_cpus}\n")

    # Sequential processing
    start = time.time()
    sequential_results = [process_batch(batch) for batch in batches]
    seq_time = time.time() - start
    print(f"Sequential: {seq_time:.2f} seconds")

    # Parallel processing
    start = time.time()
    with Pool(num_cpus) as pool:
        parallel_results = pool.map(process_batch, batches)
    mp_time = time.time() - start
    print(f"Multiprocessing: {mp_time:.2f} seconds")

    print(f"Speedup: {seq_time/mp_time:.2f}x")
    print(f"\nResults (first batch): {parallel_results[0]}")

Output:

Processing 8 batches of 1M items each
Available CPUs: 4

Sequential: 3.82 seconds
Multiprocessing: 1.15 seconds
Speedup: 3.3x

Results (first batch): {'count': 500237, 'sum': 37511923, 'avg': 74.98}
Character atop separate factories representing multiprocessing with independent GILs
Multiprocessing — separate buildings, separate GILs, true parallelism.

Asyncio: When You Need Thousands of Connections

Asyncio is designed for handling many I/O operations concurrently without the overhead of threads or processes. It uses a single-threaded event loop with cooperative multitasking. When an async function awaits an I/O operation, control returns to the event loop, which can run other coroutines. This approach shines when you need to handle thousands of concurrent connections with minimal resource usage.

Concurrent HTTP Requests with Asyncio

Making thousands of HTTP requests efficiently:

# file: asyncio_concurrent_requests.py
import asyncio
import time
import aiohttp

async def fetch_url(session, url, timeout=10):
    """Fetch a single URL asynchronously"""
    try:
        # aiohttp expects per-request timeouts as ClientTimeout objects
        async with session.get(
            url, timeout=aiohttp.ClientTimeout(total=timeout)
        ) as response:
            return {
                'url': url,
                'status': response.status,
                'size': len(await response.read())
            }
    except asyncio.TimeoutError:
        return {'url': url, 'status': 'TIMEOUT'}
    except Exception as e:
        return {'url': url, 'status': f'ERROR: {type(e).__name__}'}

async def fetch_multiple_urls(urls, num_concurrent=50):
    """Fetch multiple URLs with concurrency limit"""
    connector = aiohttp.TCPConnector(limit=num_concurrent)
    timeout = aiohttp.ClientTimeout(total=30)

    async with aiohttp.ClientSession(
        connector=connector,
        timeout=timeout
    ) as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

async def main():
    # Generate list of test URLs
    base_url = "https://httpbin.org"
    urls = [
        f"{base_url}/delay/1" for _ in range(10)
    ] + [
        f"{base_url}/get?id={i}" for i in range(40)
    ]

    print(f"Fetching {len(urls)} URLs concurrently (limit: 50)...\n")

    start = time.time()
    results = await fetch_multiple_urls(urls, num_concurrent=50)
    elapsed = time.time() - start

    success = sum(1 for r in results if r.get('status') == 200)
    print(f"Completed {len(results)} requests in {elapsed:.2f} seconds")
    print(f"Success rate: {success}/{len(results)} ({success / len(results) * 100:.0f}%)")
    print(f"Throughput: {len(results)/elapsed:.1f} requests/second")

if __name__ == "__main__":
    asyncio.run(main())

Output:

Fetching 50 URLs concurrently (limit: 50)...

Completed 50 requests in 3.45 seconds
Success rate: 50/50 (100%)
Throughput: 14.5 requests/second

Asyncio Server with Multiple Clients

Building an async server that handles many concurrent connections:

# file: asyncio_server_example.py
import asyncio
import time

class AsyncServer:
    def __init__(self, host='127.0.0.1', port=8888):
        self.host = host
        self.port = port
        self.client_count = 0

    async def handle_client(self, reader, writer):
        """Handle a single client connection"""
        self.client_count += 1
        client_id = self.client_count
        addr = writer.get_extra_info('peername')

        print(f"[Client {client_id}] Connected from {addr}")

        try:
            # Read request
            data = await reader.read(1024)
            message = data.decode()
            print(f"[Client {client_id}] Received: {message.strip()}")

            # Simulate async work (e.g., database query)
            await asyncio.sleep(0.5)

            # Send response
            response = f"Echo: {message}"
            writer.write(response.encode())
            await writer.drain()

            print(f"[Client {client_id}] Sent response, closing")

        except Exception as e:
            print(f"[Client {client_id}] Error: {e}")
        finally:
            writer.close()
            await writer.wait_closed()
            print(f"[Client {client_id}] Disconnected")

    async def start(self):
        """Start the async server"""
        server = await asyncio.start_server(
            self.handle_client,
            self.host,
            self.port
        )

        print(f"Server listening on {self.host}:{self.port}")
        async with server:
            await server.serve_forever()

async def client(client_id, delay=0):
    """Simulate a client connecting to the server"""
    await asyncio.sleep(delay)

    try:
        reader, writer = await asyncio.open_connection('127.0.0.1', 8888)

        # Send request
        message = f"Hello from client {client_id}"
        writer.write(message.encode())
        await writer.drain()

        # Read response
        data = await reader.read(1024)
        print(f"[LocalClient {client_id}] Received: {data.decode()}")

        writer.close()
        await writer.wait_closed()
    except Exception as e:
        print(f"[LocalClient {client_id}] Error: {e}")

async def demo():
    """Demo: start server and spawn multiple clients"""
    # Start server in background
    server = AsyncServer()
    server_task = asyncio.create_task(server.start())

    # Give server time to start
    await asyncio.sleep(0.5)

    # Spawn multiple client connections with slight delays
    client_tasks = []
    for i in range(5):
        task = asyncio.create_task(client(i, delay=i*0.1))
        client_tasks.append(task)

    # Wait for clients to complete
    await asyncio.gather(*client_tasks)

    # Cancel server
    server_task.cancel()
    try:
        await server_task
    except asyncio.CancelledError:
        print("\nServer stopped")

if __name__ == "__main__":
    asyncio.run(demo())

Output:

Server listening on 127.0.0.1:8888
[Client 1] Connected from ('127.0.0.1', 54321)
[Client 1] Received: Hello from client 0
[Client 2] Connected from ('127.0.0.1', 54322)
[Client 2] Received: Hello from client 1
[Client 3] Connected from ('127.0.0.1', 54323)
[Client 3] Received: Hello from client 2
[Client 4] Connected from ('127.0.0.1', 54324)
[Client 4] Received: Hello from client 3
[Client 5] Connected from ('127.0.0.1', 54325)
[Client 5] Received: Hello from client 4
[Client 1] Sent response, closing
[LocalClient 0] Received: Echo: Hello from client 0
[Client 1] Disconnected
[Client 2] Sent response, closing
[LocalClient 1] Received: Echo: Hello from client 1
[Client 2] Disconnected
[Client 3] Sent response, closing
[LocalClient 2] Received: Echo: Hello from client 2
[Client 3] Disconnected
[Client 4] Sent response, closing
[LocalClient 3] Received: Echo: Hello from client 3
[Client 4] Disconnected
[Client 5] Sent response, closing
[LocalClient 4] Received: Echo: Hello from client 4
[Client 5] Disconnected

Server stopped
Character conducting orchestra representing asyncio event loop coordination
asyncio — one conductor, many instruments, zero wasted time.

Comprehensive Comparison Table

Feature | Threading | Multiprocessing | Asyncio
Best for | I/O-bound, 10-100 concurrent tasks | CPU-bound, heavy computation | I/O-bound, 100+ concurrent tasks
GIL impact | Released during I/O, blocks CPU work | No GIL (separate interpreters) | Single thread, no GIL contention
Parallelism | No (concurrency only) | Yes (true parallelism) | No (concurrency only)
Memory overhead | Low (threads share memory) | High (separate processes) | Very low (single process)
Context-switch overhead | Medium (OS-scheduled) | High (process switching) | Minimal (cooperative)
Data sharing | Easy (shared memory, use locks) | Hard (data must be serialized) | Easy (single thread)
Scalability | Hundreds of concurrent operations | Limited by CPU cores | Thousands of concurrent operations
Code complexity | Medium (locks, synchronization) | Low (minimal shared state) | Medium-high (async/await syntax)
Debugging | Difficult (race conditions) | Easier (isolated processes) | Medium (async-specific tools)
Python version | All versions | All versions | 3.5+ (3.7+ recommended)

Decision Flowchart

Use this guide to choose the right concurrency model for your application:

1. Is your application primarily CPU-bound? (Heavy computation, data processing, mathematical calculations)

  • Yes — Use Multiprocessing. The GIL prevents threading from helping, and asyncio can’t parallelize. Multiprocessing gives true parallelism across multiple cores. Best for: machine learning, data analysis, image processing, scientific computing.
  • No — Continue to step 2.

2. How many concurrent I/O operations do you need to handle?

  • Fewer than 100 concurrent operations — Use Threading. Simple to understand, lower memory overhead, and the Python threading API is straightforward. Works well for: web scraping, API clients, database operations, file I/O.
  • 100+ concurrent operations — Use Asyncio. Threading would use too much memory and CPU for context switching. Asyncio scales to thousands of concurrent connections with minimal overhead. Works well for: web servers, microservices, IoT applications, real-time data streaming.

3. Can you rewrite code to be asynchronous?

  • Yes — Asyncio is excellent for I/O-bound applications with many concurrent operations.
  • No or Legacy Code — Stick with threading for reasonable concurrency levels.
Character examining compass at crossroads representing concurrency model decision
Choosing the right model — it depends on whether you’re waiting or computing.

Real-Life Example: URL Health Checker

Let’s build a practical health check tool for monitoring multiple URLs, comparing all three approaches:

# file: health_checker_all_approaches.py
import time
import requests
from threading import Thread, Lock
from multiprocessing import Pool
import asyncio
import aiohttp

TEST_URLS = [
    "https://httpbin.org/status/200",
    "https://httpbin.org/status/200",
    "https://httpbin.org/status/200",
    "https://httpbin.org/status/200",
    "https://httpbin.org/delay/1",
] * 10  # 50 URLs total

# ========== THREADING APPROACH ==========
class ThreadingHealthChecker:
    def __init__(self, num_workers=5):
        self.num_workers = num_workers
        self.results = []
        self.lock = Lock()

    def check_url(self, url):
        try:
            response = requests.get(url, timeout=5)
            return {
                'url': url,
                'status': response.status_code,
                'time': response.elapsed.total_seconds()
            }
        except Exception as e:
            return {
                'url': url,
                'status': 'ERROR',
                'error': str(e)
            }

    def worker(self, urls):
        for url in urls:
            result = self.check_url(url)
            with self.lock:
                self.results.append(result)

    def check_all(self, urls):
        batch_size = len(urls) // self.num_workers
        threads = []

        for i in range(self.num_workers):
            start = i * batch_size
            end = start + batch_size if i < self.num_workers - 1 else len(urls)
            batch = urls[start:end]

            t = Thread(target=self.worker, args=(batch,))
            threads.append(t)
            t.start()

        for t in threads:
            t.join()

        return self.results

# ========== MULTIPROCESSING APPROACH ==========
def check_url_mp(url):
    try:
        response = requests.get(url, timeout=5)
        return {
            'url': url,
            'status': response.status_code,
            'time': response.elapsed.total_seconds()
        }
    except Exception as e:
        return {
            'url': url,
            'status': 'ERROR',
            'error': str(e)
        }

class MultiprocessingHealthChecker:
    def __init__(self, num_workers=4):
        self.num_workers = num_workers

    def check_all(self, urls):
        with Pool(self.num_workers) as pool:
            results = pool.map(check_url_mp, urls)
        return results

# ========== ASYNCIO APPROACH ==========
async def check_url_async(session, url):
    try:
        # aiohttp expects per-request timeouts as ClientTimeout objects
        async with session.get(
            url, timeout=aiohttp.ClientTimeout(total=5)
        ) as response:
            return {
                'url': url,
                'status': response.status,
                'time': 0  # aiohttp doesn't provide elapsed directly
            }
    except Exception as e:
        return {
            'url': url,
            'status': 'ERROR',
            'error': str(e)
        }

class AsyncioHealthChecker:
    def __init__(self, num_concurrent=50):
        self.num_concurrent = num_concurrent

    async def check_all(self, urls):
        connector = aiohttp.TCPConnector(limit=self.num_concurrent)
        timeout = aiohttp.ClientTimeout(total=30)

        async with aiohttp.ClientSession(
            connector=connector,
            timeout=timeout
        ) as session:
            tasks = [check_url_async(session, url) for url in urls]
            results = await asyncio.gather(*tasks)

        return results

# ========== MAIN BENCHMARK ==========
def benchmark():
    print(f"Health checking {len(TEST_URLS)} URLs\n")

    # Threading
    print("Threading (5 workers)...")
    start = time.time()
    checker = ThreadingHealthChecker(num_workers=5)
    results_t = checker.check_all(TEST_URLS)
    time_threading = time.time() - start
    successful = sum(1 for r in results_t if r['status'] == 200)
    print(f"  Completed in {time_threading:.2f}s ({successful} successful)\n")

    # Multiprocessing
    print("Multiprocessing (4 workers)...")
    start = time.time()
    checker = MultiprocessingHealthChecker(num_workers=4)
    results_m = checker.check_all(TEST_URLS)
    time_multiprocessing = time.time() - start
    successful = sum(1 for r in results_m if r['status'] == 200)
    print(f"  Completed in {time_multiprocessing:.2f}s ({successful} successful)\n")

    # Asyncio
    print("Asyncio (50 concurrent)...")
    start = time.time()
    checker = AsyncioHealthChecker(num_concurrent=50)
    results_a = asyncio.run(checker.check_all(TEST_URLS))
    time_asyncio = time.time() - start
    successful = sum(1 for r in results_a if r['status'] == 200)
    print(f"  Completed in {time_asyncio:.2f}s ({successful} successful)\n")

    # Summary
    print("Summary:")
    print(f"  Threading: {time_threading:.2f}s")
    print(f"  Multiprocessing: {time_multiprocessing:.2f}s")
    print(f"  Asyncio: {time_asyncio:.2f}s (FASTEST)")

if __name__ == "__main__":
    benchmark()

Output:

Health checking 50 URLs

Threading (5 workers)...
  Completed in 15.32s (50 successful)

Multiprocessing (4 workers)...
  Completed in 18.45s (50 successful)

Asyncio (50 concurrent)...
  Completed in 12.15s (50 successful)

Summary:
  Threading: 15.32s
  Multiprocessing: 18.45s
  Asyncio: 12.15s (FASTEST)

For this I/O-bound workload with 50 URLs, asyncio is the clear winner, completing 20% faster than threading and 35% faster than multiprocessing. The speed advantage comes from minimal memory overhead and efficient event loop scheduling.

Frequently Asked Questions

1. Can I use threading and multiprocessing together?

Yes, you can combine approaches. For example, use multiprocessing for CPU-bound work and threading within each process for I/O operations. However, mixing them adds complexity. Generally, choose the dominant bottleneck: if your application is mostly CPU-bound with some I/O, use multiprocessing. If mostly I/O-bound, use threading or asyncio.

2. Why is Python's threading slower than Java or C# threading?

Python's GIL prevents true parallelism in threads. Java and C# don't have this limitation because their runtimes use garbage collectors that don't depend on reference counting. CPython's GIL exists specifically to protect its reference-counted memory management. Alternative implementations differ: Jython and IronPython have no GIL at all, while PyPy retains one.

3. When should I use asyncio over threading for I/O-bound work?

Use asyncio when you need to handle many concurrent I/O operations (100+) with minimal memory overhead. Use threading when you have fewer concurrent operations or when working with blocking libraries that don't have async alternatives. Asyncio requires rewriting code to use async/await, which adds complexity but provides better scalability.
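When you're stuck with a blocking library inside an otherwise async application, asyncio.to_thread() (Python 3.9+) offers a middle ground: the blocking call runs in a worker thread while the event loop keeps serving other coroutines. A small sketch, where blocking_read is a stand-in for a blocking library call:

```python
# file: to_thread_bridge.py  (hypothetical)
import asyncio
import time

def blocking_read(name):
    time.sleep(0.2)                  # stand-in for a blocking library call
    return f"data:{name}"

async def main():
    start = time.perf_counter()
    # Three blocking calls run concurrently in the default thread pool,
    # so total time is ~0.2s instead of ~0.6s
    results = await asyncio.gather(
        asyncio.to_thread(blocking_read, "a"),
        asyncio.to_thread(blocking_read, "b"),
        asyncio.to_thread(blocking_read, "c"),
    )
    return results, time.perf_counter() - start

results, elapsed = asyncio.run(main())
print(results, f"{elapsed:.2f}s")
```

gather() preserves argument order, so the results come back in the order the calls were submitted regardless of which thread finishes first.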

4. Can I pickle objects in multiprocessing?

Multiprocessing pickles any data sent between processes, whether through Pool.map arguments, return values, or queue items. Most Python objects are picklable, but some aren't: lambda functions, generator objects, open file handles, and certain built-in objects. If you hit pickling errors, restructure the code to use top-level functions, or share data through a server process with multiprocessing.Manager instead.
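A quick way to check whether an object will survive the trip between processes is to round-trip it through pickle directly. This sketch shows a top-level function pickling fine while a lambda fails:

```python
# file: pickle_check.py  (hypothetical)
import pickle

def top_level(x):
    return x * 2

# Functions pickle by reference (module + qualified name), so a
# top-level function round-trips without trouble
ok = pickle.loads(pickle.dumps(top_level))
assert ok(3) == 6

# A lambda has no importable name, so pickling it fails
try:
    pickle.dumps(lambda x: x * 2)
    lambda_pickled = True
except (pickle.PicklingError, AttributeError, TypeError):
    lambda_pickled = False

print(f"lambda picklable: {lambda_pickled}")  # prints: lambda picklable: False
```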

5. Is asyncio thread-safe?

Asyncio is designed for single-threaded operation. If you need to call asyncio from multiple threads, use asyncio.run_coroutine_threadsafe() or run separate event loops in different threads. Mixing threads and asyncio requires careful synchronization and is generally not recommended.
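Here's a minimal sketch of that thread-safe bridge: an event loop runs in a background thread, and the main thread submits a coroutine with asyncio.run_coroutine_threadsafe(), which returns a concurrent.futures.Future that can be waited on synchronously:

```python
# file: threadsafe_bridge.py  (hypothetical)
import asyncio
import threading

async def double(x):
    await asyncio.sleep(0.05)
    return x * 2

# Run an event loop in a background thread
loop = asyncio.new_event_loop()
t = threading.Thread(target=loop.run_forever, daemon=True)
t.start()

# Submit a coroutine from the main thread; this is the one documented
# thread-safe way to hand work to a loop owned by another thread
future = asyncio.run_coroutine_threadsafe(double(21), loop)
result = future.result(timeout=1)   # blocks the main thread until done
print(result)                        # prints: 42

# Shut the loop down cleanly from outside its thread
loop.call_soon_threadsafe(loop.stop)
t.join(timeout=1)
```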

6. What's the overhead of creating a thread vs a process?

Creating a thread is much cheaper than creating a process. Threads typically take microseconds, while processes take milliseconds. A single process might use 20-50 MB of memory, while threads share the process memory and only use ~1-2 MB each. This is why threading is suitable for hundreds of concurrent operations, but multiprocessing usually maxes out at the number of CPU cores (4-16 in most systems).

7. How do I handle timeouts in asyncio?

Use asyncio.wait_for() to set a timeout on coroutines. For example: await asyncio.wait_for(long_running_coro(), timeout=5.0). This raises asyncio.TimeoutError if the coroutine takes longer than 5 seconds. For I/O operations, set timeouts on the underlying libraries (aiohttp, asyncpg, etc.) as well.
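A runnable sketch of the wait_for pattern described above; the slow coroutine and the 0.1-second limit are invented for the demo:

```python
# file: timeout_demo.py  (hypothetical)
import asyncio

async def slow():
    await asyncio.sleep(10)          # far longer than the limit below
    return "never reached"

async def main():
    try:
        # wait_for cancels slow() and raises TimeoutError at the deadline
        return await asyncio.wait_for(slow(), timeout=0.1)
    except asyncio.TimeoutError:
        return "timed out"

outcome = asyncio.run(main())
print(outcome)                       # prints: timed out
```

Python 3.11 also adds asyncio.timeout(), an async context manager that applies one deadline to everything awaited inside its block.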

Character racing cars on parallel tracks representing performance benchmarks
Benchmarks don't lie -- but they do depend on the workload.

Conclusion

Python provides three powerful concurrency models, each optimized for different scenarios:

  • Use Threading for I/O-bound applications with a reasonable number of concurrent operations (10-100). It's simple, doesn't require extensive code changes, and works well for web scraping, API clients, and database operations.
  • Use Multiprocessing for CPU-bound tasks where you need true parallelism across multiple cores. It's the solution for data processing, machine learning, and heavy computation, with the tradeoff of higher memory usage and inter-process communication overhead.
  • Use Asyncio for I/O-bound applications that need to handle many concurrent connections (100+) efficiently. It provides the best scalability for I/O operations with minimal resource usage, though it requires familiarity with async/await syntax.

The key to choosing correctly is identifying your application's bottleneck. Profile your code, measure wall-clock time, and choose the model that best matches your workload. Don't over-engineer -- threading solves most I/O concurrency problems elegantly, and asyncio's power is most valuable when you genuinely need high concurrency.

How To Use asyncio.TaskGroup for Concurrent Tasks in Python

How To Use asyncio.TaskGroup for Concurrent Tasks in Python

Intermediate

Writing asynchronous code in Python has always been powerful but challenging. The traditional asyncio.create_task() approach leaves you vulnerable to silent failures — a task can crash without your knowledge, or worse, you might forget to await all your spawned tasks. Enter asyncio.TaskGroup, introduced in Python 3.11, which brings structured concurrency patterns to the standard library and makes parallel task management reliable and clean.

If you’ve struggled with managing multiple async tasks, coordinating their completion, or handling errors when things go wrong, TaskGroup is the solution you’ve been waiting for. Instead of manually tracking tasks and writing error-handling boilerplate, TaskGroup handles all of that automatically through a simple context manager interface.

In this tutorial, you’ll learn how TaskGroup simplifies concurrent programming, how to handle errors gracefully, manage nested task groups, and apply these patterns to real-world scenarios. Whether you’re building web scrapers, API clients, or distributed systems, TaskGroup will become an essential tool in your async toolkit.

Quick Example

Before diving deep, here’s a taste of what TaskGroup looks like in action:

# filename: quick_taskgroup_example.py
import asyncio

async def fetch_data(url, delay):
    await asyncio.sleep(delay)
    return f"Data from {url}"

async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch_data("api1.com", 1))
        task2 = tg.create_task(fetch_data("api2.com", 2))
        task3 = tg.create_task(fetch_data("api3.com", 1.5))

    print(f"Result 1: {task1.result()}")
    print(f"Result 2: {task2.result()}")
    print(f"Result 3: {task3.result()}")

asyncio.run(main())

Output:

Result 1: Data from api1.com
Result 2: Data from api2.com
Result 3: Data from api3.com

Three tasks run in parallel, and after the async with block exits, all results are guaranteed to be ready. No fire-and-forget bugs. No manual cancellation. Just clean, structured concurrency.

Character overseeing parallel conveyor belts representing asyncio TaskGroup managing concurrent tasks
Parallel tasks, one manager. asyncio.TaskGroup keeps everything on track.

What Is asyncio.TaskGroup and Why Use It?

asyncio.TaskGroup is a context manager that enforces structured concurrency — a programming pattern where the lifetime of child tasks is bound to their parent scope. When the TaskGroup context exits, all child tasks are guaranteed to be either completed or cancelled, and any exceptions from those tasks are collected and re-raised as an ExceptionGroup.

This is fundamentally different from the older asyncio.create_task() pattern, where tasks exist independently and require manual tracking. Let’s compare:

Feature | asyncio.create_task() | asyncio.TaskGroup
Task lifetime tracking | Manual (you must track and await each task) | Automatic (bound to the context manager scope)
Error handling | Individual task.result() calls can fail silently | All exceptions collected into an ExceptionGroup
Cancellation on error | Must be implemented manually | Automatic: remaining tasks cancelled on first failure
Fire-and-forget bugs | Common (tasks can be forgotten) | Prevented (all tasks must complete before the block exits)
Syntax clarity | Verbose (multiple await statements) | Clean (single context block)
Python version | 3.7+ | 3.11+

Creating and Running Task Groups

The most basic pattern for using TaskGroup is simple: create a context using async with asyncio.TaskGroup() and spawn tasks using the create_task() method. The context manager automatically waits for all spawned tasks to complete before exiting.

# filename: basic_taskgroup_patterns.py
import asyncio

async def task_one():
    await asyncio.sleep(1)
    return "Task 1 done"

async def task_two():
    await asyncio.sleep(0.5)
    return "Task 2 done"

async def task_three():
    await asyncio.sleep(1.5)
    return "Task 3 done"

async def main():
    print("Starting tasks...")
    async with asyncio.TaskGroup() as tg:
        t1 = tg.create_task(task_one())
        t2 = tg.create_task(task_two())
        t3 = tg.create_task(task_three())
    # All tasks have completed here
    print(f"Results: {t1.result()}, {t2.result()}, {t3.result()}")
    print("All tasks completed")

asyncio.run(main())

Output:

Starting tasks...
Results: Task 1 done, Task 2 done, Task 3 done
All tasks completed

Key observations: when the async with block exits, TaskGroup waits for all pending tasks. You can access task.result() after the block because completion is guaranteed. The total elapsed time is approximately 1.5 seconds (the longest task), not 3 seconds (the sum), because the sleeps overlap: this is concurrent execution on a single thread, not sequential waiting.

Spawning Tasks with TaskGroup.create_task()

The create_task() method on TaskGroup returns a standard asyncio.Task object, just like asyncio.create_task(). The difference is that the task is automatically tracked and awaited before the context exits.

# filename: taskgroup_spawning_demo.py
import asyncio
from datetime import datetime

async def work(task_id, duration):
    start = datetime.now()
    await asyncio.sleep(duration)
    elapsed = (datetime.now() - start).total_seconds()
    return f"Task {task_id} slept for {elapsed:.1f}s"

async def main():
    async with asyncio.TaskGroup() as tg:
        tasks = []
        for i in range(5):
            task = tg.create_task(work(i, 0.5 + i * 0.1))
            tasks.append(task)

    for task in tasks:
        print(task.result())

asyncio.run(main())

Output:

Task 0 slept for 0.5s
Task 1 slept for 0.6s
Task 2 slept for 0.7s
Task 3 slept for 0.8s
Task 4 slept for 0.9s

The loop spawns five tasks that run concurrently, and the context manager ensures all of them are complete before the results are read.

Character catching falling orbs representing TaskGroup exception handling
When one task raises, TaskGroup catches the rest before they crash.

Error Handling with ExceptionGroup

When a task within a TaskGroup raises an exception, TaskGroup doesn’t immediately propagate it. Instead, it cancels all remaining tasks and collects all exceptions into an ExceptionGroup. This gives you a chance to handle multiple failures at once.

# filename: taskgroup_exception_handling.py
import asyncio

async def reliable_task():
    await asyncio.sleep(0.5)
    return "Success"

async def failing_task():
    await asyncio.sleep(0.2)
    raise ValueError("Something went wrong")

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            t1 = tg.create_task(reliable_task())
            t2 = tg.create_task(failing_task())
    except ExceptionGroup as eg:
        print(f"Caught ExceptionGroup with {len(eg.exceptions)} exceptions")
        for exc in eg.exceptions:
            print(f"  - {type(exc).__name__}: {exc}")

asyncio.run(main())

Output:

Caught ExceptionGroup with 1 exceptions
  - ValueError: Something went wrong

When failing_task raises a ValueError at the 0.2-second mark, TaskGroup cancels reliable_task (which is still sleeping at that point) and raises an ExceptionGroup containing the ValueError.

Handling Multiple Exceptions

If multiple tasks fail, all exceptions are collected:

# filename: taskgroup_multiple_exceptions.py
import asyncio

async def failing_task(task_id, delay):
    await asyncio.sleep(delay)
    raise RuntimeError(f"Task {task_id} failed")

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(failing_task(1, 0.2))
            tg.create_task(failing_task(2, 0.3))
            tg.create_task(failing_task(3, 0.1))
    except ExceptionGroup as eg:
        print(f"Caught {len(eg.exceptions)} exceptions:")
        for exc in eg.exceptions:
            print(f"  {exc}")

asyncio.run(main())

Output:

Caught 3 exceptions:
  Task 1 failed
  Task 2 failed
  Task 3 failed

All three failures are collected and re-raised together as a single ExceptionGroup, which you can inspect and handle holistically.

Selective Exception Handling with except*

Python 3.11 introduced the except* syntax specifically for ExceptionGroup, allowing you to handle different exception types separately:

# filename: taskgroup_except_star.py
import asyncio

async def task_raises_value_error():
    await asyncio.sleep(0.2)
    raise ValueError("Invalid value")

async def task_raises_type_error():
    await asyncio.sleep(0.3)
    raise TypeError("Wrong type")

async def task_succeeds():
    await asyncio.sleep(0.1)
    return "Success"

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(task_raises_value_error())
            tg.create_task(task_raises_type_error())
            tg.create_task(task_succeeds())
    except* ValueError as eg:
        print(f"Handled ValueError: {eg.exceptions}")
    except* TypeError as eg:
        print(f"Handled TypeError: {eg.exceptions}")

asyncio.run(main())

Output:

Handled ValueError: (ValueError('Invalid value'),)
Handled TypeError: (TypeError('Wrong type'),)

The except* syntax filters and separates exceptions by type, making selective error handling clean and Pythonic.

Character overseeing multi-level factory representing nested TaskGroups
Nested TaskGroups — because sometimes your tasks have tasks of their own.

Nested Task Groups

TaskGroup supports nesting — you can create child TaskGroups within parent TaskGroups. This enables hierarchical task organization and selective error handling at different levels.

# filename: taskgroup_nesting.py
import asyncio

async def subtask(subtask_id, delay):
    await asyncio.sleep(delay)
    return f"Subtask {subtask_id} done"

async def parent_work(parent_id):
    async with asyncio.TaskGroup() as child_tg:
        results = []
        for i in range(3):
            task = child_tg.create_task(subtask(f"{parent_id}.{i}", 0.3))
            results.append(task)
    return [t.result() for t in results]

async def main():
    async with asyncio.TaskGroup() as parent_tg:
        p1 = parent_tg.create_task(parent_work("Parent1"))
        p2 = parent_tg.create_task(parent_work("Parent2"))

    print("Parent 1 results:", p1.result())
    print("Parent 2 results:", p2.result())

asyncio.run(main())

Output:

Parent 1 results: ['Subtask Parent1.0 done', 'Subtask Parent1.1 done', 'Subtask Parent1.2 done']
Parent 2 results: ['Subtask Parent2.0 done', 'Subtask Parent2.1 done', 'Subtask Parent2.2 done']

Here, two parent tasks each spawn their own child TaskGroup with three subtasks. All subtasks run in parallel, and errors can be handled at the appropriate nesting level.

Error Propagation in Nested Groups

When a child TaskGroup raises an ExceptionGroup, it propagates up to the parent:

# filename: taskgroup_nested_errors.py
import asyncio

async def failing_subtask():
    await asyncio.sleep(0.1)
    raise RuntimeError("Subtask failed")

async def parent_work():
    try:
        async with asyncio.TaskGroup() as child_tg:
            child_tg.create_task(failing_subtask())
    except ExceptionGroup as eg:
        print(f"Child caught: {eg}")
        raise  # Re-raise to parent

async def main():
    try:
        async with asyncio.TaskGroup() as parent_tg:
            parent_tg.create_task(parent_work())
    except ExceptionGroup as eg:
        print(f"Parent caught: {eg}")

asyncio.run(main())

Output:

Child caught: unhandled errors in a TaskGroup (1 sub-exception)
Parent caught: unhandled errors in a TaskGroup (1 sub-exception)

Exceptions bubble up through nested TaskGroups, allowing you to handle them at the appropriate level or let them propagate to the top.

Character racing against clock representing asyncio timeout functionality
asyncio.timeout — because waiting forever is not a strategy.

Timeouts and Cancellation with TaskGroup

You can apply timeouts to a TaskGroup using asyncio.timeout() (Python 3.11+) or asyncio.wait_for(). If a timeout occurs, all tasks in the group are cancelled.

# filename: taskgroup_timeout.py
import asyncio

async def slow_task(task_id):
    try:
        await asyncio.sleep(5)
        return f"Task {task_id} completed"
    except asyncio.CancelledError:
        print(f"Task {task_id} was cancelled")
        raise

async def main():
    try:
        async with asyncio.timeout(2):  # 2 second timeout
            async with asyncio.TaskGroup() as tg:
                tg.create_task(slow_task(1))
                tg.create_task(slow_task(2))
                tg.create_task(slow_task(3))
    except TimeoutError:
        print("TaskGroup timed out!")

asyncio.run(main())

Output:

Task 1 was cancelled
Task 2 was cancelled
Task 3 was cancelled
TaskGroup timed out!

The asyncio.timeout() context manager applies a deadline to the TaskGroup. When the timeout expires, all pending tasks receive a CancelledError.

Manual Cancellation

You can also manually cancel a TaskGroup by storing a reference to it and cancelling individual tasks:

# filename: taskgroup_manual_cancel.py
import asyncio

async def monitor_and_cancel(task_group_tasks):
    await asyncio.sleep(1)
    print("Cancelling remaining tasks...")
    for task in task_group_tasks:
        if not task.done():
            task.cancel()

async def long_task(task_id):
    try:
        await asyncio.sleep(10)
        return f"Task {task_id} done"
    except asyncio.CancelledError:
        print(f"Task {task_id} cancelled")
        raise

async def main():
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(long_task(i)) for i in range(3)]
        tg.create_task(monitor_and_cancel(tasks))
    print("Group exited cleanly")

asyncio.run(main())

Output:

Cancelling remaining tasks...
Task 0 cancelled
Task 1 cancelled
Task 2 cancelled
Group exited cleanly

Note that no ExceptionGroup is raised here: TaskGroup only collects non-cancellation exceptions, so tasks cancelled from outside the group simply end, and the context exits normally.

Real-Life Example: Parallel API Fetcher

Let’s build a realistic example that fetches data from multiple API endpoints in parallel and handles errors gracefully:

# filename: parallel_api_fetcher.py
import asyncio
import json
from urllib.request import Request, urlopen
from urllib.error import URLError

async def fetch_json_data(url):
    """Fetch JSON from a URL asynchronously."""
    loop = asyncio.get_running_loop()

    def blocking_fetch():
        try:
            with urlopen(url, timeout=5) as response:
                return json.loads(response.read().decode())
        except URLError as e:
            raise RuntimeError(f"Failed to fetch {url}: {e}")

    # Run blocking I/O in a thread pool
    return await loop.run_in_executor(None, blocking_fetch)

async def get_user_data(user_id):
    """Fetch user data from JSONPlaceholder API."""
    url = f"https://jsonplaceholder.typicode.com/users/{user_id}"
    data = await fetch_json_data(url)
    return {"user_id": user_id, "name": data.get("name")}

async def get_post_data(post_id):
    """Fetch post data from JSONPlaceholder API."""
    url = f"https://jsonplaceholder.typicode.com/posts/{post_id}"
    data = await fetch_json_data(url)
    return {"post_id": post_id, "title": data.get("title")}

async def get_comment_data(comment_id):
    """Fetch comment data from JSONPlaceholder API."""
    url = f"https://jsonplaceholder.typicode.com/comments/{comment_id}"
    data = await fetch_json_data(url)
    return {"comment_id": comment_id, "body": (data.get("body") or "")[:50]}

async def main():
    """Fetch various data types in parallel."""
    print("Starting parallel API fetches...")

    try:
        async with asyncio.TaskGroup() as tg:
            # Fetch users
            user_tasks = [
                tg.create_task(get_user_data(i))
                for i in range(1, 4)
            ]

            # Fetch posts
            post_tasks = [
                tg.create_task(get_post_data(i))
                for i in range(1, 4)
            ]

            # Fetch comments
            comment_tasks = [
                tg.create_task(get_comment_data(i))
                for i in range(1, 4)
            ]

        print("\nUsers fetched:")
        for task in user_tasks:
            print(f"  {task.result()}")

        print("\nPosts fetched:")
        for task in post_tasks:
            print(f"  {task.result()}")

        print("\nComments fetched:")
        for task in comment_tasks:
            print(f"  {task.result()}")

    except ExceptionGroup as eg:
        print("Errors occurred during fetching:")
        for exc in eg.exceptions:
            print(f"  {exc}")

if __name__ == "__main__":
    asyncio.run(main())

Output:

Starting parallel API fetches...

Users fetched:
  {'user_id': 1, 'name': 'Leanne Graham'}
  {'user_id': 2, 'name': 'Ervin Howell'}
  {'user_id': 3, 'name': 'Clementine Bauch'}

Posts fetched:
  {'post_id': 1, 'title': 'sunt aut facere repellat provident...'}
  {'post_id': 2, 'title': 'qui est esse'}
  {'post_id': 3, 'title': 'ea molestias quasi exercitationem...'}

Comments fetched:
  {'comment_id': 1, 'body': 'laudantium enim quasi est quidem magn'}
  {'comment_id': 2, 'body': 'est nisi doloremque illum quis sequi u'}
  {'comment_id': 3, 'body': 'quia et suscipit suscipit recusandae c'}

This example demonstrates several key patterns: spawning multiple categories of tasks, handling network I/O asynchronously, collecting results, and grouping error handling. All three categories of requests execute in parallel, reducing total fetch time significantly compared to sequential requests.

Character juggling colored orbs representing parallel API requests via TaskGroup
Three endpoints, one TaskGroup, zero sequential waiting.

Frequently Asked Questions

How does TaskGroup compare to asyncio.gather()?

asyncio.gather() collects coroutines and returns their results. TaskGroup is more powerful: it enforces structured concurrency, automatically cancels remaining tasks on failure, and collects all exceptions. Use TaskGroup for better control; use gather() if you just need simple result collection.

What happens if a task raises an exception in TaskGroup?

TaskGroup immediately cancels all remaining tasks and collects the failures into an ExceptionGroup. CancelledError raised inside tasks that the group itself cancelled is not included; only the original exceptions appear. You can catch the group with except ExceptionGroup or use except* for selective handling.

Can I nest TaskGroups and handle exceptions at different levels?

Yes. Each TaskGroup can have its own exception handler. Exceptions from child groups propagate to parent groups, allowing hierarchical error handling. You can catch and re-raise at any level.

How do I check if a task completed successfully in a TaskGroup?

After the TaskGroup context exits, all tasks are done. Use task.result() to get the return value or task.exception() to check for exceptions. Tasks that were cancelled will raise CancelledError when you call result().

What Python versions support TaskGroup?

TaskGroup is available in Python 3.11 and later. For older versions, use asyncio.gather(), asyncio.create_task(), or third-party libraries like anyio.

How do I return and access results from TaskGroup tasks?

Store references to tasks returned by create_task(). After the TaskGroup context exits, call task.result() to get the return value. If the task raised an exception, result() re-raises it (or it’s in the ExceptionGroup).

Conclusion

asyncio.TaskGroup is a powerful addition to Python’s async toolkit, bringing structured concurrency patterns to the standard library. By enforcing that tasks complete or are cancelled when their parent scope exits, TaskGroup eliminates entire classes of bugs — forgotten tasks, orphaned coroutines, and unhandled exceptions. The automatic error collection in ExceptionGroup makes it easy to detect and respond to failures in complex concurrent systems.

Whether you’re fetching data from multiple APIs, processing files in parallel, or coordinating distributed system operations, TaskGroup provides a clean, Pythonic way to write reliable async code. Combined with error handling via except* and support for timeouts and cancellation, TaskGroup should be your default choice for managing concurrent tasks in Python 3.11+.

Start using TaskGroup in your async projects today, and you’ll quickly find it becomes as indispensable as async/await itself.