How To Use Python Joblib for Parallel Computing and Caching

Intermediate

You have a data processing loop that runs one item at a time — checking each file, scoring each user, training each model configuration. Your machine has eight cores and only one of them is working. The loop that takes twenty minutes could finish in three if you could just split the work across all available processors.

Joblib is a Python library that makes parallel computing and result caching easy to add to existing code. Its Parallel and delayed utilities turn a regular Python loop into a parallel job with one wrapper. Its Memory class caches function results to disk so that the second call with the same arguments returns instantly. Install it with pip install joblib. Scikit-learn uses Joblib internally for its own parallelism, so if you have scikit-learn installed, Joblib is already there.

This article covers parallelising loops with Parallel and delayed, choosing the right backend (loky, threading, multiprocessing), caching expensive computations with Memory, integrating with scikit-learn pipelines, and diagnosing performance with verbosity settings. By the end you will have both parallel execution and disk caching working in a realistic data pipeline.

Joblib Parallel: Quick Example

The quickest way to see Joblib’s effect is to replace a for loop with a Parallel call. The structure is almost identical — the main change is wrapping the function call with delayed().

# quick_joblib.py
import time
from joblib import Parallel, delayed

def slow_square(n: int) -> int:
    """Simulate a slow computation."""
    time.sleep(0.5)
    return n * n

numbers = list(range(8))

# Sequential -- takes 8 * 0.5 = 4 seconds
start = time.perf_counter()
sequential = [slow_square(n) for n in numbers]
seq_time = time.perf_counter() - start
print(f"Sequential: {sequential} in {seq_time:.2f}s")

# Parallel -- uses all available CPU cores
start = time.perf_counter()
parallel = Parallel(n_jobs=-1)(delayed(slow_square)(n) for n in numbers)
par_time = time.perf_counter() - start
print(f"Parallel:   {parallel} in {par_time:.2f}s")
print(f"Speedup: {seq_time / par_time:.1f}x")

Output (on an 8-core machine):

Sequential: [0, 1, 4, 9, 16, 25, 36, 49] in 4.01s
Parallel:   [0, 1, 4, 9, 16, 25, 36, 49] in 0.56s
Speedup: 7.2x

The n_jobs=-1 argument tells Joblib to use all available CPU cores. n_jobs=4 would use exactly four. The delayed(func)(args) pattern creates a lazy description of the function call without executing it — Joblib collects these descriptions and distributes them across workers. The return values are collected in the same order as the input, so parallel[3] is always the result of slow_square(3) regardless of which worker finished first.
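
To make the delayed pattern concrete, the sketch below inspects what delayed() returns and checks that results come back in input order even when later items finish first. The names slow_echo and delays are illustrative, and prefer="threads" is an assumption made only to keep the demo lightweight.

```python
import time
from joblib import Parallel, delayed

def slow_echo(value: int, pause: float) -> int:
    """Sleep for `pause` seconds, then return the value unchanged."""
    time.sleep(pause)
    return value

# delayed() does not run anything: it packs the call into a
# (function, args, kwargs) tuple that Parallel can dispatch later.
task = delayed(slow_echo)(1, pause=0.0)
func, args, kwargs = task
print(func.__name__, args, kwargs)

# Earlier items sleep longer, so they finish last, yet the result
# list still follows the input order.
delays = [0.3, 0.2, 0.1, 0.0]
ordered = Parallel(n_jobs=4, prefer="threads")(
    delayed(slow_echo)(i, pause=d) for i, d in enumerate(delays)
)
print(ordered)
```

Because the description is just a tuple, building the task list is cheap; the cost is paid only when Parallel executes it.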

What Is Joblib and When Should You Use It?

Joblib provides two things: easy parallelism through a process pool, and persistent disk caching of function results. These two features are independent: you can use either without the other. By default the parallelism is built on the loky process pool (a robust reimplementation of multiprocessing.Pool); Python’s threading and the original multiprocessing pool are available as alternative backends that you select explicitly.

Tool | Best for | Overhead
Joblib Parallel (loky) | CPU-bound tasks, data processing | ~100ms startup
Joblib Parallel (threading) | IO-bound tasks, NumPy code that releases the GIL | ~5ms startup
concurrent.futures | Simple async IO, process pools | ~50ms startup
multiprocessing.Pool | CPU-bound, full control needed | ~100ms startup
asyncio | High-concurrency network IO | Near zero

Joblib excels when your loop body is CPU-bound (model training, file parsing, image processing) and each iteration takes at least a few milliseconds — enough to justify the inter-process communication cost. For very fast operations (microsecond loops), parallelism overhead outweighs the benefit. The caching feature is valuable for any function with expensive deterministic computations: feature extraction, data loading, hyperparameter search.
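
When each item is too cheap to justify its own task, one possible workaround is to group items into chunks so that every task carries enough work. The sketch below is a minimal illustration under that assumption; square, square_chunk, and make_chunks are hypothetical helper names, and Joblib's default batch_size="auto" already applies a similar idea internally.

```python
from joblib import Parallel, delayed

def square(n: int) -> int:
    """A microsecond-scale operation: too cheap to dispatch on its own."""
    return n * n

def square_chunk(chunk: list[int]) -> list[int]:
    """Process a whole chunk inside one task to amortise dispatch cost."""
    return [square(n) for n in chunk]

def make_chunks(items: list[int], size: int) -> list[list[int]]:
    """Split items into consecutive chunks of at most `size` elements."""
    return [items[i:i + size] for i in range(0, len(items), size)]

numbers = list(range(10_000))
chunks = make_chunks(numbers, size=2_500)

# One task per chunk instead of one task per number.
chunk_results = Parallel(n_jobs=2)(delayed(square_chunk)(c) for c in chunks)

# Flatten the per-chunk lists back into a single result list.
squares = [value for chunk in chunk_results for value in chunk]
print(len(chunks), squares[:5])
```

The chunk size is a tuning knob: larger chunks mean less overhead but coarser load balancing across workers.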

Choosing the Right Backend

Joblib supports three execution backends, each suited to different workloads. Understanding when to use each prevents a common trap: the default process-based backend can slow down IO-bound work, because every task pays for process startup and argument serialisation while the work itself spends most of its time waiting.

# backends.py
import time
import numpy as np
from joblib import Parallel, delayed

def cpu_task(size: int) -> float:
    """CPU-bound: pure Python computation."""
    data = list(range(size))
    return sum(x * x for x in data) / len(data)

def numpy_task(size: int) -> float:
    """Numpy releases the GIL -- threading backend works well here."""
    arr = np.random.rand(size)
    return float(np.sqrt(np.sum(arr ** 2)))

items = [100_000] * 8

# Default loky backend (separate processes, best for pure Python CPU work)
start = time.perf_counter()
results_loky = Parallel(n_jobs=4, backend="loky")(
    delayed(cpu_task)(n) for n in items
)
print(f"loky (CPU work):    {time.perf_counter() - start:.2f}s")

# Threading backend (shares memory, good when GIL is released by C extensions)
start = time.perf_counter()
results_thread = Parallel(n_jobs=4, backend="threading")(
    delayed(numpy_task)(n) for n in items
)
print(f"threading (NumPy):  {time.perf_counter() - start:.2f}s")

# Sequential for comparison
start = time.perf_counter()
results_seq = [numpy_task(n) for n in items]
print(f"sequential:         {time.perf_counter() - start:.2f}s")

Output:

loky (CPU work):    0.48s
threading (NumPy):  0.31s
sequential:         1.12s

The loky backend spawns separate Python processes, each with their own memory space and GIL. This is the right choice for pure Python CPU work because it truly runs in parallel. The threading backend runs in threads within the same process. Because Python’s GIL prevents true parallel execution of pure Python code, threading only helps when the task calls into a C extension that releases the GIL — like NumPy, Pandas, or scikit-learn. The multiprocessing backend is the original process pool; prefer loky unless you have a specific compatibility reason to use it.
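
The listing above covers CPU-bound and NumPy work; for IO-bound work the threading backend is usually the lighter option. The following sketch simulates network waits with time.sleep, so fake_download and the example.com URLs are stand-ins and nothing is actually requested.

```python
import time
from joblib import Parallel, delayed

def fake_download(url: str) -> str:
    """Stand-in for a network request; sleeping releases the GIL like real IO."""
    time.sleep(0.2)
    return f"fetched {url}"

# Hypothetical URLs, used only as labels here.
urls = [f"https://example.com/page/{i}" for i in range(8)]

start = time.perf_counter()
pages = Parallel(n_jobs=8, backend="threading")(
    delayed(fake_download)(u) for u in urls
)
elapsed = time.perf_counter() - start
print(f"{len(pages)} pages in {elapsed:.2f}s")
```

Threads share the parent's memory, so no arguments or results are pickled, which is why the startup cost stays small for this kind of task.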

Caching Expensive Results with Memory

Joblib’s Memory class caches a function’s return value to disk, keyed by the function’s source code and its arguments. The second call with the same arguments reads from the cache instead of recomputing. This is useful for data loading, feature extraction, or any expensive deterministic step that you run repeatedly during development.

# caching.py
import time
import numpy as np
from joblib import Memory

# Create a cache directory
cache = Memory("./joblib_cache", verbose=1)

@cache.cache
def load_and_process(filepath: str, scale: float = 1.0) -> np.ndarray:
    """Simulate expensive data loading and processing."""
    print(f"  [COMPUTING] Loading {filepath} with scale={scale}")
    time.sleep(2)  # Simulate a 2-second load
    data = np.random.rand(1000) * scale
    return data

print("First call (cold cache):")
start = time.perf_counter()
result1 = load_and_process("data/features.npy", scale=2.0)
print(f"  Took: {time.perf_counter() - start:.2f}s, mean={result1.mean():.4f}")

print("\nSecond call (cache hit):")
start = time.perf_counter()
result2 = load_and_process("data/features.npy", scale=2.0)
print(f"  Took: {time.perf_counter() - start:.4f}s, mean={result2.mean():.4f}")

print("\nDifferent args (cache miss):")
start = time.perf_counter()
result3 = load_and_process("data/features.npy", scale=3.0)
print(f"  Took: {time.perf_counter() - start:.2f}s, mean={result3.mean():.4f}")

Output:

First call (cold cache):
  [COMPUTING] Loading data/features.npy with scale=2.0
  Took: 2.01s, mean=0.9987

Second call (cache hit):
  Took: 0.0031s, mean=0.9987

Different args (cache miss):
  [COMPUTING] Loading data/features.npy with scale=3.0
  Took: 2.01s, mean=1.4991

The cache is stored as pickle files in the directory you specify; they are uncompressed by default, and passing compress=True to Memory trades some CPU time for less disk space. The cache is keyed on a hash of the function’s source code and all arguments, so if you change the function body, Joblib invalidates the cache automatically on the next call. To clear the cache manually, call cache.clear() or delete the cache directory. The verbose=1 argument makes Joblib print whether it computed or loaded from cache; set it to 0 to silence this output in production.
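
A quick way to confirm when the function body really runs is to count executions. The sketch below uses a temporary directory and a module-level list (calls) purely for demonstration; expensive_double is a hypothetical function, and real cached functions should avoid side effects like this.

```python
import tempfile
from joblib import Memory

# A throwaway cache directory so the demo leaves nothing in the project.
cache_dir = tempfile.mkdtemp(prefix="joblib_demo_")
memory = Memory(cache_dir, verbose=0)

calls = []  # records every real (non-cached) execution

@memory.cache
def expensive_double(x: int) -> int:
    """Pretend this is slow; the list append marks a real computation."""
    calls.append(x)
    return x * 2

expensive_double(21)   # computed and written to the cache
expensive_double(21)   # served from disk, function body not executed
print(len(calls))

memory.clear(warn=False)  # wipe every cached result in this Memory
expensive_double(21)      # computed again after the clear
print(len(calls))
```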

Joblib with scikit-learn Pipelines

Scikit-learn uses Joblib internally for all its n_jobs parameters — cross-validation, grid search, random forests, and more all use the same Joblib infrastructure. You can control the backend and number of jobs globally using Joblib’s parallel_backend context manager, or pass n_jobs directly to estimators.

# sklearn_parallel.py
import time
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score, GridSearchCV
from joblib import parallel_backend

# Generate a sample dataset
X, y = make_classification(n_samples=5000, n_features=20, random_state=42)

# Train a random forest using all CPU cores
print("Training RandomForest with n_jobs=-1...")
start = time.perf_counter()
rf = RandomForestClassifier(n_estimators=100, n_jobs=-1, random_state=42)
rf.fit(X, y)
print(f"  Fit time: {time.perf_counter() - start:.2f}s")

# Cross-validation in parallel
start = time.perf_counter()
scores = cross_val_score(rf, X, y, cv=5, n_jobs=-1, scoring="accuracy")
print(f"  CV scores: {scores.round(3)}, mean={scores.mean():.3f}, time={time.perf_counter() - start:.2f}s")

# Hyperparameter search -- each combo evaluated in parallel
param_grid = {
    "n_estimators": [50, 100],
    "max_depth": [5, 10, None],
}

start = time.perf_counter()
grid = GridSearchCV(
    RandomForestClassifier(random_state=42),
    param_grid,
    cv=3,
    n_jobs=-1,
    verbose=0,
)
grid.fit(X, y)
elapsed = time.perf_counter() - start
print(f"  Best params: {grid.best_params_}, score={grid.best_score_:.3f}, time={elapsed:.2f}s")

Output:

Training RandomForest with n_jobs=-1...
  Fit time: 0.31s
  CV scores: [0.934 0.931 0.929 0.927 0.932], mean=0.931, time=0.48s
  Best params: {'max_depth': None, 'n_estimators': 100}, score=0.931, time=1.24s

The n_jobs=-1 parameter on scikit-learn estimators and model-selection utilities goes directly to Joblib. Setting it uses all available cores for that operation. For nested parallelism (a parallel grid search that itself trains parallel random forests), Joblib automatically avoids over-subscribing the CPU — the inner jobs run sequentially when the outer jobs already fill all cores.
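
The parallel_backend context manager mentioned at the start of this section can be tried without scikit-learn. The sketch below is a minimal illustration with a hypothetical cube function; scikit-learn's internal Parallel calls pick up the same setting when they run inside the block.

```python
from joblib import Parallel, delayed, parallel_backend

def cube(n: int) -> int:
    """Trivial task used only to show which settings Parallel picks up."""
    return n ** 3

# Inside the block, Parallel() calls that do not name a backend or
# n_jobs use the values set by the context manager.
with parallel_backend("threading", n_jobs=2):
    cubes = Parallel()(delayed(cube)(n) for n in range(6))

print(cubes)
```

Recent Joblib releases also provide parallel_config for the same purpose; which one to prefer depends on the installed version.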

Real-Life Example: Parallel Feature Extraction Pipeline

The following pipeline processes a directory of text files, extracts word-frequency features from each, and caches the results. Combining Parallel with Memory gives you both speed and resilience — if the pipeline is interrupted, the cached results mean you do not repeat work already done.

# feature_pipeline.py
import os
import time
import re
from collections import Counter
from pathlib import Path
from joblib import Parallel, delayed, Memory

cache = Memory("./feature_cache", verbose=0)

# --- Create sample text files ---
SAMPLE_DIR = Path("sample_texts")
SAMPLE_DIR.mkdir(exist_ok=True)
sample_texts = {
    "python.txt":  "Python is a high-level programming language. Python emphasises readability.",
    "data.txt":    "Data science uses statistics and programming. Data analysis reveals patterns.",
    "web.txt":     "Web development creates websites and applications. The web uses HTML CSS JavaScript.",
    "ai.txt":      "Artificial intelligence mimics human thinking. Machine learning trains models on data.",
    "cloud.txt":   "Cloud computing provides on-demand resources. Cloud services scale automatically.",
}
for fname, text in sample_texts.items():
    (SAMPLE_DIR / fname).write_text(text * 50)  # Make files large enough to matter

@cache.cache
def extract_features(filepath: str) -> dict:
    """Extract word frequency features from a text file (cached)."""
    text = Path(filepath).read_text().lower()
    words = re.findall(r'\b[a-z]{3,}\b', text)
    top_words = dict(Counter(words).most_common(10))
    time.sleep(0.3)  # Simulate expensive NLP processing
    return {"file": Path(filepath).name, "word_count": len(words), "top_words": top_words}

def run_pipeline(data_dir: Path) -> list[dict]:
    files = [str(f) for f in data_dir.glob("*.txt")]
    print(f"Processing {len(files)} files in parallel...")
    start = time.perf_counter()
    results = Parallel(n_jobs=-1, verbose=10)(
        delayed(extract_features)(f) for f in files
    )
    elapsed = time.perf_counter() - start
    print(f"Done in {elapsed:.2f}s")
    return results

features = run_pipeline(SAMPLE_DIR)
for feat in features:
    top3 = list(feat["top_words"].keys())[:3]
    print(f"  {feat['file']:15s}  words={feat['word_count']:,}  top={top3}")

Output (first run — cold cache):

Processing 5 files in parallel...
[Parallel(n_jobs=-1)]: Done   5 out of   5 | elapsed:  0.4s finished
Done in 0.41s
  python.txt       words=400  top=['python', 'high', 'level']
  data.txt         words=500  top=['data', 'science', 'uses']
  web.txt          words=600  top=['web', 'development', 'creates']
  ai.txt           words=500  top=['artificial', 'intelligence', 'mimics']
  cloud.txt        words=450  top=['cloud', 'computing', 'provides']
s from a file or a database, the cache becomes stale when that data changes. You are responsible for clearing the cache when upstream data changes, either by calling memory.clear(), by passing a version argument to the function, or by using a time-based expiry implemented in the function body.
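
One way to apply the "version argument" idea is to pass the file's modification time into the cached function, so the cache key changes whenever the file does. This is a sketch under that assumption; _load_text, load_text, and the temporary paths are hypothetical names.

```python
import os
import tempfile
from pathlib import Path
from joblib import Memory

workdir = Path(tempfile.mkdtemp(prefix="joblib_mtime_"))
memory = Memory(str(workdir / "cache"), verbose=0)
loads = []  # records every real read of the file

@memory.cache
def _load_text(path: str, mtime_ns: int) -> str:
    """mtime_ns is unused in the body; it only makes the cache key change."""
    loads.append(path)
    return Path(path).read_text()

def load_text(path: str) -> str:
    """Public wrapper that passes the file's current modification time."""
    return _load_text(path, os.stat(path).st_mtime_ns)

source = workdir / "input.txt"
source.write_text("first version")
print(load_text(str(source)))   # computed
print(load_text(str(source)))   # cache hit, file unchanged

source.write_text("second version")
# Force a different timestamp in case the filesystem clock is coarse.
stat = os.stat(source)
os.utime(source, ns=(stat.st_atime_ns, stat.st_mtime_ns + 1_000_000_000))
print(load_text(str(source)))   # recomputed because mtime changed
```

Old entries stay on disk until the cache is cleared, so this trades disk space for never serving stale data.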

How do I track progress in a long Parallel job?

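Set verbose=10 in Parallel() to print status lines as jobs complete, including elapsed time and, when the total number of tasks is known, an estimate of the remaining time; values above 10 report every iteration. For a progress bar, use the tqdm library: wrap the generator with tqdm((delayed(func)(x) for x in items), total=len(items)). Note the extra parentheses around the generator expression, which Python requires when another argument follows. Joblib pulls tasks from the tqdm-wrapped iterator, so the bar advances as items are consumed for dispatch, which can run slightly ahead of actual completion.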

Are there memory issues with Joblib on long-running jobs?

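When using the loky backend, worker processes are reused across calls, so memory held inside a worker (for example by libraries that keep internal caches) can accumulate over many batches. The max_nbytes argument of Parallel() controls memory mapping of large input arrays: NumPy arrays above the threshold (for example max_nbytes="10M") are shared with workers through memory-mapped files instead of being pickled for each task, and max_nbytes=None disables this. If worker memory keeps growing, one option is to recycle the workers periodically, for example by calling get_reusable_executor(kill_workers=True) from the loky copy bundled in joblib.externals.loky, which shuts down the current workers so that fresh ones are started.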

Conclusion

Joblib makes two of the most common performance problems in data pipelines trivially easy to solve: parallelising embarrassingly parallel loops with Parallel and delayed, and caching expensive deterministic computations with Memory. You have seen how to replace a for loop with a parallel equivalent in four lines, choose the right backend for CPU-bound versus IO-bound work, cache results to disk, and integrate both patterns with scikit-learn.

The natural extension of the feature extraction pipeline is to add a cache validation step that checks file modification timestamps, and to feed the extracted features directly into a scikit-learn pipeline with n_jobs=-1 cross-validation -- so both the feature extraction and the model evaluation run in parallel with full caching.

For the full Joblib reference including memory-mapped arrays, batch processing, and custom backends, see the official Joblib documentation.

How To Use Python PyArrow for Columnar Data Processing

Intermediate

You have a CSV file with 50 million rows. Loading it into Pandas takes three minutes and uses 8 GB of RAM. Your colleague sends you a Parquet file and asks you to join it with another dataset. Your data pipeline reads from an S3 bucket where every file is a different format. Each of these scenarios has the same underlying solution: stop treating your data as row-based text and start treating it as columnar binary data.

PyArrow is the Python binding for Apache Arrow, a cross-language in-memory data format and file I/O library. It reads Parquet files in seconds, handles files larger than RAM with streaming, and converts between Pandas DataFrames, Polars frames, and NumPy arrays, often without copying the underlying buffers. Install it with pip install pyarrow. Pandas can use PyArrow as an optional dependency, but using it directly gives you more control over schema, compression, and memory layout.

This article covers creating Arrow tables and arrays, reading and writing Parquet files, filtering and selecting columns during reads to avoid loading unnecessary data, streaming large files in batches, converting between PyArrow and Pandas, and using PyArrow’s compute functions for fast transformations. By the end you will have the tools to handle data files that would overwhelm a naive CSV-based pipeline.

PyArrow Parquet: Quick Example

The most common use case for PyArrow is reading and writing Parquet files. Here is the smallest working example — write a table to Parquet and read it back.

# quick_pyarrow.py
import pyarrow as pa
import pyarrow.parquet as pq

# Create a table from Python dicts
table = pa.table({
    "name":   ["Alice", "Bob", "Carol", "David"],
    "age":    [30, 25, 35, 28],
    "salary": [95000.0, 72000.0, 110000.0, 88000.0],
})

print(f"Schema: {table.schema}")
print(f"Rows: {table.num_rows}, Columns: {table.num_columns}")

# Write to Parquet
pq.write_table(table, "employees.parquet", compression="snappy")
print("Wrote employees.parquet")

# Read it back
loaded = pq.read_table("employees.parquet")
print(loaded.to_pydict())

Output:

Schema: name: string
age: int64
salary: double
Rows: 4, Columns: 3
Wrote employees.parquet
{'name': ['Alice', 'Bob', 'Carol', 'David'], 'age': [30, 25, 35, 28], 'salary': [95000.0, 72000.0, 110000.0, 88000.0]}

The pa.table() constructor infers schema from the Python types automatically. PyArrow maps Python str to Arrow string, Python int to int64, and Python float to double. The Snappy compression used in write_table() is a fast, widely-supported compressor that typically reduces Parquet file sizes by 50-70% with negligible CPU overhead. The following sections go deeper into schema control, partial reads, and large-file handling.

What Is PyArrow and Why Use It?

Apache Arrow defines a language-independent in-memory format for columnar data. Where a CSV row stores "Alice,30,95000" as a single string, Arrow stores all names in one contiguous memory buffer, all ages in another, and all salaries in a third. This columnar layout means that reading only the salary column never touches the name data — the disk seeks and memory reads are proportional to the columns you actually need, not to the total row width.

Operation | CSV + pandas | Parquet + PyArrow
Read 50M rows, all columns | 3-5 min, 8 GB RAM | 15-30 sec, 1-2 GB RAM
Read 50M rows, 2 columns | Same (must parse all) | 2-4 sec (skips unused columns)
Filter rows during read | Not possible | Yes, via predicate pushdown
File size (same data) | 500 MB (uncompressed) | 50-100 MB (Snappy)
Schema enforcement | Guessed at read time | Stored in file metadata

Parquet is the standard file format for data engineering pipelines on platforms like AWS Athena, Google BigQuery, and Apache Spark. If you receive data from any of these systems, it will often be Parquet. If you send data to them, Parquet is what they prefer. PyArrow is the reference Python implementation for reading and writing this format.

Defining and Enforcing Schema

When reading files from multiple sources, auto-inferred types can cause subtle bugs — an integer column that arrives as a string in one file breaks a join against an integer column in another. PyArrow lets you define a schema explicitly and validate incoming data against it.

# schema_control.py
import pyarrow as pa
import pyarrow.parquet as pq

# Define the exact types you expect
schema = pa.schema([
    pa.field("user_id",    pa.int32()),
    pa.field("username",   pa.string()),
    pa.field("email",      pa.string()),
    pa.field("age",        pa.int16()),
    pa.field("score",      pa.float32()),
    pa.field("active",     pa.bool_()),
])

# Create data that matches the schema
data = {
    "user_id":  pa.array([1, 2, 3],          type=pa.int32()),
    "username": pa.array(["alice", "bob", "carol"]),
    "email":    pa.array(["a@x.com", "b@x.com", "c@x.com"]),
    "age":      pa.array([30, 25, 35],        type=pa.int16()),
    "score":    pa.array([0.95, 0.82, 0.99],  type=pa.float32()),
    "active":   pa.array([True, True, False]),
}

table = pa.table(data, schema=schema)
print(f"Schema: {table.schema}")
print(f"Memory: {table.nbytes:,} bytes")

How To Use Python Loguru for Simplified Logging

Beginner

Python’s built-in logging module requires you to write at least five lines of boilerplate before you see your first log message: import logging, call basicConfig, create a logger, set a level, set a formatter. By the time you have rotation, colour output, and exception tracebacks working, you have a small library worth of configuration. And then you copy it into every new project.

Loguru is a logging library that replaces all of that setup with a single import and a single pre-configured logger object. Install it with pip install loguru. From the first line of code you get coloured output, accurate file and line references, clean exception tracebacks, and a sensible default format. When you need to customise it, the API is a handful of methods rather than a configuration object hierarchy.

This article covers Loguru’s basic usage, log levels, adding file sinks with automatic rotation and retention, structured logging with bound context, exception capturing, filtering log output, and integrating Loguru with code that uses the standard logging module. By the end you will have a logging setup that works in development and production without changes.

Loguru in Python: Quick Example

Zero configuration is the whole point. Import logger and start logging. The pre-configured sink writes coloured, formatted output to stderr immediately.

# quick_loguru.py
from loguru import logger

logger.debug("Debugging the widget factory")
logger.info("Server started on port 8000")
logger.warning("Config file not found, using defaults")
logger.error("Failed to connect to database")
logger.critical("Disk space below 1%")

Output (colours appear in real terminals):

2026-05-21 09:00:01.234 | DEBUG    | __main__:<module>:3 - Debugging the widget factory
2026-05-21 09:00:01.235 | INFO     | __main__:<module>:4 - Server started on port 8000
2026-05-21 09:00:01.235 | WARNING  | __main__:<module>:5 - Config file not found, using defaults
2026-05-21 09:00:01.235 | ERROR    | __main__:<module>:6 - Failed to connect to database
2026-05-21 09:00:01.236 | CRITICAL | __main__:<module>:7 - Disk space below 1%

Every line includes a timestamp, the log level, the module name, the function name, and the line number, all without any setup code. Compare this to the standard library where you would need logging.basicConfig(format='%(asctime)s %(levelname)s %(name)s:%(lineno)d %(message)s', level=logging.DEBUG) just to get a similar format. The rest of the article shows how to extend this with file output, rotation, and structured context.
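
For comparison, here is roughly what the standard-library setup looks like when written out in full. It is a sketch rather than an exact match for Loguru's default format; the logger name demo and the in-memory stream are assumptions chosen so the example is self-contained.

```python
import io
import logging

# Build the handler explicitly so the example does not depend on
# whether logging.basicConfig() was already called elsewhere.
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter(
    "%(asctime)s | %(levelname)-8s | %(name)s:%(funcName)s:%(lineno)d - %(message)s"
))

std_logger = logging.getLogger("demo")
std_logger.setLevel(logging.DEBUG)
std_logger.addHandler(handler)
std_logger.propagate = False  # keep the demo output out of the root logger

std_logger.info("Server started on port 8000")
print(stream.getvalue(), end="")
```

Four objects (logger, handler, formatter, stream) are involved before the first message appears, which is the boilerplate Loguru removes.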

What Is Loguru and How Does It Differ from logging?

Loguru is a third-party logging library designed around the idea that logging setup should be trivial. It ships with one pre-built logger object — you do not create logger instances per module. Instead of configuring handlers, formatters, and filters as separate objects, Loguru uses a single logger.add() call that accepts a destination (file path, callable, or stream) and all formatting/filtering options as keyword arguments.

Feature | stdlib logging | Loguru
Setup lines | 5-15 lines minimum | 0 (works at import)
Coloured output | Third-party library | Built-in
Exception traceback | Manual exc_info=True | logger.exception() or opt()
File rotation | RotatingFileHandler | logger.add() rotation= param
Structured context | LogRecord extra dict | logger.bind()
Async support | Not built-in | logger.add(enqueue=True)

The main trade-off is that Loguru is not a drop-in replacement for the standard library in code that expects logging.Logger instances. The section on integration below explains how to bridge this when you use libraries that log through the standard module. For greenfield code and scripts, Loguru is a straightforward upgrade.

Adding File Sinks with Rotation and Retention

The default Loguru sink writes to stderr. To write to a file as well, call logger.add() with a file path. The rotation parameter creates a new log file when a size or time threshold is reached, and retention automatically deletes old files to prevent disk exhaustion.

# file_logging.py
from loguru import logger
import sys

# Remove the default stderr sink
logger.remove()

# Add a clean stderr sink for development (no colours in redirected streams)
logger.add(sys.stderr, level="INFO", format="{time:HH:mm:ss} | {level} | {message}")

# Add a rotating file sink for production
logger.add(
    "logs/app_{time:YYYY-MM-DD}.log",  # Creation date goes into the filename
    level="DEBUG",
    format="{time:YYYY-MM-DD HH:mm:ss} | {level:<8} | {name}:{line} | {message}",
    rotation="10 MB",       # Rotate when file exceeds 10 MB
    retention="30 days",    # Keep files for 30 days, then delete
    compression="gz",       # Compress rotated files
    encoding="utf-8",
)

logger.info("Application started")
logger.debug("Loading configuration from /etc/app/config.toml")
logger.warning("Cache miss rate above 20%")

File output (logs/app_2026-05-21.log):

2026-05-21 09:00:01 | INFO     | __main__:21 | Application started
2026-05-21 09:00:01 | DEBUG    | __main__:22 | Loading configuration from /etc/app/config.toml
2026-05-21 09:00:01 | WARNING  | __main__:23 | Cache miss rate above 20%

The rotation parameter accepts size strings ("10 MB", "1 GB"), time strings ("1 day", "monday"), or a datetime.time object for rotation at a specific time of day. The retention parameter accepts count (5 -- keep five files), time strings ("2 weeks"), or a callable for custom logic. Calling logger.remove() at the top removes the default stderr sink so you have full control over every output destination.

Structured Logging with bind() and contextualize()

In a web application, every log message for a single request should carry the same request ID and user ID so you can filter the log file by request. Loguru's logger.bind() creates a new logger instance with extra context fields attached to every message it produces. logger.contextualize() does the same thing but for the duration of a context manager block.

# structured_logging.py
from loguru import logger
import uuid

logger.add("logs/structured.log", format="{time} | {level} | {extra[request_id]} | {extra[user_id]} | {message}", level="DEBUG")

def handle_request(user_id: int) -> dict:
    request_id = str(uuid.uuid4())[:8]
    # Create a bound logger that tags every message with these values
    req_logger = logger.bind(request_id=request_id, user_id=user_id)

    req_logger.info("Request received")
    req_logger.debug("Fetching user profile from database")

    result = {"user_id": user_id, "name": "Alice"}
    req_logger.info("Request completed successfully")
    return result

# Simulate two requests, handled one after the other
handle_request(user_id=42)
handle_request(user_id=99)

Output in structured.log:

2026-05-21 09:00:01... | INFO  | a1b2c3d4 | 42 | Request received
2026-05-21 09:00:01... | DEBUG | a1b2c3d4 | 42 | Fetching user profile from database
2026-05-21 09:00:01... | INFO  | a1b2c3d4 | 42 | Request completed successfully
2026-05-21 09:00:01... | INFO  | e5f6a7b8 | 99 | Request received
2026-05-21 09:00:01... | DEBUG | e5f6a7b8 | 99 | Fetching user profile from database
2026-05-21 09:00:01... | INFO  | e5f6a7b8 | 99 | Request completed successfully

Every message produced by req_logger carries the request_id and user_id fields. When you search this log for a1b2c3d4, you see every event for that request in chronological order. For async web frameworks, wrap the request handling in a with logger.contextualize(request_id=...) block instead. It is an ordinary context manager built on contextvars, so the fields follow each task and also reach code that logs through the global logger.

Capturing Exceptions with Full Tracebacks

Loguru's exception capture adds variable values to tracebacks, showing not just which line raised the exception but what each variable contained at that moment. This turns a cryptic traceback into a self-contained bug report. Use logger.exception() inside an except block, or decorate a function with @logger.catch to capture any unhandled exception it raises.

# exception_capture.py
from loguru import logger

logger.add("logs/errors.log", level="ERROR", backtrace=True, diagnose=True)

@logger.catch
def parse_config(data: dict) -> dict:
    """Parse a config dict -- crashes if 'port' is not an integer."""
    port = int(data["port"])     # Will fail if port is a string like "abc"
    host = data["host"]
    return {"host": host, "port": port}

def safe_divide(a: float, b: float) -> float:
    try:
        return a / b
    except ZeroDivisionError:
        logger.exception("Division by zero: a={}, b={}", a, b)
        return 0.0

result = safe_divide(10, 0)
print(f"safe_divide result: {result}")

# This will log the full traceback with variable values
parse_config({"host": "localhost", "port": "abc_not_a_number"})

Output:

2026-05-21 09:00:01 | ERROR | Division by zero: a=10, b=0
Traceback (most recent call last):
  File "exception_capture.py", line 14, in safe_divide
    return a / b
           ^^^^^
ZeroDivisionError: division by zero

safe_divide result: 0.0

2026-05-21 09:00:01 | ERROR | An error has been caught in function 'parse_config'
...
    port = int(data["port"])
               --> data = {'host': 'localhost', 'port': 'abc_not_a_number'}
ValueError: invalid literal for int() with base 10: 'abc_not_a_number'

The diagnose=True parameter on the sink enables Loguru's enhanced traceback output that shows variable values at each frame. It is on by default, and because it can write sensitive values into the log, consider diagnose=False for production sinks. The @logger.catch decorator catches any exception the function raises and logs it with the full enhanced traceback. By default it then suppresses the exception and the function returns None; pass reraise=True if the caller should still see the exception. This is especially useful on entry points like Celery task functions or scheduled jobs where unhandled exceptions would otherwise disappear silently.

Real-Life Example: Application Logger Module

The following module shows a production-ready logging setup that you can drop into any project. It configures stderr for development, a rotating file for production, and provides a function to get a pre-bound logger for each module.

# app_logger.py
import sys
from loguru import logger
from pathlib import Path

def setup_logging(log_dir: str = "logs", level: str = "INFO", debug: bool = False) -> None:
    """Configure application logging for development or production."""
    logger.remove()  # Remove default handler

    console_level = "DEBUG" if debug else level
    logger.add(
        sys.stderr,
        level=console_level,
        format="{time:HH:mm:ss} | {level:<8} | {name}:{line} - {message}",
        colorize=True,
    )

    Path(log_dir).mkdir(parents=True, exist_ok=True)
    logger.add(
        f"{log_dir}/app_{{time:YYYY-MM-DD}}.log",
        level="DEBUG",
        format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level:<8} | {name}:{line} | {extra} | {message}",
        rotation="50 MB",
        retention="14 days",
        compression="gz",
        encoding="utf-8",
        backtrace=True,
        diagnose=True,
    )
    logger.info("Logging configured: console={}, file=DEBUG", console_level)

def get_logger(component: str):
    """Return a logger pre-bound with a component tag."""
    return logger.bind(component=component)

# --- Usage ---
if __name__ == "__main__":
    setup_logging(debug=True)

    db_log = get_logger("database")
    api_log = get_logger("api")

    db_log.info("Connected to PostgreSQL on localhost:5432")
    api_log.info("Listening on 0.0.0.0:8000")
    api_log.warning("Rate limit reached for IP 192.168.1.1")

    try:
        result = 100 / 0
    except ZeroDivisionError:
        db_log.exception("Unexpected divide by zero during query optimisation")

Output:

09:00:01 | INFO     | __main__:29 - Logging configured: console=DEBUG, file=DEBUG
09:00:01 | INFO     | __main__:42 - Connected to PostgreSQL on localhost:5432
09:00:01 | INFO     | __main__:43 - Listening on 0.0.0.0:8000
09:00:01 | WARNING  | __main__:44 - Rate limit reached for IP 192.168.1.1
09:00:01 | ERROR    | __main__:49 - Unexpected divide by zero during query optimisation
Traceback (most recent call last):
  ...
ZeroDivisionError: division by zero

The setup_logging() function is called once at application startup (in main.py or your WSGI/ASGI entry point). Every module in your application calls get_logger("module_name") to get a logger that tags all its messages with the component name. The file sink records that tag through the {extra} field in its format, so the log file can be filtered by component; the console format omits it. You can extend this by reading level and debug from environment variables or a configuration file rather than hard-coding them.

Frequently Asked Questions

How do I use Loguru with libraries that use the standard logging module?

Most third-party libraries (SQLAlchemy, httpx, FastAPI) log through Python's standard logging module. To route those messages into Loguru, add an InterceptHandler: create a class that inherits from logging.Handler, override emit() to call logger.opt(depth=6, exception=record.exc_info).log(level, record.getMessage()), and install it with logging.basicConfig(handlers=[InterceptHandler()], level=0, force=True). This captures all standard library logging output and routes it through Loguru's sinks.

Does Loguru work with asyncio and async web frameworks?

Yes. For async code, use logger.add(sink, enqueue=True) to make the sink write asynchronously via a background thread queue, which avoids blocking the event loop. For per-request context in async frameworks like FastAPI or Starlette, use logger.contextualize(request_id=..., user_id=...) inside a middleware function -- it uses contextvars.ContextVar under the hood, which is async-safe.

Can I create custom log levels in Loguru?

Yes. Call logger.level("VERBOSE", no=7, color="<dim>", icon="@") to define a custom level between the built-in TRACE (5) and DEBUG (10), or pick any other numeric value for other positions. Loguru already ships TRACE and SUCCESS levels, and passing no= for a name that already exists raises an error, so choose a new name. After registering the level, log with logger.log("VERBOSE", "Very detailed message"). Custom levels appear in the formatted output with the colour and icon you defined, and you can filter on them with level="VERBOSE" in any sink.

How do I get JSON-formatted log output for log aggregators?

The simplest route is logger.add(sink, serialize=True), which emits each record as one JSON object in Loguru's own schema with all record fields. For a custom schema, build the JSON string yourself, either in a function used as the sink or in a patcher (logger.patch()) that stores the string in record["extra"] and is paired with format="{extra[serialized]}". Avoid returning raw JSON from a callable format: Loguru treats the returned string as a format template, so the JSON braces are parsed as fields. Most log aggregators (Datadog, Loki, Elastic) can ingest either format.

How do I stop Loguru from writing to stderr?

Every call to logger.add() returns an integer ID. Pass that ID to logger.remove(sink_id) to remove that specific sink. To remove all sinks including the default stderr one, call logger.remove() with no arguments. This is the first line in most production setups -- remove the default, then add exactly the sinks you want.

Conclusion

Loguru makes Python logging something you set up once and forget. You have seen how to log at different levels with zero configuration, add rotating file sinks with automatic cleanup, attach structured context fields with bind(), capture exceptions with enhanced tracebacks, and build a reusable logging module for multi-file projects. Every pattern here works in scripts, web applications, and background workers without changes.

The next step is to add the InterceptHandler to capture third-party library logs, and to wire setup_logging() to your application's configuration system so that log level and output directory come from environment variables rather than code. From there, piping the JSON-formatted file output into a log aggregator like Grafana Loki or Elastic gives you searchable, queryable logs across your entire stack.

See the official Loguru documentation for the complete sink options, serialisation reference, and async integration guide.

How To Use Python Celery with Redis for Task Queues

Intermediate

Your web app sends a welcome email when users register. That email call takes three seconds. For those three seconds, the user’s browser sits waiting for the server to respond — and your server thread is blocked, unable to handle anyone else. The fix is a task queue: hand the email job off to a background worker and return a 200 response immediately. The user gets instant feedback, your server is free, and the email arrives a moment later.

Celery is the most widely used task queue library in the Python ecosystem. It runs background jobs in separate worker processes, supports scheduled tasks (cron-style), and retries failed jobs automatically. Redis is the most common broker for Celery — it acts as the message bus between your application and the worker processes. Install both with pip install celery redis. You will also need a running Redis server; docker run -d -p 6379:6379 redis:7-alpine spins one up in seconds.

This article covers setting up Celery with Redis, defining and calling tasks, checking task results, chaining tasks together, scheduling periodic jobs, and monitoring your workers. By the end you will have a working background processing system with retry logic and result tracking.

Celery with Redis: Quick Example

The smallest possible Celery setup defines one task and calls it asynchronously. Two files are all you need: a tasks module that creates the Celery app and defines the task, and a caller script.

# tasks.py
from celery import Celery

# Connect to Redis running on localhost:6379
app = Celery("demo", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0")

@app.task
def add(x: int, y: int) -> int:
    """Add two numbers in the background."""
    return x + y

# caller.py
from tasks import add

# Queue the task -- returns immediately
result = add.delay(4, 6)
print(f"Task ID: {result.id}")

# Wait for the result (up to 10 seconds)
value = result.get(timeout=10)
print(f"Result: {value}")

Start the worker in a terminal, then run caller.py in another:

# Terminal 1 -- start the worker
$ celery -A tasks worker --loglevel=info

# Terminal 2 -- call the task
$ python caller.py
Task ID: 3f8a2b1c-4d5e-6f7a-8b9c-0d1e2f3a4b5c
Result: 10

The @app.task decorator registers the function as a Celery task. Calling add.delay(4, 6) puts a message on the Redis broker queue and returns an AsyncResult object immediately — the calling process does not wait. The worker process picks up the message, runs the function, and stores the result back in Redis. result.get() fetches that stored value. The sections below show how to build on this for real application scenarios.

What Is Celery and When Should You Use It?

Celery is a distributed task queue. Your application (the producer) puts job descriptions onto a message broker (Redis or RabbitMQ). One or more worker processes read jobs from the broker, execute the underlying Python function, and optionally store the result in a result backend. The producer and workers run as separate processes, which means slow tasks cannot block your web server.

Use case | Without Celery | With Celery
Send welcome email | Blocks request for 3+ sec | Returns instantly, email queued
Generate PDF report | Request times out for large docs | Background job, user polls for result
Nightly database cleanup | Cron job, hard to monitor | Celery Beat schedule, tracked
Process uploaded image | Server memory spike per request | Worker pool handles burst gracefully
Call a slow third-party API | Cascading timeouts | Isolated failure, retried automatically

Celery is not always the right tool. If you only need to defer a single function call without result tracking, Python’s built-in concurrent.futures.ThreadPoolExecutor or asyncio may be simpler. Celery adds operational complexity — you now have Redis and worker processes to monitor and keep running. Choose Celery when you need durable queuing (jobs survive a worker restart), retry logic, scheduling, or multiple worker machines.

Configuring Celery with a Config Object

Hard-coding broker URLs inside tasks.py makes configuration difficult to change between environments. The production pattern is to create a dedicated configuration object and load it with app.config_from_object(), keeping all settings in one place.

# celery_config.py
broker_url = "redis://localhost:6379/0"
result_backend = "redis://localhost:6379/0"

# Serialise tasks as JSON (not pickle) for security
task_serializer = "json"
result_serializer = "json"
accept_content = ["json"]

# Expire results after 1 hour
result_expires = 3600

# Retry limits are set per task (max_retries on the task decorator),
# not through a global setting in this module

# Time limit per task (seconds)
task_soft_time_limit = 300
task_time_limit = 360

# app.py
from celery import Celery

celery_app = Celery("myproject")
celery_app.config_from_object("celery_config")

@celery_app.task(bind=True, max_retries=3)
def send_email(self, to_address: str, subject: str, body: str) -> dict:
    """Send an email, retrying up to 3 times on failure."""
    try:
        # Simulate calling an email API
        print(f"Sending email to {to_address}: {subject}")
        # In production: smtplib.sendmail() or requests.post(email_api_url, ...)
        return {"status": "sent", "to": to_address}
    except Exception as exc:
        # Retry with exponential backoff: 60s, 120s, 240s
        raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))

Output (worker log):

[2026-05-21 09:00:01,234: INFO/ForkPoolWorker-1] Task app.send_email[a1b2c3] received
Sending email to user@example.org: Welcome!
[2026-05-21 09:00:01,289: INFO/ForkPoolWorker-1] Task app.send_email[a1b2c3] succeeded: {'status': 'sent', 'to': 'user@example.org'}

The bind=True argument gives the task function access to self, which is the task instance. This is required for self.retry(). The retry call re-queues the task with a delay computed by countdown and increments self.request.retries. After max_retries failures, Celery raises the original exception rather than retrying again.
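The countdown arithmetic can be checked in isolation. This sketch reproduces only the delay formula from the task above; the helper name retry_countdown is hypothetical and no Celery objects are involved.

```python
def retry_countdown(retries: int, base_seconds: int = 60) -> int:
    """Delay before the next attempt, given how many retries have already run."""
    return base_seconds * (2 ** retries)

max_retries = 3
# self.request.retries is 0 on the first failure, 1 on the second, and so on
schedule = [retry_countdown(n) for n in range(max_retries)]
print(schedule)  # [60, 120, 240]
```

With three retries the task is attempted four times in total, and the last retry starts about seven minutes after the first failure.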

Checking Task Status and Results

When a task is queued from a web request, you typically return the task ID to the client so they can poll for the result. The AsyncResult class provides a clean interface for checking status without blocking the server.

# result_tracking.py
from celery import Celery
from celery.result import AsyncResult
import time

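app = Celery("tracking", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0")
# Report the STARTED state; without this a running task still shows as PENDING
app.conf.task_track_started = True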

@app.task
def slow_report(rows: int) -> dict:
    """Simulate a slow report generation."""
    time.sleep(2)
    return {"rows_processed": rows, "status": "complete"}

def check_result(task_id: str) -> None:
    result = AsyncResult(task_id, app=app)

    states = {
        "PENDING":  "Task is waiting to be picked up",
        "STARTED":  "Task is currently running",
        "SUCCESS":  "Task completed",
        "FAILURE":  "Task failed",
        "RETRY":    "Task is being retried",
        "REVOKED":  "Task was cancelled",
    }
    print(f"State: {result.state} -- {states.get(result.state, 'Unknown')}")

    if result.state == "SUCCESS":
        print(f"Result: {result.result}")
    elif result.state == "FAILURE":
        print(f"Error: {result.result}")
        print(f"Traceback: {result.traceback}")

if __name__ == "__main__":
    task = slow_report.delay(10000)
    print(f"Queued task: {task.id}")

    for _ in range(5):
        check_result(task.id)
        time.sleep(0.5)

    final = task.get(timeout=10)
    print(f"Final: {final}")

Output:

Queued task: 7f8a9b0c-1d2e-3f4a-5b6c-7d8e9f0a1b2c
State: PENDING -- Task is waiting to be picked up
State: STARTED -- Task is currently running
State: STARTED -- Task is currently running
State: STARTED -- Task is currently running
State: SUCCESS -- Task completed
Final: {'rows_processed': 10000, 'status': 'complete'}

In a web application, you would return {"task_id": task.id} from a POST endpoint and expose a GET endpoint like /tasks/{task_id} that calls check_result() and returns the current state. The client polls this endpoint until the state is SUCCESS or FAILURE. Celery also reports PENDING for task IDs it has never seen, so treat PENDING as "not known to have started" rather than proof that the task exists. This pattern keeps web responses fast while still giving users visibility into long-running operations.

Scheduling Periodic Tasks with Celery Beat

Celery Beat is a scheduler that sends tasks to the worker queue on a defined interval. It runs as a separate process alongside your workers and replaces cron jobs for Python applications, with the advantage that schedules are defined in Python code and are visible in the same place as your task definitions.

# schedule_app.py
from celery import Celery
from celery.schedules import crontab

app = Celery("scheduler", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0")

app.conf.beat_schedule = {
    # Run every 30 seconds
    "health-check-every-30s": {
        "task": "schedule_app.health_check",
        "schedule": 30.0,
    },
    # Run every day at 08:00
    "daily-report-8am": {
        "task": "schedule_app.generate_daily_report",
        "schedule": crontab(hour=8, minute=0),
        "kwargs": {"recipients": ["admin@company.com"]},
    },
    # Run every Monday at midnight (day_of_week: 0 = Sunday, 1 = Monday)
    "weekly-cleanup": {
        "task": "schedule_app.cleanup_old_records",
        "schedule": crontab(day_of_week=1, hour=0, minute=0),
    },
}
app.conf.timezone = "UTC"

@app.task
def health_check() -> str:
    print("Health check OK")
    return "OK"

@app.task
def generate_daily_report(recipients: list[str]) -> dict:
    print(f"Generating report for {recipients}")
    return {"status": "sent", "recipients": recipients}

@app.task
def cleanup_old_records() -> int:
    print("Cleaning up records older than 90 days...")
    return 0  # In production: DELETE FROM records WHERE created < NOW() - INTERVAL '90 days'

Start Beat and the worker in separate terminals:

# Terminal 1 -- worker
$ celery -A schedule_app worker --loglevel=info

# Terminal 2 -- beat scheduler
$ celery -A schedule_app beat --loglevel=info

# Beat output:
[2026-05-21 08:00:00,001: INFO/MainProcess] Scheduler: Sending due task daily-report-8am

The crontab() helper accepts the same fields as a Unix crontab entry -- hour, minute, day_of_week, day_of_month, month_of_year -- with * meaning "every". For day_of_week, 0 is Sunday and 1 is Monday; names such as "mon" also work. Celery schedules in UTC by default, so set app.conf.timezone explicitly whenever crontab times are meant in a local timezone, and consider stating it even for UTC so the intent is visible. Keep in mind that a local timezone with daylight saving changes its offset twice a year.

Real-Life Example: Image Processing Pipeline

The following example shows a realistic Celery pipeline that processes an uploaded image: it resizes it, generates a thumbnail, and sends a completion notification -- three tasks chained together so each step waits for the previous one to finish.

# pipeline.py
from celery import Celery, chain
import os
import time

app = Celery("pipeline", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0")

@app.task
def resize_image(filepath: str, width: int = 800) -> dict:
    """Resize an image to the given width (simulated)."""
    print(f"Resizing {filepath} to {width}px wide...")
    time.sleep(1)  # In production: use Pillow Image.resize()
    root, ext = os.path.splitext(filepath)  # safe for paths with extra dots
    resized_path = f"{root}_{width}{ext}"
    return {"original": filepath, "resized": resized_path, "width": width}

@app.task
def create_thumbnail(resize_result: dict, thumb_size: int = 150) -> dict:
    """Create a thumbnail from the resized image."""
    resized_path = resize_result["resized"]
    print(f"Creating {thumb_size}px thumbnail from {resized_path}...")
    time.sleep(0.5)  # In production: use Pillow Image.thumbnail()
    root, ext = os.path.splitext(resized_path)
    thumb_path = f"{root}_thumb{ext}"
    return {**resize_result, "thumbnail": thumb_path, "thumb_size": thumb_size}

@app.task
def notify_complete(pipeline_result: dict, user_id: int) -> str:
    """Send a completion notification to the user."""
    msg = (
        f"User {user_id}: image processing complete. "
        f"Resized: {pipeline_result['resized']}, "
        f"Thumbnail: {pipeline_result['thumbnail']}"
    )
    print(f"Notifying: {msg}")
    return msg

def process_uploaded_image(filepath: str, user_id: int):
    """Chain the three tasks: resize -> thumbnail -> notify."""
    workflow = chain(
        resize_image.s(filepath, width=1200),
        create_thumbnail.s(thumb_size=150),
        notify_complete.s(user_id=user_id),
    )
    result = workflow.apply_async()
    print(f"Pipeline queued: {result.id}")
    return result.id

if __name__ == "__main__":
    task_id = process_uploaded_image("uploads/photo.jpg", user_id=42)
    final = app.AsyncResult(task_id).get(timeout=30)
    print(f"Pipeline complete: {final}")

Output:

Pipeline queued: 9a8b7c6d-5e4f-3a2b-1c0d-9e8f7a6b5c4d
Resizing uploads/photo.jpg to 1200px wide...
Creating 150px thumbnail from uploads/photo_1200.jpg...
Notifying: User 42: image processing complete. Resized: uploads/photo_1200.jpg, Thumbnail: uploads/photo_1200_thumb.jpg
Pipeline complete: User 42: image processing complete...

The chain() primitive creates a workflow where each task's return value is automatically passed as the first argument to the next task. The .s() method creates a task "signature" -- a serialisable description of a task call that can be passed around and composed with other signatures. You can extend this pipeline by adding error handling with link_error, parallel processing with group(), or a chord() that runs a callback once every task in a group has finished.

Frequently Asked Questions

Should I use Redis or RabbitMQ as the Celery broker?

Redis is simpler to set up and is the right choice for most projects. It doubles as both the broker and the result backend, requires minimal configuration, and handles thousands of tasks per second. RabbitMQ is better suited for very high message volumes, complex routing (fanout, topic exchanges), or systems where message durability guarantees are critical. For a typical web application, start with Redis -- you can migrate to RabbitMQ later if you outgrow Redis.

How many worker processes should I run?

Celery defaults to one worker process per CPU core (using the prefork pool). For CPU-bound tasks (image processing, data crunching), this is usually optimal. For IO-bound tasks (API calls, database queries), switch to a green-thread pool and raise --concurrency well above the core count: with the gevent library installed, celery -A myapp worker --pool=gevent --concurrency=100 runs up to 100 IO-bound tasks concurrently in a single process.

What happens when a Celery task fails?

By default, a failed task (one that raises an unhandled exception) is marked as FAILURE and the exception is stored in the result backend. The task is not retried automatically unless you call self.retry() inside the task body or set autoretry_for on the task decorator: @app.task(autoretry_for=(RequestException,), max_retries=3, default_retry_delay=60). Keep a retry limit in place: the default max_retries is 3, and setting max_retries=None removes the limit, so a persistently failing task would retry forever.

How do I monitor Celery workers in production?

Flower is the standard monitoring tool for Celery: pip install flower, then celery -A myapp flower --port=5555. It provides a web dashboard showing active workers, queued tasks, task history, and success/failure rates. For production systems, integrate Celery with your existing observability stack by using Celery's signal hooks (task_prerun, task_postrun, task_failure) to emit metrics to Prometheus or log structured events to your log aggregator.

How do I integrate Celery with Django?

Create a celery.py file in your Django project's root package, set os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings'), configure the app with app.config_from_object('django.conf:settings', namespace='CELERY'), and call app.autodiscover_tasks() so that tasks.py modules in your installed apps are found. Add CELERY_BROKER_URL and CELERY_RESULT_BACKEND to your Django settings. In the project package's __init__.py (the one next to celery.py), import the Celery app so it is loaded when Django starts. The official Django-Celery integration guide at docs.celeryq.dev covers the exact file structure.

Conclusion

Celery with Redis transforms slow, blocking operations into fast, asynchronous background jobs. You have seen how to define tasks with @app.task, call them with .delay(), track results with AsyncResult, configure automatic retries, schedule periodic jobs with Celery Beat, and chain multiple tasks into a pipeline with chain(). These primitives cover the vast majority of real-world background processing needs.

The image processing pipeline in the real-life example is a natural starting point to extend: add a group() to process multiple uploaded images in parallel, use a chord() to aggregate results when all parallel tasks complete, or connect to a real email API in the notification step. Each extension follows the same .s() signature pattern you have already seen.

For the complete Celery API reference including canvas primitives, beat schedulers, and monitoring integrations, see the official Celery documentation.

How To Build a Python CLI App with Typer

Intermediate

You have a Python script that works great — but only you know how to run it. The arguments are hardcoded, the help text is a comment inside the file, and every time a colleague asks how to use it you end up writing a three-paragraph Slack message. Building a proper command-line interface turns that script into a tool other people can actually use.

Typer is a modern Python library for building CLIs with almost no boilerplate. It uses Python type hints to define arguments and options automatically, generates beautiful help text, and supports subcommands for complex tools. It is built on top of Click but removes most of the ceremony. Install it with pip install "typer[all]" (the quotes stop shells such as zsh from expanding the brackets). The [all] extra includes Rich for coloured output and the shell completion helper; recent Typer releases bundle both by default, so plain pip install typer is enough there.

In this article you will learn how to create single-command CLIs, add optional flags and required arguments, build multi-command apps with subcommands, validate inputs with callbacks, and package everything into a distributable tool. By the end you will have a working file-management CLI that demonstrates every pattern Typer supports.

Building a Typer CLI: Quick Example

Here is the shortest possible Typer app — a greeting command that takes a name and an optional times flag. Copy this into a file and run it to see Typer in action immediately.

# greet.py
import typer

app = typer.Typer()

@app.command()
def greet(name: str, times: int = 1):
    """Greet someone by name."""
    for _ in range(times):
        typer.echo(f"Hello, {name}!")

if __name__ == "__main__":
    app()

Output:

$ python greet.py Alice
Hello, Alice!

$ python greet.py Alice --times 3
Hello, Alice!
Hello, Alice!
Hello, Alice!

$ python greet.py --help
Usage: greet.py [OPTIONS] NAME

  Greet someone by name.

Arguments:
  NAME  [required]

Options:
  --times INTEGER  [default: 1]
  --help           Show this message and exit.

The @app.command() decorator registers the function as a CLI command. Typer reads the type hints (name: str, times: int = 1) and automatically creates a required positional argument and an optional integer flag. The docstring becomes the help text — no separate help string needed. Further sections show how to build on this foundation for real-world tools.

What Is Typer and Why Use It?

Typer is a library that maps Python function signatures to CLI interfaces. You write a normal Python function with type-hinted parameters, and Typer generates the argument parser, validation logic, and help text for you. This is different from Python’s built-in argparse, which requires you to define each argument imperatively in a separate setup block.

Typer sits on top of Click, a mature and battle-tested CLI library. This means it inherits Click’s reliability while adding a cleaner API. The key advantage over raw Click is that you never have to write @click.option('--name', '-n', type=str, help='...') decorators — the function’s own type hints carry all of that information.

| Feature | argparse | Click | Typer |
| --- | --- | --- | --- |
| Argument definition | Imperative setup | Decorators | Type hints |
| Help text source | Explicit strings | Explicit strings | Docstrings |
| Subcommands | Subparsers | Groups | Multiple commands |
| Type validation | Manual | Type param | Automatic via hints |
| Coloured output | No | echo + style | Rich integration |
| Python version | stdlib | 3.7+ | 3.7+ |

The trade-off is that Typer is not in the standard library and adds a dependency. For simple scripts that need to stay dependency-free, argparse is still a reasonable choice. For anything beyond a single command, Typer saves substantial time and produces better help text automatically.
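For comparison, here is a rough sketch of what the greeting command from the quick example might look like with argparse alone. The build_parser helper and the hard-coded argument list are illustrative choices, not part of the original script; the point is that every argument has to be declared in a separate setup step instead of being read from the function signature.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Declare each argument imperatively, mirroring greet(name, times=1)."""
    parser = argparse.ArgumentParser(description="Greet someone by name.")
    parser.add_argument("name", type=str, help="Name to greet")
    parser.add_argument("--times", type=int, default=1, help="Number of greetings")
    return parser


def greet(name: str, times: int = 1) -> list[str]:
    """Return the greeting lines so they can be printed or tested."""
    return [f"Hello, {name}!" for _ in range(times)]


# Parse an explicit argument list here; a real script would call parse_args()
# with no arguments so that it reads sys.argv.
args = build_parser().parse_args(["Alice", "--times", "2"])
for line in greet(args.name, args.times):
    print(line)
```

The parser definition and the function signature repeat the same information (names, types, defaults), which is the duplication Typer removes.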

Arguments, Options, and Defaults

Typer distinguishes between arguments (positional, required by default) and options (named flags, optional by default). This distinction comes directly from how you define the function parameter: a bare type hint creates an argument, while a default value creates an option.

# file_info.py
import typer
from pathlib import Path

app = typer.Typer()

@app.command()
def info(
    filepath: Path,
    verbose: bool = False,
    encoding: str = "utf-8",
):
    """Show information about a file."""
    if not filepath.exists():
        typer.echo(f"Error: {filepath} does not exist.", err=True)
        raise typer.Exit(code=1)

    size = filepath.stat().st_size
    typer.echo(f"File: {filepath}")
    typer.echo(f"Size: {size} bytes")

    if verbose:
        typer.echo(f"Encoding hint: {encoding}")
        typer.echo(f"Suffix: {filepath.suffix}")

if __name__ == "__main__":
    app()

Output:

$ python file_info.py README.md
File: README.md
Size: 1420 bytes

$ python file_info.py README.md --verbose
File: README.md
Size: 1420 bytes
Encoding hint: utf-8
Suffix: .md

$ python file_info.py missing.txt
Error: missing.txt does not exist.
# Exit code 1

Notice that filepath: Path is a positional argument because it has no default value. Typer converts the string from the command line into a pathlib.Path object, but it does not check that the path exists, which is why the function tests filepath.exists() itself. The verbose: bool = False parameter becomes a --verbose flag that Typer pairs with an automatic --no-verbose counterpart. Calling raise typer.Exit(code=1) exits the program with an error code that shell scripts and CI pipelines can detect.
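The manual existence check can also be handed to Typer. The sketch below assumes the exists and dir_okay parameters of typer.Argument, and uses Typer's test runner plus a temporary file (both illustrative additions) so that it runs without touching real files.

```python
import tempfile
from pathlib import Path

import typer
from typer.testing import CliRunner

app = typer.Typer()


@app.command()
def info(
    # exists=True and dir_okay=False ask Typer to reject missing paths and directories
    filepath: Path = typer.Argument(..., exists=True, dir_okay=False, help="File to inspect"),
):
    """Show the size of an existing file."""
    typer.echo(f"Size: {filepath.stat().st_size} bytes")


runner = CliRunner()

# A path that does not exist is rejected before info() ever runs
missing = runner.invoke(app, ["no-such-file.txt"])

# A real temporary file passes validation
with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "sample.txt"
    sample.write_text("hello")
    found = runner.invoke(app, [str(sample)])

print("missing exit code:", missing.exit_code)
print("found:", found.stdout.strip())
```

With this approach the error message for a missing file comes from Typer in the same format as its other usage errors, at the cost of less control over the wording.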

Adding Help Text and Validation with Annotated

For production tools you need more control: per-argument help text, value constraints, and custom error messages. Typer uses Python’s Annotated type to attach this metadata directly to the parameter type hint without changing the function signature’s readability.

# convert.py
import typer
from typing import Annotated

app = typer.Typer()

@app.command()
def convert(
    value: Annotated[float, typer.Argument(help="The number to convert")],
    from_unit: Annotated[str, typer.Option(help="Source unit (km, miles, kg, lbs)")] = "km",
    to_unit: Annotated[str, typer.Option(help="Target unit")] = "miles",
):
    """Convert between common measurement units."""
    conversions = {
        ("km", "miles"): 0.621371,
        ("miles", "km"): 1.60934,
        ("kg", "lbs"): 2.20462,
        ("lbs", "kg"): 0.453592,
    }
    key = (from_unit.lower(), to_unit.lower())
    if key not in conversions:
        typer.echo(f"Unsupported conversion: {from_unit} -> {to_unit}", err=True)
        raise typer.Exit(code=1)

    result = value * conversions[key]
    typer.echo(f"{value} {from_unit} = {result:.4f} {to_unit}")

if __name__ == "__main__":
    app()

Output:

$ python convert.py 10
10.0 km = 6.2137 miles

$ python convert.py 150 --from-unit lbs --to-unit kg
150.0 lbs = 68.0388 kg

$ python convert.py --help
Usage: convert.py [OPTIONS] VALUE

  Convert between common measurement units.

Arguments:
  VALUE  The number to convert  [required]

Options:
  --from-unit TEXT  Source unit (km, miles, kg, lbs)  [default: km]
  --to-unit TEXT    Target unit  [default: miles]
  --help            Show this message and exit.

The Annotated wrapper lets you keep the function signature clean while embedding the CLI metadata (help text, default display, constraints) right alongside the type. Typer automatically converts option names from Python’s snake_case to CLI’s --kebab-case convention, so from_unit becomes --from-unit in the help text and on the command line.
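The conversion example attaches help text only. As a small sketch of the value constraints mentioned earlier, the following assumes the min and max parameters of typer.Option; the repeat command and its option are hypothetical and exist only for illustration.

```python
from typing import Annotated

import typer
from typer.testing import CliRunner

app = typer.Typer()


@app.command()
def repeat(
    word: Annotated[str, typer.Argument(help="Word to print")],
    # min/max make Typer reject values outside the range before the function runs
    count: Annotated[int, typer.Option(min=1, max=5, help="Repetitions (1 to 5)")] = 1,
):
    """Print a word a bounded number of times."""
    typer.echo(" ".join([word] * count))


runner = CliRunner()
in_range = runner.invoke(app, ["hi", "--count", "3"])
out_of_range = runner.invoke(app, ["hi", "--count", "9"])

print(in_range.exit_code, in_range.stdout.strip())
print(out_of_range.exit_code)
```

Range checks like this cover the simple cases; anything that depends on more than one value is usually easier to express in a callback or inside the command body.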

Building Multi-Command Apps with Subcommands

Real tools like git, docker, and pip are structured as a main command with multiple subcommands (git commit, docker build, etc.). Typer handles this through the add_typer() method, which nests a child Typer app under a name in a parent app. This keeps each subcommand group in its own module, making large CLIs easy to maintain.

# notes_app.py
import json
import typer
from datetime import datetime
from pathlib import Path

app = typer.Typer(help="A simple notes manager.")
notes_app = typer.Typer(help="Manage your notes.")
app.add_typer(notes_app, name="notes")

# Each CLI invocation is a separate process, so notes are persisted to a
# JSON file in the working directory (the file name is an arbitrary demo choice).
NOTES_FILE = Path("notes.json")

def load_notes() -> list[dict]:
    """Read saved notes, or return an empty list on the first run."""
    return json.loads(NOTES_FILE.read_text()) if NOTES_FILE.exists() else []

@notes_app.command("add")
def add_note(text: str, tag: str = "general"):
    """Add a new note."""
    notes = load_notes()
    note = {"id": len(notes) + 1, "text": text, "tag": tag, "created": datetime.now().strftime("%Y-%m-%d")}
    notes.append(note)
    NOTES_FILE.write_text(json.dumps(notes, indent=2))
    typer.echo(f"Added note #{note['id']}: {text!r} [{tag}]")

@notes_app.command("list")
def list_notes(tag: str = ""):
    """List all notes, optionally filtered by tag."""
    shown = [n for n in load_notes() if not tag or n["tag"] == tag]
    if not shown:
        typer.echo("No notes found.")
        return
    for note in shown:
        typer.echo(f"[{note['id']}] ({note['tag']}) {note['text']}")

@app.command()
def version():
    """Show the app version."""
    typer.echo("notes-app v1.0.0")

if __name__ == "__main__":
    app()

Output:

$ python notes_app.py --help
Usage: notes_app.py [OPTIONS] COMMAND [ARGS]...

  A simple notes manager.

Commands:
  notes    Manage your notes.
  version  Show the app version.

$ python notes_app.py notes add "Buy groceries" --tag shopping
Added note #1: 'Buy groceries' [shopping]

$ python notes_app.py notes add "Read Python docs" --tag learning
Added note #2: 'Read Python docs' [learning]

$ python notes_app.py notes list
[1] (shopping) Buy groceries
[2] (learning) Read Python docs

$ python notes_app.py notes list --tag learning
[2] (learning) Read Python docs

The demo keeps everything in one file, but in a larger project each subcommand group would live in its own module with its own Typer() instance and its own set of commands. The parent app wires them together with add_typer(). This structure scales cleanly: you can have notes, tags, and export subcommand groups, each in separate files, all composed into one top-level CLI without any single file growing unwieldy.

Input Validation with Callbacks

Some validation logic is too complex for a type hint alone — for example, ensuring a port number is in a valid range, or that a file has the right extension. Typer supports option callbacks: functions that run immediately after an argument is parsed, either to transform its value or to reject it with a helpful error message.

# server.py
import typer
from typing import Annotated

app = typer.Typer()

def validate_port(value: int) -> int:
    if not (1 <= value <= 65535):
        raise typer.BadParameter(f"Port must be between 1 and 65535, got {value}")
    if value < 1024:
        typer.echo(f"Warning: port {value} requires root privileges.", err=True)
    return value

@app.command()
def serve(
    host: str = "127.0.0.1",
    port: Annotated[int, typer.Option(callback=validate_port, help="Port to listen on")] = 8000,
    reload: bool = False,
):
    """Start the development server."""
    typer.echo(f"Starting server on {host}:{port}")
    if reload:
        typer.echo("Auto-reload enabled.")

if __name__ == "__main__":
    app()

Output:

$ python server.py
Starting server on 127.0.0.1:8000

$ python server.py --port 99999
Error: Invalid value for '--port': Port must be between 1 and 65535, got 99999

$ python server.py --port 80
Warning: port 80 requires root privileges.
Starting server on 127.0.0.1:80

The callback receives the parsed value and either returns the (possibly transformed) value or raises typer.BadParameter with a descriptive message. Typer catches BadParameter and formats it as an error line that matches the rest of the help output, so users see consistent error messages rather than Python tracebacks.
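The same pattern covers the file-extension case mentioned at the start of this section. This is a minimal sketch: the load command and the .csv rule are hypothetical, and the callback wraps the value in Path() defensively so it works whether it receives a string or a Path.

```python
from pathlib import Path
from typing import Annotated

import typer
from typer.testing import CliRunner

app = typer.Typer()


def require_csv(value: Path) -> Path:
    """Reject any path that does not end in .csv."""
    path = Path(value)
    if path.suffix.lower() != ".csv":
        raise typer.BadParameter(f"Expected a .csv file, got {path.name!r}")
    return path


@app.command()
def load(source: Annotated[Path, typer.Argument(callback=require_csv, help="CSV file to load")]):
    """Pretend to load a CSV file."""
    typer.echo(f"Loading {source}")


runner = CliRunner()
accepted = runner.invoke(app, ["data.csv"])
rejected = runner.invoke(app, ["data.txt"])

print(accepted.exit_code, accepted.stdout.strip())
print(rejected.exit_code)
```

Because the callback only inspects the name, it does not require the file to exist, so it composes with a separate existence check.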

Real-Life Example: File Organiser CLI

The following CLI tool organises files in a directory by grouping them into subfolders based on their extension. It supports a dry-run mode for safe previewing and demonstrates argument types, options, subcommands, and exit codes all working together.

# organiser.py
import typer
from pathlib import Path
from typing import Annotated
import shutil

app = typer.Typer(help="Organise files in a directory by extension.")

EXTENSION_MAP = {
    ".jpg": "images", ".jpeg": "images", ".png": "images", ".gif": "images",
    ".mp4": "videos", ".mov": "videos", ".avi": "videos",
    ".pdf": "documents", ".docx": "documents", ".txt": "documents",
    ".py": "code", ".js": "code", ".ts": "code", ".html": "code",
    ".zip": "archives", ".tar": "archives", ".gz": "archives",
}

@app.command()
def organise(
    directory: Annotated[Path, typer.Argument(help="Directory to organise")],
    dry_run: bool = typer.Option(False, "--dry-run", help="Preview changes without moving files"),
    verbose: bool = False,
):
    """Move files into subfolders grouped by file type."""
    if not directory.is_dir():
        typer.echo(f"Error: {directory} is not a directory.", err=True)
        raise typer.Exit(code=1)

    files = [f for f in directory.iterdir() if f.is_file()]
    if not files:
        typer.echo("No files found.")
        return

    moved = 0
    skipped = 0
    for f in files:
        folder_name = EXTENSION_MAP.get(f.suffix.lower(), "misc")
        destination = directory / folder_name / f.name

        # Never overwrite a file that already exists in the target folder
        if destination.exists():
            typer.echo(f"Skipping {f.name}: already present in {folder_name}/", err=True)
            skipped += 1
            continue

        if dry_run:
            typer.echo(f"[dry-run] Would move: {f.name} -> {folder_name}/")
        else:
            destination.parent.mkdir(exist_ok=True)
            shutil.move(str(f), str(destination))
            if verbose:
                typer.echo(f"Moved: {f.name} -> {folder_name}/")
        moved += 1

    action = "Would move" if dry_run else "Moved"
    typer.echo(f"{action} {moved} file(s). Skipped {skipped}.")

@app.command()
def stats(directory: Path):
    """Show file type statistics for a directory."""
    if not directory.is_dir():
        typer.echo(f"Error: {directory} is not a directory.", err=True)
        raise typer.Exit(code=1)

    counts: dict[str, int] = {}
    for f in directory.iterdir():
        if f.is_file():
            ext = f.suffix.lower() or "(no extension)"
            counts[ext] = counts.get(ext, 0) + 1

    if not counts:
        typer.echo("No files found.")
        return

    typer.echo(f"File types in {directory}:")
    for ext, count in sorted(counts.items(), key=lambda x: -x[1]):
        typer.echo(f"  {ext:15s} {count} file(s)")

if __name__ == "__main__":
    app()

Output:

$ python organiser.py organise ~/Downloads --dry-run
[dry-run] Would move: report.pdf -> documents/
[dry-run] Would move: photo.jpg -> images/
[dry-run] Would move: archive.zip -> archives/
Would move 3 file(s). Skipped 0.

$ python organiser.py stats ~/Downloads
File types in /home/user/Downloads:
  .pdf            12 file(s)
  .jpg             8 file(s)
  .zip             3 file(s)

This example shows Typer's practical strengths: the --dry-run flag uses typer.Option with a custom flag name, the second subcommand (stats) is added just by decorating a second function, and error handling uses the standard typer.Exit(code=1) pattern. Because the app now registers two commands, each one is invoked by name (organiser.py organise ... and organiser.py stats ...). You can extend this tool by adding a config subcommand that reads a TOML file to customise the EXTENSION_MAP, or a --undo flag that reads a log file of previous moves.

Frequently Asked Questions

What is the difference between Typer and Click?

Typer is built on top of Click and shares its runtime behaviour. The difference is in the authoring experience. Click requires decorators like @click.option('--count', type=int, default=1) to define each argument, while Typer infers the same information from the function's type hints. For most projects Typer is easier to write and read. When you need Click's lower-level features, you can convert a Typer app into a Click command with typer.main.get_command(app) and combine it with existing Click code.

How do I install Typer and what does typer[all] include?

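Install the base package with pip install typer. The [all] extra (pip install "typer[all]", quoted so that shells such as zsh do not try to expand the brackets) adds two optional dependencies: Rich for coloured terminal output and pretty error formatting, and shellingham for detecting the current shell when installing completion. Recent Typer releases bundle both in the default install, so check the installation notes for your version. For production CLI tools, having Rich available is recommended because it makes the help text significantly more readable.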

How do I make an option required instead of optional?

Set the default to ... (Ellipsis): name: str = typer.Option(...). The three dots tell Typer that no default exists and the flag must be provided. Typer will show it in the help text as [required] and exit with an error message if the user omits it. Alternatively, use Annotated[str, typer.Option()] without a default value in the function signature.

How do I restrict an option to a fixed set of values?

Use a Python Enum as the type hint. Define an enum class with class Format(str, enum.Enum): json = "json"; csv = "csv"; table = "table", then type the parameter as format: Format = Format.table. Typer automatically validates that the provided value is one of the enum members and displays the choices in the help text.
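Written out in full, the enum approach might look like the sketch below. The export command is a hypothetical example, and the test runner calls are there only to show an accepted and a rejected value.

```python
import enum

import typer
from typer.testing import CliRunner


class Format(str, enum.Enum):
    json = "json"
    csv = "csv"
    table = "table"


app = typer.Typer()


@app.command()
def export(format: Format = Format.table):
    """Export data in one of the supported formats."""
    typer.echo(f"Exporting as {format.value}")


runner = CliRunner()
default = runner.invoke(app, [])
chosen = runner.invoke(app, ["--format", "csv"])
invalid = runner.invoke(app, ["--format", "xml"])

print(default.stdout.strip())
print(chosen.stdout.strip())
print(invalid.exit_code)
```

Inheriting from str as well as Enum keeps the members usable as plain strings elsewhere in the program.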

How do I write tests for Typer CLI commands?

Use the typer.testing.CliRunner class (which wraps Click's test runner). Create a runner with runner = CliRunner(), then invoke your app with result = runner.invoke(app, ["subcommand", "--option", "value"]). Check result.exit_code for the exit status and result.stdout for the captured output. This lets you write unit tests for CLI commands without spawning a subprocess.
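A minimal sketch of such tests, using a copy of the greet command from the quick example, could look like this. The test function names are illustrative; under pytest they would be collected automatically, while here they are simply called at the end.

```python
import typer
from typer.testing import CliRunner

app = typer.Typer()


@app.command()
def greet(name: str, times: int = 1):
    """Greet someone by name."""
    for _ in range(times):
        typer.echo(f"Hello, {name}!")


runner = CliRunner()


def test_greet_repeats():
    # Arguments are passed as a list, exactly as they would appear on the command line
    result = runner.invoke(app, ["Alice", "--times", "2"])
    assert result.exit_code == 0
    assert result.stdout.count("Hello, Alice!") == 2


def test_missing_name_fails():
    # Omitting the required NAME argument should produce a non-zero exit code
    result = runner.invoke(app, [])
    assert result.exit_code != 0


test_greet_repeats()
test_missing_name_fails()
print("all CLI tests passed")
```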

How do I add shell completion to a Typer app?

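Typer adds --install-completion and --show-completion options to every app by default (pass add_completion=False to typer.Typer() to disable them). With shellingham installed, running the tool with --install-completion detects the user's shell (bash, zsh, fish, or PowerShell) and installs the completion script to the appropriate location. Completion is registered against the command name, so it works most reliably once the app is installed as a named command rather than run as python myapp.py. Users then get tab-completion for subcommands and options without any extra code.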

Conclusion

Typer makes building Python CLIs feel as natural as writing a regular function. You have seen how type hints automatically create arguments and options, how docstrings become help text, how add_typer() composes multiple subcommand groups into a single tool, and how callbacks enforce validation rules before your main function runs. The file organiser example brought all of these together in a tool you could ship today.

The next step is to package your Typer app so others can install it. Add a pyproject.toml with a [project.scripts] entry that maps a command name to your Typer object in module:object form (for the file organiser, something like organiser = "organiser:app"), and users can run the tool by name after running pip install . in the project directory. Combine that with a virtual environment managed by uv and you have a professional-grade Python tool from a handful of annotated functions.

For the full Typer reference, including password prompts, progress bars, and custom parameter types, see the official Typer documentation.

How To Use Python Pydantic AI for Structured AI Outputs

Advanced

You ask an LLM to extract product information from a customer email, and it returns a beautifully formatted paragraph — but your code needed a JSON object with specific fields and specific types. So you write a prompt that says “respond in JSON format”, and sometimes it does, and sometimes it wraps the JSON in markdown code fences, and sometimes it adds an explanation paragraph after the closing brace. Parsing raw LLM output reliably is one of the most frustrating parts of building production AI applications. Pydantic AI solves this problem at the framework level by making structured, type-validated output a first-class feature of every LLM interaction.

Pydantic AI is a Python agent framework built by the Pydantic team. It uses your existing Pydantic models to define exactly what the LLM must return, handles the retry loop when the output does not validate, and gives you a fully typed result object that your IDE can autocomplete. It supports OpenAI, Anthropic, Google Gemini, Ollama, and other providers through a unified interface. If you already know Pydantic for data validation, Pydantic AI will feel immediately familiar — because it is literally the same BaseModel you already use.

In this article we will cover how Pydantic AI works and why it beats hand-crafted JSON prompts, how to define result types with Pydantic models, how to create agents with dependencies and system prompts, how to add tools to agents, how to handle validation retries, and how to build a real extraction pipeline. By the end you will be able to replace fragile text-parsing code with robust, type-safe agent responses.

Pydantic AI Quick Example

Here is the core pattern: define a Pydantic model, create an agent that returns that model, and call it with a user prompt. The agent either returns an instance that validates against your schema or raises an error once its retries are exhausted:

# quick_pydantic_ai.py
from pydantic import BaseModel
from pydantic_ai import Agent

class MovieReview(BaseModel):
    title: str
    year: int
    rating: float  # 0.0 to 10.0
    genre: str
    one_line_summary: str

agent = Agent(
    "openai:gpt-4o-mini",
    result_type=MovieReview,
    system_prompt="You are a film critic. Extract structured data from movie descriptions.",
)

result = agent.run_sync(
    "The Matrix came out in 1999, it's a sci-fi action film about a simulated reality. Rating: 9.2/10."
)

movie = result.data
print(f"Title: {movie.title}")
print(f"Year: {movie.year}")
print(f"Rating: {movie.rating}")
print(f"Genre: {movie.genre}")
print(f"Summary: {movie.one_line_summary}")
print(f"Type: {type(movie)}")

Output:

Title: The Matrix
Year: 1999
Rating: 9.2
Genre: Sci-Fi Action
Summary: A mind-bending film about a simulated reality and humanity's fight for freedom.
Type: <class '__main__.MovieReview'>

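result.data is a fully validated MovieReview instance, not a string and not a dict. Your IDE knows its type, you get autocomplete on movie.rating, and if the LLM had returned a non-numeric rating, Pydantic AI would have sent the validation error back and retried automatically. This is the fundamental shift from parsing text to receiving typed objects. Note that newer Pydantic AI releases rename result_type to output_type and result.data to result.output; the examples in this article use the original names, so adjust them if your installed version rejects result_type.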

What Is Pydantic AI and Why Use It?

Pydantic AI is an agentic AI framework that wraps LLM calls in a type-safe layer. Every call to a Pydantic AI agent returns a validated Pydantic model instance — not raw text. If the LLM returns output that fails validation, the framework sends the validation error back to the LLM and asks it to fix the output, automatically, up to a configurable retry limit.
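The validate-and-retry idea can be sketched with plain Pydantic and a stand-in for the model call. Everything here is an illustration of the concept, not Pydantic AI's internals: fake_llm is a hypothetical function that deliberately returns a bad answer on its first attempt, and run_with_retries is a hand-written loop.

```python
from pydantic import BaseModel, ValidationError


class MovieReview(BaseModel):
    title: str
    year: int
    rating: float


def fake_llm(prompt: str, attempt: int) -> str:
    """Hypothetical stand-in for an LLM call: wrong on the first attempt."""
    if attempt == 0:
        return '{"title": "The Matrix", "year": 1999, "rating": "excellent"}'
    return '{"title": "The Matrix", "year": 1999, "rating": 9.2}'


def run_with_retries(prompt: str, retries: int = 1) -> MovieReview:
    """Validate the reply; on failure, send the error back and try again."""
    message = prompt
    for attempt in range(retries + 1):
        raw = fake_llm(message, attempt)
        try:
            return MovieReview.model_validate_json(raw)
        except ValidationError as exc:
            # The validation error text becomes part of the next request
            message = f"{prompt}\n\nYour previous answer was invalid:\n{exc}"
    raise RuntimeError("output still invalid after all retries")


review = run_with_retries("Extract the movie details for The Matrix.")
print(review)
```

The caller only ever sees a validated MovieReview or an exception; the malformed first reply never leaves the loop.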

| Approach | Reliability | Type safety | Retry on failure |
| --- | --- | --- | --- |
| Raw prompt + str parsing | Low | None | Manual |
| JSON mode (OpenAI) | Medium | None (just a dict) | Manual |
| Instructor library | High | Yes (Pydantic) | Automatic |
| Pydantic AI | High | Yes (Pydantic) | Automatic + dependencies + tools |

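Install Pydantic AI either as the full package, which bundles the integrations for the supported providers, or as the slim package with only the provider you need:

# terminal
pip install pydantic-ai                      # full install, provider integrations included
pip install "pydantic-ai-slim[openai]"       # slim install, OpenAI only
pip install "pydantic-ai-slim[anthropic]"    # slim install, Anthropic Claude only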

Defining Result Types

The result type is a standard Pydantic BaseModel with field descriptions that help the LLM understand what to put in each field. Clear field descriptions are often one of the biggest factors in extraction accuracy:

# result_types.py
from pydantic import BaseModel, Field
from typing import List, Optional
from pydantic_ai import Agent

class JobPosting(BaseModel):
    job_title: str = Field(description="The exact job title as listed")
    company: str = Field(description="Company name")
    location: str = Field(description="City, State or 'Remote'")
    salary_min: Optional[int] = Field(None, description="Minimum salary in USD per year, or None if not stated")
    salary_max: Optional[int] = Field(None, description="Maximum salary in USD per year, or None if not stated")
    required_skills: List[str] = Field(description="List of required technical skills")
    experience_years: int = Field(description="Minimum years of experience required (0 if not specified)")
    is_remote: bool = Field(description="True if remote work is allowed")

agent = Agent(
    "openai:gpt-4o-mini",
    result_type=JobPosting,
    system_prompt="Extract structured job posting data from the provided text.",
)

posting_text = """
Senior Python Developer -- Remote OK
TechCorp Inc. | San Francisco, CA (Remote)
Salary: $140,000 - $180,000/year

We need 5+ years Python experience. Skills: FastAPI, PostgreSQL, Docker, AWS.
Must be comfortable with distributed systems and async Python.
"""

result = agent.run_sync(posting_text)
job = result.data

print(f"Title: {job.job_title}")
print(f"Company: {job.company}")
print(f"Remote: {job.is_remote}")
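# Both salary fields are Optional, so guard against None before formatting
if job.salary_min is not None and job.salary_max is not None:
    print(f"Salary: ${job.salary_min:,} - ${job.salary_max:,}")
else:
    print("Salary: not stated")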
print(f"Skills: {', '.join(job.required_skills)}")
print(f"Experience: {job.experience_years}+ years")

Output:

Title: Senior Python Developer
Company: TechCorp Inc.
Remote: True
Salary: $140,000 - $180,000
Skills: FastAPI, PostgreSQL, Docker, AWS, distributed systems, async Python
Experience: 5+ years

The Field(description=...) text becomes part of the JSON schema that Pydantic AI sends to the model alongside your prompt, so you do not have to repeat it in the system prompt. When a field is Optional with a default of None, the schema marks it as not required, so the LLM can leave it empty if the information is not in the text. That is the correct behavior for extraction tasks where some fields may not be present.
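You can inspect what a model's descriptions turn into without calling an LLM at all. The sketch below uses a cut-down version of JobPosting (two fields only, for brevity) and Pydantic's model_json_schema() method; how exactly Pydantic AI packages this schema for each provider is not shown here.

```python
from typing import Optional

from pydantic import BaseModel, Field


class JobPosting(BaseModel):
    job_title: str = Field(description="The exact job title as listed")
    salary_min: Optional[int] = Field(
        None, description="Minimum salary in USD per year, or None if not stated"
    )


schema = JobPosting.model_json_schema()

# Each field's description is carried into the schema
for name, spec in schema["properties"].items():
    print(f"{name}: {spec['description']}")

# Only fields without a default are listed as required
print("required:", schema["required"])
```

Printing the schema while you iterate on descriptions is a cheap way to check what the model will actually be told about each field.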

Using Dependencies for Context

Pydantic AI has a dependency injection system that lets you pass runtime context (database connections, API clients, configuration) into your agent’s system prompt and tools without hardcoding it:

# dependencies.py
from dataclasses import dataclass
from pydantic import BaseModel
from pydantic_ai import Agent, RunContext

@dataclass
class UserContext:
    user_name: str
    preferred_language: str
    timezone: str

class Greeting(BaseModel):
    message: str
    is_formal: bool
    language: str

agent = Agent(
    "openai:gpt-4o-mini",
    result_type=Greeting,
    deps_type=UserContext,
    system_prompt="Generate an appropriate greeting based on the user context provided.",
)

@agent.system_prompt
def build_system_prompt(ctx: RunContext[UserContext]) -> str:
    return (
        f"The user's name is {ctx.deps.user_name}. "
        f"They prefer {ctx.deps.preferred_language} and are in timezone {ctx.deps.timezone}. "
        f"Generate a greeting appropriate for their context."
    )

# Pass different contexts for different users
ctx_alice = UserContext("Alice", "English", "UTC+10")
ctx_marco = UserContext("Marco", "Italian", "UTC+1")

result1 = agent.run_sync("Morning greeting please", deps=ctx_alice)
result2 = agent.run_sync("Morning greeting please", deps=ctx_marco)

print(result1.data.message)
print(result2.data.language)

Output:

Good morning, Alice! Hope you're starting your day well in Australia.
Italian

The @agent.system_prompt decorator registers a function that builds the system prompt dynamically from the dependencies at runtime. This pattern is much cleaner than string-formatting dependencies into a hardcoded system prompt — you get typed access to context via ctx.deps, and the system prompt updates automatically when you pass different dependencies.

Adding Tools to Pydantic AI Agents

Tools extend what your agent can do beyond just LLM calls — the agent can look up data, call APIs, or compute values during its reasoning process:

# tools_agent.py
from pydantic import BaseModel
from pydantic_ai import Agent, RunContext
import httpx

class WeatherSummary(BaseModel):
    city: str
    temperature_celsius: float
    condition: str
    recommendation: str

agent = Agent(
    "openai:gpt-4o-mini",
    result_type=WeatherSummary,
    system_prompt="You are a weather assistant. Use the get_weather tool to fetch real data, then provide a structured summary.",
)

@agent.tool_plain
def get_weather(city: str) -> dict:
    """
    Fetch current weather for a city from wttr.in (free, no API key required).
    
    Args:
        city: City name to get weather for
    """
    try:
        r = httpx.get(f"https://wttr.in/{city}?format=j1", timeout=8)
        r.raise_for_status()
        data = r.json()
        current = data["current_condition"][0]
        return {
            "temp_c": int(current["temp_C"]),
            "feels_like_c": int(current["FeelsLikeC"]),
            "description": current["weatherDesc"][0]["value"],
            "humidity": current["humidity"],
        }
    except Exception as e:
        return {"error": str(e)}

result = agent.run_sync("What's the weather like in Sydney right now?")
summary = result.data
print(f"City: {summary.city}")
print(f"Temp: {summary.temperature_celsius}C")
print(f"Condition: {summary.condition}")
print(f"Recommendation: {summary.recommendation}")

Output:

City: Sydney
Temp: 18.0C
Condition: Partly Cloudy
Recommendation: Light jacket recommended. Good conditions for outdoor activities.

The @agent.tool_plain decorator registers a function as a tool without dependency injection. Use @agent.tool (without _plain) if your tool needs access to the run context and dependencies. The LLM decides when to call the tool based on the function’s docstring — so write your docstrings carefully, describing exactly when the tool should be used.

Real-Life Example: Email Triage Pipeline

This complete pipeline extracts structured data from customer emails and classifies them for routing to the right team — the kind of task that previously required fragile regex and string parsing:

# email_triage.py
from pydantic import BaseModel, Field
from pydantic_ai import Agent
from typing import List, Literal

class EmailTriage(BaseModel):
    subject_clean: str = Field(description="Clean, concise subject line (max 60 chars)")
    category: Literal["billing", "technical", "sales", "complaint", "general"] = Field(
        description="Best category for routing this email"
    )
    priority: Literal["urgent", "high", "normal", "low"] = Field(
        description="Priority based on urgency indicators in the email"
    )
    sentiment: Literal["positive", "neutral", "negative", "very_negative"] = Field(
        description="Overall customer sentiment"
    )
    key_issues: List[str] = Field(
        description="List of specific issues or questions the customer raised (max 3 items)"
    )
    suggested_response_tone: str = Field(
        description="One sentence describing the appropriate tone for the response"
    )
    requires_escalation: bool = Field(
        description="True if this should go to a senior team member"
    )

triage_agent = Agent(
    "openai:gpt-4o-mini",
    result_type=EmailTriage,
    system_prompt=(
        "You are a customer service triage specialist. Analyze incoming customer emails "
        "and extract structured classification data to route them to the right team. "
        "Be conservative with 'urgent' and 'requires_escalation' -- only use them when clearly warranted."
    ),
)

sample_emails = [
    {
        "from": "angry.customer@example.com",
        "body": "I have been charged TWICE for my subscription this month!! "
                "I need this fixed immediately or I am cancelling. "
                "This is absolutely unacceptable. My account is #12345."
    },
    {
        "from": "new.lead@bigcorp.com",
        "body": "Hi, we're a 500-person company looking for enterprise pricing. "
                "Could someone from your sales team reach out to discuss volume discounts? "
                "No rush, just exploring options."
    },
]

for email in sample_emails:
    result = triage_agent.run_sync(
        f"From: {email['from']}\n\nEmail body:\n{email['body']}"
    )
    t = result.data
    print(f"\n--- {email['from']} ---")
    print(f"Category: {t.category.upper()} | Priority: {t.priority} | Sentiment: {t.sentiment}")
    print(f"Subject: {t.subject_clean}")
    print(f"Issues: {t.key_issues}")
    print(f"Escalate: {t.requires_escalation}")
    print(f"Tone: {t.suggested_response_tone}")

Output:

--- angry.customer@example.com ---
Category: BILLING | Priority: urgent | Sentiment: very_negative
Subject: Double Charge on Subscription - Immediate Resolution Needed
Issues: ['Double charge on subscription', 'Threat to cancel account', 'Account #12345 affected']
Escalate: True
Tone: Apologetic, empathetic, and action-oriented -- acknowledge the error immediately.

--- new.lead@bigcorp.com ---
Category: SALES | Priority: normal | Sentiment: positive
Subject: Enterprise Pricing Inquiry - 500-Person Company
Issues: ['Interested in enterprise pricing', 'Requesting volume discount discussion']
Escalate: False
Tone: Enthusiastic and professional -- welcome the opportunity and offer a prompt follow-up.

Every field is typed. You can serialise these EmailTriage objects with model_dump() for storage in a database, pass them to routing logic without parsing, and iterate over t.key_issues as a proper Python list. Replace the sample emails with real ones from an IMAP connection or an email webhook, and you have the core of a triage pipeline in roughly 60 lines.
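As a sketch of the routing logic that typed output makes possible, the example below uses a reduced Triage model (three of the fields above) and plain Pydantic, with no LLM involved. The queue names and the route function are hypothetical.

```python
from typing import Literal

from pydantic import BaseModel, ValidationError


class Triage(BaseModel):
    category: Literal["billing", "technical", "sales", "complaint", "general"]
    priority: Literal["urgent", "high", "normal", "low"]
    requires_escalation: bool


# Hypothetical mapping from category to team queue
QUEUES = {"billing": "finance-team", "technical": "support-engineers", "sales": "account-execs"}


def route(t: Triage) -> str:
    """Pick a queue from validated fields; no string cleanup needed."""
    if t.requires_escalation or t.priority == "urgent":
        return "senior-support"
    return QUEUES.get(t.category, "general-inbox")


lead = Triage(category="sales", priority="normal", requires_escalation=False)
print(route(lead))

# Literal fields are case-sensitive, so a value outside the allowed set is rejected
try:
    Triage(category="billing", priority="URGENT", requires_escalation=True)
except ValidationError:
    print("rejected: 'URGENT' is not an allowed priority")
```

Because the comparison values are constrained by Literal, the routing code never has to normalise case or handle unexpected labels.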

Frequently Asked Questions

How does Pydantic AI compare to the Instructor library?

Instructor is a lightweight library focused specifically on getting structured output from LLMs using Pydantic models. Pydantic AI is a fuller agent framework — it adds dependency injection, tools, multi-turn conversation support, and streaming. For simple extraction tasks, either works well. For building agents that need to call tools and maintain state across multiple LLM turns, Pydantic AI is the better choice. Both use the same Pydantic model definition syntax, so switching between them is straightforward.

What happens when the LLM returns invalid JSON or wrong types?

Pydantic AI catches the validation error, formats it into a clear message (for example, that the field 'rating' should be a valid number but received 'excellent'), and sends it back to the LLM as a new message in the same conversation. The LLM tries again. By default the agent allows one retry; you can raise the limit with Agent(retries=5). If the output is still invalid once the retries are used up, the run fails with an UnexpectedModelBehavior exception. (ModelRetry is a different exception: you raise it yourself inside a tool or validator to ask the model to try again.) In practice, well-defined Pydantic models with good Field(description=...) text rarely need more than one retry.

Does Pydantic AI support async?

Yes. Use await agent.run() instead of agent.run_sync(). All tools defined with async def are awaited automatically. The async API is recommended for production applications where you are running multiple agent calls concurrently — it integrates cleanly with FastAPI, asyncio event loops, and other async frameworks. For scripts and notebooks, run_sync() is more convenient.

Can I stream the output as it’s generated?

Yes, with some limitations. Use agent.run_stream() to get an async iterator that yields partial results as the LLM generates them. For structured output, streaming gives you the raw token stream until the full JSON is complete — you cannot get a partially-validated Pydantic model. However, you can stream text fields and use result.is_complete to check when validation is done. Streaming is most useful for long responses where you want to show progress to the user.

Does Pydantic AI work with local models (Ollama)?

Yes. Use Agent("ollama:llama3.2") or any model tag from your Ollama installation. Local models are less reliable at structured output than GPT-4o or Claude Sonnet, especially for complex schemas. Start with simpler schemas (3-5 fields, basic types) and increase complexity as you gain confidence in your model’s capabilities. The retry mechanism compensates for occasional failures, but if retries are frequent, consider adding more detail to your Field(description=...) text or simplifying the schema.

Conclusion

Pydantic AI eliminates the most error-prone part of LLM integration — parsing unstructured text output — by making validated, typed responses the default. We covered defining result schemas with Pydantic models and Field descriptions, using dependency injection for runtime context, registering tools the agent can call, handling retries automatically, and building a complete email triage pipeline. Every result your agent returns is a proper Python object with type hints your IDE understands.

The next step is to take one piece of existing code where you are parsing LLM output with string operations or json.loads() and rewrite it using a Pydantic AI agent. The reliability improvement is immediate. Explore the official Pydantic AI documentation for advanced topics like multi-agent orchestration, streaming validation, and custom validators on result models.

How To Use Python MCP Server for AI Tool Integration

Advanced

Imagine giving an AI assistant the ability to read files on your computer, query your database, call your internal APIs, or search your company’s knowledge base — all without the AI ever having direct access to those systems. This is exactly what the Model Context Protocol (MCP) enables. MCP is an open standard from Anthropic that defines how AI assistants communicate with external tools and data sources through a simple server-client protocol. You write a Python MCP server that exposes tools, and any MCP-compatible AI client (Claude Desktop, VS Code with Copilot, and others) can discover and use those tools automatically.

Building an MCP server in Python requires the mcp package from PyPI and basic knowledge of Python async programming. The server runs locally on your machine (or on a server your AI client can reach), exposes named tools with typed parameters, and returns results that the AI can read and use in its reasoning. You define tools as Python functions decorated with @mcp.tool() — the framework handles the protocol, discovery, and transport layer for you.

In this article we will cover what MCP is and how it works, how to install the Python MCP SDK, how to define tools and resources, how to handle typed inputs and error responses, how to run and test your server locally, and how to connect it to Claude Desktop. By the end you will have a working MCP server exposing real tools to an AI assistant.

Python MCP Server: Quick Example

Here is the smallest complete Python MCP server — one tool that returns the current timestamp. Save it as server.py and run it:

# server.py
from datetime import datetime
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("My First MCP Server")

@mcp.tool()
def get_current_time() -> str:
    """Return the current date and time."""
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

if __name__ == "__main__":
    mcp.run()

Run and test with the MCP inspector:

# Install the SDK first
pip install mcp

# Run the server directly (over stdio it waits silently for a client)
python server.py

# Or launch it under the MCP inspector, which starts the server for you
npx @modelcontextprotocol/inspector python server.py

The MCP inspector starts a local web UI and prints its URL in the terminal (http://localhost:5173 in some releases; the port varies by version). There you can see the registered tools, call them manually, and inspect the protocol messages. When you call get_current_time, it returns something like "2026-05-20 09:15:32". This is the same response an AI assistant would receive when it invokes your tool during a conversation.

@mcp.tool() — your function, the AI’s hands.

What Is MCP and How Does It Work?

The Model Context Protocol (MCP) is a JSON-RPC 2.0 based protocol that standardizes how AI assistants discover and invoke external tools. Before MCP, every AI integration was custom — you wrote LangChain tools, OpenAI function-call schemas, or Anthropic tool definitions separately for each provider. MCP is the USB-C of AI tool integration: write one server, connect to any compatible client.
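To make the JSON-RPC framing concrete, here is a rough sketch of what a tool invocation might look like on the wire, written as plain Python dictionaries. The tool name comes from the quick example above; the id value and the timestamp text are illustrative, and real messages may carry additional fields.

```python
import json

# Request a client might send to invoke a tool (JSON-RPC 2.0 envelope).
request = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "tools/call",
    "params": {"name": "get_current_time", "arguments": {}},
}

# Response the server might send back; "id" echoes the request id so the
# client can match replies to outstanding calls.
response = {
    "jsonrpc": "2.0",
    "id": 1,
    "result": {
        "content": [{"type": "text", "text": "2026-05-20 09:15:32"}],
        "isError": False,
    },
}

wire_request = json.dumps(request)   # what actually travels over the transport
decoded = json.loads(wire_request)   # what the server sees after parsing
print(decoded["method"], decoded["params"]["name"])
```

The SDK builds and parses these envelopes for you; seeing them once mostly helps when reading inspector traces.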

The protocol has three main concepts:

Concept | What it is | Python equivalent
Tools | Functions the AI can call to perform actions | @mcp.tool() decorated functions
Resources | Read-only data sources the AI can query | @mcp.resource("uri://") decorators
Prompts | Reusable prompt templates the AI can reference | @mcp.prompt() decorators

The transport layer can be stdio (for local tools, the most common) or HTTP (Server-Sent Events in earlier revisions of the specification, Streamable HTTP in newer ones). Claude Desktop uses stdio transport: it launches your Python server as a subprocess, communicates via stdin/stdout, and manages the lifecycle automatically. Install the Python SDK with pip install mcp.

Defining Tools with Typed Parameters

The FastMCP class (the high-level API) uses Python type hints to automatically generate the JSON schema that clients use to discover your tools. Here is a server with multiple tools that demonstrate different parameter types:

# tools_server.py
from mcp.server.fastmcp import FastMCP
from typing import Optional
import os, json, math

mcp = FastMCP("Data Tools Server")

@mcp.tool()
def calculate(expression: str) -> str:
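    """
    Evaluate a mathematical expression (demo only, not a hardened sandbox).
    
    Args:
        expression: A math expression like '2 + 2' or 'sqrt(16)'
    """
    # NOTE: emptying __builtins__ narrows what eval() can reach, but it is not
    # a real sandbox. Do not expose this tool to untrusted input as written.
    safe_names = {k: v for k, v in math.__dict__.items() if not k.startswith("_")}
    safe_names.update({"abs": abs, "round": round})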
    try:
        result = eval(expression, {"__builtins__": {}}, safe_names)
        return str(result)
    except Exception as e:
        return f"Error: {e}"

@mcp.tool()
def read_file(path: str, max_lines: Optional[int] = None) -> str:
    """
    Read a text file and return its contents.
    
    Args:
        path: Absolute path to the file
        max_lines: If set, only return this many lines from the start
    """
    if not os.path.isfile(path):
        return f"Error: File not found at {path}"
    try:
        with open(path, "r", encoding="utf-8") as f:
            lines = f.readlines()
        if max_lines is not None:
            lines = lines[:max_lines]
        return "".join(lines)
    except PermissionError:
        return f"Error: Permission denied reading {path}"

@mcp.tool()
def list_directory(path: str) -> str:
    """
    List files and directories at a given path.
    
    Args:
        path: Directory path to list
    """
    if not os.path.isdir(path):
        return f"Error: Not a directory: {path}"
    entries = []
    for entry in sorted(os.scandir(path), key=lambda e: (not e.is_dir(), e.name)):
        kind = "DIR " if entry.is_dir() else "FILE"
        size = entry.stat().st_size if entry.is_file() else "-"
        entries.append(f"{kind}  {entry.name}  ({size} bytes)")
    return "\n".join(entries) if entries else "(empty directory)"

if __name__ == "__main__":
    mcp.run()

Testing the calculate tool:

# Via MCP inspector or a test client:
# Tool: calculate, args: {"expression": "sqrt(144) + 2**8"}
# Response: "268.0"

# Tool: list_directory, args: {"path": "/tmp"}
# Response:
# FILE  example.txt  (42 bytes)
# FILE  log.json  (1024 bytes)
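
Because eval() is hard to lock down, one alternative is to parse the expression and walk its syntax tree, allowing only the node types you expect. The sketch below assumes a small whitelist (the names safe_calculate, _FUNCS, and _CONSTS are hypothetical) and is meant as a starting point rather than a complete calculator; it does not, for example, cap the size of exponents.

```python
import ast
import math
import operator

# Binary and unary operators the evaluator is willing to apply.
_BIN_OPS = {
    ast.Add: operator.add, ast.Sub: operator.sub,
    ast.Mult: operator.mul, ast.Div: operator.truediv,
    ast.Pow: operator.pow, ast.Mod: operator.mod,
}
_UNARY_OPS = {ast.UAdd: operator.pos, ast.USub: operator.neg}
# Whitelisted callables and constants (an illustrative selection).
_FUNCS = {"sqrt": math.sqrt, "abs": abs, "round": round}
_CONSTS = {"pi": math.pi, "e": math.e}


def safe_calculate(expression: str) -> float:
    """Evaluate arithmetic by walking the AST instead of calling eval()."""

    def _eval(node: ast.AST) -> float:
        if isinstance(node, ast.Constant) and type(node.value) in (int, float):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _BIN_OPS:
            return _BIN_OPS[type(node.op)](_eval(node.left), _eval(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
            return _UNARY_OPS[type(node.op)](_eval(node.operand))
        if isinstance(node, ast.Name) and node.id in _CONSTS:
            return _CONSTS[node.id]
        if (isinstance(node, ast.Call) and isinstance(node.func, ast.Name)
                and node.func.id in _FUNCS and not node.keywords):
            return _FUNCS[node.func.id](*[_eval(arg) for arg in node.args])
        # Anything else (attribute access, imports, lambdas, ...) is rejected.
        raise ValueError(f"Unsupported expression element: {type(node).__name__}")

    tree = ast.parse(expression, mode="eval")
    return _eval(tree.body)


print(safe_calculate("sqrt(144) + 2**8"))
```

Inside an MCP tool you would wrap the call in try/except and return the error text, as the original calculate tool does.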

Notice that each tool has a detailed docstring — this is not just documentation for you, it is the description the AI reads to understand what the tool does and when to use it. Write tool docstrings as if you are explaining the tool to a smart but uninformed colleague. The Args: section documents each parameter, and the AI uses this to fill in the right values.
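
To see why type hints and docstrings matter, it can help to sketch how a framework might turn a function signature into a tool description. This is a simplified illustration under stated assumptions, not FastMCP's actual code: describe_tool and _JSON_TYPES are hypothetical names, and only a handful of basic types are handled.

```python
import inspect
from typing import Optional, Union, get_args, get_origin, get_type_hints

# Mapping from a few Python types to JSON Schema type names.
_JSON_TYPES = {str: "string", int: "integer", float: "number", bool: "boolean"}


def describe_tool(func) -> dict:
    """Build a simplified tool description from a function's signature."""
    hints = get_type_hints(func)
    properties, required = {}, []
    for name, param in inspect.signature(func).parameters.items():
        hint = hints.get(name, str)
        # Optional[X] is Union[X, None]; unwrap it to X.
        if get_origin(hint) is Union:
            hint = next(a for a in get_args(hint) if a is not type(None))
        properties[name] = {"type": _JSON_TYPES.get(hint, "string")}
        # Parameters without a default value must be supplied by the caller.
        if param.default is inspect.Parameter.empty:
            required.append(name)
    return {
        "name": func.__name__,
        "description": inspect.getdoc(func) or "",
        "inputSchema": {"type": "object", "properties": properties, "required": required},
    }


def read_file(path: str, max_lines: Optional[int] = None) -> str:
    """Read a text file and return its contents."""
    return ""


schema = describe_tool(read_file)
print(schema["inputSchema"])
```

The docstring ends up in the description field, which is the text the AI reads when deciding whether to call the tool.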

Adding Resources

Resources are read-only data sources that the AI can query by URI. Unlike tools (which perform actions), resources are for exposing structured data — configuration files, database views, API responses. Here is how to add a resource to the same server:

# resources_server.py (add to tools_server.py)
from mcp.server.fastmcp import FastMCP
import json, platform, sys

mcp = FastMCP("System Info Server")

@mcp.resource("system://info")
def get_system_info() -> str:
    """Provide current system information."""
    info = {
        "os": platform.system(),
        "os_version": platform.version(),
        "python_version": sys.version,
        "processor": platform.processor(),
        "hostname": platform.node(),
    }
    return json.dumps(info, indent=2)

@mcp.resource("config://app")
def get_app_config() -> str:
    """Return the application configuration."""
    config = {
        "debug": False,
        "max_file_size_mb": 100,
        "allowed_extensions": [".txt", ".csv", ".json", ".py"],
        "log_level": "INFO",
    }
    return json.dumps(config, indent=2)

if __name__ == "__main__":
    mcp.run()

The AI client can request the resource at its URI (system://info) and receive the JSON response. Resources differ from tools in that they are expected to be read-only and safe to call repeatedly — think of them like GET endpoints in a REST API, while tools are like POST endpoints that can have side effects.

Tools do things. Resources return data. The protocol enforces the difference.

Error Handling in MCP Tools

MCP tools communicate errors by either returning an error string or raising a McpError. The right choice depends on whether the error is expected (wrong input, file not found) or unexpected (server crash, dependency failure):

# error_handling.py
from mcp.server.fastmcp import FastMCP
from mcp.shared.exceptions import McpError
from mcp.types import ErrorData
import httpx

mcp = FastMCP("Error Demo Server")

@mcp.tool()
def fetch_url(url: str) -> str:
    """
    Fetch the content of a URL.
    
    Args:
        url: The URL to fetch (must start with https://)
    """
    # Validate input -- return descriptive error string for bad inputs
    if not url.startswith("https://"):
        return "Error: Only HTTPS URLs are supported for security reasons."
    
    try:
        response = httpx.get(url, timeout=10, follow_redirects=True)
        response.raise_for_status()
        # Return a truncated preview
        text = response.text[:2000]
        return f"Status: {response.status_code}\nContent (first 2000 chars):\n{text}"
    except httpx.TimeoutException:
        return "Error: Request timed out after 10 seconds."
    except httpx.HTTPStatusError as e:
        return f"Error: HTTP {e.response.status_code} from {url}"
    except Exception as e:
        # For unexpected errors, raise McpError so the client knows something went wrong
        raise McpError(ErrorData(code=-32603, message=f"Unexpected error: {e}"))

if __name__ == "__main__":
    mcp.run()

The defensive pattern here — validate inputs first, catch specific exceptions with descriptive messages, and only raise McpError for truly unexpected failures — gives the AI client enough information to tell the user what went wrong and suggest a fix. Return strings for recoverable user errors; raise McpError for server-side failures.

Connecting to Claude Desktop

To connect your MCP server to Claude Desktop, add a configuration entry to Claude’s config file. The location depends on your OS:

# macOS: ~/Library/Application Support/Claude/claude_desktop_config.json
# Windows: %APPDATA%\Claude\claude_desktop_config.json

{
  "mcpServers": {
    "my-data-tools": {
      "command": "python",
      "args": ["/absolute/path/to/tools_server.py"],
      "env": {
        "PYTHONPATH": "/absolute/path/to/your/project"
      }
    }
  }
}

After saving this file and restarting Claude Desktop, your tools appear in the tool list. Claude will automatically use them when they are relevant to the conversation — for example, if you ask “what files are in /tmp?”, Claude will call your list_directory tool and show you the result.
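
A common stumbling block is that "python" in the config may not resolve to the interpreter that has the mcp package installed. One way around that, sketched below, is to generate the entry from Python itself so that the command is the absolute path of the current interpreter. The helper name build_server_entry is hypothetical, and the printed JSON still has to be merged into your config file by hand.

```python
import json
import sys
from pathlib import Path


def build_server_entry(script: str) -> dict:
    """Return one mcpServers entry that launches `script` with this interpreter."""
    script_path = Path(script).resolve()
    return {
        "command": sys.executable,            # absolute path to the running Python
        "args": [str(script_path)],           # absolute path to the server script
        "env": {"PYTHONPATH": str(script_path.parent)},
    }


config = {"mcpServers": {"my-data-tools": build_server_entry("tools_server.py")}}
print(json.dumps(config, indent=2))
```

Run it from inside the virtual environment you installed the SDK into, so that sys.executable points at the right interpreter.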

Real-Life Example: Notes Manager MCP Server

This complete MCP server exposes tools for managing a simple local notes system — creating, reading, listing, and searching notes stored as plain text files:

# notes_server.py
from mcp.server.fastmcp import FastMCP
from typing import Optional
import os, glob, datetime

mcp = FastMCP("Notes Manager")
NOTES_DIR = os.path.expanduser("~/mcp_notes")
os.makedirs(NOTES_DIR, exist_ok=True)

@mcp.tool()
def create_note(title: str, content: str) -> str:
    """Create a new note with the given title and content."""
    safe_title = "".join(c if c.isalnum() or c in " -_" else "_" for c in title)
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"{timestamp}_{safe_title[:40]}.txt"
    path = os.path.join(NOTES_DIR, filename)
    with open(path, "w", encoding="utf-8") as f:
        f.write(f"Title: {title}\nDate: {datetime.datetime.now().isoformat()}\n\n{content}")
    return f"Note created: {filename}"

@mcp.tool()
def list_notes() -> str:
    """List all saved notes with their titles and dates."""
    files = sorted(glob.glob(os.path.join(NOTES_DIR, "*.txt")), reverse=True)
    if not files:
        return "No notes found."
    lines = []
    for path in files[:20]:  # limit to 20 most recent
        name = os.path.basename(path)
        size = os.path.getsize(path)
        lines.append(f"  {name}  ({size} bytes)")
    return f"Found {len(files)} notes:\n" + "\n".join(lines)

@mcp.tool()
def search_notes(query: str) -> str:
    """Search note contents for a keyword or phrase."""
    files = glob.glob(os.path.join(NOTES_DIR, "*.txt"))
    matches = []
    for path in files:
        try:
            # Use a context manager so the file handle is always closed
            with open(path, encoding="utf-8") as f:
                content = f.read()
        except (OSError, UnicodeDecodeError):
            continue  # skip notes that cannot be read or decoded
        if query.lower() in content.lower():
            name = os.path.basename(path)
            # Extract a snippet around the match
            idx = content.lower().find(query.lower())
            snippet = content[max(0, idx-50):idx+100].replace("\n", " ")
            matches.append(f"  {name}: ...{snippet}...")
    if not matches:
        return f"No notes found containing '{query}'."
    return f"Found {len(matches)} matching notes:\n" + "\n".join(matches)

if __name__ == "__main__":
    mcp.run()

Example AI conversation using this server:

User: Create a note called "Meeting notes" with the content "Discussed Q3 roadmap"
AI: [calls create_note("Meeting notes", "Discussed Q3 roadmap")]
    Note created: 20260520_091532_Meeting notes.txt

User: Search my notes for "roadmap"
AI: [calls search_notes("roadmap")]
    Found 1 matching notes:
      20260520_091532_Meeting notes.txt: ...Discussed Q3 roadmap...

This server is a practical starting point for any personal productivity tool. You can extend it by adding a delete_note tool, a read_note tool that takes a filename, or a resource that exposes the notes directory as a browseable URI tree.

The AI called your function. Your function returned JSON. That’s the whole stack.

Frequently Asked Questions

Is it safe to give an AI access to my file system via MCP?

Safety depends entirely on what your tools allow. The MCP server is just Python code, so you control what the AI can access. Note that the read_file example above only checks that the path is a file and handles permission errors; it does not stop the AI from reading any file your user account can read. A good practice is to restrict tools to a specific working directory: resolve the requested path with os.path.realpath() and verify that it lies inside your allowed root (os.path.commonpath() is more robust than a string prefix check). Never expose tools that can execute arbitrary code or delete files without confirmation; the eval-based calculate tool above is for demonstration only.
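
The directory restriction can be sketched in a few lines. This is one possible approach under stated assumptions: ALLOWED_ROOT and resolve_inside_root are hypothetical names, and the root directory shown is only an example.

```python
import os

# Hypothetical workspace the tools are allowed to touch.
ALLOWED_ROOT = os.path.realpath(os.path.expanduser("~/mcp_workspace"))


def resolve_inside_root(user_path: str, root: str = ALLOWED_ROOT) -> str:
    """Return the real path for user_path, or raise if it escapes root."""
    # realpath() collapses ".." segments and follows symlinks before we compare.
    candidate = os.path.realpath(os.path.join(root, user_path))
    # commonpath() compares whole path components, so "/data/root_evil"
    # is not mistaken for a child of "/data/root".
    if os.path.commonpath([candidate, root]) != root:
        raise PermissionError(f"Path escapes the allowed directory: {user_path}")
    return candidate
```

A tool such as read_file would call resolve_inside_root(path) first and return an error string when PermissionError is raised. The root passed in should itself be a real path, as the default above is. On Windows, paths on different drives make commonpath() raise ValueError, so that case would need its own handling.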

Should my MCP tools be async?

Use async when your tools make I/O-bound calls like HTTP requests, database queries, or file reads on large files. FastMCP supports both sync and async tools: just define your function with async def and use await normally. For CPU-bound operations or quick operations, sync is fine. The server handles multiple tool calls concurrently using asyncio, so async tools are more efficient when the client sends multiple requests simultaneously.
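
The benefit of async for I/O-bound work can be seen with the standard library alone. In this sketch, fetch_status is a hypothetical stand-in for a tool that waits on the network; asyncio.sleep simulates the latency.

```python
import asyncio
import time


async def fetch_status(name: str, delay: float) -> str:
    """Stand-in for an I/O-bound tool call; sleep simulates network latency."""
    await asyncio.sleep(delay)
    return f"{name}: ok"


async def main() -> tuple[list[str], float]:
    start = time.perf_counter()
    # gather() runs the three coroutines concurrently on one event loop,
    # so the total wait is close to the longest delay, not the sum.
    results = await asyncio.gather(
        fetch_status("users-api", 0.2),
        fetch_status("orders-api", 0.2),
        fetch_status("billing-api", 0.2),
    )
    return list(results), time.perf_counter() - start


results, elapsed = asyncio.run(main())
print(results, f"{elapsed:.2f}s")
```

Three sequential calls would take about 0.6 seconds here; run concurrently they finish in roughly 0.2. The same reasoning applies when a client issues several tool calls at once.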

Can multiple AI clients connect to the same MCP server?

Over stdio transport (the default for Claude Desktop), each connection launches a new server process — so each AI client gets its own isolated server instance. Over HTTP transport, a single server can handle multiple concurrent clients. For most local use cases, stdio is the right choice. If you want to share a server across multiple users or machines, use HTTP transport with mcp.run(transport="sse") and deploy it as you would any web service.

How do I debug an MCP server?

The MCP inspector (npx @modelcontextprotocol/inspector python server.py) is the best tool — it shows all protocol messages in both directions and lets you call tools manually. For logging during development, use Python’s logging module and write to a file (not stdout, since stdout is the protocol channel): logging.basicConfig(filename="mcp_debug.log", level=logging.DEBUG). The log file will capture all your logging.info() and logging.error() calls without interfering with the protocol.

Which AI clients support MCP besides Claude Desktop?

The MCP ecosystem has grown rapidly. Clients that support MCP include Claude Desktop, Zed editor (built-in), Cursor IDE, VS Code with the Claude extension, Continue.dev, and several open-source chat UIs. The protocol is model-agnostic — your Python MCP server works with any client regardless of the underlying LLM. Check the official MCP clients list for the current roster, as new integrations are added frequently.

Conclusion

Building a Python MCP server turns your Python functions into AI-accessible tools without any vendor lock-in. We covered the FastMCP framework, tool definition with typed parameters and docstrings, resource registration, error handling patterns, connecting to Claude Desktop, and a complete notes management server. The protocol handles discovery, schema generation, and transport — you just write Python functions.

The best next step is to start with the notes server above and add one more tool that solves a real problem you have — maybe a tool that queries a local SQLite database, or one that calls an internal API. The MCP Inspector documentation will help you test and debug. Once your server is working locally, explore HTTP transport and the MCP server registry for sharing your server with others.

How To Use pyproject.toml for Modern Python Project Setup

Intermediate

If you have ever cloned a Python project and found a setup.py, a setup.cfg, a requirements.txt, a requirements-dev.txt, and a tox.ini all sitting in the root folder, you know the problem with the old way of doing things. Each file solved one part of the packaging puzzle, but together they created a mess that was hard to read, easy to get wrong, and painful to maintain. The Python community standardized a better solution: pyproject.toml. Since PEP 517 and PEP 518 (and later PEP 621), pyproject.toml is the single file that defines your project — its name, version, dependencies, dev tools, scripts, and build system — all in one place.

You do not need any special tool to use pyproject.toml. It is a standard TOML file that pip, uv, Poetry, Hatch, and every modern Python build tool understands. Whether you are building a library to publish to PyPI or just organizing a personal project, pyproject.toml gives you a clean, declarative way to describe it. The learning curve is minimal — TOML is simpler than both YAML and INI — and the payoff is a project structure that any Python developer can understand in seconds.

In this article we will cover the structure and syntax of pyproject.toml, how to declare project metadata and dependencies, how to define optional dev dependencies, how to configure tools like pytest, Black, and Ruff from the same file, how to define console scripts, and how to use the file with both pip and uv. By the end you will be able to replace your scattered configuration files with a single clean pyproject.toml.

pyproject.toml Quick Example

Here is a complete minimal pyproject.toml for a Python project. You can drop this into any project folder and it will work immediately with pip and uv:

# pyproject.toml
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "my-data-tool"
version = "0.1.0"
description = "A tool for processing data files"
requires-python = ">=3.11"
dependencies = [
    "polars>=0.20",
    "httpx>=0.27",
]

[project.optional-dependencies]
dev = [
    "pytest>=8.0",
    "ruff>=0.4",
]

[project.scripts]
data-tool = "my_data_tool.cli:main"

This single file tells every Python tool everything it needs to know: what your project is named, what Python version it requires, what packages to install, and what command-line entry point to create. Install it in a fresh virtualenv with pip install -e ".[dev]" (the quotes stop shells such as zsh from interpreting the square brackets) and you will have the project plus all dev dependencies ready to go.

We will cover each section in detail below, including how to add tool-specific configuration for pytest, Ruff, and Black.

Five config files walk into a repo. One pyproject.toml walks out.

What Is pyproject.toml?

pyproject.toml is a TOML configuration file that lives in the root of your Python project. TOML (Tom’s Obvious, Minimal Language) is a simple key-value format with support for nested sections using square brackets. It is easier to read than YAML and more structured than INI files. The file has three main purposes: declaring the build system (how to turn your source code into a distributable package), declaring project metadata (name, version, dependencies), and configuring dev tools (linters, formatters, test runners).

Old approach | What it did | Replaced by in pyproject.toml
setup.py | Package metadata + build script | [project] + [build-system]
setup.cfg | Declarative package metadata | [project]
requirements.txt | Runtime dependencies | [project].dependencies
requirements-dev.txt | Dev dependencies | [project.optional-dependencies].dev
tox.ini / .flake8 / mypy.ini | Tool configuration | [tool.X] sections

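Strictly speaking, none of these tables is mandatory: if [build-system] is missing, pip falls back to a legacy setuptools build. Declaring [build-system] and [project] explicitly is still strongly recommended for any project you intend to share or maintain over time.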

The [build-system] Section

The [build-system] table tells pip and other tools which backend to use to build your package. The three most common choices are Hatchling (modern, fast), setuptools (traditional, most compatible), and Flit (minimal, great for simple libraries):

# pyproject.toml -- build-system options

# Option 1: Hatchling (recommended for new projects)
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

# Option 2: setuptools (most compatible with legacy projects)
# [build-system]
# requires = ["setuptools>=68", "wheel"]
# build-backend = "setuptools.build_meta"

# Option 3: Flit (minimal, great for pure-Python libraries)
# [build-system]
# requires = ["flit_core>=3.4"]
# build-backend = "flit_core.buildapi"

Output when installing:

$ pip install -e .
Successfully built my-data-tool
Installing collected packages: my-data-tool
Successfully installed my-data-tool-0.1.0

If you are not publishing to PyPI and just want to organize your project locally, you can omit the [build-system] section entirely. But including it means any tool — pip, uv, build — knows how to package your code without guessing.

Project Metadata and Dependencies

The [project] table is where you declare your package. Here is a complete example with all the commonly used fields:

# pyproject.toml -- full project metadata
[project]
name = "weather-fetcher"
version = "1.2.0"
description = "Fetch and cache weather data from open APIs"
readme = "README.md"
license = {text = "MIT"}
requires-python = ">=3.11"
authors = [
    {name = "Your Name", email = "you@example.com"},
]
keywords = ["weather", "api", "data"]
classifiers = [
    "Programming Language :: Python :: 3",
    "License :: OSI Approved :: MIT License",
]

# Runtime dependencies -- installed when someone does: pip install weather-fetcher
dependencies = [
    "httpx>=0.27",
    "tenacity>=8.0",     # retry logic
    "pydantic>=2.0",     # data validation
]

[project.optional-dependencies]
# Installed with: pip install weather-fetcher[dev]
dev = [
    "pytest>=8.0",
    "pytest-asyncio>=0.23",
    "ruff>=0.4",
    "mypy>=1.8",
]
# Installed with: pip install weather-fetcher[cache]
cache = [
    "redis>=5.0",
]

[project.scripts]
# Creates a 'weather' command that calls weather_fetcher/cli.py -> main()
weather = "weather_fetcher.cli:main"

Dependency version specifiers use the same syntax as pip: >= for minimum version, ~= for compatible release (e.g., ~=1.2 allows any 1.x release from 1.2 upward but not 2.0, while ~=1.2.0 allows 1.2.x but not 1.3), and == for exact version. The optional dependencies (also called “extras”) let users install additional packages for specific use cases without bloating the base install.
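
The compatible-release operator is easy to misread, so a small worked example may help. Per PEP 440, ~= keeps the version as the lower bound and drops the last component to form a prefix match. The sketch below models that for plain numeric versions only; the function names are hypothetical, and real tools (for example the packaging library) also handle pre-releases, epochs, and trailing-zero normalisation.

```python
def compatible_release_bounds(version: str) -> tuple[str, str]:
    """Translate '~=X.Y[.Z]' into an inclusive lower and exclusive upper bound."""
    parts = [int(p) for p in version.split(".")]
    if len(parts) < 2:
        raise ValueError("~= needs at least two version components")
    # Drop the last component and bump the one before it.
    upper = parts[:-1]
    upper[-1] += 1
    return version, ".".join(str(p) for p in upper)


def _as_tuple(version: str) -> tuple[int, ...]:
    return tuple(int(p) for p in version.split("."))


def satisfies(candidate: str, spec_version: str) -> bool:
    """Check a plain numeric version against '~=spec_version'."""
    lower, upper = compatible_release_bounds(spec_version)
    return _as_tuple(lower) <= _as_tuple(candidate) < _as_tuple(upper)


print(compatible_release_bounds("1.2"))    # bounds for ~=1.2
print(compatible_release_bounds("1.2.0"))  # bounds for ~=1.2.0
print(satisfies("1.3", "1.2"), satisfies("1.3.0", "1.2.0"))
```

The number of components you write therefore decides how tight the pin is: two components pin the major version, three pin the minor version.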

optional-dependencies: users install what they need. You ship what you built.

Configuring Tools in [tool.*] Sections

One of the most useful features of pyproject.toml is that most Python dev tools can read their configuration from it. This means you can replace ruff.toml, mypy.ini, and pytest.ini with sections in the same file (flake8 is a notable exception: it does not read pyproject.toml, although Ruff covers most of its rules):

# pyproject.toml -- tool configuration

[tool.pytest.ini_options]
testpaths = ["tests"]
asyncio_mode = "auto"
addopts = "--tb=short -q"

[tool.ruff]
line-length = 88
target-version = "py311"

[tool.ruff.lint]
select = ["E", "F", "I", "UP"]  # pycodestyle, pyflakes, isort, pyupgrade
ignore = ["E501"]

[tool.mypy]
python_version = "3.11"
strict = true
ignore_missing_imports = true

[tool.coverage.run]
source = ["weather_fetcher"]
omit = ["tests/*"]

[tool.coverage.report]
show_missing = true

Running the configured tools:

# Run tests (uses [tool.pytest.ini_options] automatically)
$ pytest

# Lint with Ruff (uses [tool.ruff] automatically)
$ ruff check .

# Type-check with mypy (uses [tool.mypy] automatically)
$ mypy weather_fetcher/

Each tool reads its own [tool.TOOLNAME] section when invoked from a directory containing a pyproject.toml. No flags, no extra config files. This makes CI pipelines simpler — one checkout, one config file, all tools use consistent settings.

Using pyproject.toml with uv

uv is the fastest modern Python package manager (written in Rust, 10-100x faster than pip). It reads pyproject.toml natively and adds a uv.lock lockfile for reproducible installs. Here is the complete workflow:

# terminal -- using pyproject.toml with uv
# Create a new project (creates pyproject.toml automatically)
uv init my-project
cd my-project

# Add runtime dependencies (updates pyproject.toml + uv.lock)
uv add httpx polars

# Add dev dependencies (recorded in a dev dependency group, not in [project.optional-dependencies])
uv add --dev pytest ruff mypy

# Install everything (reads pyproject.toml + uv.lock)
uv sync

# Run a script in the project environment
uv run python my_project/main.py

# Run tests
uv run pytest

Illustrative output of uv add httpx (abridged; package counts, timings, and versions will differ on your machine):

Resolved 3 packages in 0.42s
   Built my-project @ file:///path/to/my-project
Prepared 1 package in 0.89s
Installed 1 package in 0.11s
 + httpx==0.27.2

When you run uv add, it updates both pyproject.toml (with the dependency specifier) and uv.lock (with the exact resolved version). Committing uv.lock to version control means every developer and every CI run installs exactly the same package versions — no “works on my machine” dependency drift.

Real-Life Example: Complete Project Setup

Here is a complete pyproject.toml for a realistic Python CLI tool, along with the minimal project structure it expects:

# pyproject.toml -- complete real project
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "csv-cleaner"
version = "0.3.0"
description = "Clean and validate CSV files from the command line"
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
    "polars>=0.20",
    "rich>=13.0",    # pretty terminal output
    "typer>=0.12",   # CLI framework
]

[project.optional-dependencies]
dev = [
    "pytest>=8.0",
    "pytest-cov>=5.0",
    "ruff>=0.4",
    "mypy>=1.8",
]

[project.scripts]
csv-cleaner = "csv_cleaner.cli:app"

[tool.pytest.ini_options]
testpaths = ["tests"]
addopts = "--cov=csv_cleaner --cov-report=term-missing"

[tool.ruff.lint]
select = ["E", "F", "I"]

[tool.mypy]
python_version = "3.11"
strict = true

Expected project structure:

csv-cleaner/
  pyproject.toml      # everything defined here
  README.md
  csv_cleaner/
    __init__.py
    cli.py            # defines: app = typer.Typer()
    cleaner.py
  tests/
    test_cleaner.py

Install this project for development with pip install -e .[dev] or uv sync --extra dev. The csv-cleaner command will be available in your terminal. Teammates clone the repo, run one install command, and are ready to develop — no hunting through five config files to understand the project’s requirements.
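
An entry such as csv_cleaner.cli:app is a "module:attribute" reference. Installers generate a small launcher that imports the module and calls the named object. The following is a rough sketch of that lookup (load_entry_point is a hypothetical helper, not the launcher's real code), demonstrated with a standard-library target because csv_cleaner is not importable here.

```python
import importlib
from typing import Callable


def load_entry_point(spec: str) -> Callable:
    """Resolve a 'package.module:attribute' string to the object it names."""
    module_name, _, attr_path = spec.partition(":")
    if not module_name or not attr_path:
        raise ValueError(f"Expected 'module:attribute', got {spec!r}")
    target = importlib.import_module(module_name)
    # The attribute part may be dotted, e.g. "cli:commands.main".
    for attr in attr_path.split("."):
        target = getattr(target, attr)
    return target


# Same lookup the csv-cleaner command would perform for "csv_cleaner.cli:app".
dumps = load_entry_point("json:dumps")
print(dumps({"ok": True}))
```

If the command fails with an ImportError or AttributeError after installation, the usual cause is a typo on one side of the colon.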

pip install -e .[dev] — one command, everything ready.

Frequently Asked Questions

Do I still need requirements.txt if I use pyproject.toml?

For most projects, no. The [project].dependencies list replaces requirements.txt, and [project.optional-dependencies].dev replaces requirements-dev.txt. If you need a pinned lockfile for reproducible CI installs, use uv.lock (generated by uv) or pip freeze > requirements-lock.txt. Some teams still generate a requirements.txt from the lockfile for compatibility with older Docker images, but it is no longer the primary source of truth.

Should I use Poetry, Hatch, or uv with pyproject.toml?

All three read pyproject.toml natively. uv is the fastest and simplest — install it, run uv init, and you are done. Poetry has more opinionated defaults and its own lockfile format (poetry.lock). Hatch is great for projects that need multiple Python version testing environments. For new projects in 2026, uv is the recommended choice because of its speed and compatibility with the standard pyproject.toml format that pip also understands.

How do I manage the version number without duplicating it?

Use dynamic = ["version"] in the [project] table and set the version in your source code. With Hatchling, add [tool.hatch.version] path = "my_package/__init__.py" and define __version__ = "1.0.0" in that file. Hatchling reads the version from there automatically. This means you only update the version in one place and it flows through to both the package metadata and your code.
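
Conceptually, a build backend that reads the version from a source file performs a lookup along these lines. This is a simplified sketch with a hypothetical read_version helper and an assumed regular expression, not Hatchling's actual implementation.

```python
import re

# Matches a top-level assignment such as: __version__ = "1.0.0"
_VERSION_RE = re.compile(r"""^__version__\s*=\s*["']([^"']+)["']""", re.MULTILINE)


def read_version(source: str) -> str:
    """Extract the __version__ string from module source text."""
    match = _VERSION_RE.search(source)
    if match is None:
        raise ValueError("No __version__ assignment found")
    return match.group(1)


# Contents of a hypothetical my_package/__init__.py
init_py = '"""My package."""\n\n__version__ = "1.0.0"\n'
print(read_version(init_py))
```

Because the file is scanned as text rather than imported, the version can be read at build time even when the package's own dependencies are not installed yet.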

How do I include non-Python files (data files, templates)?

With Hatchling, non-Python files inside your package directory are included automatically. For files outside the package (like config templates in a data/ folder), map them into the wheel under [tool.hatch.build.targets.wheel.force-include]. With setuptools, use [tool.setuptools.package-data]. Either way, these rules all live in pyproject.toml, so most projects no longer need a separate MANIFEST.in file.

How do I migrate an existing project from setup.py to pyproject.toml?

Run pip install ini2toml and then ini2toml setup.cfg — this converts most setup.cfg files to pyproject.toml format automatically. For setup.py, you will need to manually copy the metadata into [project]. After migrating, test with pip install -e . in a fresh virtualenv, run your test suite, and build a distribution with python -m build to verify the package builds correctly before deleting the old files.

Conclusion

Adopting pyproject.toml is one of the highest-leverage improvements you can make to any Python project’s developer experience. We covered the four key sections — [build-system], [project], [project.optional-dependencies], and [tool.*] — and showed how they replace five or more legacy config files. The complete example demonstrated a realistic CLI project that any developer can clone and run with a single install command.

The best next step is to open one of your existing projects and start a pyproject.toml with just the [project] section, copying your dependencies from requirements.txt. Then progressively add tool configuration sections as you need them. The official Python Packaging Guide has the complete reference for every supported field.

How To Build an AI Agent with LangGraph in Python


Advanced

Building a basic chatbot with an LLM is easy — give it a prompt, get a response. But real-world AI applications need more: they need to remember what happened three steps ago, decide which tool to call based on context, loop back when something goes wrong, and hand off to different processing nodes based on what the user needs. This is where LangGraph comes in. It is a Python library built on top of LangChain that lets you model AI workflows as stateful graphs — with nodes that do work, edges that route between them, and a shared state object that persists across every step.

LangGraph works with any LLM supported by LangChain, including OpenAI, Anthropic Claude, Google Gemini, and local models via Ollama. The core concepts — graphs, nodes, state, and conditional routing — are not tied to any specific model. You define the shape of your state, write Python functions for each node, connect them with edges, and LangGraph handles the execution loop. For local testing without an API key, we will show patterns that work with any chat model.

In this article we will cover how LangGraph works and when to use it over plain LangChain, how to define a TypedDict state schema, how to build nodes and edges, how to add conditional routing, how to integrate tool calling, and how to build a working multi-step agent. By the end you will have a complete LangGraph agent you can adapt for your own use case.

LangGraph in Python: Quick Example

Here is the smallest complete LangGraph program — a two-node graph that processes a message through a rewrite step and then a summary step, sharing state between them:

# quick_langgraph.py
from typing import TypedDict
from langgraph.graph import StateGraph, END

class AgentState(TypedDict):
    message: str
    rewritten: str
    summary: str

def rewrite_node(state: AgentState) -> dict:
    """Capitalize and clean the input message."""
    rewritten = state["message"].strip().capitalize() + "."
    return {"rewritten": rewritten}

def summarize_node(state: AgentState) -> dict:
    """Produce a brief summary of the rewritten text."""
    summary = f"Processed: {state['rewritten'][:50]}"
    return {"summary": summary}

# Build the graph
graph = StateGraph(AgentState)
graph.add_node("rewrite", rewrite_node)
graph.add_node("summarize", summarize_node)
graph.set_entry_point("rewrite")
graph.add_edge("rewrite", "summarize")
graph.add_edge("summarize", END)

app = graph.compile()
result = app.invoke({"message": "  hello world from langgraph  ", "rewritten": "", "summary": ""})
print(result)

Output:

{'message': '  hello world from langgraph  ', 'rewritten': 'Hello world from langgraph.', 'summary': 'Processed: Hello world from langgraph.'}

Each node is a plain Python function that receives the current state dict and returns only the keys it changed. The graph wires the nodes together, .invoke() runs the full pipeline, and LangGraph merges each returned update into the existing state at every step.
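As a mental model only, the merge step behaves roughly like a dictionary update. The sketch below is plain Python and does not reproduce LangGraph's internals, which route updates through per-key channels and optional reducers; run_pipeline is a hypothetical helper invented for this illustration.

```python
# merge_model.py
# Plain-Python mental model of state merging; NOT LangGraph's implementation.
from typing import Callable, Dict, List

State = Dict[str, str]
Node = Callable[[State], State]

def run_pipeline(state: State, nodes: List[Node]) -> State:
    """Run nodes in order, merging each partial update into the state."""
    for node in nodes:
        update = node(state)         # a node returns only the keys it changed
        state = {**state, **update}  # untouched keys carry over unchanged
    return state

def rewrite(state: State) -> State:
    return {"rewritten": state["message"].strip().capitalize() + "."}

def summarize(state: State) -> State:
    return {"summary": f"Processed: {state['rewritten'][:50]}"}

final = run_pipeline({"message": "  hello world  "}, [rewrite, summarize])
print(final)
```

The original message key survives both steps even though neither node returns it, which is the behaviour the quick example relies on.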

API Alice orchestrating LangGraph nodes
Nodes are just functions. Edges are just conditions. The graph is your app logic.

What Is LangGraph and When Should You Use It?

LangGraph models an AI workflow as a directed graph. Each node is a processing step (an LLM call, a tool invocation, a decision function). Each edge is a connection between steps. The state is a typed dictionary that flows through every node and accumulates results. Conditional edges let the graph branch based on state — routing to different nodes depending on what happened in the previous step.

The key difference between LangGraph and a simple function pipeline is that LangGraph supports cycles. A graph node can route back to an earlier node, enabling retry loops, multi-turn conversations, and agentic “think-act-observe” patterns where the agent can take multiple steps before returning a final answer.

Scenario | Use LangChain chain | Use LangGraph
Simple prompt + response | Yes | Overkill
Multi-step pipeline (no loops) | Yes (LCEL) | Fine either way
Conditional branching | Hard | Yes
Retry loops / cycles | Not supported | Yes
Multi-agent coordination | Not supported | Yes
Persistent state across turns | Manual | Built-in with checkpointers

Install LangGraph and the LangChain OpenAI integration with pip:

# terminal
pip install langgraph langchain langchain-openai

Defining the State Schema

Every LangGraph graph has a state schema defined as a TypedDict. This is the single source of truth for what data flows through the graph. Every node reads from this dict and returns a subset of it to update:

# state_schema.py
from typing import TypedDict, List, Optional

class ResearchState(TypedDict):
    # Input
    question: str
    # Accumulated during the run
    search_results: List[str]
    analysis: str
    # Control flow
    iterations: int
    max_iterations: int
    # Final output
    answer: Optional[str]

The TypedDict approach gives you type hints throughout your graph: your IDE can autocomplete state keys, and a static type checker can flag a node that returns an unexpected key. Some teams use Pydantic models instead for validation, but TypedDict is the standard pattern in LangGraph documentation and examples.
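One caveat is worth making concrete: TypedDict hints are checked by static tools such as mypy or an IDE, not by Python at runtime. The sketch below assumes you want a small runtime guard as well; unexpected_keys is a hypothetical helper and is not part of LangGraph.

```python
# typeddict_runtime.py
from typing import List, Optional, TypedDict, get_type_hints

class ResearchState(TypedDict):
    question: str
    search_results: List[str]
    analysis: str
    iterations: int
    max_iterations: int
    answer: Optional[str]

def unexpected_keys(update: dict, schema: type) -> set:
    """Return keys in a node's update that the schema does not declare."""
    return set(update) - set(get_type_hints(schema))

# At runtime a TypedDict is an ordinary dict, so Python itself accepts a
# misspelled key silently; only a type checker or a guard like this flags it.
good_update = {"analysis": "done", "iterations": 1}
bad_update = {"anaylsis": "typo in the key"}

print(unexpected_keys(good_update, ResearchState))
print(unexpected_keys(bad_update, ResearchState))
```

A guard like this could be called in tests or wrapped around nodes during development; Pydantic models are the heavier alternative when full validation is needed.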

Building Nodes and Edges

Nodes are Python functions (sync or async) that take the state as input and return a dictionary of updates. Edges connect nodes and can be unconditional (always go to node B after node A) or conditional (go to B or C based on state):

# nodes_edges.py
from typing import TypedDict, List
from langgraph.graph import StateGraph, END

class PipelineState(TypedDict):
    text: str
    tokens: List[str]
    word_count: int
    flagged: bool

def tokenize_node(state: PipelineState) -> dict:
    tokens = state["text"].lower().split()
    return {"tokens": tokens, "word_count": len(tokens)}

def check_content_node(state: PipelineState) -> dict:
    bad_words = {"spam", "scam", "free", "click"}
    flagged = any(t in bad_words for t in state["tokens"])
    return {"flagged": flagged}

def approve_node(state: PipelineState) -> dict:
    print(f"APPROVED: '{state['text']}' ({state['word_count']} words)")
    return {}

def reject_node(state: PipelineState) -> dict:
    print(f"REJECTED: '{state['text']}' -- contains flagged content")
    return {}

def route_by_flag(state: PipelineState) -> str:
    """Conditional edge: route to approve or reject based on flagged field."""
    return "reject" if state["flagged"] else "approve"

graph = StateGraph(PipelineState)
graph.add_node("tokenize", tokenize_node)
graph.add_node("check_content", check_content_node)
graph.add_node("approve", approve_node)
graph.add_node("reject", reject_node)

graph.set_entry_point("tokenize")
graph.add_edge("tokenize", "check_content")
graph.add_conditional_edges("check_content", route_by_flag, {
    "approve": "approve",
    "reject": "reject",
})
graph.add_edge("approve", END)
graph.add_edge("reject", END)

app = graph.compile()

app.invoke({"text": "Buy our product today", "tokens": [], "word_count": 0, "flagged": False})
app.invoke({"text": "Click here for free spam", "tokens": [], "word_count": 0, "flagged": False})

Output:

APPROVED: 'Buy our product today' (4 words)
REJECTED: 'Click here for free spam' -- contains flagged content

The add_conditional_edges call takes the source node, a routing function that returns a string, and a mapping from those strings to destination nodes. This is how LangGraph implements branching — the routing function inspects the state and returns a key that maps to the next node to run.

Debug Dee routing agent state decisions
add_conditional_edges() — the line between a chatbot and an agent.

Adding Tool Calling

LangGraph agents commonly use tools: Python functions the LLM can invoke. Here is a self-contained example that defines tools and routes through them without requiring an API key, using a hand-built request dict in place of a real LLM tool call:

# tool_calling.py
import json
from typing import TypedDict, List, Dict, Any
from langgraph.graph import StateGraph, END

# --- Define tools ---
def get_weather(city: str) -> str:
    """Mock weather tool."""
    data = {
        "Sydney": "28C, sunny",
        "London": "12C, cloudy",
        "New York": "18C, partly cloudy",
    }
    return data.get(city, "Weather data unavailable")

def calculate(expression: str) -> str:
    """Restricted math evaluator (demo only: eval is not safe for untrusted input)."""
    try:
        result = eval(expression, {"__builtins__": {}}, {})
        return str(result)
    except Exception as e:
        return f"Error: {e}"

TOOLS = {"get_weather": get_weather, "calculate": calculate}

# --- Graph state ---
class ToolState(TypedDict):
    request: Dict[str, Any]  # {"tool": "...", "args": {...}}
    tool_result: str
    final_answer: str

def call_tool_node(state: ToolState) -> dict:
    req = state["request"]
    tool_fn = TOOLS.get(req["tool"])
    if tool_fn:
        result = tool_fn(**req["args"])
    else:
        result = f"Unknown tool: {req['tool']}"
    return {"tool_result": result}

def format_answer_node(state: ToolState) -> dict:
    answer = f"Tool '{state['request']['tool']}' returned: {state['tool_result']}"
    return {"final_answer": answer}

graph = StateGraph(ToolState)
graph.add_node("call_tool", call_tool_node)
graph.add_node("format", format_answer_node)
graph.set_entry_point("call_tool")
graph.add_edge("call_tool", "format")
graph.add_edge("format", END)
app = graph.compile()

# Test weather tool
r1 = app.invoke({"request": {"tool": "get_weather", "args": {"city": "Sydney"}},
                 "tool_result": "", "final_answer": ""})
print(r1["final_answer"])

# Test calculator tool
r2 = app.invoke({"request": {"tool": "calculate", "args": {"expression": "250 * 1.1"}},
                 "tool_result": "", "final_answer": ""})
print(r2["final_answer"])

Output:

Tool 'get_weather' returned: 28C, sunny
Tool 'calculate' returned: 275.0

In production, the tool-calling node would parse the LLM output, extract the tool name and arguments from the model’s JSON response, call the tool, and return the result to feed back into the LLM in the next iteration. LangGraph’s prebuilt module (langgraph.prebuilt) provides ToolNode and tools_condition helpers that automate this pattern for OpenAI function-calling and Anthropic tool-use formats.
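Because a real LLM can emit arbitrary strings as tool arguments, the eval-based calculate tool in the example is risky outside a demo: emptying __builtins__ is not a real sandbox. One hedged alternative is to walk the parsed syntax tree and allow only a whitelist of arithmetic operators. The sketch below uses only the standard library; the helper name _eval_node and the choice of allowed operators are assumptions made for illustration.

```python
# safe_calculate.py
import ast
import operator

# Whitelisted operators; anything else (calls, names, attributes, **) is rejected.
_BIN_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
}
_UNARY_OPS = {ast.UAdd: operator.pos, ast.USub: operator.neg}

def _eval_node(node: ast.AST):
    """Recursively evaluate a numeric expression tree."""
    if isinstance(node, ast.Constant):
        # Accept plain ints and floats only (bool is a subclass of int).
        if isinstance(node.value, (int, float)) and not isinstance(node.value, bool):
            return node.value
    elif isinstance(node, ast.BinOp) and type(node.op) in _BIN_OPS:
        return _BIN_OPS[type(node.op)](_eval_node(node.left), _eval_node(node.right))
    elif isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
        return _UNARY_OPS[type(node.op)](_eval_node(node.operand))
    raise ValueError("unsupported expression")

def calculate(expression: str) -> str:
    """Evaluate +, -, *, / on numeric literals; return an error string otherwise."""
    try:
        tree = ast.parse(expression, mode="eval")
        return str(_eval_node(tree.body))
    except (ValueError, SyntaxError, ZeroDivisionError) as e:
        return f"Error: {e}"

print(calculate("250 * 1.1"))
print(calculate("__import__('os').getcwd()"))
```

Exponentiation is left out on purpose, since an expression like 9 ** 9 ** 9 can exhaust CPU and memory. This version keeps the same str-in, str-out signature, so it can replace the eval-based function in the TOOLS dict without other changes.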

Real-Life Example: Multi-Step Research Agent

This example builds a complete research agent that breaks a question into sub-tasks, processes each one, checks quality, and loops if needed — demonstrating cycles, state accumulation, and conditional routing in a realistic pattern:

# research_agent.py
from typing import TypedDict, List
from langgraph.graph import StateGraph, END

class ResearchState(TypedDict):
    question: str
    sub_questions: List[str]
    answers: List[str]
    current_index: int
    quality_score: int
    final_report: str

def decompose_node(state: ResearchState) -> dict:
    """Break the main question into sub-questions."""
    q = state["question"]
    sub_qs = [
        f"What is the definition of: {q}?",
        f"What are the main use cases of: {q}?",
        f"What are the limitations of: {q}?",
    ]
    print(f"Decomposed into {len(sub_qs)} sub-questions.")
    return {"sub_questions": sub_qs, "current_index": 0}

def research_node(state: ResearchState) -> dict:
    """Process the current sub-question (mock LLM call)."""
    idx = state["current_index"]
    sq = state["sub_questions"][idx]
    # In production: call your LLM here
    answer = f"[Answer to '{sq}' -- expand with LLM call]"
    answers = state["answers"] + [answer]
    print(f"Researched sub-question {idx + 1}/{len(state['sub_questions'])}")
    return {"answers": answers, "current_index": idx + 1}

def should_continue(state: ResearchState) -> str:
    """Continue researching or move to synthesis."""
    if state["current_index"] < len(state["sub_questions"]):
        return "continue"
    return "synthesize"

def synthesize_node(state: ResearchState) -> dict:
    """Combine answers into a final report."""
    report = f"Research Report: {state['question']}\n"
    report += "\n".join(f"  - {a}" for a in state["answers"])
    score = min(len(state["answers"]) * 30, 90)
    return {"final_report": report, "quality_score": score}

def quality_check(state: ResearchState) -> str:
    return "done" if state["quality_score"] >= 70 else "redo"

def finish_node(state: ResearchState) -> dict:
    print("\n=== FINAL REPORT ===")
    print(state["final_report"])
    return {}

graph = StateGraph(ResearchState)
graph.add_node("decompose", decompose_node)
graph.add_node("research", research_node)
graph.add_node("synthesize", synthesize_node)
graph.add_node("finish", finish_node)

graph.set_entry_point("decompose")
graph.add_edge("decompose", "research")
graph.add_conditional_edges("research", should_continue, {
    "continue": "research",
    "synthesize": "synthesize",
})
graph.add_conditional_edges("synthesize", quality_check, {
    "done": "finish",
    "redo": "decompose",
})
graph.add_edge("finish", END)

app = graph.compile()
result = app.invoke({
    "question": "Python asyncio",
    "sub_questions": [], "answers": [],
    "current_index": 0, "quality_score": 0, "final_report": ""
})

Output:

Decomposed into 3 sub-questions.
Researched sub-question 1/3
Researched sub-question 2/3
Researched sub-question 3/3

=== FINAL REPORT ===
Research Report: Python asyncio
  - [Answer to 'What is the definition of: Python asyncio?' ...]
  - [Answer to 'What are the main use cases of: Python asyncio?' ...]
  - [Answer to 'What are the limitations of: Python asyncio?' ...]

The research node cycles back to itself via the should_continue conditional edge; this is the loop that processes one sub-question per iteration. Once all sub-questions are answered, it routes to synthesize. The quality check then either finishes or loops back to decompose to try again. Note that decompose_node does not clear answers, so a real redo path should also reset that list before re-researching. Replace the mock LLM calls with ChatOpenAI or ChatAnthropic to make this a real research agent.

Sudo Sam reviewing cyclic agent graph
The graph loops until the condition says stop. Make sure the condition says stop.

Frequently Asked Questions

Does LangGraph support streaming?

Yes. Use app.stream(initial_state) instead of app.invoke(). This returns an iterator that yields the state update after each node completes, letting you stream intermediate results to a UI or log in real time. You can also use app.astream() for async streaming with async for. Streaming is particularly useful when one node’s output needs to be displayed while the next node is still processing.

How do I add persistent memory to a LangGraph agent?

LangGraph supports checkpointers that save state between invocations. Use from langgraph.checkpoint.sqlite import SqliteSaver for local persistence or from langgraph.checkpoint.postgres import PostgresSaver for production; in recent releases these ship as the separate langgraph-checkpoint-sqlite and langgraph-checkpoint-postgres packages. Pass the checkpointer to graph.compile(checkpointer=saver). Then provide a thread_id in the config when invoking: app.invoke(state, config={"configurable": {"thread_id": "user-123"}}). This enables multi-turn conversations where the agent remembers previous exchanges.

Can LangGraph handle multiple agents?

Yes. LangGraph supports supervisor patterns where one agent node routes tasks to specialized sub-agent nodes. Each sub-agent can itself be a compiled LangGraph graph, and you can use app.invoke() inside a node to call a sub-agent. This is the recommended pattern for building complex systems like a coding agent that delegates browser searches to a web-search specialist agent and code execution to a sandbox agent.

How does LangGraph compare to AutoGen or CrewAI?

LangGraph gives you explicit control over the graph structure, state, and routing logic — you define exactly how agents communicate. AutoGen and CrewAI abstract more away and are easier to start with but harder to customize. LangGraph is the better choice when you need precise control over agent behavior, deterministic routing, or integration with an existing LangChain codebase. AutoGen and CrewAI are better for quickly prototyping multi-agent conversations without writing graph code.

How do I debug a LangGraph graph?

LangGraph Studio (the desktop GUI) is the best debugging tool — it visualizes the graph, lets you step through nodes, and shows state at each step. For terminal debugging, use app.stream() with a print statement after each yielded event: for event in app.stream(state): print(event). You can also enable LangSmith tracing by setting LANGCHAIN_TRACING_V2=true in your environment, which logs every node invocation with inputs and outputs to the LangSmith cloud dashboard.

Conclusion

LangGraph makes stateful multi-step AI agents practical to build in Python. We covered the core building blocks — state schemas with TypedDict, nodes as plain Python functions, unconditional and conditional edges for routing, cycles for loops, and tool-calling patterns. The research agent example showed how these pieces combine into an agent that decomposes problems, processes them iteratively, and checks its own output quality before finishing.

The natural next step is to replace the mock processing in the research agent with real LLM calls using ChatOpenAI or ChatAnthropic, then add a SqliteSaver checkpointer to give it persistent memory. The official LangGraph documentation has excellent tutorials on both patterns, including pre-built ReAct agent templates that handle tool-calling boilerplate for you.

How To Use Polars for Fast DataFrame Operations in Python


Intermediate

You load a CSV with a million rows into pandas, run a few groupby operations, and then — you wait. And wait. The spinner in Jupyter keeps spinning while your CPU fans kick in. If you have ever hit this wall, you know exactly why the Python data community got excited about Polars. Written in Rust and built around the Apache Arrow memory model, Polars processes data 5x to 50x faster than pandas for many common operations — without requiring you to rewrite everything in C++.

Polars is a modern DataFrame library that runs on your laptop or a server with no special setup. You install it with pip, import it, and use an API that feels familiar if you know pandas — but with some important design differences that make your code faster by default. There is no index to manage, lazy evaluation is built in, and operations are automatically parallelized across your CPU cores.

In this article we will cover how Polars works and why it is faster than pandas, how to create and load DataFrames, how to filter, group, and aggregate data, how to use lazy execution for maximum performance, and how to work with real CSV and JSON data. By the end you will be able to replace your slowest pandas scripts with Polars and see immediate speed improvements.

Polars in Python: Quick Example

Here is a complete working example that shows the core Polars workflow — create a DataFrame, filter rows, group by a column, and compute aggregates — all in under 20 lines:

# quick_polars.py
import polars as pl

df = pl.DataFrame({
    "name": ["Alice", "Bob", "Carol", "Dave", "Eve"],
    "department": ["Eng", "Eng", "HR", "HR", "Eng"],
    "salary": [95000, 88000, 72000, 68000, 102000],
})

result = (
    df.filter(pl.col("salary") > 70000)
    .group_by("department")
    .agg(
        pl.col("salary").mean().alias("avg_salary"),
        pl.col("name").count().alias("headcount"),
    )
    .sort("avg_salary", descending=True)
)

print(result)

Output:

shape: (2, 3)
+------------+------------+-----------+
| department | avg_salary | headcount |
| ---        | ---        | ---       |
| str        | f64        | u32       |
+============+============+===========+
| Eng        | 95000.0    | 3         |
| HR         | 72000.0    | 1         |
+------------+------------+-----------+

Notice how pl.col("salary") is used to reference columns — this is the Polars expression API and it is at the heart of everything. Unlike pandas where you often chain method calls on Series objects, Polars uses expressions that describe what to compute, letting the engine optimize how to compute it. The .alias() call renames the output column, and .sort() orders the final result.

We will go deeper into expressions, lazy frames, and real-world data loading in the sections below.

Cache Katie with speed gauges and bar charts
Polars: same API as pandas, minus the wait.

What Is Polars and Why Is It Faster Than pandas?

Polars is a DataFrame library written in Rust with Python bindings. It stores data in the Apache Arrow columnar format, which means all values of the same column are laid out contiguously in memory. This makes operations like summing a column or filtering rows extremely cache-friendly: the CPU can stream through a column’s values sequentially instead of constantly jumping around in memory.
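To make the layout difference concrete, here is a toy comparison using only the standard library. It illustrates row-oriented versus column-oriented storage in general and is not how Arrow or Polars is implemented; the variable names are invented for the example.

```python
# columnar_layout.py
# Toy illustration of row vs column storage; this is not Arrow itself.
from array import array

# Row-oriented: each record is its own object, with fields scattered in memory.
rows = [
    {"product": "Widget", "price": 9.99, "units": 1500},
    {"product": "Gadget", "price": 24.99, "units": 340},
    {"product": "Doohickey", "price": 4.49, "units": 2100},
]
total_units_rows = sum(row["units"] for row in rows)

# Column-oriented: one contiguous, typed buffer per column.
columns = {
    "price": array("d", [9.99, 24.99, 4.49]),  # 64-bit floats
    "units": array("q", [1500, 340, 2100]),    # 64-bit signed integers
}
total_units_cols = sum(columns["units"])

print(total_units_rows, total_units_cols)
print(columns["units"].itemsize * len(columns["units"]), "bytes hold the units column")
```

Summing the units column touches only one compact buffer in the columnar version, while the row version has to visit every record object to pull out a single field.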

pandas, by contrast, was designed in 2008 when multi-core CPUs were less common and the NumPy array model was state of the art. pandas operations are largely single-threaded, are constrained by Python’s GIL, and carry an index system that adds overhead on most operations. Polars sidesteps all three problems: it is multi-threaded by default, written in Rust with no GIL, and has no index.

Feature | pandas | Polars
Language core | Python + NumPy | Rust + Arrow
Multi-threading | No (GIL-limited) | Yes (automatic)
Lazy evaluation | No | Yes (LazyFrame)
Index | Required | None
Memory usage | Higher | Lower (Arrow format)
API style | Method chaining on Series | Expression-based

The practical result: for operations on DataFrames over 100,000 rows, Polars is typically 5x to 30x faster. For CSV parsing alone, Polars is often 10x faster than pandas because it uses all available CPU cores in parallel. Let us see how to install and use it.

Installing Polars

Polars is available on PyPI and installs with a single command. It has no dependency on pandas or NumPy, so the install is clean and fast:

# terminal
pip install polars

Output:

Successfully installed polars-0.20.x

If you work with large datasets and want the optional extras (like connecting to cloud storage or reading Parquet files faster), you can install the full extras bundle with pip install polars[all]. For most tutorial purposes, the base install is all you need.

Creating DataFrames in Polars

Polars DataFrames can be created from Python dictionaries, lists, or loaded from files. The dictionary approach is the most common for creating test data:

# create_dataframes.py
import polars as pl

# From a dictionary -- each key is a column name, value is a list
df = pl.DataFrame({
    "product": ["Widget", "Gadget", "Doohickey", "Thingamajig"],
    "price": [9.99, 24.99, 4.49, 14.99],
    "units_sold": [1500, 340, 2100, 780],
    "in_stock": [True, True, False, True],
})

print(df)
print()
print("Shape:", df.shape)
print("Columns:", df.columns)
print("Dtypes:", df.dtypes)

Output:

shape: (4, 4)
+-------------+-------+------------+----------+
| product     | price | units_sold | in_stock |
| ---         | ---   | ---        | ---      |
| str         | f64   | i64        | bool     |
+=============+=======+============+==========+
| Widget      | 9.99  | 1500       | true     |
| Gadget      | 24.99 | 340        | true     |
| Doohickey   | 4.49  | 2100       | false    |
| Thingamajig | 14.99 | 780        | true     |
+-------------+-------+------------+----------+

Shape: (4, 4)
Columns: ['product', 'price', 'units_sold', 'in_stock']
Dtypes: [String, Float64, Int64, Boolean]

Polars automatically infers types from your data. Strings become String, floats become Float64, integers become Int64, and booleans become Boolean. Unlike pandas, there is no ambiguous “object” dtype for strings — Polars always knows exactly what type each column holds, which is one reason it can optimize operations so aggressively.

Reading CSV and JSON Files

Polars reads CSV files dramatically faster than pandas because it splits the file across CPU cores and parses each chunk in parallel. Here is how to read a CSV, check its schema, and do a quick preview:

# read_csv_polars.py
import polars as pl

# We will create a sample CSV first so the code runs standalone
import csv, tempfile, os

data = [
    ["date", "city", "temp_c", "humidity"],
    ["2024-01-01", "Sydney", "28.5", "62"],
    ["2024-01-01", "Melbourne", "22.1", "75"],
    ["2024-01-02", "Sydney", "30.2", "58"],
    ["2024-01-02", "Melbourne", "19.8", "80"],
    ["2024-01-03", "Sydney", "26.7", "70"],
]

tmpfile = tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False, newline='')
writer = csv.writer(tmpfile)
writer.writerows(data)
tmpfile.close()

# Read with Polars
df = pl.read_csv(tmpfile.name)
print(df)
print()

# Polars can also infer date columns
df2 = pl.read_csv(tmpfile.name, try_parse_dates=True)
print("Schema with date parsing:", df2.dtypes)

os.unlink(tmpfile.name)

Output:

shape: (5, 4)
+------------+-----------+--------+----------+
| date       | city      | temp_c | humidity |
+------------+-----------+--------+----------+
| 2024-01-01 | Sydney    | 28.5   | 62       |
| 2024-01-01 | Melbourne | 22.1   | 75       |
| 2024-01-02 | Sydney    | 30.2   | 58       |
| 2024-01-02 | Melbourne | 19.8   | 80       |
| 2024-01-03 | Sydney    | 26.7   | 70       |
+------------+-----------+--------+----------+

Schema with date parsing: [Date, String, Float64, Int64]

The try_parse_dates=True argument tells Polars to automatically detect date and datetime columns. This saves you the extra step of converting string columns to dates after loading — a common pain point in pandas workflows. For JSON files, use pl.read_json() or pl.read_ndjson() for newline-delimited JSON.

Pyro Pete sprinting through data tables
Eager vs lazy: one runs every line, one waits for you to actually need the answer.

Filtering and Selecting Data

In Polars, filtering and column selection use the expression API with pl.col(). This is the biggest conceptual shift from pandas, but once you get the pattern it is very readable:

# filter_select.py
import polars as pl

df = pl.DataFrame({
    "name": ["Alice", "Bob", "Carol", "Dave", "Eve", "Frank"],
    "age": [28, 34, 22, 45, 31, 29],
    "score": [88.5, 72.3, 95.1, 61.0, 83.7, 90.2],
    "passed": [True, True, True, False, True, True],
})

# Filter rows where passed=True AND score > 80
high_scorers = df.filter(
    pl.col("passed") & (pl.col("score") > 80)
)
print("High scorers:")
print(high_scorers)
print()

# Select specific columns
names_scores = df.select(["name", "score"])
print("Names and scores:")
print(names_scores)
print()

# Add a computed column
with_grade = df.with_columns(
    pl.when(pl.col("score") >= 90).then(pl.lit("A"))
    .when(pl.col("score") >= 80).then(pl.lit("B"))
    .otherwise(pl.lit("C"))
    .alias("grade")
)
print("With grades:")
print(with_grade)

Output:

High scorers:
shape: (4, 4)
| name  | age | score | passed |
| Alice | 28  | 88.5  | true   |
| Carol | 22  | 95.1  | true   |
| Eve   | 31  | 83.7  | true   |
| Frank | 29  | 90.2  | true   |

Names and scores:
| name  | score |
| Alice | 88.5  |
...

With grades:
| name  | age | score | passed | grade |
| Alice | 28  | 88.5  | true   | B     |
| Bob   | 34  | 72.3  | true   | C     |
...

The pl.when().then().otherwise() pattern is Polars’ equivalent of a vectorized if-else, similar to numpy.where. The key difference from pandas is that all of these operations are described as expressions and can be computed in parallel — Polars may even reorder them internally for optimal performance.

Group By and Aggregation

Group-by aggregations are where Polars really shines. Multiple aggregations on the same group are computed in parallel, and the expression API lets you write complex aggregations in a single readable call:

# groupby_agg.py
import polars as pl

df = pl.DataFrame({
    "region": ["North", "South", "North", "South", "North", "East"],
    "product": ["Widget", "Widget", "Gadget", "Gadget", "Widget", "Gadget"],
    "revenue": [1200, 980, 450, 620, 1350, 800],
    "units": [120, 98, 45, 62, 135, 80],
})

summary = (
    df.group_by(["region", "product"])
    .agg([
        pl.col("revenue").sum().alias("total_revenue"),
        pl.col("units").sum().alias("total_units"),
        (pl.col("revenue").sum() / pl.col("units").sum()).alias("revenue_per_unit"),
    ])
    .sort(["region", "product"])
)

print(summary)

Output:

shape: (5, 5)
+--------+---------+---------------+-------------+------------------+
| region | product | total_revenue | total_units | revenue_per_unit |
+--------+---------+---------------+-------------+------------------+
| East   | Gadget  | 800           | 80          | 10.0             |
| North  | Gadget  | 450           | 45          | 10.0             |
| North  | Widget  | 2550          | 255         | 10.0             |
| South  | Gadget  | 620           | 62          | 10.0             |
| South  | Widget  | 980           | 98          | 10.0             |
+--------+---------+---------------+-------------+------------------+

You can pass multiple aggregations in a single list to .agg(). Polars computes all of them simultaneously using all available CPU cores. On a 4-core machine running a 10-million-row dataset, this single change from pandas to Polars can take a groupby from 8 seconds down to under 1 second.

Lazy Evaluation with LazyFrame

Polars has an eager mode (what we have been using) and a lazy mode. Lazy frames do not execute operations immediately — they build up a query plan that gets optimized and executed only when you call .collect(). This is where the biggest performance gains live for complex pipelines:

# lazy_frame.py
import polars as pl

# Create a larger sample dataset
import random
random.seed(42)
n = 100_000
df = pl.DataFrame({
    "user_id": list(range(n)),
    "country": random.choices(["AU", "US", "UK", "CA", "DE"], k=n),
    "spend": [round(random.uniform(5, 500), 2) for _ in range(n)],
    "category": random.choices(["Electronics", "Clothing", "Food", "Books"], k=n),
})

# Build a lazy query -- nothing executes yet
lazy_result = (
    df.lazy()
    .filter(pl.col("spend") > 100)
    .group_by(["country", "category"])
    .agg(
        pl.col("spend").sum().alias("total_spend"),
        pl.col("user_id").count().alias("customers"),
    )
    .sort("total_spend", descending=True)
    .limit(5)
)

# Execute the optimized query
result = lazy_result.collect()
print(result)

Output:

shape: (5, 4)
+---------+-------------+-------------+-----------+
| country | category    | total_spend | customers |
+---------+-------------+-------------+-----------+
| US      | Electronics | 1254832.0   | 5012      |
| AU      | Electronics | 1248901.0   | 4988      |
| CA      | Electronics | 1241234.0   | 4956      |
| UK      | Electronics | 1238123.0   | 4942      |
| US      | Clothing    | 1225891.0   | 4893      |
+---------+-------------+-------------+-----------+

The lazy frame optimizer can push the .filter() before the .group_by(), reducing the number of rows processed in the aggregation. It can also eliminate columns that are not needed in the final output. When reading from files, pl.scan_csv() creates a lazy frame that only reads the columns you actually use, making I/O much faster for wide files with many columns.

Real-Life Example: Sales Data Analyzer

Let us build a practical sales analysis tool that loads data, cleans it, and produces a summary report — the kind of script you might actually use to analyze a business dataset:

# sales_analyzer.py
import polars as pl
import tempfile, csv, os

# Create a realistic sample sales CSV
rows = [
    ["order_id", "date", "customer", "product", "category", "quantity", "unit_price"],
    ["1001", "2024-01-05", "Acme Corp", "Widget Pro", "Hardware", "50", "29.99"],
    ["1002", "2024-01-07", "Beta Ltd", "Cloud Sub", "Software", "10", "99.00"],
    ["1003", "2024-01-10", "Acme Corp", "Widget Pro", "Hardware", "30", "29.99"],
    ["1004", "2024-01-12", "Gamma Inc", "Gadget Plus", "Hardware", "20", "49.99"],
    ["1005", "2024-01-15", "Beta Ltd", "Widget Pro", "Hardware", "15", "29.99"],
    ["1006", "2024-01-20", "Delta Co", "Cloud Sub", "Software", "5", "99.00"],
    ["1007", "2024-01-22", "Acme Corp", "Gadget Plus", "Hardware", "8", "49.99"],
    ["1008", "2024-01-28", "Gamma Inc", "Cloud Sub", "Software", "12", "99.00"],
]

tmp = tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False, newline='')
csv.writer(tmp).writerows(rows)
tmp.close()

# Load and analyze with Polars LazyFrame
result = (
    pl.scan_csv(tmp.name, try_parse_dates=True)
    .with_columns(
        (pl.col("quantity") * pl.col("unit_price")).alias("revenue")
    )
    .group_by(["customer", "category"])
    .agg([
        pl.col("revenue").sum().alias("total_revenue"),
        pl.col("quantity").sum().alias("total_units"),
        pl.col("order_id").count().alias("orders"),
    ])
    .sort("total_revenue", descending=True)
    .collect()
)

print("=== Sales Summary by Customer and Category ===")
print(result)
print()

# Top customer overall
top = result.group_by("customer").agg(
    pl.col("total_revenue").sum().alias("grand_total")
).sort("grand_total", descending=True)

print("=== Top Customers ===")
print(top)

os.unlink(tmp.name)

Output:

=== Sales Summary by Customer and Category ===
shape: (6, 5)
+-----------+----------+---------------+-------------+--------+
| customer  | category | total_revenue | total_units | orders |
+-----------+----------+---------------+-------------+--------+
| Acme Corp | Hardware | 2799.12       | 88          | 3      |
| Gamma Inc | Software | 1188.0        | 12          | 1      |
...

=== Top Customers ===
+-----------+-------------+
| customer  | grand_total |
+-----------+-------------+
| Acme Corp | 2799.12     |
| Gamma Inc | 2187.8      |
...

This script uses pl.scan_csv() for lazy reading, adds a computed revenue column before aggregating (so the computation is pushed into the optimal position by the query planner), and chains two group-by operations without ever converting to an intermediate pandas DataFrame. You can extend this by replacing the temp file with your actual CSV path and adding more aggregation columns to the .agg() call.
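When a report feeds a business decision, it is worth cross-checking a few totals by hand. The sketch below recomputes revenue per customer and category from the same eight sample rows using only the standard library; the variable names are illustrative and not part of the script above.

```python
# revenue_crosscheck.py
from collections import defaultdict

# (customer, category, quantity, unit_price) copied from the sample CSV rows
orders = [
    ("Acme Corp", "Hardware", 50, 29.99),
    ("Beta Ltd", "Software", 10, 99.00),
    ("Acme Corp", "Hardware", 30, 29.99),
    ("Gamma Inc", "Hardware", 20, 49.99),
    ("Beta Ltd", "Hardware", 15, 29.99),
    ("Delta Co", "Software", 5, 99.00),
    ("Acme Corp", "Hardware", 8, 49.99),
    ("Gamma Inc", "Software", 12, 99.00),
]

# Sum quantity * unit_price for each (customer, category) pair
totals = defaultdict(float)
for customer, category, quantity, unit_price in orders:
    totals[(customer, category)] += quantity * unit_price

# Highest revenue first, matching the sort in the Polars query
ranked = sorted(totals.items(), key=lambda item: item[1], reverse=True)
for (customer, category), revenue in ranked:
    print(f"{customer:<10} {category:<9} {revenue:>9.2f}")
```

The six groups and their ordering should match the rows Polars returns; a mismatch usually points to a type-inference problem in the CSV load.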

Frequently Asked Questions

Can I replace pandas completely with Polars?

For most data processing tasks — loading, filtering, grouping, aggregating, and joining — yes. Polars covers the same ground pandas does. However, some ML libraries like scikit-learn expect pandas DataFrames or NumPy arrays as input, so you may still need to convert with .to_pandas() at the final step. Polars and pandas can coexist in the same project without issues.

How do I join two DataFrames in Polars?

Use df1.join(df2, on="common_column", how="inner"). Polars supports inner, left, outer (spelled "full" in Polars 1.0 and later), cross, semi, and anti joins. The how parameter accepts these strings directly. For joining on multiple columns, pass a list: on=["col1", "col2"]. Joins run multi-threaded and are typically faster than pandas merges on large datasets.

How does Polars handle missing values?

Polars uses null for missing values, not NaN. This distinction matters: NaN is a floating-point concept; null is a general absence-of-value concept that works for any data type. Use pl.col("x").is_null() to check for nulls, .drop_nulls() to remove rows with nulls, and .fill_null(value) to replace them. Polars propagates nulls through computations correctly by default.

When should I use LazyFrame vs DataFrame?

Use LazyFrame (via df.lazy() or pl.scan_csv()) whenever you have a multi-step pipeline with filtering, joining, or aggregation. The query optimizer will find the fastest execution plan. Use eager DataFrame when you are doing exploratory work at the REPL, need to inspect intermediate results, or have a single simple operation. The performance difference is most visible on datasets over 500,000 rows with complex pipelines.

How do I convert between Polars and pandas?

Converting is one line in each direction: pandas_df = polars_df.to_pandas() and polars_df = pl.from_pandas(pandas_df). Both conversions use the Arrow memory format internally, so they are fast and memory-efficient. If you are integrating Polars into an existing pandas-heavy codebase, start by swapping just the slow parts — typically the CSV loading and groupby steps — and keep everything else in pandas until you are comfortable.

Conclusion

Polars brings genuine speed improvements to Python data work — not through tricks, but through better fundamentals: Rust’s performance, the Arrow memory model, automatic multi-threading, and a lazy query optimizer. We covered how to install Polars, create and load DataFrames, use the expression API for filtering and selection, run fast group-by aggregations, and use LazyFrame for query-plan optimization. The real-life example showed how these pieces combine into a practical data pipeline.

The best next step is to take one slow pandas script you already have and rewrite the data-loading and groupby sections in Polars. Time both versions with time.perf_counter() before and after; it is better suited to measuring elapsed time than time.time(). On larger datasets the improvement is usually easy to see. From there, explore joins and window functions, both of which are covered in the official Polars documentation.
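A small timing helper makes the comparison repeatable. The sketch below is one possible approach using only the standard library; time_call is a hypothetical helper name, and the sum() call stands in for your own pandas or Polars function.

```python
# time_compare.py
import time

def time_call(fn, *args, repeats: int = 3, **kwargs) -> float:
    """Return the best wall-clock time in seconds over several runs."""
    best = float("inf")
    for _ in range(repeats):
        start = time.perf_counter()
        fn(*args, **kwargs)
        best = min(best, time.perf_counter() - start)
    return best

# Stand-in workload; replace with your pandas and Polars versions
elapsed = time_call(sum, range(1_000_000))
print(f"best of 3 runs: {elapsed:.4f}s")
```

Taking the best of several runs reduces noise from disk caches and other processes, which matters when the two versions are within a small factor of each other.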

How To Use Python Watchfiles for Real-Time File Monitoring

Beginner

You have a script that processes CSV files dropped into a folder, or a service that should reload its configuration when the config file changes, or a development tool that reruns tests whenever you save a file. All of these need the same thing: a reliable way to watch for file system changes and react immediately. The standard Python approach, polling a folder in a loop with time.sleep(), works but is slow to react and wastes CPU. The watchfiles library does this properly: it uses OS-native file system events (inotify on Linux, FSEvents on macOS, ReadDirectoryChangesW on Windows) to detect changes almost instantly with no polling overhead.

watchfiles is a modern Python file watching library written in Rust, which makes it significantly faster than pure-Python alternatives like watchdog. It supports both synchronous and async usage, integrates cleanly with asyncio, and has a simple API that gets you from installation to working watcher in about ten lines of code. You need Python 3.8+ and pip install watchfiles — no additional system packages are required on any major platform.

This article covers the complete watchfiles API: basic synchronous watching, async watching with awatch, filtering which files to monitor, handling specific change types (added, modified, deleted), watching multiple directories, and building a real-time CSV processing pipeline. By the end you will have a production-ready file watching pattern you can adapt for any live processing use case.

File Watching: Quick Example

Install watchfiles and try this minimal watcher. In one terminal, run the script. In another, create or modify files in the current directory to see events:

# quick_watch.py
from watchfiles import watch

print("Watching current directory for changes. Press Ctrl+C to stop.")

for changes in watch("."):
    for change_type, path in changes:
        print(f"{change_type.name}: {path}")

Output (when files change):

Watching current directory for changes. Press Ctrl+C to stop.
added: /home/user/project/new_file.txt
modified: /home/user/project/config.json
deleted: /home/user/project/temp.log

watch() returns a generator that yields a set of (Change, path) tuples each time files change. Change is an enum with three values: Change.added, Change.modified, and Change.deleted. Each iteration of the for loop blocks until at least one change is detected, then yields all changes that happened in that batch. Multiple changes (a rename that creates one file and deletes another) arrive in the same batch.

What Is watchfiles and Why Use It?

Before watchfiles, the standard Python file watching library was watchdog. Both use OS-native events, but watchfiles has a simpler API and better async support. The Rust implementation also makes it noticeably faster for high-frequency change detection.

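+------------------------+----------------------+--------------------------+----------------------+
| Feature                | Manual polling       | watchdog                 | watchfiles           |
+------------------------+----------------------+--------------------------+----------------------+
| Change detection speed | Up to sleep interval | Near-instant             | Near-instant         |
| CPU usage (idle)       | Constant             | Near-zero                | Near-zero            |
| Async support          | Manual               | Complex                  | First-class (awatch) |
| API complexity         | Simple (wrong)       | Observer/Handler classes | Simple generator     |
| Performance            | Slow                 | Good                     | Very fast (Rust)     |
+------------------------+----------------------+--------------------------+----------------------+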

Uvicorn uses watchfiles for its --reload development mode when the package is installed (it is included in the uvicorn[standard] extra). If you have run uvicorn app:app --reload for a FastAPI project with that extra, you have already relied on watchfiles under the hood.

Installation

# Terminal
pip install watchfiles

# Verify
python3 -c "import watchfiles; print(watchfiles.__version__)"

Output:

0.24.0

No additional system packages are required. On Linux, watchfiles uses inotify (built into the kernel). On macOS it uses FSEvents. On Windows it uses ReadDirectoryChangesW. The Rust extension handles all platform differences internally.

Filtering by Change Type and Path

You often only care about specific change types or specific file extensions. watchfiles provides the Change enum and filter functions for this:

# change_filtering.py
from watchfiles import watch, Change

# Only react to modified files, ignore additions and deletions
print("Watching for file modifications only...")
for changes in watch(".", watch_filter=lambda change, path: change == Change.modified):
    for change_type, path in changes:
        print(f"Modified: {path}")

Output (when a file is modified):

Watching for file modifications only...
Modified: /home/user/project/data.csv

To filter by file extension instead, pass a named function:

# extension_filter.py
from watchfiles import watch, Change

# Only watch Python files
def python_only(change: Change, path: str) -> bool:
    return path.endswith(".py")

print("Watching for Python file changes...")
for changes in watch(".", watch_filter=python_only):
    for change_type, path in changes:
        print(f"{change_type.name}: {path}")

Output (when a .py file changes):

Watching for Python file changes...
modified: /home/user/project/main.py
added: /home/user/project/utils.py

The watch_filter parameter accepts any callable that takes a Change value and a path string and returns a boolean. When the filter returns False for an event, that event is silently discarded. If all events in a batch are filtered out, watchfiles does not yield that batch at all — your loop only runs when at least one passing event occurs.

Watching Multiple Directories

watch() accepts multiple paths as positional arguments. All paths are monitored in a single watcher instance, and changes from any of them arrive in the same iterator:

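# multi_directory_watch.py
from pathlib import Path
from watchfiles import watch

# Resolve to absolute paths so they can be compared with the reported paths
CONFIG_DIR = Path("configs").resolve()
INCOMING_DIR = Path("data/incoming").resolve()

# Create test directories
CONFIG_DIR.mkdir(parents=True, exist_ok=True)
INCOMING_DIR.mkdir(parents=True, exist_ok=True)

print("Watching configs/ and data/incoming/ for changes...")
for changes in watch(CONFIG_DIR, INCOMING_DIR):
    for change_type, path in changes:
        # Determine which directory the change came from by checking its
        # parent directories (a substring test such as "configs" in path
        # would also match unrelated names like configs_backup.csv)
        parents = Path(path).parents
        if CONFIG_DIR in parents:
            print(f"Config changed: {change_type.name} -- {path}")
            print("  --> Triggering config reload")
        elif INCOMING_DIR in parents:
            print(f"New data: {change_type.name} -- {path}")
            print("  --> Triggering data processing")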

Output (when files change):

Watching configs/ and data/incoming/ for changes...
Config changed: modified -- /home/user/project/configs/app.yaml
  --> Triggering config reload
New data: added -- /home/user/project/data/incoming/report_2026-05-19.csv
  --> Triggering data processing

Async Watching with awatch

For async applications — FastAPI handlers, async scripts, or any asyncio-based code — use awatch() instead of watch(). The API is identical, but it is an async generator:

# async_watch.py
import asyncio
from watchfiles import awatch

async def process_change(path: str) -> None:
    """Simulate async processing of a changed file."""
    await asyncio.sleep(0.1)  # replace with real async I/O
    print(f"  Processed: {path}")

async def main():
    print("Async watcher running...")
    async for changes in awatch("."):
        print(f"Batch of {len(changes)} change(s):")
        tasks = []
        for change_type, path in changes:
            print(f"  {change_type.name}: {path}")
            if change_type.name in ("added", "modified"):
                tasks.append(process_change(path))
        if tasks:
            await asyncio.gather(*tasks)

asyncio.run(main())

Output (when files change):

Async watcher running...
Batch of 2 change(s):
  added: /home/user/project/log_001.txt
  modified: /home/user/project/log_002.txt
  Processed: /home/user/project/log_001.txt
  Processed: /home/user/project/log_002.txt

Using asyncio.gather() to process multiple changes concurrently is the right pattern here: if three files arrive in the same batch, their awaits overlap instead of running one after another. awatch() performs its blocking wait for file system events in a worker thread, so the async for loop does not block the event loop while waiting.
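The effect of gather() is easy to measure without a watcher. In this standard-library sketch, three simulated handlers each sleep for 0.1 seconds; the file names and the sleep are placeholders for real async I/O.

```python
# gather_timing.py
import asyncio
import time

async def process_change(path: str) -> str:
    """Simulate 0.1 s of async I/O for one changed file."""
    await asyncio.sleep(0.1)
    return f"processed {path}"

async def main():
    paths = ["log_001.txt", "log_002.txt", "log_003.txt"]
    start = time.perf_counter()
    # All three coroutines wait at the same time
    results = await asyncio.gather(*(process_change(p) for p in paths))
    elapsed = time.perf_counter() - start
    return results, elapsed

results, elapsed = asyncio.run(main())
print(results)
print(f"elapsed: {elapsed:.2f}s (three sequential calls would take about 0.3s)")
```

The gain only applies to work that actually awaits. CPU-bound or blocking code inside the handler still runs one task at a time on the event loop.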

Stopping and Timeout Control

watchfiles supports a stop event for clean shutdown in services, and the same mechanism gives you time-bounded watching:

# controlled_watcher.py
import asyncio
from watchfiles import awatch

async def watch_with_timeout(seconds: float = 30.0):
    """Watch for a fixed period, then stop automatically."""
    stop = asyncio.Event()
    # Set the stop event once the period has elapsed
    asyncio.get_running_loop().call_later(seconds, stop.set)
    print(f"Watching for {seconds:.0f} seconds...")
    async for changes in awatch(".", stop_event=stop):
        for change_type, path in changes:
            print(f"{change_type.name}: {path}")
    print("Watch period ended.")

async def watch_with_stop_event():
    """Watch until explicitly stopped via an event."""
    stop = asyncio.Event()

    # Schedule a stop after 10 seconds (simulating shutdown signal)
    async def stopper():
        await asyncio.sleep(10)
        print("\nSignaling stop...")
        stop.set()

    # Keep a reference so the task is not garbage-collected early
    stopper_task = asyncio.create_task(stopper())

    async for changes in awatch(".", stop_event=stop):
        for change_type, path in changes:
            print(f"{change_type.name}: {path}")
    print("Stopped cleanly.")

asyncio.run(watch_with_stop_event())

Output:

modified: /home/user/project/test.txt
Signaling stop...
Stopped cleanly.

The stop_event pattern is the correct way to handle graceful shutdown in a service. When your application receives a SIGTERM signal (from systemd, Docker, or Kubernetes), set the stop event and the watcher loop exits cleanly. For integration tests or other bounded watch sessions, schedule stop.set() on a timer, as watch_with_timeout() does. Note that awatch() does not accept a timeout argument; its rust_timeout parameter only controls how long each internal wait lasts and does not end the watch.
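Wiring SIGTERM to the stop event takes one call on the event loop. The sketch below uses only the standard library and sends the signal to itself to simulate a shutdown request; it assumes a Unix-like system, because loop.add_signal_handler() is not available on Windows. In a real service you would pass the same event to awatch(..., stop_event=stop) instead of waiting on it directly.

```python
# sigterm_stop.py
import asyncio
import os
import signal

async def run_until_signalled() -> str:
    stop = asyncio.Event()
    loop = asyncio.get_running_loop()

    # When SIGTERM arrives, set the event instead of killing the process
    loop.add_signal_handler(signal.SIGTERM, stop.set)

    # Simulate an external shutdown request after 0.1 seconds
    loop.call_later(0.1, os.kill, os.getpid(), signal.SIGTERM)

    # Stand-in for: async for changes in awatch(".", stop_event=stop)
    await stop.wait()

    loop.remove_signal_handler(signal.SIGTERM)
    return "stopped"

result = asyncio.run(run_until_signalled())
print(result)
```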

Real-Life Example: Live CSV Processing Pipeline

The pipeline below watches an incoming/ folder and processes each new CSV file as soon as it arrives:

# csv_pipeline.py
"""
Real-time CSV processing pipeline using watchfiles.
Watches an incoming/ folder and processes new CSV files immediately.
"""
import asyncio
import csv
import json
import os
import shutil
from datetime import datetime, timezone
from pathlib import Path
from watchfiles import awatch, Change

# Directory structure
INCOMING = Path("incoming")
PROCESSED = Path("processed")
FAILED = Path("failed")
SUMMARY_FILE = Path("pipeline_summary.json")

for d in [INCOMING, PROCESSED, FAILED]:
    d.mkdir(exist_ok=True)

async def process_csv(path: str) -> dict:
    """Read a CSV file, compute basic stats, return summary."""
    filepath = Path(path)
    rows = []
    try:
        with open(filepath, newline="") as f:
            reader = csv.DictReader(f)
            for row in reader:
                rows.append(row)

        if not rows:
            raise ValueError("Empty CSV file")

        summary = {
            "file": filepath.name,
            # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated since Python 3.12)
            "processed_at": datetime.now(timezone.utc).isoformat(),
            "row_count": len(rows),
            "columns": list(rows[0].keys()) if rows else [],
            "status": "ok"
        }

        # Move to processed folder
        dest = PROCESSED / filepath.name
        shutil.move(str(filepath), str(dest))
        print(f"  [OK] {filepath.name} -- {len(rows)} rows, cols: {summary['columns']}")
        return summary

    except Exception as e:
        # Move to failed folder for investigation
        dest = FAILED / filepath.name
        shutil.move(str(filepath), str(dest))
        print(f"  [FAIL] {filepath.name} -- {e}")
        return {"file": filepath.name, "status": "failed", "error": str(e)}

def update_summary(results: list[dict]) -> None:
    existing = []
    if SUMMARY_FILE.exists():
        with open(SUMMARY_FILE) as f:
            existing = json.load(f)
    existing.extend(results)
    with open(SUMMARY_FILE, "w") as f:
        json.dump(existing, f, indent=2)

async def main():
    print(f"Pipeline watching {INCOMING}/ for new CSV files...")
    print(f"Results -> {PROCESSED}/, failures -> {FAILED}/\n")

    async for changes in awatch(str(INCOMING)):
        new_csvs = [
            path for change, path in changes
            if change == Change.added and path.endswith(".csv")
        ]
        if not new_csvs:
            continue

        print(f"Detected {len(new_csvs)} new CSV file(s):")
        tasks = [process_csv(path) for path in new_csvs]
        results = await asyncio.gather(*tasks)
        update_summary(list(results))
        print("Batch complete. Summary updated.\n")

if __name__ == "__main__":
    asyncio.run(main())

Test it:

# In terminal 1: run the pipeline
python3 csv_pipeline.py

# In terminal 2: drop test CSV files into incoming/
echo "name,age,city
Alice,32,Melbourne
Bob,28,Sydney" > incoming/users_001.csv

echo "product,price,qty
Widget,9.99,100
Gadget,24.99,50" > incoming/products_001.csv

# Terminal 1 output:
Pipeline watching incoming/ for new CSV files...
Results -> processed/, failures -> failed/

Detected 2 new CSV file(s):
  [OK] users_001.csv -- 2 rows, cols: ['name', 'age', 'city']
  [OK] products_001.csv -- 2 rows, cols: ['product', 'price', 'qty']
Batch complete. Summary updated.

This pipeline covers the main production concerns: every file in a batch is dispatched through asyncio.gather(), errors are isolated (a bad CSV does not block good ones), files are moved after processing (so no file is processed twice), and a running summary log is kept. Note that process_csv() does blocking file I/O with no await inside it, so files in a batch are effectively handled one after another; for large files, run the parsing in a worker thread with asyncio.to_thread(). Extend the pipeline by adding a database write step in process_csv(), or replace the CSV processing with any async operation (API calls, image processing, document parsing) without changing the watcher infrastructure.
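A minimal sketch of the worker-thread variant is shown below, assuming Python 3.9 or later for asyncio.to_thread(). The count_rows helper and the two temporary files are illustrative and not part of the pipeline above.

```python
# threaded_parse.py
import asyncio
import csv
import tempfile
from pathlib import Path

def count_rows(path: Path) -> int:
    """Blocking CSV read; runs in a worker thread, not on the event loop."""
    with open(path, newline="") as f:
        return sum(1 for _ in csv.DictReader(f))

async def count_all(paths: list) -> list:
    # Each blocking read runs in its own thread, so the reads can overlap
    return await asyncio.gather(*(asyncio.to_thread(count_rows, p) for p in paths))

with tempfile.TemporaryDirectory() as tmp:
    users = Path(tmp) / "users.csv"
    products = Path(tmp) / "products.csv"
    users.write_text("name,age\nAlice,32\nBob,28\n")
    products.write_text("product,price\nWidget,9.99\n")
    counts = asyncio.run(count_all([users, products]))

print(counts)  # [2, 1]
```

Keeping the parsing function synchronous also makes it easy to test on its own, with the async layer limited to scheduling.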

Frequently Asked Questions

How is watchfiles different from watchdog?

watchdog uses an Observer/EventHandler class pattern where you subclass FileSystemEventHandler and attach it to an Observer. watchfiles uses a simpler generator pattern and has first-class async support. The Rust core of watchfiles also makes it faster and more memory-efficient for high-frequency change detection. Both work reliably for typical use cases; watchfiles is the better choice for new projects, especially those using async Python.

Does watchfiles watch subdirectories automatically?

Yes — by default, watch() and awatch() watch recursively. All files and subdirectories within the watched path are monitored. You can scope the monitoring by using a specific subdirectory as the watched path, or by filtering with watch_filter to only react to paths matching your pattern.

Does watchfiles follow symbolic links?

This depends on the OS. On Linux with inotify, symbolic links are not followed, so you need to watch the link target directly. On macOS with FSEvents, symbolic links within the watched directory tree are followed. For cross-platform reliability, watch the real path rather than a symlink: use os.path.realpath(path) before passing it to watch().

What happens with large directories (thousands of files)?

OS-native file watching scales much better than polling because the kernel only notifies your process when something actually changes. For very large directory trees (tens of thousands of files), inotify on Linux has a limit on the number of watches per user, exposed at /proc/sys/fs/inotify/max_user_watches. The default depends on the kernel and distribution; 8192 is common on older systems. If you hit this limit, increase it: echo 524288 | sudo tee /proc/sys/fs/inotify/max_user_watches. To persist the change across reboots, add fs.inotify.max_user_watches=524288 to /etc/sysctl.conf.

How do I debounce rapid changes to the same file?

watchfiles already does some internal debouncing — multiple changes to the same file within a short period are collapsed into one batch. For additional debouncing, track the last-processed time per file in a dict and skip files that were processed within the last N seconds: if (now - last_processed.get(path, 0)) < debounce_seconds: continue. This is especially useful when monitoring log files that are written to many times per second.
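The per-file check described above can be wrapped in a small helper. This is a sketch using only the standard library; the Debouncer class and its injectable clock are hypothetical names chosen so the behaviour can be demonstrated without waiting in real time.

```python
# debounce.py
import time

class Debouncer:
    """Skip a path if it was already processed within the last `interval` seconds."""

    def __init__(self, interval: float, clock=time.monotonic):
        self.interval = interval
        self._clock = clock
        self._last_processed = {}

    def should_process(self, path: str) -> bool:
        now = self._clock()
        last = self._last_processed.get(path)
        if last is not None and now - last < self.interval:
            return False  # too soon after the previous run for this file
        self._last_processed[path] = now
        return True

# Demonstration with a fake clock so the example runs instantly
fake_now = [0.0]
debouncer = Debouncer(interval=2.0, clock=lambda: fake_now[0])

decisions = []
for t in (0.0, 0.5, 2.5):
    fake_now[0] = t
    decisions.append(debouncer.should_process("app.log"))

print(decisions)  # [True, False, True]
```

Inside a watcher loop you would call debouncer.should_process(path) for each reported path and skip the ones that return False.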

Conclusion

The watchfiles library gives you instant, CPU-efficient file system monitoring in about ten lines of Python. We covered synchronous watching with watch(), async watching with awatch(), filtering by change type and file extension, watching multiple directories, clean shutdown with stop events, and built a complete CSV processing pipeline that handles batches concurrently.

The next step is to integrate the pipeline with your existing data infrastructure — swap the CSV parsing for database writes, add a message queue to distribute work across multiple workers, or wrap the watcher in a systemd service for reliable production deployment. The file watching layer stays exactly as it is.

Full documentation and advanced usage examples are at watchfiles.helpmanual.io.

How To Use Python Kivy for Cross-Platform Mobile and Desktop Apps

Intermediate

Most Python developers live in the terminal and the browser, but there are plenty of situations where you need a real graphical application: a data entry tool for a client who does not want a web server, an offline utility that works without internet, or a mobile app for field technicians who use Python scripts already. Most Python GUI options either look dated (Tkinter), require learning a new language-within-a-language (Qt’s QML), or tie you to a single platform. Kivy does something different — it lets you write Python that runs on Windows, Mac, Linux, Android, and iOS from the same codebase.

Kivy is an open-source Python framework for building touch-ready, GPU-accelerated graphical applications. It uses its own widget toolkit that renders consistently across platforms, so your layout does not look different on each OS. You need Python 3.8+, and pip install kivy handles most desktop setups. Mobile deployment requires additional packaging tools (Buildozer for Android, kivy-ios for iOS), but the development cycle happens on your desktop.

This article covers the Kivy fundamentals you need to build real apps: the App class and widget tree, layouts (BoxLayout, GridLayout, FloatLayout), event handling, the KV language for UI definitions, property bindings, screen management for multi-screen apps, and file dialogs. By the end you will have built a multi-screen expense tracker that runs on any platform.

Kivy App: Quick Example

Install Kivy and run a minimal app to confirm your setup:

# quick_kivy.py
from kivy.app import App
from kivy.uix.label import Label
from kivy.uix.button import Button
from kivy.uix.boxlayout import BoxLayout

class QuickApp(App):
    def build(self):
        layout = BoxLayout(orientation="vertical", padding=20, spacing=10)

        self.label = Label(text="Click the button!", font_size=24)
        btn = Button(text="Click Me", size_hint=(1, 0.3), font_size=18)
        btn.bind(on_press=self.on_button_press)

        layout.add_widget(self.label)
        layout.add_widget(btn)
        return layout

    def on_button_press(self, instance):
        self.label.text = "Button pressed!"

if __name__ == "__main__":
    QuickApp().run()

Expected behavior:

# A window opens with a label and a button.
# Clicking the button changes the label text to "Button pressed!"
# Close the window to exit.

Every Kivy app inherits from App and implements a build() method that returns the root widget. Widgets are composed into a tree: the root widget contains child widgets, which can contain further children. BoxLayout arranges children vertically or horizontally. The bind() method connects events to handler methods — on_press fires when a button is clicked or tapped.

What Is Kivy and When Should You Use It?

Kivy renders every widget using OpenGL ES 2 — the same graphics API used by mobile games. This means your UI looks identical on every platform and handles touch input natively (no retrofitted touch support). The tradeoff is that Kivy apps do not look like native OS apps — the buttons and text fields have Kivy’s own style, not Windows or macOS native controls.

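+-------------+------------------+----------------+---------+-------------------------+
| Framework   | Platforms        | Native Look    | Touch   | Best For                |
+-------------+------------------+----------------+---------+-------------------------+
| Tkinter     | Desktop only     | Yes            | No      | Simple utilities        |
| PyQt/PySide | Desktop only     | Yes            | Partial | Complex desktop apps    |
| wxPython    | Desktop only     | Yes            | No      | Traditional GUIs        |
| Kivy        | Desktop + Mobile | No (own style) | Yes     | Cross-platform + mobile |
| BeeWare     | Desktop + Mobile | Yes            | Yes     | Native-looking mobile   |
+-------------+------------------+----------------+---------+-------------------------+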

Use Kivy when you need mobile deployment or touch input, when you have existing Python business logic and want to wrap it in a GUI, or when you need the same UI to work on a touchscreen kiosk and a desktop simultaneously. If you need your app to look indistinguishable from a native macOS or Windows application, BeeWare or a web-based approach may serve you better.

Layouts and Widget Positioning

Kivy has several layout classes. BoxLayout is the most common — it arranges children in a horizontal or vertical line. GridLayout creates a grid, and FloatLayout lets you position widgets with precise coordinates or percentages:

# layouts_demo.py
from kivy.app import App
from kivy.uix.boxlayout import BoxLayout
from kivy.uix.gridlayout import GridLayout
from kivy.uix.label import Label
from kivy.uix.button import Button
from kivy.uix.textinput import TextInput

class LayoutDemoApp(App):
    def build(self):
        # Main vertical layout
        main = BoxLayout(orientation="vertical", padding=10, spacing=5)

        # Header label
        main.add_widget(Label(
            text="Simple Calculator",
            font_size=22,
            size_hint=(1, 0.1)
        ))

        # Input field
        self.input = TextInput(
            hint_text="Enter a number",
            multiline=False,
            size_hint=(1, 0.1),
            font_size=18
        )
        main.add_widget(self.input)

        # 4x4 grid of number buttons
        grid = GridLayout(cols=4, size_hint=(1, 0.6), spacing=3)
        for label in ["7","8","9","/", "4","5","6","*", "1","2","3","-", "C","0","=","+"]:
            btn = Button(text=label, font_size=20)
            btn.bind(on_press=self.on_key)
            grid.add_widget(btn)
        main.add_widget(grid)

        # Result display
        self.result = Label(text="", font_size=20, size_hint=(1, 0.1))
        main.add_widget(self.result)

        return main

    def on_key(self, instance):
        key = instance.text
        if key == "C":
            self.input.text = ""
            self.result.text = ""
        elif key == "=":
            try:
                self.result.text = str(eval(self.input.text))
            except Exception:
                self.result.text = "Error"
        else:
            self.input.text += key

if __name__ == "__main__":
    LayoutDemoApp().run()

Expected behavior:

# A calculator window opens.
# Clicking number buttons appends digits to the input field.
# Clicking = evaluates the expression and shows the result.
# Clicking C clears the input and result.
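The calculator passes the text field straight to eval(), which will execute any Python expression a user types. That is acceptable for a layout demo but not for an app you distribute. One possible replacement, sketched here with the standard library's ast module, accepts only numbers and the four operators on the keypad; safe_eval is a hypothetical helper, not part of Kivy.

```python
# safe_eval.py
import ast
import operator

# Only the operators that appear on the calculator keypad
_OPERATORS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
}

def safe_eval(expression: str) -> float:
    """Evaluate +, -, *, / arithmetic on numeric literals; reject everything else."""

    def _eval(node):
        if isinstance(node, ast.Expression):
            return _eval(node.body)
        if (isinstance(node, ast.Constant)
                and isinstance(node.value, (int, float))
                and not isinstance(node.value, bool)):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _OPERATORS:
            return _OPERATORS[type(node.op)](_eval(node.left), _eval(node.right))
        if isinstance(node, ast.UnaryOp) and isinstance(node.op, ast.USub):
            return -_eval(node.operand)
        raise ValueError("Unsupported expression")

    return _eval(ast.parse(expression, mode="eval"))

answer = safe_eval("7*8-4/2")
print(answer)  # 54.0
```

In on_key() you could call str(safe_eval(self.input.text)) in place of str(eval(self.input.text)); the existing except Exception branch still catches syntax errors, rejected input, and division by zero.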

The size_hint property controls how much of the parent's space a widget takes: size_hint=(1, 0.1) means "use 100% of the width and 10% of the height." Setting size_hint=(None, None) and using size=(width, height) with pixel values gives you fixed sizes instead. GridLayout requires a cols (or rows) parameter; it fills in grid cells in order, left-to-right and top-to-bottom.

The KV Language for UI Definitions

Writing complex UIs in Python code gets verbose. Kivy includes KV language — a YAML-like DSL for describing widget trees separately from Python logic. This keeps your UI definition out of your business logic:

# my.kv -- saved alongside my_app.py
# Kivy loads this file automatically: the App class is named MyApp, and Kivy
# looks for the lowercase class name with the trailing "App" removed

<RootWidget>:
    orientation: "vertical"
    padding: 20
    spacing: 10

    Label:
        text: "Welcome to My App"
        font_size: 28
        size_hint: (1, 0.2)
        color: 0.2, 0.6, 1, 1  # RGBA

    TextInput:
        id: name_input
        hint_text: "Enter your name"
        size_hint: (1, 0.15)
        multiline: False

    Button:
        text: "Greet Me"
        size_hint: (0.5, 0.15)
        pos_hint: {"center_x": 0.5}
        on_press: root.greet(name_input.text)

    Label:
        id: greeting_label
        text: ""
        font_size: 20
        size_hint: (1, 0.2)

# my_app.py -- Python code stays clean
from kivy.app import App
from kivy.uix.boxlayout import BoxLayout

class RootWidget(BoxLayout):
    def greet(self, name: str) -> None:
        if name.strip():
            self.ids.greeting_label.text = f"Hello, {name}!"
        else:
            self.ids.greeting_label.text = "Please enter your name."

class MyApp(App):
    def build(self):
        return RootWidget()

if __name__ == "__main__":
    MyApp().run()

Expected behavior:

# Window with a text field and button.
# Type a name, click Greet Me.
# Label shows: "Hello, [name]!"

The KV file is named after your App class in lowercase with the trailing "App" removed, so MyApp loads my.kv (a file called my_app.kv would not be picked up automatically). Widget IDs declared in KV become accessible in Python via self.ids.widget_id. The on_press: root.greet(name_input.text) line in KV directly calls a Python method and passes a value from another widget, all without writing binding code in Python. This separation is similar to how HTML/CSS/JavaScript work: structure in KV, logic in Python.
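The naming rule is simple enough to express in a few lines. The function below is a hypothetical helper that mirrors the documented rule for illustration; it is not a Kivy API, and Kivy performs this lookup for you.

```python
# kv_name_rule.py
def kv_filename(app_class_name: str) -> str:
    """Return the .kv file name Kivy looks for, per its documented naming rule."""
    name = app_class_name.lower()
    # Strip a trailing "app" unless the class is literally named "App"
    if name.endswith("app") and name != "app":
        name = name[:-3]
    return f"{name}.kv"

for cls in ("MyApp", "ExpenseTrackerApp", "Dashboard"):
    print(f"{cls} -> {kv_filename(cls)}")
```

If a KV file is not being loaded, checking the file name against this rule is usually the first thing to try.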

Multi-Screen Navigation with ScreenManager

Real apps have multiple screens. Kivy’s ScreenManager handles navigation between named screens:

# screen_navigation.py
from kivy.app import App
from kivy.uix.screenmanager import ScreenManager, Screen
from kivy.uix.boxlayout import BoxLayout
from kivy.uix.button import Button
from kivy.uix.label import Label
from kivy.uix.textinput import TextInput

class LoginScreen(Screen):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        layout = BoxLayout(orientation="vertical", padding=40, spacing=15)
        layout.add_widget(Label(text="Login", font_size=28, size_hint=(1, 0.2)))
        self.username = TextInput(hint_text="Username", multiline=False, size_hint=(1, 0.15))
        self.password = TextInput(hint_text="Password", password=True, multiline=False, size_hint=(1, 0.15))
        btn = Button(text="Login", size_hint=(0.5, 0.15), pos_hint={"center_x": 0.5})
        btn.bind(on_press=self.do_login)
        layout.add_widget(self.username)
        layout.add_widget(self.password)
        layout.add_widget(btn)
        self.add_widget(layout)

    def do_login(self, instance):
        # In a real app: validate credentials here
        if self.username.text:
            self.manager.current = "dashboard"
            self.manager.get_screen("dashboard").set_user(self.username.text)

class DashboardScreen(Screen):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        layout = BoxLayout(orientation="vertical", padding=20, spacing=10)
        self.welcome = Label(text="Welcome!", font_size=22, size_hint=(1, 0.2))
        btn = Button(text="Logout", size_hint=(0.4, 0.12), pos_hint={"center_x": 0.5})
        btn.bind(on_press=lambda x: setattr(self.manager, "current", "login"))
        layout.add_widget(self.welcome)
        layout.add_widget(btn)
        self.add_widget(layout)

    def set_user(self, username: str) -> None:
        self.welcome.text = f"Welcome, {username}!"

class MultiScreenApp(App):
    def build(self):
        sm = ScreenManager()
        sm.add_widget(LoginScreen(name="login"))
        sm.add_widget(DashboardScreen(name="dashboard"))
        return sm

if __name__ == "__main__":
    MultiScreenApp().run()

Expected behavior:

# Login screen appears first.
# Enter any username, click Login.
# Transitions to Dashboard showing "Welcome, [username]!"
# Logout returns to Login screen.

Switching screens is as simple as setting self.manager.current = "screen_name". Screens can pass data to each other by calling methods on the target screen before or after the transition. The ScreenManager also supports transition animations — SlideTransition, FadeTransition, and SwapTransition — by setting ScreenManager(transition=SlideTransition()).

Real-Life Example: Expense Tracker App

Kivy ScrollView and TextInput app
ScrollView + TextInput + a list: the whole app architecture, visible.

# expense_tracker.py
"""
Simple cross-platform expense tracker using Kivy.
Runs on Windows, Mac, Linux, Android, iOS from the same code.
"""
from kivy.app import App
from kivy.uix.boxlayout import BoxLayout
from kivy.uix.gridlayout import GridLayout
from kivy.uix.scrollview import ScrollView
from kivy.uix.label import Label
from kivy.uix.button import Button
from kivy.uix.textinput import TextInput
from kivy.uix.popup import Popup
from dataclasses import dataclass, field
from datetime import datetime
from typing import List
import json, os

@dataclass
class Expense:
    description: str
    amount: float
    date: str = field(default_factory=lambda: datetime.now().strftime("%Y-%m-%d"))

DATA_FILE = "expenses.json"

def load_expenses() -> List[Expense]:
    if os.path.exists(DATA_FILE):
        with open(DATA_FILE) as f:
            return [Expense(**e) for e in json.load(f)]
    return []

def save_expenses(expenses: List[Expense]) -> None:
    with open(DATA_FILE, "w") as f:
        json.dump([e.__dict__ for e in expenses], f, indent=2)

class ExpenseApp(App):
    def build(self):
        self.expenses: List[Expense] = load_expenses()

        main = BoxLayout(orientation="vertical", padding=10, spacing=8)

        # Title
        main.add_widget(Label(text="Expense Tracker", font_size=24, size_hint=(1, 0.08)))

        # Input row
        input_row = BoxLayout(orientation="horizontal", size_hint=(1, 0.1), spacing=5)
        self.desc_input = TextInput(hint_text="Description", multiline=False, size_hint=(0.5, 1))
        self.amt_input = TextInput(hint_text="Amount", multiline=False, input_filter="float", size_hint=(0.25, 1))
        add_btn = Button(text="Add", size_hint=(0.25, 1))
        add_btn.bind(on_press=self.add_expense)
        input_row.add_widget(self.desc_input)
        input_row.add_widget(self.amt_input)
        input_row.add_widget(add_btn)
        main.add_widget(input_row)

        # Total label
        self.total_label = Label(text="Total: $0.00", font_size=18, size_hint=(1, 0.07))
        main.add_widget(self.total_label)

        # Scrollable expense list
        scroll = ScrollView(size_hint=(1, 0.75))
        self.list_layout = GridLayout(cols=1, spacing=3, size_hint_y=None)
        self.list_layout.bind(minimum_height=self.list_layout.setter("height"))
        scroll.add_widget(self.list_layout)
        main.add_widget(scroll)

        self.refresh_list()
        return main

    def add_expense(self, instance):
        desc = self.desc_input.text.strip()
        try:
            amount = float(self.amt_input.text)
        except ValueError:
            self.show_error("Enter a valid amount")
            return
        if not desc:
            self.show_error("Enter a description")
            return
        self.expenses.append(Expense(description=desc, amount=amount))
        save_expenses(self.expenses)
        self.desc_input.text = ""
        self.amt_input.text = ""
        self.refresh_list()

    def refresh_list(self):
        self.list_layout.clear_widgets()
        total = 0.0
        for expense in reversed(self.expenses):
            row = BoxLayout(orientation="horizontal", size_hint=(1, None), height=44, spacing=3)
            row.add_widget(Label(text=expense.date, size_hint=(0.2, 1), font_size=12))
            row.add_widget(Label(text=expense.description, size_hint=(0.5, 1), font_size=14))
            row.add_widget(Label(text=f"${expense.amount:.2f}", size_hint=(0.3, 1), font_size=14))
            self.list_layout.add_widget(row)
            total += expense.amount
        self.total_label.text = f"Total: ${total:.2f}"

    def show_error(self, message: str) -> None:
        popup = Popup(title="Error", size_hint=(0.7, 0.3),
                      content=Label(text=message))
        popup.open()

if __name__ == "__main__":
    ExpenseApp().run()

Expected behavior:

# App opens showing an empty expense list.
# Enter "Coffee" and 4.50, click Add.
# List shows a row: [today's date] | Coffee | $4.50, and the total label reads: Total: $4.50
# Data is saved to expenses.json and persists between runs.

To deploy this to Android, install Buildozer (pip install buildozer), run buildozer init to generate a spec file, then buildozer android debug to compile the APK. The Python code does not change — Buildozer packages your Kivy app with a Python interpreter into a standard Android APK. iOS packaging works similarly with kivy-ios on macOS.

Frequently Asked Questions

How hard is it to deploy a Kivy app to Android?

Buildozer automates most of the Android packaging process. You run buildozer android debug and it downloads the Android NDK, compiles your Python and dependencies, and produces an APK. The first build takes 20-30 minutes; subsequent builds are much faster. Common issues include missing dependencies in your buildozer.spec requirements list and library compatibility. Pure Python libraries work well; C extension libraries need ARM-compatible wheels or must be compiled by Buildozer.

Is Kivy fast enough for real apps?

For business apps (forms, lists, data display), Kivy’s performance is more than adequate. The GPU-accelerated rendering means the UI stays responsive. For computation-heavy tasks, do the work in a background thread (threading.Thread) and, when it finishes, call Clock.schedule_once(callback, 0) from that thread so the callback runs on the main Kivy thread on the next frame. Updating widgets directly from a background thread can corrupt the display or crash the app. Do not run heavy computation in event handlers, because they execute on the main thread and block the UI.
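The hand-off from a worker thread back to the UI thread can be sketched without Kivy installed. This is a minimal illustration, not Kivy code: run_in_background and its schedule parameter are hypothetical names, and the pending list stands in for the UI loop. In a real Kivy app, schedule would be something like lambda cb: Clock.schedule_once(lambda dt: cb(), 0).

```python
import threading
from typing import Any, Callable


def run_in_background(
    work: Callable[[], Any],
    on_done: Callable[[Any], None],
    schedule: Callable[[Callable[[], None]], None],
) -> threading.Thread:
    """Run work() off the UI thread, then hand the result back via schedule()."""

    def worker() -> None:
        result = work()  # heavy computation; never touches widgets
        # Ask the UI loop to deliver the result instead of updating widgets here.
        schedule(lambda: on_done(result))

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    return thread


# Stand-in for the UI loop (assumption): scheduled callbacks are collected
# and later executed by the "main" thread. Clock.schedule_once plays this
# role in Kivy.
pending: list[Callable[[], None]] = []
results: list[int] = []

thread = run_in_background(
    work=lambda: sum(range(1_000)),
    on_done=results.append,
    schedule=pending.append,
)
thread.join()

for callback in pending:  # the main thread drains the queue
    callback()

print(results)
```

Passing the scheduler in as a parameter keeps the worker logic testable without a running window.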

Should I use KV language or pure Python for layouts?

KV language is strongly recommended for anything beyond a handful of widgets. The declarative structure is much easier to read and maintain than nested Python constructor calls. Use Python for logic (event handlers, data processing, state management) and KV for structure (widget tree, positions, styles, property bindings). You can mix both: define the widget tree in KV and modify widget properties from Python via self.ids.

Can I change Kivy’s default visual style?

Yes. Kivy supports custom widget styling, and its Atlas system lets you bundle replacement textures. At the basic level, many widgets expose styling properties you can set directly: background_color on Button, and color and font_size on Label and its subclasses. For consistent theming, define a styled base rule in your KV file and inherit from it in your widgets. The KivyMD library (pip install kivymd) provides Material Design widgets built on top of Kivy if you want a modern, polished look.

How do I store data in a Kivy app?

For simple key-value storage, use kivy.storage.jsonstore.JsonStore with a file path inside App.user_data_dir, which resolves to a writable, platform-appropriate directory. For structured data with relationships, use SQLite via Python’s built-in sqlite3 module. For cloud sync, call a REST API from a background thread. On Android, the app has read/write access to its own data directory; access to external storage (photos, downloads) needs permissions declared in buildozer.spec and requested at runtime through python-for-android’s android.permissions module. Plyer, the Kivy-maintained platform API library, wraps other platform features.
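As a rough sketch of the SQLite option, the expense data from the tracker could live in a single table. The helper names (open_db, add_expense, total_spent) and the schema are illustrative assumptions, not part of the app above. The example uses an in-memory database; in a Kivy app the path would typically point at a file under App.user_data_dir.

```python
import sqlite3


def open_db(path: str = ":memory:") -> sqlite3.Connection:
    """Open the database and create the expenses table if it is missing."""
    conn = sqlite3.connect(path)
    conn.execute(
        """CREATE TABLE IF NOT EXISTS expenses (
               id INTEGER PRIMARY KEY AUTOINCREMENT,
               description TEXT NOT NULL,
               amount REAL NOT NULL,
               date TEXT NOT NULL DEFAULT (date('now'))
           )"""
    )
    return conn


def add_expense(conn: sqlite3.Connection, description: str, amount: float) -> int:
    """Insert one expense and return its row id."""
    with conn:  # commits on success, rolls back on error
        cur = conn.execute(
            "INSERT INTO expenses (description, amount) VALUES (?, ?)",
            (description, amount),
        )
    return cur.lastrowid


def total_spent(conn: sqlite3.Connection) -> float:
    """Sum all amounts; COALESCE makes an empty table return 0."""
    (total,) = conn.execute(
        "SELECT COALESCE(SUM(amount), 0) FROM expenses"
    ).fetchone()
    return total


conn = open_db()
add_expense(conn, "Coffee", 4.50)
add_expense(conn, "Lunch", 12.00)
print(f"Total: ${total_spent(conn):.2f}")
```

Parameterized queries (the ? placeholders) keep user-entered descriptions from being interpreted as SQL.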

Conclusion

Kivy lets you build genuinely cross-platform graphical applications with Python you already know. We covered the App class and widget tree, BoxLayout and GridLayout for positioning, the KV language for clean UI definitions, the ScreenManager for multi-screen navigation, and built a complete expense tracker that saves data and runs on any platform.

The natural next step is to add a chart screen to the expense tracker using kivy.garden.matplotlib for spending trends, or to package it as an Android APK using Buildozer. The app structure you have already is solid enough to extend without rewriting — add screens via ScreenManager and add features to existing screens without touching the navigation logic.

Full documentation is at kivy.org/doc/stable. The Android packaging guide covers the full Buildozer workflow in detail.

How To Use Python Litestar for Async Web APIs

Intermediate

FastAPI popularized the pattern of using Python type hints to define API contracts, but if you have spent time with it in production you have probably hit its limitations: limited built-in dependency injection, no first-class support for data transfer objects (DTOs), and a plugin ecosystem that requires assembling multiple third-party packages. Litestar (formerly Starlite) takes the same type-hint-first philosophy and extends it significantly — built-in DTOs, a layered dependency injection system, first-class WebSocket and Server-Sent Events support, and automatic OpenAPI schema generation that actually stays in sync with your code.

Litestar is a fully async Python ASGI framework. Early releases (as Starlite) were built on Starlette, but the 2.x line has its own ASGI implementation and runs on any ASGI server, such as Uvicorn. It is production-ready, actively maintained, and designed specifically for building APIs rather than full-stack web applications. You need Python 3.9+, and pip install "litestar[standard]" gets you the framework plus Uvicorn and the CLI used to run the development server.

This article walks through everything you need to build async APIs with Litestar: routing and handlers, path and query parameters, request body validation, dependency injection, middleware, DTOs for input/output control, WebSocket handlers, and a real-life task management API. By the end you will have a production-ready Litestar API that you can extend for your own projects.

Litestar Async API: Quick Example

Install Litestar and run a minimal API to see the structure:

# quick_litestar.py
from litestar import Litestar, get, post
from pydantic import BaseModel

class Item(BaseModel):
    name: str
    price: float

# In-memory store (replace with a database in production)
items: list[Item] = []

@get("/items")
async def list_items() -> list[Item]:
    return items

@post("/items")
async def create_item(data: Item) -> Item:
    items.append(data)
    return data

app = Litestar(route_handlers=[list_items, create_item])

# Run with: uvicorn quick_litestar:app --reload
# Or: litestar --app quick_litestar:app run --reload

Test it:

# POST an item
curl -X POST http://127.0.0.1:8000/items \
  -H "Content-Type: application/json" \
  -d '{"name": "Python Book", "price": 49.99}'

# Output:
{"name":"Python Book","price":49.99}

# GET all items
curl http://127.0.0.1:8000/items

# Output:
[{"name":"Python Book","price":49.99}]

Litestar automatically serves the generated OpenAPI spec at /schema/openapi.json and a Swagger UI at /schema/swagger. Unlike frameworks where you annotate your functions with OpenAPI decorators after the fact, Litestar derives the schema directly from your Python type annotations, so there is no duplication and no drift.

Litestar routing with type annotations
Type annotations in. OpenAPI docs out. No extra decorators needed.

What Is Litestar and Why Use It?

Litestar is positioned between FastAPI (simple, widely adopted) and frameworks like Django REST Framework (comprehensive, heavyweight). It shares FastAPI’s type-hint-driven approach but adds features that FastAPI requires plugins for:

Feature | Flask/Django | FastAPI | Litestar
Async support | Limited | First-class | First-class
Type-driven validation | Manual | Pydantic | Pydantic + attrs + msgspec
OpenAPI generation | Plugin | Built-in | Built-in, more control
Dependency injection | Manual/plugin | Basic | Layered, scoped
DTOs | Manual | Manual | Built-in
WebSockets | Plugin | Basic | First-class

The library works with Pydantic, attrs, and msgspec as data validation backends, so you are not locked into one serialization library. For high-performance APIs where serialization speed matters, msgspec is significantly faster than Pydantic.

Installation and Project Setup

# Terminal
pip install "litestar[standard]"
# msgspec ships with Litestar itself; the Pydantic models used in this
# article need the pydantic extra (or a separate pip install pydantic):
pip install "litestar[standard,pydantic]"

# project structure
# myapi/
#   app.py       -- main application
#   routes/      -- route handlers
#   models.py    -- data models
#   deps.py      -- dependencies

# verify_install.py
from importlib.metadata import version
print(f"Litestar version: {version('litestar')}")

# Quick health check to confirm the app starts
from litestar import Litestar, get

@get("/health")
async def health() -> dict:
    return {"status": "ok"}

app = Litestar(route_handlers=[health])
print("App created successfully")

Output:

Litestar version: 2.x.x
App created successfully

Routing and Route Handlers

Litestar uses decorator-based routing. Each HTTP method has its own decorator: @get, @post, @put, @patch, @delete. Handlers are plain async functions — no class inheritance required (though controllers are available for grouping):

# routing_example.py
from litestar import Litestar, get, post, put, delete
from litestar.exceptions import NotFoundException
from pydantic import BaseModel
from typing import Optional

class Product(BaseModel):
    id: int
    name: str
    price: float
    in_stock: bool = True

# Simulated database
_db: dict[int, Product] = {
    1: Product(id=1, name="Widget", price=9.99),
    2: Product(id=2, name="Gadget", price=24.99),
}

@get("/products")
async def list_products(in_stock: Optional[bool] = None) -> list[Product]:
    """List products, optionally filtered by stock status."""
    products = list(_db.values())
    if in_stock is not None:
        products = [p for p in products if p.in_stock == in_stock]
    return products

@get("/products/{product_id:int}")
async def get_product(product_id: int) -> Product:
    """Get a single product by ID."""
    if product_id not in _db:
        raise NotFoundException(f"Product {product_id} not found")
    return _db[product_id]

@post("/products")
async def create_product(data: Product) -> Product:
    """Create a new product."""
    _db[data.id] = data
    return data

@delete("/products/{product_id:int}", status_code=204)
async def delete_product(product_id: int) -> None:
    """Delete a product. Returns 204 No Content."""
    if product_id not in _db:
        raise NotFoundException(f"Product {product_id} not found")
    del _db[product_id]

app = Litestar(route_handlers=[list_products, get_product, create_product, delete_product])

Path parameters use the {name:type} syntax, where the type is one of the supported converters such as int, str, float, uuid, or path. Litestar validates and converts the path segment automatically. If someone passes a non-integer to /products/{product_id:int}, the route does not match and they get a 404 response before your handler is ever called. Query parameters are declared as typed function parameters with optional defaults.

Litestar path parameter coercion
{product_id:int} — path coercion before your handler even runs.

Dependency Injection

Litestar has a layered dependency injection system where dependencies can be scoped to the entire app, a router, a controller, or a single handler. This is more flexible than FastAPI’s flat dependency approach:

# dependency_injection.py
from litestar import Litestar, get
from litestar.di import Provide
import asyncio

# A simulated async database connection
class Database:
    def __init__(self, url: str):
        self.url = url
        self.connected = False

    async def connect(self) -> None:
        await asyncio.sleep(0.001)  # simulate connection
        self.connected = True

    async def query(self, sql: str) -> list[dict]:
        # Simulate a query result
        return [{"id": 1, "result": f"Data from: {sql[:30]}"}]

async def get_db() -> Database:
    """Dependency: creates and returns a connected database instance."""
    db = Database(url="postgresql://localhost/myapp")
    await db.connect()
    return db

@get("/users", dependencies={"db": Provide(get_db)})
async def list_users(db: Database) -> list[dict]:
    """Handler receives db via dependency injection."""
    users = await db.query("SELECT * FROM users LIMIT 10")
    return users

# App-level dependency (available to all handlers)
app = Litestar(
    route_handlers=[list_users],
    dependencies={"db": Provide(get_db)}  # set at app level for global access
)

print("Dependency injection app created")

Output:

Dependency injection app created

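Litestar’s dependencies are layered: a provider declared on a router is available to every handler in that router but not to the rest of the app, and a declaration closer to the handler overrides one with the same key further up (in the example above, the handler-level db takes precedence over the app-level one). By default the provider function is called for each request, so handlers do not accidentally share state between requests. When a dependency is a generator or async generator function, Litestar treats it like a context manager: the setup code before the yield runs before the handler and the teardown code after it runs once the handler has finished, which makes it clean to handle connections and transactions.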
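The setup/teardown ordering of a generator dependency can be illustrated with plain asyncio. This sketch drives the generator by hand to mimic what the framework does around a handler call; provide_session and simulate_request are hypothetical names, and in a Litestar app you would register the generator with Provide(provide_session) instead of calling it yourself.

```python
import asyncio
from typing import AsyncGenerator

events: list[str] = []


async def provide_session() -> AsyncGenerator[dict, None]:
    """Yield a fake session; code after the yield is the teardown."""
    events.append("open session")
    try:
        yield {"connected": True}
    finally:
        events.append("close session")


async def simulate_request() -> None:
    """Mimic the framework: set up, run the handler, then tear down."""
    gen = provide_session()
    session = await gen.__anext__()  # runs the code before the yield
    events.append(f"handler ran, connected={session['connected']}")
    await gen.aclose()  # runs the finally block


asyncio.run(simulate_request())
print(events)
```

Putting the cleanup in a finally block means it also runs when the handler fails.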

Data Transfer Objects (DTOs)

DTOs control which fields are exposed in the API, separate from your internal data models. This solves the common problem of accidentally exposing password hashes or internal IDs in your API responses:

# dto_example.py
from litestar import Litestar, get, post
from litestar.dto import DTOConfig, DTOData
from litestar.contrib.pydantic import PydanticDTO
from pydantic import BaseModel
from typing import Optional

class User(BaseModel):
    """Internal model -- has fields we don't want to expose."""
    id: int
    username: str
    email: str
    password_hash: str          # never expose this
    internal_notes: str = ""    # internal admin field

# ReadDTO: expose only safe fields in responses
class UserReadDTO(PydanticDTO[User]):
    config = DTOConfig(exclude={"password_hash", "internal_notes"})

# WriteDTO: accept only these fields on creation
class UserCreateDTO(PydanticDTO[User]):
    config = DTOConfig(exclude={"id", "password_hash", "internal_notes"})

_users: dict[int, User] = {}
_next_id = 1

@post("/users", dto=UserCreateDTO, return_dto=UserReadDTO)
async def create_user(data: DTOData[User]) -> User:
    global _next_id
    # The DTO excludes the required id and password_hash fields, so the
    # payload alone cannot build a User. DTOData holds the validated
    # partial data; create_instance() fills in the server-side fields.
    user = data.create_instance(
        id=_next_id,
        password_hash="$2b$12$hashed_value_here",  # would bcrypt in real app
        internal_notes="",
    )
    _users[_next_id] = user
    _next_id += 1
    return user  # UserReadDTO strips password_hash before serializing

@get("/users/{user_id:int}", return_dto=UserReadDTO)
async def get_user(user_id: int) -> User:
    return _users[user_id]

app = Litestar(route_handlers=[create_user, get_user])
print("DTO app created -- password_hash excluded from all responses")

Output:

DTO app created -- password_hash excluded from all responses

The dto= parameter controls the input shape (what clients send), and return_dto= controls the output shape (what clients receive). Litestar enforces both automatically — your handler works with the full internal model, while the API contract only exposes what you allow. This separation prevents the most common API security mistake: including sensitive fields in responses because the internal model has them.

Real-Life Example: Task Management API

Litestar CRUD response filtering
CRUD is easy. The interesting part is what you exclude from the response.

# task_api.py
"""
Complete task management API using Litestar.
Demonstrates routing, validation, DTOs, and error handling.
"""
from __future__ import annotations
from litestar import Litestar, get, post, patch, delete
from litestar.exceptions import NotFoundException
from pydantic import BaseModel, Field
from typing import Optional
from datetime import datetime
from enum import Enum

class Priority(str, Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"

class Status(str, Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    DONE = "done"

class Task(BaseModel):
    id: int
    title: str = Field(min_length=1, max_length=200)
    description: str = ""
    priority: Priority = Priority.MEDIUM
    status: Status = Status.PENDING
    created_at: datetime = Field(default_factory=datetime.utcnow)
    completed_at: Optional[datetime] = None

class TaskCreate(BaseModel):
    title: str = Field(min_length=1, max_length=200)
    description: str = ""
    priority: Priority = Priority.MEDIUM

class TaskUpdate(BaseModel):
    title: Optional[str] = Field(None, min_length=1, max_length=200)
    description: Optional[str] = None
    status: Optional[Status] = None
    priority: Optional[Priority] = None

# In-memory store
_tasks: dict[int, Task] = {}
_next_id = 1

@get("/tasks")
async def list_tasks(
    status: Optional[Status] = None,
    priority: Optional[Priority] = None,
) -> list[Task]:
    tasks = list(_tasks.values())
    if status:
        tasks = [t for t in tasks if t.status == status]
    if priority:
        tasks = [t for t in tasks if t.priority == priority]
    return sorted(tasks, key=lambda t: t.created_at, reverse=True)

@post("/tasks", status_code=201)
async def create_task(data: TaskCreate) -> Task:
    global _next_id
    task = Task(id=_next_id, **data.model_dump())
    _tasks[_next_id] = task
    _next_id += 1
    return task

@get("/tasks/{task_id:int}")
async def get_task(task_id: int) -> Task:
    if task_id not in _tasks:
        raise NotFoundException(f"Task {task_id} not found")
    return _tasks[task_id]

@patch("/tasks/{task_id:int}")
async def update_task(task_id: int, data: TaskUpdate) -> Task:
    if task_id not in _tasks:
        raise NotFoundException(f"Task {task_id} not found")
    task = _tasks[task_id]
    updates = data.model_dump(exclude_none=True)
    if "status" in updates and updates["status"] == Status.DONE:
        updates["completed_at"] = datetime.utcnow()
    updated = task.model_copy(update=updates)
    _tasks[task_id] = updated
    return updated

@delete("/tasks/{task_id:int}", status_code=204)
async def delete_task(task_id: int) -> None:
    if task_id not in _tasks:
        raise NotFoundException(f"Task {task_id} not found")
    del _tasks[task_id]

app = Litestar(route_handlers=[list_tasks, create_task, get_task, update_task, delete_task])

Test the API:

# Create a task
curl -X POST http://localhost:8000/tasks \
  -H "Content-Type: application/json" \
  -d '{"title": "Write unit tests", "priority": "high"}'

# Output:
{
  "id": 1,
  "title": "Write unit tests",
  "description": "",
  "priority": "high",
  "status": "pending",
  "created_at": "2026-05-19T08:00:00",
  "completed_at": null
}

# Mark as done
curl -X PATCH http://localhost:8000/tasks/1 \
  -H "Content-Type: application/json" \
  -d '{"status": "done"}'

# Filter by status
curl "http://localhost:8000/tasks?status=done"

This API is ready to extend: swap the in-memory dict for SQLAlchemy or SQLite with aiosqlite, add JWT authentication as an app-level middleware, or add pagination parameters to list_tasks. The Litestar structure keeps each concern separate — routing logic in the handlers, data shape in the models, and access control in middleware or dependencies.
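As one possible shape for the pagination extension mentioned above, a small helper can slice the task list and report the total. The paginate function, its limit/offset parameters, and the cap of 100 are illustrative assumptions; in the API, limit and offset would become query parameters on list_tasks.

```python
from typing import Sequence, TypeVar

T = TypeVar("T")


def paginate(items: Sequence[T], limit: int = 20, offset: int = 0) -> dict:
    """Return one page of items plus the metadata a client needs to page on."""
    limit = max(1, min(limit, 100))  # clamp to a sane range (assumed cap)
    offset = max(0, offset)
    page = list(items[offset : offset + limit])
    return {"items": page, "total": len(items), "limit": limit, "offset": offset}


tasks = [f"task-{i}" for i in range(1, 8)]  # stand-in for list(_tasks.values())
page = paginate(tasks, limit=3, offset=3)
print(page)
```

Returning the total alongside the page lets clients compute how many pages remain without a second request.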

Frequently Asked Questions

How does Litestar compare to FastAPI in production?

Litestar’s published benchmarks show similar or slightly better performance than FastAPI on most workloads. Both are ASGI frameworks, though only FastAPI is built on Starlette. The main practical differences are: Litestar has built-in DTOs (no need to create separate response models), a more structured dependency injection system with scoping, and first-class support for msgspec serialization, which is significantly faster than Pydantic for high-throughput APIs. FastAPI has a larger ecosystem and more tutorials; Litestar is the better choice if you value stronger typing and want to avoid extra plugins.

Can I use SQLAlchemy with Litestar?

Yes — Litestar has first-class SQLAlchemy integration via litestar.contrib.sqlalchemy. It includes base classes for async SQLAlchemy models and a repository pattern that handles sessions and transactions cleanly. Install with pip install "litestar[sqlalchemy]". The dependency injection system makes it straightforward to inject a database session per-request with proper cleanup after each response.

How do I test a Litestar application?

Litestar provides a synchronous TestClient (built on httpx.Client) and an AsyncTestClient (built on httpx.AsyncClient), both importable from litestar.testing. Use TestClient in ordinary pytest functions, or AsyncTestClient with an async test plugin such as pytest-asyncio. The client sends requests directly to your ASGI app without a network connection, making tests fast and deterministic. To swap out a real dependency, build the test app with test-specific providers, for example by passing an in-memory database provider in dependencies to create_test_client.

Does Litestar support WebSockets?

Yes, WebSocket handlers are first-class in Litestar. Decorate an async function with @websocket("/ws/chat") and declare a parameter named socket typed as WebSocket. After await socket.accept(), use socket.receive_data() and socket.send_data() (or the text and JSON variants) to exchange messages. For Server-Sent Events (one-way streaming), use the ServerSentEvent response type.

Can I customize the generated OpenAPI schema?

Litestar gives you fine-grained control over the OpenAPI output through the OpenAPIConfig class passed to the app. You can set the title, version, contact info, license, and custom schema paths. Individual handlers support tags, summary, description, and deprecated parameters. For fields, Pydantic’s Field(description=..., examples=[...]) annotations flow directly into the schema.

Conclusion

Litestar provides a production-ready async API framework with capabilities that FastAPI requires plugins to match: built-in DTOs, layered dependency injection, and support for multiple serialization backends. We covered route handlers and HTTP verbs, path and query parameters, dependency injection with scoping, DTOs for input/output control, and built the complete task management API.

The logical next step is to connect the task API to a real database using litestar.contrib.sqlalchemy and add JWT authentication middleware. Litestar’s layered architecture makes both additions clean — the routing code stays untouched while you swap the data layer underneath it.

Full documentation is at docs.litestar.dev with comprehensive guides on SQLAlchemy integration, authentication, testing, and deployment.

How To Use Python litellm for Multi-Model LLM Integration

Intermediate

You build an application on GPT-4o. Then the pricing changes, or you need Claude for a specific task, or a new open-source model outperforms both on your benchmark. Switching providers means rewriting your API calls, updating your response parsing, handling different error formats, and adjusting your retry logic — because every major LLM provider has a slightly different interface. That is the problem litellm exists to solve.

litellm is a unified Python interface for calling 100+ LLM providers (OpenAI, Anthropic, Google Gemini, Mistral, Cohere, local Ollama models, and more) using the same function signature. Switching from GPT-4o to Claude 3.5 Sonnet is a one-line change to the model parameter. You need Python 3.8+, relevant API keys for the providers you use, and pip install litellm.

This article covers the core litellm API, model routing and fallbacks, cost tracking, load balancing across multiple providers, streaming responses, async calls, and using litellm as a local proxy server. By the end you will have a flexible LLM integration layer that lets you swap providers without touching your application code.

Multi-Model LLM Calls: Quick Example

Install litellm and try calling two different providers with identical code:

# quick_litellm.py
import litellm
import os

# litellm reads API keys from environment variables automatically:
# OPENAI_API_KEY, ANTHROPIC_API_KEY, GEMINI_API_KEY, etc.

def ask(model: str, question: str) -> str:
    response = litellm.completion(
        model=model,
        messages=[{"role": "user", "content": question}]
    )
    return response.choices[0].message.content

question = "In one sentence, what is a Python decorator?"

# Same function, different providers
print("GPT-4o-mini:", ask("gpt-4o-mini", question))
print("Claude Haiku:", ask("claude-3-haiku-20240307", question))

Output:

GPT-4o-mini: A Python decorator is a function that wraps another function to extend or modify its behavior without changing its source code.
Claude Haiku: A decorator is a callable that takes a function and returns a modified version of it, adding behavior before or after the original function runs.

The response object follows the OpenAI format for every provider — response.choices[0].message.content works the same whether you are calling GPT, Claude, or Gemini. litellm translates each provider’s native response format into the OpenAI schema internally. This means your parsing code never changes, only the model string does.

LiteLLM connecting multiple LLM providers
One interface, a hundred models. The provider is just a string.

What Is litellm and Why Use It?

Different LLM providers expose different APIs: OpenAI uses client.chat.completions.create(), Anthropic uses client.messages.create() with a different message format, and Google uses google.generativeai.GenerativeModel(). When you write directly against one provider’s SDK, switching providers requires changing not just the API call but also message formatting, response parsing, error handling, and token counting.

litellm acts as an adapter layer. You write once against the OpenAI-style interface, and litellm translates the call to whatever the chosen provider expects. It also normalizes error types across providers — a rate limit error from Anthropic and one from OpenAI both become the same exception class in litellm.

Feature | Direct Provider SDKs | litellm
Switch providers | Rewrite API calls | Change model string
Error handling | Per-provider exceptions | Unified exceptions
Cost tracking | Manual calculation | Built-in per-call
Fallbacks | Manual try/except | Built-in routing
Local models | Custom client setup | ollama/model-name

The tradeoff is a small abstraction overhead and occasional lag when a new provider model is released (litellm needs to add support for it). For most production use cases, the flexibility far outweighs these limitations.
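To make the adapter idea concrete, here is a minimal sketch of the kind of translation described above. It is not litellm's code: the function name to_openai_shape and the sample payload are hypothetical, and the input dict only mimics the general layout of an Anthropic-style response (a content list of text blocks plus input_tokens and output_tokens).

```python
# Illustrative only: a hand-written adapter, not litellm's implementation.
from typing import Any


def to_openai_shape(anthropic_style: dict[str, Any]) -> dict[str, Any]:
    """Map an Anthropic-style response dict onto an OpenAI-style layout."""
    # Join all text blocks into one assistant message.
    text = "".join(
        block["text"]
        for block in anthropic_style.get("content", [])
        if block.get("type") == "text"
    )
    usage = anthropic_style.get("usage", {})
    prompt_tokens = usage.get("input_tokens", 0)
    completion_tokens = usage.get("output_tokens", 0)
    return {
        "model": anthropic_style.get("model"),
        "choices": [{"index": 0, "message": {"role": "assistant", "content": text}}],
        "usage": {
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
            "total_tokens": prompt_tokens + completion_tokens,
        },
    }


# Hypothetical sample payload (made up for this sketch).
sample = {
    "model": "claude-3-haiku-20240307",
    "content": [{"type": "text", "text": "A decorator wraps a function."}],
    "usage": {"input_tokens": 12, "output_tokens": 7},
}

normalized = to_openai_shape(sample)
print(normalized["choices"][0]["message"]["content"])
print(normalized["usage"]["total_tokens"])
```

Once every provider's reply has been mapped into one shape, the parsing code downstream only ever needs to know that one shape.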

Installation and Configuration

# Terminal
pip install litellm

litellm reads API keys from standard environment variable names. Set them once in your shell profile or .env file:

# .env (use python-dotenv to load in your app)
OPENAI_API_KEY=sk-...
ANTHROPIC_API_KEY=sk-ant-...
GEMINI_API_KEY=...
MISTRAL_API_KEY=...
COHERE_API_KEY=...

# config_check.py
import litellm
import os
from dotenv import load_dotenv

load_dotenv()

# litellm.check_valid_key verifies a key works for a given model
# (makes a tiny test call -- costs a fraction of a cent)
valid = litellm.utils.check_valid_key(
    model="gpt-4o-mini",
    api_key=os.environ.get("OPENAI_API_KEY", "")
)
print(f"OpenAI key valid: {valid}")

Output:

OpenAI key valid: True

Built-In Cost Tracking

Every litellm response includes token usage and cost metadata. This is invaluable for monitoring spend across providers in multi-model applications:

# cost_tracking.py
import litellm

response = litellm.completion(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Explain Python generators in 3 sentences."}]
)

# Token usage is on every response
usage = response.usage
print(f"Prompt tokens:     {usage.prompt_tokens}")
print(f"Completion tokens: {usage.completion_tokens}")
print(f"Total tokens:      {usage.total_tokens}")

# Cost calculation (litellm knows pricing for 100+ models)
cost = litellm.completion_cost(completion_response=response)
print(f"Cost: ${cost:.6f}")

Output:

Prompt tokens:     18
Completion tokens: 67
Total tokens:      85
Cost: $0.000043

If you already know the token counts, litellm.cost_per_token() prices them without making an API call, and litellm.completion_cost() can likewise estimate from a model name plus prompt and completion text. This lets you estimate costs before a batch job or build a budget-enforcing wrapper. For a multi-provider application where you want to track spend by provider, record the cost of each call and store it in your database alongside the model name.
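A budget-enforcing wrapper can start as a small in-memory ledger. The sketch below is one possible shape, not a litellm feature: the class name SpendLedger, the budget and the recorded costs are all made up for illustration. In a real application each recorded value would come from litellm.completion_cost().

```python
# Illustrative spend ledger; the costs and budget below are made-up numbers.
from collections import defaultdict


class SpendLedger:
    """Accumulate per-model cost and check calls against a budget."""

    def __init__(self, budget_usd: float) -> None:
        self.budget_usd = budget_usd
        self.by_model: dict[str, float] = defaultdict(float)

    @property
    def total(self) -> float:
        return sum(self.by_model.values())

    def can_spend(self, estimated_cost: float) -> bool:
        """True if the estimated cost still fits inside the budget."""
        return self.total + estimated_cost <= self.budget_usd

    def record(self, model: str, cost_usd: float) -> None:
        """Store the cost reported for one completed call."""
        self.by_model[model] += cost_usd


ledger = SpendLedger(budget_usd=0.0001)
# In a real app, each cost would come from litellm.completion_cost(...).
ledger.record("gpt-4o-mini", 0.000043)
ledger.record("claude-3-haiku-20240307", 0.000018)
ledger.record("gpt-4o-mini", 0.000020)

for model, cost in sorted(ledger.by_model.items()):
    print(f"{model}: ${cost:.6f}")
print(f"Room for a $0.00005 call? {ledger.can_spend(0.00005)}")
```

Calling can_spend() before each request and record() after it is enough to stop a batch job once the budget is exhausted. Persisting by_model to a database is the natural next step.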

LiteLLM cost tracking per provider
completion_cost() per call. Budget leaks found before the invoice arrives.

Automatic Fallbacks and Routing

One of litellm’s most powerful features is automatic fallbacks: if the primary model fails (rate limit, downtime, API error), it automatically tries the next model in your fallback list. This makes your application resilient to provider outages without any custom retry logic:

# fallback_routing.py
import litellm

# Define a priority list -- litellm tries each in order on failure
litellm.set_verbose = False  # suppress debug logs

response = litellm.completion(
    model="gpt-4o",
    messages=[{"role": "user", "content": "What is Python's GIL in one sentence?"}],
    fallbacks=[
        "claude-3-5-sonnet-20241022",   # first fallback
        "gemini/gemini-1.5-flash",       # second fallback
        "gpt-4o-mini"                    # final fallback
    ],
    # Optional: only fall back on specific error types
    context_window_fallbacks=[          # for token limit errors specifically
        {"gpt-4o": ["gpt-4o-mini"]},
        {"claude-3-5-sonnet-20241022": ["claude-3-haiku-20240307"]}
    ]
)

print(f"Model used: {response.model}")
print(f"Response: {response.choices[0].message.content}")

Output:

Model used: gpt-4o
Response: Python's GIL (Global Interpreter Lock) is a mutex that prevents multiple threads from executing Python bytecode simultaneously in CPython, limiting true parallelism.

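The context_window_fallbacks parameter handles a common real-world scenario: a long conversation exceeds the context limit of the current model, and the request needs to be retried on a model that accepts more tokens. litellm detects context-window-exceeded errors and reroutes the request according to this mapping, so your code does not need to catch those exceptions itself. For the mapping to help, each fallback should have a larger context window than the model it replaces. Check the parameter name against your installed version: the Router accepts context_window_fallbacks, while the litellm documentation for plain completion() calls describes a context_window_fallback_dict argument.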
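Conceptually, an ordered fallback is a loop over candidate models that stops at the first success. The sketch below shows that control flow with a scripted stand-in for the provider call. ProviderError, complete_with_fallbacks and fake_call are hypothetical names for this example and do not reflect litellm's internals.

```python
# Conceptual sketch of ordered fallbacks; fake_call stands in for a provider call.
from typing import Callable


class ProviderError(Exception):
    """Hypothetical error type used only in this sketch."""


def complete_with_fallbacks(
    models: list[str],
    prompt: str,
    call: Callable[[str, str], str],
) -> tuple[str, str]:
    """Try each model in order; return (model_used, text) from the first success."""
    errors: list[str] = []
    for model in models:
        try:
            return model, call(model, prompt)
        except ProviderError as exc:
            errors.append(f"{model}: {exc}")  # remember why this one failed
    raise ProviderError("all models failed -> " + "; ".join(errors))


def fake_call(model: str, prompt: str) -> str:
    """Pretend the primary model is rate limited and the others work."""
    if model == "gpt-4o":
        raise ProviderError("rate limited")
    return f"[{model}] answer to: {prompt}"


used, text = complete_with_fallbacks(
    ["gpt-4o", "claude-3-5-sonnet-20241022", "gpt-4o-mini"],
    "What is the GIL?",
    fake_call,
)
print(used)
print(text)
```

Collecting the per-model errors keeps the final exception useful when every candidate fails, which is the case you most need to debug.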

Load Balancing Across Multiple API Keys

When you have multiple API keys for the same provider (common in organizations with per-team keys), litellm can distribute calls across them to stay under rate limits:

# load_balancing.py
from litellm import Router

# Define your pool of models/keys
model_list = [
    {
        "model_name": "gpt4-pool",       # your internal name for this pool
        "litellm_params": {
            "model": "gpt-4o-mini",
            "api_key": "sk-team-a-key",  # Team A's key
        },
    },
    {
        "model_name": "gpt4-pool",
        "litellm_params": {
            "model": "gpt-4o-mini",
            "api_key": "sk-team-b-key",  # Team B's key
        },
    },
    {
        "model_name": "gpt4-pool",
        "litellm_params": {
            "model": "claude-3-haiku-20240307",  # cheaper fallback
            "api_key": "sk-ant-key",
        },
    },
]

router = Router(
    model_list=model_list,
    routing_strategy="least-busy",  # or "simple-shuffle", "usage-based-routing"
    num_retries=3,
    timeout=30,
)

response = router.completion(
    model="gpt4-pool",
    messages=[{"role": "user", "content": "Define 'duck typing' in Python."}]
)
print(response.choices[0].message.content)

Output:

Duck typing in Python means an object's suitability is determined by the presence of certain methods and attributes rather than its actual type -- if it walks like a duck and quacks like a duck, Python treats it like one.

The Router class is designed for production deployments. The "least-busy" strategy routes each request to whichever deployment currently has the fewest in-flight requests. Combined with num_retries=3, a failed request is retried rather than surfaced immediately, which gives you load balancing and basic failover without an external proxy service.
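The selection rule behind a least-busy strategy is simple enough to sketch in a few lines. This is an illustration of the idea only, not the Router's actual implementation. Deployment, pick_least_busy and the pool labels are hypothetical.

```python
# Conceptual sketch of least-busy selection; not the Router's real internals.
from dataclasses import dataclass


@dataclass
class Deployment:
    name: str           # hypothetical label for one key/model pair
    in_flight: int = 0  # requests currently being served


def pick_least_busy(deployments: list[Deployment]) -> Deployment:
    """Return the deployment with the fewest in-flight requests.

    min() keeps the first item on ties, so earlier entries win when equal.
    """
    return min(deployments, key=lambda d: d.in_flight)


pool = [Deployment("team-a"), Deployment("team-b"), Deployment("haiku")]

chosen_order = []
for _ in range(4):
    target = pick_least_busy(pool)
    target.in_flight += 1  # request starts; a real router decrements on completion
    chosen_order.append(target.name)

print(chosen_order)
```

Because no request finishes in this toy loop, the choices simply cycle through the pool. With real traffic, slow deployments accumulate in-flight requests and receive fewer new ones.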

LiteLLM load balancing across providers
routing_strategy="least-busy". Your rate limits are now someone else’s problem.

Streaming Responses

For chat applications and real-time displays, streaming lets you show the response as it generates instead of waiting for completion. litellm streaming works the same across all providers:

# streaming_example.py
import litellm

response = litellm.completion(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Write a 3-step guide to Python list comprehensions."}],
    stream=True
)

print("Streaming response:")
full_text = ""
for chunk in response:
    delta = chunk.choices[0].delta.content
    if delta:
        print(delta, end="", flush=True)
        full_text += delta

print()  # newline after stream
print(f"\nTotal chars received: {len(full_text)}")

Output:

Streaming response:
1. **Basic syntax**: [expression for item in iterable]
2. **With condition**: [x*2 for x in range(10) if x % 2 == 0]
3. **Nested**: [cell for row in matrix for cell in row]

Total chars received: 112
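The if delta: guard in the loop above matters because some chunks in a stream may carry no text, for example a final chunk that only signals completion. The sketch below reproduces the accumulation logic against fake chunk objects so it can run offline. make_chunk and collect_stream are hypothetical helpers, and the chunks only mimic the chunk.choices[0].delta.content attribute path used above.

```python
# Sketch: accumulate streamed text from fake chunks shaped like the ones above.
from types import SimpleNamespace


def make_chunk(content):
    """Build an object exposing chunk.choices[0].delta.content."""
    delta = SimpleNamespace(content=content)
    return SimpleNamespace(choices=[SimpleNamespace(delta=delta)])


def collect_stream(chunks) -> str:
    """Join all non-empty deltas; chunks with content=None are skipped."""
    parts = []
    for chunk in chunks:
        delta = chunk.choices[0].delta.content
        if delta:  # a chunk may arrive with no text at all
            parts.append(delta)
    return "".join(parts)


fake_stream = [make_chunk("List "), make_chunk("comprehensions"), make_chunk(None)]
streamed_text = collect_stream(fake_stream)
print(streamed_text)
```

Appending to a list and joining once at the end avoids repeated string concatenation on long responses.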

Async Support

For high-throughput applications, use litellm.acompletion() to make concurrent API calls. This is far faster than sequential calls when processing multiple prompts:

# async_litellm.py
import asyncio
import litellm

async def classify(text: str, index: int) -> dict:
    response = await litellm.acompletion(
        model="gpt-4o-mini",
        messages=[{
            "role": "user",
            "content": f"Classify as 'technical', 'business', or 'other': '{text}'. Reply with just the category."
        }]
    )
    return {
        "index": index,
        "text": text[:40],
        "category": response.choices[0].message.content.strip().lower()
    }

async def main():
    texts = [
        "How do I implement a binary search tree in Python?",
        "What is our Q3 revenue forecast?",
        "Please schedule a team lunch for Friday.",
        "Explain async/await concurrency model.",
        "Update the marketing deck for the board meeting.",
    ]

    # Run all 5 calls concurrently -- takes ~1s instead of ~5s
    tasks = [classify(text, i) for i, text in enumerate(texts)]
    results = await asyncio.gather(*tasks)

    for r in results:
        print(f"[{r['index']}] {r['category']:10s} | {r['text']}...")

asyncio.run(main())

Output:

[0] technical  | How do I implement a binary search tr...
[1] business   | What is our Q3 revenue forecast?...
[2] other      | Please schedule a team lunch for Frida...
[3] technical  | Explain async/await concurrency model....
[4] business   | Update the marketing deck for the board...

By using asyncio.gather(), all 5 API calls run concurrently. The total time is roughly equal to the slowest single call rather than the sum of all calls. For batch classification, summarization, or embedding tasks with many inputs, this pattern reduces wall-clock time dramatically — a 50-item batch that takes 50 seconds sequentially often completes in 5-8 seconds with async.
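One practical caveat with asyncio.gather() is that, by default, the first exception propagates to the caller and the rest of the batch's results are lost. The sketch below shows one way to keep the successful results by passing return_exceptions=True. fake_classify is a stand-in for an awaitable API call such as litellm.acompletion(), and its classification rule is invented for the example.

```python
# Sketch: keep a batch alive when one task fails. fake_classify is a stand-in
# for an awaitable API call such as litellm.acompletion(...).
import asyncio


async def fake_classify(text: str) -> str:
    await asyncio.sleep(0.01)  # simulate network latency
    if not text.strip():
        raise ValueError("empty input")
    return "technical" if "Python" in text else "other"


async def classify_batch(texts: list[str]) -> list[str]:
    # return_exceptions=True returns exceptions as values instead of raising,
    # so the other results are not lost.
    outcomes = await asyncio.gather(
        *(fake_classify(t) for t in texts), return_exceptions=True
    )
    return [
        f"error: {o}" if isinstance(o, Exception) else o
        for o in outcomes
    ]


results = asyncio.run(classify_batch(["Python decorators", "", "Team lunch"]))
print(results)
```

Results come back in input order, so a failed item can be matched to its source text and retried on its own.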

Running litellm as a Local Proxy Server

litellm can run as an OpenAI-compatible proxy server, so any tool that supports OpenAI’s API (like VS Code extensions, LangChain, or custom apps) can use it to route to any backend. Start it from the terminal:

# Terminal -- start the proxy server
litellm --model gpt-4o-mini --port 8000

# Or with a config file for multiple models:
# litellm --config litellm_config.yaml --port 8000

# litellm_config.yaml
model_list:
  - model_name: gpt-4o
    litellm_params:
      model: gpt-4o
      api_key: sk-...
  - model_name: claude-sonnet
    litellm_params:
      model: claude-3-5-sonnet-20241022
      api_key: sk-ant-...

router_settings:
  routing_strategy: least-busy

# proxy_client.py -- connect to the local proxy using standard OpenAI SDK
from openai import OpenAI

# Point to your local litellm proxy
client = OpenAI(base_url="http://localhost:8000", api_key="any-string")

response = client.chat.completions.create(
    model="claude-sonnet",    # name from your config
    messages=[{"role": "user", "content": "What is Python's walrus operator?"}]
)
print(response.choices[0].message.content)

Output:

Python's walrus operator (:=) is an assignment expression introduced in Python 3.8 that lets you assign and return a value in the same expression -- useful in while loops and comprehensions to avoid evaluating the same expression twice.

Real-Life Example: Multi-Provider Summarization Service

LiteLLM parallel provider routing
One service, three providers. Load is shared across them on every call.

# summarization_service.py
"""
Multi-provider summarization service with cost tracking and fallbacks.
Uses litellm Router to distribute load and fall back automatically.
"""
import litellm
from litellm import Router
from dataclasses import dataclass
from typing import Optional

@dataclass
class SummaryResult:
    text: str
    model_used: str
    tokens: int
    cost_usd: float

# Router with three low-cost providers; "simple-shuffle" picks among them at random
# API keys are read from OPENAI_API_KEY, ANTHROPIC_API_KEY and GEMINI_API_KEY in the environment
router = Router(
    model_list=[
        {"model_name": "summarizer", "litellm_params": {"model": "gpt-4o-mini"}},
        {"model_name": "summarizer", "litellm_params": {"model": "claude-3-haiku-20240307"}},
        {"model_name": "summarizer", "litellm_params": {"model": "gemini/gemini-1.5-flash"}},
    ],
    ],
    routing_strategy="simple-shuffle",  # spread load across providers
    num_retries=2,
)

def summarize(text: str, max_words: int = 50) -> Optional[SummaryResult]:
    """Summarize text using the next available provider."""
    prompt = f"Summarize the following in {max_words} words or fewer:\n\n{text}"
    try:
        response = router.completion(
            model="summarizer",
            messages=[{"role": "user", "content": prompt}]
        )
        cost = litellm.completion_cost(completion_response=response)
        return SummaryResult(
            text=response.choices[0].message.content,
            model_used=response.model,
            tokens=response.usage.total_tokens,
            cost_usd=cost,
        )
    except Exception as e:
        print(f"All providers failed: {e}")
        return None

if __name__ == "__main__":
    articles = [
        "Python 3.14 introduces t-strings (template strings), a new string prefix that gives developers programmatic access to string interpolation before it happens. Unlike f-strings which evaluate immediately, t-strings return a Template object that can be inspected, modified, or sanitized before rendering -- making them ideal for use cases like SQL queries, HTML templates, and logging where untrusted data could cause injection attacks.",
        "The Python Software Foundation announced that Python 3.12 will receive security-only fixes starting in January 2025, with end-of-life in October 2028. Users of Python 3.11 and earlier are encouraged to upgrade, as each version receives bug fixes for 18 months after release and security fixes for 5 years total.",
    ]

    total_cost = 0.0
    for i, article in enumerate(articles, 1):
        result = summarize(article, max_words=30)
        if result:
            print(f"\n--- Article {i} ---")
            print(f"Summary: {result.text}")
            print(f"Model: {result.model_used} | Tokens: {result.tokens} | Cost: ${result.cost_usd:.6f}")
            total_cost += result.cost_usd

    print(f"\nTotal cost for {len(articles)} summaries: ${total_cost:.6f}")

Output:

--- Article 1 ---
Summary: Python 3.14 t-strings return a Template object instead of evaluating immediately, enabling inspection and sanitization before rendering -- ideal for preventing injection attacks.
Model: gpt-4o-mini | Tokens: 103 | Cost: $0.000052

--- Article 2 ---
Summary: Python 3.12 enters security-only support in Jan 2025, ending Oct 2028. Users on 3.11 or earlier should upgrade.
Model: claude-3-haiku-20240307 | Tokens: 88 | Cost: $0.000018

Total cost for 2 summaries: $0.000070

This service pattern is immediately deployable as a FastAPI endpoint. Add authentication, a database to log each call’s cost and model, and a monthly budget cap per user — and you have a production-ready multi-provider LLM service. The router handles provider selection and failover transparently, and the SummaryResult dataclass gives your callers full visibility into which model was used and what it cost.

Frequently Asked Questions

Which models does litellm support?

litellm supports 100+ models across providers including OpenAI, Anthropic, Google (Gemini and Vertex AI), AWS Bedrock, Azure OpenAI, Mistral, Cohere, Hugging Face inference endpoints, and local models via Ollama. The full list is at docs.litellm.ai/docs/providers. Model naming follows the convention provider/model-name for non-OpenAI models (e.g., gemini/gemini-1.5-flash, ollama/llama3.2).

How accurate is the cost tracking?

litellm maintains a pricing database that is updated with each library release. For providers with stable public pricing (OpenAI, Anthropic, Google), the cost estimates are highly accurate. For providers that change pricing frequently or offer custom contracts, verify against your provider’s billing dashboard. If a model is missing from the pricing database, completion_cost() cannot price it. Depending on the litellm version it may return 0.0 or raise an exception, so treat a zero or failed cost as a gap to investigate.

Does litellm add latency?

The overhead is typically 1-5 milliseconds per call — the time to translate the request and response format. For most LLM use cases where the model call takes 500ms-5000ms, this is negligible. If you are benchmarking latency-sensitive applications, measure with and without litellm and confirm the overhead is acceptable for your use case.

Can I use litellm with local models?

Yes. For Ollama, set model="ollama/llama3.2" and ensure Ollama is running locally. For other local inference servers (vLLM, LM Studio), point api_base at your local endpoint and use the openai/ prefix. No API key is needed for local models — pass api_key="local" as a placeholder.

How many concurrent calls can I make with acompletion?

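This is limited by your provider’s rate limits, not by litellm. For OpenAI’s Tier 1, you can typically make 3,500 requests per minute on GPT-4o-mini, although the exact figure varies by tier and changes over time, so check your account’s limits page. If you are hitting rate limits, cap concurrency with a semaphore: create one asyncio.Semaphore(20), share it across all tasks, and wrap each call as async with semaphore: result = await litellm.acompletion(...). Constructing a new Semaphore inside each async with statement limits nothing, because every task then gets its own counter.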
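A shared semaphore can be sketched as follows. fake_completion stands in for await litellm.acompletion(...), and MAX_CONCURRENCY and the stats dictionary are assumptions added only to show that the cap holds.

```python
# Sketch: one shared semaphore caps how many calls run at once.
# fake_completion stands in for `await litellm.acompletion(...)`.
import asyncio

MAX_CONCURRENCY = 3  # arbitrary limit chosen for this example


async def fake_completion(i: int, sem: asyncio.Semaphore, stats: dict) -> int:
    async with sem:  # the same semaphore object is shared by every task
        stats["running"] += 1
        stats["peak"] = max(stats["peak"], stats["running"])
        await asyncio.sleep(0.01)  # simulate the API round trip
        stats["running"] -= 1
        return i * 2


async def run_batch(n: int) -> tuple[list[int], int]:
    sem = asyncio.Semaphore(MAX_CONCURRENCY)  # created once per batch
    stats = {"running": 0, "peak": 0}
    results = await asyncio.gather(
        *(fake_completion(i, sem, stats) for i in range(n))
    )
    return list(results), stats["peak"]


batch_results, peak = asyncio.run(run_batch(10))
print(batch_results)
print(f"Peak concurrency: {peak}")
```

All ten tasks are scheduled immediately, but only three are ever inside the async with block at the same time.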

How does litellm compare to LangChain’s model abstractions?

LangChain provides a complete pipeline framework (prompts, chains, agents, memory) and uses its own model wrapper classes. litellm is focused solely on the API call layer: it is a drop-in replacement for the raw API call, not a pipeline framework. They complement each other: LangChain can use litellm’s proxy endpoint as its OpenAI backend, giving you LangChain’s pipeline features plus litellm’s multi-provider routing. For projects that do not need LangChain’s pipeline abstractions, litellm alone is simpler.

Conclusion

The litellm library removes the vendor lock-in problem from LLM applications. We covered the unified completion interface, built-in cost tracking, automatic fallbacks, the Router class for load balancing, streaming, async concurrency, and the local proxy server. The summarization service example showed how these features combine into a production-ready multi-provider service.

The natural next step is to extend the real-life example into a FastAPI service with per-user cost caps. Once your application is behind a litellm abstraction layer, you can experiment with new models freely — the day a better or cheaper model is released, switching is a one-line config change.

Full documentation and provider setup guides are at docs.litellm.ai. The GitHub repository has an active community and frequent updates as new providers are added.

How To Use Python instructor for Structured LLM Outputs

Intermediate

You ask an LLM to extract a user’s name, age, and email from a paragraph of text. Sometimes it returns clean JSON. Sometimes it returns JSON wrapped in markdown fences. Sometimes it returns a paragraph explaining why it extracted those fields. If you have ever built a pipeline that breaks because the model decided today was a good day to add “Sure! Here is the extracted data:” before the JSON, you already understand why instructor exists.

The instructor library patches the OpenAI client (and any OpenAI-compatible API) to force the model to return a fully validated Pydantic model — every time. When validation fails, it retries automatically. You define exactly what fields you need, with their types and constraints, and instructor handles the conversation with the model until the output matches your schema. You need Python 3.9+, an OpenAI API key (or compatible endpoint), and pip install instructor.

This article walks through everything you need to get structured LLM outputs in production: installing and patching the client, defining Pydantic schemas, extracting nested objects, handling lists, using validation hooks, working with non-OpenAI models via LiteLLM, and building a real extraction pipeline. By the end you will have a reusable pattern for reliable structured data from any LLM.

Structured LLM Output: Quick Example

The fastest way to see instructor in action is to extract a structured object from a single sentence. Install the library and try this:

# quick_instructor.py
import instructor
from openai import OpenAI
from pydantic import BaseModel

client = instructor.from_openai(OpenAI())

class Person(BaseModel):
    name: str
    age: int
    city: str

person = client.chat.completions.create(
    model="gpt-4o-mini",
    response_model=Person,
    messages=[{"role": "user", "content": "Alice is 32 years old and lives in Melbourne."}]
)

print(person.name)   # Alice
print(person.age)    # 32
print(person.city)   # Melbourne
print(type(person))  # <class '__main__.Person'>

Output:

Alice
32
Melbourne
<class '__main__.Person'>

The key line is instructor.from_openai(OpenAI()) — this patches the standard OpenAI client. After that, you pass response_model=Person to any chat.completions.create call, and instructor automatically: sends the Pydantic schema to the model as a tool definition, parses the model’s tool-call response, validates it against your schema, and retries if validation fails. The return value is a fully typed Pydantic object, not a string or dict.

That example covers the simplest case. The sections below show how to handle nested models, lists, validation rules, retry configuration, and real-world pipelines.

Instructor converts chaotic LLM output to clean schema
response_model= and the chaos becomes a schema.

What Is instructor and Why Use It?

When you call an LLM without constraints, it returns free-form text. Parsing that text into structured data is fragile — you write regex, JSON parsers, and fallback handlers that break every time the model changes its wording. instructor solves this by using OpenAI’s function/tool calling feature under the hood: it converts your Pydantic model into a JSON Schema tool definition, forces the model to call that tool, and validates the returned arguments against your schema.

The result is LLM output that behaves like a typed function return value instead of a string you have to parse. If the model returns a field with the wrong type (for example, age as a string “thirty-two” instead of an integer), instructor sends the validation error back to the model and asks it to try again — up to a configurable number of retries.
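The schema-to-tool step can be pictured with a small standard-library stand-in. The sketch below uses a dataclass instead of a Pydantic model and builds a function-style tool definition by hand. tool_definition and JSON_TYPES are hypothetical names, and the real schema that Pydantic and instructor generate contains more detail (titles, descriptions, nested definitions).

```python
# Simplified stand-in for the schema step; instructor and Pydantic do far more.
import json
from dataclasses import dataclass
from typing import get_type_hints

# Minimal mapping from Python types to JSON Schema type names.
JSON_TYPES = {str: "string", int: "integer", float: "number", bool: "boolean"}


@dataclass
class Person:
    name: str
    age: int
    city: str


def tool_definition(model: type) -> dict:
    """Build a function-style tool definition from a class's type hints."""
    hints = get_type_hints(model)
    properties = {name: {"type": JSON_TYPES[tp]} for name, tp in hints.items()}
    return {
        "type": "function",
        "function": {
            "name": model.__name__,
            "parameters": {
                "type": "object",
                "properties": properties,
                "required": list(properties),
            },
        },
    }


tool = tool_definition(Person)
print(json.dumps(tool, indent=2))
```

The model sees the field names and types as the tool's parameters, which is why it returns arguments that map directly onto the schema.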

Approach | Reliability | Type Safety | Auto-Retry
Parse raw LLM text | Fragile | None | Manual
Parse JSON from prompt | Moderate | Manual | Manual
OpenAI function calling | Good | Partial | None
instructor + Pydantic | High | Full | Built-in

The library supports multiple backends: instructor.from_openai, instructor.from_anthropic, instructor.from_gemini, and any OpenAI-compatible endpoint via base_url. This makes it the same interface regardless of which model you use.

Installation and Setup

Install instructor and the OpenAI SDK together. If you are using a different provider, you may also need their SDK:

# Terminal
pip install instructor openai pydantic

Set your API key as an environment variable so it never appears in your code:

# setup_env.py -- run once, or add to your shell profile
import os
# In practice, set this in your shell:
# export OPENAI_API_KEY="sk-..."
print("OPENAI_API_KEY set:", bool(os.environ.get("OPENAI_API_KEY")))

Output:

OPENAI_API_KEY set: True

Patch the client once at startup and reuse it for all calls. Creating a new patched client for every request is wasteful:

# client_setup.py
import instructor
from openai import OpenAI

# Patch once at startup
client = instructor.from_openai(OpenAI())  # reads OPENAI_API_KEY from env

# The client now has response_model support on all completion calls
print(type(client))  # <class 'instructor.client.Instructor'>

Output:

<class 'instructor.client.Instructor'>

Patching OpenAI client with Pydantic validator
One patch. Every completion call now speaks schema.

Defining Pydantic Schemas for Extraction

Your Pydantic model defines exactly what fields the LLM must return. Field descriptions improve accuracy significantly — the model uses them as instructions for what to put in each field. Use Field(description=...) to guide the extraction:

# schema_example.py
import instructor
from openai import OpenAI
from pydantic import BaseModel, Field
from typing import Optional

client = instructor.from_openai(OpenAI())

class JobPosting(BaseModel):
    title: str = Field(description="The exact job title as written in the posting")
    company: str = Field(description="Company name offering the position")
    location: str = Field(description="City and country, or 'Remote'")
    salary_min: Optional[int] = Field(None, description="Minimum annual salary in USD if mentioned")
    salary_max: Optional[int] = Field(None, description="Maximum annual salary in USD if mentioned")
    is_remote: bool = Field(description="True if the role allows remote work")

text = """
Senior Python Developer at DataFlow Inc. -- Remote (US timezones preferred).
Salary range: $140,000 - $175,000 per year. Must have 5+ years Python experience.
"""

job = client.chat.completions.create(
    model="gpt-4o-mini",
    response_model=JobPosting,
    messages=[{"role": "user", "content": f"Extract the job details from: {text}"}]
)

print(f"Title: {job.title}")
print(f"Company: {job.company}")
print(f"Location: {job.location}")
print(f"Salary: ${job.salary_min:,} - ${job.salary_max:,}")
print(f"Remote: {job.is_remote}")

Output:

Title: Senior Python Developer
Company: DataFlow Inc.
Location: Remote (US timezones preferred)
Salary: $140,000 - $175,000
Remote: True

The Optional[int] type tells instructor (and the model) that salary fields may be absent. When the source text does not mention a salary, these fields will be None instead of hallucinated values. Always use Optional for fields that may not appear in the input — without it, the model will invent plausible-sounding values rather than leaving the field empty.
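Optional fields also change how you consume the result: the print statement in the example above formats salary_min and salary_max with a thousands separator, which would raise a TypeError if either were None. A small helper along these lines avoids that. format_salary is a hypothetical name, and the wording of the fallback strings is a choice made for this sketch.

```python
# Sketch: format an optional salary range without crashing on None.
from typing import Optional


def format_salary(low: Optional[int], high: Optional[int]) -> str:
    """Return a readable salary string for any mix of present/missing bounds."""
    if low is None and high is None:
        return "not specified"
    if low is None:
        return f"up to ${high:,}"
    if high is None:
        return f"from ${low:,}"
    return f"${low:,} - ${high:,}"


print(format_salary(140000, 175000))
print(format_salary(None, None))
print(format_salary(90000, None))
```

Comparing against None explicitly, rather than relying on truthiness, keeps a legitimate value of 0 from being treated as missing.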

Extracting Nested and List Objects

Real-world extraction often requires nested structures — for example, an invoice with multiple line items, or a resume with a list of work experiences. instructor handles nested Pydantic models and List types natively:

# nested_extraction.py
import instructor
from openai import OpenAI
from pydantic import BaseModel, Field
from typing import List

client = instructor.from_openai(OpenAI())

class LineItem(BaseModel):
    description: str
    quantity: int
    unit_price: float

class Invoice(BaseModel):
    vendor: str
    invoice_number: str
    items: List[LineItem]
    total: float

invoice_text = """
Invoice #INV-2024-0891 from CloudHost Solutions
- 3x Server instances @ $45.00 each
- 1x SSL Certificate @ $12.00
- 2x Domain registrations @ $15.00 each
Total: $222.00
"""

result = client.chat.completions.create(
    model="gpt-4o-mini",
    response_model=Invoice,
    messages=[{"role": "user", "content": f"Extract invoice data: {invoice_text}"}]
)

print(f"Vendor: {result.vendor}")
print(f"Invoice #: {result.invoice_number}")
for item in result.items:
    print(f"  {item.quantity}x {item.description} @ ${item.unit_price:.2f}")
print(f"Total: ${result.total:.2f}")

Output:

Vendor: CloudHost Solutions
Invoice #: INV-2024-0891
  3x Server instances @ $45.00
  1x SSL Certificate @ $12.00
  2x Domain registrations @ $15.00
Total: $222.00

Nested models work because instructor converts the entire schema — including nested classes — into a JSON Schema definition that the model understands. The model fills in every field of every nested object, and Pydantic validates the whole structure recursively. If the items list is missing or a line item has an invalid type, instructor retries the extraction with the validation error as feedback.
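Type validation does not check that the extracted numbers agree with each other. In the sample invoice above, the line items add up to $177.00 while the stated total is $222.00, and a type-correct extraction will reproduce that mismatch without complaint. The sketch below shows a cross-field check using plain dataclasses. items_total and total_matches are hypothetical helpers, and with Pydantic the same rule could live in a model-level validator so that instructor's retry loop sees the error.

```python
# Sketch of a cross-field consistency check using plain dataclasses.
# With Pydantic, the same rule could live in a model-level validator.
from dataclasses import dataclass


@dataclass
class LineItem:
    description: str
    quantity: int
    unit_price: float


def items_total(items: list[LineItem]) -> float:
    """Sum quantity * unit_price across all line items, rounded to cents."""
    return round(sum(i.quantity * i.unit_price for i in items), 2)


def total_matches(items: list[LineItem], stated_total: float) -> bool:
    """True when the stated total agrees with the line items to the cent."""
    return abs(items_total(items) - stated_total) < 0.005


items = [
    LineItem("Server instances", 3, 45.00),
    LineItem("SSL Certificate", 1, 12.00),
    LineItem("Domain registrations", 2, 15.00),
]

print(items_total(items))
print(total_matches(items, 222.00))
```

Whether a mismatch should trigger a retry or be flagged for human review depends on the source: here the document itself is inconsistent, so asking the model again would not help.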

Instructor nested Pydantic models
Nested Pydantic models: recursion that actually works.

Adding Custom Validation Rules

Pydantic’s field_validator lets you add business logic on top of type checking. instructor automatically feeds validation errors back to the model, so the model gets a second (or third) chance to return values that satisfy your rules:

# custom_validation.py
import instructor
from openai import OpenAI
from pydantic import BaseModel, Field, field_validator
from typing import List

client = instructor.from_openai(OpenAI())

class ProductReview(BaseModel):
    product_name: str
    rating: int = Field(description="Rating from 1 to 5")
    pros: List[str] = Field(description="List of positive aspects, at least one")
    cons: List[str] = Field(description="List of negative aspects, can be empty")
    summary: str = Field(description="One-sentence summary under 150 characters")

    @field_validator("rating")
    @classmethod
    def rating_in_range(cls, v: int) -> int:
        if not 1 <= v <= 5:
            raise ValueError(f"Rating must be between 1 and 5, got {v}")
        return v

    @field_validator("pros")
    @classmethod
    def at_least_one_pro(cls, v: List[str]) -> List[str]:
        if not v:
            raise ValueError("Must include at least one positive aspect")
        return v

    @field_validator("summary")
    @classmethod
    def summary_length(cls, v: str) -> str:
        if len(v) > 150:
            raise ValueError(f"Summary too long: {len(v)} chars (max 150)")
        return v

text = """
The new Python IDE is pretty solid. Boot time is fast, autocomplete works well.
The memory usage is high and the plugin store is still sparse. Overall a decent
choice for Python development. I'd give it 4 out of 5.
"""

review = client.chat.completions.create(
    model="gpt-4o-mini",
    response_model=ProductReview,
    messages=[{"role": "user", "content": f"Extract review details: {text}"}]
)

print(f"Product: {review.product_name}")
print(f"Rating: {review.rating}/5")
print(f"Pros: {review.pros}")
print(f"Cons: {review.cons}")
print(f"Summary: {review.summary}")

Output:

Product: Python IDE
Rating: 4/5
Pros: ['Fast boot time', 'Good autocomplete']
Cons: ['High memory usage', 'Sparse plugin store']
Summary: A solid Python IDE with fast performance but limited plugins and high memory usage.

When a validator raises ValueError, instructor captures the error message and sends it back to the model in a follow-up message, along the lines of “Validation failed: Rating must be between 1 and 5, got 6. Please fix and try again.” The model then gets a chance to correct itself. By default, instructor retries up to 3 times before raising an exception. You can configure this with max_retries=N on the completion call.
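The self-correction loop can be sketched without any LLM at all. In the example below, fake_model is a scripted stand-in that answers 6 on the first attempt and 4 once it has received an error message. extract_with_retries and validate_rating are hypothetical names, and the loop is a simplification rather than instructor's actual implementation.

```python
# Conceptual sketch of a validate-and-retry loop; fake_model is a scripted
# stand-in for an LLM, and this is not instructor's actual implementation.
from typing import Callable


def validate_rating(value: int) -> int:
    if not 1 <= value <= 5:
        raise ValueError(f"Rating must be between 1 and 5, got {value}")
    return value


def extract_with_retries(
    ask_model: Callable[[list[str]], int], max_retries: int = 3
) -> tuple[int, int]:
    """Return (validated_rating, attempts_used) or raise after max_retries."""
    feedback: list[str] = []  # validation errors passed back to the model
    for attempt in range(1, max_retries + 1):
        candidate = ask_model(feedback)
        try:
            return validate_rating(candidate), attempt
        except ValueError as exc:
            feedback.append(str(exc))
    raise RuntimeError(f"validation failed after {max_retries} attempts: {feedback}")


def fake_model(feedback: list[str]) -> int:
    """Answer 6 first, then correct to 4 once it has seen an error message."""
    return 4 if feedback else 6


rating, attempts = extract_with_retries(fake_model)
print(rating, attempts)
```

Each retry is another model call, so the retry budget is also a cost budget: a validator that the model can rarely satisfy multiplies spend without improving the data.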

Configuring Retries and Modes

instructor supports several extraction modes depending on what your model supports. The default mode uses OpenAI’s tool calling, but you can switch to JSON mode or other strategies:

# retry_config.py
import instructor
from instructor import Mode
from openai import OpenAI
from pydantic import BaseModel

# Default: tool calling (most reliable for OpenAI models)
client_tools = instructor.from_openai(OpenAI())

# JSON mode: model returns raw JSON instead of a tool call
client_json = instructor.from_openai(OpenAI(), mode=Mode.JSON)

# MD_JSON mode: model wraps JSON in markdown fences (useful for some fine-tunes)
client_md = instructor.from_openai(OpenAI(), mode=Mode.MD_JSON)

class City(BaseModel):
    name: str
    country: str
    population: int

# Control retries per-call
city = client_tools.chat.completions.create(
    model="gpt-4o-mini",
    response_model=City,
    max_retries=5,           # retry up to 5 times on validation failure
    messages=[{"role": "user", "content": "Tell me about Tokyo"}]
)

print(f"{city.name}, {city.country}: pop {city.population:,}")

Output:

Tokyo, Japan: pop 13,960,000

For most OpenAI models, the default tool-calling mode is most reliable. Use Mode.JSON for models that support JSON mode but not tool calling — for example, some fine-tuned models or older GPT versions. The max_retries parameter controls how many times instructor will re-prompt the model when validation fails. For production pipelines where data quality matters more than cost, set this to 3-5.
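To see what a markdown-JSON mode has to cope with, consider a reply that wraps the JSON in a fenced block with a sentence of chatter in front. The sketch below extracts and parses the payload using only the standard library. extract_fenced_json and the sample reply are hypothetical, and this shows the general idea, not how instructor implements Mode.MD_JSON.

```python
# Sketch: pull a JSON object out of a markdown-fenced reply.
# This only illustrates the idea behind markdown-JSON parsing.
import json
import re

FENCE = "`" * 3  # build the fence marker so this example nests cleanly

reply = (
    "Sure! Here is the extracted data:\n"
    f"{FENCE}json\n"
    '{"name": "Tokyo", "country": "Japan", "population": 13960000}\n'
    f"{FENCE}\n"
)


def extract_fenced_json(text: str) -> dict:
    """Return the first fenced JSON object, or parse the whole text as JSON."""
    pattern = re.escape(FENCE) + r"(?:json)?\s*(.*?)" + re.escape(FENCE)
    match = re.search(pattern, text, re.DOTALL)
    payload = match.group(1) if match else text
    return json.loads(payload)


city = extract_fenced_json(reply)
print(city["name"], city["population"])
```

The parsed dict would still need schema validation afterwards. Stripping the fence only solves the wrapping problem, not the correctness of the fields.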

Instructor retry and self-correction loop
Three retries and a Pydantic error. That’s the whole self-correction system.

Using instructor with Non-OpenAI Models

If you are using Anthropic’s Claude, Google Gemini, or a local model via Ollama, instructor has provider-specific patches. For OpenAI-compatible endpoints (like local LLMs with an OpenAI-compatible API), you can pass a custom base_url:

# multi_provider.py
import instructor
from anthropic import Anthropic
from pydantic import BaseModel

# Anthropic Claude -- uses a different client class
anthropic_client = instructor.from_anthropic(Anthropic())

class Sentiment(BaseModel):
    label: str   # "positive", "negative", or "neutral"
    score: float # confidence from 0.0 to 1.0
    reason: str  # one-sentence explanation

result = anthropic_client.messages.create(
    model="claude-3-haiku-20240307",
    max_tokens=256,
    response_model=Sentiment,
    messages=[{
        "role": "user",
        "content": "This new Python library is fantastic, saves me hours every week!"
    }]
)

print(f"Sentiment: {result.label} ({result.score:.0%})")
print(f"Reason: {result.reason}")

Output:

Sentiment: positive (96%)
Reason: The user expresses strong enthusiasm and quantifies time savings, indicating genuine satisfaction.

For local models via Ollama (which provides an OpenAI-compatible API on localhost:11434), create the client with a custom base URL:

# ollama_instructor.py
import instructor
from openai import OpenAI
from pydantic import BaseModel

# Ollama runs an OpenAI-compatible server locally
ollama_client = instructor.from_openai(
    OpenAI(base_url="http://localhost:11434/v1", api_key="ollama"),
    mode=instructor.Mode.JSON  # use JSON mode for local models
)

class Summary(BaseModel):
    headline: str
    key_points: list[str]

# Works the same as OpenAI -- just a different backend
# summary = ollama_client.chat.completions.create(
#     model="llama3.2",
#     response_model=Summary,
#     messages=[{"role": "user", "content": "Summarize Python's async/await model"}]
# )
print("Local model client ready -- uncomment to use with Ollama running")

Output:

Local model client ready -- uncomment to use with Ollama running

Real-Life Example: Job Posting Extraction Pipeline

Here is a complete pipeline that reads job postings from a list of texts, extracts structured data, filters by criteria, and exports to CSV — the kind of task that comes up in recruiting tools, market research, and job aggregators:

Instructor batch structured extraction
Structured extraction at scale: parsing 50 job posts is just a for loop.

# job_extraction_pipeline.py
import instructor
import csv
from openai import OpenAI
from pydantic import BaseModel, Field
from typing import Optional, List

client = instructor.from_openai(OpenAI())

class JobPosting(BaseModel):
    title: str = Field(description="Job title exactly as written")
    company: str
    location: str = Field(description="City/country or 'Remote'")
    salary_min: Optional[int] = Field(None, description="Min annual salary USD")
    salary_max: Optional[int] = Field(None, description="Max annual salary USD")
    required_years: Optional[int] = Field(None, description="Years of experience required")
    technologies: List[str] = Field(description="List of technologies mentioned")
    is_remote: bool

# Sample job postings to process
JOB_TEXTS = [
    """Senior Python Engineer at Nexaflow -- Remote-first.
    $150k-$190k. 5+ years Python, FastAPI, PostgreSQL, AWS required.""",

    """Junior Data Scientist at BioMetrics Ltd (London, UK).
    GBP 45,000-55,000. 0-2 years exp, pandas, scikit-learn, matplotlib.""",

    """Staff ML Engineer at Quantra -- San Francisco CA.
    $220,000 - $280,000/yr. 8+ years, PyTorch, CUDA, distributed training.""",
]

def extract_jobs(texts: List[str]) -> List[JobPosting]:
    """Extract structured job data from raw posting texts."""
    jobs = []
    for i, text in enumerate(texts, 1):
        job = client.chat.completions.create(
            model="gpt-4o-mini",
            response_model=JobPosting,
            max_retries=3,
            messages=[{"role": "user", "content": f"Extract job details:\n\n{text}"}]
        )
        jobs.append(job)
        print(f"[{i}/{len(texts)}] Extracted: {job.title} at {job.company}")
    return jobs

def filter_remote(jobs: List[JobPosting]) -> List[JobPosting]:
    return [j for j in jobs if j.is_remote]

def export_csv(jobs: List[JobPosting], path: str) -> None:
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["Title", "Company", "Location", "Salary Min", "Salary Max",
                         "Yrs Required", "Technologies", "Remote"])
        for j in jobs:
            writer.writerow([
                j.title, j.company, j.location,
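                # "is None" keeps a legitimate 0 (e.g. "0-2 years") from being blanked
                "" if j.salary_min is None else j.salary_min,
                "" if j.salary_max is None else j.salary_max,
                "" if j.required_years is None else j.required_years,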
                ", ".join(j.technologies),
                j.is_remote
            ])

if __name__ == "__main__":
    print("Extracting job postings...")
    jobs = extract_jobs(JOB_TEXTS)
    remote_jobs = filter_remote(jobs)
    print(f"\nTotal extracted: {len(jobs)}, Remote: {len(remote_jobs)}")
    export_csv(jobs, "jobs_extracted.csv")
    print("Saved to jobs_extracted.csv")

Output:

Extracting job postings...
[1/3] Extracted: Senior Python Engineer at Nexaflow
[2/3] Extracted: Junior Data Scientist at BioMetrics Ltd
[3/3] Extracted: Staff ML Engineer at Quantra

Total extracted: 3, Remote: 1
Saved to jobs_extracted.csv

This pipeline is easy to extend: add a database write step, connect it to a web scraper that feeds real job pages, or add more validation rules to the JobPosting model. The core pattern (extract once, validate automatically, retry on failure) stays the same regardless of scale. To process thousands of postings, feed extract_jobs from a queue or database instead of JOB_TEXTS. If that source is a generator, drop the len(texts) call in the progress message, since generators have no length, and consider yielding each result instead of building one large list.
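
A lazy variant of the extraction loop might look like the sketch below. It is only an illustration: extract_stream, read_postings, and fake_extract are hypothetical names, and fake_extract stands in for the instructor call so the example runs without an API key.

```python
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")

def extract_stream(texts: Iterable[str], extract_one: Callable[[str], T]) -> Iterator[T]:
    """Lazily extract one record per input text, without needing len(texts)."""
    for i, text in enumerate(texts, 1):
        record = extract_one(text)
        print(f"[{i}] extracted")
        yield record

def read_postings() -> Iterator[str]:
    """Hypothetical source; in practice this might read from a queue or database."""
    yield "Senior Python Engineer at Nexaflow"
    yield "Staff ML Engineer at Quantra"

def fake_extract(text: str) -> dict:
    """Stand-in for the instructor call: split 'title at company'."""
    title, _, company = text.partition(" at ")
    return {"title": title, "company": company}

records = list(extract_stream(read_postings(), fake_extract))
print(records)
```

Because extract_stream is a generator, a caller can write each record to CSV or a database as it arrives instead of holding every result in memory.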

Frequently Asked Questions

Does instructor increase API costs because of retries?

Yes, each retry is an additional API call, so failed extractions cost more. In practice, with well-designed schemas and clear field descriptions, validation failures are rare — under 5% for most extraction tasks. The cost increase is usually worth the reliability gain. If cost is a concern, use max_retries=1 and handle exceptions in your code rather than retrying automatically.
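
A rough way to estimate the overhead: if each attempt fails validation independently with probability p, the expected number of calls with at most n attempts is 1 + p + p^2 + ... + p^(n-1). The sketch below computes that sum. The independence assumption is a simplification, and max_attempts is an illustrative name rather than an instructor parameter.

```python
def expected_calls(p_fail: float, max_attempts: int) -> float:
    """Expected API calls when each attempt fails independently with p_fail."""
    # Attempt k (0-based) only happens if all k earlier attempts failed.
    return sum(p_fail ** k for k in range(max_attempts))

for attempts in (1, 3, 5):
    print(attempts, round(expected_calls(0.05, attempts), 4))
```

Under these assumptions a 5% failure rate with up to three attempts costs about 1.05 calls per item on average, so the retry budget matters far less than the failure rate itself.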

Does instructor support streaming responses?

Yes. Pass stream=True together with response_model=Iterable[YourModel] for streaming lists, or instructor.Partial[YourModel] for streaming partial updates to a single model. Streaming is useful for large extractions where you want to process results as they arrive rather than waiting for the full response. See the instructor documentation for the streaming API details.

What happens when the model cannot extract a field?

If the field is typed as Optional[X], the model will return None for missing information. If the field is required (non-Optional), the model will either hallucinate a value or fail validation, triggering a retry. For fields that may legitimately be absent in the source text, always use Optional with a None default. This is the most common mistake new users make.

Can I extract data from large documents?

Yes, but be aware of token limits. For documents larger than a few thousand words, split them into chunks and extract from each chunk separately. Use a List[YourModel] return type if a single document contains multiple items to extract (like a list of transactions in a bank statement). For very large documents, consider summarizing first with a regular completion call, then extracting from the summary.
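
Chunking itself needs nothing beyond the standard library. The helper below is a minimal sketch: chunk_words is a hypothetical name, it counts words rather than tokens, and the optional overlap repeats a few words so that an item straddling a boundary appears whole in at least one chunk.

```python
from typing import List

def chunk_words(text: str, max_words: int, overlap: int = 0) -> List[str]:
    """Split text into chunks of at most max_words words, repeating `overlap` words."""
    if max_words <= 0:
        raise ValueError("max_words must be positive")
    if not 0 <= overlap < max_words:
        raise ValueError("overlap must be in [0, max_words)")
    words = text.split()
    step = max_words - overlap
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + max_words]))
        if start + max_words >= len(words):
            break  # this chunk already reaches the end of the text
    return chunks

document = " ".join(f"w{i}" for i in range(10))
for chunk in chunk_words(document, max_words=4, overlap=1):
    print(chunk)
```

Each chunk can then be passed to the same extraction call, with the per-chunk results merged afterwards.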

How is this different from just prompting for JSON output?

Prompting for JSON works until it does not: the model adds markdown fences, writes a preamble sentence, or omits fields. By default instructor uses tool calling rather than free-form prompting to communicate the schema, which makes structural deviations much less likely. It then runs Pydantic validation on the result and retries if types or constraints are violated, which catches whatever still slips through. The difference in reliability for production use is significant: JSON prompting is fine for experiments, but instructor is the right tool for pipelines where data quality matters.
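
To make the fragility concrete, here is the kind of cleanup code that plain JSON prompting tends to require. It is a best-effort sketch with a hypothetical helper name (parse_prompted_json); it handles fences and preambles but does nothing about missing or mistyped fields, which is the part validation covers.

```python
import json
import re

FENCE = "`" * 3  # a markdown code fence marker
FENCED_BLOCK = re.compile(
    re.escape(FENCE) + r"(?:json)?\s*(.*?)" + re.escape(FENCE), re.DOTALL
)

def parse_prompted_json(reply: str) -> dict:
    """Best-effort parse of a JSON object from a free-form model reply."""
    # Prefer the contents of a fenced block if the reply contains one.
    fenced = FENCED_BLOCK.search(reply)
    candidate = fenced.group(1) if fenced else reply
    # Then keep only the outermost {...} span, skipping any preamble or trailer.
    start, end = candidate.find("{"), candidate.rfind("}")
    if start == -1 or end < start:
        raise ValueError("no JSON object found in reply")
    return json.loads(candidate[start:end + 1])

fenced_reply = "Sure! Here is the data:\n" + FENCE + 'json\n{"name": "Tokyo"}\n' + FENCE
chatty_reply = 'The answer is {"name": "Tokyo"} as requested.'
print(parse_prompted_json(fenced_reply))
print(parse_prompted_json(chatty_reply))
```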

Is my data sent to OpenAI when I use instructor?

instructor is a thin wrapper around the OpenAI SDK — your data goes to whatever API endpoint you configure, subject to that provider’s data policy. If you are processing sensitive data, use a self-hosted model via Ollama or another local inference server, and point instructor at your local endpoint with a custom base_url. The library itself does not send data anywhere — it only wraps the client you provide.

Conclusion

The instructor library solves one of the most persistent frustrations in LLM application development: getting the model to return data in the shape your code expects, every time. We covered patching the OpenAI client, defining Pydantic schemas with field descriptions, extracting nested and list objects, adding custom validation rules, configuring retries and modes, and using instructor with non-OpenAI providers. The job extraction pipeline demonstrated how these pieces combine into a production-ready pattern.

The next step is to extend the real-life example: add a web scraper to pull live job postings, or connect the extracted data to a database. With instructor handling the model-to-schema translation, you can focus entirely on the business logic of what to extract and what to do with it.

Full documentation and more examples are at python.useinstructor.com. The library’s GitHub has a large collection of real-world examples including classification, knowledge graph extraction, and citation-backed answers.

How To Use Python tox for Multi-Environment Test Automation

Intermediate

You have a Python library that works perfectly on your machine running Python 3.11. Then a user files an issue — it crashes on 3.9. Another user is on 3.12. You fix the 3.9 bug and accidentally break 3.11 compatibility. Sound familiar? Testing across multiple Python versions manually means juggling virtual environments, remembering which one to activate, and running your test suite in each — a process so tedious it simply doesn’t happen. Bugs slip through. Users get hurt.

tox solves this by automating multi-environment testing in a single command. It reads a configuration file that lists which Python versions and dependencies to test against, creates isolated virtual environments for each one, installs your package into them, and runs your test suite in every environment — reporting failures per environment. One command, all Python versions, zero manual juggling.

In this article you will learn how to install and configure tox, write a tox.ini file from scratch, run tests across Python 3.9 through 3.12, pass environment variables and extra dependencies, run only a subset of environments, integrate tox with pytest and coverage, use the modern pyproject.toml configuration style, and build a real-world tox setup for a small utility library. By the end you will have a test automation setup that works identically on your laptop and in CI.

Running tox: Quick Example

Before diving into configuration details, here is a minimal tox setup that runs pytest across two Python versions. Create a small project directory, add a module, a test, and a tox.ini file, then run tox.

# project layout
# mylib/
# ├── mylib/
# │   └── utils.py
# ├── tests/
# │   └── test_utils.py
# ├── tox.ini
# └── setup.py (or pyproject.toml)

# mylib/utils.py
def add(a, b):
    return a + b

def greet(name):
    return f"Hello, {name}!"

# tests/test_utils.py
from mylib.utils import add, greet

def test_add():
    assert add(2, 3) == 5

def test_greet():
    assert greet("Alice") == "Hello, Alice!"

# tox.ini
[tox]
envlist = py39, py311

[testenv]
deps = pytest
commands = pytest tests/

# Run from the project root
$ tox
py39 create: /home/user/mylib/.tox/py39
py39 installdeps: pytest
py39 inst: /home/user/mylib/.tox/.tmp/package/1/mylib-0.1.0.tar.gz
py39 run-test: pytest tests/
============================= test session starts ==============================
platform linux -- Python 3.9.18, pytest-8.2.0
collected 2 items
tests/test_utils.py ..                                                   [100%]
============================== 2 passed in 0.12s ===============================
py311 create: /home/user/mylib/.tox/py311
py311 installdeps: pytest
py311 inst: /home/user/mylib/.tox/.tmp/package/1/mylib-0.1.0.tar.gz
py311 run-test: pytest tests/
============================= test session starts ==============================
platform linux -- Python 3.11.8, pytest-8.2.0
collected 2 items
tests/test_utils.py ..                                                   [100%]
============================== 2 passed in 0.12s ===============================
___________________________________ summary ___________________________________
  py39: commands succeeded
  py311: commands succeeded
  congratulations :)

Tox created two completely separate virtual environments (one for Python 3.9, one for 3.11), installed your package and pytest into each, ran the test suite, and reported results. The exact log lines differ between tox versions, but the sequence of steps is the same. The key parts are envlist (which environments to run), deps (what to install), and commands (what to execute). Everything below digs into these and more.

What Is tox and Why Use It?

Tox is a generic virtualenv management and test command-line tool. At its core it does three things: creates isolated virtual environments, installs specified dependencies into each, and runs your commands inside them. It was originally designed for testing Python packages across multiple interpreter versions, but it handles any environment-based task — linting, type checking, building docs, running formatters.

The key insight is that tox installs your package into each environment from a source distribution, the same way a user would install it with pip install mylib. This means your tests run against the installed package, not the raw source files. If you forget to list a dependency in your setup.py or pyproject.toml, tox will catch it — the test environment simply won’t have that import available.

Approach                | What it solves                         | What it misses
Manually activate venvs | Isolation                              | Repeatability, automation
pytest only             | Test running                           | Multi-version, missing deps detection
tox                     | Multi-version + isolation + automation | Requires Python versions to be installed
tox + pyenv             | Everything                             | Slightly more setup upfront

Install tox into your system Python or a dedicated virtual environment — do not install it inside the project venv you are testing, as this creates circular dependency problems:

# install_tox.sh
pip install tox --break-system-packages
# or into a dedicated tools venv
python -m venv ~/.venvs/tox && ~/.venvs/tox/bin/pip install tox
tox --version
tox 4.15.0 from /home/user/.local/lib/python3.11/site-packages/tox/__init__.py

Tox 4 (released 2022) changed several configuration defaults from tox 3. This article uses tox 4 conventions throughout. The most important difference: tox 4 no longer requires a setup.py — it works with pyproject.toml out of the box.

tox spins up a fresh venv for each envlist entry. No shared state, no surprises.

The tox.ini Configuration File

The tox.ini file lives in your project root alongside setup.py or pyproject.toml. It uses INI syntax with sections that map to environments. Understanding the full set of options unlocks tox’s real power.

[tox] — Global Settings

# tox.ini — full global section
[tox]
# Environments to run when you type just "tox"
envlist = py39, py310, py311, py312

# Minimum tox version required
minversion = 4.0

# Skip missing Python interpreters instead of failing
skip_missing_interpreters = true

# Where to store environment data (default: {toxinidir}/.tox)
toxworkdir = {toxinidir}/.tox

skip_missing_interpreters = true is extremely useful on developer machines: if you have only Python 3.11 and 3.12 installed, tox skips the 3.9 and 3.10 environments with a warning rather than failing the run. Set it to false in CI, where every interpreter in envlist should be present and a missing one should fail the build.
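
Before relying on skipping, it can help to see which interpreters are actually on your PATH. The sketch below is only a rough approximation: find_interpreters is a hypothetical helper, it looks for executables named pythonX.Y (a Unix-style naming assumption), and tox's own interpreter discovery checks more places than this.

```python
import shutil
from typing import Dict, Iterable, Optional

def find_interpreters(versions: Iterable[str]) -> Dict[str, Optional[str]]:
    """Map each version like '3.11' to the path of pythonX.Y on PATH, or None."""
    return {v: shutil.which(f"python{v}") for v in versions}

found = find_interpreters(["3.9", "3.10", "3.11", "3.12"])
for version, path in found.items():
    print(f"python{version}: {path or 'not found'}")
```

Any version reported as not found here is a likely candidate for being skipped (or failing, with skip_missing_interpreters = false) when you run tox.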

[testenv] — The Base Environment

The [testenv] section defines defaults inherited by all environments. Any specific environment like [testenv:py39] inherits everything from [testenv] and can override individual values.

# tox.ini — complete testenv section
[tox]
envlist = py39, py311, py312

[testenv]
# Dependencies to install (separate from your package's requirements)
deps =
    pytest>=8.0
    pytest-cov
    requests-mock

# The command to run
commands =
    pytest {posargs:tests/} --cov=mylib --cov-report=term-missing

# Environment variables to pass through or set
setenv =
    PYTHONPATH = {toxinidir}/src
    APP_ENV = testing

# Pass these variables from your shell into the environment
passenv =
    HOME
    CI
    GITHUB_*

# How to build and install the package itself (tox 4 default: sdist)
# Use "package = skip" for environments that don't need it (e.g., linting)
package = wheel

# Run tox and pass extra pytest args via posargs
$ tox -- -k test_add -v
py39 run-test: pytest -k test_add -v --cov=mylib --cov-report=term-missing
...
tests/test_utils.py::test_add PASSED

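The {posargs} placeholder is how you forward extra arguments to the underlying command. Everything after -- on the tox command line becomes {posargs}. The {posargs:tests/} form supplies a default: with no extra arguments tox runs pytest tests/, and when you do pass arguments they replace tests/ entirely. This lets you run a single test or pass -x to stop on first failure without changing tox.ini.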
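
The substitution rule is easy to emulate in a few lines of Python. The function below is an illustration of the behaviour described here, not tox's implementation; substitute_posargs is a hypothetical name.

```python
import re
import shlex
from typing import List

def substitute_posargs(command: str, posargs: List[str]) -> str:
    """Fill in {posargs} and {posargs:default} placeholders in a command string."""
    def repl(match: "re.Match[str]") -> str:
        default = match.group(1) or ""
        # Extra arguments replace the default completely; they are not appended to it.
        return shlex.join(posargs) if posargs else default
    return re.sub(r"\{posargs(?::([^}]*))?\}", repl, command)

template = "pytest {posargs:tests/} --cov=mylib"
print(substitute_posargs(template, []))
print(substitute_posargs(template, ["-k", "test_add", "-v"]))
```

If you want tests/ to stay in the command even when extra arguments are given, write the command as pytest tests/ {posargs} instead of using the default form.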

Per-Environment Overrides

Sometimes a specific Python version needs different dependencies or commands. Define a named environment section to override just those values:

# tox.ini — per-environment overrides
[tox]
envlist = py39, py311, py312, lint, typecheck

[testenv]
deps =
    pytest
    pytest-cov
commands = pytest tests/ --cov=mylib

# py39 needs a backport not required on 3.11+
[testenv:py39]
deps =
    pytest
    pytest-cov
    importlib-metadata>=4.0

# Linting environment — no package install needed
[testenv:lint]
package = skip
deps =
    ruff
    black
commands =
    ruff check mylib/ tests/
    black --check mylib/ tests/

# Type checking
[testenv:typecheck]
package = skip
deps = mypy
commands = mypy mylib/ --strict

# Run only the lint environment
$ tox -e lint

# Run multiple specific environments
$ tox -e py311,typecheck

# List all configured environments
$ tox list
default environments:
py39        -> [no description]
py311       -> [no description]
py312       -> [no description]
lint        -> [no description]
typecheck   -> [no description]

The package = skip setting tells tox not to build and install your package for that environment. This speeds up linting and type checking runs significantly since they only need the source files, not a full package installation.

tox.ini: four lines to test against Python 3.9, 3.10, 3.11, and 3.12 simultaneously.

Integrating pytest and Coverage

Tox and pytest work seamlessly together. The most useful addition is coverage reporting — knowing not just that your tests pass, but that they actually exercise your code.

# tox.ini — pytest + coverage setup
[tox]
envlist = py311, py312

[testenv]
deps =
    pytest>=8.0
    pytest-cov

commands =
    pytest tests/ \
        --cov=mylib \
        --cov-report=term-missing \
        --cov-report=html:htmlcov \
        --cov-fail-under=80

# Separate environment to combine coverage from all Python versions
[testenv:coverage-report]
package = skip
deps = coverage[toml]
commands =
    coverage combine
    coverage report --fail-under=80
    coverage html

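To combine coverage data across all Python version environments, have each environment write its own data file (.coverage.{envname}, set through COVERAGE_FILE) and merge those files in a dedicated coverage-report environment. The --cov-append flag keeps pytest-cov from erasing data already present in that environment's file: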

# tox.ini — combined coverage across Python versions
[testenv]
deps =
    pytest
    pytest-cov

setenv =
    COVERAGE_FILE = {toxinidir}/.coverage.{envname}

commands =
    pytest tests/ --cov=mylib --cov-report= --cov-append

[testenv:coverage-report]
package = skip
deps = coverage
setenv =
    COVERAGE_FILE = {toxinidir}/.coverage

commands =
    coverage combine .coverage.py311 .coverage.py312
    coverage report --show-missing

$ tox -e py311,py312,coverage-report
Name                 Stmts   Miss  Cover   Missing
--------------------------------------------------
mylib/utils.py          12      1    92%   45
mylib/parser.py         30      4    87%   22-25
--------------------------------------------------
TOTAL                   42      5    88%

The combined coverage report aggregates line hit data from every Python version. A line that only executes on one interpreter, for example inside a sys.version_info branch, will now be properly credited, giving you a truer picture of what the test suite actually exercises.

Using pyproject.toml Instead of tox.ini

Modern Python projects often consolidate all tool configuration into pyproject.toml. Tox 4 supports this natively — put your tox configuration in the [tool.tox] table and delete tox.ini:

# pyproject.toml — tox config inside the project file
[build-system]
requires = ["setuptools>=68", "wheel"]
build-backend = "setuptools.build_meta"

[project]
name = "mylib"
version = "0.1.0"
requires-python = ">=3.9"
dependencies = ["requests>=2.28"]

[tool.tox]
legacy_tox_ini = """
[tox]
envlist = py39, py311, py312
skip_missing_interpreters = true

[testenv]
deps =
    pytest
    pytest-cov
commands = pytest tests/ --cov=mylib --cov-report=term-missing

[testenv:lint]
package = skip
deps = ruff
commands = ruff check mylib/ tests/
"""

The legacy_tox_ini key holds an INI string, the same syntax as a standalone tox.ini, inside the TOML file. Tox reads it transparently. There is also a native TOML-based configuration format (added in tox 4.21) that avoids the embedded string, but legacy_tox_ini is the most compatible approach for projects that must also work with older tox releases, including tox 3.
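
Because the embedded value is ordinary INI text, you can inspect it with the standard library. The sketch below parses a shortened copy of the string with configparser purely as an illustration; it does not reproduce tox's own substitution or inheritance rules, and LEGACY_TOX_INI is a hypothetical variable holding the text you would otherwise read from pyproject.toml.

```python
import configparser

LEGACY_TOX_INI = """
[tox]
envlist = py39, py311, py312

[testenv]
deps =
    pytest
    pytest-cov
commands = pytest tests/

[testenv:lint]
package = skip
deps = ruff
commands = ruff check mylib/ tests/
"""

# interpolation=None avoids configparser treating "%" in commands specially
parser = configparser.ConfigParser(interpolation=None)
parser.read_string(LEGACY_TOX_INI)

env_list = [name.strip() for name in parser["tox"]["envlist"].split(",")]
deps = [line.strip() for line in parser["testenv"]["deps"].splitlines() if line.strip()]
print(env_list)
print(deps)
print(parser.sections())
```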

Environment Variables and Secrets

Test environments often need credentials or configuration that should not be committed to source control. Tox provides setenv for constants and passenv for forwarding values from your shell:

# tox.ini — handling secrets and config
[testenv]
deps = pytest

# Set constants needed by tests
setenv =
    APP_ENV = testing
    DATABASE_URL = sqlite:///test.db
    LOG_LEVEL = WARNING

# Pass secrets from the shell environment
passenv =
    AWS_ACCESS_KEY_ID
    AWS_SECRET_ACCESS_KEY
    GITHUB_TOKEN
    CI
    CODECOV_TOKEN

commands = pytest tests/

# In your shell before running tox:
export GITHUB_TOKEN=ghp_xxxxxxxxxxxxxxxxxxxx
tox -e py311

# tests/test_api.py — accessing the passed variable
import os

def test_token_available():
    token = os.environ.get("GITHUB_TOKEN")
    # In CI this will be the real token; locally it must be set
    assert token is not None, "GITHUB_TOKEN not set in environment"

Tox deliberately strips most environment variables from the test environment by default. This prevents hidden dependencies on your local shell configuration — the same isolation that makes tox results trustworthy also means you must explicitly declare every environment variable your tests need. Use passenv = * only as a last resort during debugging; it defeats tox’s isolation guarantee.
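
The filtering idea can be sketched in plain Python. This is a simplified model rather than tox's code: build_test_env is a hypothetical helper, real tox also passes a small default set of variables, and letting setenv win over passed-through values is an assumption made for this illustration.

```python
import fnmatch
from typing import Dict, Iterable, Mapping

def build_test_env(shell_env: Mapping[str, str],
                   passenv: Iterable[str],
                   setenv: Mapping[str, str]) -> Dict[str, str]:
    """Keep only shell variables matching a passenv pattern, then apply setenv."""
    patterns = list(passenv)
    env = {
        name: value
        for name, value in shell_env.items()
        if any(fnmatch.fnmatchcase(name, pattern) for pattern in patterns)
    }
    env.update(setenv)  # assumed precedence: explicit settings override passed values
    return env

shell = {"HOME": "/home/user", "GITHUB_TOKEN": "t0ken", "EDITOR": "vim", "APP_ENV": "dev"}
env = build_test_env(shell, passenv=["HOME", "GITHUB_*"], setenv={"APP_ENV": "testing"})
print(env)
```

EDITOR never reaches the test environment because no pattern names it, which is exactly the kind of hidden shell dependency the isolation is meant to expose.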

Running tox in GitHub Actions CI

The real payoff of a tox configuration is running it automatically on every push. GitHub Actions has first-class support for matrix builds across Python versions:

# .github/workflows/tests.yml
name: Tests

on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ["3.9", "3.10", "3.11", "3.12"]

    steps:
      - uses: actions/checkout@v4

      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}

      - name: Install tox
        run: pip install tox tox-gh-actions

      - name: Run tox
        run: tox

# tox.ini — gh-actions section maps matrix Python to tox env
[tox]
envlist = py39, py310, py311, py312, lint

[gh-actions]
python =
    3.9: py39
    3.10: py310
    3.11: py311
    3.12: py312

[testenv]
deps = pytest
commands = pytest tests/

[testenv:lint]
package = skip
deps = ruff
commands = ruff check mylib/

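The tox-gh-actions plugin checks the GITHUB_ACTIONS environment variable to detect that it is running on GitHub Actions, then uses the version of the Python interpreter running tox to select the matching environments from the [gh-actions] mapping. When the matrix job runs Python 3.11, tox runs only py311 rather than the whole envlist, which is more efficient than running every environment on every matrix node. Environments that appear in no mapping entry, such as lint in this example, are typically not selected on any matrix job; map them to one version (for example 3.12: py312, lint) or run them in a separate step with tox -e lint.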
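
The mapping step amounts to a lookup keyed on a major.minor version string. The sketch below illustrates that lookup only; envs_for_version and GH_ACTIONS_PYTHON are hypothetical names, and the plugin's real logic handles more cases than this.

```python
import sys
from typing import Dict, List, Tuple

# Mirrors the [gh-actions] python mapping from the tox.ini above
GH_ACTIONS_PYTHON: Dict[str, List[str]] = {
    "3.9": ["py39"],
    "3.10": ["py310"],
    "3.11": ["py311"],
    "3.12": ["py312"],
}

def envs_for_version(mapping: Dict[str, List[str]], version: Tuple[int, int]) -> List[str]:
    """Return the tox environments mapped to a (major, minor) Python version."""
    key = f"{version[0]}.{version[1]}"
    return mapping.get(key, [])

print(envs_for_version(GH_ACTIONS_PYTHON, sys.version_info[:2]))
```

Note that the keys are strings. The workflow file quotes "3.10" for the same reason: an unquoted 3.10 in YAML is read as the number 3.1.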

Real-Life Example: Testing a String Utilities Library

# string_utils_project/
# ├── strutils/
# │   ├── __init__.py
# │   ├── transform.py
# │   └── validate.py
# ├── tests/
# │   ├── test_transform.py
# │   └── test_validate.py
# ├── pyproject.toml
# └── tox.ini

# strutils/transform.py
def slugify(text: str) -> str:
    """Convert a string to a URL-friendly slug."""
    import re
    text = text.lower().strip()
    text = re.sub(r'[^\w\s-]', '', text)
    text = re.sub(r'[\s_-]+', '-', text)
    text = re.sub(r'^-+|-+$', '', text)
    return text

def truncate(text: str, max_length: int, suffix: str = "...") -> str:
    """Truncate text to max_length characters."""
    if len(text) <= max_length:
        return text
    return text[:max_length - len(suffix)] + suffix

# strutils/validate.py
import re

def is_valid_email(email: str) -> bool:
    """Basic email format validation."""
    pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    return bool(re.match(pattern, email))

def is_strong_password(password: str) -> bool:
    """Check password has 8+ chars, upper, lower, digit, special."""
    if len(password) < 8:
        return False
    has_upper = any(c.isupper() for c in password)
    has_lower = any(c.islower() for c in password)
    has_digit = any(c.isdigit() for c in password)
    has_special = any(c in '!@#$%^&*()_+-=[]{}|;:,.<>?' for c in password)
    return all([has_upper, has_lower, has_digit, has_special])

# tests/test_transform.py
import pytest
from strutils.transform import slugify, truncate

@pytest.mark.parametrize("text,expected", [
    ("Hello World", "hello-world"),
    ("  Python 3.11  ", "python-311"),
    ("café & bistro!", "café-bistro"),  # \w matches Unicode letters, so é is kept
])
def test_slugify(text, expected):
    assert slugify(text) == expected

def test_truncate_short():
    assert truncate("hello", 10) == "hello"

def test_truncate_long():
    assert truncate("hello world", 8) == "hello..."

def test_truncate_custom_suffix():
    assert truncate("hello world", 8, suffix="…") == "hello w…"

# tests/test_validate.py
import pytest
from strutils.validate import is_valid_email, is_strong_password

@pytest.mark.parametrize("email,valid", [
    ("user@example.com", True),
    ("bad-email", False),
    ("missing@tld.", False),
    ("ok+tag@sub.domain.org", True),
])
def test_is_valid_email(email, valid):
    assert is_valid_email(email) == valid

def test_strong_password():
    assert is_strong_password("Secure@123") is True
    assert is_strong_password("weakpass") is False
    assert is_strong_password("NoSpecial1") is False

# tox.ini — full production config for strutils
[tox]
envlist = py39, py310, py311, py312, lint, typecheck, coverage-report
skip_missing_interpreters = true

[gh-actions]
python =
    3.9: py39
    3.10: py310
    3.11: py311
    3.12: py312

[testenv]
deps =
    pytest>=8.0
    pytest-cov
setenv =
    COVERAGE_FILE = {toxinidir}/.coverage.{envname}
commands =
    pytest tests/ --cov=strutils --cov-report=

[testenv:lint]
package = skip
deps = ruff
commands = ruff check strutils/ tests/

[testenv:typecheck]
package = skip
deps = mypy
commands = mypy strutils/ --strict

[testenv:coverage-report]
package = skip
deps = coverage
setenv =
    COVERAGE_FILE = {toxinidir}/.coverage
commands =
    coverage combine
    coverage report --show-missing --fail-under=90

$ tox -e py311,lint,coverage-report
py311 create: .tox/py311
py311 run-test: pytest tests/ --cov=strutils --cov-report=
============================= test session starts ==============================
collected 11 items
tests/test_transform.py ......                                           [ 54%]
tests/test_validate.py .....                                             [100%]
============================== 11 passed in 0.18s ==============================

lint run-test: ruff check strutils/ tests/
All checks passed!

coverage-report run-test: coverage combine && coverage report --show-missing --fail-under=90
Name                      Stmts   Miss  Cover   Missing
-------------------------------------------------------
strutils/transform.py        12      0   100%
strutils/validate.py         10      0   100%
-------------------------------------------------------
TOTAL                        22      0   100%
___________________________________ summary ___________________________________
  py311: commands succeeded
  lint: commands succeeded
  coverage-report: commands succeeded
  congratulations :)

This configuration gives you a complete quality gate: unit tests across Python versions, linting with ruff, strict type checking with mypy, and a combined coverage report with a minimum threshold. Add this to GitHub Actions with the matrix config shown earlier and every pull request will automatically validate against all supported Python versions before merging.

Frequently Asked Questions

What happens if a Python version isn't installed?

With skip_missing_interpreters = true, tox prints a warning and marks that environment as skipped rather than failing. The final summary shows SKIPPED for missing interpreters and only fails if an installed environment's tests actually fail. Without that setting, tox exits with an error if any interpreter in envlist cannot be found. On developer machines, use skip_missing_interpreters = true; in CI, use false to enforce that all required versions are present.

When does tox recreate environments?

Tox caches virtual environments in the .tox/ directory and reuses them across runs for speed. It recreates an environment only when deps change, the Python interpreter changes, or you pass --recreate (or -r). If your tests behave strangely after a dependency upgrade, run tox -r to force a clean rebuild. You can also delete the entire .tox/ directory — tox will rebuild everything from scratch on the next run.

How do I run a single test with tox?

Use the {posargs} placeholder in your commands and pass arguments after -- on the command line. For example, tox -e py311 -- tests/test_transform.py::test_slugify -v runs only that one test with verbose output. The -- separator tells tox everything after it should be forwarded as {posargs} rather than interpreted as tox options. This is the cleanest way to do rapid test-driven development while keeping tox's isolation.

Should I pin dependency versions in tox.ini?

For library projects, leave deps unpinned (e.g., pytest>=8.0) so tox installs the latest compatible versions — this surfaces breakage from upstream changes early. For application projects where you want reproducible builds, pin exact versions (e.g., pytest==8.2.1) or use a requirements-test.txt file referenced with deps = -r requirements-test.txt. The -r syntax in tox deps works the same way as pip install -r.

What changed between tox 3 and tox 4?

Tox 4 is a rewrite that runs only on Python 3 and changed several defaults. Isolated builds are now the default, meaning tox builds your package through the PEP 517/518 build system declared in pyproject.toml instead of calling python setup.py directly. The package option (for example package = skip) is the tox 4 way to control how, or whether, the project is built and installed. Plugins such as tox-gh-actions need a release that supports tox 4. Because packaging is on by default, a project with neither pyproject.toml nor setup.py needs either a minimal pyproject.toml with a [build-system] table or package = skip in its environments. Check the official upgrade guide when migrating.

Conclusion

Tox turns multi-environment testing from a manual, error-prone process into a single tox command. You learned how to write a tox.ini with envlist, deps, and commands; create per-environment overrides for linting, type checking, and version-specific dependencies; integrate pytest coverage across Python versions; use setenv and passenv for environment variables; configure tox-gh-actions for CI matrix builds; and build a complete test pipeline for a real utility library.

Extend the string utilities example by adding a docs environment that builds Sphinx documentation, a security environment that runs bandit, or a benchmark environment that runs pytest-benchmark. Each new environment is just a few lines in tox.ini. Official documentation: tox.wiki.

How To Use Python aiosmtplib for Async Email Sending

Intermediate

Sending emails from Python is straightforward with smtplib — until your application needs to handle hundreds of notifications at once without blocking. A web server that pauses every request while waiting for an SMTP handshake is a web server that frustrates users. If your application already uses asyncio for concurrent I/O — whether through FastAPI, aiohttp, or plain async/await — blocking SMTP calls break the async model entirely, forcing every email to serialize through a synchronous bottleneck.

aiosmtplib is a fully async SMTP client that integrates cleanly with Python’s asyncio event loop. It exposes the same concepts as smtplib (connections, TLS, authentication, MIME messages) but with await-based calls that never block the event loop. Install it with pip install aiosmtplib on a Python version supported by the release you install; the 3.0 series used in this article requires Python 3.8 or newer. No other dependencies are required for basic use, and it works with Gmail, Outlook, SendGrid, Mailgun, or any standard SMTP server.

This article walks through everything you need to send emails asynchronously in Python: connecting to an SMTP server with TLS, building plain-text and HTML messages with email.message, adding attachments, sending to multiple recipients concurrently with asyncio.gather, and building a practical async notification system. By the end you’ll have a reusable async email module ready to drop into any Python application.

Sending an Email with aiosmtplib: Quick Example

Here’s the minimal working example — connect to Gmail’s SMTP server over TLS and send a plain-text message in under 20 lines:

# quick_email.py
import asyncio
import aiosmtplib
from email.message import EmailMessage

async def send_quick_email():
    msg = EmailMessage()
    msg["From"] = "you@gmail.com"
    msg["To"] = "recipient@example.com"
    msg["Subject"] = "Hello from aiosmtplib"
    msg.set_content("This email was sent asynchronously with Python.")

    await aiosmtplib.send(
        msg,
        hostname="smtp.gmail.com",
        port=465,
        username="you@gmail.com",
        password="your-app-password",
        use_tls=True,
    )
    print("Email sent successfully.")

asyncio.run(send_quick_email())

Output:

Email sent successfully.

The aiosmtplib.send() convenience function handles connecting, authenticating, sending, and closing the connection in one call. The EmailMessage object from Python’s standard library email.message module handles message formatting. For Gmail, use an App Password (Google Account → Security → 2-Step Verification → App passwords) rather than your regular login password — Google blocks direct password auth for third-party apps.

The sections below cover TLS vs STARTTLS, HTML messages, attachments, bulk sending, and error handling in detail.

What Is aiosmtplib and When Should You Use It?

aiosmtplib is an async implementation of the SMTP protocol built on top of Python’s asyncio transport layer. It mirrors the API of the standard library’s smtplib.SMTP class but every network operation is a coroutine. This matters because SMTP involves multiple round-trips: TCP connect, EHLO handshake, STARTTLS upgrade, AUTH, DATA transfer, and QUIT — any of which can take hundreds of milliseconds on a slow network.

Library | Blocking? | Async support | Best for
smtplib | Yes | No | Scripts, CLI tools, simple apps
aiosmtplib | No | Native asyncio | FastAPI, aiohttp, async web apps
smtplib + thread pool | Offloaded | Via executor | Quick workaround in async code

Use aiosmtplib when your application already uses asyncio and you need non-blocking email delivery. For simple one-off scripts, the standard smtplib is perfectly fine. Running smtplib inside loop.run_in_executor() works as a stopgap but adds threading overhead and complexity that aiosmtplib avoids entirely.
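
The thread-pool stopgap mentioned above can be sketched roughly as follows. This is only an illustration: send_blocking is a hypothetical stand-in for a real smtplib call (it just sleeps), and asyncio.to_thread (Python 3.9+) is used as a convenient wrapper around the default executor.

```python
import asyncio
import time

def send_blocking(recipient: str) -> str:
    """Hypothetical stand-in for a blocking smtplib send."""
    time.sleep(0.1)  # simulates the SMTP round-trips
    return f"sent to {recipient}"

async def main() -> list[str]:
    recipients = ["a@example.com", "b@example.com", "c@example.com"]
    # Each blocking call runs in a worker thread, so the event loop stays free
    # and the three sends overlap instead of running back to back.
    return await asyncio.gather(
        *(asyncio.to_thread(send_blocking, r) for r in recipients)
    )

results = asyncio.run(main())
print(results)
```

The pattern works, but every in-flight send occupies a thread, which is the overhead a native async client avoids.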

Installing aiosmtplib

# install_aiosmtplib.sh
pip install aiosmtplib

Output:

Successfully installed aiosmtplib-3.0.1

The package has no mandatory third-party dependencies — it uses only Python’s standard library (asyncio, ssl, email). For building rich HTML emails with inline images or attachments, you’ll use the standard library’s email.mime or email.message modules, which are already installed. Check the version with python -m pip show aiosmtplib.

TLS vs STARTTLS: Choosing the Right Connection

SMTP servers offer two encryption modes. Getting this wrong causes connection errors that look identical but have completely different fixes:

Mode | Port | How it works | aiosmtplib parameter
SSL/TLS (implicit) | 465 | TLS from the first byte | use_tls=True
STARTTLS (explicit) | 587 | Plaintext connect, then upgrade | start_tls=True
Plain (no encryption) | 25 | Unencrypted; do not use for auth | Neither

# tls_connection.py
import asyncio
import aiosmtplib
from email.message import EmailMessage

async def send_via_starttls():
    msg = EmailMessage()
    msg["From"] = "you@yourdomain.com"
    msg["To"] = "recipient@example.com"
    msg["Subject"] = "STARTTLS connection test"
    msg.set_content("Sent via STARTTLS on port 587.")

    # Port 587 with STARTTLS (common for most business SMTP providers)
    await aiosmtplib.send(
        msg,
        hostname="smtp.gmail.com",
        port=587,
        username="you@yourdomain.com",
        password="your-app-password",
        start_tls=True,   # upgrade to TLS after connecting
    )
    print("Sent via STARTTLS.")

async def send_via_ssl():
    msg = EmailMessage()
    msg["From"] = "you@yourdomain.com"
    msg["To"] = "recipient@example.com"
    msg["Subject"] = "SSL/TLS connection test"
    msg.set_content("Sent via implicit TLS on port 465.")

    # Port 465 with implicit TLS
    await aiosmtplib.send(
        msg,
        hostname="smtp.gmail.com",
        port=465,
        username="you@yourdomain.com",
        password="your-app-password",
        use_tls=True,     # TLS from first byte
    )
    print("Sent via SSL/TLS.")

asyncio.run(send_via_starttls())

Output:

Sent via STARTTLS.

Most modern providers (Gmail, Outlook/Hotmail, SendGrid, Mailgun) support both ports. Gmail prefers port 465 with use_tls=True. Office 365 uses port 587 with start_tls=True. Never pass both use_tls=True and start_tls=True — that raises a ValueError. If you see SMTPConnectError, you’re almost certainly using the wrong port/mode combination.
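
One way to avoid mixing up the two modes is to derive the keyword arguments from the port in a single place. The helper below is a hypothetical sketch (tls_options is not part of aiosmtplib); it simply encodes the port-to-mode table above and refuses to guess for unknown ports.

```python
def tls_options(port: int) -> dict[str, bool]:
    """Hypothetical helper: map a well-known SMTP port to aiosmtplib keyword arguments."""
    if port == 465:
        return {"use_tls": True}     # implicit TLS from the first byte
    if port == 587:
        return {"start_tls": True}   # plaintext connect, then upgrade
    if port == 25:
        return {}                    # unencrypted; avoid for authenticated sends
    raise ValueError(f"No default TLS mode known for port {port}")

print(tls_options(465))
print(tls_options(587))
```

The result can then be unpacked into the call, for example aiosmtplib.send(msg, hostname=host, port=port, **tls_options(port)), so that use_tls and start_tls are never passed together.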

Sending HTML Emails with Fallback Plain Text

Real notification emails need HTML — formatted text, buttons, links with proper styling. The email.mime module’s MIMEMultipart("alternative") structure lets you include both a plain-text fallback and the HTML version. Email clients choose the richest format they support:

# html_email.py
import asyncio
import aiosmtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

SMTP_HOST = "smtp.gmail.com"
SMTP_PORT = 465
USERNAME = "you@gmail.com"
PASSWORD = "your-app-password"

async def send_html_email(to: str, subject: str, plain: str, html: str) -> None:
    msg = MIMEMultipart("alternative")
    msg["From"] = USERNAME
    msg["To"] = to
    msg["Subject"] = subject

    # Plain text part first (fallback for clients that don't render HTML)
    msg.attach(MIMEText(plain, "plain", "utf-8"))
    # HTML part second (preferred by modern clients)
    msg.attach(MIMEText(html, "html", "utf-8"))

    await aiosmtplib.send(
        msg,
        hostname=SMTP_HOST,
        port=SMTP_PORT,
        username=USERNAME,
        password=PASSWORD,
        use_tls=True,
    )

plain_body = "Your order #1234 has shipped. Track it at: https://example.com/track/1234"
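# Minimal illustrative markup; adjust tags and styling to suit your template
html_body = """
<html>
  <body>
    <h2>Your order has shipped!</h2>
    <p>Order #1234 is on its way.</p>
    <p><a href="https://example.com/track/1234">Track Your Order</a></p>
  </body>
</html>
"""

asyncio.run(send_html_email(
    to="customer@example.com",
    subject="Your order has shipped",
    plain=plain_body,
    html=html_body,
))
print("HTML email sent.")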

Output:

HTML email sent.

The order of attach() calls matters: plain text first, HTML second. MIME clients render the last attached alternative they understand, which is why HTML goes last. Always include a plain-text version — spam filters penalise HTML-only messages, and some corporate mail servers strip HTML entirely.
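
The same plain-first, HTML-last structure can also be built with the standard library’s newer EmailMessage API, which is the class used in the quick example. The sketch below uses only email.message; the message text is made up for illustration.

```python
from email.message import EmailMessage

msg = EmailMessage()
msg["Subject"] = "Your order has shipped"
# set_content() creates the plain-text part; add_alternative() converts the
# message to multipart/alternative and appends the HTML part after it.
msg.set_content("Your order #1234 has shipped.")
msg.add_alternative("<p>Your order <b>#1234</b> has shipped.</p>", subtype="html")

part_types = [part.get_content_type() for part in msg.iter_parts()]
print(msg.get_content_type())  # multipart/alternative
print(part_types)              # ['text/plain', 'text/html']
```

Because add_alternative() always appends, calling set_content() first keeps the plain-text fallback ahead of the HTML version.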

Adding File Attachments

To attach a file, switch to MIMEMultipart("mixed") and add a MIMEBase or MIMEApplication part. The pattern works for any file type — PDFs, images, CSV exports:

# email_with_attachment.py
import asyncio
import aiosmtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication
from pathlib import Path

SMTP_CONFIG = dict(
    hostname="smtp.gmail.com", port=465,
    username="you@gmail.com", password="your-app-password",
    use_tls=True
)

async def send_with_attachment(to: str, subject: str, body: str, filepath: str) -> None:
    msg = MIMEMultipart()
    msg["From"] = SMTP_CONFIG["username"]
    msg["To"] = to
    msg["Subject"] = subject
    msg.attach(MIMEText(body, "plain", "utf-8"))

    # Read file and attach
    path = Path(filepath)
    with open(path, "rb") as f:
        attachment = MIMEApplication(f.read(), Name=path.name)
    attachment["Content-Disposition"] = f'attachment; filename="{path.name}"'
    msg.attach(attachment)

    await aiosmtplib.send(msg, **SMTP_CONFIG)
    print(f"Sent email with attachment: {path.name}")

# Create a sample CSV to attach
Path("report.csv").write_text("date,sales\n2026-05-01,1200\n2026-05-02,1450\n")

asyncio.run(send_with_attachment(
    to="manager@example.com",
    subject="Daily Sales Report",
    body="Please find the daily sales report attached.",
    filepath="report.csv",
))

Output:

Sent email with attachment: report.csv

The Content-Disposition header tells the email client to treat the part as a downloadable file rather than inline content. The Name parameter in MIMEApplication and the filename in Content-Disposition should match. For images you want to display inline rather than attach, use Content-Disposition: inline and a Content-ID header referenced in the HTML body with cid: URIs.
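
For comparison, EmailMessage.add_attachment() from the standard library sets the Content-Disposition header and filename in one call. The sketch below assumes a hypothetical helper name, attach_bytes, and guesses the MIME type from the filename, falling back to application/octet-stream.

```python
import mimetypes
from email.message import EmailMessage

def attach_bytes(msg: EmailMessage, data: bytes, filename: str) -> None:
    """Attach raw bytes, guessing the MIME type from the filename."""
    guessed, _ = mimetypes.guess_type(filename)
    maintype, subtype = (guessed or "application/octet-stream").split("/", 1)
    # add_attachment() converts the message to multipart/mixed if needed and
    # marks the new part as an attachment with the given filename.
    msg.add_attachment(data, maintype=maintype, subtype=subtype, filename=filename)

msg = EmailMessage()
msg.set_content("Please find the daily sales report attached.")
attach_bytes(msg, b"date,sales\n2026-05-01,1200\n", "report.csv")

attachment = next(msg.iter_attachments())
print(attachment.get_filename(), attachment.get_content_disposition())
```

The resulting message can be passed to aiosmtplib.send() in the same way as the MIMEMultipart version.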

Sending Emails Concurrently with asyncio.gather

The biggest advantage of aiosmtplib over smtplib is concurrent sending. Instead of waiting for each SMTP round-trip to finish before starting the next, asyncio.gather() fires all sends simultaneously and waits for them all to complete:

# bulk_email.py
import asyncio
import aiosmtplib
from email.message import EmailMessage
from typing import NamedTuple

SMTP_CONFIG = dict(
    hostname="smtp.gmail.com", port=465,
    username="you@gmail.com", password="your-app-password",
    use_tls=True
)

class Recipient(NamedTuple):
    email: str
    name: str

async def send_one(recipient: Recipient, subject: str, body_template: str) -> tuple[str, bool]:
    """Send to a single recipient; return (email, success)."""
    msg = EmailMessage()
    msg["From"] = SMTP_CONFIG["username"]
    msg["To"] = recipient.email
    msg["Subject"] = subject
    msg.set_content(body_template.format(name=recipient.name))

    try:
        await aiosmtplib.send(msg, **SMTP_CONFIG)
        return recipient.email, True
    except aiosmtplib.SMTPException as e:
        print(f"  Failed {recipient.email}: {e}")
        return recipient.email, False

async def send_bulk(recipients: list[Recipient], subject: str, template: str) -> None:
    tasks = [send_one(r, subject, template) for r in recipients]
    results = await asyncio.gather(*tasks, return_exceptions=False)

    sent = sum(1 for _, ok in results if ok)
    print(f"Sent {sent}/{len(recipients)} emails successfully.")

RECIPIENTS = [
    Recipient("alice@example.com", "Alice"),
    Recipient("bob@example.com", "Bob"),
    Recipient("carol@example.com", "Carol"),
]
TEMPLATE = "Hi {name},\n\nYour weekly summary is ready. Log in to view your dashboard.\n\nThanks!"

asyncio.run(send_bulk(RECIPIENTS, "Your Weekly Summary", TEMPLATE))

Output:

Sent 3/3 emails successfully.

With asyncio.gather(), all three SMTP connections are opened in parallel. For a list of 50 recipients, this can reduce total send time from 50× the per-email latency to roughly 1× (plus connection setup overhead). For very large batches (1000+), add a semaphore to cap concurrent connections: sem = asyncio.Semaphore(20) and wrap each coroutine with async with sem: to avoid overwhelming the SMTP server’s connection limit.
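
The semaphore pattern can be sketched without a mail server. In the example below, fake_send is a hypothetical stand-in for await aiosmtplib.send(...), and the in_flight and peak counters exist only to show that the cap is respected.

```python
import asyncio

async def fake_send(recipient: str) -> str:
    """Hypothetical stand-in for `await aiosmtplib.send(...)`."""
    await asyncio.sleep(0.01)
    return recipient

async def send_all(recipients: list[str], limit: int = 20) -> tuple[list[str], int]:
    sem = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def guarded(recipient: str) -> str:
        nonlocal in_flight, peak
        async with sem:  # waits here once `limit` sends are already running
            in_flight += 1
            peak = max(peak, in_flight)
            try:
                return await fake_send(recipient)
            finally:
                in_flight -= 1

    delivered = await asyncio.gather(*(guarded(r) for r in recipients))
    return delivered, peak

recipients = [f"user{i}@example.com" for i in range(100)]
delivered, peak = asyncio.run(send_all(recipients, limit=20))
print(len(delivered), "sent; peak concurrency:", peak)
```

All 100 coroutines are created up front, but at most limit of them are inside the async with block at any moment.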

Reusing a Connection with SMTP Context Manager

The aiosmtplib.send() convenience function opens and closes a new SMTP connection for every message. For batch sending within a single function, it’s more efficient to open one connection and send multiple messages through it:

# persistent_smtp.py
import asyncio
import aiosmtplib
from email.message import EmailMessage

async def send_batch_single_connection(messages: list[dict]) -> None:
    """Send multiple messages over a single persistent SMTP connection."""
    async with aiosmtplib.SMTP(
        hostname="smtp.gmail.com",
        port=465,
        username="you@gmail.com",
        password="your-app-password",
        use_tls=True,
    ) as smtp:
        for m in messages:
            msg = EmailMessage()
            msg["From"] = "you@gmail.com"
            msg["To"] = m["to"]
            msg["Subject"] = m["subject"]
            msg.set_content(m["body"])
            await smtp.send_message(msg)
            print(f"Sent to {m['to']}")

messages = [
    {"to": "user1@example.com", "subject": "Invoice #001", "body": "Your invoice is attached."},
    {"to": "user2@example.com", "subject": "Invoice #002", "body": "Your invoice is attached."},
    {"to": "user3@example.com", "subject": "Invoice #003", "body": "Your invoice is attached."},
]

asyncio.run(send_batch_single_connection(messages))

Output:

Sent to user1@example.com
Sent to user2@example.com
Sent to user3@example.com

The async with aiosmtplib.SMTP(...) as smtp: context manager connects, authenticates, and ensures the connection is properly closed via QUIT even if an exception occurs. smtp.send_message(msg) sends a pre-built EmailMessage object without re-connecting. This approach is ideal for sending invoices, receipts, or reports in a nightly batch job where all messages are sent in one pass.

Real-Life Example: Async Notification System

# notification_system.py
"""
Async email notification system with retry logic.
Sends transactional emails (welcome, password reset, order confirmation)
without blocking the event loop.
"""
import asyncio
import aiosmtplib
from dataclasses import dataclass
from email.message import EmailMessage
from enum import Enum

class NotificationType(Enum):
    WELCOME = "welcome"
    PASSWORD_RESET = "password_reset"
    ORDER_CONFIRM = "order_confirm"

TEMPLATES = {
    NotificationType.WELCOME: {
        "subject": "Welcome to MyApp, {name}!",
        "body": "Hi {name},\n\nYour account is ready. Log in at https://myapp.example.com\n\nWelcome aboard!"
    },
    NotificationType.PASSWORD_RESET: {
        "subject": "Reset your MyApp password",
        "body": "Hi {name},\n\nClick here to reset your password (valid 30 min):\nhttps://myapp.example.com/reset/{token}\n\nIgnore this if you didn't request it."
    },
    NotificationType.ORDER_CONFIRM: {
        "subject": "Order #{order_id} confirmed",
        "body": "Hi {name},\n\nYour order #{order_id} for {item} has been confirmed.\nEstimated delivery: {delivery_date}\n\nThank you for your purchase!"
    },
}

@dataclass
class Notification:
    to: str
    type: NotificationType
    context: dict  # template variables

SMTP_CONFIG = dict(
    hostname="smtp.gmail.com", port=465,
    username="notifications@myapp.example.com",
    password="your-app-password", use_tls=True
)

async def send_notification(notif: Notification, retries: int = 3) -> bool:
    """Send a single notification with automatic retry on transient errors."""
    template = TEMPLATES[notif.type]
    subject = template["subject"].format(**notif.context)
    body = template["body"].format(**notif.context)

    msg = EmailMessage()
    msg["From"] = SMTP_CONFIG["username"]
    msg["To"] = notif.to
    msg["Subject"] = subject
    msg.set_content(body)

    for attempt in range(1, retries + 1):
        try:
            await aiosmtplib.send(msg, **SMTP_CONFIG)
            print(f"[OK] {notif.type.value} → {notif.to}")
            return True
        except aiosmtplib.SMTPRecipientsRefused:
            print(f"[SKIP] Invalid address: {notif.to}")
            return False  # don't retry invalid addresses
        except (aiosmtplib.SMTPConnectError, aiosmtplib.SMTPServerDisconnected) as e:
            if attempt < retries:
                await asyncio.sleep(2 ** attempt)  # exponential backoff
            else:
                print(f"[FAIL] {notif.to} after {retries} attempts: {e}")
                return False

async def process_notification_queue(queue: list[Notification]) -> None:
    """Process all notifications concurrently with a max of 10 parallel connections."""
    semaphore = asyncio.Semaphore(10)

    async def guarded_send(n: Notification) -> bool:
        async with semaphore:
            return await send_notification(n)

    results = await asyncio.gather(*[guarded_send(n) for n in queue])
    sent = sum(results)
    print(f"\nQueue complete: {sent}/{len(queue)} notifications delivered.")

# Example notification queue
notifications = [
    Notification("alice@example.com", NotificationType.WELCOME, {"name": "Alice"}),
    Notification("bob@example.com", NotificationType.ORDER_CONFIRM,
                 {"name": "Bob", "order_id": "5501", "item": "Python Handbook",
                  "delivery_date": "May 22, 2026"}),
    Notification("carol@example.com", NotificationType.PASSWORD_RESET,
                 {"name": "Carol", "token": "abc123xyz"}),
]

asyncio.run(process_notification_queue(notifications))

Output:

[OK] welcome → alice@example.com
[OK] order_confirm → bob@example.com
[OK] password_reset → carol@example.com

Queue complete: 3/3 notifications delivered.

This system handles three real-world concerns: template-based messages that keep HTML/text out of application logic, a semaphore that caps concurrent SMTP connections at 10 (preventing server-side rate limiting), and per-notification retry with exponential backoff for transient network errors. Extend it by reading the queue from a database, persisting failed notifications for a dead-letter queue, or integrating with a task queue like Celery or asyncio.Queue for continuous processing.

Frequently Asked Questions

Why does Gmail reject my login even with the correct password?

Gmail requires an App Password for third-party SMTP access, and App Passwords are only available once 2-Step Verification is enabled on the account. Go to Google Account → Security → 2-Step Verification → App passwords, generate a 16-character password, and use that in your code. Your main Gmail password will not work: the “Less secure app access” setting that once allowed it has been phased out by Google.

How do I use aiosmtplib inside a FastAPI endpoint?

Call await aiosmtplib.send(...) directly inside your async def endpoint — no special setup needed since FastAPI already runs in an asyncio event loop. For fire-and-forget sends that shouldn’t delay the HTTP response, use asyncio.create_task(send_email(...)) inside the endpoint and return the response immediately. Be careful with this pattern: if the task fails after the response is sent, you’ll need to log errors explicitly since there’s no caller waiting for the result.
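
A fire-and-forget send with explicit error logging might look like the sketch below. It is framework-agnostic: send_email is a hypothetical coroutine standing in for one that awaits aiosmtplib.send(), and handler plays the role of the endpoint. The module-level set keeps a reference to each task so it is not garbage-collected before it finishes.

```python
import asyncio
import logging

logger = logging.getLogger("mailer")
background_tasks: set[asyncio.Task] = set()  # keeps tasks referenced until done
failures: list[str] = []

async def send_email(to: str) -> None:
    """Hypothetical stand-in for a coroutine that awaits aiosmtplib.send()."""
    await asyncio.sleep(0.01)
    if to.endswith("@invalid.example"):
        raise RuntimeError(f"delivery failed for {to}")

def _on_done(task: asyncio.Task) -> None:
    background_tasks.discard(task)
    if not task.cancelled() and task.exception() is not None:
        failures.append(str(task.exception()))
        logger.error("Background email failed: %s", task.exception())

def send_in_background(to: str) -> None:
    task = asyncio.create_task(send_email(to))
    background_tasks.add(task)
    task.add_done_callback(_on_done)

async def handler() -> str:
    send_in_background("alice@example.com")
    send_in_background("bob@invalid.example")
    return "202 Accepted"  # returned before either send has finished

async def main() -> str:
    response = await handler()
    # Demo only: wait for the background tasks so the script can exit cleanly.
    await asyncio.gather(*list(background_tasks), return_exceptions=True)
    return response

response = asyncio.run(main())
print(response, failures)
```

In a real service the done callback is the only place a failure surfaces, so it should write to whatever logging or alerting the application already uses.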

How do I test code that sends emails without actually sending them?

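On Python 3.11 and earlier, run python -m smtpd -n -c DebuggingServer localhost:1025 to start a local debug SMTP server that prints emails to stdout instead of delivering them. The smtpd module was removed in Python 3.12; on newer versions, install aiosmtpd (pip install aiosmtpd) and run python -m aiosmtpd -n -l localhost:1025, which uses a debugging handler by default. Point your code at hostname="localhost", port=1025 with no authentication or TLS. For unit tests, mock aiosmtplib.send with unittest.mock.AsyncMock, which lets you assert what arguments it was called with without any network access.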
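
The AsyncMock approach can be sketched as follows. To keep the example runnable without aiosmtplib installed, the send function is passed in as a parameter rather than patched; send_welcome is a hypothetical function under test, and in a real test suite you would more likely patch aiosmtplib.send in the module that imports it.

```python
import asyncio
from email.message import EmailMessage
from unittest.mock import AsyncMock

async def send_welcome(send, to: str) -> None:
    """Build a message and hand it to `send`, an aiosmtplib.send-style coroutine function."""
    msg = EmailMessage()
    msg["To"] = to
    msg["Subject"] = "Welcome"
    msg.set_content("Your account is ready.")
    await send(msg, hostname="localhost", port=1025)

fake_send = AsyncMock()
asyncio.run(send_welcome(fake_send, "alice@example.com"))

# The mock records how it was awaited, so the test can inspect the message.
fake_send.assert_awaited_once()
sent_msg = fake_send.await_args.args[0]
print(sent_msg["To"], fake_send.await_args.kwargs["port"])
```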

How do I add CC and BCC recipients?

Set msg["Cc"] = "cc@example.com" on the EmailMessage object for CC. For BCC, do not set a Bcc header; instead, pass the full recipient list in the recipients parameter of aiosmtplib.send(): await aiosmtplib.send(msg, recipients=["to@example.com", "cc@example.com", "bcc@example.com"], ...). When recipients is given it is used instead of the addresses in the headers, so it must include the To and Cc addresses as well as the BCC ones. The SMTP server delivers to every address in recipients, but only the To and Cc headers appear in the message, keeping BCC addresses hidden.
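
Building that recipient list by hand is easy to get wrong, so a small helper can derive it from the headers. The function below is a hypothetical sketch using only the standard library; envelope_recipients is not an aiosmtplib function.

```python
from email.message import EmailMessage
from email.utils import getaddresses

def envelope_recipients(msg: EmailMessage, bcc: list[str]) -> list[str]:
    """Collect To and Cc addresses from the headers and append the hidden BCC addresses."""
    visible = getaddresses(msg.get_all("To", []) + msg.get_all("Cc", []))
    return [addr for _, addr in visible] + list(bcc)

msg = EmailMessage()
msg["To"] = "Alice <to@example.com>"
msg["Cc"] = "cc@example.com"

recipients = envelope_recipients(msg, bcc=["bcc@example.com"])
print(recipients)  # ['to@example.com', 'cc@example.com', 'bcc@example.com']
```

The returned list is what you would pass as recipients=..., while the message itself never carries a Bcc header.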

How do I set a connection timeout?

Pass timeout=30 (seconds) to aiosmtplib.send() or the SMTP constructor. This sets both the connection timeout and the per-command timeout. If the SMTP server is unresponsive, the coroutine raises a timeout error after that period (SMTPConnectTimeoutError, a subclass of SMTPConnectError, while connecting) rather than hanging indefinitely. Set the timeout explicitly in production code: the library default (60 seconds in current releases, according to its documentation) is a long time for a request handler to wait on a network failure.

Conclusion

aiosmtplib brings proper async support to Python email sending without requiring you to change how you build messages — the standard library’s EmailMessage, MIMEMultipart, and MIMEText work exactly as before. You learned how to connect with TLS and STARTTLS, build HTML emails with plain-text fallbacks, attach files, send to multiple recipients concurrently with asyncio.gather(), reuse a single SMTP connection for batch sends, and build a production-ready notification system with retry logic and connection limiting.

The notification system is a practical starting point — extend it by storing unsent notifications in a database for durability, adding Jinja2 templating for richer HTML emails, integrating with a webhook receiver to trigger sends on application events, or wrapping it behind a simple HTTP API so other services can request notifications without knowing the SMTP details. Official documentation: aiosmtplib.readthedocs.io.

How To Automate Remote Servers with Python Fabric

Advanced

Deploying a Python web application to a remote server involves a sequence of steps you run every single time: SSH into the server, navigate to the project directory, pull the latest code from git, activate the virtual environment, install new dependencies, run database migrations, restart the application server, and check that the service came back up cleanly. Doing this by hand once is fine. Doing it ten times a week across three servers is how you introduce errors, skip steps under pressure, and end up with servers in inconsistent states. Python Fabric exists to turn that sequence of steps into a single command you can run from your laptop.

Fabric is a Python library for executing SSH commands on remote servers. You define tasks as Python functions, connect to a server with a Connection object, and call methods like run(), sudo(), and put() to execute commands or transfer files. Because tasks are plain Python, you can add conditional logic, loops, error handling, and any other Python you like. Fabric uses Paramiko under the hood for SSH, so it works with password authentication, SSH keys, and jump hosts (bastion servers). It also supports running tasks on multiple servers in parallel or in sequence.

This article covers: installing Fabric, making a basic SSH connection, running commands and checking exit codes, transferring files with put() and get(), running commands with sudo, working with multiple servers, writing reusable tasks, and a complete deployment script for a Python application. You will need SSH access to at least one remote server to follow the connection examples — the code examples use a placeholder hostname that you should replace with your own.

Remote Server Automation: Quick Example

This example connects to a server via SSH, checks the current user, lists running Python processes, and checks disk space — all from a local Python script.

# quick_fabric.py
from fabric import Connection

# Replace with your server details
HOST = "yourserver.example.com"
USER = "deploy"
KEY = "~/.ssh/id_rsa"

with Connection(host=HOST, user=USER, connect_kwargs={"key_filename": KEY}) as c:
    # Run a command and print stdout
    result = c.run("whoami", hide=True)
    print(f"Connected as: {result.stdout.strip()}")

    # Check disk space on the root partition
    result = c.run("df -h /", hide=True)
    print("Disk usage:")
    print(result.stdout)

    # List Python processes
    result = c.run("pgrep -a python3", hide=True, warn=True)
    if result.ok:
        print("Python processes:")
        print(result.stdout)
    else:
        print("No Python processes running.")

Output:

Connected as: deploy
Disk usage:
Filesystem      Size  Used Avail Use% Mounted on
/dev/sda1        40G   12G   26G  32% /

No Python processes running.

The hide=True parameter suppresses live output (result is captured into result.stdout instead). Without it, command output streams to your terminal as it runs. The warn=True parameter prevents Fabric from raising an exception when a command exits with a non-zero status — without it, the pgrep returning nothing would raise an UnexpectedExit error. These two parameters cover most real-world command execution patterns.

What is Fabric and How Does It Compare to Alternatives?

Fabric is a high-level SSH automation library for Python. It wraps the lower-level Paramiko SSH library with a task-oriented API designed for deployment and server management workflows. The current major version is Fabric 3, which continues the API introduced in Fabric 2, a ground-up rewrite of the original Fabric 1.x that was popular in the early 2010s.

Tool | Language | Learning Curve | Best For
Fabric | Python | Low | Python devs, custom deploy scripts
Ansible | YAML + Python | Medium | Infrastructure as code, large fleets
Capistrano | Ruby | Medium | Rails/Ruby deployments
Paramiko | Python | High | Custom SSH protocol work
subprocess + SSH | Shell | Low | One-off scripts, no remote needed

Fabric sits in the sweet spot between “write a bash script” and “set up full Ansible infrastructure.” If you are a Python developer who needs to automate deployments for one to twenty servers without learning a new configuration language, Fabric is the right tool. For larger fleets or infrastructure provisioning (creating servers, managing firewall rules, setting up DNS), Ansible or Terraform are better suited.

Installing Fabric

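Install Fabric using pip. The package name is fabric. The similarly named fabric2 and fabric3 packages on PyPI are not what you want: fabric2 is a legacy alias kept for side-by-side installs alongside Fabric 1.x, and fabric3 is an unofficial fork of Fabric 1.x.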

# install_fabric.sh
pip install fabric

Output:

Successfully installed fabric-3.2.2 invoke-2.2.0 paramiko-3.4.0

Fabric installs Invoke (the task-running framework it builds on) and Paramiko (the SSH library) automatically. No additional dependencies are needed for SSH key or password authentication. If your servers use GSSAPI/Kerberos, install Paramiko's optional extra: pip install "paramiko[gssapi]". For most developer setups with SSH key authentication, the base install is sufficient.

Connecting to a Server

The Connection class is the core of Fabric. It holds the SSH connection parameters and provides the methods you use to interact with the server. Connections can be created as context managers (automatically closed when the block exits) or as persistent objects you reuse across multiple commands.

# connection_examples.py
from fabric import Connection

# --- Key-based authentication ---
c = Connection(
    host="yourserver.example.com",
    user="deploy",
    connect_kwargs={"key_filename": "~/.ssh/id_rsa"}
)

# --- Password authentication (not recommended for production) ---
c_pass = Connection(
    host="yourserver.example.com",
    user="deploy",
    connect_kwargs={"password": "yourpassword"}
)

# --- Connecting through a jump/bastion host ---
gateway = Connection("bastion.example.com", user="jump_user")
c_jump = Connection(
    "internal-server.internal",
    user="deploy",
    gateway=gateway
)

# --- Using a different SSH port ---
c_port = Connection(
    host="yourserver.example.com",
    user="deploy",
    port=2222,
    connect_kwargs={"key_filename": "~/.ssh/deploy_key"}
)

# Test the connection
with c:
    result = c.run("echo 'Connection successful'", hide=True)
    print(result.stdout.strip())

Output:

Connection successful

The gateway parameter handles jump hosts (also called bastion hosts) — common in cloud environments where production servers are not directly accessible from the internet. Fabric handles the SSH tunneling automatically. The connect_kwargs dict passes directly to Paramiko’s SSHClient.connect(), so any authentication option Paramiko supports (GSSAPI, SSH agent forwarding, multiple key files) can be passed through here.

Running Commands

The run() method executes a command on the remote server. The return value is a Result object with stdout, stderr, return_code, and boolean ok and failed attributes.

# run_commands.py
from fabric import Connection

HOST, USER, KEY = "yourserver.example.com", "deploy", "~/.ssh/id_rsa"

with Connection(HOST, user=USER, connect_kwargs={"key_filename": KEY}) as c:
    # Stream output to terminal (default behavior)
    c.run("uptime")

    # Capture output silently
    result = c.run("free -h", hide=True)
    print("Memory info:")
    print(result.stdout)

    # Run as sudo
    sudo_result = c.sudo("systemctl status nginx", hide=True, warn=True)
    if "active (running)" in sudo_result.stdout:
        print("nginx is running")
    else:
        print("nginx is NOT running")

    # Run in a specific directory
    with c.cd("/var/www/myapp"):
        c.run("git log --oneline -5", hide=False)

    # Set environment variables for a command
    c.run("echo $APP_ENV", env={"APP_ENV": "production"}, hide=True)

    # Check exit code without raising
    result = c.run("ls /nonexistent", warn=True, hide=True)
    print(f"Exit code: {result.return_code}, OK: {result.ok}")

Output:

 14:23:01 up 42 days,  3:17,  1 user,  load average: 0.12, 0.08, 0.05
Memory info:
               total        used        free      shared  buff/cache   available
Mem:            3.8G        1.2G        512M         84M        2.1G        2.3G

nginx is running
a3f91bc (HEAD -> main) Add rate limiting to API endpoints
8d7c2e4 Fix password reset email template
...

Exit code: 2, OK: False

The c.cd() context manager is equivalent to cd /path && your_command — it prepends the directory change to the command string. Unlike a shell session, each run() call starts a fresh shell, so directory changes made in one call do not persist to the next. Always use c.cd() when commands need to run in a specific directory.
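
The effect of prepending the directory change can be illustrated with plain string handling. This is not Fabric's implementation, only a sketch of the command string a remote shell would end up receiving; in_directory is a hypothetical helper, and the quoting shown is an assumption added for safety.

```python
import shlex

def in_directory(path: str, command: str) -> str:
    """Illustrative only: build the string a remote shell would receive."""
    return f"cd {shlex.quote(path)} && {command}"

print(in_directory("/var/www/myapp", "git log --oneline -5"))
# cd /var/www/myapp && git log --oneline -5
```

Because the cd is part of the same command string, it affects only that one run() call, which matches the fresh-shell behaviour described above.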

Transferring Files

Fabric’s put() method uploads files from local to remote, and get() downloads files from remote to local. Both use SFTP over the same SSH connection.

# file_transfer.py
from fabric import Connection
from pathlib import Path

HOST, USER, KEY = "yourserver.example.com", "deploy", "~/.ssh/id_rsa"

with Connection(HOST, user=USER, connect_kwargs={"key_filename": KEY}) as c:
    # Upload a single file
    c.put("config/production.env", remote="/var/www/myapp/.env")
    print("Uploaded .env file")

    # Upload a file and make it executable
    c.put("scripts/start.sh", remote="/home/deploy/start.sh")
    c.run("chmod +x /home/deploy/start.sh")

    # Download a log file
    c.get("/var/log/myapp/error.log", local="logs/error.log")
    print("Downloaded error log")

    # Upload using Path objects
    local_path = Path("dist") / "myapp-1.2.0.tar.gz"
    c.put(str(local_path), remote="/tmp/myapp-1.2.0.tar.gz")

    # Create a directory if it doesn't exist before uploading
    c.run("mkdir -p /var/www/myapp/uploads")
    for local_file in Path("uploads").glob("*.csv"):
        c.put(str(local_file), remote=f"/var/www/myapp/uploads/{local_file.name}")
        print(f"Uploaded: {local_file.name}")
Uploaded .env file
Downloaded error log
Uploaded: data_2026_01.csv
Uploaded: data_2026_02.csv

The remote parameter accepts either a full path (including filename) or just a directory path. If you pass a directory, the local filename is preserved. SFTP does not create parent directories automatically, so always ensure the remote directory exists with c.run("mkdir -p /path/to/dir") before uploading into it. Fabric does not display transfer progress, so a large upload simply blocks until it finishes. Both put() and get() return a result object whose local and remote attributes record the paths that were used.
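
The "create the directory, then upload" pattern can be wrapped in a small helper. This is a minimal sketch rather than part of Fabric: the name upload_into_dir is hypothetical, and c is assumed to be a fabric.Connection (any object with compatible run() and put() methods works).

```python
# upload_helper.py
import shlex
from pathlib import Path, PurePosixPath

def upload_into_dir(c, local_path, remote_dir: str) -> str:
    """Ensure remote_dir exists, then upload local_path into it.

    Returns the full remote path that was written.
    """
    local_path = Path(local_path)
    # Remote paths are POSIX paths regardless of the local operating system
    remote_path = str(PurePosixPath(remote_dir) / local_path.name)

    # SFTP will not create missing parent directories, so do it over the shell.
    # shlex.quote keeps directory names with spaces from splitting the command.
    c.run(f"mkdir -p {shlex.quote(remote_dir)}", hide=True)
    c.put(str(local_path), remote=remote_path)
    return remote_path
```

Inside a with Connection(...) as c: block, upload_into_dir(c, "uploads/data_2026_01.csv", "/var/www/myapp/uploads") would replace the separate mkdir and put calls shown earlier.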

Working with Multiple Servers

Fabric’s Group classes run the same command on multiple servers: SerialGroup works through the hosts one at a time, and ThreadingGroup runs on all of them concurrently. This is useful for deploying to a fleet of application servers behind a load balancer or running health checks across all machines in a tier.

# multi_server.py
from fabric import SerialGroup, ThreadingGroup

APP_SERVERS = ["app1.example.com", "app2.example.com", "app3.example.com"]
SSH_CONFIG = {"key_filename": "~/.ssh/deploy_key"}

# SerialGroup: runs on servers one at a time (safer for deployments)
group = SerialGroup(
    *APP_SERVERS,
    user="deploy",
    connect_kwargs=SSH_CONFIG
)

print("Checking disk space across all app servers:")
results = group.run("df -h / | tail -1", hide=True)
for conn, result in results.items():
    host = conn.host
    usage = result.stdout.strip().split()[4]  # percentage used
    print(f"  {host}: {usage} disk used")

# ThreadingGroup: runs on all servers simultaneously (faster for reads)
fast_group = ThreadingGroup(
    *APP_SERVERS,
    user="deploy",
    connect_kwargs=SSH_CONFIG
)

print("\nRestarting nginx on all servers:")
fast_group.sudo("systemctl restart nginx", hide=True)
print("Done.")
Checking disk space across all app servers:
  app1.example.com: 45% disk used
  app2.example.com: 47% disk used
  app3.example.com: 43% disk used

Restarting nginx on all servers:
Done.

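Use SerialGroup when hosts should be handled one at a time, for example to avoid restarting every server behind a load balancer at the same moment. Note that SerialGroup does not stop at the first failure: it runs the command on every host, collects any exceptions, and raises GroupException at the end. If a failed deployment on app1 must not touch app2 and app3, iterate over the connections yourself and check each result before moving on. That prevents a situation where half your fleet is on the new code and half on the old. Use ThreadingGroup for read-only operations like health checks or log collection where speed matters and partial failure is acceptable.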
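
A rollout that halts at the first failing server can be sketched by looping over the connections explicitly and checking each result. The function name rolling_run is hypothetical. It assumes each item behaves like a fabric.Connection, and a SerialGroup can be passed directly because a Group is a list of its connections.

```python
# rolling_run.py
def rolling_run(connections, command: str) -> list:
    """Run command host by host and stop at the first failure.

    Returns the hosts that completed successfully.
    """
    succeeded = []
    for conn in connections:
        # warn=True returns a Result instead of raising on a non-zero exit
        result = conn.run(command, hide=True, warn=True)
        if result.failed:
            raise RuntimeError(
                f"{conn.host} failed with exit code {result.return_code}; "
                f"stopped after {len(succeeded)} successful host(s)"
            )
        succeeded.append(conn.host)
    return succeeded
```

Connection errors are not caught here, so an unreachable host also stops the loop before later hosts are touched.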

Real-Life Example: Automated Deployment Script

# deploy.py
"""
Deployment script for a Python web application.
Usage: python deploy.py [--host HOST] [--branch BRANCH]
"""
import argparse
from fabric import Connection

DEFAULT_HOST = "yourserver.example.com"
APP_DIR = "/var/www/myapp"
VENV_DIR = f"{APP_DIR}/venv"
SERVICE_NAME = "myapp"

def deploy(c: Connection, branch: str = "main") -> None:
    print(f"Deploying branch '{branch}' to {c.host}...")

    # 1. Pull latest code
    with c.cd(APP_DIR):
        c.run("git fetch origin", hide=True)
        c.run(f"git checkout {branch}", hide=True)
        c.run(f"git pull origin {branch}", hide=False)

    # 2. Install/update dependencies
    print("Installing dependencies...")
    c.run(
        f"{VENV_DIR}/bin/pip install -r {APP_DIR}/requirements.txt -q",
        hide=True
    )

    # 3. Run database migrations (if applicable)
    print("Running migrations...")
    with c.cd(APP_DIR):
        result = c.run(
            f"{VENV_DIR}/bin/python manage.py migrate --no-input",
            hide=True, warn=True
        )
        if result.failed:
            raise SystemExit(f"Migrations failed: {result.stderr}")

    # 4. Collect static files (Django example)
    with c.cd(APP_DIR):
        c.run(
            f"{VENV_DIR}/bin/python manage.py collectstatic --no-input -q",
            hide=True
        )

    # 5. Restart the service
    print(f"Restarting {SERVICE_NAME}...")
    c.sudo(f"systemctl restart {SERVICE_NAME}", hide=True)

    # 6. Verify service is running
    result = c.sudo(f"systemctl is-active {SERVICE_NAME}", hide=True, warn=True)
    status = result.stdout.strip()
    if status == "active":
        print(f"Deployment complete. {SERVICE_NAME} is running.")
    else:
        raise SystemExit(f"Service failed to start! Status: {status}")

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Deploy myapp to a server")
    parser.add_argument("--host", default=DEFAULT_HOST)
    parser.add_argument("--branch", default="main")
    args = parser.parse_args()

    with Connection(
        host=args.host,
        user="deploy",
        connect_kwargs={"key_filename": "~/.ssh/deploy_key"}
    ) as conn:
        deploy(conn, args.branch)
Deploying branch 'main' to yourserver.example.com...
From github.com:yourorg/myapp
   a3f91bc..8d7c2e4  main -> origin/main
Updating a3f91bc..8d7c2e4
Fast-forward
 requirements.txt | 2 +-
 myapp/views.py   | 15 ++++++++++-----
Installing dependencies...
Running migrations...
Restarting myapp...
Deployment complete. myapp is running.

Run with python deploy.py --host app1.example.com --branch release/1.2.0. The script stops at the first failure: any command run without warn=True raises Invoke's UnexpectedExit on a non-zero exit code, and the migration and service checks raise SystemExit with an error message, so a failed migration does not proceed to restarting the service with broken code. To deploy to multiple servers, call deploy() inside a for loop over several Connection objects or convert it to use SerialGroup. Adding a --rollback flag that runs git checkout HEAD~1 and restarts the service is a useful extension.
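
The rollback extension might look like the following minimal sketch. The function name and its defaults are illustrative, mirroring APP_DIR and SERVICE_NAME from the script above, and c is assumed to be a fabric.Connection. It only moves the code back one commit and does not undo database migrations or dependency changes.

```python
# rollback.py
def rollback(c, app_dir: str = "/var/www/myapp", service: str = "myapp") -> None:
    """Step the checkout back one commit and restart the service."""
    with c.cd(app_dir):
        # HEAD~1 leaves the repository in a detached-HEAD state
        c.run("git checkout HEAD~1", hide=True)

    c.sudo(f"systemctl restart {service}", hide=True)

    # Same verification step as the deploy function
    result = c.sudo(f"systemctl is-active {service}", hide=True, warn=True)
    status = result.stdout.strip()
    if status != "active":
        raise SystemExit(f"Rollback failed! {service} status: {status}")
    print(f"Rolled back one commit. {service} is running.")
```

To wire it in, one option is parser.add_argument("--rollback", action="store_true") and calling rollback(conn) instead of deploy(conn, args.branch) when the flag is set.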

Frequently Asked Questions

How do I use Fabric with SSH keys that require a passphrase?

Pass the passphrase via connect_kwargs={"key_filename": "~/.ssh/id_rsa", "passphrase": "your-passphrase"}. For automation, it is better to use a passphrase-free deploy key rather than your personal SSH key, or use ssh-agent to cache the passphrase and let Fabric pick it up via the SSH agent socket (which Paramiko supports automatically when no explicit key is specified in connect_kwargs).
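
When a passphrase is unavoidable, it can be read from the environment instead of being written into the script. The helper below is a sketch: the function name and the DEPLOY_KEY_PASSPHRASE variable are made up for illustration, and the resulting dictionary is meant to be passed as connect_kwargs.

```python
# connect_kwargs_helper.py
import os

def build_connect_kwargs(
    key_path: str, passphrase_var: str = "DEPLOY_KEY_PASSPHRASE"
) -> dict:
    """Build connect_kwargs without hardcoding the key passphrase."""
    # Expand ~ explicitly so the path does not depend on library behaviour
    kwargs = {"key_filename": os.path.expanduser(key_path)}

    # Only add a passphrase when one is provided in the environment
    passphrase = os.environ.get(passphrase_var)
    if passphrase:
        kwargs["passphrase"] = passphrase
    return kwargs
```

Usage would then be Connection(HOST, user=USER, connect_kwargs=build_connect_kwargs("~/.ssh/id_rsa")).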

How does Fabric handle sudo password prompts?

Fabric’s c.sudo() method handles the sudo password prompt by passing the password to the remote shell’s stdin. Set the password via the config parameter, with Config imported from fabric: Connection(host, config=Config(overrides={"sudo": {"password": "yourpassword"}})). In practice, configure passwordless sudo for the deploy user on your servers for automation tasks: use visudo to add deploy ALL=(ALL) NOPASSWD: /bin/systemctl restart myapp to /etc/sudoers, adjusting the path if systemctl lives in /usr/bin on your distribution. This restricts passwordless sudo to specific commands, which is more secure than full NOPASSWD access.

How do I handle timeouts and connection failures?

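Set a connection timeout with the connect_timeout argument, Connection(host, connect_timeout=10), or equivalently with connect_kwargs={"timeout": 10}. For command timeouts, Fabric 2.5 and later accept a timeout argument on run(), as in c.run("my_long_command", timeout=30), which raises invoke.exceptions.CommandTimedOut when the limit is exceeded. An alternative that also works on older versions is the shell’s timeout command: c.run("timeout 30 my_long_command", warn=True). For robust scripts that handle network failures, wrap connections in try/except blocks catching paramiko.SSHException, socket.timeout, and fabric.exceptions.GroupException (for Group operations).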
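
The try/except advice can be combined with a simple retry loop. The helper below is a generic sketch using only the standard library. The name with_retries and its defaults are illustrative, and when used with Fabric you would add paramiko.SSHException to retry_on.

```python
# retry_helper.py
import socket
import time

def with_retries(action, retry_on=(socket.timeout, OSError), attempts=3, delay=2.0):
    """Call action() until it succeeds or the attempts run out."""
    for attempt in range(1, attempts + 1):
        try:
            return action()
        except retry_on as exc:
            if attempt == attempts:
                # Out of attempts: let the caller see the last error
                raise
            print(f"Attempt {attempt} failed ({exc!r}); retrying in {delay}s")
            time.sleep(delay)
```

A call such as with_retries(lambda: c.run("uptime", hide=True)) retries only the listed network errors. A command that exits non-zero raises UnexpectedExit, which is not retried.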

Can Fabric also run local commands?

Yes. Connection.local() runs a command on your local machine using the same API as run(). This is useful for build steps that should run locally before deployment — building a Docker image, running local tests, or compiling static assets. Invoke (the library Fabric builds on) also provides a @task decorator for organizing deployment workflows into named tasks that can be run from the command line with fab task-name.

I’ve seen old Fabric code that looks different. What changed?

Fabric 1.x (2009-2017) had a completely different API: global env.hosts, env.user, and functions like run() and sudo() imported directly from fabric.api. Fabric 2+ (2018+) rewrote the library around explicit Connection objects, which eliminated global state and made it easier to work programmatically with multiple connections. Old Fabric 1 tutorials are incompatible with the current version. The package you should install is always just fabric. The fabric2 name on PyPI is only an alias the maintainers publish so that Fabric 2+ can be installed alongside a legacy Fabric 1, and fabric3 is an unofficial Python 3 port of Fabric 1 that still uses the old API.

Conclusion

Python Fabric turns repetitive SSH-based server management into reproducible, version-controlled Python scripts. You learned how to create Connection objects with key-based and password authentication including jump host support, run commands with run() and sudo() while handling exit codes and output capture, transfer files with put() and get(), work with multiple servers using SerialGroup and ThreadingGroup, and build a complete automated deployment script with error handling and service verification.

The deployment script in the real-life example is a starting point — extend it by adding health check HTTP requests after restart, Slack notifications on success or failure, automated rollback on failed health checks, and a blue-green deployment pattern that switches traffic only after the new version passes verification. Official documentation: docs.fabfile.org.