How To Use PyJWT for JSON Web Tokens in Python

Intermediate

Session cookies work fine for traditional web apps, but modern REST APIs and microservices need a stateless authentication mechanism: one where the server does not store session state and any service in the cluster can verify a token independently. JSON Web Tokens (JWTs) are the standard answer. A JWT consists of a base64url-encoded JSON header and payload plus a cryptographic signature, which any server holding the right key can verify without hitting a database. The payload is signed, not encrypted, so anyone holding the token can read it.

The PyJWT library is the most widely used Python implementation of the JWT standard (RFC 7519), and many popular frameworks and SDKs depend on it. In this article you will learn to create signed tokens with HS256 and RS256, validate claims like expiry and audience, work with access and refresh tokens, and wire JWTs into a FastAPI dependency.

PyJWT Quick Example: Issue and Verify in 10 Lines

# quick_jwt.py
import jwt, datetime

SECRET = "demo-secret-at-least-32-bytes-long"  # HS256 needs 32+ bytes; demo value only
payload = {
    "sub": "user_42",
    "exp": datetime.datetime.now(datetime.UTC) + datetime.timedelta(hours=1)
}

token = jwt.encode(payload, SECRET, algorithm="HS256")
print(f"Token: {token[:50]}...")

decoded = jwt.decode(token, SECRET, algorithms=["HS256"])
print(f"Subject: {decoded['sub']}")
Output:
Token: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ1c...
Subject: user_42
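
To make the three-part token structure concrete, here is a minimal standard-library sketch of what an HS256 token contains. It is for illustration only: the helper names (b64url, sign_hs256, verify_hs256) and the demo secret are made up for this example, and the sketch skips the header and claim checks that PyJWT performs, so it is not a substitute for the library.

```python
# jwt_anatomy.py
import base64
import hashlib
import hmac
import json

def b64url(data: bytes) -> str:
    """Base64url-encode without padding, the encoding JWTs use."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")

def b64url_decode(text: str) -> bytes:
    """Restore the stripped padding before decoding."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))

def sign_hs256(payload: dict, secret: bytes) -> str:
    header = {"alg": "HS256", "typ": "JWT"}
    # The signing input is base64url(header) + "." + base64url(payload)
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, payload)
    )
    signature = hmac.new(secret, signing_input.encode("ascii"), hashlib.sha256).digest()
    return f"{signing_input}.{b64url(signature)}"

def verify_hs256(token: str, secret: bytes) -> dict:
    signing_input, _, signature = token.rpartition(".")
    expected = hmac.new(secret, signing_input.encode("ascii"), hashlib.sha256).digest()
    # compare_digest avoids leaking how many bytes matched
    if not hmac.compare_digest(expected, b64url_decode(signature)):
        raise ValueError("signature mismatch")
    return json.loads(b64url_decode(signing_input.split(".")[1]))

demo_secret = b"demo-only-secret-of-at-least-32-bytes"
demo_token = sign_hs256({"sub": "user_42"}, demo_secret)
print(demo_token.count("."), "dots,", verify_hs256(demo_token, demo_secret))
```

The payload can be decoded without the secret, which is why a JWT should never carry data the client is not allowed to read.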

Installing PyJWT

pip install PyJWT

# For RSA/EC algorithm support, install the cryptography extra:
pip install "PyJWT[cryptography]"

python -c "import jwt; print(jwt.__version__)"
Output:
2.8.0

Algorithm | Key Type | Verification | Use Case
HS256 | Shared secret | Same secret | Single service, internal APIs
HS512 | Shared secret | Same secret | Higher security single service
RS256 | RSA private key | RSA public key | Microservices, OAuth2, OIDC
ES256 | EC private key | EC public key | Mobile, IoT (smaller tokens)

Figure: JWT, a self-contained passport that any server can verify.

Standard Claims and Expiry

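JWTs have registered claim names defined by RFC 7519. During decoding, PyJWT checks exp, nbf, and iat automatically when they are present, checks aud and iss against the values you pass as audience and issuer, and rejects tokens that lack any claim listed under the require option: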

# jwt_claims.py
import jwt, datetime

SECRET = "production-secret-min-32-chars-long"

def create_access_token(user_id: str, roles: list) -> str:
    now = datetime.datetime.now(datetime.UTC)
    payload = {
        # Registered claims (RFC 7519)
        "iss": "https://api.myapp.com",     # Issuer
        "sub": user_id,                      # Subject (user identifier)
        "aud": "myapp-frontend",             # Audience
        "exp": now + datetime.timedelta(minutes=15),  # Expiry
        "iat": now,                          # Issued At
        "jti": f"{user_id}-{now.timestamp()}",  # JWT ID (unique per token)
        # Custom claims
        "roles": roles,
        "tier": "premium"
    }
    return jwt.encode(payload, SECRET, algorithm="HS256")

def verify_access_token(token: str) -> dict:
    return jwt.decode(
        token,
        SECRET,
        algorithms=["HS256"],
        audience="myapp-frontend",           # Must match aud claim
        issuer="https://api.myapp.com",      # Must match iss claim
        options={"require": ["exp", "iat", "sub", "iss"]}
    )

token = create_access_token("user_42", ["read", "write"])
claims = verify_access_token(token)
print(f"User: {claims['sub']}, Roles: {claims['roles']}, Tier: {claims['tier']}")

# Test expiry
expired_payload = {
    "sub": "user_1",
    "exp": datetime.datetime.now(datetime.UTC) - datetime.timedelta(seconds=1)
}
expired_token = jwt.encode(expired_payload, SECRET, algorithm="HS256")
try:
    jwt.decode(expired_token, SECRET, algorithms=["HS256"])
except jwt.ExpiredSignatureError as e:
    print(f"Rejected: {e}")
Output:
User: user_42, Roles: ['read', 'write'], Tier: premium
Rejected: Signature has expired
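
Expiry is only one of several ways validation can fail. The sketch below, which assumes PyJWT 2.x, maps the most common exception classes to a short reason string. The check_token and make helpers and the demo secrets are invented for this example, and a real API would usually return a generic 401 rather than a detailed reason.

```python
# jwt_errors.py
import datetime
import jwt

SECRET = "demo-secret-with-at-least-32-characters"
OTHER_SECRET = "another-demo-secret-of-32+-characters"

def check_token(token: str) -> str:
    """Return 'ok' or a short reason the token was rejected."""
    try:
        jwt.decode(token, SECRET, algorithms=["HS256"], audience="myapp-frontend")
        return "ok"
    except jwt.ExpiredSignatureError:
        return "expired"
    except jwt.InvalidAudienceError:
        return "wrong audience"
    except jwt.MissingRequiredClaimError as exc:
        return f"missing claim: {exc.claim}"
    except jwt.InvalidTokenError:
        # Base class: bad signature, malformed token, disallowed algorithm, ...
        return "invalid"

def make(secret: str = SECRET, **claims) -> str:
    return jwt.encode(claims, secret, algorithm="HS256")

now = datetime.datetime.now(datetime.timezone.utc)
soon = now + datetime.timedelta(minutes=5)
results = {
    "valid": check_token(make(sub="u1", aud="myapp-frontend", exp=soon)),
    "expired": check_token(make(sub="u1", aud="myapp-frontend",
                                exp=now - datetime.timedelta(minutes=5))),
    "other audience": check_token(make(sub="u1", aud="admin-panel", exp=soon)),
    "no audience": check_token(make(sub="u1", exp=soon)),
    "forged": check_token(make(OTHER_SECRET, sub="u1", aud="myapp-frontend", exp=soon)),
}
for case, outcome in results.items():
    print(f"{case}: {outcome}")
```

Catching jwt.InvalidTokenError last matters, because every other exception here is a subclass of it.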

RS256: Asymmetric JWT for Microservices

In a microservice architecture, each service needs to verify tokens independently without sharing a secret. RS256 lets a central auth service sign with its private key, and all other services verify with the public key:

# rs256_jwt.py
import jwt
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
import datetime

# Generate RSA key pair (in production: load from file/secret manager)
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
public_key = private_key.public_key()

# Auth service: sign with private key
payload = {
    "sub": "service_account_7",
    "scope": "billing:read orders:write",
    "exp": datetime.datetime.now(datetime.UTC) + datetime.timedelta(hours=1),
    "iss": "https://auth.myapp.com"
}
token = jwt.encode(payload, private_key, algorithm="RS256")
print(f"RS256 token length: {len(token)} chars")

# Any microservice: verify with public key only (no private key needed)
decoded = jwt.decode(
    token,
    public_key,
    algorithms=["RS256"],
    options={"verify_aud": False}
)
print(f"Verified service: {decoded['sub']}, scope: {decoded['scope']}")

# Serialize public key to PEM for distribution
pem = public_key.public_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PublicFormat.SubjectPublicKeyInfo
)
# Load back from PEM and verify again
loaded_pubkey = serialization.load_pem_public_key(pem)
decoded2 = jwt.decode(token, loaded_pubkey, algorithms=["RS256"], options={"verify_aud": False})
print(f"PEM round-trip OK: {decoded2['sub']}")
Output:
RS256 token length: 472 chars
Verified service: service_account_7, scope: billing:read orders:write
PEM round-trip OK: service_account_7

Figure: RS256, where one private key signs and unlimited services verify.
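
When the auth service rotates its signing key, verifiers need a way to tell which public key to use. One common approach, sketched here under the assumption of PyJWT 2.x with the cryptography extra, is to put a key ID in the kid header. The key IDs and the in-memory dictionaries are hypothetical; a real deployment would typically distribute the public keys through a JWKS endpoint or a secret manager.

```python
# rs256_kid.py
import jwt
from cryptography.hazmat.primitives.asymmetric import rsa

# Hypothetical key IDs. The private keys stay inside the auth service.
signing_keys = {
    "2024-key": rsa.generate_private_key(public_exponent=65537, key_size=2048),
    "2025-key": rsa.generate_private_key(public_exponent=65537, key_size=2048),
}
# Verifying services only ever receive this public half.
public_keys = {kid: key.public_key() for kid, key in signing_keys.items()}

def sign(payload: dict, kid: str) -> str:
    return jwt.encode(payload, signing_keys[kid], algorithm="RS256", headers={"kid": kid})

def verify(token: str) -> dict:
    # The header is read before verification, so use it only to look up a
    # key in a trusted map. The allowed algorithm stays hardcoded.
    kid = jwt.get_unverified_header(token).get("kid")
    if kid not in public_keys:
        raise jwt.InvalidTokenError(f"unknown key id: {kid!r}")
    return jwt.decode(token, public_keys[kid], algorithms=["RS256"])

old_token = sign({"sub": "service_account_7"}, "2024-key")
new_token = sign({"sub": "service_account_7"}, "2025-key")
print(verify(old_token)["sub"], verify(new_token)["sub"])
```

Removing an entry from public_keys retires that key: tokens signed with it stop verifying on the next request.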

Access and Refresh Token Pattern

Short-lived access tokens (15 minutes) with long-lived refresh tokens (7 days) are the standard pattern for secure stateless authentication:

# token_pair.py
import jwt, datetime, secrets

SECRET = "your-access-secret-key-min-32-chars"
REFRESH_SECRET = "different-refresh-secret-min-32-chars"

def issue_token_pair(user_id: str) -> dict:
    now = datetime.datetime.now(datetime.UTC)
    access_token = jwt.encode({
        "sub": user_id,
        "type": "access",
        "exp": now + datetime.timedelta(minutes=15),
        "iat": now
    }, SECRET, algorithm="HS256")

    refresh_token = jwt.encode({
        "sub": user_id,
        "type": "refresh",
        "jti": secrets.token_hex(16),  # Unique ID for revocation
        "exp": now + datetime.timedelta(days=7),
        "iat": now
    }, REFRESH_SECRET, algorithm="HS256")

    return {"access_token": access_token, "refresh_token": refresh_token}

def refresh_access_token(refresh_token: str) -> str:
    payload = jwt.decode(refresh_token, REFRESH_SECRET, algorithms=["HS256"])
    if payload.get("type") != "refresh":
        raise ValueError("Not a refresh token")
    # In production: check jti against a revocation list in Redis
    now = datetime.datetime.now(datetime.UTC)
    return jwt.encode({
        "sub": payload["sub"],
        "type": "access",
        "exp": now + datetime.timedelta(minutes=15),
        "iat": now
    }, SECRET, algorithm="HS256")

tokens = issue_token_pair("user_42")
print(f"Access token (first 40 chars): {tokens['access_token'][:40]}...")

new_access = refresh_access_token(tokens["refresh_token"])
decoded = jwt.decode(new_access, SECRET, algorithms=["HS256"])
print(f"Refreshed token for: {decoded['sub']}, expires in 15 min")
Output:
Access token (first 40 chars): eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ...
Refreshed token for: user_42, expires in 15 min
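
The refresh function above leaves the refresh token valid for its full 7 days. Rotation goes one step further: each refresh token works once and is exchanged for a new one. The following is a minimal sketch of that idea, assuming PyJWT 2.x. The active_jtis set is a stand-in for Redis or a database table, and the function names are illustrative.

```python
# refresh_rotation.py
import datetime
import secrets
import jwt

REFRESH_SECRET = "demo-refresh-secret-with-32+-characters"
active_jtis: set[str] = set()   # stand-in for Redis or a database table

def issue_refresh_token(user_id: str) -> str:
    now = datetime.datetime.now(datetime.timezone.utc)
    jti = secrets.token_hex(16)
    active_jtis.add(jti)                    # remember which tokens are live
    return jwt.encode(
        {"sub": user_id, "type": "refresh", "jti": jti,
         "iat": now, "exp": now + datetime.timedelta(days=7)},
        REFRESH_SECRET, algorithm="HS256",
    )

def rotate_refresh_token(old_token: str) -> str:
    claims = jwt.decode(old_token, REFRESH_SECRET, algorithms=["HS256"],
                        options={"require": ["exp", "jti", "sub"]})
    if claims.get("type") != "refresh":
        raise ValueError("Not a refresh token")
    if claims["jti"] not in active_jtis:
        # A second use of the same token suggests it was stolen or replayed.
        raise ValueError("Refresh token already used or revoked")
    active_jtis.discard(claims["jti"])      # single use: retire the old token
    return issue_refresh_token(claims["sub"])

first = issue_refresh_token("user_42")
second = rotate_refresh_token(first)
try:
    rotate_refresh_token(first)
except ValueError as exc:
    print(f"Replay rejected: {exc}")
```

With a real store, the membership check and the removal should happen atomically so two concurrent requests cannot both redeem the same token.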

Frequently Asked Questions

How long should my HS256 secret be?

At least 32 random bytes (256 bits) for HS256, and 64 bytes for HS512. Use secrets.token_hex(32) to generate 32 cryptographically random bytes, returned as a 64-character hex string. Length alone is not enough: a 32-character dictionary phrase carries far less than 256 bits of entropy. Short or dictionary-word secrets are vulnerable to offline brute-force attacks, because an attacker who captures a token can try millions of secrets per second without hitting your server.
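
As a quick illustration using only the standard library, the snippet below generates secrets of the sizes discussed above. The variable names are arbitrary, and in practice the value would be generated once and stored in a secret manager rather than created at startup.

```python
# make_secret.py
import secrets

hs256_secret = secrets.token_hex(32)   # 32 random bytes -> 64 hex characters
hs512_secret = secrets.token_hex(64)   # 64 random bytes -> 128 hex characters

print(len(hs256_secret), len(bytes.fromhex(hs256_secret)))
print(len(hs512_secret), len(bytes.fromhex(hs512_secret)))
```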

Where should I store JWTs on the client side?

For browser apps, httpOnly cookies are more secure than localStorage because JavaScript cannot read httpOnly cookies, preventing XSS attacks from stealing tokens. For mobile apps and server-to-server calls, storing in memory or secure device storage is fine. Never store tokens in localStorage if the site runs any third-party JavaScript. For refresh tokens specifically, httpOnly + Secure + SameSite=Strict cookies are the recommended pattern.

How do I revoke a JWT before it expires?

JWTs are stateless by design — the server does not store them, so you cannot “delete” one. The standard approaches are: keep access tokens very short-lived (15 minutes or less) so revocation is rarely needed; maintain a Redis blocklist of revoked jti values checked on each request; or use opaque tokens for security-critical resources and only use JWTs for low-risk claims. The blocklist approach reintroduces some statefulness but is still much lighter than full session storage.

What is the “none” algorithm attack?

Early JWT libraries accepted tokens with "alg": "none" — meaning no signature — allowing attackers to forge any payload. PyJWT is safe by default: you must explicitly list allowed algorithms in jwt.decode(), and none is never allowed unless you pass algorithms=["none"] explicitly. Always pass the algorithms parameter and never include "none" in it. Similarly, never pass the encoded token’s header algorithm as the allowed algorithm — always hardcode the expected algorithm on the server.

How do I use PyJWT with FastAPI?

Create a dependency that reads the Authorization: Bearer <token> header, calls jwt.decode(), and either returns the claims dict or raises HTTPException(401) on failure. Use Depends(verify_token) in any route that requires authentication. FastAPI’s dependency injection system handles the rest — the dependency runs before the route handler, and any exception it raises short-circuits the request.

Conclusion

PyJWT makes stateless authentication straightforward in Python. For single-service APIs, HS256 with a 32+ character secret and 15-minute expiry is simple and secure. For microservices where multiple independent services need to verify tokens, RS256 lets you distribute the public key freely while keeping the signing key private. In both cases, always specify algorithms explicitly in jwt.decode(), always validate exp and iss, and use short-lived access tokens with refresh token rotation for production systems. The jti claim combined with a Redis blocklist gives you revocation capability when you need it without abandoning the stateless model entirely.

How To Use Python cryptography for Encryption and Decryption

Intermediate

Storing passwords in plaintext, sending API keys over unencrypted channels, or using home-grown XOR “encryption” are among the most common causes of data breaches. The Python cryptography library exists precisely to make doing the right thing easy. Built on top of OpenSSL and maintained by the Python Cryptographic Authority (PyCA), it provides both a high-level “recipes” layer for common tasks and a low-level “hazmat” layer for specialists who need direct cipher access.

This article starts with the recipes layer, which handles the hard parts (key generation, IV selection, authentication tags) so you can encrypt and decrypt data correctly without a PhD in applied cryptography, and then moves to the hazmat layer for cases the recipes do not cover. We will cover Fernet symmetric encryption for simplicity, AES-GCM for performance-critical authenticated encryption, and RSA asymmetric encryption for key exchange and digital signatures.

Cryptography Quick Example: Fernet in 10 Lines

# quick_fernet.py
from cryptography.fernet import Fernet

key = Fernet.generate_key()          # 32-byte URL-safe base64 key
f = Fernet(key)

plaintext = b"Secret API credentials: sk-abc123"
token = f.encrypt(plaintext)
print(f"Encrypted: {token[:40].decode()}...")

decrypted = f.decrypt(token)
print(f"Decrypted: {decrypted.decode()}")
Output:
Encrypted: gAAAAABl8x2...
Decrypted: Secret API credentials: sk-abc123

Fernet uses AES-128-CBC + HMAC-SHA256 under the hood and always produces authenticated ciphertext — if anyone tampers with the token, decryption raises InvalidToken rather than silently returning garbage.
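
The tamper detection described above can be checked directly. This sketch flips a single bit in a token and also tries the wrong key; the try_decrypt helper is made up for the example.

```python
# fernet_tamper.py
import base64
from cryptography.fernet import Fernet, InvalidToken

f = Fernet(Fernet.generate_key())
token = f.encrypt(b"transfer amount=100")

# Decode the token, flip one bit in its last byte, and re-encode it.
raw = bytearray(base64.urlsafe_b64decode(token))
raw[-1] ^= 0x01
tampered = base64.urlsafe_b64encode(bytes(raw))

def try_decrypt(fernet: Fernet, candidate: bytes) -> str:
    try:
        return fernet.decrypt(candidate).decode()
    except InvalidToken:
        return "rejected"

print(try_decrypt(f, token))                              # original token
print(try_decrypt(f, tampered))                           # modified token
print(try_decrypt(Fernet(Fernet.generate_key()), token))  # wrong key
```

Both failure cases raise the same InvalidToken exception, so a caller cannot tell tampering apart from a wrong key.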

Installing the cryptography Library

pip install cryptography

python -c "import cryptography; print(cryptography.__version__)"
Output:
42.0.5

Layer | API | Use When
Recipes (high-level) | Fernet, MultiFernet | Default: handles IV, padding, authentication automatically
Hazmat (low-level) | cryptography.hazmat.* | Custom protocols, non-standard cipher modes, HSM integration

Figure: The recipes layer, with all the hard cryptographic choices made for you.

Fernet: Symmetric Encryption Made Simple

Fernet is the right choice when both the encryptor and decryptor share a secret key — encrypting files at rest, protecting database fields, or securing configuration secrets.

# fernet_demo.py
from cryptography.fernet import Fernet, MultiFernet
import os, base64

# --- Key generation and storage ---
key = Fernet.generate_key()
print(f"Key (store this securely): {key.decode()}")

# Save key to file (in production: use a secret manager)
with open('/tmp/fernet.key', 'wb') as kf:
    kf.write(key)

# Load key from file
with open('/tmp/fernet.key', 'rb') as kf:
    loaded_key = kf.read()

f = Fernet(loaded_key)

# --- Encrypt and decrypt ---
data = b"Database password: s3cret_p@ssw0rd"
token = f.encrypt(data)
print(f"Token length: {len(token)} bytes")

recovered = f.decrypt(token)
assert recovered == data
print(f"Round-trip OK: {recovered.decode()}")

# --- TTL: token expires after N seconds ---
import time
short_lived = f.encrypt_at_time(b"Temporary token", int(time.time()))
try:
    f.decrypt_at_time(short_lived, ttl=1, current_time=int(time.time()) + 5)
except Exception as e:
    print(f"Expired token: {type(e).__name__}")

# --- Key rotation with MultiFernet ---
old_key = Fernet.generate_key()
new_key = Fernet.generate_key()
mf = MultiFernet([Fernet(new_key), Fernet(old_key)])

old_token = Fernet(old_key).encrypt(b"Encrypted with old key")
rotated = mf.rotate(old_token)     # re-encrypts with new_key
print(f"Rotation OK: {mf.decrypt(rotated)}")
Output:
Key (store this securely): bXluWHh4...
Token length: 140 bytes
Round-trip OK: Database password: s3cret_p@ssw0rd
Expired token: InvalidToken
Rotation OK: b'Encrypted with old key'

Figure: MultiFernet.rotate() decrypts with the old key and re-encrypts with the new key for zero-downtime key rotation.

AES-GCM: Authenticated Encryption for Performance

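When you need more control, such as custom nonce sizes, raw-byte output for a wire protocol, or AEAD with associated data, use AES-256-GCM from the hazmat layer directly. The AESGCM class shown here encrypts a whole message in one call; processing a large file in chunks requires the lower-level Cipher API with modes.GCM instead: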

# aes_gcm_demo.py
import os
from cryptography.hazmat.primitives.ciphers.aead import AESGCM

# Generate a 256-bit key
key = AESGCM.generate_key(bit_length=256)
aesgcm = AESGCM(key)

# Nonce MUST be unique per encryption — never reuse with the same key
nonce = os.urandom(12)   # 96-bit nonce is standard for GCM

plaintext = b"Sensitive medical record #12345"
aad = b"patient_id:12345"  # Additional Authenticated Data (not encrypted, but authenticated)

ciphertext = aesgcm.encrypt(nonce, plaintext, aad)
print(f"Ciphertext+tag length: {len(ciphertext)} bytes (plaintext was {len(plaintext)})")

# Decrypt — must provide same nonce and aad
recovered = aesgcm.decrypt(nonce, ciphertext, aad)
print(f"Decrypted: {recovered.decode()}")

# Tamper test: modifying ciphertext raises InvalidTag
tampered = bytearray(ciphertext)
tampered[0] ^= 0xFF
try:
    aesgcm.decrypt(nonce, bytes(tampered), aad)
except Exception as e:
    print(f"Tamper detected: {type(e).__name__}")
Output:
Ciphertext+tag length: 47 bytes (plaintext was 31)
Decrypted: Sensitive medical record #12345
Tamper detected: InvalidTag

The 16-byte difference between plaintext and ciphertext length is the GCM authentication tag. AES-GCM is typically faster than the AES-CBC plus HMAC construction that Fernet uses: its counter-mode encryption can be parallelised, and modern CPUs accelerate both the AES rounds (AES-NI) and the GCM tag computation (carry-less multiplication).
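
Because decryption needs the same nonce, a common convention is to store the nonce in front of the ciphertext. The nonce is not secret, only unique. The seal and open_sealed helpers below are illustrative names for a minimal sketch of that layout, not part of the library.

```python
# aes_gcm_blob.py
import os
from cryptography.exceptions import InvalidTag
from cryptography.hazmat.primitives.ciphers.aead import AESGCM

NONCE_SIZE = 12   # 96-bit nonce, as in the example above

def seal(key: bytes, plaintext: bytes, aad: bytes = b"") -> bytes:
    """Encrypt and return nonce || ciphertext || tag as one blob."""
    nonce = os.urandom(NONCE_SIZE)          # fresh nonce on every call
    return nonce + AESGCM(key).encrypt(nonce, plaintext, aad)

def open_sealed(key: bytes, blob: bytes, aad: bytes = b"") -> bytes:
    """Split off the nonce and decrypt; raises InvalidTag on tampering."""
    nonce, ciphertext = blob[:NONCE_SIZE], blob[NONCE_SIZE:]
    return AESGCM(key).decrypt(nonce, ciphertext, aad)

key = AESGCM.generate_key(bit_length=256)
blob = seal(key, b"Sensitive medical record #12345", aad=b"patient_id:12345")
print(len(blob), open_sealed(key, blob, aad=b"patient_id:12345"))

try:
    open_sealed(key, blob, aad=b"patient_id:99999")   # wrong associated data
except InvalidTag:
    print("AAD mismatch detected")
```

Generating the nonce inside seal() keeps callers from accidentally reusing one with the same key.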

RSA Asymmetric Encryption and Signatures

RSA solves the key distribution problem: the public key can be shared freely, and only the private key holder can decrypt or sign:

# rsa_demo.py
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import hashes, serialization

# --- Key generation ---
private_key = rsa.generate_private_key(
    public_exponent=65537,
    key_size=2048
)
public_key = private_key.public_key()

# --- Serialize keys to PEM ---
pem_private = private_key.private_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.BestAvailableEncryption(b"passphrase")
)
pem_public = public_key.public_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PublicFormat.SubjectPublicKeyInfo
)

# --- Encrypt with public key, decrypt with private key ---
message = b"Symmetric key: " + b"\x8f\xa2" * 16  # e.g. an AES key
ciphertext = public_key.encrypt(
    message,
    padding.OAEP(
        mgf=padding.MGF1(algorithm=hashes.SHA256()),
        algorithm=hashes.SHA256(),
        label=None
    )
)
print(f"RSA ciphertext length: {len(ciphertext)} bytes")

decrypted = private_key.decrypt(
    ciphertext,
    padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()),
                 algorithm=hashes.SHA256(), label=None)
)
assert decrypted == message
print("RSA encrypt/decrypt: OK")

# --- Sign with private key, verify with public key ---
document = b"Invoice #1234: $5000 due 2026-06-01"
signature = private_key.sign(
    document,
    padding.PSS(mgf=padding.MGF1(hashes.SHA256()),
                salt_length=padding.PSS.MAX_LENGTH),
    hashes.SHA256()
)
print(f"Signature length: {len(signature)} bytes")

public_key.verify(
    signature, document,
    padding.PSS(mgf=padding.MGF1(hashes.SHA256()),
                salt_length=padding.PSS.MAX_LENGTH),
    hashes.SHA256()
)
print("Signature verified: OK")
Output:
RSA ciphertext length: 256 bytes
RSA encrypt/decrypt: OK
Signature length: 256 bytes
Signature verified: OK

Figure: RSA lets you share the lock and keep the key.

Frequently Asked Questions

When should I use Fernet vs AES-GCM directly?

Use Fernet for most application-level encryption — it handles nonce generation, key derivation format, and authentication automatically, and its base64 output is easy to store in databases or environment variables. Use AES-GCM directly when you need to stream large files without loading them entirely into memory, when you need to integrate with a specific wire protocol that expects raw bytes, or when you need custom AEAD associated data patterns. Both use authenticated encryption and are equally secure in terms of the underlying algorithms.

Should I use cryptography for password hashing?

No — use bcrypt, argon2-cffi, or passlib for passwords. The cryptography library is designed for encrypting data, not for password storage. Password hashing requires deliberately slow key derivation functions that resist brute-force attacks. Encrypting a password with Fernet is wrong because if the key leaks, all passwords are immediately exposed — a proper hash is irreversible. The library does include PBKDF2HMAC and scrypt for key derivation from passwords (e.g., deriving a Fernet key from a passphrase), which is different from storing hashed passwords.
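
For the key-derivation case mentioned above, a minimal sketch of turning a passphrase into a Fernet key with PBKDF2HMAC could look like this. The iteration count of 600,000 is an assumption chosen for the example and should follow current guidance for your environment; the function name is illustrative. The salt is not secret, but it must be stored alongside the ciphertext so the same key can be derived again.

```python
# fernet_from_passphrase.py
import base64
import os
from cryptography.fernet import Fernet, InvalidToken
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC

def fernet_from_passphrase(passphrase: bytes, salt: bytes) -> Fernet:
    # A PBKDF2HMAC instance is single-use, so build a new one per derivation.
    kdf = PBKDF2HMAC(algorithm=hashes.SHA256(), length=32,
                     salt=salt, iterations=600_000)
    # Fernet expects the 32-byte key in URL-safe base64 form.
    return Fernet(base64.urlsafe_b64encode(kdf.derive(passphrase)))

salt = os.urandom(16)                       # store this next to the ciphertext
token = fernet_from_passphrase(b"correct horse battery", salt).encrypt(b"config secret")

# Same passphrase and salt derive the same key.
print(fernet_from_passphrase(b"correct horse battery", salt).decrypt(token))

try:
    fernet_from_passphrase(b"wrong passphrase", salt).decrypt(token)
except InvalidToken:
    print("Wrong passphrase rejected")
```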

Where should I store encryption keys?

Never hardcode keys in source code or commit them to version control. In development, use environment variables or .env files excluded from git. In production, use a dedicated secret manager: AWS Secrets Manager, HashiCorp Vault, Google Cloud Secret Manager, or Azure Key Vault. For the highest security tier, use a Hardware Security Module (HSM) or cloud KMS where the key never leaves the hardware boundary. The cryptography library integrates with PKCS#11 HSMs via the hazmat layer.

How do I encrypt large files with RSA?

RSA cannot encrypt data larger than its key size minus padding overhead (190 bytes for a 2048-bit key with OAEP and SHA-256). The standard approach is hybrid encryption: generate a random AES-256 key, encrypt the file with AES-GCM, then encrypt the AES key with RSA. The recipient decrypts the RSA-wrapped AES key with their private key, then uses that AES key to decrypt the file. This combines RSA’s key distribution advantages with AES-GCM’s performance, and the same idea of pairing asymmetric and symmetric cryptography underlies TLS, PGP, and SSH.
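
The hybrid scheme can be sketched in a few lines with the same primitives used earlier in this article. The function names and the tuple layout are choices made for this example; a real file format would also need to record the algorithm and key identifiers.

```python
# hybrid_encrypt.py
import os
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from cryptography.hazmat.primitives.ciphers.aead import AESGCM

OAEP = padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()),
                    algorithm=hashes.SHA256(), label=None)

def hybrid_encrypt(public_key, data: bytes) -> tuple[bytes, bytes, bytes]:
    data_key = AESGCM.generate_key(bit_length=256)   # one-off AES key
    nonce = os.urandom(12)
    ciphertext = AESGCM(data_key).encrypt(nonce, data, None)
    wrapped_key = public_key.encrypt(data_key, OAEP)  # only 32 bytes go through RSA
    return wrapped_key, nonce, ciphertext

def hybrid_decrypt(private_key, wrapped_key: bytes, nonce: bytes, ciphertext: bytes) -> bytes:
    data_key = private_key.decrypt(wrapped_key, OAEP)
    return AESGCM(data_key).decrypt(nonce, ciphertext, None)

private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
big_payload = os.urandom(1_000_000)                  # far beyond the RSA size limit

wrapped, nonce, ct = hybrid_encrypt(private_key.public_key(), big_payload)
assert hybrid_decrypt(private_key, wrapped, nonce, ct) == big_payload
print(f"wrapped key: {len(wrapped)} bytes, ciphertext: {len(ct)} bytes")
```

Because a fresh AES key is generated for every message, the random 12-byte nonce is never reused with the same key.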

How do I rotate encryption keys without downtime?

For Fernet, use MultiFernet: create a new key, put it first in the list with the old key second, then call mf.rotate(old_token) on all existing ciphertext. New encryptions use the first key automatically. Once all tokens are rotated, remove the old key from the list. For AES-GCM, the same principle applies: store a key version with each ciphertext, maintain a mapping of version to key, and migrate data lazily on read or in a background job.

Conclusion

The cryptography library makes correct encryption practical in Python. For most use cases, start with Fernet — it chooses secure defaults, prevents nonce reuse, and produces tokens that are easy to store. When you need streaming, AEAD with associated data, or hardware acceleration, use AES-256-GCM from the hazmat layer. For key exchange and digital signatures, use RSA with OAEP padding for encryption and PSS padding for signatures — never use the older PKCS1v15 padding for new code.

The three non-negotiable rules are: always use authenticated encryption (Fernet and AES-GCM both satisfy this), never reuse a nonce with the same key, and store keys outside your application code in a secret manager or environment variable. With these rules in place, the cryptography library handles the rest.

How To Build a Python gRPC Service with grpcio

Intermediate

REST APIs built on HTTP and JSON are everywhere, but they come with trade-offs: verbose payloads, loose contracts, and no built-in streaming. When services need to talk to each other at high throughput — hundreds of thousands of calls per second with tight latency budgets — gRPC is the industry answer. Used by Google, Netflix, and Cloudflare, gRPC combines Protocol Buffers for compact binary serialization with HTTP/2 for multiplexed connections and native bidirectional streaming.

The grpcio Python package brings the full gRPC runtime to Python. You define services in a .proto schema file, run a code generator, and get strongly-typed client and server stubs automatically. The contract lives in the schema, not in documentation that can drift out of sync. If you change the proto file, the generated code changes with it — no more “the client sends a string but the server expects an int” bugs.

This article covers the complete gRPC workflow in Python: writing a .proto service definition, generating Python stubs, building a server, writing a client, and implementing all four RPC types — unary, server-streaming, client-streaming, and bidirectional streaming.

gRPC Quick Example: Hello Service in 30 Lines

The fastest way to see gRPC in action is a minimal hello service. You need three files: the schema, the server, and the client:

// hello.proto
syntax = "proto3";
package hello;

service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply);
}

message HelloRequest { string name = 1; }
message HelloReply   { string message = 1; }

# Generate Python stubs (run once after changing the .proto)
pip install grpcio grpcio-tools
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. hello.proto
# Creates: hello_pb2.py  hello_pb2_grpc.py

# server.py
import grpc
from concurrent import futures
import hello_pb2
import hello_pb2_grpc

class GreeterServicer(hello_pb2_grpc.GreeterServicer):
    def SayHello(self, request, context):
        return hello_pb2.HelloReply(message=f"Hello, {request.name}!")

server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
hello_pb2_grpc.add_GreeterServicer_to_server(GreeterServicer(), server)
server.add_insecure_port('[::]:50051')
server.start()
print("Server started on :50051")
server.wait_for_termination()

# client.py
import grpc
import hello_pb2
import hello_pb2_grpc

with grpc.insecure_channel('localhost:50051') as channel:
    stub = hello_pb2_grpc.GreeterStub(channel)
    response = stub.SayHello(hello_pb2.HelloRequest(name="World"))
    print(f"Server replied: {response.message}")
Output:
Server replied: Hello, World!

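Three files, zero JSON parsing, zero URL routing. The contract lives in the schema: the generated message classes reject unknown field names and wrong field types when you construct a message, instead of leaving validation to hand-written code.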

Installing grpcio

Install both the runtime and the code generation tools:

pip install grpcio grpcio-tools

# Verify
python -c "import grpc; print(grpc.__version__)"
Output:
1.62.1

grpcio is the runtime — servers, channels, and interceptors. grpcio-tools bundles the protoc compiler with the Python gRPC plugin so you can generate stubs without installing a separate C++ toolchain.

Package | Purpose | Required At
grpcio | Client/server runtime | Runtime
grpcio-tools | Proto compiler + code gen | Dev/build time
protobuf | Protocol Buffer serialization | Runtime (auto-installed)
grpcio-reflection | Server reflection for grpcurl/Postman | Optional/dev
grpcio-health-checking | Standard health check service | Optional/prod

Figure: With grpc_tools.protoc, your schema becomes your code.

Writing a Protocol Buffers Service Definition

The .proto file is the source of truth for your service. It defines the service methods and the request/response message types. Here is a more realistic example — a currency conversion service:

// finance.proto
syntax = "proto3";
package finance;

service CurrencyConverter {
  // Unary: one request, one response
  rpc Convert (ConvertRequest) returns (ConvertReply);

  // Server streaming: one request, stream of rate updates
  rpc WatchRates (WatchRequest) returns (stream RateUpdate);

  // Client streaming: stream of amounts, one summary response
  rpc BatchConvert (stream ConvertRequest) returns (BatchReply);

  // Bidirectional streaming: real-time conversion feed
  rpc LiveConvert (stream ConvertRequest) returns (stream ConvertReply);
}

message ConvertRequest {
  string from_currency = 1;   // e.g. "USD"
  string to_currency   = 2;   // e.g. "EUR"
  double amount        = 3;
}

message ConvertReply {
  double converted_amount = 1;
  double rate             = 2;
  string timestamp        = 3;
}

message WatchRequest {
  string base_currency = 1;
  repeated string target_currencies = 2;
}

message RateUpdate {
  string currency = 1;
  double rate     = 2;
}

message BatchReply {
  double total_converted = 1;
  int32  count           = 2;
}

# Generate stubs
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. finance.proto
# Creates: finance_pb2.py  finance_pb2_grpc.py

Field numbers (the = 1, = 2 after field names) are used in the binary encoding — they must be unique within a message and should never be reused once deployed. Adding new fields with new numbers is backward-compatible; removing or renumbering fields breaks existing clients.
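
To see why field numbers matter, it helps to look at the bytes. The following standard-library sketch hand-encodes a single string field the way the Protocol Buffers wire format does: a tag built from the field number and wire type, then a length, then the data. The helper names are made up for this example, and real code should rely on the generated classes.

```python
# proto_wire_sketch.py
def encode_varint(value: int) -> bytes:
    """Encode a non-negative integer as a protobuf varint (7 bits per byte)."""
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)   # more bytes follow
        else:
            out.append(byte)
            return bytes(out)

LENGTH_DELIMITED = 2   # wire type for strings, bytes, and nested messages

def encode_string_field(field_number: int, text: str) -> bytes:
    data = text.encode("utf-8")
    tag = (field_number << 3) | LENGTH_DELIMITED   # the field NAME is never sent
    return encode_varint(tag) + encode_varint(len(data)) + data

# HelloRequest { string name = 1; } with name="World"
print(encode_string_field(1, "World"))   # b'\n\x05World'
# The same value under field number 2 produces different bytes
print(encode_string_field(2, "World"))   # b'\x12\x05World'
```

Only the number travels on the wire, so renaming a field is harmless while renumbering it makes old and new code disagree about what the bytes mean.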

The Four gRPC RPC Types

gRPC defines four communication patterns. Here is a complete server implementing all four for the finance service:

# finance_server.py
import grpc
import time
import random
from concurrent import futures
import finance_pb2
import finance_pb2_grpc

# Simulated exchange rates
RATES = {
    ('USD', 'EUR'): 0.92, ('USD', 'GBP'): 0.79, ('USD', 'JPY'): 149.5,
    ('EUR', 'USD'): 1.09, ('GBP', 'USD'): 1.27, ('JPY', 'USD'): 0.0067,
}

def get_rate(frm, to):
    if frm == to:
        return 1.0
    return RATES.get((frm, to), 1.0) * (1 + random.uniform(-0.002, 0.002))

class CurrencyConverterServicer(finance_pb2_grpc.CurrencyConverterServicer):

    # --- Unary RPC ---
    def Convert(self, request, context):
        rate = get_rate(request.from_currency, request.to_currency)
        return finance_pb2.ConvertReply(
            converted_amount=request.amount * rate,
            rate=rate,
            timestamp=str(time.time())
        )

    # --- Server Streaming RPC ---
    def WatchRates(self, request, context):
        """Stream rate updates every second until client cancels."""
        # is_active() turns False once the client cancels or disconnects
        while context.is_active():
            for target in request.target_currencies:
                rate = get_rate(request.base_currency, target)
                yield finance_pb2.RateUpdate(currency=target, rate=rate)
            time.sleep(1.0)

    # --- Client Streaming RPC ---
    def BatchConvert(self, request_iterator, context):
        total = 0.0
        count = 0
        for req in request_iterator:
            rate = get_rate(req.from_currency, req.to_currency)
            total += req.amount * rate
            count += 1
        return finance_pb2.BatchReply(total_converted=total, count=count)

    # --- Bidirectional Streaming RPC ---
    def LiveConvert(self, request_iterator, context):
        for req in request_iterator:
            rate = get_rate(req.from_currency, req.to_currency)
            yield finance_pb2.ConvertReply(
                converted_amount=req.amount * rate,
                rate=rate,
                timestamp=str(time.time())
            )

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    finance_pb2_grpc.add_CurrencyConverterServicer_to_server(
        CurrencyConverterServicer(), server
    )
    server.add_insecure_port('[::]:50051')
    server.start()
    print("Finance gRPC server running on :50051")
    server.wait_for_termination()

if __name__ == '__main__':
    serve()

Figure: The four gRPC RPC types. Pick the pattern that fits your data flow.

Writing the gRPC Client

The generated stub class handles all the serialization. Each of the four RPC types gets a corresponding client pattern:

# finance_client.py
import grpc
import finance_pb2
import finance_pb2_grpc

def run():
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = finance_pb2_grpc.CurrencyConverterStub(channel)

        # 1. Unary call
        reply = stub.Convert(finance_pb2.ConvertRequest(
            from_currency='USD', to_currency='EUR', amount=100.0
        ))
        print(f"Unary: $100 USD = €{reply.converted_amount:.2f} EUR (rate: {reply.rate:.4f})")

        # 2. Server streaming: receive 6 rate updates (two rounds), then cancel
        watch_req = finance_pb2.WatchRequest(
            base_currency='USD',
            target_currencies=['EUR', 'GBP', 'JPY']
        )
        update_count = 0
        rate_stream = stub.WatchRates(watch_req)
        for update in rate_stream:
            print(f"Rate update: USD/{update.currency} = {update.rate:.4f}")
            update_count += 1
            if update_count >= 6:
                rate_stream.cancel()  # tell the server to stop streaming
                break

        # 3. Client streaming: send a batch of conversions
        def batch_requests():
            pairs = [('USD','EUR',100), ('USD','GBP',250), ('EUR','USD',80)]
            for frm, to, amt in pairs:
                yield finance_pb2.ConvertRequest(
                    from_currency=frm, to_currency=to, amount=amt
                )
        batch_reply = stub.BatchConvert(batch_requests())
        print(f"Batch: {batch_reply.count} conversions, total = {batch_reply.total_converted:.2f}")

        # 4. Bidirectional streaming
        def live_requests():
            for amount in [50, 100, 200]:
                yield finance_pb2.ConvertRequest(
                    from_currency='USD', to_currency='JPY', amount=amount
                )
        for reply in stub.LiveConvert(live_requests()):
            print(f"Live: ${reply.converted_amount:.2f} JPY at rate {reply.rate:.2f}")

if __name__ == '__main__':
    run()
Output:
Unary: $100 USD = €92.14 EUR (rate: 0.9214)
Rate update: USD/EUR = 0.9198
Rate update: USD/GBP = 0.7893
Rate update: USD/JPY = 149.82
Rate update: USD/EUR = 0.9207
Rate update: USD/GBP = 0.7901
Rate update: USD/JPY = 149.71
Batch: 3 conversions, total = 412.37
Live: $7479.50 JPY at rate 149.59
Live: $14955.00 JPY at rate 149.55
Live: $29942.00 JPY at rate 149.71

Error Handling and Status Codes

gRPC defines its own set of status codes, carried in the HTTP/2 response trailers rather than mapped onto HTTP status codes. Use context.abort() on the server to return a typed error, and catch grpc.RpcError on the client:

# Error handling patterns
import grpc
import finance_pb2
import finance_pb2_grpc

# Server side: abort with a status code
def Convert(self, request, context):
    supported = {'USD', 'EUR', 'GBP', 'JPY'}
    if request.from_currency not in supported:
        context.abort(
            grpc.StatusCode.INVALID_ARGUMENT,
            f"Unsupported currency: {request.from_currency}"
        )
    if request.amount <= 0:
        context.abort(
            grpc.StatusCode.OUT_OF_RANGE,
            f"Amount must be positive, got: {request.amount}"
        )
    rate = get_rate(request.from_currency, request.to_currency)
    return finance_pb2.ConvertReply(converted_amount=request.amount * rate, rate=rate)

# Client side: catch RpcError
with grpc.insecure_channel('localhost:50051') as channel:
    stub = finance_pb2_grpc.CurrencyConverterStub(channel)
    try:
        reply = stub.Convert(finance_pb2.ConvertRequest(
            from_currency='BTC', to_currency='EUR', amount=1.0
        ))
    except grpc.RpcError as e:
        print(f"gRPC error: {e.code()} — {e.details()}")
        # Output: gRPC error: StatusCode.INVALID_ARGUMENT — Unsupported currency: BTC
grpc.StatusCode: typed errors that cross language boundaries without ambiguity.
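
Typed status codes also let a client decide which failures are worth retrying. The following is a minimal sketch rather than part of the service above: call_with_retry and the FakeRpcError stand-in are hypothetical names, and treating UNAVAILABLE and DEADLINE_EXCEEDED as transient is an assumption that depends on your service. With grpcio, the same check would inspect e.code() on a caught grpc.RpcError.

```python
import time

# Assumption: these two codes are treated as transient for this sketch.
RETRYABLE = {"UNAVAILABLE", "DEADLINE_EXCEEDED"}

class FakeRpcError(Exception):
    """Stand-in for grpc.RpcError so the sketch runs without a server."""
    def __init__(self, code_name, details):
        super().__init__(f"{code_name}: {details}")
        self._code_name = code_name
        self._details = details

    def code(self):
        return self._code_name

    def details(self):
        return self._details

def call_with_retry(fn, attempts=3, base_delay=0.1, sleep=time.sleep):
    """Call fn(); retry transient errors with exponential backoff."""
    for attempt in range(attempts):
        try:
            return fn()
        except FakeRpcError as e:
            last_try = attempt == attempts - 1
            if e.code() not in RETRYABLE or last_try:
                raise
            sleep(base_delay * (2 ** attempt))

# Demo: a call that fails twice with UNAVAILABLE, then succeeds.
calls = {"n": 0}

def flaky_convert():
    calls["n"] += 1
    if calls["n"] < 3:
        raise FakeRpcError("UNAVAILABLE", "server restarting")
    return 92.14

delays = []  # record the backoff delays instead of actually sleeping
result = call_with_retry(flaky_convert, sleep=delays.append)
print(result, delays)
```

Non-retryable codes such as INVALID_ARGUMENT are re-raised immediately, since repeating a malformed request cannot succeed.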

Frequently Asked Questions

When should I use gRPC instead of REST?

Use gRPC when you control both client and server (internal microservices), when you need high-throughput low-latency calls, when you want a strict schema that fails fast on contract violations, or when you need streaming. REST is better for public APIs consumed by browsers and third parties — the JSON format is universally readable and HTTP/1.1 works everywhere. Many systems use REST at the edge and gRPC internally.

How do I evolve a proto schema without breaking clients?

Add new fields with new field numbers; older clients simply ignore them. Never remove or renumber existing fields once deployed. Never change the type of an existing field. Renaming a field is safe on the binary wire format, which uses field numbers rather than names, but it changes the generated code and the JSON mapping, so coordinate renames with client owners. If you need to remove a field, mark its number as reserved so future developers cannot accidentally reuse it. Put a major version in the package name (e.g., package finance.v2) for breaking changes.

Can I use gRPC with Python async/await?

Yes — grpcio ships grpc.aio, an asyncio-native implementation. Replace grpc.server() with grpc.aio.server(), replace grpc.insecure_channel() with grpc.aio.insecure_channel(), and add async/await to your servicer methods. The grpc.aio module is production-ready as of grpcio 1.32 and is the recommended approach for new services using asyncio frameworks like FastAPI or Starlette.

How do I add TLS to a gRPC server?

Replace add_insecure_port with add_secure_port and pass a grpc.ssl_server_credentials() object constructed from your certificate and key files. On the client side, replace grpc.insecure_channel() with grpc.secure_channel('host:443', grpc.ssl_channel_credentials()). For mutual TLS (mTLS), pass both root certificates and client cert/key to grpc.ssl_channel_credentials(). In Kubernetes environments, service meshes like Istio handle mTLS transparently without application code changes.

How do I test a gRPC service without writing a client?

Install grpcio-reflection and enable server reflection, then use grpcurl (a command-line tool like curl for gRPC) or Postman (which supports gRPC natively). Alternatively, write a short script against the stubs that grpc_tools.protoc already generated, or write unit tests that call servicer methods directly without a running server; the grpcio-testing package (imported as grpc_testing) provides helpers for this.
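
Calling servicer methods directly needs only stand-ins for the request and the context. The sketch below is an illustration under stated assumptions: FakeContext, AbortError, and the fixed RATES table are hypothetical, status codes are plain strings, and SimpleNamespace replaces the generated finance_pb2 messages so the example runs without generated code. It relies on the fact that a real context.abort() raises to end the RPC.

```python
from types import SimpleNamespace

class AbortError(Exception):
    """Raised by FakeContext.abort, mirroring how a real abort ends the RPC."""
    def __init__(self, code, details):
        super().__init__(details)
        self.code = code
        self.details = details

class FakeContext:
    """Hypothetical stand-in for grpc.ServicerContext (only abort is modeled)."""
    def abort(self, code, details):
        raise AbortError(code, details)

RATES = {("USD", "EUR"): 0.92}  # illustrative fixed rate, not real data

class CurrencyConverter:
    """Servicer logic only; a real one would subclass the generated base class."""
    def Convert(self, request, context):
        if request.amount <= 0:
            context.abort(
                "OUT_OF_RANGE",
                f"Amount must be positive, got: {request.amount}",
            )
        rate = RATES[(request.from_currency, request.to_currency)]
        return SimpleNamespace(converted_amount=request.amount * rate, rate=rate)

servicer = CurrencyConverter()

# Happy path: call the method like any other Python function.
ok = servicer.Convert(
    SimpleNamespace(from_currency="USD", to_currency="EUR", amount=100.0),
    FakeContext(),
)
print(round(ok.converted_amount, 2))

# Error path: the fake context turns abort() into a catchable exception.
aborted = None
try:
    servicer.Convert(
        SimpleNamespace(from_currency="USD", to_currency="EUR", amount=-5.0),
        FakeContext(),
    )
except AbortError as e:
    aborted = (e.code, e.details)
print(aborted)
```

In a real test suite the requests would be finance_pb2.ConvertRequest objects and the codes grpc.StatusCode members; the structure of the test stays the same.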

Conclusion

gRPC turns service-to-service communication into a typed function call. The .proto schema defines the contract, grpc_tools.protoc generates the boilerplate, and grpcio handles the HTTP/2 transport, multiplexing, and serialization. In this article we covered the four RPC types — unary, server streaming, client streaming, and bidirectional streaming — along with typed error handling using grpc.StatusCode. The key rules are: never change field numbers in deployed schemas, always use context.abort() for typed server errors, and catch grpc.RpcError on the client side.

For production use, add TLS via grpc.ssl_server_credentials(), enable server reflection for debugging, add a health check service via grpcio-health-checking, and consider the grpc.aio asyncio API if your service is built around async Python frameworks. When you need to expose a gRPC backend to browsers or mobile clients, grpc-gateway or Envoy proxy can translate REST/JSON calls to gRPC automatically at the edge.

How To Use Python numba for JIT-Compiled Performance

Intermediate

Python is famously readable, but famously slow for tight numerical loops. If you have ever benchmarked a pure Python loop against the equivalent C or Fortran code, you know the gap is often 100x or worse. For most web code and business logic that does not matter, but in scientific computing, signal processing, or machine learning pipelines it matters enormously. That is where numba enters — a JIT compiler that translates Python and NumPy code into fast machine code at runtime with almost no changes to your existing code.

Numba is built on LLVM, the same compiler infrastructure that powers Clang and Rust. When you decorate a function with @jit, numba watches the first call, infers the types of the arguments, compiles the function body to native machine code, and caches that compiled version for all subsequent calls with the same argument types. The result is code that often approaches the speed of hand-written C, while your source file still looks like Python.

This article covers the complete numba workflow: installing the library, understanding the @jit and @njit decorators, using @vectorize for NumPy ufuncs, enabling parallel execution with parallel=True, and knowing when numba helps versus hurts. By the end you will have a fully benchmarked Monte Carlo simulation that outruns its vectorized NumPy equivalent without writing a single line of C.

Numba Quick Example: 100x Faster Loop

The fastest way to understand numba is to see how little you need to change. Here is a pure Python loop computing the sum of squares, and its numba-compiled equivalent:

# quick_numba.py
import numba
import numpy as np
import time

# Pure Python version
def sum_squares_python(n):
    total = 0.0
    for i in range(n):
        total += i * i
    return total

# Numba JIT version -- only the decorator changes
@numba.jit(nopython=True)
def sum_squares_numba(n):
    total = 0.0
    for i in range(n):
        total += i * i
    return total

N = 10_000_000

# Warm up numba (first call triggers compilation)
sum_squares_numba(1)

t0 = time.perf_counter()
result_py = sum_squares_python(N)
t1 = time.perf_counter()
result_nb = sum_squares_numba(N)
t2 = time.perf_counter()

print(f"Python:  {t1-t0:.3f}s  result={result_py:.0f}")
print(f"Numba:   {t2-t1:.4f}s  result={result_nb:.0f}")
print(f"Speedup: {(t1-t0)/(t2-t1):.0f}x")

Output:

Python:  2.841s  result=333333328333335000000
Numba:   0.021s  result=333333328333335000000
Speedup: 135x

The only change to the function was adding @numba.jit(nopython=True). Numba compiled the loop body to machine code that runs entirely without the Python interpreter: same values, same logic, 135x faster in this run. The exact speedup depends on your CPU and Python version. The first call is slower because it triggers compilation; every subsequent call with the same argument types runs at full speed.
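
To see the compilation cost in isolation, you can time the first and second calls separately. This is a minimal sketch: the timed helper is a hypothetical name, and the try/except fallback is an assumption added so the script still runs, uncompiled, on a machine without numba.

```python
import time

try:
    from numba import njit
except ImportError:
    # Fallback so the sketch still runs (uncompiled) when numba is absent.
    def njit(func=None, **options):
        return func if func is not None else (lambda f: f)

@njit
def sum_squares(n):
    total = 0.0
    for i in range(n):
        total += i * i
    return total

def timed(n):
    """Return (result, elapsed seconds) for one call."""
    start = time.perf_counter()
    value = sum_squares(n)
    return value, time.perf_counter() - start

first_value, first_time = timed(1_000_000)    # includes compilation under numba
second_value, second_time = timed(1_000_000)  # reuses the compiled code
print(f"first call:  {first_time:.4f}s")
print(f"second call: {second_time:.4f}s")
```

With numba installed, the first call should take noticeably longer than the second; without it, both timings reflect the plain interpreter.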

Installing Numba

Install numba via pip or conda. Both ship prebuilt binaries that bundle the LLVM-based llvmlite dependency; conda is a common choice in scientific environments where the rest of the stack is already managed by it:

# pip
pip install numba numpy

# conda (preferred for scientific stacks)
conda install numba

# Verify installation
python -c "import numba; print(numba.__version__)"

Output:

0.59.1

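Numba does not need a C compiler at runtime: the prebuilt pip wheels and conda packages include llvmlite, which bundles the LLVM runtime that numba uses for code generation. A compiler toolchain is only needed when building numba or llvmlite from source. If you see an LLVM-related error after a pip install, installing via conda into a fresh environment is a reliable way to get a matching numba/llvmlite pair.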

Decorator | Use Case | NumPy Support | Python Objects
@jit | General loops, fallback mode | Yes | Limited (object mode)
@njit / @jit(nopython=True) | Loops, math, no Python objects | Yes | No
@vectorize | Scalar-to-array ufuncs | Yes | No
@guvectorize | Array-to-array ufuncs | Yes | No
@stencil | Sliding window patterns | Yes | No
@cuda.jit | GPU kernels | Yes | No
@jit — because life is too short to wait for Python loops.

The @jit and @njit Decorators

The @jit decorator is numba’s primary entry point. In releases before 0.59, its default mode tried to compile to native code but silently fell back to “object mode” (interpreted Python) when it met unsupported constructs; from 0.59 onward, @jit defaults to nopython mode and object mode has to be requested explicitly with forceobj=True. The @njit shorthand (equivalent to @jit(nopython=True)) raises an error instead of falling back on every version. This is almost always what you want, because a silent fallback means you are running slow code while thinking it is fast.

# jit_modes.py
import numba
import numpy as np

# @njit: strict, raises an error if something cannot be compiled
@numba.njit
def dot_product(a, b):
    result = 0.0
    for i in range(len(a)):
        result += a[i] * b[i]
    return result

# Caching: saves the compiled binary to disk so the first call of the
# NEXT run is also fast (avoids recompiling on every script restart)
@numba.njit(cache=True)
def euclidean_distance(a, b):
    total = 0.0
    for i in range(len(a)):
        diff = a[i] - b[i]
        total += diff * diff
    return total ** 0.5

# Eager compilation: specify types upfront so the first call is instant
from numba import float64
@numba.njit(float64(float64[:], float64[:]))
def weighted_sum(values, weights):
    total = 0.0
    for i in range(len(values)):
        total += values[i] * weights[i]
    return total

a = np.array([1.0, 2.0, 3.0])
b = np.array([4.0, 5.0, 6.0])
w = np.array([0.2, 0.5, 0.3])

print(f"Dot product:        {dot_product(a, b):.1f}")
print(f"Euclidean distance: {euclidean_distance(a, b):.4f}")
print(f"Weighted sum:       {weighted_sum(a, w):.2f}")

Output:

Dot product:        32.0
Euclidean distance: 5.1962
Weighted sum:       2.10

Use cache=True in production scripts that run repeatedly: numba saves the compiled code to a __pycache__ directory next to the source file and reloads it on subsequent runs, eliminating the compilation delay. Use eager compilation (passing a type signature) when you know the input types in advance; compilation then happens when the decorator runs at import time, so the first call pays no compilation cost.

nopython=True: if numba cannot compile it, it tells you instead of quietly running slow Python.

@vectorize: Creating NumPy Ufuncs

NumPy’s built-in operations like np.sin and np.exp are already fast because they are implemented in C as “ufuncs” that broadcast over arrays without Python overhead. The @numba.vectorize decorator lets you create your own ufuncs from pure Python scalar logic:

# vectorize_demo.py
import numba
import numpy as np
import time

# Define the types the ufunc should support, narrowest first
@numba.vectorize(['float32(float32)', 'float64(float64)'])
def sigmoid(x):
    return 1.0 / (1.0 + np.exp(-x))

@numba.vectorize(['float64(float64, float64)'])
def clipped_relu(x, threshold):
    if x < 0.0:
        return 0.0
    elif x > threshold:
        return threshold
    return x

arr = np.linspace(-5, 5, 10_000_000, dtype=np.float64)
thresholds = np.full(10_000_000, 3.0)

# NumPy version for comparison (np.exp is already vectorized)
def sigmoid_python(x):
    return 1.0 / (1.0 + np.exp(-x))  # already vectorized via numpy

t0 = time.perf_counter()
out_np = sigmoid_python(arr)
t1 = time.perf_counter()
out_nb = sigmoid(arr)
t2 = time.perf_counter()

print(f"NumPy sigmoid:  {t1-t0:.4f}s")
print(f"Numba sigmoid:  {t2-t1:.4f}s")
print(f"Max difference: {np.max(np.abs(out_np - out_nb)):.2e}")
print(f"Clipped relu sample: {clipped_relu(arr[:5], thresholds[:5])}")

Output:

NumPy sigmoid:  0.0621s
Numba sigmoid:  0.0184s
Max difference: 0.00e+00
Clipped relu sample: [0. 0. 0. 0. 0.]

@vectorize is most useful when your scalar logic does not map cleanly to a single NumPy expression — branching logic like clipped_relu would require multiple np.where calls in NumPy but is natural in numba. The resulting function behaves exactly like a NumPy ufunc: it supports broadcasting, works on arrays of any shape, and returns an array of the same shape as the input.
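
For comparison, here is what the same branching looks like in plain NumPy. This is a sketch for illustration only: clipped_relu_where is a hypothetical name, and np.vectorize is used purely as a slow reference loop for checking the result, not as a substitute for a compiled ufunc.

```python
import numpy as np

def clipped_relu_scalar(x, threshold):
    """Scalar logic, written the way you would pass it to @vectorize."""
    if x < 0.0:
        return 0.0
    elif x > threshold:
        return threshold
    return x

def clipped_relu_where(x, threshold):
    """The same branching expressed as nested np.where calls."""
    return np.where(x < 0.0, 0.0, np.where(x > threshold, threshold, x))

x = np.array([-2.0, 0.5, 1.0, 3.5, 10.0])
t = 3.0

# np.vectorize loops in Python; it only serves as a reference here.
reference = np.vectorize(clipped_relu_scalar, otypes=[float])(x, t)
print(clipped_relu_where(x, t))
print(np.array_equal(reference, clipped_relu_where(x, t)))
```

The nested form evaluates every branch for every element and allocates temporaries for the masks, which is part of why scalar-style logic can be both clearer and faster once compiled.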

Parallel Execution with parallel=True

Numba can automatically parallelize loops over arrays using all available CPU cores by adding parallel=True and replacing Python’s range with numba.prange:

# parallel_numba.py
import numba
from numba import prange
import numpy as np
import time

@numba.njit(parallel=True)
def parallel_pairwise_distance(X):
    """Compute n x n pairwise Euclidean distance matrix."""
    n = X.shape[0]
    d = X.shape[1]
    result = np.zeros((n, n), dtype=np.float64)
    for i in prange(n):          # prange parallelizes this loop
        for j in range(i, n):
            dist = 0.0
            for k in range(d):
                diff = X[i, k] - X[j, k]
                dist += diff * diff
            dist = dist ** 0.5
            result[i, j] = dist
            result[j, i] = dist
    return result

@numba.njit
def serial_pairwise_distance(X):
    """Same but single-threaded."""
    n = X.shape[0]
    d = X.shape[1]
    result = np.zeros((n, n), dtype=np.float64)
    for i in range(n):
        for j in range(i, n):
            dist = 0.0
            for k in range(d):
                diff = X[i, k] - X[j, k]
                dist += diff * diff
            dist = dist ** 0.5
            result[i, j] = dist
            result[j, i] = dist
    return result

X = np.random.rand(2000, 10)

# Warm up
parallel_pairwise_distance(X[:10])
serial_pairwise_distance(X[:10])

t0 = time.perf_counter()
r_serial = serial_pairwise_distance(X)
t1 = time.perf_counter()
r_parallel = parallel_pairwise_distance(X)
t2 = time.perf_counter()

print(f"Serial:   {t1-t0:.3f}s")
print(f"Parallel: {t2-t1:.3f}s")
print(f"Speedup:  {(t1-t0)/(t2-t1):.1f}x (on {numba.get_num_threads()} threads)")
print(f"Results match: {np.allclose(r_serial, r_parallel)}")

Output:

Serial:   0.847s
Parallel: 0.122s
Speedup:  6.9x (on 8 threads)
Results match: True

Parallel speedup scales with the number of CPU cores. The key constraint is that the loop iterations must be independent — if iteration i depends on iteration i-1, you cannot parallelize it. The prange function signals to numba that you guarantee independence; if you use regular range with parallel=True, numba will still compile but will not parallelize the loop.
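
The independence requirement can be checked before reaching for prange. The pure-Python sketch below, with hypothetical helper names, splits a loop into chunks roughly the way a parallel scheduler might and compares against the serial result. A sum of squares survives chunking because partial sums combine; a recurrence that reads the previous iteration's output does not.

```python
def chunks(n, parts):
    """Split range(n) into contiguous (start, stop) pairs."""
    step = (n + parts - 1) // parts
    return [(s, min(s + step, n)) for s in range(0, n, step)]

def sum_squares(start, stop):
    # Independent iterations: each term depends only on i.
    return sum(i * i for i in range(start, stop))

def smooth(values, start, stop, prev=0.0):
    # Dependent iterations: each output needs the previous output.
    out = []
    for i in range(start, stop):
        prev = 0.5 * prev + values[i]
        out.append(prev)
    return out

n = 16
serial_sum = sum_squares(0, n)
chunked_sum = sum(sum_squares(a, b) for a, b in chunks(n, 4))

values = [1.0] * n
serial_smooth = smooth(values, 0, n)
# Each chunk restarts from prev=0.0 because it cannot see its neighbour's result.
chunked_smooth = [y for a, b in chunks(n, 4) for y in smooth(values, a, b)]

print(serial_sum == chunked_sum)        # reduction: chunking is safe
print(serial_smooth == chunked_smooth)  # recurrence: chunking changes the answer
```

If a chunked version of your loop cannot reproduce the serial answer, the loop is not a candidate for prange as written.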

prange: when one core just is not enough.

Real-Life Example: Monte Carlo Pi Estimation

Monte Carlo simulation is a classic use case for numba: a tight inner loop with no dependencies between iterations, pure floating-point math, and a meaningful speedup target. This implementation estimates pi by sampling random points inside a unit square and counting how many fall inside the unit circle:

# monte_carlo_pi.py
import numba
from numba import prange
import numpy as np
import time

@numba.njit(parallel=True, cache=True)
def monte_carlo_pi(n_samples):
    """Estimate pi via Monte Carlo simulation."""
    inside = 0
    for _ in prange(n_samples):
        x = np.random.random()
        y = np.random.random()
        if x*x + y*y <= 1.0:
            inside += 1
    return 4.0 * inside / n_samples

def monte_carlo_pi_numpy(n_samples):
    """Same computation using vectorized NumPy."""
    x = np.random.rand(n_samples)
    y = np.random.rand(n_samples)
    inside = np.sum(x*x + y*y <= 1.0)
    return 4.0 * inside / n_samples

N = 100_000_000

# Warm up
monte_carlo_pi(1000)

t0 = time.perf_counter()
pi_np = monte_carlo_pi_numpy(N)
t1 = time.perf_counter()
pi_nb = monte_carlo_pi(N)
t2 = time.perf_counter()

print(f"NumPy  pi={pi_np:.6f}  time={t1-t0:.3f}s")
print(f"Numba  pi={pi_nb:.6f}  time={t2-t1:.3f}s")
print(f"True   pi={np.pi:.6f}")
print(f"Speedup: {(t1-t0)/(t2-t1):.1f}x")

Output:

NumPy  pi=3.141596  time=1.823s
Numba  pi=3.141618  time=0.241s
True   pi=3.141593
Speedup: 7.6x

The numba version spreads the samples across threads and never allocates a 100-million-element array. NumPy allocates two 800 MB arrays before doing any math, which causes memory bandwidth pressure. Numba's loop generates two random numbers per iteration, uses them immediately, and discards them -- a much more cache-friendly access pattern. For working sets larger than the CPU cache, this streaming pattern often outperforms NumPy's vectorized approach, even on a single core.
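
How close to pi should such an estimate land? Each sample is a hit with probability p = pi/4, so the estimate 4 * hits / n has a standard error of 4 * sqrt(p * (1 - p) / n), roughly 1.64 / sqrt(n). The sketch below uses only the standard library, with hypothetical function names and a small sample count, to compare an actual error against that scale.

```python
import math
import random

def estimate_pi(n_samples, seed=0):
    """Streaming Monte Carlo estimate: no arrays are allocated."""
    rng = random.Random(seed)
    inside = 0
    for _ in range(n_samples):
        x = rng.random()
        y = rng.random()
        if x * x + y * y <= 1.0:
            inside += 1
    return 4.0 * inside / n_samples

def standard_error(n_samples):
    """Standard error of the estimate, using p = pi/4 as the hit probability."""
    p = math.pi / 4.0
    return 4.0 * math.sqrt(p * (1.0 - p) / n_samples)

n = 200_000
pi_hat = estimate_pi(n)
print(f"estimate={pi_hat:.4f}  error={abs(pi_hat - math.pi):.4f}  "
      f"expected scale={standard_error(n):.4f}")
```

Because the error shrinks with the square root of n, each extra decimal digit of accuracy costs about 100 times more samples, which is why per-sample speed matters so much for this kind of workload.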

Frequently Asked Questions

Why is the first numba call slow?

The first call triggers JIT compilation: numba inspects the argument types, generates LLVM IR, and compiles it to machine code. This takes 0.1–2 seconds depending on function complexity. Use cache=True to save the compiled binary to disk, or pass an explicit type signature to compile at import time with @njit(float64(float64[:])). In long-running servers or batch jobs, the warm-up cost amortizes quickly over millions of calls.

Does numba replace NumPy?

No -- numba and NumPy are complementary. NumPy excels at vectorized array operations on large, regular data. Numba excels at loops with complex branching logic, accumulated state (like running totals), or access patterns that are hard to vectorize. Many high-performance scientific libraries use NumPy for data layout and numba for the innermost computation. A common pattern is to pass NumPy arrays into @njit functions and return NumPy arrays from them.

What Python constructs does numba support in nopython mode?

Numba supports Python arithmetic, boolean logic, comparisons, while/for loops, if/else, most math functions, and a large subset of NumPy array operations including indexing, slicing, shape, and common ufuncs. It does not support arbitrary Python objects, containers that mix element types, or most string formatting; dictionaries and lists need a single key/value or element type (numba.typed.Dict and numba.typed.List cover the general case). When in doubt, decorate with @njit and run -- numba's error messages identify the unsupported construct.

How do I debug a numba-compiled function?

Temporarily remove the @njit decorator and run the function as plain Python -- it will work identically but slowly. Once the logic is correct, add the decorator back. You can also use @njit(boundscheck=True) to enable bounds checking (raises IndexError on out-of-bounds array access instead of silent memory corruption), @njit(debug=True) to emit debug information for tools such as gdb, and numba.typed.List for typed lists that work in nopython mode. The func.inspect_types() method shows inferred types for each variable.
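
Running the function as plain Python does not strictly require editing the source: a compiled numba function keeps the original as its py_func attribute, and setting the NUMBA_DISABLE_JIT=1 environment variable disables compilation for a whole run. The sketch below compares the two paths; running_max is a hypothetical example function, and the import fallback is an assumption added so the script also runs without numba.

```python
try:
    from numba import njit
except ImportError:
    # Fallback so the sketch still runs (uncompiled) when numba is absent.
    def njit(func=None, **options):
        return func if func is not None else (lambda f: f)

@njit
def running_max(values):
    best = values[0]
    for v in values:
        if v > best:
            best = v
    return best

# Compiled dispatchers expose the original function as .py_func;
# the fallback has no such attribute, so default to the function itself.
interpreted = getattr(running_max, "py_func", running_max)

data = (3.0, 9.5, 1.25, 7.0)
print(running_max(data), interpreted(data))
```

Comparing both results on a small input is a quick way to tell a logic bug (both wrong) from a typing or compilation issue (only the compiled path fails).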

Can numba run on GPUs?

Yes -- numba supports CUDA GPU kernels via @numba.cuda.jit. You write the kernel as a Python function that operates on individual elements, and CUDA launches it across thousands of threads. You need an NVIDIA GPU with CUDA support and the cudatoolkit package installed. For most numerical work, the parallel CPU mode (parallel=True) delivers sufficient speedup without the complexity of GPU memory management, but for matrix operations on data above ~1 GB the GPU path becomes compelling.

Conclusion

Numba eliminates the traditional trade-off between Python's readability and native code performance. By adding a single decorator to functions with tight loops or heavy floating-point math, you can achieve speedups of 10x to 200x over pure Python and 2x to 10x over NumPy vectorization. In this article we covered the @njit decorator for strict compilation, cache=True for persisting compiled code, @vectorize for custom NumPy ufuncs, and parallel=True with prange for multi-core execution.

The most important rule is to use @njit rather than a bare @jit, so that silent fallbacks to object mode are impossible on any numba version. Apply numba to the innermost loop of your computation -- the 20% of code consuming 80% of your runtime -- and leave the rest as plain Python. Profile first with cProfile or line_profiler to confirm where the bottleneck actually is before adding any decorator. When you are ready for the next level, explore @guvectorize for generalised ufuncs, numba.typed.Dict for compiled dictionaries, and @numba.cuda.jit for GPU kernels.

How To Use Python PyGame for 2D Game Development

Intermediate

You have learned the syntax, wrestled with list comprehensions, and maybe even built a web API. But at some point every Python developer asks the same question: can I actually build a game with this? The answer is yes — and Python’s pygame library makes it surprisingly accessible. Whether you want to build a simple platformer, a Pong clone, or a top-down shooter, PyGame gives you the primitives you need: a game loop, sprite management, keyboard input, collision detection, and a drawing surface that updates 60 times per second.

PyGame is a set of Python modules built on top of SDL (Simple DirectMedia Layer), a C library that handles graphics, sound, and input across Windows, macOS, and Linux. You install it with pip install pygame, and from that point everything happens in pure Python. You do not need a game engine, a shader language, or a degree in computer graphics. If you can write a for loop and create a Python class, you can build a working 2D game today.

This article walks you through everything you need to get started with PyGame in Python. We will cover the game loop pattern that makes real-time graphics work, drawing shapes and images to the screen, handling keyboard and mouse input, creating reusable sprite classes, detecting collisions between objects, and managing game state. By the end you will have built a fully playable Pong clone from scratch.

PyGame Quick Example: A Moving Square

The fastest way to see PyGame in action is to open a window and move a shape around with the arrow keys. This 30-line script captures the entire structure of a PyGame game:

# moving_square.py
import pygame

pygame.init()
screen = pygame.display.set_mode((640, 480))
pygame.display.set_caption("Moving Square")
clock = pygame.time.Clock()

x, y = 300, 220
speed = 4
running = True

while running:
    for event in pygame.event.get():
        if event.type == pygame.QUIT:
            running = False

    keys = pygame.key.get_pressed()
    if keys[pygame.K_LEFT]:  x -= speed
    if keys[pygame.K_RIGHT]: x += speed
    if keys[pygame.K_UP]:    y -= speed
    if keys[pygame.K_DOWN]:  y += speed

    screen.fill((30, 30, 30))
    pygame.draw.rect(screen, (0, 200, 100), (x, y, 40, 40))
    pygame.display.flip()
    clock.tick(60)

pygame.quit()

Output:

A 640x480 window opens with a green square you can move with the arrow keys at 60 fps.

Four lines do all the heavy lifting: pygame.init() starts every PyGame subsystem, pygame.display.set_mode() creates the window, clock.tick(60) caps the frame rate at 60 FPS, and pygame.display.flip() pushes everything you drew onto the screen. The while running loop is the game loop — it runs continuously until the player closes the window. We will break down every piece of this pattern in the sections below.

What Is PyGame and How Does It Work?

PyGame is a game development library for Python that wraps the SDL2 multimedia library. SDL handles low-level tasks — opening a window, reading keyboard/mouse events, blitting pixel data to the screen — while PyGame gives you a Pythonic API on top of it. Every PyGame program follows the same fundamental structure: initialize, create a window, enter the game loop, handle events, update state, draw, and repeat.

Think of the game loop like a film projector. A film is a sequence of still frames shown fast enough that your brain perceives motion. Your game is the same — you redraw the entire screen every frame (typically 60 times per second), and each frame shows the objects in their slightly updated positions. clock.tick(60) tells PyGame to pause long enough so each iteration takes at least 1/60th of a second, keeping the animation smooth regardless of how fast the computer is.
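
One caveat to the fixed-step picture: clock.tick() also returns the number of milliseconds since the previous call, and many games scale movement by that elapsed time so that speed does not depend on the frame rate actually achieved. The sketch below is plain Python with no window; simulate, simulate_fixed_step, and the constants are hypothetical and only illustrate the arithmetic.

```python
def simulate(fps, seconds, speed_px_per_s):
    """Advance a position using per-frame delta time, as a game loop would."""
    dt = 1.0 / fps  # in a real loop: dt = clock.tick(fps) / 1000.0
    x = 0.0
    for _ in range(int(fps * seconds)):
        x += speed_px_per_s * dt
    return x

def simulate_fixed_step(fps, seconds, step_px):
    """Advance a fixed number of pixels per frame, ignoring elapsed time."""
    x = 0.0
    for _ in range(int(fps * seconds)):
        x += step_px
    return x

for fps in (30, 60):
    print(fps,
          round(simulate(fps, 2, 200), 3),   # same distance at both rates
          simulate_fixed_step(fps, 2, 4))    # distance doubles with the rate
```

The examples in this article use the simpler fixed step, which is fine as long as the machine reliably hits the target frame rate.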

Component | PyGame Tool | Purpose
Window | pygame.display.set_mode() | Creates the drawing surface
Game loop | while running: + clock.tick() | Keeps the game running at fixed FPS
Input | pygame.event.get(), pygame.key.get_pressed() | Keyboard and mouse input
Drawing | pygame.draw.*, screen.blit() | Renders shapes and images
Sprites | pygame.sprite.Sprite | Reusable game object class
Collision | pygame.Rect.colliderect() | Detects overlapping rectangles
Sound | pygame.mixer | Plays audio files

Install PyGame before running any of the examples below:

# Install PyGame
pip install pygame
60 frames per second. clock.tick(60) is the only thing between smooth gameplay and slideshow mode.

Understanding the Game Loop

The game loop is the heartbeat of every PyGame program. Understanding it deeply will save you from mysterious bugs later. Here is the canonical three-phase pattern — events, update, draw — with comments explaining each phase:

# game_loop.py
import pygame

pygame.init()
screen = pygame.display.set_mode((800, 600))
clock = pygame.time.Clock()
FPS = 60

player_x = 400
player_y = 300

running = True
while running:
    # --- PHASE 1: Handle events ---
    # pygame.event.get() drains the event queue each frame.
    # If you skip this call, the window freezes and can't be closed.
    for event in pygame.event.get():
        if event.type == pygame.QUIT:
            running = False
        if event.type == pygame.KEYDOWN and event.key == pygame.K_ESCAPE:
            running = False

    # --- PHASE 2: Update state ---
    # Read keys that are currently held down (not just pressed this frame).
    # pygame.key.get_pressed() returns a snapshot of the keyboard state.
    keys = pygame.key.get_pressed()
    if keys[pygame.K_w]: player_y -= 3
    if keys[pygame.K_s]: player_y += 3
    if keys[pygame.K_a]: player_x -= 3
    if keys[pygame.K_d]: player_x += 3

    # Keep the player inside the window boundaries
    player_x = max(20, min(780, player_x))
    player_y = max(20, min(580, player_y))

    # --- PHASE 3: Draw ---
    screen.fill((20, 20, 40))       # Clear screen to dark blue
    pygame.draw.circle(screen, (255, 200, 0), (player_x, player_y), 20)
    pygame.display.flip()           # Show the new frame
    clock.tick(FPS)                 # Wait until 1/FPS seconds have passed

pygame.quit()

Output:

A window opens with a yellow circle you move using WASD keys. It stays within the window bounds. Press Escape to quit.

The most important detail is that event handling is mandatory. Without pygame.event.get() (or pygame.event.pump()), the operating system sees your window as unresponsive and may mark it as not responding. You must drain the event queue every frame, even if you do not use the events. Note also the split between event.type == pygame.KEYDOWN, which fires once when a key is pressed, and pygame.key.get_pressed(), which reports every frame whether a key is held: use the former for one-shot actions such as quitting or jumping, and the latter for continuous movement, otherwise the character moves in jerky steps.

Sprites: Reusable Game Objects

Hardcoding player position as two variables works for a tutorial, but the moment you need multiple enemies, multiple bullets, and a player, you need a better structure. PyGame’s Sprite class is the standard way to model game objects:

# sprites_demo.py
import pygame

pygame.init()
screen = pygame.display.set_mode((800, 500))
clock = pygame.time.Clock()

class Player(pygame.sprite.Sprite):
    def __init__(self):
        super().__init__()
        # Create a surface for the sprite and fill it with a color
        self.image = pygame.Surface((50, 50))
        self.image.fill((0, 180, 255))
        self.rect = self.image.get_rect(center=(400, 250))
        self.speed = 5

    def update(self):
        keys = pygame.key.get_pressed()
        if keys[pygame.K_LEFT]:  self.rect.x -= self.speed
        if keys[pygame.K_RIGHT]: self.rect.x += self.speed
        if keys[pygame.K_UP]:    self.rect.y -= self.speed
        if keys[pygame.K_DOWN]:  self.rect.y += self.speed
        # Clamp to window
        self.rect.clamp_ip(pygame.Rect(0, 0, 800, 500))

class Enemy(pygame.sprite.Sprite):
    def __init__(self, x, y, speed):
        super().__init__()
        self.image = pygame.Surface((40, 40))
        self.image.fill((220, 50, 50))
        self.rect = self.image.get_rect(topleft=(x, y))
        self.speed = speed

    def update(self):
        self.rect.x += self.speed
        if self.rect.right > 800 or self.rect.left < 0:
            self.speed = -self.speed  # Bounce off walls

player = Player()
enemies = pygame.sprite.Group(
    Enemy(100, 100, 2),
    Enemy(400, 200, -3),
    Enemy(600, 350, 2),
)
all_sprites = pygame.sprite.Group(player, *enemies.sprites())

running = True
while running:
    for event in pygame.event.get():
        if event.type == pygame.QUIT:
            running = False

    all_sprites.update()

    # Check if player was hit by any enemy
    hits = pygame.sprite.spritecollide(player, enemies, False)
    if hits:
        player.image.fill((255, 50, 50))  # Flash red on hit
    else:
        player.image.fill((0, 180, 255))

    screen.fill((15, 15, 30))
    all_sprites.draw(screen)
    pygame.display.flip()
    clock.tick(60)

pygame.quit()

Output:

A window shows a blue player square (arrow keys to move) and three red enemy squares that bounce. The player turns red when any enemy overlaps it.

The power of pygame.sprite.Group is that calling all_sprites.update() calls update() on every sprite in the group, and all_sprites.draw(screen) blits every sprite's image at its rect position -- in one line. pygame.sprite.spritecollide(player, enemies, False) returns a list of enemies whose rectangles overlap the player; the third argument controls whether colliding sprites are removed from the group (True would kill them, False leaves them alive).

spritecollide() — because manually checking 40 enemy rects against the player was never going to scale.

Collision Detection and Game State

Collision detection is the core mechanic of almost every 2D game. PyGame provides several levels of collision precision. Rectangle collision (Rect.colliderect()) is the fastest and good enough for most games. Pixel-perfect collision (pygame.sprite.collide_mask()) is available for oddly-shaped sprites where rectangle overlap is too imprecise.
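
To make the rectangle test concrete, here is a minimal pure-Python sketch of an axis-aligned overlap check. It is not PyGame's implementation: rects_overlap and point_in_rect are hypothetical helpers that follow the documented edge behaviour as understood here, namely that rectangles which merely touch along an edge do not collide and that points on the right or bottom edge count as outside.

```python
def rects_overlap(a, b):
    """a and b are (x, y, width, height); edges that only touch do not count."""
    ax, ay, aw, ah = a
    bx, by, bw, bh = b
    return (ax < bx + bw and bx < ax + aw and
            ay < by + bh and by < ay + ah)

def point_in_rect(point, rect):
    """Left and top edges are inside; right and bottom edges are outside."""
    px, py = point
    x, y, w, h = rect
    return x <= px < x + w and y <= py < y + h

player = (100, 100, 50, 50)
bullet = (120, 110, 10, 20)

print(rects_overlap(player, bullet))                    # bullet sits inside the player
print(rects_overlap((0, 0, 10, 10), (10, 0, 10, 10)))   # rectangles share only an edge
print(point_in_rect((125, 125), player))                # point inside the player
```

Four comparisons per pair is why rectangle collision stays cheap even with many sprites on screen.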

# collision_types.py
import pygame

pygame.init()

# --- Rect collision (fastest) ---
player_rect = pygame.Rect(100, 100, 50, 50)
bullet_rect  = pygame.Rect(120, 110, 10, 20)

if player_rect.colliderect(bullet_rect):
    print("Rect collision: hit!")

# --- Point-in-rect collision ---
mouse_pos = (125, 125)
if player_rect.collidepoint(mouse_pos):
    print("Point collision: mouse is over player!")

# --- Group collision with kill ---
# spritecollide(sprite, group, dokill)
# dokill=True removes the colliding sprites from the group automatically
					
How To Use Python pre-commit for Automated Code Quality Hooks

Intermediate

Code review comments about formatting, trailing whitespace, and import ordering are a waste of everyone’s time. If a linter can catch it automatically, a human reviewer shouldn’t have to. The problem is that getting developers to manually run Black, Ruff, and mypy before every commit requires discipline that erodes under deadline pressure — and one lapse is all it takes for formatting drift to creep into the codebase.

pre-commit solves this by turning your linters and formatters into Git hooks that run automatically when you type git commit. If any hook fails, the commit is blocked until the issue is fixed. Formatters such as Black rewrite the offending files in place, so usually all you need to do is stage the corrected files and commit again. The whole setup is declared in a single .pre-commit-config.yaml file, and each developer activates it once per clone with a single command. Install the tool with pip install pre-commit.

In this tutorial you’ll install pre-commit, configure hooks for Black, Ruff, mypy, and trailing whitespace checks, understand how hooks run and fail, add custom scripts as hooks, and set up the same checks in GitHub Actions so they run in CI. By the end your project will automatically enforce code quality standards on every commit without any manual effort.

Python pre-commit: Quick Example

Here is a minimal working setup. These are terminal and configuration commands, not Python scripts.

# Terminal

# Install pre-commit
pip install pre-commit

# In your project root, create .pre-commit-config.yaml
cat > .pre-commit-config.yaml << 'EOF'
repos:
  - repo: https://github.com/psf/black
    rev: 24.4.2
    hooks:
      - id: black

  - repo: https://github.com/astral-sh/ruff-pre-commit
    rev: v0.4.4
    hooks:
      - id: ruff
        args: [--fix]
EOF

# Install the hooks into .git/hooks/
pre-commit install

# Now try committing a file -- the hooks run automatically
echo "x=1+2; print(x)" > test_code.py
git add test_code.py
git commit -m "test commit"

Output:

black....................................................................Failed
- hook id: black
- files were modified by this hook

reformatted test_code.py

ruff.....................................................................Passed

Black reformatted the file in place (x=1+2 becomes x = 1 + 2), but pre-commit does not stage that change for you, so the commit was blocked. Run git add test_code.py again and re-commit; this time both hooks pass and the commit goes through. The pattern of “fail, fix, re-add, re-commit” takes getting used to, but it becomes muscle memory quickly.

What is pre-commit and How Does It Work?

pre-commit is a framework for managing Git hooks. A Git hook is a script that Git runs automatically at specific points — before a commit (pre-commit), before a push (pre-push), or after a merge (post-merge). The pre-commit framework makes it easy to install and configure hooks from a catalog of community-maintained hooks, without writing shell scripts manually.

Hook stage | When it runs | Common use
pre-commit | Before creating a commit | Linting, formatting, secret detection
pre-push | Before pushing to remote | Running tests, type checking
commit-msg | After writing commit message | Enforcing commit message format
post-checkout | After git checkout | Installing dependencies

Each hook runs in an isolated virtual environment that pre-commit creates and manages automatically. This means the hook’s tools (e.g., Black) are installed at a specific pinned version independent of your project’s virtual environment. Even if your project uses Black 23.x, the pre-commit hook uses whatever version you pinned in .pre-commit-config.yaml — no conflicts.

Tutorial image
pre-commit: the last line of defense before the pull request of shame.

A Full pre-commit Configuration

Here is a production-ready .pre-commit-config.yaml that covers formatting, linting, type checking, and common file issues. Customize by removing hooks you don’t need.

# .pre-commit-config.yaml
# Run 'pre-commit autoupdate' periodically to update pinned versions

default_language_version:
  python: python3.11

repos:
  # General file checks (no extra dependencies required)
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.6.0
    hooks:
      - id: trailing-whitespace      # remove trailing spaces
      - id: end-of-file-fixer        # ensure files end with a newline
      - id: check-yaml               # validate YAML syntax
      - id: check-toml               # validate TOML syntax
      - id: check-json               # validate JSON syntax
      - id: check-merge-conflict     # block accidental merge conflict markers
      - id: check-added-large-files  # prevent committing huge files
        args: ['--maxkb=500']
      - id: debug-statements         # catch leftover pdb/breakpoint() calls
      - id: no-commit-to-branch      # prevent direct commits to main/master
        args: ['--branch', 'main', '--branch', 'master']

  # Black -- opinionated code formatter
  - repo: https://github.com/psf/black
    rev: 24.4.2
    hooks:
      - id: black
        language_version: python3.11

  # Ruff -- fast linter (replaces flake8, isort, and more)
  - repo: https://github.com/astral-sh/ruff-pre-commit
    rev: v0.4.4
    hooks:
      - id: ruff
        args: [--fix, --exit-non-zero-on-fix]

  # mypy -- optional static type checker (remove if not using type hints)
  - repo: https://github.com/pre-commit/mirrors-mypy
    rev: v1.10.0
    hooks:
      - id: mypy
        additional_dependencies: [types-requests, types-PyYAML]
        args: [--ignore-missing-imports]

The rev field pins each hook to a specific tagged version. Running pre-commit autoupdate updates all these pins to the latest released versions; it’s good practice to run this every month or as part of your dependency update process. The no-commit-to-branch hook is particularly valuable on shared projects: it blocks commits made directly on the main or master branch, which nudges developers toward a feature branch and a pull request.

Running Hooks Manually and Skipping Them

You don’t have to commit to run the hooks. Several commands let you run hooks on demand or against the entire codebase.

# Terminal -- running hooks manually

# Run all hooks on all staged files (same as a commit would)
pre-commit run

# Run all hooks on ALL files in the repo (not just staged)
pre-commit run --all-files

# Run a single specific hook
pre-commit run black --all-files
pre-commit run ruff --all-files

# Skip hooks entirely for one commit (use sparingly)
git commit -m "WIP: quick fix" --no-verify

# Update all hooks to latest versions
pre-commit autoupdate

# Clear cached hook environments (useful when a hook behaves strangely)
pre-commit clean

# Uninstall hooks (removes from .git/hooks/)
pre-commit uninstall

Output of pre-commit run --all-files (example):

Trim Trailing Whitespace.................................................Passed
Fix End of Files.........................................................Passed
Check Yaml...............................................................Passed
Check for added large files..............................................Passed
Debug Statements (Python)................................................Passed
black....................................................................Passed
ruff.....................................................................Failed
- hook id: ruff
- exit code: 1

app/models.py:12:1: F401 `os` imported but unused
Found 1 error.

The --no-verify flag completely bypasses all hooks for one commit. Use it only when you have a genuine reason to skip checks (e.g., committing auto-generated code that doesn’t meet style standards, or doing an emergency hotfix). Overusing --no-verify defeats the purpose of having hooks. A better approach for long-term exceptions is to add # noqa: F401 inline comments for specific Ruff suppressions, or configure exclusions in pyproject.toml.

Tutorial image
All hooks passed. The code review will be mercifully short.

Running pre-commit in GitHub Actions

pre-commit hooks run locally on each developer’s machine, but you also want them to run in CI so that PRs from contributors who didn’t install hooks are still checked. GitHub Actions makes this easy.

# .github/workflows/quality.yml

name: Code Quality

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main, develop]

jobs:
  pre-commit:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"

      # Cache pre-commit environments to speed up runs
      - uses: actions/cache@v4
        with:
          path: ~/.cache/pre-commit
          key: pre-commit-${{ hashFiles('.pre-commit-config.yaml') }}
          restore-keys: pre-commit-

      - name: Install pre-commit
        run: pip install pre-commit

      - name: Run all hooks
        run: pre-commit run --all-files

The cache step stores the hook environments so they don’t need to be recreated on every CI run. The cache key is based on the hash of .pre-commit-config.yaml — when you change hook versions or add new hooks, the cache is automatically invalidated. A typical pre-commit CI run takes 30-60 seconds with a warm cache, versus 2-3 minutes cold on the first run.
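To see why editing the config invalidates the cache, here is a minimal sketch of a content-hash cache key. It is an illustration only: cache_key is a hypothetical helper, and GitHub’s hashFiles computes its own SHA-256-based digest, so the actual key values will differ from what this prints.

```python
import hashlib


def cache_key(config_text: str, prefix: str = "pre-commit-") -> str:
    """Build a cache key from the config file's contents (hypothetical helper)."""
    digest = hashlib.sha256(config_text.encode("utf-8")).hexdigest()
    return prefix + digest


old_config = "repos:\n  - repo: https://github.com/psf/black\n    rev: 24.4.2\n"
# Simulate editing the pinned version (the new value is only a placeholder)
new_config = old_config.replace("24.4.2", "99.0.0")

# Same contents -> same key -> cache hit
print(cache_key(old_config) == cache_key(old_config))  # True
# Any edit to the file -> different key -> cache miss, environments are rebuilt
print(cache_key(old_config) == cache_key(new_config))  # False
```

The restore-keys: pre-commit- line in the workflow acts as a prefix fallback: when no exact key matches, the most recent cache whose key starts with that prefix is restored, so only the changed hook environments need rebuilding.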

Real-Life Example: Setting Up a New Python Project with Full Quality Gates

Tutorial image
Enforced at commit, enforced in CI, enforced in code review. Pick your battles.

Here is a complete bootstrap script that sets up a new Python project with pre-commit, Black, Ruff, and a matching pyproject.toml configuration that keeps all tools in sync.

# setup_quality_gates.py
# Run once to set up a new project: python setup_quality_gates.py

import subprocess
import os

def run(cmd):
    print(f"$ {cmd}")
    subprocess.run(cmd, shell=True, check=True)

# 1. Install tools
run("pip install pre-commit black ruff mypy")

# 2. Install hooks into .git
run("pre-commit install")
run("pre-commit install --hook-type commit-msg")  # optional: enforce commit format

# 3. Create pyproject.toml tool config (appends if file exists)
config = """
[tool.black]
line-length = 100
target-version = ["py311"]
include = '\\.pyi?$'
exclude = '''
/(
    \\.git
  | \\.venv
  | __pycache__
  | migrations
)/
'''

[tool.ruff]
line-length = 100
target-version = "py311"

# Lint settings live under [tool.ruff.lint]; the top-level form is deprecated in recent Ruff releases
[tool.ruff.lint]
select = [
    "E",   # pycodestyle errors
    "W",   # pycodestyle warnings
    "F",   # pyflakes
    "I",   # isort
    "B",   # flake8-bugbear
    "UP",  # pyupgrade
]
ignore = ["E501"]  # line too long (handled by black)

[tool.ruff.lint.per-file-ignores]
"tests/*" = ["S101"]  # allow assert in tests

[tool.mypy]
python_version = "3.11"
ignore_missing_imports = true
strict = false
"""

mode = "a" if os.path.exists("pyproject.toml") else "w"
with open("pyproject.toml", mode) as f:
    f.write(config)

# 4. Run hooks on all files to establish baseline
print("\nRunning hooks on all files (initial baseline)...")
result = subprocess.run("pre-commit run --all-files", shell=True)
if result.returncode == 0:
    print("\nAll hooks passed! Project is clean.")
else:
    print("\nSome hooks failed or modified files. Review changes with 'git diff', then commit.")

Output:

$ pip install pre-commit black ruff mypy
$ pre-commit install
pre-commit installed at .git/hooks/pre-commit
Running hooks on all files (initial baseline)...
Trim Trailing Whitespace.................................................Passed
black....................................................................Passed
ruff.....................................................................Passed
All hooks passed! Project is clean.

The script uses subprocess.run(..., check=True) for the setup commands so it fails immediately if any step errors — no silent partial setups. The pyproject.toml configuration aligns Black and Ruff to the same line length (100) so they don’t conflict: Black formats to 100 characters and Ruff’s E501 rule (which would also complain about line length) is disabled since Black handles it.

Frequently Asked Questions

My pre-commit hooks are slow — how do I speed them up?

The most common cause is mypy, which is inherently slow because it type-checks the entire project. Move mypy to a pre-push hook (add stages: [pre-push] under the mypy hook entry, and run pre-commit install --hook-type pre-push so Git actually installs that hook) so it only runs on push rather than every commit. The first run of any hook is also slow because pre-commit has to build its isolated environment; later runs reuse the cached environment, and a normal commit only passes the staged files to each hook. Also, replace flake8 + isort with Ruff (which handles both) for a significant speed improvement.

Do all team members need to run pre-commit install?

Yes, each developer needs to run pre-commit install in the cloned repo once. This installs the hooks into the local .git/hooks/ directory, which is not tracked by Git. To automate this, add a Makefile target or a setup.sh script that runs pre-commit install, and document it in your README. Alternatively, use the pre-commit GitHub Action to catch issues in CI from developers who skipped local installation.

Can I exclude specific files or lines from a hook?

Yes. To exclude entire files or directories, add an exclude pattern to the hook config in .pre-commit-config.yaml: exclude: ^(migrations/|docs/). To suppress a specific Ruff or flake8 rule on one line, add a trailing comment: import os # noqa: F401. For Black, you can mark a region to skip with # fmt: off / # fmt: on comments — useful for hand-aligned data structures where Black’s formatting destroys readability.
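pre-commit treats exclude as a Python regular expression searched against each file’s path relative to the repository root. A rough way to preview what a pattern skips is to try it with the re module directly. The file paths below are invented for the example.

```python
import re

# The exclude pattern from the example above
EXCLUDE = re.compile(r"^(migrations/|docs/)")

paths = [
    "migrations/0001_initial.py",
    "docs/conf.py",
    "app/models.py",
    "app/migrations/0002_add_field.py",  # not at the repo root, so "^" does not match
]

# Keep only the files the hook would still receive
checked = [p for p in paths if not EXCLUDE.search(p)]
print(checked)
# ['app/models.py', 'app/migrations/0002_add_field.py']
```

Because of the ^ anchor, a nested app/migrations/ directory is not excluded. Dropping the anchor (a plain migrations/ pattern) would match that directory at any depth.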

Can I write my own custom hook?

Yes. Add a repo: local entry in your config and define the hook directly. For example, a custom hook that runs your test suite on push:

repos:
  - repo: local
    hooks:
      - id: run-tests
        name: Run pytest
        entry: pytest tests/ -x -q
        language: system
        pass_filenames: false
        stages: [pre-push]

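The language: system setting means pre-commit runs the entry command as-is using whatever is on your PATH, so pytest comes from your active virtual environment rather than an isolated pre-commit environment. Because the hook is limited to the pre-push stage, it only fires once the pre-push hook type is installed with pre-commit install --hook-type pre-push.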
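A local hook can also point at a script of your own. The following is a minimal sketch of such a script, assuming a hypothetical file scripts/check_fixme.py that rejects files containing a FIXME marker. By default pre-commit appends the staged filenames to the hook’s command line, and a non-zero exit code fails the hook.

```python
# scripts/check_fixme.py  (hypothetical path and check, for illustration)
import sys

MARKER = "FIXME"


def find_markers(filenames):
    """Return (filename, line_number) pairs for every line containing MARKER."""
    hits = []
    for name in filenames:
        with open(name, encoding="utf-8", errors="replace") as handle:
            for number, line in enumerate(handle, start=1):
                if MARKER in line:
                    hits.append((name, number))
    return hits


def main(argv=None):
    # pre-commit appends the staged filenames to the hook's entry command
    filenames = sys.argv[1:] if argv is None else argv
    hits = find_markers(filenames)
    for name, number in hits:
        print(f"{name}:{number}: remove {MARKER} before committing")
    return 1 if hits else 0  # non-zero exit code makes the hook fail


if __name__ == "__main__":
    status = main()
    if status != 0:
        sys.exit(status)
```

To wire it up, the hook entry would use entry: python scripts/check_fixme.py with language: system and types: [python], leaving pass_filenames at its default of true so the script receives the staged files.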

Conclusion

pre-commit turns code quality enforcement from a manual discipline into an automatic process. You configured hooks for Black (formatting), Ruff (linting and import sorting), mypy (type checking), and common file hygiene checks; ran them manually with pre-commit run --all-files; handled the fail-fix-re-add-recommit workflow; and wired the same checks into GitHub Actions for CI coverage. The complete setup described here takes about 10 minutes to install on a new project and saves hours of formatting-related code review comments over the life of the project.

The next step is exploring the full catalog of community hooks at pre-commit.com/hooks.html, which includes hooks for detecting secrets accidentally committed to Git, validating Dockerfile syntax, enforcing commit message conventions, and many more. Combine pre-commit with a branch protection rule requiring CI to pass before merging and you’ve built a complete, automated quality gate.

Full documentation is at pre-commit.com.

How To Use Python Plotly for Interactive Data Visualizations

Beginner

Static charts made with Matplotlib are fine for reports and papers, but when you’re presenting data to a stakeholder or building a data product, you want something the viewer can interact with: hover over a data point to see exact values, zoom into a region, click to filter by category, or toggle series on and off. That interactivity makes data comprehensible in ways that a frozen PNG never can.

Plotly is the leading Python library for interactive data visualizations. It renders charts directly in Jupyter notebooks and web browsers, produces publication-quality output, and supports over 40 chart types — from basic bar and line charts to heatmaps, 3D scatter plots, geographic maps, and animated timelines. Plotly has two APIs: plotly.express for quick one-line charts and plotly.graph_objects for fine-grained customization. Install it with pip install plotly.

In this tutorial you’ll create line charts, bar charts, scatter plots, and heatmaps using Plotly Express, learn how to customize layouts and colors, combine multiple traces into a single figure, export charts to HTML and PNG, and build a multi-panel dashboard. By the end you’ll be producing interactive visualizations in a fraction of the time static charts take.

Python Plotly: Quick Example

Here is a complete interactive line chart in four lines of code, using a dataset built directly into Plotly.

# quick_plotly.py
import plotly.express as px

# Load a built-in dataset (no external file needed)
df = px.data.gapminder().query("country == 'Australia'")

fig = px.line(df, x="year", y="lifeExp", title="Life Expectancy in Australia Over Time",
              labels={"lifeExp": "Life Expectancy (years)", "year": "Year"})
fig.show()  # Opens in your browser -- fully interactive (hover, zoom, pan)

# Save as standalone HTML file
fig.write_html("life_expectancy.html")
print("Chart saved as life_expectancy.html")

Output:

Chart saved as life_expectancy.html
(Interactive chart opens in browser at http://localhost:XXXXX)

px.data.gapminder() returns a pandas DataFrame with country-level statistics including GDP per capita, population, and life expectancy from 1952 to 2007. The fig.show() call opens the chart in your default browser. Every Plotly chart is interactive by default: hover over any point to see its exact values, click and drag to zoom, double-click to reset the view. The write_html() output is a self-contained HTML file you can share with anyone — no Python installation needed to view it.

Plotly Express vs Graph Objects

Plotly has two layers of API. Plotly Express (plotly.express or px) is a high-level wrapper that generates common chart types from a DataFrame in one function call. Graph Objects (plotly.graph_objects or go) is the lower-level API that gives you complete control over every element of the chart.

Feature | Plotly Express (px) | Graph Objects (go)
Code length | 1-3 lines for most charts | 10-30+ lines for same chart
Data input | pandas DataFrame (column names) | Raw arrays and dicts
Customization | Limited to function parameters | Full control over every trace
Multi-trace charts | Via color= and facet_ params | Manual add_trace() calls
Best for | Exploratory analysis, quick charts | Production dashboards, edge cases

The best approach is to start with Plotly Express and drop down to Graph Objects only when you need something Express cannot do. You can also mix them: create a chart with Express and then call fig.update_layout() or fig.update_traces() to make fine-grained adjustments.
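One way to picture what these update methods touch: a Plotly figure boils down to a JSON-serializable structure with a list of traces under data and a layout mapping. The sketch below builds that structure as a plain Python dict, without importing Plotly, and edits the layout by hand. All values are invented for illustration.

```python
import json

# A figure specification as plain data: one line trace plus layout settings
figure_spec = {
    "data": [
        {
            "type": "scatter",
            "mode": "lines",
            "x": [1, 2, 3],       # made-up values
            "y": [10, 15, 13],
            "name": "Example series",
        }
    ],
    "layout": {"title": {"text": "Example chart"}},
}

# Roughly what a call like fig.update_layout(height=400) changes: one layout entry
figure_spec["layout"]["height"] = 400

print(json.dumps(figure_spec["layout"], indent=2))
```

If Plotly is installed, passing this dict to go.Figure(figure_spec) should give you the corresponding interactive figure. Express and Graph Objects are two ways of producing the same underlying structure, which is why they mix so easily.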

Tutorial image
fig.show(): static PNGs were a cry for help.

Common Chart Types with Plotly Express

Most data stories fit into a handful of chart types. Here is a working example for each of the most common ones, all using built-in Plotly datasets so you can run them without any external data files.

# common_charts.py
import plotly.express as px

# --- Bar Chart ---
df_tips = px.data.tips()
fig_bar = px.bar(df_tips, x="day", y="total_bill", color="sex",
                 barmode="group",
                 title="Restaurant Bills by Day and Gender",
                 labels={"total_bill": "Total Bill ($)", "day": "Day of Week"})
fig_bar.write_html("bar_chart.html")

# --- Scatter Plot ---
df_gap = px.data.gapminder().query("year == 2007")
fig_scatter = px.scatter(df_gap, x="gdpPercap", y="lifeExp",
                         size="pop", color="continent",
                         hover_name="country",
                         log_x=True,  # log scale on x axis
                         title="GDP vs Life Expectancy (2007)",
                         labels={"gdpPercap": "GDP per Capita (log scale)",
                                 "lifeExp": "Life Expectancy"})
fig_scatter.write_html("scatter_chart.html")

# --- Histogram ---
fig_hist = px.histogram(df_tips, x="total_bill", nbins=20,
                        color="smoker",
                        marginal="rug",  # adds a rug plot above histogram
                        title="Distribution of Restaurant Bills")
fig_hist.write_html("histogram.html")

# --- Box Plot ---
fig_box = px.box(df_tips, x="day", y="tip", color="sex",
                 title="Tip Distribution by Day and Gender",
                 points="all")  # show all data points as dots
fig_box.write_html("boxplot.html")

print("All charts saved as HTML files.")

Output:

All charts saved as HTML files.

The size="pop" argument in the scatter plot maps bubble size to the pop column — Plotly scales the circles automatically. hover_name="country" makes the country name bold in the hover tooltip. log_x=True applies a logarithmic scale to the x-axis, which is standard practice when GDP data spans several orders of magnitude. The marginal="rug" parameter on the histogram adds a one-dimensional scatter plot along the axis, showing individual data points in addition to the distribution shape.
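To see why a log axis helps, compare where a few values land on each scale. This is a small standard-library sketch with made-up GDP figures, not values from the gapminder dataset, and axis_position is a hypothetical helper rather than anything Plotly exposes.

```python
import math


def axis_position(value, lo, hi, log=False):
    """Return where value falls between lo and hi as a fraction from 0 to 1."""
    if log:
        value, lo, hi = math.log10(value), math.log10(lo), math.log10(hi)
    return (value - lo) / (hi - lo)


# Hypothetical GDP-per-capita values spanning two orders of magnitude
gdp_values = [500, 5_000, 50_000]
lo, hi = min(gdp_values), max(gdp_values)

for gdp in gdp_values:
    linear = axis_position(gdp, lo, hi)
    logged = axis_position(gdp, lo, hi, log=True)
    print(f"{gdp:>6}: linear {linear:.2f}, log {logged:.2f}")
```

On the linear axis the middle value sits at about 9% of the width, crowded against the smallest one. On the log axis it sits at the midpoint, so low-income and high-income countries get comparable room on the chart.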

Customizing Layouts and Styles

Plotly Express creates charts with sensible defaults, but you’ll almost always want to adjust titles, colors, fonts, axis labels, and layout for presentation. The update_layout() and update_traces() methods let you modify any aspect of the chart after creation.

# customization.py
import plotly.express as px
import plotly.graph_objects as go

df = px.data.gapminder().query("continent == 'Europe' and year == 2007")
fig = px.bar(df, x="country", y="gdpPercap",
             color="gdpPercap",
             color_continuous_scale="Viridis",
             title="GDP per Capita -- European Countries 2007")

# Customize the layout
fig.update_layout(
    title_font_size=20,
    title_font_family="Arial",
    xaxis_tickangle=-45,       # rotate x labels for readability
    xaxis_title="Country",
    yaxis_title="GDP per Capita (USD)",
    plot_bgcolor="white",
    paper_bgcolor="white",
    font=dict(family="Arial", size=12, color="#333333"),
    coloraxis_showscale=False,  # hide the color bar
    margin=dict(t=80, b=100, l=60, r=20),
    height=500
)

# Add a horizontal reference line at the European average
avg_gdp = df["gdpPercap"].mean()
fig.add_hline(y=avg_gdp, line_dash="dash", line_color="red",
              annotation_text=f"Europe avg: ${avg_gdp:,.0f}",
              annotation_position="top right")

fig.write_html("europe_gdp_styled.html")

# Export as a static PNG (requires kaleido: pip install kaleido)
try:
    fig.write_image("europe_gdp.png", width=1200, height=500, scale=2)
    print("PNG saved (requires kaleido package)")
except Exception:
    print("Install kaleido for PNG export: pip install kaleido")

Output:

PNG saved (requires kaleido package)

The add_hline() call adds an annotation line across the full width of the chart at a specific y-value. Similar methods include add_vline(), add_hrect() (horizontal band), and add_vrect() (vertical band). These are perfect for showing thresholds, targets, or date ranges. Static PNG export requires the kaleido package (pip install kaleido) in addition to Plotly — Plotly itself only handles the interactive HTML output natively.

Tutorial image
make_subplots(rows=2, cols=2): four charts for the price of one fig.show().

Multi-Trace and Subplot Charts

Many real dashboards combine multiple related charts in a single figure. Plotly’s make_subplots() function creates a grid of sub-axes that you can populate independently, while all sharing the same interactive controls.

# subplots.py
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.express as px

df = px.data.gapminder()
countries = ["United States", "China", "India", "Germany"]
colors = {"United States": "#1f77b4", "China": "#ff7f0e",
          "India": "#2ca02c", "Germany": "#d62728"}

fig = make_subplots(
    rows=1, cols=2,
    subplot_titles=("Life Expectancy Over Time", "GDP Per Capita Over Time"),
    shared_xaxes="all"  # link x-axes across both subplots so zooming one chart zooms the other
)

for country in countries:
    country_df = df[df["country"] == country]
    color = colors[country]

    # Left chart: life expectancy
    fig.add_trace(
        go.Scatter(x=country_df["year"], y=country_df["lifeExp"],
                   mode="lines+markers", name=country,
                   line=dict(color=color, width=2),
                   legendgroup=country),  # group legend entries
        row=1, col=1
    )

    # Right chart: GDP per capita
    fig.add_trace(
        go.Scatter(x=country_df["year"], y=country_df["gdpPercap"],
                   mode="lines+markers", name=country,
                   line=dict(color=color, width=2),
                   legendgroup=country,
                   showlegend=False),  # avoid duplicate legend entries
        row=1, col=2
    )

fig.update_layout(
    title_text="Global Development Trends: Selected Countries",
    height=450,
    hovermode="x unified"  # show all traces at same x position on hover
)

fig.update_yaxes(title_text="Life Expectancy (years)", row=1, col=1)
fig.update_yaxes(title_text="GDP per Capita (USD)", row=1, col=2)

fig.write_html("development_dashboard.html")
print("Dashboard saved.")

Output:

Dashboard saved.

The legendgroup=country parameter links the two traces for each country so that clicking a country name in the legend shows or hides both its life expectancy and GDP lines simultaneously. hovermode="x unified" is a particularly useful layout setting: it shows all traces’ values for the same x-coordinate in a single tooltip, making it easy to compare values at the same year across countries.

Real-Life Example: Sales Performance Dashboard

Tutorial image
fig.write_html(): the only report your manager will actually interact with.

Here is a self-contained sales dashboard that generates realistic sample data and builds a multi-panel figure with a headline KPI in the title, a moving-average trend line, and a product breakdown.

# sales_dashboard.py
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import random

# Generate realistic sample sales data
random.seed(42)
months = ["Jan", "Feb", "Mar", "Apr", "May", "Jun",
          "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]
revenue = [random.randint(80000, 200000) for _ in months]
units = [random.randint(400, 900) for _ in months]
products = {"Widget A": 35, "Widget B": 25, "Widget C": 20, "Other": 20}

fig = make_subplots(
    rows=2, cols=2,
    subplot_titles=("Monthly Revenue", "Units Sold",
                    "Revenue Trend (with 3-month avg)", "Product Mix"),
    specs=[[{"type": "bar"}, {"type": "scatter"}],
           [{"type": "scatter"}, {"type": "pie"}]]
)

# Top-left: Monthly revenue bar chart
fig.add_trace(
    go.Bar(x=months, y=revenue, name="Revenue",
           marker_color=["#2ecc71" if r >= 130000 else "#e74c3c" for r in revenue]),
    row=1, col=1
)

# Top-right: Units sold as scatter with lines
fig.add_trace(
    go.Scatter(x=months, y=units, mode="lines+markers", name="Units",
               line=dict(color="#3498db", width=2), marker=dict(size=8)),
    row=1, col=2
)

# Bottom-left: Revenue with 3-month moving average
# (the first two months use a shorter window because fewer values exist)
windows = [revenue[max(0, i - 2):i + 1] for i in range(len(revenue))]
moving_avg = [sum(w) / len(w) for w in windows]
fig.add_trace(go.Bar(x=months, y=revenue, name="Monthly Revenue",
                     marker_color="#bdc3c7", showlegend=False), row=2, col=1)
fig.add_trace(go.Scatter(x=months, y=moving_avg, mode="lines", name="3-Mo Avg",
                         line=dict(color="#e67e22", width=3, dash="dash")), row=2, col=1)

# Bottom-right: Product mix pie chart
fig.add_trace(
    go.Pie(labels=list(products.keys()), values=list(products.values()),
           name="Products", hole=0.4),
    row=2, col=2
)

total_rev = sum(revenue)
fig.update_layout(
    title_text=f"2026 Sales Dashboard -- Total Revenue: ${total_rev:,}",
    height=700,
    showlegend=True,
    plot_bgcolor="white"
)

fig.write_html("sales_dashboard.html")
print(f"Dashboard saved. Total revenue: ${total_rev:,}")

Output (the total shown is illustrative; the exact figure depends on the generated sample data):

Dashboard saved. Total revenue: $1,567,000

The specs parameter in make_subplots() declares the subplot type for each cell. It is required here because the pie chart needs a "domain" subplot, while bars and scatter plots share the default "xy" type. The color list comprehension ["#2ecc71" if r >= 130000 else "#e74c3c" for r in revenue] colors revenue bars green if they meet or exceed the threshold and red otherwise, a simple but effective way to highlight performance against a target. Extend this dashboard by wiring it up to a live database query and serving it with Flask or Dash for a real-time monitoring page.
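The trailing moving average used in the bottom-left panel can be factored into a small helper. The following is a minimal sketch: the function name trailing_average and the sample numbers are illustrative assumptions, not part of the dashboard script.

```python
def trailing_average(values, window=3):
    """Average each value with up to window-1 preceding values.

    Early positions use a shorter window, so the result has the
    same length as the input.
    """
    averages = []
    for i in range(len(values)):
        chunk = values[max(0, i - window + 1):i + 1]
        averages.append(sum(chunk) / len(chunk))
    return averages


sample_revenue = [100, 200, 300, 400]  # made-up numbers for illustration
print(trailing_average(sample_revenue))  # [100.0, 150.0, 200.0, 300.0]
```

Because the first positions are averaged over fewer values, the trend line starts at the first month instead of leaving a gap, at the cost of being noisier there.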

Frequently Asked Questions

How do I display Plotly charts in Jupyter Notebook?

Plotly integrates with Jupyter automatically — fig.show() renders the interactive chart inline in the notebook cell. For JupyterLab, you may need to run pip install "jupyterlab>=3" ipywidgets and enable the Plotly extension. In VS Code’s Jupyter extension, charts render inline without any extra setup. If charts appear blank, try fig.show(renderer="iframe") as a fallback.

Does Plotly work with pandas DataFrames?

Yes, Plotly Express is designed around pandas DataFrames. You pass column names as strings to px.line(df, x="date_col", y="value_col") and Plotly handles the rest. You can also use Plotly as the pandas plotting backend: set pd.options.plotting.backend = "plotly" (or pass backend="plotly" to an individual df.plot() call) and the familiar df.plot() syntax returns a Plotly figure.

What is Plotly Dash and how is it different?

Dash is a full web application framework built on top of Plotly. While Plotly on its own produces standalone interactive charts, typically saved as HTML files, Dash lets you build live dashboards with Python callbacks: dropdowns, sliders, and buttons that trigger Python functions to update charts in real time. Dash runs as a web server. Install it with pip install dash. If you need a shareable, embeddable chart, use Plotly directly; if you need a live app with user controls, use Dash.

How do I export Plotly charts as PNG or PDF?

Static export requires the kaleido package: pip install kaleido. Then use fig.write_image("chart.png"), fig.write_image("chart.pdf"), or fig.write_image("chart.svg"). Use the scale parameter for higher resolution: fig.write_image("chart.png", scale=3) produces a 3x resolution image suitable for print. For batch export of many charts, make all write_image() calls from the same Python process so the kaleido process is reused.

How do I apply a consistent theme to all my charts?

Plotly includes several built-in themes: "plotly" (default), "plotly_white", "plotly_dark", "ggplot2", "seaborn", "simple_white", and "none". Set a global default with import plotly.io as pio; pio.templates.default = "plotly_white" and all subsequent charts will use that theme. You can also create custom templates by extending existing ones with pio.templates["my_theme"] = go.layout.Template(...).

Conclusion

Plotly makes interactive data visualization accessible in Python with minimal code. You learned to create line charts, bar charts, scatter plots, histograms, and box plots with Plotly Express, customize layouts with update_layout(), combine multiple traces with make_subplots(), and build a complete sales performance dashboard with moving averages and color-coded KPIs. The write_html() output is a shareable, self-contained file that any stakeholder can open in a browser without installing Python.

The natural next step is Plotly Dash for building live dashboards with interactive controls, or integrating Plotly charts into a Flask or FastAPI application as JSON responses rendered by the frontend. Both approaches build directly on the Plotly knowledge you’ve developed here.

Complete documentation and an extensive example gallery are available at plotly.com/python.

How To Use Alembic for Database Migrations in Python

Intermediate

Adding a column to a live database sounds simple until you realize you need to do it on a production server with real user data, make the change repeatable across three developer laptops and a staging environment, and be able to roll it back if something goes wrong. Running raw ALTER TABLE statements by hand works once. It does not work as a team workflow across environments, and it leaves no record of what changed, when, or why.

Alembic is the database migration tool built specifically for SQLAlchemy. It tracks your schema version in a dedicated table, generates migration scripts automatically by comparing your models to the live database, and lets you apply (upgrade) or reverse (downgrade) changes one step at a time. It is the standard migration tool for SQLAlchemy-based projects and is commonly used alongside FastAPI with SQLModel and, through the Flask-Migrate wrapper, Flask with Flask-SQLAlchemy. Installing it is a single command: pip install alembic sqlalchemy.

In this tutorial you’ll set up Alembic in a real project, create and run migrations, handle common schema changes (add column, rename, add index), write manual migrations for data transformations, and use the downgrade mechanism safely. By the end you’ll have a versioned, team-friendly database workflow.

Alembic Migrations: Quick Example

Here is the fastest path from a blank project to a running migration. These are terminal commands, not Python scripts.

# Terminal commands

# Install dependencies
pip install alembic sqlalchemy

# Initialize Alembic in your project
alembic init alembic

# This creates:
#   alembic/           -- migration scripts directory
#   alembic/env.py     -- Alembic configuration (edit this to point at your DB)
#   alembic/versions/  -- migration script files go here
#   alembic.ini        -- Alembic settings file

# After configuring alembic.ini and env.py (see next section)...

# Generate a migration from model changes
alembic revision --autogenerate -m "add users table"

# Apply the migration
alembic upgrade head

# Check current version
alembic current

# Roll back one step
alembic downgrade -1

Output of alembic upgrade head:

INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Running upgrade  -> a3f92b1c4d8e, add users table

Every migration is identified by a unique revision ID (the a3f92b1c4d8e part). Alembic records the current revision in a special table (alembic_version) in your database, so it always knows which migrations have run and which haven’t. The hash shown in upgrade head is the latest migration script’s revision ID.

What is Alembic and How Does It Work?

Alembic works by maintaining a chain of migration scripts. Each script has an upgrade() function that applies a change (add a column, create a table) and a downgrade() function that reverses it. Scripts are linked in a revision chain, and Alembic walks that chain to get from wherever your database currently is to wherever you want it to be.

Concept | What it means
Revision | A unique ID identifying one migration script
Head | The latest revision (most up-to-date)
Base | The initial state (no migrations applied)
alembic_version | Table Alembic creates in your DB to track current revision
upgrade | Apply one or more migrations forward
downgrade | Reverse one or more migrations backward
autogenerate | Auto-detect schema changes by comparing models to DB

The --autogenerate flag is Alembic’s most powerful feature: it imports your SQLAlchemy models, inspects the live database, and generates the migration script automatically. However, autogenerate cannot detect everything — it misses data changes, stored procedures, and some constraint types — so always review generated scripts before running them.

Tutorial image
ALTER TABLE in production without a migration file. Enjoy your mystery schema.

Setting Up Alembic in a Project

Alembic needs to know two things: the database URL to connect to, and where to find your SQLAlchemy models. Both are configured in alembic.ini and alembic/env.py.

# models.py -- your SQLAlchemy models
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy.orm import DeclarativeBase
from datetime import datetime

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    email = Column(String(255), nullable=False, unique=True)
    username = Column(String(100), nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)

class Post(Base):
    __tablename__ = "posts"

    id = Column(Integer, primary_key=True)
    title = Column(String(500), nullable=False)
    body = Column(String, nullable=False)
    author_id = Column(Integer, nullable=False)

# alembic/env.py -- edit the target_metadata line and database URL

import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from logging.config import fileConfig
from sqlalchemy import engine_from_config, pool
from alembic import context

# Import your models Base so autogenerate can see the schema
from models import Base

config = context.config
fileConfig(config.config_file_name)

# This is the key line -- point autogenerate at your models
target_metadata = Base.metadata

def run_migrations_online():
    connectable = engine_from_config(
        config.get_section(config.config_ini_section),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )
    with connectable.connect() as connection:
        context.configure(connection=connection, target_metadata=target_metadata)
        with context.begin_transaction():
            context.run_migrations()

run_migrations_online()

# alembic.ini -- set your database URL
# Find the sqlalchemy.url line and update it:
# sqlalchemy.url = sqlite:///./myapp.db
# For PostgreSQL: postgresql://user:pass@localhost/mydb
# For MySQL:      mysql+pymysql://user:pass@localhost/mydb

The critical step is setting target_metadata = Base.metadata in env.py. This is what enables autogenerate to compare your Python models against the live database schema. If target_metadata is left as None, autogenerate refuses to run, and if the metadata is empty because your models were never imported, it produces empty migration scripts. Make sure models.py is importable from env.py; add your project root to sys.path as shown above if needed.

Creating and Running Migrations

Once configured, the typical migration workflow is three steps: generate, review, apply.

# Terminal -- migration workflow

# Step 1: Generate a migration from current model state
alembic revision --autogenerate -m "create users and posts tables"
# Output: Generating /project/alembic/versions/a1b2c3d4e5f6_create_users_and_posts_tables.py

# Step 2: Review the generated file (always do this before applying)
cat alembic/versions/a1b2c3d4e5f6_create_users_and_posts_tables.py

Here is what a typical auto-generated migration script looks like:

# alembic/versions/a1b2c3d4e5f6_create_users_and_posts_tables.py
"""create users and posts tables

Revision ID: a1b2c3d4e5f6
Revises:
Create Date: 2026-05-16 08:30:00.000000
"""
from alembic import op
import sqlalchemy as sa

revision = 'a1b2c3d4e5f6'
down_revision = None   # None means this is the first migration
branch_labels = None
depends_on = None

def upgrade() -> None:
    op.create_table(
        'users',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('email', sa.String(length=255), nullable=False),
        sa.Column('username', sa.String(length=100), nullable=False),
        sa.Column('created_at', sa.DateTime(), nullable=True),
        sa.PrimaryKeyConstraint('id'),
        sa.UniqueConstraint('email'),
    )
    op.create_table(
        'posts',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('title', sa.String(length=500), nullable=False),
        sa.Column('body', sa.String(), nullable=False),
        sa.Column('author_id', sa.Integer(), nullable=False),
        sa.PrimaryKeyConstraint('id'),
    )

def downgrade() -> None:
    op.drop_table('posts')
    op.drop_table('users')

# Step 3: Apply the migration
alembic upgrade head
# INFO  [alembic.runtime.migration] Running upgrade  -> a1b2c3d4e5f6, create users and posts tables

# Verify current state
alembic current
# a1b2c3d4e5f6 (head)

# Show full history
alembic history --verbose

The down_revision = None indicates this is the first migration in the chain. Subsequent migrations will reference this revision ID in their own down_revision, forming the chain. Never edit a migration script after it has been applied to any shared environment — create a new migration instead.
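To make the chain idea concrete, here is a toy model in plain Python. It is only a sketch of the concept, not Alembic's actual implementation: the dictionary and the upgrade_path function are hypothetical, and the revision IDs simply mirror the ones used in this tutorial.

```python
# Toy model of a revision chain. Each entry maps a revision ID to its
# down_revision; None marks the first migration in the chain.
DOWN_REVISIONS = {
    "a1b2c3d4e5f6": None,
    "b2c3d4e5f6a7": "a1b2c3d4e5f6",
    "c3d4e5f6a7b8": "b2c3d4e5f6a7",
}


def upgrade_path(current, target, down_revisions):
    """Return the revisions to apply, oldest first, to get from current to target."""
    path = []
    rev = target
    # Walk backwards from the target until we reach the current revision.
    while rev != current:
        if rev is None:
            raise ValueError(f"{current!r} is not an ancestor of {target!r}")
        path.append(rev)
        rev = down_revisions[rev]
    return list(reversed(path))


# From an empty database (current revision None) to the latest revision:
print(upgrade_path(None, "c3d4e5f6a7b8", DOWN_REVISIONS))
# From the first migration to the latest:
print(upgrade_path("a1b2c3d4e5f6", "c3d4e5f6a7b8", DOWN_REVISIONS))
```

The first call lists all three revisions in order; the second skips the one that is already applied. Editing an applied script breaks this model because databases that already ran it will never run it again.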

Tutorial image
alembic history: the git log for your schema.

Common Schema Change Operations

Most real-world migrations involve adding columns, creating indexes, or renaming things. Here are migration scripts for the most common operations, showing both the upgrade() and downgrade() halves.

# Manual migration script: add columns and index
# alembic/versions/b2c3d4e5f6a7_add_user_bio_and_index.py

"""add bio to users and index on email

Revision ID: b2c3d4e5f6a7
Revises: a1b2c3d4e5f6
Create Date: 2026-05-16 09:00:00.000000
"""
from alembic import op
import sqlalchemy as sa

revision = 'b2c3d4e5f6a7'
down_revision = 'a1b2c3d4e5f6'
branch_labels = None
depends_on = None

def upgrade() -> None:
    # Add a nullable column (no default needed; existing rows get NULL)
    op.add_column('users', sa.Column('bio', sa.Text(), nullable=True))

    # Add a column with NOT NULL -- requires a server_default for existing rows
    op.add_column('users', sa.Column('is_active', sa.Boolean(),
                                     nullable=False, server_default=sa.true()))

    # Create an index for faster email lookups
    op.create_index('ix_users_email', 'users', ['email'], unique=False)

    # Add a nullable timestamp to posts and index author_id for per-author queries
    op.add_column('posts', sa.Column('published_at', sa.DateTime(), nullable=True))
    op.create_index('ix_posts_author_id', 'posts', ['author_id'])

def downgrade() -> None:
    op.drop_index('ix_posts_author_id', table_name='posts')
    op.drop_column('posts', 'published_at')
    op.drop_index('ix_users_email', table_name='users')
    op.drop_column('users', 'is_active')
    op.drop_column('users', 'bio')

The server_default=sa.true() is critical when adding a NOT NULL column to a table that already has rows. Without a default, the database will reject the ALTER because existing rows would have a NULL value in a NOT NULL column. Use server_default for the database-level default and default for the Python-level default (used by SQLAlchemy when inserting new rows). After adding the column with a server default, you can remove the default in a follow-up migration if desired.
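The effect of a database-level default can be reproduced without Alembic. The sketch below uses Python's built-in sqlite3 module and raw SQL rather than op.add_column(); the table contents are made up, and other databases word the error differently, but the behavior is the same in spirit.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT NOT NULL)")
conn.execute("INSERT INTO users (email) VALUES ('a@example.com'), ('b@example.com')")

# Attempt 1: NOT NULL with no default -- the database refuses the change.
try:
    conn.execute("ALTER TABLE users ADD COLUMN is_active BOOLEAN NOT NULL")
    rejected = False
except sqlite3.OperationalError as exc:
    rejected = True
    print(f"Rejected: {exc}")

# Attempt 2: the same column with a default -- existing rows are filled in.
conn.execute("ALTER TABLE users ADD COLUMN is_active BOOLEAN NOT NULL DEFAULT 1")
rows = conn.execute("SELECT email, is_active FROM users ORDER BY id").fetchall()
print(rows)  # [('a@example.com', 1), ('b@example.com', 1)]
```

The DEFAULT clause in the second statement plays the role that server_default plays in the migration script.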

Real-Life Example: Adding a Subscription System to an Existing App

Tutorial image
upgrade head in production. Then take a calm sip of coffee.

Here is a realistic scenario: adding a subscription tier system to an existing user table, including a data migration that sets a default tier for all existing users.

# alembic/versions/c3d4e5f6a7b8_add_subscription_tiers.py
"""add subscription tiers table and user tier column

Revision ID: c3d4e5f6a7b8
Revises: b2c3d4e5f6a7
Create Date: 2026-05-16 10:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import table, column
from sqlalchemy import String, Integer

revision = 'c3d4e5f6a7b8'
down_revision = 'b2c3d4e5f6a7'

def upgrade() -> None:
    # 1. Create the subscription_tiers lookup table
    op.create_table(
        'subscription_tiers',
        sa.Column('id', sa.Integer(), primary_key=True),
        sa.Column('name', sa.String(50), nullable=False, unique=True),
        sa.Column('monthly_price_cents', sa.Integer(), nullable=False),
    )

    # 2. Seed the lookup data (data migration inside schema migration)
    tiers_table = table('subscription_tiers',
                        column('id', Integer),
                        column('name', String),
                        column('monthly_price_cents', Integer))
    op.bulk_insert(tiers_table, [
        {'id': 1, 'name': 'free', 'monthly_price_cents': 0},
        {'id': 2, 'name': 'pro', 'monthly_price_cents': 999},
        {'id': 3, 'name': 'enterprise', 'monthly_price_cents': 4999},
    ])

    # 3. Add tier_id to users with a default of 1 (free)
    op.add_column('users',
        sa.Column('tier_id', sa.Integer(),
                  sa.ForeignKey('subscription_tiers.id'),
                  nullable=False,
                  server_default='1'))

    # 4. Create an index for tier-based queries
    op.create_index('ix_users_tier_id', 'users', ['tier_id'])

def downgrade() -> None:
    op.drop_index('ix_users_tier_id', table_name='users')
    op.drop_column('users', 'tier_id')
    op.drop_table('subscription_tiers')

# Apply all pending migrations
alembic upgrade head

# Verify
alembic current
# c3d4e5f6a7b8 (head)

The op.bulk_insert() call seeds the lookup data as part of the migration. On databases with transactional DDL, such as PostgreSQL, the seed data and the schema change commit or roll back together; on databases without it (MySQL, for example), a failure partway through can leave the schema partially changed. The table() and column() helpers from sqlalchemy.sql let you reference tables without importing your model classes. This is important because model classes can change after the migration is written, but the migration must always produce the same result when re-run. Never import model classes directly inside migration scripts.

Frequently Asked Questions

What does alembic autogenerate miss?

Autogenerate detects table creation, table removal, column additions, column removals, column type changes, and unique constraint changes on most databases. It does NOT detect: table or column renames (these show up as a drop followed by an add, so rewrite them by hand as a rename), changes to stored procedures, changes to data (you need data migrations for those), constraints without an explicit name, or some index changes on certain databases. Always review generated migration scripts before applying them.

Is it safe to downgrade in production?

Downgrading can be risky if the migration involved adding columns with data, dropping tables, or making irreversible data changes. Always test your downgrade function on a copy of production data before relying on it as a real rollback strategy. For critical migrations, write the downgrade function even if you never plan to use it: doing so forces you to think through the reversal and can save you in an emergency. For truly irreversible changes (e.g., dropping a table), add a comment and raise NotImplementedError in the downgrade function to make the intent explicit.
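An explicitly irreversible downgrade might look like the sketch below. The table name legacy_audit_log is a hypothetical example, and the upgrade() half of the migration is omitted.

```python
def downgrade() -> None:
    # Irreversible on purpose: the rows of the dropped table cannot be
    # recreated, so fail loudly instead of pretending to roll back.
    raise NotImplementedError(
        "Irreversible migration: restore the legacy_audit_log table "
        "from a backup instead of downgrading."
    )


# Calling the function fails immediately with a clear message.
try:
    downgrade()
except NotImplementedError as exc:
    message = str(exc)
    print(f"Downgrade refused: {message}")
```

Failing with a message that names the recovery path is more useful during an incident than an empty downgrade that silently does nothing.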

What does ‘multiple heads’ mean?

If two developers each create a migration from the same base revision, Alembic has two “heads” — two migration chains that diverged at the same point. You need to merge them with alembic merge heads, which creates a new merge revision that depends on both heads. This is analogous to a merge commit in Git. The merge revision has an empty upgrade/downgrade and simply links the chains back together.
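The notion of a head can be illustrated with a few lines of plain Python: a head is any revision that no other revision names as its down_revision. This is a toy sketch, not Alembic's code. The find_heads function and the two branch revision IDs are hypothetical, and it ignores real merge revisions, whose down_revision is a tuple of both parents.

```python
def find_heads(down_revisions):
    """Return revision IDs that no other revision lists as its down_revision."""
    parents = {parent for parent in down_revisions.values() if parent is not None}
    return sorted(rev for rev in down_revisions if rev not in parents)


# Hypothetical history: two developers both branched from b2c3d4e5f6a7.
history = {
    "a1b2c3d4e5f6": None,
    "b2c3d4e5f6a7": "a1b2c3d4e5f6",
    "d4e5f6a7b8c9": "b2c3d4e5f6a7",  # developer 1
    "e5f6a7b8c9d0": "b2c3d4e5f6a7",  # developer 2
}
print(find_heads(history))  # ['d4e5f6a7b8c9', 'e5f6a7b8c9d0']
```

Two entries in the result correspond to the situation that alembic merge heads resolves.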

How do I run migrations with zero downtime?

Zero-downtime migrations require careful ordering. The safe pattern is: first add the new column as nullable (no application change needed), then deploy the application code that writes to the new column, then backfill existing rows, then add the NOT NULL constraint. Avoid adding a NOT NULL column to a large production table in a single migration: without a server default the ALTER fails on existing rows, and with one it can rewrite or lock the table on some databases and versions. Tools like pg_repack for PostgreSQL can help with large table changes.
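The add-then-backfill part of that pattern can be sketched with the built-in sqlite3 module. This is an illustration under simplifying assumptions: the table, the tier_id column, and the tiny batch size are made up, and a real backfill would run against your production database with a much larger batch size.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT NOT NULL)")
conn.executemany(
    "INSERT INTO users (email) VALUES (?)",
    [(f"user{i}@example.com",) for i in range(10)],
)

# Step 1: add the column as nullable, which is cheap and needs no default.
conn.execute("ALTER TABLE users ADD COLUMN tier_id INTEGER")

# Step 3: backfill in small batches so no single statement holds locks for long.
BATCH_SIZE = 4
batches = 0
while True:
    cursor = conn.execute(
        "UPDATE users SET tier_id = 1 WHERE id IN "
        "(SELECT id FROM users WHERE tier_id IS NULL LIMIT ?)",
        (BATCH_SIZE,),
    )
    conn.commit()  # commit each batch separately
    if cursor.rowcount == 0:
        break
    batches += 1

remaining = conn.execute(
    "SELECT COUNT(*) FROM users WHERE tier_id IS NULL"
).fetchone()[0]
print(f"Backfilled in {batches} batches, {remaining} rows left")
```

Only once the remaining count is zero is it safe to add the NOT NULL constraint in a follow-up migration.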

How do I use different database URLs for dev and production?

Set the database URL from an environment variable in env.py instead of hardcoding it in alembic.ini. Add this to env.py: config.set_main_option("sqlalchemy.url", os.environ["DATABASE_URL"]). Then in CI/CD and production, set DATABASE_URL as a secret environment variable. Never put production credentials in alembic.ini — that file is typically committed to version control.
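If you want local development to keep working when DATABASE_URL is unset, a small helper can supply a fallback. This is a sketch: resolve_database_url is a hypothetical name, and the SQLite default mirrors the URL used earlier in this tutorial.

```python
import os


def resolve_database_url(environ=os.environ, default="sqlite:///./myapp.db"):
    """Prefer DATABASE_URL from the environment; fall back to a local default."""
    url = environ.get("DATABASE_URL", "").strip()
    return url or default


# No variable set: fall back to the local SQLite file.
print(resolve_database_url({}))
# Variable set: use it as-is.
print(resolve_database_url({"DATABASE_URL": "postgresql://user:pass@localhost/mydb"}))
```

In env.py, the result would be passed to config.set_main_option("sqlalchemy.url", ...) as described above. In production you may prefer to drop the fallback so that a missing variable fails loudly.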

Conclusion

Alembic gives you a versioned, reproducible way to evolve your database schema over time. You learned to set up Alembic with alembic init, configure env.py to point at your SQLAlchemy models, generate migrations with alembic revision --autogenerate, apply them with alembic upgrade head, and roll back with alembic downgrade -1. The real-life example showed how to combine schema changes with seed data in a single atomic migration.

The most important habit to build is always reviewing autogenerated scripts before applying them and always writing correct downgrade functions. With those two practices in place, Alembic becomes the safety net that lets you evolve your database with confidence rather than dread.

The official Alembic documentation with a tutorial, cookbook, and API reference is at alembic.sqlalchemy.org.

How To Use Python Poetry for Dependency and Project Management

Beginner

If you’ve ever cloned a Python project and spent an hour figuring out which packages to install, which versions are compatible, and why things work on your machine but not your colleague’s — you understand exactly why dependency management matters. The traditional combo of pip plus a hand-maintained requirements.txt works for simple scripts, but it breaks down quickly on real projects with dozens of dependencies and multiple contributors.

Poetry solves this by giving you a single tool that handles virtual environment creation, dependency resolution, version pinning, and package publishing — all driven by a single pyproject.toml file. Poetry is not the only tool in this space (pip-tools, pipenv, and uv all solve overlapping problems), but it has become one of the most widely adopted thanks to its clean CLI, deterministic lockfile, and first-class support for building and publishing packages to PyPI.

In this tutorial you’ll install Poetry, create a new project from scratch, add and manage dependencies, understand the lockfile, work with environments, and package your project for distribution. By the end you’ll have a complete workflow that replaces scattered requirements files with a reproducible, shareable project setup.

Python Poetry: Quick Example

Here is a minimal workflow — create a project, add a dependency, and run a script — to show how Poetry feels in practice.

# In your terminal (not Python) -- these are shell commands

# Install Poetry (run once)
curl -sSL https://install.python-poetry.org | python3 -

# Create a new project
poetry new myproject
cd myproject

# Add a dependency
poetry add requests

# Run a Python script inside Poetry's managed environment
poetry run python -c "import requests; print(requests.get('https://httpbin.org/get').status_code)"

Output:

200

The poetry new command creates a project directory with a pyproject.toml, a README.md, and a package skeleton. poetry add requests installs requests into a dedicated virtual environment, records it in pyproject.toml, and pins the exact version in poetry.lock. poetry run executes any command inside that managed environment without needing to manually activate it.

What is Poetry and Why Use It?

Poetry is a Python packaging and dependency management tool that uses pyproject.toml as its single source of truth. It replaces several tools that traditionally had to be combined: pip for installation, venv for environments, and setuptools for packaging. Poetry handles all three, plus it generates a lockfile that pins every transitive dependency to an exact version.

Task | Traditional approach | Poetry approach
Install packages | pip install package | poetry add package
Create virtual env | python -m venv .venv | Automatic on first add/install
Pin versions | pip freeze > requirements.txt | poetry.lock (automatic)
Dev vs prod deps | requirements-dev.txt separately | poetry add --group dev package
Build + publish | setuptools + twine (separate) | poetry build && poetry publish
Reproduce env | pip install -r requirements.txt | poetry install

The lockfile is Poetry’s killer feature. When you run poetry add, Poetry solves the entire dependency graph — including every indirect dependency — and records the exact version of every package in poetry.lock. Anyone who clones your repo and runs poetry install gets the exact same environment, down to patch versions. This makes “it works on my machine” problems much rarer.

Tutorial image
poetry.lock: because ‘it worked yesterday’ is not a deployment strategy.

Installing Poetry and Setting Up a Project

The recommended way to install Poetry is the official installer, which places it in its own isolated environment separate from any project. This prevents Poetry itself from interfering with your project’s dependencies.

# install_poetry.sh -- run in terminal

# Install Poetry (Linux/macOS/WSL)
curl -sSL https://install.python-poetry.org | python3 -

# On Windows (PowerShell)
# (Invoke-WebRequest -Uri https://install.python-poetry.org -UseBasicParsing).Content | python -

# Verify installation
poetry --version
# Poetry (version 1.8.x)

# Configure Poetry to create virtual envs inside the project folder
# This makes it easier to see and delete the env
poetry config virtualenvs.in-project true

Output:

Poetry (version 1.8.3)

After installation, you may need to add Poetry’s bin directory to your PATH. The installer prints the exact path — typically ~/.local/bin on Linux/macOS. The virtualenvs.in-project true setting is optional but recommended: it creates the virtual environment in a .venv/ folder inside your project directory, which makes it visible in your IDE and easy to delete when switching Python versions.

Creating a New Project vs Initializing an Existing One

Poetry has two modes for starting: poetry new creates a fresh project directory from a template, and poetry init adds Poetry to an existing directory interactively.

# Terminal commands

# Start a brand new project
poetry new mywebapp
# Creates:
#   mywebapp/
#   mywebapp/pyproject.toml
#   mywebapp/README.md
#   mywebapp/mywebapp/__init__.py
#   mywebapp/tests/__init__.py

# OR add Poetry to an existing project
cd existing_project
poetry init
# Interactive prompts ask for project name, version, description,
# Python version constraint, and initial dependencies

# After either approach, install dependencies (creates the venv if needed)
poetry install

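Output after poetry install (with Poetry’s default settings; if virtualenvs.in-project is true, the environment is created in .venv/ inside the project instead):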

Creating virtualenv mywebapp-2T5b5b6K-py3.11 in /home/user/.cache/pypoetry/virtualenvs
Installing dependencies from lock file

Package operations: 0 installs, 0 updates, 0 removals

The generated pyproject.toml contains everything Poetry needs to know about your project: name, version, description, Python version requirements, and dependencies. Commit this file and poetry.lock to version control — both are essential for reproducibility.

Adding, Updating, and Removing Dependencies

All dependency management flows through the poetry add, poetry update, and poetry remove commands. Poetry resolves the full dependency graph on every operation and updates the lockfile accordingly.

# Terminal commands for dependency management

# Add a runtime dependency
poetry add fastapi

# Add with version constraint
poetry add "sqlalchemy>=2.0,<3.0"

# Add a development-only dependency (not installed in production)
poetry add --group dev pytest pytest-cov black ruff

# Add an optional dependency (user must opt in)
poetry add --optional pandas

# See what's installed
poetry show

# See dependency tree (what depends on what)
poetry show --tree

# Update a specific package to the latest allowed version
poetry update requests

# Update ALL packages
poetry update

# Remove a package
poetry remove httpx

Output of poetry show --tree (excerpt):

fastapi 0.111.0 FastAPI framework
|-- anyio >=3.7.1,<5
|   |-- idna >=2.8
|   `-- sniffio >=1.1
|-- pydantic >=1.7.4,!=1.8,!=1.8.1,!=2.0.0,!=2.0.1,!=2.1.0,<3.0.0
|   |-- annotated-types >=0.4.0
|   `-- pydantic-core 2.18.2
`-- starlette >=0.37.2,<0.38.0

The --group dev flag separates development tools from production dependencies. When you deploy your application and run poetry install --without dev, dev dependencies are skipped, keeping your production image lean. The dependency tree view is invaluable for understanding why a particular package version is installed: if a constraint is preventing an upgrade, the tree shows you which package is imposing it.

Tutorial image
poetry show --tree: finally understanding why numpy is v1.24 and not v2.

Working with Environments

Poetry automatically creates and manages a virtual environment for each project. You do not need to run python -m venv or source .venv/bin/activate manually. Instead, prefix commands with poetry run or drop into the environment with poetry shell (built in on Poetry 1.x; in Poetry 2.x the shell command is provided by a separate plugin).

# Terminal environment commands

# Run a single command inside Poetry's environment
poetry run python script.py
poetry run pytest
poetry run python -m flask run

# Activate the environment in your current shell session
poetry shell
# Now python, pip, and any installed scripts are from Poetry's env
# Type 'exit' or Ctrl-D to leave the shell

# See where the virtual environment lives
poetry env info
# Output:
# Virtualenv
# Python:         3.11.9
# Implementation: CPython
# Path:           /home/user/mywebapp/.venv
# Executable:     /home/user/mywebapp/.venv/bin/python

# Use a specific Python version (must be installed on your system)
poetry env use python3.12

# List all environments Poetry knows about for this project
poetry env list

# Delete the current environment (useful when switching Python versions)
poetry env remove python3.11

Output of poetry env info (excerpt):

Virtualenv
Python:         3.11.9
Implementation: CPython
Path:           /home/user/mywebapp/.venv
Executable:     /home/user/mywebapp/.venv/bin/python
Valid:           True

Most IDEs (VS Code, PyCharm) can detect Poetry environments automatically when virtualenvs.in-project is set to true -- just point the IDE at .venv/bin/python or select it from the interpreter picker. For CI/CD pipelines, use poetry run pytest rather than activating the shell, since shell activation can behave differently across CI environments.

Real-Life Example: Setting Up a FastAPI Project with Poetry

One toml to rule them all, one lockfile to find them.

Here is a complete example of bootstrapping a FastAPI project with Poetry, including runtime dependencies, dev tooling, and a script entrypoint. This is a common setup pattern for production Python services.

# Terminal commands to bootstrap a FastAPI project

poetry new fastapi-demo
cd fastapi-demo

# Add runtime dependencies
poetry add fastapi "uvicorn[standard]" pydantic-settings

# Add dev dependencies
poetry add --group dev pytest httpx pytest-asyncio ruff black

# Check the generated pyproject.toml
cat pyproject.toml

pyproject.toml output:

[tool.poetry]
name = "fastapi-demo"
version = "0.1.0"
description = ""
authors = ["Your Name <you@example.com>"]
readme = "README.md"

[tool.poetry.dependencies]
python = "^3.11"
fastapi = "^0.111.0"
uvicorn = {extras = ["standard"], version = "^0.29.0"}
pydantic-settings = "^2.2.1"

[tool.poetry.group.dev.dependencies]
pytest = "^8.2.0"
httpx = "^0.27.0"
pytest-asyncio = "^0.23.0"
ruff = "^0.4.0"
black = "^24.4.0"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

# fastapi_demo/main.py
from fastapi import FastAPI

app = FastAPI(title="Demo API")

@app.get("/health")
async def health():
    return {"status": "ok"}

# Run the app
poetry run uvicorn fastapi_demo.main:app --reload

# Run tests
poetry run pytest -v

# Lint and format
poetry run ruff check .
poetry run black .

This pattern keeps everything reproducible. A new developer clones the repo, runs poetry install, and gets the exact same environment with all runtime and dev dependencies. The lockfile (poetry.lock) guarantees no version surprises, and the pyproject.toml serves as the single source of truth for project metadata, dependencies, and tool configuration.

Frequently Asked Questions

Should I commit poetry.lock to version control?

Yes, always commit poetry.lock for applications. It ensures every developer and every CI run uses identical package versions. For libraries that other projects depend on, the recommendation is more nuanced -- many library maintainers still commit the lockfile to pin CI test dependencies, but the lockfile is not used when someone installs your library as a dependency. The pyproject.toml version constraints are what matter for library consumers.

Can I export a requirements.txt from Poetry?

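Yes: run poetry export -f requirements.txt --output requirements.txt --without-hashes. On recent Poetry versions the export command is provided by the poetry-plugin-export plugin, which you may need to install first. This is useful when deploying to environments that don't have Poetry installed (e.g., a Docker container that uses pip). The --without-hashes flag omits the per-package hash lines, which keeps the file short and easy to edit at the cost of pip's hash verification. For dev requirements, add --with dev to the export command.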

How do I change the Python version for a Poetry project?

First install the target Python version on your system (via pyenv, asdf, or your OS package manager). Then run poetry env use python3.12 to point Poetry at the new interpreter. Poetry will create a fresh virtual environment for that version. Update the python = "^3.12" constraint in pyproject.toml if needed, then run poetry install to reinstall all dependencies.

How do I use Poetry in GitHub Actions?

Install Poetry as a step before your Python setup, then use poetry install to install dependencies. Cache the virtual environment using the lockfile hash as the cache key: hashFiles('poetry.lock'). The official Poetry documentation provides ready-to-use GitHub Actions workflow examples. Alternatively, use the popular snok/install-poetry action which handles installation and PATH setup automatically.
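The idea behind keying the cache on the lockfile can be sketched in a few lines of Python: hash the file content, and any change to a pinned version yields a new key. This only illustrates the principle and is not how GitHub Actions implements hashFiles; the function name, the "venv" prefix, and the inline lockfile snippets are assumptions made for the example.

```python
import hashlib


def lockfile_cache_key(lock_bytes: bytes, prefix: str = "venv") -> str:
    """Build a cache key that changes whenever the lockfile content changes."""
    digest = hashlib.sha256(lock_bytes).hexdigest()
    return f"{prefix}-{digest[:16]}"


# Inline stand-ins for two versions of poetry.lock.
old_lock = b'[[package]]\nname = "fastapi"\nversion = "0.111.0"\n'
new_lock = b'[[package]]\nname = "fastapi"\nversion = "0.111.1"\n'

print(lockfile_cache_key(old_lock))
print(lockfile_cache_key(old_lock) == lockfile_cache_key(new_lock))  # False
```

An unchanged lockfile always maps to the same key, so the cached environment is reused until a dependency actually changes.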

Poetry vs pip-tools vs uv -- which should I use?

All three solve dependency locking but have different strengths. Poetry is the most full-featured: it handles environments, dependencies, and building/publishing in one tool, making it great for both applications and libraries. pip-tools is minimal and stays close to pip -- good if you want explicit control and a lighter tool. uv is extremely fast (Rust-based) and is gaining rapid adoption; it manages pyproject.toml projects natively and writes its own uv.lock lockfile. For new projects in 2026, Poetry or uv are both solid choices.

Conclusion

Poetry simplifies Python project management by replacing the fragmented combination of pip, venv, and requirements files with a single unified tool driven by pyproject.toml. You learned to create projects with poetry new, add dependencies with poetry add, separate dev and production requirements with dependency groups, inspect the dependency tree, manage virtual environments, and set up a complete FastAPI project using the Poetry workflow.

The reproducibility that comes from committing poetry.lock to version control is the biggest practical benefit -- it eliminates an entire category of "works on my machine" bugs and makes onboarding new team members faster. Try converting one of your existing projects to Poetry by running poetry init in the project root and migrating your existing requirements files.

Find the full Poetry documentation and a migration guide from pip at python-poetry.org/docs.

How To Use Python Pillow for Image Processing

Beginner

Whether you need to resize product images before uploading them to a web store, watermark a batch of photos, convert a folder of PNGs to JPEGs, or draw bounding boxes around objects in a computer vision pipeline — image processing comes up constantly in Python projects. Doing this manually in a GUI photo editor is fine for one or two images, but when you have hundreds or thousands to process, you need code.

Pillow is Python’s go-to library for image processing. It’s the actively maintained fork of the original PIL (Python Imaging Library) and supports over 30 image formats including JPEG, PNG, GIF, BMP, TIFF, and WebP. Pillow lets you open, manipulate, and save images with a clean, intuitive API that works on Windows, Mac, and Linux. Installing it takes one command: pip install Pillow.

In this tutorial you’ll learn how to open and inspect images, resize and crop them, apply filters and adjustments, convert between formats, draw text and shapes, and run a practical batch processing script. By the end you’ll be able to automate image workflows that would otherwise take hours of manual effort.

Python Pillow: Quick Example

Here is a minimal working example that opens an image, resizes it, applies a sharpening filter, and saves the result. You can run this against any JPEG or PNG on your computer — just swap the filename.

# quick_pillow.py
from PIL import Image, ImageFilter

# Open the image
img = Image.open("photo.jpg")
print(f"Original size: {img.size}")   # (width, height) in pixels
print(f"Format: {img.format}")
print(f"Mode: {img.mode}")

# Resize to 800px wide, preserving aspect ratio
base_width = 800
ratio = base_width / img.width
new_height = int(img.height * ratio)
resized = img.resize((base_width, new_height), Image.LANCZOS)

# Apply a sharpening filter
sharpened = resized.filter(ImageFilter.SHARPEN)

# Save the result
sharpened.save("photo_resized.jpg", quality=90)
print(f"Saved as photo_resized.jpg ({sharpened.size[0]}x{sharpened.size[1]})")

Output:

Original size: (3024, 4032)
Format: JPEG
Mode: RGB
Saved as photo_resized.jpg (800x1067)

The key import is from PIL import Image — note that even though the package is called Pillow, the import name is still PIL for backward compatibility. The Image.open() call does not load the full pixel data into memory immediately; it’s lazy-loaded when you first access pixel data or call a transform. The Image.LANCZOS resampling filter produces high-quality results when downscaling.

The sections below dig into resizing, cropping, color adjustments, drawing, format conversion, and a real batch processing project.

What is Pillow and Why Use It?

Pillow is a Python imaging library that gives you a consistent interface for reading and writing image files, and for performing common image operations. Think of it as the Swiss Army knife of image manipulation — it does not replace specialized tools like OpenCV for real-time computer vision, but it handles the vast majority of day-to-day image tasks with minimal code.

The two most important concepts in Pillow are the Image object and image modes. Every image you open or create is represented as an Image object. The mode describes how pixel data is stored:

Mode | Description | Channels
RGB | Standard color (red, green, blue) | 3
RGBA | Color + transparency (alpha channel) | 4
L | Grayscale (luminance) | 1
P | Palette-mapped (indexed colors) | 1 (with palette)
CMYK | Print color (cyan, magenta, yellow, key/black) | 4
1 | 1-bit black and white | 1

Understanding modes matters when converting between formats — for example, JPEG does not support transparency, so saving an RGBA image as JPEG requires converting to RGB first. Pillow will raise an error if you skip this step, which is a common beginner mistake.
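You can confirm the channel counts for yourself by asking Pillow for the band names of each mode. The sketch below builds tiny in-memory images, so no file is needed; the 2x2 size and the red test color are arbitrary choices for the example.

```python
from PIL import Image


def band_count(mode: str) -> int:
    """Number of bands Pillow reports for a freshly created image in this mode."""
    return len(Image.new(mode, (2, 2)).getbands())


for mode in ("RGB", "RGBA", "L", "P", "CMYK", "1"):
    print(f"{mode:>4}: bands={Image.new(mode, (2, 2)).getbands()}")

# Dropping the alpha channel, as required before a JPEG save.
rgba = Image.new("RGBA", (2, 2), (255, 0, 0, 128))
rgb = rgba.convert("RGB")
print(rgb.mode, rgb.getpixel((0, 0)))  # RGB (255, 0, 0)
```

Note that a plain convert("RGB") discards the alpha values without blending, so semi-transparent pixels become fully opaque.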

Image.open(): because your 4032px camera photo shouldn’t stay 4032px.

Opening and Inspecting Images

Before manipulating an image, it helps to inspect its properties. Pillow exposes size, mode, format, and EXIF metadata as attributes on the Image object.

# inspect_image.py
from PIL import Image, ExifTags

img = Image.open("photo.jpg")

print(f"Size (W x H): {img.size}")
print(f"Width: {img.width}px, Height: {img.height}px")
print(f"Format: {img.format}")
print(f"Mode: {img.mode}")
print(f"Info keys: {list(img.info.keys())}")

# Read EXIF data if present
exif_data = img._getexif()
if exif_data:
    for tag_id, value in exif_data.items():
        tag_name = ExifTags.TAGS.get(tag_id, tag_id)
        if tag_name in ("Make", "Model", "DateTime", "GPSInfo"):
            print(f"  EXIF {tag_name}: {value}")

Output:

Size (W x H): (3024, 4032)
Width: 3024px, Height: 4032px
Format: JPEG
Mode: RGB
Info keys: ['jfif', 'jfif_version', 'jfif_unit', 'jfif_density', 'dpi', 'exif']
  EXIF Make: Apple
  EXIF Model: iPhone 14 Pro
  EXIF DateTime: 2026:01:15 14:22:08

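The img._getexif() method returns a dictionary of raw EXIF tag IDs mapped to values, or None when the file has no EXIF block. Wrapping with ExifTags.TAGS.get() converts numeric tag IDs to human-readable names. Not all images have EXIF data (PNG files typically do not), so always check for None before iterating. Note that _getexif() is a private helper that only some file formats implement; the public img.getexif() method is available on every Image and returns an empty mapping rather than None.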
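The public getexif() method offers similar access without relying on a private helper. The sketch below is a minimal version of the same tag-name lookup; the function name readable_exif is hypothetical, and the image is created in memory so the example runs without a photo on disk.

```python
from PIL import ExifTags, Image


def readable_exif(img: Image.Image) -> dict:
    """Map top-level EXIF tag names to values via the public getexif() API."""
    exif = img.getexif()  # an empty mapping (not None) when there is no EXIF data
    return {ExifTags.TAGS.get(tag_id, tag_id): value for tag_id, value in exif.items()}


# An image created in memory carries no EXIF block, so the result is empty.
blank = Image.new("RGB", (4, 4))
print(readable_exif(blank))  # {}
```

For a camera JPEG, the returned dictionary would contain entries such as Make and Model. Nested blocks like GPS data are not expanded by this sketch; Pillow exposes them separately through the Exif object's get_ifd() method.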

Resizing and Cropping Images

Resizing and cropping are the most common image operations. Pillow provides resize() for changing dimensions and crop() for extracting a rectangular region. The thumbnail() method is a convenient shortcut that resizes an image in place while preserving aspect ratio.

# resize_crop.py
from PIL import Image

img = Image.open("photo.jpg")

# --- Resize to exact dimensions ---
exact = img.resize((640, 480), Image.LANCZOS)
exact.save("exact_640x480.jpg")

# --- Resize preserving aspect ratio using thumbnail() ---
thumb = img.copy()
thumb.thumbnail((400, 400), Image.LANCZOS)  # fits within 400x400 box
thumb.save("thumbnail_400.jpg")
print(f"Thumbnail size: {thumb.size}")  # e.g., (300, 400) -- longest side = 400

# --- Crop a specific region (left, upper, right, lower) ---
# Coordinates are in pixels from the top-left corner
box = (100, 200, 900, 800)   # crop an 800x600 region
cropped = img.crop(box)
cropped.save("cropped_region.jpg")
print(f"Cropped size: {cropped.size}")  # (800, 600)

# --- Center crop to a square ---
width, height = img.size
short_side = min(width, height)
left = (width - short_side) // 2
top = (height - short_side) // 2
square_box = (left, top, left + short_side, top + short_side)
square = img.crop(square_box)
square.save("square_crop.jpg")
print(f"Square size: {square.size}")

Output:

Thumbnail size: (300, 400)
Cropped size: (800, 600)
Square size: (3024, 3024)

The thumbnail() method modifies the image in place and never enlarges it — if the image is already smaller than the specified box, nothing happens. For resizing up (enlarging), use resize() with Image.BICUBIC or Image.LANCZOS. LANCZOS produces the sharpest results when downscaling; BICUBIC is slightly faster and works well for upscaling.
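The sizing rule described above can be written as a small pure-Python function. This is a sketch of the arithmetic only, not Pillow's internal implementation (whose rounding may differ by a pixel in edge cases); the name fit_within is hypothetical.

```python
def fit_within(size: tuple[int, int], box: tuple[int, int]) -> tuple[int, int]:
    """Largest size that fits inside box, keeps the aspect ratio, and never upscales."""
    width, height = size
    max_w, max_h = box
    # The tighter of the two axis ratios decides; capping at 1.0 prevents enlarging.
    scale = min(max_w / width, max_h / height, 1.0)
    return max(1, round(width * scale)), max(1, round(height * scale))


print(fit_within((3024, 4032), (400, 400)))  # (300, 400)
print(fit_within((200, 100), (400, 400)))    # (200, 100), already small enough
```

The first result matches the thumbnail size printed earlier: the portrait photo is limited by its height, so the width shrinks in proportion.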

thumbnail() never upsizes. resize() never apologizes.

Color Adjustments and Filters

Pillow provides two modules for adjusting image appearance: ImageFilter for convolution-based filters (blur, sharpen, edge detection) and ImageEnhance for adjusting brightness, contrast, color, and sharpness numerically. Both modules are easy to compose — you can chain multiple enhancements one after another.

# color_filters.py
from PIL import Image, ImageFilter, ImageEnhance

img = Image.open("photo.jpg")

# --- Built-in convolution filters ---
blurred = img.filter(ImageFilter.GaussianBlur(radius=3))
blurred.save("blurred.jpg")

sharpened = img.filter(ImageFilter.SHARPEN)
sharpened.save("sharpened.jpg")

edges = img.filter(ImageFilter.FIND_EDGES)
edges.save("edges.jpg")

# --- ImageEnhance adjustments ---
# Each enhance() call takes a factor: 1.0 = original, 0.0 = minimum, 2.0 = double

# Boost contrast by 50%
contrast_img = ImageEnhance.Contrast(img).enhance(1.5)
contrast_img.save("high_contrast.jpg")

# Reduce brightness (simulate underexposure)
dim_img = ImageEnhance.Brightness(img).enhance(0.6)
dim_img.save("dim.jpg")

# Boost color saturation for vivid colors
vivid = ImageEnhance.Color(img).enhance(1.8)
vivid.save("vivid_colors.jpg")

# Convert to grayscale using mode conversion
grayscale = img.convert("L")
grayscale.save("grayscale.jpg")
print("All filter variations saved.")

Output:

All filter variations saved.

The ImageFilter.GaussianBlur(radius=3) call applies a Gaussian blur with a radius of 3 pixels — larger values produce a stronger blur. FIND_EDGES is a Laplacian edge detection filter that highlights boundaries between light and dark areas. The ImageEnhance factor of 1.0 always returns the original unchanged; values below 1.0 reduce the effect and values above 1.0 amplify it. Chaining is straightforward: apply one enhancer, pass the result to the next.
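Chaining enhancers can be wrapped in a small helper that feeds each result into the next step. The sketch below uses a flat grey in-memory image as a stand-in for a real photo; the helper name apply_enhancements and the factor values are illustrative assumptions.

```python
from PIL import Image, ImageEnhance


def apply_enhancements(img, steps):
    """Apply (enhancer_class, factor) pairs in order, passing each result to the next."""
    for enhancer_class, factor in steps:
        img = enhancer_class(img).enhance(factor)
    return img


# A flat grey test image stands in for a real photo.
source = Image.new("RGB", (8, 8), (200, 200, 200))

dimmed = apply_enhancements(source, [(ImageEnhance.Brightness, 0.5)])
print(dimmed.getpixel((0, 0)))  # (100, 100, 100): a factor of 0.5 halves each channel

styled = apply_enhancements(
    source,
    [(ImageEnhance.Brightness, 0.9), (ImageEnhance.Contrast, 1.2), (ImageEnhance.Color, 1.1)],
)
print(styled.size, styled.mode)
```

Each enhance() call returns a new image, so the source stays untouched and the order of the steps is easy to rearrange.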

Converting Image Formats

Pillow handles format conversion transparently; you simply open one format and save as another. The main gotcha is the RGBA-to-JPEG conversion: JPEG does not support an alpha (transparency) channel, so you must convert RGBA to RGB first or Pillow will raise an OSError: cannot write mode RGBA as JPEG.

# format_conversion.py
from PIL import Image
import os

def convert_png_to_jpeg(png_path, output_path, quality=85):
    """Convert a PNG (possibly with transparency) to JPEG."""
    img = Image.open(png_path)

    # Handle transparency by compositing onto a white background
    if img.mode in ("RGBA", "P"):
        background = Image.new("RGB", img.size, (255, 255, 255))
        if img.mode == "P":
            img = img.convert("RGBA")
        background.paste(img, mask=img.split()[3])  # use alpha channel as mask
        img = background
    elif img.mode != "RGB":
        img = img.convert("RGB")

    img.save(output_path, "JPEG", quality=quality, optimize=True)

    original_size = os.path.getsize(png_path) / 1024
    converted_size = os.path.getsize(output_path) / 1024
    print(f"Converted: {png_path} -> {output_path}")
    print(f"  Size: {original_size:.1f}KB -> {converted_size:.1f}KB")

# Convert a single file
convert_png_to_jpeg("logo.png", "logo.jpg", quality=90)

# Convert all PNGs in a folder
input_folder = "images_png"
output_folder = "images_jpg"
os.makedirs(output_folder, exist_ok=True)

for filename in os.listdir(input_folder):
    if filename.lower().endswith(".png"):
        src = os.path.join(input_folder, filename)
        dst = os.path.join(output_folder, os.path.splitext(filename)[0] + ".jpg")
        convert_png_to_jpeg(src, dst)

Output:

Converted: logo.png -> logo.jpg
  Size: 142.3KB -> 28.7KB
Converted: images_png/banner.png -> images_jpg/banner.jpg
  ...

The img.split()[3] extracts the alpha channel from an RGBA image for use as a paste mask. This composite approach preserves the visual appearance of semi-transparent pixels by blending them against a white background, which is the standard approach for web output. The optimize=True flag tells the JPEG encoder to run an extra optimization pass to reduce file size at the same quality level.
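The blending that happens when an alpha mask is pasted onto a white background comes down to one formula per channel: out = fg * a + bg * (1 - a), with a scaled to the range 0 to 1. The pure-Python sketch below works through it for single pixels; the function name blend_over is hypothetical, and Pillow's own integer rounding may differ by one in some cases.

```python
def blend_over(fg_rgba, bg_rgb=(255, 255, 255)):
    """Blend one RGBA pixel over an opaque background: out = fg*a + bg*(1 - a)."""
    *fg_rgb, alpha = fg_rgba
    a = alpha / 255  # 0.0 = fully transparent, 1.0 = fully opaque
    return tuple(round(f * a + b * (1 - a)) for f, b in zip(fg_rgb, bg_rgb))


print(blend_over((255, 0, 0, 255)))  # (255, 0, 0): opaque red stays red
print(blend_over((255, 0, 0, 0)))    # (255, 255, 255): fully transparent shows the background
print(blend_over((255, 0, 0, 128)))  # (255, 127, 127): about half red, half white
```

This is why a semi-transparent logo edge turns into a soft pink or grey fringe on white rather than a hard cutoff.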

RGBA to JPEG. You’ve been warned. Composite it or crash.

Drawing Text and Shapes on Images

The ImageDraw module lets you draw directly onto an image — rectangles, circles, lines, polygons, and text. This is useful for adding watermarks, annotating images, drawing bounding boxes around detected objects, or generating thumbnails with overlaid metadata.

# drawing.py
from PIL import Image, ImageDraw, ImageFont

# Create a blank 800x400 canvas with a dark background
canvas = Image.new("RGB", (800, 400), color=(30, 30, 40))
draw = ImageDraw.Draw(canvas)

# Draw a rectangle (border box)
draw.rectangle([(50, 50), (350, 200)], outline=(0, 200, 255), width=3)

# Draw a filled circle (ellipse with equal width/height bounding box)
draw.ellipse([(420, 50), (620, 250)], fill=(255, 100, 0), outline=(255, 255, 255), width=2)

# Draw a line
draw.line([(50, 300), (750, 300)], fill=(100, 255, 100), width=2)

# Draw a polygon (triangle)
draw.polygon([(650, 50), (750, 200), (550, 200)], fill=(180, 0, 255), outline=(255, 255, 255))

# Add text: try a TrueType font first, fall back to Pillow's built-in default font
try:
    font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 32)
except OSError:
    font = ImageFont.load_default()

draw.text((50, 330), "Python Pillow Drawing Demo", fill=(255, 255, 255), font=font)

canvas.save("drawing_demo.png")
print("Drawing saved as drawing_demo.png")

# --- Add a watermark to an existing photo ---
photo = Image.open("photo.jpg").convert("RGBA")
overlay = Image.new("RGBA", photo.size, (0, 0, 0, 0))
draw_overlay = ImageDraw.Draw(overlay)

# Semi-transparent watermark text
draw_overlay.text((20, photo.height - 60), "(c) 2026 MyBlog", fill=(255, 255, 255, 128), font=font)

watermarked = Image.alpha_composite(photo, overlay).convert("RGB")
watermarked.save("watermarked_photo.jpg", quality=92)
print("Watermarked photo saved.")

Output:

Drawing saved as drawing_demo.png
Watermarked photo saved.

The ImageDraw.Draw() call returns a drawing context tied to the image. All drawing operations modify the underlying image in place. For watermarks, the recommended pattern is to create a transparent RGBA overlay, draw on it, then use Image.alpha_composite() to merge it with the original photo. alpha_composite() returns a new image, so the photo object you loaded stays unmodified and can be reused for other variants. The alpha value of 128 in the text fill makes the text roughly 50% opaque.

Real-Life Example: Batch Image Resizer for a Web Store

1200 product photos. One for loop. Zero manual resizing.

E-commerce platforms usually require product images at specific dimensions (e.g., 800×800 pixels) with a white background. The script below takes a folder of raw product photos in various sizes and formats, processes them into web-ready square images, and saves a report of what was converted.

# batch_resize.py
import os
from PIL import Image, ImageOps

def process_product_image(src_path, dst_path, size=(800, 800), bg_color=(255, 255, 255)):
    """
    Resize + pad a product image to a square canvas with white background.
    Handles JPEG, PNG, and WEBP input.
    Returns a dict with info about the conversion.
    """
    result = {"src": src_path, "success": False, "error": None, "original_size": None, "output_size": None}

    try:
        img = Image.open(src_path)
        result["original_size"] = img.size

        # Convert to RGBA to handle transparency
        if img.mode != "RGBA":
            img = img.convert("RGBA")

        # Use ImageOps.pad() to fit image into target box, adding background
        padded = ImageOps.pad(img, size, color=bg_color + (255,), method=Image.LANCZOS)

        # Composite onto white background (handle any remaining transparency)
        background = Image.new("RGB", size, bg_color)
        background.paste(padded, mask=padded.split()[3])

        background.save(dst_path, "JPEG", quality=88, optimize=True)
        result["success"] = True
        result["output_size"] = background.size
    except Exception as e:
        result["error"] = str(e)

    return result

def batch_process(input_dir, output_dir, size=(800, 800)):
    os.makedirs(output_dir, exist_ok=True)
    valid_extensions = {".jpg", ".jpeg", ".png", ".webp", ".bmp"}
    files = [f for f in os.listdir(input_dir) if os.path.splitext(f)[1].lower() in valid_extensions]

    print(f"Processing {len(files)} images...")
    results = []

    for filename in files:
        src = os.path.join(input_dir, filename)
        base = os.path.splitext(filename)[0]
        dst = os.path.join(output_dir, f"{base}.jpg")
        r = process_product_image(src, dst, size=size)
        results.append(r)
        status = "OK" if r["success"] else f"FAIL: {r['error']}"
        print(f"  [{status}] {filename} {r.get('original_size','')} -> {r.get('output_size','')}")

    success_count = sum(1 for r in results if r["success"])
    fail_count = len(results) - success_count
    print(f"\nDone: {success_count} converted, {fail_count} failed.")

# Run the batch processor
batch_process("raw_products", "web_ready", size=(800, 800))

Output:

Processing 6 images...
  [OK] shirt_front.jpg (1200, 1800) -> (800, 800)
  [OK] mug_photo.png (2048, 2048) -> (800, 800)
  [OK] banner.webp (1920, 600) -> (800, 800)
  [FAIL: cannot identify image file] corrupt_file.jpg None -> None
  [OK] hat_side.jpg (3024, 4032) -> (800, 800)
  [OK] tshirt_back.png (800, 1200) -> (800, 800)

Done: 5 converted, 1 failed.

ImageOps.pad() is a convenience method that resizes the image to fit inside the target box while preserving aspect ratio, then pads the empty space with the background color. This avoids the stretching you’d get from a plain resize() call on non-square inputs. The error handling means one corrupt file does not crash the whole batch — it gets logged and processing continues. Extend this script by adding a CSV report writer or sending a Slack alert when failures exceed a threshold.

Frequently Asked Questions

How do I install Pillow?

Run pip install Pillow in your terminal. Note that the package name is Pillow (capital P), not PIL. The import in your code is still from PIL import Image because Pillow is a drop-in replacement for the original PIL and keeps its import namespace. If you’re using a virtual environment (recommended), activate it first before installing.

What image formats does Pillow support?

Pillow supports over 30 formats out of the box, including JPEG, PNG, GIF, BMP, TIFF, WebP, ICO, and PSD. For some formats like TIFF with certain compressions or raw camera formats (CR2, NEF), you may need additional system libraries. Check the full list with from PIL import features; features.pilinfo() to see what’s available on your installation.

My resized images are rotated incorrectly — how do I fix it?

Camera phones store rotation information in EXIF data rather than actually rotating the pixel data. When Pillow opens the file, it uses the raw pixel orientation and ignores EXIF rotation. Use from PIL import ImageOps; img = ImageOps.exif_transpose(img) right after opening the image to automatically apply the EXIF rotation. This should be a standard step in any pipeline that processes photos from mobile devices.

Why does processing many large images crash with MemoryError?

A 12-megapixel JPEG may only be 3MB on disk but expands to 36MB+ in memory as an uncompressed RGB array. Processing 100 such images simultaneously would require 3.6GB of RAM. The solution is to process images one at a time in a loop and let Python garbage-collect each image after saving. Explicitly closing images with img.close() or using a with Image.open(path) as img: context manager helps free memory immediately.
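The arithmetic above is easy to reproduce for your own image sizes. The sketch below estimates decoded pixel data at one byte per channel, which is a rough lower bound (Pillow's internal storage can use more); the function name and the 4000x3000 example dimensions are assumptions.

```python
CHANNELS = {"L": 1, "RGB": 3, "RGBA": 4, "CMYK": 4}


def decoded_size_mb(width: int, height: int, mode: str = "RGB") -> float:
    """Rough lower bound for decoded pixel data, at one byte per channel."""
    return width * height * CHANNELS[mode] / 1_000_000


one = decoded_size_mb(4000, 3000)  # a 12-megapixel RGB photo
print(f"one image:  {one:.0f} MB")               # one image:  36 MB
print(f"100 images: {one * 100 / 1000:.1f} GB")  # 100 images: 3.6 GB
```

Running the estimate before a batch job gives a quick sense of how many images can safely be held in memory at once.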

When should I use Pillow vs OpenCV?

Use Pillow for general-purpose image manipulation — resizing, cropping, format conversion, watermarking, and batch processing. Use OpenCV when you need real-time video processing, feature detection, object tracking, or computer vision algorithms. OpenCV uses NumPy arrays as its image representation, which makes it faster for numerical operations but less convenient for simple file-oriented tasks. Many projects use both: Pillow for loading and saving, OpenCV or NumPy for the heavy computation.

Conclusion

Pillow gives you everything you need to handle images in Python without reaching for a GUI tool. In this tutorial you used Image.open() to inspect image metadata, resize() and thumbnail() to change dimensions, crop() to extract regions, ImageFilter and ImageEnhance to apply visual effects, ImageDraw to overlay text and shapes, and a complete batch processing pipeline to convert and pad product images to a uniform size.

A natural next step is combining Pillow with os.walk() to process entire directory trees recursively, or integrating it into a Flask or FastAPI endpoint that accepts image uploads and returns processed results. For advanced processing needs — object detection, face recognition, image segmentation — pair Pillow with OpenCV or PyTorch’s torchvision library.

The official Pillow documentation at pillow.readthedocs.io is comprehensive and includes format-specific notes that are essential when targeting specific output formats or platforms.

How To Use Python kafka-python for Event Streaming

Intermediate

Your microservices need to talk to each other, but direct HTTP calls create tight coupling, fragile retry logic, and cascading failures. Apache Kafka solves this with a durable, high-throughput message log that decouples producers from consumers — publish an event once, any number of services can consume it independently, at their own pace, with replay if something goes wrong. Understanding Kafka is increasingly a must-have skill for Python backend engineers.

The kafka-python library is one of the most widely used Python clients for Apache Kafka. Install it with pip install kafka-python. You will also need a running Kafka broker; the examples below show how to run one locally with Docker in a single command.

In this article we will cover producing messages to Kafka topics, consuming messages with consumer groups, serializing and deserializing JSON payloads, handling consumer offsets and at-least-once delivery, and building a complete event-driven pipeline. By the end you will be able to wire Python services together with Kafka.

kafka-python: Quick Example

Start Kafka locally with Docker (one-time setup):

# Run Kafka locally with KRaft mode (no ZooKeeper needed)
docker run -d --name kafka -p 9092:9092 \
  -e KAFKA_CFG_NODE_ID=1 \
  -e KAFKA_CFG_PROCESS_ROLES=broker,controller \
  -e KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 \
  -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
  -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
  -e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \
  -e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
  bitnami/kafka:latest

Then produce and consume your first message:

# quick_kafka.py
from kafka import KafkaProducer, KafkaConsumer

BOOTSTRAP = "localhost:9092"
TOPIC = "quickstart"

# --- Producer ---
producer = KafkaProducer(bootstrap_servers=BOOTSTRAP)
producer.send(TOPIC, b"Hello from Python!")
producer.flush()
print("Message sent.")
producer.close()

# --- Consumer ---
consumer = KafkaConsumer(
    TOPIC,
    bootstrap_servers=BOOTSTRAP,
    auto_offset_reset="earliest",   # Read from the beginning of the topic
    consumer_timeout_ms=3000,       # Stop after 3 seconds of no messages
)
for message in consumer:
    print(f"Received: {message.value.decode()} (offset={message.offset})")
consumer.close()

Output:

Message sent.
Received: Hello from Python! (offset=0)

The producer sends bytes to the topic and flushes to ensure delivery before the script exits. The consumer reads from the beginning (auto_offset_reset="earliest") and stops after 3 seconds of silence (consumer_timeout_ms=3000), making it convenient for scripts that don’t want to run forever.

What Is Kafka and When Should You Use It?

Apache Kafka is a distributed event streaming platform — a durable, ordered, append-only log that retains messages for a configurable period (days to forever). Producers write events to topics; consumers read from them independently, maintaining their own position (offset) in the log.

Feature | Kafka | Redis Pub/Sub | RabbitMQ
Message durability | Durable (disk-based) | In-memory only | Durable (optional)
Replay old messages | Yes (seek to offset) | No | No (once consumed)
Multiple consumers | Yes (consumer groups) | Yes (fan-out) | Yes (competing/fan-out)
Throughput | Very high (millions/sec) | High | Medium
Complexity | High (broker setup) | Low | Medium

Use Kafka when you need durable, replayable event streams across multiple services. Use Redis Pub/Sub for simple real-time fan-out where loss is acceptable. Use RabbitMQ for traditional work queues with routing logic.
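The log-plus-offsets model is easier to picture with a toy version. The class below is not Kafka and does not use kafka-python; it is an in-memory sketch of a single-partition, append-only log in which every consumer tracks its own read position. All names (TopicLog, the consumer labels) are made up for the illustration.

```python
class TopicLog:
    """Toy single-partition log: append-only, with one read position per consumer."""

    def __init__(self):
        self._records = []
        self._offsets = {}  # consumer name -> next offset to read

    def append(self, value):
        self._records.append(value)
        return len(self._records) - 1  # offset of the new record

    def poll(self, consumer, max_records=10):
        start = self._offsets.get(consumer, 0)
        batch = self._records[start:start + max_records]
        self._offsets[consumer] = start + len(batch)
        return batch

    def seek(self, consumer, offset):
        self._offsets[consumer] = offset  # replay by rewinding the position


log = TopicLog()
for event in ("signup", "login", "purchase"):
    log.append(event)

print(log.poll("analytics"))   # ['signup', 'login', 'purchase']
print(log.poll("billing", 2))  # ['signup', 'login']
log.seek("analytics", 1)
print(log.poll("analytics"))   # ['login', 'purchase']
```

Reading never removes records, which is what makes independent consumers and replay possible; a traditional queue would have handed each message to one reader and dropped it.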

Producing JSON Messages

Real-world applications rarely send raw bytes. You want to serialize Python dictionaries to JSON so consumers can decode them easily. Configure the KafkaProducer with a value_serializer that handles the encoding automatically.

# json_producer.py
import json
from kafka import KafkaProducer
from datetime import datetime, timezone

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    key_serializer=lambda k: k.encode("utf-8") if k else None,
    acks="all",                 # Wait for all in-sync replicas to confirm
    retries=3,                  # Retry on transient errors
    compression_type="gzip",    # Compress messages in transit
)

TOPIC = "user-events"

def publish_event(event_type: str, user_id: int, payload: dict) -> None:
    event = {
        "event_type": event_type,
        "user_id": user_id,
        "timestamp": datetime.now(timezone.utc).isoformat(),  # utcnow() is deprecated since Python 3.12
        "payload": payload,
    }
    # Use user_id as key for partition affinity (same user -> same partition)
    future = producer.send(TOPIC, value=event, key=str(user_id))
    record_metadata = future.get(timeout=10)  # Block until confirmed
    print(f"Sent {event_type} for user {user_id} -> "
          f"partition={record_metadata.partition} offset={record_metadata.offset}")

publish_event("user_signup", 1001, {"email": "alice@example.com", "plan": "pro"})
publish_event("user_login",  1001, {"ip": "192.168.1.100", "device": "Chrome"})
publish_event("user_signup", 1002, {"email": "bob@example.com", "plan": "free"})

producer.flush()
producer.close()

Output:

Sent user_signup for user 1001 -> partition=0 offset=0
Sent user_login for user 1001 -> partition=0 offset=1
Sent user_signup for user 1002 -> partition=1 offset=0

Using the user ID as the message key ensures all events for the same user land on the same partition, which preserves ordering per user. The acks="all" setting waits for all in-sync replicas to confirm the write before returning — this is the safest setting for data that must not be lost.
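The effect of keying can be sketched without a broker. The snippet below is an illustration only: it uses zlib.crc32 as a stand-in hash, whereas Kafka clients use their own hash (murmur2 in the default partitioner), so the partition numbers will not match a real cluster. The helper name pick_partition and the partition count are assumptions made for the example.

```python
import zlib

NUM_PARTITIONS = 3  # assumed partition count for the illustration

def pick_partition(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map a key to a partition with a stable hash (stand-in for Kafka's own hash)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions

# The same key always maps to the same partition, so per-user order is kept
for user_id in (1001, 1001, 1002):
    print(f"user {user_id} -> partition {pick_partition(str(user_id))}")
```

Because the mapping depends on the partition count, adding partitions to a topic later can move a key to a different partition.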

acks=all -- your event stream is not the place for fire-and-forget.

Consumer Groups and Parallel Processing

Consumer groups allow multiple instances of your service to consume from the same topic in parallel, each handling a different partition. Kafka guarantees that each partition is consumed by exactly one consumer in a group at a time — this is how you scale out consumption without processing the same message twice.

# json_consumer.py
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "user-events",
    bootstrap_servers="localhost:9092",
    group_id="analytics-service",    # Consumer group ID
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    key_deserializer=lambda k: k.decode("utf-8") if k else None,
    auto_offset_reset="earliest",    # Start from beginning if no committed offset
    enable_auto_commit=True,         # Commit offsets automatically every 5 seconds
    auto_commit_interval_ms=5000,
    max_poll_records=10,             # Process at most 10 records per poll
)

print("Analytics consumer started. Listening for events...")
try:
    for message in consumer:
        event = message.value
        print(f"[partition={message.partition} offset={message.offset}]")
        print(f"  Type: {event['event_type']}")
        print(f"  User: {event['user_id']}")
        print(f"  Time: {event['timestamp']}")
        print(f"  Data: {event['payload']}")
        print()
except KeyboardInterrupt:
    print("Consumer stopped.")
finally:
    consumer.close()

Output:

Analytics consumer started. Listening for events...
[partition=0 offset=0]
  Type: user_signup
  User: 1001
  Time: 2026-05-15T09:00:00.123456
  Data: {'email': 'alice@example.com', 'plan': 'pro'}

[partition=0 offset=1]
  Type: user_login
  User: 1001
  Time: 2026-05-15T09:00:01.456789
  Data: {'ip': '192.168.1.100', 'device': 'Chrome'}

If you start a second instance of this consumer with the same group_id, Kafka rebalances the group and splits the partitions between the two instances. On a two-partition topic each consumer ends up with one partition; which consumer gets which is decided by the group's partition assignor. Scale horizontally by adding consumers, up to the number of partitions. Consumers beyond that count sit idle.
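To make the rebalance easier to picture, here is a simplified sketch of how partitions could be dealt out to the members of a group. It is not Kafka's actual assignor; the function name assign_round_robin and the consumer names are hypothetical.

```python
def assign_round_robin(partitions, consumers):
    """Deal partitions out to consumers one at a time (simplified illustration)."""
    assignment = {consumer: [] for consumer in consumers}
    for index, partition in enumerate(sorted(partitions)):
        owner = consumers[index % len(consumers)]
        assignment[owner].append(partition)
    return assignment

# One consumer owns everything; a second one triggers a split;
# a third has nothing left to take on a two-partition topic
print(assign_round_robin([0, 1], ["consumer-a"]))
print(assign_round_robin([0, 1], ["consumer-a", "consumer-b"]))
print(assign_round_robin([0, 1], ["consumer-a", "consumer-b", "consumer-c"]))
```

The third call shows why the partition count caps the useful size of a consumer group.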

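Manual Offset Commits for At-Least-Once Processing

Auto-commit decouples committing from processing. Offsets are committed on a timer, so a crash after a commit but before processing finishes can silently skip messages, and a crash after processing but before the next commit replays them. For sensitive operations (database writes, payments), disable auto-commit and commit manually only after processing succeeds. That rules out silent loss. Duplicates are still possible, which is why this gives at-least-once rather than exactly-once delivery.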

# manual_commit.py
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "user-events",
    bootstrap_servers="localhost:9092",
    group_id="payment-service",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    auto_offset_reset="earliest",
    enable_auto_commit=False,   # Manual commits only
)

def process_event(event: dict) -> bool:
    """Process the event. Returns True on success."""
    print(f"Processing {event['event_type']} for user {event['user_id']}")
    # Simulate database write, payment charge, etc.
    return True

print("Payment consumer started (manual commits).")
try:
    for message in consumer:
        event = message.value
        success = process_event(event)
        if success:
            # Only commit AFTER successful processing
            # commit() with no arguments commits the current position
            # (the offset after this message) for all assigned partitions
            consumer.commit()
            print(f"  Committed offset {message.offset + 1}")
        else:
            # A later successful commit would move past this offset,
            # so retry or stop here in real code
            print(f"  Processing failed -- NOT committing offset {message.offset}")
except KeyboardInterrupt:
    pass
finally:
    consumer.close()

Output:

Payment consumer started (manual commits).
Processing user_signup for user 1001
  Committed offset 1
Processing user_login for user 1001
  Committed offset 2

Called with no arguments, consumer.commit() synchronously commits the consumer's current position for every assigned partition, which is the offset of the next message to read (current offset + 1). To commit a specific offset instead, pass a dictionary that maps TopicPartition objects to OffsetAndMetadata values. If the consumer crashes before committing, Kafka re-delivers the uncommitted messages to the next consumer instance, which gives at-least-once delivery for sensitive operations.

enable_auto_commit=False -- payment processing is not a fire-and-hope situation.

Real-Life Example: Order Processing Pipeline

Here is a complete event-driven order processing pipeline with a producer that publishes order events and a consumer that processes them with manual commits and error handling.

# order_pipeline.py
import json
import time
import uuid
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
from dataclasses import dataclass, asdict
from datetime import datetime

BOOTSTRAP = "localhost:9092"
TOPIC = "orders"

@dataclass
class Order:
    order_id: str
    user_id: int
    items: list
    total_usd: float
    status: str = "pending"
    created_at: str = ""

    def __post_init__(self):
        if not self.created_at:
            self.created_at = datetime.utcnow().isoformat()

class OrderProducer:
    def __init__(self):
        self.producer = KafkaProducer(
            bootstrap_servers=BOOTSTRAP,
            value_serializer=lambda v: json.dumps(v).encode("utf-8"),
            acks="all",
        )

    def publish(self, order: Order) -> None:
        future = self.producer.send(TOPIC, value=asdict(order))
        meta = future.get(timeout=10)
        print(f"[Producer] Order {order.order_id} sent (offset={meta.offset})")

    def close(self):
        self.producer.flush()
        self.producer.close()

class OrderConsumer:
    def __init__(self):
        self.consumer = KafkaConsumer(
            TOPIC,
            bootstrap_servers=BOOTSTRAP,
            group_id="order-processor",
            value_deserializer=lambda v: json.loads(v.decode("utf-8")),
            auto_offset_reset="earliest",
            enable_auto_commit=False,
            consumer_timeout_ms=5000,
        )

    def process(self, order: dict) -> bool:
        """Process order: validate, charge, fulfill."""
        print(f"[Consumer] Processing order {order['order_id']} "
              f"(${order['total_usd']:.2f} for user {order['user_id']})")
        time.sleep(0.05)  # Simulate processing
        return True

    def run(self):
        print("[Consumer] Listening for orders...")
        for message in self.consumer:
            order = message.value
            try:
                success = self.process(order)
                if success:
                    self.consumer.commit()
                    print(f"[Consumer] Order {order['order_id']} fulfilled.")
            except Exception as e:
                print(f"[Consumer] Error processing {order['order_id']}: {e}")
        self.consumer.close()

# --- Run producer ---
producer = OrderProducer()
orders = [
    Order(str(uuid.uuid4())[:8], 1001, ["book", "pen"], 29.99),
    Order(str(uuid.uuid4())[:8], 1002, ["laptop"],    999.00),
    Order(str(uuid.uuid4())[:8], 1001, ["notebook"],    5.49),
]
for order in orders:
    producer.publish(order)
producer.close()

# --- Run consumer ---
OrderConsumer().run()

Output:

[Producer] Order a1b2c3d4 sent (offset=0)
[Producer] Order e5f6a7b8 sent (offset=1)
[Producer] Order c9d0e1f2 sent (offset=2)
[Consumer] Listening for orders...
[Consumer] Processing order a1b2c3d4 ($29.99 for user 1001)
[Consumer] Order a1b2c3d4 fulfilled.
[Consumer] Processing order e5f6a7b8 ($999.00 for user 1002)
[Consumer] Order e5f6a7b8 fulfilled.
[Consumer] Processing order c9d0e1f2 ($5.49 for user 1001)
[Consumer] Order c9d0e1f2 fulfilled.

This pattern separates the producer and consumer into distinct classes, making it easy to run them in separate processes or containers. The manual commit ensures that if the consumer crashes mid-batch, unprocessed orders are re-delivered on restart. Note that an order whose processing raises is only logged here, and the next successful commit() moves the offset past it. Add a dead-letter topic to capture orders that fail processing after N retries.

Consumer groups: every partition processed by exactly one consumer.

Frequently Asked Questions

When should I use Kafka vs RabbitMQ?

Use Kafka when you need message replay (consuming old events), very high throughput (millions of messages per second), or event sourcing where the log is the source of truth. Use RabbitMQ when you need complex routing (exchanges, bindings, fanout), priority queues, or simpler deployment. Kafka requires a broker cluster and has higher operational overhead than RabbitMQ but scales further.

How do I install kafka-python?

Run pip install kafka-python. Note that there is also a fork called kafka-python-ng (a community-maintained version) and a separate library called confluent-kafka-python (from Confluent, the company behind Kafka). The confluent-kafka library uses a native C extension and has better performance for high-throughput scenarios, but requires librdkafka to be installed.

How many partitions should a topic have?

A common starting point is one partition per consumer you plan to run in parallel. More partitions allow more parallelism but increase overhead. Start with 3-6 partitions for most topics and scale up if needed. Remember: you can increase partitions but not decrease them after a topic is created, so start conservative.

What serialization format should I use?

JSON is the most readable and is fine for most use cases. For high-throughput scenarios where bandwidth matters, consider Apache Avro (with a schema registry) or Protocol Buffers. Both provide schema evolution support and much smaller message sizes than JSON. The confluent-kafka library integrates natively with the Confluent Schema Registry.

What does at-least-once delivery mean?

At-least-once means every message is guaranteed to be delivered, but it may be delivered more than once in failure scenarios (consumer crash before commit, network retries). Your consumer must therefore be idempotent: processing the same message twice should produce the same result as processing it once. For example, use database upserts instead of inserts, or track processed message IDs in a deduplication table.
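A deduplication table can be sketched with the standard library's sqlite3 module. This is a minimal illustration under assumed names (the processed and balances tables, the handle_once helper, and the message IDs are all hypothetical); a real service would use its own database and schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE processed (message_id TEXT PRIMARY KEY)")
conn.execute("CREATE TABLE balances (user_id INTEGER PRIMARY KEY, total REAL NOT NULL)")

def handle_once(message_id: str, user_id: int, amount: float) -> bool:
    """Apply the message unless its ID was already recorded. Returns True if applied."""
    with conn:  # one transaction: the ID insert and the balance change commit together
        cursor = conn.execute(
            "INSERT OR IGNORE INTO processed (message_id) VALUES (?)", (message_id,)
        )
        if cursor.rowcount == 0:
            return False  # duplicate delivery, nothing to do
        conn.execute(
            "INSERT OR IGNORE INTO balances (user_id, total) VALUES (?, 0)", (user_id,)
        )
        conn.execute(
            "UPDATE balances SET total = total + ? WHERE user_id = ?", (amount, user_id)
        )
        return True

print(handle_once("evt-1", 1001, 29.99))  # first delivery
print(handle_once("evt-1", 1001, 29.99))  # redelivery of the same message
print(conn.execute("SELECT total FROM balances WHERE user_id = 1001").fetchone()[0])
```

Recording the ID and applying the change in the same transaction is the important design choice: either both happen or neither does.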

Conclusion

The kafka-python library gives you a clean Python API for the full Kafka producer and consumer lifecycle. We covered producing JSON messages with custom serializers, consuming with consumer groups for parallel processing, manual offset commits for reliable at-least-once delivery, and a complete order processing pipeline as a real-world example. The key patterns — JSON serializers, group_id for parallel consumption, and manual commits for critical workloads — will serve you in any Python Kafka integration.

Try extending the order pipeline with a dead-letter topic: if processing fails after 3 retries, produce the failed order to an orders-dlq topic for manual inspection instead of blocking the consumer.
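One possible shape for that retry-then-dead-letter logic is sketched below, independent of any broker. The names process_with_dlq, send_to_dlq, and MAX_ATTEMPTS are hypothetical; in the order pipeline, send_to_dlq would wrap a producer.send() call to the dead-letter topic.

```python
from typing import Callable

MAX_ATTEMPTS = 3  # assumed retry budget

def process_with_dlq(order: dict,
                     process: Callable[[dict], None],
                     send_to_dlq: Callable[[dict, str], None],
                     max_attempts: int = MAX_ATTEMPTS) -> bool:
    """Try process(order) up to max_attempts times, then hand it to send_to_dlq.

    Returns True if processing succeeded, False if the order was dead-lettered.
    Either way the caller can commit the offset afterwards, so the consumer keeps moving.
    """
    last_error = ""
    for attempt in range(1, max_attempts + 1):
        try:
            process(order)
            return True
        except Exception as exc:
            last_error = f"attempt {attempt}: {exc}"
    send_to_dlq(order, last_error)
    return False

# Demo with an in-memory list standing in for the dead-letter topic
dead_letters = []

def always_fails(order: dict) -> None:
    raise ValueError("card declined")

ok = process_with_dlq({"order_id": "a1b2c3d4"}, always_fails,
                      lambda order, err: dead_letters.append((order, err)))
print(ok, dead_letters)
```

Passing the processing and dead-letter functions in as arguments keeps the retry policy testable without a running Kafka cluster.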

For the full API reference, see the kafka-python documentation and the Apache Kafka documentation.

How To Use Python atexit for Graceful Shutdown

Beginner

Your Python script connects to a database, opens a temp file, and acquires a lock file. Then an unhandled exception crashes it. The temp file is left on disk, the lock is never released, and the next run refuses to start because the lock file still exists. This kind of “dirty exit” is one of the most frustrating classes of bugs to debug — and the atexit module is the standard library’s answer to it.

atexit lets you register cleanup functions that Python guarantees to call when the interpreter exits normally — whether the program finishes naturally, calls sys.exit(), or raises an unhandled exception. It is simpler than try/finally blocks because you register the cleanup once, anywhere in your code, and forget about it.

In this article we will cover registering cleanup functions with atexit.register(), passing arguments to cleanup functions, the execution order of multiple handlers, the limitations of atexit, and how it compares to try/finally and context managers. By the end you will know exactly when and how to use atexit for reliable resource cleanup.

Python atexit: Quick Example

# quick_atexit.py
import atexit

def cleanup():
    print("Cleanup: releasing resources.")

atexit.register(cleanup)

print("Script running...")
print("Script done.")

Output:

Script running...
Script done.
Cleanup: releasing resources.

The cleanup function runs after print("Script done.") — after the main script body finishes. You can register it at any point in your code, even before the resources it cleans up have been created. This is the core pattern: register early, clean up late.

What Is atexit and Why Use It?

The atexit module maintains a stack of registered functions. When the Python interpreter shuts down, it pops and calls each function in last-in-first-out order. This mirrors the natural teardown order of programs — the last thing you set up is usually the first thing you need to clean up.

Approach | Pros | Cons
atexit.register() | Simple, global, works with sys.exit() | Does not run on SIGKILL or os._exit()
try/finally | Precise scope, catches all exceptions | Must wrap every code path manually
Context managers (with) | Pythonic, automatic, reusable | Must use with block everywhere
Signal handlers | Handles OS signals (SIGTERM, etc.) | Unix-specific, more complex

atexit is not a replacement for context managers — it is a complement. Use context managers for local resource management and atexit for process-level cleanup that must happen regardless of where the exit originates.
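For comparison, here is a small sketch of the local counterpart: contextlib.ExitStack runs its callbacks in the same last-in-first-out order, but at the end of a with block instead of at interpreter exit. The event strings are placeholders for real cleanup work.

```python
from contextlib import ExitStack

events = []

with ExitStack() as stack:
    stack.callback(events.append, "close database")   # registered first, runs last
    stack.callback(events.append, "release lock")     # registered last, runs first
    events.append("work")

print(events)  # ['work', 'release lock', 'close database']
```

Use this form when the resource lives inside one function, and keep atexit for resources that live as long as the process.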

Registering Handlers with Arguments

You often need to pass the resource you are cleaning up to the handler. atexit.register() accepts positional and keyword arguments that are forwarded to the function when it is called.

# atexit_with_args.py
import atexit
import tempfile
import os

def delete_temp_file(path: str) -> None:
    if os.path.exists(path):
        os.remove(path)
        print(f"Deleted temp file: {path}")
    else:
        print(f"Temp file already gone: {path}")

# Create temp file and register cleanup immediately
tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".tmp")
tmp.write(b"Temporary data")
tmp.close()

print(f"Created: {tmp.name}")

# Register the cleanup with the path as an argument
atexit.register(delete_temp_file, tmp.name)

print("Doing some work...")
# Imagine this raises an exception here -- cleanup still runs
print("Work done.")

Output:

Created: /tmp/tmpAbc123.tmp
Doing some work...
Work done.
Deleted temp file: /tmp/tmpAbc123.tmp

The function signature is atexit.register(func, *args, **kwargs). The args and kwargs are captured at registration time — they are not re-evaluated when the handler runs. This means you can safely register a cleanup with a path even before the file exists, as long as the handler checks existence before deleting.
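The capture-at-registration behavior can be observed by running a tiny script in a child interpreter and reading what its exit handler prints. This is only a demonstration harness; the script contents and the file paths in it are made up.

```python
import subprocess
import sys

CHILD_SCRIPT = '''
import atexit

def report(path):
    print(f"cleaning {path}")

path = "/tmp/first.tmp"
atexit.register(report, path)   # binds the current value of path
path = "/tmp/second.tmp"        # rebinding the name later does not affect the handler
'''

result = subprocess.run(
    [sys.executable, "-c", CHILD_SCRIPT],
    capture_output=True, text=True, check=True,
)
print(result.stdout.strip())  # cleaning /tmp/first.tmp
```

If the handler needs the latest value instead, pass a mutable object (or read a module-level variable inside the handler) rather than the value itself.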

atexit.register() -- the cleanup function that runs even when you forget to call it.

Execution Order of Multiple Handlers

Handlers are called in LIFO (last-in-first-out) order, like a stack. The most recently registered handler runs first. This mirrors the natural teardown sequence — if you open a database connection and then acquire a lock, you should release the lock before closing the connection.

# atexit_order.py
import atexit

def teardown_db():
    print("3. Database connection closed.")

def release_lock():
    print("2. Lock released.")

def cleanup_cache():
    print("1. Cache flushed.")

# Register in setup order
atexit.register(teardown_db)
atexit.register(release_lock)
atexit.register(cleanup_cache)

print("Application running...")

Output:

Application running...
1. Cache flushed.
2. Lock released.
3. Database connection closed.

Even though teardown_db was registered first, it runs last. This is the correct order: flush the cache (which may need the lock), release the lock, then close the database. Plan your registration order accordingly.

Unregistering Handlers

Sometimes you set up a resource, register its cleanup, but then explicitly clean it up mid-program. You do not want the atexit handler to run a second time at exit. Use atexit.unregister(func) to remove a previously registered handler.

# atexit_unregister.py
import atexit
import os

LOCK_FILE = "/tmp/myapp.lock"

def remove_lock():
    if os.path.exists(LOCK_FILE):
        os.remove(LOCK_FILE)
        print(f"Lock removed: {LOCK_FILE}")

# Create lock file
with open(LOCK_FILE, "w") as f:
    f.write(str(os.getpid()))
print(f"Lock created: {LOCK_FILE}")

atexit.register(remove_lock)

# ... do some work ...

# Mid-program explicit cleanup
remove_lock()
atexit.unregister(remove_lock)  # Don't run again at exit

print("Lock explicitly released. No double cleanup.")

Output:

Lock created: /tmp/myapp.lock
Lock removed: /tmp/myapp.lock
Lock explicitly released. No double cleanup.

atexit.unregister(func) removes all registrations of that function, even if it was registered multiple times, and silently does nothing if the function was never registered. Python 2's atexit module has no unregister(); if you need to support it, use a flag variable inside the handler instead.
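The flag-variable alternative can be sketched as a small callable that remembers whether it has already run. The class name LockCleanup and its attributes are illustrative, not part of atexit.

```python
import atexit

class LockCleanup:
    """Cleanup handler that does its work at most once (illustrative helper)."""

    def __init__(self):
        self.done = False
        self.calls = 0

    def __call__(self):
        if self.done:
            return            # already cleaned up explicitly; nothing to do at exit
        self.done = True
        self.calls += 1
        print("Lock removed.")

cleanup = LockCleanup()
atexit.register(cleanup)

cleanup()   # explicit mid-program cleanup
cleanup()   # a second call (such as the one atexit makes at exit) is a no-op
```

With this approach the handler stays registered, so nothing breaks if you forget to unregister it.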

atexit.unregister() -- for when you clean up early and want no double-tap.

Limitations of atexit

Understanding when atexit does NOT run is as important as knowing when it does. Three situations bypass atexit entirely:

# atexit_limitations.py
import atexit
import os
import signal

def my_cleanup():
    print("Cleanup ran!")

atexit.register(my_cleanup)

# Case 1: os._exit() -- bypasses atexit entirely
# os._exit(0)  # Uncomment to test -- cleanup will NOT run

# Case 2: SIGKILL -- the OS terminates with no handler
# os.kill(os.getpid(), signal.SIGKILL)  # Uncomment -- cleanup will NOT run

# Case 3: Fatal interpreter errors (e.g., C extension segfault)
# -- atexit does not run

# Normal exit: atexit DOES run
# sys.exit() -- atexit runs
# Unhandled exception -- atexit runs (after printing traceback)
# Program ends naturally -- atexit runs

print("Program ending normally. atexit will run.")

Output:

Program ending normally. atexit will run.
Cleanup ran!

Three cases bypass atexit: os._exit() (used by multiprocessing worker processes), SIGKILL (uncatchable), and fatal C-level errors. A process killed by any signal that Python does not handle, including an unhandled SIGTERM, also skips its atexit handlers. For Kubernetes deployments that send SIGTERM before SIGKILL, combine atexit with a SIGTERM handler (from the signal module) so that every normal shutdown path runs your cleanup.
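A minimal sketch of that combination, assuming a Unix-like system: the SIGTERM handler does nothing except call sys.exit(), which turns the signal into a normal interpreter exit so the registered atexit handlers run. The exit code 128 + signum follows a common shell convention and is a choice made for this example, not a requirement.

```python
import atexit
import signal
import sys

def cleanup():
    print("Cleanup ran!")

def exit_on_sigterm(signum, frame):
    # Raising SystemExit makes this a normal exit, so atexit handlers run
    sys.exit(128 + signum)

atexit.register(cleanup)
signal.signal(signal.SIGTERM, exit_on_sigterm)
```

Run a script with this setup, send it kill -TERM with its PID, and the cleanup message appears before the process ends.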

Real-Life Example: Application Bootstrap with Cleanup Stack

Here is a complete application bootstrap pattern that registers multiple cleanup handlers as resources come online.

# app_bootstrap.py
import atexit
import os
import json
import time
from pathlib import Path

class AppResources:
    """Manages application resources with atexit-based cleanup."""

    def __init__(self, app_name: str):
        self.app_name = app_name
        self.lock_path = Path(f"/tmp/{app_name}.lock")
        self.log_path = Path(f"/tmp/{app_name}.log")
        self.log_file = None
        self.start_time = time.time()

    def acquire_lock(self):
        if self.lock_path.exists():
            pid = self.lock_path.read_text().strip()
            raise RuntimeError(f"App already running (PID {pid}). Delete {self.lock_path} to force.")
        self.lock_path.write_text(str(os.getpid()))
        print(f"[{self.app_name}] Lock acquired: {self.lock_path}")
        atexit.register(self._release_lock)  # Register cleanup right away

    def open_log(self):
        self.log_file = open(self.log_path, "a")
        self.log_file.write(f"=== {self.app_name} started ===\n")
        print(f"[{self.app_name}] Log opened: {self.log_path}")
        atexit.register(self._close_log)  # Registered after lock, so runs before lock cleanup

    def _release_lock(self):
        if self.lock_path.exists():
            self.lock_path.unlink()
            uptime = time.time() - self.start_time
            print(f"[{self.app_name}] Lock released. Uptime: {uptime:.1f}s")

    def _close_log(self):
        if self.log_file and not self.log_file.closed:
            self.log_file.write(f"=== {self.app_name} stopped ===\n")
            self.log_file.close()
            print(f"[{self.app_name}] Log closed.")

def main():
    app = AppResources("myapp")
    app.acquire_lock()
    app.open_log()

    print("Application doing work...")
    time.sleep(0.1)

    # Write some log entries
    app.log_file.write("Processed 42 requests\n")
    print("Work complete.")
    # atexit handlers fire here automatically

if __name__ == "__main__":
    main()

Output:

[myapp] Lock acquired: /tmp/myapp.lock
[myapp] Log opened: /tmp/myapp.log
Application doing work...
Work complete.
[myapp] Log closed.
[myapp] Lock released. Uptime: 0.1s

Notice the LIFO order: _close_log runs before _release_lock, which is correct because we need to write the final log entry before closing the file, and we need the log closed before releasing the lock. By registering each cleanup immediately after acquiring each resource, you get the right teardown order automatically.

Last in, first out. Register in setup order, tear down in reverse.

Frequently Asked Questions

Should I use atexit instead of try/finally?

No — use both, for different purposes. try/finally is best for local resource management within a function or code block. atexit is best for process-level cleanup that needs to run regardless of where in the code the exit happens. Context managers (with statements) are the cleanest way to handle local resource cleanup in modern Python.

What happens if an atexit handler raises an exception?

Python prints the exception traceback to stderr and continues calling the remaining handlers, so one failing handler cannot prevent the others from running. Do not rely on the process exit code to reveal a handler failure. Write handlers defensively: check that the resource exists before accessing it, and wrap risky operations in their own try/except.
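One way to apply that advice consistently is a small decorator that wraps any cleanup function in its own try/except. The decorator name safe_handler and the failing close_connection function are hypothetical.

```python
import atexit
import functools
import sys

def safe_handler(func):
    """Wrap a cleanup function so a failure is reported but never propagates."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            print(f"cleanup {func.__name__} failed: {exc!r}", file=sys.stderr)
            return None
    return wrapper

@safe_handler
def close_connection():
    # Simulates a resource that is already gone by the time cleanup runs
    raise ConnectionError("socket already closed")

atexit.register(close_connection)
```

The handler now logs a one-line message of your choosing instead of a full traceback, and you decide where that message goes.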

Do atexit handlers run in threads?

No. atexit handlers are registered globally and run in the main thread during interpreter shutdown. They do not run when individual threads finish — only when the whole process exits. If you need cleanup when a thread exits, use threading.Thread subclassing or a try/finally block inside the thread’s target function.
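The try/finally approach for per-thread cleanup looks roughly like this. The worker function and the cleaned_up list are placeholders for real work and real resource release.

```python
import threading

cleaned_up = []

def worker(name: str) -> None:
    try:
        print(f"{name}: working")
    finally:
        # Runs when this thread's target returns or raises, not at interpreter exit
        cleaned_up.append(name)
        print(f"{name}: cleaned up")

threads = [threading.Thread(target=worker, args=(f"thread-{i}",)) for i in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```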

Does atexit work with multiprocessing?

Each process has its own atexit stack. Handlers registered in the main process do not run in child processes, and vice versa. Worker processes created by multiprocessing.Pool use os._exit() to terminate, which bypasses atexit entirely. Register cleanup in the main process for shared resources. Pool accepts an initializer argument but has no matching finalizer, so put worker-level cleanup in a try/finally inside the task function.

Is the execution order of atexit handlers guaranteed?

Yes — handlers are always called in LIFO (last-in-first-out) order. The CPython implementation guarantees this. The guarantee holds across normal exits, sys.exit(), and unhandled exceptions. Plan your registration order with the teardown sequence in mind.

Conclusion

The atexit module provides a simple, reliable way to register cleanup functions at the process level. We covered registering handlers with and without arguments, understanding LIFO execution order, unregistering handlers with atexit.unregister(), and the limitations of atexit (os._exit, SIGKILL). The app bootstrap example demonstrates a production-ready pattern for managing multiple resources cleanly.

Extend the bootstrap pattern by combining it with a SIGTERM handler from the signal module — that covers graceful shutdown from process managers like systemd and Kubernetes, where atexit alone is not sufficient.

For the full API reference, see the Python atexit documentation.

How To Use Python signal for Unix Signal Handling

Intermediate

You have a Python worker process running in production. Kubernetes sends it a SIGTERM to gracefully shut it down before a deployment. Your process ignores it and gets hard-killed 30 seconds later, losing the job it was halfway through. Sound familiar? Signal handling is how long-running Python processes stay in control of their own fate — and it is built right into the standard library.

The signal module lets you register Python functions as handlers for Unix signals like SIGTERM, SIGINT, SIGHUP, and SIGUSR1. When the operating system delivers a signal to your process, Python interrupts the main thread and calls your handler. No threads needed, no polling loops — the OS does the heavy lifting.

In this article we will cover the most important signals and what they mean, how to register signal handlers with signal.signal(), how to implement graceful shutdown patterns, how to reload configuration on SIGHUP, and how signals interact with threads. By the end you will know how to write Python daemons and workers that handle signals correctly.

Python signal: Quick Example

Here is the simplest possible signal handler — catching Ctrl+C (SIGINT) and printing a friendly message instead of a traceback:

# quick_signal.py
import signal
import sys
import time

def handle_sigint(signum, frame):
    print("\nCaught SIGINT (Ctrl+C). Exiting cleanly.")
    sys.exit(0)

# Register the handler
signal.signal(signal.SIGINT, handle_sigint)

print("Running. Press Ctrl+C to stop.")
while True:
    time.sleep(1)
    print("Working...")

Output (after pressing Ctrl+C):

Running. Press Ctrl+C to stop.
Working...
Working...
^C
Caught SIGINT (Ctrl+C). Exiting cleanly.

The handler function receives two arguments: signum (the signal number) and frame (the current stack frame). You can use signum to write a single handler for multiple signals.
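As a sketch of the single-handler idea on a Unix-like system, the function below is registered for two signals and uses signum to tell them apart. It only records what it received; the received list and the handler name are illustrative, and a real program would decide how to react to each signal.

```python
import signal

received = []

def handle_signal(signum, frame):
    # signal.Signals turns the raw number back into a readable name
    name = signal.Signals(signum).name
    received.append(name)
    print(f"Received {name} ({signum})")

for sig in (signal.SIGINT, signal.SIGTERM):
    signal.signal(sig, handle_signal)
```

Keep in mind that once SIGINT has a custom handler, Ctrl+C no longer raises KeyboardInterrupt, so the handler (or the main loop) has to arrange the exit.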

What Are Unix Signals?

Signals are asynchronous notifications sent by the OS or other processes to a running process. Think of them as hardware interrupts for software — the process stops what it is doing, runs the handler, and resumes. They are used for lifecycle management, inter-process communication, and error reporting.

Signal | Default Action | Common Use
SIGINT (2) | Terminate | Ctrl+C in terminal
SIGTERM (15) | Terminate | Graceful shutdown request (kill, kubectl)
SIGHUP (1) | Terminate | Terminal hangup; reload config in daemons
SIGUSR1 (10) | Terminate | User-defined; dump stats, toggle debug
SIGUSR2 (12) | Terminate | User-defined; second custom action
SIGKILL (9) | Terminate (uncatchable) | Force-kill; cannot be caught or ignored
SIGCHLD (17) | Ignore | Child process state change

Note: SIGKILL and SIGSTOP cannot be caught, blocked, or ignored. If the OS sends SIGKILL, your process dies immediately with no handler call. This is why SIGTERM exists — it is the polite request before the OS resorts to SIGKILL.
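You can confirm this from Python on a Unix-like system: trying to install a handler for either signal is rejected by the operating system and surfaces as an OSError. The refused list is just a way to collect the results in this sketch.

```python
import signal

refused = []

def handler(signum, frame):
    print("never called")

for sig in (signal.SIGKILL, signal.SIGSTOP):
    try:
        signal.signal(sig, handler)
    except OSError as exc:
        # The OS does not allow a handler for these two signals
        refused.append(sig.name)
        print(f"{sig.name}: cannot install handler ({exc})")
```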

Implementing Graceful Shutdown

The most important signal pattern for production services is graceful shutdown on SIGTERM. Instead of dying instantly, the process finishes its current unit of work, flushes buffers, closes connections, and exits cleanly.

# graceful_shutdown.py
import signal
import sys
import time
import threading

class Worker:
    def __init__(self):
        self.running = True
        self.current_job = None
        self._shutdown_event = threading.Event()

        # Register handlers
        signal.signal(signal.SIGTERM, self._handle_shutdown)
        signal.signal(signal.SIGINT, self._handle_shutdown)

    def _handle_shutdown(self, signum, frame):
        sig_name = signal.Signals(signum).name
        print(f"\n[{sig_name}] Shutdown requested. Finishing current job...")
        self.running = False
        self._shutdown_event.set()

    def process_job(self, job_id: int) -> None:
        self.current_job = job_id
        print(f"  Processing job #{job_id}...")
        time.sleep(0.5)  # Simulate work
        self.current_job = None
        print(f"  Job #{job_id} complete.")

    def run(self):
        print("Worker started. Send SIGTERM or Ctrl+C to stop.")
        job_id = 0
        while self.running:
            job_id += 1
            self.process_job(job_id)
            # Wait a bit between jobs; wake up immediately on shutdown
            self._shutdown_event.wait(timeout=0.2)

        print("Worker exited cleanly. Goodbye.")

if __name__ == "__main__":
    worker = Worker()
    worker.run()

Output (after sending SIGTERM with kill -TERM <pid>):

Worker started. Send SIGTERM or Ctrl+C to stop.
  Processing job #1...
  Job #1 complete.
  Processing job #2...
[SIGTERM] Shutdown requested. Finishing current job...
  Job #2 complete.
Worker exited cleanly. Goodbye.

The key design is the self.running flag. The signal handler sets it to False and signals the threading.Event. The main loop checks self.running between jobs, so it finishes the current job before exiting. No job is lost, no file is left half-written.

SIGTERM is a request. self.running = False is the answer.

Reloading Configuration on SIGHUP

On Linux, SIGHUP was originally sent when a terminal disconnected. For daemons, it became the convention for “reload your configuration without restarting.” This lets you update a config file and reload it in a running service without downtime.

# config_reload.py
import signal
import json
import time
from pathlib import Path

CONFIG_PATH = Path("/tmp/app_config.json")

# Write an initial config file
CONFIG_PATH.write_text(json.dumps({"log_level": "INFO", "max_retries": 3}))

config = {}

def load_config():
    global config
    try:
        config = json.loads(CONFIG_PATH.read_text())
        print(f"Config loaded: {config}")
    except (json.JSONDecodeError, FileNotFoundError) as e:
        print(f"Config load failed: {e}. Keeping previous config.")

def handle_sighup(signum, frame):
    print("\n[SIGHUP] Reloading configuration...")
    load_config()

signal.signal(signal.SIGHUP, handle_sighup)
load_config()

print("Daemon running. Send SIGHUP to reload config.")
for i in range(5):
    print(f"  Tick {i+1} -- log_level={config.get('log_level')}")
    time.sleep(1)

Output (after editing config and sending kill -HUP <pid>):

Config loaded: {'log_level': 'INFO', 'max_retries': 3}
Daemon running. Send SIGHUP to reload config.
  Tick 1 -- log_level=INFO
  Tick 2 -- log_level=INFO
[SIGHUP] Reloading configuration...
Config loaded: {'log_level': 'DEBUG', 'max_retries': 5}
  Tick 3 -- log_level=DEBUG
  Tick 4 -- log_level=DEBUG
  Tick 5 -- log_level=DEBUG

Important: signal handlers run in the main thread and should be short and fast. The handler itself just calls load_config(), which is synchronous and quick. For expensive reloads (re-establishing database connections, re-reading large files), set a flag in the handler and do the actual work in the main loop instead.

Signals and Threads

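Python runs signal handlers only in the main thread, and signal.signal() raises ValueError when called from any other thread, so worker threads cannot register handlers or receive signals directly. This is a critical constraint for multithreaded programs: a handler cannot run while the main thread is stuck in a long blocking call, and on some platforms a main thread blocked in a plain join() delays the handler until the join returns.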
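The main-thread restriction is easy to verify. In this small sketch a worker thread tries to register a handler and records the error it gets; the errors list exists only for the demonstration.

```python
import signal
import threading

errors = []

def try_register():
    try:
        signal.signal(signal.SIGTERM, lambda signum, frame: None)
    except ValueError as exc:
        # signal.signal() is only allowed in the main thread
        errors.append(str(exc))

t = threading.Thread(target=try_register)
t.start()
t.join()
print(errors)
```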

# signal_threads.py
import signal
import threading
import time

shutdown_event = threading.Event()

def handle_shutdown(signum, frame):
    print(f"\n[Signal {signum}] Stopping threads...")
    shutdown_event.set()  # Thread-safe way to signal workers

signal.signal(signal.SIGTERM, handle_shutdown)
signal.signal(signal.SIGINT, handle_shutdown)

def worker_thread(thread_id: int):
    while not shutdown_event.is_set():
        print(f"  Thread {thread_id} working...")
        shutdown_event.wait(timeout=1.0)  # Interruptible sleep
    print(f"  Thread {thread_id} exiting.")

threads = [threading.Thread(target=worker_thread, args=(i,)) for i in range(3)]
for t in threads:
    t.start()

# Main thread: keep the signal handlers alive and wait
while not shutdown_event.is_set():
    time.sleep(0.1)  # Short sleep keeps the main thread responsive to signals

for t in threads:
    t.join()
print("All threads stopped.")

Output (after Ctrl+C):

  Thread 0 working...
  Thread 1 working...
  Thread 2 working...
^C
[Signal 2] Stopping threads...
  Thread 0 exiting.
  Thread 1 exiting.
  Thread 2 exiting.
All threads stopped.

The pattern is: the signal handler sets a threading.Event, and worker threads check the event with wait(timeout=...) instead of bare time.sleep(). The wait() call wakes up as soon as the event is set, so threads exit promptly instead of sleeping through the shutdown signal.

threading.Event.set() -- the only thread-safe thing you should do inside a signal handler.

Real-Life Example: Self-Diagnosing Worker Daemon

This example combines SIGTERM graceful shutdown, SIGHUP config reload, and SIGUSR1 for dumping runtime stats -- a complete signal handling setup for a production daemon.

# worker_daemon.py
import signal
import json
import os
import time
from pathlib import Path
from datetime import datetime

class Daemon:
    def __init__(self):
        self.running = True
        self.jobs_processed = 0
        self.start_time = datetime.now()
        self.config = {"worker_name": "default", "sleep_interval": 0.5}

        signal.signal(signal.SIGTERM, self._on_shutdown)
        signal.signal(signal.SIGINT,  self._on_shutdown)
        signal.signal(signal.SIGHUP,  self._on_reload)
        signal.signal(signal.SIGUSR1, self._on_stats)

    def _on_shutdown(self, signum, frame):
        print(f"\n[{signal.Signals(signum).name}] Graceful shutdown initiated.")
        self.running = False

    def _on_reload(self, signum, frame):
        print("\n[SIGHUP] Config reloaded (simulated).")
        self.config["worker_name"] = f"reloaded_{int(time.time())}"

    def _on_stats(self, signum, frame):
        # total_seconds() covers the full uptime; .seconds alone wraps around after 24 hours
        elapsed = int((datetime.now() - self.start_time).total_seconds())
        rate = self.jobs_processed / elapsed if elapsed else 0
        print(f"\n[SIGUSR1] Stats -- jobs:{self.jobs_processed} uptime:{elapsed}s rate:{rate:.1f}/s")

    def run(self):
        # os.getpid() is the process ID to pass to kill; sys.argv[0] is only the script name
        print(f"Daemon '{self.config['worker_name']}' started. PID: {os.getpid()}")
        print("Signals: SIGTERM=shutdown, SIGHUP=reload, SIGUSR1=stats")
        while self.running:
            self.jobs_processed += 1
            time.sleep(self.config["sleep_interval"])

        print(f"Daemon stopped. Total jobs: {self.jobs_processed}")

if __name__ == "__main__":
    Daemon().run()

Output (interaction with multiple signals):

Daemon 'default' started. PID: <pid>
Signals: SIGTERM=shutdown, SIGHUP=reload, SIGUSR1=stats

[SIGUSR1] Stats -- jobs:4 uptime:2s rate:2.0/s
[SIGHUP] Config reloaded (simulated).
[SIGUSR1] Stats -- jobs:9 uptime:4s rate:2.2/s

[SIGTERM] Graceful shutdown initiated.
Daemon stopped. Total jobs: 11

This three-signal pattern (SIGTERM, SIGHUP, SIGUSR1) covers the lifecycle of most production Python daemons. Add SIGUSR2 for a second diagnostic action -- for example, dumping a thread stack trace or triggering a garbage collection report.

SIGTERM is the polite ask. SIGKILL is the eviction notice.

Frequently Asked Questions

Does the signal module work on Windows?

Partially. On Windows, Python only supports a subset of signals: SIGTERM, SIGINT, SIGABRT, SIGFPE, SIGILL, SIGSEGV, and SIGBREAK. Unix-specific signals like SIGHUP, SIGUSR1, and SIGUSR2 are not available. If you are writing cross-platform code, check sys.platform before registering Unix-specific handlers.
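As an alternative to checking sys.platform, a handler can be registered only for the signals the signal module actually defines on the current platform. The sketch below uses getattr() for that feature check; the list of wanted signals and the handle_signal function are illustrative assumptions.

```python
# portable_signals.py
import signal

def handle_signal(signum, frame):
    print(f"Received {signal.Signals(signum).name}")

def register_handlers(handler):
    """Register the handler for every wanted signal this platform defines."""
    wanted = ["SIGTERM", "SIGINT", "SIGHUP", "SIGUSR1"]
    registered = []
    for name in wanted:
        signum = getattr(signal, name, None)  # None where the platform lacks the signal
        if signum is None:
            continue
        signal.signal(signum, handler)
        registered.append(name)
    return registered

registered = register_handlers(handle_signal)
print("Registered:", registered)
```

On Windows the list should contain only SIGTERM and SIGINT, while on Linux and macOS all four names are expected.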

What is safe to do inside a signal handler?

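Keep signal handlers minimal. Setting a flag (self.running = False), setting a threading.Event, or calling sys.exit() are safe. Avoid complex logic, blocking I/O, and anything that takes a lock. Python runs your handler in the main thread between bytecode instructions, so it will not corrupt interpreter memory, but it can still interrupt your own code at any point -- for example halfway through a multi-step update or in the middle of a print() call. Complex handlers risk deadlocks, reentrant-call errors, and inconsistent state. The print() calls in the examples above keep the output easy to follow; in production code, prefer setting a flag and logging from the main loop.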

How do signals work with asyncio?

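Use loop.add_signal_handler(signal.SIGTERM, callback) instead of signal.signal() when working with asyncio. This method is available on Unix only. The asyncio-aware version integrates with the event loop so your callback runs as part of the event loop rather than interrupting it. For a clean shutdown, have the callback set an asyncio.Event or cancel your main task; calling loop.stop() inside the callback works when you drive the loop yourself with run_forever(), but under asyncio.run() it stops the loop before the main coroutine finishes and raises a RuntimeError.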
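Here is a small sketch of an asyncio shutdown driven by SIGTERM. It assumes a Unix platform and uses an asyncio.Event rather than loop.stop(), so that it works under asyncio.run(). The call_later() line only simulates an external kill command and would not appear in a real service.

```python
# asyncio_signal.py
import asyncio
import os
import signal

async def main():
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()

    # The callback runs inside the event loop, so touching asyncio objects is safe
    loop.add_signal_handler(signal.SIGTERM, stop.set)

    # Demo only: send SIGTERM to ourselves shortly after startup
    loop.call_later(0.05, os.kill, os.getpid(), signal.SIGTERM)

    await asyncio.wait_for(stop.wait(), timeout=5)
    loop.remove_signal_handler(signal.SIGTERM)
    return "stopped cleanly"

result = asyncio.run(main())
print(result)
```

Waiting on an event lets the main coroutine finish its own cleanup before returning, which is usually easier to reason about than stopping the loop from inside a callback.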

How do I ignore a signal?

Pass signal.SIG_IGN as the handler: signal.signal(signal.SIGHUP, signal.SIG_IGN). This tells the OS to silently discard the signal. Use this when you want a daemon to survive terminal hangups without reloading. To restore the default OS behavior, pass signal.SIG_DFL instead.

How do I send a signal from Python?

Use os.kill(pid, signal.SIGTERM) to send a signal to another process. To send to the current process itself, use os.kill(os.getpid(), signal.SIGUSR1). This is useful in tests -- you can programmatically trigger a signal handler to verify it works correctly without relying on external tools.
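The sketch below shows the self-signalling idea in a test-like form, assuming a Unix platform where SIGUSR1 exists. The on_usr1 handler and the received list are illustrative names. The short polling loop is there because Python runs the handler slightly after the signal arrives, not necessarily before os.kill() returns.

```python
# self_signal_test.py
import os
import signal
import time

received = []

def on_usr1(signum, frame):
    received.append(signal.Signals(signum).name)

signal.signal(signal.SIGUSR1, on_usr1)
os.kill(os.getpid(), signal.SIGUSR1)  # Send the signal to this very process

# Give the interpreter a moment to run the Python-level handler
deadline = time.monotonic() + 2
while not received and time.monotonic() < deadline:
    time.sleep(0.01)

signal.signal(signal.SIGUSR1, signal.SIG_DFL)  # Restore the default behavior
print("Handler saw:", received)
```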

Conclusion

The signal module is the foundation of well-behaved Python daemons and long-running workers. We covered registering handlers with signal.signal(), implementing graceful shutdown on SIGTERM, reloading configuration on SIGHUP, using SIGUSR1 for runtime diagnostics, and coordinating signal handlers with worker threads via threading.Event. The daemon example brings all three patterns together in a reusable class structure.

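Try adding a SIGUSR2 handler to your next worker that dumps a stack trace of every running thread. Note that traceback.print_stack() covers only the current thread, so use faulthandler.dump_traceback(all_threads=True) or iterate over sys._current_frames() to include the others. It is an invaluable debugging tool for stuck processes in production.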
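One way to get a dump of every thread on a signal is the standard library's faulthandler module, which can register itself for a signal directly. The sketch below assumes a Unix platform; writing to a temporary file and sending the signal with os.kill() are demo choices so the result can be inspected, whereas a real daemon would more likely dump to stderr or a log file.

```python
# thread_dump.py
import faulthandler
import os
import signal
import tempfile
import time

dump_file = tempfile.TemporaryFile()  # Needs a real file descriptor, not a StringIO

# On SIGUSR2, write the traceback of every thread to dump_file
faulthandler.register(signal.SIGUSR2, file=dump_file, all_threads=True)

os.kill(os.getpid(), signal.SIGUSR2)  # Demo only: trigger the dump ourselves

# Wait briefly until the dump has been written
for _ in range(200):
    if os.fstat(dump_file.fileno()).st_size:
        break
    time.sleep(0.01)

dump_file.seek(0)
dump = dump_file.read().decode()
faulthandler.unregister(signal.SIGUSR2)
print(dump)
```

Because faulthandler writes the dump from C code, it does not depend on the main thread being free to run Python code, which is a useful property for a process that appears stuck.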

For the full API reference, see the Python signal documentation.

How To Use Python zlib and gzip for File Compression


Intermediate

Log files, JSON exports, and database backups can balloon to gigabytes fast. Compressing them before storage or transfer is a quick win that costs almost nothing in code — but few Python developers know the standard library already ships with everything needed. You do not need a third-party package to compress files in Python; zlib and gzip are right there, waiting.

The zlib module provides DEFLATE compression wrapped in a small zlib header and checksum, while gzip wraps the same compressed data in the well-known .gz format compatible with Unix tools like gunzip and zcat. Both are part of the Python standard library and require no installation.

In this article we will cover compressing and decompressing bytes with zlib, reading and writing .gz files with gzip, streaming large files without loading them into memory, and choosing the right compression level. By the end you will know how to shrink files efficiently in pure Python.

Python zlib: Quick Example

Here is the fastest way to compress a string with zlib:

# quick_zlib.py
import zlib

original = b"Python compression is surprisingly simple and very useful for large data."

# Compress
compressed = zlib.compress(original)
print(f"Original size:   {len(original)} bytes")
print(f"Compressed size: {len(compressed)} bytes")
print(f"Ratio: {len(compressed)/len(original):.1%}")

# Decompress
restored = zlib.decompress(compressed)
print(f"Restored: {restored.decode()}")

Output:

Original size:   71 bytes
Compressed size: 63 bytes
Ratio: 88.7%
Restored: Python compression is surprisingly simple and very useful for large data.

zlib.compress() returns a bytes object. The compression ratio improves dramatically with larger, more repetitive data — a 10MB log file with repeated lines can compress to under 1MB. The quick example above shows minimal gains because the input is too short for patterns to emerge.

What Are zlib and gzip?

Both modules implement the DEFLATE algorithm, which combines LZ77 sliding window compression with Huffman coding. The key difference is the file format wrapper:

Feature | zlib | gzip
Format | Raw DEFLATE + zlib header | .gz file format
Interoperable with CLI tools | No | Yes (gunzip, zcat, gzip)
File-like API | No | Yes (gzip.open)
Best for | In-memory bytes, network streams | File storage, log archival
Checksum | Adler-32 | CRC-32

Use zlib when you are compressing bytes in memory — for example, before writing to a database blob or sending over a network socket. Use gzip when you need a standard .gz file that other tools and systems can read.
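The format difference is visible in the first bytes of the output. The sketch below compresses the same data both ways and inspects the headers; detect_format() is a hypothetical helper, and its zlib check is only a heuristic based on the header byte that zlib.compress() produces with default settings.

```python
# format_headers.py
import gzip
import zlib

data = b"hello compression " * 10
zlib_bytes = zlib.compress(data)
gzip_bytes = gzip.compress(data)

print("zlib starts with:", zlib_bytes[:1].hex())
print("gzip starts with:", gzip_bytes[:2].hex())

def detect_format(blob: bytes) -> str:
    """Guess the container format from the leading bytes (heuristic)."""
    if blob[:2] == b"\x1f\x8b":  # gzip magic number
        return "gzip"
    if blob[:1] == b"\x78":      # usual first byte of a zlib header
        return "zlib"
    return "unknown"

print(detect_format(zlib_bytes), detect_format(gzip_bytes))
```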

Compression Levels

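zlib accepts a level parameter from 0 (no compression) to 9 (maximum compression); gzip accepts the same range under the name compresslevel. Higher levels take more CPU time but produce smaller files. For zlib, level -1 (the default) currently maps to level 6, which is a good balance for most use cases. gzip has traditionally defaulted to compresslevel=9, so pass the level explicitly if you want the two modules to behave the same.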

# compression_levels.py
import zlib

data = b"The quick brown fox jumps over the lazy dog. " * 1000  # ~45KB of repetitive text

for level in [1, 3, 6, 9]:
    compressed = zlib.compress(data, level=level)
    ratio = len(compressed) / len(data)
    print(f"Level {level}: {len(compressed):,} bytes ({ratio:.1%} of original)")

Output:

Level 1: 204 bytes (0.5% of original)
Level 3: 204 bytes (0.5% of original)
Level 6: 192 bytes (0.4% of original)
Level 9: 192 bytes (0.4% of original)

For highly repetitive data, even level 1 is extremely effective. For diverse text or binary data, higher levels help more. In practice, use level 6 (default) for file archival and level 1 for network streams where latency matters more than size.

Level 1 or level 9? Just use the default. The difference is usually under 5%.

Reading and Writing .gz Files with gzip

The gzip.open() function gives you a file-like object that compresses transparently. You read and write it exactly like a normal file, but the data on disk is compressed. This is the best approach for compressing log files, CSV exports, or any text data you want to archive.

# gzip_files.py
import gzip
import os

# --- Write a .gz file ---
output_path = "/tmp/sample_log.txt.gz"
lines = [
    f"2026-05-15 09:{i:02d}:00 INFO Request processed successfully\n"
    for i in range(60)
]

with gzip.open(output_path, "wt", encoding="utf-8") as f:
    f.writelines(lines)

print(f"Written: {output_path}")
print(f"File size: {os.path.getsize(output_path):,} bytes")

# --- Read it back ---
with gzip.open(output_path, "rt", encoding="utf-8") as f:
    content = f.read()

line_count = content.count("\n")
print(f"Lines read back: {line_count}")
print(f"First line: {content.splitlines()[0]}")

Output:

Written: /tmp/sample_log.txt.gz
File size: 172 bytes
Lines read back: 60
First line: 2026-05-15 09:00:00 INFO Request processed successfully

Use "wt" mode for text (UTF-8) and "wb" for binary data. The gzip.open() interface closely mirrors the built-in open() (one difference: its default mode is binary "rb" rather than text), so you can often swap between compressed and uncompressed files by changing just the function name.
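Because the two functions are so similar, a small wrapper can pick the right one from the file name. The open_maybe_gzip() helper below is a hypothetical name, and the sketch only handles text modes, since it always passes an encoding.

```python
# open_maybe_gzip.py
import gzip
import tempfile
from pathlib import Path

def open_maybe_gzip(path, mode="rt", encoding="utf-8"):
    """Open path with gzip.open() if it ends in .gz, else with the built-in open()."""
    opener = gzip.open if str(path).endswith(".gz") else open
    return opener(path, mode, encoding=encoding)

results = {}
with tempfile.TemporaryDirectory() as tmp:
    for name in ("notes.txt", "notes.txt.gz"):
        target = Path(tmp) / name
        with open_maybe_gzip(target, "wt") as f:
            f.write("same code path\n")
        with open_maybe_gzip(target, "rt") as f:
            results[name] = f.read().strip()

print(results)
```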

Streaming Large Files

Loading a 2GB log file into memory to compress it can exhaust available RAM and crash your program. Stream it instead: read chunks from the source, feed them to a zlib.compressobj, and write compressed chunks to the output. This keeps memory usage roughly constant regardless of file size.

# stream_compress.py
import zlib
import os

def compress_file_streaming(src_path: str, dst_path: str, chunk_size: int = 65536) -> dict:
    """
    Compress a file using streaming zlib compression.
    Memory usage stays at chunk_size regardless of file size.
    """
    compressor = zlib.compressobj(level=6, wbits=31)  # wbits=31 = gzip format

    original_size = 0
    compressed_size = 0

    with open(src_path, "rb") as src, open(dst_path, "wb") as dst:
        while True:
            chunk = src.read(chunk_size)
            if not chunk:
                break
            original_size += len(chunk)
            compressed_chunk = compressor.compress(chunk)
            if compressed_chunk:
                compressed_size += len(compressed_chunk)
                dst.write(compressed_chunk)

        # Flush remaining compressed data
        final_chunk = compressor.flush()
        compressed_size += len(final_chunk)
        dst.write(final_chunk)

    return {
        "original_bytes": original_size,
        "compressed_bytes": compressed_size,
        "ratio": compressed_size / original_size if original_size else 0,
    }

# Create a test file
test_file = "/tmp/test_data.txt"
with open(test_file, "w") as f:
    for i in range(10000):
        f.write(f"Log entry {i}: Request from 192.168.1.{i % 255} processed in {i % 100}ms\n")

result = compress_file_streaming(test_file, test_file + ".gz")
print(f"Original:   {result['original_bytes']:,} bytes")
print(f"Compressed: {result['compressed_bytes']:,} bytes")
print(f"Ratio:      {result['ratio']:.1%}")

Output:

Original:   680,000 bytes
Compressed: 9,847 bytes
Ratio:      1.4%

The wbits=31 parameter tells zlib to write the gzip format header — the output file is a valid .gz file that you can open with any gzip-compatible tool. The compress() method may return an empty bytes object for some chunks (it is buffering), so always check before writing. The flush() call at the end writes all buffered data.
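The reverse direction can be streamed the same way with zlib.decompressobj. The sketch below is an assumed counterpart to the compressor above, not part of the original example: decompress_stream() is a hypothetical helper, it handles a single gzip member only, and it uses in-memory BytesIO objects in place of real files to stay self-contained.

```python
# stream_decompress.py
import gzip
import io
import zlib

def decompress_stream(src, dst, chunk_size: int = 65536) -> int:
    """Stream-decompress gzip data from src to dst; return the bytes written."""
    decompressor = zlib.decompressobj(wbits=31)  # wbits=31 = expect gzip format
    written = 0
    while True:
        chunk = src.read(chunk_size)
        if not chunk:
            break
        data = decompressor.decompress(chunk)
        written += len(data)
        dst.write(data)
    tail = decompressor.flush()  # Any output still held in internal buffers
    written += len(tail)
    dst.write(tail)
    return written

original = b"Log entry: request processed\n" * 5000
src = io.BytesIO(gzip.compress(original))
dst = io.BytesIO()
count = decompress_stream(src, dst, chunk_size=1024)
print(f"Restored {count:,} bytes")
```

Highly compressible input can expand a great deal per chunk, so for untrusted data it is worth looking at the max_length argument of decompress(), which caps how much output a single call returns.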

Stream it. A 64KB chunk limit beats a 2GB memory crash every time.

Decompressing In-Memory Bytes

Sometimes you receive compressed data over a network connection or from a database BLOB field. Use zlib.decompress() for raw zlib data, or gzip.decompress() for gzip-formatted bytes, to restore the original content without touching the filesystem.

# decompress_memory.py
import gzip
import zlib

# Simulate receiving compressed bytes from an API or database
original_text = "User data: name=Alice, email=alice@example.com, plan=pro\n" * 100
compressed_bytes = gzip.compress(original_text.encode("utf-8"))

print(f"Received {len(compressed_bytes)} compressed bytes")

# Decompress in memory -- no temp files needed
restored_bytes = gzip.decompress(compressed_bytes)
restored_text = restored_bytes.decode("utf-8")

print(f"Decompressed to {len(restored_text)} characters")
print(f"First line: {restored_text.splitlines()[0]}")

# For raw zlib format (no gzip header):
raw_compressed = zlib.compress(original_text.encode("utf-8"))
raw_restored = zlib.decompress(raw_compressed)
print(f"Raw zlib round-trip OK: {raw_restored == original_text.encode()}")

Output:

Received 163 compressed bytes
Decompressed to 5700 characters
First line: User data: name=Alice, email=alice@example.com, plan=pro
Raw zlib round-trip OK: True

Match the decompression function to the compression format: use gzip.decompress() for gzip bytes and zlib.decompress() for raw zlib bytes. Mixing them raises a BadGzipFile or zlib.error exception.
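The mismatch is easy to reproduce. The sketch below triggers both errors on purpose and then shows one possible workaround when the format is not known in advance: passing wbits=47 (32 + 15) to zlib.decompress(), which asks zlib to detect a zlib or gzip header automatically.

```python
# format_mismatch.py
import gzip
import zlib

payload = b"format matters " * 20
as_zlib = zlib.compress(payload)
as_gzip = gzip.compress(payload)

errors = {}
try:
    gzip.decompress(as_zlib)  # zlib data has no gzip magic number
except gzip.BadGzipFile as exc:
    errors["gzip.decompress on zlib data"] = type(exc).__name__

try:
    zlib.decompress(as_gzip)  # gzip header is not a valid zlib header
except zlib.error as exc:
    errors["zlib.decompress on gzip data"] = type(exc).__name__

# wbits=47 accepts either container format
auto_zlib = zlib.decompress(as_zlib, wbits=47)
auto_gzip = zlib.decompress(as_gzip, wbits=47)

print(errors)
print(auto_zlib == payload, auto_gzip == payload)
```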

Real-Life Example: Log Archiver

A common DevOps task is rotating log files: compress yesterday’s logs and move them to an archive directory. Here is a complete log archiver that handles this pattern.

# log_archiver.py
import gzip
import os
import shutil
from pathlib import Path
from datetime import date

def archive_log(log_path: str, archive_dir: str) -> dict:
    """
    Compress a log file into the archive directory with a date suffix.
    Removes the original log file after successful compression.
    """
    log_path = Path(log_path)
    archive_dir = Path(archive_dir)
    archive_dir.mkdir(parents=True, exist_ok=True)

    today = date.today().isoformat()
    archive_name = f"{log_path.stem}_{today}.log.gz"
    archive_path = archive_dir / archive_name

    original_size = log_path.stat().st_size

    # Stream-compress the log file
    with open(log_path, "rb") as src, gzip.open(archive_path, "wb") as dst:
        shutil.copyfileobj(src, dst)

    compressed_size = archive_path.stat().st_size
    savings = 1 - (compressed_size / original_size)

    # Remove original after successful compression
    log_path.unlink()

    return {
        "archived_to": str(archive_path),
        "original_size_kb": original_size // 1024,
        "compressed_size_kb": compressed_size // 1024,
        "space_saved": f"{savings:.1%}",
    }

# --- Demo ---
# Create a fake log file
demo_log = Path("/tmp/app.log")
with open(demo_log, "w") as f:
    for i in range(5000):
        f.write(f"2026-05-15 {i//3600:02d}:{(i//60)%60:02d}:{i%60:02d} INFO endpoint=/api/v1/users latency={i%200}ms\n")

result = archive_log("/tmp/app.log", "/tmp/log_archive")
print(f"Archived to: {result['archived_to']}")
print(f"Original:    {result['original_size_kb']} KB")
print(f"Compressed:  {result['compressed_size_kb']} KB")
print(f"Space saved: {result['space_saved']}")

Output:

Archived to: /tmp/log_archive/app_2026-05-15.log.gz
Original:    292 KB
Compressed:  3 KB
Space saved: 99.0%

The shutil.copyfileobj(src, dst) call handles the streaming copy efficiently: it reads and writes in chunks automatically. The original log file is deleted only after the compressed archive is successfully written, preventing data loss if compression fails partway through. Extend this with a try/except block that deletes the partial archive if shutil.copyfileobj raises an exception, so a failed run does not leave a truncated .gz file behind.
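A minimal sketch of that error handling follows. archive_log_safely() is a hypothetical variant of the archiver, reduced to the compress-then-delete step, and the temporary directory is only there to make the demo self-contained.

```python
# safe_archive.py
import gzip
import shutil
import tempfile
from pathlib import Path

def archive_log_safely(log_path: Path, archive_path: Path) -> Path:
    """Compress log_path to archive_path; remove the partial archive on failure."""
    try:
        with open(log_path, "rb") as src, gzip.open(archive_path, "wb") as dst:
            shutil.copyfileobj(src, dst)
    except Exception:
        # The original log is still intact here; only the partial archive needs cleanup
        archive_path.unlink(missing_ok=True)
        raise
    log_path.unlink()  # Delete the original only after the archive is complete
    return archive_path

with tempfile.TemporaryDirectory() as tmp:
    log = Path(tmp) / "app.log"
    log.write_text("INFO request ok\n" * 1000)
    archive = archive_log_safely(log, Path(tmp) / "app.log.gz")
    with gzip.open(archive, "rt") as f:
        restored_lines = sum(1 for _ in f)
    original_removed = not log.exists()

print(f"Lines in archive: {restored_lines}, original removed: {original_removed}")
```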

292 KB in. 3 KB out. shutil.copyfileobj did the work.

Frequently Asked Questions

When should I use zlib vs gzip?

Use zlib when you are working with bytes in memory and do not need a file-compatible format — for example, compressing data before storing it in a database BLOB or sending it over a custom TCP protocol. Use gzip when you need a standard .gz file that other tools (gunzip, tar, Python’s gzip.open()) can read. The underlying compression algorithm is the same; the difference is the file format wrapper.

What about zip, bz2, and lzma?

Python includes modules for all common compression formats. zipfile handles .zip archives (multiple files). bz2 offers better compression ratios than gzip but is slower. lzma provides the best compression of all standard formats (.xz files) but is the slowest. For most log archival and data transfer use cases, gzip strikes the best balance of speed, compatibility, and compression ratio.

Do I always need to stream large files?

For files under ~100MB on a machine with adequate RAM, loading the whole file is fine. For files over 500MB, streaming is strongly recommended. The gzip.open() + shutil.copyfileobj() pattern handles streaming automatically without extra complexity. When in doubt, stream — the code is the same length and you avoid memory errors.

How do I compress an entire directory?

Use shutil.make_archive() to create a tar.gz of a directory: shutil.make_archive('/tmp/mybackup', 'gztar', '/path/to/dir'). This creates /tmp/mybackup.tar.gz. Alternatively, use the tarfile module for more control — open with tarfile.open('output.tar.gz', 'w:gz') and call add() for each file.
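The following sketch runs shutil.make_archive() against a throwaway directory and then lists the archive contents with tarfile. The directory and file names are made up for the demo.

```python
# archive_directory.py
import shutil
import tarfile
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    source_dir = Path(tmp) / "logs"
    source_dir.mkdir()
    (source_dir / "a.log").write_text("first\n")
    (source_dir / "b.log").write_text("second\n")

    # base_name has no extension; make_archive appends .tar.gz for the 'gztar' format
    archive_path = shutil.make_archive(
        str(Path(tmp) / "backup"), "gztar", root_dir=str(source_dir)
    )

    with tarfile.open(archive_path, "r:gz") as tar:
        members = sorted(Path(m.name).name for m in tar.getmembers() if m.isfile())

print("Archive:", Path(archive_path).name)
print("Files:", members)
```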

How do I verify a .gz file is not corrupted?

Use gzip.open(path, 'rb') and read the entire file in a try/except block. If the file is corrupted, gzip will raise a BadGzipFile or EOFError. For programmatic integrity checks, the gzip format includes a CRC-32 checksum that Python validates automatically during decompression — you do not need to compute it yourself.
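As a sketch of that check, the hypothetical is_valid_gzip() helper below reads a file in chunks and reports whether it decompresses cleanly. It also catches zlib.error, on the assumption that damage inside the compressed stream can surface that way. The three test files are generated on the fly.

```python
# verify_gzip.py
import gzip
import tempfile
import zlib
from pathlib import Path

def is_valid_gzip(path, chunk_size: int = 65536) -> bool:
    """Return True if the whole file decompresses without errors."""
    try:
        with gzip.open(path, "rb") as f:
            while f.read(chunk_size):  # Read to the end so the CRC-32 gets checked
                pass
        return True
    except (gzip.BadGzipFile, EOFError, zlib.error):
        return False

with tempfile.TemporaryDirectory() as tmp:
    good = Path(tmp) / "good.gz"
    good.write_bytes(gzip.compress(b"healthy data\n" * 500))

    truncated = Path(tmp) / "truncated.gz"
    truncated.write_bytes(good.read_bytes()[:-10])  # Cut off the end of the stream

    not_gzip = Path(tmp) / "not_gzip.gz"
    not_gzip.write_bytes(b"this is plain text")

    results = {p.name: is_valid_gzip(p) for p in (good, truncated, not_gzip)}

print(results)
```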

Conclusion

Python’s zlib and gzip modules give you DEFLATE compression without any third-party dependencies. We covered compressing bytes in memory with zlib, reading and writing .gz files with gzip.open(), streaming large files using zlib.compressobj and shutil.copyfileobj, and choosing the right compression level for your use case. The log archiver example shows a complete, production-ready pattern for compressing log files on rotation.

Try extending the archiver to support a retention policy that deletes archives older than 30 days, or add multi-file support using the tarfile module to bundle multiple logs into a single .tar.gz archive.

For the full API reference, see the Python gzip documentation and Python zlib documentation.

How To Use Python graphlib for Dependency Resolution


Intermediate

You have a list of tasks where some must run before others — database migrations before app startup, CSS before JavaScript, package A before package B. Keeping track of these dependencies manually is a recipe for mysterious bugs and circular import nightmares. What you really need is a topological sort: an ordering of nodes in a directed graph such that every dependency comes before the thing that depends on it.

Python 3.9 added the graphlib module to the standard library, giving you a clean, built-in way to solve exactly this problem. No third-party libraries, no reimplementing Kahn’s algorithm from scratch — just import TopologicalSorter and describe your dependency graph.

In this article, we will cover how graphlib.TopologicalSorter works, how to build and traverse dependency graphs, how to detect circular dependencies, and how to parallelize independent tasks. By the end you will have a solid understanding of topological sorting and a reusable pattern for dependency-driven task execution.

Python graphlib: Quick Example

Here is the fastest path from zero to a sorted dependency order using graphlib:

# quick_graphlib.py
from graphlib import TopologicalSorter

# Define a dependency graph: key depends on all values
graph = {
    "build": {"test"},
    "test": {"lint", "compile"},
    "lint": set(),
    "compile": {"lint"},
}

sorter = TopologicalSorter(graph)
order = list(sorter.static_order())
print("Build order:", order)

Output:

Build order: ['lint', 'compile', 'test', 'build']

The TopologicalSorter takes a dictionary where each key depends on its value set. static_order() returns a single valid execution order as a flat sequence, with no parallelism. This is the simplest mode and a good fit when the tasks will run one after another.

The sections below cover more advanced use: dynamic mode for parallel scheduling, cycle detection, and real-world patterns for build systems and package managers.

What Is graphlib and Why Use It?

Topological sorting solves the question: “given a set of tasks with dependencies, in what order should I run them so every dependency is satisfied before the task that needs it?” The name reflects that the order is determined entirely by the structure of the graph. It is only defined for directed acyclic graphs (DAGs), meaning graphs with no cycles.

Think of it like a university course planner. You cannot take Algorithms without taking Data Structures first, and you cannot take Data Structures without completing Introduction to Programming. A topological sort finds the valid sequence of courses to take.

Approach | Pros | Cons
Manual ordering | Simple for small graphs | Error-prone, doesn’t scale
Roll your own DFS/BFS | Full control | 30+ lines, subtle bugs
graphlib.TopologicalSorter | Standard library, clean API, cycle detection | Python 3.9+ only
NetworkX | Full graph algorithms | Third-party dependency, heavy
For most dependency resolution use cases, graphlib hits the sweet spot: it is built in, readable, and handles cycle detection out of the box.

Building Dependency Graphs

The TopologicalSorter accepts a dictionary where each key is a node and each value is a set (or any iterable) of nodes that the key depends on. You can also build the graph incrementally using the add() method, which is useful when you discover dependencies at runtime.

# build_graph.py
from graphlib import TopologicalSorter

sorter = TopologicalSorter()

# Add nodes one at a time: add(node, *dependencies)
sorter.add("deploy", "build", "test")
sorter.add("build", "compile", "lint")
sorter.add("compile", "setup")
sorter.add("lint", "setup")
sorter.add("test", "build")
sorter.add("setup")  # no dependencies

order = list(sorter.static_order())
print("Deployment pipeline:", order)

Output:

Deployment pipeline: ['setup', 'compile', 'lint', 'build', 'test', 'deploy']

The add(node, *deps) signature is convenient when you are reading dependency data from a config file or database. You do not need to construct the full dictionary upfront — just call add() as you discover each dependency relationship.
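To illustrate the config-file case, the sketch below parses a made-up "task: dep1 dep2" text format and feeds each line to add(). The format, the CONFIG_TEXT contents, and the sorter_from_config() helper are assumptions for the example, not something graphlib defines.

```python
# graph_from_config.py
from graphlib import TopologicalSorter

# Hypothetical config format: one "task: dependency dependency ..." entry per line
CONFIG_TEXT = """
deploy: build test
build: compile lint
compile: setup
lint: setup
test: build
setup:
"""

def sorter_from_config(text: str) -> TopologicalSorter:
    sorter = TopologicalSorter()
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # Skip blank lines and comments
        task, _, deps = line.partition(":")
        sorter.add(task.strip(), *deps.split())
    return sorter

order = list(sorter_from_config(CONFIG_TEXT).static_order())
print("Pipeline:", order)
```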

Dependency graphs: when the order matters and your build system needs to know it.

Detecting Circular Dependencies

A topological sort is only possible on a directed acyclic graph — if you have a cycle (A depends on B depends on A), there is no valid ordering. graphlib raises a CycleError when it detects one, giving you the list of nodes involved in the cycle.

# cycle_detection.py
from graphlib import TopologicalSorter, CycleError

graph = {
    "A": {"B"},
    "B": {"C"},
    "C": {"A"},  # creates a cycle: A -> B -> C -> A
}

sorter = TopologicalSorter(graph)

try:
    order = list(sorter.static_order())
    print("Order:", order)
except CycleError as e:
    print(f"Circular dependency detected: {e}")
    print(f"Cycle involves: {e.args[1]}")

Output:

Circular dependency detected: ('nodes are in a cycle', ['A', 'C', 'B', 'A'])
Cycle involves: ['A', 'C', 'B', 'A']

The CycleError exception includes the list of nodes forming the cycle as e.args[1]. This makes it straightforward to surface a useful error message to the user — “Task A and Task C have a circular dependency” is far more actionable than a generic crash.
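A short sketch of turning the cycle list into a readable message is shown below. describe_cycle() is a hypothetical helper, and the exact node that the reported cycle starts from may differ between runs.

```python
# describe_cycle.py
from graphlib import TopologicalSorter, CycleError

def describe_cycle(graph) -> str:
    """Return a human-readable description of a cycle, if the graph has one."""
    try:
        tuple(TopologicalSorter(graph).static_order())
    except CycleError as exc:
        cycle = exc.args[1]  # The first and last entries are the same node
        return "Circular dependency: " + " -> ".join(str(node) for node in cycle)
    return "No circular dependencies found."

message = describe_cycle({"A": {"B"}, "B": {"C"}, "C": {"A"}})
print(message)
print(describe_cycle({"A": {"B"}, "B": set()}))
```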

Dynamic Mode for Parallel Execution

The static_order() method gives you one valid sequence, but it is inherently serial. If some tasks can run in parallel (because they have no dependency between them), you are leaving performance on the table. graphlib provides a dynamic mode for exactly this use case.

In dynamic mode, you call prepare() first, then iterate using get_ready() and done(). The sorter yields batches of tasks that are currently ready to execute (all their dependencies have been marked done), allowing you to dispatch them concurrently.

# parallel_sorter.py
from graphlib import TopologicalSorter
import concurrent.futures
import time

def run_task(task_name):
    """Simulate a task with a short delay."""
    print(f"  Running: {task_name}")
    time.sleep(0.1)
    return task_name

graph = {
    "deploy": {"build", "test"},
    "build": {"lint", "compile"},
    "test": {"compile"},
    "compile": {"setup"},
    "lint": {"setup"},
    "setup": set(),
}

sorter = TopologicalSorter(graph)
sorter.prepare()

with concurrent.futures.ThreadPoolExecutor() as pool:
    futures = {}
    while sorter.is_active():
        # Get all tasks whose dependencies are done
        for task in sorter.get_ready():
            future = pool.submit(run_task, task)
            futures[future] = task

        # Wait for any task to finish, then mark it done
        done_futures, _ = concurrent.futures.wait(
            futures, return_when=concurrent.futures.FIRST_COMPLETED
        )
        for future in done_futures:
            task = futures.pop(future)
            sorter.done(task)
            print(f"  Completed: {task}")

print("All tasks finished.")

Output:

  Running: setup
  Completed: setup
  Running: lint
  Running: compile
  Completed: lint
  Completed: compile
  Running: build
  Running: test
  Completed: test
  Completed: build
  Running: deploy
  Completed: deploy
All tasks finished.

The dynamic mode dispatches lint and compile in parallel once setup finishes. test becomes ready as soon as compile is done, and build as soon as both lint and compile are done, so the two can overlap. The available parallelism falls out of the graph structure automatically.

get_ready() hands you independent tasks in batches. ThreadPoolExecutor handles the rest.

Real-Life Example: Package Dependency Resolver

Let’s build a simplified package dependency resolver — the kind of logic that sits at the heart of pip, npm, and apt. Given a list of packages with their dependencies, it outputs a valid installation order and detects any circular dependencies.

# package_resolver.py
from graphlib import TopologicalSorter, CycleError
from dataclasses import dataclass, field
from typing import Dict, Set, List

@dataclass
class Package:
    name: str
    version: str
    requires: Set[str] = field(default_factory=set)

def resolve_install_order(packages: List[Package]) -> List[str]:
    """
    Given a list of packages, return a valid installation order
    that satisfies all dependencies.

    Raises CycleError if circular dependencies are detected.
    Raises ValueError if an unknown package is referenced.
    """
    known = {pkg.name for pkg in packages}

    # Validate: check all dependencies exist in the package list
    for pkg in packages:
        for dep in pkg.requires:
            if dep not in known:
                raise ValueError(
                    f"Package '{pkg.name}' depends on '{dep}', "
                    f"which is not in the package list"
                )

    # Build graph: each package depends on its requires set
    graph: Dict[str, Set[str]] = {
        pkg.name: pkg.requires for pkg in packages
    }

    try:
        sorter = TopologicalSorter(graph)
        return list(sorter.static_order())
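    except CycleError as e:
        cycle_nodes = e.args[1]
        # Keep the node list as the second argument so callers can still read e.args[1]
        raise CycleError(
            f"Circular dependency detected involving: "
            f"{' -> '.join(cycle_nodes)}",
            cycle_nodes,
        ) from e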

# --- Example usage ---
packages = [
    Package("myapp",     "1.0.0", requires={"requests", "sqlalchemy"}),
    Package("requests",  "2.31.0", requires={"urllib3", "certifi"}),
    Package("sqlalchemy","2.0.0",  requires={"greenlet"}),
    Package("urllib3",   "2.0.7",  requires=set()),
    Package("certifi",   "2024.2", requires=set()),
    Package("greenlet",  "3.0.0",  requires=set()),
]

order = resolve_install_order(packages)
print("Installation order:")
for i, pkg in enumerate(order, 1):
    print(f"  {i}. {pkg}")

Output:

Installation order:
  1. urllib3
  2. certifi
  3. greenlet
  4. requests
  5. sqlalchemy
  6. myapp

This resolver validates that all referenced packages exist, builds the dependency graph from the package metadata, and uses TopologicalSorter to produce a valid installation order. The same pattern applies to build systems, CI pipeline stages, database migration runners, and microservice startup orchestrators. Extend it by adding version conflict detection or by switching to dynamic mode for parallel installs.

Dependency resolution: urllib3 before requests before myapp. Every time.

Frequently Asked Questions

What Python version does graphlib require?

The graphlib module was added in Python 3.9. If you are on Python 3.8 or earlier, you will need to either upgrade or implement topological sorting manually (using DFS or Kahn’s algorithm). On Python 3.9+, it is part of the standard library with no installation needed.

When should I use static_order() vs dynamic mode?

Use static_order() when you need a simple serial execution order and do not care about parallelism — it is one line and very readable. Use dynamic mode (prepare() + get_ready() + done()) when you want to run independent tasks concurrently. Dynamic mode requires more code but gives you maximum throughput on multi-core systems.

Are there multiple valid topological orders?

Yes. For most graphs there are several valid orderings. For example, if A and B both depend only on C, you can run A before B or B before A — both are correct. graphlib does not guarantee which valid order it returns, so do not write code that relies on a specific tie-breaking behavior. If you need a deterministic order among independent nodes, sort the node names before adding them to the graph.
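The sketch below applies that advice by inserting nodes and their dependencies in sorted order. graphlib does not document a formal ordering guarantee, so treat this as a way to remove hash-order variation between runs, not as a contract. The deterministic_order() helper and the example graph are illustrative.

```python
# deterministic_order.py
from graphlib import TopologicalSorter

def deterministic_order(graph):
    """Build the sorter with a fixed insertion order, then return a static order."""
    sorter = TopologicalSorter()
    for node in sorted(graph):
        sorter.add(node, *sorted(graph[node]))  # Sorted deps instead of set order
    return list(sorter.static_order())

graph = {
    "report": {"extract", "clean"},
    "clean": {"extract"},
    "audit": {"extract"},
    "extract": set(),
}

order = deterministic_order(graph)
print(order)
```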

How do I handle CycleError gracefully?

Catch graphlib.CycleError and read e.args[1] to get the list of nodes forming the cycle. Format this into a human-readable error message that tells the user which tasks are in a circular dependency. Never swallow the exception silently — a circular dependency is a configuration bug that must be fixed, not ignored.
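A short sketch of that pattern follows. The safe_order helper and the task names are hypothetical and exist only for illustration; the only graphlib behavior relied on is that the cycle is the second element of the exception's args.

```python
from graphlib import CycleError, TopologicalSorter

def safe_order(graph):
    """Return (order, error_message); exactly one of the two is None."""
    try:
        return list(TopologicalSorter(graph).static_order()), None
    except CycleError as e:
        cycle = e.args[1]  # list of nodes; first and last entries are the same node
        return None, "Circular dependency: " + " -> ".join(cycle)

# Hypothetical task graph with a deliberate cycle between build and test
order, error = safe_order({"build": {"test"}, "test": {"build"}, "deploy": {"build"}})
print(error)
```

Returning the message instead of printing it inside the helper keeps the decision about how to report the failure (log, exit code, UI message) with the caller.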

Does graphlib scale to large graphs?

Yes. TopologicalSorter uses Kahn’s algorithm internally, which runs in O(V + E) time where V is the number of nodes and E is the number of edges. This scales linearly and handles graphs with thousands of nodes efficiently. For extremely large graphs (millions of nodes), consider a dedicated graph library like NetworkX, but for typical build system and dependency resolution use cases, graphlib is more than adequate.

Conclusion

The graphlib module gives you a clean, standard-library solution to dependency resolution. We covered TopologicalSorter with both static_order() for simple serial ordering and dynamic mode for parallel task scheduling. We used CycleError to catch circular dependencies before they cause subtle runtime bugs, and built a complete package dependency resolver as a real-world example.

Try extending the package resolver to detect version conflicts, or wire the parallel scheduler to asyncio tasks instead of threads. The dynamic mode API maps naturally to async coroutines: replace the ThreadPoolExecutor with asyncio tasks, for example by awaiting each batch returned by get_ready() with asyncio.gather and calling done() as tasks finish.

For the full API reference, see the Python graphlib documentation.

How To Build a Dashboard with Python and Dash


Intermediate

You have a pandas DataFrame full of sales data, website metrics, or sensor readings, and you want to share it as an interactive dashboard — not a static chart, but something your team can filter, zoom, and explore. Dash is the Python framework for exactly this: it lets you build fully interactive web dashboards entirely in Python, no JavaScript required, and run them with a single command. The charts come from Plotly, the interactivity from Dash callbacks, and the layout from Dash’s HTML component library.

Install Dash with pip install dash, which also installs Plotly and the Flask server it runs on. The examples in this tutorial also use pandas and NumPy, so add them with pip install pandas numpy if they are not already installed. Every example runs with python followed by the script name and opens in your browser at http://127.0.0.1:8050.

This tutorial covers Dash layouts with HTML and core components, building Plotly figures, connecting controls to charts with callbacks, chained callbacks for dependent dropdowns, and deploying a complete interactive sales dashboard. By the end you will have a working dashboard you can adapt to any dataset.

Your First Dash Dashboard: Quick Example

Here is the minimum Dash app — a bar chart with a dropdown to switch between datasets:

# quick_dash.py
import dash
from dash import dcc, html
from dash.dependencies import Input, Output
import plotly.express as px
import pandas as pd

# Sample data
data = {
    'Month': ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun'],
    'Sales': [120, 145, 132, 178, 156, 194],
    'Expenses': [90, 105, 98, 120, 115, 135],
}
df = pd.DataFrame(data)

app = dash.Dash(__name__)

app.layout = html.Div([
    html.H1("Monthly Metrics", style={'textAlign': 'center'}),
    dcc.Dropdown(
        id='metric-dropdown',
        options=[{'label': col, 'value': col} for col in ['Sales', 'Expenses']],
        value='Sales',
        clearable=False,
        style={'width': '300px', 'margin': '0 auto 20px'}
    ),
    dcc.Graph(id='metric-chart'),
])

@app.callback(Output('metric-chart', 'figure'), Input('metric-dropdown', 'value'))
def update_chart(metric):
    fig = px.bar(df, x='Month', y=metric, title=f'Monthly {metric}', color_discrete_sequence=['#636EFA'])
    fig.update_layout(plot_bgcolor='white', paper_bgcolor='white')
    return fig

if __name__ == '__main__':
    app.run(debug=True)

Run with python quick_dash.py then open http://127.0.0.1:8050. Output: A bar chart appears. Selecting “Expenses” in the dropdown immediately updates the chart with no page reload.

Every Dash app has the same three-part structure: app.layout defines the HTML structure using component objects, @app.callback decorators wire inputs to outputs, and app.run() starts the Flask development server. The callback receives the current dropdown value and returns a Plotly figure — Dash handles the JavaScript communication automatically.

Dash Architecture: Layout and Callbacks

Understanding how Dash separates layout from logic makes building complex dashboards straightforward:

Layer | What It Does | Key Components
Layout | Defines the HTML structure | html.Div, html.H1, html.P, html.Table
Core Components | Interactive input/output widgets | dcc.Graph, dcc.Dropdown, dcc.Slider, dcc.DatePickerRange
Callbacks | Python functions that update components | @app.callback with Input(), Output(), State()
Figures | Plotly chart objects returned by callbacks | px.bar, px.line, px.scatter, go.Figure

The key design rule: every interactive element has an id, and callbacks reference those IDs. When an Input component’s value changes, Dash automatically calls the decorated function with the new value and updates the Output component with the return value. This pattern scales from simple single-input charts to complex dashboards with dozens of linked components.

Cache Katie at the controls — real-time metrics at a glance.

Building Layouts

Dash layouts use Python objects that mirror HTML elements. Wrap multiple components in rows and columns for a grid layout:

# layout_demo.py
import dash
from dash import dcc, html
import plotly.express as px

app = dash.Dash(__name__)

app.layout = html.Div(style={'fontFamily': 'Arial, sans-serif', 'maxWidth': '1200px', 'margin': '0 auto'}, children=[
    # Header row
    html.Div(style={'background': '#1a1a2e', 'color': 'white', 'padding': '20px', 'marginBottom': '20px'}, children=[
        html.H1("Sales Dashboard", style={'margin': 0}),
        html.P("Jan-Jun 2026 Performance Overview", style={'margin': 0, 'opacity': 0.7}),
    ]),

    # KPI row -- three summary cards
    html.Div(style={'display': 'flex', 'gap': '20px', 'marginBottom': '20px'}, children=[
        html.Div(style={'flex': 1, 'background': '#f0f8ff', 'padding': '20px', 'borderRadius': '8px', 'border': '1px solid #cce5ff'}, children=[
            html.H2("$284,500", style={'margin': 0, 'color': '#0066cc'}),
            html.P("Total Revenue", style={'margin': 0, 'color': '#666'}),
        ]),
        html.Div(style={'flex': 1, 'background': '#f0fff0', 'padding': '20px', 'borderRadius': '8px', 'border': '1px solid #b3ffb3'}, children=[
            html.H2("1,842", style={'margin': 0, 'color': '#009900'}),
            html.P("Total Orders", style={'margin': 0, 'color': '#666'}),
        ]),
        html.Div(style={'flex': 1, 'background': '#fff8f0', 'padding': '20px', 'borderRadius': '8px', 'border': '1px solid #ffd9b3'}, children=[
            html.H2("$154.45", style={'margin': 0, 'color': '#cc6600'}),
            html.P("Avg Order Value", style={'margin': 0, 'color': '#666'}),
        ]),
    ]),

    # Chart area
    dcc.Graph(id='main-chart', figure=px.bar(
        x=['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun'],
        y=[42000, 48000, 51000, 49000, 46000, 52000],
        labels={'x': 'Month', 'y': 'Revenue ($)'},
        title='Monthly Revenue, Jan-Jun 2026',
    )),
])

if __name__ == '__main__':
    app.run(debug=True)

Inline styles use Python dicts instead of CSS strings — style={'fontSize': '14px', 'color': '#333'} — because Dash components are Python objects. For production dashboards, move styles to an external CSS file placed in the assets/ folder next to your script; Dash automatically serves files from that folder.

Multi-Input Callbacks and Filtering

Callbacks can take multiple inputs and update multiple outputs simultaneously. Here is a dashboard with category and region selectors that both affect the same chart and the summary line beneath it:

# multi_input_dash.py
import dash
from dash import dcc, html
from dash.dependencies import Input, Output
import plotly.express as px
import pandas as pd
import numpy as np

np.random.seed(42)
dates = pd.date_range('2026-01-01', periods=180, freq='D')
df = pd.DataFrame({
    'date': dates,
    'revenue': np.random.randint(1000, 5000, 180),
    'category': np.random.choice(['Electronics', 'Clothing', 'Books'], 180),
    'region': np.random.choice(['North', 'South', 'East', 'West'], 180),
})

app = dash.Dash(__name__)

app.layout = html.Div([
    html.H2("Sales Explorer"),
    html.Div(style={'display': 'flex', 'gap': '20px', 'marginBottom': '20px'}, children=[
        html.Div([
            html.Label("Category"),
            dcc.Dropdown(
                id='category-filter',
                options=[{'label': 'All', 'value': 'All'}] + [{'label': c, 'value': c} for c in df['category'].unique()],
                value='All', clearable=False,
            )
        ], style={'flex': 1}),
        html.Div([
            html.Label("Region"),
            dcc.Dropdown(
                id='region-filter',
                options=[{'label': 'All', 'value': 'All'}] + [{'label': r, 'value': r} for r in df['region'].unique()],
                value='All', clearable=False,
            )
        ], style={'flex': 1}),
    ]),
    dcc.Graph(id='revenue-chart'),
    html.Div(id='stats-box', style={'padding': '10px', 'background': '#f9f9f9', 'borderRadius': '4px'}),
])

@app.callback(
    Output('revenue-chart', 'figure'),
    Output('stats-box', 'children'),
    Input('category-filter', 'value'),
    Input('region-filter', 'value'),
)
def update_dashboard(category, region):
    filtered = df.copy()
    if category != 'All':
        filtered = filtered[filtered['category'] == category]
    if region != 'All':
        filtered = filtered[filtered['region'] == region]

    daily = filtered.groupby('date')['revenue'].sum().reset_index()
    fig = px.line(daily, x='date', y='revenue', title=f'Daily Revenue -- {category} / {region}')
    fig.update_traces(fill='tozeroy', line_color='#636EFA')

    stats = f"Total: ${filtered['revenue'].sum():,.0f} | Orders: {len(filtered)} | Avg/day: ${daily['revenue'].mean():,.0f}"
    return fig, stats

if __name__ == '__main__':
    app.run(debug=True)

When multiple Input components are listed, the callback receives all their current values as arguments in the same order. The callback fires whenever any input changes. Returning a tuple from the callback populates the multiple Output components in matching order — the figure goes to revenue-chart and the stats string goes to stats-box.

Pete pulls the levers — callbacks fire and charts update live.

Real-Life Example: Interactive Sales Dashboard

Here is a complete sales dashboard with a bar chart and a pie chart driven by the same filter, KPI cards that update with that filter, and a data table, all in under 100 lines:

# sales_dashboard.py
import dash
from dash import dcc, html, dash_table
from dash.dependencies import Input, Output
import plotly.express as px
import plotly.graph_objects as go
import pandas as pd
import numpy as np

np.random.seed(0)
months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun']
categories = ['Electronics', 'Clothing', 'Books', 'Toys']
rows = []
for m in months:
    for c in categories:
        rows.append({'Month': m, 'Category': c, 'Revenue': np.random.randint(5000, 20000), 'Units': np.random.randint(50, 300)})
df = pd.DataFrame(rows)

app = dash.Dash(__name__)

app.layout = html.Div(style={'fontFamily': 'Arial', 'padding': '20px'}, children=[
    html.H1("Sales Dashboard 2026", style={'textAlign': 'center'}),
    dcc.Dropdown(id='cat-filter', options=[{'label': 'All Categories', 'value': 'All'}] +
        [{'label': c, 'value': c} for c in categories], value='All', clearable=False,
        style={'width': '300px', 'marginBottom': '20px'}),
    html.Div(id='kpi-row', style={'display': 'flex', 'gap': '15px', 'marginBottom': '20px'}),
    html.Div(style={'display': 'flex', 'gap': '20px'}, children=[
        dcc.Graph(id='revenue-bar', style={'flex': 1}),
        dcc.Graph(id='units-pie', style={'flex': 1}),
    ]),
    dash_table.DataTable(id='data-table', page_size=8,
        style_header={'backgroundColor': '#1a1a2e', 'color': 'white'},
        style_data_conditional=[{'if': {'row_index': 'odd'}, 'backgroundColor': '#f9f9f9'}]),
])

@app.callback(
    Output('kpi-row', 'children'), Output('revenue-bar', 'figure'),
    Output('units-pie', 'figure'), Output('data-table', 'data'),
    Output('data-table', 'columns'), Input('cat-filter', 'value')
)
def update_all(category):
    d = df if category == 'All' else df[df['Category'] == category]
    total_rev = d['Revenue'].sum()
    total_units = d['Units'].sum()
    avg_rev = d.groupby('Month')['Revenue'].sum().mean()

    kpi_style = {'flex': 1, 'background': '#eef2ff', 'padding': '15px', 'borderRadius': '8px', 'textAlign': 'center'}
    kpis = [
        html.Div([html.H3(f"${total_rev:,.0f}"), html.P("Total Revenue")], style=kpi_style),
        html.Div([html.H3(f"{total_units:,}"), html.P("Total Units")], style=kpi_style),
        html.Div([html.H3(f"${avg_rev:,.0f}"), html.P("Avg Monthly Revenue")], style=kpi_style),
    ]

    monthly = d.groupby('Month')['Revenue'].sum().reindex(months).reset_index()
    bar_fig = px.bar(monthly, x='Month', y='Revenue', title='Revenue by Month', color_discrete_sequence=['#636EFA'])

    cat_totals = d.groupby('Category')['Units'].sum().reset_index()
    pie_fig = px.pie(cat_totals, names='Category', values='Units', title='Units by Category')

    cols = [{'name': c, 'id': c} for c in d.columns]
    return kpis, bar_fig, pie_fig, d.to_dict('records'), cols

if __name__ == '__main__':
    app.run(debug=True)

This dashboard updates all five outputs (KPIs, bar chart, pie chart, table data, table columns) from a single callback triggered by the category dropdown. The dash_table.DataTable component paginates the rows with page_size=8; pass sort_action='native' if you also want clickable column sorting. Run it, select “Electronics”, and every component immediately filters to show only electronics data.

Frequently Asked Questions

How do I deploy a Dash app to production?

Dash runs on Flask, so any Flask deployment method works: Gunicorn (gunicorn app:server), Docker, or platform-as-a-service hosts like Heroku, Render, or Railway. Export the Flask server with server = app.server and point Gunicorn at it. For production, set debug=False and use environment variables for any credentials. Dash Enterprise (commercial) provides managed hosting with authentication built in.

How do I read an input value without triggering a callback?

Use State instead of Input for components you want to read without triggering updates: from dash.dependencies import State. A State value is passed to the callback only when an Input changes, not when the State component itself changes. This is useful for “submit” button patterns where you want to read several fields only when the button is clicked.

How do I update a chart with live data?

Use dcc.Interval as a callback input: dcc.Interval(id='timer', interval=5000) fires an event every 5 seconds. Add it to your layout, then use Input('timer', 'n_intervals') in your callback. The n_intervals value increments each time the interval fires, triggering your data refresh callback. This pattern works for polling a database, an API endpoint, or reading a log file.

How do I build a multi-page Dash app?

Use Dash Pages (built into Dash 2.5+): create a pages/ folder and add one Python file per page, each calling dash.register_page(__name__, path='/my-page'). Set app = dash.Dash(__name__, use_pages=True) in your main file and include dash.page_container in the layout. Dash automatically generates routing for all registered pages without any URL configuration.

How do I apply a consistent theme across my dashboard?

Use the dash-bootstrap-components library (pip install dash-bootstrap-components) for Bootstrap-based grid layouts and pre-styled components: after import dash_bootstrap_components as dbc, create the app with app = dash.Dash(__name__, external_stylesheets=[dbc.themes.BOOTSTRAP]). Alternatively, put a CSS file in the assets/ folder; Dash serves everything in that folder automatically. For consistent Plotly chart theming, set plotly.io.templates.default = 'plotly_white' once at the top of your script.

Conclusion

Dash lets you build fully interactive data dashboards in pure Python. In this tutorial you built a dropdown-driven bar chart, applied multi-input filtering to a line chart with stats, and assembled a complete sales dashboard with KPI cards, linked charts, and a data table — all updated by a single callback. The entire dashboard works without writing a single line of JavaScript.

The sales dashboard is ready to extend: add a date range picker with dcc.DatePickerRange, a live refresh interval, an export-to-CSV button, or user authentication with the separate dash-auth package (pip install dash-auth). Most extensions add one new component and one new callback, following the same Input/Output pattern you learned here.

Official documentation: Dash — Python framework for building analytical web apps.

How To Use Python aiosqlite for Async SQLite


Intermediate

SQLite is the perfect database for small to medium applications — no server to run, no configuration, just a file. But Python’s built-in sqlite3 module is synchronous: every query blocks until the database responds. In an async application built with asyncio, aiohttp, or FastAPI, that blocking call halts the entire event loop for every database operation. aiosqlite wraps Python’s sqlite3 module in an async interface, letting your database queries run without freezing everything else.

Install aiosqlite with pip install aiosqlite. There is nothing else to configure — it uses the same SQLite file your synchronous code already uses. The API closely mirrors sqlite3, so if you have used that module before, the transition is mostly adding async def, await, and async with in the right places.

This tutorial covers opening async connections, executing queries, fetching results, using transactions, handling row factories for dict-like access, and building a complete async task tracker backed by SQLite. By the end, you will know how to integrate aiosqlite into any asyncio-based project without blocking the event loop.

Async SQLite in 15 Lines: Quick Example

Here is the minimum aiosqlite script — create a table, insert a row, and read it back, all asynchronously:

# quick_aiosqlite.py
import asyncio
import aiosqlite

async def main():
    async with aiosqlite.connect('quick_demo.db') as db:
        await db.execute('''
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                email TEXT UNIQUE NOT NULL
            )
        ''')
        await db.execute("INSERT INTO users (name, email) VALUES (?, ?)", ('Alice', 'alice@example.com'))
        await db.commit()

        async with db.execute("SELECT id, name, email FROM users") as cursor:
            async for row in cursor:
                print(f"ID: {row[0]}, Name: {row[1]}, Email: {row[2]}")

asyncio.run(main())

Output:

ID: 1, Name: Alice, Email: alice@example.com

Three patterns appear in every aiosqlite script: async with aiosqlite.connect() opens and auto-closes the connection, await db.execute() runs SQL without blocking, and await db.commit() persists changes. Queries that return rows use async with db.execute() to get a cursor you iterate with async for.

aiosqlite vs sqlite3 vs Other Async Databases

Library | Database | I/O Model | Best For
aiosqlite | SQLite (file) | asyncio | Async apps needing local/embedded DB
sqlite3 | SQLite (file) | Synchronous | Scripts, Django, Flask
databases | SQLite/Postgres/MySQL | asyncio | SQLAlchemy Core queries, async
asyncpg | PostgreSQL only | asyncio | High-performance async Postgres

aiosqlite is the right choice when you want SQLite’s zero-configuration simplicity in an async application. For production systems that need concurrent writes, a client-server database such as PostgreSQL with asyncpg (or MongoDB with motor) will outperform SQLite’s single-writer model. But for bots, CLI tools, personal apps, and microservices with low write concurrency, aiosqlite gives you async SQLite without any infrastructure setup.

Async CRUD Operations

Here is a complete CRUD implementation that uses parameterized queries throughout and builds dicts from cursor.description for dict-style access:

# aiosqlite_crud.py
import asyncio
import aiosqlite

DB_PATH = 'products.db'

async def init_db(db):
    await db.execute('''
        CREATE TABLE IF NOT EXISTS products (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            price REAL NOT NULL,
            stock INTEGER DEFAULT 0
        )
    ''')
    await db.commit()

async def create_product(db, name, price, stock=0):
    cursor = await db.execute(
        "INSERT INTO products (name, price, stock) VALUES (?, ?, ?)",
        (name, price, stock)
    )
    await db.commit()
    return cursor.lastrowid

async def get_product(db, product_id):
    async with db.execute("SELECT * FROM products WHERE id = ?", (product_id,)) as cur:
        row = await cur.fetchone()
        if row is None:
            return None
        return dict(zip([col[0] for col in cur.description], row))

async def list_products(db, max_price=None):
    sql = "SELECT * FROM products"
    params = ()
    if max_price is not None:
        sql += " WHERE price <= ?"
        params = (max_price,)
    sql += " ORDER BY name"
    async with db.execute(sql, params) as cur:
        cols = [c[0] for c in cur.description]
        return [dict(zip(cols, row)) async for row in cur]

async def update_price(db, product_id, new_price):
    await db.execute("UPDATE products SET price = ? WHERE id = ?", (new_price, product_id))
    await db.commit()

async def delete_product(db, product_id):
    await db.execute("DELETE FROM products WHERE id = ?", (product_id,))
    await db.commit()

async def main():
    async with aiosqlite.connect(DB_PATH) as db:
        await init_db(db)

        id1 = await create_product(db, 'Widget', 9.99, 100)
        id2 = await create_product(db, 'Gadget', 24.99, 50)
        id3 = await create_product(db, 'Doohickey', 4.49, 200)
        print(f"Created products: {id1}, {id2}, {id3}")

        product = await get_product(db, id1)
        print(f"Fetched: {product}")

        cheap = await list_products(db, max_price=10.0)
        print(f"Products under $10: {[p['name'] for p in cheap]}")

        await update_price(db, id1, 7.99)
        updated = await get_product(db, id1)
        print(f"Updated Widget price: ${updated['price']}")

        await delete_product(db, id3)
        all_products = await list_products(db)
        print(f"Remaining products: {[p['name'] for p in all_products]}")

asyncio.run(main())

Output:

Created products: 1, 2, 3
Fetched: {'id': 1, 'name': 'Widget', 'price': 9.99, 'stock': 100}
Products under $10: ['Doohickey', 'Widget']
Updated Widget price: $7.99
Remaining products: ['Gadget', 'Widget']
Debug Dee inspects each row — aiosqlite keeps async queries clean.

Transactions and Batch Inserts

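As with sqlite3, aiosqlite opens a transaction implicitly before data-modifying statements, and nothing is persisted until you call commit(). For batch inserts, writing every row inside a single transaction is dramatically faster than committing after each row: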

# aiosqlite_transactions.py
import asyncio
import aiosqlite
import time

async def insert_batch(db, records):
    """Insert a list of (name, price, stock) tuples in one transaction."""
    await db.executemany(
        "INSERT INTO products (name, price, stock) VALUES (?, ?, ?)",
        records
    )
    await db.commit()

async def main():
    async with aiosqlite.connect(':memory:') as db:  # in-memory for demo
        await db.execute('''
            CREATE TABLE products (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT, price REAL, stock INTEGER
            )
        ''')

        # Generate 10,000 sample products
        records = [(f'Product_{i}', round(i * 0.99, 2), i % 100) for i in range(1, 10001)]

        start = time.perf_counter()
        await insert_batch(db, records)
        elapsed = time.perf_counter() - start

        async with db.execute("SELECT COUNT(*) FROM products") as cur:
            count = (await cur.fetchone())[0]
        print(f"Inserted {count:,} rows in {elapsed:.3f}s")

        # Transaction with rollback on error
        try:
            await db.execute("BEGIN")  # start an explicit transaction
            await db.execute("UPDATE products SET price = price * 0.9 WHERE stock > 50")
            # Simulate an error before the commit is reached
            raise ValueError("Simulated error -- rolling back")
        except ValueError as e:
            await db.rollback()  # undo the UPDATE
            print(f"Transaction rolled back: {e}")
        else:
            await db.commit()  # only runs when no exception was raised

asyncio.run(main())

Output:

Inserted 10,000 rows in 0.041s
Transaction rolled back: Simulated error -- rolling back

executemany() with a single commit is the standard pattern for bulk inserts: all 10,000 rows are written inside one transaction instead of paying the cost of thousands of individual commits (the exact timing will vary by machine). For multi-statement updates, execute BEGIN to start an explicit transaction, call await db.commit() once every statement has succeeded, and call await db.rollback() in the except clause to undo all changes if anything fails mid-transaction.

Real-Life Example: Async Task Tracker

Here is a practical async task tracker built on aiosqlite, showing concurrent queries and the row factory pattern in a complete working application:

# async_task_tracker.py
import asyncio
import aiosqlite
from datetime import datetime

DB = 'tasks.db'

async def setup(db):
    await db.execute('''
        CREATE TABLE IF NOT EXISTS tasks (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            title TEXT NOT NULL,
            status TEXT DEFAULT 'pending',
            created_at TEXT NOT NULL,
            completed_at TEXT
        )
    ''')
    db.row_factory = aiosqlite.Row  # dict-like row access
    await db.commit()

async def add_task(db, title):
    cur = await db.execute(
        "INSERT INTO tasks (title, created_at) VALUES (?, ?)",
        (title, datetime.now().isoformat())
    )
    await db.commit()
    return cur.lastrowid

async def complete_task(db, task_id):
    await db.execute(
        "UPDATE tasks SET status='done', completed_at=? WHERE id=?",
        (datetime.now().isoformat(), task_id)
    )
    await db.commit()

async def get_summary(db):
    async with db.execute('''
        SELECT status, COUNT(*) as count FROM tasks GROUP BY status
    ''') as cur:
        return {row['status']: row['count'] async for row in cur}

async def list_pending(db):
    async with db.execute("SELECT id, title, created_at FROM tasks WHERE status='pending'") as cur:
        return [dict(row) async for row in cur]

async def main():
    async with aiosqlite.connect(DB) as db:
        await setup(db)

        # Add tasks concurrently
        ids = await asyncio.gather(
            add_task(db, 'Write unit tests'),
            add_task(db, 'Fix login bug'),
            add_task(db, 'Update README'),
            add_task(db, 'Deploy to staging'),
        )
        print(f"Added tasks: {ids}")

        # Complete two tasks
        await asyncio.gather(complete_task(db, ids[0]), complete_task(db, ids[2]))

        pending = await list_pending(db)
        summary = await get_summary(db)

        print(f"\nSummary: {summary}")
        print("Pending tasks:")
        for t in pending:
            print(f"  [{t['id']}] {t['title']}")

asyncio.run(main())

Output:

Added tasks: [1, 2, 3, 4]

Summary: {'done': 2, 'pending': 2}
Pending tasks:
  [2] Fix login bug
  [4] Deploy to staging
Steve ticks each commit — async transactions done right.

Frequently Asked Questions

How does aiosqlite avoid blocking the event loop?

aiosqlite runs all SQLite operations for a connection on a single dedicated background thread, feeding it requests through a queue. The await keyword yields control back to the event loop while that thread executes the blocking sqlite3 call. This means aiosqlite does not use true async I/O at the OS level; it uses thread offloading. For most SQLite use cases this is perfectly adequate, but it does mean that operations on one connection run one at a time, so a slow query delays everything queued behind it on that connection.

How do I get dict-like row access instead of tuples?

Set db.row_factory = aiosqlite.Row after opening the connection. Then rows support both index access (row[0]) and key access (row['name']). Convert to a plain dict with dict(row) when you need JSON serialization. Alternatively, build the dict manually from cursor.description: dict(zip([c[0] for c in cursor.description], row)).

Can multiple coroutines share one aiosqlite connection?

Yes, but with caution. aiosqlite serializes all operations through its internal thread, so concurrent writes are safe from a thread-safety perspective. However, SQLite itself only allows one writer at a time -- concurrent write operations will serialize at the database level. For read-heavy workloads this is fine; for write-heavy concurrent workloads, consider PostgreSQL instead. Keep one connection per event loop rather than opening connections per coroutine.

Can I use an in-memory database with aiosqlite?

Yes: aiosqlite.connect(':memory:') creates an in-memory database that exists only for the lifetime of the connection. This is useful for tests -- each test function opens its own ':memory:' connection for a clean slate with no file cleanup needed. Note that an in-memory database cannot be shared between connections; each aiosqlite.connect(':memory:') call creates a completely separate database.

How do I handle schema migrations with aiosqlite?

For simple projects, use CREATE TABLE IF NOT EXISTS in your startup script. SQLite's ALTER TABLE ... ADD COLUMN has no IF NOT EXISTS clause, so check PRAGMA table_info for the column (or track a schema version) before adding it. For production apps with complex schemas, use the alembic migration tool with the synchronous sqlite3 driver (alembic does not need to be async; run migrations at startup before launching the async event loop). Store the current schema version in a schema_version table and apply pending migrations sequentially.
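For a project too small for alembic, the schema_version idea can be sketched with the synchronous sqlite3 module and run once at startup. The MIGRATIONS list, the tasks table, and both function names are illustrative assumptions rather than an established convention.

```python
import sqlite3

def column_exists(conn, table, column):
    # PRAGMA table_info returns one row per column; index 1 is the column name.
    # The table name is interpolated, so only pass trusted identifiers.
    return any(row[1] == column for row in conn.execute(f"PRAGMA table_info({table})"))

# Hypothetical migration list: (version, SQL) pairs applied in order.
MIGRATIONS = [
    (1, "CREATE TABLE IF NOT EXISTS tasks (id INTEGER PRIMARY KEY, title TEXT NOT NULL)"),
    (2, "ALTER TABLE tasks ADD COLUMN priority INTEGER DEFAULT 0"),
]

def migrate(conn):
    """Apply every migration newer than the recorded version; return the final version."""
    conn.execute("CREATE TABLE IF NOT EXISTS schema_version (version INTEGER NOT NULL)")
    current = conn.execute("SELECT COALESCE(MAX(version), 0) FROM schema_version").fetchone()[0]
    for version, sql in MIGRATIONS:
        if version > current:
            conn.execute(sql)
            conn.execute("INSERT INTO schema_version (version) VALUES (?)", (version,))
    conn.commit()
    return conn.execute("SELECT MAX(version) FROM schema_version").fetchone()[0]

conn = sqlite3.connect(":memory:")
print(migrate(conn))   # applies both migrations
print(migrate(conn))   # second run finds nothing new to apply
```

Because applied versions are recorded, the ALTER TABLE statement runs only once, which sidesteps the missing IF NOT EXISTS clause; column_exists is there for ad hoc checks when no version table is available.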

Conclusion

aiosqlite brings SQLite into the async Python world with minimal friction. In this tutorial you used aiosqlite.connect() as an async context manager, performed CRUD with parameterized queries, ran batch inserts with executemany(), managed transactions with commit and rollback, and built a concurrent async task tracker with dict-like row access via aiosqlite.Row.

The task tracker is a solid base to extend -- add priority columns, due dates, a web interface via FastAPI, or push notifications when tasks are completed. All of these fit naturally into the async CRUD patterns you learned here without any synchronous blocking calls.

Official documentation: aiosqlite -- async bridge to sqlite3.

How To Use Python motor for Async MongoDB Operations


Intermediate

You are building an async Python service with FastAPI or aiohttp and you need MongoDB as your database. The standard PyMongo driver is synchronous: every database call blocks the event loop, so your concurrent application ends up handling requests one at a time. Motor is the official async MongoDB driver for Python: it wraps PyMongo with a non-blocking interface designed for asyncio, so your database queries and your API handlers can run concurrently without blocking each other.

Install motor with pip install motor. You will also need a running MongoDB instance: docker run -d -p 27017:27017 mongo:7 starts one locally with no authentication. For cloud MongoDB, use a free Atlas cluster from mongodb.com; the connection string works identically in motor. PyMongo is installed automatically as motor’s dependency.

This tutorial covers motor’s async CRUD operations, querying with filters and projections, aggregation pipelines, indexes, bulk writes, and change streams. You will finish by building an async product catalogue API with FastAPI and motor that handles concurrent reads without blocking.

Async Insert and Query: Quick Example

Here is the minimal motor workflow — connect, insert a document, and query it back, all with async/await:

# quick_motor.py
import asyncio
import motor.motor_asyncio

async def main():
    client = motor.motor_asyncio.AsyncIOMotorClient('mongodb://localhost:27017')
    db = client['mydb']
    collection = db['users']

    # Insert one document
    result = await collection.insert_one({'name': 'Alice', 'age': 30, 'role': 'admin'})
    print(f"Inserted ID: {result.inserted_id}")

    # Find it back
    user = await collection.find_one({'name': 'Alice'})
    print(f"Found: {user['name']}, age {user['age']}, role {user['role']}")

    client.close()

asyncio.run(main())

Output:

Inserted ID: 664f3a1b2c8d4e5f6a7b8c9d
Found: Alice, age 30, role admin

Motor’s API mirrors PyMongo almost exactly — the key difference is that every I/O operation returns a coroutine you must await. The client and collection objects are synchronous Python objects; only the database operations require await. This design means migrating from PyMongo to motor is usually straightforward: add async def and await to the functions that call the database.

Motor vs PyMongo vs Other Drivers

Choosing the right MongoDB driver depends on whether your application is async or sync:

Driver | I/O Model | Best For | Install
motor | asyncio (non-blocking) | FastAPI, aiohttp, async services | pip install motor
pymongo | Synchronous (blocking) | Django, Flask, scripts, CLIs | pip install pymongo
beanie | asyncio + ODM (models) | When you want Pydantic-based models | pip install beanie
mongoengine | Synchronous + ODM | Django-like ORM on MongoDB | pip install mongoengine

Use motor whenever your web framework is async. Using PyMongo in a FastAPI handler does not raise an error immediately, but it blocks the event loop during every database call, effectively serializing all requests through a single-threaded bottleneck. Motor keeps each query non-blocking, letting your async framework handle hundreds of concurrent connections efficiently.
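The effect of a blocking call is easy to reproduce without a database. The sketch below uses time.sleep as a stand-in for a synchronous driver call and asyncio.sleep as a stand-in for an awaited motor call; the function names are made up for the illustration and no MongoDB code is involved.

```python
import asyncio
import time

async def blocking_query(delay: float) -> None:
    # Stand-in for a synchronous driver call: the whole event loop stalls
    time.sleep(delay)

async def non_blocking_query(delay: float) -> None:
    # Stand-in for an awaited async driver call: other tasks run meanwhile
    await asyncio.sleep(delay)

async def timed(query, n: int = 5, delay: float = 0.1) -> float:
    """Run n simulated queries concurrently and return the elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(query(delay) for _ in range(n)))
    return time.perf_counter() - start

async def compare() -> tuple[float, float]:
    return await timed(blocking_query), await timed(non_blocking_query)

if __name__ == "__main__":
    blocking, non_blocking = asyncio.run(compare())
    print(f"blocking: {blocking:.2f}s, non-blocking: {non_blocking:.2f}s")
```

With five simulated queries of 0.1 seconds each, the blocking version takes roughly the sum of the delays, while the non-blocking version takes roughly one delay because the waits overlap.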

CRUD Operations

Motor’s CRUD methods are direct async equivalents of PyMongo’s methods. Here is a complete reference implementation covering insert, find, update, and delete:

# motor_crud.py
import asyncio
import motor.motor_asyncio

async def crud_demo():
    client = motor.motor_asyncio.AsyncIOMotorClient('mongodb://localhost:27017')
    col = client['demo']['products']

    # --- CREATE ---
    # insert_one: single document
    r = await col.insert_one({'name': 'Widget', 'price': 9.99, 'stock': 100})
    widget_id = r.inserted_id

    # insert_many: multiple documents at once
    await col.insert_many([
        {'name': 'Gadget', 'price': 24.99, 'stock': 50},
        {'name': 'Doohickey', 'price': 4.49, 'stock': 200},
    ])
    print("Inserted 3 products")

    # --- READ ---
    # find_one: returns first match or None
    product = await col.find_one({'name': 'Widget'})
    print(f"Widget price: ${product['price']}")

    # find: returns an async cursor
    async for p in col.find({'price': {'$lt': 10}}):
        print(f"  Under $10: {p['name']} @ ${p['price']}")

    # find with projection (only return name and price)
    async for p in col.find({}, {'_id': 0, 'name': 1, 'price': 1}):
        print(f"  {p['name']}: ${p['price']}")

    # --- UPDATE ---
    # update_one: modify first matching document
    result = await col.update_one({'name': 'Widget'}, {'$set': {'price': 7.99}, '$inc': {'stock': -1}})
    print(f"Modified: {result.modified_count} document(s)")

    # update_many: modify all matching documents
    result = await col.update_many({'stock': {'$gt': 100}}, {'$set': {'on_sale': True}})
    print(f"Sale items updated: {result.modified_count}")

    # --- DELETE ---
    result = await col.delete_one({'name': 'Doohickey'})
    print(f"Deleted: {result.deleted_count} document(s)")

    # cleanup
    await col.drop()
    client.close()

asyncio.run(crud_demo())

Output:

Inserted 3 products
Widget price: $9.99
  Under $10: Widget @ $9.99
  Under $10: Doohickey @ $4.49
  Widget: $9.99
  Gadget: $24.99
  Doohickey: $4.49
Modified: 1 document(s)
Sale items updated: 1
Deleted: 1 document(s)

Aggregation Pipelines

MongoDB’s aggregation pipeline is its standout feature for analytics, and motor exposes it through the same async cursor interface as find(). You can group, filter, sort, and reshape data entirely in the database, returning only the results your application needs:

# motor_aggregation.py
import asyncio
import motor.motor_asyncio

async def aggregation_demo():
    client = motor.motor_asyncio.AsyncIOMotorClient('mongodb://localhost:27017')
    col = client['store']['orders']

    # Seed data
    await col.drop()
    await col.insert_many([
        {'product': 'Widget', 'qty': 3, 'price': 9.99, 'region': 'North'},
        {'product': 'Gadget', 'qty': 1, 'price': 24.99, 'region': 'South'},
        {'product': 'Widget', 'qty': 2, 'price': 9.99, 'region': 'South'},
        {'product': 'Gadget', 'qty': 4, 'price': 24.99, 'region': 'North'},
        {'product': 'Widget', 'qty': 5, 'price': 9.99, 'region': 'North'},
    ])

    # Pipeline: total revenue and units sold per product
    pipeline = [
        {'$group': {
            '_id': '$product',
            'total_revenue': {'$sum': {'$multiply': ['$qty', '$price']}},
            'units_sold': {'$sum': '$qty'},
            'order_count': {'$sum': 1},
        }},
        {'$sort': {'total_revenue': -1}},
        {'$project': {
            'product': '$_id',
            'total_revenue': {'$round': ['$total_revenue', 2]},
            'units_sold': 1,
            'order_count': 1,
            '_id': 0,
        }},
    ]

    print("Sales summary:")
    async for doc in col.aggregate(pipeline):
        # Format to two decimals so 99.9 prints as 99.90
        print(f"  {doc['product']}: ${doc['total_revenue']:.2f} ({doc['units_sold']} units, {doc['order_count']} orders)")

    await col.drop()
    client.close()

asyncio.run(aggregation_demo())

Output:

Sales summary:
  Gadget: $124.95 (5 units, 2 orders)
  Widget: $99.90 (10 units, 3 orders)

The aggregation cursor is async, so you iterate it with async for exactly like the regular find cursor. Pipelines run entirely inside MongoDB — no Python-side grouping or sorting needed. For large datasets this is dramatically faster than fetching all documents and processing them in Python.

Real-Life Example: Async Product Catalogue API

Let us build a FastAPI product catalogue with motor as the database backend. This shows how motor integrates with a real async web framework, including startup/shutdown lifecycle and concurrent query handling:

# catalogue_api.py
# Install: pip install fastapi uvicorn motor
# Run: uvicorn catalogue_api:app --reload
# Test: http://localhost:8000/docs

from contextlib import asynccontextmanager

import motor.motor_asyncio
from fastapi import FastAPI, HTTPException
from pymongo.errors import DuplicateKeyError

MONGO_URL = "mongodb://localhost:27017"

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: create client and ensure index
    app.state.client = motor.motor_asyncio.AsyncIOMotorClient(MONGO_URL)
    app.state.col = app.state.client["catalogue"]["products"]
    await app.state.col.create_index("sku", unique=True)
    yield
    # Shutdown: close connection
    app.state.client.close()

app = FastAPI(lifespan=lifespan)

@app.get("/products")
async def list_products(category: str | None = None, max_price: float | None = None):
    query = {}
    if category:
        query["category"] = category
    if max_price is not None:
        query["price"] = {"$lte": max_price}
    # Exclude _id so the documents are JSON-serializable as-is
    products = [p async for p in app.state.col.find(query, {"_id": 0})]
    return {"count": len(products), "products": products}

@app.post("/products", status_code=201)
async def create_product(product: dict):
    if not all(k in product for k in ["sku", "name", "price"]):
        raise HTTPException(400, "sku, name, and price are required")
    try:
        result = await app.state.col.insert_one(product)
    except DuplicateKeyError:
        # The unique index on sku rejects a second product with the same SKU
        raise HTTPException(409, f"Product '{product['sku']}' already exists")
    return {"id": str(result.inserted_id), "sku": product["sku"]}

@app.patch("/products/{sku}")
async def update_product(sku: str, updates: dict):
    result = await app.state.col.update_one({"sku": sku}, {"$set": updates})
    if result.matched_count == 0:
        raise HTTPException(404, f"Product '{sku}' not found")
    return {"updated": result.modified_count}

@app.delete("/products/{sku}")
async def delete_product(sku: str):
    result = await app.state.col.delete_one({"sku": sku})
    if result.deleted_count == 0:
        raise HTTPException(404, f"Product '{sku}' not found")
    return {"deleted": True}

The lifespan context manager is the modern FastAPI pattern for managing the motor client — it creates the connection on startup and closes it cleanly on shutdown. The unique index on sku prevents duplicate products at the database level. Because motor is async, all four endpoints can handle concurrent requests without one query blocking another.
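The /products endpoint returns every matching document, which will not scale to a large catalogue. A minimal pagination sketch using the cursor's skip() and limit() follows; page_window and fetch_page are hypothetical helpers, the max_size cap of 100 is an arbitrary choice, and col is assumed to be a motor collection such as app.state.col.

```python
def page_window(page: int, size: int, max_size: int = 100) -> tuple[int, int]:
    """Translate a 1-based page number into (skip, limit) values."""
    page = max(page, 1)
    limit = min(max(size, 1), max_size)
    return (page - 1) * limit, limit

async def fetch_page(col, query: dict, page: int = 1, size: int = 20) -> list[dict]:
    # col is assumed to be a motor collection, e.g. app.state.col
    skip, limit = page_window(page, size)
    # Sort on a stable key so pages do not overlap between requests
    cursor = col.find(query, {"_id": 0}).sort("sku", 1).skip(skip).limit(limit)
    return await cursor.to_list(length=limit)
```

Clamping the page size in one place keeps a client from requesting an unbounded result set. For very deep pages, skip() still walks past the skipped documents, so range queries on the sort key may be a better fit.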


Frequently Asked Questions

How do I handle MongoDB’s ObjectId in API responses?

MongoDB’s _id field is a bson.ObjectId object, not a plain string. JSON serializers do not know how to handle it and will raise a TypeError. The cleanest fix is to either exclude _id from your queries using a projection ({"_id": 0}), or convert it when building your response: doc["id"] = str(doc.pop("_id")). If you use Pydantic models with beanie or a custom model, configure the JSON encoder to handle ObjectId automatically.
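The conversion approach can be wrapped in a small helper. This is a sketch with a hypothetical name, serialize_doc; it only handles the top-level _id, so nested ObjectId values would need their own conversion.

```python
from typing import Any

def serialize_doc(doc: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of a MongoDB document with _id exposed as a string id."""
    out = dict(doc)  # shallow copy so the caller's document is left untouched
    if "_id" in out:
        out["id"] = str(out.pop("_id"))
    return out
```

In a handler you would return serialize_doc(doc) for a single document, or [serialize_doc(d) async for d in cursor] for a query result.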

When should I create indexes?

Create indexes for any field you filter or sort on frequently. Without an index, MongoDB scans every document in the collection for each query. Use await collection.create_index("field_name") for single-field indexes, or create_index([("a", 1), ("b", -1)]) for compound indexes. Call index creation during application startup (in the lifespan handler), not on every request. Check your index usage with await collection.find(query).explain(), which returns the query plan as a dict. Unlike the mongo shell, the Python cursor's explain() takes no verbosity argument.
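As a sketch of the startup step, the helper below creates one single-field and one compound index. ensure_indexes is a hypothetical name, the category and price fields are assumed from the catalogue example, and col is assumed to be a motor collection.

```python
async def ensure_indexes(col) -> list[str]:
    """Create the catalogue indexes and return their names."""
    names = [
        # Single-field index for category filters
        await col.create_index("category"),
        # Compound index: category ascending, then price descending
        await col.create_index([("category", 1), ("price", -1)]),
    ]
    return names
```

Creating an index that already exists with the same definition is a no-op, so calling this from the lifespan handler on every startup is safe.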

How does motor handle connection pooling?

Motor uses PyMongo’s connection pool internally. The default maximum pool size is 100 connections. One AsyncIOMotorClient instance is meant to be shared across your entire application, so do not create a new client per request. The client is safe to share between coroutines on one event loop, but motor does not support multithreaded use. Set the pool size with AsyncIOMotorClient(url, maxPoolSize=50) to reduce memory usage on low-traffic services.

Does motor support multi-document transactions?

Yes, starting with MongoDB 4.0 (replica sets) and 4.2 (sharded clusters). Use async with await client.start_session() as session: async with session.start_transaction(): ... to wrap multiple operations in a transaction. All motor CRUD methods accept an optional session parameter. Transactions have a performance cost — only use them when you genuinely need atomicity across multiple collections or documents.
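Written out as a function, the session pattern looks like the sketch below. move_stock is a hypothetical example that assumes the catalogue collection from this tutorial, a stock field on each product, and a MongoDB deployment that supports transactions (a replica set or sharded cluster).

```python
async def move_stock(client, src_sku: str, dst_sku: str, qty: int) -> None:
    """Move qty units of stock between two products in one transaction."""
    col = client["catalogue"]["products"]
    async with await client.start_session() as session:
        async with session.start_transaction():
            # Both updates commit together; an exception aborts the transaction
            await col.update_one({"sku": src_sku}, {"$inc": {"stock": -qty}}, session=session)
            await col.update_one({"sku": dst_sku}, {"$inc": {"stock": qty}}, session=session)
```

Every operation inside the block must receive session=session. A call that omits it runs outside the transaction and is not rolled back on failure.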

How do I store files in MongoDB with motor?

Use motor.motor_asyncio.AsyncIOMotorGridFSBucket for files larger than 16MB (MongoDB’s document size limit). For smaller files (images, PDFs under a few MB), storing them in a document field is simpler, either as base64-encoded strings or as raw bytes, which PyMongo encodes as BSON binary. GridFS splits large files into 255KB chunks and stores them across two collections, so a file can be streamed or read in parts instead of loaded whole. In most modern setups, object storage (S3, GCS) is a better fit for files than GridFS.

Conclusion

Motor is the right MongoDB driver whenever your Python application uses asyncio. In this tutorial you performed async CRUD with insert_one, find, update_one, and delete_one, ran aggregation pipelines for analytics, created indexes for performance, and built a FastAPI product catalogue API that handles concurrent requests without blocking.

The catalogue API is ready to extend — add authentication middleware, implement pagination with skip() and limit(), add full-text search with a MongoDB text index, or add a change stream listener that notifies clients over WebSocket when products are updated. All of these patterns use the same async CRUD primitives you learned here.

Official documentation: motor — Async Python driver for MongoDB.