How To Use Python cattrs for Structured Data Conversion

Intermediate

When your Python application receives JSON from an API, reads a config file, or loads records from a database, you end up with plain dictionaries. You then write code to manually extract keys, validate types, and convert values into your domain objects. This conversion layer is tedious to write, easy to get wrong, and even easier to forget to update when your data model changes.

cattrs automates this conversion. It converts plain Python structures — dictionaries, lists, and primitives — into typed attrs classes or dataclasses, and back again. It handles nested structures, optional fields, unions, and custom type coercion. When the input doesn’t match the expected type, it raises clear, structured errors that tell you exactly which field failed and why.

This article covers how to install cattrs, how to structure and unstructure objects, how to handle nested and optional types, how to add custom hooks for special conversion logic, and how to use the validation capabilities. By the end you will be able to replace fragile manual conversion code with a clean, reliable structuring layer.

Structuring Data with cattrs: Quick Example

Here is a minimal example converting a dictionary from a JSON API response into a typed Python object:

# quick_cattrs_example.py
import attrs
import cattrs

@attrs.define
class User:
    name: str
    age: int
    email: str

# Simulated API response (as a dict)
raw_data = {"name": "Alice", "age": 30, "email": "alice@example.com"}

# Structure: dict -> typed object
user = cattrs.structure(raw_data, User)
print(user)
print(type(user))

# Unstructure: typed object -> dict (for serialization)
back_to_dict = cattrs.unstructure(user)
print(back_to_dict)

Output:

User(name='Alice', age=30, email='alice@example.com')
<class '__main__.User'>
{'name': 'Alice', 'age': 30, 'email': 'alice@example.com'}

cattrs.structure(data, MyClass) converts raw data into your class. cattrs.unstructure(obj) converts it back to a plain dict. The sections below cover nested types, optional fields, custom converters, and error handling.

What Is cattrs and Why Use It?

cattrs is a library for structuring and unstructuring data. “Structuring” means converting untyped data (dicts, lists) into typed Python objects. “Unstructuring” means the reverse — serializing objects back into plain data suitable for JSON or storage. It is designed to work with attrs classes but also supports standard dataclasses.

| Task | Manual code | cattrs |
| --- | --- | --- |
| dict to class | Custom __init__ parsing | cattrs.structure(d, MyClass) |
| class to dict | Manual __dict__ extraction | cattrs.unstructure(obj) |
| Nested objects | Recursive manual code | Automatic via type hints |
| Type coercion | Manual int(), str() calls | Configurable hooks |
| Error messages | Generic KeyError/TypeError | Structured exceptions with path |

Install cattrs along with attrs:

# pip install cattrs attrs

# Read the installed version from package metadata
from importlib.metadata import version
print(version("cattrs"))

Output:

23.2.3

Handling Nested Structures

Real data is rarely flat. APIs return nested objects — a user with an address, an order with line items. cattrs handles nesting automatically by following type hints recursively.

# cattrs_nested.py
import attrs
import cattrs
from typing import List

@attrs.define
class Address:
    street: str
    city: str
    country: str

@attrs.define
class User:
    name: str
    age: int
    address: Address
    tags: List[str]

# Deeply nested raw data (e.g., from JSON API)
raw = {
    "name": "Bob",
    "age": 25,
    "address": {
        "street": "123 Main St",
        "city": "Melbourne",
        "country": "Australia"
    },
    "tags": ["admin", "verified"]
}

user = cattrs.structure(raw, User)
print(user.name)
print(user.address.city)
print(user.tags)

# Unstructure back to nested dict
print(cattrs.unstructure(user))

Output:

Bob
Melbourne
['admin', 'verified']
{'name': 'Bob', 'age': 25, 'address': {'street': '123 Main St', 'city': 'Melbourne', 'country': 'Australia'}, 'tags': ['admin', 'verified']}

The Address field is structured automatically because its type annotation is Address, which cattrs recognizes as an attrs class. The List[str] annotation tells cattrs to structure the list and coerce each element to a string. Deeply nested structures work the same way — cattrs follows the type tree recursively without any extra configuration.

Optional Fields and Union Types

Real-world data has missing fields. A user’s phone number might be null, a legacy record might omit a new field, or an API might return different types depending on context. cattrs handles Optional and Union types cleanly.

# cattrs_optional.py
import attrs
import cattrs
from typing import Optional, List, Union

@attrs.define
class Product:
    id: int
    name: str
    price: float
    description: Optional[str] = None
    tags: Optional[List[str]] = None

# Missing optional fields default to None
raw_minimal = {"id": 1, "name": "Widget", "price": 9.99}
product = cattrs.structure(raw_minimal, Product)
print(product)

# Full data with optional fields populated
raw_full = {
    "id": 2,
    "name": "Gadget",
    "price": 49.99,
    "description": "A useful gadget",
    "tags": ["electronics", "sale"]
}
product2 = cattrs.structure(raw_full, Product)
print(product2.description)
print(product2.tags)

Output:

Product(id=1, name='Widget', price=9.99, description=None, tags=None)
A useful gadget
['electronics', 'sale']

Optional[str] is equivalent to Union[str, None]. When the key is absent from the raw data, cattrs uses the default value from the class definition; when the key is present with a value of None, the field is set to None. If you need to handle more complex unions, like a field that can be either a string or an integer depending on the context, you can register a custom hook (covered in the next section).


Custom Structuring Hooks

Sometimes the raw data does not match your type annotations exactly. A date might arrive as a string "2026-01-15" instead of a datetime object. A boolean might come as "true" or 1 instead of True. Custom hooks let you intercept the structuring process for specific types and apply your own conversion logic.

# cattrs_custom_hooks.py
import attrs
import cattrs
from datetime import datetime

@attrs.define
class Event:
    title: str
    start_date: datetime
    attendees: int

# Create a custom converter (don't modify the global default)
converter = cattrs.Converter()

# Register a hook: convert string to datetime
def structure_datetime(value, _type):
    if isinstance(value, datetime):
        return value
    if isinstance(value, str):
        return datetime.fromisoformat(value)
    raise ValueError(f"Cannot convert {value!r} to datetime")

converter.register_structure_hook(datetime, structure_datetime)

# Now structure data where date is a string
raw = {
    "title": "PyCon AU",
    "start_date": "2026-08-14T09:00:00",
    "attendees": 500
}

event = converter.structure(raw, Event)
print(event.title)
print(event.start_date)
print(type(event.start_date))

Output:

PyCon AU
2026-08-14 09:00:00
<class 'datetime.datetime'>

Notice the use of cattrs.Converter() to create an isolated converter instance rather than using the global default. This keeps your custom hooks scoped to specific parts of your application without affecting other code that uses cattrs. The hook signature is always hook(value, type), where value is the raw input and type is the target Python type.

Validation and Error Handling

When structuring fails because the data does not match the expected types, cattrs raises a ClassValidationError. Unlike a bare TypeError, this exception contains structured information about exactly which fields failed and why — even for nested structures.

# cattrs_validation.py
import attrs
import cattrs

@attrs.define
class Config:
    host: str
    port: int
    debug: bool

# Bad data -- port is a string that can't be cast to int
bad_data = {"host": "localhost", "port": "not-a-number", "debug": True}

try:
    config = cattrs.structure(bad_data, Config)
except cattrs.ClassValidationError as e:
    print("Validation failed!")
    # transform_error turns the exception group into messages with field paths
    for message in cattrs.transform_error(e):
        print(f"  Field error: {message}")

Output:

Validation failed!
  Field error: invalid value for type, expected int @ $.port

The @ $.port suffix shows exactly which field caused the problem, using JSONPath-style notation. For deeply nested structures, you might see @ $.address.zip_code. The underlying exceptions (here, the ValueError raised by int()) remain available in e.exceptions if you need them. This structured error reporting makes debugging data issues much faster than hunting through a generic traceback. In production, you can catch ClassValidationError, log the structured errors, and return a meaningful validation response to the API caller.


Real-Life Example: Parsing a Paginated API Response

Here is a complete example structuring a paginated API response with nested objects, optional fields, and a custom datetime hook:

# cattrs_api_parser.py
import attrs
import cattrs
from datetime import datetime
from typing import List, Optional

# Domain models
@attrs.define
class Author:
    id: int
    username: str
    email: Optional[str] = None

@attrs.define
class Post:
    id: int
    title: str
    body: str
    author: Author
    created_at: datetime
    tags: List[str]
    published: bool

@attrs.define
class PaginatedPosts:
    total: int
    page: int
    per_page: int
    posts: List[Post]

# Set up converter with datetime hook
converter = cattrs.Converter()

def parse_datetime(value, _):
    if isinstance(value, datetime):
        return value
    return datetime.fromisoformat(value.replace("Z", "+00:00"))

converter.register_structure_hook(datetime, parse_datetime)

# Simulated response from jsonplaceholder.typicode.com (enriched)
api_response = {
    "total": 100,
    "page": 1,
    "per_page": 2,
    "posts": [
        {
            "id": 1,
            "title": "Getting Started with Python",
            "body": "Python is a versatile language...",
            "author": {"id": 10, "username": "alice", "email": "alice@example.com"},
            "created_at": "2026-01-15T10:30:00",
            "tags": ["python", "beginner"],
            "published": True
        },
        {
            "id": 2,
            "title": "Advanced asyncio Patterns",
            "body": "Structured concurrency changes everything...",
            "author": {"id": 11, "username": "bob"},
            "created_at": "2026-02-20T14:00:00",
            "tags": ["python", "async", "advanced"],
            "published": True
        }
    ]
}

# One line to structure the entire response
result = converter.structure(api_response, PaginatedPosts)

print(f"Page {result.page} of {result.total // result.per_page}")
for post in result.posts:
    print(f"\n[{post.id}] {post.title}")
    print(f"  Author: {post.author.username} ({post.author.email or 'no email'})")
    print(f"  Posted: {post.created_at.strftime('%b %d, %Y')}")
    print(f"  Tags: {', '.join(post.tags)}")

Output:

Page 1 of 50

[1] Getting Started with Python
  Author: alice (alice@example.com)
  Posted: Jan 15, 2026
  Tags: python, beginner

[2] Advanced asyncio Patterns
  Author: bob (no email)
  Posted: Feb 20, 2026
  Tags: python, async, advanced

The entire multi-level API response — paginated wrapper, list of posts, nested author objects, datetime strings, optional email — structures in a single converter.structure() call. Adding a new field to the model automatically includes it in both structuring and unstructuring without changing any conversion code.

Frequently Asked Questions

How does cattrs compare to Pydantic?

Pydantic models are self-contained — validation logic lives in the class itself. cattrs is a separate converter layer that works with attrs classes or dataclasses. Pydantic V2 is extremely fast and has first-class FastAPI integration. cattrs is better suited for code that already uses attrs, needs highly customizable conversion behavior, or wants to separate the data model from the conversion logic. For new API-heavy projects, Pydantic is often the default choice; for attrs-based codebases, cattrs fits naturally.

Does cattrs work with standard dataclasses?

Yes. cattrs supports Python’s built-in dataclasses in addition to attrs classes. Use @dataclass from the standard library and cattrs will structure and unstructure them using the same API. The main difference is that attrs classes have more features (validators, converters, slots) that cattrs can leverage. For simple cases, dataclasses work fine.

Is cattrs fast enough for high-volume use?

cattrs generates and caches a specialized structuring function for each class rather than inspecting types on every call. You can also pre-generate these functions yourself with cattrs.gen.make_dict_structure_fn() and register them as hooks. This is usually fast enough for high-volume use, but benchmark with your own data before relying on comparisons with hand-written parsing or Pydantic V1. For extreme performance requirements, consider Pydantic V2 with its Rust-based core.

How do I customize unstructuring (object to dict)?

Use converter.register_unstructure_hook(MyType, lambda obj: ...) to register a custom unstructuring function. For example, to serialize datetime as ISO format: converter.register_unstructure_hook(datetime, lambda dt: dt.isoformat()). You can also use cattrs.gen.make_dict_unstructure_fn() to generate an unstructure function with field overrides — useful for renaming keys or excluding fields from serialization.

What happens if a required field is missing?

If a required field (no default value) is absent from the raw data, cattrs raises a ClassValidationError with the path of the missing field. If you want to allow missing fields and use a fallback, either add a default value in your class definition (field: str = "default") or use Optional[str] = None. You can also register a structure hook that fills in defaults before structuring if the data source is known to be incomplete.

Conclusion

The cattrs library eliminates the repetitive, error-prone conversion code between raw data and typed Python objects. You learned how to structure and unstructure objects, handle nested types and optional fields, register custom hooks for special type conversions, and interpret structured validation errors. The paginated API parser showed how all these features combine into a clean, maintainable data layer.

The logical next step is to apply cattrs at the boundary of your application — wherever external data enters your system (API responses, config files, database results) — and convert it to typed objects immediately. From that point, the rest of your application works with typed, validated data and never touches raw dicts again. The catt.rs documentation covers advanced patterns including generating optimized converters, handling forward references, and customizing the global converter.

How To Use Python invoke for Task Automation

Intermediate

Every project accumulates a collection of repetitive commands — running tests, building documentation, deploying to staging, linting code, cleaning build artifacts. Most teams handle this with a Makefile or a pile of shell scripts that nobody fully understands six months later. Makefiles are powerful but cryptic. Shell scripts lack documentation and type safety. There is a better way.

invoke is a Python task runner that lets you define automation tasks as regular Python functions decorated with @task. Your team runs inv test instead of remembering a long pytest command. You run inv deploy --env=staging instead of sourcing an environment file and running three scripts in sequence. Tasks are documented, composable, and written in the language your whole team already knows.

This article covers how to install and configure invoke, how to define tasks with arguments and dependencies, how to run shell commands from tasks, how to group tasks into namespaces, and how to build a complete project automation workflow. By the end you will have a tasks.py file that replaces your project’s Makefile entirely.

Task Automation with invoke: Quick Example

Here is a minimal tasks.py file with two tasks you can run immediately:

# tasks.py
from invoke import task

@task
def hello(c):
    """Say hello and show the current directory."""
    print("Hello from invoke!")
    c.run("pwd")

@task
def clean(c):
    """Remove Python bytecode and cache files."""
    c.run("find . -type f -name '*.pyc' -delete")
    c.run("find . -type d -name '__pycache__' -exec rm -rf {} + 2>/dev/null || true")
    print("Cleaned up bytecode files.")

Run with:

inv hello
# Hello from invoke!
# /home/user/myproject

inv clean
# Cleaned up bytecode files.

inv --list
# Available tasks:
#   clean   Remove Python bytecode and cache files.
#   hello   Say hello and show the current directory.

Each task is a Python function decorated with @task. The first argument c is the Context object that provides the c.run() method for running shell commands. The function’s docstring becomes the task description shown in inv --list. The sections below go deeper into arguments, dependencies, namespaces, and real-world patterns.

What Is invoke and Why Use It?

invoke was created by the same developer who built the Fabric deployment library. It provides a clean framework for defining, documenting, and running project tasks without leaving Python. Unlike Makefiles, invoke tasks are regular Python functions with full access to Python’s standard library, third-party packages, and IDE support including type checking and autocompletion.

| Feature | Makefile | Shell Script | invoke |
| --- | --- | --- | --- |
| Language | Make syntax | Bash | Python |
| Documentation | Comments only | Comments only | Docstrings + --list |
| Arguments | Variables | $1, $2, … | Named params with types |
| Dependencies | Target prereqs | Manual calls | pre/post arguments to @task |
| Cross-platform | Limited | No (bash-specific) | Yes (Python) |
| IDE support | None | Limited | Full |

Install invoke with pip, then create a tasks.py file in your project root. The inv command automatically discovers this file when run from that directory or any subdirectory.

# terminal -- run this to install
# pip install invoke

import invoke
print(invoke.__version__)

Output:

2.2.0

Tasks with Arguments and Defaults

Tasks accept keyword arguments that users pass on the command line. Default values make arguments optional. A boolean argument that defaults to False becomes a --flag switch, and one that defaults to True automatically gets a --no-flag inverse. This gives you a proper CLI for free.

# tasks.py
from invoke import task

@task
def test(c, module="", verbose=False, coverage=False):
    """Run the test suite.

    Args:
        module: Specific module to test (default: all)
        verbose: Show verbose output
        coverage: Generate a coverage report
    """
    cmd = "pytest"
    if verbose:
        cmd += " -v"
    if coverage:
        cmd += " --cov=src --cov-report=html"
    if module:
        cmd += f" tests/test_{module}.py"
    else:
        cmd += " tests/"
    c.run(cmd)

Usage:

inv test
inv test --verbose
inv test --module=utils --verbose
inv test --coverage

Each Python function argument becomes a CLI flag. Strings become --arg=value options. Booleans that default to False become --flag switches, and booleans that default to True get a --no-flag inverse. Integers accept numeric values. invoke infers the type from the default value, so verbose=False creates a boolean switch and module="" creates a string option.

Task Dependencies

Tasks often need to run in sequence — clean before build, build before deploy. invoke handles this with the pre and post arguments to the @task decorator. Pre-tasks run before the main task; post-tasks run after it completes successfully.

# tasks.py
from invoke import task

@task
def clean(c):
    """Remove build artifacts."""
    c.run("rm -rf dist/ build/ *.egg-info")
    print("Build directory cleaned.")

@task
def lint(c):
    """Run linting checks."""
    c.run("ruff check src/")
    print("Linting passed.")

@task(pre=[clean, lint])
def build(c):
    """Build the package (runs clean and lint first)."""
    c.run("python -m build")
    print("Build complete.")

@task(pre=[build])
def publish(c, test=False):
    """Publish to PyPI (or TestPyPI with --test)."""
    if test:
        c.run("twine upload --repository testpypi dist/*")
    else:
        c.run("twine upload dist/*")

Output when running inv publish --test:

Build directory cleaned.
Linting passed.
Build complete.
Uploading distributions to https://test.pypi.org/legacy/

The dependency chain runs automatically: inv publish triggers build, which first runs clean and lint. You do not have to remember the order — it is encoded in the task definitions. Tasks are deduplicated if multiple paths would run the same task twice.


Using the Context and c.run()

The Context object c is the core of every invoke task. Its c.run() method executes shell commands and gives you control over output, errors, and environment. Understanding c.run() options is essential for writing robust tasks.

# tasks.py
from invoke import task

@task
def deploy(c, env="staging"):
    """Deploy the application to the specified environment."""
    print(f"Deploying to {env}...")

    # hide=True suppresses output (returns it instead)
    result = c.run("git rev-parse HEAD", hide=True)
    commit = result.stdout.strip()
    print(f"Deploying commit: {commit[:8]}")

    # warn=True prevents exception on non-zero exit code
    result = c.run("systemctl is-active myapp", warn=True, hide=True)
    if result.ok:
        print("Service is running -- will restart after deploy")
    else:
        print("Service is stopped -- will start after deploy")

    # Run with a specific working directory
    with c.cd("/var/www/myapp"):
        c.run("git pull origin main")
        c.run("pip install -r requirements.txt -q")
        c.run("python manage.py migrate --noinput")
        c.run("systemctl restart myapp")

    print(f"Deploy to {env} complete!")

Key c.run() options:

| Option | Default | Effect |
| --- | --- | --- |
| hide=True | False | Suppress output; returns it in result.stdout |
| warn=True | False | Return result instead of raising on non-zero exit |
| echo=True | False | Print the command before running it |
| env={} | None | Merge extra env vars into the subprocess |
| pty=True | False | Allocate a pseudo-terminal (for interactive programs) |

Organizing Tasks with Namespaces

As projects grow, a flat list of tasks becomes hard to navigate. invoke supports namespaces to group related tasks. You define tasks in separate modules and combine them into a namespace tree.

# tasks/db.py
from invoke import task

@task
def migrate(c):
    """Run database migrations."""
    c.run("python manage.py migrate")

@task
def seed(c):
    """Seed the database with sample data."""
    c.run("python manage.py loaddata fixtures/sample.json")

@task
def backup(c):
    """Backup the database to a timestamped file."""
    import datetime
    stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    c.run(f"pg_dump mydb > backups/db_{stamp}.sql")
    print(f"Backup created: db_{stamp}.sql")
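
Because tasks/ is a package here, the root collection lives in tasks/__init__.py rather than in a separate tasks.py next to it (a tasks.py file and a tasks/ package in the same directory would compete for the same import name):

# tasks/__init__.py (package root)
from invoke import Collection
from . import db

namespace = Collection()
namespace.add_collection(Collection.from_module(db), name="db")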

Running namespaced tasks:

inv --list
# db.migrate    Run database migrations.
# db.seed       Seed the database with sample data.
# db.backup     Backup the database to a timestamped file.

inv db.migrate
inv db.backup

Namespaces keep your task list organized as it grows. You can nest namespaces arbitrarily deep. A common pattern is a tasks/ package whose __init__.py imports and organizes sub-modules for different concerns: db, deploy, docs, test.


Real-Life Example: Full Project Automation Workflow

Here is a complete tasks.py for a typical Python project covering testing, linting, docs, and deployment:

# tasks.py -- full project automation
from invoke import task, Collection

@task
def clean(c):
    """Remove all build artifacts and cache files."""
    patterns = [
        "dist/", "build/", "*.egg-info",
        ".pytest_cache/", ".ruff_cache/",
        "htmlcov/", ".coverage",
    ]
    for pattern in patterns:
        c.run(f"rm -rf {pattern}", warn=True)
    c.run("find . -type d -name '__pycache__' -exec rm -rf {} + 2>/dev/null || true")
    print("Clean complete.")

@task
def lint(c):
    """Run ruff linter and formatter check."""
    c.run("ruff check src/ tests/")
    c.run("ruff format --check src/ tests/")
    print("Lint passed.")

@task
def test(c, verbose=False, coverage=False, module=""):
    """Run the test suite."""
    cmd = "pytest"
    if verbose:
        cmd += " -v"
    if coverage:
        cmd += " --cov=src --cov-report=term-missing --cov-report=html"
    if module:
        cmd += f" tests/test_{module}.py"
    else:
        cmd += " tests/"
    c.run(cmd)

@task(pre=[lint, test])
def check(c):
    """Run all quality checks (lint + tests)."""
    print("All checks passed!")

@task(pre=[clean, check])
def build(c):
    """Build distribution packages."""
    c.run("python -m build")
    result = c.run("ls dist/", hide=True)
    print("Built:", result.stdout.strip())

@task(pre=[build])
def release(c, test=False):
    """Release to PyPI (use --test for TestPyPI)."""
    repo = "testpypi" if test else "pypi"
    c.run(f"twine upload --repository {repo} dist/*")
    print(f"Released to {repo}!")

# Set up namespace
ns = Collection(clean, lint, test, check, build, release)

Developer workflow:

inv check          # lint + test before committing
inv build          # clean, check, then build
inv release --test # test release to TestPyPI
inv release        # production release

This single file replaces a Makefile, a set of shell scripts, and institutional knowledge about command flags. New team members run inv --list and immediately understand what automation is available. The dependency chain ensures nothing is skipped by accident.

Frequently Asked Questions

How is invoke different from Make?

Makefiles are primarily designed for build systems with file dependency tracking (only rebuild what changed). invoke is a general-purpose task runner without file dependency semantics. invoke tasks are Python functions with full language support, IDE integration, and proper argument parsing. For Python projects, invoke is almost always more readable and maintainable than an equivalent Makefile.

How do I share configuration across tasks?

Use invoke’s configuration system. Create an invoke.yaml file in your project root with shared settings, or attach defaults to your collection with its configure() method. Inside tasks, access config via c.config.my_setting. Configs are layered: collection defaults are overridden by user-level settings, which are overridden by the project-level file, which is overridden by environment variables. This is much cleaner than a pile of global constants at the top of your tasks file.

How do I run interactive commands (like a REPL) from invoke?

Use c.run("command", pty=True) to allocate a pseudo-terminal. This is required for programs that detect whether they are running in a terminal and change behavior accordingly — such as python, psql, or vim. Without pty=True, these commands may behave unexpectedly or refuse to start in interactive mode.

Can I use invoke in CI pipelines?

Yes. invoke works identically in CI because it is just Python. Add invoke to your requirements-dev.txt and call inv test in your CI config the same way developers do locally. This eliminates the common problem of CI running different commands than local development. Your pipeline becomes a single source of truth: inv lint && inv test && inv build.

Can invoke run tasks in parallel?

invoke does not run tasks in parallel out of the box. However, you can use Python’s concurrent.futures or threading inside a task to parallelize work. For parallel lint and test runs, a common pattern is to call c.run() with asynchronous=True, which starts the subprocess in the background and returns a promise whose join() method waits for it to finish. For most project automation, sequential execution is fast enough that parallelism is unnecessary.

Conclusion

The invoke library turns repetitive project commands into discoverable, documented, composable Python tasks. You learned how to define tasks with @task, add CLI arguments and defaults, chain tasks with pre/post dependencies, use the Context for shell commands, organize tasks into namespaces, and build a complete project automation workflow.

The next step is to replace your project’s Makefile or script folder with a tasks.py. Start with your three most-used commands and convert them first. Once your team experiences inv --list, they rarely want to go back to memorizing command flags. The official invoke documentation at pyinvoke.org covers configuration files, collection inheritance, and the executor API for advanced use cases.

How To Use Python sh for Shell Commands in Python

Intermediate

Every Python developer reaches the point where they need to run a shell command from inside their script. Maybe you need to call git, compress a file with tar, check disk usage with df, or trigger a build script. The standard approach is subprocess — and while it works, it is notoriously verbose. A simple ls -la turns into five lines of boilerplate just to get the output back as a string.

The sh library solves this with a completely different philosophy: shell commands become Python functions. Instead of importing subprocess and constructing Popen objects, you import sh and call sh.ls("-la") the same way you call any Python function. Arguments map naturally, pipes work, and output comes back as a string you can split, slice, or print directly.

In this article you will learn how to install and use sh, how to pass arguments and flags, how to capture output and handle errors, how to use piping and background processes, and how sh compares to subprocess. By the end you will be able to replace most of your subprocess boilerplate with clean, readable sh calls.

Running Shell Commands with sh: Quick Example

If you just want to see sh in action before diving into the details, here is a minimal working example that lists files in the current directory:

# quick_sh_example.py
import sh

# Call ls just like a Python function
result = sh.ls("-la", "/tmp")
print(result)

Output:

total 48
drwxrwxrwt 12 root root 4096 May 10 08:00 .
drwxr-xr-x 20 root root 4096 Apr 15 10:00 ..
-rw-r--r--  1 user user  123 May 10 07:55 example.txt

The call sh.ls("-la", "/tmp") runs ls -la /tmp as a subprocess and returns its output. In sh 2.x the return value is a plain string by default (sh 1.x returned a string-like RunningCommand object). You pass flags as separate string arguments, the same way you would type them on the command line, just split at each space. No shell=True, no Popen, no .communicate(). The sections below cover everything from basic calls to piping, streaming, and error handling.

What Is sh and Why Use It?

sh is a third-party Python library that wraps subprocesses in a function-call interface. Every program on your system’s PATH becomes importable as a callable. When you call that callable, sh launches the program, passes your arguments, captures stdout and stderr, and returns the result — all in one line of Python.

Compare the same task using subprocess versus sh:

| Task | subprocess | sh |
| --- | --- | --- |
| Run a command | subprocess.run(["ls", "-la"]) | sh.ls("-la") |
| Capture output | result = subprocess.run([...], capture_output=True, text=True) | result = sh.ls("-la") |
| Pipe commands | Multiple Popen calls with stdout=PIPE | sh.grep("txt", _in=sh.ls("-la")) |
| Check exit code | result.returncode | Exception raised automatically on non-zero |
| Stream output | Manual iteration over stdout | _out=callback parameter |

sh is best suited for scripting tasks where you want to glue together shell commands in Python without writing infrastructure code. For security-sensitive production code that validates user input, subprocess with explicit argument lists remains the safer choice. For developer tooling, build scripts, and automation, sh saves significant time.
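For comparison, here is roughly what the "Multiple Popen calls with stdout=PIPE" row in the table looks like using only the standard library. This is a minimal sketch that assumes a POSIX system with ls and grep on the PATH; the helper name list_matching and the temporary file names are made up for the demo.

```python
# subprocess_pipeline_sketch.py
import subprocess
import tempfile
from pathlib import Path


def list_matching(directory, pattern):
    """Run the equivalent of `ls <directory> | grep <pattern>` without a shell."""
    ls = subprocess.Popen(["ls", directory], stdout=subprocess.PIPE)
    grep = subprocess.Popen(
        ["grep", pattern],
        stdin=ls.stdout,          # connect ls's stdout to grep's stdin
        stdout=subprocess.PIPE,
        text=True,
    )
    ls.stdout.close()             # lets ls receive SIGPIPE if grep exits early
    output, _ = grep.communicate()
    ls.wait()
    return output.splitlines()


with tempfile.TemporaryDirectory() as tmp:
    for name in ("notes.txt", "todo.txt", "image.png"):
        Path(tmp, name).touch()
    matches = list_matching(tmp, "txt")
    print(matches)
```

Every step here (two Popen objects, the pipe wiring, closing the parent's copy of the pipe, communicate) is plumbing that sh hides behind a single call.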

Sudo Sam at terminal with subprocess boilerplate replaced by sh library
Five lines of subprocess boilerplate replaced by one. The shell called. Python answered.

Installing sh

sh works on Linux and macOS. It does not support Windows (Windows does not have POSIX-style subprocess semantics). If you are on Windows, use subprocess or Windows Subsystem for Linux instead.

Install sh with pip:

# install_sh.py
# Install first from your terminal (not from Python): pip install sh

import sh
print(sh.__version__)

Output:

2.0.7

Once installed, any command available on your PATH is immediately callable via sh. No registration, no configuration. If git is on your PATH, sh.git("status") works. If ffmpeg is installed, sh.ffmpeg(...) works. The library looks up each command dynamically when you reference it, and raises sh.CommandNotFound if no such program exists.

Basic Command Calls

The simplest use of sh is calling a command with no arguments. In sh 2.x the return value is a plain str containing the command's output; in 1.x it is a string-like RunningCommand object. Either way you can print it, call string methods such as .strip() and .splitlines() on it, or compare it to a string.

# basic_sh_calls.py
import sh

# No arguments
hostname = sh.hostname()
print("Hostname:", hostname.strip())

# With arguments
files = sh.ls("-1", "/etc")
# splitlines() works whether sh returns a str (2.x) or a RunningCommand (1.x)
for line in files.splitlines():
    print(line)

Output:

Hostname: mycomputer
apt
bash.bashrc
environment
fstab
hosts
passwd

Each positional argument you pass to the sh command maps to a command-line argument. Flags like -1 are just strings. If a flag takes a value (like --timeout=30), pass it as a single string or use keyword argument style (see the next section). Because sh 2.x returns a plain str, split the output with .splitlines() instead of looping over the result directly, which would yield single characters. To inspect the exit code, request the RunningCommand object with _return_cmd=True and read its .exit_code attribute.

Flags and Keyword Arguments

sh turns keyword arguments into command-line flags. A single-letter keyword maps to a short flag (o="page.html" becomes -o page.html), and a longer keyword maps to a long flag (silent=True becomes --silent), with underscores in the name converted to dashes. Keywords that start with an underscore, such as _timeout or _out, are not sent to the program at all: they are special options that control how sh itself runs the command.

# sh_keyword_flags.py
import sh

# Plain keyword arguments become curl flags:
#   silent=True -> --silent, max_time=5 -> --max-time=5
# _timeout is sh's own option: sh stops the process after 10 seconds
result = sh.curl(
    "https://httpbin.org/get",
    silent=True,
    max_time=5,
    _timeout=10
)
print(result[:200])

Output:

{
  "args": {},
  "headers": {
    "Accept": "*/*",
    "Host": "httpbin.org",
    "User-Agent": "curl/7.81.0"
  },
  ...
}

The leading underscore marks sh's special keyword arguments, which sh consumes itself instead of passing them to the subprocess. Everything else becomes a flag: output="file.txt" becomes --output=file.txt and verbose=True becomes --verbose. For flags with dashes in the name like --max-time, replace the dashes with underscores: max_time=10.
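To make the mapping concrete, here is a simplified pure-Python model of how plain keyword arguments turn into flags. It is a sketch of the documented behavior, not sh's actual implementation, and the function name kwargs_to_flags is hypothetical.

```python
# flag_mapping_sketch.py
def kwargs_to_flags(**kwargs):
    """Simplified model of how keyword arguments become command-line flags."""
    args = []
    for key, value in kwargs.items():
        if key.startswith("_"):
            continue  # special options are consumed by sh, never sent to the program
        if value is False:
            continue  # a False flag is simply left out
        if len(key) == 1:
            args.append(f"-{key}")                 # short flag: o="x" -> -o x
            if value is not True:
                args.append(str(value))
        else:
            flag = "--" + key.replace("_", "-")    # max_time -> --max-time
            args.append(flag if value is True else f"{flag}={value}")
    return args


flags = kwargs_to_flags(silent=True, max_time=5, o="page.html", _timeout=10)
print(flags)  # ['--silent', '--max-time=5', '-o', 'page.html']
```

Note that _timeout does not appear in the result: in this model, as in sh, underscore-prefixed names configure the call rather than the command.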

Debug Dee studying double-underscore to double-dash conversion chart
max_time=10 becomes --max-time=10. Because Python hates hyphens in argument names.

Piping Commands Together

Piping is one of the most powerful features in shell scripting. In sh 2.x, you pipe by passing one command's output to the next command through the _in special argument, which feeds it to that command's stdin. This mirrors the shell pipe operator | in a Python-native way. (Older sh 1.x releases also accepted the upstream command as the first positional argument.)

# sh_piping.py
import sh

# Equivalent to: ls /etc | grep "conf" | wc -l
conf_count = sh.wc(
    "-l",
    _in=sh.grep(
        "conf",
        _in=sh.ls("/etc")
    )
)
print("Files matching 'conf':", conf_count.strip())

Output:

Files matching 'conf': 12

Read the nested calls inside-out: sh.ls("/etc") runs first and its output feeds into sh.grep("conf", _in=...), which then feeds into sh.wc("-l", _in=...). This produces the same result as ls /etc | grep conf | wc -l in bash. One difference from a shell pipe: by default each inner call finishes before the next one starts, so its full output is held in memory rather than streamed between processes. For complex pipelines, assign intermediate results to variables to keep the code readable.

Error Handling

By default, sh raises an ErrorReturnCode exception whenever a command exits with a non-zero status. This differs from subprocess.run, which only raises if you pass check=True; otherwise you must inspect returncode yourself. The exception carries the exit code, stdout, and stderr, so you have full context for debugging.

# sh_error_handling.py
import sh

try:
    # Try to list a directory that doesn't exist
    output = sh.ls("/nonexistent_directory_xyz")
except sh.ErrorReturnCode as e:
    print("Command failed!")
    print("Exit code:", e.exit_code)
    print("stderr:", e.stderr.decode().strip())

Output:

Command failed!
Exit code: 2
stderr: ls: cannot access '/nonexistent_directory_xyz': No such file or directory

You can suppress the exception and check the exit code yourself using _ok_code. Pass a list of exit codes that should be treated as success: sh.grep("pattern", "file.txt", _ok_code=[0, 1]) treats exit code 1 (no matches) as non-fatal. This is useful for commands like grep that use non-zero exit codes for semantic results rather than errors.
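The idea behind _ok_code is easier to see when written out by hand. The sketch below models it with the standard library, assuming grep is on the PATH; the helper grep_lines and the log file contents are hypothetical.

```python
# ok_codes_sketch.py
import subprocess
import tempfile
from pathlib import Path


def grep_lines(pattern, path, ok_codes=(0, 1)):
    """Run grep and treat only exit codes outside ok_codes as failures."""
    result = subprocess.run(
        ["grep", pattern, path], capture_output=True, text=True
    )
    if result.returncode not in ok_codes:
        raise subprocess.CalledProcessError(
            result.returncode, result.args, result.stdout, result.stderr
        )
    return result.stdout.splitlines()


with tempfile.TemporaryDirectory() as tmp:
    log = Path(tmp, "app.log")
    log.write_text("INFO start\nERROR disk full\nINFO done\n")

    errors = grep_lines("ERROR", str(log))       # exit code 0: matches found
    warnings = grep_lines("WARNING", str(log))   # exit code 1: no matches, not a failure
    try:
        grep_lines("ERROR", str(Path(tmp, "missing.log")))  # exit code 2: real error
    except subprocess.CalledProcessError as exc:
        failure_code = exc.returncode

print(errors, warnings, failure_code)
```

With sh, the ok_codes check collapses into the _ok_code=[0, 1] argument on the call itself.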

API Alice catching falling exception object from sh command
exit_code=2 means ‘file not found’. Not ‘everything is on fire’. Catch accordingly.

Streaming Output in Real Time

For long-running commands like builds or log watchers, waiting for all output before printing is frustrating. sh supports streaming via the _out callback parameter. You pass a function, and sh calls it with each line of output as it arrives.

# sh_streaming.py
import sh

def handle_line(line):
    """Called for each line of output as it arrives."""
    print(f"[stream] {line}", end="")

# Stream output of a long-running command
# Using 'ping' with count=3 as a demo of streaming
sh.ping("-c", "3", "8.8.8.8", _out=handle_line)

Output:

[stream] PING 8.8.8.8 (8.8.8.8) 56(84) bytes of data.
[stream] 64 bytes from 8.8.8.8: icmp_seq=1 ttl=118 time=14.2 ms
[stream] 64 bytes from 8.8.8.8: icmp_seq=2 ttl=118 time=13.9 ms
[stream] 64 bytes from 8.8.8.8: icmp_seq=3 ttl=118 time=14.1 ms

The _out parameter accepts any callable. You can use a lambda for simple cases or a class method for stateful processing. There is also an _err parameter for stderr. If you want to stream both stdout and stderr to the same handler, use _err_to_out=True to merge them before passing to _out.
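For contrast, this is the manual version of the same callback pattern using subprocess. It is a minimal sketch: the helper name stream_command is made up, and a small Python child process stands in for a long-running command so the example runs anywhere.

```python
# subprocess_streaming_sketch.py
import subprocess
import sys


def stream_command(argv, on_line):
    """Call on_line with each line of output as soon as the child writes it."""
    with subprocess.Popen(
        argv,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,   # merge stderr into stdout, like _err_to_out=True
        text=True,
        bufsize=1,                  # line-buffered reads on our side
    ) as proc:
        for line in proc.stdout:
            on_line(line)
    return proc.returncode


received = []
child = [sys.executable, "-u", "-c", "for i in range(3): print('tick', i)"]
exit_code = stream_command(child, received.append)
print(received, exit_code)
```

The callback receives each line with its trailing newline, which is why the sh example above prints with end="".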

Running Commands in the Background

By default, sh blocks until the command finishes. To run a command in the background and continue Python execution immediately, use the _bg=True parameter. The call returns a RunningCommand object immediately, and you can call .wait() on it later.

# sh_background.py
import sh
import time

# Start a background process
process = sh.sleep("5", _bg=True)
print("Sleep started in background")

# Do other work while the command runs
for i in range(3):
    print(f"Working... {i+1}")
    time.sleep(1)

# Wait for the background process to finish
process.wait()
print("Background process completed")

Output:

Sleep started in background
Working... 1
Working... 2
Working... 3
Background process completed

Background processes are useful for launching servers or watchers while your main script continues with setup. You can also call process.kill() to terminate a background process early, or check process.is_alive() to see if it is still running. Combine _bg=True with _out=callback for non-blocking streaming output.

Loop Larry juggling multiple background terminal processes
_bg=True: start it, forget it, wait() when you actually need the result.

Real-Life Example: Git Repository Health Check

Here is a practical script that uses sh to check the health of a Git repository — reporting uncommitted changes, recent commits, and branch status:

# git_health_check.py
import sh
import sys

def check_git_repo(path="."):
    """Check health of a git repository using sh."""
    try:
        # Verify it's a git repo (sh captures the output, so nothing is printed)
        sh.git("-C", path, "rev-parse", "--is-inside-work-tree")
    except sh.ErrorReturnCode:
        print(f"Error: {path} is not a git repository")
        sys.exit(1)

    print(f"=== Git Health Check: {path} ===\n")

    # Current branch
    branch = sh.git("-C", path, "rev-parse", "--abbrev-ref", "HEAD").strip()
    print(f"Branch: {branch}")

    # Uncommitted changes
    try:
        sh.git("-C", path, "diff", "--quiet")
        sh.git("-C", path, "diff", "--cached", "--quiet")
        print("Working tree: Clean")
    except sh.ErrorReturnCode:
        changed = sh.git("-C", path, "diff", "--name-only").strip()
        staged = sh.git("-C", path, "diff", "--cached", "--name-only").strip()
        if changed:
            print(f"Modified (unstaged):\n  {changed.replace(chr(10), chr(10)+'  ')}")
        if staged:
            print(f"Staged for commit:\n  {staged.replace(chr(10), chr(10)+'  ')}")

    # Last 3 commits
    print("\nLast 3 commits:")
    log = sh.git("-C", path, "log", "--oneline", "-3")
    for line in log.splitlines():
        print(f"  {line}")

    # Ahead/behind remote
    try:
        status = sh.git("-C", path, "status", "-sb").split("\n")[0]
        print(f"\nRemote status: {status.strip()}")
    except sh.ErrorReturnCode:
        print("\nRemote status: No remote configured")

if __name__ == "__main__":
    repo_path = sys.argv[1] if len(sys.argv) > 1 else "."
    check_git_repo(repo_path)

Output:

=== Git Health Check: . ===

Branch: main
Modified (unstaged):
  src/utils.py
  tests/test_main.py
Staged for commit:
  README.md

Last 3 commits:
  a1b2c3d Fix pagination bug in API client
  e4f5a6b Add retry logic with exponential backoff
  c7d8e9f Initial project structure

Remote status: ## main...origin/main [ahead 1]

This script shows how sh makes Git scripting feel natural. Each sh.git() call is a clear, readable expression of the underlying command. Error handling via try/except matches how you would handle git exit codes in bash. You could extend this to compare branches, check for merge conflicts, or send a Slack notification when the working tree is dirty.

Stack Trace Steve reading git log on receipt-paper scroll
git log --oneline: because full commit hashes are for people who enjoy suffering.

Frequently Asked Questions

Does sh work on Windows?

sh does not support Windows. It relies on POSIX semantics for subprocess management that do not exist on Windows. If you are developing on Windows, use subprocess directly, or consider plumbum, which is a cross-platform alternative. On macOS and Linux, sh works fully.

When should I use sh instead of subprocess?

Use sh for developer tooling, automation scripts, and one-off tasks where readability matters more than security hardening. Use subprocess when you need precise control over the environment, when you are processing untrusted user input (to avoid injection risks), or when you are building a library that others will use. Both libraries ultimately launch child processes — sh just wraps the mechanics in a cleaner API.

How do I pass stdin to a command?

Use the _in parameter to pipe a string or file-like object to the command’s stdin. For example, sh.grep("error", _in=open("logfile.txt")) pipes the file to grep. You can also pass a string directly: sh.wc("-l", _in="line1\nline2\nline3"). For interactive commands that expect user input, use the _tty_in=True option.

How do I set environment variables for a command?

Pass a dictionary to the _env parameter: sh.mycommand(_env={"API_KEY": "abc123", "DEBUG": "true"}). Note that this replaces the entire environment, so if you want to extend the current environment, merge it first: import os; env = {**os.environ, "MY_VAR": "value"}; sh.mycommand(_env=env). This keeps all existing environment variables while adding or overriding specific ones.

How do I set a timeout on a command?

Use the _timeout parameter: sh.curl("https://example.com", _timeout=10) raises a TimeoutException if the command runs longer than 10 seconds. You can catch it with except sh.TimeoutException. This is cleaner than using _bg=True and manually monitoring a timer. For streaming commands, combine _timeout with _out=callback to process partial output before the timeout fires.

Conclusion

The sh library transforms shell command execution from a boilerplate chore into expressive Python code. You learned how to install sh, call commands with positional and keyword arguments, capture and stream output, pipe commands together, handle errors cleanly, and run background processes. The git health check project showed how these features combine into practical automation.

The next step is to take the health check script and extend it: add a check for dependency freshness using sh.pip("list", "--outdated"), or send alerts when certain conditions are met using sh.curl() to call a webhook. The sh documentation at sh.readthedocs.io covers advanced features, including baking default arguments into reusable command objects with .bake() and running programs by explicit path with sh.Command.

How To Use Python orjson for Fast JSON Processing

Intermediate

You have a Python service that parses JSON responses from an API thousands of times per second, and the standard json module is quietly becoming a bottleneck. At low traffic volumes this goes unnoticed, but once you scale up, milliseconds of serialization overhead compound into real latency. If you have ever profiled a Python web service and found json.dumps or json.loads sitting near the top of the flame graph, you already know this pain.

orjson is a fast, correct JSON library for Python written in Rust. It drops into nearly any codebase as a replacement for the standard json module and typically runs 2-10x faster on both serialization and deserialization. It also natively supports types the standard library forces you to handle manually — datetime, UUID, numpy arrays, and dataclasses.

In this article you will learn how to install orjson, serialize and deserialize JSON with it, use its built-in support for Python-native types, benchmark it against the standard library, and integrate it into a real-world FastAPI project. By the end you will have a working understanding of when and why to choose orjson over the alternatives.

orjson Quick Example

Before diving deep, here is a self-contained example that shows the core pattern. orjson is nearly a drop-in replacement for the standard json module, but returns and accepts bytes instead of str.

# quick_example.py
import orjson
from datetime import datetime

data = {
    "name": "Alice",
    "score": 98.6,
    "logged_in": True,
    "joined": datetime(2024, 3, 15, 9, 30, 0),
    "tags": ["python", "backend", "fast"]
}

# Serialize to bytes (not str like the standard json module)
encoded = orjson.dumps(data)
print(encoded)
print(type(encoded))

# Deserialize back to a Python dict
decoded = orjson.loads(encoded)
print(decoded["joined"])  # datetime is serialized as ISO 8601 string
print(type(decoded))

Output:

b'{"name":"Alice","score":98.6,"logged_in":true,"joined":"2024-03-15T09:30:00","tags":["python","backend","fast"]}'
<class 'bytes'>
2024-03-15T09:30:00
<class 'dict'>

Two things stand out right away. First, orjson.dumps() returns bytes, not a string — this is intentional and saves an unnecessary encoding step when writing to network sockets or files. Second, the datetime object is automatically serialized to ISO 8601 format without any extra work, which the standard json module would refuse to handle at all.

orjson serializes JSON at native speed
orjson goes brrr. stdlib json takes a nap.

What Is orjson and Why Use It?

orjson is a Python JSON library implemented in Rust using the Serde framework. It was created specifically to address the performance limitations of Python’s built-in json module, which is implemented in C but still shows its age when processing large payloads at high throughput.

The key differences between orjson and the standard library are:

Feature | Standard json | orjson
Output type of dumps() | str | bytes
datetime support | Raises TypeError | Native ISO 8601
UUID support | Raises TypeError | Native string
dataclass support | Raises TypeError | Native dict-like
numpy array support | Not supported | Native with OPT_SERIALIZE_NUMPY (numpy installed)
Performance (typical) | Baseline | 2-10x faster
Strict UTF-8 validation | No | Yes

The Rust implementation takes advantage of SIMD instructions and a highly optimized Serde-based serialization pipeline. For applications doing heavy JSON processing — API gateways, caching layers, log aggregators — the improvement is measurable and often significant.

Installing orjson

orjson is available on PyPI and installs with a single command:

# install_orjson.sh
pip install orjson

Output:

Collecting orjson
  Downloading orjson-3.10.x-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (144 kB)
Successfully installed orjson-3.10.x

orjson ships as a pre-compiled binary for most platforms (Linux, macOS, Windows on x86-64 and ARM), so there is no Rust toolchain required. If you are on a less common platform you may need Rust installed to build from source. Verify the installation with a quick import check:

# verify_install.py
import orjson
print(orjson.__version__)

Output:

3.10.x

Serializing Python Objects with orjson.dumps()

The orjson.dumps() function converts Python objects to JSON bytes. The most important thing to remember is that it always returns bytes, not str. If you need a string, call .decode() on the result.

# serialization_basics.py
import orjson
from datetime import datetime, date
from uuid import UUID
from dataclasses import dataclass

@dataclass
class User:
    id: UUID
    name: str
    created: datetime
    active: bool

user = User(
    id=UUID("12345678-1234-5678-1234-567812345678"),
    name="Bob Smith",
    created=datetime(2025, 1, 10, 14, 30),
    active=True
)

# Serialize the dataclass directly -- no custom encoder needed
result = orjson.dumps(user)
print(result)

# Decode to string if needed
print(result.decode("utf-8"))

Output:

b'{"id":"12345678-1234-5678-1234-567812345678","name":"Bob Smith","created":"2025-01-10T14:30:00","active":true}'
{"id":"12345678-1234-5678-1234-567812345678","name":"Bob Smith","created":"2025-01-10T14:30:00","active":true}

Notice that the UUID, datetime, and dataclass are all handled automatically with zero configuration. With the standard json module, each of these would raise a TypeError: Object of type X is not JSON serializable error, requiring a custom default function.

orjson handles datetime UUID numpy types natively
datetime, UUID, numpy — orjson just handles them. No __str__ hacks.

orjson Options and Flags

orjson supports serialization options passed via the option parameter as bitwise-OR combinations of constants. These let you control formatting, sorting, and type handling:

# orjson_options.py
import orjson

data = {
    "z_key": "last",
    "a_key": "first",
    "count": 42,
    "ratio": 3.14159
}

# Pretty-print with indented output
pretty = orjson.dumps(data, option=orjson.OPT_INDENT_2)
print("Pretty:")
print(pretty.decode())

# Sort keys alphabetically
sorted_output = orjson.dumps(data, option=orjson.OPT_SORT_KEYS)
print("\nSorted keys:")
print(sorted_output.decode())

# Combine options with bitwise OR
both = orjson.dumps(data, option=orjson.OPT_INDENT_2 | orjson.OPT_SORT_KEYS)
print("\nPretty + Sorted:")
print(both.decode())

Output:

Pretty:
{
  "z_key": "last",
  "a_key": "first",
  "count": 42,
  "ratio": 3.14159
}

Sorted keys:
{"a_key":"first","count":42,"ratio":3.14159,"z_key":"last"}

Pretty + Sorted:
{
  "a_key": "first",
  "count": 42,
  "ratio": 3.14159,
  "z_key": "last"
}

The most useful options in practice are OPT_INDENT_2 for human-readable output during debugging, OPT_SORT_KEYS for deterministic output in tests or caches, OPT_NON_STR_KEYS for dicts with integer or float keys, and OPT_UTC_Z to use Z suffix instead of +00:00 for UTC datetimes.

Deserializing with orjson.loads()

The orjson.loads() function accepts both bytes and str input and returns Python objects. Unlike the standard library, it performs strict UTF-8 validation on input, which means malformed data fails loudly rather than silently corrupting your data.

# deserialization.py
import orjson

# From bytes (most common in API and network scenarios)
json_bytes = b'{"name": "Charlie", "score": 99.5, "tags": ["fast", "correct"]}'
data = orjson.loads(json_bytes)
print(data)
print(type(data["score"]))

# From string also works
json_str = '{"status": "ok", "count": 1000}'
data2 = orjson.loads(json_str)
print(data2)

# Error handling -- orjson raises JSONDecodeError for invalid input
try:
    orjson.loads(b'{"broken": }')
except orjson.JSONDecodeError as e:
    print(f"Parse error: {e}")

Output:

{'name': 'Charlie', 'score': 99.5, 'tags': ['fast', 'correct']}
<class 'float'>
{'status': 'ok', 'count': 1000}
Parse error: expected value at line 1 column 12

One important detail: orjson.JSONDecodeError is a subclass of json.JSONDecodeError, so any existing except blocks using json.JSONDecodeError will still catch orjson errors without modification. This makes the migration path from the standard library seamless.
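Here is a small sketch of that migration path: the except clause names only json.JSONDecodeError, yet it also handles orjson failures. The helper parse_or_none is hypothetical, and the import falls back to the standard library so the example still runs where orjson is not installed.

```python
# decode_error_compat.py
import json

try:
    import orjson
    loads = orjson.loads
except ImportError:  # fallback so the sketch runs without orjson
    loads = json.loads


def parse_or_none(payload):
    """Return the parsed payload, or None if it is not valid JSON."""
    try:
        return loads(payload)
    except json.JSONDecodeError as exc:  # also catches orjson.JSONDecodeError
        print(f"Rejected payload: {exc}")
        return None


good = parse_or_none(b'{"status": "ok"}')
bad = parse_or_none(b'{"broken": }')
print(good, bad)
```

Because both exception classes also derive from ValueError, older code that catches ValueError keeps working too.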

orjson raises error for incompatible key types
TypeError without OPT_NON_STR_KEYS. Your integer key is not welcome here.

Benchmarking orjson vs Standard json

Let us run a concrete benchmark so you can see the actual performance difference on your hardware. We test serializing and deserializing a moderately complex nested dictionary 100,000 times:

# benchmark_orjson.py
import json
import orjson
import time

# Test data -- similar to a typical API response
sample_data = {
    "users": [
        {"id": i, "name": f"User{i}", "email": f"user{i}@example.com",
         "score": i * 1.5, "active": i % 2 == 0, "tags": ["python", "backend"]}
        for i in range(50)
    ],
    "total": 50,
    "page": 1
}

ITERATIONS = 100_000

# Benchmark json.dumps
start = time.perf_counter()
for _ in range(ITERATIONS):
    json.dumps(sample_data)
json_dumps_time = time.perf_counter() - start

# Benchmark orjson.dumps (returns bytes)
start = time.perf_counter()
for _ in range(ITERATIONS):
    orjson.dumps(sample_data)
orjson_dumps_time = time.perf_counter() - start

# Benchmark json.loads
json_str = json.dumps(sample_data)
start = time.perf_counter()
for _ in range(ITERATIONS):
    json.loads(json_str)
json_loads_time = time.perf_counter() - start

# Benchmark orjson.loads
orjson_bytes = orjson.dumps(sample_data)
start = time.perf_counter()
for _ in range(ITERATIONS):
    orjson.loads(orjson_bytes)
orjson_loads_time = time.perf_counter() - start

print(f"json.dumps:   {json_dumps_time:.3f}s")
print(f"orjson.dumps: {orjson_dumps_time:.3f}s  ({json_dumps_time/orjson_dumps_time:.1f}x faster)")
print(f"json.loads:   {json_loads_time:.3f}s")
print(f"orjson.loads: {orjson_loads_time:.3f}s  ({json_loads_time/orjson_loads_time:.1f}x faster)")

Output (typical results on a modern CPU):

json.dumps:   2.841s
orjson.dumps: 0.482s  (5.9x faster)
json.loads:   2.103s
orjson.loads: 0.631s  (3.3x faster)

Actual speedups vary based on payload size, nesting depth, and hardware, but 3-6x faster on both operations is typical. For a service handling 1,000 requests per second with 100KB payloads each, this translates to substantial CPU savings that compound at scale.
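To turn the totals into something you can budget against, divide by the iteration count. The sketch below does that arithmetic with the sample figures from the output above; it assumes one dumps() call per request and a payload like the benchmark's, so treat the result as an illustration rather than a measurement of your service.

```python
# per_call_cost_sketch.py
ITERATIONS = 100_000
REQUESTS_PER_SECOND = 1_000  # assumed load: one dumps() call per request

# Total seconds for 100,000 calls, copied from the sample output above
totals = {"json.dumps": 2.841, "orjson.dumps": 0.482}

for name, total_seconds in totals.items():
    per_call_us = total_seconds / ITERATIONS * 1_000_000
    cpu_ms_per_second = per_call_us * REQUESTS_PER_SECOND / 1_000
    print(f"{name:13} {per_call_us:6.2f} us/call  {cpu_ms_per_second:6.2f} ms CPU per second")

# CPU milliseconds saved per second of traffic at the assumed load
saved_ms = (
    (totals["json.dumps"] - totals["orjson.dumps"])
    / ITERATIONS * REQUESTS_PER_SECOND * 1_000
)
print(f"Saved: {saved_ms:.2f} ms of CPU per second")
```

With these sample numbers, serialization drops from about 28 ms to about 5 ms of CPU time per second of traffic. Larger payloads scale both figures up.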

Benchmark comparison showing orjson performance
Benchmark chart says orjson wins. Every time.

Real-Life Example: FastAPI Response Caching with orjson

Here is a practical example that integrates orjson into a FastAPI application. We use orjson for both serializing API responses and caching them in memory, demonstrating a common production pattern:

# fastapi_orjson_cache.py
"""
FastAPI app with orjson-powered response serialization and in-memory caching.
Run with: uvicorn fastapi_orjson_cache:app --reload
"""
import orjson
from fastapi import FastAPI
from fastapi.responses import Response
from datetime import datetime, timezone
from dataclasses import dataclass, field
from typing import Optional

app = FastAPI()

# Simple in-memory cache using orjson bytes as values
_cache: dict[str, bytes] = {}


@dataclass
class ProductRecord:
    id: int
    name: str
    price: float
    in_stock: bool
    last_updated: datetime
    tags: list[str] = field(default_factory=list)


def get_product_from_db(product_id: int) -> Optional[ProductRecord]:
    """Simulates a database lookup."""
    if product_id > 100:
        return None
    return ProductRecord(
        id=product_id,
        name=f"Product {product_id}",
        price=round(product_id * 9.99, 2),
        in_stock=product_id % 3 != 0,
        last_updated=datetime.now(timezone.utc),
        tags=["electronics", "featured"] if product_id < 50 else ["clearance"]
    )


@app.get("/products/{product_id}")
async def get_product(product_id: int):
    cache_key = f"product:{product_id}"

    # Check cache first
    if cache_key in _cache:
        # Return cached bytes directly -- no re-serialization needed
        return Response(content=_cache[cache_key], media_type="application/json")

    product = get_product_from_db(product_id)
    if product is None:
        error = orjson.dumps({"error": "Product not found", "id": product_id})
        return Response(content=error, media_type="application/json", status_code=404)

    # Serialize with orjson -- handles dataclass and datetime natively
    encoded = orjson.dumps(product, option=orjson.OPT_INDENT_2)
    _cache[cache_key] = encoded

    return Response(content=encoded, media_type="application/json")


@app.get("/cache/stats")
async def cache_stats():
    stats = {
        "cached_keys": len(_cache),
        "cache_size_bytes": sum(len(v) for v in _cache.values()),
        "timestamp": datetime.now(timezone.utc)
    }
    return Response(content=orjson.dumps(stats), media_type="application/json")

Example curl output:

$ curl http://localhost:8000/products/42
{
  "id": 42,
  "name": "Product 42",
  "price": 419.58,
  "in_stock": false,
  "last_updated": "2025-03-15T10:22:41.123456+00:00",
  "tags": [
    "electronics",
    "featured"
  ]
}

The power here is that the serialized bytes are stored in the cache and served directly as the HTTP response body without deserialization or re-serialization. orjson's native datetime handling means the UTC-aware datetime in last_updated is serialized to a full ISO 8601 string with timezone offset -- exactly what frontend clients expect.

orjson returns bytes for efficient cache storage
orjson returns bytes. Redis is happy. Your latency is happy.

Frequently Asked Questions

Why does orjson return bytes instead of str?

orjson returns bytes because JSON data in Python is almost always immediately encoded to bytes for network transport or file writing. Returning bytes directly avoids an extra .encode("utf-8") step. If you need a string, just call result.decode(). This is a deliberate performance decision -- the bytes representation is the final form that gets sent over the wire.

Is orjson a drop-in replacement for the json module?

Almost, but not completely. The function signatures are similar, but orjson.dumps() returns bytes while json.dumps() returns str. Any code that does f.write(json.dumps(data)) will break because you cannot write bytes to a text-mode file. The fix is either f.write(orjson.dumps(data).decode()) or opening the file in binary mode "wb". orjson also has no dump() or load() file helpers, and formatting keywords such as indent= and sort_keys= are replaced by option flags like OPT_INDENT_2 and OPT_SORT_KEYS.
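The file-writing pitfall is worth seeing once. This sketch writes the same payload in text mode and in binary mode; the helper dumps_bytes is hypothetical and falls back to the standard library (encoded to bytes) when orjson is not installed.

```python
# bytes_file_write_sketch.py
import json
import tempfile
from pathlib import Path

try:
    import orjson

    def dumps_bytes(obj):
        return orjson.dumps(obj)
except ImportError:  # fallback so the sketch runs without orjson
    def dumps_bytes(obj):
        return json.dumps(obj, separators=(",", ":")).encode("utf-8")

payload = dumps_bytes({"status": "ok", "count": 3})

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp, "data.json")

    try:
        with open(path, "w") as f:     # text mode rejects bytes
            f.write(payload)
    except TypeError as exc:
        text_mode_error = str(exc)

    with open(path, "wb") as f:        # binary mode accepts bytes as-is
        f.write(payload)

    round_trip = json.loads(path.read_bytes())

print(text_mode_error)
print(round_trip)
```

Opening the file with "wb" is usually the better fix, since decoding to str only for the file object to encode it again wastes the step orjson saved.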

How do I serialize custom types that orjson doesn't support natively?

Use the default parameter with a callback function, just like the standard library. The function receives the object and should return a JSON-serializable value, or raise TypeError for types it does not handle. For example, to serialize a Decimal, write a function that returns float(x) when isinstance(x, Decimal) is true and raises TypeError otherwise. Use a def rather than a lambda, because a lambda cannot contain a raise statement. orjson's native type support is broad enough that custom default handlers are rarely needed for modern Python code.
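A minimal sketch of such a handler follows. The names encode_extra and dumps_text are hypothetical, and the import falls back to the standard json module, which accepts the same kind of default callback, so the example runs with or without orjson.

```python
# decimal_default_sketch.py
import json
from decimal import Decimal

try:
    import orjson

    def dumps_text(obj, default):
        return orjson.dumps(obj, default=default).decode()
except ImportError:  # fallback so the sketch runs without orjson
    def dumps_text(obj, default):
        return json.dumps(obj, default=default, separators=(",", ":"))


def encode_extra(obj):
    """Convert types the serializer does not know; raise for everything else."""
    if isinstance(obj, Decimal):
        return float(obj)  # use str(obj) instead if exact precision matters
    raise TypeError(f"Cannot serialize {type(obj).__name__}")


encoded = dumps_text({"price": Decimal("19.99")}, encode_extra)
print(encoded)  # {"price":19.99}

try:
    dumps_text({"value": object()}, encode_extra)
    rejected = False
except TypeError:  # orjson.JSONEncodeError is a TypeError subclass
    rejected = True
print(rejected)  # True
```

Raising in the final line of the handler matters: it is how unsupported objects surface as an error instead of being silently mis-serialized.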

Is orjson thread-safe?

Yes. orjson functions are stateless -- each call to dumps() or loads() is entirely independent. There is no global mutable state, so multiple threads can call orjson simultaneously without any synchronization. This makes it a natural fit for multi-threaded web servers like gunicorn or uvicorn workers.

How does orjson compare to ujson?

Both are faster than the standard library, but orjson is consistently faster than ujson in benchmarks and has better correctness guarantees. ujson has a history of silently dropping or corrupting data in edge cases (very large integers, NaN values, deeply nested structures). orjson prioritizes correctness alongside speed. For production code where data integrity matters, orjson is the better choice.

Conclusion

orjson delivers a simple, high-value upgrade to any Python codebase that does significant JSON processing. The Rust-based implementation provides 3-6x faster serialization and deserialization, native support for datetime, UUID, dataclasses, and numpy arrays, and correct strict UTF-8 validation -- all with an API close enough to the standard library that migration is usually a matter of replacing the import and handling the bytes return type.

Try extending the FastAPI caching example to use Redis as a backend instead of in-memory storage, or add a Cache-Control header to the response based on the product's last_updated timestamp. These are natural next steps that reinforce how orjson fits into production API patterns.

For the full API reference and advanced options like OPT_PASSTHROUGH_DATETIME, see the orjson GitHub repository.

How To Use Python glom for Nested Data Access

Intermediate

You are working with a JSON API response. You need a value that is three levels deep — something like response["data"]["users"][0]["profile"]["email"]. Any key in that chain might be missing, any list might be empty, and the response structure might change between API versions. So you add a try-except block, or a series of .get() calls with defaults, and suddenly five lines of boilerplate are doing the work of one conceptually simple operation.

Glom is a Python library that replaces this pattern with a clean, declarative approach. You describe the path to the data you want as a spec, and glom walks the structure and extracts it — raising a clear error if something is missing, or returning a default you specify. It handles dicts, lists, objects, and nested combinations. The same spec language also supports data transformation, aggregation, and restructuring in place.

This article covers installation, basic path access, list traversal, coalesce for fallbacks, nested restructuring with Spec, the T object for method calls, and a real-world API response processing example. By the end you will have a practical toolkit for working with complex nested data without defensive boilerplate cluttering your code.

Accessing Nested Python Data with glom: Quick Example

Here is the same data access done the naive way vs. the glom way:

# quick_glom.py
from glom import glom

data = {
    "user": {
        "profile": {
            "name": "Alice",
            "contact": {
                "email": "alice@example.com"
            }
        }
    }
}

# Naive way - crashes if any key is missing
email_naive = data["user"]["profile"]["contact"]["email"]

# Glom way - clear path spec
email_glom = glom(data, "user.profile.contact.email")

print("Naive:", email_naive)
print("Glom: ", email_glom)

Output:

Naive: alice@example.com
Glom:  alice@example.com

The path "user.profile.contact.email" is a dot-separated string that glom resolves step by step. If any key is missing, glom raises a descriptive GlomError that tells you exactly where the path broke — unlike a bare KeyError that gives you only the missing key with no context about where in the structure it was expected. The sections below show how to handle missing keys gracefully, traverse lists, and restructure data.
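To make "resolves step by step" concrete, here is a simplified pure-Python model of dot-path lookup. It is a sketch of the idea only, not glom's implementation: get_path and PathError are hypothetical names, and the model handles dictionary keys only.

```python
# dot_path_model.py
class PathError(LookupError):
    """Raised when a dot-path cannot be resolved (hypothetical, not glom's class)."""


_MISSING = object()


def get_path(data, path, default=_MISSING):
    """Walk data one key at a time, reporting where the path broke."""
    current = data
    walked = []
    for part in path.split("."):
        try:
            current = current[part]
        except (KeyError, TypeError, IndexError):
            if default is not _MISSING:
                return default
            where = ".".join(walked) or "<root>"
            raise PathError(
                f"could not access {part!r} after {where!r} in path {path!r}"
            ) from None
        walked.append(part)
    return current


data = {"user": {"profile": {"name": "Alice", "contact": {"email": "alice@example.com"}}}}

email = get_path(data, "user.profile.contact.email")
fallback = get_path(data, "user.profile.phone.mobile", default="N/A")
try:
    get_path(data, "user.profile.phone.mobile")
except PathError as exc:
    message = str(exc)

print(email, fallback)
print(message)
```

The error message names both the missing key and the part of the path that resolved successfully, which is the context a bare KeyError lacks.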

data['a']['b']['c']['d'] -- one missing key from a KeyError. glom knows where it broke.

What Is glom and When Should You Use It?

Glom is a data access and transformation library built around the idea of a spec — a declarative description of what you want from a data structure. At its simplest the spec is a dot-path string. At its most powerful it is a nested combination of path specs, transformations, defaults, and conditionals.

Approach | Missing Key Behavior | List Traversal | Restructuring
data["a"]["b"] | Unhelpful KeyError | Manual loops | Manual
data.get("a", {}).get("b") | Returns default silently | Manual loops | Manual
glom(data, "a.b") | Descriptive GlomError | Built-in with Iter | Built-in with Spec

Use glom when: you are working with nested API responses, processing complex config files, restructuring data from one shape to another, or when you want missing-key errors that tell you exactly where the path broke rather than just which key was missing.

Installing glom

# terminal
pip install glom

Output:

Successfully installed glom-23.5.0 boltons-23.1.1

Glom depends on boltons (a utilities library from the same author) but has no other heavy dependencies. Import with from glom import glom for basic use, or from glom import glom, Coalesce, T, Iter, Spec for the full toolkit.

Handling Missing Keys with Default Values

Glom raises GlomError when a path cannot be resolved. To get a fallback value instead, pass the default keyword argument (it is keyword-only, so it cannot be given positionally):

# glom_defaults.py
from glom import glom

user = {
    "name": "Bob",
    "address": {
        "city": "Chicago"
    }
}

# Key exists - returns value
city = glom(user, "address.city", default="Unknown")
print("City:", city)

# Key missing - returns default instead of raising
zip_code = glom(user, "address.zip", default="N/A")
print("Zip:", zip_code)

# Intermediate key ("country") missing - also returns default
country = glom(user, "address.country.name", default="Unknown")
print("Country:", country)

# Without default - raises GlomError with path context
try:
    phone = glom(user, "contact.phone")
except Exception as e:
    print("Error:", type(e).__name__, str(e)[:80])

Output:

City: Chicago
Zip: N/A
Country: Unknown
Error: PathAccessError (Attribute/Key "contact" not found in {"name": "Bob", ...})

The error message includes the target object and the part of the path that was missing, which is far more useful than a bare KeyError: 'contact'. When you want a silent fallback, pass default=... as a keyword argument. When you want to handle the failure yourself, let it raise and catch GlomError (the PathAccessError shown above is a subclass of it).

KeyError: 'contact'. Sure. But which contact? In which object? On which request?

Traversing Lists with glom

When a path includes a list, glom can collect a value from every element of the list using the [spec] list-comprehension syntax inside the spec:

# glom_lists.py
from glom import glom

api_response = {
    "data": {
        "articles": [
            {"id": 1, "title": "Python Tips", "author": {"name": "Alice"}, "views": 1200},
            {"id": 2, "title": "Async Guide",  "author": {"name": "Bob"},   "views": 890},
            {"id": 3, "title": "Type Hints",   "author": {"name": "Carol"}, "views": 2100},
        ]
    }
}

# Get all titles
titles = glom(api_response, ("data.articles", ["title"]))
print("Titles:", titles)

# Get all author names (nested path inside list)
authors = glom(api_response, ("data.articles", [("author.name")]))
print("Authors:", authors)

# Get first article's title
first_title = glom(api_response, "data.articles.0.title")
print("First title:", first_title)

# Get count of articles
count = glom(api_response, ("data.articles", len))
print("Count:", count)

Output:

Titles: ['Python Tips', 'Async Guide', 'Type Hints']
Authors: ['Alice', 'Bob', 'Carol']
First title: Python Tips
Count: 3

The tuple form (path_to_list, [spec_for_each_element]) is glom’s list comprehension pattern. The list spec ["title"] means “extract the title key from every element.” Integer indexing like "data.articles.0.title" accesses the first element directly. The len function as a spec applies the function to the result of the preceding path — a clean way to get a count without intermediate variables.

Using Coalesce for Fallback Chains

Coalesce tries a series of specs in order and returns the first one that succeeds without raising. This is the glom equivalent of a or b or c but for nested path access:

# glom_coalesce.py
from glom import glom, Coalesce

records = [
    {"id": 1, "display_name": "Alice Chen"},
    {"id": 2, "full_name": "Bob Torres"},
    {"id": 3, "first_name": "Carol", "last_name": "White"},
    {"id": 4},
]

for record in records:
    # Try display_name, then full_name, then first_name, then "Unknown"
    name = glom(record, Coalesce("display_name", "full_name", "first_name", default="Unknown"))
    print(f"ID {record['id']}: {name}")

Output:

ID 1: Alice Chen
ID 2: Bob Torres
ID 3: Carol
ID 4: Unknown

Coalesce is particularly valuable when working with heterogeneous data sources where different records use different field names for the same concept. Instead of a chain of .get() calls that returns None silently on failure, Coalesce raises if all options fail (unless you provide a default), making silent failures visible.

Restructuring Data with Spec Dicts

Glom can restructure data by using a dict as the spec. Each key in the spec dict becomes a key in the output, and each value is a spec for what to put there:

# glom_restructure.py
from glom import glom, Coalesce

raw_api_data = {
    "results": [
        {
            "user_id": "u_001",
            "user_data": {
                "personal": {"first": "Alice", "last": "Chen"},
                "contact": {"email": "alice@example.com", "phone": "555-0101"},
            },
            "subscription": {"plan": "pro", "active": True},
        },
        {
            "user_id": "u_002",
            "user_data": {
                "personal": {"first": "Bob", "last": "Torres"},
                "contact": {"email": "bob@example.com"},
            },
            "subscription": {"plan": "free", "active": True},
        },
    ]
}

# Spec dict defines the output shape
user_spec = {
    "id":    "user_id",
    "name":  ("user_data.personal", lambda p: f"{p['first']} {p['last']}"),
    "email": "user_data.contact.email",
    "phone": Coalesce("user_data.contact.phone", default="N/A"),
    "plan":  "subscription.plan",
}

users = glom(raw_api_data, ("results", [user_spec]))
for user in users:
    print(user)

Output:

{'id': 'u_001', 'name': 'Alice Chen', 'email': 'alice@example.com', 'phone': '555-0101', 'plan': 'pro'}
{'id': 'u_002', 'name': 'Bob Torres', 'email': 'bob@example.com', 'phone': 'N/A', 'plan': 'free'}

The spec dict maps output keys to specs for their values. The lambda inside the tuple ("user_data.personal", lambda p: ...) first extracts the nested dict, then applies the lambda to produce a full name. This is data reshaping expressed declaratively — the spec describes the output shape, and glom handles the traversal and assembly.

Spec dict: describe the shape you want. glom figures out how to get there.
Spec dict: describe the shape you want. glom figures out how to get there.

Using the T Object for Method Calls

The T object lets you include method calls and attribute access in a spec without switching to a lambda:

# glom_T_object.py
from glom import glom, T

data = {
    "message": "  Hello, World!  ",
    "tags": ["Python", "Tutorial", "Beginner"],
    "score": 87.654321,
}

# T.method() calls the method on the extracted value
trimmed = glom(data, ("message", T.strip()))
print("Trimmed:", trimmed)

# Chain calls
upper_trimmed = glom(data, ("message", T.strip().upper()))
print("Upper trimmed:", upper_trimmed)

# T also supports item access with square brackets
first_tag = glom(data, ("tags", T[0]))
print("First tag:", first_tag)

# Round a float
rounded = glom(data, ("score", lambda x: round(x, 2)))
print("Rounded:", rounded)

Output:

Trimmed: Hello, World!
Upper trimmed: HELLO, WORLD!
First tag: Python
Rounded: 87.65

The T object is a proxy that records method calls and attribute accesses, then replays them on the actual extracted value. T.strip().upper() means “call strip() then upper() on whatever value comes before this in the spec pipeline.” For simple transformations, T is cleaner than a lambda. For complex transformations, a regular function or lambda is more readable.

Real-Life Example: Processing a GitHub API Response

Here is a realistic use case — extracting structured data from a GitHub-style API response that has nested contributors, labels, and metadata:

# process_github_issues.py
from glom import glom, Coalesce, T

# Simulated GitHub API response for repository issues
github_response = {
    "repository": "myorg/myproject",
    "total_count": 3,
    "items": [
        {
            "number": 42,
            "title": "Fix login timeout bug",
            "state": "open",
            "user": {"login": "alice_dev", "type": "User"},
            "labels": [{"name": "bug"}, {"name": "priority-high"}],
            "assignees": [{"login": "bob_dev"}],
            "comments": 5,
            "created_at": "2024-01-10T09:00:00Z",
            "body": "Users are getting logged out after 5 minutes of inactivity...",
        },
        {
            "number": 43,
            "title": "Add dark mode support",
            "state": "open",
            "user": {"login": "carol_dev", "type": "User"},
            "labels": [{"name": "enhancement"}],
            "assignees": [],
            "comments": 12,
            "created_at": "2024-01-11T14:30:00Z",
            "body": None,
        },
        {
            "number": 44,
            "title": "Update dependencies",
            "state": "closed",
            "user": {"login": "bot_user", "type": "Bot"},
            "labels": [{"name": "maintenance"}],
            "assignees": [{"login": "alice_dev"}, {"login": "carol_dev"}],
            "comments": 0,
            "created_at": "2024-01-12T08:00:00Z",
            "body": "Automated PR to update package versions.",
        },
    ]
}

issue_spec = {
    "id":          "number",
    "title":       "title",
    "state":       "state",
    "author":      "user.login",
    "is_human":    ("user.type", lambda t: t == "User"),
    "labels":      ("labels", [("name")]),
    "assignees":   ("assignees", [("login")]),
    "comments":    "comments",
    "description": Coalesce("body", default="(no description)", skip=None),
}

issues = glom(github_response, ("items", [issue_spec]))

for issue in issues:
    assignee_str = ", ".join(issue["assignees"]) if issue["assignees"] else "Unassigned"
    print(f"#{issue['id']} [{issue['state'].upper()}] {issue['title']}")
    print(f"   Author: {issue['author']} | Labels: {', '.join(issue['labels'])}")
    print(f"   Assigned to: {assignee_str} | Comments: {issue['comments']}")
    print()

Output:

#42 [OPEN] Fix login timeout bug
   Author: alice_dev | Labels: bug, priority-high
   Assigned to: bob_dev | Comments: 5

#43 [OPEN] Add dark mode support
   Author: carol_dev | Labels: enhancement
   Assigned to: Unassigned | Comments: 12

#44 [CLOSED] Update dependencies
   Author: bot_user | Labels: maintenance
   Assigned to: alice_dev, carol_dev | Comments: 0

The spec dict handles both the path traversal and the data transformation in one place. Coalesce supplies a fallback for body; because a key that is present with a None value counts as a successful lookup, Coalesce only falls back on None when it is given skip=None. The label list extraction ("labels", [("name")]) turns the list of label objects into a list of name strings. This pattern scales cleanly: add more fields to the spec dict and they get extracted automatically for every item in the list.

Frequently Asked Questions

How does glom compare to jmespath?

JMESPath is a query language for JSON that specializes in filtering and selecting data from JSON documents. It is powerful for filter and projection queries but is limited to JSON-compatible data and does not support transformation in place. Glom works on any Python object (dicts, objects, lists, combinations), supports transformation via callables and the T object, and integrates naturally into Python code. Use jmespath for read-only JSON queries; use glom when you need Python-native data transformation.

Does glom work on regular Python objects, not just dicts?

Yes. Glom resolves paths using attribute access for regular objects and key access for dicts. A path like "user.name" works whether user is a dict (user["name"]) or an object (user.name). You can mix them in the same path: "response.data.users.0.email" works even if response is an object, data is a dict, users is a list, and each user is another dict.

Can glom write values as well as read them?

Yes, using glom.assign(target, path, value). This sets a value at a nested path, creating intermediate dicts if needed with the missing=dict parameter. For example, glom.assign(data, "user.profile.bio", "New bio", missing=dict) creates the nested structure if it does not already exist. This is useful for building nested data structures incrementally.

Is glom significantly slower than direct dict access?

Yes, glom has overhead compared to direct bracket access because it resolves specs at runtime. For tight loops over millions of records, direct access is faster. For typical API response processing (hundreds to thousands of records), the overhead is negligible. Glom’s value is in correctness and maintainability — clear errors, readable specs, and less boilerplate — not raw performance.

How do I get more context when a GlomError occurs?

Glom’s error messages already include the target object and the path that failed. For additional context, wrap the glom call in a try-except and log the full exception: the message includes the spec that was being processed and the point of failure. You can also use glom.Path to build paths programmatically, which is needed when a key contains a dot or is not a string; the failing path segment then appears in the error message.

Conclusion

Glom replaces fragile nested dict access with a declarative path spec system that provides clear errors, graceful defaults, and built-in data transformation. The dot-path syntax handles simple access. Coalesce handles fallback chains. Spec dicts reshape data in a single pass. The T object adds method calls without lambda clutter. Together, these tools make working with complex nested data from APIs and config files significantly cleaner.

Try extending the GitHub issues example by adding a filter spec to select only open issues assigned to a specific user, then restructure the output into a flat CSV-ready format. Once you start expressing data transformations as specs rather than nested loops, you will find glom showing up in every project that touches external API data.

See the official glom documentation for the full spec API including Iter, Check, and Fill.

How To Use Python prettytable for ASCII Table Output


Beginner

When you print a list of dictionaries or a set of results in a Python script, you get a wall of text that is hard to read. You can spend time manually formatting columns with str.ljust() and calculating widths, or you can use PrettyTable and have a properly aligned table in three lines. It is one of those small libraries that makes CLI tools and scripts significantly more professional-looking with almost no effort.

PrettyTable is a Python library for printing formatted ASCII tables to the terminal. It handles column alignment, sorting, padding, and borders automatically. You just add your headers and rows. Beyond basic display, it can export tables to CSV, JSON, and HTML — so the same table you print in the terminal can become a web page element without any reformatting.

This article covers installation, building tables by rows and columns, sorting, selecting specific columns, border styles, and exporting to other formats. It ends with a real-world example — a system report script that formats multiple metrics into a clean terminal display. By the end you will have a reusable pattern for any script that needs to display structured data cleanly.

Printing a Python Table with PrettyTable: Quick Example

Here is the minimal path from data to formatted table:

# quick_prettytable.py
from prettytable import PrettyTable

table = PrettyTable()
table.field_names = ["Name", "Language", "Stars"]

table.add_row(["Flask", "Python", 67000])
table.add_row(["Django", "Python", 78000])
table.add_row(["FastAPI", "Python", 74000])
table.add_row(["Express", "JavaScript", 62000])

# Columns are centered by default; left-align text, right-align numbers
table.align = "l"
table.align["Stars"] = "r"

print(table)

Output:

+---------+------------+-------+
| Name    | Language   | Stars |
+---------+------------+-------+
| Flask   | Python     | 67000 |
| Django  | Python     | 78000 |
| FastAPI | Python     | 74000 |
| Express | JavaScript | 62000 |
+---------+------------+-------+

PrettyTable calculates column widths automatically and draws the border. Cells are centered by default, so the example sets table.align to left-align the text columns and right-align the numbers. No manual padding, no f-string gymnastics. The sections below show sorting, filtering columns, styling borders, and exporting to HTML and JSON.

Your data has structure. Your terminal output should too.

What Is PrettyTable and When Should You Use It?

PrettyTable is a pure-Python library for generating formatted plain-text tables. It is designed for command-line tools, scripts, and any situation where you want to display tabular data without the weight of a full terminal UI framework. The tables it produces are plain ASCII, making them compatible with any terminal, log file, or text-based report.

Library | Output | Sorting | Export | Best For
print() + f-strings | Manual alignment | Manual | Manual | Simple one-off output
PrettyTable | ASCII table, auto-align | Built-in | CSV, HTML, JSON | CLI tools, scripts, reports
tabulate | Many formats (grid, pipe, markdown) | No | Limited | Markdown docs, pandas output
rich Table | Color, styled, unicode | No | No | Colorful terminal apps

PrettyTable is the right choice when you want a simple, dependency-light table with sorting and export capabilities. Use the rich library instead when you need color and visual styling. Use tabulate when you need Markdown or pipe-delimited output for documentation.

Installing PrettyTable

# terminal
pip install prettytable

Output:

Successfully installed prettytable-3.10.0 wcwidth-0.2.13

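PrettyTable has no heavy dependencies and installs in seconds. The 3.10 release shown above requires Python 3.8 or newer; check the project's PyPI page for the requirements of the version you install. Import it with from prettytable import PrettyTable.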

Building Tables Row by Row and Column by Column

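You can build a PrettyTable row by row with add_row() (the most common approach), in bulk with add_rows(), or column by column with add_column(). Column-by-column is useful when your data is already organized in lists per field. The example below shows the two row-based approaches: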

# build_methods.py
from prettytable import PrettyTable

# Method 1: Add rows one at a time
table1 = PrettyTable(["City", "Country", "Population"])
table1.add_row(["Tokyo", "Japan", 13960000])
table1.add_row(["Delhi", "India", 32940000])
table1.add_row(["Shanghai", "China", 28516000])
table1.add_row(["Sao Paulo", "Brazil", 22430000])
print("Row method:")
print(table1)

print()

# Method 2: Add all rows at once with add_rows()
table2 = PrettyTable(["Product", "Price", "Stock"])
table2.add_rows([
    ["Widget A", 9.99, 150],
    ["Widget B", 24.99, 45],
    ["Widget C", 4.49, 500],
])
print("add_rows() method:")
print(table2)

Output:

Row method:
+-----------+---------+------------+
| City      | Country | Population |
+-----------+---------+------------+
| Tokyo     | Japan   |   13960000 |
| Delhi     | India   |   32940000 |
| Shanghai  | China   |   28516000 |
| Sao Paulo | Brazil  |   22430000 |
+-----------+---------+------------+

add_rows() method:
+----------+-------+-------+
| Product  | Price | Stock |
+----------+-------+-------+
| Widget A |  9.99 |   150 |
| Widget B | 24.99 |    45 |
| Widget C |  4.49 |   500 |
+----------+-------+-------+

Note that PrettyTable centers every column by default. The tables above are shown with text columns left-aligned and numeric columns right-aligned, which takes two lines per table: set table.align = "l" for all columns, then override individual ones, for example table.align["Population"] = "r".

PrettyTable does not care which direction your data comes from.

Sorting Tables

PrettyTable sorts on any column by name. You can also reverse the order and sort at print time or at data-entry time:

# sorting_tables.py
from prettytable import PrettyTable

table = PrettyTable(["Package", "Version", "Downloads/Month"])
table.add_rows([
    ["requests",   "2.31",  450000000],
    ["numpy",      "1.26",  180000000],
    ["pandas",     "2.1",   150000000],
    ["boto3",      "1.34",  120000000],
    ["setuptools", "69.0",  380000000],
])

# Sort by downloads descending
table.sortby = "Downloads/Month"
table.reversesort = True
print("Sorted by downloads (desc):")
print(table)
print()

# Sort at print time with get_string()
print("Sorted by Package name:")
print(table.get_string(sortby="Package", reversesort=False))

Output:

Sorted by downloads (desc):
+------------+---------+-----------------+
| Package    | Version | Downloads/Month |
+------------+---------+-----------------+
| requests   | 2.31    |     450000000   |
| setuptools | 69.0    |     380000000   |
| numpy      | 1.26    |     180000000   |
| pandas     | 2.1     |     150000000   |
| boto3      | 1.34    |     120000000   |
+------------+---------+-----------------+

Sorted by Package name:
+------------+---------+-----------------+
| Package    | Version | Downloads/Month |
+------------+---------+-----------------+
| boto3      | 1.34    |     120000000   |
| numpy      | 1.26    |     180000000   |
| pandas     | 2.1     |     150000000   |
| requests   | 2.31    |     450000000   |
| setuptools | 69.0    |     380000000   |
+------------+---------+-----------------+

Setting table.sortby makes the sort persistent, so subsequent print(table) calls will always use that sort. Using get_string(sortby=...) applies a one-time sort without changing the table’s default. Note that table.reversesort is persistent too, so a one-off ascending view needs reversesort=False passed to get_string(). This is useful when you want different views of the same data without modifying the table object.

Selecting Specific Columns and Rows

The get_string() method accepts fields to show only selected columns, and start/end to paginate rows:

# filter_display.py
from prettytable import PrettyTable

table = PrettyTable(["ID", "Name", "Department", "Salary", "Start Date"])
table.add_rows([
    [1, "Alice Chen",    "Engineering", 95000, "2021-03-15"],
    [2, "Bob Torres",    "Marketing",   72000, "2020-07-01"],
    [3, "Carol White",   "Engineering", 88000, "2022-01-10"],
    [4, "David Kim",     "HR",          65000, "2019-11-20"],
    [5, "Eva Martinez",  "Engineering", 102000, "2023-06-01"],
])

# Show only selected columns, highest salary first
print("Salary view (highest first):")
print(table.get_string(fields=["Name", "Department", "Salary"],
                       sortby="Salary", reversesort=True))
print()

# Paginate: show rows 1-3 only
print("First 3 rows only:")
print(table.get_string(start=0, end=3))

Output:

Salary view (highest first):
+--------------+-------------+--------+
| Name         | Department  | Salary |
+--------------+-------------+--------+
| Eva Martinez | Engineering | 102000 |
| Alice Chen   | Engineering |  95000 |
| Carol White  | Engineering |  88000 |
| Bob Torres   | Marketing   |  72000 |
| David Kim    | HR          |  65000 |
+--------------+-------------+--------+

First 3 rows only:
+----+-------------+-------------+--------+------------+
| ID | Name        | Department  | Salary | Start Date |
+----+-------------+-------------+--------+------------+
|  1 | Alice Chen  | Engineering |  95000 | 2021-03-15 |
|  2 | Bob Torres  | Marketing   |  72000 | 2020-07-01 |
|  3 | Carol White | Engineering |  88000 | 2022-01-10 |
+----+-------------+-------------+--------+------------+

Border Styles

PrettyTable includes several built-in styles that change the border and junction characters, and the border can be switched off entirely with table.border = False. The built-in styles are applied with set_style():

# border_styles.py
from prettytable import PrettyTable, SINGLE_BORDER, DOUBLE_BORDER, MARKDOWN

table = PrettyTable(["Name", "Score"])
table.add_rows([["Alice", 95], ["Bob", 87], ["Carol", 92]])

print("Default (classic ASCII):")
print(table)

print("\nSingle line border:")
table.set_style(SINGLE_BORDER)
print(table)

print("\nDouble line border:")
table.set_style(DOUBLE_BORDER)
print(table)

print("\nMarkdown format:")
table.set_style(MARKDOWN)
print(table)

Output:

Default (classic ASCII):
+-------+-------+
| Name  | Score |
+-------+-------+
| Alice |    95 |
| Bob   |    87 |
| Carol |    92 |
+-------+-------+

Single line border:
┌───────┬───────┐
│ Name  │ Score │
├───────┼───────┤
...

Markdown format:
| Name  | Score |
| ----- | ----- |
| Alice |    95 |
| Bob   |    87 |
| Carol |    92 |

The MARKDOWN style is particularly useful when generating documentation or writing to a file that will be rendered as Markdown. SINGLE_BORDER and DOUBLE_BORDER use Unicode box-drawing characters for a more polished look in terminals that support them.

MARKDOWN style for docs. DOUBLE_BORDER when you want your CLI to look like it ships in a box.

Exporting Tables to HTML, JSON, and CSV

PrettyTable can render the same data as HTML, JSON, or CSV with no extra code:

# export_formats.py
from prettytable import PrettyTable

table = PrettyTable(["Country", "Capital", "Population"])
table.add_rows([
    ["Germany", "Berlin",  3677000],
    ["France",  "Paris",   2148000],
    ["Italy",   "Rome",    2873000],
    ["Spain",   "Madrid",  3305000],
])

# Export to HTML
html_output = table.get_html_string()
print("HTML output (first 300 chars):")
print(html_output[:300])
print()

# Export to JSON
json_output = table.get_json_string()
print("JSON output (first 200 chars):")
print(json_output[:200])
print()

# Export to CSV
csv_output = table.get_csv_string()
print("CSV output:")
print(csv_output)

Output:

HTML output (first 300 chars):
<table>
    <thead>
        <tr>
            <th>Country</th>
            <th>Capital</th>
            <th>Population</th>
        </tr>
    </thead>
    <tbody>
        <tr>
            <td>Germany</td>
...

JSON output (first 200 chars):
[
    [
        "Country",
        "Capital",
        "Population"
    ],
    {
        "Capital": "Berlin",
        "Country": "Germany",
        "Population": 3677000
    },
...

CSV output:
Country,Capital,Population
Germany,Berlin,3677000
France,Paris,2148000
Italy,Rome,2873000
Spain,Madrid,3305000

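All three export methods accept the same options as get_string(), so they respect the table's sortby setting and a fields=[...] selection. Note that get_json_string() emits the list of field names as the first element of the JSON array, followed by one object per row. This means you can build one table, display it in the terminal, write the HTML version to a report file, and save the CSV for further processing, all from the same object without any duplication.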

Real-Life Example: System Resource Monitor Report

Here is a practical script that collects system metrics and formats them into a terminal report using PrettyTable:

# system_report.py
import os
import shutil
from prettytable import PrettyTable, SINGLE_BORDER

def get_disk_usage():
    """Return disk usage for common paths."""
    paths = ["/", "/tmp"] if os.name != "nt" else ["C:\\", "D:\\"]
    rows = []
    for path in paths:
        if os.path.exists(path):
            total, used, free = shutil.disk_usage(path)
            pct = (used / total) * 100
            rows.append([
                path,
                f"{total // (1024**3)} GB",
                f"{used // (1024**3)} GB",
                f"{free // (1024**3)} GB",
                f"{pct:.1f}%",
            ])
    return rows

def make_env_table(keys):
    """Show selected environment variables."""
    table = PrettyTable(["Variable", "Value"])
    table.set_style(SINGLE_BORDER)
    table.align["Variable"] = "l"
    table.align["Value"] = "l"
    table.max_width["Value"] = 40
    for key in keys:
        val = os.environ.get(key, "(not set)")
        table.add_row([key, val])
    return table

# Disk usage table
disk_table = PrettyTable(["Mount", "Total", "Used", "Free", "Usage %"])
disk_table.set_style(SINGLE_BORDER)
for row in get_disk_usage():
    disk_table.add_row(row)

# Environment variables table
env_table = make_env_table(["HOME", "PATH", "SHELL", "LANG", "USER"])

# Python info table
py_table = PrettyTable(["Setting", "Value"])
py_table.set_style(SINGLE_BORDER)
import sys
py_table.add_rows([
    ["Python version", sys.version.split()[0]],
    ["Platform",       sys.platform],
    ["Prefix",         sys.prefix[:40]],
    ["Executable",     sys.executable[:40]],
])

print("=== Disk Usage ===")
print(disk_table)
print("\n=== Environment Variables ===")
print(env_table)
print("\n=== Python Environment ===")
print(py_table)

Output:

=== Disk Usage ===
┌───────┬────────┬────────┬────────┬─────────┐
│ Mount │ Total  │  Used  │  Free  │ Usage % │
├───────┼────────┼────────┼────────┼─────────┤
│   /   │ 500 GB │ 120 GB │ 380 GB │  24.0%  │
│  /tmp │ 500 GB │ 120 GB │ 380 GB │  24.0%  │
└───────┴────────┴────────┴────────┴─────────┘

=== Environment Variables ===
┌──────────┬───────────────────────────────┐
│ Variable │ Value                         │
├──────────┼───────────────────────────────┤
│ HOME     │ /home/alice                   │
│ PATH     │ /usr/local/bin:/usr/bin:/bin  │
│ SHELL    │ /bin/bash                     │
...

=== Python Environment ===
┌────────────────┬────────────────────────────┐
│    Setting     │           Value            │
├────────────────┼────────────────────────────┤
│ Python version │           3.12.1           │
│    Platform    │           linux            │
...

The max_width setting on the Value column keeps long PATH entries from wrecking the table layout by wrapping them onto extra lines within the cell. table.align["Variable"] = "l" forces left alignment on a column that PrettyTable would otherwise center. You can extend this example by adding a network interfaces section, a process list, or writing the HTML version to a file for a daily system report.

Frequently Asked Questions

Does PrettyTable handle Unicode characters correctly?

Yes, PrettyTable uses the wcwidth library to calculate the display width of Unicode characters correctly, including CJK double-width characters. This means columns with Chinese, Japanese, or Korean text will align properly in the terminal. Make sure your terminal uses a font that supports the characters you are displaying.

Can I build a PrettyTable directly from a CSV file?

Yes. Use from prettytable import from_csv and pass an open file object: table = from_csv(open("data.csv")). There are similar from_json() and from_html_one() constructors for importing from those formats. This makes PrettyTable useful as a quick display layer for any data file without having to parse it manually.

How do I prevent long text from breaking the table layout?

Set table.max_width["ColumnName"] = N to cap the display width of a specific column at N characters. Content longer than N is wrapped onto additional lines inside the cell rather than cut off. You can also set a global max width with table.max_width = 30. This is essential for columns that might contain long strings like file paths, URLs, or log messages.

Can I add color to PrettyTable output?

The base PrettyTable class focuses on structure rather than styling, but the package also ships a ColorTable subclass (from prettytable.colortable import ColorTable, Themes) that applies an ANSI color theme to the table. For per-cell color, bold, and italic styling, use the rich library’s Table class instead. You can also print PrettyTable output inside a rich.console.Console call without modification if you just want to wrap the ASCII output in color.

How does PrettyTable compare to printing a pandas DataFrame?

Pandas DataFrames print as formatted tables natively when you call print(df), but the output uses spaces for alignment without borders. PrettyTable gives you explicit borders, sorting, and export formats that pandas does not. For data analysis workflows already using pandas, tabulate is often a better companion since it directly accepts DataFrames. Use PrettyTable for scripts that are not already using pandas.

Conclusion

PrettyTable is the fastest path from raw data to a readable terminal table. The API is minimal — create a table, set field names, add rows, print. Sorting, column selection, and export to HTML, JSON, and CSV work out of the box. The border style options give you a professional look with one line change.

Try extending the system report example by adding a table of the top 10 running processes sorted by CPU usage, using the psutil library to get process data. Then export the full report as HTML and open it in a browser. Once you have PrettyTable in your CLI toolkit, your debugging scripts and data reports will never look the same.

See the official PrettyTable GitHub repository for the full API reference and changelog.

How To Use Python dateparser for Natural Language Dates

Intermediate

Real-world date data is messy. Users type “yesterday”, “3 days ago”, “next Friday”, or “Jan 5th”. Web scraping returns “il y a 2 heures” from a French site and “hace 3 dias” from a Spanish one. APIs return dates in a dozen different formats depending on who wrote the backend. Python’s datetime.strptime() is a precise tool for a precise format, but it fails immediately when the format changes.

Dateparser handles all of these cases. It is a Python library that parses virtually any human-readable date string into a standard datetime object — relative expressions, absolute dates, 200+ locales, and mixed formats. Instead of writing a fragile chain of format strings, you call dateparser.parse("3 days ago") and get back a proper datetime. It handles the hard parts: timezone inference, relative date calculation, language detection, and ambiguous format resolution.

This article covers installation, basic parsing, relative dates, locale support, timezone handling, the search function for extracting dates from text, settings configuration, and a real-world web scraping use case. By the end you will be able to normalize any date string your data sources throw at you.

Parsing Natural Language Dates: Quick Example

Here is what makes dateparser immediately useful — the same function handles every format:

# quick_dateparser.py
import dateparser

dates = [
    "3 days ago",
    "next Monday",
    "January 15, 2024",
    "15/01/2024",
    "2024-01-15T10:30:00",
    "in 2 hours",
    "last week",
]

for date_str in dates:
    dt = dateparser.parse(date_str)
    if dt:
        print(f"{date_str!r:30s} -> {dt.strftime('%Y-%m-%d %H:%M')}")
    else:
        print(f"{date_str!r:30s} -> Could not parse")

Output:

'3 days ago'                   -> 2024-01-12 14:22
'next Monday'                  -> 2024-01-22 14:22
'January 15, 2024'             -> 2024-01-15 00:00
'15/01/2024'                   -> 2024-01-15 00:00
'2024-01-15T10:30:00'          -> 2024-01-15 10:30
'in 2 hours'                   -> 2024-01-15 16:22
'last week'                    -> 2024-01-08 14:22

Every format returns a datetime object. Relative expressions like “3 days ago” are calculated from the current time. The function returns None when it cannot parse something, which makes it safe to use with a simple if dt: check. The sections below cover locale support, timezone handling, and extracting dates from longer text strings.

strptime needs a format string. dateparser just needs the date.

What Is dateparser and When Should You Use It?

Dateparser is built on top of Python’s dateutil library but extends it significantly with multilingual support, relative expression parsing, and a configurable settings system. Under the hood it tries multiple parsers in sequence until one succeeds, which is what gives it such broad format coverage.

Library            | Formats                | Relative Dates | Multilingual | Use Case
datetime.strptime  | Exact format only      | No             | No           | Known, fixed format
dateutil.parser    | Many English formats   | No             | Limited      | English, structured data
dateparser         | 200+ locales, relative | Yes            | Yes          | User input, scraping, APIs

Use dateparser when: you are parsing user-submitted input, scraping dates from websites in multiple languages, processing data from systems with inconsistent date formats, or handling relative time expressions. Use strptime when you control the format and need maximum performance — dateparser is slower because it tries multiple strategies.

Installing dateparser

# terminal
pip install dateparser

Output:

Successfully installed dateparser-1.2.0 regex-2023.12.25 tzlocal-5.2

Dateparser pulls in regex, tzlocal, and python-dateutil as dependencies. The install is around 40MB because of the language data files it bundles for multilingual support. Once installed, import dateparser and call dateparser.parse() — there is no configuration needed for basic use.

Parsing Dates in Multiple Languages

Dateparser detects the language automatically for most common languages, or you can specify it explicitly for better accuracy and performance:

# multilingual_dates.py
import dateparser

multilingual = [
    ("French",  "il y a 3 jours"),
    ("Spanish", "hace 2 semanas"),
    ("German",  "vor 5 Stunden"),
    ("Italian", "domani"),
    ("Portuguese", "anteontem"),
    ("Russian", "3 дня назад"),
    ("Chinese", "3天前"),
]

for lang, date_str in multilingual:
    dt = dateparser.parse(date_str)
    result = dt.strftime("%Y-%m-%d") if dt else "parse failed"
    print(f"{lang:12s} | {date_str:25s} | {result}")

Output:

French       | il y a 3 jours            | 2024-01-12
Spanish      | hace 2 semanas            | 2024-01-01
German       | vor 5 Stunden             | 2024-01-15
Italian      | domani                    | 2024-01-16
Portuguese   | anteontem                 | 2024-01-13
Russian      | 3 дня назад               | 2024-01-12
Chinese      | 3天前                       | 2024-01-12

For better performance when you know the language, pass it explicitly using the languages parameter. This skips the language detection step and goes straight to parsing:

# explicit_language.py
import dateparser

# Explicit language is faster and more accurate
dt = dateparser.parse("il y a 2 heures", languages=["fr"])
print(dt.strftime("%Y-%m-%d %H:%M"))  # 2024-01-15 12:22

dt = dateparser.parse("hace 3 dias", languages=["es"])
print(dt.strftime("%Y-%m-%d"))  # 2024-01-12

Output:

2024-01-15 12:22
2024-01-12
200 locales, one function. Your scraper no longer cares what language the site uses.

Handling Timezones

Dateparser supports timezone-aware parsing. You can specify a default timezone or parse timezone information directly from the string:

# timezone_parsing.py
import dateparser

# Strings with explicit timezone info
examples = [
    "January 15, 2024 3:00 PM EST",
    "2024-01-15 15:00 UTC+5:30",
    "15 Jan 2024 10:00 GMT",
    "in 2 hours",
]

for s in examples:
    dt = dateparser.parse(s, settings={"RETURN_AS_TIMEZONE_AWARE": True})
    if dt:
        print(f"{s!r:35s} -> {dt.isoformat()}")

Output:

'January 15, 2024 3:00 PM EST'      -> 2024-01-15T15:00:00-05:00
'2024-01-15 15:00 UTC+5:30'         -> 2024-01-15T15:00:00+05:30
'15 Jan 2024 10:00 GMT'             -> 2024-01-15T10:00:00+00:00
'in 2 hours'                        -> 2024-01-15T16:22:00+00:00

The RETURN_AS_TIMEZONE_AWARE setting ensures all returned datetimes have timezone info attached. For strings without an explicit timezone (“in 2 hours”), dateparser falls back to the TIMEZONE setting, which defaults to the local timezone of the machine; the output above comes from a machine running in UTC. You can also force a specific timezone for naive strings using TIMEZONE in the settings dict: settings={"TIMEZONE": "US/Eastern"}.

Extracting Dates from Text

The dateparser.search.search_dates() function finds and extracts all date references from a longer text string. This is invaluable for processing news articles, emails, or forum posts:

# search_dates.py
from dateparser.search import search_dates

text = """
The product was released on March 5, 2024. After getting great reviews last week,
the team is planning a follow-up launch in 3 months. The deadline for feature
submissions is January 31st, and the beta test starts next Monday.
"""

results = search_dates(text, languages=["en"])
if results:
    for original, parsed in results:
        print(f"Found: {original!r:35s} -> {parsed.strftime('%Y-%m-%d')}")
else:
    print("No dates found")

Output:

Found: 'March 5, 2024'                    -> 2024-03-05
Found: 'last week'                        -> 2024-01-08
Found: 'in 3 months'                      -> 2024-04-15
Found: 'January 31st'                     -> 2024-01-31
Found: 'next Monday'                      -> 2024-01-22

Each result is a tuple of (original_string, datetime_object). The function returns all matches in document order, so you can correlate extracted dates with their surrounding context. Specifying the language explicitly with languages=["en"] significantly improves performance and accuracy for single-language documents.

Configuring dateparser Settings

Dateparser’s settings parameter controls how ambiguous cases are resolved. The most useful settings are date order preference, prefer past vs. future for relative dates, and strict parsing mode:

# dateparser_settings.py
import dateparser

ambiguous = "01/02/03"

# Different date order interpretations
for order in ["YMD", "DMY", "MDY"]:
    dt = dateparser.parse(ambiguous, settings={"DATE_ORDER": order})
    print(f"DATE_ORDER={order}: {dt.strftime('%Y-%m-%d') if dt else 'None'}")

print()

# Prefer future vs past for dates that could fall on either side of today
for prefer in ["future", "past"]:
    dt = dateparser.parse("Monday", settings={"PREFER_DATES_FROM": prefer})
    print(f"PREFER_DATES_FROM={prefer}: Monday -> {dt.strftime('%Y-%m-%d') if dt else 'None'}")

Output:

DATE_ORDER=YMD: 2001-02-03
DATE_ORDER=DMY: 2003-02-01
DATE_ORDER=MDY: 2003-01-02

PREFER_DATES_FROM=future: Monday -> 2024-01-22
PREFER_DATES_FROM=past: Monday -> 2024-01-15

The DATE_ORDER setting is critical for international data. European sources typically use DMY, American sources use MDY, and ISO 8601 data uses YMD. When processing data from a known region, always set DATE_ORDER explicitly rather than relying on dateparser’s heuristic detection.

01/02/03 is three different dates depending on who wrote it. dateparser lets you decide.

Real-Life Example: Normalizing Scraped Event Dates

Here is a realistic use case — scraping event listings from a site that mixes relative and absolute date formats, then normalizing everything to ISO 8601:

# normalize_event_dates.py
import dateparser
from datetime import datetime, timezone

def normalize_date(date_str, source_lang="en"):
    """Parse any date string and return ISO 8601 UTC string, or None on failure."""
    if not date_str or not date_str.strip():
        return None
    dt = dateparser.parse(
        date_str.strip(),
        languages=[source_lang],
        settings={
            "RETURN_AS_TIMEZONE_AWARE": True,
            "PREFER_DATES_FROM": "future",
            "TO_TIMEZONE": "UTC",
        }
    )
    return dt.isoformat() if dt else None

# Simulated scraped data -- mixed formats from different sources
raw_events = [
    {"title": "Python Conference",    "date_raw": "March 15, 2025"},
    {"title": "Workshop: FastAPI",    "date_raw": "in 3 weeks"},
    {"title": "Code Sprint",          "date_raw": "next Saturday"},
    {"title": "Hackathon",            "date_raw": "2025-04-20T09:00:00"},
    {"title": "Lightning Talks",      "date_raw": "tomorrow at 6pm"},
    {"title": "Annual Meetup",        "date_raw": ""},
    {"title": "Online Workshop",      "date_raw": "April 5th"},
    {"title": "Open Source Day",      "date_raw": "unknown"},
]

normalized = []
failed = []

for event in raw_events:
    iso_date = normalize_date(event["date_raw"])
    if iso_date:
        normalized.append({**event, "date_iso": iso_date})
    else:
        failed.append(event)

print(f"Normalized: {len(normalized)} events")
for e in normalized:
    print(f"  {e['title']:25s} | {e['date_raw']:30s} | {e['date_iso']}")

print(f"\nFailed to parse: {len(failed)} events")
for e in failed:
    print(f"  {e['title']:25s} | {e['date_raw']!r}")

Output:

Normalized: 6 events
  Python Conference         | March 15, 2025                 | 2025-03-15T00:00:00+00:00
  Workshop: FastAPI         | in 3 weeks                     | 2024-02-05T14:22:00+00:00
  Code Sprint               | next Saturday                  | 2024-01-20T14:22:00+00:00
  Hackathon                 | 2025-04-20T09:00:00            | 2025-04-20T09:00:00+00:00
  Lightning Talks           | tomorrow at 6pm                | 2024-01-16T18:00:00+00:00
  Online Workshop           | April 5th                      | 2024-04-05T00:00:00+00:00

Failed to parse: 2 events
  Annual Meetup             | ''
  Open Source Day           | 'unknown'

The defensive if not date_str check handles empty strings before passing to dateparser. The TO_TIMEZONE="UTC" setting converts all results to UTC, giving you a consistent timestamp for sorting and storage. This pattern normalizes the entire pipeline — whatever format comes in, UTC ISO 8601 goes into your database.

Frequently Asked Questions

Why does dateparser.parse() return None for some strings?

Dateparser returns None when none of its internal parsers can confidently match the string. Common causes are: strings that contain dates mixed with too much other text (use search_dates() instead), strings in unsupported languages, or ambiguous short strings like “5” that could be a day, month, or year. Always check the return value before calling .strftime() on it.

Is dateparser slow?

Yes, relative to strptime(). Dateparser tries multiple parsers in sequence and loads language data, so it is 10-100x slower than a direct strptime call. For processing millions of records, use dateparser to identify the format, then switch to strptime for the bulk parse. For hundreds of thousands of records where formats vary, dateparser’s throughput is typically acceptable.
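One way to apply this advice is to try a short list of known strptime formats first and only hand the string to a slower parser when none of them match. The sketch below uses only the standard library; KNOWN_FORMATS and parse_fast are hypothetical names, and the format list is an assumption you would replace with the formats your own data actually uses.

```python
from datetime import datetime

# Assumed formats; in practice, collect these by sampling your data
KNOWN_FORMATS = ["%Y-%m-%d", "%d/%m/%Y", "%Y-%m-%dT%H:%M:%S"]


def parse_fast(date_str, fallback=None):
    """Try cheap strptime formats first, then an optional slower parser."""
    for fmt in KNOWN_FORMATS:
        try:
            return datetime.strptime(date_str, fmt)
        except ValueError:
            continue  # format did not match, try the next one
    # Nothing matched: delegate to the fallback (for example dateparser.parse)
    return fallback(date_str) if fallback else None


print(parse_fast("2024-01-15"))
print(parse_fast("15/01/2024"))
print(parse_fast("3 days ago"))  # no fallback supplied, so this is None
```

Passing fallback=dateparser.parse keeps the flexible behaviour for unusual strings while the common formats take the fast path.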

How do I control the “now” reference point for relative dates?

Pass a RELATIVE_BASE datetime in the settings: settings={"RELATIVE_BASE": datetime(2024, 6, 1)}. This is essential when reprocessing historical data — “3 days ago” should resolve relative to when the data was created, not when your script runs. Without this setting, “3 days ago” always means 3 days before now.

What is the difference between dateparser and python-dateutil?

Dateutil’s parser.parse() handles many English date formats and is faster than dateparser. Dateparser adds multilingual support, relative expressions like “next Monday” and “in 3 hours”, and a configurable settings system. If you only need English dates in structured formats (not relative), dateutil is sufficient and faster. Use dateparser when you need multilingual support or relative expressions.

How do I handle dates where day and month are ambiguous (like 01/02/03)?

Set DATE_ORDER explicitly in the settings based on your data source. European sources typically use DMY, American MDY, and ISO data YMD. If the source is genuinely mixed, log the ambiguous cases and handle them separately rather than guessing. A parse that silently returns the wrong date is worse than one that returns None.
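To log ambiguous cases rather than guess, you need a cheap way to spot them first. The sketch below is one possible heuristic using only the standard library: a numeric date is flagged when both of its first two components could be a month. The function name and the regular expression are assumptions for illustration, not part of dateparser.

```python
import re

# Matches numeric dates such as 01/02/03, 1-2-2024 or 01.02.2024
NUMERIC_DATE = re.compile(r"^\s*(\d{1,2})[/.-](\d{1,2})[/.-](\d{2,4})\s*$")


def is_day_month_ambiguous(date_str):
    """Return True when swapping day and month would give a different valid date."""
    match = NUMERIC_DATE.match(date_str)
    if not match:
        return False
    first, second = int(match.group(1)), int(match.group(2))
    # Ambiguous only if both parts fit in the month range and they differ
    return 1 <= first <= 12 and 1 <= second <= 12 and first != second


for sample in ["01/02/03", "15/01/2024", "05/05/2024", "2024-01-15"]:
    print(sample, is_day_month_ambiguous(sample))
```

Strings flagged by this check can be written to a review log, while everything else goes straight through dateparser with the DATE_ORDER for that source.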

Conclusion

Dateparser removes the most frustrating part of working with real-world date data — the infinite variety of formats humans use to express time. One function call handles relative expressions, absolute dates, multilingual strings, and timezone-aware parsing. The search_dates() function extracts dates from unstructured text, and the settings system gives you control over ambiguous cases.

Try extending the normalization example to process a CSV of events with inconsistent date columns. Pass languages=["fr", "es", "de"] to handle European sources, and set RELATIVE_BASE to the file’s creation date so relative expressions resolve correctly. Once you have that working reliably, you will never miss writing a chain of strptime format strings.

For full settings documentation and the list of supported languages, see the official dateparser documentation.

How To Use Python dynaconf for Configuration Management

Intermediate

Every Python application eventually needs to behave differently depending on where it is running. Your development machine uses a local SQLite database, your staging server talks to PostgreSQL, and production has its own credentials that nobody hard-codes in version control. Managing this through a tangle of if os.environ.get() calls and multiple config files that fall out of sync is one of the most tedious parts of building real applications.

Dynaconf solves configuration management cleanly. It reads settings from environment variables, .env files, TOML, YAML, JSON, or INI files — in a layered, precedence-aware way — and exposes them as a simple settings object. Switching environments is a single environment variable. Built-in validators let you catch missing or malformed config at startup rather than discovering the problem at 2am when a database connection fails.

This article covers installing dynaconf, the basic settings object, layered configuration files, environment-based overrides, validators, and a real-world web app config pattern. By the end you will have a configuration system that scales from a weekend project to a production deployment without becoming a maintenance burden.

Python dynaconf Settings: Quick Example

Here is the smallest possible dynaconf setup — a settings.toml file and a Python script that reads from it:

# settings.toml
[default]
app_name = "MyApp"
debug = false
max_connections = 10
database_url = "sqlite:///local.db"
# quick_dynaconf.py
from dynaconf import Dynaconf

settings = Dynaconf(settings_file="settings.toml", environments=True)

print(settings.APP_NAME)
print(settings.DEBUG)
print(settings.MAX_CONNECTIONS)
print(settings.DATABASE_URL)

Output:

MyApp
False
10
sqlite:///local.db

Dynaconf treats setting names as case-insensitive: you write app_name in the file and can read it as settings.APP_NAME or settings.app_name. Types are preserved: false in TOML becomes a Python bool, 10 becomes an int. The sections below show how to add multiple environments, override with environment variables, and validate settings at startup.

One settings object, three environments, zero if-else chains.

What Is dynaconf and Why Use It?

Dynaconf is a configuration management library that brings twelve-factor app principles to Python without the boilerplate. The core idea is a settings hierarchy: default values in a file, environment-specific overrides layered on top, and environment variables at the highest priority so they can always override anything below them. This matches how real applications are deployed — defaults in version control, secrets in the environment.

Feature               | os.environ        | python-dotenv     | dynaconf
Multiple environments | Manual            | Manual            | Built-in
Type coercion         | No (strings only) | No (strings only) | Yes (int, bool, list, dict)
Validation            | No                | No                | Yes
File formats          | env only          | .env only         | TOML, YAML, JSON, INI, .env
Layered overrides     | No                | No                | Yes

Plain os.environ is fine for a script with two settings. Once you have environments, types, and validation requirements, dynaconf’s structure pays for itself quickly. The settings object is also easier to mock in tests than scattered os.environ.get() calls.

Installing dynaconf

# terminal
pip install dynaconf

Output:

Successfully installed dynaconf-3.2.4

Dynaconf has no required dependencies beyond the Python standard library. Optional extras add support for specific file formats: pip install dynaconf[yaml] adds PyYAML support, pip install dynaconf[toml] adds tomli for Python 3.10 and below (3.11+ has tomllib built-in). TOML is the recommended format and works without extras on modern Python.

Layered Environments

Dynaconf’s most useful feature is layered environments in a single settings file. A [default] section provides base values; named environment sections override only what changes:

# settings.toml
[default]
app_name = "MyApp"
debug = false
log_level = "INFO"
database_url = "sqlite:///app.db"
max_connections = 10

[development]
debug = true
log_level = "DEBUG"
database_url = "sqlite:///dev.db"

[testing]
database_url = "sqlite:///:memory:"
max_connections = 2

[production]
log_level = "WARNING"
max_connections = 50
# read_environments.py
import os
from dynaconf import Dynaconf

# Switch environment by setting ENV_FOR_DYNACONF
os.environ["ENV_FOR_DYNACONF"] = "development"

settings = Dynaconf(settings_file="settings.toml", environments=True)

print("Environment:", settings.current_env)
print("Debug:", settings.DEBUG)
print("Log level:", settings.LOG_LEVEL)
print("Database:", settings.DATABASE_URL)
print("Max conn:", settings.MAX_CONNECTIONS)

Output:

Environment: DEVELOPMENT
Debug: True
Log level: DEBUG
Database: sqlite:///dev.db
Max conn: 10

The development section overrides debug, log_level, and database_url but inherits max_connections from default. Switching to production is just changing one environment variable — no code changes, no file swaps, no manual conditionals. In a deployment pipeline you set ENV_FOR_DYNACONF=production in the server’s environment and the app picks up the right settings automatically.
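The layering rule can be pictured as successive dictionary updates. The sketch below is only a mental model written with plain dicts, not dynaconf's actual implementation; resolve_settings and its overrides argument are hypothetical names used for illustration.

```python
def resolve_settings(sections, env, overrides=None):
    """Merge default values, then the chosen environment, then explicit overrides."""
    merged = dict(sections.get("default", {}))  # lowest priority
    merged.update(sections.get(env, {}))        # environment section wins over default
    merged.update(overrides or {})              # overrides (e.g. env vars) win over both
    return merged


# Same values as the settings.toml above, written as plain dicts
sections = {
    "default": {"debug": False, "log_level": "INFO", "max_connections": 10},
    "development": {"debug": True, "log_level": "DEBUG"},
    "production": {"log_level": "WARNING", "max_connections": 50},
}

dev = resolve_settings(sections, "development")
prod = resolve_settings(sections, "production", overrides={"max_connections": 100})

print(dev)
print(prod)
```

Each layer only needs to mention the keys it changes, which is why the development section in the file can stay three lines long.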

ENV_FOR_DYNACONF=production. That's the entire deployment config change.

Environment Variable Overrides

Any setting can be overridden from the shell using the DYNACONF_ prefix (or a custom prefix you configure). Environment variables always win over file values, which makes them ideal for secrets that should never be committed to version control:

# env_override.py
import os
from dynaconf import Dynaconf

# These would typically come from the server environment or a secrets manager
os.environ["DYNACONF_DATABASE_URL"] = "postgresql://user:secret@db.prod.example/myapp"
os.environ["DYNACONF_MAX_CONNECTIONS"] = "100"

settings = Dynaconf(settings_file="settings.toml", environments=True)

print("Database:", settings.DATABASE_URL)
print("Max conn:", settings.MAX_CONNECTIONS)
print("Type:", type(settings.MAX_CONNECTIONS))

Output:

Database: postgresql://user:secret@db.prod.example/myapp
Max conn: 100
Type: <class 'int'>

Notice that MAX_CONNECTIONS is an integer even though environment variables are inherently strings. Dynaconf parses the value of each prefixed environment variable using TOML syntax, so 100 becomes an int, true becomes a bool, and a quoted value stays a string. This prevents the classic bug where os.environ.get("MAX_CONNECTIONS", 10) returns a string that crashes when you try to use it as a number.
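For comparison, here is a small standard-library sketch of the string pitfall that typed settings avoid. The variable names are made up for the demonstration.

```python
import os

# Simulate values arriving from the shell; environment values are always strings
os.environ["MAX_CONNECTIONS"] = "100"
os.environ["DEBUG"] = "false"

# The integer default is ignored once the variable exists, so this is the str "100"
raw = os.environ.get("MAX_CONNECTIONS", 10)
doubled = raw * 2  # string repetition, not arithmetic

# Any non-empty string is truthy, so "false" silently turns debug mode on
debug_flag = bool(os.environ.get("DEBUG"))

# The manual fix: convert every value explicitly at each call site
max_connections = int(os.environ.get("MAX_CONNECTIONS", "10"))
debug = os.environ.get("DEBUG", "false").lower() == "true"

print(repr(raw), repr(doubled), debug_flag)
print(max_connections, debug)
```

Every setting read this way needs its own conversion line, which is the boilerplate a typed settings object removes.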

Using a .env File for Secrets

Dynaconf natively reads .secrets.toml and .env files. The convention is to keep non-sensitive defaults in settings.toml (committed to version control) and secrets in .secrets.toml (git-ignored):

# .secrets.toml  (add to .gitignore)
[default]
secret_key = "dev-only-key-change-in-production"
api_key = "sk-test-abc123"
smtp_password = "devpassword"
# read_secrets.py
from dynaconf import Dynaconf

settings = Dynaconf(
    settings_file=["settings.toml", ".secrets.toml"],
    environments=True,
)

print("App:", settings.APP_NAME)
print("Secret key starts with:", settings.SECRET_KEY[:8], "...")
print("API key starts with:", settings.API_KEY[:8], "...")

Output:

App: MyApp
Secret key starts with: dev-only ...
API key starts with: sk-test- ...

The settings_file parameter accepts a list. Dynaconf merges all files in order, with later files winning on conflicts. This pattern — base settings in settings.toml, secrets in .secrets.toml — is dynaconf’s recommended convention. The .secrets.toml file goes in .gitignore immediately and is replaced by real environment variables or a secrets manager in production.

One file gets committed. The other does not. Dynaconf reads both.

Validating Settings at Startup

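Dynaconf’s validator system lets you declare which settings are required and what constraints they must satisfy. Validators passed to the constructor run when the settings are first loaded, which for the lazy Dynaconf object means on first access, so misconfiguration is caught at startup rather than buried in a runtime error three calls deep. To force the check at a specific point, call settings.validators.validate() explicitly: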

# validate_settings.py
from dynaconf import Dynaconf, Validator

settings = Dynaconf(
    settings_file="settings.toml",
    environments=True,
    validators=[
        Validator("APP_NAME", must_exist=True),
        Validator("MAX_CONNECTIONS", must_exist=True, gte=1, lte=500),
        Validator("LOG_LEVEL", must_exist=True, is_in=["DEBUG", "INFO", "WARNING", "ERROR"]),
        Validator("DATABASE_URL", must_exist=True, startswith="sqlite"),
    ]
)

print("All settings valid!")
print("App:", settings.APP_NAME)
print("Max connections:", settings.MAX_CONNECTIONS)

Output (valid config):

All settings valid!
App: MyApp
Max connections: 10

Output (if LOG_LEVEL were missing or invalid):

dynaconf.validator.ValidationError: LOG_LEVEL must be one of ['DEBUG', 'INFO', 'WARNING', 'ERROR']

Validators support must_exist, eq, ne, lt, gt, lte, gte, is_in, is_not_in, startswith, endswith, and custom condition callables. Catching a missing DATABASE_URL at startup with a clear error message is far better than getting a cryptic AttributeError when your app tries its first database query.

Real-Life Example: Flask App Configuration

Here is a realistic pattern for a Flask application using dynaconf across three environments:

# settings.toml
[default]
app_name = "TaskTracker"
debug = false
log_level = "INFO"
database_url = "sqlite:///tasks.db"
secret_key = "change-me"
items_per_page = 20
allowed_hosts = ["localhost", "127.0.0.1"]

[development]
debug = true
log_level = "DEBUG"

[testing]
database_url = "sqlite:///:memory:"
secret_key = "test-key-not-secret"

[production]
log_level = "WARNING"
items_per_page = 50
# config.py
from dynaconf import Dynaconf, Validator

settings = Dynaconf(
    envvar_prefix="TASKTRACKER",
    settings_file=["settings.toml", ".secrets.toml"],
    environments=True,
    load_dotenv=True,
    validators=[
        Validator("SECRET_KEY", must_exist=True),
        Validator("DATABASE_URL", must_exist=True),
        Validator("ITEMS_PER_PAGE", gte=1, lte=200),
    ]
)
# app.py
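import os

# Select the environment before the settings are first read.
# In a real deployment this comes from the server environment instead.
os.environ.setdefault("ENV_FOR_DYNACONF", "development")

from flask import Flask
from config import settings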

app = Flask(__name__)
app.config["SECRET_KEY"] = settings.SECRET_KEY
app.config["SQLALCHEMY_DATABASE_URI"] = settings.DATABASE_URL
app.config["DEBUG"] = settings.DEBUG

@app.route("/config-info")
def config_info():
    return {
        "env": settings.current_env,
        "debug": settings.DEBUG,
        "db": settings.DATABASE_URL,
        "per_page": settings.ITEMS_PER_PAGE,
    }

if __name__ == "__main__":
    print(f"Starting {settings.APP_NAME} in {settings.current_env} mode")
    app.run(debug=settings.DEBUG)

Output:

Starting TaskTracker in DEVELOPMENT mode
 * Running on http://127.0.0.1:5000

The custom envvar_prefix="TASKTRACKER" means overrides use TASKTRACKER_DATABASE_URL instead of DYNACONF_DATABASE_URL, which avoids conflicts if multiple dynaconf-based apps run on the same machine. In production you set ENV_FOR_DYNACONF=production and TASKTRACKER_SECRET_KEY=your-real-key in the server environment and everything else comes from the file defaults — no code changes required.

Frequently Asked Questions

Does dynaconf have built-in Flask and Django integration?

Yes. Dynaconf ships with official extensions for both frameworks. For Flask, call FlaskDynaconf(app) from dynaconf.contrib.flask_dynaconf to automatically sync settings to app.config. For Django, add settings = dynaconf.DjangoDynaconf(__name__) at the bottom of your existing settings.py so dynaconf takes over loading the values. Both integrations are documented on the dynaconf website with working examples.

How do I handle secrets in production with dynaconf?

The recommended pattern is: keep defaults in settings.toml (committed), keep development secrets in .secrets.toml (git-ignored), and inject production secrets as environment variables with the DYNACONF_ (or custom) prefix. This way secrets never live in version control and can come from a secrets manager like AWS Secrets Manager or HashiCorp Vault that injects them as environment variables at container startup.

Can dynaconf reload settings without restarting the app?

Yes. Call settings.reload() to re-read all configured files and environment variables. This is useful in long-running services where you want to pick up a configuration change without a restart. Be careful with thread safety if multiple threads access settings simultaneously during a reload — dynaconf does not lock the settings object during reload.

How do I store lists and dicts in dynaconf settings?

TOML natively supports lists (allowed_hosts = ["a.com", "b.com"]) and inline tables. In environment variables, use dynaconf’s special notation: DYNACONF_ALLOWED_HOSTS='@json ["a.com", "b.com"]' for JSON-parsed values. The @json prefix tells dynaconf to parse the value as JSON rather than a plain string. For nested dicts use @json {"key": "value"}.

How do I override settings in tests?

Use dynaconf’s settings.configure() method or the context manager with settings.using_env("testing"): to temporarily switch environments. For pytest, create a conftest.py fixture that calls settings.configure(DATABASE_URL="sqlite:///:memory:") before each test and resets it after. Dynaconf’s test environment section in settings.toml is the cleanest approach for consistent test isolation.

Conclusion

Dynaconf replaces scattered os.environ.get() calls with a structured, validated, environment-aware settings object. The layered configuration system handles the dev/staging/production split cleanly: defaults in a committed file, environment overrides via variables, secrets in a git-ignored file or injected at deploy time. Validators catch missing and malformed config at startup so errors surface immediately rather than at runtime.

Try extending the Flask example by adding a [staging] environment section and a validator for ALLOWED_HOSTS that checks the list is non-empty. Then set ENV_FOR_DYNACONF=staging and watch dynaconf pick up the right values automatically. Once you have used it on a real project, going back to manual environment variable parsing will feel like writing assembly.

Full documentation, including Django integration and secrets management guides, is at the official dynaconf website.

How To Use Python snoop for Function Tracing and Debugging

Intermediate

You have been there before — a function returns the wrong value and you have no idea why. So you add a print() call, run the script, add another, run it again. Twenty minutes later you have a mess of debug output and still no clear picture of what went wrong. The built-in debugger helps but requires you to set breakpoints ahead of time and step through code manually. There has to be a better way.

The snoop library solves exactly this problem. It is a debugging decorator that automatically traces every line executed inside a function, showing you the value of each variable as it changes. You add a single decorator, run your code, and get a complete play-by-play of what happened — no print statements, no breakpoints, no guessing. It is one of those tools that feels like it should have been built into Python from the start.

This article covers everything you need to use snoop effectively: installation, the @snoop decorator, the pp() pretty-printer, tracing nested calls, writing trace output to a file, and combining snoop with watch expressions. By the end you will have a practical debugging workflow that is faster and less painful than anything you have used before.

Tracing a Python Function with snoop: Quick Example

Install snoop with pip, add one decorator, and your function narrates itself as it runs. Here is a minimal example to show you what snoop output looks like:

# quick_snoop.py
import snoop

@snoop
def calculate_discount(price, percent):
    discount = price * (percent / 100)
    final = price - discount
    return final

result = calculate_discount(100, 15)
print("Final price:", result)

Output:

14:32:01.11 >>> Call to calculate_discount in quick_snoop.py:4
14:32:01.11 ...... price = 100
14:32:01.11 ...... percent = 15
14:32:01.11    5 | discount = price * (percent / 100)
14:32:01.11 .......... discount = 15.0
14:32:01.11    6 | final = price - discount
14:32:01.11 .......... final = 85.0
14:32:01.11    7 | return final
14:32:01.11 <<< Return value from calculate_discount: 85.0
Final price: 85.0

Every line that executed is shown with its line number. Every variable that changed is shown with its new value. The timestamp lets you spot slow operations. This is the full function trace from one decorator -- no other changes to your code required.

The sections below show how to use snoop on more complex code, trace nested calls, filter the output, and write traces to a file for later review.

print() was so 2015. Your variables deserve better.

What Is snoop and Why Use It?

Snoop is a third-party debugging library that started as a fork of PySnooper (its predecessor). It traces the execution of a function line by line and reports every variable assignment, every loop iteration, and every return value. Think of it as a flight recorder for your functions -- it captures everything that happened so you can review it afterward.

The key difference from standard debugging approaches is that snoop requires zero setup. You do not need to open a debugger, set breakpoints, or even know in advance which lines are the problem. You decorate the function and run your code normally. The trace goes to stderr by default, so it does not interfere with your program's actual output.

Approach | Setup Required | Shows Variable Changes | Works in Production
print() statements | Manual, per-variable | Only what you print | No (remove before deploy)
pdb / breakpoint() | Breakpoints + interactive | On demand via commands | No (blocks execution)
logging module | Configuration + calls | Only what you log | Yes
snoop | One decorator | Every variable, every line | No (verbose output)

Snoop is best used during development and debugging sessions. Add it to a suspicious function, reproduce the bug, review the trace, then remove the decorator before committing. The comparison above shows why it beats print() for most debugging scenarios -- you get full coverage without having to predict which variables matter.
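To make the "flight recorder" idea concrete, here is a minimal sketch of line tracing built on the standard library's sys.settrace hook. It only illustrates the general mechanism and is not snoop's actual implementation; the names mini_trace and events are hypothetical, and the real library adds timestamps, source lines, and much better value formatting.

```python
# mini_trace.py
import functools
import sys

def mini_trace(func):
    """Record each line event in func together with the locals that changed."""
    events = []

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        seen = {}

        def tracer(frame, event, arg):
            # Ignore frames that do not belong to the decorated function
            if frame.f_code is not func.__code__:
                return None
            if event in ("line", "return"):
                changed = {
                    name: value
                    for name, value in frame.f_locals.items()
                    if name not in seen or seen[name] != value
                }
                seen.update(changed)
                events.append((event, changed))
            return tracer

        previous = sys.gettrace()
        sys.settrace(tracer)
        try:
            return func(*args, **kwargs)
        finally:
            # Always restore whatever trace function was active before
            sys.settrace(previous)

    wrapper.events = events  # hypothetical attribute, for inspection only
    return wrapper

@mini_trace
def calculate_discount(price, percent):
    discount = price * (percent / 100)
    final = price - discount
    return final

result = calculate_discount(100, 15)
for event, changed in calculate_discount.events:
    print(event, changed)
```

A line event fires just before a line runs, so each recorded entry shows the variables assigned by the previous line. That is the same reason a snoop trace prints the new value of a variable underneath the line that set it.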

Installing snoop

Snoop is available on PyPI and installs with a single command:

# terminal
pip install snoop

Output:

Successfully installed snoop-0.4.3 executing-2.0.1 cheap-repr-0.5.1

Snoop has no heavy dependencies -- it installs quickly and works with Python 3.6 and above. Once installed, you import it directly in any script where you need to trace execution.

Using the @snoop Decorator

The most common use of snoop is as a function decorator. Decorate any function you want to trace and snoop handles the rest. Here is an example tracing a more complex function that processes a list:

# trace_list_processing.py
import snoop

@snoop
def find_above_average(numbers):
    total = sum(numbers)
    average = total / len(numbers)
    above = []
    for n in numbers:
        if n > average:
            above.append(n)
    return above

data = [12, 45, 7, 89, 23, 56, 34]
result = find_above_average(data)
print("Above average:", result)

Output:

14:33:05.22 >>> Call to find_above_average in trace_list_processing.py:4
14:33:05.22 ...... numbers = [12, 45, 7, 89, 23, 56, 34]
14:33:05.22    5 | total = sum(numbers)
14:33:05.22 .......... total = 266
14:33:05.22    6 | average = total / len(numbers)
14:33:05.22 .......... average = 38.0
14:33:05.22    7 | above = []
14:33:05.22    8 | for n in numbers:
14:33:05.22 .......... n = 12
14:33:05.22    9 | if n > average:
14:33:05.22    8 | for n in numbers:
14:33:05.22 .......... n = 45
14:33:05.22    9 | if n > average:
14:33:05.22   10 | above.append(n)
14:33:05.22 .................. above = [45]
...
14:33:05.22 <<< Return value from find_above_average: [45, 89, 56]
Above average: [45, 89, 56]

Notice how the loop iterations are shown one by one, with each value of n displayed as the loop advances. When a variable changes (like above growing as items are appended), snoop shows the updated value immediately. This makes it trivial to spot where a loop is going wrong or when a condition is not behaving as expected.

When your function traces itself, you stop guessing and start reading.

Adding Watch Expressions

By default snoop traces all variable changes in a function. You can add extra watch expressions to track values that are not simple local variables -- like dictionary lookups, method calls, or expressions:

# trace_with_watch.py
import snoop

@snoop(watch=('user["role"]', 'len(results)'))
def filter_results(user, results):
    if user["role"] == "admin":
        return results
    filtered = [r for r in results if r.get("public", False)]
    return filtered

user = {"name": "alice", "role": "viewer"}
records = [
    {"id": 1, "public": True, "data": "public item"},
    {"id": 2, "public": False, "data": "private item"},
    {"id": 3, "public": True, "data": "another public"},
]
output = filter_results(user, records)
print("Filtered:", len(output), "records")

Output:

14:34:12.45 >>> Call to filter_results in trace_with_watch.py:4
14:34:12.45 ...... user = {'name': 'alice', 'role': 'viewer'}
14:34:12.45 ...... results = [{'id': 1, ...}, {'id': 2, ...}, {'id': 3, ...}]
14:34:12.45 ...... user["role"] = 'viewer'
14:34:12.45 ...... len(results) = 3
14:34:12.45    5 | if user["role"] == "admin":
14:34:12.45    7 | filtered = [r for r in results if r.get("public", False)]
14:34:12.45 .......... filtered = [{'id': 1, ...}, {'id': 3, ...}]
14:34:12.45    8 | return filtered
14:34:12.45 <<< Return value from filter_results: [{'id': 1, ...}, {'id': 3, ...}]
Filtered: 2 records

The watch expressions user["role"] and len(results) are evaluated alongside the regular local variables, and snoop reports them when the function is called and again whenever their values change. This is useful when you need to track a nested value or a computed metric without adding extra local variables to your code.

Using pp() for Quick Inspection

Snoop also provides a pp() function -- a pretty-printer that works like print() but also shows you the expression you passed, not just its value. This is useful for quick one-off inspections without decorating an entire function:

# use_pp.py
from snoop import pp

data = {"users": [{"name": "alice", "score": 95}, {"name": "bob", "score": 72}]}
top_scorers = [u["name"] for u in data["users"] if u["score"] >= 90]

pp(top_scorers)
pp(len(data["users"]))
pp(data["users"][0]["score"] * 1.1)

Output:

14:35:00.11 use_pp.py:6 | top_scorers = ['alice']
14:35:00.11 use_pp.py:7 | len(data["users"]) = 2
14:35:00.11 use_pp.py:8 | data["users"][0]["score"] * 1.1 = 104.50000000000001

Unlike print(top_scorers) which just shows the value, pp(top_scorers) shows both the expression and the value on one line with a timestamp and line number. When you have a chain of transformations and need to inspect an intermediate result, pp() is faster than a full function trace.

pp() -- because knowing the value and knowing the expression are two different things.

Writing Trace Output to a File

By default snoop sends trace output to stderr. For longer-running scripts or when you want to review the trace later, you can redirect it to a file instead:

# trace_to_file.py
import snoop

snoop.install(out="debug_trace.log")

@snoop
def parse_csv_row(row):
    fields = row.strip().split(",")
    record = {}
    for i, field in enumerate(fields):
        key = ["name", "age", "city", "score"][i]
        record[key] = field.strip()
    record["score"] = float(record["score"])
    return record

rows = [
    "Alice, 30, New York, 95.5",
    "Bob, 25, Chicago, 87.0",
    "Carol, 35, Seattle, 92.3",
]
for row in rows:
    parsed = parse_csv_row(row)
    print(parsed)

Output (to stdout):

{'name': 'Alice', 'age': '30', 'city': 'New York', 'score': 95.5}
{'name': 'Bob', 'age': '25', 'city': 'Chicago', 'score': 87.0}
{'name': 'Carol', 'age': '35', 'city': 'Seattle', 'score': 92.3}

debug_trace.log (first entry):

14:36:22.11 >>> Call to parse_csv_row in trace_to_file.py:5
14:36:22.11 ...... row = 'Alice, 30, New York, 95.5'
14:36:22.11    6 | fields = row.strip().split(",")
14:36:22.11 .......... fields = ['Alice', ' 30', ' New York', ' 95.5']
...
14:36:22.11 <<< Return value from parse_csv_row: {'name': 'Alice', ...}

Calling snoop.install(out="debug_trace.log") redirects all snoop output from that point forward to the specified file. The program's normal stdout output is unchanged. This is particularly useful when debugging data pipelines where the trace output would be too noisy to read in the terminal alongside the real output.

Tracing Nested Function Calls

When a bug spans multiple functions, you can decorate each one independently. Snoop will label each call and return separately, making it easy to follow the execution path across multiple functions:

# trace_nested.py
import snoop

@snoop
def validate_age(age_str):
    age = int(age_str)
    if age < 0 or age > 150:
        raise ValueError(f"Invalid age: {age}")
    return age

@snoop
def process_user(raw_data):
    name = raw_data.get("name", "Unknown").strip()
    age = validate_age(raw_data.get("age", "0"))
    return {"name": name, "age": age, "adult": age >= 18}

result = process_user({"name": " Charlie ", "age": "28"})
print(result)

Output:

14:37:45.33 >>> Call to process_user in trace_nested.py:11
14:37:45.33 ...... raw_data = {'name': ' Charlie ', 'age': '28'}
14:37:45.33   12 | name = raw_data.get("name", "Unknown").strip()
14:37:45.33 .......... name = 'Charlie'
14:37:45.33   13 | age = validate_age(raw_data.get("age", "0"))
14:37:45.33     >>> Call to validate_age in trace_nested.py:4
14:37:45.33     ...... age_str = '28'
14:37:45.33        5 | age = int(age_str)
14:37:45.33     .......... age = 28
14:37:45.33        6 | if age < 0 or age > 150:
14:37:45.33        8 | return age
14:37:45.33     <<< Return value from validate_age: 28
14:37:45.33 .......... age = 28
14:37:45.33   14 | return {"name": name, "age": age, "adult": age >= 18}
14:37:45.33 <<< Return value from process_user: {'name': 'Charlie', 'age': 28, 'adult': True}
{'name': 'Charlie', 'age': 28, 'adult': True}

The nested call is indented in the trace output, so you can visually see where process_user calls into validate_age and what comes back. When a validation or helper function is misbehaving, this nested view makes it immediately obvious whether the input going in or the output coming back is wrong.

Nested traces: because bugs respect no function boundary.

Real-Life Example: Debugging a Data Transformation Pipeline

Here is a realistic scenario -- a pipeline that reads raw order records, calculates totals, and flags suspicious orders. We will use snoop to trace the pipeline and find a bug:

# debug_order_pipeline.py
import snoop

@snoop
def calculate_order_total(items):
    subtotal = 0
    for item in items:
        price = item.get("price", 0)
        qty = item.get("qty", 1)
        line_total = price * qty
        subtotal += line_total
    tax = subtotal * 0.1
    total = subtotal + tax
    return round(total, 2)

@snoop
def flag_suspicious(order):
    total = calculate_order_total(order["items"])
    is_large = total > 1000
    is_new_account = order.get("account_age_days", 999) < 30
    suspicious = is_large and is_new_account
    return {"order_id": order["id"], "total": total, "suspicious": suspicious}

orders = [
    {
        "id": "ORD-001",
        "account_age_days": 15,
        "items": [
            {"name": "Laptop", "price": 899, "qty": 1},
            {"name": "Mouse", "price": 29, "qty": 2},
        ]
    },
    {
        "id": "ORD-002",
        "account_age_days": 180,
        "items": [
            {"name": "Monitor", "price": 399, "qty": 3},
        ]
    },
]

for order in orders:
    result = flag_suspicious(order)
    print(f"Order {result['order_id']}: ${result['total']} -- suspicious={result['suspicious']}")

Output:

Order ORD-001: $1052.7 -- suspicious=True
Order ORD-002: $1316.7 -- suspicious=False

The snoop trace shows exactly how each order's total is calculated and which conditions trigger the suspicious flag. When a bug appears (for example, the wrong field name in item.get()), the trace immediately shows which variable returned 0 or None. You can extend this example by adding the watch parameter to track the running subtotal at each iteration, or redirecting output to a file when processing hundreds of orders.
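To make the wrong-field-name case concrete, here is a small sketch that reproduces it without snoop. The function names buggy_subtotal and fixed_subtotal and the misspelled key "cost" are hypothetical, chosen only for illustration.

```python
# silent_default_bug.py
def buggy_subtotal(items):
    subtotal = 0
    for item in items:
        price = item.get("cost", 0)   # Bug: the records use "price", not "cost"
        qty = item.get("qty", 1)
        subtotal += price * qty
    return subtotal

def fixed_subtotal(items):
    subtotal = 0
    for item in items:
        price = item.get("price", 0)  # Correct key
        qty = item.get("qty", 1)
        subtotal += price * qty
    return subtotal

items = [
    {"name": "Laptop", "price": 899, "qty": 1},
    {"name": "Mouse", "price": 29, "qty": 2},
]
print("buggy:", buggy_subtotal(items))
print("fixed:", fixed_subtotal(items))
```

Output:

buggy: 0
fixed: 957

Because dict.get() silently falls back to its default, nothing raises here. In a trace of the buggy version, price = 0 would show up on every loop iteration, which points straight at the misspelled key.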

Frequently Asked Questions

Can I leave @snoop in production code?

It is not recommended. Snoop adds significant verbosity to stderr and has a small performance cost from the introspection overhead. The right workflow is to add @snoop during a debugging session, find the bug, fix it, and remove the decorator before committing. If you need persistent tracing in production, the logging module is the right tool.

Does snoop work on class methods?

Yes, @snoop works on instance methods, class methods, and static methods just like regular functions. Decorate the method normally inside the class definition. The trace will show self (or cls) as one of the initial variable values, so you can see the object's state at call time.

What is the difference between snoop and PySnooper?

Snoop is a more modern, actively maintained fork of PySnooper. It has cleaner output formatting, better repr handling for complex objects, the pp() function, and a simpler API. If you are starting fresh, use snoop. PySnooper is still functional but less actively developed. The decorator syntax is nearly identical between the two.

How do I control how deep snoop goes into nested objects?

Snoop uses cheap_repr to display object values, which automatically limits the depth and length of complex objects to keep output readable. For custom objects, you can define a __repr__ method on your class and snoop will use it. If you want to see more of a large list or dict, use pp(my_object) which gives you a full pretty-printed view.
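As a minimal sketch of the custom __repr__ approach, the class below summarizes its contents instead of dumping every item. The Order class is hypothetical and exists only for this illustration; any tool that displays objects through repr() will pick up the same summary.

```python
# compact_repr.py
class Order:
    def __init__(self, order_id, items):
        self.order_id = order_id
        self.items = items

    def __repr__(self):
        # Summarize the order instead of printing every item in full
        return f"Order(id={self.order_id!r}, items={len(self.items)})"

order = Order("ORD-001", [{"name": "Laptop"}, {"name": "Mouse"}])
print(repr(order))
```

Output:

Order(id='ORD-001', items=2)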

Does snoop work with async functions?

Not directly. At the time of writing, applying @snoop to an async def function raises NotImplementedError, because the tracer does not support coroutines. A practical workaround is to move the logic you want to inspect into a regular synchronous helper and decorate that, or to call pp() on individual values inside the coroutine. If output from several call sites becomes hard to follow, redirecting it to a file with snoop.install(out="trace.log") makes it easier to review.

Conclusion

Snoop transforms debugging from a guessing game into a systematic review. The @snoop decorator gives you a complete line-by-line trace of any function -- every variable change, every loop iteration, every return value -- with a single addition to your code. The pp() function handles quick one-off inspections without the full function trace overhead. Watch expressions let you track computed values and dictionary lookups that snoop would not capture automatically.

The workflow is simple: add the decorator, reproduce the bug, read the trace, remove the decorator. Try changing the threshold in the flag_suspicious function from the real-life example and rerunning it to see how the trace output changes. Once you have used snoop a few times, the thought of hunting bugs with print() statements will feel like a genuine regression.

For the full API and advanced configuration options, see the official snoop documentation on GitHub.

How To Use Python alive-progress for Animated Progress Bars

Beginner

You have a data pipeline, a batch file processor, or a training loop that takes minutes to run. You stare at a blank terminal, no idea if it is working, 10% done, or silently crashed three minutes ago. The standard print(f'Processing {i}/{total}') approach floods the screen with output that doesn’t actually tell you the most important thing: how much time is left.

alive-progress is an animated terminal progress bar library that gives you real-time throughput stats, a live spinner, estimated time remaining, and configurable visual styles — all in a few lines of code. It goes well beyond tqdm in terms of visual richness and customization without sacrificing simplicity.

This article covers everything: basic usage with an iterable, the context manager API for manual advancement, customizing bar styles and themes, printing messages while a bar runs, spinner mode for unknown totals, and a real-world batch CSV processor. After reading this, your terminal scripts will never look like they are frozen again.

Your First Progress Bar: Quick Example

The simplest way to use alive-progress is to wrap your iterable in alive_it() — a drop-in replacement for any for loop:

# quick_bar.py
from alive_progress import alive_it
import time

items = list(range(50))

for item in alive_it(items, title='Processing items'):
    time.sleep(0.05)  # Simulate work

print('Done!')

Terminal output (animated — static representation):

Processing items |████████████████████████| 50/50 [100%] in 2.6s (19.2/s)

alive_it() wraps any iterable and displays a real-time animated bar as you iterate. It shows the count, percentage, elapsed time, and throughput (items per second). When the loop finishes, the bar snaps to a clean static line showing the final stats. No extra code, no boilerplate — just wrap your existing loop.

50 items. 2.6 seconds. 19 items per second. Beats staring at a cursor.

What Is alive-progress?

alive-progress is a Python terminal progress bar library focused on visual quality and real-time statistics. Unlike simpler libraries, it uses ANSI escape codes to update the same terminal line in-place rather than printing new lines, creating a true animation effect that looks modern even on basic terminals.

Feature | alive-progress | tqdm
Animated spinner | Yes (many styles) | Limited
Real-time throughput | Yes (ETA + rate) | Yes
Print while running | Yes (bar.text()) | Yes (tqdm.write())
Nested bars | Yes | Yes
Custom bar styles | 40+ themes built-in | Limited
Unknown total | Yes (spinner mode) | Yes
Jupyter notebook | Limited | Better

Install with: pip install alive-progress. Python 3.6+ is supported. No heavy dependencies — the only requirement is about-time, which is automatically installed.

Context Manager API

When your processing is not a simple loop (for example, you fetch items from an API in batches, or you update at irregular intervals), use the context manager API with alive_bar() and manually call bar() to advance the counter.

# context_bar.py
from alive_progress import alive_bar
import time

total_files = 40

with alive_bar(total_files, title='Compressing files') as bar:
    for i in range(total_files):
        time.sleep(0.04)  # Simulate compression
        bar()             # Advance the bar by 1

print('All files compressed!')

Output (static):

Compressing files |████████████████████████| 40/40 [100%] in 1.7s (23.5/s)

You can also advance by more than 1 at a time: bar(5) increments the counter by 5. This is useful when processing variable-size batches where the work unit is a batch, not an individual item. Pass None as the total to create an indeterminate bar that spins without a percentage — appropriate when you do not know the total count upfront.

Printing Messages While the Bar Runs

One of the most useful features is the ability to print status messages without disrupting the animated bar. Use bar.text() to show a live status message on the bar line itself, or call print() within the context: alive-progress intercepts it and displays the output above the bar without breaking the animation.

# bar_messages.py
from alive_progress import alive_bar
import time

files = ['config.yaml', 'data.csv', 'model.pkl', 'report.html', 'archive.zip']

with alive_bar(len(files), title='Uploading') as bar:
    for filename in files:
        bar.text(f'-> {filename}')  # Live subtitle
        time.sleep(0.3)
        size_kb = len(filename) * 100  # Fake file size
        print(f'Uploaded {filename} ({size_kb} KB)')
        bar()

Terminal output (sequential):

Uploaded config.yaml (1100 KB)
Uploaded data.csv (800 KB)
Uploaded model.pkl (900 KB)
Uploaded report.html (1100 KB)
Uploaded archive.zip (1100 KB)
Uploading |████████████████████████| 5/5 [100%] in 1.5s (3.3/s)

bar.text() updates the subtitle line in real time — so while the bar is animating, users see which file is currently uploading. When the loop completes, the printed lines stay in the terminal history above the final bar summary. This pattern is ideal for batch jobs where users want to know both overall progress and current activity.

bar.text(): real-time status without print() flooding your terminal.

Customizing Bar Styles

alive-progress ships with dozens of built-in bar and spinner styles. You can preview them all with alive_progress.styles.showtime(), or set them directly in the alive_bar() call or via the global config_handler.

# custom_styles.py
from alive_progress import alive_bar, config_handler
import time

# Set global defaults for all bars in this session
config_handler.set_global(
    bar='classic',
    spinner='classic',
    title_length=20,
    length=40
)

items = list(range(30))

with alive_bar(len(items), title='Classic bar') as bar:
    for _ in items:
        time.sleep(0.02)
        bar()

# Override per-bar
with alive_bar(len(items), title='Smooth bar', bar='smooth', spinner='waves') as bar:
    for _ in items:
        time.sleep(0.02)
        bar()

# Force mode -- for scripts called from CI/CD without a TTY
with alive_bar(len(items), title='Force mode', force_tty=True) as bar:
    for _ in items:
        time.sleep(0.02)
        bar()

Output (static representations):

Classic bar         [######################################] 30/30 in 0.7s
Smooth bar          |████████████████████████████████████| 30/30 in 0.7s
Force mode          |████████████████████████████████████| 30/30 in 0.7s

The config_handler.set_global() call persists across all subsequent bars in the same Python process, which is useful for setting a house style once at the start of a script. Individual bars can override global settings by passing keyword arguments. The force_tty=True argument matters in CI/CD pipelines and some IDE consoles: without it, alive-progress detects a non-interactive terminal, turns the animation off, and prints only the final summary line. Forcing the animation on in a plain log file can look garbled, so enable it only where the output is actually rendered as a terminal.

Unknown Total: Spinner Mode

Sometimes you do not know how many items there are before you start — streaming API responses, crawling a website, or reading from a queue. Pass None as the total and alive-progress switches to an indefinite spinner with a live count.

# spinner_mode.py
from alive_progress import alive_bar
import time

def stream_events():
    # Simulate variable-length stream
    for i in range(25):
        time.sleep(0.08)
        yield {'event': f'event_{i}', 'value': i * 3}

collected = []
with alive_bar(None, title='Streaming events') as bar:
    for event in stream_events():
        collected.append(event)
        bar.text(f'Last: {event["event"]}')
        bar()

print(f'Collected {len(collected)} events')

Output:

Streaming events |/| 25 in 2.1s (12.0/s)
Collected 25 events

In spinner mode the bar shows a rotating character instead of a fill animation, along with a raw count and throughput rate. This gives users feedback that the script is alive and working even when the end point is unknown.

Real-Life Example: Batch CSV Processor

This example runs four in-memory CSV files through a single progress bar whose total is the combined row count; bar.text() shows which file is currently being processed.

# batch_csv_processor.py
import csv
import io
import time
from alive_progress import alive_bar

# Generate fake CSV data in memory
def make_fake_csv(n_rows):
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=['id', 'name', 'score', 'active'])
    writer.writeheader()
    for i in range(n_rows):
        writer.writerow({
            'id': i + 1,
            'name': f'user_{i+1}',
            'score': (i * 37) % 100,
            'active': i % 3 != 0
        })
    buf.seek(0)
    return buf

def process_csv(file_obj, bar, report):
    reader = csv.DictReader(file_obj)
    rows_processed = 0
    for row in reader:
        # Simulate row-level processing
        time.sleep(0.002)
        if row['active'] == 'True' and int(row['score']) > 50:
            report['high_scorers'] += 1
        rows_processed += 1
        bar()
    return rows_processed

# Simulate 4 CSV files of varying size
file_sizes = [80, 120, 60, 100]
file_names = [f'batch_{i+1}.csv' for i in range(len(file_sizes))]
report = {'high_scorers': 0, 'total_rows': 0}

total_rows = sum(file_sizes)

with alive_bar(total_rows, title='Processing batches') as bar:
    for name, size in zip(file_names, file_sizes):
        bar.text(f'-> {name}')
        fake_csv = make_fake_csv(size)
        rows = process_csv(fake_csv, bar, report)
        report['total_rows'] += rows
        print(f'  Finished {name}: {rows} rows')

print(f'\nSummary: {report["total_rows"]} rows, {report["high_scorers"]} high scorers')

Output:

  Finished batch_1.csv: 80 rows
  Finished batch_2.csv: 120 rows
  Finished batch_3.csv: 60 rows
  Finished batch_4.csv: 100 rows
Processing batches |████████████████████████| 360/360 [100%] in 0.8s (450.0/s)

Summary: 360 rows, 113 high scorers

The bar advances once per row, so the animation accurately reflects actual progress across all files. bar.text() shows which file is being processed in real time. The print() calls inside the loop appear above the bar without disrupting the animation. To adapt this pattern to real CSV files on disk, replace the fake data with csv.DictReader(open(path)) and add error handling for malformed rows.

Frequently Asked Questions

When should I use alive-progress vs tqdm?

Use alive-progress when you want richer animations and real-time text messages in interactive terminal scripts. Use tqdm when you need Jupyter notebook integration, pandas integration (df.progress_apply()), or when you are in a CI environment where animation is less important. Both are excellent; the choice is largely aesthetic.

alive-progress looks broken in my CI/CD logs. Why?

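In non-TTY environments (CI logs, redirected output), alive-progress normally turns the animation off and prints only the final summary line. Logs full of garbled ANSI escape codes usually mean the animation was forced on with force_tty=True, or the environment was misdetected as interactive. Pass force_tty=False to turn the animation off explicitly, disable=True to suppress the bar entirely, or enrich_print=False to stop alive-progress from prefixing print() output with the bar position. Alternatively, wrap the bar call in a check: if sys.stdout.isatty():.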
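The isatty() check mentioned above can be sketched as a small context manager that hands back either a real bar or a do-nothing stand-in. The names progress and NullBar are hypothetical helpers written for this illustration, not part of alive-progress; the library is imported lazily so the non-interactive path never touches it.

```python
# tty_fallback.py
import contextlib
import io
import sys

class NullBar:
    """Stand-in that accepts the same calls as a bar but does nothing."""
    def __call__(self, *args, **kwargs):
        pass

    def text(self, *args, **kwargs):
        pass

@contextlib.contextmanager
def progress(total, title, stream=None):
    stream = stream if stream is not None else sys.stdout
    if stream.isatty():
        # Interactive terminal: use the real animated bar
        from alive_progress import alive_bar
        with alive_bar(total, title=title) as bar:
            yield bar
    else:
        # CI log or redirected output: run silently
        yield NullBar()

# Simulate a non-interactive stream, as in a CI log
with progress(3, "Uploading", stream=io.StringIO()) as bar:
    for name in ["a.txt", "b.txt", "c.txt"]:
        bar.text(f"-> {name}")
        bar()
```

The loop body stays identical in both environments, so you do not need separate code paths for local runs and CI.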

What happens if I call bar() more times than the total?

The bar will exceed 100% — it does not hard-cap at the total. This is actually useful for cases where your estimate was wrong. If you need a strict cap, track the count manually and stop calling bar() when you reach the expected total. The final stats line will show the actual count regardless.
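If you do want a strict cap, one option is a thin wrapper that stops forwarding increments once the expected total is reached. This is a sketch under assumptions: CappedBar is a hypothetical helper, and it only relies on the bar being callable with an integer count, as in bar(5). A plain list stands in for the real bar here so the behavior is easy to see.

```python
# capped_bar.py
class CappedBar:
    """Forward at most `total` increments to an underlying bar callable."""

    def __init__(self, bar, total):
        self._bar = bar
        self._remaining = total
        self.dropped = 0  # increments that were not forwarded

    def __call__(self, count=1):
        allowed = min(count, self._remaining)
        if allowed > 0:
            self._bar(allowed)
            self._remaining -= allowed
        self.dropped += count - allowed

# A list's append method stands in for the real bar in this demo
calls = []
capped = CappedBar(calls.append, total=5)
for _ in range(4):
    capped(2)  # tries to advance by 8 in total

print(calls, capped.dropped)
```

Output:

[2, 2, 1] 3

Inside a with alive_bar(total) as bar: block you would wrap the real bar the same way, with capped = CappedBar(bar, total), and check capped.dropped afterward to learn how far off the estimate was.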

Does alive-progress work with multithreaded code?

Yes. The bar() call is thread-safe. You can call it from multiple threads concurrently and the counter will advance correctly. The display is refreshed by a dedicated background thread rather than by your calls, so the animation stays smooth even under high call frequency from worker threads.

What is the calibrate parameter?

The calibrate parameter controls the throughput animation speed. When your processing rate is very fast (thousands of items per second), the animation might update faster than it renders smoothly. Passing calibrate=100 tells alive-progress to treat 100 items/s as “full speed” for animation purposes, slowing down the visual effect to look natural. For most use cases the default auto-calibration works fine.

Conclusion

You have covered the full alive-progress toolkit: alive_it() for simple loop wrapping, the context manager alive_bar() for manual advancement, bar.text() for live subtitles, global and per-bar style configuration with config_handler, spinner mode for unknown totals, and the batch CSV example that ties everything together.

The best immediate use of this library is wrapping any long-running script that currently outputs nothing (data import jobs, batch API calls, file processing pipelines) in a single with alive_bar(total) as bar: block. The visual feedback improves debugging and gives users confidence that the script is working. Extend the CSV processor by adding a second nested bar for per-row sub-tasks, or integrate it with asyncio for parallel batch processing.

Browse all available styles with from alive_progress.styles import showtime; showtime(), and find the full documentation at github.com/rsalmei/alive-progress.

How To Use Python anyio for Async Compatibility

Intermediate

You chose asyncio for your project. Six months later a new team member wants to use trio. Your library might need to support both. Rewriting async code to switch runtimes is painful — the primitives are different, the cancellation models differ, and every asyncio.sleep() call becomes a liability. Writing async libraries that work across runtimes has historically meant publishing two separate codebases.

anyio solves this by providing a unified async API that runs on top of asyncio or trio (curio support was dropped in anyio 3.0). You write your code once against the anyio API and it runs on whichever backend is active. FastAPI and Starlette use anyio internally, so if you have used those frameworks, you have already benefited from it. When writing your own async libraries, anyio is the recommended way to ensure portability without sacrificing features.

This article covers the anyio fundamentals: starting the event loop, task groups, cancellation, synchronization primitives, and file I/O integration. Each section shows how the anyio API maps to the equivalent asyncio and trio patterns. By the end you will know how to write async code that is testable on both backends and future-proof against runtime churn.

Running Async Code with anyio: Quick Example

The entry point to anyio is anyio.run(), which replaces asyncio.run(). You can switch backends by passing a backend argument:

# quick_anyio.py
import anyio

async def greet(name, delay):
    await anyio.sleep(delay)
    print(f'Hello, {name}!')

async def main():
    async with anyio.create_task_group() as tg:
        tg.start_soon(greet, 'asyncio', 0.1)
        tg.start_soon(greet, 'trio',    0.05)
        tg.start_soon(greet, 'world',   0.15)

# Run on asyncio (default)
anyio.run(main)

# To run on trio: anyio.run(main, backend='trio')
# (requires: pip install trio)

Output:

Hello, trio!
Hello, asyncio!
Hello, world!

Three tasks run concurrently inside a task group. The output order reflects the different delay values — trio finishes first at 0.05s. Switching to the trio backend requires only changing backend='trio' in the anyio.run() call; the coroutine code does not change at all. This backend portability is anyio’s core value proposition.

Write once. Run on asyncio. Run on trio. Your library users stop filing compatibility issues.

What Is anyio and How Does It Compare?

anyio is a compatibility shim that abstracts the differences between async runtimes. Its API is deliberately close to trio's structured concurrency model, which many developers find easier to reason about than asyncio's lower-level primitives. The result is that anyio code tends to be cleaner than raw asyncio code even when only targeting asyncio.

| Feature | asyncio | trio | anyio |
| --- | --- | --- | --- |
| Concurrent tasks | asyncio.gather() | trio.open_nursery() | anyio.create_task_group() |
| Sleep | asyncio.sleep() | trio.sleep() | anyio.sleep() |
| Timeout | asyncio.wait_for() | trio.fail_after() | anyio.fail_after() |
| Cancel scope | manual with Task.cancel() | trio.CancelScope | anyio.CancelScope |
| File I/O | aiofiles (external) | built-in | built-in |
| Backend portability | No | No | Yes |

Install anyio with: pip install anyio. To also enable the trio backend: pip install anyio[trio]. The asyncio backend is built-in — no extra install needed.

Task Groups

Task groups are the structured concurrency primitive in anyio. Every task started inside a create_task_group() block must finish before the block exits. If any task raises an exception, all other tasks in the group are cancelled — no silent failures, no zombie tasks.

# task_groups.py
import anyio
import httpx

async def fetch_status(client, url):
    resp = await client.get(url)
    print(f'{url}: HTTP {resp.status_code}')
    return resp.status_code

async def main():
    urls = [
        'https://httpbin.org/status/200',
        'https://httpbin.org/status/301',
        'https://httpbin.org/status/404',
    ]
    async with httpx.AsyncClient() as client:
        async with anyio.create_task_group() as tg:
            for url in urls:
                tg.start_soon(fetch_status, client, url)
    print('All requests complete')

anyio.run(main)

Output:

https://httpbin.org/status/200: HTTP 200
https://httpbin.org/status/404: HTTP 404
https://httpbin.org/status/301: HTTP 301
All requests complete

All three requests run concurrently. The order of results depends on network response times. The task group guarantees that print('All requests complete') only runs after every task has finished — unlike asyncio.gather() which requires careful error handling to avoid swallowed exceptions. Note: this example requires pip install httpx.

Timeouts and Cancellation

Failing After a Deadline

Missing or badly handled timeouts are one of the most common async correctness mistakes. anyio.fail_after() raises TimeoutError if the block does not complete within the specified number of seconds. Unlike asyncio.wait_for(), it works as a context manager and composes correctly with task groups.

# timeout_demo.py
import anyio

async def slow_operation(seconds):
    print(f'Starting {seconds}s operation')
    await anyio.sleep(seconds)
    print(f'Finished {seconds}s operation')
    return f'result_{seconds}'

async def main():
    # This will complete in time
    try:
        with anyio.fail_after(2.0):
            await slow_operation(0.5)
        print('Fast op: succeeded')
    except TimeoutError:
        print('Fast op: timed out')

    # This will time out
    try:
        with anyio.fail_after(0.3):
            await slow_operation(2.0)
        print('Slow op: succeeded')
    except TimeoutError:
        print('Slow op: TIMED OUT after 0.3s')

anyio.run(main)

Output:

Starting 0.5s operation
Finished 0.5s operation
Fast op: succeeded
Starting 2.0s operation
Slow op: TIMED OUT after 0.3s

fail_after() cancels the code inside its block when the deadline is exceeded and raises TimeoutError. The move_on_after() variant silently skips to the end of the block without raising — useful when a timeout is acceptable but not exceptional.

fail_after(): because 'it'll finish eventually' is not a production SLA.

Cancel Scopes

For fine-grained cancellation control, use anyio.CancelScope directly. You can cancel a scope programmatically or check whether a scope was cancelled with scope.cancelled_caught.

# cancel_scope.py
import anyio

async def worker(name, delay):
    try:
        await anyio.sleep(delay)
        print(f'{name}: done')
    except anyio.get_cancelled_exc_class():
        print(f'{name}: was cancelled')
        raise  # re-raise so the task group sees the cancellation

async def main():
    async with anyio.create_task_group() as tg:
        tg.start_soon(worker, 'fast', 0.1)
        tg.start_soon(worker, 'medium', 0.5)
        tg.start_soon(worker, 'slow', 2.0)

        await anyio.sleep(0.3)
        tg.cancel_scope.cancel()  # Cancel all remaining tasks
        print('Cancel scope triggered')

anyio.run(main)

Output:

fast: done
Cancel scope triggered
medium: was cancelled
slow: was cancelled

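anyio.get_cancelled_exc_class() returns the correct cancellation exception for the current backend (asyncio.CancelledError on asyncio, trio.Cancelled on trio). Using it instead of catching a specific exception class is required for correct backend-portable cancellation handling. If you catch it to run cleanup, re-raise it afterwards; swallowing a cancellation can leave the enclosing cancel scope in an inconsistent state.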

Synchronization Primitives

anyio provides backend-portable versions of the familiar synchronization primitives: Lock, Event, Semaphore, and Condition, plus CapacityLimiter. The example below uses an Event to signal between tasks and a Semaphore to limit how many workers run at once.

# sync_primitives.py
import anyio

async def producer(event, results):
    print('Producer: generating data')
    await anyio.sleep(0.2)
    results.extend([1, 2, 3, 4, 5])
    print('Producer: data ready, signalling')
    event.set()

async def consumer(event, results):
    print('Consumer: waiting for data')
    await event.wait()
    print(f'Consumer: got {len(results)} items: {results}')

async def limited_worker(semaphore, worker_id):
    async with semaphore:
        print(f'Worker {worker_id} acquired slot')
        await anyio.sleep(0.1)
        print(f'Worker {worker_id} releasing slot')

async def main():
    # Event signalling
    event = anyio.Event()
    results = []
    async with anyio.create_task_group() as tg:
        tg.start_soon(producer, event, results)
        tg.start_soon(consumer, event, results)

    # Semaphore: max 2 concurrent workers
    print('\nSemaphore demo (max 2 concurrent):')
    sem = anyio.Semaphore(2)
    async with anyio.create_task_group() as tg:
        for i in range(5):
            tg.start_soon(limited_worker, sem, i)

anyio.run(main)

Output:

Producer: generating data
Consumer: waiting for data
Producer: data ready, signalling
Consumer: got 5 items: [1, 2, 3, 4, 5]

Semaphore demo (max 2 concurrent):
Worker 0 acquired slot
Worker 1 acquired slot
Worker 0 releasing slot
Worker 2 acquired slot
Worker 1 releasing slot
Worker 3 acquired slot
...

The semaphore limits concurrent access to 2 slots at a time — useful for rate-limiting outbound API calls or capping the number of active database connections. The anyio CapacityLimiter is similar but designed specifically for limiting thread pool workers.

Semaphore(2): the bouncer at the door of your database connection pool.

Real-Life Example: Parallel URL Health Checker

# health_checker.py
import anyio
import httpx
from datetime import datetime

async def check_url(client, url, results, semaphore):
    async with semaphore:
        start = datetime.now()
        try:
            with anyio.fail_after(5.0):
                resp = await client.get(url, follow_redirects=True)
            elapsed = (datetime.now() - start).total_seconds()
            results.append({
                'url': url,
                'status': resp.status_code,
                'ok': 200 <= resp.status_code < 400,
                'latency': round(elapsed, 3)
            })
        except TimeoutError:
            results.append({'url': url, 'status': 'timeout', 'ok': False, 'latency': 5.0})
        except Exception as exc:
            results.append({'url': url, 'status': str(exc)[:40], 'ok': False, 'latency': 0})

async def run_health_check(urls, max_concurrent=5):
    results = []
    semaphore = anyio.Semaphore(max_concurrent)
    async with httpx.AsyncClient() as client:
        async with anyio.create_task_group() as tg:
            for url in urls:
                tg.start_soon(check_url, client, url, results, semaphore)
    results.sort(key=lambda r: r['latency'])
    return results

async def main():
    urls = [
        'https://httpbin.org/status/200',
        'https://httpbin.org/status/404',
        'https://httpbin.org/delay/1',
        'https://httpbin.org/status/500',
        'https://jsonplaceholder.typicode.com/posts/1',
    ]
    results = await run_health_check(urls, max_concurrent=3)
    print(f'{"URL":<45} {"Status":<10} {"OK":<6} {"Latency"}')
    print('-' * 72)
    for r in results:
        url_short = r['url'].split('httpbin.org')[-1] or r['url']
        print(f'{url_short:<45} {str(r["status"]):<10} {str(r["ok"]):<6} {r["latency"]}s')

anyio.run(main)

Output:

URL                                           Status     OK     Latency
------------------------------------------------------------------------
/status/200                                   200        True   0.152s
/status/404                                   404        False  0.161s
/status/500                                   500        False  0.178s
https://jsonplaceholder.typicode.com/posts/1  200        True   0.312s
/delay/1                                      200        True   1.203s

The health checker combines three anyio patterns: a task group for concurrency, a semaphore to cap simultaneous connections, and fail_after() to enforce per-URL timeouts. The entire script runs on asyncio by default and can be switched to trio with a one-line change. Extend it by writing results to a file with anyio.open_file() or posting to a webhook.
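As a rough sketch of the file-writing extension, the helpers below save a result list as JSON with anyio.open_file() and read it back. The function names, the temp-file path, and the sample record are assumptions made for the example.

```python
import json
import os
import tempfile

import anyio

async def save_results(path, results):
    # anyio.open_file() is awaited; the result is an async context manager
    async with await anyio.open_file(path, 'w') as f:
        await f.write(json.dumps(results, indent=2))

async def load_results(path):
    async with await anyio.open_file(path) as f:
        return json.loads(await f.read())

async def main():
    # Hypothetical record in the same shape the health checker produces
    sample = [{'url': 'https://example.com', 'status': 200, 'ok': True, 'latency': 0.1}]
    path = os.path.join(tempfile.mkdtemp(), 'health.json')
    await save_results(path, sample)
    return sample, await load_results(path)

saved, loaded = anyio.run(main)
print(loaded)
```

The file operations run in a worker thread behind the scenes, so the event loop stays free for other tasks while the disk is busy.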

Frequently Asked Questions

Should I use asyncio or trio as my anyio backend?

For most projects, asyncio is the right default -- it is part of the standard library, widely supported, and required by libraries like aiohttp and FastAPI. Use trio as your backend when you want its stricter structured concurrency guarantees and better error propagation, particularly for long-running services. anyio makes the choice reversible, so you can start with asyncio and switch later.

Can I use asyncio.gather() with anyio?

Yes, but it is not recommended. Mixing asyncio.gather() inside anyio code works on the asyncio backend but breaks on trio. Use anyio.create_task_group() instead -- it is equivalent to asyncio.gather() for most use cases and is backend-portable. The task group model also has better error semantics: one failing task cancels the rest rather than leaving them running.

How does anyio interact with blocking code?

Use anyio.to_thread.run_sync() to run blocking code in a thread without blocking the event loop. This is the anyio equivalent of asyncio.get_event_loop().run_in_executor(). The syntax is: result = await anyio.to_thread.run_sync(blocking_function, arg1, arg2).

Why does the example use httpx instead of aiohttp?

httpx supports both asyncio and trio backends, making it the natural HTTP client for anyio code. aiohttp is asyncio-only and will fail on the trio backend. For backend-portable code, prefer httpx, which is used by FastAPI's test client and has a nearly identical API to the synchronous requests library.

Does anyio work with FastAPI?

Yes -- FastAPI uses anyio internally. Any async def route handler runs inside anyio's runtime, which means you can use anyio.create_task_group(), anyio.fail_after(), and other primitives directly inside FastAPI route functions without any extra setup.

Conclusion

You have covered anyio's essential toolkit: anyio.run() for starting the event loop with any backend, create_task_group() for structured concurrency, fail_after() and move_on_after() for timeouts, CancelScope for manual cancellation, and synchronization primitives including Event and Semaphore. The health checker example shows how these combine into a robust async utility.

The key shift anyio encourages is away from ad-hoc asyncio.gather() patterns toward structured concurrency with task groups. Even if you never switch backends, the cleaner error propagation and cancellation model is worth the migration. For deeper coverage of structured concurrency concepts, see the anyio documentation and Nathaniel Smith's original notes on structured concurrency.

How To Use Python boltons for Extended Python Utilities

Intermediate

Every Python developer eventually builds the same small utilities: a function to chunk a list into fixed-size batches, a dict that remembers its insertion order and lets you access values by index, a memoization decorator that actually handles edge cases, a string formatter that gracefully handles missing keys. These snippets get copy-pasted from project to project, slightly different each time, and break in unpredictable ways.

boltons is a library of high-quality, production-tested utilities that solve exactly these recurring problems. It covers iterables, data structures, functional programming helpers, file utilities, text processing, caching, and more — all implemented with far more care than the typical StackOverflow snippet. The library is modular: you can import just the pieces you need without pulling in the whole thing.

This article covers the most useful parts of boltons across five categories: iterable utilities, ordered data structures, functional programming tools, text helpers, and caching. Each section includes runnable examples and real-world use cases. By the end you will have a new toolkit of reliable utilities to reach for before writing your own.

Chunking a List: Quick Example

One of the most commonly reinvented wheels in Python is splitting a list into fixed-size chunks. boltons.iterutils.chunked() does it cleanly in one line:

# quick_boltons.py
from boltons.iterutils import chunked

items = list(range(1, 13))
batches = chunked(items, 4)
for batch in batches:
    print(batch)

Output:

[1, 2, 3, 4]
[5, 6, 7, 8]
[9, 10, 11, 12]

chunked() splits an iterable into lists of at most size elements. The last chunk may be shorter if the iterable’s length is not a multiple of size. It returns a list of lists — simple, correct, and no edge-case bugs. This is the boltons philosophy in miniature: a utility so common it should exist, implemented well enough that you never need to think about it again.

How many times have you written this chunking function? boltons says: last time.

What Is boltons and What Problems Does It Solve?

boltons was created by Mahmoud Hashemi as a collection of utilities that feel like they belong in the Python standard library but either predate it or are too specialized to justify inclusion. Unlike most utility libraries, boltons imposes strict quality standards: every module is documented, tested, and designed to have no dependencies beyond the standard library.

| Module | What it provides |
| --- | --- |
| iterutils | chunking, flattening, unique filtering, windowed iteration, remap() |
| dictutils | OrderedMultiDict (multiple values per key), one-to-one mappings, subdict() |
| funcutils | partial application, decorator helpers, function metadata inspection |
| strutils | slugification, camelCase/snake_case conversion, human-readable byte sizes |
| cacheutils | LRU cache, LRI cache, cached() memoization decorators |
| fileutils | atomic file writing, directory and permission helpers |
| timeutils | human-readable durations, parse_timedelta |

Install with pip install boltons. The library is pure Python and has no dependencies, which makes it safe to add to any project without risk of version conflicts.

Iterable Utilities

Flattening Nested Lists

Nested lists appear constantly when aggregating results from async jobs, parsing JSON with variable depth, or working with hierarchical data. boltons.iterutils.flatten() handles arbitrary nesting depth and mixed types safely.

# flatten_demo.py
from boltons.iterutils import flatten, flatten_iter

nested = [1, [2, 3], [4, [5, 6]], 7]
print('Flat list:', flatten(nested))

# flatten_iter returns a generator for memory efficiency
print('Flat iter:', list(flatten_iter([['a', 'b'], ['c', ['d', 'e']]])))

# remap: transform nested data structures
from boltons.iterutils import remap

data = {'users': [{'name': 'alice', 'score': None}, {'name': 'bob', 'score': 42}]}

def drop_none(path, key, value):
    return value is not None  # Return False to drop this key

clean = remap(data, visit=drop_none)
print('Cleaned:', clean)

Output:

Flat list: [1, 2, 3, 4, 5, 6, 7]
Flat iter: ['a', 'b', 'c', 'd', 'e']
Cleaned: {'users': [{'name': 'alice'}, {'name': 'bob', 'score': 42}]}

remap() is one of boltons’ most powerful utilities: it traverses any nested Python structure (dicts, lists, tuples) and applies a visitor function to every key-value pair. The visitor returns True to keep the item, False to drop it, or a (key, value) tuple to replace it. It is the cleanest way to strip None values from an API response or sanitize deep config structures.

Sliding Windows and Unique Filtering

windowed() yields overlapping windows of a fixed size — essential for signal processing, time-series analysis, or any algorithm that needs to look at N consecutive items.

# windowed_unique.py
from boltons.iterutils import windowed, unique

temps = [22.1, 22.3, 22.8, 23.5, 23.1, 22.9]
print('3-point windows:')
for window in windowed(temps, 3):
    avg = sum(window) / len(window)
    print(f'  {window} -> avg {avg:.2f}')

# unique: deduplicate while preserving order
tags = ['python', 'async', 'python', 'web', 'async', 'api']
print('Unique tags:', unique(tags))

Output:

3-point windows:
  (22.1, 22.3, 22.8) -> avg 22.40
  (22.3, 22.8, 23.5) -> avg 22.87
  (22.8, 23.5, 23.1) -> avg 23.13
  (23.5, 23.1, 22.9) -> avg 23.17
Unique tags: ['python', 'async', 'web', 'api']

unique() is the order-preserving dedup that set() is not — it keeps the first occurrence of each item and discards duplicates while maintaining the original sequence. This is critical when order matters (tags, categories, pipeline stages) and you cannot afford the shuffling that set() introduces.

windowed() -- because your data has neighbors and context matters.

Dict Utilities

OrderedMultiDict: Multiple Values Per Key

HTTP query strings and HTML form data can have multiple values for the same key (?tag=python&tag=async). Python’s regular dict drops duplicates. boltons.dictutils.OrderedMultiDict preserves all of them.

# multi_dict.py
from boltons.dictutils import OrderedMultiDict

# Simulate parsed query string with repeated keys
params = OrderedMultiDict([
    ('tag', 'python'),
    ('tag', 'async'),
    ('tag', 'web'),
    ('page', '1'),
    ('sort', 'date'),
])

print('All tag values:', params.getlist('tag'))
print('Latest tag:', params['tag'])
print('Page:', params['page'])
print('All keys:', list(params.keys(multi=True)))

Output:

All tag values: ['python', 'async', 'web']
Latest tag: web
Page: 1
All keys: ['tag', 'tag', 'tag', 'page', 'sort']

getlist(key) returns all values for a key as a list. params[key] returns only the most recently inserted value, so params['tag'] is 'web' here; use getlist('tag')[0] when you need the first one. Passing multi=True to keys() lists every key occurrence rather than each key once. HTTP frameworks like Werkzeug ship their own multi-value dict types for the same reason; boltons provides a standalone version when you need one outside a web framework context.

Functional Utilities

Memoization with boltons.cacheutils.cached

boltons does not ship a funcutils.memoize; its memoization decorator is boltons.cacheutils.cached. It stores results in any dict-like cache you pass to it and builds cache keys from both positional and keyword arguments. A stored None still counts as a cache hit, which avoids a common failure mode of hand-rolled memoizers that treat None as "not cached".

# memoize_demo.py
from boltons.cacheutils import cached
import time

@cached({})
def slow_fibonacci(n):
    if n <= 1:
        return n
    return slow_fibonacci(n - 1) + slow_fibonacci(n - 2)

start = time.perf_counter()
result = slow_fibonacci(35)
elapsed = time.perf_counter() - start
print(f'fib(35) = {result}  ({elapsed:.4f}s)')

# Call again -- instantly from cache
start = time.perf_counter()
result = slow_fibonacci(35)
elapsed = time.perf_counter() - start
print(f'fib(35) cached = {result}  ({elapsed:.6f}s)')

Output:

fib(35) = 9227465  (0.0021s)
fib(35) cached = 9227465  (0.000003s)

Note that this memoization cache is unbounded -- it stores every unique argument combination forever. For production use with large argument spaces, back the cache with a bounded structure such as the LRU from cacheutils instead (covered below).

Caching with cacheutils

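boltons.cacheutils provides two ready-to-use caches: LRU (evicts the least-recently used entry) and LRI (evicts the least-recently inserted entry), plus ThresholdCounter, a bounded counter for tracking frequently seen items. LRU and LRI are dict subclasses that guard their operations with an internal lock.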

# cache_demo.py
from boltons.cacheutils import LRU

# LRU cache with max 3 entries
cache = LRU(max_size=3)

def fetch_user(user_id):
    # Simulate a DB query
    if user_id in cache:
        print(f'  Cache hit: {user_id}')
        return cache[user_id]
    print(f'  Cache miss: {user_id} (querying DB)')
    result = {'id': user_id, 'name': f'user_{user_id}'}
    cache[user_id] = result
    return result

for uid in [1, 2, 3, 1, 4, 2]:  # uid=4 evicts uid=2, the least-recently-used entry
    user = fetch_user(uid)
    print(f'  Got: {user["name"]}')
print(f'\nCache size: {len(cache)}, keys: {sorted(cache.keys())}')

Output:

  Cache miss: 1 (querying DB)
  Got: user_1
  Cache miss: 2 (querying DB)
  Got: user_2
  Cache miss: 3 (querying DB)
  Got: user_3
  Cache hit: 1
  Got: user_1
  Cache miss: 4 (querying DB)
  Got: user_4
  Cache miss: 2 (querying DB)
  Got: user_2

Cache size: 3, keys: [1, 2, 4]

When uid=4 is fetched and the cache is full, the least-recently-used entry (uid=2) is evicted. Note that after eviction uid=2 returns as a cache miss when fetched again. The LRU object behaves like a dict -- you can check if key in cache and set with cache[key] = value, making it a transparent drop-in for most caching patterns.

max_size=3. Someone's always getting evicted. Make peace with it.

Text Utilities

# strutils_demo.py
from boltons.strutils import (
    slugify, camel2under, under2camel,
    bytes2human
)

# URL-friendly slugs (the default delimiter is '_', so ask for hyphens)
title = 'Python asyncio -- Advanced Patterns & Best Practices'
print('Slug:', slugify(title, delim='-'))

# Case conversion
class_name = 'MyAwesomeDataParser'
print('Snake case:', camel2under(class_name))
field_name = 'created_at_timestamp'
print('Camel case:', under2camel(field_name))

# Human-readable byte sizes (second argument: digits after the decimal point)
print(bytes2human(2048, 1))
print(bytes2human(1_500_000, 1))
print(bytes2human(2_400_000_000, 1))

Output:

Slug: python-asyncio-advanced-patterns-best-practices
Snake case: my_awesome_data_parser
Camel case: CreatedAtTimestamp
2.0K
1.4M
2.2G

slugify() splits on punctuation and whitespace, lowercases the pieces, and joins them with a delimiter in one call (an underscore by default, a hyphen when you pass delim='-'), similar to the slug helper Django provides for URL generation. The case conversion utilities are useful when bridging JSON APIs (camelCase) with Python code (snake_case). bytes2human() saves you from writing a unit-conversion lookup table every time you need to display file sizes.

Real-Life Example: API Response Cleaner and Cache

remap() strips None. LRU catches repeats. Your users never know how messy the upstream API is.
# api_cache.py
from boltons.iterutils import remap
from boltons.cacheutils import LRU
from boltons.strutils import slugify

# Simulate fetched user profiles with messy data
RAW_PROFILES = {
    1: {'name': 'Alice Wan',  'bio': None, 'tags': ['python', 'async', 'python'], 'score': 95},
    2: {'name': 'Bob Smith',  'bio': 'Engineer', 'tags': ['web', 'api', None], 'score': None},
    3: {'name': 'Carol Jones','bio': None, 'tags': ['data', 'ml'], 'score': 88},
}

profile_cache = LRU(max_size=10)

def clean_profile(raw):
    # Remove None values at any depth
    cleaned = remap(raw, visit=lambda p, k, v: v is not None)
    # Deduplicate tags while preserving order
    if 'tags' in cleaned:
        seen = set()
        cleaned['tags'] = [t for t in cleaned['tags'] if not (t in seen or seen.add(t))]
    # Add a URL-safe slug from the name
    if 'name' in cleaned:
        cleaned['slug'] = slugify(cleaned['name'], delim='-')
    return cleaned

def get_profile(user_id):
    if user_id in profile_cache:
        return profile_cache[user_id]
    raw = RAW_PROFILES.get(user_id)
    if raw is None:
        return None
    cleaned = clean_profile(raw)
    profile_cache[user_id] = cleaned
    return cleaned

for uid in [1, 2, 3, 1]:
    profile = get_profile(uid)
    print(f'User {uid}: {profile}')
print(f'\nCache contains {len(profile_cache)} entries')

Output:

User 1: {'name': 'Alice Wan', 'tags': ['python', 'async'], 'score': 95, 'slug': 'alice-wan'}
User 2: {'name': 'Bob Smith', 'bio': 'Engineer', 'tags': ['web', 'api'], 'slug': 'bob-smith'}
User 3: {'name': 'Carol Jones', 'tags': ['data', 'ml'], 'score': 88, 'slug': 'carol-jones'}
User 1: {'name': 'Alice Wan', 'tags': ['python', 'async'], 'score': 95, 'slug': 'alice-wan'}

Cache contains 3 entries

The pipeline uses three boltons utilities: remap() to strip None values deep in the nested structure, LRU to avoid re-cleaning the same profile on repeat fetches, and slugify() to add a URL-friendly identifier derived from the display name. Tags are deduplicated with a manual order-preserving loop; boltons.iterutils.unique() would do the same job. In a FastAPI context you would call get_profile() from your endpoint and serve the cleaned result directly to clients.

Frequently Asked Questions

Why use boltons when the standard library has functools and itertools?

boltons extends, not replaces, the standard library. functools.lru_cache is excellent but works only as a decorator, whereas boltons.cacheutils.LRU is a dict-like object you can inspect, share between functions, and size explicitly. itertools has no windowed(), no order-preserving unique(), and no nested remap(). boltons fills these gaps with the same quality bar as the standard library while adding useful extras.

Does boltons have any dependencies?

No. boltons is pure Python with zero third-party dependencies. This makes it safe to include in any project -- no version conflicts, no C extensions to compile, no transitive dependency chain to audit. Older releases supported Python 2.7 alongside Python 3; recent releases target Python 3 only, so check the changelog if you still need Python 2.

Can remap() handle infinite recursion or circular references?

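Largely, yes. remap() is implemented iteratively rather than recursively, so deep nesting does not hit Python's recursion limit, and it keeps a registry of the containers it has already entered so that reference cycles do not loop forever. By default it only descends into standard containers such as dicts, lists, tuples, and sets, so arbitrary objects (e.g., ORM model instances) are treated as leaf values. If you want remap() to process their contents, convert them to plain dicts first using your ORM's serializer.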

Is the boltons LRU cache thread-safe?

Yes. boltons.cacheutils.LRU uses an internal lock and is safe for use in multithreaded code. This differs from a simple dict-based cache that can cause race conditions during concurrent writes. For asyncio code use a separate locking pattern (e.g., asyncio.Lock) since thread locks do not integrate with the event loop.

Can I import only specific modules from boltons?

Yes. boltons is fully modular -- each module (iterutils, cacheutils, etc.) can be imported independently. You can even vendor a single module by copying the relevant .py file into your project. This is by design: the library is built to be a collection of independent, copy-pasteable utilities.

Conclusion

You have covered boltons' most useful modules: iterutils for chunking, flattening, windowed iteration, and order-preserving unique filtering; dictutils for multi-value dicts; cacheutils for memoization and LRU caching; and strutils for slugification, case conversion, and human-readable sizes. The API cache example shows how combining three boltons utilities creates a clean, maintainable data pipeline.

The full library contains many more modules worth exploring: tableutils for tabular data, fileutils for atomic file writes, timeutils for duration parsing, and statsutils for descriptive statistics. Browse the official boltons documentation to see what else might replace a piece of custom utility code in your projects.

How To Use Python msgpack for Binary Message Serialization

Intermediate

Your microservices are talking JSON. It works, but have you measured it? JSON serialization converts every integer to a string of digit characters, wraps every key in double quotes, and adds field separators across every nested object. A payload that could be 40 bytes of binary data becomes 120 bytes of ASCII text — and at scale that extra weight shows up in latency, bandwidth bills, and CPU time spent encoding and decoding.

MessagePack (msgpack) is a binary serialization format that fixes this. It encodes the same Python dict, list, int, float, bool, and string types that JSON handles, plus raw bytes, but stores them in compact binary form. The Python msgpack library has no required dependencies and ships with an optional C extension for speed, making it easy to add to any project.

This article covers installation, basic packing and unpacking, working with bytes and strings, streaming large data, customizing encoding for non-standard types, and a real-world inter-service messaging example. By the end you will be able to replace JSON in any serialization-heavy Python workflow and measure the difference.

Packing Python Data: Quick Example

The core msgpack API has two functions: packb() serializes a Python object to bytes, and unpackb() deserializes bytes back to Python. Here is the simplest possible usage:

# quick_msgpack.py
import msgpack

data = {'user': 'alice', 'score': 9850, 'active': True, 'tags': ['python', 'async']}

# Serialize to bytes
packed = msgpack.packb(data, use_bin_type=True)
print(f'Packed type: {type(packed)}')
print(f'Packed size: {len(packed)} bytes')

# Deserialize back to Python
unpacked = msgpack.unpackb(packed, raw=False)
print(f'Unpacked: {unpacked}')

import json
json_bytes = json.dumps(data).encode()
print(f'JSON size: {len(json_bytes)} bytes')
print(f'Size reduction: {1 - len(packed)/len(json_bytes):.0%}')

Output:

Packed type: <class 'bytes'>
Packed size: 48 bytes
Unpacked: {'user': 'alice', 'score': 9850, 'active': True, 'tags': ['python', 'async']}
JSON size: 77 bytes
Size reduction: 38%

The two keyword arguments worth knowing are use_bin_type=True on pack (tells msgpack to encode Python bytes as the MessagePack bin type while str stays a MessagePack str) and raw=False on unpack (tells it to decode MessagePack str back to Python str instead of bytes). Both have been the defaults since msgpack 1.0; passing them explicitly is harmless and keeps the code correct on older versions, where omitting them leaves you with unexpected bytes keys in your dicts. The sections below explain every important option.

JSON: 77 bytes. msgpack: 48 bytes. At a million messages per day, do the math.

What Is MessagePack and How Does It Work?

MessagePack is a binary serialization standard: a compact encoding where each value is prefixed with a type tag and, where needed, a length. An integer like 255 takes 2 bytes (tag + value). A short string like "alice" takes 6 bytes (a 1-byte tag that also encodes the length, plus 5 bytes of payload). JSON needs 7 bytes for the same string: 5 for the characters and 2 for the surrounding quotes.

| Type | JSON encoding | msgpack encoding |
| --- | --- | --- |
| Integer 42 | 42 (2 bytes) | \x2a (1 byte) |
| Boolean True | true (4 bytes) | \xc3 (1 byte) |
| String "hello" | "hello" (7 bytes) | \xa5hello (6 bytes) |
| None / null | null (4 bytes) | \xc0 (1 byte) |
| Raw bytes | base64 string (33% larger) | native bin type |

Savings are modest for single values but compound across large nested structures with many keys and short values. The bigger win is often encoding and decoding speed: msgpack stores numbers in binary, so it skips the text-to-number conversions that JSON parsers must perform for every number.
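The byte layouts described above can be checked directly. This short sketch packs a handful of sample values and prints the raw bytes; the samples list is arbitrary and only chosen to match the figures quoted in this section.

```python
import msgpack

samples = [42, 255, 9850, True, None, 'hello', b'\x00\x01']

# Map each value's repr to its packed bytes so the layout is easy to inspect
encoded = {repr(value): msgpack.packb(value, use_bin_type=True) for value in samples}

for label, packed in encoded.items():
    print(f'{label:>12} -> {packed!r} ({len(packed)} bytes)')
```

Small integers fit in a single byte, larger ones get a one-byte tag followed by the value, and short strings fold their length into the tag byte.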

Packing and Unpacking Options

String vs Bytes: use_bin_type and raw

The most common source of msgpack confusion is the str/bytes encoding. The original MessagePack spec had a single raw type for both text and binary data; the current spec splits it into str (text) and bin (binary). You want use_bin_type=True + raw=False to get Python 3 semantics, where strings stay strings and bytes stay bytes. These are the defaults from msgpack 1.0 onward.

# str_bytes_demo.py
import msgpack

payload = {'name': 'bob', 'avatar': b'\x89PNG\r\n\x1a\n'}

# Correct settings for Python 3
packed = msgpack.packb(payload, use_bin_type=True)
result = msgpack.unpackb(packed, raw=False)
print('name type:', type(result['name']), result['name'])
print('avatar type:', type(result['avatar']), result['avatar'][:4])

# Wrong (legacy mode, the default before msgpack 1.0): str comes back as bytes
packed_legacy = msgpack.packb(payload, use_bin_type=False)
result_legacy = msgpack.unpackb(packed_legacy, raw=True)
print('\nLegacy mode name type:', type(result_legacy[b'name']))

Output:

name type: <class 'str'> bob
avatar type: <class 'bytes'> b'\x89PNG'

Legacy mode name type: <class 'bytes'>

In legacy mode (the default before msgpack 1.0) unpackb() returns all string-like data as bytes. That means your dict keys come back as b'name' instead of 'name', breaking any code that expects string keys. Stick to use_bin_type=True and raw=False for new code, and reach for legacy mode only when you must talk to a peer that still uses it.
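If you have to consume data from a producer that still packs in legacy mode, one option is to normalise the result after unpacking. The helper below is a hypothetical sketch (normalize_keys is not part of msgpack) that decodes bytes keys to str recursively. It leaves values untouched, because in legacy data there is no reliable way to tell text values from binary ones.

```python
def normalize_keys(obj, encoding='utf-8'):
    """Recursively decode bytes dict keys to str; values are left as they are."""
    if isinstance(obj, dict):
        return {
            (k.decode(encoding) if isinstance(k, bytes) else k): normalize_keys(v, encoding)
            for k, v in obj.items()
        }
    if isinstance(obj, list):
        return [normalize_keys(item, encoding) for item in obj]
    return obj

# Shape of a dict as it comes back from a legacy-mode unpack
legacy = {b'name': b'bob', b'tags': [{b'id': 1}]}
print(normalize_keys(legacy))
```

Values such as b'bob' still need a per-field decision on whether they are text or binary, which is exactly the ambiguity the str/bin split removes.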

Strict Map Keys

By default, unpackb() only accepts str or bytes as map keys (strict_map_key=True). Packing a dict with integer keys (common in lookup tables) works without extra options, but to read it back you need to pass strict_map_key=False to unpackb():

# int_keys.py
import msgpack

data = {1: 'one', 2: 'two', 100: 'hundred'}
packed = msgpack.packb(data, use_bin_type=True)
result = msgpack.unpackb(packed, raw=False, strict_map_key=False)
print(result)

Output:

{1: 'one', 2: 'two', 100: 'hundred'}
Integers encoded in 1-5 bytes. JSON encodes them as character strings. Every. Single. Time.

Streaming and Large Payloads

When serializing large data that will be transmitted over a socket or written to a file, the Packer and Unpacker objects give you control over chunked encoding and incremental decoding. This avoids loading the whole payload into memory at once.

# streaming.py
import msgpack
import io

# Streaming pack: write many objects to a buffer
buf = io.BytesIO()
packer = msgpack.Packer(use_bin_type=True)
records = [
    {'id': 1, 'event': 'login',  'user': 'alice'},
    {'id': 2, 'event': 'click',  'user': 'bob'},
    {'id': 3, 'event': 'logout', 'user': 'alice'},
]
for record in records:
    buf.write(packer.pack(record))

# Streaming unpack: read back incrementally
buf.seek(0)
unpacker = msgpack.Unpacker(raw=False)
unpacker.feed(buf.read())
for obj in unpacker:
    print(obj)

Output:

{'id': 1, 'event': 'login', 'user': 'alice'}
{'id': 2, 'event': 'click', 'user': 'bob'}
{'id': 3, 'event': 'logout', 'user': 'alice'}

The Packer and Unpacker objects are designed for streaming use cases: writing to a socket in chunks, reading from a Kafka stream, or processing a large binary file one record at a time. The Unpacker.feed() method accepts arbitrary byte chunks — it will reconstruct complete objects even if the data arrives in fragments.

Custom Type Encoding

MessagePack natively handles dict, list, str, bytes, int, float, bool, and None. For custom objects like datetime, you need to provide serializer and deserializer hooks via default and object_hook.

# custom_types.py
import msgpack
from datetime import datetime

def encode_datetime(obj):
    if isinstance(obj, datetime):
        return {'__datetime__': True, 'iso': obj.isoformat()}
    raise TypeError(f'Unknown type: {type(obj)}')

def decode_datetime(obj):
    if obj.get('__datetime__'):
        return datetime.fromisoformat(obj['iso'])
    return obj

event = {
    'event': 'user_signup',
    'timestamp': datetime(2026, 5, 7, 9, 30, 0),
    'user_id': 42,
}

packed = msgpack.packb(event, default=encode_datetime, use_bin_type=True)
result = msgpack.unpackb(packed, object_hook=decode_datetime, raw=False)

print('event:', result['event'])
print('timestamp:', result['timestamp'], type(result['timestamp']))

Output:

event: user_signup
timestamp: 2026-05-07 09:30:00 <class 'datetime.datetime'>

The default function receives any object that packb cannot handle natively and must return something msgpack can encode — typically a dict with a type tag. The object_hook is called on every decoded dict, so check your type tag before transforming. This pattern works for any custom class: Decimal, UUID, NumPy arrays, Pydantic models, and so on.
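As a sketch of how the same tagging pattern extends to other types, the hooks below handle Decimal and UUID. To keep the example runnable with only the standard library it is demonstrated with the json module, whose default and object_hook parameters have the same call shape; the expectation (an assumption, not verified here) is that encode_extra and decode_extra can be passed to msgpack.packb() and msgpack.unpackb() exactly as in the datetime example. The function names and tag keys are made up for illustration.

```python
import json
import uuid
from decimal import Decimal

def encode_extra(obj):
    """default hook: turn unsupported objects into tagged dicts."""
    if isinstance(obj, Decimal):
        return {'__decimal__': str(obj)}
    if isinstance(obj, uuid.UUID):
        return {'__uuid__': str(obj)}
    raise TypeError(f'Unknown type: {type(obj)}')

def decode_extra(obj):
    """object_hook: called for every decoded dict, so check the tag first."""
    if '__decimal__' in obj:
        return Decimal(obj['__decimal__'])
    if '__uuid__' in obj:
        return uuid.UUID(obj['__uuid__'])
    return obj

order = {
    'order_id': uuid.UUID('12345678-1234-5678-1234-567812345678'),
    'total': Decimal('19.99'),
}

# json is used here only so the sketch runs without third-party packages
wire = json.dumps(order, default=encode_extra)
restored = json.loads(wire, object_hook=decode_extra)
print(restored == order)
```

Storing the Decimal as a string rather than a float is deliberate: it avoids binary floating-point rounding on the way through.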

Real-Life Example: Inter-Service Message Queue

The script below simulates a producer publishing events and a consumer reading them, both using msgpack for serialization. The same pack-on-publish, unpack-on-consume pattern applies when the transport is Redis Streams, Kafka, or another message queue.

Producer packs. Consumer unpacks. Nobody reads JSON at the wire layer.
# message_queue_demo.py
import msgpack
import queue
import threading
import time
from datetime import datetime

# Shared in-memory queue (stands in for Redis/Kafka in this demo)
message_bus = queue.Queue()

def encode_event(obj):
    if isinstance(obj, datetime):
        return {'__dt__': obj.isoformat()}
    raise TypeError(f'Cannot encode {type(obj)}')

def decode_event(obj):
    if '__dt__' in obj:
        return datetime.fromisoformat(obj['__dt__'])
    return obj

def producer(n_events):
    events = [
        {'type': 'page_view',  'url': '/home',    'user': 1},
        {'type': 'add_to_cart', 'sku': 'WIDGET-42', 'user': 1},
        {'type': 'page_view',  'url': '/checkout', 'user': 2},
    ]
    for i in range(n_events):
        event = dict(events[i % len(events)])
        event['timestamp'] = datetime.now()
        event['seq'] = i
        packed = msgpack.packb(event, default=encode_event, use_bin_type=True)
        message_bus.put(packed)
    message_bus.put(None)  # Sentinel to stop consumer

def consumer():
    received = 0
    total_bytes = 0
    while True:
        raw = message_bus.get()
        if raw is None:
            break
        event = msgpack.unpackb(raw, object_hook=decode_event, raw=False)
        received += 1
        total_bytes += len(raw)
    print(f'Consumer: {received} events, {total_bytes} bytes total')
    print(f'Average bytes/event: {total_bytes/received:.1f}')

prod_thread = threading.Thread(target=producer, args=(9,))
cons_thread = threading.Thread(target=consumer)
cons_thread.start()
prod_thread.start()
prod_thread.join()
cons_thread.join()

Output:

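Consumer: 9 events, 768 bytes total
Average bytes/event: 85.3

The exact byte count assumes timestamps that include microseconds, which is what datetime.now() normally produces. The same 9 events encoded with json.dumps() (with ISO timestamp strings) come to roughly 105-120 bytes each, so msgpack saves around a quarter here; these messages consist mostly of short strings, which both formats store as plain text. The datetime hook keeps the types round-tripping cleanly. In a real system replace queue.Queue with a Redis or Kafka client: the packb/unpackb calls stay identical.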

Frequently Asked Questions

When should I use msgpack instead of JSON?

Use msgpack when you control both the serializer and deserializer (internal services, message queues, caches), you care about payload size or serialization speed, or you need to encode binary data without base64 overhead. Stick with JSON when the data will be read by humans, logged to text files, or consumed by third-party services that expect JSON.
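The base64 overhead is easy to measure with the standard library. The sketch below wraps a random 3,000-byte payload (a stand-in for an image or similar binary data; the 'avatar' field name is illustrative) the way a JSON API would have to, and compares sizes.

```python
import base64
import json
import os

blob = os.urandom(3000)  # stand-in for an image or other binary payload

# JSON has no bytes type, so binary data has to be wrapped as base64 text
encoded = base64.b64encode(blob).decode('ascii')
as_json = json.dumps({'avatar': encoded}).encode('utf-8')

print('raw bytes:     ', len(blob))
print('base64 chars:  ', len(encoded))
print('overhead ratio:', round(len(encoded) / len(blob), 2))
print('JSON document: ', len(as_json))
```

Base64 turns every 3 input bytes into 4 output characters, which is where the one-third growth comes from. A format with a native binary type can carry the payload as-is plus a small length header.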

How does msgpack compare to pickle?

Pickle handles arbitrary Python objects but is Python-only and carries a remote code execution risk if you unpickle untrusted data. msgpack is language-agnostic (clients exist in Go, Rust, Java, JavaScript, etc.) and safe to deserialize from untrusted sources. Use msgpack for cross-language or cross-service communication; use pickle only for trusted Python-to-Python data that benefits from full object serialization.

Does msgpack require a C compiler?

No. pip install msgpack downloads a pre-compiled wheel on all major platforms (Windows, macOS, Linux). The C extension is bundled in the wheel. If no matching wheel exists for your platform, it falls back to a pure-Python implementation that is slower but fully functional.

Does msgpack enforce a schema?

No. Like JSON, msgpack is schema-less by default — you can pack any Python dict structure without declaring types upfront. If you need schema validation, combine msgpack with a library like msgspec (which has its own built-in msgpack encoder) or validate with Pydantic after unpacking.

Can msgpack handle very large integers?

Yes, within 64 bits. msgpack natively encodes integers from -9,223,372,036,854,775,808 (the signed 64-bit minimum) up to 18,446,744,073,709,551,615 (the unsigned 64-bit maximum). For Python’s arbitrary-precision integers beyond that range, you need a custom encoding that converts them to strings or bytes, similar to the datetime example above.
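One way to handle out-of-range integers is to tag them before packing and restore them after unpacking. The sketch below uses only the standard library and hypothetical helper names (fits_native_int, tag_big_ints, untag_big_ints); the tagged structure it produces contains only dicts, lists, strings and in-range integers, so it should be safe to hand to packb(), and untag_big_ints has the shape of an object_hook.

```python
INT_MIN = -(2 ** 63)      # smallest signed 64-bit value
INT_MAX = 2 ** 64 - 1     # largest unsigned 64-bit value

def fits_native_int(n):
    """True if n is inside the range a 64-bit MessagePack integer can hold."""
    return INT_MIN <= n <= INT_MAX

def tag_big_ints(obj):
    """Walk dicts and lists, replacing out-of-range ints with a tagged dict."""
    if isinstance(obj, bool):
        return obj  # bool is a subclass of int; leave it alone
    if isinstance(obj, int) and not fits_native_int(obj):
        return {'__bigint__': str(obj)}
    if isinstance(obj, dict):
        return {k: tag_big_ints(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [tag_big_ints(v) for v in obj]
    return obj

def untag_big_ints(obj):
    """object_hook-style decoder: turn a tagged dict back into an int."""
    if '__bigint__' in obj:
        return int(obj['__bigint__'])
    return obj

payload = {'small': 42, 'huge': 2 ** 100}
prepared = tag_big_ints(payload)
print(prepared)
print(untag_big_ints(prepared['huge']) == 2 ** 100)
```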

Conclusion

You have covered the full msgpack toolkit: packb() and unpackb() with the correct use_bin_type/raw settings, streaming with Packer/Unpacker, integer-keyed dicts, and custom type encoding for datetime and similar objects. The message queue demo shows the pattern in a threaded producer/consumer scenario that maps directly onto a real message broker.

The next step is replacing JSON in a high-throughput code path and benchmarking the difference with timeit or pytest-benchmark. You may also want to look at msgspec, which can encode and decode the MessagePack format and adds schema validation at decode time using typed Struct classes, similar in spirit to Pydantic models.

Official documentation: msgpack.org and the Python library on GitHub.

How To Use Python aiofiles for Async File Operations


Intermediate

You have async code everywhere — FastAPI endpoints, aiohttp clients, asyncio workers — but every time you touch the filesystem you reach for the standard open() built-in. That one blocking call quietly freezes your entire event loop, starving every other coroutine until the disk responds. On SSDs the pause is milliseconds; under load with dozens of concurrent requests it compounds into a real latency problem.

The fix is aiofiles, a small drop-in library that wraps Python’s file I/O in a thread pool so your event loop stays free while bytes move between disk and memory. It mirrors the familiar open() interface almost exactly, so adoption costs nearly nothing — you change open to aiofiles.open and add await in a few places.

This article covers everything you need to use aiofiles effectively: installing it, reading and writing text and binary files, appending to logs, iterating lines, handling exceptions, and combining it with real async workflows. By the end you will have a working async log-aggregator script that processes multiple files concurrently without blocking.

Reading a File Asynchronously: Quick Example

The fastest way to see aiofiles in action is to open a file and read its contents inside a coroutine. The pattern mirrors open() almost exactly — the only differences are async with and await.

# quick_read.py
import asyncio
import aiofiles

async def read_file(path):
    async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
        content = await f.read()
    return content

async def main():
    # Create a sample file first
    async with aiofiles.open('sample.txt', 'w', encoding='utf-8') as f:
        await f.write('Hello from aiofiles!\nLine two.\nLine three.')

    text = await read_file('sample.txt')
    print(text)

asyncio.run(main())

Output:

Hello from aiofiles!
Line two.
Line three.

aiofiles.open() returns an async context manager. Inside the async with block, every I/O method — read(), write(), readline() — is a coroutine that you await. The file is closed automatically when the block exits, just like regular with open(). Under the hood the actual read runs in a thread pool, releasing the event loop while waiting for the OS.

The sections below show how to extend this pattern to binary files, line-by-line iteration, append mode, and concurrent processing of multiple files.

One blocking open() call. That's all it takes to halt every coroutine in your event loop.

What Is aiofiles and Why Use It?

Python’s built-in open() is synchronous. When your code calls f.read(), the calling thread blocks until the operating system delivers the data. In a regular script that is fine — there is only one task running. In an asyncio program, blocking the thread blocks the entire event loop, which means every other coroutine waiting for a network response, a timer, or another file has to wait too.

aiofiles solves this by delegating each file operation to a thread pool using asyncio.get_event_loop().run_in_executor() internally. Your coroutine suspends with await, the event loop schedules other work, and when the thread finishes the I/O the coroutine resumes. The result is a non-blocking file API with a familiar interface.
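The mechanism can be sketched with the standard library alone. The example below is a conceptual illustration, not aiofiles' actual source: it hands an ordinary blocking read to the default thread pool with run_in_executor() while a second coroutine keeps running. The names blocking_read, read_in_thread and heartbeat are made up for this sketch.

```python
import asyncio
import os
import tempfile

def blocking_read(path):
    """Ordinary synchronous read; called directly, this would block the event loop."""
    with open(path, encoding='utf-8') as f:
        return f.read()

async def read_in_thread(path):
    """Hand the blocking call to the default thread pool and await the result."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, blocking_read, path)

async def heartbeat(ticks):
    """Stands in for other coroutines that keep running during the read."""
    for _ in range(3):
        ticks.append('tick')
        await asyncio.sleep(0)

async def main():
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, 'sample.txt')
        with open(path, 'w', encoding='utf-8') as f:
            f.write('hello from a worker thread')
        ticks = []
        text, _ = await asyncio.gather(read_in_thread(path), heartbeat(ticks))
        return text, ticks

text, ticks = asyncio.run(main())
print(text)
print(ticks)
```

The value aiofiles adds on top of this is convenience: the same delegation is applied to open, read, write, iteration and close behind a file-like interface, so you do not write a wrapper per call.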

Scenario | Use open() | Use aiofiles
Simple one-off script | Yes | Optional
FastAPI / aiohttp endpoint reading a config file | No (blocks loop) | Yes
Concurrent processing of 20 log files | No (sequential) | Yes (runs in parallel)
Writing a large CSV from a background task | No (freezes server) | Yes
One-time data migration script (no server) | Yes (simpler) | Not needed

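Install aiofiles with a single command. It has no external dependencies; older releases supported Python 3.6+, while current releases require a newer Python 3, so check the project page if you are on an old interpreter: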

# terminal
pip install aiofiles

Reading Text and Binary Files

Reading an Entire File at Once

For small-to-medium files (a few MB or less) the simplest approach is await f.read(), which returns the entire content as a string (text mode) or bytes (binary mode). The example below reads a UTF-8 text file and a binary PNG header.

# read_modes.py
import asyncio
import aiofiles

async def read_text(path):
    async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
        return await f.read()

async def read_binary(path):
    async with aiofiles.open(path, mode='rb') as f:
        header = await f.read(8)   # Read first 8 bytes
    return header

async def main():
    # Write a sample text file
    async with aiofiles.open('notes.txt', 'w', encoding='utf-8') as f:
        await f.write('asyncio is non-blocking\naiofiles keeps it that way\n')

    text = await read_text('notes.txt')
    print('Text:', repr(text))

    # Write a fake binary file
    async with aiofiles.open('data.bin', 'wb') as f:
        await f.write(b'\x89PNG\r\n\x1a\n')  # PNG magic bytes

    header = await read_binary('data.bin')
    print('Header bytes:', header)

asyncio.run(main())

Output:

Text: 'asyncio is non-blocking\naiofiles keeps it that way\n'
Header bytes: b'\x89PNG\r\n\x1a\n'

The mode parameter works exactly like the built-in open(): 'r' for text, 'rb' for binary, 'w' for write, 'a' for append, and so on. Passing a byte count to read(n) limits the read to n bytes — useful for inspecting file headers without loading the whole file.

Reading Line by Line

Large log files should not be loaded into memory at once. aiofiles supports async iteration — you can loop over the file object just like a regular file, but with async for. Each iteration awaits the next line without blocking the loop.

# readline_loop.py
import asyncio
import aiofiles

async def count_errors(log_path):
    error_count = 0
    async with aiofiles.open(log_path, mode='r', encoding='utf-8') as f:
        async for line in f:
            if 'ERROR' in line:
                error_count += 1
    return error_count

async def main():
    # Create a sample log file
    log_lines = [
        '2026-05-07 INFO server started\n',
        '2026-05-07 ERROR connection refused: db host unreachable\n',
        '2026-05-07 INFO request handled in 12ms\n',
        '2026-05-07 ERROR timeout after 30s on /api/data\n',
        '2026-05-07 INFO cache hit ratio: 0.87\n',
    ]
    async with aiofiles.open('app.log', 'w', encoding='utf-8') as f:
        await f.writelines(log_lines)

    errors = await count_errors('app.log')
    print(f'Error lines found: {errors}')

asyncio.run(main())

Output:

Error lines found: 2

async for line in f is the key pattern here. It reads one line at a time, yields control back to the event loop between iterations, and is memory-safe for files of any size. writelines() also accepts any iterable of strings and is awaitable just like write().

async for line in f -- because loading a 2GB log into RAM was always a bad idea.

Writing and Appending to Files

Writing a New File

Opening a file with mode='w' creates it if it does not exist, or truncates it if it does. The example below writes a JSON config file asynchronously — a common pattern in async web apps that need to persist settings.

# write_config.py
import asyncio
import aiofiles
import json

async def save_config(path, config):
    json_str = json.dumps(config, indent=2)
    async with aiofiles.open(path, mode='w', encoding='utf-8') as f:
        await f.write(json_str)
    print(f'Saved {len(json_str)} bytes to {path}')

async def main():
    config = {
        'host': 'localhost',
        'port': 8080,
        'debug': False,
        'allowed_origins': ['https://example.com', 'https://app.example.com']
    }
    await save_config('config.json', config)

    # Verify it was written
    async with aiofiles.open('config.json', 'r', encoding='utf-8') as f:
        loaded = json.loads(await f.read())
    print('Loaded back:', loaded['host'], loaded['port'])

asyncio.run(main())

Output:

Saved 144 bytes to config.json
Loaded back: localhost 8080

Appending to a Log File

Using mode='a' keeps existing content intact and adds new lines at the end — ideal for audit trails or rolling application logs. The file is created if it does not exist.

# append_log.py
import asyncio
import aiofiles
from datetime import datetime

async def log_event(path, level, message):
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    line = f'[{timestamp}] {level}: {message}\n'
    async with aiofiles.open(path, mode='a', encoding='utf-8') as f:
        await f.write(line)

async def main():
    log_path = 'events.log'
    await log_event(log_path, 'INFO',  'Application started')
    await log_event(log_path, 'WARN',  'Config missing -- using defaults')
    await log_event(log_path, 'ERROR', 'Database connection failed')

    async with aiofiles.open(log_path, 'r', encoding='utf-8') as f:
        print(await f.read())

asyncio.run(main())

Output:

[2026-05-07 09:00:01] INFO: Application started
[2026-05-07 09:00:01] WARN: Config missing -- using defaults
[2026-05-07 09:00:01] ERROR: Database connection failed

Each aiofiles.open() call in append mode opens, writes, and closes the file. Because the writes are sequential in this example, the order is guaranteed. If you need to append from multiple concurrent coroutines, consider using a single persistent file handle or an asyncio.Lock to prevent interleaved writes.
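The lock-based option can be sketched as follows. To stay runnable without third-party packages, this version pushes a blocking append into a worker thread with asyncio.to_thread() (Python 3.9+) instead of using aiofiles; with aiofiles, the same asyncio.Lock would wrap the async with aiofiles.open(...) block. The helper names are illustrative.

```python
import asyncio
import os
import tempfile

def append_line(path, line):
    """Blocking append, run in a worker thread below."""
    with open(path, 'a', encoding='utf-8') as f:
        f.write(line)

async def log_event(path, lock, message):
    # Only one coroutine at a time may be inside the append
    async with lock:
        await asyncio.to_thread(append_line, path, message + '\n')

async def main():
    lock = asyncio.Lock()
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, 'events.log')
        # Launch 20 writers concurrently; the lock serialises the file access
        await asyncio.gather(*(log_event(path, lock, f'event {i}') for i in range(20)))
        with open(path, encoding='utf-8') as f:
            return f.read().splitlines()

lines = asyncio.run(main())
print(len(lines), 'complete lines written')
```

The lock is created inside main() so that it belongs to the running event loop. It only coordinates coroutines within one process; separate processes appending to the same file need a different mechanism.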

Processing Multiple Files Concurrently

The real payoff of aiofiles comes when you need to process many files at once. Using asyncio.gather() you can launch all the reads simultaneously — each runs in the thread pool and the event loop handles them concurrently. A synchronous version would process them one after another.

# concurrent_read.py
import asyncio
import aiofiles
import time

async def read_and_count_lines(path):
    async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
        content = await f.read()
    return path, len(content.splitlines())

async def main():
    # Create 5 sample files
    for i in range(5):
        async with aiofiles.open(f'log_{i}.txt', 'w', encoding='utf-8') as f:
            await f.write('\n'.join(f'Line {j}' for j in range((i + 1) * 100)))

    start = time.perf_counter()
    paths = [f'log_{i}.txt' for i in range(5)]
    results = await asyncio.gather(*[read_and_count_lines(p) for p in paths])
    elapsed = time.perf_counter() - start

    for path, lines in results:
        print(f'{path}: {lines} lines')
    print(f'Completed in {elapsed:.4f}s')

asyncio.run(main())

Output:

log_0.txt: 100 lines
log_1.txt: 200 lines
log_2.txt: 300 lines
log_3.txt: 400 lines
log_4.txt: 500 lines
Completed in 0.0021s

asyncio.gather() submits all five coroutines at once. Because aiofiles uses a thread pool, the OS can schedule overlapping reads. The speedup over a sequential loop grows as file count and file size increase — or when reading from a network-mounted filesystem where latency dominates.

asyncio.gather() + aiofiles: five files, one event loop, zero waiting.

Error Handling and File Safety

Async context managers clean up reliably — the async with block guarantees the file is closed even if an exception is raised. Always wrap file operations in try/except when the path might not exist or when the content might be corrupt.

# safe_read.py
import asyncio
import aiofiles

async def safe_read(path):
    try:
        async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
            return await f.read()
    except FileNotFoundError:
        print(f'File not found: {path}')
        return None
    except PermissionError:
        print(f'Permission denied: {path}')
        return None
    except UnicodeDecodeError:
        # File exists but is not valid UTF-8 -- try binary fallback
        async with aiofiles.open(path, mode='rb') as f:
            raw = await f.read()
        return raw.decode('latin-1', errors='replace')

async def main():
    result = await safe_read('missing_file.txt')
    print('Missing file result:', result)

    # Write a valid file then read it
    async with aiofiles.open('valid.txt', 'w', encoding='utf-8') as f:
        await f.write('valid content')
    result = await safe_read('valid.txt')
    print('Valid file result:', result)

asyncio.run(main())

Output:

File not found: missing_file.txt
Missing file result: None
Valid file result: valid content

The async with guarantee means you never need a manual close() call. If your code raises an exception after opening but before finishing, the context manager’s __aexit__ method is still called and the OS file descriptor is released. This is the same guarantee as synchronous with open(), extended to the async world.
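The cleanup guarantee is a property of the async context manager protocol itself, which a small stand-in class can demonstrate. TrackedResource below is a hypothetical class written for this sketch (it is not an aiofiles type); it simply records when it is entered and exited.

```python
import asyncio

class TrackedResource:
    """Hypothetical stand-in for a file handle that records its lifecycle."""

    def __init__(self):
        self.events = []

    async def __aenter__(self):
        self.events.append('opened')
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Runs on normal exit and also when the block raises
        self.events.append('closed')
        return False  # do not swallow the exception

async def main():
    resource = TrackedResource()
    try:
        async with resource:
            raise ValueError('something went wrong mid-read')
    except ValueError as err:
        resource.events.append(f'caught: {err}')
    return resource.events

events = asyncio.run(main())
print(events)
```

The 'closed' entry is recorded before the exception reaches the except clause, which is the ordering you rely on when an error interrupts a read or write.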

Real-Life Example: Async Log Aggregator

The script below reads several log files concurrently, extracts all ERROR lines, sorts them by timestamp, and writes a combined error report. This is a realistic maintenance utility for any async backend.

Grep across 10 log files. Sequentially. I'll wait.
# log_aggregator.py
import asyncio
import aiofiles
from pathlib import Path

async def extract_errors(log_path):
    errors = []
    try:
        async with aiofiles.open(log_path, mode='r', encoding='utf-8') as f:
            async for line in f:
                stripped = line.strip()
                if stripped and 'ERROR' in stripped:
                    errors.append((str(log_path), stripped))
    except FileNotFoundError:
        print(f'Skipping missing file: {log_path}')
    return errors

async def write_report(report_path, errors):
    async with aiofiles.open(report_path, mode='w', encoding='utf-8') as f:
        await f.write(f'Error Report -- {len(errors)} errors found\n')
        await f.write('=' * 50 + '\n')
        for source, line in errors:
            await f.write(f'[{source}] {line}\n')

async def main():
    # Create sample log files
    sample_logs = {
        'service_a.log': [
            '2026-05-07 08:01:00 INFO request received\n',
            '2026-05-07 08:01:05 ERROR DB timeout on query /users\n',
            '2026-05-07 08:01:10 INFO cache miss, querying DB\n',
        ],
        'service_b.log': [
            '2026-05-07 08:02:00 INFO worker started\n',
            '2026-05-07 08:02:15 ERROR queue consumer crashed: ConnectionReset\n',
            '2026-05-07 08:02:20 INFO restarting consumer\n',
        ],
        'service_c.log': [
            '2026-05-07 08:03:00 INFO all systems nominal\n',
        ],
    }
    for name, lines in sample_logs.items():
        async with aiofiles.open(name, 'w', encoding='utf-8') as f:
            await f.writelines(lines)

    # Concurrently extract errors from all logs
    log_files = list(sample_logs.keys())
    results = await asyncio.gather(*[extract_errors(p) for p in log_files])

    all_errors = [item for sublist in results for item in sublist]
    all_errors.sort(key=lambda x: x[1][:19])  # Sort by timestamp prefix

    await write_report('error_report.txt', all_errors)
    print(f'Report written: {len(all_errors)} errors aggregated')

    async with aiofiles.open('error_report.txt', 'r', encoding='utf-8') as f:
        print(await f.read())

asyncio.run(main())

Output:

Report written: 2 errors aggregated
Error Report -- 2 errors found
==================================================
[service_a.log] 2026-05-07 08:01:05 ERROR DB timeout on query /users
[service_b.log] 2026-05-07 08:02:15 ERROR queue consumer crashed: ConnectionReset

The aggregator reads all log files concurrently using asyncio.gather(), flattens the results, sorts by timestamp, and writes the combined report asynchronously. In a real deployment you would collect the actual log file paths with something like pathlib.Path(log_dir).glob('*.log'), where log_dir is your log directory. You can extend this to send the report via email or post it to a Slack webhook using aiohttp, all without blocking the event loop.
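A minimal sketch of collecting log paths with pathlib, using a temporary directory so it runs anywhere. The find_logs helper and the file names are illustrative; the returned list could be passed to the gather() call in place of the hard-coded file names.

```python
from pathlib import Path
import tempfile

def find_logs(directory, pattern='*.log'):
    """Return matching log files in a directory, sorted for a stable order."""
    return sorted(Path(directory).glob(pattern))

with tempfile.TemporaryDirectory() as tmp:
    # Two log files and one unrelated file to show the filtering
    for name in ('service_a.log', 'service_b.log', 'notes.txt'):
        (Path(tmp) / name).write_text('2026-05-07 08:00:00 INFO ok\n', encoding='utf-8')
    found = [p.name for p in find_logs(tmp)]

print(found)
```

Sorting matters because glob() does not promise any particular order, and a stable order makes reports reproducible.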

Frequently Asked Questions

Is aiofiles always faster than regular open()?

Not necessarily for a single file read in an otherwise synchronous script. The overhead of the thread pool and event loop scheduling adds a tiny fixed cost. aiofiles shines when you have multiple concurrent I/O operations — that is where it delivers real speedups by allowing overlap. In a simple script with one file, use the standard open().

Can I use seek() and tell() with aiofiles?

Yes. Both await f.seek(offset) and await f.tell() are supported. They work the same as their synchronous counterparts, just wrapped as coroutines. This is useful for reading from specific byte positions in binary files or implementing resume-on-failure for large uploads.

Can I use a custom thread pool with aiofiles?

Yes. aiofiles.open() accepts loop and executor keyword arguments. You can pass a custom concurrent.futures.ThreadPoolExecutor to control the number of worker threads. The default uses the event loop’s default thread pool, which is usually sufficient but can be tuned for I/O-intensive workloads.

How do I use aiofiles with FastAPI for file uploads?

FastAPI’s FileResponse comes from Starlette, which relied on aiofiles for it in older releases (newer releases use anyio instead). For saving uploaded files, open the destination path with aiofiles.open(dest, 'wb') inside your async route handler and await f.write(chunk) in a loop. Avoid synchronous open() inside an async def route: it blocks the event loop and degrades performance under concurrent requests.

Does aiofiles work with pathlib.Path objects?

Yes. You can pass a Path object directly to aiofiles.open() just like the built-in open(). For example: async with aiofiles.open(Path('data') / 'output.txt', 'w') as f works without any conversion. This makes it easy to build cross-platform async file utilities with pathlib.

Conclusion

You have covered the full aiofiles toolkit: opening files in text and binary modes, reading entire files or iterating line by line, writing and appending, processing multiple files concurrently with asyncio.gather(), and handling errors safely with try/except inside async context managers. Every pattern here is a direct drop-in for code that currently uses the blocking open() built-in.

The log aggregator shows a practical scenario where non-blocking I/O pays off: reading several files in parallel, combining results, and writing a report — all without stalling your event loop. Extend it to watch a directory with watchdog or asyncio streams, ship the report over aiohttp, or plug it into a FastAPI background task. The foundation is solid.

For a deeper dive into the internals and advanced use cases, see the official aiofiles documentation on GitHub and the Python asyncio documentation.

How To Use Python diskcache for Disk-Based Caching


Intermediate

In-memory caching with functools.lru_cache is fast and easy, but it evaporates the moment your process ends. Every time your script restarts, every time your web server worker recycles, the cache is empty and you rebuild everything from scratch. For expensive operations — API calls with rate limits, slow database queries, heavy computations, or file parsing that takes several seconds — losing the cache on every restart is a real cost. You also cannot share an in-memory cache between separate processes, which is a problem in multi-worker web servers and parallel batch jobs.

diskcache is a Python caching library that persists data to disk using SQLite, so your cache survives process restarts and is shareable between multiple processes. The API is designed to feel like a Python dictionary — you set values, get values, check membership, and set expiration times. It also provides a @cache.memoize() decorator that drops directly on top of existing functions, and a FanoutCache for high-concurrency scenarios. Install it with pip install diskcache.

This article covers creating a cache, basic CRUD operations, setting time-to-live expiration, using the memoize decorator, handling cache size limits, and a realistic example showing a disk-cached API client that avoids redundant requests across multiple script runs. By the end, you will have a persistent caching layer ready for any Python project that benefits from reusing expensive results.

Quick Example: Persistent Key-Value Cache

Here is a minimal demonstration. The key detail is that after the first run, subsequent runs read from disk instead of recomputing.

# quick_diskcache.py
import diskcache
import time

# Cache is stored in the ./my_cache directory on disk
cache = diskcache.Cache("./my_cache")

def slow_operation(key):
    """Simulates an expensive computation (e.g., API call, DB query)."""
    time.sleep(1)
    return f"result_for_{key}"

# First access -- computes and stores
if "item_a" not in cache:
    print("Computing item_a...")
    cache["item_a"] = slow_operation("item_a")

# Second access -- reads from disk (instant)
print("Reading item_a:", cache["item_a"])

cache.close()
print("Cache closed -- value persists on disk")

First run output:

Computing item_a...
Reading item_a: result_for_item_a
Cache closed -- value persists on disk

Second run output (instant, no sleep):

Reading item_a: result_for_item_a
Cache closed -- value persists on disk

The value persists between runs because it is stored in SQLite files under ./my_cache/. The in operator checks for key existence just like a dict, and dictionary-style assignment stores the value. Run the script twice to see the difference — the second run has no sleep because the value was already on disk.

What Is diskcache and When Should You Use It?

diskcache stores cache entries in a SQLite database (for metadata and small values) and flat files on disk (for large binary values). It handles concurrent access from multiple processes safely, supports atomic operations, and respects configurable size limits. The library was designed to replace Redis or Memcached in situations where you want persistence without the overhead of running a separate cache server.

Option | Persists? | Multi-process? | Dependencies | Best For
lru_cache | No | No | None (stdlib) | In-process function memoization
diskcache | Yes | Yes | None (no server) | Persistent caching without infrastructure
Redis | Yes | Yes | Redis server required | Distributed caching, pub/sub
shelve | Yes | No | None (stdlib) | Simple persistent dict, single process

diskcache is the right choice when you want persistence and multi-process access without running a Redis server. It is particularly useful for development environments, batch scripts, data pipelines, and single-machine web applications with multiple worker processes.

diskcache: 1ms for a million calls. SQLite under the hood.

Basic Cache Operations

diskcache’s Cache object supports dictionary-style access, method-based access, and several cache-specific operations including TTL expiration and tagging.

# cache_operations.py
import diskcache

cache = diskcache.Cache("./demo_cache")

# Set values (dict-style or set() method)
cache["username"] = "alice"
cache.set("score", 99)

# Set with time-to-live (TTL) in seconds
cache.set("session_token", "abc123xyz", expire=60)  # expires in 60 seconds

# Get values
print(cache["username"])        # alice
print(cache.get("score"))       # 99
print(cache.get("missing_key")) # None (no KeyError)
print(cache.get("missing_key", default="fallback"))  # fallback

# Check existence
print("username" in cache)      # True
print("ghost" in cache)         # False

# Delete a key
del cache["username"]
print("username" in cache)      # False

# Atomic increment (thread/process safe)
cache.set("page_views", 0)
cache.incr("page_views")
cache.incr("page_views")
cache.incr("page_views")
print("Page views:", cache.get("page_views"))  # 3

# Get cache stats
print("Cache size (items):", len(cache))
print("Cache size (bytes):", cache.volume())

cache.close()

Output:

alice
99
None
fallback
True
False
False
Page views: 3
Cache size (items): 3
Cache size (bytes): 8192

The incr() method is especially useful for counters shared between processes: the read-modify-write runs inside a single SQLite transaction, so concurrent processes do not clobber each other’s counts. The volume() method returns an estimate of total disk usage in bytes, which lets you monitor cache growth.

The memoize() Decorator

The most convenient diskcache feature for most applications is the @cache.memoize() decorator. It caches the return value of a function keyed by its arguments, with an optional TTL. The result persists across process restarts.

# memoize_demo.py
import diskcache
import time
import requests

cache = diskcache.Cache("./api_cache")

@cache.memoize(expire=3600)  # cache results for 1 hour
def fetch_user(user_id: int) -> dict:
    """Fetches user data from a REST API -- cached for 1 hour."""
    print(f"  [CACHE MISS] Fetching user {user_id} from API...")
    response = requests.get(f"https://jsonplaceholder.typicode.com/users/{user_id}")
    return response.json()

@cache.memoize(expire=300)   # cache for 5 minutes
def fetch_posts_for_user(user_id: int) -> list:
    """Fetches all posts by a user -- cached for 5 minutes."""
    print(f"  [CACHE MISS] Fetching posts for user {user_id}...")
    response = requests.get(f"https://jsonplaceholder.typicode.com/posts?userId={user_id}")
    return response.json()

print("=== First run (populates cache) ===")
start = time.time()
user = fetch_user(1)
posts = fetch_posts_for_user(1)
elapsed = time.time() - start
print(f"User: {user['name']}, Posts: {len(posts)}, Time: {elapsed:.2f}s")

print("\n=== Second run (reads from disk cache) ===")
start = time.time()
user = fetch_user(1)
posts = fetch_posts_for_user(1)
elapsed = time.time() - start
print(f"User: {user['name']}, Posts: {len(posts)}, Time: {elapsed:.2f}s")

cache.close()

Output (first run):

=== First run (populates cache) ===
  [CACHE MISS] Fetching user 1 from API...
  [CACHE MISS] Fetching posts for user 1...
User: Leanne Graham, Posts: 10, Time: 0.84s

=== Second run (reads from disk cache) ===
User: Leanne Graham, Posts: 10, Time: 0.00s

The second run finished in a few milliseconds, which the two-decimal format rounds to 0.00s, instead of 840 milliseconds, because both results were already on disk. On the next script run (a completely new process), the cache will still be there and the first call will also be fast until the TTL expires, unlike lru_cache, which would start fresh.

When @lru_cache runs out of memory, diskcache catches the spillover.

Cache Size Limits and Eviction

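diskcache lets you set a maximum cache size. When the cache exceeds that size, it evicts entries to stay within the limit. The default eviction policy is least-recently-stored; least-recently-used and least-frequently-used are available through the eviction_policy setting.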

# cache_size.py
import diskcache

# Limit cache to 10 MB
cache = diskcache.Cache("./bounded_cache", size_limit=10 * 1024 * 1024)

# Store some values
for i in range(100):
    cache.set(f"key_{i}", "x" * 10000)  # ~10KB per entry

print("Items in cache:", len(cache))
print("Volume (bytes):", cache.volume())
print("Size limit (bytes):", cache.size_limit)

# Manually evict entries by tag: evict() removes every entry stored with that tag
cache.set("report_cache", "x" * 10000, tag="reports")
evicted = cache.evict("reports")

# Clear the entire cache
cache.clear()
print("After clear, items:", len(cache))

cache.close()

Output:

Items in cache: 100
Volume (bytes): 1245184
Size limit (bytes): 10485760
After clear, items: 0

Eviction happens automatically: you do not need to call any cleanup function. diskcache records when each entry was stored (and, under the least-recently-used policy, when it was last accessed) and removes old entries as new ones are written. Setting a reasonable size limit (e.g., 500MB for a data pipeline, 50MB for a web server) prevents the cache directory from growing indefinitely.

Real-Life Example: Cached Data Enrichment Pipeline

Here is a realistic data enrichment pipeline that fetches additional details for a list of records. Without caching, every run makes the same API calls. With diskcache, only the first run pays the network cost.

# enrichment_pipeline.py
import diskcache
import requests
import time
from dataclasses import dataclass
from typing import Optional

CACHE = diskcache.Cache("./enrichment_cache", size_limit=100 * 1024 * 1024)

@dataclass
class EnrichedUser:
    id: int
    name: str
    email: str
    company: str
    website: Optional[str]

@CACHE.memoize(expire=86400)  # cache for 24 hours
def fetch_user_details(user_id: int) -> dict:
    """Fetches user details from REST API. Expensive -- cache aggressively."""
    resp = requests.get(f"https://jsonplaceholder.typicode.com/users/{user_id}", timeout=10)
    resp.raise_for_status()
    return resp.json()

def enrich_users(user_ids: list) -> list:
    """Enrich a list of user IDs with full profile data."""
    results = []
    cache_hits = 0

    for uid in user_ids:
        # Check if already cached before calling the memoized function.
        # memoize() attaches __cache_key__, which builds the exact key it stores.
        key = fetch_user_details.__cache_key__(uid)
        was_cached = key in CACHE

        raw = fetch_user_details(uid)
        if was_cached:
            cache_hits += 1

        user = EnrichedUser(
            id=raw["id"],
            name=raw["name"],
            email=raw["email"],
            company=raw["company"]["name"],
            website=raw.get("website"),
        )
        results.append(user)

    print(f"Processed {len(user_ids)} users ({cache_hits} from cache, {len(user_ids)-cache_hits} from API)")
    return results

# Run the pipeline with user IDs 1-5
print("=== Pipeline Run ===")
start = time.time()
users = enrich_users([1, 2, 3, 4, 5])
elapsed = time.time() - start

for u in users:
    print(f"  {u.id}. {u.name} ({u.company}) -- {u.email}")
print(f"Total time: {elapsed:.2f}s")

CACHE.close()

Output (first run):

=== Pipeline Run ===
Processed 5 users (0 from cache, 5 from API)
  1. Leanne Graham (Romaguera-Crona) -- Sincere@april.biz
  2. Ervin Howell (Deckow-Crist) -- Shanna@melissa.tv
  ...
Total time: 1.23s

Output (subsequent runs — same day):

=== Pipeline Run ===
Processed 5 users (5 from cache, 0 from API)
  1. Leanne Graham (Romaguera-Crona) -- Sincere@april.biz
  ...
Total time: 0.01s

Subsequent runs are roughly 100x faster (0.01s versus 1.23s here) because all 5 lookups are served from the disk cache instead of the network. The 24-hour TTL means the cache refreshes daily, so data does not go stale indefinitely. You can extend this pattern to cache database query results, file parsing output, or any other expensive step in a pipeline.

Bigger than RAM. Faster than Redis. Sometimes.

Frequently Asked Questions

Is diskcache safe for concurrent access?

Yes. diskcache uses SQLite’s WAL (Write-Ahead Logging) mode, which supports concurrent reads and serialized writes. Multiple threads and multiple processes can read from and write to the same cache directory simultaneously without corrupting data. For extremely high concurrency (hundreds of writes per second), use FanoutCache instead, which shards the data across multiple SQLite files to reduce lock contention.

How does diskcache differ from Python’s shelve module?

shelve is a stdlib persistent dict that stores data in a DBM file. It does not support concurrent process access, does not have TTL expiration, and does not enforce size limits. diskcache supports all of these plus atomic increment, memoize decorators, cache statistics, and FanoutCache for high concurrency. Use shelve only for simple single-process persistence; use diskcache whenever you need expiration, size limits, or multi-process access.

How does TTL expiration work?

Expired entries are not immediately deleted; they are filtered out on read. When you call cache.get(key), diskcache checks the entry’s expiration time and returns None (or the default) if it has expired, even if the entry still exists on disk. Expired entries are removed lazily: each write culls a small batch of them (up to cull_limit, 10 by default), and cache.expire() removes all of them at once. This means disk usage can temporarily exceed your expectations if you have many short-lived entries, so call cache.expire() periodically to clean up proactively.

Where is the cache stored and can I delete it?

The cache is stored in the directory path you pass to Cache(). It consists of SQLite files (cache.db) and flat files for large values. You can delete the entire directory to clear the cache completely. There is no background process or daemon — the cache directory is just files that Python reads and writes. This makes it trivial to deploy (no Redis setup needed) and trivial to reset (just delete the folder).

Can I store custom Python objects?

Yes. diskcache uses pickle for serialization by default, so any picklable Python object can be stored: dataclasses, custom classes, numpy arrays, PIL images, pandas DataFrames. If you need to store JSON specifically, pass disk=diskcache.JSONDisk when creating the cache. The JSONDisk variant serializes keys and values as zlib-compressed JSON instead of pickle, so it is limited to JSON-serializable types and the stored bytes are not directly human-readable.

Conclusion

diskcache closes the gap between in-memory caching (fast but ephemeral) and a full Redis deployment (persistent but requires infrastructure). With a dictionary-style API, automatic TTL expiration, process-safe concurrent access, and a memoize decorator that drops onto existing functions, it handles the majority of persistent caching needs in a single pip install diskcache. The size limit and eviction system keeps disk usage bounded, and the SQLite storage makes the cache inspectable and trivially deletable.

Use the memoize decorator for API calls and expensive computations, set TTLs based on how stale your data can reasonably be, and configure a size limit to prevent unbounded growth. For high-write-concurrency scenarios, swap Cache for FanoutCache with the same API. The full reference — including FanoutCache, DjangoCache integration, and Deque/Index data structures built on the same storage — is available at the official diskcache documentation.

How To Use Python python-dotenv for Environment Variables

Beginner

Every application eventually needs configuration that changes between environments — API keys for development vs production, database URLs that are different on your laptop and the server, secret tokens that absolutely cannot be checked into version control. The naive solution is to hardcode these values directly in your Python files. The problem with that approach is not just security — it is also inflexibility. Every time you move the code to a different machine or environment, you have to edit the source file, which means tracking down which file, remembering the format, and hoping you do not accidentally commit the change.

python-dotenv solves this cleanly. You create a .env file in your project root containing key-value pairs, add .env to your .gitignore, and call load_dotenv() once at startup. From that point, all your config values are available through os.environ or os.getenv() exactly like regular environment variables, but loaded from your file instead of the shell. The library is a single pip install python-dotenv away and has no dependencies.

This article covers creating and formatting .env files, loading them with load_dotenv(), reading values with os.getenv() and its defaults, handling multiple environments (development, staging, production), override behavior, and a real-world configuration module pattern you can drop into any Python project. By the end, you will have a repeatable, secure configuration workflow that works for scripts, Flask apps, FastAPI services, and data pipelines alike.

Quick Example: Loading a .env File

First, install python-dotenv with pip install python-dotenv. Then create a .env file and a Python script in the same directory.

# .env (create this file in your project root)
DATABASE_URL=postgresql://localhost/myapp_dev
API_KEY=dev-key-abc123
DEBUG=true
MAX_CONNECTIONS=10

# quick_dotenv.py
import os
from dotenv import load_dotenv

# Load variables from .env into os.environ
load_dotenv()

# Read them like any environment variable
db_url = os.getenv("DATABASE_URL")
api_key = os.getenv("API_KEY")
debug = os.getenv("DEBUG") == "true"
max_conn = int(os.getenv("MAX_CONNECTIONS", "5"))

print(f"DB URL:          {db_url}")
print(f"API Key:         {api_key}")
print(f"Debug mode:      {debug}")
print(f"Max connections: {max_conn}")

Output:

DB URL:          postgresql://localhost/myapp_dev
API Key:         dev-key-abc123
Debug mode:      True
Max connections: 10

Three things to notice here. First, load_dotenv() is called before any os.getenv() call — that one line loads the entire file. Second, values are always strings, so you need to convert integers with int() and booleans by comparing to the string "true". Third, os.getenv("MAX_CONNECTIONS", "5") shows the default fallback pattern — if the key is missing from the environment, you get "5" instead of None.

What Is python-dotenv and How Does It Work?

python-dotenv reads key-value pairs from a .env file and adds them to os.environ, which is the standard Python dictionary mapping for environment variables. Once loaded, your code accesses config values through os.getenv() the same way it would access variables set in the shell or in your deployment platform. This means your application code does not need to know anything about dotenv specifically — it just reads from os.environ.

Approach                     How it works                      Risk
Hardcoded values             Values in source code             Committed to version control, no per-environment flexibility
Shell environment variables  Set in shell before running       Easy to forget, not portable, not stored with project
.env + python-dotenv         Read from .env file at startup    None if .env is gitignored; portable, with a version-controlled template
Config management service    AWS Secrets Manager, Vault, etc.  Infrastructure dependency, more complex setup

The critical workflow is: your actual .env file with real secrets is in .gitignore and never committed. You commit a .env.example file with the same keys but placeholder values. New team members copy .env.example to .env and fill in their own values. This is the standard approach used across Flask, FastAPI, Django, and most modern Python project templates.
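One weak spot in this workflow is that a local .env can silently fall behind the template. The helper below is a hypothetical, stdlib-only sketch (it is not part of python-dotenv) that lists keys present in .env.example but missing from .env; it only understands simple KEY=value lines.

```python
# check_env_keys.py -- hypothetical helper, stdlib only
import tempfile
from pathlib import Path

def env_keys(path: Path) -> set:
    """Return the variable names defined in a simple KEY=value file."""
    keys = set()
    for line in path.read_text().splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        name = line.split("=", 1)[0].strip()
        if name.startswith("export "):
            name = name[len("export "):].strip()
        keys.add(name)
    return keys

def missing_keys(example: Path, actual: Path) -> set:
    """Keys listed in the template but absent from the real file."""
    return env_keys(example) - env_keys(actual)

# Demo with throwaway files standing in for a real project
tmp = Path(tempfile.mkdtemp())
(tmp / ".env.example").write_text("API_KEY=changeme\nDATABASE_URL=changeme\n")
(tmp / ".env").write_text("API_KEY=dev-key-abc123\n")

missing = missing_keys(tmp / ".env.example", tmp / ".env")
print("Missing from .env:", sorted(missing))
```

Running a check like this at startup or in CI turns a forgotten variable into an explicit message rather than a failure deep inside the application.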

Secrets in .env files. Secrets out of git. Always.

.env File Format

The .env file format is simple: one KEY=value pair per line. Here are all the formatting rules python-dotenv handles correctly.

# .env -- complete format reference

# Simple string values
APP_NAME=MyPythonApp
ENVIRONMENT=development

# Values with spaces -- wrap in quotes
WELCOME_MESSAGE="Hello, welcome to the app"
SERVER_DESCRIPTION='Python API server v2'

# Values with special characters -- wrap in quotes
LOG_FORMAT="%(asctime)s %(name)s %(levelname)s %(message)s"

# Multiline values -- use double quotes and \n escapes
BANNER="line one\nline two"

# Numbers -- still stored as strings, convert in Python
PORT=8000
WORKER_COUNT=4
TIMEOUT=30.0

# Booleans -- stored as strings "true"/"false"
DEBUG=true
ENABLE_CACHE=false

# Empty value -- results in empty string
OPTIONAL_FEATURE=

# Comments -- lines starting with # are ignored
# DATABASE_URL=postgresql://localhost/old_db  (commented out)

# Referencing other variables (variable expansion)
BASE_URL=http://localhost
API_ENDPOINT=${BASE_URL}/api/v1

# Export syntax also works (for shell compatibility)
export SECRET_KEY=abc123def456

The most important rule: values do not need quotes unless they contain spaces or special characters. Quotes are stripped from the value when loaded — so NAME="alice" becomes the string alice, not "alice".

load_dotenv() Options

The load_dotenv() function has several useful parameters for controlling how and which file gets loaded.

# dotenv_options.py
import os
from dotenv import load_dotenv

# Default: searches for .env starting in the calling script's directory, then each parent directory
load_dotenv()

# Load from a specific path
load_dotenv(dotenv_path="/path/to/config/.env.production")

# Override existing environment variables (default is False -- existing vars win)
load_dotenv(override=True)

# Load a .env file and return a dict instead of modifying os.environ
from dotenv import dotenv_values
config = dotenv_values(".env")
print(config)  # OrderedDict with all key-value pairs
print(type(config["PORT"]))  # str -- always strings

Output:

OrderedDict([('APP_NAME', 'MyPythonApp'), ('PORT', '8000'), ...])
<class 'str'>

The override=False default is important: if DATABASE_URL is already set in the shell environment (e.g., by your deployment platform), load_dotenv() will NOT overwrite it. This means your production environment can set real values, and the .env file provides development defaults. Set override=True only when you specifically want the file to take precedence over the shell.

load_dotenv() at app start. Twelve-factor approved.

Managing Multiple Environments

A common pattern is to have separate .env files for development, staging, and production, and load the correct one based on an environment variable or a naming convention.

# multi_env.py
import os
from dotenv import load_dotenv
from pathlib import Path

def load_environment_config():
    """Load the correct .env file based on APP_ENV."""
    env = os.getenv("APP_ENV", "development")
    env_file = Path(f".env.{env}")

    if env_file.exists():
        load_dotenv(dotenv_path=env_file)
        print(f"Loaded config from {env_file}")
    else:
        # Fall back to .env if specific file not found
        load_dotenv()
        print(f"Loaded config from .env (no .env.{env} found)")

    return env

current_env = load_environment_config()
print(f"Running in: {current_env}")
print(f"Database: {os.getenv('DATABASE_URL', 'not set')}")
print(f"Debug: {os.getenv('DEBUG', 'false')}")

Your project structure would then look like this:

myproject/
    .env                   # gitignored -- your local defaults
    .env.development       # gitignored -- dev-specific values
    .env.staging           # gitignored -- staging values
    .env.production        # gitignored -- DO NOT COMMIT
    .env.example           # committed -- template with placeholder values
    app.py

Run with different environments using: APP_ENV=staging python multi_env.py. Older Flask releases followed a similar convention with the FLASK_ENV variable (removed in Flask 2.3), and the pattern is compatible with Docker Compose, which supports the --env-file flag for injecting the right file at container startup.

Real-Life Example: Application Config Module

Here is a production-ready configuration module pattern. Instead of scattering os.getenv() calls throughout your code, centralize all config loading in one module with type conversion and validation.

# config.py -- drop this in any Python project
import os
from dotenv import load_dotenv
from dataclasses import dataclass
from typing import Optional

# Load .env at module import time -- happens once per process
load_dotenv()

@dataclass
class DatabaseConfig:
    url: str
    pool_size: int
    timeout: float

@dataclass
class AppConfig:
    name: str
    debug: bool
    port: int
    secret_key: str
    database: DatabaseConfig
    allowed_origins: list

def _require(key: str) -> str:
    """Get a required env var -- raise if missing."""
    value = os.getenv(key)
    if value is None:
        raise EnvironmentError(f"Required environment variable '{key}' is not set. Check your .env file.")
    return value

def load_config() -> AppConfig:
    return AppConfig(
        name=os.getenv("APP_NAME", "MyApp"),
        debug=os.getenv("DEBUG", "false").lower() == "true",
        port=int(os.getenv("PORT", "8000")),
        secret_key=_require("SECRET_KEY"),
        database=DatabaseConfig(
            url=_require("DATABASE_URL"),
            pool_size=int(os.getenv("DB_POOL_SIZE", "5")),
            timeout=float(os.getenv("DB_TIMEOUT", "30.0")),
        ),
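        # Drop empty items so an unset variable yields [] instead of ['']
        allowed_origins=[o.strip() for o in os.getenv("ALLOWED_ORIGINS", "").split(",") if o.strip()],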
    )

# Usage: import config and access typed attributes
if __name__ == "__main__":
    # Create a minimal .env for demonstration
    import pathlib
    pathlib.Path(".env").write_text(
        "APP_NAME=DemoApp\nDEBUG=true\nPORT=5000\n"
        "SECRET_KEY=demo-secret-xyz\nDATABASE_URL=sqlite:///demo.db\n"
        "ALLOWED_ORIGINS=http://localhost:3000,http://localhost:5173\n"
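    )
    # The module-level load_dotenv() ran before this file existed, so load it now
    load_dotenv(override=True)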

    cfg = load_config()
    print(f"App: {cfg.name}")
    print(f"Debug: {cfg.debug}")
    print(f"Port: {cfg.port}")
    print(f"DB URL: {cfg.database.url}")
    print(f"Pool size: {cfg.database.pool_size}")
    print(f"Allowed origins: {cfg.allowed_origins}")

Output:

App: DemoApp
Debug: True
Port: 5000
DB URL: sqlite:///demo.db
Pool size: 5
Allowed origins: ['http://localhost:3000', 'http://localhost:5173']

The _require() helper raises a descriptive error immediately if a required variable is missing, so you get a clear error at startup instead of a cryptic None-related crash later. The typed AppConfig dataclass means your IDE knows the shape of all config values, which catches many mistakes at edit time rather than runtime.

Same code, different env. The whole point of dotenv.

Frequently Asked Questions

How do I make sure .env is never committed?

Add .env to your .gitignore file. The standard Python .gitignore template from GitHub already includes it. Also run git status before your first commit to confirm that .env does not appear in the output; an ignored file is not listed as untracked. For extra safety, you can add a pre-commit hook that scans for common secret patterns. The .env.example file with placeholder values is safe to commit and should be committed so other developers know what variables are required.

What happens if the variable is already set in the shell?

By default, load_dotenv() does not override existing environment variables. If DATABASE_URL is already set in your shell or by a Docker environment, the .env file value is ignored for that variable. This is the correct behavior for production deployments where the platform sets real credentials. Use load_dotenv(override=True) only when you explicitly want the file to take precedence over the shell.

Why are all values strings? How do I handle types?

Environment variables are always strings at the OS level — python-dotenv does not change this. The recommended pattern is to convert types at config load time, not at usage time. Use int(os.getenv("PORT", "8000")) for integers, float(os.getenv("TIMEOUT", "30.0")) for floats, and a string comparison like os.getenv("DEBUG", "false").lower() == "true" for booleans. Centralizing these conversions in a config module (as shown in the real-world example) prevents repeated conversion logic throughout your codebase.
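A pair of small helpers keeps these conversions consistent and makes bad values fail loudly. The functions below are hypothetical, stdlib-only sketches (they are not part of python-dotenv), and the accepted spellings for true and false are an assumption you can adjust.

```python
# env_types.py -- hypothetical helpers, stdlib only
import os

TRUE_VALUES = {"1", "true", "yes", "on"}
FALSE_VALUES = {"0", "false", "no", "off"}

def env_bool(key: str, default: bool = False) -> bool:
    """Read a boolean flag, rejecting values that are neither true nor false."""
    raw = os.getenv(key)
    if raw is None:
        return default
    value = raw.strip().lower()
    if value in TRUE_VALUES:
        return True
    if value in FALSE_VALUES:
        return False
    raise ValueError(f"{key} must be a boolean, got {raw!r}")

def env_int(key: str, default: int) -> int:
    """Read an integer, naming the variable in the error if parsing fails."""
    raw = os.getenv(key)
    if raw is None:
        return default
    try:
        return int(raw)
    except ValueError:
        raise ValueError(f"{key} must be an integer, got {raw!r}") from None

# Demo values are set directly so the sketch runs without a .env file
os.environ["DEMO_DEBUG"] = "Yes"
os.environ["DEMO_PORT"] = "8000"
os.environ.pop("DEMO_WORKERS", None)       # make sure this one is unset

debug = env_bool("DEMO_DEBUG")
port = env_int("DEMO_PORT", 5000)
workers = env_int("DEMO_WORKERS", 4)       # unset, so the default is used
print(debug, port, workers)
```

Compared with a bare == "true" check, this version treats a typo such as DEBUG=ture as an error instead of quietly reading it as False.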

How does python-dotenv work with Docker?

Docker Compose supports env_file: .env in the service definition, which loads your .env file as container environment variables. In this setup, python-dotenv is redundant — the variables are already in the environment before Python starts. Many projects use both: docker-compose sets variables from .env, and python-dotenv handles the local development case when running outside Docker. The load_dotenv() call is harmless when all variables are already set (because it does not override by default).

Does python-dotenv handle special characters in values?

Yes, but you need to quote values that contain spaces or the # comment character. Use quotes: PASSWORD="my#secret!with spaces". Inside single quotes, characters are taken literally; inside double quotes, escape sequences such as \n are interpreted. Without quotes, a # that follows whitespace starts an inline comment and truncates the value. As a rule of thumb, quote any value that is not a simple alphanumeric string.

How should I handle config in tests?

Create a .env.test file with test-specific values (test database, mock API keys) and load it explicitly in your test setup: load_dotenv(".env.test", override=True). Alternatively, use pytest’s monkeypatch fixture to set individual environment variables for specific tests: monkeypatch.setenv("DATABASE_URL", "sqlite:///:memory:"). This keeps test config isolated and prevents test runs from accidentally connecting to development or production resources.

Conclusion

python-dotenv is one of the first libraries you should add to any Python project that connects to external services. It solves the configuration problem cleanly: secrets stay out of source code, config changes between environments without editing files, and os.getenv() remains your single interface to all configuration regardless of where values come from. The .env file approach is so widely adopted that it is supported natively by Docker Compose, many CI/CD platforms, and virtually every Python web framework.

Start with the config module pattern from this article — centralize your load_dotenv() call, convert all types immediately, and use _require() for mandatory variables so failures are loud and clear at startup. Commit your .env.example, gitignore your .env, and your configuration workflow will be solid for any project. The full API reference is available at the python-dotenv PyPI page.

How To Use Python icecream for Better Debugging

Beginner

You have been there: you add a print("here") to track a variable, run the code, see a number with no context, add another print to show the label, run again, realize the label is wrong, fix it, run a third time. By the time you find the bug, your file is littered with debug prints you have to remember to remove. Python’s built-in print() works, but it makes you write the variable name twice every single time — once in the label string and once as the argument.

icecream fixes this. The ic() function from the icecream library automatically inspects the expression you pass to it and includes the source code in the output. ic(user_id) prints ic| user_id: 42 without you writing the label. It also shows the filename and line number when called without arguments, making it a drop-in replacement for print-based debugging that gives you more information with less code. Install it with pip install icecream.

This article covers the full icecream toolkit: basic usage, inspecting expressions and function calls, customizing output format, using icecream as a call tracer, disabling it for production, and a real debugging session walkthrough. By the end, you will have a debugging workflow that is faster and produces more useful output than raw print statements.

Quick Example: ic() vs print()

Here is an immediate side-by-side comparison showing why developers switch to icecream. Run this after installing with pip install icecream.

# quick_ic.py
from icecream import ic

user = {"name": "alice", "score": 98, "active": True}
scores = [85, 92, 78, 96]

# With print() -- you write the label manually every time
print("user:", user)
print("scores:", scores)
print("max score:", max(scores))

# With ic() -- label is automatic, source expression included
ic(user)
ic(scores)
ic(max(scores))

Output:

user: {'name': 'alice', 'score': 98, 'active': True}
scores: [85, 92, 78, 96]
max score: 96
ic| user: {'name': 'alice', 'score': 98, 'active': True}
ic| scores: [85, 92, 78, 96]
ic| max(scores): 96

Notice that ic(max(scores)) shows the full expression max(scores), not just the result. You can read the output and immediately know which variable or expression produced each value — without writing a single string label. icecream also returns the value it receives, so you can wrap expressions without changing how your code works.

The sections below cover every key feature, from customizing the output format to disabling icecream cleanly in production.

What Is icecream and How Does It Work?

icecream is a lightweight debugging library that wraps Python’s introspection capabilities to automatically discover the source code of the argument you pass to ic(). When you call ic(some_expr), the library uses the executing package to read the actual source of the calling line, extracts the argument text, evaluates it, and prints both the expression and its value together.

This works because Python keeps source code accessible at runtime (via the inspect module and frame inspection). icecream uses this to read the literal text of your code — not just the value — which is why it can show max(scores) instead of just 96.

Approach           What You Write          What You See
print()            print("score:", score)  score: 98
ic()               ic(score)               ic| score: 98
ic() (expression)  ic(score * 2)           ic| score * 2: 196
ic() (no args)     ic()                    ic| myfile.py:14 in process()
Disabled ic()      ic.disable()            (silent, no output)

The biggest quality-of-life improvement over print is that icecream is designed to be temporary — you add ic() calls, debug, then remove them or disable the library globally when moving to production. Because the output format is consistent and includes the ic| prefix, you can also grep your logs for debug output quickly.

ic() prints the expression AND the value. Why didn’t print() do this from day one?

Basic Usage: Variables, Expressions, and Function Returns

The most common use is wrapping a variable or expression you want to inspect. ic() always returns the value it received, so you can insert it into existing code without breaking anything.

# ic_basic.py
from icecream import ic

def calculate_discount(price, percent):
    discount = price * (percent / 100)
    final = ic(price - discount)   # ic() returns the value
    return final

result = calculate_discount(200, 15)
print("Final price:", result)

Output:

ic| price - discount: 170.0
Final price: 170.0

Because ic() returned the value of price - discount, the assignment to final still works correctly. You can wrap any expression in ic() to observe it without refactoring the surrounding code. Remove the wrapper when you are done debugging and the code is identical to how it started.

Tracing Execution with ic()

Calling ic() with no arguments prints the filename, line number, and enclosing function name. This is useful for confirming that a branch was reached — the equivalent of print("GOT HERE") but with actual location information.

# ic_trace.py
from icecream import ic

def process_item(item):
    ic()  # confirm this function was called
    if item.get("active"):
        ic()  # confirm the active branch was taken
        return item["value"] * 2
    ic()   # confirm the inactive branch was taken
    return 0

items = [{"active": True, "value": 10}, {"active": False, "value": 5}]
for item in items:
    result = process_item(item)
    ic(result)

Output:

ic| ic_trace.py:5 in process_item()
ic| ic_trace.py:7 in process_item()
ic| result: 20
ic| ic_trace.py:5 in process_item()
ic| ic_trace.py:9 in process_item()
ic| result: 0

You can now read the exact execution path: line 7 (active branch) was taken for the first item, line 9 (inactive branch) for the second. No guessing, no manually formatting location strings.

Customizing icecream Output

You can configure icecream’s output format, prefix, and output destination using the ic.configureOutput() method. This is useful for adding timestamps, routing output to a log file, or integrating icecream into an existing logging system.

# ic_config.py
from icecream import ic
from datetime import datetime

# Add a timestamp to every ic() output
def timestamped_prefix():
    return f"[{datetime.now().strftime('%H:%M:%S')}] ic| "

ic.configureOutput(prefix=timestamped_prefix)

x = 42
data = {"a": 1, "b": 2}
ic(x)
ic(data)

Output:

[09:15:42] ic| x: 42
[09:15:42] ic| data: {'a': 1, 'b': 2}

You can also route output to a file or any callable that accepts a string by passing an outputFunction argument. For example, ic.configureOutput(outputFunction=logger.debug) would route all icecream output through your Python logger instead of printing to the terminal.

print() but make it fashion.

Disabling icecream for Production

The cleanest way to use icecream is to disable it with a single line at the top of your entry point when not in debug mode. This makes all ic() calls silent without removing them from your codebase.

# app_entry.py
import os
from icecream import ic

# Disable ic() in production -- all ic() calls become no-ops
if os.getenv("DEBUG") != "1":
    ic.disable()

def compute(n):
    result = n * n
    ic(result)   # silent in production, visible in debug mode
    return result

# Run with DEBUG=1 python app_entry.py to enable ic() output
print(compute(7))
print(compute(12))

Output (without DEBUG=1):

49
144

Output (with DEBUG=1):

ic| result: 49
49
ic| result: 144
144

This pattern means you never have to sweep your codebase to remove debug prints before shipping. Set DEBUG=1 in your local environment and never set it in production. The ic() calls remain in the source as documentation of what you were debugging — future developers (including you) will know exactly where the data was inspected.

Real-Life Example: Debugging a Data Processing Pipeline

Here is a realistic debugging session using icecream to trace through a multi-step data transformation. You will see how ic() makes it easy to find exactly where unexpected data enters the pipeline.

# pipeline_debug.py
from icecream import ic

def parse_record(raw):
    """Parse a raw string record into a dict."""
    ic()  # trace entry
    parts = raw.strip().split(",")
    ic(len(parts))
    if len(parts) != 3:
        ic("malformed record", raw)
        return None
    name, score_str, active_str = parts
    score = int(score_str.strip())
    active = active_str.strip().lower() == "true"
    return {"name": name.strip(), "score": score, "active": active}

def filter_active(records):
    """Keep only active records with score above threshold."""
    threshold = 80
    ic(threshold)
    result = [r for r in records if r and ic(r["active"]) and r["score"] > threshold]
    return result

def summarize(records):
    total = sum(r["score"] for r in records)
    avg = total / len(records) if records else 0
    ic(total, avg)
    return {"count": len(records), "total": total, "average": round(avg, 1)}

raw_data = [
    "alice, 95, true",
    "bob, 72, true",
    "carol, 88, false",
    "dave, 91, true",
    "eve, 65",          # malformed
]

parsed = [parse_record(r) for r in raw_data]
active = filter_active(parsed)
summary = summarize(active)

print("\n=== Summary ===")
print(summary)

Output:

ic| pipeline_debug.py:6 in parse_record()
ic| len(parts): 3
ic| pipeline_debug.py:6 in parse_record()
ic| len(parts): 3
ic| pipeline_debug.py:6 in parse_record()
ic| len(parts): 3
ic| pipeline_debug.py:6 in parse_record()
ic| len(parts): 3
ic| pipeline_debug.py:6 in parse_record()
ic| len(parts): 2
ic| 'malformed record', raw: 'eve, 65'
ic| threshold: 80
ic| r["active"]: True
ic| r["active"]: True
ic| r["active"]: False
ic| r["active"]: True
ic| total: 186, avg: 93.0

=== Summary ===
{'count': 2, 'total': 186, 'average': 93.0}

You can trace every step: the malformed record was caught at parse time, the filter correctly excluded carol (inactive) and bob (score below 80), and only alice and dave made it through. The entire pipeline was debugged by reading the ic| output — no added complexity, no log file configuration, no framework required.

ic(x) > print(x). Trust me, you’ll never go back.

Frequently Asked Questions

Does ic() have any performance impact in production?

When not disabled, each ic() call inspects the source to recover the expression text, which is slightly slower than a bare print(). For production code, always call ic.disable() or use the environment variable pattern shown above. A disabled ic() call has near-zero overhead -- it just checks a flag and returns the argument immediately. There is no performance reason to remove ic() calls from production code as long as you disable the library.

Can ic() take multiple arguments?

Yes. ic(a, b, c) prints all three on the same line, separated by commas: ic| a: 1, b: 2, c: 3. This is convenient for inspecting several related variables at once. When you pass multiple arguments, ic() returns them as a tuple, so a, b = ic(a, b) works correctly.

How do I find and remove all ic() calls before shipping?

The recommended approach is not to remove them -- use ic.disable() or the environment variable pattern instead. If you do want to remove them, grep -rn --include="*.py" "ic(" . will find every call quickly, although it also matches names that merely end in "ic" (such as magic(), so review the hits. Alternatively, some teams call icecream's install() once at startup, which makes ic() available in every module as a builtin, and call ic.disable() in that same place outside development. The key point is that leaving disabled ic() calls in code is not harmful and preserves debugging knowledge for future maintainers.
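If a text search returns too many false positives, one alternative is to walk the syntax tree. The sketch below uses only the standard library's ast module; the function name find_ic_calls and the sample source are made up for illustration, and it assumes ic is called by its bare name (not through an alias or attribute).

```python
# find_ic_calls.py
# Sketch: locate ic(...) calls with the ast module instead of a text search.
import ast

def find_ic_calls(source, filename="<string>"):
    """Return sorted (line, column) pairs for every call to a bare name `ic`."""
    tree = ast.parse(source, filename=filename)
    hits = []
    for node in ast.walk(tree):
        # Match ic(...) only; magic(...) and "ic(" inside strings are ignored.
        if (isinstance(node, ast.Call)
                and isinstance(node.func, ast.Name)
                and node.func.id == "ic"):
            hits.append((node.lineno, node.col_offset))
    return sorted(hits)

sample = '''
from icecream import ic

def compute(n):
    result = magic(n)
    ic(result)
    print("ic(not a call)")
    return ic(result * 2)
'''

for line, col in find_ic_calls(sample):
    print(f"line {line}, col {col}")
```

To scan a project, you could read each .py file and pass its text to find_ic_calls; the source is only parsed, never executed, so icecream does not need to be importable.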

Does icecream work in Jupyter notebooks?

Yes, ic() works in Jupyter notebooks exactly as in scripts. The output appears in the cell output area with the same format. One useful notebook pattern is to put ic.configureOutput(prefix="") at the top of the notebook to remove the ic| prefix if you prefer cleaner output in a notebook context.

Can I redirect ic() output to Python logging?

Yes, use ic.configureOutput(outputFunction=logging.debug). This routes all icecream output through your existing logging infrastructure, so it respects your log level settings and handlers. This is useful in applications where you want icecream's convenience during development but want its output integrated with your production log format when needed.

How does icecream compare to Python's pdb debugger?

pdb is a full interactive debugger that lets you step through code, set breakpoints, and inspect the full call stack. icecream is a print-based debugging enhancement -- it does not pause execution or let you step. icecream is better for quick, non-interactive debugging during development; pdb is better when you need to explore program state interactively. Many developers use both: icecream for quick checks, pdb for complex bugs that need stepping.

Conclusion

icecream is a small change with an outsized impact on your debugging workflow. You replace print("label:", value) with ic(value) and immediately get automatic labels, expression display, and file/line information in every debug print. The ic.disable() pattern lets you leave debug calls in your code without production impact, and configureOutput() gives you full control over format and destination.

Try replacing your next five debug print statements with ic() calls and notice how much less you need to type and how much more context you get back. For teams, adding if not DEBUG: ic.disable() to your application entry point is a low-effort way to make debug output a first-class part of the development workflow. See the official icecream documentation on GitHub for the full API reference.

How To Use Python orjson for Fast JSON Processing

Intermediate

If you have ever profiled a Python application and found JSON serialization eating up CPU time, you already know the frustration. The built-in json module is reliable and convenient, but it was not designed for speed. When you are processing thousands of API responses per second, converting dataclass objects for a REST API, or serializing datetime values without jumping through hoops, those milliseconds add up fast. That is exactly the problem orjson was built to solve.

orjson is a third-party JSON library written in Rust that integrates seamlessly into Python. It handles encoding and decoding with the same simple interface you already know — dumps() and loads() — but operates 5-10x faster than the standard library and adds native support for types that json cannot handle without custom encoders, like datetime, numpy arrays, UUID objects, and dataclasses. You install it with pip install orjson and it requires no other dependencies.

In this article, you will learn how to serialize and deserialize JSON with orjson, how to handle Python-native types like datetimes and dataclasses automatically, how to use orjson options for output formatting, how to benchmark the speed difference yourself, and how to build a practical data pipeline that processes JSON records at high throughput. By the end, you will have a drop-in replacement for the standard library that improves performance without changing your application’s logic.

Quick Example: orjson in Action

Here is a self-contained script that shows the most common orjson operations in one place. You can run this immediately after installing orjson with pip install orjson.

# quick_orjson.py
import orjson
from datetime import datetime, timezone
from dataclasses import dataclass

@dataclass
class Event:
    name: str
    timestamp: datetime
    count: int

event = Event(name="page_view", timestamp=datetime(2026, 5, 6, 9, 0, 0, tzinfo=timezone.utc), count=42)

# Serialize to bytes (orjson always returns bytes, not str)
raw = orjson.dumps(event)
print(raw)

# Deserialize back to a dict
data = orjson.loads(raw)
print(data)

# Pretty-print
pretty = orjson.dumps(event, option=orjson.OPT_INDENT_2)
print(pretty.decode())

Output:

b'{"name":"page_view","timestamp":"2026-05-06T09:00:00+00:00","count":42}'
{'name': 'page_view', 'timestamp': '2026-05-06T09:00:00+00:00', 'count': 42}
{
  "name": "page_view",
  "timestamp": "2026-05-06T09:00:00+00:00",
  "count": 42
}

Two things stand out here. First, orjson serialized the dataclass directly — no custom encoder needed. Second, the datetime object was automatically converted to an ISO 8601 string including the UTC offset. The standard library would raise a TypeError for both of these without extra configuration.

The sections below cover every major feature: types supported natively, formatting options, how to benchmark performance, common pitfalls, and a real-world data pipeline example.

What Is orjson and Why Is It Faster?

orjson is a JSON library written in Rust and compiled as a Python extension. It uses the serde serialization framework — the same foundation used by many high-performance Rust services — and exposes a minimal Python API. Because the heavy lifting happens in compiled native code, it avoids Python’s interpreter overhead for every character it processes.

The standard json module is written in Python with a C accelerator for its hot paths. Even with the accelerator, it works through generic Python object protocols, so each key lookup, type check, and string conversion carries per-object overhead. orjson does all of that in Rust, then returns the finished bytes directly to Python.

Feature | json (stdlib) | orjson
Return type of dumps() | str | bytes
datetime support | Raises TypeError | Native (ISO 8601)
dataclass support | Raises TypeError | Native
UUID support | Raises TypeError | Native
numpy array support | No | Yes (with option)
Pretty print | indent= param | OPT_INDENT_2 option
Speed (encode) | Baseline | 5-10x faster
Installation | Built-in | pip install orjson

The key trade-off is that orjson’s dumps() always returns bytes, not str. If you need a string, call .decode() on the result. This is intentional — most real-world use cases (writing to a file, sending over a socket, writing to a database) work directly with bytes, and skipping the str conversion saves extra memory allocation.

Installation and Basic API

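Install orjson with pip. It ships as prebuilt wheels for current CPython releases (check the project README for the minimum supported Python version) and has no Python dependencies, only the compiled Rust extension.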

# terminal
pip install orjson

The API mirrors the standard library closely. orjson.dumps() serializes a Python object to JSON bytes, and orjson.loads() deserializes JSON bytes or strings back to Python objects.

# basic_api.py
import orjson

# Serializing
data = {"user": "alice", "score": 99, "active": True}
encoded = orjson.dumps(data)
print(type(encoded))   # <class 'bytes'>
print(encoded)         # b'{"user":"alice","score":99,"active":true}'

# Deserializing -- accepts bytes, bytearray, memoryview, or str
decoded = orjson.loads(encoded)
print(type(decoded))   # <class 'dict'>
print(decoded)

Output:

<class 'bytes'>
b'{"user":"alice","score":99,"active":true}'
<class 'dict'>
{'user': 'alice', 'score': 99, 'active': True}

Because loads() accepts both bytes and str, you can pass the output of dumps() directly to loads() without decoding first. This makes round-tripping data frictionless.

Native Support for Python Types

This is where orjson earns its reputation. The standard library raises TypeError for datetime, dataclass, UUID, and Enum objects unless you write a custom encoder. orjson handles all of them out of the box.
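For comparison, here is roughly what the standard library asks you to write before it can serialize those four types. This is a sketch of one possible encoder, not a recommended implementation; the names ExtendedEncoder, Account, and Status are hypothetical.

```python
# stdlib_encoder_sketch.py
# What the standard library needs before it can serialize these types.
import dataclasses
import json
import uuid
from datetime import datetime, timezone
from enum import Enum

class Status(Enum):
    ACTIVE = "active"

@dataclasses.dataclass
class Account:
    id: uuid.UUID
    status: Status
    created_at: datetime

class ExtendedEncoder(json.JSONEncoder):
    """Hypothetical encoder covering the four types named above."""
    def default(self, o):
        if dataclasses.is_dataclass(o) and not isinstance(o, type):
            # Shallow field mapping so nested values come back through default().
            return {f.name: getattr(o, f.name) for f in dataclasses.fields(o)}
        if isinstance(o, datetime):
            return o.isoformat()
        if isinstance(o, uuid.UUID):
            return str(o)
        if isinstance(o, Enum):
            return o.value
        return super().default(o)

account = Account(
    id=uuid.UUID("12345678-1234-5678-1234-567812345678"),
    status=Status.ACTIVE,
    created_at=datetime(2026, 5, 6, 9, 0, 0, tzinfo=timezone.utc),
)

try:
    json.dumps(account)
except TypeError as exc:
    print("Without an encoder:", exc)

encoded = json.dumps(account, cls=ExtendedEncoder)
print(encoded)
```

Every branch in default() is code you have to maintain and test yourself, which is the work orjson's native type support takes off your hands.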

Datetimes and Dates

orjson serializes datetime, date, and time objects to ISO 8601 strings automatically. Timezone-aware datetimes include the UTC offset; naive datetimes are serialized without one.

# datetime_types.py
import orjson
from datetime import datetime, date, time, timezone, timedelta

eastern = timezone(timedelta(hours=-5))

data = {
    "created_at": datetime(2026, 5, 6, 14, 30, 0, tzinfo=eastern),
    "birth_date": date(1990, 3, 15),
    "run_time": time(8, 45, 0),
    "utc_now": datetime(2026, 5, 6, 19, 30, 0, tzinfo=timezone.utc),
}

print(orjson.dumps(data).decode())

Output:

{"created_at":"2026-05-06T14:30:00-05:00","birth_date":"1990-03-15","run_time":"08:45:00","utc_now":"2026-05-06T19:30:00+00:00"}

The timezone offset is handled correctly for each value — -05:00 for Eastern and +00:00 for UTC. This is exactly what you need when sending timestamps to APIs or storing records that must be unambiguous about timezone.

Dataclasses and Named Tuples

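orjson serializes Python dataclass instances as JSON objects, mapping field names to values, so you do not need a to_dict() method or a custom serializer. Named tuples are different: orjson deliberately does not serialize tuple subclasses, so a NamedTuple instance raises JSONEncodeError unless you supply a default function (for example, one that returns obj._asdict()).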

# dataclass_serial.py
import orjson
from dataclasses import dataclass
from typing import List
from datetime import datetime, timezone

@dataclass
class Article:
    title: str
    author: str
    tags: List[str]
    published_at: datetime
    views: int

article = Article(
    title="How To Use orjson in Python",
    author="alice",
    tags=["python", "performance", "json"],
    published_at=datetime(2026, 5, 6, 10, 0, 0, tzinfo=timezone.utc),
    views=1024,
)

result = orjson.dumps(article, option=orjson.OPT_INDENT_2)
print(result.decode())

Output:

{
  "title": "How To Use orjson in Python",
  "author": "alice",
  "tags": [
    "python",
    "performance",
    "json"
  ],
  "published_at": "2026-05-06T10:00:00+00:00",
  "views": 1024
}

Notice that the nested datetime field inside the dataclass is also serialized correctly — orjson handles type conversion recursively through the entire object tree.

orjson Options for Output Control

orjson uses a set of integer flags (passed via the option parameter) to control serialization behavior. You combine multiple options with the bitwise OR operator (|).

# orjson_options.py
import orjson
from datetime import datetime, timezone

data = {
    "name": "alice",
    "score": 99,
    "timestamp": datetime(2026, 5, 6, 10, 0, 0, tzinfo=timezone.utc),
    "metadata": {"source": "api", "version": 2},
}

# Pretty print with 2-space indent
pretty = orjson.dumps(data, option=orjson.OPT_INDENT_2)
print("Pretty:\n", pretty.decode())

# Sort keys alphabetically
sorted_keys = orjson.dumps(data, option=orjson.OPT_SORT_KEYS)
print("\nSorted keys:", sorted_keys.decode())

# Non-string dict keys (e.g., int keys)
int_key_data = {1: "one", 2: "two", 3: "three"}
int_keys = orjson.dumps(int_key_data, option=orjson.OPT_NON_STR_KEYS)
print("\nInt keys:", int_keys.decode())

# Combine options
combined = orjson.dumps(data, option=orjson.OPT_INDENT_2 | orjson.OPT_SORT_KEYS)
print("\nCombined:", combined.decode())

Output:

Pretty:
 {
  "name": "alice",
  "score": 99,
  "timestamp": "2026-05-06T10:00:00+00:00",
  "metadata": {
    "source": "api",
    "version": 2
  }
}

Sorted keys: {"metadata":{"source":"api","version":2},"name":"alice","score":99,"timestamp":"2026-05-06T10:00:00+00:00"}

Int keys: {"1":"one","2":"two","3":"three"}

Combined: {
  "metadata": {
    "source": "api",
    "version": 2
  },
  "name": "alice",
  "score": 99,
  "timestamp": "2026-05-06T10:00:00+00:00"
}

The most commonly used options are OPT_INDENT_2 for human-readable output, OPT_SORT_KEYS for deterministic output (useful for hashing or diffing), and OPT_NON_STR_KEYS when your dicts use non-string keys such as integers, floats, or UUIDs.
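To illustrate why deterministic output matters for hashing, here is a small sketch that fingerprints a dict regardless of key insertion order. The helper names canonical_bytes and fingerprint are hypothetical, and the stdlib fallback is an assumption added only so the sketch runs where orjson is not installed.

```python
# fingerprint_sketch.py
import hashlib
import json

try:
    import orjson  # optional; the fallback below keeps the sketch runnable without it
except ImportError:
    orjson = None

def canonical_bytes(obj):
    """Serialize with sorted keys so equal dicts always produce equal bytes."""
    if orjson is not None:
        return orjson.dumps(obj, option=orjson.OPT_SORT_KEYS)
    # Stdlib fallback: compact separators and UTF-8 output, like orjson.
    text = json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False)
    return text.encode("utf-8")

def fingerprint(obj):
    """Hex SHA-256 digest of the canonical serialization."""
    return hashlib.sha256(canonical_bytes(obj)).hexdigest()

a = {"name": "alice", "score": 99}
b = {"score": 99, "name": "alice"}  # same content, different insertion order
print(fingerprint(a) == fingerprint(b))
```

Without sorted keys, the two dicts would serialize to different byte strings and hash differently even though they compare equal in Python.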

Benchmarking orjson vs stdlib json

Speed claims are only useful if you can measure them yourself. Here is a self-contained benchmark you can run to compare orjson against the standard library on your own machine.

# benchmark.py
import json
import orjson
import timeit

# Sample payload -- similar to a typical API response
payload = {
    "id": 12345,
    "name": "alice",
    "email": "alice@example.com",
    "created_at": "2026-05-06T10:00:00+00:00",
    "scores": [98, 87, 92, 95, 88],
    "metadata": {"source": "api", "active": True, "version": 3},
}

ITERATIONS = 50000

# Benchmark json.dumps
json_encode_time = timeit.timeit(
    lambda: json.dumps(payload),
    number=ITERATIONS
)

# Benchmark orjson.dumps
orjson_encode_time = timeit.timeit(
    lambda: orjson.dumps(payload),
    number=ITERATIONS
)

json_str = json.dumps(payload)
orjson_bytes = orjson.dumps(payload)

# Benchmark json.loads
json_decode_time = timeit.timeit(
    lambda: json.loads(json_str),
    number=ITERATIONS
)

# Benchmark orjson.loads
orjson_decode_time = timeit.timeit(
    lambda: orjson.loads(orjson_bytes),
    number=ITERATIONS
)

print(f"Encoding ({ITERATIONS:,} iterations):")
print(f"  json.dumps:   {json_encode_time:.3f}s")
print(f"  orjson.dumps: {orjson_encode_time:.3f}s")
print(f"  Speedup:      {json_encode_time / orjson_encode_time:.1f}x")
print(f"\nDecoding ({ITERATIONS:,} iterations):")
print(f"  json.loads:   {json_decode_time:.3f}s")
print(f"  orjson.loads: {orjson_decode_time:.3f}s")
print(f"  Speedup:      {json_decode_time / orjson_decode_time:.1f}x")

Output (approximate — your numbers will vary by hardware):

Encoding (50,000 iterations):
  json.dumps:   0.621s
  orjson.dumps: 0.089s
  Speedup:      7.0x

Decoding (50,000 iterations):
  json.loads:   0.514s
  orjson.loads: 0.112s
  Speedup:      4.6x

The encoding speedup is most dramatic because orjson avoids Python-level object traversal entirely. For decoding, the speedup is still significant but slightly lower because Python dict construction has some overhead regardless of where parsing happens. The more complex and deeply nested your data, the larger the performance gap tends to be.

Real-Life Example: High-Throughput Log Processor

Here is a practical script that reads a large batch of JSON log records, deserializes them with orjson, filters for error events, enriches each record with processing metadata, and serializes the results back to a JSONL file. This pattern is common in data pipelines, ETL jobs, and log analytics systems.

# log_processor.py
import orjson
from datetime import datetime, timezone
from dataclasses import dataclass
from typing import List, Optional

# -- Sample log data (in a real pipeline, this would come from a file or stream) --
SAMPLE_LOGS = [
    b'{"level":"ERROR","service":"auth","message":"token expired","user_id":101,"ts":"2026-05-06T09:01:00Z"}',
    b'{"level":"INFO","service":"api","message":"request received","user_id":202,"ts":"2026-05-06T09:01:05Z"}',
    b'{"level":"ERROR","service":"db","message":"connection timeout","user_id":null,"ts":"2026-05-06T09:01:10Z"}',
    b'{"level":"WARN","service":"cache","message":"cache miss","user_id":303,"ts":"2026-05-06T09:01:15Z"}',
    b'{"level":"ERROR","service":"auth","message":"invalid signature","user_id":404,"ts":"2026-05-06T09:01:20Z"}',
]

@dataclass
class EnrichedLog:
    level: str
    service: str
    message: str
    user_id: Optional[int]
    original_ts: str
    processed_at: datetime
    is_critical: bool

def process_log_batch(raw_logs: List[bytes]) -> List[EnrichedLog]:
    """Filter ERROR logs and enrich them with processing metadata."""
    enriched = []
    now = datetime.now(timezone.utc)

    for raw in raw_logs:
        record = orjson.loads(raw)

        # Only process errors
        if record.get("level") != "ERROR":
            continue

        log = EnrichedLog(
            level=record["level"],
            service=record.get("service", "unknown"),
            message=record.get("message", ""),
            user_id=record.get("user_id"),
            original_ts=record.get("ts", ""),
            processed_at=now,
            is_critical="timeout" in record.get("message", "").lower(),
        )
        enriched.append(log)

    return enriched

def write_jsonl(records: List[EnrichedLog]) -> str:
    """Serialize records to JSONL format (one JSON object per line)."""
    lines = []
    for record in records:
        # Drop microseconds so processed_at prints as whole seconds
        lines.append(orjson.dumps(record, option=orjson.OPT_OMIT_MICROSECONDS).decode())
    return "\n".join(lines)

# Run the pipeline
errors = process_log_batch(SAMPLE_LOGS)
output = write_jsonl(errors)

print(f"Processed {len(SAMPLE_LOGS)} log records")
print(f"Found {len(errors)} ERROR events\n")
print("=== Enriched Error Logs (JSONL) ===")
print(output)

Output (processed_at reflects when you run the script):

Processed 5 log records
Found 3 ERROR events

=== Enriched Error Logs (JSONL) ===
{"level":"ERROR","service":"auth","message":"token expired","user_id":101,"original_ts":"2026-05-06T09:01:00Z","processed_at":"2026-05-06T09:01:30+00:00","is_critical":false}
{"level":"ERROR","service":"db","message":"connection timeout","user_id":null,"original_ts":"2026-05-06T09:01:10Z","processed_at":"2026-05-06T09:01:30+00:00","is_critical":true}
{"level":"ERROR","service":"auth","message":"invalid signature","user_id":404,"original_ts":"2026-05-06T09:01:20Z","processed_at":"2026-05-06T09:01:30+00:00","is_critical":false}

This script demonstrates three real benefits: orjson deserializes each log line with loads(), the EnrichedLog dataclass is serialized directly without any manual conversion, and the datetime.now() value is automatically converted to ISO 8601 with timezone. You can extend this by reading from a real file with open("app.log", "rb") and writing to disk instead of printing.

Frequently Asked Questions

Why does orjson.dumps() return bytes instead of str?

JSON is defined as UTF-8 encoded text, and bytes is the natural representation for encoded data. Most real-world destinations — network sockets, file I/O, HTTP response bodies, database BLOBs — work with bytes directly. Returning bytes skips an extra allocation and avoids the overhead of converting to a Python string object. If you need a string, simply call result.decode(). The performance difference between returning bytes vs. string is small, but across millions of serializations it is measurable.

Can I use a custom encoder for unsupported types?

orjson supports a default parameter similar to the stdlib. You pass a callable that receives the unserializable object and must return something orjson can handle. For example: orjson.dumps(obj, default=lambda o: o.__dict__). This is the escape hatch for types like Decimal, custom objects, or third-party classes that orjson does not natively recognize. Keep the default function fast because it is called once per unrecognized type instance.
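A slightly more defensive default function than the lambda above might look like the sketch below. The name to_jsonable is hypothetical. It is demonstrated with the stdlib json.dumps so it runs without extra packages; since orjson's default parameter follows the same contract (return something serializable, or raise TypeError), the same callable should work as orjson.dumps(obj, default=to_jsonable).

```python
# default_hook_sketch.py
import json
from decimal import Decimal

def to_jsonable(obj):
    """Fallback for types the serializer does not recognize."""
    if isinstance(obj, Decimal):
        return str(obj)          # keep exact digits; float() could lose precision
    if isinstance(obj, (set, frozenset)):
        return sorted(obj)       # sets are unordered, so sort for stable output
    # Signal "still unsupported" instead of silently returning None.
    raise TypeError(f"Cannot serialize {type(obj).__name__}")

order = {"total": Decimal("19.99"), "tags": {"b", "a"}}
print(json.dumps(order, default=to_jsonable))
```

Raising TypeError for anything unhandled matters: a default function that falls off the end returns None, which would be written out as null and hide the problem.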

Is orjson a drop-in replacement for the stdlib json module?

Almost, but not quite. The main behavioral differences are: dumps() returns bytes not str; the indent parameter is replaced by option=orjson.OPT_INDENT_2; and cls and object_hook parameters are not supported. If your code calls json.dumps() and passes the result directly to write(), the transition is a one-line change. If you depend on indent= or custom decoders, you will need small adjustments.
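One way to contain those differences during a migration is a thin wrapper that keeps a json.dumps-like contract. This is a sketch under assumptions: the helper name dumps_str and its pretty flag are hypothetical, and the stdlib fallback exists only so the code still runs where orjson is not installed.

```python
# json_compat_sketch.py
import json

try:
    import orjson
except ImportError:  # keep the sketch runnable where orjson is not installed
    orjson = None

def dumps_str(obj, pretty=False):
    """json.dumps-style helper: always returns str, optionally indented by 2 spaces."""
    if orjson is None:
        if pretty:
            return json.dumps(obj, indent=2, ensure_ascii=False)
        return json.dumps(obj, separators=(",", ":"), ensure_ascii=False)
    # orjson replaces indent= with an option flag and returns bytes.
    option = orjson.OPT_INDENT_2 if pretty else None
    return orjson.dumps(obj, option=option).decode("utf-8")

print(dumps_str({"user": "alice", "score": 99}))
print(dumps_str({"user": "alice"}, pretty=True))
```

Call sites that expect a string keep working, and code that writes to binary files or sockets can switch to calling orjson.dumps() directly to skip the decode step.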

Does orjson support numpy arrays?

Yes, with option=orjson.OPT_SERIALIZE_NUMPY. This serializes numpy arrays as JSON arrays, respecting the dtype. Supported dtypes include the common integer and float widths, bool, and datetime64; arrays of strings or Python objects are not serialized natively and need converting first (for example with .tolist()). If you frequently serialize numpy data (e.g., embedding vectors, model predictions), this option makes orjson significantly more convenient than the stdlib, which requires converting arrays to lists manually.

What errors should I watch for?

orjson raises orjson.JSONDecodeError (a subclass of json.JSONDecodeError) for invalid input to loads(), so existing try/except blocks that catch json.JSONDecodeError still work. For serialization, it raises orjson.JSONEncodeError for types it cannot handle. The most common case is passing an object that is not a dataclass, dict, list, or primitive — pass a default function to handle those cases.
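The subclass relationship means a loader written against the stdlib exception keeps working when you swap the parser. The sketch below assumes a hypothetical parse_lines helper that takes the loads function as a parameter; it runs with json.loads as written, and passing loads=orjson.loads should behave the same way for the error handling shown.

```python
# tolerant_loader_sketch.py
import json

def parse_lines(lines, loads=json.loads):
    """Parse JSON lines, collecting failures instead of stopping at the first one."""
    records, failures = [], []
    for number, line in enumerate(lines, start=1):
        try:
            records.append(loads(line))
        except json.JSONDecodeError as exc:
            # orjson.JSONDecodeError subclasses this, so it is caught here too.
            failures.append((number, exc.msg))
    return records, failures

lines = ['{"level": "INFO"}', '{"level": ', '{"level": "ERROR"}']
records, failures = parse_lines(lines)
print(len(records), "parsed;", "bad lines:", [n for n, _ in failures])
```

Recording the line number alongside the message makes it straightforward to report or quarantine bad records without aborting the whole batch.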

Is orjson thread-safe?

Yes. orjson has no mutable global state and each dumps() and loads() call is independent. You can call it from multiple threads concurrently without locks. This matters in web servers (FastAPI, Django) that process many requests in parallel — you can replace the stdlib json calls in serialization middleware without introducing thread contention.

Conclusion

orjson is one of those rare libraries that does exactly one thing and does it better than the alternative in nearly every way. You get 5-10x faster serialization, native support for datetime, dataclass, UUID, and Enum objects without custom encoders, a clean options API for formatting control, and thread safety — all from a single pip install orjson. The only adjustment you need to make is calling .decode() when you need a string instead of bytes.

Start with the benchmark script from this article to measure the difference on your own workload. For data pipelines, API serializers, and any code that processes large volumes of JSON, orjson is a straightforward performance win. You can also explore orjson's OPT_SERIALIZE_NUMPY option if your project works with numpy arrays, and combine it with OPT_SORT_KEYS for reproducible output in tests. For the full option reference, visit the official orjson documentation on GitHub.