How To Use Python tomllib for TOML File Parsing

Beginner

Configuration files are one of those things every Python project eventually needs: a place to store database credentials, API endpoints, feature flags, and application settings that should not be hardcoded into your source files. The traditional options — .ini files parsed by configparser, JSON files that do not support comments, or YAML files that require a third-party library — all have frustrating limitations. TOML (Tom’s Obvious Minimal Language) solves most of them: it is human-readable, supports comments, handles dates and arrays natively, and maps cleanly to Python dictionaries. Since Python 3.11, you can parse TOML without installing anything — it is in the standard library as tomllib.

If you have worked with a modern Python project, you have already encountered TOML. The pyproject.toml file that Poetry, Hatch, PDM, and setuptools all use to define project metadata is TOML. The [tool.pytest.ini_options] section for pytest configuration is TOML. The [tool.ruff] linting configuration is TOML. The format has quietly become the standard for Python tooling configuration, and tomllib gives you first-class access to it from your own code. For Python 3.10 and earlier, the third-party tomli package provides the identical API.

This article covers: the TOML syntax you need to know (strings, integers, booleans, arrays, tables, and inline tables), reading a TOML file with tomllib.load(), accessing nested configuration values, handling missing keys safely with .get(), reading pyproject.toml for project metadata, validating configuration with dataclasses, writing a configuration loader class, and a practical application config pattern you can drop into any project. No installation required for Python 3.11+.

Reading TOML in Python: Quick Example

Here is the complete minimal workflow: a TOML config file and the three lines of Python needed to read it.

# config.toml
[database]
host = "localhost"
port = 5432
name = "myapp"

[app]
debug = false
max_connections = 10
allowed_hosts = ["localhost", "127.0.0.1"]

# read_config.py
import tomllib

with open("config.toml", "rb") as f:
    config = tomllib.load(f)

print(config["database"]["host"])         # localhost
print(config["database"]["port"])         # 5432
print(config["app"]["allowed_hosts"])     # ['localhost', '127.0.0.1']
print(type(config["app"]["debug"]))       # <class 'bool'>

Output:
localhost
5432
['localhost', '127.0.0.1']
<class 'bool'>

Three things to notice: the file is opened in binary mode ("rb", not "r") — this is required by tomllib and is not optional. The resulting config is a plain Python dictionary, so all standard dict operations work on it. The TOML false became a Python bool automatically — tomllib maps TOML types to their Python equivalents without any manual conversion. The sections below explore the full TOML type system and more complex configuration patterns.

What is TOML and How Does It Compare to JSON and YAML?

TOML stands for Tom’s Obvious Minimal Language. It was created by Tom Preston-Werner (co-founder of GitHub) in 2013 as a configuration file format that is easier to read than JSON (because it supports comments and has cleaner syntax) and more predictable than YAML (because YAML has notoriously surprising indentation and type coercion rules).

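| Feature | TOML | JSON | YAML | INI |
| --- | --- | --- | --- | --- |
| Comments | Yes (#) | No | Yes (#) | Yes (; or #) |
| Nested structures | Yes (tables) | Yes | Yes | Limited |
| Arrays | Yes | Yes | Yes | No |
| Dates/times | Native | String only | Yes | No |
| Python stdlib | 3.11+ (tomllib) | Yes (json) | No (requires PyYAML) | Yes (configparser) |
| Readable by humans | High | Medium | High (until it isn’t) | Medium |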

The key TOML advantage over JSON for configuration files is comments — being able to document why a setting exists is essential when multiple people work on a project. The advantage over YAML is predictability: TOML has fewer implicit type coercions and indentation has no semantic meaning, which eliminates a whole class of subtle bugs.

tomllib is stdlib since 3.11 — zero dependencies, zero excuses.

TOML Syntax Essentials

You do not need to memorize the full TOML specification to use it effectively for application config. Here are the constructs that cover 95% of real-world use cases.

# toml_examples.toml
# This is a comment -- TOML supports them, unlike JSON

# --- Scalar types ---
name = "MyApp"                    # String (double or single quotes)
version = "1.0.0"
max_retries = 3                   # Integer
timeout = 30.5                    # Float
debug = false                     # Boolean (true/false, lowercase)
start_date = 2026-01-15           # Local date (TOML native type)
created_at = 2026-01-15T10:30:00  # Local datetime

# --- Arrays ---
ports = [8080, 8081, 8082]
tags = ["web", "api", "python"]
matrix = [[1, 2], [3, 4]]         # Arrays of arrays

# --- Tables (sections) ---
[database]
host = "db.example.com"
port = 5432

# --- Nested tables ---
[database.replica]
host = "replica.example.com"
port = 5433

# --- Inline tables (single line) ---
[server]
address = {host = "0.0.0.0", port = 8080}

# --- Array of tables (list of dicts) ---
[[users]]
name = "alice"
role = "admin"

[[users]]
name = "bob"
role = "viewer"

The [[double bracket]] syntax creates an array of tables — each [[users]] entry appends a new dictionary to the users list. This is the TOML way to represent what would be a JSON array of objects. The dot notation in [database.replica] is equivalent to a nested dict: config["database"]["replica"] in Python.

Loading and Accessing Configuration

Once you have a TOML file, reading it is two lines. The challenge is accessing nested values safely — especially when some keys may be optional or provided by different environments.

# load_config.py
import tomllib

def load_config(path: str) -> dict:
    """Load a TOML config file and return it as a dict."""
    with open(path, "rb") as f:
        return tomllib.load(f)

config = load_config("config.toml")

# Direct access (raises KeyError if missing)
db_host = config["database"]["host"]

# Safe access with defaults using .get()
db_port = config.get("database", {}).get("port", 5432)
debug = config.get("app", {}).get("debug", False)
log_level = config.get("logging", {}).get("level", "INFO")

print(f"Database: {db_host}:{db_port}")
print(f"Debug mode: {debug}")
print(f"Log level: {log_level}")

# Access array of tables
for user in config.get("users", []):
    print(f"  User: {user['name']} ({user['role']})")

Output:
Database: localhost:5432
Debug mode: False
Log level: INFO
  User: alice (admin)
  User: bob (viewer)

The double .get() pattern — config.get("section", {}).get("key", default) — safely handles both a missing section and a missing key within a present section. Without the empty dict fallback, a missing section would cause .get() to return None, and then calling .get() on None would raise AttributeError. This pattern is defensive and verbose; for larger configs, a dataclass-based validator (shown later) is more maintainable.
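If the chained .get() calls become repetitive, one option is a small helper that walks a dotted path. The get_path() function below is a hypothetical convenience, not part of tomllib, and it assumes that no key in your config contains a literal dot.

```python
from typing import Any


def get_path(config: dict, dotted_key: str, default: Any = None) -> Any:
    """Walk a dotted key such as "database.port" through nested dicts."""
    current: Any = config
    for part in dotted_key.split("."):
        # Stop as soon as the path leaves the dict structure or a key is missing.
        if not isinstance(current, dict) or part not in current:
            return default
        current = current[part]
    return current


config = {"database": {"host": "localhost", "port": 5432}}

print(get_path(config, "database.port", 5432))      # 5432
print(get_path(config, "logging.level", "INFO"))    # INFO
print(get_path(config, "database.host.name", "?"))  # ?
```

The last call shows the helper returning the default when the path runs into a non-dict value, where a chained .get() would raise AttributeError.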

Reading pyproject.toml for Project Metadata

One practical use of tomllib is reading your own project’s pyproject.toml at runtime — for example, to display the application version without hardcoding it in two places.

# read_pyproject.py
import tomllib
from pathlib import Path

def get_project_version() -> str:
    """Read the version string from pyproject.toml."""
    pyproject_path = Path(__file__).parent.parent / "pyproject.toml"
    if not pyproject_path.exists():
        return "unknown"
    with open(pyproject_path, "rb") as f:
        data = tomllib.load(f)
    # Standard location for Poetry and setuptools
    return data.get("tool", {}).get("poetry", {}).get("version") \
        or data.get("project", {}).get("version", "unknown")

def get_pytest_config() -> dict:
    """Read pytest configuration from pyproject.toml."""
    pyproject_path = Path(__file__).parent.parent / "pyproject.toml"
    with open(pyproject_path, "rb") as f:
        data = tomllib.load(f)
    # [tool.pytest.ini_options] parses to nested dicts: tool -> pytest -> ini_options
    return data.get("tool", {}).get("pytest", {}).get("ini_options", {})

print(f"Project version: {get_project_version()}")
pytest_cfg = get_pytest_config()
if pytest_cfg:
    print(f"Pytest testpaths: {pytest_cfg.get('testpaths', ['tests'])}")

Output:
Project version: 1.2.0
Pytest testpaths: ['tests', 'integration_tests']

This pattern lets you maintain a single source of truth for the version string: update it in pyproject.toml and every part of the application that calls get_project_version() picks up the change automatically. The Path(__file__).parent.parent navigates from the module file up to the project root — adjust the number of .parent calls based on your directory structure.

Pyro Pete with open config vault streaming data
Nested TOML tables map directly to nested Python dicts. No surprises.

Validating Config with Dataclasses

Raw dictionary access works but gives you no type checking and no clear picture of which fields are required. Wrapping your config in a dataclass provides IDE autocomplete, type annotations, and a clear schema for what valid configuration looks like.

# config_schema.py
import tomllib
from dataclasses import dataclass, field

@dataclass
class DatabaseConfig:
    host: str
    port: int = 5432
    name: str = "app"
    user: str = "postgres"
    password: str = ""

@dataclass
class AppConfig:
    debug: bool = False
    log_level: str = "INFO"
    allowed_hosts: list = field(default_factory=list)
    # DatabaseConfig requires host, so the default factory must supply one
    database: DatabaseConfig = field(
        default_factory=lambda: DatabaseConfig(host="localhost")
    )

    @classmethod
    def from_toml(cls, path: str) -> "AppConfig":
        with open(path, "rb") as f:
            data = tomllib.load(f)
        db_data = data.get("database", {})
        return cls(
            debug=data.get("app", {}).get("debug", False),
            log_level=data.get("app", {}).get("log_level", "INFO"),
            allowed_hosts=data.get("app", {}).get("allowed_hosts", []),
            database=DatabaseConfig(**db_data) if db_data else DatabaseConfig(host="localhost"),
        )

# Usage
config = AppConfig.from_toml("config.toml")
print(f"DB host: {config.database.host}")
print(f"DB port: {config.database.port}")
print(f"Debug: {config.debug}")
print(f"Hosts: {config.allowed_hosts}")

Output:
DB host: localhost
DB port: 5432
Debug: False
Hosts: ['localhost', '127.0.0.1']

The from_toml() classmethod is the idiomatic pattern for this: it reads raw TOML, maps values to the dataclass fields, and returns a typed configuration object. Your application code only sees AppConfig objects with dot-attribute access, not raw dict lookups. This approach scales well — as configuration grows more complex, you add nested dataclasses and the validation logic stays in one place.

Real-Life Example: Multi-Environment Configuration Loader

# env_config_loader.py
"""
Loads a base config.toml and merges environment-specific overrides.
Usage: CONFIG_ENV=production python myapp.py
"""
import tomllib
import os
from pathlib import Path
from dataclasses import dataclass, field

CONFIG_DIR = Path("config")

def load_toml(path: Path) -> dict:
    if not path.exists():
        return {}
    with open(path, "rb") as f:
        return tomllib.load(f)

def deep_merge(base: dict, override: dict) -> dict:
    """Recursively merge override into base."""
    result = base.copy()
    for key, val in override.items():
        if key in result and isinstance(result[key], dict) and isinstance(val, dict):
            result[key] = deep_merge(result[key], val)
        else:
            result[key] = val
    return result

def load_config() -> dict:
    env = os.getenv("CONFIG_ENV", "development")
    base = load_toml(CONFIG_DIR / "config.toml")
    override = load_toml(CONFIG_DIR / f"config.{env}.toml")
    merged = deep_merge(base, override)
    print(f"Loaded config for environment: {env}")
    return merged

config = load_config()
print(f"DB host: {config.get('database', {}).get('host')}")
print(f"Debug: {config.get('app', {}).get('debug')}")
print(f"Log level: {config.get('app', {}).get('log_level')}")

Output:
Loaded config for environment: development
DB host: localhost
Debug: True
Log level: DEBUG

This loader reads config/config.toml as the base configuration and then reads config/config.development.toml (or config.production.toml, etc.) as an override layer. The deep_merge() function recursively merges the two dictionaries so environment-specific files only need to specify what differs from the base. Set CONFIG_ENV=production before running to load production overrides. This pattern avoids duplicating shared configuration across environment files while still allowing clean environment-specific customization.

Frequently Asked Questions

Can tomllib write TOML files?

tomllib is read-only — it can only parse existing TOML files. To write TOML from Python, use the third-party tomli-w package (pip install tomli-w). It provides a tomli_w.dumps(data) function that serializes a Python dict to a TOML string. For most application configuration use cases, you write config files by hand and only read them from Python, so the read-only stdlib module covers the vast majority of needs.

What if I’m on Python 3.10 or earlier?

Install the tomli package (pip install tomli). It provides the same API as tomllib, and the standard library module is based on it. A common compatibility pattern: try: import tomllib except ImportError: import tomli as tomllib. This single try/except block lets the same code run on Python 3.10 and earlier without any other changes.
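Written out over several lines, the compatibility import looks like the following. It assumes tomli has been installed on interpreters older than 3.11.

```python
try:
    import tomllib  # Python 3.11+
except ImportError:
    # Python 3.10 and earlier: requires `pip install tomli`.
    import tomli as tomllib

config = tomllib.loads('name = "MyApp"\nmax_retries = 3\n')
print(config)  # {'name': 'MyApp', 'max_retries': 3}
```

The rest of the code keeps using the name tomllib, so nothing else has to change between Python versions.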

Why does tomllib require binary mode (“rb”) instead of text mode (“r”)?

TOML requires UTF-8 encoding. Opening in binary mode and letting tomllib handle the UTF-8 decoding internally guarantees correct behavior regardless of the platform’s default encoding (which can be UTF-8 on Linux/macOS but something different on some Windows configurations). If you have a TOML string rather than a file, use tomllib.loads(string) — note the s suffix, which accepts a Python str rather than a binary file object.

Should I put secrets like database passwords in TOML files?

No. TOML files (especially config.toml) should not contain secrets because they often end up in version control. Use TOML for non-sensitive configuration and use environment variables (read with os.environ.get()) or a secrets manager for passwords and API keys. A common pattern is to prefer an environment variable and fall back to the TOML value: password = os.environ.get("DB_PASSWORD") or config["database"].get("password", ""). Never commit config.production.toml if it contains any credentials.

When should I use TOML vs. environment variables for configuration?

Use TOML for structured, multi-key configuration that is the same across developer machines and different runs: database hostnames, feature flags, logging levels, allowed hosts. Use environment variables for values that differ per deployment or contain secrets: database passwords, API keys, environment name (production/staging/dev). Many projects use both: a TOML file for the structure and environment variables to inject the secrets at runtime. The multi-environment loader above already uses one environment variable, CONFIG_ENV, to pick the override file, and secrets can be layered on in the same way.

Conclusion

Python’s tomllib module brings first-class TOML parsing to the standard library with no installation required on Python 3.11+. You learned the core TOML syntax including scalars, arrays, tables, and the [[array of tables]] pattern; how to open files in binary mode and load them with tomllib.load(); safe key access patterns with .get(); reading pyproject.toml for project metadata; wrapping raw config dicts in typed dataclasses; and building a multi-environment configuration loader with deep merging. TOML is now the standard format for Python tooling and project configuration — tomllib is the clean, dependency-free way to work with it.

Extend the multi-environment loader by adding validation that raises clear error messages for missing required keys, or combine it with Pydantic’s BaseSettings (provided by the separate pydantic-settings package since Pydantic v2) for automatic environment variable injection and type coercion. Official documentation: docs.python.org/3/library/tomllib.html.

How To Use Python chromadb as a Vector Database

Advanced

Traditional databases store data as rows and columns, which makes them excellent for exact lookups — find me the user with ID 42, or all orders placed last Tuesday. But they struggle with questions like “find documents similar to this one” or “which of these product descriptions best matches what the user is looking for?” Answering those questions requires vector databases, which store data as numerical embeddings and retrieve results by geometric similarity rather than exact match. Chromadb is the easiest way to get started with vector databases in Python: it is open-source, runs embedded inside your process with zero infrastructure, and requires no external server to get started.

Chromadb is an AI-native embedding database designed for developers building LLM-powered applications, semantic search tools, and recommendation systems. You store your text (or any data that can be embedded) along with optional metadata, and chromadb handles the vector math for finding the most semantically similar items when you query it. It ships with built-in support for several sentence-transformer models so you do not even need a separate embedding step — chromadb can generate embeddings from raw text automatically. For production use, it also supports a client-server mode and can back LlamaIndex, LangChain, and similar RAG frameworks.

This article covers: installing chromadb, creating a persistent client and collections, adding documents with automatic embedding, querying by semantic similarity, filtering results with metadata, using custom embedding functions, managing and updating stored documents, and a practical semantic search project you can adapt for your own data. Everything runs locally with no API keys required for the built-in embedding models.

chromadb Similarity Search: Quick Example

This example creates an in-memory chromadb collection, adds four sentences, and queries for the one most similar to a new sentence — all in under 20 lines.

# quick_chroma.py
import chromadb

# Create an in-memory client (data is lost on exit)
client = chromadb.Client()

# Create a collection (chromadb auto-generates embeddings using sentence-transformers)
collection = client.create_collection("quick_test")

# Add some documents
collection.add(
    documents=["Python is a high-level programming language.",
               "Cats are popular household pets.",
               "Machine learning enables pattern recognition.",
               "Dogs need daily exercise and walks."],
    ids=["doc1", "doc2", "doc3", "doc4"]
)

# Query for the most similar document
results = collection.query(query_texts=["AI and deep learning"], n_results=2)
print("Top matches:")
for doc, dist in zip(results["documents"][0], results["distances"][0]):
    print(f"  [{dist:.3f}] {doc}")

Output:
Top matches:
  [0.312] Machine learning enables pattern recognition.
  [0.891] Python is a high-level programming language.

The query “AI and deep learning” correctly identifies the machine learning sentence as the closest match (lowest distance = most similar), followed by the Python sentence. Chromadb automatically generated embeddings for both the stored documents and the query using its default sentence-transformer model — no embedding code required. Lower distance means higher similarity. The sections below show persistent storage, metadata filtering, and production-ready patterns.

What is a Vector Database and Where Does chromadb Fit?

A vector database stores data as high-dimensional numerical vectors called embeddings. An embedding is a list of numbers (typically 384 to 1536 floats) that captures the semantic meaning of text, an image, or any data type. Items with similar meaning end up with vectors that are geometrically close to each other. Finding similar items is then a nearest-neighbor search problem — no SQL, no exact keyword matching.
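To make "nearest-neighbor search" concrete, here is a brute-force version in plain Python. The three-dimensional vectors are hand-made for illustration; real embeddings come from a model and have hundreds of dimensions, and a vector database uses an approximate index rather than scoring every item.

```python
import math

# Hand-made 3-dimensional "embeddings" (assumption: invented values,
# chosen so that related topics point in similar directions).
VECTORS = {
    "python tutorial": [0.9, 0.1, 0.0],
    "machine learning guide": [0.7, 0.6, 0.1],
    "cat care tips": [0.0, 0.1, 0.9],
}


def cosine_distance(a, b):
    """1 - cosine similarity: 0 means same direction, larger means less similar."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)


def nearest(query, k=2):
    """Brute-force search: score every stored vector, keep the k closest."""
    scored = [(cosine_distance(query, vec), label) for label, vec in VECTORS.items()]
    return sorted(scored)[:k]


# The query vector sits closest to "machine learning guide".
for dist, label in nearest([0.6, 0.7, 0.0]):
    print(f"[{dist:.3f}] {label}")
```

The ranking puts "machine learning guide" first and "python tutorial" second, with "cat care tips" left out of the top two.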

| Feature | chromadb | Pinecone | Weaviate |
| --- | --- | --- | --- |
| Hosting | Embedded (local) or server | Cloud-only | Self-hosted or cloud |
| Setup time | <5 minutes | Account + API key | Docker or account |
| Built-in embeddings | Yes (sentence-transformers) | No | Yes (modules) |
| Best for | Dev, prototyping, small production | Large-scale prod | Graph + vector hybrid |
| Cost | Free | Free tier, then paid | Free (self-hosted) |

Chromadb’s superpower is its zero-friction local mode: one pip install and you have a fully functional vector database running in your Python process. For production applications with millions of documents, you would graduate to Pinecone or a hosted Weaviate — but chromadb handles hundreds of thousands of documents comfortably and supports a server mode for shared access.

Vector embeddings: where your text lives in n-dimensional space.

Installing chromadb

Install chromadb with pip. The first time you create a collection with default settings, chromadb also downloads its default embedding model (on the order of 80-90MB) for automatic embedding generation.

# install_chromadb.sh
pip install chromadb

Output:
Successfully installed chromadb-0.5.0 ...

The default embedding function runs the all-MiniLM-L6-v2 sentence-transformer model through ONNX Runtime, so a plain install does not need PyTorch or the sentence-transformers package; those are only required if you opt into the SentenceTransformerEmbeddingFunction. The install still pulls in a fair number of dependencies. If you plan to use only external embedding functions (OpenAI, Cohere, etc.), you can experiment with pip install chromadb --no-deps and manually install only the dependencies you need, but for most use cases the default install is simpler.

Creating a Persistent Client

The in-memory client from the quick example loses all data when your program exits. For any real use case, create a persistent client that saves the database to disk. The interface is identical — only the client creation differs.

# persistent_chroma.py
import chromadb

# Persistent client: data saved to ./chroma_data/
client = chromadb.PersistentClient(path="./chroma_data")

# get_or_create_collection is idempotent -- safe to call every time
collection = client.get_or_create_collection(
    name="my_docs",
    metadata={"hnsw:space": "cosine"}  # use cosine similarity (default is L2)
)

print(f"Collection: {collection.name}")
print(f"Items stored: {collection.count()}")

Output:
Collection: my_docs
Items stored: 0

The hnsw:space metadata key sets the distance metric used for nearest-neighbor search. "cosine" normalizes vectors before comparing them, which generally works better for text embeddings. "l2" (Euclidean distance) is the default and works well for embeddings from models that produce unit-length vectors. Choose cosine unless you have a specific reason to use L2. The HNSW (Hierarchical Navigable Small World) index is the approximate nearest-neighbor algorithm chromadb uses for efficient similarity search.
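The relationship between the two metrics can be checked with a few lines of arithmetic. This sketch uses plain Python and made-up two-dimensional vectors, and it compares cosine distance against squared Euclidean distance; it does not call chromadb.

```python
import math


def normalize(v):
    length = math.sqrt(sum(x * x for x in v))
    return [x / length for x in v]


def cosine_distance(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (na * nb)


def squared_l2(a, b):
    return sum((x - y) ** 2 for x, y in zip(a, b))


a = [3.0, 4.0]
b = [30.0, 40.0]   # same direction as a, ten times longer
c = [4.0, 3.0]     # same length as a, different direction

# On raw vectors the two metrics disagree about which one is closer to a.
print(cosine_distance(a, b) < cosine_distance(a, c))   # True: b points the same way
print(squared_l2(a, b) < squared_l2(a, c))             # False: b is far away in space

# After normalizing to unit length, squared L2 equals 2 * cosine distance,
# so both metrics produce the same ranking.
ua, uc = normalize(a), normalize(c)
print(math.isclose(squared_l2(ua, uc), 2 * cosine_distance(a, c)))  # True
```

This is why the choice matters mostly for models whose output is not already unit-length: cosine ignores vector magnitude, while L2 does not.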

Adding Documents with Metadata

Chromadb stores three things per document: the text content (optional if you provide raw embeddings), a unique string ID, and optional metadata as a flat key-value dictionary. Metadata enables filtering queries to a subset of the collection — for example, all documents from a specific author, date range, or category.

# add_documents.py
import chromadb

client = chromadb.PersistentClient(path="./chroma_data")
collection = client.get_or_create_collection("support_tickets")

# Add documents with metadata
collection.add(
    documents=[
        "The login page is returning a 500 error after the latest deployment.",
        "Users cannot reset their password via the mobile app.",
        "Export to CSV is missing column headers in the downloaded file.",
        "The dashboard loads slowly when more than 100 items are displayed.",
        "Login button is not visible on small screen sizes below 375px.",
    ],
    ids=["ticket-001", "ticket-002", "ticket-003", "ticket-004", "ticket-005"],
    metadatas=[
        {"category": "bug", "priority": "critical", "component": "auth"},
        {"category": "bug", "priority": "high", "component": "auth"},
        {"category": "bug", "priority": "medium", "component": "export"},
        {"category": "performance", "priority": "medium", "component": "dashboard"},
        {"category": "bug", "priority": "low", "component": "ui"},
    ]
)

print(f"Collection now has {collection.count()} items")

Output:
Collection now has 5 items

Each document gets a unique ID that you define. IDs must be unique within a collection. Depending on the chromadb version, adding an ID that already exists either raises an error or is skipped, so use upsert() when you intend to overwrite. Metadata values must be scalars (strings, integers, floats, or booleans); nested objects are not supported. Use flat, query-friendly keys: category, date, author_id, and source are typical.

Metadata filters cut candidates before the vector search — faster and more relevant.

Querying with Semantic Search and Metadata Filters

The query() method accepts natural language queries and returns the most semantically similar documents. Metadata filters narrow the search to a subset of the collection before similarity search runs — this is both faster and more accurate than filtering after retrieval.

# query_chroma.py
import chromadb

client = chromadb.PersistentClient(path="./chroma_data")
collection = client.get_or_create_collection("support_tickets")

# Semantic search: find 2 most similar issues
results = collection.query(
    query_texts=["user can't log in"],
    n_results=2
)

print("Semantic search results:")
for doc, meta, dist in zip(
    results["documents"][0],
    results["metadatas"][0],
    results["distances"][0]
):
    print(f"  [{dist:.3f}] {meta.get('component')}: {doc}")

# Filtered search: only look at 'auth' component bugs
filtered = collection.query(
    query_texts=["sign in problem"],
    n_results=3,
    where={"$and": [{"category": "bug"}, {"component": "auth"}]}
)

print("\nFiltered results (auth bugs only):")
for doc in filtered["documents"][0]:
    print(f"  - {doc}")

Output:
Semantic search results:
  [0.234] auth: The login page is returning a 500 error after the latest deployment.
  [0.312] auth: Users cannot reset their password via the mobile app.

Filtered results (auth bugs only):
  - The login page is returning a 500 error after the latest deployment.
  - Users cannot reset their password via the mobile app.

The filtered search uses chromadb’s where clause to pre-filter to only auth bugs before running similarity search. The filter language supports $and, $or, $eq, $ne, $gt, $lt, and other comparison operators. For simple equality filters on a single field, you can use where={"category": "bug"} directly without the $and wrapper.

Conditions can be combined with $and and $or. The where_document parameter filters by document content (substring match), while where filters by metadata. Use metadata filters aggressively: they reduce the candidate set before the vector search, which is both faster and produces better results.
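To build intuition for how a where filter narrows the candidate set, here is a toy evaluator in plain Python. It is only an illustration of the filter semantics for the operators named above, not chromadb's implementation, and matches() is a hypothetical helper.

```python
import operator

# Comparison operators named in the text, mapped to Python functions.
COMPARATORS = {
    "$eq": operator.eq,
    "$ne": operator.ne,
    "$gt": operator.gt,
    "$lt": operator.lt,
}


def matches(metadata: dict, where: dict) -> bool:
    """Return True if one metadata dict satisfies a where-style filter."""
    for key, condition in where.items():
        if key == "$and":
            if not all(matches(metadata, sub) for sub in condition):
                return False
        elif key == "$or":
            if not any(matches(metadata, sub) for sub in condition):
                return False
        elif isinstance(condition, dict):
            # Operator form, e.g. {"votes": {"$gt": 3}}
            for op_name, expected in condition.items():
                if key not in metadata:
                    return False
                if not COMPARATORS[op_name](metadata[key], expected):
                    return False
        elif metadata.get(key) != condition:
            # A bare value means equality, e.g. {"category": "bug"}
            return False
    return True


TICKETS = {
    "ticket-001": {"category": "bug", "priority": "critical", "component": "auth"},
    "ticket-003": {"category": "bug", "priority": "medium", "component": "export"},
    "ticket-004": {"category": "performance", "priority": "medium", "component": "dashboard"},
}

where = {"$and": [{"category": "bug"}, {"component": "auth"}]}
print([tid for tid, meta in TICKETS.items() if matches(meta, where)])
# ['ticket-001']
```

Only the documents that pass this kind of check take part in the similarity ranking, which is why a selective filter shrinks the work the vector search has to do.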

Updating and Deleting Documents

Chromadb supports document updates via upsert() (insert or replace) and deletion by ID. Use upsert when you need to keep stored documents in sync with a source of truth like a database or file system.

# update_chroma.py
import chromadb

client = chromadb.PersistentClient(path="./chroma_data")
collection = client.get_or_create_collection("support_tickets")

# Update an existing document
collection.upsert(
    ids=["ticket-003"],
    documents=["Export to CSV is missing column headers. Reproducible on Windows only."],
    metadatas=[{"category": "bug", "priority": "high", "component": "export"}]
)

# Delete a document
collection.delete(ids=["ticket-005"])

print(f"Collection count after update/delete: {collection.count()}")

# Fetch a specific document by ID
item = collection.get(ids=["ticket-003"], include=["documents", "metadatas"])
print("Updated ticket-003:")
print(f"  {item['documents'][0]}")
print(f"  Priority: {item['metadatas'][0]['priority']}")

Output:
Collection count after update/delete: 4
Updated ticket-003:
  Export to CSV is missing column headers. Reproducible on Windows only.
  Priority: high

upsert() is safe to call unconditionally — it creates new documents if the ID does not exist and replaces them if it does. This makes it ideal for sync pipelines where you re-process your source data on a schedule. When you upsert, chromadb regenerates the embedding for the new text, so the vector in the index stays current with your document content.

Real-Life Example: Semantic Support Ticket Router

# ticket_router.py
"""
Routes incoming support tickets to the most relevant existing ticket category
using semantic similarity. Returns similar past tickets + suggested category.
"""
import chromadb

TICKETS_DB = "./ticket_router_db"

SAMPLE_TICKETS = [
    ("t001", "Password reset email never arrives", "auth", "high"),
    ("t002", "Two-factor authentication code rejected", "auth", "high"),
    ("t003", "CSV export crashes with large datasets", "export", "medium"),
    ("t004", "PDF export missing images", "export", "medium"),
    ("t005", "Page loads take over 10 seconds", "performance", "high"),
    ("t006", "Search results appear after long delay", "performance", "medium"),
    ("t007", "Mobile layout broken on iPhone SE", "ui", "low"),
    ("t008", "Dropdown menu overlaps page content", "ui", "low"),
]

def setup_collection(client):
    col = client.get_or_create_collection("tickets", metadata={"hnsw:space": "cosine"})
    if col.count() == 0:
        col.add(
            ids=[t[0] for t in SAMPLE_TICKETS],
            documents=[t[1] for t in SAMPLE_TICKETS],
            metadatas=[{"component": t[2], "priority": t[3]} for t in SAMPLE_TICKETS]
        )
    return col

def route_ticket(collection, new_ticket_text):
    results = collection.query(query_texts=[new_ticket_text], n_results=3)
    docs = results["documents"][0]
    metas = results["metadatas"][0]
    dists = results["distances"][0]
    # Infer category from top match
    top_component = metas[0]["component"]
    print(f"New ticket: {new_ticket_text}")
    print(f"Suggested category: {top_component}")
    print("Similar past tickets:")
    for doc, meta, dist in zip(docs, metas, dists):
        print(f"  [{dist:.2f}] [{meta['component']}] {doc}")
    print()

client = chromadb.PersistentClient(path=TICKETS_DB)
col = setup_collection(client)

route_ticket(col, "Users getting 'invalid token' error when logging in")
route_ticket(col, "Report generation freezes the browser tab")
route_ticket(col, "Footer links not visible on Android devices")

Output:
New ticket: Users getting 'invalid token' error when logging in
Suggested category: auth
Similar past tickets:
  [0.21] [auth] Two-factor authentication code rejected
  [0.34] [auth] Password reset email never arrives
  [0.81] [performance] Page loads take over 10 seconds

New ticket: Report generation freezes the browser tab
Suggested category: performance
Similar past tickets:
  [0.19] [performance] Page loads take over 10 seconds
  [0.28] [performance] Search results appear after long delay
  [0.55] [export] CSV export crashes with large datasets

New ticket: Footer links not visible on Android devices
Suggested category: ui
Similar past tickets:
  [0.17] [ui] Mobile layout broken on iPhone SE
  [0.29] [ui] Dropdown menu overlaps page content
  [0.74] [auth] Password reset email never arrives

This router correctly categorizes all three new tickets without any keyword rules or explicit training. It works purely by semantic similarity — “invalid token error when logging in” is correctly routed to auth, and “freezes the browser” matches performance. To extend this for production, add a real historical ticket database, route automatically to the correct support team queue, and use the top-3 similar tickets as context when suggesting a resolution to the agent handling the ticket.

Frequently Asked Questions

How many documents can chromadb handle?

The embedded (local) mode comfortably handles hundreds of thousands of documents on a modern laptop. Chromadb uses HNSW for approximate nearest-neighbor search, which scales well to millions of vectors, but the entire index is held in memory. A collection of 1 million 384-dimensional float32 vectors requires roughly 1.5GB of RAM. For very large collections, use the client-server mode (chromadb.HttpClient) or migrate to a managed vector database like Pinecone or Qdrant.
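The 1.5GB figure is plain arithmetic on the raw vector data. A quick sketch of that calculation follows; the function name is hypothetical, and the result covers only the vectors themselves, so the HNSW graph links and metadata add further overhead on top.

```python
def raw_vector_bytes(num_vectors: int, dimensions: int, bytes_per_value: int = 4) -> int:
    """Bytes needed for the raw vectors alone (float32 = 4 bytes per value)."""
    return num_vectors * dimensions * bytes_per_value


size = raw_vector_bytes(1_000_000, 384)
print(f"{size / 1e9:.2f} GB")  # 1.54 GB
```

Doubling the embedding dimension doubles the estimate, which is worth keeping in mind when choosing an embedding model.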

Can I use my own embeddings instead of the built-in ones?

Yes. Pass an embedding_function parameter when creating a collection. The chromadb.utils.embedding_functions module ships ready-made embedding functions for providers such as OpenAI, Cohere, Hugging Face, and Sentence Transformers. You can also pass pre-computed embeddings as a list of float lists directly to collection.add(embeddings=[...], ids=[...]). Skip the documents parameter entirely if you only want to store vectors.

Is chromadb data durable?

With PersistentClient, yes: data is written to the specified directory using SQLite for metadata and HNSW’s native serialization for vectors. The data directory is self-contained and portable. To back up your chromadb database, copy the entire directory. There is no write-ahead log or crash recovery journal, so a process that crashes in the middle of a write can leave the data in an inconsistent state. Keep backups of the data directory for production use.

How do I handle multiple users or datasets in one chromadb instance?

Use separate collections per user or dataset. Collections are lightweight — you can create thousands of them. Alternatively, use metadata to tag each document with an owner_id and filter all queries with where={"owner_id": current_user_id}. The collection-per-user approach is simpler and prevents cross-tenant data leakage at the query level. The metadata filter approach uses less disk space when users share a lot of common documents.

Does chromadb integrate with LangChain and LlamaIndex?

Yes, both frameworks have native chromadb support. In LangChain, use Chroma.from_documents(docs, embedding) to create a LangChain vector store backed by chromadb. In LlamaIndex, use ChromaVectorStore from llama_index.vector_stores.chroma. Both frameworks handle the embedding and storage mechanics, letting you use chromadb as the persistence layer while working with the higher-level RAG abstractions those frameworks provide.

Conclusion

Chromadb removes the infrastructure barrier to building vector-search-powered applications. You learned how to create persistent and in-memory clients, add documents with automatic embedding generation, run semantic similarity queries, apply metadata filters to narrow results, update and delete documents with upsert, and build a practical ticket routing system. The zero-configuration local mode makes chromadb ideal for prototyping and small-to-medium production workloads, while the server mode and framework integrations with LlamaIndex and LangChain make it a viable foundation for larger systems.

Extend the ticket router by connecting it to a real issue tracker, adding a feedback loop to improve routing accuracy over time, or using it as the retrieval layer in a full RAG pipeline with an LLM for resolution suggestions. Official documentation: docs.trychroma.com.

How To Use Python LlamaIndex for Document Q&A

Advanced

You have 200 pages of product documentation, a folder of research papers, or a collection of support tickets, and you want to ask questions about that content in plain English and get accurate, specific answers back. Not summaries — answers. “What does section 4.2 say about refund eligibility?” or “Which support ticket mentions the login timeout bug?” This is exactly what LlamaIndex was built for: turning your own documents into a queryable knowledge base powered by a language model, without requiring a data science background to set up.

LlamaIndex (formerly GPT Index) is a Python framework that connects language models to your data. It handles the mechanics of loading documents, splitting them into chunks, converting those chunks into vector embeddings, storing them in an index, and retrieving the most relevant chunks when a question is asked — then passing those chunks plus your question to an LLM to synthesize a final answer. The OpenAI API is the default backend, but LlamaIndex supports local models via Ollama, Hugging Face, and many others. You will need an OpenAI API key for the examples in this article, though the local model alternative is shown at the end.

This article walks through the complete LlamaIndex workflow: installing the library, loading documents from text files and directories, building a VectorStoreIndex, querying it with a QueryEngine, understanding how the retrieval-augmented generation (RAG) pipeline works under the hood, customizing chunk size and retrieval parameters, using a persistent storage backend so you do not re-index on every run, and building a practical document Q&A tool. By the end you will have a working system you can point at any folder of documents.

LlamaIndex Document Q&A: Quick Example

This five-minute example loads a text file, builds a vector index, and answers a question about its content. It requires an OpenAI API key (set as an environment variable) and a text file to query.

# quick_llamaindex.py
import os
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader

# Set your OpenAI key before running:
# export OPENAI_API_KEY="sk-..."

# Load all documents from the ./docs folder
documents = SimpleDirectoryReader("docs").load_data()

# Build a vector index from those documents
index = VectorStoreIndex.from_documents(documents)

# Create a query engine and ask a question
engine = index.as_query_engine()
response = engine.query("What is the main topic of these documents?")
print(response)
The documents cover Python testing best practices, including unit testing
with pytest, test coverage measurement with coverage.py, and setting up
CI pipelines for automated quality checks.

Three steps: load documents with SimpleDirectoryReader, build an index with VectorStoreIndex.from_documents(), and query with engine.query(). LlamaIndex handles chunking, embedding, and retrieval transparently. The answer synthesizes information from across your documents rather than returning raw excerpts. The sections below explain each component and show how to customize the pipeline for real-world use cases.

What is LlamaIndex and How Does the RAG Pipeline Work?

LlamaIndex implements a pattern called Retrieval-Augmented Generation (RAG). Instead of asking a language model to answer from its training data alone, RAG retrieves relevant passages from your documents and includes them as context when asking the question. This gives the model specific, up-to-date information and dramatically reduces hallucination on domain-specific content.

| Component | LlamaIndex Class | What It Does |
| --- | --- | --- |
| Document loading | SimpleDirectoryReader | Reads files from disk (txt, pdf, docx, etc.) |
| Chunking | SentenceSplitter | Splits documents into overlapping text chunks |
| Embedding | OpenAIEmbedding | Converts chunks to vector representations |
| Indexing | VectorStoreIndex | Stores vectors for similarity search |
| Retrieval | VectorIndexRetriever | Finds top-k most relevant chunks for a query |
| Synthesis | ResponseSynthesizer | Sends chunks + query to LLM for final answer |

When you call engine.query("..."), LlamaIndex converts your question to a vector, finds the chunks with the highest cosine similarity to that vector, builds a prompt containing those chunks plus your question, and sends it to the LLM. The model’s answer is grounded in what your documents actually say, not what the model memorized during training.
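The retrieval step can be illustrated without any library. This is a toy sketch in plain Python, not LlamaIndex's implementation: the three-dimensional vectors and chunk texts are made up for illustration, whereas real embeddings have hundreds or thousands of dimensions.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def top_k(query_vec, chunks, k=2):
    """Return the k (score, text) pairs most similar to the query vector."""
    scored = [(cosine_similarity(query_vec, vec), text) for text, vec in chunks]
    return sorted(scored, reverse=True)[:k]


# Hypothetical chunks with made-up 3-dimensional "embeddings"
chunks = [
    ("Install pytest with pip.", [0.9, 0.1, 0.0]),
    ("Deploy with Docker.", [0.1, 0.9, 0.2]),
    ("Name test files test_*.py.", [0.8, 0.2, 0.1]),
]
query = [1.0, 0.0, 0.0]  # pretend this is the embedded question

for score, text in top_k(query, chunks):
    print(f"{score:.3f}  {text}")
```

The two pytest-related chunks rank first because their vectors point in nearly the same direction as the query vector. Those top-ranked texts are what would be placed into the prompt.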

Installing LlamaIndex

LlamaIndex is split into a core package and optional integrations. Install the core package plus the OpenAI integration to follow the examples in this article.

# install_llamaindex.sh
pip install llama-index llama-index-llms-openai llama-index-embeddings-openai
Successfully installed llama-index-core-0.11.0 llama-index-llms-openai-0.3.0
llama-index-embeddings-openai-0.3.0 openai-1.30.0

If you plan to load PDFs, also install pypdf. For Word documents, install docx2txt. LlamaIndex’s SimpleDirectoryReader auto-detects file types and uses the appropriate parser when those packages are available. Set your OpenAI API key as an environment variable before running any of the examples: export OPENAI_API_KEY="sk-your-key-here".

Loading Documents

SimpleDirectoryReader is the fastest way to load a folder of files. By default it reads .txt, .pdf, .docx, .md, and several other formats. You can also load individual files or use custom readers for APIs and databases.

# load_documents.py
from llama_index.core import SimpleDirectoryReader

# Load all files from a directory
documents = SimpleDirectoryReader("docs").load_data()
print(f"Loaded {len(documents)} document(s)")
for doc in documents[:3]:
    print(f"  File: {doc.metadata.get('file_name', 'unknown')}")
    print(f"  Characters: {len(doc.text)}")
    print()

# Load specific file extensions only
pdf_docs = SimpleDirectoryReader(
    "docs",
    required_exts=[".pdf"],
    recursive=True
).load_data()
print(f"PDF documents: {len(pdf_docs)}")
Loaded 4 document(s)
  File: python_testing.txt
  Characters: 8432

  File: deployment_guide.txt
  Characters: 12891

  File: api_reference.txt
  Characters: 5672

PDF documents: 2

Each loaded file becomes a Document object with a text property and a metadata dict containing the file name, path, and other attributes. Documents are loaded in full before chunking — if you have very large files, the chunking step (covered next) handles splitting them into manageable pieces. The recursive=True flag traverses subdirectories.

Building a VectorStoreIndex

Once documents are loaded, VectorStoreIndex.from_documents() chunks them, generates embeddings via the OpenAI API, and stores everything in an in-memory vector store. This step calls the OpenAI embeddings API for each chunk, so it costs a small amount and takes a few seconds to minutes depending on document volume.

# build_index.py
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader, Settings
from llama_index.core.node_parser import SentenceSplitter

# Configure chunking: 512 token chunks with 50-token overlap
Settings.text_splitter = SentenceSplitter(chunk_size=512, chunk_overlap=50)

documents = SimpleDirectoryReader("docs").load_data()
print(f"Building index from {len(documents)} document(s)...")
index = VectorStoreIndex.from_documents(documents, show_progress=True)
print("Index built successfully.")
print(f"Total nodes (chunks) indexed: {len(index.docstore.docs)}")
Building index from 4 document(s)...
Parsing nodes: 100%|########| 4/4 [00:00<00:00, 12.32it/s]
Generating embeddings: 100%|####| 38/38 [00:03<00:00, 11.8 embeddings/s]
Index built successfully.
Total nodes (chunks) indexed: 38

The chunk_size and chunk_overlap parameters control the granularity of retrieval. Smaller chunks (256-512 tokens) give more precise retrieval at the cost of losing context. Larger chunks (1024+ tokens) preserve more context but may retrieve less relevant sections. The 50-token overlap ensures that sentences split across chunk boundaries are not lost. For most document Q&A tasks, 512 tokens with 50-token overlap is a good starting point.
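To make the overlap concrete, here is a simplified sliding-window chunker. It is only a sketch: it splits on whitespace-separated words, whereas SentenceSplitter counts tokens and prefers sentence boundaries. The function name is hypothetical.

```python
def chunk_words(text, chunk_size=8, chunk_overlap=2):
    """Split text into word windows; each chunk repeats the last
    chunk_overlap words of the previous chunk."""
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be >= 0 and smaller than chunk_size")
    words = text.split()
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + chunk_size]))
        if start + chunk_size >= len(words):
            break  # this window already reached the end of the text
    return chunks


text = " ".join(f"w{i}" for i in range(1, 15))  # w1 ... w14
for chunk in chunk_words(text, chunk_size=6, chunk_overlap=2):
    print(chunk)
```

Each printed chunk starts with the last two words of the chunk before it, so a sentence that straddles a boundary still appears whole in at least one chunk.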

Querying the Index

The QueryEngine handles retrieval and response synthesis. The default retrieves the top 2 most similar chunks. For document Q&A you usually want more context -- retrieving 5-10 chunks gives the model more to work with and reduces "I don't know" responses on complex questions.

# query_index.py
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader

documents = SimpleDirectoryReader("docs").load_data()
index = VectorStoreIndex.from_documents(documents)

# Default query engine (top-2 retrieval)
engine = index.as_query_engine()
response = engine.query("How do I set up pytest for a new project?")
print("Answer:", response)
print()

# Custom retrieval: top-5 chunks, more context
engine_detailed = index.as_query_engine(similarity_top_k=5)
response2 = engine_detailed.query("What testing libraries are recommended?")
print("Detailed answer:", response2)
print()

# Access the source nodes used to generate the answer
for i, node in enumerate(response2.source_nodes):
    print(f"Source {i+1}: {node.metadata.get('file_name')} (score: {node.score:.3f})")
Answer: To set up pytest for a new project, install it with pip install pytest,
create a tests/ directory, name your test files test_*.py, and run pytest from
your project root.

Detailed answer: The recommended testing libraries include pytest for test
execution, coverage.py (via pytest-cov) for coverage measurement, and
pytest-mock for mocking external dependencies...

Source 1: python_testing.txt (score: 0.912)
Source 2: python_testing.txt (score: 0.887)
Source 3: deployment_guide.txt (score: 0.743)
Source 4: api_reference.txt (score: 0.701)
Source 5: python_testing.txt (score: 0.694)

The source_nodes attribute is extremely useful for building trust in RAG systems -- you can show users exactly which documents and sections the answer came from. The score is the cosine similarity between the query vector and the chunk vector; higher is more relevant. Displaying sources next to answers is standard practice in production document Q&A systems.

Persisting the Index to Disk

Re-indexing every time you run your application is slow and costs money (each embedding call hits the OpenAI API). LlamaIndex can save the index to disk and reload it in under a second on subsequent runs.

# persist_index.py
import os
from llama_index.core import (
    VectorStoreIndex,
    SimpleDirectoryReader,
    StorageContext,
    load_index_from_storage,
)

INDEX_DIR = "./index_storage"

if os.path.exists(INDEX_DIR):
    # Load from disk -- no API calls needed
    print("Loading index from disk...")
    storage_ctx = StorageContext.from_defaults(persist_dir=INDEX_DIR)
    index = load_index_from_storage(storage_ctx)
else:
    # Build and save
    print("Building index (first run)...")
    documents = SimpleDirectoryReader("docs").load_data()
    index = VectorStoreIndex.from_documents(documents)
    index.storage_context.persist(persist_dir=INDEX_DIR)
    print(f"Index saved to {INDEX_DIR}")

engine = index.as_query_engine(similarity_top_k=4)
print(engine.query("Summarize the key points from all documents."))
Loading index from disk...
The documents cover Python development best practices including testing
strategies with pytest and coverage.py, deployment automation using CI/CD
pipelines, and REST API design patterns with FastAPI...

On the first run, documents are loaded, chunked, and embedded -- which takes a few seconds and costs API credits. Every subsequent run skips that entirely and loads the pre-built index from the ./index_storage folder. This pattern makes LlamaIndex practical for applications that are restarted frequently, like web servers or CLI tools. When your documents change, delete the index_storage directory and let it rebuild on the next run.
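Deleting the index by hand is easy to forget. One possible way to automate the rebuild decision, sketched here with only the standard library, is to store a fingerprint of the documents folder next to the index and rebuild when it changes. The helper names and the fingerprint.txt file are hypothetical and not part of LlamaIndex.

```python
import hashlib
from pathlib import Path


def folder_fingerprint(docs_dir):
    """Hash every file's relative path and contents into one hex digest."""
    digest = hashlib.sha256()
    root = Path(docs_dir)
    for path in sorted(p for p in root.rglob("*") if p.is_file()):
        digest.update(str(path.relative_to(root)).encode("utf-8"))
        digest.update(path.read_bytes())
    return digest.hexdigest()


def index_is_stale(docs_dir, index_dir):
    """True when no fingerprint is stored or the documents changed since it was saved."""
    marker = Path(index_dir) / "fingerprint.txt"
    if not marker.exists():
        return True
    return marker.read_text().strip() != folder_fingerprint(docs_dir)


def save_fingerprint(docs_dir, index_dir):
    """Call this right after persisting a freshly built index."""
    Path(index_dir).mkdir(parents=True, exist_ok=True)
    (Path(index_dir) / "fingerprint.txt").write_text(folder_fingerprint(docs_dir))
```

In persist_index.py, the load branch could then run only when the index directory exists and index_is_stale("docs", INDEX_DIR) is False, with save_fingerprint("docs", INDEX_DIR) called right after persist().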

Real-Life Example: Company Policy Q&A Tool

# policy_qa.py
"""Simple command-line Q&A tool for a folder of policy documents."""
import os
import sys
from llama_index.core import (
    VectorStoreIndex, SimpleDirectoryReader,
    StorageContext, load_index_from_storage, Settings
)
from llama_index.core.node_parser import SentenceSplitter

DOCS_DIR = "./policies"
INDEX_DIR = "./policy_index"

Settings.text_splitter = SentenceSplitter(chunk_size=512, chunk_overlap=64)

def build_or_load_index():
    if os.path.exists(INDEX_DIR):
        storage = StorageContext.from_defaults(persist_dir=INDEX_DIR)
        return load_index_from_storage(storage)
    docs = SimpleDirectoryReader(DOCS_DIR, recursive=True).load_data()
    if not docs:
        sys.exit(f"No documents found in {DOCS_DIR}")
    print(f"Indexing {len(docs)} document(s)...")
    idx = VectorStoreIndex.from_documents(docs, show_progress=True)
    idx.storage_context.persist(persist_dir=INDEX_DIR)
    return idx

def main():
    index = build_or_load_index()
    engine = index.as_query_engine(similarity_top_k=5)
    print("Policy Q&A ready. Type 'quit' to exit.\n")
    while True:
        question = input("Question: ").strip()
        if question.lower() in ("quit", "exit", "q"):
            break
        if not question:
            continue
        response = engine.query(question)
        print(f"\nAnswer: {response}\n")
        for i, node in enumerate(response.source_nodes[:2], 1):
            fname = node.metadata.get("file_name", "unknown")
            print(f"  Source {i}: {fname} (relevance: {node.score:.2f})")
        print()

if __name__ == "__main__":
    main()
Policy Q&A ready. Type 'quit' to exit.

Question: How many vacation days do employees get per year?
Answer: Full-time employees receive 15 days of paid vacation per year,
increasing to 20 days after 3 years of service...
  Source 1: employee_handbook.txt (relevance: 0.94)
  Source 2: benefits_summary.txt (relevance: 0.87)

Question: What is the remote work policy?
Answer: Employees may work remotely up to 3 days per week with manager
approval. Core hours of 10am-3pm must be observed regardless of location...
  Source 1: remote_work_policy.txt (relevance: 0.96)
  Source 2: employee_handbook.txt (relevance: 0.71)

This tool loads all documents from ./policies, builds a persistent index on first run, and provides an interactive query loop. The source citations after each answer build trust -- users can verify answers against the original documents. To extend this for a web application, replace the input() loop with a Flask or FastAPI endpoint and return str(response) plus the source metadata as JSON.
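The JSON shape for such an endpoint could look like the sketch below. It relies only on the response attributes already used in policy_qa.py (str(response), source_nodes, node.metadata, node.score). The function name and payload keys are assumptions, and FakeResponse merely stands in for a real LlamaIndex response so the snippet runs without an API key.

```python
import json
from types import SimpleNamespace


def response_to_payload(response, max_sources=2):
    """Convert a query response into a JSON-serializable dict."""
    sources = []
    for node in response.source_nodes[:max_sources]:
        sources.append({
            "file_name": node.metadata.get("file_name", "unknown"),
            "relevance": round(node.score, 2) if node.score is not None else None,
        })
    return {"answer": str(response), "sources": sources}


class FakeResponse:
    """Stand-in that mimics the attributes used above (not a LlamaIndex class)."""

    def __init__(self, text, source_nodes):
        self.text = text
        self.source_nodes = source_nodes

    def __str__(self):
        return self.text


fake = FakeResponse(
    "Employees may work remotely up to 3 days per week.",
    [SimpleNamespace(metadata={"file_name": "remote_work_policy.txt"}, score=0.96)],
)
print(json.dumps(response_to_payload(fake), indent=2))
```

A web handler would call engine.query(question), pass the result through a function like this, and return the dict as the response body.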

Frequently Asked Questions

How much does using LlamaIndex with OpenAI cost?

The main cost is embedding generation during indexing. OpenAI's text-embedding-3-small model costs $0.02 per million tokens. A 100-page document is roughly 50,000 tokens, so indexing it costs about $0.001. Query costs depend on the LLM used -- GPT-4o mini queries (which include the retrieved chunks) typically run $0.001-$0.01 per question. For most small to medium document collections, total monthly costs are under $5 with typical usage.
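The indexing estimate is easy to reproduce for your own corpus. This sketch uses the price quoted above as its default; provider prices change, so treat the default as an assumption and check current pricing. The function name is hypothetical.

```python
def embedding_cost_usd(tokens: int, price_per_million: float = 0.02) -> float:
    """Estimated embedding cost in USD for a given number of tokens."""
    return tokens / 1_000_000 * price_per_million


print(f"${embedding_cost_usd(50_000):.4f}")  # $0.0010
```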

Can I use LlamaIndex without the OpenAI API?

Yes. LlamaIndex supports Ollama, Hugging Face, Anthropic, Google Gemini, and many other providers. Install llama-index-llms-ollama and llama-index-embeddings-huggingface, then configure Settings.llm = Ollama(model="llama3") and Settings.embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5") before building the index. Local embeddings via HuggingFace are completely free but require a GPU for reasonable performance on large document collections.

How do I improve answer accuracy?

The most impactful changes are: increase similarity_top_k to retrieve more context (try 5-10), reduce chunk_size to 256-384 tokens for more precise matching, use a more capable LLM for synthesis (GPT-4o instead of GPT-4o mini), and experiment with reranking (the CohereRerank or SentenceTransformerRerank postprocessors). Also ensure your documents are clean -- scanned PDFs with poor OCR produce bad embeddings regardless of the retrieval configuration.

What about very large document collections?

The in-memory VectorStoreIndex works well up to a few thousand documents. For larger collections, use a persistent vector database backend: Chroma, Pinecone, Qdrant, or Weaviate all integrate with LlamaIndex via their respective llama-index-vector-stores-* packages. Wrap a ChromaVectorStore or PineconeVectorStore in a StorageContext with StorageContext.from_defaults(vector_store=...) and pass it to VectorStoreIndex.from_documents(docs, storage_context=...). The query interface remains identical.

How do I add new documents to an existing index?

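Use the index.insert(document) method to add individual documents to an existing index, or index.refresh_ref_docs(documents) (named index.refresh in older releases) to update a list of documents, inserting new ones and updating changed ones by comparing document IDs. After inserting, call index.storage_context.persist() to save the updated index. Documents are identified by their doc_id. When loading with SimpleDirectoryReader, pass filename_as_id=True to derive the ID from the file path; without it, IDs are generated automatically and a reloaded file cannot be matched to its earlier version. With path-based IDs, renaming a file causes it to be treated as a new document.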

Conclusion

LlamaIndex makes it practical to build document Q&A systems without needing to understand the full stack of embedding models, vector databases, and prompt engineering. You learned how to load documents with SimpleDirectoryReader, build a VectorStoreIndex, query it with configurable retrieval depth, access source node citations, persist the index to disk for fast reloads, and assemble a practical policy Q&A command-line tool.

The next natural extensions are connecting to a real vector database for large-scale document collections, adding a web API wrapper with FastAPI, and experimenting with different chunking strategies and retrieval parameters to tune answer quality. For production systems, always display source citations alongside answers -- this makes the system auditable and helps users trust the output.

Official documentation: docs.llamaindex.ai.

How To Use Python coverage.py for Measuring Test Coverage

Intermediate

You write a test suite, run it, and all tests pass. Green checkmarks everywhere. But how do you know those tests actually exercise the code paths that matter? You might have 50 tests that all hit the happy path while every error handler, edge case, and fallback branch sits completely untested — waiting to blow up in production. Coverage.py answers the one question pytest alone cannot: which lines of your code were never executed during testing?

Coverage.py is the standard tool for measuring test coverage in Python. It hooks into the interpreter's tracing machinery to record exactly which lines and branches were executed while your tests ran. The results can be printed to the terminal, saved as an XML report for CI pipelines, or rendered as a browsable HTML report that color-codes every line green (covered) or red (missed). It integrates directly with pytest through the pytest-cov plugin, so there is nothing complex to wire up: just install and run.

In this article you will learn how to install coverage.py and pytest-cov, run coverage measurement alongside your existing test suite, interpret the terminal and HTML reports, configure a .coveragerc file to exclude irrelevant paths, enforce a minimum coverage threshold in CI, and apply branch coverage to catch untested conditional logic. By the end you will be able to add meaningful coverage measurement to any Python project in under five minutes.

Measuring Test Coverage: Quick Example

Before diving into configuration and advanced features, here is the simplest possible coverage run. Given a tiny module and its tests, this shows the full workflow from install to report in a handful of commands.

# mymath.py
def add(a, b):
    return a + b

def subtract(a, b):
    return a - b

def multiply(a, b):
    return a * b
# test_mymath.py
from mymath import add, subtract

def test_add():
    assert add(2, 3) == 5

def test_subtract():
    assert subtract(10, 4) == 6
# Run tests with coverage
$ pip install pytest pytest-cov
$ pytest --cov=mymath test_mymath.py
---------- coverage: platform linux, python 3.12 ----------
Name        Stmts   Miss  Cover
-------------------------------
mymath.py       6      1    83%
-------------------------------
TOTAL           6      1    83%

2 passed in 0.12s

The report immediately reveals that mymath.py has 83% coverage: 1 statement was never executed. That missed statement is the return line inside multiply(), which has no test. The def multiply line itself still counts as executed, because Python runs every def statement when the module is imported. Coverage.py counted 6 executable statements, 1 was missed, and 5/6 rounds to 83%. The sections below show how to find exactly which lines were missed, configure what gets measured, and enforce coverage thresholds.

What is coverage.py and Why Does It Matter?

Coverage.py is a tool that monitors your Python program while it runs and notes which parts of the code have been executed. It was created by Ned Batchelder and is the de facto standard for Python test coverage. The term “coverage” here means statement coverage: what percentage of your source lines were touched by at least one test.

There are two measurement modes:

| Mode | What It Measures | Best For |
| --- | --- | --- |
| Statement coverage | Which lines ran | Finding untested functions/modules |
| Branch coverage | Which if/else paths ran | Finding uncovered logic branches |

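Branch coverage is stricter. Consider an if statement with no else, such as if x < 0: x = 0 followed by return x. A single test with a negative x executes every line, so statement coverage reports 100% even though the path that skips the if body was never tested. With branch coverage enabled, coverage.py tracks each possible jump out of the if line separately and reports the one that was never taken. Note that coverage.py measures jumps between lines, so a one-line conditional expression such as return x if x > 0 else 0 is not split into separate branches. For most projects, starting with statement coverage and adding branch coverage once your baseline is solid is the practical approach.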
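The difference between the two modes can be demonstrated with a toy tracer built on sys.settrace. This is only an illustration of the idea, not how coverage.py is implemented or how it reports results. The names clamp and trace_function are hypothetical, and line numbers are counted relative to the def line.

```python
import sys


def clamp(x):
    if x < 0:
        x = 0
    return x


def trace_function(func, *args):
    """Run func(*args) and return (lines, jumps) observed inside it.

    Line numbers are relative to the def line (def = 0).
    """
    code = func.__code__
    first = code.co_firstlineno
    lines, jumps = set(), set()
    prev = None

    def tracer(frame, event, arg):
        nonlocal prev
        if frame.f_code is not code:
            return None  # ignore every other function
        if event == "line":
            rel = frame.f_lineno - first
            lines.add(rel)
            if prev is not None:
                jumps.add((prev, rel))
            prev = rel
        return tracer

    old = sys.gettrace()
    sys.settrace(tracer)
    try:
        func(*args)
    finally:
        sys.settrace(old)
    return lines, jumps


lines, jumps = trace_function(clamp, -5)
print("lines run:", sorted(lines))   # every body line of clamp ran
print("jumps seen:", sorted(jumps))
print("if -> return taken?", (1, 3) in jumps)  # False: the skip path is untested
```

A single call with a negative argument touches every line of clamp, so a line-only view looks complete. The jump from the if line straight to the return line never happens until a test passes a non-negative value.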

Installing coverage.py and pytest-cov

You have two install options depending on how you run your tests. If you use pytest (which most Python projects do), install pytest-cov. This is a pytest plugin that wraps coverage.py and integrates it directly into the pytest runner — no separate step needed.

# install_coverage.sh
pip install pytest-cov
Successfully installed coverage-7.5.0 pytest-cov-5.0.0

The pytest-cov package automatically installs coverage as a dependency. If you use a different test runner (like unittest or nose), install coverage directly and use the coverage run command instead. For most projects using pytest, pytest-cov is all you need.

Running Coverage with pytest

The --cov flag tells pytest-cov which source directory or module to measure. Pass the package or module name, not the test file — you want to measure your application code, not your tests measuring themselves.

# run_coverage.sh
# Measure coverage for a single module
pytest --cov=mymath tests/

# Measure coverage for an entire package
pytest --cov=myapp tests/

# Show which lines were missed in the terminal
pytest --cov=mymath --cov-report=term-missing tests/
---------- coverage: platform linux, python 3.12 ----------
Name        Stmts   Miss  Cover   Missing
-----------------------------------------
mymath.py       6      1    83%   9
-----------------------------------------
TOTAL           6      1    83%

2 passed in 0.14s

The --cov-report=term-missing flag adds the “Missing” column showing exactly which line numbers were not executed. In this example, line 9 (the return statement inside multiply()) was never run. Now you know exactly what to test next. The --cov-report flag can be specified more than once in the same command to generate several report formats in a single run.

Generating an HTML Coverage Report

The terminal report is useful for a quick check, but the HTML report is far more powerful for investigating what was missed. It produces a browsable set of HTML pages where each file is color-coded: green for covered lines, red for missed lines, and yellow for partially-covered branches.

# generate_html_report.sh
pytest --cov=mymath --cov-report=html tests/
---------- coverage: platform linux, python 3.12 ----------

Coverage HTML written to dir htmlcov

2 passed in 0.15s

Open htmlcov/index.html in a browser. You will see a table listing every measured file with its coverage percentage. Click any file name to see the annotated source code. Covered lines have a green background, missed lines have a red background. This makes it easy to spot entire functions that were never called or error handlers that were skipped.

Configuring .coveragerc

As your project grows, you will want to exclude certain files from coverage measurement: test files themselves, migration scripts, generated code, or __main__ blocks that only run interactively. The .coveragerc file (placed in the project root) centralizes all coverage configuration so you do not need to repeat flags on every pytest invocation.

# .coveragerc
[run]
source = myapp
branch = True
omit =
    myapp/migrations/*
    myapp/settings.py
    */tests/*
    */__main__.py

[report]
fail_under = 80
show_missing = True
skip_covered = False

[html]
directory = htmlcov

The key settings: source sets the default package to measure (replaces the --cov flag), branch = True enables branch coverage, omit is a glob list of paths to exclude, and fail_under = 80 makes coverage.py exit with a non-zero status code if coverage drops below 80%. That last setting is what makes coverage gates work in CI — your CI pipeline checks the exit code and fails the build if coverage regresses.

Enabling Branch Coverage

Statement coverage can give you a false sense of security on conditional code. Consider a function that takes different paths depending on input — statement coverage says “that line ran” but does not check whether all branches were exercised. Branch coverage fixes this.

# discount.py
def calculate_discount(price, is_member):
    if price > 100:
        if is_member:
            return price * 0.80   # 20% off for members on big orders
        else:
            return price * 0.90   # 10% off for everyone on big orders
    return price                  # no discount for small orders
# test_discount.py
from discount import calculate_discount

def test_member_discount():
    assert calculate_discount(150, True) == 120.0

def test_large_order_nonmember():
    assert calculate_discount(150, False) == 135.0
# Run with branch coverage
pytest --cov=discount --cov-branch --cov-report=term-missing tests/
Name           Stmts   Miss Branch BrPart  Cover   Missing
----------------------------------------------------------
discount.py        6      1      4      1    80%   8

The output now shows the Branch column (4 branches) and the BrPart column (1 partially covered branch), and flags line 8: the final return price, reached only when price <= 100. No test calls calculate_discount with a price of 100 or less, so the if on line 3 only ever takes its true path. In this small function the gap also shows up as a missed statement. Branch coverage earns its keep when the untaken path has no statements of its own, such as an if with no else: every line can run while one path through the code goes untested.
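The 80% in the Cover column can be reproduced by hand. As a sketch, assuming coverage.py combines the two measures as (executed statements + executed branches) / (total statements + total branches), the figures from the report give the following. The function name is hypothetical.

```python
def coverage_percent(statements, missed, branches=0, missed_branches=0):
    """Combined statement and branch coverage as a percentage."""
    executed = (statements - missed) + (branches - missed_branches)
    total = statements + branches
    return 100.0 * executed / total if total else 100.0


# discount.py: 6 statements, 1 missed, 4 branches, 1 branch never taken
print(round(coverage_percent(6, 1, 4, 1)))  # 80
```

Because branches enter the denominator, turning on branch coverage usually lowers the headline percentage even when no code has changed.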

Using Coverage in CI Pipelines

Coverage checks are most valuable when they run automatically on every pull request. The fail_under setting in .coveragerc makes the process straightforward, with no additional scripting needed. The pytest command exits with a non-zero status code when coverage falls below the threshold, which CI systems interpret as a failed step.

# .github/workflows/tests.yml
name: Tests

on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"
      - run: pip install pytest pytest-cov
      - run: pytest --cov=myapp --cov-report=xml
      - uses: codecov/codecov-action@v4  # Optional: upload to Codecov
        with:
          files: coverage.xml
FAIL Required test coverage of 80% not reached. Total coverage: 73.00%
Error: Process completed with exit code 1.

The --cov-report=xml flag generates a coverage.xml file compatible with Codecov, SonarCloud, and most CI dashboards. This lets you track coverage trends over time and get PR comments showing whether a change increased or decreased coverage. Start with a threshold that matches your current baseline and raise it gradually — trying to jump from 40% to 80% overnight usually leads to low-value tests written purely to hit the number.
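If you want to act on coverage.xml yourself, for example to log the trend, the overall figure is easy to extract. The sketch below assumes the Cobertura-style layout that coverage.py writes, where the root coverage element carries a line-rate attribute between 0 and 1. The sample XML string is a trimmed, made-up stand-in for a real report, and the function names are hypothetical.

```python
import xml.etree.ElementTree as ET

# Trimmed, made-up stand-in for a real coverage.xml file
SAMPLE_XML = """<coverage version="7.5.0" line-rate="0.73" branch-rate="0">
  <packages/>
</coverage>
"""


def total_line_coverage(xml_text):
    """Return overall line coverage as a percentage."""
    root = ET.fromstring(xml_text)
    return float(root.attrib["line-rate"]) * 100


def gate(xml_text, minimum=80.0):
    """Return a process exit code: 0 if coverage meets the minimum, 1 otherwise."""
    percent = total_line_coverage(xml_text)
    ok = percent >= minimum
    print(f"coverage {percent:.0f}% (minimum {minimum:.0f}%): {'ok' if ok else 'too low'}")
    return 0 if ok else 1


gate(SAMPLE_XML)
```

For a real report, read the file contents first and pass them to gate(). In most projects fail_under already covers this, so a script like this is only useful for custom reporting.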

Real-Life Example: Coverage for a Shopping Cart Module

This example shows a realistic use of coverage.py against a small shopping cart implementation, including the HTML report workflow and fixing the gaps it finds.

# cart.py
class Cart:
    def __init__(self):
        self.items = {}

    def add(self, name, price, qty=1):
        if qty <= 0:
            raise ValueError("Quantity must be positive")
        if name in self.items:
            self.items[name]["qty"] += qty
        else:
            self.items[name] = {"price": price, "qty": qty}

    def remove(self, name):
        if name not in self.items:
            raise KeyError(f"{name!r} is not in the cart")
        del self.items[name]

    def total(self):
        return sum(v["price"] * v["qty"] for v in self.items.values())

    def discount_total(self, pct):
        if not 0 <= pct <= 100:
            raise ValueError("Discount must be 0-100")
        return self.total() * (1 - pct / 100)

# test_cart.py
import pytest
from cart import Cart

def test_add_and_total():
    c = Cart()
    c.add("apple", 0.50, 3)
    c.add("banana", 0.30, 2)
    assert c.total() == pytest.approx(2.10)

def test_add_existing_item():
    c = Cart()
    c.add("apple", 0.50, 2)
    c.add("apple", 0.50, 1)
    assert c.items["apple"]["qty"] == 3

def test_remove():
    c = Cart()
    c.add("apple", 0.50)
    c.remove("apple")
    assert "apple" not in c.items

def test_discount():
    c = Cart()
    c.add("shirt", 50.0)
    assert c.discount_total(10) == pytest.approx(45.0)

pytest --cov=cart --cov-report=term-missing --cov-branch tests/
Name      Stmts   Miss Branch BrPart  Cover   Missing
-----------------------------------------------------
cart.py      19      4      8      3    71%   9, 16, 22, 25

Coverage reveals four uncovered lines: the ValueError on invalid quantity (line 9), the KeyError on remove of nonexistent item (line 16), the invalid discount error (line 22), and one branch in discount_total. Adding tests for these edge cases raises coverage to 100% and -- more importantly -- actually verifies that the error handling works correctly. That is the real value: coverage.py does not just track lines, it points you toward the cases your test suite was ignoring.
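
The missing edge-case tests could look roughly like the sketch below. It repeats a condensed copy of the Cart class so the block runs on its own, and it uses the standard library's unittest.TestCase (which pytest also collects) to avoid extra dependencies; in a pytest-only suite you would more likely write plain functions with pytest.raises. The test names are illustrative.

```python
import unittest

class Cart:
    """Condensed copy of the Cart class from cart.py above."""
    def __init__(self):
        self.items = {}

    def add(self, name, price, qty=1):
        if qty <= 0:
            raise ValueError("Quantity must be positive")
        if name in self.items:
            self.items[name]["qty"] += qty
        else:
            self.items[name] = {"price": price, "qty": qty}

    def remove(self, name):
        if name not in self.items:
            raise KeyError(f"{name!r} is not in the cart")
        del self.items[name]

    def total(self):
        return sum(v["price"] * v["qty"] for v in self.items.values())

    def discount_total(self, pct):
        if not 0 <= pct <= 100:
            raise ValueError("Discount must be 0-100")
        return self.total() * (1 - pct / 100)


class CartErrorPathTests(unittest.TestCase):
    """Exercise the error branches the original tests never reached."""

    def test_add_rejects_non_positive_quantity(self):
        with self.assertRaises(ValueError):
            Cart().add("apple", 0.50, 0)

    def test_remove_unknown_item_raises(self):
        with self.assertRaises(KeyError):
            Cart().remove("ghost")

    def test_discount_outside_range_raises(self):
        cart = Cart()
        cart.add("shirt", 50.0)
        for bad_pct in (-1, 101):   # one value below and one above the valid range
            with self.assertRaises(ValueError):
                cart.discount_total(bad_pct)
```

Run it with pytest or python -m unittest; each test targets one of the raise statements that the term-missing report listed.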

Frequently Asked Questions

What is a good coverage percentage to aim for?

80% is the common industry target and a reasonable starting point for most projects. The right number depends on your project's risk tolerance: financial calculation libraries or authentication code warrant 90%+, while a utility script or prototype might be fine at 70%. More important than the percentage is whether your tests cover the critical paths and error conditions. 100% coverage does not mean 100% correctness -- it means every line ran, not that every edge case was verified.

How do I exclude files or lines from coverage measurement?

Use the omit setting in .coveragerc for whole files, and use the special comment # pragma: no cover to exclude individual lines or blocks. Common candidates for exclusion: if __name__ == "__main__": blocks, abstract methods that must be overridden, and defensive code that is genuinely impossible to trigger in tests. Use these sparingly -- over-exclusion makes your coverage number meaningless.
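
As a small illustration of the pragma, the sketch below marks an abstract method, a command-line entry point, and the __main__ guard as excluded. The Exporter and CsvExporter classes are hypothetical names made up for this example; the behaviour relied on is that a pragma on a line that opens a block excludes the whole block from the report.

```python
from abc import ABC, abstractmethod

class Exporter(ABC):
    @abstractmethod
    def export(self, rows):  # pragma: no cover
        """Subclasses must override this; the body never runs on its own."""
        raise NotImplementedError

class CsvExporter(Exporter):
    def export(self, rows):
        # Real logic: this stays in the coverage report and should be tested.
        return "\n".join(",".join(str(cell) for cell in row) for row in rows)

def main():  # pragma: no cover
    # Manual entry point, excluded because tests call CsvExporter directly.
    print(CsvExporter().export([(1, "apple"), (2, "banana")]))

if __name__ == "__main__":  # pragma: no cover
    main()
```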

When should I use branch coverage instead of statement coverage?

Use branch coverage whenever your code has significant conditional logic: if/else chains, try/except blocks, or ternary expressions. Statement coverage will show 100% if every line ran at least once, but branch coverage will flag cases where you only tested one side of a conditional. Enable it from the start with branch = True in .coveragerc -- the stricter feedback catches real bugs earlier.

Does coverage.py slow down my tests?

Yes, by roughly 10-30% depending on project size. Coverage.py measures execution by registering a trace function with the interpreter (sys.settrace, or sys.monitoring on recent Python versions), which adds overhead to every function call and line execution. For most projects this is completely acceptable. If you have a very large test suite where speed matters, run coverage only in CI (not on every local run) or use the --no-cov flag during rapid development cycles.
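
To get a feel for where that overhead comes from, the sketch below records executed lines with sys.settrace, the standard-library hook that line-tracing tools can build on. It is a toy illustration, not coverage.py's actual implementation; collect_executed_lines and the sample calculate_discount function are made up for the example.

```python
import sys

def calculate_discount(price):
    if price > 100:
        return price * 0.9
    return price

def collect_executed_lines(func, *args):
    """Run func(*args) and return the line offsets (relative to the def
    line) that executed inside func."""
    target = func.__code__
    executed = set()

    def tracer(frame, event, arg):
        if frame.f_code is not target:
            return None          # ignore frames from other functions
        if event == "line":
            executed.add(frame.f_lineno - target.co_firstlineno)
        return tracer            # keep receiving events for this frame

    previous = sys.gettrace()
    sys.settrace(tracer)
    try:
        func(*args)
    finally:
        sys.settrace(previous)   # always restore the earlier trace function
    return executed

lines_hit = collect_executed_lines(calculate_discount, 150)
print(sorted(lines_hit))
print("early-return line executed:", 3 in lines_hit)
```

The tracer is called once per executed line, which is exactly the per-line cost a coverage run adds. Calling the helper with a price of 50 instead shows offset 3 (the plain return) and not offset 2.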

Does coverage work with parallel test execution?

Yes. Running tests in parallel with pytest -n auto requires the pytest-xdist plugin, and pytest-cov then collects the data from each worker and combines it before reporting, so no extra flags are needed. If you run coverage.py directly across several processes instead, set parallel = True in .coveragerc so each process writes its own .coverage.* file, then run coverage combine to merge them and coverage report to see the combined results.

Why does coverage show lines as covered that I know aren't being tested?

This usually happens when a module is imported (not tested) and coverage.py counts the top-level module code (imports, class definitions, constants) as executed. It can also happen if shared fixtures or setup code in conftest.py inadvertently call your functions. Use --cov-report=annotate or the HTML report to inspect the exact execution context, and consider enabling branch coverage to distinguish "ran once in setup" from "thoroughly tested."

Conclusion

Coverage.py turns the abstract question "are my tests good enough?" into a concrete, measurable answer. You learned how to install pytest-cov and run coverage alongside your test suite, use --cov-report=term-missing to pinpoint uncovered lines in the terminal, generate an HTML report for detailed visual inspection, configure .coveragerc to set thresholds and omit irrelevant paths, and enable branch coverage to catch untested conditional logic.

The real habit to build is not chasing a coverage percentage but using the HTML report as a checklist after writing each new feature. Red lines are not failures -- they are a map of where your next tests should go. Extend the shopping cart example by adding tests for concurrent access, serialization, or currency rounding, and watch your coverage climb as your confidence in the code grows.

Full documentation is available at coverage.readthedocs.io and the pytest-cov plugin documentation at pytest-cov.readthedocs.io.

How To Use PyJWT for JSON Web Tokens in Python

Intermediate

Session cookies work fine for traditional web apps, but modern REST APIs and microservices need a stateless authentication mechanism: one where the server does not store session state and any service in the cluster can verify a token independently. JSON Web Tokens (JWTs) are the standard answer. A JWT is a base64url-encoded JSON header and payload followed by a cryptographic signature, which any server holding the right key can verify without hitting a database.

The PyJWT library is the most widely used Python implementation of the JWT standard (RFC 7519). It is used internally by AWS SDK, GitHub Actions, and dozens of popular frameworks. In this article you will learn to create signed tokens with HS256 and RS256, validate claims like expiry and audience, implement refresh token rotation, and wire JWTs into a FastAPI dependency.

PyJWT Quick Example: Issue and Verify in 10 Lines

# quick_jwt.py
import jwt, datetime

SECRET = "my-256-bit-secret"
payload = {
    "sub": "user_42",
    "exp": datetime.datetime.now(datetime.UTC) + datetime.timedelta(hours=1)
}

token = jwt.encode(payload, SECRET, algorithm="HS256")
print(f"Token: {token[:50]}...")

decoded = jwt.decode(token, SECRET, algorithms=["HS256"])
print(f"Subject: {decoded['sub']}")
Output:
Token: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ1c...
Subject: user_42
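
To see what is inside a token like the one printed above, the following sketch builds and checks an HS256 token using only the standard library. It is meant to show the three dot-separated segments (header, payload, signature), not to replace PyJWT: the helper names are made up for this example, and it deliberately skips claim validation such as exp.

```python
import base64
import hashlib
import hmac
import json

def b64url_encode(data: bytes) -> str:
    """Base64url without padding, as used for JWT segments."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")

def b64url_decode(segment: str) -> bytes:
    """Restore the stripped padding before decoding."""
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))

def _hs256(signing_input: str, secret: str) -> bytes:
    """HMAC-SHA256 over 'header.payload' with the shared secret."""
    return hmac.new(secret.encode(), signing_input.encode("ascii"), hashlib.sha256).digest()

def sign_hs256(payload: dict, secret: str) -> str:
    header = {"alg": "HS256", "typ": "JWT"}
    segments = [
        b64url_encode(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, payload)
    ]
    signing_input = ".".join(segments)
    return f"{signing_input}.{b64url_encode(_hs256(signing_input, secret))}"

def verify_hs256(token: str, secret: str) -> dict:
    signing_input, _, signature = token.rpartition(".")
    # Constant-time comparison avoids leaking how many bytes matched.
    if not hmac.compare_digest(b64url_decode(signature), _hs256(signing_input, secret)):
        raise ValueError("Signature mismatch")
    return json.loads(b64url_decode(signing_input.split(".")[1]))

token = sign_hs256({"sub": "user_42"}, "my-256-bit-secret")
header_segment, payload_segment, _ = token.split(".")
print(json.loads(b64url_decode(header_segment)))   # readable without the secret
print(json.loads(b64url_decode(payload_segment)))  # readable without the secret
print(verify_hs256(token, "my-256-bit-secret"))
```

Because the header and payload are only encoded, not encrypted, anyone holding a token can read its claims. The signature protects integrity, not confidentiality, so secrets do not belong in a JWT payload.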

Installing PyJWT

pip install PyJWT

# For RSA/EC algorithm support, install the cryptography extra:
pip install "PyJWT[cryptography]"

python -c "import jwt; print(jwt.__version__)"
Output:
2.8.0

Algorithm | Key Type | Verification | Use Case
HS256 | Shared secret | Same secret | Single service, internal APIs
HS512 | Shared secret | Same secret | Higher security single service
RS256 | RSA private key | RSA public key | Microservices, OAuth2, OIDC
ES256 | EC private key | EC public key | Mobile, IoT (smaller tokens)

Figure: JWT, a self-contained passport that any server can verify.

Standard Claims and Expiry

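JWTs have registered claim names defined by RFC 7519. PyJWT checks the time-based claims (exp, nbf, iat) automatically during decoding and validates aud and iss against the values you pass as audience and issuer: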

# jwt_claims.py
import jwt, datetime

SECRET = "production-secret-min-32-chars-long"

def create_access_token(user_id: str, roles: list) -> str:
    now = datetime.datetime.now(datetime.UTC)
    payload = {
        # Registered claims (RFC 7519)
        "iss": "https://api.myapp.com",     # Issuer
        "sub": user_id,                      # Subject (user identifier)
        "aud": "myapp-frontend",             # Audience
        "exp": now + datetime.timedelta(minutes=15),  # Expiry
        "iat": now,                          # Issued At
        "jti": f"{user_id}-{now.timestamp()}",  # JWT ID (unique per token)
        # Custom claims
        "roles": roles,
        "tier": "premium"
    }
    return jwt.encode(payload, SECRET, algorithm="HS256")

def verify_access_token(token: str) -> dict:
    return jwt.decode(
        token,
        SECRET,
        algorithms=["HS256"],
        audience="myapp-frontend",           # Must match aud claim
        issuer="https://api.myapp.com",      # Must match iss claim
        options={"require": ["exp", "iat", "sub", "iss"]}
    )

token = create_access_token("user_42", ["read", "write"])
claims = verify_access_token(token)
print(f"User: {claims['sub']}, Roles: {claims['roles']}, Tier: {claims['tier']}")

# Test expiry: a token whose exp is already in the past is rejected
expired_payload = {
    "sub": "user_1",
    "exp": datetime.datetime.now(datetime.UTC) - datetime.timedelta(seconds=1)
}
expired_token = jwt.encode(expired_payload, SECRET, algorithm="HS256")
try:
    jwt.decode(expired_token, SECRET, algorithms=["HS256"])
except jwt.ExpiredSignatureError as e:
    print(f"Rejected: {e}")
Output:
User: user_42, Roles: ['read', 'write'], Tier: premium
Rejected: Signature has expired.

RS256: Asymmetric JWT for Microservices

In a microservice architecture, each service needs to verify tokens independently without sharing a secret. RS256 lets a central auth service sign with its private key, and all other services verify with the public key:

# rs256_jwt.py
import jwt
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
import datetime

# Generate RSA key pair (in production: load from file/secret manager)
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
public_key = private_key.public_key()

# Auth service: sign with private key
payload = {
    "sub": "service_account_7",
    "scope": "billing:read orders:write",
    "exp": datetime.datetime.now(datetime.UTC) + datetime.timedelta(hours=1),
    "iss": "https://auth.myapp.com"
}
token = jwt.encode(payload, private_key, algorithm="RS256")
print(f"RS256 token length: {len(token)} chars")

# Any microservice: verify with public key only (no private key needed)
decoded = jwt.decode(
    token,
    public_key,
    algorithms=["RS256"],
    options={"verify_aud": False}
)
print(f"Verified service: {decoded['sub']}, scope: {decoded['scope']}")

# Serialize public key to PEM for distribution
pem = public_key.public_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PublicFormat.SubjectPublicKeyInfo
)
# Load back from PEM and verify again
loaded_pubkey = serialization.load_pem_public_key(pem)
decoded2 = jwt.decode(token, loaded_pubkey, algorithms=["RS256"], options={"verify_aud": False})
print(f"PEM round-trip OK: {decoded2['sub']}")
Output:
RS256 token length: 472 chars
Verified service: service_account_7, scope: billing:read orders:write
PEM round-trip OK: service_account_7

Figure: RS256, where one private key signs and unlimited services verify.

Access and Refresh Token Pattern

Short-lived access tokens (15 minutes) with long-lived refresh tokens (7 days) are the standard pattern for secure stateless authentication:

# token_pair.py
import jwt, datetime, secrets

SECRET = "your-secret-key-min-32-chars"
REFRESH_SECRET = "different-refresh-secret-min-32-chars"

def issue_token_pair(user_id: str) -> dict:
    now = datetime.datetime.now(datetime.UTC)
    access_token = jwt.encode({
        "sub": user_id,
        "type": "access",
        "exp": now + datetime.timedelta(minutes=15),
        "iat": now
    }, SECRET, algorithm="HS256")

    refresh_token = jwt.encode({
        "sub": user_id,
        "type": "refresh",
        "jti": secrets.token_hex(16),  # Unique ID for revocation
        "exp": now + datetime.timedelta(days=7),
        "iat": now
    }, REFRESH_SECRET, algorithm="HS256")

    return {"access_token": access_token, "refresh_token": refresh_token}

def refresh_access_token(refresh_token: str) -> str:
    payload = jwt.decode(refresh_token, REFRESH_SECRET, algorithms=["HS256"])
    if payload.get("type") != "refresh":
        raise ValueError("Not a refresh token")
    # In production: check jti against a revocation list in Redis
    now = datetime.datetime.now(datetime.UTC)
    return jwt.encode({
        "sub": payload["sub"],
        "type": "access",
        "exp": now + datetime.timedelta(minutes=15),
        "iat": now
    }, SECRET, algorithm="HS256")

tokens = issue_token_pair("user_42")
print(f"Access token (first 40 chars): {tokens['access_token'][:40]}...")

new_access = refresh_access_token(tokens["refresh_token"])
decoded = jwt.decode(new_access, SECRET, algorithms=["HS256"])
print(f"Refreshed token for: {decoded['sub']}, expires in 15 min")
Output:
Access token (first 40 chars): eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ...
Refreshed token for: user_42, expires in 15 min

Frequently Asked Questions

How long should my HS256 secret be?

At least 32 bytes (256 bits) for HS256, and 64 bytes for HS512. Use secrets.token_hex(32) to generate a cryptographically random 64-character hex secret. Short or dictionary-word secrets are vulnerable to offline brute-force attacks: an attacker who captures a token can try millions of secrets per second without hitting your server.
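
A short sketch of generating such secrets with the standard library follows. The generate_hmac_secret helper is a hypothetical name used only for this example.

```python
import secrets

def generate_hmac_secret(num_bytes: int = 32) -> str:
    """Return a random hex secret; each byte becomes two hex characters."""
    return secrets.token_hex(num_bytes)

hs256_secret = generate_hmac_secret(32)   # 256 bits of randomness for HS256
hs512_secret = generate_hmac_secret(64)   # 512 bits of randomness for HS512
print(len(hs256_secret), len(hs512_secret))
```

Generate the secret once and load it from an environment variable or secret manager. Creating a new one at every startup would invalidate all tokens issued before the restart.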

Where should I store JWTs on the client side?

For browser apps, httpOnly cookies are more secure than localStorage because JavaScript cannot read httpOnly cookies, preventing XSS attacks from stealing tokens. For mobile apps and server-to-server calls, storing in memory or secure device storage is fine. Never store tokens in localStorage if the site runs any third-party JavaScript. For refresh tokens specifically, httpOnly + Secure + SameSite=Strict cookies are the recommended pattern.

How do I revoke a JWT before it expires?

JWTs are stateless by design — the server does not store them, so you cannot “delete” one. The standard approaches are: keep access tokens very short-lived (15 minutes or less) so revocation is rarely needed; maintain a Redis blocklist of revoked jti values checked on each request; or use opaque tokens for security-critical resources and only use JWTs for low-risk claims. The blocklist approach reintroduces some statefulness but is still much lighter than full session storage.
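
The blocklist approach can be sketched as follows. This is an in-memory stand-in under stated assumptions: JtiBlocklist is a made-up class, a dictionary replaces Redis, and entries are kept only until the token's own exp, since an expired token is rejected anyway.

```python
import time
from typing import Optional

class JtiBlocklist:
    """In-memory stand-in for a Redis blocklist keyed by jti."""

    def __init__(self):
        self._revoked = {}  # jti -> exp (Unix timestamp)

    def revoke(self, jti: str, exp: float) -> None:
        """Remember a revoked token until its own expiry time."""
        self._revoked[jti] = exp

    def is_revoked(self, jti: str, now: Optional[float] = None) -> bool:
        """Check a token's jti; `now` is injectable to make testing easy."""
        now = time.time() if now is None else now
        self._purge(now)
        return jti in self._revoked

    def _purge(self, now: float) -> None:
        """Drop entries whose tokens have expired and need no blocking."""
        for jti in [j for j, exp in self._revoked.items() if exp <= now]:
            del self._revoked[jti]

blocklist = JtiBlocklist()
claims = {"sub": "user_42", "jti": "a1b2c3", "exp": time.time() + 900}
blocklist.revoke(claims["jti"], claims["exp"])   # e.g. when the user logs out
print(blocklist.is_revoked(claims["jti"]))       # checked on every request
```

With a real Redis instance the purge step would typically be replaced by giving each key a time-to-live equal to the token's remaining lifetime.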

What is the “none” algorithm attack?

Early JWT libraries accepted tokens with "alg": "none" — meaning no signature — allowing attackers to forge any payload. PyJWT is safe by default: you must explicitly list allowed algorithms in jwt.decode(), and none is never allowed unless you pass algorithms=["none"] explicitly. Always pass the algorithms parameter and never include "none" in it. Similarly, never pass the encoded token’s header algorithm as the allowed algorithm — always hardcode the expected algorithm on the server.

How do I use PyJWT with FastAPI?

Create a dependency that reads the Authorization: Bearer <token> header, calls jwt.decode(), and either returns the claims dict or raises HTTPException(401) on failure. Use Depends(verify_token) in any route that requires authentication. FastAPI’s dependency injection system handles the rest — the dependency runs before the route handler, and any exception it raises short-circuits the request.

Conclusion

PyJWT makes stateless authentication straightforward in Python. For single-service APIs, HS256 with a 32+ character secret and 15-minute expiry is simple and secure. For microservices where multiple independent services need to verify tokens, RS256 lets you distribute the public key freely while keeping the signing key private. In both cases, always specify algorithms explicitly in jwt.decode(), always validate exp and iss, and use short-lived access tokens with refresh token rotation for production systems. The jti claim combined with a Redis blocklist gives you revocation capability when you need it without abandoning the stateless model entirely.

How To Use Python cryptography for Encryption and Decryption

Intermediate

Storing passwords in plaintext, sending API keys over unencrypted channels, or using home-grown XOR “encryption” are among the most common causes of data breaches. The Python cryptography library exists precisely to make doing the right thing easy. Built on top of OpenSSL and maintained by the Python Cryptographic Authority (PyCA), it provides both a high-level “recipes” layer for common tasks and a low-level “hazmat” layer for specialists who need direct cipher access.

This article focuses on the recipes layer, which handles all the hard parts — key generation, IV selection, authentication tags — so you can encrypt and decrypt data correctly without a PhD in applied cryptography. We will cover Fernet symmetric encryption for simplicity, AES-GCM for performance-critical authenticated encryption, and RSA asymmetric encryption for key exchange and digital signatures.

Cryptography Quick Example: Fernet in 10 Lines

# quick_fernet.py
from cryptography.fernet import Fernet

key = Fernet.generate_key()          # 32-byte URL-safe base64 key
f = Fernet(key)

plaintext = b"Secret API credentials: sk-abc123"
token = f.encrypt(plaintext)
print(f"Encrypted: {token[:40].decode()}...")

decrypted = f.decrypt(token)
print(f"Decrypted: {decrypted.decode()}")
Output:
Encrypted: gAAAAABl8x2...
Decrypted: Secret API credentials: sk-abc123

Fernet uses AES-128-CBC + HMAC-SHA256 under the hood and always produces authenticated ciphertext — if anyone tampers with the token, decryption raises InvalidToken rather than silently returning garbage.
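
The structure behind that guarantee can be inspected without the key. The sketch below assumes the token layout described in the Fernet specification: a version byte 0x80, an 8-byte big-endian timestamp, a 16-byte IV, the AES-CBC ciphertext, and a 32-byte HMAC, all base64url-encoded. The helper names and the hand-built dummy token are illustrative.

```python
import base64
import struct

def fernet_token_length(plaintext_len: int) -> int:
    """Expected length of a Fernet token for a given plaintext size."""
    padded = (plaintext_len // 16 + 1) * 16   # PKCS7 always adds 1-16 bytes
    raw = 1 + 8 + 16 + padded + 32            # version, timestamp, IV, ciphertext, HMAC
    return 4 * ((raw + 2) // 3)               # size of padded base64 output

def describe_token(token: bytes) -> dict:
    """Split a Fernet token into its fields without decrypting it."""
    raw = base64.urlsafe_b64decode(token)
    version, timestamp = struct.unpack(">BQ", raw[:9])
    return {
        "version": version,
        "timestamp": timestamp,
        "iv": raw[9:25],
        "ciphertext": raw[25:-32],
        "hmac": raw[-32:],
    }

# Dummy token with the right shape (zeroed IV, ciphertext, and HMAC), built
# by hand so the example runs without a key. It is not a valid Fernet token.
dummy = base64.urlsafe_b64encode(
    struct.pack(">BQ", 0x80, 1_700_000_000) + bytes(16) + bytes(48) + bytes(32)
)
fields = describe_token(dummy)
print(dummy[:7], fields["version"], fields["timestamp"], len(fields["ciphertext"]))
print(fernet_token_length(33))   # size for the 33-byte plaintext used above
```

The version byte followed by the leading zero bytes of the timestamp is why Fernet tokens start with gAAAAA. The timestamp travels in the clear, and it is what a ttl check reads.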

Installing the cryptography Library

pip install cryptography

python -c "import cryptography; print(cryptography.__version__)"
Output:
42.0.5

Layer | API | Use When
Recipes (high-level) | Fernet, MultiFernet | Default: handles IV, padding, authentication automatically
Hazmat (low-level) | cryptography.hazmat.* | Custom protocols, non-standard cipher modes, HSM integration

Figure: The recipes layer, with all the hard cryptographic choices made for you.

Fernet: Symmetric Encryption Made Simple

Fernet is the right choice when both the encryptor and decryptor share a secret key — encrypting files at rest, protecting database fields, or securing configuration secrets.

# fernet_demo.py
from cryptography.fernet import Fernet, MultiFernet

# --- Key generation and storage ---
key = Fernet.generate_key()
print(f"Key (store this securely): {key.decode()}")

# Save key to file (in production: use a secret manager)
with open('/tmp/fernet.key', 'wb') as kf:
    kf.write(key)

# Load key from file
with open('/tmp/fernet.key', 'rb') as kf:
    loaded_key = kf.read()

f = Fernet(loaded_key)

# --- Encrypt and decrypt ---
data = b"Database password: s3cret_p@ssw0rd"
token = f.encrypt(data)
print(f"Token length: {len(token)} bytes")

recovered = f.decrypt(token)
assert recovered == data
print(f"Round-trip OK: {recovered.decode()}")

# --- TTL: token expires after N seconds ---
import time
short_lived = f.encrypt_at_time(b"Temporary token", int(time.time()))
try:
    f.decrypt_at_time(short_lived, ttl=1, current_time=int(time.time()) + 5)
except Exception as e:
    print(f"Expired token: {type(e).__name__}")

# --- Key rotation with MultiFernet ---
old_key = Fernet.generate_key()
new_key = Fernet.generate_key()
mf = MultiFernet([Fernet(new_key), Fernet(old_key)])

old_token = Fernet(old_key).encrypt(b"Encrypted with old key")
rotated = mf.rotate(old_token)     # re-encrypts with new_key
print(f"Rotation OK: {mf.decrypt(rotated)}")
Output:
Key (store this securely): bXluWHh4...
Token length: 140 bytes
Round-trip OK: Database password: s3cret_p@ssw0rd
Expired token: InvalidToken
Rotation OK: b'Encrypted with old key'

Figure: MultiFernet.rotate() decrypts with the old key and re-encrypts with the new key for zero-downtime key rotation.

AES-GCM: Authenticated Encryption for Performance

When you need more control — streaming large files, custom nonce sizes, or AEAD with associated data — use AES-256-GCM from the hazmat layer directly:

# aes_gcm_demo.py
import os
from cryptography.hazmat.primitives.ciphers.aead import AESGCM

# Generate a 256-bit key
key = AESGCM.generate_key(bit_length=256)
aesgcm = AESGCM(key)

# Nonce MUST be unique per encryption — never reuse with the same key
nonce = os.urandom(12)   # 96-bit nonce is standard for GCM

plaintext = b"Sensitive medical record #12345"
aad = b"patient_id:12345"  # Additional Authenticated Data (not encrypted, but authenticated)

ciphertext = aesgcm.encrypt(nonce, plaintext, aad)
print(f"Ciphertext+tag length: {len(ciphertext)} bytes (plaintext was {len(plaintext)})")

# Decrypt — must provide same nonce and aad
recovered = aesgcm.decrypt(nonce, ciphertext, aad)
print(f"Decrypted: {recovered.decode()}")

# Tamper test: modifying ciphertext raises InvalidTag
tampered = bytearray(ciphertext)
tampered[0] ^= 0xFF
try:
    aesgcm.decrypt(nonce, bytes(tampered), aad)
except Exception as e:
    print(f"Tamper detected: {type(e).__name__}")
Output:
Ciphertext+tag length: 47 bytes (plaintext was 31)
Decrypted: Sensitive medical record #12345
Tamper detected: InvalidTag

The 16-byte difference between plaintext and ciphertext length is the GCM authentication tag. AES-GCM is significantly faster than AES-CBC because it can be parallelised and is hardware-accelerated on any modern CPU with AES-NI instructions.

RSA Asymmetric Encryption and Signatures

RSA solves the key distribution problem: the public key can be shared freely, and only the private key holder can decrypt or sign:

# rsa_demo.py
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import hashes, serialization

# --- Key generation ---
private_key = rsa.generate_private_key(
    public_exponent=65537,
    key_size=2048
)
public_key = private_key.public_key()

# --- Serialize keys to PEM ---
pem_private = private_key.private_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.BestAvailableEncryption(b"passphrase")
)
pem_public = public_key.public_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PublicFormat.SubjectPublicKeyInfo
)

# --- Encrypt with public key, decrypt with private key ---
message = b"Symmetric key: " + b"\x8f\xa2" * 16  # e.g. an AES key
ciphertext = public_key.encrypt(
    message,
    padding.OAEP(
        mgf=padding.MGF1(algorithm=hashes.SHA256()),
        algorithm=hashes.SHA256(),
        label=None
    )
)
print(f"RSA ciphertext length: {len(ciphertext)} bytes")

decrypted = private_key.decrypt(
    ciphertext,
    padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()),
                 algorithm=hashes.SHA256(), label=None)
)
assert decrypted == message
print("RSA encrypt/decrypt: OK")

# --- Sign with private key, verify with public key ---
document = b"Invoice #1234: $5000 due 2026-06-01"
signature = private_key.sign(
    document,
    padding.PSS(mgf=padding.MGF1(hashes.SHA256()),
                salt_length=padding.PSS.MAX_LENGTH),
    hashes.SHA256()
)
print(f"Signature length: {len(signature)} bytes")

public_key.verify(
    signature, document,
    padding.PSS(mgf=padding.MGF1(hashes.SHA256()),
                salt_length=padding.PSS.MAX_LENGTH),
    hashes.SHA256()
)
print("Signature verified: OK")
Output:
RSA ciphertext length: 256 bytes
RSA encrypt/decrypt: OK
Signature length: 256 bytes
Signature verified: OK

Figure: RSA, where you share the lock and keep the key.

Frequently Asked Questions

When should I use Fernet vs AES-GCM directly?

Use Fernet for most application-level encryption: it handles IV generation, token formatting, and authentication automatically, and its base64 output is easy to store in databases or environment variables. Use AES-GCM directly when you need to stream large files without loading them entirely into memory, when you need to integrate with a specific wire protocol that expects raw bytes, or when you need custom AEAD associated data patterns. Both use authenticated encryption and are equally secure in terms of the underlying algorithms.

Should I use cryptography for password hashing?

No — use bcrypt, argon2-cffi, or passlib for passwords. The cryptography library is designed for encrypting data, not for password storage. Password hashing requires deliberately slow key derivation functions that resist brute-force attacks. Encrypting a password with Fernet is wrong because if the key leaks, all passwords are immediately exposed — a proper hash is irreversible. The library does include PBKDF2HMAC and scrypt for key derivation from passwords (e.g., deriving a Fernet key from a passphrase), which is different from storing hashed passwords.
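
Deriving a Fernet key from a passphrase, as mentioned above, can be sketched with the standard library's hashlib.pbkdf2_hmac. The function name and the iteration count of 600,000 are assumptions for illustration (pick a count based on current guidance and your hardware); the output format, 32 bytes encoded as URL-safe base64, is what Fernet expects as a key.

```python
import base64
import hashlib
import os

def derive_fernet_key(passphrase: str, salt: bytes, iterations: int = 600_000) -> bytes:
    """Stretch a passphrase into a 32-byte key in Fernet's base64url format."""
    raw = hashlib.pbkdf2_hmac(
        "sha256", passphrase.encode("utf-8"), salt, iterations, dklen=32
    )
    return base64.urlsafe_b64encode(raw)

salt = os.urandom(16)   # store the salt next to the ciphertext; it is not secret
key = derive_fernet_key("correct horse battery staple", salt)
print(len(key))         # 44 base64 characters encoding 32 bytes
```

The same passphrase and salt always yield the same key, so only the salt needs to be stored. The resulting bytes can be passed to Fernet(key).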

Where should I store encryption keys?

Never hardcode keys in source code or commit them to version control. In development, use environment variables or .env files excluded from git. In production, use a dedicated secret manager: AWS Secrets Manager, HashiCorp Vault, Google Cloud Secret Manager, or Azure Key Vault. For the highest security tier, use a Hardware Security Module (HSM) or cloud KMS where the key never leaves the hardware boundary. The cryptography library integrates with PKCS#11 HSMs via the hazmat layer.

How do I encrypt large files with RSA?

RSA cannot encrypt data larger than its key size minus padding overhead (about 190 bytes for a 2048-bit key with OAEP). The standard approach is hybrid encryption: generate a random AES-256 key, encrypt the file with AES-GCM, then encrypt the AES key with RSA. The recipient decrypts the RSA-wrapped AES key with their private key, then uses that AES key to decrypt the file. This combines RSA’s key distribution advantages with AES-GCM’s performance — the same pattern used by TLS, PGP, and SSH.
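
The size limit follows from the OAEP padding formula: the modulus length in bytes, minus twice the hash length, minus 2. A small worked calculation, with a made-up helper name:

```python
def max_oaep_plaintext(key_bits: int, hash_bytes: int = 32) -> int:
    """Largest message RSA-OAEP can encrypt: k - 2*hLen - 2 bytes,
    where k is the modulus size and hLen the hash output size."""
    return key_bits // 8 - 2 * hash_bytes - 2

for bits in (2048, 3072, 4096):
    # hash_bytes=32 corresponds to SHA-256
    print(bits, max_oaep_plaintext(bits))
```

For a 2048-bit key with SHA-256 this gives 190 bytes, which is plenty for a 32-byte AES-256 key but far too small for a file. That gap is why hybrid encryption wraps only the symmetric key with RSA.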

How do I rotate encryption keys without downtime?

For Fernet, use MultiFernet: create a new key, put it first in the list with the old key second, then call mf.rotate(old_token) on all existing ciphertext. New encryptions use the first key automatically. Once all tokens are rotated, remove the old key from the list. For AES-GCM, the same principle applies: store a key version with each ciphertext, maintain a mapping of version to key, and migrate data lazily on read or in a background job.

Conclusion

The cryptography library makes correct encryption practical in Python. For most use cases, start with Fernet — it chooses secure defaults, prevents nonce reuse, and produces tokens that are easy to store. When you need streaming, AEAD with associated data, or hardware acceleration, use AES-256-GCM from the hazmat layer. For key exchange and digital signatures, use RSA with OAEP padding for encryption and PSS padding for signatures — never use the older PKCS1v15 padding for new code.

The three non-negotiable rules are: always use authenticated encryption (Fernet and AES-GCM both satisfy this), never reuse a nonce with the same key, and store keys outside your application code in a secret manager or environment variable. With these rules in place, the cryptography library handles the rest.

How To Build a Python gRPC Service with grpcio

Intermediate

REST APIs built on HTTP and JSON are everywhere, but they come with trade-offs: verbose payloads, loose contracts, and no built-in streaming. When services need to talk to each other at high throughput — hundreds of thousands of calls per second with tight latency budgets — gRPC is the industry answer. Used by Google, Netflix, and Cloudflare, gRPC combines Protocol Buffers for compact binary serialization with HTTP/2 for multiplexed connections and native bidirectional streaming.

The grpcio Python package brings the full gRPC runtime to Python. You define services in a .proto schema file, run a code generator, and get strongly-typed client and server stubs automatically. The contract lives in the schema, not in documentation that can drift out of sync. If you change the proto file, the generated code changes with it — no more “the client sends a string but the server expects an int” bugs.

This article covers the complete gRPC workflow in Python: writing a .proto service definition, generating Python stubs, building a server, writing a client, and implementing all four RPC types — unary, server-streaming, client-streaming, and bidirectional streaming.

gRPC Quick Example: Hello Service in 30 Lines

The fastest way to see gRPC in action is a minimal hello service. You need three files: the schema, the server, and the client:

// hello.proto
syntax = "proto3";
package hello;

service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply);
}

message HelloRequest { string name = 1; }
message HelloReply   { string message = 1; }

# Generate Python stubs (run once after changing the .proto)
pip install grpcio grpcio-tools
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. hello.proto
# Creates: hello_pb2.py  hello_pb2_grpc.py

# server.py
import grpc
from concurrent import futures
import hello_pb2
import hello_pb2_grpc

class GreeterServicer(hello_pb2_grpc.GreeterServicer):
    def SayHello(self, request, context):
        return hello_pb2.HelloReply(message=f"Hello, {request.name}!")

server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
hello_pb2_grpc.add_GreeterServicer_to_server(GreeterServicer(), server)
server.add_insecure_port('[::]:50051')
server.start()
print("Server started on :50051")
server.wait_for_termination()

# client.py
import grpc
import hello_pb2
import hello_pb2_grpc

with grpc.insecure_channel('localhost:50051') as channel:
    stub = hello_pb2_grpc.GreeterStub(channel)
    response = stub.SayHello(hello_pb2.HelloRequest(name="World"))
    print(f"Server replied: {response.message}")
Output:
Server replied: Hello, World!

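Three files, zero JSON parsing, zero URL routing. The schema fixes the contract when the stubs are generated, and the generated message classes reject unknown field names or mismatched types as soon as a message is constructed, rather than deep inside a request handler.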

Installing grpcio

Install both the runtime and the code generation tools:

pip install grpcio grpcio-tools

# Verify
python -c "import grpc; print(grpc.__version__)"
Output:
1.62.1

grpcio is the runtime — servers, channels, and interceptors. grpcio-tools bundles the protoc compiler with the Python gRPC plugin so you can generate stubs without installing a separate C++ toolchain.

Package | Purpose | Required At
grpcio | Client/server runtime | Runtime
grpcio-tools | Proto compiler + code gen | Dev/build time
protobuf | Protocol Buffer serialization | Runtime (auto-installed)
grpcio-reflection | Server reflection for grpcurl/Postman | Optional/dev
grpcio-health-checking | Standard health check service | Optional/prod

Figure: grpc_tools.protoc, where your schema becomes your code.

Writing a Protocol Buffers Service Definition

The .proto file is the source of truth for your service. It defines the service methods and the request/response message types. Here is a more realistic example — a currency conversion service:

// finance.proto
syntax = "proto3";
package finance;

service CurrencyConverter {
  // Unary: one request, one response
  rpc Convert (ConvertRequest) returns (ConvertReply);

  // Server streaming: one request, stream of rate updates
  rpc WatchRates (WatchRequest) returns (stream RateUpdate);

  // Client streaming: stream of amounts, one summary response
  rpc BatchConvert (stream ConvertRequest) returns (BatchReply);

  // Bidirectional streaming: real-time conversion feed
  rpc LiveConvert (stream ConvertRequest) returns (stream ConvertReply);
}

message ConvertRequest {
  string from_currency = 1;   // e.g. "USD"
  string to_currency   = 2;   // e.g. "EUR"
  double amount        = 3;
}

message ConvertReply {
  double converted_amount = 1;
  double rate             = 2;
  string timestamp        = 3;
}

message WatchRequest {
  string base_currency = 1;
  repeated string target_currencies = 2;
}

message RateUpdate {
  string currency = 1;
  double rate     = 2;
}

message BatchReply {
  double total_converted = 1;
  int32  count           = 2;
}

# Generate stubs
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. finance.proto
# Creates: finance_pb2.py  finance_pb2_grpc.py

Field numbers (the = 1, = 2 after field names) are used in the binary encoding — they must be unique within a message and should never be reused once deployed. Adding new fields with new numbers is backward-compatible; removing or renumbering fields breaks existing clients.
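To make the role of field numbers concrete, here is a minimal sketch of how proto3 lays out a string field on the wire. It hand-rolls the encoding purely for illustration; the helper names encode_varint and encode_string_field are made up for this example, and real code should rely on the generated finance_pb2 classes instead.

```python
def encode_varint(value: int) -> bytes:
    """Encode a non-negative integer as a protobuf base-128 varint."""
    if value < 0:
        raise ValueError("this sketch only handles non-negative integers")
    out = bytearray()
    while True:
        low7 = value & 0x7F
        value >>= 7
        if value:
            out.append(low7 | 0x80)  # high bit set: more bytes follow
        else:
            out.append(low7)
            return bytes(out)


LENGTH_DELIMITED = 2  # wire type for strings, bytes, and nested messages


def encode_string_field(field_number: int, text: str) -> bytes:
    """Encode one string field as tag + length + UTF-8 payload."""
    payload = text.encode("utf-8")
    tag = (field_number << 3) | LENGTH_DELIMITED
    return encode_varint(tag) + encode_varint(len(payload)) + payload


# from_currency = 1 and to_currency = 2 in ConvertRequest
wire = encode_string_field(1, "USD") + encode_string_field(2, "EUR")
print(wire.hex(" "))  # 0a 03 55 53 44 12 03 45 55 52
```

The field names never appear in the bytes, only the numbers 1 and 2 folded into the tag bytes 0a and 12. That is why renaming a field leaves the wire format untouched, while renumbering one makes old and new code disagree about what each value means.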

The Four gRPC RPC Types

gRPC defines four communication patterns. Here is a complete server implementing all four for the finance service:

# finance_server.py
import grpc
import time
import random
from concurrent import futures
import finance_pb2
import finance_pb2_grpc

# Simulated exchange rates
RATES = {
    ('USD', 'EUR'): 0.92, ('USD', 'GBP'): 0.79, ('USD', 'JPY'): 149.5,
    ('EUR', 'USD'): 1.09, ('GBP', 'USD'): 1.27, ('JPY', 'USD'): 0.0067,
}

def get_rate(frm, to):
    if frm == to:
        return 1.0
    return RATES.get((frm, to), 1.0) * (1 + random.uniform(-0.002, 0.002))

class CurrencyConverterServicer(finance_pb2_grpc.CurrencyConverterServicer):

    # --- Unary RPC ---
    def Convert(self, request, context):
        rate = get_rate(request.from_currency, request.to_currency)
        return finance_pb2.ConvertReply(
            converted_amount=request.amount * rate,
            rate=rate,
            timestamp=str(time.time())
        )

    # --- Server Streaming RPC ---
    def WatchRates(self, request, context):
        """Stream rate updates every second until client cancels."""
        # Keep streaming while the RPC is alive; is_active() turns False
        # once the client cancels or disconnects.
        while context.is_active():
            for target in request.target_currencies:
                rate = get_rate(request.base_currency, target)
                yield finance_pb2.RateUpdate(currency=target, rate=rate)
            time.sleep(1.0)

    # --- Client Streaming RPC ---
    def BatchConvert(self, request_iterator, context):
        total = 0.0
        count = 0
        for req in request_iterator:
            rate = get_rate(req.from_currency, req.to_currency)
            total += req.amount * rate
            count += 1
        return finance_pb2.BatchReply(total_converted=total, count=count)

    # --- Bidirectional Streaming RPC ---
    def LiveConvert(self, request_iterator, context):
        for req in request_iterator:
            rate = get_rate(req.from_currency, req.to_currency)
            yield finance_pb2.ConvertReply(
                converted_amount=req.amount * rate,
                rate=rate,
                timestamp=str(time.time())
            )

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    finance_pb2_grpc.add_CurrencyConverterServicer_to_server(
        CurrencyConverterServicer(), server
    )
    server.add_insecure_port('[::]:50051')
    server.start()
    print("Finance gRPC server running on :50051")
    server.wait_for_termination()

if __name__ == '__main__':
    serve()

[Figure: The four gRPC RPC types: pick the pattern that fits your data flow.]
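Stripped of the gRPC machinery, the four patterns differ only in whether each side sends a single message or an iterator of messages. The sketch below uses plain Python functions and generators to show those four shapes. The function names and the toy arithmetic are illustrative only, and real servicer methods also receive a context argument.

```python
from typing import Iterable, Iterator


def unary(request: float) -> float:
    """One request in, one response out."""
    return request * 2


def server_streaming(request: int) -> Iterator[int]:
    """One request in, a stream of responses out (a generator)."""
    for i in range(request):
        yield i


def client_streaming(requests: Iterable[float]) -> float:
    """A stream of requests in, one response out."""
    return sum(requests)


def bidi_streaming(requests: Iterable[float]) -> Iterator[float]:
    """A stream in, a stream out; each reply is produced as its request arrives."""
    for r in requests:
        yield r * 2


print(unary(21.0))
print(list(server_streaming(3)))
print(client_streaming(iter([1.0, 2.0, 3.0])))
print(list(bidi_streaming(iter([1.0, 2.0]))))
```

Convert, WatchRates, BatchConvert, and LiveConvert in the server above follow these same four shapes, in that order.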

Writing the gRPC Client

The generated stub class handles all the serialization. Each of the four RPC types gets a corresponding client pattern:

# finance_client.py
import grpc
import finance_pb2
import finance_pb2_grpc

def run():
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = finance_pb2_grpc.CurrencyConverterStub(channel)

        # 1. Unary call
        reply = stub.Convert(finance_pb2.ConvertRequest(
            from_currency='USD', to_currency='EUR', amount=100.0
        ))
        print(f"Unary: $100 USD = €{reply.converted_amount:.2f} EUR (rate: {reply.rate:.4f})")

        # 2. Server streaming: receive 6 rate updates (two rounds of three), then cancel
        watch_req = finance_pb2.WatchRequest(
            base_currency='USD',
            target_currencies=['EUR', 'GBP', 'JPY']
        )
        update_count = 0
        rate_stream = stub.WatchRates(watch_req)
        for update in rate_stream:
            print(f"Rate update: USD/{update.currency} = {update.rate:.4f}")
            update_count += 1
            if update_count >= 6:
                rate_stream.cancel()  # tell the server we are done with this stream
                break

        # 3. Client streaming: send a batch of conversions
        def batch_requests():
            pairs = [('USD','EUR',100), ('USD','GBP',250), ('EUR','USD',80)]
            for frm, to, amt in pairs:
                yield finance_pb2.ConvertRequest(
                    from_currency=frm, to_currency=to, amount=amt
                )
        batch_reply = stub.BatchConvert(batch_requests())
        print(f"Batch: {batch_reply.count} conversions, total = {batch_reply.total_converted:.2f}")

        # 4. Bidirectional streaming
        def live_requests():
            for amount in [50, 100, 200]:
                yield finance_pb2.ConvertRequest(
                    from_currency='USD', to_currency='JPY', amount=amount
                )
        for reply in stub.LiveConvert(live_requests()):
            print(f"Live: ¥{reply.converted_amount:.2f} JPY at rate {reply.rate:.2f}")

if __name__ == '__main__':
    run()
Output (exact figures vary from run to run because get_rate adds random jitter):
Unary: $100 USD = €92.14 EUR (rate: 0.9214)
Rate update: USD/EUR = 0.9198
Rate update: USD/GBP = 0.7893
Rate update: USD/JPY = 149.7200
Rate update: USD/EUR = 0.9207
Rate update: USD/GBP = 0.7901
Rate update: USD/JPY = 149.7100
Batch: 3 conversions, total = 376.92
Live: ¥7479.50 JPY at rate 149.59
Live: ¥14955.00 JPY at rate 149.55
Live: ¥29942.00 JPY at rate 149.71

Error Handling and Status Codes

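gRPC defines its own set of status codes (OK, INVALID_ARGUMENT, NOT_FOUND, UNAVAILABLE, and so on) that travel in the HTTP/2 response trailers and mean the same thing in every language. Use context.abort() on the server to return a typed error, and catch grpc.RpcError on the client: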

# Error handling patterns
import grpc
import finance_pb2
import finance_pb2_grpc

# Server side: abort with a status code
def Convert(self, request, context):
    supported = {'USD', 'EUR', 'GBP', 'JPY'}
    if request.from_currency not in supported:
        context.abort(
            grpc.StatusCode.INVALID_ARGUMENT,
            f"Unsupported currency: {request.from_currency}"
        )
    if request.amount <= 0:
        context.abort(
            grpc.StatusCode.OUT_OF_RANGE,
            f"Amount must be positive, got: {request.amount}"
        )
    rate = get_rate(request.from_currency, request.to_currency)
    return finance_pb2.ConvertReply(converted_amount=request.amount * rate, rate=rate)

# Client side: catch RpcError
with grpc.insecure_channel('localhost:50051') as channel:
    stub = finance_pb2_grpc.CurrencyConverterStub(channel)
    try:
        reply = stub.Convert(finance_pb2.ConvertRequest(
            from_currency='BTC', to_currency='EUR', amount=1.0
        ))
    except grpc.RpcError as e:
        print(f"gRPC error: {e.code()} — {e.details()}")
        # Output: gRPC error: StatusCode.INVALID_ARGUMENT — Unsupported currency: BTC

[Figure: grpc.StatusCode: typed errors that cross language boundaries without ambiguity.]
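One way to keep validation testable is to separate the checks from the transport. The sketch below is an assumption about how you might structure this, not part of the gRPC API: validate_convert is a hypothetical helper that returns the name of a status code plus a message, and it also checks to_currency, which the servicer above does not.

```python
from typing import Optional, Tuple

SUPPORTED = frozenset({"USD", "EUR", "GBP", "JPY"})


def validate_convert(
    from_currency: str, to_currency: str, amount: float
) -> Optional[Tuple[str, str]]:
    """Return (status_name, message) for the first problem found, or None if valid."""
    for code in (from_currency, to_currency):
        if code not in SUPPORTED:
            return ("INVALID_ARGUMENT", f"Unsupported currency: {code}")
    if amount <= 0:
        return ("OUT_OF_RANGE", f"Amount must be positive, got: {amount}")
    return None


print(validate_convert("BTC", "EUR", 1.0))
print(validate_convert("USD", "EUR", -5.0))
print(validate_convert("USD", "EUR", 100.0))
```

Inside the servicer, a non-None result can be passed on as context.abort(grpc.StatusCode[name], message), since the returned names match members of the grpc.StatusCode enum. The helper itself needs no server or channel, so it can be unit-tested on its own.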

Frequently Asked Questions

When should I use gRPC instead of REST?

Use gRPC when you control both client and server (internal microservices), when you need high-throughput low-latency calls, when you want a strict schema that fails fast on contract violations, or when you need streaming. REST is better for public APIs consumed by browsers and third parties — the JSON format is universally readable and HTTP/1.1 works everywhere. Many systems use REST at the edge and gRPC internally.

How do I evolve a proto schema without breaking clients?

Add new fields with new field numbers; older clients simply ignore them. Never renumber an existing field or change its type once deployed. Renaming a field is safe on the wire (the binary format uses field numbers, not names), though it changes the generated Python attribute names and the JSON mapping. If you must remove a field, mark its number as reserved so future developers cannot accidentally reuse it. For breaking changes, put a major version in the package name (e.g., package finance.v2).

Can I use gRPC with Python async/await?

Yes — grpcio ships grpc.aio, an asyncio-native implementation. Replace grpc.server() with grpc.aio.server(), replace grpc.insecure_channel() with grpc.aio.insecure_channel(), and add async/await to your servicer methods. The grpc.aio module is production-ready as of grpcio 1.32 and is the recommended approach for new services using asyncio frameworks like FastAPI or Starlette.

How do I add TLS to a gRPC server?

Replace add_insecure_port with add_secure_port and pass a grpc.ssl_server_credentials() object constructed from your certificate and key files. On the client side, replace grpc.insecure_channel() with grpc.secure_channel('host:443', grpc.ssl_channel_credentials()). For mutual TLS (mTLS), pass both root certificates and client cert/key to grpc.ssl_channel_credentials(). In Kubernetes environments, service meshes like Istio handle mTLS transparently without application code changes.

How do I test a gRPC service without writing a client?

Install grpcio-reflection and enable server reflection, then use grpcurl (a command-line tool like curl for gRPC) or Postman (which supports gRPC natively). Alternatively, write unit tests with the grpc_testing module from the grpcio-testing package, or simply instantiate your servicer class and call its methods directly with a mock context; neither approach needs a running server.

Conclusion

gRPC turns service-to-service communication into a typed function call. The .proto schema defines the contract, grpc_tools.protoc generates the boilerplate, and grpcio handles the HTTP/2 transport, multiplexing, and serialization. In this article we covered the four RPC types — unary, server streaming, client streaming, and bidirectional streaming — along with typed error handling using grpc.StatusCode. The key rules are: never change field numbers in deployed schemas, always use context.abort() for typed server errors, and catch grpc.RpcError on the client side.

For production use, add TLS via grpc.ssl_server_credentials(), enable server reflection for debugging, add a health check service via grpcio-health-checking, and consider the grpc.aio asyncio API if your service is built around async Python frameworks. When you need to expose a gRPC backend to browsers or mobile clients, grpc-gateway or Envoy proxy can translate REST/JSON calls to gRPC automatically at the edge.

How To Use Python numba for JIT-Compiled Performance

Intermediate

Python is famously readable, but famously slow for tight numerical loops. If you have ever benchmarked a pure Python loop against the equivalent C or Fortran code, you know the gap is often 100x or worse. For most web code and business logic that does not matter, but in scientific computing, signal processing, or machine learning pipelines it matters enormously. That is where numba enters — a JIT compiler that translates Python and NumPy code into fast machine code at runtime with almost no changes to your existing code.

Numba is built on LLVM, the same compiler infrastructure that powers Clang and Rust. When you decorate a function with @jit, numba compiles it lazily: on the first call it infers the types of the arguments, compiles the function body to native machine code, and reuses that compiled version for later calls with the same argument types. For numeric loops the result is often comparable to hand-written C, while your source file still looks like Python.

This article covers the complete numba workflow: installing the library, understanding the @jit and @njit decorators, using @vectorize to build NumPy ufuncs, enabling parallel execution with parallel=True, and knowing when numba helps versus hurts. By the end you will have a benchmarked Monte Carlo simulation that outruns its vectorized NumPy equivalent without writing a single line of C.

Numba Quick Example: 100x Faster Loop

The fastest way to understand numba is to see how little you need to change. Here is a pure Python loop computing the sum of squares, and its numba-compiled equivalent:

# quick_numba.py
import numba
import numpy as np
import time

# Pure Python version
def sum_squares_python(n):
    total = 0.0
    for i in range(n):
        total += i * i
    return total

# Numba JIT version -- only the decorator changes
@numba.jit(nopython=True)
def sum_squares_numba(n):
    total = 0.0
    for i in range(n):
        total += i * i
    return total

N = 10_000_000

# Warm up numba (first call triggers compilation)
sum_squares_numba(1)

t0 = time.perf_counter()
result_py = sum_squares_python(N)
t1 = time.perf_counter()
result_nb = sum_squares_numba(N)
t2 = time.perf_counter()

print(f"Python:  {t1-t0:.3f}s  result={result_py:.6e}")
print(f"Numba:   {t2-t1:.4f}s  result={result_nb:.6e}")
print(f"Speedup: {(t1-t0)/(t2-t1):.0f}x")

Output:

Python:  2.841s  result=3.333333e+20
Numba:   0.021s  result=3.333333e+20
Speedup: 135x

The only change to the function was adding @numba.jit(nopython=True). Numba compiled the loop body to machine code that runs entirely without the Python interpreter — the same values, same logic, 135x faster. The first call is slower because it triggers compilation; every subsequent call runs at full speed.
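A quick way to sanity-check a benchmark like this is to compare against a closed form. The sum 0² + 1² + ... + (n-1)² equals (n-1)·n·(2n-1)/6, which Python can evaluate exactly with integers. The sketch below is an independent check, not part of the benchmark script, and its function names are illustrative.

```python
def sum_squares_closed_form(n: int) -> int:
    """Exact value of 0^2 + 1^2 + ... + (n-1)^2 using integer arithmetic."""
    return (n - 1) * n * (2 * n - 1) // 6


def sum_squares_loop(n: int) -> int:
    """Reference loop, used here only to validate the formula on a small input."""
    return sum(i * i for i in range(n))


# The formula agrees with the brute-force loop on a small case
assert sum_squares_closed_form(1000) == sum_squares_loop(1000)

exact = sum_squares_closed_form(10_000_000)
print(exact)                  # 333333283333335000000
print(f"{float(exact):.6e}")  # 3.333333e+20
```

The benchmark functions accumulate in floating point, so their totals match this exact value only to the precision of float64 summation. They should still agree with each other and with the leading digits printed here.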

Installing Numba

Install numba via pip or conda. Both install llvmlite, which bundles the LLVM libraries numba relies on; conda is the convenient choice when the rest of your scientific stack already lives in a conda environment:

# pip
pip install numba numpy

# conda (preferred for scientific stacks)
conda install numba

# Verify installation
python -c "import numba; print(numba.__version__)"

Output:

0.59.1

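When you install from pip wheels or conda packages, numba needs no separate C compiler: its llvmlite dependency ships with the LLVM libraries used for JIT compilation. A compiler only comes into play if pip has to build numba or llvmlite from source because no prebuilt wheel exists for your platform or Python version. If you hit an LLVM-related or build-related error in that situation, installing via conda is usually the simplest fix.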

Decorator | Use Case | NumPy Support | Python Objects
--- | --- | --- | ---
@jit | General loops (nopython by default since Numba 0.59) | Yes | Only via forceobj=True (object mode)
@njit / @jit(nopython=True) | Loops, math, no Python objects | Yes | No
@vectorize | Scalar-to-array ufuncs | Yes | No
@guvectorize | Array-to-array ufuncs | Yes | No
@stencil | Sliding window patterns | Yes | No
@cuda.jit | GPU kernels | Yes | No

[Figure: @jit, because life is too short to wait for Python loops.]

The @jit and @njit Decorators

The @jit decorator is numba's primary entry point. In releases before 0.59, bare @jit tried to compile to native code and fell back to "object mode" (essentially interpreted Python) when it hit unsupported constructs, with only a warning to show for it. Since Numba 0.59, @jit defaults to nopython mode and the automatic fallback is gone, so an unsupported construct raises an error. The @njit shorthand (equivalent to @jit(nopython=True)) gives you that strict behavior on every version, which is almost always what you want: a quiet fallback means running slow code while believing it is fast.

# jit_modes.py
import numba
import numpy as np

# @njit: strict, raises an error if something cannot be compiled
@numba.njit
def dot_product(a, b):
    result = 0.0
    for i in range(len(a)):
        result += a[i] * b[i]
    return result

# Caching: saves the compiled binary to disk so the first call of the
# NEXT run is also fast (avoids recompiling on every script restart)
@numba.njit(cache=True)
def euclidean_distance(a, b):
    total = 0.0
    for i in range(len(a)):
        diff = a[i] - b[i]
        total += diff * diff
    return total ** 0.5

# Eager compilation: specify types upfront so the first call is instant
from numba import float64
@numba.njit(float64(float64[:], float64[:]))
def weighted_sum(values, weights):
    total = 0.0
    for i in range(len(values)):
        total += values[i] * weights[i]
    return total

a = np.array([1.0, 2.0, 3.0])
b = np.array([4.0, 5.0, 6.0])
w = np.array([0.2, 0.5, 0.3])

print(f"Dot product:        {dot_product(a, b):.1f}")
print(f"Euclidean distance: {euclidean_distance(a, b):.4f}")
print(f"Weighted sum:       {weighted_sum(a, w):.2f}")

Output:

Dot product:        32.0
Euclidean distance: 5.1962
Weighted sum:       2.10

Use cache=True in production scripts that run repeatedly: numba saves the compiled binary to __pycache__ and reloads it on subsequent runs, eliminating the compilation delay. Use eager compilation (passing a type signature) when you know the input types in advance; compilation then happens when the function is defined, typically at import time, so the first call pays no compile cost.

[Figure: nopython=True: if numba cannot compile it, it tells you instead of quietly running slow Python.]

@vectorize: Creating NumPy Ufuncs

NumPy’s built-in operations like np.sin and np.exp are already fast because they are implemented in C as “ufuncs” that broadcast over arrays without Python overhead. The @numba.vectorize decorator lets you create your own ufuncs from pure Python scalar logic:

# vectorize_demo.py
import numba
import numpy as np
import time

# Define types the ufunc should support
@numba.vectorize(['float64(float64)', 'float32(float32)'])
def sigmoid(x):
    return 1.0 / (1.0 + np.exp(-x))

@numba.vectorize(['float64(float64, float64)'])
def clipped_relu(x, threshold):
    if x < 0.0:
        return 0.0
    elif x > threshold:
        return threshold
    return x

arr = np.linspace(-5, 5, 10_000_000, dtype=np.float64)
thresholds = np.full(10_000_000, 3.0)

# Plain NumPy equivalent for comparison
def sigmoid_python(x):
    return 1.0 / (1.0 + np.exp(-x))  # already vectorized via numpy

t0 = time.perf_counter()
out_np = sigmoid_python(arr)
t1 = time.perf_counter()
out_nb = sigmoid(arr)
t2 = time.perf_counter()

print(f"NumPy sigmoid:  {t1-t0:.4f}s")
print(f"Numba sigmoid:  {t2-t1:.4f}s")
print(f"Max difference: {np.max(np.abs(out_np - out_nb)):.2e}")
print(f"Clipped relu sample: {clipped_relu(arr[:5], thresholds[:5])}")

Output:

NumPy sigmoid:  0.0621s
Numba sigmoid:  0.0184s
Max difference: 0.00e+00
Clipped relu sample: [0. 0. 0. 0. 0.]

@vectorize is most useful when your scalar logic does not map cleanly to a single NumPy expression — branching logic like clipped_relu would require multiple np.where calls in NumPy but is natural in numba. The resulting function behaves exactly like a NumPy ufunc: it supports broadcasting, works on arrays of any shape, and returns an array of the same shape as the input.

Parallel Execution with parallel=True

Numba can automatically parallelize loops over arrays using all available CPU cores by adding parallel=True and replacing Python’s range with numba.prange:

# parallel_numba.py
import numba
from numba import prange
import numpy as np
import time

@numba.njit(parallel=True)
def parallel_pairwise_distance(X):
    """Compute n x n pairwise Euclidean distance matrix."""
    n = X.shape[0]
    d = X.shape[1]
    result = np.zeros((n, n), dtype=np.float64)
    for i in prange(n):          # prange parallelizes this loop
        for j in range(i, n):
            dist = 0.0
            for k in range(d):
                diff = X[i, k] - X[j, k]
                dist += diff * diff
            dist = dist ** 0.5
            result[i, j] = dist
            result[j, i] = dist
    return result

@numba.njit
def serial_pairwise_distance(X):
    """Same but single-threaded."""
    n = X.shape[0]
    d = X.shape[1]
    result = np.zeros((n, n), dtype=np.float64)
    for i in range(n):
        for j in range(i, n):
            dist = 0.0
            for k in range(d):
                diff = X[i, k] - X[j, k]
                dist += diff * diff
            dist = dist ** 0.5
            result[i, j] = dist
            result[j, i] = dist
    return result

X = np.random.rand(2000, 10)

# Warm up
parallel_pairwise_distance(X[:10])
serial_pairwise_distance(X[:10])

t0 = time.perf_counter()
r_serial = serial_pairwise_distance(X)
t1 = time.perf_counter()
r_parallel = parallel_pairwise_distance(X)
t2 = time.perf_counter()

print(f"Serial:   {t1-t0:.3f}s")
print(f"Parallel: {t2-t1:.3f}s")
print(f"Speedup:  {(t1-t0)/(t2-t1):.1f}x (on {numba.get_num_threads()} threads)")
print(f"Results match: {np.allclose(r_serial, r_parallel)}")

Output:

Serial:   0.847s
Parallel: 0.122s
Speedup:  6.9x (on 8 threads)
Results match: True

Parallel speedup scales with the number of CPU cores. The key constraint is that the loop iterations must be independent — if iteration i depends on iteration i-1, you cannot parallelize it. The prange function signals to numba that you guarantee independence; if you use regular range with parallel=True, numba will still compile but will not parallelize the loop.

[Figure: prange: when one core just is not enough.]

Real-Life Example: Monte Carlo Pi Estimation

Monte Carlo simulation is a classic use case for numba: a tight inner loop with no dependencies between iterations, pure floating-point math, and a meaningful speedup target. This implementation estimates pi by sampling random points inside a unit square and counting how many fall inside the unit circle:

# monte_carlo_pi.py
import numba
from numba import prange
import numpy as np
import time

@numba.njit(parallel=True, cache=True)
def monte_carlo_pi(n_samples):
    """Estimate pi via Monte Carlo simulation."""
    inside = 0
    for _ in prange(n_samples):
        x = np.random.random()
        y = np.random.random()
        if x*x + y*y <= 1.0:
            inside += 1
    return 4.0 * inside / n_samples

def monte_carlo_pi_numpy(n_samples):
    """Same computation using vectorized NumPy."""
    x = np.random.rand(n_samples)
    y = np.random.rand(n_samples)
    inside = np.sum(x*x + y*y <= 1.0)
    return 4.0 * inside / n_samples

N = 100_000_000

# Warm up
monte_carlo_pi(1000)

t0 = time.perf_counter()
pi_np = monte_carlo_pi_numpy(N)
t1 = time.perf_counter()
pi_nb = monte_carlo_pi(N)
t2 = time.perf_counter()

print(f"NumPy  pi={pi_np:.6f}  time={t1-t0:.3f}s")
print(f"Numba  pi={pi_nb:.6f}  time={t2-t1:.3f}s")
print(f"True   pi={np.pi:.6f}")
print(f"Speedup: {(t1-t0)/(t2-t1):.1f}x")

Output:

NumPy  pi=3.141596  time=1.823s
Numba  pi=3.141618  time=0.241s
True   pi=3.141593
Speedup: 7.6x

The numba version splits the samples across threads (each thread handles a share of the iterations, not one thread per sample) and never materializes a 100-million-element array. The NumPy version allocates two 800 MB arrays of random numbers, plus temporaries for x*x + y*y and the comparison, before it can count anything, which puts pressure on memory bandwidth. The numba loop draws two random numbers per iteration, tests them immediately, and discards them, so its working set stays in registers and cache. For sample counts far larger than the CPU cache, this streaming pattern tends to beat the array-at-a-time approach even on a single core, though the exact margin depends on the machine.
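The inside += 1 counter in the prange loop is a reduction: conceptually, each worker counts hits in its own share of the samples and the partial counts are summed at the end. The pure-Python sketch below mimics that structure with a thread pool. It illustrates the idea rather than numba's actual scheduler, the function names are hypothetical, and because of the GIL it will not run faster than a plain loop.

```python
import random
from concurrent.futures import ThreadPoolExecutor


def count_inside(n_samples: int, seed: int) -> int:
    """Count random points in the unit square that land inside the quarter circle."""
    rng = random.Random(seed)  # private generator per chunk, no shared state
    inside = 0
    for _ in range(n_samples):
        x = rng.random()
        y = rng.random()
        if x * x + y * y <= 1.0:
            inside += 1
    return inside


def chunked_pi(n_samples: int, n_chunks: int = 4) -> float:
    """Split the samples into chunks, count each independently, then combine."""
    base, extra = divmod(n_samples, n_chunks)
    sizes = [base + (1 if i < extra else 0) for i in range(n_chunks)]
    with ThreadPoolExecutor(max_workers=n_chunks) as pool:
        partial_counts = list(pool.map(count_inside, sizes, range(n_chunks)))
    return 4.0 * sum(partial_counts) / n_samples


print(f"{chunked_pi(200_000):.3f}")
```

Each chunk touches only its own counter and its own random generator, which is the independence property that makes the loop safe to parallelize.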

Frequently Asked Questions

Why is the first numba call slow?

The first call triggers JIT compilation: numba inspects the argument types, generates LLVM IR, and compiles it to machine code. This takes 0.1–2 seconds depending on function complexity. Use cache=True to save the compiled binary to disk, or pass an explicit type signature to compile at import time with @njit(float64(float64[:])). In long-running servers or batch jobs, the warm-up cost amortizes quickly over millions of calls.

Does numba replace NumPy?

No -- numba and NumPy are complementary. NumPy excels at vectorized array operations on large, regular data. Numba excels at loops with complex branching logic, accumulated state (like running totals), or access patterns that are hard to vectorize. Many high-performance scientific libraries use NumPy for data layout and numba for the innermost computation. A common pattern is to pass NumPy arrays into @njit functions and return NumPy arrays from them.

What Python constructs does numba support in nopython mode?

Numba supports Python arithmetic, boolean logic, comparisons, while/for loops, if/else, most math functions, tuples, and a large subset of NumPy array operations including indexing, slicing, shape, and common ufuncs. It does not support arbitrary Python objects, heterogeneous lists or dicts, general string formatting, or most third-party libraries. Containers work when their element types are uniform; for dictionaries and lists shared with Python code, use numba.typed.Dict and numba.typed.List. When in doubt, decorate with @njit and run: numba's error messages identify the unsupported construct.

How do I debug a numba-compiled function?

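Temporarily remove the @njit decorator, or set the environment variable NUMBA_DISABLE_JIT=1, and run the function as plain Python: the logic is the same, only slower, and you can use print statements or a debugger. Once the logic is correct, re-enable compilation. You can also use @njit(boundscheck=True) to enable bounds checking, which raises IndexError on out-of-bounds array access instead of silently reading or corrupting memory. If the failure comes from passing a plain Python list, numba.typed.List gives you a typed list that works in nopython mode. The func.inspect_types() method shows the inferred type of each variable, which helps when a typing error is the problem.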
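Rather than deleting and restoring decorators by hand, one option is a small wrapper that applies numba.njit only when it is enabled and available. The sketch below is illustrative: maybe_njit and the MYAPP_NO_JIT environment variable are hypothetical names invented for this example, not part of numba.

```python
import os

try:
    import numba  # the real JIT compiler, if it is installed
    _HAVE_NUMBA = True
except ImportError:
    _HAVE_NUMBA = False

# Hypothetical switch for this sketch: set MYAPP_NO_JIT=1 to run as plain Python
USE_JIT = _HAVE_NUMBA and os.environ.get("MYAPP_NO_JIT") != "1"


def maybe_njit(func):
    """Apply numba.njit when enabled; otherwise return the function unchanged."""
    if USE_JIT:
        return numba.njit(func)
    return func


@maybe_njit
def collatz_steps(n):
    """Number of Collatz steps needed to reach 1 from n (n >= 1)."""
    steps = 0
    while n != 1:
        if n % 2 == 0:
            n = n // 2
        else:
            n = 3 * n + 1
        steps += 1
    return steps


print(collatz_steps(27))  # 111
```

With the switch off, collatz_steps is an ordinary Python function that you can step through in a debugger. With it on, the same source is compiled. The result should be identical either way for inputs that stay within 64-bit integer range.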

Can numba run on GPUs?

Yes -- numba supports CUDA GPU kernels via @numba.cuda.jit. You write the kernel as a Python function that operates on individual elements, and CUDA launches it across thousands of threads. You need an NVIDIA GPU with CUDA support and the cudatoolkit package installed. For most numerical work, the parallel CPU mode (parallel=True) delivers sufficient speedup without the complexity of GPU memory management, but for matrix operations on data above ~1 GB the GPU path becomes compelling.

Conclusion

Numba narrows the traditional trade-off between Python's readability and native code performance. By adding a single decorator to functions with tight loops or heavy floating-point math, speedups in the range of 10x to 200x over pure Python are common, and the examples in this article ran several times faster than their NumPy equivalents. In this article we covered the @njit decorator for strict compilation, cache=True for persisting compiled code, @vectorize for custom NumPy ufuncs, and parallel=True with prange for multi-core execution.

The most important rule is to prefer @njit over bare @jit: the function then either compiles fully or fails loudly, even on Numba releases older than 0.59 where @jit could quietly fall back to object mode. Apply numba to the innermost loop of your computation -- the 20% of code consuming 80% of your runtime -- and leave the rest as plain Python. Profile first with cProfile or line_profiler to confirm where the bottleneck actually is before adding any decorator. When you are ready for the next level, explore @guvectorize for generalised ufuncs, numba.typed.Dict for compiled dictionaries, and @numba.cuda.jit for GPU kernels.

How To Use Python PyGame for 2D Game Development

Intermediate

You have learned the syntax, wrestled with list comprehensions, and maybe even built a web API. But at some point every Python developer asks the same question: can I actually build a game with this? The answer is yes — and Python’s pygame library makes it surprisingly accessible. Whether you want to build a simple platformer, a Pong clone, or a top-down shooter, PyGame gives you the primitives you need: a game loop, sprite management, keyboard input, collision detection, and a drawing surface that updates 60 times per second.

PyGame is a set of Python modules built on top of SDL (Simple DirectMedia Layer), a C library that handles graphics, sound, and input across Windows, macOS, and Linux. You install it with pip install pygame, and from that point everything happens in pure Python. You do not need a game engine, a shader language, or a degree in computer graphics. If you can write a for loop and create a Python class, you can build a working 2D game today.

This article walks you through everything you need to get started with PyGame in Python. We will cover the game loop pattern that makes real-time graphics work, drawing shapes and images to the screen, handling keyboard and mouse input, creating reusable sprite classes, detecting collisions between objects, and managing game state. By the end you will have built a fully playable Pong clone from scratch.

PyGame Quick Example: A Moving Square

The fastest way to see PyGame in action is to open a window and move a shape around with the arrow keys. This 30-line script captures the entire structure of a PyGame game:

# moving_square.py
import pygame

pygame.init()
screen = pygame.display.set_mode((640, 480))
pygame.display.set_caption("Moving Square")
clock = pygame.time.Clock()

x, y = 300, 220
speed = 4
running = True

while running:
    for event in pygame.event.get():
        if event.type == pygame.QUIT:
            running = False

    keys = pygame.key.get_pressed()
    if keys[pygame.K_LEFT]:  x -= speed
    if keys[pygame.K_RIGHT]: x += speed
    if keys[pygame.K_UP]:    y -= speed
    if keys[pygame.K_DOWN]:  y += speed

    screen.fill((30, 30, 30))
    pygame.draw.rect(screen, (0, 200, 100), (x, y, 40, 40))
    pygame.display.flip()
    clock.tick(60)

pygame.quit()

Output:

A 640x480 window opens with a green square you can move with the arrow keys at 60 fps.

Four lines do all the heavy lifting: pygame.init() starts every PyGame subsystem, pygame.display.set_mode() creates the window, clock.tick(60) caps the frame rate at 60 FPS, and pygame.display.flip() pushes everything you drew onto the screen. The while running loop is the game loop — it runs continuously until the player closes the window. We will break down every piece of this pattern in the sections below.

What Is PyGame and How Does It Work?

PyGame is a game development library for Python that wraps the SDL2 multimedia library. SDL handles low-level tasks — opening a window, reading keyboard/mouse events, blitting pixel data to the screen — while PyGame gives you a Pythonic API on top of it. Every PyGame program follows the same fundamental structure: initialize, create a window, enter the game loop, handle events, update state, draw, and repeat.

Think of the game loop like a film projector. A film is a sequence of still frames shown fast enough that your brain perceives motion. Your game is the same — you redraw the entire screen every frame (typically 60 times per second), and each frame shows the objects in their slightly updated positions. clock.tick(60) tells PyGame to pause long enough so each iteration takes at least 1/60th of a second, keeping the animation smooth regardless of how fast the computer is.
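The frame cap protects you on fast machines, but a fixed per-frame step such as x += 4 still covers less distance per second when a slow machine cannot reach 60 FPS. A common remedy, not used in this article's examples, is to scale movement by the time elapsed since the previous frame. The sketch below simulates that in plain Python. The simulate function is hypothetical; in a real PyGame loop, dt could be taken as clock.tick(60) / 1000, since tick() returns the milliseconds since its previous call.

```python
def simulate(fps: int, seconds: float, speed_px_per_s: float) -> float:
    """Advance a position for `seconds` of game time at a given frame rate."""
    x = 0.0
    dt = 1.0 / fps                    # seconds per frame
    frames = round(seconds * fps)
    for _ in range(frames):
        x += speed_px_per_s * dt      # scale movement by elapsed time
    return x


for fps in (30, 60, 144):
    print(fps, round(simulate(fps, 2.0, 240.0), 6))
```

All three frame rates end at essentially the same position after two seconds of game time, because the per-frame step shrinks as the frame rate rises.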

Component | PyGame Tool | Purpose
--- | --- | ---
Window | pygame.display.set_mode() | Creates the drawing surface
Game loop | while running: + clock.tick() | Keeps the game running at fixed FPS
Input | pygame.event.get(), pygame.key.get_pressed() | Keyboard and mouse input
Drawing | pygame.draw.*, screen.blit() | Renders shapes and images
Sprites | pygame.sprite.Sprite | Reusable game object class
Collision | pygame.Rect.colliderect() | Detects overlapping rectangles
Sound | pygame.mixer | Plays audio files

Install PyGame before running any of the examples below:

# Install PyGame
pip install pygame

[Figure: 60 frames per second. clock.tick(60) is the only thing between smooth gameplay and slideshow mode.]

Understanding the Game Loop

The game loop is the heartbeat of every PyGame program. Understanding it deeply will save you from mysterious bugs later. Here is the canonical three-phase pattern — events, update, draw — with comments explaining each phase:

# game_loop.py
import pygame

pygame.init()
screen = pygame.display.set_mode((800, 600))
clock = pygame.time.Clock()
FPS = 60

player_x = 400
player_y = 300

running = True
while running:
    # --- PHASE 1: Handle events ---
    # pygame.event.get() drains the event queue each frame.
    # If you skip this call, the window freezes and can't be closed.
    for event in pygame.event.get():
        if event.type == pygame.QUIT:
            running = False
        if event.type == pygame.KEYDOWN and event.key == pygame.K_ESCAPE:
            running = False

    # --- PHASE 2: Update state ---
    # Read keys that are currently held down (not just pressed this frame).
    # pygame.key.get_pressed() returns a snapshot of the keyboard state.
    keys = pygame.key.get_pressed()
    if keys[pygame.K_w]: player_y -= 3
    if keys[pygame.K_s]: player_y += 3
    if keys[pygame.K_a]: player_x -= 3
    if keys[pygame.K_d]: player_x += 3

    # Keep the player inside the window boundaries
    player_x = max(20, min(780, player_x))
    player_y = max(20, min(580, player_y))

    # --- PHASE 3: Draw ---
    screen.fill((20, 20, 40))       # Clear screen to dark blue
    pygame.draw.circle(screen, (255, 200, 0), (player_x, player_y), 20)
    pygame.display.flip()           # Show the new frame
    clock.tick(FPS)                 # Wait until 1/FPS seconds have passed

pygame.quit()

Output:

A window opens with a yellow circle you move using WASD keys. It stays within the window bounds. Press Escape to quit.

The most important detail is that event handling is mandatory. If you never call pygame.event.get(), the operating system treats your window as unresponsive and it appears frozen. You must drain the event queue every frame, even if you do not use the events. The other detail to internalize is the split between event.type == pygame.KEYDOWN (fires once, on the frame a key goes down) and pygame.key.get_pressed() (reports the key as down on every frame it is held). Use KEYDOWN for one-shot actions such as quitting and get_pressed() for continuous movement; moving the player from KEYDOWN events alone produces jerky, one-step-per-press motion instead of smooth travel.
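
The difference between the two input styles can be sketched without opening a window. The snippet below is a plain-Python simulation, not PyGame code: frames is a made-up recording of whether one key is held on each frame, keydown_frames mimics a KEYDOWN-style event (reported once, on the frame the key goes down), and held_frames mimics polling the keyboard state every frame. All three names are hypothetical.

```python
# input_styles_sketch.py
# Plain-Python simulation (no PyGame needed). All names here are hypothetical.

# One entry per frame: is the key held down on that frame?
frames = [False, True, True, True, False, True, True]

def keydown_frames(states):
    """Frames where the key goes from up to down (KEYDOWN-style, fires once)."""
    result = []
    previous = False
    for index, held in enumerate(states):
        if held and not previous:
            result.append(index)
        previous = held
    return result

def held_frames(states):
    """Frames where the key is down (get_pressed()-style, true every frame)."""
    return [index for index, held in enumerate(states) if held]

SPEED = 3  # pixels per frame, as in the game loop above

# Moving only on press events: one step per key press.
x_event = 400 + SPEED * len(keydown_frames(frames))
# Moving while the key is held: one step per frame the key is down.
x_polled = 400 + SPEED * len(held_frames(frames))

print(keydown_frames(frames))  # [1, 5]
print(held_frames(frames))     # [1, 2, 3, 5, 6]
print(x_event, x_polled)       # 406 415
```

Over the same seven frames the event-driven player has moved 6 pixels and the polled player 15, which is why continuous movement is normally driven by the held-key snapshot.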

Sprites: Reusable Game Objects

Hardcoding player position as two variables works for a tutorial, but the moment you need multiple enemies, multiple bullets, and a player, you need a better structure. PyGame’s Sprite class is the standard way to model game objects:

# sprites_demo.py
import pygame

pygame.init()
screen = pygame.display.set_mode((800, 500))
clock = pygame.time.Clock()

class Player(pygame.sprite.Sprite):
    def __init__(self):
        super().__init__()
        # Create a surface for the sprite and fill it with a color
        self.image = pygame.Surface((50, 50))
        self.image.fill((0, 180, 255))
        self.rect = self.image.get_rect(center=(400, 250))
        self.speed = 5

    def update(self):
        keys = pygame.key.get_pressed()
        if keys[pygame.K_LEFT]:  self.rect.x -= self.speed
        if keys[pygame.K_RIGHT]: self.rect.x += self.speed
        if keys[pygame.K_UP]:    self.rect.y -= self.speed
        if keys[pygame.K_DOWN]:  self.rect.y += self.speed
        # Clamp to window
        self.rect.clamp_ip(pygame.Rect(0, 0, 800, 500))

class Enemy(pygame.sprite.Sprite):
    def __init__(self, x, y, speed):
        super().__init__()
        self.image = pygame.Surface((40, 40))
        self.image.fill((220, 50, 50))
        self.rect = self.image.get_rect(topleft=(x, y))
        self.speed = speed

    def update(self):
        self.rect.x += self.speed
        if self.rect.right > 800 or self.rect.left < 0:
            self.speed = -self.speed  # Bounce off walls

player = Player()
enemies = pygame.sprite.Group(
    Enemy(100, 100, 2),
    Enemy(400, 200, -3),
    Enemy(600, 350, 2),
)
all_sprites = pygame.sprite.Group(player, *enemies.sprites())

running = True
while running:
    for event in pygame.event.get():
        if event.type == pygame.QUIT:
            running = False

    all_sprites.update()

    # Check if player was hit by any enemy
    hits = pygame.sprite.spritecollide(player, enemies, False)
    if hits:
        player.image.fill((255, 50, 50))  # Flash red on hit
    else:
        player.image.fill((0, 180, 255))

    screen.fill((15, 15, 30))
    all_sprites.draw(screen)
    pygame.display.flip()
    clock.tick(60)

pygame.quit()

Output:

A window shows a blue player square (arrow keys to move) and three red enemy squares that bounce. The player turns red when any enemy overlaps it.

The power of pygame.sprite.Group is that calling all_sprites.update() calls update() on every sprite in the group, and all_sprites.draw(screen) blits every sprite's image at its rect position -- in one line. pygame.sprite.spritecollide(player, enemies, False) returns a list of enemies whose rectangles overlap the player; the third argument controls whether colliding sprites are removed from the group (True would kill them, False leaves them alive).
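
To make the group mechanics concrete, here is a stripped-down stand-in written in plain Python. It is not PyGame's implementation: Box, MiniGroup, and collide are hypothetical names, and the overlap test is a simple axis-aligned rectangle check. It only mirrors the two behaviours described above: one update() call fanning out to every member, and a dokill flag deciding whether hit members stay in the group.

```python
# mini_group_sketch.py
# Simplified stand-in for the sprite-group idea; not PyGame's actual code.

class Box:
    def __init__(self, x, y, w, h, speed=0):
        self.x, self.y, self.w, self.h = x, y, w, h
        self.speed = speed

    def update(self):
        self.x += self.speed  # move horizontally each frame

    def overlaps(self, other):
        # Axis-aligned rectangles overlap when they overlap on both axes.
        return (self.x < other.x + other.w and other.x < self.x + self.w and
                self.y < other.y + other.h and other.y < self.y + self.h)

class MiniGroup:
    def __init__(self, *members):
        self.members = list(members)

    def update(self):
        # One call on the group updates every member.
        for member in self.members:
            member.update()

def collide(sprite, group, dokill):
    """Return members overlapping sprite; drop them from the group if dokill."""
    hits = [m for m in group.members if m.overlaps(sprite)]
    if dokill:
        group.members = [m for m in group.members if m not in hits]
    return hits

player = Box(100, 100, 50, 50)
enemies = MiniGroup(Box(90, 110, 40, 40, speed=2), Box(400, 110, 40, 40, speed=-3))

enemies.update()  # first enemy moves to x=92, second to x=397
print(len(collide(player, enemies, dokill=False)), len(enemies.members))  # 1 2
print(len(collide(player, enemies, dokill=True)), len(enemies.members))   # 1 1
```

With dokill=False the hit enemy is reported but stays in the group; with dokill=True the same enemy is reported and then removed, leaving one member.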

PyGame sprite collision detection
spritecollide() — because manually checking 40 enemy rects against the player was never going to scale.

Collision Detection and Game State

Collision detection is the core mechanic of almost every 2D game. PyGame provides several levels of collision precision. Rectangle collision (Rect.colliderect()) is the fastest and good enough for most games. Pixel-perfect collision (pygame.sprite.collide_mask()) is available for oddly-shaped sprites where rectangle overlap is too imprecise.

# collision_types.py
import pygame

pygame.init()

# --- Rect collision (fastest) ---
player_rect = pygame.Rect(100, 100, 50, 50)
bullet_rect  = pygame.Rect(120, 110, 10, 20)

if player_rect.colliderect(bullet_rect):
    print("Rect collision: hit!")

# --- Point-in-rect collision ---
mouse_pos = (125, 125)
if player_rect.collidepoint(mouse_pos):
    print("Point collision: mouse is over player!")

# --- Group collision with kill ---
# spritecollide(sprite, group, dokill)
# dokill=True removes the colliding sprites from the group automatically.

How To Use Python pre-commit for Automated Code Quality Hooks

Intermediate

Code review comments about formatting, trailing whitespace, and import ordering are a waste of everyone’s time. If a linter can catch it automatically, a human reviewer shouldn’t have to. The problem is that getting developers to manually run Black, Ruff, and mypy before every commit requires discipline that erodes under deadline pressure — and one lapse is all it takes for formatting drift to creep into the codebase.

pre-commit solves this by turning your linters and formatters into Git hooks that run automatically when you type git commit. If any hook fails, the commit is blocked until the issue is fixed. Formatters such as Black fix the files in place, so you only need to git add the corrected files and commit again. The whole setup is declared in a single .pre-commit-config.yaml file, and each developer activates it once with a single command. Install the tool with pip install pre-commit.

In this tutorial you’ll install pre-commit, configure hooks for Black, Ruff, mypy, and trailing whitespace checks, understand how hooks run and fail, add custom scripts as hooks, and set up the same checks in GitHub Actions so they run in CI. By the end your project will automatically enforce code quality standards on every commit without any manual effort.

Python pre-commit: Quick Example

Here is a minimal working setup. These are terminal and configuration commands, not Python scripts.

# Terminal

# Install pre-commit
pip install pre-commit

# In your project root, create .pre-commit-config.yaml
cat > .pre-commit-config.yaml << 'EOF'
repos:
  - repo: https://github.com/psf/black
    rev: 24.4.2
    hooks:
      - id: black

  - repo: https://github.com/astral-sh/ruff-pre-commit
    rev: v0.4.4
    hooks:
      - id: ruff
        args: [--fix]
EOF

# Install the hooks into .git/hooks/
pre-commit install

# Now try committing a file -- the hooks run automatically
echo "x=1+2; print(x)" > test_code.py
git add test_code.py
git commit -m "test commit"

Output:

black....................................................................Failed
- hook id: black
- files were modified by this hook

reformatted test_code.py

ruff.....................................................................Passed

Black reformatted the file in place (x=1+2 becomes x = 1 + 2), but pre-commit does not stage a hook's changes, so the commit was blocked. Run git add test_code.py again and re-commit; this time both hooks pass and the commit goes through. The pattern of “fail, fix, re-add, re-commit” takes getting used to, but it becomes muscle memory quickly.

What is pre-commit and How Does It Work?

pre-commit is a framework for managing Git hooks. A Git hook is a script that Git runs automatically at specific points — before a commit (pre-commit), before a push (pre-push), or after a merge (post-merge). The pre-commit framework makes it easy to install and configure hooks from a catalog of community-maintained hooks, without writing shell scripts manually.

Hook stage | When it runs | Common use
pre-commit | Before creating a commit | Linting, formatting, secret detection
pre-push | Before pushing to remote | Running tests, type checking
commit-msg | After writing commit message | Enforcing commit message format
post-checkout | After git checkout | Installing dependencies

Each hook runs in an isolated virtual environment that pre-commit creates and manages automatically. This means the hook’s tools (e.g., Black) are installed at a specific pinned version independent of your project’s virtual environment. Even if your project uses Black 23.x, the pre-commit hook uses whatever version you pinned in .pre-commit-config.yaml — no conflicts.

pre-commit: the last line of defense before the pull request of shame.

A Full pre-commit Configuration

Here is a production-ready .pre-commit-config.yaml that covers formatting, linting, type checking, and common file issues. Customize by removing hooks you don’t need.

# .pre-commit-config.yaml
# Run 'pre-commit autoupdate' periodically to update pinned versions

default_language_version:
  python: python3.11

repos:
  # General file checks (no extra dependencies required)
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.6.0
    hooks:
      - id: trailing-whitespace      # remove trailing spaces
      - id: end-of-file-fixer        # ensure files end with a newline
      - id: check-yaml               # validate YAML syntax
      - id: check-toml               # validate TOML syntax
      - id: check-json               # validate JSON syntax
      - id: check-merge-conflict     # block accidental merge conflict markers
      - id: check-added-large-files  # prevent committing huge files
        args: ['--maxkb=500']
      - id: debug-statements         # catch leftover pdb/breakpoint() calls
      - id: no-commit-to-branch      # prevent direct commits to main/master
        args: ['--branch', 'main', '--branch', 'master']

  # Black -- opinionated code formatter
  - repo: https://github.com/psf/black
    rev: 24.4.2
    hooks:
      - id: black
        language_version: python3.11

  # Ruff -- fast linter (replaces flake8, isort, and more)
  - repo: https://github.com/astral-sh/ruff-pre-commit
    rev: v0.4.4
    hooks:
      - id: ruff
        args: [--fix, --exit-non-zero-on-fix]

  # mypy -- optional static type checker (remove if not using type hints)
  - repo: https://github.com/pre-commit/mirrors-mypy
    rev: v1.10.0
    hooks:
      - id: mypy
        additional_dependencies: [types-requests, types-PyYAML]
        args: [--ignore-missing-imports]

The rev field pins each hook to a specific tagged version. Running pre-commit autoupdate updates all these pins to the latest released versions; it’s good practice to run this every month or as part of your dependency update process. The no-commit-to-branch hook is particularly valuable on shared projects: it blocks commits made directly on main or master, nudging developers to work on a branch and open a pull request instead.

Running Hooks Manually and Skipping Them

You don’t have to commit to run the hooks. Several commands let you run hooks on demand or against the entire codebase.

# Terminal -- running hooks manually

# Run all hooks on all staged files (same as a commit would)
pre-commit run

# Run all hooks on ALL files in the repo (not just staged)
pre-commit run --all-files

# Run a single specific hook
pre-commit run black --all-files
pre-commit run ruff --all-files

# Skip hooks entirely for one commit (use sparingly)
git commit -m "WIP: quick fix" --no-verify

# Update all hooks to latest versions
pre-commit autoupdate

# Clear cached hook environments (useful when a hook behaves strangely)
pre-commit clean

# Uninstall hooks (removes from .git/hooks/)
pre-commit uninstall

Output of pre-commit run --all-files (example):

Trim Trailing Whitespace.................................................Passed
Fix End of Files.........................................................Passed
Check Yaml...............................................................Passed
Check for added large files..............................................Passed
Debug Statements (Python)................................................Passed
black....................................................................Passed
ruff.....................................................................Failed
- hook id: ruff
- exit code: 1

app/models.py:12:1: F401 `os` imported but unused
Found 1 error.

The --no-verify flag completely bypasses all hooks for one commit. Use it only when you have a genuine reason to skip checks (e.g., committing auto-generated code that doesn’t meet style standards, or doing an emergency hotfix). Overusing --no-verify defeats the purpose of having hooks. A better approach for long-term exceptions is to add # noqa: F401 inline comments for specific Ruff suppressions, or configure exclusions in pyproject.toml.

All hooks passed. The code review will be mercifully short.

Running pre-commit in GitHub Actions

pre-commit hooks run locally on each developer’s machine, but you also want them to run in CI so that PRs from contributors who didn’t install hooks are still checked. GitHub Actions makes this easy.

# .github/workflows/quality.yml

name: Code Quality

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main, develop]

jobs:
  pre-commit:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"

      # Cache pre-commit environments to speed up runs
      - uses: actions/cache@v4
        with:
          path: ~/.cache/pre-commit
          key: pre-commit-${{ hashFiles('.pre-commit-config.yaml') }}
          restore-keys: pre-commit-

      - name: Install pre-commit
        run: pip install pre-commit

      - name: Run all hooks
        run: pre-commit run --all-files

The cache step stores the hook environments so they don’t need to be recreated on every CI run. The cache key is based on the hash of .pre-commit-config.yaml — when you change hook versions or add new hooks, the cache is automatically invalidated. A typical pre-commit CI run takes 30-60 seconds with a warm cache, versus 2-3 minutes cold on the first run.
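
The cache-key idea can be illustrated in a few lines of Python. This is only a sketch of the principle, not how GitHub Actions implements hashFiles: the helper name cache_key is made up, and it simply hashes the config file's bytes with SHA-256 so that any edit to the file produces a different key.

```python
# cache_key_sketch.py
import hashlib
import tempfile
from pathlib import Path

def cache_key(config_path, prefix="pre-commit-"):
    """Build a cache key from the SHA-256 digest of the config file's bytes."""
    digest = hashlib.sha256(Path(config_path).read_bytes()).hexdigest()
    return prefix + digest

with tempfile.TemporaryDirectory() as tmp:
    config = Path(tmp) / ".pre-commit-config.yaml"

    config.write_text("repos: []\n")
    key_before = cache_key(config)

    # Same content gives the same key, so the cache would be reused.
    same_again = cache_key(config)

    # Any change to the config (for example a bumped rev) changes the key.
    config.write_text("repos: []\n# bumped a hook version\n")
    key_after = cache_key(config)

print(key_before == same_again)  # True
print(key_before == key_after)   # False
```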

Real-Life Example: Setting Up a New Python Project with Full Quality Gates

Enforced at commit, enforced in CI, enforced in code review. Pick your battles.

Here is a complete bootstrap script that sets up a new Python project with pre-commit, Black, Ruff, and a matching pyproject.toml configuration that keeps all tools in sync.

# setup_quality_gates.py
# Run once to set up a new project: python setup_quality_gates.py

import subprocess
import os

def run(cmd):
    print(f"$ {cmd}")
    subprocess.run(cmd, shell=True, check=True)

# 1. Install tools
run("pip install pre-commit black ruff mypy")

# 2. Install hooks into .git
run("pre-commit install")
run("pre-commit install --hook-type commit-msg")  # optional: enforce commit format

# 3. Create pyproject.toml tool config (appends if file exists)
config = """
[tool.black]
line-length = 100
target-version = ["py311"]
include = '\\.pyi?$'
exclude = '''
/(
    \\.git
  | \\.venv
  | __pycache__
  | migrations
)/
'''

[tool.ruff]
line-length = 100
target-version = "py311"
select = [
    "E",   # pycodestyle errors
    "W",   # pycodestyle warnings
    "F",   # pyflakes
    "I",   # isort
    "B",   # flake8-bugbear
    "UP",  # pyupgrade
]
ignore = ["E501"]  # line too long (handled by black)

[tool.ruff.per-file-ignores]
"tests/*" = ["S101"]  # allow assert in tests

[tool.mypy]
python_version = "3.11"
ignore_missing_imports = true
strict = false
"""

mode = "a" if os.path.exists("pyproject.toml") else "w"
with open("pyproject.toml", mode) as f:
    f.write(config)

# 4. Run hooks on all files to establish baseline
# (no check=True here: a non-zero exit just means hooks changed or flagged files)
print("\nRunning hooks on all files (initial baseline)...")
result = subprocess.run("pre-commit run --all-files", shell=True)
if result.returncode == 0:
    print("\nAll hooks passed! Project is clean.")
else:
    print("\nSome hooks failed or reformatted files. Review changes with 'git diff', then commit.")

Output:

$ pip install pre-commit black ruff mypy
$ pre-commit install
pre-commit installed at .git/hooks/pre-commit
Running hooks on all files (initial baseline)...
Trim Trailing Whitespace.................................................Passed
black....................................................................Passed
ruff.....................................................................Passed
All hooks passed! Project is clean.

The script uses subprocess.run(..., check=True) for the setup commands so it fails immediately if any step errors — no silent partial setups. The pyproject.toml configuration aligns Black and Ruff to the same line length (100) so they don’t conflict: Black formats to 100 characters and Ruff’s E501 rule (which would also complain about line length) is disabled since Black handles it.
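
The fail-fast behaviour of check=True is easy to see in isolation. In this sketch the failing command is just the current Python interpreter exiting with status 3, an arbitrary stand-in for a pip or pre-commit step that goes wrong; the variable names are illustrative.

```python
# check_true_sketch.py
import subprocess
import sys

# A stand-in command that always fails: Python exiting with status 3.
failing_cmd = [sys.executable, "-c", "import sys; sys.exit(3)"]

# Without check=True the failure is only visible if you inspect returncode.
result = subprocess.run(failing_cmd)
print(result.returncode)  # 3

# With check=True a non-zero exit raises, so later steps never run.
try:
    subprocess.run(failing_cmd, check=True)
    reached_next_step = True
except subprocess.CalledProcessError as exc:
    reached_next_step = False
    print(f"step failed with exit code {exc.returncode}")

print(reached_next_step)  # False
```

The bootstrap script leaves check=True off for the final pre-commit run --all-files call for exactly this reason: a non-zero exit there is an expected outcome to report, not a setup failure.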

Frequently Asked Questions

My pre-commit hooks are slow — how do I speed them up?

The most common cause is mypy, which is inherently slow because it type-checks the entire project. Move mypy to a pre-push hook (add stages: [pre-push] under the mypy hook entry, and run pre-commit install --hook-type pre-push so the push hook is actually installed) so it only runs on push rather than on every commit. For other hooks, the first run is the slow one because pre-commit has to build each hook's environment; after that the environments are cached, and at commit time hooks only process the staged files. Also, replace flake8 + isort with Ruff (which handles both) for a significant speed improvement.

Do all team members need to run pre-commit install?

Yes, each developer needs to run pre-commit install in the cloned repo once. This installs the hooks into the local .git/hooks/ directory, which is not tracked by Git. To automate this, add a Makefile target or a setup.sh script that runs pre-commit install, and document it in your README. Alternatively, use the pre-commit GitHub Action to catch issues in CI from developers who skipped local installation.

Can I exclude specific files or lines from a hook?

Yes. To exclude entire files or directories, add an exclude pattern to the hook config in .pre-commit-config.yaml: exclude: ^(migrations/|docs/). To suppress a specific Ruff or flake8 rule on one line, add a trailing comment: import os # noqa: F401. For Black, you can mark a region to skip with # fmt: off / # fmt: on comments — useful for hand-aligned data structures where Black’s formatting destroys readability.
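
The exclude value is a Python regular expression applied to each file path relative to the repository root, so a pattern can be tried out in Python before it goes into the config. The sketch below uses re.search, which is how the pre-commit documentation describes the matching; the helper name is_excluded and the sample paths are invented for illustration.

```python
# exclude_pattern_sketch.py
import re

# Same pattern as the exclude example above.
EXCLUDE = re.compile(r"^(migrations/|docs/)")

def is_excluded(path):
    """True if the path matches the exclude pattern anywhere re.search allows."""
    return EXCLUDE.search(path) is not None

paths = [
    "migrations/0001_initial.py",
    "docs/conf.py",
    "app/models.py",
    "app/migrations/0002_add_field.py",
]

for path in paths:
    print(f"{path}: {'excluded' if is_excluded(path) else 'checked'}")
```

Because of the leading ^, only top-level migrations/ and docs/ directories are excluded; app/migrations/0002_add_field.py is still checked. Dropping the anchor (or writing (^|/)migrations/) would be one way to cover nested directories as well.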

Can I write my own custom hook?

Yes. Add a repo: local entry in your config and define the hook directly. For example, a custom hook that runs your test suite on push:

repos:
  - repo: local
    hooks:
      - id: run-tests
        name: Run pytest
        entry: pytest tests/ -x -q
        language: system
        pass_filenames: false
        stages: [pre-push]

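The language: system setting means pre-commit does not build an isolated environment for the hook; it runs the entry command as-is, so pytest must already be available on your PATH, typically from your active virtual environment. Because the hook is limited to the pre-push stage, it only fires if you have also run pre-commit install --hook-type pre-push.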

Conclusion

pre-commit turns code quality enforcement from a manual discipline into an automatic process. You configured hooks for Black (formatting), Ruff (linting and import sorting), mypy (type checking), and common file hygiene checks; ran them manually with pre-commit run --all-files; handled the fail-fix-re-add-recommit workflow; and wired the same checks into GitHub Actions for CI coverage. The complete setup described here takes about 10 minutes to install on a new project and saves hours of formatting-related code review comments over the life of the project.

The next step is exploring the full catalog of community hooks at pre-commit.com/hooks.html, which includes hooks for detecting secrets accidentally committed to Git, validating Dockerfile syntax, enforcing commit message conventions, and many more. Combine pre-commit with a branch protection rule requiring CI to pass before merging and you’ve built a complete, automated quality gate.

Full documentation is at pre-commit.com.

Setting Up pre-commit

# pip install pre-commit

# Create .pre-commit-config.yaml in repo root:
repos:
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.6.0
    hooks:
      - id: trailing-whitespace
      - id: end-of-file-fixer
      - id: check-yaml
      - id: check-added-large-files

  - repo: https://github.com/psf/black
    rev: 24.1.1
    hooks:
      - id: black

  - repo: https://github.com/astral-sh/ruff-pre-commit
    rev: v0.4.4
    hooks:
      - id: ruff
        args: [--fix]

# Install the git hooks
pre-commit install

# Now every git commit runs the checks automatically

The pre-commit install command writes a .git/hooks/pre-commit shell script. Every commit triggers the checks; failures block the commit until fixed.

Common Hooks for Python

repos:
  # Formatters
  - repo: https://github.com/psf/black
    rev: 24.1.1
    hooks: [{id: black}]
  
  - repo: https://github.com/pycqa/isort
    rev: 5.13.2
    hooks: [{id: isort, args: ["--profile=black"]}]

  # Linters
  - repo: https://github.com/astral-sh/ruff-pre-commit
    rev: v0.4.4
    hooks:
      - id: ruff
        args: [--fix]

  - repo: https://github.com/pycqa/flake8
    rev: 7.0.0
    hooks: [{id: flake8}]

  # Type checking
  - repo: https://github.com/pre-commit/mirrors-mypy
    rev: v1.10.0
    hooks:
      - id: mypy
        additional_dependencies: [types-requests]

  # Security
  - repo: https://github.com/Yelp/detect-secrets
    rev: v1.5.0
    hooks: [{id: detect-secrets}]

Pin every rev to a specific version — auto-updates to hooks can break the repo for everyone simultaneously.
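
A quick way to audit a config for floating revisions is a small script. The sketch below is an illustration, not part of pre-commit: it uses a regular expression instead of a YAML parser to stay dependency-free, the name find_unpinned_revs is made up, and its notion of "unpinned" is simply a rev set to a common branch-like name such as main, master, develop, or HEAD.

```python
# unpinned_revs_sketch.py
import re

# Branch-like names that move over time (an assumed, non-exhaustive list).
FLOATING = {"main", "master", "develop", "HEAD"}

# Matches lines such as "    rev: v1.2.3" or "    rev: 'main'".
REV_LINE = re.compile(r"""^\s*rev:\s*['"]?([^'"\s#]+)""")

def find_unpinned_revs(config_text):
    """Return (line number, rev) pairs whose rev looks like a moving branch."""
    unpinned = []
    for lineno, line in enumerate(config_text.splitlines(), start=1):
        match = REV_LINE.match(line)
        if match and match.group(1) in FLOATING:
            unpinned.append((lineno, match.group(1)))
    return unpinned

sample = """\
repos:
  - repo: https://github.com/psf/black
    rev: 24.1.1
    hooks: [{id: black}]
  - repo: https://github.com/pycqa/isort
    rev: main
    hooks: [{id: isort}]
"""

print(find_unpinned_revs(sample))  # [(6, 'main')]
```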

Running on All Files

# Run hooks against ALL files, not just changed ones — useful after adding new hooks
pre-commit run --all-files

# Run a specific hook
pre-commit run black --all-files

# Update hook versions
pre-commit autoupdate

CI Integration

# GitHub Actions
name: pre-commit
on: [pull_request]

jobs:
  pre-commit:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"
      - uses: pre-commit/action@v3.0.1

Run pre-commit in CI as well — local hooks can be bypassed with git commit --no-verify, but CI is the gate that can’t be skipped.

Local Hooks

repos:
  - repo: local
    hooks:
      - id: pytest-fast
        name: pytest fast tests
        entry: pytest -m "not slow" --tb=short -q
        language: system
        pass_filenames: false
        always_run: true
      - id: my-custom-check
        name: Custom validation
        entry: python scripts/validate.py
        language: python
        files: "\\.py$"
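
The config above points at scripts/validate.py without showing it. What that script contains is not given here, so the following is a hypothetical example of the shape such a hook usually takes: pre-commit appends the matching staged filenames to the entry command, the script inspects each one, and a non-zero exit status fails the hook. The specific rule (flagging a "DO NOT COMMIT" marker) and the function names are illustrative assumptions.

```python
# scripts/validate.py (hypothetical contents)
import sys
from pathlib import Path

MARKER = "DO NOT COMMIT"  # assumed rule: this text must never be committed

def check_file(path):
    """Return (line number, text) pairs for lines containing the marker."""
    problems = []
    text = Path(path).read_text(encoding="utf-8", errors="replace")
    for lineno, line in enumerate(text.splitlines(), start=1):
        if MARKER in line:
            problems.append((lineno, line.strip()))
    return problems

def main(filenames):
    """Check every filename passed in; return the exit status for the hook."""
    failed = False
    for filename in filenames:
        for lineno, line in check_file(filename):
            print(f"{filename}:{lineno}: found '{MARKER}': {line}")
            failed = True
    return 1 if failed else 0

# pre-commit passes the staged filenames as arguments; with none there is
# nothing to check, so the script does nothing.
if __name__ == "__main__" and len(sys.argv) > 1:
    sys.exit(main(sys.argv[1:]))
```

Returning 1 from main is what blocks the commit; printing the filename and line number gives the developer something actionable in the hook output.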

Skipping Hooks Selectively

# Skip a specific hook for a commit
SKIP=mypy git commit -m "WIP"

# Skip ALL hooks (use sparingly)
git commit --no-verify -m "emergency fix"

# Skip a file from a specific hook in config
# Add a regex to exclude files:
- id: black
  exclude: ^(legacy/|migrations/)

Common Pitfalls

  • Not pinning versions. rev: main means everyone gets a different version. Pin to a tag (rev: v1.2.3) and update via pre-commit autoupdate.
  • Hooks not running for new contributors. Each clone needs pre-commit install. Document it or use a Makefile target.
  • Hooks too slow. Slow hooks (mypy on huge codebases) make contributors skip via --no-verify. Either tune them (run only on changed files) or move to CI-only.
  • Bypass with --no-verify. Easy escape hatch. Run the same hooks in CI to catch bypasses.
  • Hook fixes uncommitted changes. A hook that auto-fixes (black, ruff --fix) modifies your file BUT doesn’t add to the commit. You have to re-add and commit again.

FAQ

Q: pre-commit or husky?
A: pre-commit for Python projects. husky for JS/Node. Both wrap git hooks; pre-commit has a richer ecosystem of Python hooks.

Q: Should I commit .pre-commit-config.yaml?
A: Always. It’s the source of truth for hook versions across the team.

Q: How do I bypass a hook on one file?
A: Either add the file to the hook’s exclude pattern or use a comment-based skip (each tool has its own — # noqa, # type: ignore, etc.).

Q: pre-commit.ci or self-hosted CI?
A: pre-commit.ci is a free service that auto-updates hook versions via PRs. Helpful for OSS; for private repos, your own CI is usually preferred.

Q: How fast should hooks be?
A: Under 5 seconds total for the common case. Beyond that, contributors start disabling them. Move slow checks to CI-only.

Wrapping Up

pre-commit is the lowest-friction way to enforce code quality before commits land. The default Python config (black + ruff + trailing-whitespace + check-yaml) is a 5-minute setup that prevents 80% of style and trivial bug PRs. Run the same hooks in CI to catch bypasses. Pin versions, document the install step, and you have a self-maintaining quality bar that scales with the team.

How To Use Python Plotly for Interactive Data Visualizations

Beginner

Static charts made with Matplotlib are fine for reports and papers, but when you’re presenting data to a stakeholder or building a data product, you want something the viewer can interact with: hover over a data point to see exact values, zoom into a region, click to filter by category, or toggle series on and off. That interactivity makes data comprehensible in ways that a frozen PNG never can.

Plotly is the leading Python library for interactive data visualizations. It renders charts directly in Jupyter notebooks and web browsers, produces publication-quality output, and supports over 40 chart types — from basic bar and line charts to heatmaps, 3D scatter plots, geographic maps, and animated timelines. Plotly has two APIs: plotly.express for quick one-line charts and plotly.graph_objects for fine-grained customization. Install it with pip install plotly.

In this tutorial you’ll create line charts, bar charts, scatter plots, and heatmaps using Plotly Express, learn how to customize layouts and colors, combine multiple traces into a single figure, export charts to HTML and PNG, and build a multi-panel dashboard. By the end you’ll be producing interactive visualizations in a fraction of the time static charts take.

Python Plotly: Quick Example

Here is a complete interactive line chart in four lines of code, using a dataset built directly into Plotly.

# quick_plotly.py
import plotly.express as px

# Load a built-in dataset (no external file needed)
df = px.data.gapminder().query("country == 'Australia'")

fig = px.line(df, x="year", y="lifeExp", title="Life Expectancy in Australia Over Time",
              labels={"lifeExp": "Life Expectancy (years)", "year": "Year"})
fig.show()  # Opens in your browser -- fully interactive (hover, zoom, pan)

# Save as standalone HTML file
fig.write_html("life_expectancy.html")
print("Chart saved as life_expectancy.html")

Output:

Chart saved as life_expectancy.html
(Interactive chart opens in browser at http://localhost:XXXXX)

px.data.gapminder() returns a pandas DataFrame with country-level statistics including GDP per capita, population, and life expectancy from 1952 to 2007. The fig.show() call opens the chart in your default browser. Every Plotly chart is interactive by default: hover over any point to see its exact values, click and drag to zoom, double-click to reset the view. The write_html() output is a self-contained HTML file you can share with anyone — no Python installation needed to view it.

Plotly Express vs Graph Objects

Plotly has two layers of API. Plotly Express (plotly.express or px) is a high-level wrapper that generates common chart types from a DataFrame in one function call. Graph Objects (plotly.graph_objects or go) is the lower-level API that gives you complete control over every element of the chart.

Feature | Plotly Express (px) | Graph Objects (go)
Code length | 1-3 lines for most charts | 10-30+ lines for same chart
Data input | pandas DataFrame (column names) | Raw arrays and dicts
Customization | Limited to function parameters | Full control over every trace
Multi-trace charts | Via color= and facet_ params | Manual add_trace() calls
Best for | Exploratory analysis, quick charts | Production dashboards, edge cases

The best approach is to start with Plotly Express and drop down to Graph Objects only when you need something Express cannot do. You can also mix them: create a chart with Express and then call fig.update_layout() or fig.update_traces() to make fine-grained adjustments.

Common Chart Types with Plotly Express

Most data stories fit into a handful of chart types. Here is a working example for each of the most common ones, all using built-in Plotly datasets so you can run them without any external data files.

# common_charts.py
import plotly.express as px

# --- Bar Chart ---
df_tips = px.data.tips()
fig_bar = px.bar(df_tips, x="day", y="total_bill", color="sex",
                 barmode="group",
                 title="Restaurant Bills by Day and Gender",
                 labels={"total_bill": "Total Bill ($)", "day": "Day of Week"})
fig_bar.write_html("bar_chart.html")

# --- Scatter Plot ---
df_gap = px.data.gapminder().query("year == 2007")
fig_scatter = px.scatter(df_gap, x="gdpPercap", y="lifeExp",
                         size="pop", color="continent",
                         hover_name="country",
                         log_x=True,  # log scale on x axis
                         title="GDP vs Life Expectancy (2007)",
                         labels={"gdpPercap": "GDP per Capita (log scale)",
                                 "lifeExp": "Life Expectancy"})
fig_scatter.write_html("scatter_chart.html")

# --- Histogram ---
fig_hist = px.histogram(df_tips, x="total_bill", nbins=20,
                        color="smoker",
                        marginal="rug",  # adds a rug plot above histogram
                        title="Distribution of Restaurant Bills")
fig_hist.write_html("histogram.html")

# --- Box Plot ---
fig_box = px.box(df_tips, x="day", y="tip", color="sex",
                 title="Tip Distribution by Day and Gender",
                 points="all")  # show all data points as dots
fig_box.write_html("boxplot.html")

print("All charts saved as HTML files.")

Output:

All charts saved as HTML files.

The size="pop" argument in the scatter plot maps bubble size to the pop column — Plotly scales the circles automatically. hover_name="country" makes the country name bold in the hover tooltip. log_x=True applies a logarithmic scale to the x-axis, which is standard practice when GDP data spans several orders of magnitude. The marginal="rug" parameter on the histogram adds a one-dimensional scatter plot along the axis, showing individual data points in addition to the distribution shape.

Customizing Layouts and Styles

Plotly Express creates charts with sensible defaults, but you’ll almost always want to adjust titles, colors, fonts, axis labels, and layout for presentation. The update_layout() and update_traces() methods let you modify any aspect of the chart after creation.

# customization.py
import plotly.express as px
import plotly.graph_objects as go

df = px.data.gapminder().query("continent == 'Europe' and year == 2007")
fig = px.bar(df, x="country", y="gdpPercap",
             color="gdpPercap",
             color_continuous_scale="Viridis",
             title="GDP per Capita -- European Countries 2007")

# Customize the layout
fig.update_layout(
    title_font_size=20,
    title_font_family="Arial",
    xaxis_tickangle=-45,       # rotate x labels for readability
    xaxis_title="Country",
    yaxis_title="GDP per Capita (USD)",
    plot_bgcolor="white",
    paper_bgcolor="white",
    font=dict(family="Arial", size=12, color="#333333"),
    coloraxis_showscale=False,  # hide the color legend
    margin=dict(t=80, b=100, l=60, r=20),
    height=500
)

# Add a horizontal reference line at the European average
avg_gdp = df["gdpPercap"].mean()
fig.add_hline(y=avg_gdp, line_dash="dash", line_color="red",
              annotation_text=f"Europe Avg: ${avg_gdp:,.0f}",
              annotation_position="top right")

fig.write_html("europe_gdp_styled.html")

# Export as a static PNG (requires kaleido: pip install kaleido)
try:
    fig.write_image("europe_gdp.png", width=1200, height=500, scale=2)
    print("PNG saved (requires kaleido package)")
except Exception:
    print("Install kaleido for PNG export: pip install kaleido")

Output:

PNG saved (requires kaleido package)

The add_hline() call adds an annotation line across the full width of the chart at a specific y-value. Similar methods include add_vline(), add_hrect() (horizontal band), and add_vrect() (vertical band). These are perfect for showing thresholds, targets, or date ranges. Static PNG export requires the kaleido package (pip install kaleido) in addition to Plotly — Plotly itself only handles the interactive HTML output natively.

Multi-Trace and Subplot Charts

Many real dashboards combine multiple related charts in a single figure. Plotly’s make_subplots() function creates a grid of sub-axes that you can populate independently, while all sharing the same interactive controls.

# subplots.py
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.express as px

df = px.data.gapminder()
countries = ["United States", "China", "India", "Germany"]
colors = {"United States": "#1f77b4", "China": "#ff7f0e",
          "India": "#2ca02c", "Germany": "#d62728"}

fig = make_subplots(
    rows=1, cols=2,
    subplot_titles=("Life Expectancy Over Time", "GDP Per Capita Over Time"),
    shared_xaxes="all"  # link x-axes across both columns so zooming one chart zooms the other
)

for country in countries:
    country_df = df[df["country"] == country]
    color = colors[country]

    # Left chart: life expectancy
    fig.add_trace(
        go.Scatter(x=country_df["year"], y=country_df["lifeExp"],
                   mode="lines+markers", name=country,
                   line=dict(color=color, width=2),
                   legendgroup=country),  # group legend entries
        row=1, col=1
    )

    # Right chart: GDP per capita
    fig.add_trace(
        go.Scatter(x=country_df["year"], y=country_df["gdpPercap"],
                   mode="lines+markers", name=country,
                   line=dict(color=color, width=2),
                   legendgroup=country,
                   showlegend=False),  # avoid duplicate legend entries
        row=1, col=2
    )

fig.update_layout(
    title_text="Global Development Trends: Selected Countries",
    height=450,
    hovermode="x unified"  # show all traces at same x position on hover
)

fig.update_yaxes(title_text="Life Expectancy (years)", row=1, col=1)
fig.update_yaxes(title_text="GDP per Capita (USD)", row=1, col=2)

fig.write_html("development_dashboard.html")
print("Dashboard saved.")

Output:

Dashboard saved.

The legendgroup=country parameter links the two traces for each country so that clicking a country name in the legend shows or hides both its life expectancy and GDP lines simultaneously. hovermode="x unified" is a particularly useful layout setting: it shows all traces’ values for the same x-coordinate in a single tooltip, making it easy to compare values at the same year across countries.

Real-Life Example: Sales Performance Dashboard

Here is a self-contained sales dashboard that generates realistic sample data and builds a multi-panel figure with KPI annotations, a trend line, and a product breakdown.

# sales_dashboard.py
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import random

# Generate realistic sample sales data
random.seed(42)
months = ["Jan", "Feb", "Mar", "Apr", "May", "Jun",
          "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]
revenue = [random.randint(80000, 200000) for _ in months]
units = [random.randint(400, 900) for _ in months]
products = {"Widget A": 35, "Widget B": 25, "Widget C": 20, "Other": 20}

fig = make_subplots(
    rows=2, cols=2,
    subplot_titles=("Monthly Revenue", "Units Sold",
                    "Revenue Trend (with 3-month avg)", "Product Mix"),
    specs=[[{"type": "bar"}, {"type": "scatter"}],
           [{"type": "scatter"}, {"type": "pie"}]]
)

# Top-left: Monthly revenue bar chart
fig.add_trace(
    go.Bar(x=months, y=revenue, name="Revenue",
           marker_color=["#2ecc71" if r >= 130000 else "#e74c3c" for r in revenue]),
    row=1, col=1
)

# Top-right: Units sold as scatter with lines
fig.add_trace(
    go.Scatter(x=months, y=units, mode="lines+markers", name="Units",
               line=dict(color="#3498db", width=2), marker=dict(size=8)),
    row=1, col=2
)

# Bottom-left: Revenue with 3-month moving average
moving_avg = [sum(revenue[max(0,i-2):i+1]) / len(revenue[max(0,i-2):i+1]) for i in range(len(revenue))]
fig.add_trace(go.Bar(x=months, y=revenue, name="Monthly Revenue",
                     marker_color="#bdc3c7", showlegend=False), row=2, col=1)
fig.add_trace(go.Scatter(x=months, y=moving_avg, mode="lines", name="3-Mo Avg",
                         line=dict(color="#e67e22", width=3, dash="dash")), row=2, col=1)

# Bottom-right: Product mix pie chart
fig.add_trace(
    go.Pie(labels=list(products.keys()), values=list(products.values()),
           name="Products", hole=0.4),
    row=2, col=2
)

total_rev = sum(revenue)
fig.update_layout(
    title_text=f"2026 Sales Dashboard -- Total Revenue: ${total_rev:,}",
    height=700,
    showlegend=True,
    plot_bgcolor="white"
)

fig.write_html("sales_dashboard.html")
print(f"Dashboard saved. Total revenue: ${total_rev:,}")

Output (illustrative; the exact total depends on the generated sample data):

Dashboard saved. Total revenue: $1,567,000

The specs parameter in make_subplots() defines the chart type for each cell — this is required when mixing different trace types like bars, scatter plots, and pie charts in the same figure. The color list comprehension ["#2ecc71" if r >= 130000 else "#e74c3c" for r in revenue] colors revenue bars green if they exceed a threshold and red otherwise — a simple but effective way to highlight performance against a target. Extend this dashboard by wiring it up to a live database query and serving it with Flask or Dash for a real-time monitoring page.
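
The moving_avg comprehension in the dashboard is compact but dense. As a sketch, the same trailing average can be written as a small helper; the function name and the window parameter are introduced here for illustration only.

```python
def trailing_average(values: list[float], window: int = 3) -> list[float]:
    """Average each value with up to `window - 1` values before it."""
    result = []
    for i in range(len(values)):
        # Early positions have fewer than `window` values available
        chunk = values[max(0, i - window + 1): i + 1]
        result.append(sum(chunk) / len(chunk))
    return result


print(trailing_average([100, 200, 300, 400]))
# [100.0, 150.0, 200.0, 300.0]
```

The first entries average over a shorter window, which is why the trend line starts at the first month instead of the third.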

Frequently Asked Questions

How do I display Plotly charts in Jupyter Notebook?

Plotly integrates with Jupyter automatically — fig.show() renders the interactive chart inline in the notebook cell. For JupyterLab, you may need to run pip install "jupyterlab>=3" ipywidgets and enable the Plotly extension. In VS Code’s Jupyter extension, charts render inline without any extra setup. If charts appear blank, try fig.show(renderer="iframe") as a fallback.

Does Plotly work with pandas DataFrames?

Yes, Plotly Express is designed around pandas DataFrames. You pass column names as strings, as in px.line(df, x="date_col", y="value_col"), and Plotly handles the rest. You can also use Plotly as the pandas plotting backend: set pd.options.plotting.backend = "plotly" once, or pass backend="plotly" to an individual df.plot() call, and the call returns a Plotly figure instead of a Matplotlib one.

What is Plotly Dash and how is it different?

Dash is a full web application framework built on top of Plotly. While Plotly on its own creates standalone interactive charts that can be saved as HTML files, Dash lets you build live dashboards with Python callbacks: dropdowns, sliders, and buttons that trigger Python functions to update charts in real time. Dash runs as a web server. Install it with pip install dash. If you need a shareable, embeddable chart, use Plotly directly; if you need a live app with user controls, use Dash.

How do I export Plotly charts as PNG or PDF?

Static export requires the kaleido package: pip install kaleido. Then use fig.write_image("chart.png"), fig.write_image("chart.pdf"), or fig.write_image("chart.svg"). Use the scale parameter for higher resolution: fig.write_image("chart.png", scale=3) produces a 3x resolution image suitable for print. For batch export of many charts, make all the write_image() calls from a single Python process rather than launching a separate script per chart, which avoids paying the startup cost each time.

How do I apply a consistent theme to all my charts?

Plotly includes several built-in themes: "plotly" (default), "plotly_white", "plotly_dark", "ggplot2", "seaborn", "simple_white", and "none". Set a global default with import plotly.io as pio; pio.templates.default = "plotly_white" and all subsequent charts will use that theme. You can also create custom templates by extending existing ones with pio.templates["my_theme"] = go.layout.Template(...).

Conclusion

Plotly makes interactive data visualization accessible in Python with minimal code. You learned to create line charts, bar charts, scatter plots, histograms, and box plots with Plotly Express, customize layouts with update_layout(), combine multiple traces with make_subplots(), and build a complete sales performance dashboard with moving averages and color-coded KPIs. The write_html() output is a shareable, self-contained file that any stakeholder can open in a browser without installing Python.

The natural next step is Plotly Dash for building live dashboards with interactive controls, or integrating Plotly charts into a Flask or FastAPI application as JSON responses rendered by the frontend. Both approaches build directly on the Plotly knowledge you’ve developed here.

Complete documentation and an extensive example gallery are available at plotly.com/python.

How To Use Alembic for Database Migrations in Python

Intermediate

Adding a column to a live database sounds simple until you realize you need to do it on a production server with real user data, make the change repeatable across three developer laptops and a staging environment, and be able to roll it back if something goes wrong. Running raw ALTER TABLE statements by hand works once. It does not work as a team workflow across environments, and it leaves no record of what changed, when, or why.

Alembic is the database migration tool built specifically for SQLAlchemy. It tracks your schema version in a dedicated table, generates migration scripts automatically by comparing your models to the live database, and lets you apply (upgrade) or reverse (downgrade) changes one step at a time. It is the standard migration tool for SQLAlchemy-based projects and is widely used in production alongside stacks such as FastAPI with SQLModel and Flask with Flask-SQLAlchemy. Installing it is a single command: pip install alembic sqlalchemy.

In this tutorial you’ll set up Alembic in a real project, create and run migrations, handle common schema changes (add column, rename, add index), write manual migrations for data transformations, and use the downgrade mechanism safely. By the end you’ll have a versioned, team-friendly database workflow.

Alembic Migrations: Quick Example

Here is the fastest path from a blank project to a running migration. These are terminal commands, not Python scripts.

# Terminal commands

# Install dependencies
pip install alembic sqlalchemy

# Initialize Alembic in your project
alembic init alembic

# This creates:
#   alembic/           -- migration scripts directory
#   alembic/env.py     -- Alembic configuration (edit this to point at your DB)
#   alembic/versions/  -- migration script files go here
#   alembic.ini        -- Alembic settings file

# After configuring alembic.ini and env.py (see next section)...

# Generate a migration from model changes
alembic revision --autogenerate -m "add users table"

# Apply the migration
alembic upgrade head

# Check current version
alembic current

# Roll back one step
alembic downgrade -1

Output of alembic upgrade head:

INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Running upgrade  -> a3f92b1c4d8e, add users table

Every migration is identified by a unique revision ID (the a3f92b1c4d8e part). Alembic records the current revision in a special table (alembic_version) in your database, so it always knows which migrations have run and which haven’t. The ID shown in the upgrade head output is the revision ID of the latest migration script.

What is Alembic and How Does It Work?

Alembic works by maintaining a chain of migration scripts. Each script has an upgrade() function that applies a change (add a column, create a table) and a downgrade() function that reverses it. Scripts are linked in a revision chain, and Alembic walks that chain to get from wherever your database currently is to wherever you want it to be.

Concept | What it means
Revision | A unique ID identifying one migration script
Head | The latest revision (most up-to-date)
Base | The initial state (no migrations applied)
alembic_version | Table Alembic creates in your DB to track current revision
upgrade | Apply one or more migrations forward
downgrade | Reverse one or more migrations backward
autogenerate | Auto-detect schema changes by comparing models to DB

The --autogenerate flag is Alembic’s most powerful feature: it imports your SQLAlchemy models, inspects the live database, and generates the migration script automatically. However, autogenerate cannot detect everything — it misses data changes, stored procedures, and some constraint types — so always review generated scripts before running them.

Setting Up Alembic in a Project

Alembic needs to know two things: the database URL to connect to, and where to find your SQLAlchemy models. Both are configured in alembic.ini and alembic/env.py.

# models.py -- your SQLAlchemy models
from sqlalchemy import create_engine, Column, Integer, String, DateTime
from sqlalchemy.orm import DeclarativeBase
from datetime import datetime

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    email = Column(String(255), nullable=False, unique=True)
    username = Column(String(100), nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)

class Post(Base):
    __tablename__ = "posts"

    id = Column(Integer, primary_key=True)
    title = Column(String(500), nullable=False)
    body = Column(String, nullable=False)
    author_id = Column(Integer, nullable=False)

# alembic/env.py -- edit the target_metadata line and database URL

import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from logging.config import fileConfig
from sqlalchemy import engine_from_config, pool
from alembic import context

# Import your models Base so autogenerate can see the schema
from models import Base

config = context.config
fileConfig(config.config_file_name)

# This is the key line -- point autogenerate at your models
target_metadata = Base.metadata

def run_migrations_online():
    connectable = engine_from_config(
        config.get_section(config.config_ini_section),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )
    with connectable.connect() as connection:
        context.configure(connection=connection, target_metadata=target_metadata)
        with context.begin_transaction():
            context.run_migrations()

run_migrations_online()

# alembic.ini -- set your database URL
# Find the sqlalchemy.url line and update it:
# sqlalchemy.url = sqlite:///./myapp.db
# For PostgreSQL: postgresql://user:pass@localhost/mydb
# For MySQL:      mysql+pymysql://user:pass@localhost/mydb

The critical step is setting target_metadata = Base.metadata in env.py. This is what enables autogenerate to compare your Python models against the live database schema. If you skip this, autogenerate produces empty migration scripts. Make sure models.py is importable from env.py — add your project root to sys.path as shown above if needed.

Creating and Running Migrations

Once configured, the typical migration workflow is three commands: generate, review, apply.

# Terminal -- migration workflow

# Step 1: Generate a migration from current model state
alembic revision --autogenerate -m "create users and posts tables"
# Output: Generating /project/alembic/versions/a1b2c3d4e5f6_create_users_and_posts_tables.py

# Step 2: Review the generated file (always do this before applying)
cat alembic/versions/a1b2c3d4e5f6_create_users_and_posts_tables.py

Here is what a typical auto-generated migration script looks like:

# alembic/versions/a1b2c3d4e5f6_create_users_and_posts_tables.py
"""create users and posts tables

Revision ID: a1b2c3d4e5f6
Revises:
Create Date: 2026-05-16 08:30:00.000000
"""
from alembic import op
import sqlalchemy as sa

revision = 'a1b2c3d4e5f6'
down_revision = None   # None means this is the first migration
branch_labels = None
depends_on = None

def upgrade() -> None:
    op.create_table(
        'users',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('email', sa.String(length=255), nullable=False),
        sa.Column('username', sa.String(length=100), nullable=False),
        sa.Column('created_at', sa.DateTime(), nullable=True),
        sa.PrimaryKeyConstraint('id'),
        sa.UniqueConstraint('email'),
    )
    op.create_table(
        'posts',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('title', sa.String(length=500), nullable=False),
        sa.Column('body', sa.String(), nullable=False),
        sa.Column('author_id', sa.Integer(), nullable=False),
        sa.PrimaryKeyConstraint('id'),
    )

def downgrade() -> None:
    op.drop_table('posts')
    op.drop_table('users')

# Step 3: Apply the migration
alembic upgrade head

# Verify current state
alembic current
# INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
# a1b2c3d4e5f6 (head)

# Show full history
alembic history --verbose

The down_revision = None indicates this is the first migration in the chain. Subsequent migrations will reference this revision ID in their own down_revision, forming the chain. Never edit a migration script after it has been applied to any shared environment — create a new migration instead.

Common Schema Change Operations

Most real-world migrations involve adding columns, creating indexes, or renaming things. Here are migration scripts for the most common operations, showing both the upgrade() and downgrade() halves.

# Manual migration script: add columns and index
# alembic/versions/b2c3d4e5f6a7_add_user_bio_and_index.py

"""add bio to users and index on email

Revision ID: b2c3d4e5f6a7
Revises: a1b2c3d4e5f6
Create Date: 2026-05-16 09:00:00.000000
"""
from alembic import op
import sqlalchemy as sa

revision = 'b2c3d4e5f6a7'
down_revision = 'a1b2c3d4e5f6'
branch_labels = None
depends_on = None

def upgrade() -> None:
    # Add a nullable column (existing rows simply get NULL, so no default is needed)
    op.add_column('users', sa.Column('bio', sa.Text(), nullable=True))

    # Add a column with NOT NULL -- requires a server_default for existing rows
    op.add_column('users', sa.Column('is_active', sa.Boolean(),
                                     nullable=False, server_default=sa.true()))

    # Create an index for faster email lookups
    op.create_index('ix_users_email', 'users', ['email'], unique=False)

    # Add a nullable timestamp column to posts and index author_id for lookups
    op.add_column('posts', sa.Column('published_at', sa.DateTime(), nullable=True))
    op.create_index('ix_posts_author_id', 'posts', ['author_id'])

def downgrade() -> None:
    op.drop_index('ix_posts_author_id', table_name='posts')
    op.drop_column('posts', 'published_at')
    op.drop_index('ix_users_email', table_name='users')
    op.drop_column('users', 'is_active')
    op.drop_column('users', 'bio')

The server_default=sa.true() is critical when adding a NOT NULL column to a table that already has rows. Without a default, the database will reject the ALTER because existing rows would have a NULL value in a NOT NULL column. Use server_default for the database-level default and default for the Python-level default (used by SQLAlchemy when inserting new rows). After adding the column with a server default, you can remove the default in a follow-up migration if desired.
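
The underlying database behavior can be seen without Alembic. The sketch below uses raw SQL against an in-memory SQLite database from the standard library; other databases report the failure with different error messages, and the table contents are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT NOT NULL)")
conn.execute("INSERT INTO users (email) VALUES ('a@example.com')")

# Without a default, SQLite refuses to add the NOT NULL column
try:
    conn.execute("ALTER TABLE users ADD COLUMN is_active BOOLEAN NOT NULL")
    rejected = False
except sqlite3.OperationalError as exc:
    rejected = True
    print("Rejected:", exc)

# With a database-level default, existing rows are filled in automatically
conn.execute("ALTER TABLE users ADD COLUMN is_active BOOLEAN NOT NULL DEFAULT 1")
rows = conn.execute("SELECT email, is_active FROM users").fetchall()
print(rows)  # [('a@example.com', 1)]
```

The DEFAULT clause in the second statement plays the role that server_default plays in the Alembic migration.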

Real-Life Example: Adding a Subscription System to an Existing App

Here is a realistic scenario: adding a subscription tier system to an existing user table, including a data migration that sets a default tier for all existing users.

# alembic/versions/c3d4e5f6a7b8_add_subscription_tiers.py
"""add subscription tiers table and user tier column

Revision ID: c3d4e5f6a7b8
Revises: b2c3d4e5f6a7
Create Date: 2026-05-16 10:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import table, column
from sqlalchemy import String, Integer

revision = 'c3d4e5f6a7b8'
down_revision = 'b2c3d4e5f6a7'

def upgrade() -> None:
    # 1. Create the subscription_tiers lookup table
    op.create_table(
        'subscription_tiers',
        sa.Column('id', sa.Integer(), primary_key=True),
        sa.Column('name', sa.String(50), nullable=False, unique=True),
        sa.Column('monthly_price_cents', sa.Integer(), nullable=False),
    )

    # 2. Seed the lookup data (data migration inside schema migration)
    tiers_table = table('subscription_tiers',
                        column('id', Integer),
                        column('name', String),
                        column('monthly_price_cents', Integer))
    op.bulk_insert(tiers_table, [
        {'id': 1, 'name': 'free', 'monthly_price_cents': 0},
        {'id': 2, 'name': 'pro', 'monthly_price_cents': 999},
        {'id': 3, 'name': 'enterprise', 'monthly_price_cents': 4999},
    ])

    # 3. Add tier_id to users with a default of 1 (free)
    op.add_column('users',
        sa.Column('tier_id', sa.Integer(),
                  sa.ForeignKey('subscription_tiers.id'),
                  nullable=False,
                  server_default='1'))

    # 4. Create an index for tier-based queries
    op.create_index('ix_users_tier_id', 'users', ['tier_id'])

def downgrade() -> None:
    op.drop_index('ix_users_tier_id', table_name='users')
    op.drop_column('users', 'tier_id')
    op.drop_table('subscription_tiers')

# Apply all pending migrations
alembic upgrade head

# Verify
alembic current
# c3d4e5f6a7b8 (head)

The op.bulk_insert() call seeds the lookup data as part of the migration, so the data change ships in the same revision as the schema change; on databases with transactional DDL, such as PostgreSQL, both run in a single transaction. The table() and column() helpers from sqlalchemy.sql let you reference tables without importing your model classes. This matters because model classes can change after the migration is written, but the migration must always produce the same result when re-run. Never import model classes directly inside migration scripts.

Frequently Asked Questions

What does alembic autogenerate miss?

Autogenerate detects table creation, table removal, column additions, column removals, column type changes, and unique constraint changes on most databases. It does NOT detect: table or column renames (they show up as a drop followed by an add), changes to stored procedures, changes to data (you need data migrations for those), changes to indexes on some databases, changes to schema comments, or changes made outside of SQLAlchemy (e.g., manual ALTER TABLE commands run directly). Always review generated migration scripts before applying them.

Is it safe to downgrade in production?

Downgrading can be risky if the migration involved adding columns with data, dropping tables, or making irreversible data changes. Always test your downgrade function on a copy of production data before relying on it as a real rollback strategy. For critical migrations, write the downgrade function even if you never plan to use it — it forces you to think through the reversal and can save you in an emergency. For truly irreversible changes (e.g., dropping a table), add a comment and raise NotImplementedError in the downgrade function to make the intentionality explicit.

What does ‘multiple heads’ mean?

If two developers each create a migration from the same base revision, Alembic has two “heads” — two migration chains that diverged at the same point. You need to merge them with alembic merge heads, which creates a new merge revision that depends on both heads. This is analogous to a merge commit in Git. The merge revision has an empty upgrade/downgrade and simply links the chains back together.

How do I run migrations with zero downtime?

Zero-downtime migrations require careful ordering. The safe pattern is: first add the new column as nullable (no application change needed), then deploy the application code that writes to the new column, then backfill existing rows, then add the NOT NULL constraint. Avoid adding a NOT NULL column to a large production table in a single migration: without a server default the ALTER fails once the table has rows, and with one it can rewrite and lock the table on some databases and versions. Tools like pg_repack for PostgreSQL can help with large table changes.

How do I use different database URLs for dev and production?

Set the database URL from an environment variable in env.py instead of hardcoding it in alembic.ini. Add this to env.py: config.set_main_option("sqlalchemy.url", os.environ["DATABASE_URL"]). Then in CI/CD and production, set DATABASE_URL as a secret environment variable. Never put production credentials in alembic.ini — that file is typically committed to version control.
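A minimal sketch of that env.py change, assuming the variable is named DATABASE_URL as in the answer above. The helper name resolve_database_url is hypothetical and not part of Alembic. It fails with a clear message when the variable is missing, and it doubles any % characters, because Alembic keeps its options in a ConfigParser where a lone % (common in URL-encoded passwords) is treated as interpolation syntax.

```python
import os


def resolve_database_url(environ=None, var_name="DATABASE_URL"):
    """Return the database URL from the environment, escaped for ConfigParser.

    resolve_database_url is a hypothetical helper, not an Alembic API.
    """
    environ = os.environ if environ is None else environ
    url = environ.get(var_name)
    if not url:
        raise RuntimeError(f"{var_name} is not set; refusing to run migrations")
    # A literal "%" must be written as "%%" before it is stored in the config.
    return url.replace("%", "%%")


# In env.py, where `config` is Alembic's context.config, this would be used as:
#     config.set_main_option("sqlalchemy.url", resolve_database_url())

# Stand-in environment for demonstration; the URL is made up.
demo_env = {"DATABASE_URL": "postgresql://app:p%40ss@db.example.com/app"}
print(resolve_database_url(demo_env))
```

Passing the environment in as a plain dict keeps the helper easy to test without touching real environment variables.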

Conclusion

Alembic gives you a versioned, reproducible way to evolve your database schema over time. You learned to set up Alembic with alembic init, configure env.py to point at your SQLAlchemy models, generate migrations with alembic revision --autogenerate, apply them with alembic upgrade head, and roll back with alembic downgrade -1. The real-life example showed how to combine schema changes with seed data in a single atomic migration.

The most important habit to build is always reviewing autogenerated scripts before applying them and always writing correct downgrade functions. With those two practices in place, Alembic becomes the safety net that lets you evolve your database with confidence rather than dread.

The official Alembic documentation with a tutorial, cookbook, and API reference is at alembic.sqlalchemy.org.

How To Use Python poetry for Dependency and Project Management

Beginner

If you’ve ever cloned a Python project and spent an hour figuring out which packages to install, which versions are compatible, and why things work on your machine but not your colleague’s — you understand exactly why dependency management matters. The traditional combo of pip plus a hand-maintained requirements.txt works for simple scripts, but it breaks down quickly on real projects with dozens of dependencies and multiple contributors.

Poetry solves this by giving you a single tool that handles virtual environment creation, dependency resolution, version pinning, and package publishing — all driven by a single pyproject.toml file. Poetry is not the only tool in this space (pip-tools, pipenv, and uv all solve overlapping problems), but it has become one of the most widely adopted thanks to its clean CLI, deterministic lockfile, and first-class support for building and publishing packages to PyPI.

In this tutorial you’ll install Poetry, create a new project from scratch, add and manage dependencies, understand the lockfile, work with environments, and package your project for distribution. By the end you’ll have a complete workflow that replaces scattered requirements files with a reproducible, shareable project setup.

Python Poetry: Quick Example

Here is a minimal workflow — create a project, add a dependency, and run a script — to show how Poetry feels in practice.

# In your terminal (not Python) -- these are shell commands

# Install Poetry (run once)
curl -sSL https://install.python-poetry.org | python3 -

# Create a new project
poetry new myproject
cd myproject

# Add a dependency
poetry add requests

# Run a Python script inside Poetry's managed environment
poetry run python -c "import requests; print(requests.get('https://httpbin.org/get').status_code)"

Output:

200

The poetry new command creates a project directory with a pyproject.toml, a README.md, and a package skeleton. poetry add requests installs requests into a dedicated virtual environment, records it in pyproject.toml, and pins the exact version in poetry.lock. poetry run executes any command inside that managed environment without needing to manually activate it.

What is Poetry and Why Use It?

Poetry is a Python packaging and dependency management tool that uses pyproject.toml as its single source of truth. It replaces several tools that traditionally had to be combined: pip for installation, venv for environments, and setuptools for packaging. Poetry handles all three, plus it generates a lockfile that pins every transitive dependency to an exact version.

Task | Traditional approach | Poetry approach
Install packages | pip install package | poetry add package
Create virtual env | python -m venv .venv | Automatic on first add/install
Pin versions | pip freeze > requirements.txt | poetry.lock (automatic)
Dev vs prod deps | requirements-dev.txt separately | poetry add --group dev package
Build + publish | setuptools + twine (separate) | poetry build && poetry publish
Reproduce env | pip install -r requirements.txt | poetry install

The lockfile is Poetry’s killer feature. When you run poetry add, Poetry solves the entire dependency graph — including every indirect dependency — and records the exact version of every package in poetry.lock. Anyone who clones your repo and runs poetry install gets the exact same environment, down to patch versions. This makes “it works on my machine” problems much rarer.

poetry.lock: because ‘it worked yesterday’ is not a deployment strategy.

Installing Poetry and Setting Up a Project

The recommended way to install Poetry is the official installer, which places it in its own isolated environment separate from any project. This prevents Poetry itself from interfering with your project’s dependencies.

# install_poetry.sh -- run in terminal

# Install Poetry (Linux/macOS/WSL)
curl -sSL https://install.python-poetry.org | python3 -

# On Windows (PowerShell)
# (Invoke-WebRequest -Uri https://install.python-poetry.org -UseBasicParsing).Content | python -

# Verify installation
poetry --version
# Poetry (version 1.8.x)

# Configure Poetry to create virtual envs inside the project folder
# This makes it easier to see and delete the env
poetry config virtualenvs.in-project true

Output:

Poetry (version 1.8.3)

After installation, you may need to add Poetry’s bin directory to your PATH. The installer prints the exact path — typically ~/.local/bin on Linux/macOS. The virtualenvs.in-project true setting is optional but recommended: it creates the virtual environment in a .venv/ folder inside your project directory, which makes it visible in your IDE and easy to delete when switching Python versions.

Creating a New Project vs Initializing an Existing One

Poetry has two modes for starting: poetry new creates a fresh project directory from a template, and poetry init adds Poetry to an existing directory interactively.

# Terminal commands

# Start a brand new project
poetry new mywebapp
# Creates:
#   mywebapp/
#   mywebapp/pyproject.toml
#   mywebapp/README.md
#   mywebapp/mywebapp/__init__.py
#   mywebapp/tests/__init__.py

# OR add Poetry to an existing project
cd existing_project
poetry init
# Interactive prompts ask for project name, version, description,
# Python version constraint, and initial dependencies

# After either approach, install dependencies (creates the venv if needed)
poetry install

Output after poetry install:

Creating virtualenv mywebapp-2T5b5b6K-py3.11 in /home/user/.cache/pypoetry/virtualenvs
Installing dependencies from lock file

Package operations: 0 installs, 0 updates, 0 removals

The generated pyproject.toml contains everything Poetry needs to know about your project: name, version, description, Python version requirements, and dependencies. Commit this file and poetry.lock to version control — both are essential for reproducibility.

Adding, Updating, and Removing Dependencies

All dependency management flows through the poetry add, poetry update, and poetry remove commands. Poetry resolves the full dependency graph on every operation and updates the lockfile accordingly.

# Terminal commands for dependency management

# Add a runtime dependency
poetry add fastapi

# Add with version constraint
poetry add "sqlalchemy>=2.0,<3.0"

# Add a development-only dependency (not installed in production)
poetry add --group dev pytest pytest-cov black ruff

# Add an optional dependency (user must opt in)
poetry add --optional pandas

# See what's installed
poetry show

# See dependency tree (what depends on what)
poetry show --tree

# Update a specific package to the latest allowed version
poetry update requests

# Update ALL packages
poetry update

# Remove a package
poetry remove httpx

Output of poetry show --tree (excerpt):

fastapi 0.111.0 FastAPI framework
|-- anyio >=3.7.1,<5
|   |-- idna >=2.8
|   `-- sniffio >=1.1
|-- pydantic >=1.7.4,!=1.8,!=1.8.1,!=2.0.0,!=2.0.1,!=2.1.0,<3.0.0
|   |-- annotated-types >=0.4.0
|   `-- pydantic-core 2.18.2
`-- starlette >=0.37.2,<0.38.0

The --group dev flag separates development tools from production dependencies. When you deploy your application and run poetry install --without dev, dev dependencies are skipped, keeping your production image lean. The dependency tree view is invaluable for understanding why a particular package version is installed -- if a constraint is preventing an upgrade, the tree shows you which package is imposing it.

poetry show --tree: finally understanding why numpy is v1.24 and not v2.

Working with Environments

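Poetry automatically creates and manages a virtual environment for each project. You do not need to run python -m venv or source .venv/bin/activate manually. Instead, prefix commands with poetry run or drop into the environment with poetry shell. (In Poetry 2.0 and later, poetry shell is no longer bundled; it lives in the poetry-plugin-shell plugin, and poetry env activate prints the activation command instead.)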

# Terminal environment commands

# Run a single command inside Poetry's environment
poetry run python script.py
poetry run pytest
poetry run python -m flask run

# Activate the environment in your current shell session
poetry shell
# Now python, pip, and any installed scripts are from Poetry's env
# Type 'exit' or Ctrl-D to leave the shell

# See where the virtual environment lives
poetry env info
# Output:
# Virtualenv
# Python:         3.11.9
# Implementation: CPython
# Path:           /home/user/mywebapp/.venv
# Executable:     /home/user/mywebapp/.venv/bin/python

# Use a specific Python version (must be installed on your system)
poetry env use python3.12

# List all environments Poetry knows about for this project
poetry env list

# Delete the current environment (useful when switching Python versions)
poetry env remove python3.11

Output of poetry env info (excerpt):

Virtualenv
Python:         3.11.9
Implementation: CPython
Path:           /home/user/mywebapp/.venv
Executable:     /home/user/mywebapp/.venv/bin/python
Valid:           True

Most IDEs (VS Code, PyCharm) can detect Poetry environments automatically when virtualenvs.in-project is set to true -- just point the IDE at .venv/bin/python or select it from the interpreter picker. For CI/CD pipelines, use poetry run pytest rather than activating the shell, since shell activation can behave differently across CI environments.

Real-Life Example: Setting Up a FastAPI Project with Poetry

One toml to rule them all, one lockfile to find them.

Here is a complete example of bootstrapping a FastAPI project with Poetry, including runtime dependencies, dev tooling, and a script entrypoint. It is a common starting layout for production Python services.

# Terminal commands to bootstrap a FastAPI project

poetry new fastapi-demo
cd fastapi-demo

# Add runtime dependencies
poetry add fastapi "uvicorn[standard]" pydantic-settings

# Add dev dependencies
poetry add --group dev pytest httpx pytest-asyncio ruff black

# Check the generated pyproject.toml
cat pyproject.toml

pyproject.toml output:

[tool.poetry]
name = "fastapi-demo"
version = "0.1.0"
description = ""
authors = ["Your Name <you@example.com>"]
readme = "README.md"

[tool.poetry.dependencies]
python = "^3.11"
fastapi = "^0.111.0"
uvicorn = {extras = ["standard"], version = "^0.29.0"}
pydantic-settings = "^2.2.1"

[tool.poetry.group.dev.dependencies]
pytest = "^8.2.0"
httpx = "^0.27.0"
pytest-asyncio = "^0.23.0"
ruff = "^0.4.0"
black = "^24.4.0"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

# fastapi_demo/main.py
from fastapi import FastAPI

app = FastAPI(title="Demo API")

@app.get("/health")
async def health():
    return {"status": "ok"}

# Run the app
poetry run uvicorn fastapi_demo.main:app --reload

# Run tests
poetry run pytest -v

# Lint and format
poetry run ruff check .
poetry run black .

This pattern keeps everything reproducible. A new developer clones the repo, runs poetry install, and gets the exact same environment with all runtime and dev dependencies. The lockfile (poetry.lock) guarantees no version surprises, and the pyproject.toml serves as the single source of truth for project metadata, dependencies, and tool configuration.

Frequently Asked Questions

Should I commit poetry.lock to version control?

Yes, always commit poetry.lock for applications. It ensures every developer and every CI run uses identical package versions. For libraries that other projects depend on, the recommendation is more nuanced -- many library maintainers still commit the lockfile to pin CI test dependencies, but the lockfile is not used when someone installs your library as a dependency. The pyproject.toml version constraints are what matter for library consumers.

Can I export a requirements.txt from Poetry?

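Yes: run poetry export -f requirements.txt --output requirements.txt --without-hashes. This is useful when deploying to environments that don't have Poetry installed (e.g., a Docker container that uses pip). The --without-hashes flag omits the per-file hashes, which keeps the file shorter and avoids pip's hash-checking mode, at the cost of losing that integrity check. For dev requirements, add --with dev to the export command. On Poetry 2.0 and later, the export command comes from the separate poetry-plugin-export plugin, which must be installed first.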

How do I change the Python version for a Poetry project?

First install the target Python version on your system (via pyenv, asdf, or your OS package manager). Then run poetry env use python3.12 to point Poetry at the new interpreter. Poetry will create a fresh virtual environment for that version. Update the python = "^3.12" constraint in pyproject.toml if needed, then run poetry install to reinstall all dependencies.

How do I use Poetry in GitHub Actions?

Install Poetry as a step before your Python setup, then use poetry install to install dependencies. Cache the virtual environment using the lockfile hash as the cache key: hashFiles('poetry.lock'). The official Poetry documentation provides ready-to-use GitHub Actions workflow examples. Alternatively, use the popular snok/install-poetry action which handles installation and PATH setup automatically.

Poetry vs pip-tools vs uv -- which should I use?

All three solve dependency locking but have different strengths. Poetry is the most full-featured: it handles environments, dependencies, and building/publishing in one tool, making it great for both applications and libraries. pip-tools is minimal and stays close to pip -- good if you want explicit control and a lighter tool. uv is extremely fast (Rust-based) and is gaining rapid adoption for its speed; it manages pyproject.toml projects natively and writes its own lockfile, uv.lock. For new projects in 2026, Poetry or uv are both solid choices.

Conclusion

Poetry simplifies Python project management by replacing the fragmented combination of pip, venv, and requirements files with a single unified tool driven by pyproject.toml. You learned to create projects with poetry new, add dependencies with poetry add, separate dev and production requirements with dependency groups, inspect the dependency tree, manage virtual environments, and set up a complete FastAPI project using the Poetry workflow.

The reproducibility that comes from committing poetry.lock to version control is the biggest practical benefit -- it eliminates an entire category of "works on my machine" bugs and makes onboarding new team members faster. Try converting one of your existing projects to Poetry by running poetry init in the project root and migrating your existing requirements files.

Find the full Poetry documentation and a migration guide from pip at python-poetry.org/docs.

How To Use Python Pillow for Image Processing

Beginner

Whether you need to resize product images before uploading them to a web store, watermark a batch of photos, convert a folder of PNGs to JPEGs, or draw bounding boxes around objects in a computer vision pipeline — image processing comes up constantly in Python projects. Doing this manually in a GUI photo editor is fine for one or two images, but when you have hundreds or thousands to process, you need code.

Pillow is Python’s go-to library for image processing. It’s the actively maintained fork of the original PIL (Python Imaging Library) and supports over 30 image formats including JPEG, PNG, GIF, BMP, TIFF, and WebP. Pillow lets you open, manipulate, and save images with a clean, intuitive API that works on Windows, Mac, and Linux. Installing it takes one command: pip install Pillow.

In this tutorial you’ll learn how to open and inspect images, resize and crop them, apply filters and adjustments, convert between formats, draw text and shapes, and run a practical batch processing script. By the end you’ll be able to automate image workflows that would otherwise take hours of manual effort.

Python Pillow: Quick Example

Here is a minimal working example that opens an image, resizes it, applies a sharpening filter, and saves the result. You can run this against any JPEG or PNG on your computer — just swap the filename.

# quick_pillow.py
from PIL import Image, ImageFilter

# Open the image
img = Image.open("photo.jpg")
print(f"Original size: {img.size}")   # (width, height) in pixels
print(f"Format: {img.format}")
print(f"Mode: {img.mode}")

# Resize to 800px wide, preserving aspect ratio
base_width = 800
ratio = base_width / img.width
new_height = round(img.height * ratio)  # round() so 1066.67 becomes 1067, not 1066
resized = img.resize((base_width, new_height), Image.LANCZOS)

# Apply a sharpening filter
sharpened = resized.filter(ImageFilter.SHARPEN)

# Save the result
sharpened.save("photo_resized.jpg", quality=90)
print(f"Saved as photo_resized.jpg ({sharpened.size[0]}x{sharpened.size[1]})")

Output:

Original size: (3024, 4032)
Format: JPEG
Mode: RGB
Saved as photo_resized.jpg (800x1067)

The key import is from PIL import Image — note that even though the package is called Pillow, the import name is still PIL for backward compatibility. The Image.open() call does not load the full pixel data into memory immediately; it’s lazy-loaded when you first access pixel data or call a transform. The Image.LANCZOS resampling filter produces high-quality results when downscaling.

The sections below dig into resizing, cropping, color adjustments, drawing, format conversion, and a real batch processing project.

What is Pillow and Why Use It?

Pillow is a Python imaging library that gives you a consistent interface for reading and writing image files, and for performing common image operations. Think of it as the Swiss Army knife of image manipulation — it does not replace specialized tools like OpenCV for real-time computer vision, but it handles the vast majority of day-to-day image tasks with minimal code.

The two most important concepts in Pillow are the Image object and image modes. Every image you open or create is represented as an Image object. The mode describes how pixel data is stored:

Mode | Description | Channels
RGB | Standard color (red, green, blue) | 3
RGBA | Color + transparency (alpha channel) | 4
L | Grayscale (luminance) | 1
P | Palette-mapped (indexed colors) | 1 (with palette)
CMYK | Print color (cyan, magenta, yellow, key/black) | 4
1 | 1-bit black and white | 1

Understanding modes matters when converting between formats — for example, JPEG does not support transparency, so saving an RGBA image as JPEG requires converting to RGB first. Pillow will raise an error if you skip this step, which is a common beginner mistake.

img.open() — because your 4032px camera photo shouldn’t stay 4032px.

Opening and Inspecting Images

Before manipulating an image, it helps to inspect its properties. Pillow exposes size, mode, format, and EXIF metadata as attributes on the Image object.

# inspect_image.py
from PIL import Image, ExifTags

img = Image.open("photo.jpg")

print(f"Size (W x H): {img.size}")
print(f"Width: {img.width}px, Height: {img.height}px")
print(f"Format: {img.format}")
print(f"Mode: {img.mode}")
print(f"Info keys: {list(img.info.keys())}")

# Read EXIF data if present
exif_data = img._getexif()
if exif_data:
    for tag_id, value in exif_data.items():
        tag_name = ExifTags.TAGS.get(tag_id, tag_id)
        if tag_name in ("Make", "Model", "DateTime", "GPSInfo"):
            print(f"  EXIF {tag_name}: {value}")

Output:

Size (W x H): (3024, 4032)
Width: 3024px, Height: 4032px
Format: JPEG
Mode: RGB
Info keys: ['jfif', 'jfif_version', 'jfif_unit', 'jfif_density', 'dpi', 'exif']
  EXIF Make: Apple
  EXIF Model: iPhone 14 Pro
  EXIF DateTime: 2026:01:15 14:22:08

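The img._getexif() method returns a dictionary of raw EXIF tag IDs mapped to values, or None when the file carries no EXIF data. Wrapping with ExifTags.TAGS.get() converts numeric tag IDs to human-readable names. Not all images have EXIF data (PNG files typically do not), so always check for None before iterating. Note that _getexif() is a private method that only some format plugins such as JPEG provide; the public img.getexif() method is the supported alternative and returns an empty mapping instead of None.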

Resizing and Cropping Images

Resizing and cropping are the most common image operations. Pillow provides resize() for changing dimensions and crop() for extracting a rectangular region. The thumbnail() method is a convenient shortcut that resizes an image in place while preserving aspect ratio.

# resize_crop.py
from PIL import Image

img = Image.open("photo.jpg")

# --- Resize to exact dimensions ---
exact = img.resize((640, 480), Image.LANCZOS)
exact.save("exact_640x480.jpg")

# --- Resize preserving aspect ratio using thumbnail() ---
thumb = img.copy()
thumb.thumbnail((400, 400), Image.LANCZOS)  # fits within 400x400 box
thumb.save("thumbnail_400.jpg")
print(f"Thumbnail size: {thumb.size}")  # e.g., (300, 400) -- longest side = 400

# --- Crop a specific region (left, upper, right, lower) ---
# Coordinates are in pixels from the top-left corner
box = (100, 200, 900, 800)   # crop an 800x600 region
cropped = img.crop(box)
cropped.save("cropped_region.jpg")
print(f"Cropped size: {cropped.size}")  # (800, 600)

# --- Center crop to a square ---
width, height = img.size
short_side = min(width, height)
left = (width - short_side) // 2
top = (height - short_side) // 2
square_box = (left, top, left + short_side, top + short_side)
square = img.crop(square_box)
square.save("square_crop.jpg")
print(f"Square size: {square.size}")

Output:

Thumbnail size: (300, 400)
Cropped size: (800, 600)
Square size: (3024, 3024)

The thumbnail() method modifies the image in place and never enlarges it — if the image is already smaller than the specified box, nothing happens. For resizing up (enlarging), use resize() with Image.BICUBIC or Image.LANCZOS. LANCZOS produces the sharpest results when downscaling; BICUBIC is slightly faster and works well for upscaling.
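The sizing rule behind thumbnail() can be sketched in plain Python. This is only the arithmetic, not Pillow's implementation (Pillow's own rounding may differ by a pixel in edge cases), and fit_within is a hypothetical helper name.

```python
from fractions import Fraction


def fit_within(size, box):
    """Largest size that fits inside `box`, keeps the aspect ratio,
    and never exceeds the original (thumbnail-style behavior)."""
    width, height = size
    box_w, box_h = box
    # Take the tighter of the two scale factors; cap at 1 so images that
    # already fit are left alone instead of being enlarged.
    scale = min(Fraction(box_w, width), Fraction(box_h, height), Fraction(1))
    return max(1, round(width * scale)), max(1, round(height * scale))


print(fit_within((3024, 4032), (400, 400)))  # (300, 400)
print(fit_within((200, 100), (400, 400)))    # (200, 100): already small enough
```

Using Fraction keeps the scale exact, so a 3:4 photo lands on exactly 300x400 rather than depending on floating-point rounding.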

thumbnail() never upsizes. resize() never apologizes.

Color Adjustments and Filters

Pillow provides two modules for adjusting image appearance: ImageFilter for convolution-based filters (blur, sharpen, edge detection) and ImageEnhance for adjusting brightness, contrast, color, and sharpness numerically. Both modules are easy to compose — you can chain multiple enhancements one after another.

# color_filters.py
from PIL import Image, ImageFilter, ImageEnhance

img = Image.open("photo.jpg")

# --- Built-in convolution filters ---
blurred = img.filter(ImageFilter.GaussianBlur(radius=3))
blurred.save("blurred.jpg")

sharpened = img.filter(ImageFilter.SHARPEN)
sharpened.save("sharpened.jpg")

edges = img.filter(ImageFilter.FIND_EDGES)
edges.save("edges.jpg")

# --- ImageEnhance adjustments ---
# Each enhance() call takes a factor: 1.0 = original, 0.0 = minimum, 2.0 = double

# Boost contrast by 50%
contrast_img = ImageEnhance.Contrast(img).enhance(1.5)
contrast_img.save("high_contrast.jpg")

# Reduce brightness (simulate underexposure)
dim_img = ImageEnhance.Brightness(img).enhance(0.6)
dim_img.save("dim.jpg")

# Boost color saturation for vivid colors
vivid = ImageEnhance.Color(img).enhance(1.8)
vivid.save("vivid_colors.jpg")

# Convert to grayscale using mode conversion
grayscale = img.convert("L")
grayscale.save("grayscale.jpg")
print("All filter variations saved.")

Output:

All filter variations saved.

The ImageFilter.GaussianBlur(radius=3) call applies a Gaussian blur with a radius of 3 pixels — larger values produce a stronger blur. FIND_EDGES is a Laplacian edge detection filter that highlights boundaries between light and dark areas. The ImageEnhance factor of 1.0 always returns the original unchanged; values below 1.0 reduce the effect and values above 1.0 amplify it. Chaining is straightforward: apply one enhancer, pass the result to the next.
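The factor semantics can be illustrated for a single channel value. ImageEnhance blends between a "degenerate" image (black for Brightness, a flat gray for Contrast) and the original; the sketch below mirrors that idea in plain Python. The helper name enhance_value is hypothetical, and Pillow's exact rounding may differ slightly.

```python
def enhance_value(degenerate, original, factor):
    """Blend one channel value between a 'degenerate' value and the original.

    factor 0.0 gives the degenerate value, 1.0 the original, and values
    above 1.0 extrapolate past the original. The result is clipped to 0-255.
    """
    value = degenerate * (1.0 - factor) + original * factor
    return max(0, min(255, round(value)))


# Brightness: the degenerate image is black (0).
print(enhance_value(0, 200, 0.6))    # 120
print(enhance_value(0, 200, 2.0))    # 255 after clipping
# Contrast: the degenerate image is a flat gray; 128 is an assumed mean here.
print(enhance_value(128, 200, 1.5))  # 236
```

The clipping step is why very large factors wash bright areas out to pure white rather than producing ever-brighter values.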

Converting Image Formats

Pillow handles format conversion transparently. You simply open one format and save as another. The main gotcha is the RGBA-to-JPEG conversion: JPEG does not support an alpha (transparency) channel, so you must convert RGBA to RGB first or Pillow will raise an OSError: cannot write mode RGBA as JPEG.

# format_conversion.py
from PIL import Image
import os

def convert_png_to_jpeg(png_path, output_path, quality=85):
    """Convert a PNG (possibly with transparency) to JPEG."""
    img = Image.open(png_path)

    # Handle transparency by compositing onto a white background
    if img.mode in ("RGBA", "P"):
        background = Image.new("RGB", img.size, (255, 255, 255))
        if img.mode == "P":
            img = img.convert("RGBA")
        background.paste(img, mask=img.split()[3])  # use alpha channel as mask
        img = background
    elif img.mode != "RGB":
        img = img.convert("RGB")

    img.save(output_path, "JPEG", quality=quality, optimize=True)

    original_size = os.path.getsize(png_path) / 1024
    converted_size = os.path.getsize(output_path) / 1024
    print(f"Converted: {png_path} -> {output_path}")
    print(f"  Size: {original_size:.1f}KB -> {converted_size:.1f}KB")

# Convert a single file
convert_png_to_jpeg("logo.png", "logo.jpg", quality=90)

# Convert all PNGs in a folder
input_folder = "images_png"
output_folder = "images_jpg"
os.makedirs(output_folder, exist_ok=True)

for filename in os.listdir(input_folder):
    if filename.lower().endswith(".png"):
        src = os.path.join(input_folder, filename)
        # splitext handles any extension case (.png, .PNG), unlike str.replace
        dst = os.path.join(output_folder, os.path.splitext(filename)[0] + ".jpg")
        convert_png_to_jpeg(src, dst)

Output:

Converted: logo.png -> logo.jpg
  Size: 142.3KB -> 28.7KB
Converted: images_png/banner.png -> images_jpg/banner.jpg
  ...

The img.split()[3] extracts the alpha channel from an RGBA image for use as a paste mask. This composite approach preserves the visual appearance of semi-transparent pixels by blending them against a white background, which is the standard approach for web output. The optimize=True flag tells the JPEG encoder to run an extra optimization pass to reduce file size at the same quality level.
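What the masked paste does to each pixel can be shown with a small sketch. This is plain-Python arithmetic for illustration, not Pillow's internal code, and the function names composite_over and flatten_pixel are hypothetical.

```python
def composite_over(fg, alpha, bg=255):
    """Blend one 8-bit channel value over a background value.

    alpha runs from 0 (fully transparent) to 255 (fully opaque).
    """
    # Integer form of fg * a + bg * (1 - a), rounded to the nearest value.
    return (fg * alpha + bg * (255 - alpha) + 127) // 255


def flatten_pixel(rgba, background=(255, 255, 255)):
    """Flatten an (R, G, B, A) pixel onto an opaque background color."""
    *rgb, alpha = rgba
    return tuple(composite_over(c, alpha, b) for c, b in zip(rgb, background))


print(flatten_pixel((255, 0, 0, 255)))  # opaque red stays (255, 0, 0)
print(flatten_pixel((255, 0, 0, 0)))    # fully transparent becomes (255, 255, 255)
print(flatten_pixel((0, 0, 0, 128)))    # half-transparent black becomes (127, 127, 127)
```

The last case shows why a plain convert("RGB") is not enough: it would keep the black and discard the alpha, while compositing produces the mid-gray a viewer would actually see on a white page.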

RGBA to JPEG. You’ve been warned. Composite it or crash.

Drawing Text and Shapes on Images

The ImageDraw module lets you draw directly onto an image — rectangles, circles, lines, polygons, and text. This is useful for adding watermarks, annotating images, drawing bounding boxes around detected objects, or generating thumbnails with overlaid metadata.

# drawing.py
from PIL import Image, ImageDraw, ImageFont

# Create a blank 800x400 canvas with a dark background
canvas = Image.new("RGB", (800, 400), color=(30, 30, 40))
draw = ImageDraw.Draw(canvas)

# Draw a rectangle (border box)
draw.rectangle([(50, 50), (350, 200)], outline=(0, 200, 255), width=3)

# Draw a filled circle (ellipse with equal width/height bounding box)
draw.ellipse([(420, 50), (620, 250)], fill=(255, 100, 0), outline=(255, 255, 255), width=2)

# Draw a line
draw.line([(50, 300), (750, 300)], fill=(100, 255, 100), width=2)

# Draw a polygon (triangle)
draw.polygon([(650, 50), (750, 200), (550, 200)], fill=(180, 0, 255), outline=(255, 255, 255))

# Add text: try a TrueType font (Linux path shown), fall back to Pillow's built-in font
try:
    font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 32)
except OSError:
    font = ImageFont.load_default()

draw.text((50, 330), "Python Pillow Drawing Demo", fill=(255, 255, 255), font=font)

canvas.save("drawing_demo.png")
print("Drawing saved as drawing_demo.png")

# --- Add a watermark to an existing photo ---
photo = Image.open("photo.jpg").convert("RGBA")
overlay = Image.new("RGBA", photo.size, (0, 0, 0, 0))
draw_overlay = ImageDraw.Draw(overlay)

# Semi-transparent watermark text
draw_overlay.text((20, photo.height - 60), "© 2026 MyBlog", fill=(255, 255, 255, 128), font=font)

watermarked = Image.alpha_composite(photo, overlay).convert("RGB")
watermarked.save("watermarked_photo.jpg", quality=92)
print("Watermarked photo saved.")

Output:

Drawing saved as drawing_demo.png
Watermarked photo saved.

The ImageDraw.Draw() call returns a drawing context tied to the image. All drawing operations modify the underlying image in place. For watermarks, the recommended pattern is to create a transparent RGBA overlay, draw on it, then use Image.alpha_composite() to merge it with the original photo. Because alpha_composite() returns a new image, the photo object you opened is left unmodified. The alpha value of 128 in the text fill makes the text roughly 50% transparent.

Real-Life Example: Batch Image Resizer for a Web Store

1200 product photos. One for loop. Zero manual resizing.

E-commerce platforms usually require product images at specific dimensions (e.g., 800×800 pixels) with a white background. The script below takes a folder of raw product photos in various sizes and formats, processes them into web-ready square images, and saves a report of what was converted.

# batch_resize.py
import os
from PIL import Image, ImageOps

def process_product_image(src_path, dst_path, size=(800, 800), bg_color=(255, 255, 255)):
    """
    Resize + pad a product image to a square canvas with white background.
    Handles JPEG, PNG, and WEBP input.
    Returns a dict with info about the conversion.
    """
    result = {"src": src_path, "success": False, "error": None, "original_size": None, "output_size": None}

    try:
        img = Image.open(src_path)
        result["original_size"] = img.size

        # Convert to RGBA to handle transparency
        if img.mode != "RGBA":
            img = img.convert("RGBA")

        # Use ImageOps.pad() to fit image into target box, adding background
        padded = ImageOps.pad(img, size, color=bg_color + (255,), method=Image.LANCZOS)

        # Composite onto white background (handle any remaining transparency)
        background = Image.new("RGB", size, bg_color)
        background.paste(padded, mask=padded.split()[3])

        background.save(dst_path, "JPEG", quality=88, optimize=True)
        result["success"] = True
        result["output_size"] = background.size
    except Exception as e:
        result["error"] = str(e)

    return result

def batch_process(input_dir, output_dir, size=(800, 800)):
    os.makedirs(output_dir, exist_ok=True)
    valid_extensions = {".jpg", ".jpeg", ".png", ".webp", ".bmp"}
    files = [f for f in os.listdir(input_dir) if os.path.splitext(f)[1].lower() in valid_extensions]

    print(f"Processing {len(files)} images...")
    results = []

    for filename in files:
        src = os.path.join(input_dir, filename)
        base = os.path.splitext(filename)[0]
        dst = os.path.join(output_dir, f"{base}.jpg")
        r = process_product_image(src, dst, size=size)
        results.append(r)
        status = "OK" if r["success"] else f"FAIL: {r['error']}"
        print(f"  [{status}] {filename} {r.get('original_size','')} -> {r.get('output_size','')}")

    success_count = sum(1 for r in results if r["success"])
    fail_count = len(results) - success_count
    print(f"\nDone: {success_count} converted, {fail_count} failed.")

# Run the batch processor
batch_process("raw_products", "web_ready", size=(800, 800))

Output:

Processing 6 images...
  [OK] shirt_front.jpg (1200, 1800) -> (800, 800)
  [OK] mug_photo.png (2048, 2048) -> (800, 800)
  [OK] banner.webp (1920, 600) -> (800, 800)
  [FAIL: cannot identify image file] corrupt_file.jpg None -> None
  [OK] hat_side.jpg (3024, 4032) -> (800, 800)
  [OK] tshirt_back.png (800, 1200) -> (800, 800)

Done: 5 converted, 1 failed.

ImageOps.pad() is a convenience function that resizes the image to fit inside the target box while preserving aspect ratio, then pads the empty space with the background color. This avoids the stretching you’d get from a plain resize() call on non-square inputs. The error handling means one corrupt file does not crash the whole batch: the failure is recorded and processing continues. Extend this script by adding a CSV report writer or sending a Slack alert when failures exceed a threshold.
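A CSV report writer could look roughly like the sketch below. It assumes you collect the result dictionaries produced by process_product_image() (for example by having batch_process() return its results list); the write_report name and the "WxH" size format are illustrative choices, not part of the script above.

```python
import csv

FIELDS = ["src", "success", "error", "original_size", "output_size"]

def write_report(results, report_path):
    """Write one CSV row per processed image; returns the number of rows."""
    with open(report_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=FIELDS)
        writer.writeheader()
        for r in results:
            row = dict(r)
            # Sizes are (width, height) tuples or None; store them as "WxH"
            for key in ("original_size", "output_size"):
                size = row.get(key)
                row[key] = f"{size[0]}x{size[1]}" if size else ""
            writer.writerow(row)
    return len(results)
```

Failed conversions keep their error text in the error column, so the report doubles as a list of files to re-check by hand.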

Frequently Asked Questions

How do I install Pillow?

Run pip install Pillow in your terminal. Note that the package name is Pillow (capital P), not PIL. The import in your code is still from PIL import Image because Pillow is a drop-in replacement for the original PIL and keeps its import namespace. If you’re using a virtual environment (recommended), activate it first before installing.

What image formats does Pillow support?

Pillow supports over 30 formats out of the box, including JPEG, PNG, GIF, BMP, TIFF, WebP, ICO, and PSD. For some formats like TIFF with certain compressions or raw camera formats (CR2, NEF), you may need additional system libraries. Check the full list with from PIL import features; features.pilinfo() to see what’s available on your installation.

My resized images are rotated incorrectly — how do I fix it?

Camera phones store rotation information in EXIF data rather than actually rotating the pixel data. When Pillow opens the file, it uses the raw pixel orientation and ignores EXIF rotation. Use from PIL import ImageOps; img = ImageOps.exif_transpose(img) right after opening the image to automatically apply the EXIF rotation. This should be a standard step in any pipeline that processes photos from mobile devices.

Why does processing many large images crash with MemoryError?

A 12-megapixel JPEG may only be 3MB on disk but expands to 36MB+ in memory as an uncompressed RGB array. Processing 100 such images simultaneously would require 3.6GB of RAM. The solution is to process images one at a time in a loop and let Python garbage-collect each image after saving. Explicitly closing images with img.close() or using a with Image.open(path) as img: context manager helps free memory immediately.
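The arithmetic behind those numbers is easy to check. The helper below is a back-of-the-envelope sketch (uncompressed_bytes is a hypothetical name); it ignores Python object overhead and assumes 8 bits per channel.

```python
def uncompressed_bytes(width, height, channels=3, bytes_per_channel=1):
    """Approximate in-memory size of a decoded image, ignoring object overhead."""
    return width * height * channels * bytes_per_channel

# A 12-megapixel photo (4000 x 3000) decoded as 8-bit RGB
per_image = uncompressed_bytes(4000, 3000)
print(per_image / 1_000_000)            # 36.0 (MB)
print(100 * per_image / 1_000_000_000)  # 3.6 (GB) if 100 images are held at once
```

Converting to RGBA adds a fourth channel, so the same photo grows to roughly 48MB.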

When should I use Pillow vs OpenCV?

Use Pillow for general-purpose image manipulation — resizing, cropping, format conversion, watermarking, and batch processing. Use OpenCV when you need real-time video processing, feature detection, object tracking, or computer vision algorithms. OpenCV uses NumPy arrays as its image representation, which makes it faster for numerical operations but less convenient for simple file-oriented tasks. Many projects use both: Pillow for loading and saving, OpenCV or NumPy for the heavy computation.

Conclusion

Pillow gives you everything you need to handle images in Python without reaching for a GUI tool. In this tutorial you used Image.open() to inspect image metadata, resize() and thumbnail() to change dimensions, crop() to extract regions, ImageFilter and ImageEnhance to apply visual effects, ImageDraw to overlay text and shapes, and a complete batch processing pipeline to convert and pad product images to a uniform size.

A natural next step is combining Pillow with os.walk() to process entire directory trees recursively, or integrating it into a Flask or FastAPI endpoint that accepts image uploads and returns processed results. For advanced processing needs — object detection, face recognition, image segmentation — pair Pillow with OpenCV or PyTorch’s torchvision library.

The official Pillow documentation at pillow.readthedocs.io is comprehensive and includes format-specific notes that are essential when targeting specific output formats or platforms.

How To Use Python kafka-python for Event Streaming

Intermediate

Your microservices need to talk to each other, but direct HTTP calls create tight coupling, fragile retry logic, and cascading failures. Apache Kafka solves this with a durable, high-throughput message log that decouples producers from consumers — publish an event once, any number of services can consume it independently, at their own pace, with replay if something goes wrong. Understanding Kafka is increasingly a must-have skill for Python backend engineers.

The kafka-python library is one of the most widely used pure-Python clients for Apache Kafka. Install it with pip install kafka-python. You will also need a running Kafka broker; the examples below show how to run one locally with Docker in a single command.

In this article we will cover producing messages to Kafka topics, consuming messages with consumer groups, serializing and deserializing JSON payloads, handling consumer offsets and at-least-once delivery, and building a complete event-driven pipeline. By the end you will be able to wire Python services together with Kafka.

kafka-python: Quick Example

Start Kafka locally with Docker (one-time setup):

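# Run Kafka locally with KRaft mode (no ZooKeeper needed)
# The CONTROLLER listener on port 9093 must be declared, because the
# quorum voter and the controller listener name below refer to it.
# docker run -d --name kafka -p 9092:9092 \
#   -e KAFKA_CFG_NODE_ID=1 \
#   -e KAFKA_CFG_PROCESS_ROLES=broker,controller \
#   -e KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 \
#   -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
#   -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
#   -e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \
#   -e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
#   bitnami/kafka:latest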

Then produce and consume your first message:

# quick_kafka.py
from kafka import KafkaProducer, KafkaConsumer

BOOTSTRAP = "localhost:9092"
TOPIC = "quickstart"

# --- Producer ---
producer = KafkaProducer(bootstrap_servers=BOOTSTRAP)
producer.send(TOPIC, b"Hello from Python!")
producer.flush()
print("Message sent.")
producer.close()

# --- Consumer ---
consumer = KafkaConsumer(
    TOPIC,
    bootstrap_servers=BOOTSTRAP,
    auto_offset_reset="earliest",   # Read from the beginning of the topic
    consumer_timeout_ms=3000,       # Stop after 3 seconds of no messages
)
for message in consumer:
    print(f"Received: {message.value.decode()} (offset={message.offset})")
consumer.close()

Output:

Message sent.
Received: Hello from Python! (offset=0)

The producer sends bytes to the topic and flushes to ensure delivery before the script exits. The consumer reads from the beginning (auto_offset_reset="earliest") and stops after 3 seconds of silence (consumer_timeout_ms=3000), making it convenient for scripts that don’t want to run forever.

What Is Kafka and When Should You Use It?

Apache Kafka is a distributed event streaming platform — a durable, ordered, append-only log that retains messages for a configurable period (days to forever). Producers write events to topics; consumers read from them independently, maintaining their own position (offset) in the log.

Feature | Kafka | Redis Pub/Sub | RabbitMQ
Message durability | Durable (disk-based) | In-memory only | Durable (optional)
Replay old messages | Yes (seek to offset) | No | No (once consumed)
Multiple consumers | Yes (consumer groups) | Yes (fan-out) | Yes (competing/fan-out)
Throughput | Very high (millions/sec) | High | Medium
Complexity | High (broker setup) | Low | Medium

Use Kafka when you need durable, replayable event streams across multiple services. Use Redis Pub/Sub for simple real-time fan-out where loss is acceptable. Use RabbitMQ for traditional work queues with routing logic.

Producing JSON Messages

Real-world applications rarely send raw bytes. You want to serialize Python dictionaries to JSON so consumers can decode them easily. Configure the KafkaProducer with a value_serializer that handles the encoding automatically.

# json_producer.py
import json
from kafka import KafkaProducer
from datetime import datetime

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    key_serializer=lambda k: k.encode("utf-8") if k else None,
    acks="all",                 # Wait for all replicas to confirm
    retries=3,                  # Retry on transient errors
    compression_type="gzip",    # Compress messages in transit
)

TOPIC = "user-events"

def publish_event(event_type: str, user_id: int, payload: dict) -> None:
    event = {
        "event_type": event_type,
        "user_id": user_id,
        "timestamp": datetime.utcnow().isoformat(),
        "payload": payload,
    }
    # Use user_id as key for partition affinity (same user -> same partition)
    future = producer.send(TOPIC, value=event, key=str(user_id))
    record_metadata = future.get(timeout=10)  # Block until confirmed
    print(f"Sent {event_type} for user {user_id} -> "
          f"partition={record_metadata.partition} offset={record_metadata.offset}")

publish_event("user_signup", 1001, {"email": "alice@example.com", "plan": "pro"})
publish_event("user_login",  1001, {"ip": "192.168.1.100", "device": "Chrome"})
publish_event("user_signup", 1002, {"email": "bob@example.com", "plan": "free"})

producer.flush()
producer.close()

Output:

Sent user_signup for user 1001 -> partition=0 offset=0
Sent user_login for user 1001 -> partition=0 offset=1
Sent user_signup for user 1002 -> partition=1 offset=0

Using the user ID as the message key ensures all events for the same user land on the same partition, which preserves ordering per user. The acks="all" setting waits for all in-sync replicas to confirm the write before returning — this is the safest setting for data that must not be lost.
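To see why a stable key keeps one user's events together, here is a simplified illustration of key-based partitioning. It is a sketch under assumptions: zlib.crc32 stands in for the hash (kafka-python's default partitioner uses a different hash function, so real partition numbers will differ), and pick_partition is a hypothetical helper, not part of the library.

```python
import zlib

def pick_partition(key: str, num_partitions: int) -> int:
    """Map a key to a partition deterministically: same key, same partition."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions

# Both events for user 1001 land on the same partition, whatever it is
for user_id in ("1001", "1001", "1002"):
    print(user_id, "->", pick_partition(user_id, 3))
```

Because the mapping depends on the partition count, adding partitions to an existing topic can move a key to a different partition, which is worth keeping in mind when per-key ordering matters.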

acks=all -- your event stream is not the place for fire-and-forget.

Consumer Groups and Parallel Processing

Consumer groups allow multiple instances of your service to consume from the same topic in parallel, each handling a different partition. Kafka guarantees that each partition is consumed by exactly one consumer in a group at a time — this is how you scale out consumption without processing the same message twice.

# json_consumer.py
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "user-events",
    bootstrap_servers="localhost:9092",
    group_id="analytics-service",    # Consumer group ID
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    key_deserializer=lambda k: k.decode("utf-8") if k else None,
    auto_offset_reset="earliest",    # Start from beginning if no committed offset
    enable_auto_commit=True,         # Commit offsets automatically every 5 seconds
    auto_commit_interval_ms=5000,
    max_poll_records=10,             # Process at most 10 records per poll
)

print("Analytics consumer started. Listening for events...")
try:
    for message in consumer:
        event = message.value
        print(f"[partition={message.partition} offset={message.offset}]")
        print(f"  Type: {event['event_type']}")
        print(f"  User: {event['user_id']}")
        print(f"  Time: {event['timestamp']}")
        print(f"  Data: {event['payload']}")
        print()
except KeyboardInterrupt:
    print("Consumer stopped.")
finally:
    consumer.close()

Output:

Analytics consumer started. Listening for events...
[partition=0 offset=0]
  Type: user_signup
  User: 1001
  Time: 2026-05-15T09:00:00.123456
  Data: {'email': 'alice@example.com', 'plan': 'pro'}

[partition=0 offset=1]
  Type: user_login
  User: 1001
  Time: 2026-05-15T09:00:01.456789
  Data: {'ip': '192.168.1.100', 'device': 'Chrome'}

If you start a second instance of this consumer with the same group_id, Kafka rebalances and divides the topic's partitions between the two instances. With a two-partition topic, for example, one consumer might keep partition 0 while the other takes partition 1. Scale horizontally by adding consumers, up to the number of partitions; Kafka handles the rebalance automatically.
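The effect of a rebalance can be pictured with a toy assignment function. This is a deliberately simplified round-robin sketch, not Kafka's actual assignor logic, and assign_round_robin is a hypothetical name.

```python
def assign_round_robin(partitions, consumers):
    """Spread partitions over consumers; each partition gets exactly one owner."""
    assignment = {c: [] for c in consumers}
    for i, partition in enumerate(partitions):
        owner = consumers[i % len(consumers)]
        assignment[owner].append(partition)
    return assignment

print(assign_round_robin([0, 1], ["consumer-a"]))
print(assign_round_robin([0, 1], ["consumer-a", "consumer-b"]))
print(assign_round_robin([0, 1], ["consumer-a", "consumer-b", "consumer-c"]))
```

In the last call consumer-c ends up with an empty list: consumers beyond the partition count sit idle, which is why the partition count caps the useful size of a group.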

Manual Offset Commits for At-Least-Once Processing

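Auto-commit runs on a timer, regardless of whether your processing has finished. If the consumer crashes after handling a message but before the next auto-commit, the message is processed again on restart; if an offset is committed before its message has been fully processed, a crash can skip that message entirely. For sensitive operations (database writes, payments), disable auto-commit and commit manually, only after processing succeeds.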

# manual_commit.py
import json
from kafka import KafkaConsumer
from kafka.structs import OffsetAndMetadata, TopicPartition

consumer = KafkaConsumer(
    "user-events",
    bootstrap_servers="localhost:9092",
    group_id="payment-service",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    auto_offset_reset="earliest",
    enable_auto_commit=False,   # Manual commits only
)

def process_event(event: dict) -> bool:
    """Process the event. Returns True on success."""
    print(f"Processing {event['event_type']} for user {event['user_id']}")
    # Simulate database write, payment charge, etc.
    return True

print("Payment consumer started (manual commits).")
try:
    for message in consumer:
        event = message.value
        success = process_event(event)
        if success:
            # Only commit AFTER successful processing.
            # commit() expects {TopicPartition: OffsetAndMetadata}; the offset
            # to commit is the NEXT one to read (current offset + 1).
            tp = TopicPartition(message.topic, message.partition)
            # Assumes kafka-python 2.1+, where OffsetAndMetadata takes a third
            # leader_epoch field (-1 = unknown); on 2.0.x omit that argument.
            consumer.commit({tp: OffsetAndMetadata(message.offset + 1, "processed_ok", -1)})
            print(f"  Committed offset {message.offset + 1}")
        else:
            print(f"  Processing failed -- NOT committing offset {message.offset}")
except KeyboardInterrupt:
    pass
finally:
    consumer.close()

Output:

Payment consumer started (manual commits).
Processing user_signup for user 1001
  Committed offset 1
Processing user_login for user 1001
  Committed offset 2

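Called with no arguments, consumer.commit() commits the consumer's current positions. To commit a specific position, pass a dictionary that maps TopicPartition objects to OffsetAndMetadata values, where the offset is the next one to read (current offset + 1). If the consumer crashes before committing, Kafka re-delivers the uncommitted messages to the next consumer instance in the group, which gives you at-least-once delivery for sensitive operations.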
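The re-delivery behaviour can be simulated without a broker. The sketch below models a topic as a plain list and the committed offset as an integer; run_consumer and its crash_before_commit_at parameter are hypothetical names used only for this illustration.

```python
def run_consumer(log, committed, handled, crash_before_commit_at=None):
    """Process messages from the committed offset; return the new committed offset."""
    offset = committed
    while offset < len(log):
        handled.append(log[offset])      # the side effect happens first
        if offset == crash_before_commit_at:
            return committed             # simulated crash: the commit never happened
        committed = offset + 1           # commit the NEXT offset to read
        offset += 1
    return committed

log = ["order-1", "order-2", "order-3"]
handled = []
committed = run_consumer(log, 0, handled, crash_before_commit_at=1)
committed = run_consumer(log, committed, handled)  # restart after the crash
print(handled)    # ['order-1', 'order-2', 'order-2', 'order-3']
print(committed)  # 3
```

Nothing is lost, but order-2 is handled twice. That duplicate is the price of at-least-once delivery, and it is why the processing step itself needs to tolerate repeats.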

enable_auto_commit=False -- payment processing is not a fire-and-hope situation.

Real-Life Example: Order Processing Pipeline

Here is a complete event-driven order processing pipeline with a producer that publishes order events and a consumer that processes them with manual commits and error handling.

# order_pipeline.py
import json
import time
import uuid
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
from dataclasses import dataclass, asdict
from datetime import datetime

BOOTSTRAP = "localhost:9092"
TOPIC = "orders"

@dataclass
class Order:
    order_id: str
    user_id: int
    items: list
    total_usd: float
    status: str = "pending"
    created_at: str = ""

    def __post_init__(self):
        if not self.created_at:
            self.created_at = datetime.utcnow().isoformat()

class OrderProducer:
    def __init__(self):
        self.producer = KafkaProducer(
            bootstrap_servers=BOOTSTRAP,
            value_serializer=lambda v: json.dumps(v).encode("utf-8"),
            acks="all",
        )

    def publish(self, order: Order) -> None:
        future = self.producer.send(TOPIC, value=asdict(order))
        meta = future.get(timeout=10)
        print(f"[Producer] Order {order.order_id} sent (offset={meta.offset})")

    def close(self):
        self.producer.flush()
        self.producer.close()

class OrderConsumer:
    def __init__(self):
        self.consumer = KafkaConsumer(
            TOPIC,
            bootstrap_servers=BOOTSTRAP,
            group_id="order-processor",
            value_deserializer=lambda v: json.loads(v.decode("utf-8")),
            auto_offset_reset="earliest",
            enable_auto_commit=False,
            consumer_timeout_ms=5000,
        )

    def process(self, order: dict) -> bool:
        """Process order: validate, charge, fulfill."""
        print(f"[Consumer] Processing order {order['order_id']} "
              f"(${order['total_usd']:.2f} for user {order['user_id']})")
        time.sleep(0.05)  # Simulate processing
        return True

    def run(self):
        print("[Consumer] Listening for orders...")
        for message in self.consumer:
            order = message.value
            try:
                success = self.process(order)
                if success:
                    self.consumer.commit()
                    print(f"[Consumer] Order {order['order_id']} fulfilled.")
            except Exception as e:
                print(f"[Consumer] Error processing {order['order_id']}: {e}")
        self.consumer.close()

# --- Run producer ---
producer = OrderProducer()
orders = [
    Order(str(uuid.uuid4())[:8], 1001, ["book", "pen"], 29.99),
    Order(str(uuid.uuid4())[:8], 1002, ["laptop"],    999.00),
    Order(str(uuid.uuid4())[:8], 1001, ["notebook"],    5.49),
]
for order in orders:
    producer.publish(order)
producer.close()

# --- Run consumer ---
OrderConsumer().run()

Output:

[Producer] Order a1b2c3d4 sent (offset=0)
[Producer] Order e5f6a7b8 sent (offset=1)
[Producer] Order c9d0e1f2 sent (offset=2)
[Consumer] Listening for orders...
[Consumer] Processing order a1b2c3d4 ($29.99 for user 1001)
[Consumer] Order a1b2c3d4 fulfilled.
[Consumer] Processing order e5f6a7b8 ($999.00 for user 1002)
[Consumer] Order e5f6a7b8 fulfilled.
[Consumer] Processing order c9d0e1f2 ($5.49 for user 1001)
[Consumer] Order c9d0e1f2 fulfilled.

This pattern separates the producer and consumer into distinct classes, making it easy to run them in separate processes or containers. The manual commit ensures that if the consumer crashes mid-batch, unprocessed orders are re-delivered on restart. Add a dead-letter topic to capture orders that fail processing after N retries.

Consumer groups: every partition processed by exactly one consumer.

Frequently Asked Questions

When should I use Kafka vs RabbitMQ?

Use Kafka when you need message replay (consuming old events), very high throughput (millions of messages per second), or event sourcing where the log is the source of truth. Use RabbitMQ when you need complex routing (exchanges, bindings, fanout), priority queues, or simpler deployment. Kafka requires a broker cluster and has higher operational overhead than RabbitMQ but scales further.

How do I install kafka-python?

Run pip install kafka-python. Note that there is also a community-maintained fork called kafka-python-ng and a separate client from Confluent, the company behind Kafka, published on PyPI as confluent-kafka. The confluent-kafka library wraps the native librdkafka C library and has better performance for high-throughput scenarios; its prebuilt wheels typically bundle librdkafka, but building from source requires librdkafka to be installed.

How many partitions should a topic have?

A common starting point is one partition per consumer you plan to run in parallel. More partitions allow more parallelism but increase overhead. Start with 3-6 partitions for most topics and scale up if needed. Remember: you can increase partitions but not decrease them after a topic is created, so start conservative.

What serialization format should I use?

JSON is the most readable and is fine for most use cases. For high-throughput scenarios where bandwidth matters, consider Apache Avro (with a schema registry) or Protocol Buffers. Both provide schema evolution support and much smaller message sizes than JSON. The confluent-kafka library integrates natively with the Confluent Schema Registry.
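Before switching formats, it is worth measuring what your JSON payloads actually cost. The sketch below compares the default json.dumps() output with the compact separators form for a sample event shaped like the ones in this article; the exact byte counts depend on your data.

```python
import json

event = {
    "event_type": "user_signup",
    "user_id": 1001,
    "payload": {"email": "alice@example.com", "plan": "pro"},
}

default_bytes = json.dumps(event).encode("utf-8")
# separators=(",", ":") drops the spaces after commas and colons
compact_bytes = json.dumps(event, separators=(",", ":")).encode("utf-8")

print(len(default_bytes), len(compact_bytes))
print(json.loads(compact_bytes) == event)  # True: same data, fewer bytes
```

The saving is modest compared with a binary format, but it is a one-line change to the value_serializer and keeps the messages human-readable.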

What does at-least-once delivery mean?

At-least-once means every message is guaranteed to be delivered, but may be delivered more than once in failure scenarios (consumer crash before commit, network retries). Your consumer must be idempotent: processing the same message twice should produce the same result. For example, use database upserts instead of inserts, or track processed message IDs in a deduplication table.
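A minimal sketch of the deduplication idea, using an in-memory set as a stand-in for the table. The event_id field is an assumption (the events earlier in this article do not carry one), and apply_payment is a hypothetical handler.

```python
processed_ids = set()   # stand-in for a deduplication table
balances = {}

def apply_payment(event: dict) -> bool:
    """Apply a payment once; a redelivered event is detected and skipped."""
    if event["event_id"] in processed_ids:
        return False                      # duplicate delivery: no effect
    user = event["user_id"]
    balances[user] = balances.get(user, 0) + event["amount"]
    processed_ids.add(event["event_id"])
    return True

event = {"event_id": "evt-1", "user_id": 1001, "amount": 25}
apply_payment(event)
apply_payment(event)    # simulated redelivery
print(balances)         # {1001: 25}
```

With a real database, the ID check and the balance update should happen in the same transaction; otherwise a crash between the two steps reintroduces the duplicate.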

Conclusion

The kafka-python library gives you a clean Python API for the full Kafka producer and consumer lifecycle. We covered producing JSON messages with custom serializers, consuming with consumer groups for parallel processing, manual offset commits for reliable at-least-once delivery, and a complete order processing pipeline as a real-world example. The key patterns — JSON serializers, group_id for parallel consumption, and manual commits for critical workloads — will serve you in any Python Kafka integration.

Try extending the order pipeline with a dead-letter topic: if processing fails after 3 retries, produce the failed order to an orders-dlq topic for manual inspection instead of blocking the consumer.
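One possible shape for that retry-then-dead-letter logic is sketched below. It is broker-free so it can be tested in isolation: send is a hypothetical callable taking (topic, value), which in the pipeline above could wrap KafkaProducer.send(topic, value=value); process_with_dlq is likewise an illustrative name.

```python
def process_with_dlq(order, process, send, max_retries=3, dlq_topic="orders-dlq"):
    """Try process(order) up to max_retries times; on repeated failure, forward to the DLQ."""
    last_error = None
    for _ in range(max_retries):
        try:
            process(order)
            return True
        except Exception as exc:          # keep the last failure for the DLQ record
            last_error = exc
    send(dlq_topic, {"order": order, "error": str(last_error), "attempts": max_retries})
    return False

# Usage with stand-ins for the processor and the producer
sent = []

def always_fail(order):
    raise ValueError("card declined")

ok = process_with_dlq({"order_id": "a1b2c3d4"}, always_fail,
                      lambda topic, value: sent.append((topic, value)))
print(ok, sent[0][0], sent[0][1]["error"])  # False orders-dlq card declined
```

Whether the call returns True or False, the consumer can commit the offset afterwards: the order has either been processed or parked in the dead-letter topic, so the partition keeps moving.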

For the full API reference, see the kafka-python documentation and the Apache Kafka documentation.

How To Use Python atexit for Graceful Shutdown

Beginner

Your Python script connects to a database, opens a temp file, and acquires a lock file. Then an unhandled exception crashes it. The temp file is left on disk, the lock is never released, and the next run refuses to start because the lock file still exists. This kind of “dirty exit” is one of the most frustrating classes of bugs to debug — and the atexit module is the standard library’s answer to it.

atexit lets you register cleanup functions that Python guarantees to call when the interpreter exits normally — whether the program finishes naturally, calls sys.exit(), or raises an unhandled exception. It is simpler than try/finally blocks because you register the cleanup once, anywhere in your code, and forget about it.

In this article we will cover registering cleanup functions with atexit.register(), passing arguments to cleanup functions, the execution order of multiple handlers, the limitations of atexit, and how it compares to try/finally and context managers. By the end you will know exactly when and how to use atexit for reliable resource cleanup.

Python atexit: Quick Example

# quick_atexit.py
import atexit

def cleanup():
    print("Cleanup: releasing resources.")

atexit.register(cleanup)

print("Script running...")
print("Script done.")

Output:

Script running...
Script done.
Cleanup: releasing resources.

The cleanup function runs after print("Script done.") — after the main script body finishes. You can register it at any point in your code, even before the resources it cleans up have been created. This is the core pattern: register early, clean up late.

What Is atexit and Why Use It?

The atexit module maintains a stack of registered functions. When the Python interpreter shuts down, it pops and calls each function in last-in-first-out order. This mirrors the natural teardown order of programs — the last thing you set up is usually the first thing you need to clean up.

Approach | Pros | Cons
atexit.register() | Simple, global, works with sys.exit() | Does not run on SIGKILL or os._exit()
try/finally | Precise scope, catches all exceptions | Must wrap every code path manually
Context managers (with) | Pythonic, automatic, reusable | Must use with block everywhere
Signal handlers | Handles OS signals (SIGTERM, etc.) | Unix-specific, more complex

atexit is not a replacement for context managers — it is a complement. Use context managers for local resource management and atexit for process-level cleanup that must happen regardless of where the exit originates.

Registering Handlers with Arguments

You often need to pass the resource you are cleaning up to the handler. atexit.register() accepts positional and keyword arguments that are forwarded to the function when it is called.

# atexit_with_args.py
import atexit
import tempfile
import os

def delete_temp_file(path: str) -> None:
    if os.path.exists(path):
        os.remove(path)
        print(f"Deleted temp file: {path}")
    else:
        print(f"Temp file already gone: {path}")

# Create temp file and register cleanup immediately
tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".tmp")
tmp.write(b"Temporary data")
tmp.close()

print(f"Created: {tmp.name}")

# Register the cleanup with the path as an argument
atexit.register(delete_temp_file, tmp.name)

print("Doing some work...")
# Imagine this raises an exception here -- cleanup still runs
print("Work done.")

Output:

Created: /tmp/tmpAbc123.tmp
Doing some work...
Work done.
Deleted temp file: /tmp/tmpAbc123.tmp

The function signature is atexit.register(func, *args, **kwargs). The args and kwargs are captured at registration time — they are not re-evaluated when the handler runs. This means you can safely register a cleanup with a path even before the file exists, as long as the handler checks existence before deleting.
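The capture-at-registration behaviour is easy to verify. Because exit handlers only fire when an interpreter shuts down, the sketch below runs a small snippet in a child Python process and reads back its output; the file names are placeholders.

```python
import subprocess
import sys

SCRIPT = '''
import atexit

path = "first.tmp"
atexit.register(print, "cleaning up:", path)  # "first.tmp" is captured here
path = "second.tmp"                           # rebinding later has no effect
'''

# Run the snippet in a child interpreter so its exit handlers actually fire
result = subprocess.run([sys.executable, "-c", SCRIPT],
                        capture_output=True, text=True, check=True)
print(result.stdout.strip())  # cleaning up: first.tmp
```

If the handler should see the latest value instead, pass a mutable object (or have the handler read an attribute) rather than the value itself.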

atexit.register() -- the cleanup function that runs even when you forget to call it.

Execution Order of Multiple Handlers

Handlers are called in LIFO (last-in-first-out) order, like a stack. The most recently registered handler runs first. This mirrors the natural teardown sequence — if you open a database connection and then acquire a lock, you should release the lock before closing the connection.

# atexit_order.py
import atexit

def teardown_db():
    print("3. Database connection closed.")

def release_lock():
    print("2. Lock released.")

def cleanup_cache():
    print("1. Cache flushed.")

# Register in setup order
atexit.register(teardown_db)
atexit.register(release_lock)
atexit.register(cleanup_cache)

print("Application running...")

Output:

Application running...
1. Cache flushed.
2. Lock released.
3. Database connection closed.

Even though teardown_db was registered first, it runs last. This is the correct order: flush the cache (which may need the lock), release the lock, then close the database. Plan your registration order accordingly.

Unregistering Handlers

Sometimes you set up a resource, register its cleanup, but then explicitly clean it up mid-program. You do not want the atexit handler to run a second time at exit. Use atexit.unregister(func) to remove a previously registered handler.

# atexit_unregister.py
import atexit
import os

LOCK_FILE = "/tmp/myapp.lock"

def remove_lock():
    if os.path.exists(LOCK_FILE):
        os.remove(LOCK_FILE)
        print(f"Lock removed: {LOCK_FILE}")

# Create lock file
with open(LOCK_FILE, "w") as f:
    f.write(str(os.getpid()))
print(f"Lock created: {LOCK_FILE}")

atexit.register(remove_lock)

# ... do some work ...

# Mid-program explicit cleanup
remove_lock()
atexit.unregister(remove_lock)  # Don't run again at exit

print("Lock explicitly released. No double cleanup.")

Output:

Lock created: /tmp/myapp.lock
Lock removed: /tmp/myapp.lock
Lock explicitly released. No double cleanup.

atexit.unregister(func) removes all registrations of that function, even if it was registered multiple times. Note that unregister was added in Python 3.2 — if you need to support older versions, use a flag variable inside the handler instead.

atexit.unregister() -- for when you clean up early and want no double-tap.

Limitations of atexit

Understanding when atexit does NOT run is as important as knowing when it does. Three situations bypass atexit entirely:

# atexit_limitations.py
import atexit
import os
import signal

def my_cleanup():
    print("Cleanup ran!")

atexit.register(my_cleanup)

# Case 1: os._exit() -- bypasses atexit entirely
# os._exit(0)  # Uncomment to test -- cleanup will NOT run

# Case 2: SIGKILL -- the OS terminates with no handler
# os.kill(os.getpid(), signal.SIGKILL)  # Uncomment -- cleanup will NOT run

# Case 3: Fatal interpreter errors (e.g., C extension segfault)
# -- atexit does not run

# Normal exit: atexit DOES run
# sys.exit() -- atexit runs
# Unhandled exception -- atexit runs (after printing traceback)
# Program ends naturally -- atexit runs

print("Program ending normally. atexit will run.")

Output:

Program ending normally. atexit will run.
Cleanup ran!

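The three cases where atexit is bypassed are os._exit() (used by multiprocessing worker processes), SIGKILL (which cannot be caught), and fatal C-level errors. A signal that Python does not handle also ends the process without running atexit handlers, and that includes SIGTERM under its default action. For Kubernetes deployments, which send SIGTERM before SIGKILL, combine atexit with a SIGTERM handler (from the signal module) that exits normally, so the usual shutdown paths are covered.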
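A minimal sketch of the SIGTERM combination: the signal handler does nothing except call sys.exit(), which turns the signal into a normal interpreter shutdown so the registered exit handlers run. The function names are illustrative.

```python
import atexit
import signal
import sys

def cleanup():
    print("Cleanup ran!")

def handle_sigterm(signum, frame):
    # sys.exit() raises SystemExit, which is a normal shutdown path,
    # so registered atexit handlers run afterwards
    sys.exit(0)

def install_shutdown_handlers():
    atexit.register(cleanup)
    signal.signal(signal.SIGTERM, handle_sigterm)  # must be called from the main thread

if __name__ == "__main__":
    install_shutdown_handlers()
    print("Handlers installed.")
```

Keeping the signal handler this small is a deliberate choice: the real cleanup lives in one place (the atexit handlers) and runs the same way whether the process ends by itself, via sys.exit(), or via SIGTERM.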

Real-Life Example: Application Bootstrap with Cleanup Stack

Here is a complete application bootstrap pattern that registers multiple cleanup handlers as resources come online.

# app_bootstrap.py
import atexit
import os
import json
import time
from pathlib import Path

class AppResources:
    """Manages application resources with atexit-based cleanup."""

    def __init__(self, app_name: str):
        self.app_name = app_name
        self.lock_path = Path(f"/tmp/{app_name}.lock")
        self.log_path = Path(f"/tmp/{app_name}.log")
        self.log_file = None
        self.start_time = time.time()

    def acquire_lock(self):
        if self.lock_path.exists():
            pid = self.lock_path.read_text().strip()
            raise RuntimeError(f"App already running (PID {pid}). Delete {self.lock_path} to force.")
        self.lock_path.write_text(str(os.getpid()))
        print(f"[{self.app_name}] Lock acquired: {self.lock_path}")
        atexit.register(self._release_lock)  # Register cleanup right away

    def open_log(self):
        self.log_file = open(self.log_path, "a")
        self.log_file.write(f"=== {self.app_name} started ===\n")
        print(f"[{self.app_name}] Log opened: {self.log_path}")
        atexit.register(self._close_log)  # Registered after lock, so runs before lock cleanup

    def _release_lock(self):
        if self.lock_path.exists():
            self.lock_path.unlink()
            uptime = time.time() - self.start_time
            print(f"[{self.app_name}] Lock released. Uptime: {uptime:.1f}s")

    def _close_log(self):
        if self.log_file and not self.log_file.closed:
            self.log_file.write(f"=== {self.app_name} stopped ===\n")
            self.log_file.close()
            print(f"[{self.app_name}] Log closed.")

def main():
    app = AppResources("myapp")
    app.acquire_lock()
    app.open_log()

    print("Application doing work...")
    time.sleep(0.1)

    # Write some log entries
    app.log_file.write("Processed 42 requests\n")
    print("Work complete.")
    # atexit handlers fire here automatically

if __name__ == "__main__":
    main()

Output:

[myapp] Lock acquired: /tmp/myapp.lock
[myapp] Log opened: /tmp/myapp.log
Application doing work...
Work complete.
[myapp] Log closed.
[myapp] Lock released. Uptime: 0.1s

Notice the LIFO order: _close_log runs before _release_lock, which is correct because we need to write the final log entry before closing the file, and we need the log closed before releasing the lock. By registering each cleanup immediately after acquiring each resource, you get the right teardown order automatically.

Last in, first out. Register in setup order, tear down in reverse.

Frequently Asked Questions

Should I use atexit instead of try/finally?

No — use both, for different purposes. try/finally is best for local resource management within a function or code block. atexit is best for process-level cleanup that needs to run regardless of where in the code the exit happens. Context managers (with statements) are the cleanest way to handle local resource cleanup in modern Python.

What happens if an atexit handler raises an exception?

Python prints the exception traceback and continues calling the remaining handlers, so one failing handler cannot prevent the others from running. In current CPython the exception is reported and then discarded; it does not by itself change the process exit status. You should still write handlers defensively: check that the resource exists before accessing it, and wrap risky operations in their own try/except.
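A minimal sketch of that defensive style follows. The safe_handler decorator and the scratch-file path are hypothetical names introduced for illustration; they are not part of atexit.

```python
import atexit
import functools
import os
import sys

def safe_handler(func):
    """Wrap a cleanup function so any exception is reported, not propagated."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as exc:  # deliberately broad: this is last-chance cleanup
            print(f"cleanup {func.__name__} failed: {exc!r}", file=sys.stderr)
            return None
    return wrapper

@safe_handler
def remove_temp_file(path):
    os.remove(path)  # raises FileNotFoundError if the file is already gone

# Hypothetical path, used only for illustration
atexit.register(remove_temp_file, "/tmp/example_scratch.tmp")
```

If the file is missing at exit, the wrapper writes one line to stderr instead of a full traceback, and the other handlers run as usual.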

Do atexit handlers run in threads?

No. atexit handlers are registered globally and run in the main thread during interpreter shutdown. They do not run when individual threads finish — only when the whole process exits. If you need cleanup when a thread exits, use threading.Thread subclassing or a try/finally block inside the thread’s target function.
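A small sketch of the try/finally approach for per-thread cleanup. The worker function and the cleanup_log list are illustrative names only.

```python
import threading

cleanup_log = []

def worker(name):
    try:
        # ... the thread's real work would go here ...
        pass
    finally:
        # Runs when this thread's target returns or raises,
        # not at process exit like an atexit handler
        cleanup_log.append(f"{name} cleaned up")

t = threading.Thread(target=worker, args=("worker-1",))
t.start()
t.join()
print(cleanup_log)
```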

Does atexit work with multiprocessing?

Each process has its own atexit stack. Handlers registered in the main process do not run in child processes, and vice versa. Worker processes created by multiprocessing.Pool typically terminate through os._exit() or are terminated by the parent, which bypasses atexit entirely. Register cleanup in the main process for shared resources. Pool has no finalizer argument (only initializer and initargs, which cover setup), so put worker-level cleanup in a try/finally inside the task function.

Is the execution order of atexit handlers guaranteed?

Yes — handlers are always called in LIFO (last-in-first-out) order. The CPython implementation guarantees this. The guarantee holds across normal exits, sys.exit(), and unhandled exceptions. Plan your registration order with the teardown sequence in mind.

Conclusion

The atexit module provides a simple, reliable way to register cleanup functions at the process level. We covered registering handlers with and without arguments, understanding LIFO execution order, unregistering handlers with atexit.unregister(), and the limitations of atexit (os._exit, SIGKILL). The app bootstrap example demonstrates a production-ready pattern for managing multiple resources cleanly.

Extend the bootstrap pattern by combining it with a SIGTERM handler from the signal module — that covers graceful shutdown from process managers like systemd and Kubernetes, where atexit alone is not sufficient.
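As a rough sketch of that combination: by default SIGTERM kills the process without running atexit handlers, so the handler below converts the signal into a normal sys.exit(). The function names and the 128 + signum exit status (a common shell convention) are choices made for this example, not something the atexit module requires.

```python
import atexit
import signal
import sys

def cleanup():
    print("cleanup ran")

def exit_on_sigterm(signum, frame):
    # sys.exit() raises SystemExit in the main thread; the interpreter then
    # shuts down normally, which is what triggers the atexit handlers.
    sys.exit(128 + int(signum))

def install():
    atexit.register(cleanup)
    signal.signal(signal.SIGTERM, exit_on_sigterm)  # must run in the main thread

if __name__ == "__main__":
    install()
```

With install() called at startup, `kill -TERM <pid>` leads to "cleanup ran" being printed before the process exits.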

For the full API reference, see the Python atexit documentation.

How To Use Python signal for Unix Signal Handling


Intermediate

You have a Python worker process running in production. Kubernetes sends it a SIGTERM to gracefully shut it down before a deployment. Your process ignores it and gets hard-killed 30 seconds later, losing the job it was halfway through. Sound familiar? Signal handling is how long-running Python processes stay in control of their own fate — and it is built right into the standard library.

The signal module lets you register Python functions as handlers for Unix signals like SIGTERM, SIGINT, SIGHUP, and SIGUSR1. When the operating system delivers a signal to your process, Python interrupts the main thread and calls your handler. No threads needed, no polling loops — the OS does the heavy lifting.

In this article we will cover the most important signals and what they mean, how to register signal handlers with signal.signal(), how to implement graceful shutdown patterns, how to reload configuration on SIGHUP, and how signals interact with threads. By the end you will know how to write Python daemons and workers that handle signals correctly.

Python signal: Quick Example

Here is the simplest possible signal handler — catching Ctrl+C (SIGINT) and printing a friendly message instead of a traceback:

# quick_signal.py
import signal
import sys
import time

def handle_sigint(signum, frame):
    print("\nCaught SIGINT (Ctrl+C). Exiting cleanly.")
    sys.exit(0)

# Register the handler
signal.signal(signal.SIGINT, handle_sigint)

print("Running. Press Ctrl+C to stop.")
while True:
    time.sleep(1)
    print("Working...")

Output (after pressing Ctrl+C):

Running. Press Ctrl+C to stop.
Working...
Working...
^CCaught SIGINT (Ctrl+C). Exiting cleanly.

The handler function receives two arguments: signum (the signal number) and frame (the current stack frame). You can use signum to write a single handler for multiple signals.
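For instance, one handler can serve several signals by looking up the name from signum. This is a sketch; the received list and the install() helper are illustrative.

```python
import signal

received = []

def handle(signum, frame):
    # signal.Signals maps the integer back to its enum member
    name = signal.Signals(signum).name
    received.append(name)
    print(f"Got {name} ({int(signum)})")

def install():
    # Call from the main thread; both signals share the same function
    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, handle)
```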

What Are Unix Signals?

Signals are asynchronous notifications sent by the OS or other processes to a running process. Think of them as hardware interrupts for software — the process stops what it is doing, runs the handler, and resumes. They are used for lifecycle management, inter-process communication, and error reporting.

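| Signal (number on most Linux systems) | Default Action | Common Use |
| --- | --- | --- |
| SIGINT (2) | Terminate | Ctrl+C in terminal |
| SIGTERM (15) | Terminate | Graceful shutdown request (kill, kubectl) |
| SIGHUP (1) | Terminate | Terminal hangup; reload config in daemons |
| SIGUSR1 (10) | Terminate | User-defined; dump stats, toggle debug |
| SIGUSR2 (12) | Terminate | User-defined; second custom action |
| SIGKILL (9) | Terminate (uncatchable) | Force-kill; cannot be caught or ignored |
| SIGCHLD (17) | Ignore | Child process state change |

The numbers for SIGUSR1, SIGUSR2, and SIGCHLD differ on other platforms such as macOS, so refer to signals by name (signal.SIGUSR1) rather than by number.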

Note: SIGKILL and SIGSTOP cannot be caught, blocked, or ignored. If the OS sends SIGKILL, your process dies immediately with no handler call. This is why SIGTERM exists — it is the polite request before the OS resorts to SIGKILL.
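You can observe this from Python: trying to change the action for SIGKILL or SIGSTOP fails. The can_install_handler helper below is a hypothetical name for this sketch, which assumes a Unix system.

```python
import signal

def can_install_handler(sig):
    """Return True if the OS lets this process change the action for sig."""
    try:
        previous = signal.signal(sig, signal.SIG_DFL)
    except (OSError, ValueError):
        # OSError: the OS refuses (SIGKILL, SIGSTOP)
        # ValueError: called outside the main thread
        return False
    if previous is not None:
        signal.signal(sig, previous)  # put the earlier handler back
    return True

for sig in (signal.SIGKILL, signal.SIGSTOP):
    print(sig.name, can_install_handler(sig))
```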

Implementing Graceful Shutdown

The most important signal pattern for production services is graceful shutdown on SIGTERM. Instead of dying instantly, the process finishes its current unit of work, flushes buffers, closes connections, and exits cleanly.

# graceful_shutdown.py
import signal
import sys
import time
import threading

class Worker:
    def __init__(self):
        self.running = True
        self.current_job = None
        self._shutdown_event = threading.Event()

        # Register handlers
        signal.signal(signal.SIGTERM, self._handle_shutdown)
        signal.signal(signal.SIGINT, self._handle_shutdown)

    def _handle_shutdown(self, signum, frame):
        sig_name = signal.Signals(signum).name
        print(f"\n[{sig_name}] Shutdown requested. Finishing current job...")
        self.running = False
        self._shutdown_event.set()

    def process_job(self, job_id: int) -> None:
        self.current_job = job_id
        print(f"  Processing job #{job_id}...")
        time.sleep(0.5)  # Simulate work
        self.current_job = None
        print(f"  Job #{job_id} complete.")

    def run(self):
        print("Worker started. Send SIGTERM or Ctrl+C to stop.")
        job_id = 0
        while self.running:
            job_id += 1
            self.process_job(job_id)
            # Wait a bit between jobs; wake up immediately on shutdown
            self._shutdown_event.wait(timeout=0.2)

        print("Worker exited cleanly. Goodbye.")

if __name__ == "__main__":
    worker = Worker()
    worker.run()

Output (after sending SIGTERM with kill -TERM <pid>):

Worker started. Send SIGTERM or Ctrl+C to stop.
  Processing job #1...
  Job #1 complete.
  Processing job #2...
[SIGTERM] Shutdown requested. Finishing current job...
  Job #2 complete.
Worker exited cleanly. Goodbye.

The key design is the self.running flag. The signal handler sets it to False and signals the threading.Event. The main loop checks self.running between jobs, so it finishes the current job before exiting. No job is lost, no file is left half-written.

SIGTERM is a request. self.running = False is the answer.

Reloading Configuration on SIGHUP

On Unix systems, SIGHUP was originally sent when a terminal disconnected. For daemons, it became the convention for “reload your configuration without restarting.” This lets you update a config file and reload it in a running service without downtime.

# config_reload.py
import signal
import json
import time
from pathlib import Path

CONFIG_PATH = Path("/tmp/app_config.json")

# Write an initial config file
CONFIG_PATH.write_text(json.dumps({"log_level": "INFO", "max_retries": 3}))

config = {}

def load_config():
    global config
    try:
        config = json.loads(CONFIG_PATH.read_text())
        print(f"Config loaded: {config}")
    except (json.JSONDecodeError, FileNotFoundError) as e:
        print(f"Config load failed: {e}. Keeping previous config.")

def handle_sighup(signum, frame):
    print("\n[SIGHUP] Reloading configuration...")
    load_config()

signal.signal(signal.SIGHUP, handle_sighup)
load_config()

print("Daemon running. Send SIGHUP to reload config.")
for i in range(5):
    print(f"  Tick {i+1} -- log_level={config.get('log_level')}")
    time.sleep(1)

Output (after editing config and sending kill -HUP <pid>):

Config loaded: {'log_level': 'INFO', 'max_retries': 3}
Daemon running. Send SIGHUP to reload config.
  Tick 1 -- log_level=INFO
  Tick 2 -- log_level=INFO
[SIGHUP] Reloading configuration...
Config loaded: {'log_level': 'DEBUG', 'max_retries': 5}
  Tick 3 -- log_level=DEBUG
  Tick 4 -- log_level=DEBUG
  Tick 5 -- log_level=DEBUG

Important: signal handlers run in the main thread and should be short and fast. The handler itself just calls load_config(), which is synchronous and quick. For expensive reloads (re-establishing database connections, re-reading large files), set a flag in the handler and do the actual work in the main loop instead.
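A sketch of that flag-based variant, with the handler doing nothing except recording the request. ReloadFlag, expensive_reload, and run are hypothetical names for illustration.

```python
import signal

class ReloadFlag:
    def __init__(self):
        self.requested = False

    def handler(self, signum, frame):
        self.requested = True  # the only work done inside the handler

    def consume(self):
        """Return True once per request, then reset the flag."""
        if self.requested:
            self.requested = False
            return True
        return False

def expensive_reload():
    # Stand-in for slow work: reconnecting to a database, re-reading big files
    return {"log_level": "DEBUG"}

def run(flag, ticks):
    config = {"log_level": "INFO"}
    for _ in range(ticks):
        if flag.consume():      # checked at a safe point in the main loop
            config = expensive_reload()
        # ... one unit of work using config ...
    return config

flag = ReloadFlag()
# In a real daemon, from the main thread:
# signal.signal(signal.SIGHUP, flag.handler)
```

The reload then happens between units of work, never in the middle of one.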

Signals and Threads

Python only delivers signals to the main thread. Worker threads cannot register signal handlers or receive signals directly. This is a critical constraint for multithreaded programs — if your main thread is blocked waiting for a thread join, signals may be delayed until the join returns.

# signal_threads.py
import signal
import threading
import time

shutdown_event = threading.Event()

def handle_shutdown(signum, frame):
    print(f"\n[Signal {signum}] Stopping threads...")
    shutdown_event.set()  # Thread-safe way to signal workers

signal.signal(signal.SIGTERM, handle_shutdown)
signal.signal(signal.SIGINT, handle_shutdown)

def worker_thread(thread_id: int):
    while not shutdown_event.is_set():
        print(f"  Thread {thread_id} working...")
        shutdown_event.wait(timeout=1.0)  # Interruptible sleep
    print(f"  Thread {thread_id} exiting.")

threads = [threading.Thread(target=worker_thread, args=(i,)) for i in range(3)]
for t in threads:
    t.start()

# Main thread: keep the signal handlers alive and wait
while not shutdown_event.is_set():
    time.sleep(0.1)  # Short sleep keeps the main thread responsive to signals

for t in threads:
    t.join()
print("All threads stopped.")

Output (after Ctrl+C):

  Thread 0 working...
  Thread 1 working...
  Thread 2 working...
^C
[Signal 2] Stopping threads...
  Thread 0 exiting.
  Thread 1 exiting.
  Thread 2 exiting.
All threads stopped.

The pattern is: the signal handler sets a threading.Event, and worker threads check the event with wait(timeout=...) instead of a bare time.sleep(). The wait() call wakes up as soon as the event is set, so threads exit promptly instead of sleeping through the shutdown signal.

threading.Event.set() -- the only thread-safe thing you should do inside a signal handler.

Real-Life Example: Self-Diagnosing Worker Daemon

This example combines SIGTERM graceful shutdown, SIGHUP config reload, and SIGUSR1 for dumping runtime stats -- a complete signal handling setup for a production daemon.

# worker_daemon.py
import signal
import json
import os
import time
from pathlib import Path
from datetime import datetime

class Daemon:
    def __init__(self):
        self.running = True
        self.jobs_processed = 0
        self.start_time = datetime.now()
        self.config = {"worker_name": "default", "sleep_interval": 0.5}

        signal.signal(signal.SIGTERM, self._on_shutdown)
        signal.signal(signal.SIGINT,  self._on_shutdown)
        signal.signal(signal.SIGHUP,  self._on_reload)
        signal.signal(signal.SIGUSR1, self._on_stats)

    def _on_shutdown(self, signum, frame):
        print(f"\n[{signal.Signals(signum).name}] Graceful shutdown initiated.")
        self.running = False

    def _on_reload(self, signum, frame):
        print("\n[SIGHUP] Config reloaded (simulated).")
        self.config["worker_name"] = f"reloaded_{int(time.time())}"

    def _on_stats(self, signum, frame):
        # total_seconds() covers the full uptime; .seconds wraps around after one day
        elapsed = int((datetime.now() - self.start_time).total_seconds())
        rate = self.jobs_processed / elapsed if elapsed else 0
        print(f"\n[SIGUSR1] Stats -- jobs:{self.jobs_processed} uptime:{elapsed}s rate:{rate:.1f}/s")

    def run(self):
        # os.getpid() is the process ID you pass to kill; sys.argv[0] is only the script name
        print(f"Daemon '{self.config['worker_name']}' started. PID: {os.getpid()}")
        print("Signals: SIGTERM=shutdown, SIGHUP=reload, SIGUSR1=stats")
        while self.running:
            self.jobs_processed += 1
            time.sleep(self.config["sleep_interval"])

        print(f"Daemon stopped. Total jobs: {self.jobs_processed}")

if __name__ == "__main__":
    Daemon().run()

Output (interaction with multiple signals):

Daemon 'default' started. PID: <pid>
Signals: SIGTERM=shutdown, SIGHUP=reload, SIGUSR1=stats

[SIGUSR1] Stats -- jobs:4 uptime:2s rate:2.0/s
[SIGHUP] Config reloaded (simulated).
[SIGUSR1] Stats -- jobs:9 uptime:4s rate:2.2/s

[SIGTERM] Graceful shutdown initiated.
Daemon stopped. Total jobs: 11

This three-signal pattern (SIGTERM, SIGHUP, SIGUSR1) covers the lifecycle of most production Python daemons. Add SIGUSR2 for a second diagnostic action -- for example, dumping a thread stack trace or triggering a garbage collection report.

SIGTERM is the polite ask. SIGKILL is the eviction notice.

Frequently Asked Questions

Does the signal module work on Windows?

Partially. On Windows, Python only supports a subset of signals: SIGTERM, SIGINT, SIGABRT, SIGFPE, SIGILL, SIGSEGV, and SIGBREAK. Unix-specific signals like SIGHUP, SIGUSR1, and SIGUSR2 are not available. If you are writing cross-platform code, check sys.platform before registering Unix-specific handlers.

What is safe to do inside a signal handler?

Keep signal handlers minimal. Setting a flag (self.running = False), setting a threading.Event, or calling sys.exit() are the usual choices. Avoid complex logic, blocking I/O, and anything that takes a lock. Python runs your handler in the main thread between bytecode instructions, so it cannot interrupt the interpreter in the middle of a malloc or dict lookup, but it can interrupt your own code while it holds a lock or is partway through a buffered write -- a handler that then logs or prints can deadlock or raise a reentrant-call RuntimeError.

How do signals work with asyncio?

Use loop.add_signal_handler(signal.SIGTERM, callback) instead of signal.signal() when working with asyncio. The asyncio-aware version integrates with the event loop so your callback runs as part of the event loop rather than interrupting it. Call loop.stop() inside the callback for a clean asyncio shutdown.
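A minimal sketch of the asyncio approach on Unix (add_signal_handler is not available on Windows). Because the example uses asyncio.run(), the callback sets an asyncio.Event rather than calling loop.stop(), which is a substitution made for this sketch; the self_test flag that sends SIGTERM to the current process exists only for the demo.

```python
import asyncio
import os
import signal

async def run_until_signal(self_test=False):
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()
    received = []

    def on_signal(sig):
        # Runs as an ordinary event-loop callback, not inside a signal handler
        received.append(sig.name)
        stop.set()

    watched = (signal.SIGTERM, signal.SIGINT)
    for sig in watched:
        loop.add_signal_handler(sig, on_signal, sig)

    if self_test:
        # Demo only: send SIGTERM to ourselves shortly after startup
        loop.call_later(0.05, os.kill, os.getpid(), signal.SIGTERM)

    await stop.wait()  # a real service would run its tasks until this fires
    for sig in watched:
        loop.remove_signal_handler(sig)
    return received

if __name__ == "__main__":
    print(asyncio.run(run_until_signal(self_test=True)))
```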

How do I ignore a signal?

Pass signal.SIG_IGN as the handler: signal.signal(signal.SIGHUP, signal.SIG_IGN). This tells the OS to silently discard the signal. Use this when you want a daemon to survive terminal hangups without reloading. To restore the default OS behavior, pass signal.SIG_DFL instead.

How do I send a signal from Python?

Use os.kill(pid, signal.SIGTERM) to send a signal to another process. To send to the current process itself, use os.kill(os.getpid(), signal.SIGUSR1). This is useful in tests -- you can programmatically trigger a signal handler to verify it works correctly without relying on external tools.

Conclusion

The signal module is the foundation of well-behaved Python daemons and long-running workers. We covered registering handlers with signal.signal(), implementing graceful shutdown on SIGTERM, reloading configuration on SIGHUP, using SIGUSR1 for runtime diagnostics, and coordinating signal handlers with worker threads via threading.Event. The daemon example brings all three patterns together in a reusable class structure.

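Try adding a SIGUSR2 handler to your next worker that dumps a full stack trace of all running threads. traceback.print_stack() prints only the current thread's stack; faulthandler.register(signal.SIGUSR2, all_threads=True) covers every thread -- it is an invaluable debugging tool for stuck processes in production.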
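One way to get a dump of every thread's stack on a signal is the standard library's faulthandler module. The sketch below assumes Unix and Python 3.8+ (for signal.raise_signal) and writes to a temporary file purely so the demo can read the result back; a real service would more likely pass sys.stderr or a log file.

```python
import faulthandler
import os
import signal
import tempfile

# faulthandler writes straight to a file descriptor, so it needs a real file
dump_file = tempfile.NamedTemporaryFile("w", suffix=".stacks", delete=False)

# After this call, `kill -USR2 <pid>` makes the process dump all thread stacks
faulthandler.register(signal.SIGUSR2, file=dump_file, all_threads=True)

# Demo: raise the signal in-process, then read back what was written
signal.raise_signal(signal.SIGUSR2)
with open(dump_file.name) as f:
    stack_dump = f.read()
print(stack_dump)

faulthandler.unregister(signal.SIGUSR2)
dump_file.close()
os.unlink(dump_file.name)
```

The dump is produced by C code inside the interpreter, so it still works when the Python-level main loop is stuck.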

For the full API reference, see the Python signal documentation.

How To Use Python zlib and gzip for File Compression


Intermediate

Log files, JSON exports, and database backups can balloon to gigabytes fast. Compressing them before storage or transfer is a quick win that costs almost nothing in code — but few Python developers know the standard library already ships with everything needed. You do not need a third-party package to compress files in Python; zlib and gzip are right there, waiting.

The zlib module provides raw DEFLATE compression, while gzip wraps it in the well-known .gz format compatible with Unix tools like gunzip and zcat. Both are part of the Python standard library and require no installation.

In this article we will cover compressing and decompressing bytes with zlib, reading and writing .gz files with gzip, streaming large files without loading them into memory, and choosing the right compression level. By the end you will know how to shrink files efficiently in pure Python.

Python zlib: Quick Example

Here is the fastest way to compress a string with zlib:

# quick_zlib.py
import zlib

original = b"Python compression is surprisingly simple and very useful for large data."

# Compress
compressed = zlib.compress(original)
print(f"Original size:   {len(original)} bytes")
print(f"Compressed size: {len(compressed)} bytes")
print(f"Ratio: {len(compressed)/len(original):.1%}")

# Decompress
restored = zlib.decompress(compressed)
print(f"Restored: {restored.decode()}")

Output:

Original size:   71 bytes
Compressed size: 63 bytes
Ratio: 88.7%
Restored: Python compression is surprisingly simple and very useful for large data.

zlib.compress() returns a bytes object. The compression ratio improves dramatically with larger, more repetitive data — a 10MB log file with repeated lines can compress to under 1MB. The quick example above shows minimal gains because the input is too short for patterns to emerge.
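To see how much the input matters, this small sketch compresses repetitive log-like text and the same amount of random bytes. The sample data is made up for the comparison.

```python
import os
import zlib

repetitive = b"2026-05-15 INFO request ok\n" * 1000
random_bytes = os.urandom(len(repetitive))  # no patterns for DEFLATE to exploit

for label, data in (("repetitive", repetitive), ("random", random_bytes)):
    packed = zlib.compress(data)
    print(f"{label:>10}: {len(data):,} -> {len(packed):,} bytes")
```

The repetitive input shrinks to a tiny fraction of its size, while the random input comes out slightly larger than it went in because of the format overhead.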

What Are zlib and gzip?

Both modules implement the DEFLATE algorithm, which combines LZ77 sliding window compression with Huffman coding. The key difference is the file format wrapper:

| Feature | zlib | gzip |
| --- | --- | --- |
| Format | Raw DEFLATE + zlib header | .gz file format |
| Interoperable with CLI tools | No | Yes (gunzip, zcat, gzip) |
| File-like API | No | Yes (gzip.open) |
| Best for | In-memory bytes, network streams | File storage, log archival |
| Checksum | Adler-32 | CRC-32 |

Use zlib when you are compressing bytes in memory — for example, before writing to a database blob or sending over a network socket. Use gzip when you need a standard .gz file that other tools and systems can read.
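The wrapper difference is visible in the bytes themselves. The sketch below inspects the header and trailer of each format and uses wbits=47 (32 + 15), which asks zlib to auto-detect either header; the payload is arbitrary sample data.

```python
import gzip
import zlib

data = b"same payload, two wrappers " * 50

z = zlib.compress(data)
g = gzip.compress(data)

print("zlib header:", z[:2].hex())
print("gzip magic: ", g[:2].hex())

# A zlib stream ends with the big-endian Adler-32 of the uncompressed data
zlib_trailer_ok = int.from_bytes(z[-4:], "big") == zlib.adler32(data)
# A gzip stream ends with the little-endian CRC-32, then the length mod 2**32
gzip_crc_ok = int.from_bytes(g[-8:-4], "little") == zlib.crc32(data)
gzip_len_ok = int.from_bytes(g[-4:], "little") == len(data)
print(zlib_trailer_ok, gzip_crc_ok, gzip_len_ok)

# wbits=47 accepts both wrappers, handy when the source format is unknown
auto_z = zlib.decompress(z, wbits=47)
auto_g = zlib.decompress(g, wbits=47)
```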

Compression Levels

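zlib accepts a level parameter from 0 (no compression) to 9 (maximum compression); gzip takes the same range through its compresslevel parameter. Higher levels take more CPU time but produce smaller files. For zlib, the default of -1 currently maps to level 6, which is a good balance for most use cases. gzip.open() and gzip.compress() default to compresslevel=9.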

# compression_levels.py
import zlib

data = b"The quick brown fox jumps over the lazy dog. " * 1000  # ~45KB of repetitive text

for level in [1, 3, 6, 9]:
    compressed = zlib.compress(data, level=level)
    ratio = len(compressed) / len(data)
    print(f"Level {level}: {len(compressed):,} bytes ({ratio:.1%} of original)")

Output:

Level 1: 204 bytes (0.5% of original)
Level 3: 204 bytes (0.5% of original)
Level 6: 192 bytes (0.4% of original)
Level 9: 192 bytes (0.4% of original)

For highly repetitive data, even level 1 is extremely effective. For diverse text or binary data, higher levels help more. In practice, use level 6 (default) for file archival and level 1 for network streams where latency matters more than size.
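The gzip module spells the parameter compresslevel rather than level. A short sketch on made-up log-like data; the exact sizes depend on the zlib build, so none are shown here.

```python
import gzip
import zlib

# Made-up, moderately varied sample data
data = b"".join(
    f"user={i % 97} path=/api/v{i % 3}/items/{i}\n".encode() for i in range(5000)
)

fast = gzip.compress(data, compresslevel=1)    # favors speed
small = gzip.compress(data, compresslevel=9)   # gzip's default
middle = zlib.compress(data, level=6)          # what zlib's default (-1) currently maps to

for label, blob in (("gzip level 1", fast), ("gzip level 9", small), ("zlib level 6", middle)):
    print(f"{label}: {len(blob):,} bytes ({len(blob) / len(data):.1%} of original)")
```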

Level 1 or level 9? Just use the default. The difference is usually under 5%.

Reading and Writing .gz Files with gzip

The gzip.open() function gives you a file-like object that compresses transparently. You read and write it exactly like a normal file, but the data on disk is compressed. This is the best approach for compressing log files, CSV exports, or any text data you want to archive.

# gzip_files.py
import gzip
import os

# --- Write a .gz file ---
output_path = "/tmp/sample_log.txt.gz"
lines = [
    f"2026-05-15 09:{i:02d}:00 INFO Request processed successfully\n"
    for i in range(60)
]

with gzip.open(output_path, "wt", encoding="utf-8") as f:
    f.writelines(lines)

print(f"Written: {output_path}")
print(f"File size: {os.path.getsize(output_path):,} bytes")

# --- Read it back ---
with gzip.open(output_path, "rt", encoding="utf-8") as f:
    content = f.read()

line_count = content.count("\n")
print(f"Lines read back: {line_count}")
print(f"First line: {content.splitlines()[0]}")

Output:

Written: /tmp/sample_log.txt.gz
File size: 172 bytes
Lines read back: 60
First line: 2026-05-15 09:00:00 INFO Request processed successfully

Use "wt" mode for text (UTF-8) and "wb" for binary data. Unlike the built-in open(), gzip.open() treats a plain "r" or "w" as binary, so spell out the "t" when you want text. Otherwise the interface mirrors open(), so you can often swap between compressed and uncompressed files by changing just the function name.
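A sketch of that swap: a helper that picks gzip.open() or open() from the file suffix. The open_text name and the temporary directory are assumptions made for this example.

```python
import gzip
import os
import tempfile

def open_text(path, mode="r"):
    """Open a UTF-8 text file, transparently handling a .gz suffix."""
    if mode not in ("r", "w", "a"):
        raise ValueError("mode must be 'r', 'w', or 'a'")
    if str(path).endswith(".gz"):
        # gzip.open treats plain "r"/"w"/"a" as binary, so add "t" explicitly
        return gzip.open(path, mode + "t", encoding="utf-8")
    return open(path, mode, encoding="utf-8")

workdir = tempfile.mkdtemp()
for name in ("notes.txt", "notes.txt.gz"):
    path = os.path.join(workdir, name)
    with open_text(path, "w") as f:
        f.write("hello\n")
    with open_text(path) as f:
        print(name, repr(f.read()))
```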

Streaming Large Files

Loading a 2GB log file into memory to compress it will crash your program. Stream it instead: read chunks from the source, feed them to a zlib.compressobj, and write compressed chunks to the output. This keeps memory usage constant regardless of file size.

# stream_compress.py
import zlib
import os

def compress_file_streaming(src_path: str, dst_path: str, chunk_size: int = 65536) -> dict:
    """
    Compress a file using streaming zlib compression.
    Memory usage stays at chunk_size regardless of file size.
    """
    compressor = zlib.compressobj(level=6, wbits=31)  # wbits=31 = gzip format

    original_size = 0
    compressed_size = 0

    with open(src_path, "rb") as src, open(dst_path, "wb") as dst:
        while True:
            chunk = src.read(chunk_size)
            if not chunk:
                break
            original_size += len(chunk)
            compressed_chunk = compressor.compress(chunk)
            if compressed_chunk:
                compressed_size += len(compressed_chunk)
                dst.write(compressed_chunk)

        # Flush remaining compressed data
        final_chunk = compressor.flush()
        compressed_size += len(final_chunk)
        dst.write(final_chunk)

    return {
        "original_bytes": original_size,
        "compressed_bytes": compressed_size,
        "ratio": compressed_size / original_size if original_size else 0,
    }

# Create a test file
test_file = "/tmp/test_data.txt"
with open(test_file, "w") as f:
    for i in range(10000):
        f.write(f"Log entry {i}: Request from 192.168.1.{i % 255} processed in {i % 100}ms\n")

result = compress_file_streaming(test_file, test_file + ".gz")
print(f"Original:   {result['original_bytes']:,} bytes")
print(f"Compressed: {result['compressed_bytes']:,} bytes")
print(f"Ratio:      {result['ratio']:.1%}")

Output:

Original:   680,000 bytes
Compressed: 9,847 bytes
Ratio:      1.4%

The wbits=31 parameter tells zlib to write the gzip format header — the output file is a valid .gz file that you can open with any gzip-compatible tool. The compress() method may return an empty bytes object for some chunks (it is buffering), so always check before writing. The flush() call at the end writes all buffered data.
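The reverse direction can be sketched the same way with zlib.decompressobj. This version assumes a single-member .gz file and trusts the input: a highly compressible chunk can expand to far more than chunk_size, which the max_length argument of decompress() can cap if that matters. The function name and demo paths are illustrative.

```python
import gzip
import os
import tempfile
import zlib

def decompress_file_streaming(src_path, dst_path, chunk_size=65536):
    """Stream-decompress a single-member .gz file; returns bytes written."""
    decompressor = zlib.decompressobj(wbits=31)  # wbits=31 expects a gzip header
    written = 0
    with open(src_path, "rb") as src, open(dst_path, "wb") as dst:
        while not decompressor.eof:
            chunk = src.read(chunk_size)
            if not chunk:
                break
            data = decompressor.decompress(chunk)
            written += len(data)
            dst.write(data)
        tail = decompressor.flush()
        written += len(tail)
        dst.write(tail)
    if not decompressor.eof:
        raise EOFError("gzip stream ended before its end-of-stream marker")
    return written

# Demo with a throwaway file
workdir = tempfile.mkdtemp()
gz_path = os.path.join(workdir, "data.txt.gz")
out_path = os.path.join(workdir, "data.txt")
payload = b"Log entry: request processed\n" * 10000
with gzip.open(gz_path, "wb") as f:
    f.write(payload)

restored = decompress_file_streaming(gz_path, out_path)
print(f"{restored:,} bytes restored")
```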

Stream it. A 64KB chunk limit beats a 2GB memory crash every time.

Decompressing In-Memory Bytes

Sometimes you receive compressed data over a network connection or from a database BLOB field. Use zlib.decompress() for raw zlib data, or gzip.decompress() for gzip-formatted bytes, to restore the original content without touching the filesystem.

# decompress_memory.py
import gzip
import zlib

# Simulate receiving compressed bytes from an API or database
original_text = "User data: name=Alice, email=alice@example.com, plan=pro\n" * 100
compressed_bytes = gzip.compress(original_text.encode("utf-8"))

print(f"Received {len(compressed_bytes)} compressed bytes")

# Decompress in memory -- no temp files needed
restored_bytes = gzip.decompress(compressed_bytes)
restored_text = restored_bytes.decode("utf-8")

print(f"Decompressed to {len(restored_text)} characters")
print(f"First line: {restored_text.splitlines()[0]}")

# For raw zlib format (no gzip header):
raw_compressed = zlib.compress(original_text.encode("utf-8"))
raw_restored = zlib.decompress(raw_compressed)
print(f"Raw zlib round-trip OK: {raw_restored == original_text.encode()}")

Output:

Received 163 compressed bytes
Decompressed to 5700 characters
First line: User data: name=Alice, email=alice@example.com, plan=pro
Raw zlib round-trip OK: True

Match the decompression function to the compression format: use gzip.decompress() for gzip bytes and zlib.decompress() for raw zlib bytes. Mixing them raises a BadGzipFile or zlib.error exception.
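A quick sketch of what the mismatch looks like in practice. The try_decode helper is a hypothetical name used to collect the error messages.

```python
import gzip
import zlib

payload = b"format matters\n" * 20
zlib_bytes = zlib.compress(payload)
gzip_bytes = gzip.compress(payload)

def try_decode(func, blob):
    """Return 'ok' or a short description of the error that was raised."""
    try:
        func(blob)
        return "ok"
    except gzip.BadGzipFile as exc:
        return f"BadGzipFile: {exc}"
    except zlib.error as exc:
        return f"zlib.error: {exc}"

print(try_decode(gzip.decompress, gzip_bytes))  # matching format
print(try_decode(gzip.decompress, zlib_bytes))  # zlib bytes fed to gzip
print(try_decode(zlib.decompress, gzip_bytes))  # gzip bytes fed to zlib
```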

Real-Life Example: Log Archiver

A common DevOps task is rotating log files: compress yesterday’s logs and move them to an archive directory. Here is a complete log archiver that handles this pattern.

# log_archiver.py
import gzip
import os
import shutil
from pathlib import Path
from datetime import date

def archive_log(log_path: str, archive_dir: str) -> dict:
    """
    Compress a log file into the archive directory with a date suffix.
    Removes the original log file after successful compression.
    """
    log_path = Path(log_path)
    archive_dir = Path(archive_dir)
    archive_dir.mkdir(parents=True, exist_ok=True)

    today = date.today().isoformat()
    archive_name = f"{log_path.stem}_{today}.log.gz"
    archive_path = archive_dir / archive_name

    original_size = log_path.stat().st_size

    # Stream-compress the log file
    with open(log_path, "rb") as src, gzip.open(archive_path, "wb") as dst:
        shutil.copyfileobj(src, dst)

    compressed_size = archive_path.stat().st_size
    # Guard against an empty log file, which would otherwise divide by zero
    savings = 1 - (compressed_size / original_size) if original_size else 0.0

    # Remove original after successful compression
    log_path.unlink()

    return {
        "archived_to": str(archive_path),
        "original_size_kb": original_size // 1024,
        "compressed_size_kb": compressed_size // 1024,
        "space_saved": f"{savings:.1%}",
    }

# --- Demo ---
# Create a fake log file
demo_log = Path("/tmp/app.log")
with open(demo_log, "w") as f:
    for i in range(5000):
        f.write(f"2026-05-15 {i//3600:02d}:{(i//60)%60:02d}:{i%60:02d} INFO endpoint=/api/v1/users latency={i%200}ms\n")

result = archive_log("/tmp/app.log", "/tmp/log_archive")
print(f"Archived to: {result['archived_to']}")
print(f"Original:    {result['original_size_kb']} KB")
print(f"Compressed:  {result['compressed_size_kb']} KB")
print(f"Space saved: {result['space_saved']}")

Output:

Archived to: /tmp/log_archive/app_2026-05-15.log.gz
Original:    292 KB
Compressed:  3 KB
Space saved: 99.0%

The shutil.copyfileobj(src, dst) call handles the streaming copy efficiently: it reads and writes in chunks automatically. The original log file is deleted only after the compressed archive is successfully written, preventing data loss if compression fails partway through. Extend this with a try/except block that deletes the partial archive if shutil.copyfileobj raises an exception, so a later run does not mistake it for a complete one.
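One possible shape for that hardening, as a sketch: compress into a temporary ".part" name, rename it into place only on success, and remove the partial file on failure. The compress_then_remove name and the ".part" suffix are assumptions for this example, and Path.unlink(missing_ok=True) needs Python 3.8+.

```python
import gzip
import os
import shutil
import tempfile
from pathlib import Path

def compress_then_remove(src: Path, dst: Path) -> None:
    """Gzip src into dst, deleting src only after dst is complete."""
    tmp = dst.with_name(dst.name + ".part")
    try:
        with open(src, "rb") as fin, gzip.open(tmp, "wb") as fout:
            shutil.copyfileobj(fin, fout)
        os.replace(tmp, dst)  # atomic rename when both are on one filesystem
    except BaseException:
        tmp.unlink(missing_ok=True)  # drop the partial archive; src is untouched
        raise
    src.unlink()

# Demo in a throwaway directory
workdir = Path(tempfile.mkdtemp())
log = workdir / "app.log"
log.write_text("INFO request ok\n" * 1000)
archive = workdir / "app.log.gz"
compress_then_remove(log, archive)
print(archive.exists(), log.exists())
```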

292 KB in. 3 KB out. shutil.copyfileobj did the work.

Frequently Asked Questions

When should I use zlib vs gzip?

Use zlib when you are working with bytes in memory and do not need a file-compatible format — for example, compressing data before storing it in a database BLOB or sending it over a custom TCP protocol. Use gzip when you need a standard .gz file that other tools (gunzip, tar, Python’s gzip.open()) can read. The underlying compression algorithm is the same; the difference is the file format wrapper.

What about zip, bz2, and lzma?

Python includes modules for all common compression formats. zipfile handles .zip archives (multiple files). bz2 offers better compression ratios than gzip but is slower. lzma provides the best compression of all standard formats (.xz files) but is the slowest. For most log archival and data transfer use cases, gzip strikes the best balance of speed, compatibility, and compression ratio.

Do I always need to stream large files?

For files under ~100MB on a machine with adequate RAM, loading the whole file is fine. For files over 500MB, streaming is strongly recommended. The gzip.open() + shutil.copyfileobj() pattern handles streaming automatically without extra complexity. When in doubt, stream — the code is the same length and you avoid memory errors.

How do I compress an entire directory?

Use shutil.make_archive() to create a tar.gz of a directory: shutil.make_archive('/tmp/mybackup', 'gztar', '/path/to/dir'). This creates /tmp/mybackup.tar.gz. Alternatively, use the tarfile module for more control — open with tarfile.open('output.tar.gz', 'w:gz') and call add() for each file.
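A runnable sketch of the make_archive route, using a temporary directory and made-up file names so it does not touch real data:

```python
import os
import shutil
import tarfile
import tempfile

root = tempfile.mkdtemp()
src_dir = os.path.join(root, "logs")
os.makedirs(src_dir)
for name in ("a.log", "b.log"):
    with open(os.path.join(src_dir, name), "w") as f:
        f.write("example line\n" * 100)

# The first argument is the archive path without its extension
archive_path = shutil.make_archive(
    os.path.join(root, "backup"), "gztar", root_dir=src_dir
)
print(archive_path)

# List the regular files stored in the archive
with tarfile.open(archive_path, "r:gz") as tar:
    members = sorted(
        os.path.basename(m.name) for m in tar.getmembers() if m.isfile()
    )
print(members)
```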

How do I verify a .gz file is not corrupted?

Use gzip.open(path, 'rb') and read the entire file in a try/except block. If the file is corrupted or truncated, gzip raises BadGzipFile, EOFError, or zlib.error. For programmatic integrity checks, the gzip format includes a CRC-32 checksum that Python validates automatically during decompression, so you do not need to compute it yourself.
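A sketch of such a check. The gz_is_valid name is hypothetical, and note that catching OSError means a missing file also reports False here; adjust if you need to tell those cases apart.

```python
import gzip
import os
import tempfile
import zlib

def gz_is_valid(path, chunk_size=65536):
    """Read the whole file in chunks; any decode error means it is not valid."""
    try:
        with gzip.open(path, "rb") as f:
            while f.read(chunk_size):
                pass
    except (OSError, EOFError, zlib.error):
        # gzip.BadGzipFile is a subclass of OSError
        return False
    return True

# Demo: one intact file and one truncated copy
workdir = tempfile.mkdtemp()
good_path = os.path.join(workdir, "good.gz")
bad_path = os.path.join(workdir, "truncated.gz")

with gzip.open(good_path, "wb") as f:
    f.write(b"INFO request ok\n" * 5000)
with open(good_path, "rb") as f:
    blob = f.read()
with open(bad_path, "wb") as f:
    f.write(blob[: len(blob) // 2])  # cut the stream in half

print(gz_is_valid(good_path), gz_is_valid(bad_path))
```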

Conclusion

Python’s zlib and gzip modules give you DEFLATE compression without any third-party dependencies. We covered compressing bytes in memory with zlib, reading and writing .gz files with gzip.open(), streaming large files using zlib.compressobj and shutil.copyfileobj, and choosing the right compression level for your use case. The log archiver example shows a complete, production-ready pattern for compressing log files on rotation.

Try extending the archiver to support a retention policy that deletes archives older than 30 days, or add multi-file support using the tarfile module to bundle multiple logs into a single .tar.gz archive.

For the full API reference, see the Python gzip documentation and Python zlib documentation.