How To Use Python xlrd for Reading Legacy Excel Files

Intermediate

You have inherited a folder of Excel files from an accounting department. Half of them have the .xlsx extension from modern Excel, and the other half are old .xls files from the early 2000s. If you try to open a .xls file with openpyxl, you get an error. If you use xlrd 2.x on an .xlsx file, you get a different error. Knowing which library to use for which file format is the first step — everything else follows.

The two key libraries are: xlrd for reading legacy .xls (Excel 97-2003) format, and openpyxl for reading and writing modern .xlsx format. Install both with pip install xlrd openpyxl. For most practical data extraction tasks you will also want to know about pandas, which uses both libraries internally and provides a higher-level API.

In this article we will cover reading .xls files with xlrd, reading .xlsx files with openpyxl, detecting file format and routing automatically, extracting headers and data rows, handling different cell types, and building a unified Excel reader that handles both formats with the same output interface.

Reading Excel Files in Python: Quick Example

Here is the essential pattern for reading each format. Note the different library imports and APIs:

# quick_excel.py
import xlrd      # For .xls files (pip install xlrd)
import openpyxl  # For .xlsx files (pip install openpyxl)

# --- Reading a .xls file with xlrd ---
# xlrd 2.x only supports .xls format (NOT .xlsx)
# workbook = xlrd.open_workbook('legacy_data.xls')
# sheet = workbook.sheet_by_index(0)
# for row_idx in range(sheet.nrows):
#     row = sheet.row_values(row_idx)
#     print(row)

# --- Reading a .xlsx file with openpyxl ---
# Create a sample workbook to demonstrate
wb = openpyxl.Workbook()
ws = wb.active
ws.title = "Sales"
ws.append(["Product", "Qty", "Price"])
ws.append(["Widget A", 100, 9.99])
ws.append(["Widget B", 50,  14.99])
wb.save("sample.xlsx")

# Now read it back
wb2 = openpyxl.load_workbook("sample.xlsx")
ws2 = wb2.active
for row in ws2.iter_rows(values_only=True):
    print(row)

Output:

('Product', 'Qty', 'Price')
('Widget A', 100, 9.99)
('Widget B', 50, 14.99)

The critical difference is format support: xlrd 2.0 (released in December 2020) intentionally dropped .xlsx support because of security concerns around its XML parsing code. Always use openpyxl for .xlsx files. If you need to process both formats in one script, you will need both libraries. Want to go deeper? Below we cover each library in detail with cell type handling and a unified reader.

xlrd vs openpyxl: Which Library for Which Format?

The most important thing to understand about Excel reading in Python is the format split. The confusion often arises because older xlrd versions (before 2.0) supported both formats, so many tutorials and Stack Overflow answers use xlrd for .xlsx files. Those answers are outdated: xlrd 2.x will raise an XLRDError if you try to open a .xlsx file.

Library   | .xls (Excel 97-2003) | .xlsx (Excel 2007+) | Write Support
xlrd 2.x  | Yes (read only)      | No (raises error)   | No
openpyxl  | No                   | Yes (read + write)  | Yes
pandas    | Yes (via xlrd)       | Yes (via openpyxl)  | Yes (via openpyxl)

For most new projects, openpyxl is the right choice since virtually all modern Excel files are .xlsx. You need xlrd only when processing legacy files from systems that still produce the old binary format — which is more common than you might expect in finance, manufacturing, and government data pipelines.
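File extensions are not always trustworthy: a legacy workbook is sometimes renamed to .xlsx, and the reverse happens too. As a rough sketch of content-based routing, the two formats can be told apart by their leading bytes, since .xls files live in an OLE2 compound container and .xlsx files are ZIP archives. The helper name sniff_excel_format is hypothetical and not part of xlrd or openpyxl, and the demo files are stand-ins that contain only the signature bytes.

```python
# sniff_excel_format.py
import tempfile
from pathlib import Path

# File signatures (magic bytes) of the two container formats
OLE2_SIGNATURE = b"\xd0\xcf\x11\xe0\xa1\xb1\x1a\xe1"  # legacy binary container (.xls)
ZIP_SIGNATURE = b"PK\x03\x04"                          # ZIP container (.xlsx, .xlsm)

def sniff_excel_format(filepath):
    """Return 'xls', 'xlsx', or None based on the first bytes of the file.

    Hypothetical helper for illustration; not part of xlrd or openpyxl.
    """
    with open(filepath, "rb") as f:
        head = f.read(8)
    if head.startswith(OLE2_SIGNATURE):
        return "xls"
    if head.startswith(ZIP_SIGNATURE):
        return "xlsx"
    return None

# Demo with two tiny stand-in files (only the leading bytes matter here)
with tempfile.TemporaryDirectory() as tmp:
    legacy = Path(tmp) / "report.xlsx"   # wrong extension on purpose
    legacy.write_bytes(OLE2_SIGNATURE + b"\x00" * 16)
    modern = Path(tmp) / "report.dat"
    modern.write_bytes(ZIP_SIGNATURE + b"\x00" * 16)
    detected = (sniff_excel_format(legacy), sniff_excel_format(modern))

print(detected)
```

Treat the result as a routing hint rather than proof: other OLE2 documents and any ZIP archive share these signatures, so the chosen library can still reject the file.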

Legacy .xls files. The format that refuses to die.

Reading Legacy .xls Files with xlrd

xlrd provides a workbook/sheet/cell hierarchy for accessing data. Row and column indices are zero-based, and cell values come with type information that you need to handle for dates:

# read_xls.py
import xlrd
from datetime import datetime

# First, create a sample .xls for demonstration
# (normally you would just open an existing file)
import xlwt  # pip install xlwt -- only for creating .xls test files
wb_write = xlwt.Workbook()
ws_write = wb_write.add_sheet('Employees')
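from datetime import date

# A date number format is what lets xlrd recognize these cells as dates
date_style = xlwt.easyxf(num_format_str='YYYY-MM-DD')
headers = ['Name', 'Department', 'Salary', 'Start Date']
data = [
    ['Alice Chen',    'Engineering', 95000, date(2021, 3, 15)],
    ['Bob Martinez',  'Marketing',   72000, date(2020, 7, 1)],
    ['Carol Johnson', 'Finance',     88000, date(2019, 11, 20)],
]
for col, h in enumerate(headers):
    ws_write.write(0, col, h)
for row_idx, row in enumerate(data, 1):
    for col_idx, val in enumerate(row):
        # Write real date cells (not strings) so the date branch below is exercised
        if isinstance(val, date):
            ws_write.write(row_idx, col_idx, val, date_style)
        else:
            ws_write.write(row_idx, col_idx, val)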
wb_write.save('employees.xls')

# Now read the .xls file with xlrd
workbook = xlrd.open_workbook('employees.xls')
print(f'Sheets: {workbook.sheet_names()}')

sheet = workbook.sheet_by_name('Employees')
print(f'Rows: {sheet.nrows}, Cols: {sheet.ncols}')
print()

# Read header row
headers = sheet.row_values(0)
print('Headers:', headers)
print()

# Read data rows
for row_idx in range(1, sheet.nrows):
    row = {}
    for col_idx, header in enumerate(headers):
        cell = sheet.cell(row_idx, col_idx)
        # Check cell type -- xlrd.XL_CELL_DATE = 3
        if cell.ctype == xlrd.XL_CELL_DATE:
            dt = xlrd.xldate_as_datetime(cell.value, workbook.datemode)
            row[header] = dt.strftime('%Y-%m-%d')
        else:
            row[header] = cell.value
    print(row)

Output:

Sheets: ['Employees']
Rows: 4, Cols: 4

Headers: ['Name', 'Department', 'Salary', 'Start Date']

{'Name': 'Alice Chen', 'Department': 'Engineering', 'Salary': 95000.0, 'Start Date': '2021-03-15'}
{'Name': 'Bob Martinez', 'Department': 'Marketing', 'Salary': 72000.0, 'Start Date': '2020-07-01'}
{'Name': 'Carol Johnson', 'Department': 'Finance', 'Salary': 88000.0, 'Start Date': '2019-11-20'}

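Date handling is the trickiest part of reading .xls files with xlrd. Excel stores dates as floating-point serial numbers (days counted from an epoch at the start of 1900, or 1904 in the alternate date system), not as Python datetime objects. The xlrd.xldate_as_datetime() function converts this float to a real datetime using the workbook’s date mode, which records which of the two epochs the file uses (historically the 1904 system was the default in Excel for Mac). Always check cell.ctype == xlrd.XL_CELL_DATE before attempting date conversion: xlrd identifies date cells by their number format, so a date that was stored as plain text comes back as an ordinary string.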
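To make the serial-number idea concrete, here is a simplified standard-library sketch of the arithmetic. It is not xlrd's implementation: the function name serial_to_datetime is hypothetical, and it deliberately rejects serials below 61 in the 1900 system, where Excel's phantom 29 February 1900 makes the mapping ambiguous.

```python
# excel_serial_dates.py
from datetime import datetime, timedelta

# Epochs for the two Excel date systems. The 1900 system uses 1899-12-30 so that
# serials from 61 (1900-03-01) onward line up despite Excel's phantom 1900-02-29.
EPOCH_1900 = datetime(1899, 12, 30)
EPOCH_1904 = datetime(1904, 1, 1)

def serial_to_datetime(serial, datemode=0):
    """Convert an Excel serial number to a datetime.

    Simplified sketch: datemode 0 means the 1900 system and 1 means the 1904
    system, the same convention as workbook.datemode in xlrd.
    """
    if datemode == 0 and serial < 61:
        raise ValueError("serials below 61 are ambiguous in the 1900 date system")
    epoch = EPOCH_1904 if datemode else EPOCH_1900
    return epoch + timedelta(days=serial)

print(serial_to_datetime(44270))      # 2021-03-15 00:00:00
print(serial_to_datetime(44270.5))    # 2021-03-15 12:00:00 (fraction = time of day)
print(serial_to_datetime(42808, 1))   # 2021-03-15 00:00:00 in the 1904 system
```

The same calendar date has serials 1462 days apart in the two systems, which is why passing the wrong date mode shifts every date by about four years.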

Reading Modern .xlsx Files with openpyxl

openpyxl uses a Workbook/Worksheet/Cell model similar to xlrd but with a more Pythonic API. The values_only=True flag is the fastest way to extract data when you do not need cell formatting metadata:

# read_xlsx.py
import openpyxl
from datetime import date

# Create a sample .xlsx file with mixed cell types
wb = openpyxl.Workbook()
ws = wb.active
ws.title = "Inventory"
ws.append(["SKU", "Product", "Stock", "Price", "Last Updated"])
ws.append(["A001", "Widget Pro",   500, 29.99,  date(2026, 4, 15)])
ws.append(["A002", "Gadget Basic",  75, 9.99,   date(2026, 4, 20)])
ws.append(["A003", "Super Deluxe",  12, 149.99, date(2026, 5, 1)])
wb.save("inventory.xlsx")

# Read it back
wb2 = openpyxl.load_workbook("inventory.xlsx", read_only=True)
ws2 = wb2.active

print(f'Dimensions: {ws2.dimensions}')
print()

# Method 1: iter_rows with values_only
print('--- Using iter_rows(values_only=True) ---')
headers = None
for i, row in enumerate(ws2.iter_rows(values_only=True)):
    if i == 0:
        headers = row
        print('Headers:', headers)
    else:
        record = dict(zip(headers, row))
        print(record)

wb2.close()

# Method 2: read_only=False for cell object access
wb3 = openpyxl.load_workbook("inventory.xlsx")
ws3 = wb3["Inventory"]

print()
print('--- Cell object access (non-readonly) ---')
# Access individual cells by coordinate
print('A1:', ws3['A1'].value)
print('B2:', ws3['B2'].value)
print('E2 (date):', ws3['E2'].value, type(ws3['E2'].value).__name__)

Output:

Dimensions: A1:E4

--- Using iter_rows(values_only=True) ---
Headers: ('SKU', 'Product', 'Stock', 'Price', 'Last Updated')
{'SKU': 'A001', 'Product': 'Widget Pro', 'Stock': 500, 'Price': 29.99, 'Last Updated': datetime.datetime(2026, 4, 15, 0, 0)}
{'SKU': 'A002', 'Product': 'Gadget Basic', 'Stock': 75, 'Price': 9.99, 'Last Updated': datetime.datetime(2026, 4, 20, 0, 0)}
{'SKU': 'A003', 'Product': 'Super Deluxe', 'Stock': 12, 'Price': 149.99, 'Last Updated': datetime.datetime(2026, 5, 1, 0, 0)}

--- Cell object access (non-readonly) ---
A1: SKU
B2: Widget Pro
E2 (date): 2026-04-15 00:00:00 datetime

Unlike xlrd, openpyxl automatically converts date cells to Python datetime objects, so no manual conversion is needed. The read_only=True mode is faster for large files because it uses an event-based parser that does not load the entire file into memory. Use it when you only need to read data, and leave it off (the default) when you need to access cell formatting or use the ws[coordinate] syntax.

Old Excel binary format. Reverse-engineered, brittle, unforgiving.

Real-Life Example: Unified Excel File Reader

Here is a unified reader that auto-detects the file format and uses the right library, returning a consistent list-of-dicts structure regardless of which format the input uses:

# unified_excel_reader.py
import xlrd
import openpyxl
from pathlib import Path
from datetime import datetime

def read_excel_file(filepath, sheet_index=0, header_row=0):
    """
    Read an Excel file (.xls or .xlsx) into a list of dicts.
    Auto-detects format from file extension.
    
    Returns:
        list[dict]: One dict per data row, keyed by header names
    """
    path = Path(filepath)
    ext = path.suffix.lower()
    
    if ext == '.xls':
        return _read_xls(filepath, sheet_index, header_row)
    elif ext in ('.xlsx', '.xlsm', '.xltx', '.xltm'):
        return _read_xlsx(filepath, sheet_index, header_row)
    else:
        raise ValueError(f"Unsupported format: {ext}. Use .xls or .xlsx")

def _read_xls(filepath, sheet_index, header_row):
    """Read a .xls file using xlrd."""
    wb = xlrd.open_workbook(filepath)
    ws = wb.sheet_by_index(sheet_index)
    
    headers = ws.row_values(header_row)
    records = []
    
    for row_idx in range(header_row + 1, ws.nrows):
        record = {}
        for col_idx, header in enumerate(headers):
            cell = ws.cell(row_idx, col_idx)
            if cell.ctype == xlrd.XL_CELL_DATE:
                value = xlrd.xldate_as_datetime(cell.value, wb.datemode)
            elif cell.ctype == xlrd.XL_CELL_EMPTY:
                value = None
            else:
                value = cell.value
            record[header] = value
        records.append(record)
    
    return records

def _read_xlsx(filepath, sheet_index, header_row):
    """Read a .xlsx file using openpyxl."""
    wb = openpyxl.load_workbook(filepath, read_only=True, data_only=True)
    ws_name = wb.sheetnames[sheet_index]
    ws = wb[ws_name]
    
    records = []
    headers = None
    
    for row_idx, row in enumerate(ws.iter_rows(values_only=True)):
        if row_idx == header_row:
            headers = [str(h) if h is not None else f'Col_{i}' 
                      for i, h in enumerate(row)]
            continue
        if headers is None:
            continue
        record = dict(zip(headers, row))
        records.append(record)
    
    wb.close()
    return records

def print_summary(records, filename):
    """Print a summary of the extracted data."""
    print(f"\nFile: {filename}")
    print(f"Records: {len(records)}")
    if records:
        print(f"Columns: {list(records[0].keys())}")
        print("First record:", records[0])
        print("Last record: ", records[-1])

# Test with both formats
# (Using the files created in earlier examples)
xlsx_data = read_excel_file('inventory.xlsx')
print_summary(xlsx_data, 'inventory.xlsx')

# xls_data = read_excel_file('employees.xls')
# print_summary(xls_data, 'employees.xls')

Output:

File: inventory.xlsx
Records: 3
Columns: ['SKU', 'Product', 'Stock', 'Price', 'Last Updated']
First record: {'SKU': 'A001', 'Product': 'Widget Pro', 'Stock': 500, 'Price': 29.99, 'Last Updated': datetime.datetime(2026, 4, 15, 0, 0)}
Last record:  {'SKU': 'A003', 'Product': 'Super Deluxe', 'Stock': 12, 'Price': 149.99, 'Last Updated': datetime.datetime(2026, 5, 1, 0, 0)}

This unified reader pattern is the cleanest way to handle a mixed bag of Excel files without scattering format-detection logic throughout your codebase. Extend it by adding support for a max_rows parameter for large files, a dtype_map for explicit type coercion, or a pandas output mode that returns a DataFrame instead of a list of dicts.
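As one possible shape for the max_rows and dtype_map extensions, the sketch below post-processes the list of dicts that the reader returns. The function name coerce_records and its parameters are hypothetical, and the sample records are hand-written stand-ins for reader output.

```python
# coerce_records.py
from datetime import datetime

def coerce_records(records, dtype_map, max_rows=None):
    """Apply per-column converters to a list of dicts.

    Hypothetical helper: dtype_map maps a header name to a callable, and
    max_rows caps how many records are processed. None values are left as-is.
    """
    selected = records if max_rows is None else records[:max_rows]
    coerced = []
    for record in selected:
        row = dict(record)  # copy so the caller's data is not mutated
        for column, convert in dtype_map.items():
            if column in row and row[column] is not None:
                row[column] = convert(row[column])
        coerced.append(row)
    return coerced

raw = [
    {"Name": "Alice Chen", "Salary": 95000.0, "Start Date": datetime(2021, 3, 15)},
    {"Name": "Bob Martinez", "Salary": 72000.0, "Start Date": None},
]
clean = coerce_records(
    raw,
    {"Salary": int, "Start Date": lambda d: d.date().isoformat()},
)
print(clean[0])
```

Keeping coercion separate from reading means the same rules apply to both formats, which is useful because xlrd returns every number as a float while openpyxl preserves integers.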

Frequently Asked Questions

Why does xlrd raise an error when I open a .xlsx file?

xlrd version 2.0 (released December 2020) deliberately removed .xlsx support. The maintainer decided the risk of parsing the complex XML-based format was not worth carrying. If you are getting xlrd.biffh.XLRDError: Excel xlsx file; not supported, you are using xlrd 2.x. Switch to openpyxl for .xlsx files. If you have legacy code that used xlrd for .xlsx, the fix is straightforward: replace xlrd.open_workbook() with openpyxl.load_workbook() and adjust the API calls accordingly.

How do I efficiently read large Excel files without running out of memory?

For large .xlsx files, use openpyxl.load_workbook(filepath, read_only=True). In read-only mode, openpyxl uses an event-based parser that processes rows one at a time without loading the whole file. Note that pandas pd.read_excel() has no chunksize parameter (that option belongs to pd.read_csv()); to read a large sheet in slices with pandas, combine skiprows and nrows, and use usecols to drop columns you do not need. pandas calls openpyxl internally for .xlsx files, so it will not parse a workbook faster than openpyxl does; its advantage is what you can do with the DataFrame afterwards.
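Row-at-a-time reading pairs well with batching, for example when each batch becomes one database insert. The sketch below groups any row iterator into fixed-size lists using only the standard library. The name batched_rows is hypothetical, and the generator expression stands in for ws.iter_rows(values_only=True) on a read-only worksheet.

```python
# batch_rows.py
from itertools import islice

def batched_rows(rows, batch_size):
    """Yield lists of up to batch_size rows from any row iterator."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch

# Stand-in for ws.iter_rows(values_only=True) on a read-only worksheet
fake_rows = ((f"SKU{i:03d}", i * 10) for i in range(7))
sizes = [len(batch) for batch in batched_rows(fake_rows, 3)]
print(sizes)  # [3, 3, 1]
```

Only one batch is held in memory at a time, so peak memory depends on batch_size rather than on the size of the sheet.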

How do I read cell values instead of formulas in .xlsx files?

Pass data_only=True to openpyxl.load_workbook(). By default, openpyxl reads the formula text (=SUM(A1:A10)) instead of the computed value. With data_only=True, openpyxl reads the cached value that Excel stored when the file was last saved. Note: if the file was never opened in Excel (e.g., programmatically generated), the cache may be empty and you will get None for formula cells.

When should I use pandas instead of xlrd/openpyxl directly?

Use pandas when you need to do any data analysis, filtering, or transformation after reading the file. pd.read_excel('file.xlsx') handles format detection, type inference, header parsing, and multi-sheet reading in one call. Use xlrd or openpyxl directly when you need precise control over cell access, formatting metadata, or when writing production code that should not depend on pandas. For simple ETL pipelines that just extract data and load it elsewhere, the direct library approach has fewer dependencies.

Can xlrd write .xls files?

No, xlrd is read-only. To create .xls files programmatically, use xlwt (pip install xlwt). However, xlwt is no longer maintained, and you should avoid creating new .xls files in 2026. If a system still requires .xls output, use xlwt as a stopgap, but the right long-term solution is upgrading that system to accept .xlsx. For .xlsx writing, openpyxl is the standard choice and actively maintained.

xlrd or openpyxl? .xls or .xlsx? Choose your fighter.

Conclusion

Reading Excel files in Python requires knowing which library matches the file format: xlrd for legacy .xls files, openpyxl for modern .xlsx files. We covered the format difference and why xlrd 2.x dropped .xlsx support, reading .xls files with xlrd including date cell handling, reading .xlsx files with openpyxl including read-only mode for performance, and building a unified reader that auto-detects format and returns consistent output.

The unified reader is the practical tool for any data pipeline that processes Excel files from varied sources. From here, you can add pandas integration for data analysis, write Excel output with openpyxl’s workbook creation API, or build a batch processor that walks a directory and converts every Excel file to CSV. The format-handling knowledge transfers directly to any project that touches Excel files.
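For the CSV conversion idea, the output half can be sketched with the standard library alone. The function name records_to_csv is hypothetical; it assumes every record has the same keys as the first one, which holds for the list of dicts the unified reader returns.

```python
# records_to_csv.py
import csv
import io
from datetime import date, datetime

def records_to_csv(records, stream):
    """Write a list of dicts to a text stream as CSV and return the row count."""
    if not records:
        return 0
    writer = csv.DictWriter(stream, fieldnames=list(records[0].keys()))
    writer.writeheader()
    for record in records:
        # Dates become ISO 8601 strings; everything else is written as-is
        writer.writerow({
            key: value.isoformat() if isinstance(value, (date, datetime)) else value
            for key, value in record.items()
        })
    return len(records)

buffer = io.StringIO()
count = records_to_csv(
    [{"SKU": "A001", "Stock": 500, "Last Updated": datetime(2026, 4, 15)}],
    buffer,
)
print(count)
print(buffer.getvalue())
```

When writing to a real file instead of a StringIO buffer, open it with newline="" as the csv module documentation recommends, so that line endings are not doubled on Windows.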

For writing and formatting .xlsx files, see the openpyxl documentation. For legacy .xls reading details, see the xlrd documentation.

How To Use Python Rich for Beautiful Terminal Output

Beginner

Plain print() output gets the job done for debugging, but it tells your users nothing about what is important, what is a warning, and what is an error. When you are building a CLI tool that other people will use — or even just a script you run yourself every day — the difference between monochrome output and a well-structured, colored terminal display is the difference between squinting at walls of text and instantly understanding what happened.

Python Rich is a library that makes beautiful terminal output trivially easy. Install it with pip install rich and you get syntax highlighting, colored text, tables, progress bars, live dashboards, tracebacks, and Markdown rendering — all without learning complex terminal escape code sequences. Rich handles the terminal capability detection and fallback automatically.

In this article we will cover Rich’s Console for styled output, Markup for inline color and formatting, Tables for structured data display, Progress bars for long-running tasks, Panels and layout components, syntax-highlighted code output, and a complete real-world CLI dashboard project. By the end you will have the building blocks for any CLI tool that looks like it was built by a professional.

Python Rich: Quick Example

Here is the fastest way to see what Rich can do: a short script that demonstrates color, markup, and table output:

# quick_rich.py
from rich import print
from rich.table import Table

# Inline markup with [color] tags
print("[bold green]Success![/bold green] File saved to [cyan]/tmp/output.csv[/cyan]")
print("[bold red]Error:[/bold red] Could not connect to database")
print("[yellow]Warning:[/yellow] Config file not found, using defaults")

# Quick table
table = Table("Name", "Score", "Grade")
table.add_row("Alice", "95", "[green]A[/green]")
table.add_row("Bob", "72", "[yellow]C[/yellow]")
table.add_row("Charlie", "88", "[cyan]B+[/cyan]")
print(table)

Output (with terminal colors):

Success! File saved to /tmp/output.csv
Error: Could not connect to database
Warning: Config file not found, using defaults

 Name     Score  Grade 
 Alice    95     A     
 Bob      72     C     
 Charlie  88     B+    

The from rich import print line replaces the built-in print function with Rich’s version, which understands markup tags. From that point forward, any [bold green] or [red] tags in your strings become actual terminal colors — zero configuration required. Want to go deeper? Below we cover every major Rich component.

What Is Rich and What Can It Do?

Rich is a Python library for rendering styled text in the terminal. It works on Windows, macOS, and Linux, and gracefully degrades to plain text when run in environments that do not support color (like log files or CI/CD pipelines).

Component | What It Does                       | Key Class/Function
Console   | Styled print with markup, logging  | Console()
Table     | Formatted, bordered data tables    | Table()
Progress  | Progress bars for loops and tasks  | Progress(), track()
Panel     | Bordered boxes around content      | Panel()
Syntax    | Syntax-highlighted code blocks     | Syntax()
Markdown  | Render Markdown in the terminal    | Markdown()
Live      | Auto-refreshing live displays      | Live()
Logging   | Colored, structured log handler    | RichHandler

The Console object is the foundation of everything. You create one with Console() and call console.print() instead of the built-in print. This gives you the full styling system, error handling separation (stdout vs stderr), and the ability to capture output for testing.

The Console Object and Markup

The Console class is the main entry point for Rich output. Unlike the import-replaced print shortcut, using a Console object gives you more control over where output goes, how errors are formatted, and whether color is enabled:

# console_markup.py
from rich.console import Console

console = Console()

# Basic markup -- [style]text[/style]
console.print("[bold]Bold text[/bold]")
console.print("[italic]Italic text[/italic]")
console.print("[underline]Underlined[/underline]")
console.print("[bold red]Bold red error[/bold red]")
console.print("[green on black]Green on black background[/green on black]")

# Named styles
console.print("Normal text")
console.print("Error text", style="bold red")
console.print("Success text", style="bold green")
console.print("Info text", style="cyan")

# Print to stderr for errors
err_console = Console(stderr=True)
err_console.print("[red]This goes to stderr[/red]")

# Disable color (for log files)
plain_console = Console(no_color=True)
plain_console.print("[bold]This prints without color codes[/bold]")

# Rule -- a horizontal line with optional title
console.rule("[bold blue]Section Title[/bold blue]")
console.print("Content after the rule")
console.rule()

Output:

Bold text
Italic text
Underlined
Bold red error
Green on black background
Normal text
Error text
Success text
Info text
[stderr] This goes to stderr
This prints without color codes
─────────────────────── Section Title ───────────────────────
Content after the rule
─────────────────────────────────────────────────────────────

The console.rule() call creates a full-width horizontal divider — perfect for separating sections in long command output. The separate err_console for stderr is important in CLI tools where stdout is often piped to another command or redirected to a file; error messages should go to stderr so they do not corrupt the stdout stream.
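The stream separation itself is independent of Rich, so it can be illustrated with the standard library. In this rough sketch the report function is hypothetical, print(..., file=sys.stderr) plays the role that Console(stderr=True) plays above, and the two StringIO buffers stand in for a pipe and a terminal.

```python
# stream_separation.py
import io
import sys
from contextlib import redirect_stderr, redirect_stdout

def report(rows):
    """Write data rows to stdout and diagnostics to stderr."""
    for row in rows:
        if row is None:
            print("skipping empty row", file=sys.stderr)
            continue
        print(row)

# Capture both streams separately, as a shell would with `cmd > data.csv`
out, err = io.StringIO(), io.StringIO()
with redirect_stdout(out), redirect_stderr(err):
    report(["a,1", None, "b,2"])

print(repr(out.getvalue()))  # only the data rows
print(repr(err.getvalue()))  # only the diagnostic message
```

Because the diagnostic never reaches stdout, a downstream command that parses the data sees two clean rows and nothing else.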

Tables

Rich’s Table class renders properly aligned, bordered tables in the terminal. It handles column alignment, row styles, and nested markup in cell content:

# rich_tables.py
from rich.console import Console
from rich.table import Table

console = Console()

# Build a table with column options
table = Table(title="Server Status", show_header=True, header_style="bold cyan")
table.add_column("Server", style="dim", width=20)
table.add_column("Status", justify="center")
table.add_column("CPU %", justify="right")
table.add_column("Uptime", justify="right")

# Add rows with inline markup
table.add_row("web-01", "[green]Online[/green]", "34%", "14 days")
table.add_row("web-02", "[green]Online[/green]", "67%", "14 days")
table.add_row("db-01",  "[yellow]Degraded[/yellow]", "89%", "3 days")
table.add_row("cache-01","[red]Offline[/red]", "--", "--")

console.print(table)

Output:

                   Server Status                    
 Server      Status      CPU %   Uptime  
 web-01      Online       34%   14 days  
 web-02      Online       67%   14 days  
 db-01       Degraded     89%    3 days  
 cache-01    Offline       --        --  

Column options like justify="right" and width=20 control the layout. The style="dim" on the Server column makes it visually secondary to the Status column, guiding the reader’s eye to the most important information first. Each cell can contain Rich markup, so you can color individual cell values based on their content (green for Online, red for Offline) without any extra formatting logic.

Progress Bars and Live Displays

Rich’s progress tracking is one of its most-used features. The track() function wraps any iterable and shows a progress bar as you iterate through it:

# progress_bars.py
import time
from rich.progress import track, Progress, SpinnerColumn, TimeElapsedColumn
from rich.console import Console

console = Console()

# Simple progress bar with track()
console.print("[bold]Processing files...[/bold]")
files = [f'file_{i}.csv' for i in range(20)]
for f in track(files, description="Processing..."):
    time.sleep(0.1)  # Simulate work

# Advanced progress with multiple columns
console.print("\n[bold]Running tasks...[/bold]")
tasks_data = [("Downloading data", 50), ("Processing records", 200), ("Saving output", 30)]

with Progress(
    SpinnerColumn(),
    "[progress.description]{task.description}",
    "[progress.percentage]{task.percentage:>3.0f}%",
    TimeElapsedColumn(),
) as progress:
    for task_name, total_steps in tasks_data:
        task = progress.add_task(task_name, total=total_steps)
        for _ in range(total_steps):
            time.sleep(0.01)
            progress.advance(task)

console.print("[green]All tasks complete![/green]")

Output (live in terminal):

Processing files...
Processing... [############################] 20/20 0:00:02

Running tasks...
/ Downloading data      100%  0:00:00
/ Processing records    100%  0:00:02
/ Saving output         100%  0:00:00
All tasks complete!

The track() function is ideal for simple loops. The full Progress context manager is better when you have multiple concurrent tasks or want to customize which columns appear in the progress display. Both update in place without scrolling the terminal: the bar is redrawn on the same lines, and when the work finishes it stays on screen at 100% unless you pass transient=True, which clears it.

Real-Life Example: System Monitoring CLI Dashboard

Here is a complete system monitoring dashboard that combines Console, Table, Panel, and Rule components to display structured status information in a clean, readable format:

# system_cli.py
import time
import random
from datetime import datetime, timedelta
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
from rich.columns import Columns
from rich import box

console = Console()

def get_mock_services():
    """Simulate service status data."""
    return [
        {"name": "web-server",   "status": "running", "cpu": 34, "mem": 512, "uptime": "14d 6h"},
        {"name": "database",     "status": "running", "cpu": 22, "mem": 2048,"uptime": "14d 6h"},
        {"name": "cache",        "status": "running", "cpu": 8,  "mem": 256, "uptime": "14d 6h"},
        {"name": "task-queue",   "status": "warning", "cpu": 78, "mem": 1024,"uptime": "2d 1h"},
        {"name": "email-worker", "status": "stopped", "cpu": 0,  "mem": 0,   "uptime": "--"},
    ]

def get_mock_metrics():
    return {
        "total_requests_today": 142857,
        "errors_today": 23,
        "avg_response_ms": 87,
        "active_users": 1247,
    }

def status_color(status):
    colors = {"running": "green", "warning": "yellow", "stopped": "red"}
    return f'[{colors.get(status, "white")}]{status}[/{colors.get(status, "white")}]'

def cpu_color(cpu):
    if cpu > 80: return f"[red]{cpu}%[/red]"
    if cpu > 60: return f"[yellow]{cpu}%[/yellow]"
    return f"[green]{cpu}%[/green]"

def display_dashboard():
    console.clear()
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    
    # Header
    console.rule(f"[bold cyan]System Dashboard -- {now}[/bold cyan]")
    
    # Services table
    table = Table(title="Services", box=box.ROUNDED, show_header=True, header_style="bold")
    table.add_column("Service", style="dim")
    table.add_column("Status", justify="center")
    table.add_column("CPU", justify="right")
    table.add_column("Memory", justify="right")
    table.add_column("Uptime", justify="right")
    
    for svc in get_mock_services():
        mem_str = f'{svc["mem"]} MB' if svc["mem"] > 0 else "--"
        table.add_row(
            svc["name"],
            status_color(svc["status"]),
            cpu_color(svc["cpu"]),
            mem_str,
            svc["uptime"]
        )
    console.print(table)
    
    # Metrics panels
    metrics = get_mock_metrics()
    panels = [
        Panel(f'[bold cyan]{metrics["total_requests_today"]:,}[/bold cyan]\nRequests today', title="Traffic"),
        Panel(f'[bold green]{metrics["active_users"]:,}[/bold green]\nActive users', title="Users"),
        Panel(f'[bold yellow]{metrics["avg_response_ms"]}ms[/bold yellow]\nAvg response', title="Latency"),
        Panel(f'[bold red]{metrics["errors_today"]}[/bold red]\nErrors today', title="Errors"),
    ]
    console.print(Columns(panels))
    console.rule()

display_dashboard()

Output:

──────────────── System Dashboard -- 2026-05-05 10:15:30 ────────────────

         Services
 Service        Status    CPU   Memory   Uptime 
 web-server     running   34%   512 MB   14d 6h 
 database       running   22%   2048 MB  14d 6h 
 cache          running    8%   256 MB   14d 6h 
 task-queue     warning   78%   1024 MB   2d 1h 
 email-worker   stopped    0%   --           -- 

 Traffic        Users         Latency     Errors  
 142,857        1,247         87ms        23      
 Requests today Active users  Avg response Errors today
──────────────────────────────────────────────────────

The console.clear() at the start makes this work as a refreshing dashboard — call display_dashboard() in a loop with a sleep delay to get a live-updating display. Replace the mock data functions with real psutil calls to get actual system metrics. You can also wrap the whole display in a Live() context for smooth flickerless updates.

Frequently Asked Questions

How do I install Rich?

Run pip install rich. Recent Rich releases require Python 3.8 or newer (older releases supported Python 3.6.3 and up), and the library works on Windows, macOS, and Linux. It is pure Python with no required C extensions. For the best experience on Windows, use Windows Terminal rather than the legacy Command Prompt, which has limited color support. On most modern terminals (iTerm2, GNOME Terminal, VS Code integrated terminal), Rich works out of the box.

Why are colors not showing up in my terminal?

Rich detects color support from the terminal environment. If you run your script in a context without color support (some CI systems, redirected output, or legacy terminals), Rich falls back to plain text automatically. To force colors on, create the Console with Console(force_terminal=True). To always disable colors, use Console(no_color=True). When piping output to a file (python script.py > output.txt), Rich correctly strips color codes since files do not support ANSI escape sequences.
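The kind of decision described here can be approximated in a few lines. This is a simplified heuristic for illustration only, not Rich's actual detection code: the function should_use_color and its parameters are hypothetical, and it checks just the NO_COLOR convention, a TERM value of "dumb", and whether the stream is attached to a terminal.

```python
# color_detection_sketch.py
import io
import os
import sys

def should_use_color(stream=None, env=None, force=False):
    """Simplified heuristic, not Rich's actual detection code."""
    stream = sys.stdout if stream is None else stream
    env = os.environ if env is None else env
    if force:                        # analogous to forcing terminal mode
        return True
    if env.get("NO_COLOR"):          # NO_COLOR convention: any non-empty value disables color
        return False
    if env.get("TERM") == "dumb":    # terminal that advertises no capabilities
        return False
    isatty = getattr(stream, "isatty", None)
    return bool(isatty and isatty())

# A StringIO buffer behaves like redirected output: it is not a terminal
print(should_use_color(io.StringIO(), env={}))              # False
print(should_use_color(io.StringIO(), env={}, force=True))  # True
```

This is why the same script prints colors interactively but produces plain text when its output is redirected to a file.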

How do I use Rich for logging?

Replace the default logging handler with Rich’s RichHandler: logging.basicConfig(handlers=[RichHandler()]). This formats log output with colored level labels, timestamps, source file locations, and auto-detected code in log messages. The handler integrates with Python’s standard logging module, so all your existing logger.info() and logger.error() calls get Rich formatting automatically without changing your logging code.

How do I make a live auto-refreshing display?

Use Rich’s Live context manager: wrap your renderable (a Table, Layout, or any Rich object) with Live(renderable, refresh_per_second=4). Update the renderable object inside the loop and call live.update(new_renderable) to refresh the display without screen flicker. This is the building block for tools like htop-style monitors, live log tails, and real-time progress dashboards.

What is rich.inspect and when is it useful?

The rich.inspect(obj) function prints a detailed, colored summary of any Python object: its class, docstring, and attributes with their current values. Pass methods=True to list methods as well. It is more useful than dir(obj) because it shows values alongside names and hides private and dunder attributes by default. Use it during development to explore unfamiliar library objects: from rich import inspect; inspect(my_object). It works on modules, instances, functions, and built-in types.

Conclusion

Rich transforms terminal output from a debugging afterthought into a first-class communication layer. We covered the Console object for styled printing, Markup for inline color and formatting tags, Tables for structured data, Progress for tracking long-running operations, Panels for boxed content, and the Live component for refreshing dashboards.

The system monitoring dashboard is a solid template for any operations or data CLI tool. Extend it by adding keyboard interaction with rich.prompt, markdown report generation with rich.markdown, or a file viewer with rich.syntax. The pattern of combining multiple Rich components into a single coherent display scales to tools as complex as full terminal IDEs.

See the official Rich documentation for the complete API reference and example gallery.

How To Use Python Pint for Physical Units and Quantities

Intermediate

In 1999, NASA’s Mars Climate Orbiter was lost because one engineering team used metric units and another used imperial units — a 327 million dollar spacecraft burned up in the Martian atmosphere due to a unit mismatch. You are probably not building Mars orbiters, but if your code processes temperatures, distances, speeds, or any other physical quantity, you face the same class of bug every time a raw number crosses a unit boundary without carrying its unit along with it.

The Pint library solves this by treating numbers as quantities — values paired with units that travel together through every calculation. When you multiply meters by meters you get square meters. When you try to add kilometers to kilograms, Pint raises a DimensionalityError instead of silently returning nonsense. Install it with pip install pint.

In this article we will cover creating quantities, converting between units, doing arithmetic with dimensional analysis, defining custom units, formatting output, and integrating Pint with NumPy. We will finish with a unit-safe physics calculator that demonstrates why you never want to go back to bare floats for physical quantities.

Python Pint: Quick Example

Here is the core Pint workflow in under 20 lines. Create a unit registry, attach units to values, and let Pint handle conversions and validation automatically:

# quick_pint.py
from pint import UnitRegistry

ureg = UnitRegistry()

# Create quantities -- value + unit
distance = 10 * ureg.kilometer
time_taken = 30 * ureg.minute
speed = distance / time_taken

print(f'Distance: {distance}')
print(f'Time: {time_taken}')
print(f'Speed: {speed}')
print(f'Speed in km/h: {speed.to(ureg.kilometer / ureg.hour):.2f}')
print(f'Speed in mph: {speed.to("mile/hour"):.2f}')

Output:

Distance: 10 kilometer
Time: 30 minute
Speed: 0.3333333333333333 kilometer / minute
Speed in km/h: 20.00 kilometer / hour
Speed in mph: 12.43 mile / hour

Notice that the speed calculation is done directly by dividing a distance by a time — Pint performs the arithmetic and tracks the compound unit automatically. The .to() method converts to any compatible unit with dimensional analysis. Want to go deeper? Below we cover the full Pint API including error handling, custom units, and NumPy integration.

What Is Pint and Why Use It?

Pint is a Python library for defining, operating on, and manipulating physical quantities: numbers that have both a magnitude and a unit. The central object is the Quantity, which wraps a value with its unit and enforces dimensional consistency in all operations.

Approach | Code Style | Unit Safety | Conversion Effort
Bare floats | speed = 20.0 | None (silent bugs) | Manual everywhere
Comments | speed = 20.0 # km/h | None (comment ignored) | Manual everywhere
Pint Quantity | speed = 20 * ureg.kph | Automatic DimensionalityError | Built-in with .to()

The unit registry (UnitRegistry) ships with a large built-in catalog of units: SI units, imperial units, derived units, and common abbreviations. You create it once and reuse it across your whole application, because quantities from different registries cannot be combined.

Convert miles to meters without losing your mind.

Creating Quantities and the Unit Registry

There are several ways to create Pint quantities. The most common is multiplying a number by a unit attribute on the registry. You can also use string parsing for more flexible input:

# creating_quantities.py
from pint import UnitRegistry

ureg = UnitRegistry()

# Method 1: Multiply by registry attribute
mass = 5.5 * ureg.kilogram
temp = ureg.Quantity(100, ureg.degC)  # offset unit: 100 * ureg.degC would raise OffsetUnitCalculusError
area = 25 * ureg.meter ** 2

# Method 2: Parse from string (useful for user input)
length = ureg.Quantity('12.5 meters')
velocity = ureg.Quantity(60, 'km/hour')

# Method 3: Q_ shortcut (common in practice)
Q_ = ureg.Quantity
pressure = Q_(101325, 'Pa')

print(f'Mass: {mass}')
print(f'Temperature: {temp}')
print(f'Area: {area}')
print(f'Length (parsed): {length}')
print(f'Velocity (parsed): {velocity}')
print(f'Pressure: {pressure}')

# Access magnitude and units separately
print(f'\nMagnitude: {mass.magnitude}')
print(f'Units: {mass.units}')
print(f'Dimensionality: {mass.dimensionality}')

Output:

Mass: 5.5 kilogram
Temperature: 100 degree_Celsius
Area: 25 meter ** 2
Length (parsed): 12.5 meter
Velocity (parsed): 60 kilometer / hour
Pressure: 101325 pascal

Magnitude: 5.5
Units: kilogram
Dimensionality: [mass]

The string parsing form (ureg.Quantity('12.5 meters')) is particularly valuable when accepting unit input from users or configuration files, because you do not have to parse the string yourself or validate the unit name. Pint handles both tasks and raises an UndefinedUnitError if the unit string is not recognized.

Converting Between Units

Unit conversion is where Pint saves the most code. The .to() method converts to any dimensionally compatible unit, and .to_base_units() normalizes to SI base units:

# conversions.py
from pint import UnitRegistry

ureg = UnitRegistry()

# Basic conversions
distance_km = 42.195 * ureg.kilometer  # Marathon distance
print(f'Marathon: {distance_km.to(ureg.mile):.3f}')
print(f'Marathon: {distance_km.to(ureg.meter):.0f}')
print(f'Marathon: {distance_km.to("ft"):.0f}')

# Temperature conversions (offset units -- use .to() not arithmetic)
temp_c = ureg.Quantity(100, ureg.degC)  # 100 * ureg.degC would raise OffsetUnitCalculusError
print(f'\n100 C in Fahrenheit: {temp_c.to(ureg.degF):.1f}')
print(f'100 C in Kelvin: {temp_c.to(ureg.kelvin):.2f}')

# Compound unit conversion
speed = 100 * ureg.kilometer / ureg.hour
print(f'\n100 km/h in m/s: {speed.to("m/s"):.2f}')
print(f'100 km/h in mph: {speed.to("mile/hour"):.2f}')

# Force conversion
force = 100 * ureg.newton
print(f'\n100 N in pound-force: {force.to("lbf"):.3f}')
print(f'100 N in kgf: {force.to("kgf"):.3f}')

# to_base_units -- normalize to SI
energy = 1 * ureg.kilowatt_hour
print(f'\n1 kWh in base units: {energy.to_base_units():.0f}')

Output:

Marathon: 26.219 mile
Marathon: 42195 meter
Marathon: 138435 foot

100 C in Fahrenheit: 212.0 degree_Fahrenheit
100 C in Kelvin: 373.15 kelvin

100 km/h in m/s: 27.78 meter / second
100 km/h in mph: 62.14 mile / hour

100 N in pound-force: 22.481 force_pound
100 N in kgf: 10.197 kilogram_force

1 kWh in base units: 3600000 kilogram * meter ** 2 / second ** 2

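Temperature conversion deserves a special mention. Unlike most units, Celsius and Fahrenheit are offset units: 0 degrees Celsius is not the same as 0 kelvin. Pint handles this correctly, so ureg.Quantity(100, ureg.degC) converts to 212 degF as expected. Build offset temperatures with the Quantity constructor rather than multiplication, because with a default registry 100 * ureg.degC raises an OffsetUnitCalculusError (multiplying an offset unit is ambiguous). If you are doing arithmetic on temperature differences rather than absolute temperatures, use ureg.delta_degC to get correct results for temperature intervals.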
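The difference between an absolute temperature and a temperature interval is easy to check by hand. The following is a plain-Python sketch that does not use Pint at all; the function names are made up for illustration. It applies the standard Celsius-to-Fahrenheit formula once with the offset and once without:

```python
# offset_vs_delta.py
# Plain arithmetic, not Pint: shows why offset units need two kinds of conversion.

def celsius_to_fahrenheit(temp_c):
    """Convert an absolute temperature: scale by 9/5, then add the 32-degree offset."""
    return temp_c * 9 / 5 + 32

def celsius_interval_to_fahrenheit(delta_c):
    """Convert a temperature difference: scale only, no offset."""
    return delta_c * 9 / 5

boiling_f = celsius_to_fahrenheit(100)          # a point on the scale
warming_f = celsius_interval_to_fahrenheit(10)  # "10 degrees warmer"

print(f'100 C as a temperature: {boiling_f} F')
print(f'10 C as an interval: {warming_f} F')

# Converting both endpoints and subtracting gives the same answer
# as converting the interval directly.
assert celsius_to_fahrenheit(30) - celsius_to_fahrenheit(20) == warming_f
```

Treating an interval as an absolute temperature would add 32 to the result, which is exactly the kind of silent error that separate absolute and delta units are meant to prevent.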

Adding meters to seconds: Pint raises. Floats just lie.

Dimensional Analysis and Error Prevention

The real value of Pint is not convenience — it is catching bugs that bare floats silently hide. When you add incompatible units, Pint raises a DimensionalityError immediately instead of returning a meaningless number:

# dimensional_analysis.py
from pint import UnitRegistry, DimensionalityError

ureg = UnitRegistry()

# Arithmetic with matching dimensions -- works correctly
d1 = 5 * ureg.kilometer
d2 = 3000 * ureg.meter
total = d1 + d2  # Pint converts automatically
print(f'Total distance: {total}')
print(f'In km: {total.to("km")}')

# Multiplying quantities with different dimensions yields a compound unit
weight = 70 * ureg.kilogram
g = 9.81 * ureg.meter / ureg.second ** 2
force = weight * g
print(f'\nForce: {force:.2f}')
print(f'Force in N: {force.to(ureg.newton):.2f}')

# Dimensional incompatibility -- caught immediately
mass = 10 * ureg.kilogram
length = 5 * ureg.meter
try:
    bad_result = mass + length  # This makes no physical sense
except DimensionalityError as e:
    print(f'\nCaught: {e}')

# Another bad operation
try:
    speed = 60 * ureg.km / ureg.hour
    nonsense = speed + mass
except DimensionalityError as e:
    print(f'Caught: {e}')

Output:

Total distance: 8.0 kilometer
In km: 8.0 kilometer

Force: 686.70 kilogram * meter / second ** 2
Force in N: 686.70 newton

Caught: Cannot convert from 'kilogram' ([mass]) to 'meter' ([length])
Caught: Cannot convert from 'kilometer / hour' ([length] / [time]) to 'kilogram' ([mass])

When adding 5 km + 3000 m, Pint automatically converts meters to kilometers before summing — you get the right answer without writing any conversion code. And when you try to add kilograms to meters, you get an immediate, descriptive error with the actual dimensions shown. This is the kind of safety that pays off at 2am when you are debugging a calculation that has been silently wrong for months.
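To make the idea of dimensional checking concrete, here is a deliberately tiny model of it. This is a sketch under simplifying assumptions and not how Pint is implemented: the ToyQuantity class is hypothetical, it tracks only dimension exponents, and it ignores unit scaling (so it cannot convert meters to kilometers).

```python
# toy_dimensions.py
# A toy model of dimensional checking; NOT Pint's implementation.

class ToyQuantity:
    """A magnitude plus dimension exponents, e.g. {'length': 1, 'time': -1}."""

    def __init__(self, magnitude, dims):
        self.magnitude = magnitude
        # Drop zero exponents so {'length': 0} compares equal to {}
        self.dims = {name: exp for name, exp in dims.items() if exp != 0}

    def __add__(self, other):
        # Addition only makes sense when both sides share the same dimensions
        if self.dims != other.dims:
            raise TypeError(
                f'cannot add quantities with dimensions {self.dims} and {other.dims}'
            )
        return ToyQuantity(self.magnitude + other.magnitude, self.dims)

    def __mul__(self, other):
        # Multiplication adds exponents dimension by dimension
        combined = dict(self.dims)
        for name, exp in other.dims.items():
            combined[name] = combined.get(name, 0) + exp
        return ToyQuantity(self.magnitude * other.magnitude, combined)

mass = ToyQuantity(70, {'mass': 1})
accel = ToyQuantity(9.81, {'length': 1, 'time': -2})

force = mass * accel  # mass * length / time^2
print(force.magnitude, force.dims)

try:
    mass + accel  # incompatible dimensions
except TypeError as exc:
    print('Caught:', exc)
```

The real library layers unit definitions, prefixes, and conversion factors on top of this bookkeeping, but the core safety check is the same comparison of dimensions before an addition is allowed.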

Real-Life Example: Unit-Safe Physics Calculator

Here is a physics calculator that uses Pint to ensure all calculations are dimensionally consistent. It computes projectile motion values and never lets a unit slip through without validation:

# physics_calculator.py
from pint import UnitRegistry
import math

ureg = UnitRegistry()
Q_ = ureg.Quantity

def projectile_range(initial_speed, angle_degrees, gravity=None):
    """Calculate the range of a projectile.
    
    Uses the formula: R = (v^2 * sin(2*theta)) / g
    initial_speed and gravity must be Pint Quantities; angle_degrees is a plain number.
    """
    if gravity is None:
        gravity = Q_(9.81, 'm/s^2')
    
    angle_rad = math.radians(angle_degrees)
    sin2theta = math.sin(2 * angle_rad)
    
    # Pint tracks units through the calculation
    range_distance = (initial_speed ** 2 * sin2theta) / gravity
    return range_distance.to(ureg.meter)

def kinetic_energy(mass, velocity):
    """Calculate kinetic energy: E = 0.5 * m * v^2"""
    energy = 0.5 * mass * velocity ** 2
    return energy.to(ureg.joule)

def fuel_efficiency(distance, fuel_used):
    """Calculate fuel efficiency and convert to common units."""
    efficiency = distance / fuel_used
    return {
        'metric': efficiency.to('km/L'),
        'us_mpg': efficiency.to('miles/gallon'),
        'uk_mpg': efficiency.to('miles/imperial_gallon'),
    }

# Test the calculator
print('=== Projectile Calculator ===')
for angle in [15, 30, 45, 60, 75]:
    speed = Q_(50, 'm/s')
    r = projectile_range(speed, angle)
    print(f'Angle {angle:2d}deg: {r:.1f}')

print('\n=== Kinetic Energy ===')
car_mass = Q_(1500, 'kg')
for speed_kph in [60, 100, 120]:
    speed = Q_(speed_kph, 'km/h').to('m/s')
    ke = kinetic_energy(car_mass, speed)
    print(f'{speed_kph} km/h: {ke:.0f} ({ke.to("kJ"):.1f})')

print('\n=== Fuel Efficiency ===')
trip = Q_(400, 'km')
fuel = Q_(35, 'L')
eff = fuel_efficiency(trip, fuel)
for label, value in eff.items():
    print(f'  {label}: {value:.2f}')

Output:

=== Projectile Calculator ===
Angle 15deg: 127.4 meter
Angle 30deg: 220.7 meter
Angle 45deg: 254.8 meter
Angle 60deg: 220.7 meter
Angle 75deg: 127.4 meter

=== Kinetic Energy ===
60 km/h: 208333 joule (208.3 kilojoule)
100 km/h: 578704 joule (578.7 kilojoule)
120 km/h: 833333 joule (833.3 kilojoule)

=== Fuel Efficiency ===
  metric: 11.43 kilometer / liter
  us_mpg: 26.88 mile / gallon
  uk_mpg: 32.28 mile / imperial_gallon

The fuel efficiency function is a good example of Pint’s conversion power: a single distance-per-volume quantity, computed once in km/L, converts to US and UK miles-per-gallon values automatically, including the US/UK gallon distinction (the UK imperial gallon is about 20% larger than the US gallon). Without Pint, you would need to look up and maintain a conversion factor for every pair of units. With Pint, you write the physics once and let the registry handle the rest.
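For comparison, this is roughly what the same conversions look like when you maintain the factors yourself. The sketch below uses only plain floats; the constant names are made up for illustration, and the values are the standard definitions of the mile, the US gallon, and the imperial gallon.

```python
# fuel_check.py
# Hand-maintained conversion factors, used only to cross-check the
# km/L -> mpg conversions. A unit registry stores these for you.
KM_PER_MILE = 1.609344
LITERS_PER_US_GALLON = 3.785411784
LITERS_PER_IMPERIAL_GALLON = 4.54609

km_per_liter = 400 / 35  # 400 km on 35 L of fuel

# km/L -> miles/L -> miles/gallon
us_mpg = km_per_liter / KM_PER_MILE * LITERS_PER_US_GALLON
uk_mpg = km_per_liter / KM_PER_MILE * LITERS_PER_IMPERIAL_GALLON

# The inverse convention (fuel per distance) is a reciprocal, not a scaling
liters_per_100km = 100 / km_per_liter

print(f'{km_per_liter:.2f} km/L')
print(f'{us_mpg:.2f} US mpg')
print(f'{uk_mpg:.2f} UK mpg')
print(f'{liters_per_100km:.2f} L/100 km')
```

This prints 11.43 km/L, 26.88 US mpg, 32.28 UK mpg, and 8.75 L/100 km. Every additional unit pair means another constant to look up and another chance to multiply where you should have divided.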

Unit conversion in one line. The other 99 lines stay business logic.

Frequently Asked Questions

How do I install Pint?

Run pip install pint. Pint supports recent Python 3 releases; the exact minimum version depends on the Pint release, so check the project’s PyPI page if you are on an older interpreter. NumPy is an optional dependency: install it separately with pip install numpy if you want to attach units to entire arrays. Pint integrates with NumPy automatically when both are installed.

Should I create one UnitRegistry or multiple?

Create exactly one UnitRegistry per application and reuse it everywhere. Quantities from different registries cannot be combined — even if they have the same units, 5 * ureg1.meter + 3 * ureg2.meter will raise an error because they are considered different registries. The standard pattern is to create a module-level ureg = UnitRegistry() and import it wherever quantities are needed.

Does Pint work with NumPy arrays?

Yes. Attach units to NumPy arrays with np.array([1, 2, 3]) * ureg.meter, and common NumPy operations (sum, mean, reshape, and so on) preserve the unit. The resulting quantity behaves like a NumPy array with dimensional validation added. This is particularly useful for scientific computing where you work with arrays of measurements and need to ensure consistent units across calculations.

Why does temperature arithmetic behave differently from other units?

Temperature scales like Celsius and Fahrenheit are offset scales — 0 degC does not mean “zero temperature”, it means the freezing point of water. When you convert an absolute temperature (a specific point on the scale), you need to account for the offset. When you compute a temperature difference (how much hotter is A than B), you do not use the offset. Pint distinguishes these with ureg.degC (offset, for absolute temperatures) and ureg.delta_degC (no offset, for differences). Use the correct one for your context.

Can I add custom units to Pint?

Yes. Define custom units using ureg.define(). For example, ureg.define('dozen = 12 = doz') adds a “dozen” unit. You can also keep your definitions in a text file and load them with ureg.load_definitions(). Custom units work like built-in units: they participate in conversions, dimensional analysis, and arithmetic with compatible dimensions.

Conclusion

Pint turns physical quantities from raw floats that silently corrupt calculations into validated, self-documenting values that raise descriptive errors when you mix incompatible units. We covered creating quantities with the UnitRegistry, converting between compatible units with .to(), arithmetic with automatic dimensional tracking, catching dimensionality errors at the source, and building a unit-safe physics calculator that handles projectile motion, kinetic energy, and fuel efficiency conversions.

The projectile and fuel calculator project is a template you can apply to any domain — robotics sensor fusion, chemistry calculations, financial unit conversions, or any scientific application where unit safety prevents costly mistakes. Extend it by loading unit definitions from a configuration file, integrating with pandas DataFrames for labeled unit data, or using pint-pandas for unit-aware data analysis.

For the full API reference and advanced features like Babel localization integration, see the official Pint documentation.

How To Use Python Humanize for Human-Readable Output

Beginner

You have a web app that shows a file was “uploaded 1674823847 seconds ago” and a dashboard displaying a disk size as “1073741824 bytes”. Your users can technically read these numbers, but they have to do mental math first — and nobody opens a dashboard to do mental math. The humanize library turns raw numbers and dates into the kind of text people actually expect to read.

Install it with pip install humanize and you get a collection of simple functions that transform machine-precision values into natural language. No configuration required, no boilerplate setup — just call humanize.naturaltime(datetime) and get back “3 minutes ago”.

In this article we will cover the full humanize toolkit: naturaltime and date formatting, natural number representation, file size conversion, word utilities, and localization. We will finish with a real-world CLI dashboard that uses humanize to display system stats in a user-friendly format.

Python humanize: Quick Example

Here is a quick taste of what humanize does. Each function takes a raw Python value and returns a human-friendly string:

# quick_humanize.py
import humanize
from datetime import datetime, timedelta

# Relative time
past = datetime.now() - timedelta(minutes=45)
print(humanize.naturaltime(past))

# File sizes
print(humanize.naturalsize(1_073_741_824))
print(humanize.naturalsize(5_432_100))

# Large numbers
print(humanize.intcomma(1_234_567))
print(humanize.intword(1_234_567_890))

# Ordinals
print(humanize.ordinal(23))
print(humanize.ordinal(42))

Output:

45 minutes ago
1.1 GB
5.4 MB
1,234,567
1.2 billion
23rd
42nd

These seven lines replace the kind of formatting code you would otherwise write from scratch in every project. Want to go deeper? Below we cover every humanize module with more examples and edge cases.

What Is humanize and When Should You Use It?

humanize is a Python library with one job: converting data into text that humans read naturally. It covers four categories of values — times, numbers, file sizes, and text utilities.

Module | What It Formats | Example Output
Time | Datetimes, timedeltas | “3 minutes ago”, “2 hours from now”
Number | Integers, floats, ordinals | “1,234,567”, “42nd”, “1.2 billion”
File size | Byte counts | “1.1 GB”, “512.0 kB”
Text | Lists, fractions, AP numbers | “one”, “1/2”, “one, two and three”

Use humanize whenever your application shows computed values to end users — log viewers, dashboards, CLI tools, admin panels, or any output where raw integers would require mental translation. It is especially valuable for times and file sizes, which have the largest gap between machine representation and human expectation.

'3 minutes ago' beats '1715890234'.

Time Formatting with naturaltime and naturalday

The time functions are the most-used part of humanize. naturaltime() takes a datetime and returns a relative description from now. naturalday() handles calendar-relative descriptions like “today” and “yesterday”:

# time_humanize.py
import humanize
from datetime import datetime, timedelta, date

now = datetime.now()

# naturaltime -- relative to current moment
print(humanize.naturaltime(now - timedelta(seconds=30)))
print(humanize.naturaltime(now - timedelta(minutes=5)))
print(humanize.naturaltime(now - timedelta(hours=2)))
print(humanize.naturaltime(now - timedelta(days=1)))
print(humanize.naturaltime(now + timedelta(hours=3), when=now))  # Future; when= pins the reference time

# naturalday -- calendar-relative descriptions
today = date.today()
print(humanize.naturalday(today))
print(humanize.naturalday(today - timedelta(days=1)))
print(humanize.naturalday(today + timedelta(days=1)))
print(humanize.naturalday(today - timedelta(days=5)))  # Past date

# naturaldate -- includes the year for older dates
old_date = date(2023, 6, 15)
print(humanize.naturaldate(old_date))

Output:

30 seconds ago
5 minutes ago
2 hours ago
a day ago
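3 hours from now
today
yesterday
tomorrow
Jun 10
Jun 15 2023

Notice that naturaltime() handles both past and future datetimes: the same function produces “2 hours ago” and “3 hours from now”. This makes it ideal for event timestamps, activity feeds, and scheduled task displays where items can be either past or upcoming. The naturalday() function is better for date-only values where the time component is irrelevant. It only has special words for today, yesterday, and tomorrow; any other date falls back to an abbreviated month and day, so the “Jun 10” line above will show whatever date was five days before you ran the script.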

Number Formatting

humanize provides several functions for making large numbers readable. The most useful are intcomma() for adding thousands separators and intword() for converting very large numbers to words:

# number_humanize.py
import humanize

# intcomma -- add thousands separators
print(humanize.intcomma(1234567))
print(humanize.intcomma(9876543210))
print(humanize.intcomma(1234.5678))  # Works on floats too

# intword -- convert to approximate word form
print(humanize.intword(1_200_000))
print(humanize.intword(3_400_000_000))
print(humanize.intword(7_800_000_000_000))

# ordinal -- add st, nd, rd, th suffixes
for n in [1, 2, 3, 4, 11, 12, 13, 21, 22]:
    print(f'{n} -> {humanize.ordinal(n)}')

# apnumber -- AP style numbers (words for 1-9)
for n in range(1, 12):
    print(f'{n} -> {humanize.apnumber(n)}')

Output:

1,234,567
9,876,543,210
1,234.5678
1.2 million
3.4 billion
7.8 trillion
1 -> 1st
2 -> 2nd
3 -> 3rd
4 -> 4th
11 -> 11th
12 -> 12th
13 -> 13th
21 -> 21st
22 -> 22nd
1 -> one
2 -> two
...
9 -> nine
10 -> 10
11 -> 11

apnumber() follows the Associated Press style guide rule: write out numbers one through nine as words, and use numerals for 10 and above. This is a small but meaningful touch for any copy-heavy application where the output reads like natural prose rather than a data dump. The ordinal function correctly handles the tricky edge cases around 11th, 12th, and 13th (which are “th” not “st/nd/rd”).
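To see why the 11th, 12th, and 13th cases are easy to get wrong, here is a hand-written version of the English ordinal rule. It is an illustrative sketch only, not humanize's source code, and the function name ordinal_suffix is made up for this example.

```python
# ordinal_sketch.py
# A hand-written version of the English ordinal rule, for illustration only.

def ordinal_suffix(n):
    """Return a non-negative integer with its English ordinal suffix."""
    # Numbers ending in 11, 12 or 13 always take 'th' (11th, 112th, 213th)
    if 11 <= n % 100 <= 13:
        suffix = 'th'
    else:
        # Otherwise the last digit decides: 1 -> st, 2 -> nd, 3 -> rd
        suffix = {1: 'st', 2: 'nd', 3: 'rd'}.get(n % 10, 'th')
    return f'{n}{suffix}'

for n in [1, 2, 3, 4, 11, 12, 13, 21, 22, 111]:
    print(n, '->', ordinal_suffix(n))
```

A naive version that looks only at the last digit would print “11st” and “112nd”, which is why the check on n % 100 has to come first.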

Numbers users can read. Numbers users will believe.

File Size Formatting with naturalsize

naturalsize() converts raw byte counts into the readable size strings users expect to see in file browsers and storage dashboards. It supports both binary (1024-based) and decimal (1000-based) prefixes:

# file_size.py
import humanize

sizes = [500, 5_000, 50_000, 500_000, 5_000_000, 50_000_000, 5_000_000_000]

print('--- Decimal (default) ---')
for size in sizes:
    print(f'{size:>15,} bytes -> {humanize.naturalsize(size)}')

print()
print('--- Binary (binary=True) ---')
for size in sizes:
    print(f'{size:>15,} bytes -> {humanize.naturalsize(size, binary=True)}')

print()
# gnu=True uses single-letter suffixes like 'ls -lh'
print('--- GNU style (gnu=True) ---')
for size in sizes:
    print(f'{humanize.naturalsize(size, gnu=True)}')

Output:

--- Decimal (default) ---
            500 bytes -> 500 Bytes
          5,000 bytes -> 5.0 kB
         50,000 bytes -> 50.0 kB
        500,000 bytes -> 500.0 kB
      5,000,000 bytes -> 5.0 MB
     50,000,000 bytes -> 50.0 MB
  5,000,000,000 bytes -> 5.0 GB

--- Binary (binary=True) ---
            500 bytes -> 500 Bytes
          5,000 bytes -> 4.9 KiB
         50,000 bytes -> 48.8 KiB
        500,000 bytes -> 488.3 KiB
      5,000,000 bytes -> 4.8 MiB
     50,000,000 bytes -> 47.7 MiB
  5,000,000,000 bytes -> 4.7 GiB

--- GNU style (gnu=True) ---
500B
4.9K
48.8K
488.3K
4.8M
47.7M
4.7G

The decimal mode (default) uses 1000-based prefixes like hard drive manufacturers use. The binary mode (binary=True) uses 1024-based prefixes — the traditional computer science convention — labeled with “KiB”, “MiB”, “GiB” to distinguish them. Use binary mode when showing memory sizes (RAM, cache), and decimal mode when showing storage sizes (disk, downloads) to match user expectations for each context.
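The two conventions are just two different divisors. The short sketch below uses plain arithmetic rather than humanize to show where the figures for the 5,000,000,000-byte example come from, and how the gap between the conventions widens at each prefix step.

```python
# size_bases.py
# Plain arithmetic showing where the decimal and binary figures come from.
size = 5_000_000_000  # bytes

decimal_gb = size / 1000 ** 3   # gigabytes: 10^9 bytes each
binary_gib = size / 1024 ** 3   # gibibytes: 2^30 bytes each

print(f'{decimal_gb:.1f} GB')
print(f'{binary_gib:.1f} GiB')

# The binary unit is larger than the decimal one, and the gap grows per prefix
for power, name in [(1, 'kilo'), (2, 'mega'), (3, 'giga'), (4, 'tera')]:
    gap_percent = (1024 ** power / 1000 ** power - 1) * 100
    print(f'{name}: binary unit is {gap_percent:.1f}% larger')
```

At the kilo level the difference is about 2.4%, but at the tera level it is about 10%, which is why a drive sold as 1 TB shows up as roughly 0.9 TiB in tools that report binary units.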

Text Utilities: Lists, Fractions, and More

Beyond numbers and times, humanize includes utilities for formatting text content in natural-sounding ways:

# text_utils.py
import humanize
from fractions import Fraction

# natural_list -- join items with commas and a final "and"
items_2 = ['Python', 'JavaScript']
items_3 = ['Python', 'JavaScript', 'Rust']
items_4 = ['Python', 'JavaScript', 'Rust', 'Go']
print(humanize.natural_list(items_2))
print(humanize.natural_list(items_3))
print(humanize.natural_list(items_4))

# fractional -- display decimals as fractions
print(humanize.fractional(0.5))
print(humanize.fractional(0.25))
print(humanize.fractional(1 / 3))
print(humanize.fractional(1.5))

# scientific -- format as scientific notation
print(humanize.scientific(0.0000001234))
print(humanize.scientific(9_876_543_210))

Output:

Python and JavaScript
Python, JavaScript and Rust
Python, JavaScript, Rust and Go
1/2
1/4
1/3
1 1/2
1.23 x 10⁻⁷
9.88 x 10⁹

The natural_list() function handles a surprisingly common pain point: joining lists as readable prose. Without it, you end up writing string-joining logic that handles the “and” before the last item plus the edge cases for one-item and two-item lists. humanize handles all of that in one call. Note that it does not insert an Oxford (serial) comma: three items come out as “Python, JavaScript and Rust”.
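For a sense of the logic a helper like this saves you from rewriting, here is a hand-rolled joiner. It is a sketch for comparison, not humanize's source code; the function name join_naturally is made up, and the empty-list behavior is a choice made for this example.

```python
# join_sketch.py
# A hand-written list joiner for comparison; not humanize's source code.

def join_naturally(items):
    """Join items as 'a', 'a and b', or 'a, b and c' (no serial comma)."""
    items = [str(item) for item in items]
    if not items:
        return ''          # assumption: an empty list yields an empty string
    if len(items) == 1:
        return items[0]
    # Everything except the last item is comma-separated; 'and' joins the last
    return ', '.join(items[:-1]) + ' and ' + items[-1]

print(join_naturally(['Python']))
print(join_naturally(['Python', 'JavaScript']))
print(join_naturally(['Python', 'JavaScript', 'Rust']))
```

The two-item case falls out of the general branch because joining a single-element list with ', ' adds no comma.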

1024 vs 1 KB. The same bytes, the cognitive load is not.

Real-Life Example: System Stats Dashboard

Here is a CLI dashboard that uses humanize to display system statistics in a format that is immediately readable. It combines time formatting, file sizes, and number formatting to turn raw system metrics into a clean report. The stats are mocked here so the script runs anywhere; psutil can supply real values.

# system_dashboard.py
import humanize
import os
import time
from datetime import datetime, timedelta

# Simulate system stats (replace with psutil for real data)
# pip install psutil for actual system metrics
def get_mock_system_stats():
    return {
        'uptime_seconds': 86400 * 3 + 7200,  # 3 days, 2 hours
        'cpu_percent': 34.7,
        'memory_total': 16_000_000_000,       # 16 GB
        'memory_used': 9_437_184_000,          # ~9 GB
        'disk_total': 512_000_000_000,         # 512 GB
        'disk_used': 234_881_024_000,          # ~235 GB
        'processes': 312,
        'network_sent': 15_728_640_000,        # ~15 GB
        'network_recv': 47_185_920_000,        # ~47 GB
        'last_backup': datetime.now() - timedelta(hours=6, minutes=23),
        'next_task': datetime.now() + timedelta(minutes=45),
    }

def display_dashboard(stats):
    now = datetime.now()
    boot_time = now - timedelta(seconds=stats['uptime_seconds'])
    
    print('=' * 50)
    print('  SYSTEM DASHBOARD')
    print(f'  Generated: {humanize.naturaltime(now)}')
    print('=' * 50)
    
    # Uptime
    uptime = timedelta(seconds=stats['uptime_seconds'])
    print(f'\nUptime: {humanize.precisedelta(uptime)}')
    print(f'Boot time: {humanize.naturalday(boot_time)} at '
          f'{boot_time.strftime("%H:%M")}')
    
    # CPU and Memory
    print(f'\nCPU Usage: {stats["cpu_percent"]:.1f}%')
    print(f'Memory:  {humanize.naturalsize(stats["memory_used"], binary=True)}'
          f' / {humanize.naturalsize(stats["memory_total"], binary=True)}')
    
    # Disk
    disk_free = stats['disk_total'] - stats['disk_used']
    print(f'Disk:    {humanize.naturalsize(stats["disk_used"])} used, '
          f'{humanize.naturalsize(disk_free)} free')
    
    # Network
    print(f'Network: {humanize.naturalsize(stats["network_sent"])} sent, '
          f'{humanize.naturalsize(stats["network_recv"])} received')
    
    # Processes
    print(f'Processes: {humanize.intcomma(stats["processes"])}')
    
    # Scheduled events
    print(f'\nLast backup: {humanize.naturaltime(stats["last_backup"])}')
    print(f'Next task:   {humanize.naturaltime(stats["next_task"])}')
    print('=' * 50)

stats = get_mock_system_stats()
display_dashboard(stats)

Output:

==================================================
  SYSTEM DASHBOARD
  Generated: now
==================================================

Uptime: 3 days and 2 hours
Boot time: Jun 10 at 08:15

CPU Usage: 34.7%
Memory:  8.8 GiB / 14.9 GiB
Disk:    234.9 GB used, 277.1 GB free
Network: 15.7 GB sent, 47.2 GB received
Processes: 312

Last backup: 6 hours ago
Next task:   45 minutes from now
==================================================

Compare the “Memory: 8.8 GiB / 14.9 GiB” line to the raw “9,437,184,000 / 16,000,000,000 bytes” alternative. The humanized version tells you at a glance that you have used more than half your RAM; the raw version requires a calculator. The difference becomes even more pronounced in production dashboards where users check these values quickly under pressure. You can install psutil and replace the mock stats function to turn this into a real live dashboard.

Frequently Asked Questions

How do I install humanize?

Run pip install humanize in your terminal. humanize has no required dependencies because it uses only the Python standard library. It supports recent Python 3 releases; the minimum version depends on the humanize release, so check PyPI if you are on an older interpreter. After installing, import the module with import humanize and all functions are available directly on the module.

Can humanize output text in languages other than English?

Yes. humanize ships its own gettext translation catalogs, so localization does not depend on which locales are installed on your operating system. Call humanize.i18n.activate('de_DE') to switch to German, or pass any other locale code for which humanize bundles a translation. Call humanize.i18n.deactivate() to return to the default English.

What is precisedelta and when should I use it instead of naturaltime?

precisedelta() formats a timedelta into an exact breakdown like “3 days, 2 hours and 15 minutes” rather than the approximate “3 days ago” from naturaltime(). Use naturaltime() for activity feeds and relative event timestamps where approximation is expected. Use precisedelta() for elapsed time displays, uptime counters, and any context where precision matters to the user — like “your session expires in 1 minute and 30 seconds”.
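The exact breakdown that a precise formatter reports can be reproduced with integer division. The sketch below is a hand-written illustration using only the standard library, not humanize's implementation; the function name breakdown is made up for this example.

```python
# precise_breakdown.py
# A hand-written timedelta breakdown for illustration only.
from datetime import timedelta

def breakdown(delta):
    """Split a non-negative timedelta into (days, hours, minutes, seconds)."""
    total_seconds = int(delta.total_seconds())
    days, remainder = divmod(total_seconds, 86_400)   # seconds per day
    hours, remainder = divmod(remainder, 3_600)       # seconds per hour
    minutes, seconds = divmod(remainder, 60)
    return days, hours, minutes, seconds

uptime = timedelta(days=3, hours=2, minutes=15)
print(breakdown(uptime))

session_left = timedelta(seconds=90)
print(breakdown(session_left))
```

An approximate formatter keeps only the largest non-zero component of this tuple, while a precise one reports every non-zero component.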

When should I use binary=True in naturalsize?

Use binary=True when displaying memory sizes (RAM, cache, buffers) because memory has always been measured in powers of 1024. Use the default decimal mode (1000-based) for storage sizes (disk space, download sizes) because modern storage manufacturers use 1000-based numbers. Mixing them leads to confusing displays where “8 GB RAM” and “8 GB free disk” refer to different actual amounts of bytes.

Can I add custom formatting rules to humanize?

humanize is designed as a library of utility functions, not a customizable formatting engine. For custom formatting needs, wrap the humanize output with your own logic — for example, a function that calls humanize.naturalsize() and then appends a custom suffix or brackets. For very custom natural-language output (pluralization rules in non-English languages, domain-specific units), consider the inflect library, which has more advanced text handling capabilities.

Conclusion

humanize is one of those libraries that instantly improves every output-facing part of your Python code. We covered naturaltime and naturalday for relative time descriptions, intcomma and intword for readable numbers, naturalsize for file sizes in both decimal and binary notation, natural_list for joining collections, fractional for fraction display, and precisedelta for exact time spans.

The system dashboard project shows how these functions combine to create output that communicates clearly with human readers instead of forcing them to decode raw numbers. Try extending it with real psutil data, add color with the Rich library, or embed it in a Flask/FastAPI endpoint that serves dashboard JSON. The formatting logic will remain exactly the same.

See the official humanize documentation for the full function reference.

How To Use Python pyserial for Serial Port Communication

Intermediate

Whether you are building a weather station with a Raspberry Pi, logging data from an Arduino sensor, or communicating with industrial equipment over RS-232, Python gives you a clean, cross-platform API to talk to serial ports. The pyserial library makes serial communication approachable — you open a port, send bytes, read bytes, and close it. No C drivers, no platform-specific system calls.

The good news is that pyserial works on Windows, macOS, and Linux with the same code. You install it once with pip install pyserial, and the same script runs on your laptop and on an embedded Linux board. The library handles port configuration, timeouts, and byte buffers for you.

In this article we will cover everything you need to send and receive data over serial ports in Python. We will start with installation and port discovery, then walk through writing and reading bytes, configuring baud rates and timeouts, handling communication errors, and finish with a real-world data logger project that reads sensor output continuously.

Talking to a Serial Port: Quick Example

Before diving into the details, here is a minimal working example that opens a serial port, sends a command, and reads the response. This is the core pattern you will use in every serial project.

# quick_serial.py
import serial
import time

# Open the port -- adjust the port name for your OS
# Windows: 'COM3', 'COM4', etc.
# Linux/macOS: '/dev/ttyUSB0', '/dev/tty.usbserial-XXXX'
port = serial.Serial(
    port='/dev/ttyUSB0',
    baudrate=9600,
    timeout=1  # Read timeout in seconds
)

time.sleep(2)  # Allow device to reset after connection

port.write(b'HELLO\n')        # Send bytes -- note the b'' prefix
response = port.readline()     # Read until newline or timeout
print('Received:', response.decode('utf-8').strip())

port.close()

Output:

Received: OK

The key insight here is that serial communication is byte-oriented — you always send and receive bytes, not strings. The b'HELLO\n' syntax creates a bytes literal, and decode('utf-8') converts the response bytes back to a string for printing. The timeout=1 argument prevents readline() from blocking forever if the device does not respond.

Want to go deeper? Below we cover port discovery, all the configuration options, error handling, and a complete data logger project that runs indefinitely.

What Is Serial Communication and When Do You Use It?

Serial communication means sending data one bit at a time over a single wire (plus a ground reference). It is one of the oldest and most widely supported communication protocols in electronics, which is exactly why it is still everywhere — Arduino boards, GPS modules, industrial PLCs, barcode scanners, and scientific instruments all use it.

The key parameters that both sides must agree on are baud rate (bits per second), data bits (almost always 8), stop bits (usually 1), and parity (usually None). If these settings do not match between your Python script and the device, you will get garbage data or nothing at all.

Parameter | Common Values | What It Controls
Baud rate | 9600, 115200, 57600 | Bits per second transferred
Data bits | 8 (almost universal) | Bits per character
Stop bits | 1 (default), 2 | Bits marking end of character
Parity | None, Even, Odd | Error detection bit
Timeout | 0.5, 1, 2 (seconds) | How long to wait for data

The pyserial library exposes all of these as constructor arguments, so you can match whatever your hardware device expects.
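As a rough worked example of what these settings mean for throughput: with the common 8N1 framing (8 data bits, no parity, 1 stop bit), every character also carries one start bit, so it costs 10 bits on the wire. The helper below is a hypothetical illustration and not part of pyserial.

```python
# framing_math.py
# Hypothetical helper (not part of pyserial): estimate how many characters
# per second fit on the wire for a given framing.

def chars_per_second(baudrate, data_bits=8, parity_bits=0, stop_bits=1):
    """Return the maximum characters per second for an asynchronous serial link."""
    start_bits = 1  # every character frame begins with one start bit
    bits_per_frame = start_bits + data_bits + parity_bits + stop_bits
    return baudrate / bits_per_frame

for baud in (9600, 57600, 115200):
    print(f'{baud} baud, 8N1: {chars_per_second(baud):.0f} chars/s')
```

At 9600 baud this works out to 960 characters per second, which is why a chatty device at a low baud rate can fall behind its own output.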

Serial is just slower TCP without the niceties.

Installing pyserial and Finding Your Port

Install pyserial with pip. It has no required dependencies beyond the Python standard library:

# install_pyserial.py
# Run this in your terminal (not Python):
# pip install pyserial

import serial
print('pyserial version:', serial.__version__)

Output:

pyserial version: 3.5

Before you can open a port, you need to know its name. The port name depends on the operating system and the USB-to-serial adapter you are using. The serial.tools.list_ports module lets you enumerate all available ports programmatically instead of guessing:

# list_ports.py
from serial.tools import list_ports

# List all available serial ports
ports = list_ports.comports()

if not ports:
    print('No serial ports found.')
else:
    for port in ports:
        print(f'Port: {port.device}')
        print(f'  Description: {port.description}')
        print(f'  Hardware ID: {port.hwid}')
        print()

Output:

Port: /dev/ttyUSB0
  Description: USB2.0-Serial
  Hardware ID: USB VID:PID=1A86:7523 LOCATION=1-1.2

Port: /dev/ttyACM0
  Description: Arduino Uno
  Hardware ID: USB VID:PID=2341:0043 LOCATION=1-1.3

The description and hwid fields are useful for identifying which port belongs to which device. The Hardware ID includes the USB Vendor ID (VID), which identifies the manufacturer, and the Product ID (PID), which identifies that manufacturer's product. You can match on these to auto-detect a specific device in your scripts rather than hardcoding the port name, which can change between computers and reboots. Keep in mind that a generic USB-to-serial adapter reports the IDs of the adapter chip, not of the device wired behind it.
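A minimal sketch of that auto-detection idea follows. The find_device helper and the stand-in port objects are hypothetical; the only assumption about pyserial is that each entry returned by list_ports.comports() exposes device, vid, and pid attributes, with vid and pid set to None for non-USB ports.

```python
# pick_port.py
from types import SimpleNamespace

def find_device(ports, vid, pid=None):
    """Return the device name of the first port matching vid (and pid, if given)."""
    for info in ports:
        if info.vid != vid:
            continue
        if pid is not None and info.pid != pid:
            continue
        return info.device
    return None

# Stand-in objects so the sketch runs without hardware attached.
# With pyserial installed, pass serial.tools.list_ports.comports() instead.
fake_ports = [
    SimpleNamespace(device='/dev/ttyUSB0', vid=0x1A86, pid=0x7523),
    SimpleNamespace(device='/dev/ttyACM0', vid=0x2341, pid=0x0043),
    SimpleNamespace(device='/dev/ttyS0', vid=None, pid=None),  # non-USB port
]

print(find_device(fake_ports, vid=0x2341))
print(find_device(fake_ports, vid=0x1A86, pid=0x7523))
print(find_device(fake_ports, vid=0x0403))
```

Comparing integer IDs avoids the false positives a substring search over the hwid text can produce, for example when the same digits appear in a serial number.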

Opening and Configuring a Port

There are two ways to open a serial port with pyserial: by passing all arguments to the constructor (which opens immediately), or by creating a port object and opening it separately. For most scripts, the constructor approach is cleaner:

# open_port.py
import serial

# Method 1: Open immediately in the constructor
ser = serial.Serial(
    port='/dev/ttyUSB0',
    baudrate=115200,       # Match your device's baud rate
    bytesize=serial.EIGHTBITS,
    parity=serial.PARITY_NONE,
    stopbits=serial.STOPBITS_ONE,
    timeout=1,             # Read timeout (seconds)
    write_timeout=1        # Write timeout (seconds)
)

print('Port open:', ser.is_open)
print('Port name:', ser.name)
print('Baud rate:', ser.baudrate)

ser.close()

# Method 2: Open separately (useful for port switching)
ser2 = serial.Serial()
ser2.port = '/dev/ttyUSB0'
ser2.baudrate = 9600
ser2.timeout = 0.5
ser2.open()
print('Port 2 open:', ser2.is_open)
ser2.close()

Output:

Port open: True
Port name: /dev/ttyUSB0
Baud rate: 115200
Port 2 open: True

The timeout parameter is one of the most important settings. Without it, read() and readline() will block indefinitely waiting for data that may never come — this hangs your script. With timeout=1, reads return after 1 second even if no data arrived, letting your code handle the absence of data gracefully.

Wrong baud rate? You get random gibberish. Same as life.

Writing and Reading Data

Serial communication is fundamentally about sending and receiving bytes. pyserial gives you several methods for reading data, each suited to different communication protocols:

# write_read.py
import serial
import time

ser = serial.Serial('/dev/ttyUSB0', baudrate=9600, timeout=1)
time.sleep(2)  # Wait for device to initialize

# --- Writing ---
# write() sends bytes and returns the number of bytes written
num_bytes = ser.write(b'GET_TEMP\n')
print(f'Sent {num_bytes} bytes')

# --- Reading methods ---

# readline() -- reads until '\n' or timeout
line = ser.readline()
print('readline:', line.decode('utf-8').strip())

# read(n) -- reads exactly n bytes (or fewer if timeout)
ser.write(b'STATUS\n')
chunk = ser.read(10)   # Read up to 10 bytes
print('read(10):', chunk)

# read_until() -- reads until a specific terminator
ser.write(b'VERSION\n')
response = ser.read_until(b'\r\n')   # Read until CRLF
print('read_until:', response.decode('utf-8').strip())

# in_waiting -- check how many bytes are waiting in the buffer
ser.write(b'PING\n')
time.sleep(0.1)
print(f'Bytes waiting: {ser.in_waiting}')
buffered = ser.read(ser.in_waiting)  # Read everything available
print('buffered read:', buffered.decode('utf-8').strip())

ser.close()

Output:

Sent 9 bytes
readline: TEMP=23.4
read(10): b'OK\r\n'
read_until: 1.2.3
Bytes waiting: 4
buffered read: PONG

The in_waiting property combined with read(ser.in_waiting) is a common pattern for draining the input buffer — it reads whatever bytes have already arrived without waiting for more. This is useful for fast polling loops where you want to process all available data immediately rather than waiting for a line terminator.
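One catch with draining the buffer is that a chunk can end in the middle of a message. A common way to handle this, sketched here with a hypothetical LineBuffer class, is to accumulate chunks and hand back only complete lines. The chunks are hard-coded so the example runs without a device; in a real loop each one would come from ser.read(ser.in_waiting).

```python
# line_buffer.py
class LineBuffer:
    """Accumulate raw chunks and hand back only complete lines."""

    def __init__(self, terminator=b'\n'):
        self.terminator = terminator
        self._pending = bytearray()

    def feed(self, chunk):
        """Add a chunk of bytes; return the complete lines, terminator removed."""
        self._pending.extend(chunk)
        # Everything before the last terminator is complete; the tail is kept.
        *lines, rest = bytes(self._pending).split(self.terminator)
        self._pending = bytearray(rest)
        return lines

buf = LineBuffer()
print(buf.feed(b'TEMP=23'))            # partial message: nothing complete yet
print(buf.feed(b'.4\nTEMP=2'))         # first line completes, second begins
print(buf.feed(b'3.5\nTEMP=23.6\n'))   # two more lines complete
```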

Using a Context Manager

Like file handles, serial ports should always be closed when you are done with them. The cleanest way to ensure this — even if an exception occurs — is to use serial.Serial as a context manager with the with statement:

# context_manager.py
import serial
import time

# The port closes automatically when the block exits, even on exception
with serial.Serial('/dev/ttyUSB0', baudrate=9600, timeout=1) as ser:
    time.sleep(2)
    ser.write(b'GET_STATUS\n')
    response = ser.readline()
    print('Status:', response.decode('utf-8').strip())

# Port is now closed -- no ser.close() needed
print('Port closed automatically')

Output:

Status: READY
Port closed automatically

Using the context manager is the recommended pattern for any script that opens a serial port. It prevents port leaks, where an exception skips your close() call and the port stays held by a process that is still running (a long-lived service, a notebook kernel, or an interactive session), causing “port already in use” errors the next time you try to connect.

Serial null-modems and crossover cables: the rite of passage.

Handling Errors and Timeouts

Real hardware is messy — devices reset unexpectedly, cables get unplugged, and baud rates get misconfigured. Robust serial code must handle these situations without crashing:

# error_handling.py
import serial
import serial.serialutil
import time

def safe_read_line(ser, retries=3):
    """Read a line with retry logic."""
    for attempt in range(retries):
        try:
            line = ser.readline()
            if line:  # Empty bytes b'' means timeout
                return line.decode('utf-8').strip()
            else:
                print(f'Timeout on attempt {attempt + 1}')
        except serial.SerialException as exc:
            print(f'Serial error: {exc}')
            if attempt < retries - 1:
                time.sleep(0.5)
    return None

try:
    with serial.Serial('/dev/ttyUSB0', baudrate=9600, timeout=1) as ser:
        time.sleep(2)
        
        # Flush any stale data in the buffer
        ser.reset_input_buffer()
        ser.reset_output_buffer()
        
        ser.write(b'MEASURE\n')
        result = safe_read_line(ser)
        
        if result is not None:
            print('Measurement:', result)
        else:
            print('Device did not respond after retries')

except serial.SerialException as exc:
    # Port not found, permission denied, etc.
    print(f'Could not open port: {exc}')
except PermissionError:
    print('Permission denied -- try: sudo usermod -a -G dialout $USER')

Output:

Measurement: 23.7C

The empty bytes check (if line:) is critical -- when readline() times out, it returns b'' (empty bytes), not None. Calling .decode() on empty bytes gives an empty string, which is easy to miss. Always check that the read returned actual data before processing it. The reset_input_buffer() call clears any stale bytes that accumulated while your script was doing other things -- a good habit before sending a command that expects a fresh response.
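To make the distinction concrete, here is a small sketch of a decoding helper (decode_reply is a hypothetical name) that reports a timeout as None, keeps a genuinely blank line as an empty string, and uses errors='replace' so stray bytes from line noise do not raise UnicodeDecodeError.

```python
# decode_reply.py
def decode_reply(raw):
    """Return the stripped text, or None if the read timed out (b'')."""
    if not raw:
        return None
    return raw.decode('utf-8', errors='replace').strip()

print(repr(decode_reply(b'')))            # timeout
print(repr(decode_reply(b'\n')))          # device sent a blank line
print(repr(decode_reply(b'23.7C\r\n')))   # normal reply
print(repr(decode_reply(b'\xffOK\n')))    # invalid byte becomes U+FFFD
```

Returning None for a timeout and '' for a blank line lets the caller decide whether to retry or to treat the reply as valid but empty.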

Real-Life Example: Continuous Sensor Data Logger

Here is a practical data logger that reads sensor values continuously from a serial device, handles disconnections gracefully, and saves the readings to a CSV file. This pattern is used in everything from DIY weather stations to industrial monitoring systems.

# sensor_logger.py
import serial
import serial.serialutil
import csv
import time
from datetime import datetime
from serial.tools import list_ports

def find_arduino_port():
    """Find the first Arduino port automatically by USB VID."""
    for port in list_ports.comports():
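        # Arduino's USB vendor ID is 0x2341; port.vid is an int (None for non-USB ports).
        # Comparing the integer avoids false matches on PIDs or serial numbers.
        if port.vid == 0x2341: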
            return port.device
    return None

def parse_sensor_line(raw_line):
    """Parse 'TEMP=23.4,HUM=55.2' format into a dict."""
    result = {}
    try:
        for part in raw_line.split(','):
            key, value = part.split('=')
            result[key.strip()] = float(value.strip())
    except (ValueError, AttributeError):
        pass  # Return empty dict on parse failure
    return result

def run_logger(port_name, output_file='sensor_log.csv', duration=60):
    """Log sensor data for `duration` seconds."""
    fieldnames = ['timestamp', 'TEMP', 'HUM', 'PRESSURE']
    
    with open(output_file, 'w', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames, extrasaction='ignore')
        writer.writeheader()
        
        start_time = time.time()
        readings = 0
        
        try:
            with serial.Serial(port_name, baudrate=9600, timeout=1) as ser:
                time.sleep(2)  # Device reset delay
                ser.reset_input_buffer()
                print(f'Logging from {port_name} for {duration}s...')
                
                while (time.time() - start_time) < duration:
                    line = ser.readline().decode('utf-8', errors='ignore').strip()
                    
                    if not line:
                        continue  # Timeout -- keep waiting
                    
                    data = parse_sensor_line(line)
                    if not data:
                        print(f'Skipped unparseable line: {line!r}')
                        continue
                    
                    data['timestamp'] = datetime.now().isoformat(timespec='milliseconds')
                    writer.writerow(data)
                    csvfile.flush()  # Hand the row to the OS after every reading
                    readings += 1
                    print(f'[{data["timestamp"]}] {line}')
                    
        except serial.SerialException as exc:
            print(f'Serial error (device disconnected?): {exc}')
        
        print(f'Done. {readings} readings saved to {output_file}')

if __name__ == '__main__':
    port = find_arduino_port() or '/dev/ttyUSB0'
    run_logger(port_name=port, duration=60)

Output:

Logging from /dev/ttyACM0 for 60s...
[2026-05-05T10:00:01.234] TEMP=23.4,HUM=55.2
[2026-05-05T10:00:02.235] TEMP=23.5,HUM=55.1
[2026-05-05T10:00:03.236] TEMP=23.4,HUM=55.3
...
Done. 58 readings saved to sensor_log.csv

This logger auto-detects the Arduino port by USB vendor ID, so it works across different computers without hardcoding the port name. The csvfile.flush() call hands each row to the operating system after every reading, so an interruption such as Ctrl+C or a crash of the script loses at most the reading in progress. Surviving a sudden power loss additionally requires os.fsync(csvfile.fileno()), because the OS may still hold flushed data in its own cache. You can extend this by adding a SIGINT handler for graceful shutdown, uploading readings to an MQTT broker, or triggering alerts when values exceed thresholds.
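One caveat about durability is worth a short sketch: flush() only moves data from Python's buffer to the operating system, while os.fsync() asks the OS to commit it to the storage device. The write_row_durably helper and the temporary file path below are hypothetical, chosen so the example runs anywhere.

```python
# durable_csv.py
import csv
import os
import tempfile

def write_row_durably(csvfile, writer, row):
    """Write one row, then push it through Python's and the OS's buffers."""
    writer.writerow(row)
    csvfile.flush()             # Python buffer -> operating system
    os.fsync(csvfile.fileno())  # operating system cache -> storage device

path = os.path.join(tempfile.mkdtemp(), 'sensor_log.csv')
with open(path, 'w', newline='') as csvfile:
    writer = csv.DictWriter(csvfile, fieldnames=['timestamp', 'TEMP', 'HUM'])
    writer.writeheader()
    write_row_durably(
        csvfile, writer,
        {'timestamp': '2026-05-05T10:00:01', 'TEMP': 23.4, 'HUM': 55.2},
    )

with open(path, newline='') as csvfile:
    print(csvfile.read())
```

Calling fsync on every row costs throughput, so for fast sensors it may be more practical to sync every few seconds instead.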

Frequently Asked Questions

What does "could not open port: [Errno 2] No such file or directory" mean?

This error means the port name you specified does not exist on your system. On Linux, USB serial adapters typically appear as /dev/ttyUSB0 or /dev/ttyACM0. On macOS, they appear as /dev/tty.usbserial-XXXX where XXXX is the adapter's serial number. On Windows, they appear as COM3, COM4, etc. Use list_ports.comports() to enumerate available ports and find the correct name rather than guessing.

Why am I getting "Permission denied" on Linux?

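On Linux, serial ports typically belong to a dedicated group: dialout on Debian and Ubuntu, while some distributions use a different name (uucp on Arch Linux, for example). Run ls -l on the device file to see which group owns it. Your user account needs to be in that group to open serial ports without running as root. On Debian-based systems, add yourself with sudo usermod -a -G dialout $USER and then log out and back in. Alternatively, run your script with sudo as a temporary workaround, though this is not recommended for long-term use.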

Why am I receiving garbage data instead of readable text?

Garbage data almost always means the baud rate does not match between your Python script and the serial device. Both ends must use the exact same baud rate -- if the device sends at 115200 baud but your script opens the port at 9600, you will get nonsensical bytes. Check your device's documentation or datasheet for the correct baud rate. Other causes include wrong parity or stop bit settings, but baud rate mismatch is by far the most common.

Why does readline() hang forever?

By default, readline() blocks until it receives a newline character (\n). If the device never sends a newline, it blocks forever. Always set a timeout when opening the port -- readline() then returns whatever has arrived once the timeout expires, even if no newline was received. Devices that end lines with carriage return + newline (\r\n) work with readline() as-is, because the line still ends in \n. For devices that terminate messages with a bare carriage return (\r), use read_until(b'\r') instead of readline().

Can I read from multiple serial ports simultaneously?

Yes -- use Python's threading module to run a reader loop in a separate thread for each port. Create one serial.Serial object per port and pass each to its own thread. The threading.Event class is useful for coordinating shutdown -- set a stop event and check it in each reader loop. For more advanced use cases, the asyncio module with serial_asyncio (installed separately as the pyserial-asyncio package) provides coroutine-based serial reading.
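The threading approach can be sketched as follows. FakePort is a hypothetical stand-in exposing only readline(), so the example runs without hardware; with real devices you would pass serial.Serial objects opened with a timeout, so each loop wakes up regularly to check the stop event.

```python
# multi_port_reader.py
import queue
import threading
import time

class FakePort:
    """Stand-in with a readline() method, so the sketch runs without hardware."""
    def __init__(self, lines):
        self._lines = list(lines)

    def readline(self):
        if self._lines:
            return self._lines.pop(0)
        time.sleep(0.01)  # imitate a short read timeout
        return b''

def reader_loop(name, port, out_queue, stop_event):
    """Push (port name, decoded line) tuples until stop_event is set."""
    while not stop_event.is_set():
        line = port.readline()
        if line:
            out_queue.put((name, line.decode('utf-8', errors='replace').strip()))

stop_event = threading.Event()
results = queue.Queue()
ports = {
    'A': FakePort([b'TEMP=23.4\n', b'TEMP=23.5\n']),
    'B': FakePort([b'HUM=55.2\n']),
}
threads = [
    threading.Thread(target=reader_loop, args=(name, port, results, stop_event))
    for name, port in ports.items()
]
for t in threads:
    t.start()

received = [results.get(timeout=2) for _ in range(3)]  # wait for all three lines
stop_event.set()                                       # ask every loop to exit
for t in threads:
    t.join()

print(sorted(received))
```

Funnelling every reading through one queue keeps the per-port threads simple and leaves parsing and logging to a single consumer.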

Conclusion

pyserial gives Python direct access to the serial hardware layer that powers countless embedded systems, scientific instruments, and industrial controllers. In this article we covered installing pyserial and discovering available ports, opening a port with the right baud rate and timeout settings, sending bytes with write() and receiving data with readline() and read(), using context managers for safe port handling, and building a robust data logger with error handling and CSV output.

The real-life sensor logger is a solid foundation -- extend it by adding data visualization with matplotlib, pushing readings to a cloud database, or using the threading module to read from multiple ports simultaneously. The skills transfer directly to any device that speaks serial: GPS modules, temperature sensors, motor controllers, and RFID readers all use the same read/write pattern.

For the full pyserial API reference, see the official pyserial documentation.

How To Use Python toolz for Functional Programming

Intermediate

Python is a multi-paradigm language, and functional programming is one of its strongest modes — but the standard library tools for FP (map, filter, functools.reduce, functools.partial) are scattered, limited, and often awkward to compose. Writing a data transformation pipeline in pure Python often forces you to choose between deeply nested function calls that are hard to read, or imperative loops that mix transformation logic with iteration. There is a better way.

The toolz library provides a clean, composable toolkit for functional Python: curry for partial application without the boilerplate, pipe for threading data through transformation steps, compose for building reusable function chains, dictionary utilities like valmap, valfilter, and merge_with that work on mappings the way map and filter work on lists, and grouping helpers like reduceby that aggregate any iterable by key. Install with pip install toolz. A Cython-compiled variant, cytoolz, offers 2-5x speed improvements for performance-critical code.

This tutorial walks through the most useful toolz patterns: building readable data pipelines with pipe, creating reusable partial functions with curry, combining functions with compose and juxt, and transforming nested dictionaries with the mapping utilities. By the end, you will be writing Python data transformations that read like specifications — and are trivially unit-testable because every step is a pure function.

toolz Quick Example

The two most immediately useful functions are pipe (thread data through functions left to right) and curry (partial application on any function). Here they are together:

# toolz_quick.py
from toolz import pipe, curry

# pipe: data flows left to right through each function
result = pipe(
    [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
    lambda nums: filter(lambda n: n % 2 == 0, nums),  # keep evens
    lambda nums: map(lambda n: n ** 2, nums),           # square them
    list,                                               # materialize
    sum                                                 # sum
)
print("pipe result:", result)  # (2^2 + 4^2 + 6^2 + 8^2 + 10^2) = 220

# curry: partial application without functools.partial boilerplate
@curry
def multiply(factor, value):
    return factor * value

double = multiply(2)     # returns a new function with factor=2 fixed
triple = multiply(3)

print("double 7:", double(7))     # 14
print("triple 7:", triple(7))     # 21
print("map double:", list(map(double, [1, 2, 3, 4, 5])))

Output:

pipe result: 220
double 7: 14
triple 7: 21
map double: [2, 4, 6, 8, 10]

pipe eliminates nested function calls by making the data flow explicit and left-to-right. curry lets you call a multi-argument function with only some of its arguments: each partial call returns a new callable that waits for the remaining ones, and the function runs once all of them have been supplied. This makes creating specialized variants of general functions clean and readable.
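To see why pipe reads more easily than nesting, it can help to look at a plain-Python stand-in. The pipe_sketch function below is an illustrative approximation built on functools.reduce, not toolz's actual implementation.

```python
# pipe_sketch.py
from functools import reduce

def pipe_sketch(data, *funcs):
    """Feed data through funcs left to right (stand-in for toolz.pipe)."""
    return reduce(lambda value, func: func(value), funcs, data)

# Nested style: the steps must be read inside-out
nested = sum(map(lambda n: n ** 2, filter(lambda n: n % 2 == 0, range(1, 11))))

# Piped style: the same steps, in the order they run
piped = pipe_sketch(
    range(1, 11),
    lambda nums: filter(lambda n: n % 2 == 0, nums),  # keep evens
    lambda nums: map(lambda n: n ** 2, nums),         # square them
    sum,
)

print(nested, piped)
```

Both expressions compute the same value; only the reading order differs.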

pipe(data, step1, step2): read it like English, not like LISP archaeology.

What Is toolz and When Should You Use It?

Toolz is a functional programming utility library that brings Haskell and Clojure-style higher-order function patterns to Python. It is not a replacement for Python’s functools or itertools — it is a complement that fills the gaps and provides higher-level composition tools.

toolz function | Standard library equivalent | What toolz adds
curry | functools.partial | Auto-currying on call, not just one application
pipe | Nested function calls | Left-to-right readable chaining
compose | lambda x: f(g(x)) | Named, reusable function compositions
juxt | Multiple lambdas | Apply N functions to same input, return tuple
reduceby | itertools.groupby + reduce | GroupBy and aggregate in one pass
valmap/valfilter | Dict comprehension | Map/filter over dict values with key preserved
merge/merge_with | dict.update | Non-mutating dict merge with conflict resolution

Use toolz when your code has data transformation pipelines that would benefit from being expressed as function chains, when you find yourself writing many functools.partial calls, or when you want to write pure-function transformations that are easy to test in isolation. Avoid it in code where readability for non-FP-aware developers is the priority — not everyone on a team will be comfortable with heavy currying.

Building Reusable Pipelines with compose and juxt

compose creates a new function by chaining existing functions together. Unlike pipe (which pushes one input through a chain immediately), compose produces a reusable function object you can call many times and pass as an argument.

# toolz_compose.py
from toolz import compose, juxt
import re

# Individual transformation functions (pure, testable in isolation)
def remove_punctuation(text):
    return re.sub(r"[^\w\s]", "", text)

def to_lowercase(text):
    return text.lower()

def split_words(text):
    return text.split()

def count_unique(words):
    return len(set(words))

# compose: right-to-left (innermost applied first, like math notation)
# This reads: count_unique(split_words(to_lowercase(remove_punctuation(text))))
word_counter = compose(count_unique, split_words, to_lowercase, remove_punctuation)

text = "Python is Great! Python is also FAST -- and Python is readable."
print("Unique word count:", word_counter(text))  # 7: python, is, great, also, fast, and, readable

# Apply to multiple inputs
texts = [
    "Hello World! Hello again.",
    "The quick brown fox jumps.",
    "Data data data DATA!"
]
print("Unique counts:", list(map(word_counter, texts)))

# juxt: apply multiple functions to the same input, get tuple of results
analyze = juxt(len, count_unique, sorted)
words = split_words(to_lowercase(remove_punctuation(text)))
total, unique, sorted_words = analyze(words)
print(f"Total words: {total}, Unique: {unique}, First 3: {sorted_words[:3]}")

Output:

Unique word count: 7
Unique counts: [3, 5, 1]
Total words: 11, Unique: 7, First 3: ['also', 'and', 'fast']

compose applies functions right-to-left (like mathematical function composition). If you prefer left-to-right order (matching how you read the transformations), use compose_left, its left-to-right counterpart in toolz, or just use pipe when the input is already at hand. juxt (“juxtapose”) applies a collection of functions to the same input and returns a tuple of results, which suits computing multiple statistics or extracting multiple fields from a record with a single call.
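The ordering difference is easy to get backwards, so here is a plain-Python sketch of the three behaviours. The *_sketch functions are illustrative stand-ins, not the toolz implementations; toolz itself provides compose, compose_left, and juxt.

```python
# compose_order.py
from functools import reduce

def compose_sketch(*funcs):
    """Right-to-left: compose_sketch(f, g)(x) == f(g(x))."""
    return lambda x: reduce(lambda value, func: func(value), reversed(funcs), x)

def compose_left_sketch(*funcs):
    """Left-to-right: compose_left_sketch(f, g)(x) == g(f(x))."""
    return lambda x: reduce(lambda value, func: func(value), funcs, x)

def juxt_sketch(*funcs):
    """Call every function on the same input and collect the results."""
    return lambda x: tuple(func(x) for func in funcs)

def add_one(n):
    return n + 1

def double(n):
    return n * 2

print(compose_sketch(add_one, double)(5))       # add_one(double(5))
print(compose_left_sketch(add_one, double)(5))  # double(add_one(5))
print(juxt_sketch(min, max, sum)([3, 1, 2]))
```

With the same two functions and the same input, the right-to-left version yields 11 and the left-to-right version yields 12, which is a quick way to check which convention a given helper follows.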

Dictionary Utilities: valmap, valfilter, merge_with

Dictionaries are ubiquitous in Python data processing — API responses, configuration, aggregated counts — and toolz provides clean functional tools for transforming them without mutation.

# toolz_dicts.py
from toolz import valmap, valfilter, keymap, merge, merge_with

# Sales data: category -> revenue
sales = {"Electronics": 45200, "Books": 8750, "Clothing": 22100, "Food": 15800}

# valmap: apply a function to every value, keep keys
revenue_k = valmap(lambda v: round(v / 1000, 1), sales)
print("Revenue (K):", revenue_k)

# valfilter: keep only entries where value matches predicate
high_revenue = valfilter(lambda v: v > 20000, sales)
print("High revenue:", high_revenue)

# keymap: transform keys
upper_keys = keymap(str.upper, sales)
print("Upper keys:", upper_keys)

# merge: non-mutating dict merge (last wins on conflict)
defaults = {"timeout": 30, "retries": 3, "verbose": False}
overrides = {"timeout": 60, "verbose": True}
config = merge(defaults, overrides)
print("Merged config:", config)

# merge_with: merge dicts with a function to resolve conflicts
monthly_a = {"Jan": 100, "Feb": 150, "Mar": 200}
monthly_b = {"Jan": 80, "Feb": 120, "Apr": 90}
combined = merge_with(sum, monthly_a, monthly_b)
print("Combined monthly:", combined)

Output:

Revenue (K): {'Electronics': 45.2, 'Books': 8.8, 'Clothing': 22.1, 'Food': 15.8}
High revenue: {'Electronics': 45200, 'Clothing': 22100}
Upper keys: {'ELECTRONICS': 45200, 'BOOKS': 8750, 'CLOTHING': 22100, 'FOOD': 15800}
Merged config: {'timeout': 60, 'retries': 3, 'verbose': True}
Combined monthly: {'Jan': 180, 'Feb': 270, 'Mar': 200, 'Apr': 90}

merge_with is particularly useful for aggregating multiple data sources: pass sum to add numeric values across dicts, list to collect values into a list, or any custom function to handle conflicts. Unlike dict comprehensions, these functions are named, composable, and express intent clearly.
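The reason sum, list, and other reducers all work is that the conflict function receives every value seen for a key as one list. The following plain-Python sketch approximates that behaviour for illustration; merge_with_sketch is a hypothetical stand-in, not the toolz source.

```python
# merge_with_sketch.py
from collections import defaultdict

def merge_with_sketch(func, *dicts):
    """Collect every value seen for a key into a list, then reduce it with func."""
    collected = defaultdict(list)
    for d in dicts:
        for key, value in d.items():
            collected[key].append(value)
    return {key: func(values) for key, values in collected.items()}

monthly_a = {"Jan": 100, "Feb": 150, "Mar": 200}
monthly_b = {"Jan": 80, "Feb": 120, "Apr": 90}

print(merge_with_sketch(sum, monthly_a, monthly_b))   # add values per key
print(merge_with_sketch(list, monthly_a, monthly_b))  # keep every value
print(merge_with_sketch(max, monthly_a, monthly_b))   # keep the largest
```

Note that keys present in only one input still pass through the function as a one-element list, so the reducer must accept that case.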

10 pure functions, 0 mutable state. Your colleagues will either love you or fear you.

Grouping and Aggregating with reduceby

reduceby combines groupby and reduce into a single pass over an iterable. Given a keying function and a reducing function, it groups records by key and reduces each group to a single value — all without building intermediate grouped structures.

# toolz_reduceby.py
from toolz import reduceby, countby, groupby
from operator import add

# Sales transactions
transactions = [
    {"region": "AU", "amount": 250.0, "category": "Electronics"},
    {"region": "US", "amount": 180.0, "category": "Books"},
    {"region": "AU", "amount": 420.0, "category": "Clothing"},
    {"region": "US", "amount": 95.0, "category": "Books"},
    {"region": "AU", "amount": 310.0, "category": "Electronics"},
    {"region": "UK", "amount": 175.0, "category": "Food"},
    {"region": "US", "amount": 630.0, "category": "Electronics"},
]

# reduceby: group by region, sum amounts
total_by_region = reduceby(
    lambda t: t["region"],
    lambda acc, t: acc + t["amount"],
    transactions,
    0  # initial value for each group
)
print("Total by region:", {k: round(v, 2) for k, v in total_by_region.items()})

# countby: count items by a key function
count_by_category = countby(lambda t: t["category"], transactions)
print("Count by category:", count_by_category)

# groupby: collect full records by key (like SQL GROUP BY)
by_region = groupby(lambda t: t["region"], transactions)
print("AU transactions:", len(by_region["AU"]))
print("First AU amount:", by_region["AU"][0]["amount"])

Output:

Total by region: {'AU': 980.0, 'US': 905.0, 'UK': 175.0}
Count by category: {'Electronics': 3, 'Books': 2, 'Clothing': 1, 'Food': 1}
AU transactions: 3
First AU amount: 250.0

reduceby is more efficient than groupby + dict comprehension because it accumulates results in one pass without building a full grouped dictionary first. For large datasets, the difference in memory usage is significant. countby is a convenient special case that counts occurrences by key — effectively a histogram over any grouping function.
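The memory argument is easier to see in a plain-Python sketch of the single-pass idea. reduceby_sketch is an illustrative approximation rather than the toolz implementation, and it assumes an immutable initial value such as a number. It keeps one accumulator per key, so it can consume a generator without ever holding the records in a list.

```python
# reduceby_sketch.py
def reduceby_sketch(key, binop, seq, init):
    """Group and reduce in one pass, holding only one accumulator per key."""
    totals = {}
    for item in seq:
        k = key(item)
        totals[k] = binop(totals.get(k, init), item)
    return totals

def transactions():
    """A generator: records are produced one at a time and never stored."""
    yield {"region": "AU", "amount": 250.0}
    yield {"region": "US", "amount": 180.0}
    yield {"region": "AU", "amount": 420.0}
    yield {"region": "UK", "amount": 175.0}

totals = reduceby_sketch(
    lambda t: t["region"],
    lambda acc, t: acc + t["amount"],
    transactions(),
    0,
)
print(totals)
```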

Real-Life Example: Sales Data Pipeline

merge_with(sum, a, b): dict.update() mutates and lies about it.
# toolz_sales_pipeline.py
from toolz import pipe, curry, compose, valmap, merge_with, reduceby, countby
from functools import reduce

# Raw transaction data
transactions = [
    {"id": 1, "region": "AU", "category": "Electronics", "amount": 1200.00, "qty": 2},
    {"id": 2, "region": "US", "category": "Books", "amount": 45.00, "qty": 3},
    {"id": 3, "region": "AU", "category": "Clothing", "amount": 280.00, "qty": 1},
    {"id": 4, "region": "US", "category": "Electronics", "amount": 899.99, "qty": 1},
    {"id": 5, "region": "AU", "category": "Books", "amount": 60.00, "qty": 2},
    {"id": 6, "region": "UK", "category": "Electronics", "amount": 1500.00, "qty": 3},
    {"id": 7, "region": "US", "category": "Clothing", "amount": 150.00, "qty": 2},
    {"id": 8, "region": "AU", "category": "Electronics", "amount": 750.00, "qty": 1},
]

# --- Step 1: Build reusable transformation functions ---

@curry
def filter_by_region(region, txns):
    return [t for t in txns if t["region"] == region]

@curry
def above_amount(min_amount, txns):
    return [t for t in txns if t["amount"] >= min_amount]

def add_total_value(txns):
    return [{**t, "total_value": t["amount"] * t["qty"]} for t in txns]

def extract_total_values(txns):
    return [t["total_value"] for t in txns]

# --- Step 2: Compose a pipeline ---

au_high_value_pipeline = compose(
    sum,
    extract_total_values,
    add_total_value,
    above_amount(500),
    filter_by_region("AU")
)

print("AU high-value total:", au_high_value_pipeline(transactions))

# --- Step 3: Aggregate across all regions ---

revenue_by_category = reduceby(
    lambda t: t["category"],
    lambda acc, t: acc + t["amount"] * t["qty"],
    transactions,
    0.0
)

print("\nRevenue by category:")
for cat, rev in sorted(revenue_by_category.items(), key=lambda x: -x[1]):
    print(f"  {cat}: ${rev:,.2f}")

# --- Step 4: Multi-region report using pipe ---
region_summary = pipe(
    transactions,
    lambda txns: reduceby(lambda t: t["region"], lambda a, t: a + t["amount"], txns, 0.0),
    lambda by_region: valmap(lambda v: round(v, 2), by_region),
)
print("\nRevenue by region:", region_summary)

Output:

AU high-value total: 3150.0

Revenue by category:
  Electronics: $8,549.99
  Clothing: $580.00
  Books: $255.00

Revenue by region: {'AU': 2290.0, 'US': 1094.99, 'UK': 1500.0}

Every transformation function is pure — it takes inputs and returns outputs with no side effects and no mutation. This makes each function trivially unit-testable. The pipeline au_high_value_pipeline is itself a function, which means you can call it in tests with different data, use it in a larger pipe, or swap out individual components by redefining the compose chain.

Frequently Asked Questions

How does toolz compare to funcy?

funcy and toolz cover similar ground but with different emphases. toolz focuses on composability and the mathematical FP primitives — curry, compose, pipe, reduceby. funcy has more utility functions for everyday tasks like compact, walk, pluck, and omit, and generally has a more Pythonic feel. Many developers install both; they complement each other well. If you are writing data pipelines, start with toolz. If you need utility functions for working with dicts and lists in a less functional style, add funcy.

When should I use cytoolz instead of toolz?

cytoolz is a Cython reimplementation of the core toolz functions that runs 2-5x faster. The API is identical — you can swap the import from toolz to cytoolz with no other code changes. Use cytoolz when you are processing large iterables (millions of items) in tight loops where the overhead of Python function calls becomes measurable. For typical application code, the speed difference is negligible and plain toolz is fine.

When should I use curry vs functools.partial?

functools.partial fixes specific positional or keyword arguments in a single step. curry makes a function auto-applying — each call with fewer arguments than required returns a new partially-applied function, while a call with all remaining arguments returns the result. Use curry when you want to create a family of related functions by progressive application (like multiply(2), multiply(10)). Use functools.partial when you just need to pin specific arguments in one shot without the FP overhead.
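The contrast can be sketched with the standard library alone. curry_sketch below is a deliberately tiny, positional-only illustration of the idea; the real toolz.curry also handles keyword arguments, defaults, and more. The scale function is a made-up example.

```python
# curry_vs_partial.py
import inspect
from functools import partial, wraps

def curry_sketch(func):
    """Tiny positional-only curry for illustration (toolz.curry does far more)."""
    arity = len(inspect.signature(func).parameters)

    @wraps(func)
    def curried(*args):
        if len(args) >= arity:
            return func(*args)  # enough arguments: run the function
        return lambda *more: curried(*args, *more)  # otherwise keep waiting
    return curried

def scale(factor, offset, value):
    return factor * value + offset

# functools.partial: pin arguments once, then call with the rest
double_plus_one = partial(scale, 2, 1)
print(double_plus_one(10))

# curry: supply arguments in as many steps as you like
curried_scale = curry_sketch(scale)
print(curried_scale(2)(1)(10))
print(curried_scale(2, 1)(10))
print(curried_scale(2)(1, 10))
```

All four calls evaluate scale(2, 1, 10); the curried version simply leaves the grouping of arguments up to the caller.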

Does toolz enforce immutability?

No — toolz functions are written to be non-mutating (they return new objects instead of modifying inputs), but Python has no built-in enforcement of immutability. If you pass a mutable dict to valmap, it returns a new dict and leaves the original untouched. However, your own functions in a pipe or compose chain can still mutate state if written that way. The discipline of writing pure functions is yours to maintain; toolz just provides the plumbing that makes it easy.
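If you want a guard rail while developing, one option from the standard library (not a toolz feature) is to pass a read-only view of the dict into the chain. In this sketch the two with_tax functions are hypothetical examples of a pure step and a mutating step.

```python
# immutability_check.py
from types import MappingProxyType

prices = {"widget": 10.0, "gadget": 25.0}

def with_tax_pure(d):
    """Pure step: builds and returns a new dict."""
    return {k: round(v * 1.1, 2) for k, v in d.items()}

def with_tax_mutating(d):
    """Impure step: rewrites its input in place."""
    for k in d:
        d[k] = round(d[k] * 1.1, 2)
    return d

frozen = MappingProxyType(prices)  # read-only view over the same data

print(with_tax_pure(frozen))       # works: only reads from its input
print(prices)                      # original is unchanged

try:
    with_tax_mutating(frozen)
except TypeError as exc:
    print("Mutation blocked:", exc)
```

The view costs nothing to create and turns an accidental in-place update into an immediate TypeError instead of a silent data change.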

What is the difference between pipe and compose?

pipe(data, f, g, h) applies the functions to a specific value immediately and returns the result. compose(h, g, f) returns a new function that applies the chain when called — no value is required yet. Use pipe when you have data ready and want to transform it now. Use compose when you want to build a reusable transformation that will be applied to different inputs later, passed as a callback, or tested independently.
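As a rough illustration, assuming toolz is installed, the same three steps can be run immediately with pipe or packaged for later with compose. The normalize, tokenize, and count helpers are hypothetical stand-ins for real pipeline steps.

```python
from toolz import compose, pipe

def normalize(text):
    return text.strip().lower()

def tokenize(text):
    return text.split()

def count(tokens):
    return len(tokens)

# pipe: the data is available now, so thread it through left to right
immediate = pipe("  Hello Functional World  ", normalize, tokenize, count)

# compose: build a reusable function; note the right-to-left argument order
word_count = compose(count, tokenize, normalize)
later = [word_count(s) for s in ["  One two ", "a b c d"]]

print(immediate, later)
```

The composed word_count can be passed around as a callback or unit-tested on its own, which is the main reason to prefer it over pipe when no data exists yet.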

Conclusion

The toolz library brings readable, composable functional programming to Python data pipelines. You have seen pipe for left-to-right data threading, curry for clean partial application, compose and juxt for building reusable function objects, valmap/valfilter/merge_with for dictionary transformations, and reduceby/countby for single-pass group aggregations. The sales pipeline example showed how these tools combine into a system where each transformation step is a pure, independently testable function.

Extend the pipeline by adding a curry-based discount function that takes a region-specific rate, composing it into the au_high_value_pipeline before the final sum. For the complete API reference, visit the toolz documentation. For the faster Cython variant, see cytoolz on GitHub.

How To Use Python responses for Mocking HTTP Requests in Tests


Intermediate

Any code that calls an external API creates a testing headache. Running real HTTP requests in tests means your tests depend on network availability, API rate limits, third-party uptime, and credentials — factors entirely outside your control. Tests that pass on your laptop fail in CI. Tests that run in 200ms suddenly take 30 seconds waiting for a slow endpoint. Error handling code is nearly impossible to test because you cannot force a real API to return a 500 error on demand. The result: developers skip testing HTTP-calling code, and bugs hide there until production finds them.

The responses library intercepts calls to the requests library before they hit the network and returns pre-configured fake responses instead. Your production code calls requests.get() normally; during tests, responses catches that call and returns whatever JSON, status code, or error you specify. The real network is never touched. Install it with pip install responses. It requires only requests as a dependency, which you already have if you are making HTTP calls.

This tutorial covers the core responses workflow: registering mock responses with the @responses.activate decorator, testing error conditions and network failures, using callback functions for dynamic responses, and verifying that your code makes the correct API calls. By the end, you will be able to test the success, HTTP-error, connection-failure, and timeout code paths with fast, deterministic tests.

responses Quick Example

The pattern is simple: activate responses, register a URL with a fake response, then run your code. Any call to that URL via requests returns your fake data instead of hitting the network.

# test_responses_quick.py
import responses
import requests

def get_user(user_id: int) -> dict:
    """Fetch a user from a REST API."""
    url = f"https://jsonplaceholder.typicode.com/users/{user_id}"
    resp = requests.get(url, timeout=10)
    resp.raise_for_status()
    return resp.json()

@responses.activate
def test_get_user_success():
    # Register the fake response BEFORE calling your code
    responses.add(
        responses.GET,
        "https://jsonplaceholder.typicode.com/users/1",
        json={"id": 1, "name": "Alice Smith", "email": "alice@example.com"},
        status=200
    )
    user = get_user(1)
    assert user["name"] == "Alice Smith"
    assert user["email"] == "alice@example.com"
    print(f"Fetched user: {user}")
    print(f"API calls made: {len(responses.calls)}")

test_get_user_success()

Output:

Fetched user: {'id': 1, 'name': 'Alice Smith', 'email': 'alice@example.com'}
API calls made: 1

The @responses.activate decorator intercepts all requests calls within that function. Without a registered response for a URL, responses raises a ConnectionError — this is intentional. It prevents tests from accidentally hitting real APIs if a URL is unregistered, which would cause flaky tests and unexpected API usage.

responses.add(): your CI pipeline does not care that the API is down at 2am.

What Is the responses Library?

The responses library works by patching the underlying transport layer of the requests library. When activated, it replaces the real HTTP adapter with a mock adapter that checks whether an incoming request URL matches any registered mock. If it matches, the mock response is returned. If not, a ConnectionError is raised (by default).

API | Use case
responses.add() | Register a static mock response for a URL
responses.add_callback() | Dynamic response based on request content
responses.add_passthrough() | Allow specific URLs to hit the real network
responses.calls | Inspect what requests were made and with what args
responses.assert_call_count() | Assert a URL was called exactly N times
responses.reset() | Clear all registered mocks

The library supports all HTTP methods (GET, POST, PUT, PATCH, DELETE), regex URL matching, streaming responses, and simulated connection errors and timeouts. It works with requests.Session objects and with any library built on top of requests. It does not intercept other HTTP clients such as httpx or urllib, which use their own transport layers.

Registering GET and POST Mocks

The most common pattern: register responses for GET and POST endpoints, then test the code paths that call them. Here is a realistic example testing a service that creates and fetches resources:

# test_responses_crud.py
import responses
import requests
import json

BASE_URL = "https://jsonplaceholder.typicode.com"

def create_post(title: str, body: str, user_id: int) -> dict:
    resp = requests.post(
        f"{BASE_URL}/posts",
        json={"title": title, "body": body, "userId": user_id},
        headers={"Content-Type": "application/json"},
        timeout=10
    )
    resp.raise_for_status()
    return resp.json()

def get_posts_by_user(user_id: int) -> list:
    resp = requests.get(f"{BASE_URL}/posts?userId={user_id}", timeout=10)
    resp.raise_for_status()
    return resp.json()

@responses.activate
def test_create_and_fetch():
    # Mock the POST endpoint
    responses.add(
        responses.POST,
        f"{BASE_URL}/posts",
        json={"id": 101, "title": "Python Tips", "body": "Use f-strings!", "userId": 5},
        status=201
    )
    # Mock the GET endpoint (registered without a query string, so ?userId=5 still matches)
    responses.add(
        responses.GET,
        f"{BASE_URL}/posts",
        json=[
            {"id": 101, "title": "Python Tips", "userId": 5},
            {"id": 102, "title": "Async Python", "userId": 5}
        ],
        status=200
    )

    new_post = create_post("Python Tips", "Use f-strings!", user_id=5)
    assert new_post["id"] == 101
    assert new_post["title"] == "Python Tips"

    user_posts = get_posts_by_user(5)
    assert len(user_posts) == 2
    assert user_posts[0]["userId"] == 5

    # Verify request bodies were sent correctly
    post_call = responses.calls[0]
    sent_body = json.loads(post_call.request.body)
    assert sent_body["userId"] == 5
    print(f"POST body sent: {sent_body}")
    print(f"GET returned {len(user_posts)} posts")
    print(f"Total API calls made: {len(responses.calls)}")

test_create_and_fetch()

Output:

POST body sent: {'title': 'Python Tips', 'body': 'Use f-strings!', 'userId': 5}
GET returned 2 posts
Total API calls made: 2

The responses.calls list is a powerful testing tool — it records every request made during the test, including URL, method, headers, and body. You can assert on these to verify your code sends the right data to the API, not just that it handles the response correctly.

Testing Error Conditions

Error handling is the most undertested part of any HTTP-calling codebase. Without responses, you cannot force a real API to return a 500 error or refuse to connect. With responses, every error scenario becomes straightforward to test.

# test_responses_errors.py
import responses
import requests
from requests.exceptions import ConnectionError, Timeout

def safe_fetch(url: str) -> dict:
    """Fetch with error handling -- returns None on failure."""
    try:
        resp = requests.get(url, timeout=5)
        resp.raise_for_status()
        return resp.json()
    except requests.exceptions.HTTPError as e:
        print(f"HTTP error: {e.response.status_code}")
        return None
    except (ConnectionError, Timeout) as e:
        print(f"Connection error: {type(e).__name__}")
        return None

BASE = "https://api.example-test.com"

@responses.activate
def test_404_returns_none():
    responses.add(responses.GET, f"{BASE}/resource/999", status=404)
    result = safe_fetch(f"{BASE}/resource/999")
    assert result is None
    print("404 test: returned None correctly")

@responses.activate
def test_500_returns_none():
    responses.add(responses.GET, f"{BASE}/resource/1", status=500,
                  json={"error": "Internal Server Error"})
    result = safe_fetch(f"{BASE}/resource/1")
    assert result is None
    print("500 test: returned None correctly")

@responses.activate
def test_connection_error():
    responses.add(responses.GET, f"{BASE}/resource/2",
                  body=ConnectionError("Network unreachable"))
    result = safe_fetch(f"{BASE}/resource/2")
    assert result is None
    print("ConnectionError test: handled gracefully")

@responses.activate
def test_timeout():
    responses.add(responses.GET, f"{BASE}/resource/3",
                  body=Timeout("Request timed out"))
    result = safe_fetch(f"{BASE}/resource/3")
    assert result is None
    print("Timeout test: handled gracefully")

test_404_returns_none()
test_500_returns_none()
test_connection_error()
test_timeout()

Output:

HTTP error: 404
404 test: returned None correctly
HTTP error: 500
500 test: returned None correctly
Connection error: ConnectionError
ConnectionError test: handled gracefully
Connection error: Timeout
Timeout test: handled gracefully

Passing an exception as body= to responses.add() causes the mock to raise that exception instead of returning a response. This directly tests your except blocks — the code paths that are most often skipped in testing because they are hard to trigger against a real API.

Can your error handler handle a 429 at 3am? responses.add() knows.

Dynamic Responses with add_callback

Sometimes a static mock response is not enough — you need the response to depend on what the request contains. add_callback lets you provide a function that receives the request object and returns the response dynamically.

# test_responses_callback.py
import responses
import requests
import json
import re

def handle_user_request(request):
    """Return different users based on the URL path."""
    user_id = int(request.url.split("/")[-1])
    if user_id > 100:
        return (404, {}, json.dumps({"error": "User not found"}))
    return (200, {"Content-Type": "application/json"},
            json.dumps({"id": user_id, "name": f"User {user_id}", "active": True}))

def handle_post_request(request):
    """Echo back the posted data with a generated ID."""
    body = json.loads(request.body)
    body["id"] = 42
    body["created"] = "2025-05-04"
    return (201, {"Content-Type": "application/json"}, json.dumps(body))

@responses.activate
def test_dynamic_responses():
    # A plain string URL must match exactly, so use a regex to cover /users/<id>
    responses.add_callback(
        responses.GET,
        re.compile(r"https://api\.example-test\.com/users/\d+"),
        callback=handle_user_request
    )
    responses.add_callback(
        responses.POST,
        "https://api.example-test.com/posts",
        callback=handle_post_request
    )

    # Valid user
    r = requests.get("https://api.example-test.com/users/7")
    assert r.status_code == 200
    assert r.json()["name"] == "User 7"
    print(f"User 7: {r.json()}")

    # Invalid user
    r = requests.get("https://api.example-test.com/users/999")
    assert r.status_code == 404
    print(f"User 999: {r.json()}")

    # Post with echoed body
    r = requests.post("https://api.example-test.com/posts",
                      json={"title": "Test", "author": "Alice"})
    assert r.status_code == 201
    assert r.json()["id"] == 42
    print(f"Created post: {r.json()}")

test_dynamic_responses()

Output:

User 7: {'id': 7, 'name': 'User 7', 'active': True}
User 999: {'error': 'User not found'}
Created post: {'title': 'Test', 'author': 'Alice', 'id': 42, 'created': '2025-05-04'}

Callbacks unlock complex scenarios: responses that change after N calls, responses that validate request authentication headers and return 401 if the token is wrong, or responses that simulate paginated APIs where each call returns the next page. The callback receives the full PreparedRequest object, so you have access to headers, body, URL, and query parameters.

Real-Life Example: Testing a GitHub API Client

50 tests, 0 API calls, 0 rate limit errors.
# test_github_client.py
import responses
import requests

class GitHubClient:
    BASE_URL = "https://api.github.com"

    def __init__(self, token: str):
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"token {token}",
            "Accept": "application/vnd.github.v3+json"
        })

    def get_repo(self, owner: str, repo: str) -> dict:
        resp = self.session.get(f"{self.BASE_URL}/repos/{owner}/{repo}")
        resp.raise_for_status()
        return resp.json()

    def list_issues(self, owner: str, repo: str, state: str = "open") -> list:
        resp = self.session.get(
            f"{self.BASE_URL}/repos/{owner}/{repo}/issues",
            params={"state": state}
        )
        resp.raise_for_status()
        return resp.json()

    def create_issue(self, owner: str, repo: str, title: str, body: str = "") -> dict:
        resp = self.session.post(
            f"{self.BASE_URL}/repos/{owner}/{repo}/issues",
            json={"title": title, "body": body}
        )
        resp.raise_for_status()
        return resp.json()

@responses.activate
def test_github_client_full():
    BASE = "https://api.github.com"
    client = GitHubClient(token="test-token-abc")

    # Mock: get repo
    responses.add(responses.GET, f"{BASE}/repos/alice/my-project",
                  json={"id": 999, "name": "my-project", "stargazers_count": 42, "open_issues": 3},
                  status=200)

    # Mock: list open issues
    responses.add(responses.GET, f"{BASE}/repos/alice/my-project/issues",
                  json=[
                      {"id": 1, "title": "Fix login bug", "state": "open"},
                      {"id": 2, "title": "Add dark mode", "state": "open"},
                  ], status=200)

    # Mock: create issue
    responses.add(responses.POST, f"{BASE}/repos/alice/my-project/issues",
                  json={"id": 101, "title": "Performance regression in v2.1", "number": 101},
                  status=201)

    # Run the client methods
    repo = client.get_repo("alice", "my-project")
    assert repo["stargazers_count"] == 42
    print(f"Repo: {repo['name']} ({repo['stargazers_count']} stars)")

    issues = client.list_issues("alice", "my-project")
    assert len(issues) == 2
    print(f"Open issues: {[i['title'] for i in issues]}")

    new_issue = client.create_issue("alice", "my-project",
                                    "Performance regression in v2.1",
                                    "Noticed in benchmarks...")
    assert new_issue["number"] == 101
    print(f"Created issue #{new_issue['number']}: {new_issue['title']}")

    # Verify Authorization header was sent with every request
    for call in responses.calls:
        assert "Authorization" in call.request.headers
        assert call.request.headers["Authorization"] == "token test-token-abc"
    print(f"All {len(responses.calls)} calls included auth header: OK")

test_github_client_full()

Output:

Repo: my-project (42 stars)
Open issues: ['Fix login bug', 'Add dark mode']
Created issue #101: Performance regression in v2.1
All 3 calls included auth header: OK

The final assertion — checking that every call included the Authorization header — is a pattern worth using in every API client test. It catches a common bug: authentication headers being dropped when the session is configured incorrectly or when a redirect is followed. responses.calls makes this verification trivial.

Frequently Asked Questions

How does responses compare to httpretty and VCR.py?

responses is requests-specific and lightweight — it only patches requests, which is exactly what you want when your codebase uses requests. httpretty works at the socket level and intercepts any HTTP library (including urllib and http.client), making it more powerful but also more invasive and harder to debug. VCR.py records real API responses to YAML cassette files and replays them, which is useful for complex integration tests but adds file management overhead. For most requests-based testing, responses is the right balance of simplicity and control.

What happens if I call requests without activating responses?

Without @responses.activate or the context manager, responses is inactive and all requests calls go to the real network as normal. This is by design — responses never interferes with production code or non-test code. Inside an activated context, any requests call to an unregistered URL raises a ConnectionError with a clear message indicating no matching mock was found.

Does responses work with requests.Session?

Yes — responses patches the underlying transport adapter used by all requests objects, including Session instances. Any Session().get(), Session().post(), or other session-based call is intercepted exactly like a bare requests.get() call. This means you can test code that uses sessions for cookie handling, connection pooling, or persistent headers without any special configuration.

How do I assert a URL was called exactly N times?

Use responses.assert_call_count(url, count) after your test code runs. For example, responses.assert_call_count("https://api.example.com/users", 3) raises AssertionError if the URL was not called exactly 3 times. Alternatively, filter responses.calls manually: len([c for c in responses.calls if c.request.url == url]). This is useful for testing retry logic — confirm your code retries exactly 3 times before giving up.

Can I match URLs with regex patterns?

Yes — pass a compiled regex pattern to responses.add() instead of a string URL. For example: responses.add(responses.GET, re.compile(r"https://api.example.com/users/\d+"), json={...}) matches any numeric user ID. This is useful when the exact URL varies but follows a pattern, like paginated endpoints with varying page numbers or resource IDs generated at runtime.

Conclusion

The responses library makes HTTP-dependent code fully testable by intercepting requests calls before they reach the network. You have covered registering static mocks with responses.add(), simulating errors and network failures, using add_callback for dynamic responses that depend on request content, and inspecting responses.calls to verify request correctness. The GitHub client example showed a complete test for a real-world API client that checks success paths and authentication headers, all without a single real network call.

Extend the GitHub client tests by adding retry logic — use a callback that returns 429 on the first two calls and 200 on the third, then verify your retry decorator handles it correctly. For the complete API reference, see the responses documentation on GitHub.

How To Use Python freezegun for Mocking Time in Tests


Intermediate

Testing code that depends on the current date and time is notoriously painful. Functions that call datetime.now() return different values every second, which makes assertions fragile. Code that checks “is this subscription expired?” or “has this token timed out?” cannot be tested reliably without controlling what time the code thinks it is. Most developers resort to awkward monkeypatching, injecting mock datetime objects, or skipping time-sensitive tests altogether — all of which leave production bugs hiding in the gaps.

freezegun solves this problem cleanly. It intercepts calls such as datetime.now(), date.today(), and time.time() and returns a frozen or controlled time value that you specify. It works as a decorator, a context manager, or a manually started and stopped object, and it patches all modules that import datetime at test time, not just the one you are targeting. Install it with pip install freezegun; pip pulls in its only dependency, python-dateutil, automatically.

This tutorial covers everything you need: freezing time with freeze_time as a decorator and context manager, letting the clock advance with tick=True, testing expiry and scheduling logic, and integrating freezegun with pytest fixtures for clean test organization. By the end, you will be able to test any time-dependent code with complete determinism.

freezegun Quick Example

Here is the core pattern: wrap a test with @freeze_time and every call to datetime.now() inside that test returns the frozen date you specified.

# test_freezegun_quick.py
from freezegun import freeze_time
from datetime import datetime

def get_greeting():
    """Returns a greeting based on the current hour."""
    hour = datetime.now().hour
    if hour < 12:
        return "Good morning"
    elif hour < 18:
        return "Good afternoon"
    else:
        return "Good evening"

@freeze_time("2025-05-15 09:30:00")  # 9:30 AM
def test_morning_greeting():
    assert get_greeting() == "Good morning"
    print(f"Time during test: {datetime.now()}")

@freeze_time("2025-05-15 14:00:00")  # 2:00 PM
def test_afternoon_greeting():
    assert get_greeting() == "Good afternoon"
    print(f"Time during test: {datetime.now()}")

test_morning_greeting()
test_afternoon_greeting()
print(f"Time after tests: {datetime.now().strftime('%H:%M:%S')} (back to real time)")

Output:

Time during test: 2025-05-15 09:30:00
Time during test: 2025-05-15 14:00:00
Time after tests: 14:22:07 (back to real time)

The frozen time is scoped to the decorated function -- as soon as test_morning_greeting returns, datetime.now() reverts to real system time. No cleanup needed. The get_greeting() function is unaware it is being tested; it just calls datetime.now() normally, which is the whole point.

datetime.now() in production. freeze_time() in tests. Your future self thanks you.

What Is freezegun and How Does It Work?

Freezegun works by patching the datetime module at the import level. When you activate freeze_time, it replaces the real datetime.datetime, datetime.date, datetime.datetime.now, and time.time with fake versions that return your specified time. Crucially, it patches all modules that have already imported datetime -- not just the module where you apply the decorator.

Approach | How it works | Limitation
freezegun | Patches datetime globally | Does not patch C extensions that bypass datetime
unittest.mock.patch | Patches one module at a time | Must patch every import location manually
Dependency injection | Pass datetime as parameter | Requires changing production code signatures
time.sleep mocking | Mocks sleep but not now() | Does not help with current-time reads

The key advantage of freezegun over unittest.mock.patch is that you do not need to know which module imported datetime. If your service layer imports from datetime import datetime and your model layer imports import datetime, freezegun patches both automatically.

Using freeze_time as a Decorator and Context Manager

You can use freeze_time in three ways: as a class/function decorator, as a context manager inside a test, or as a manually started/stopped object. Each has its place.

# test_freezegun_modes.py
from freezegun import freeze_time
from datetime import datetime, date

# --- MODE 1: Decorator on a function ---
@freeze_time("2025-01-01")
def test_new_year():
    assert datetime.now().year == 2025
    assert date.today() == date(2025, 1, 1)
    print(f"Decorator mode: {datetime.now()}")

test_new_year()

# --- MODE 2: Context manager (freeze only part of a test) ---
def test_partial_freeze():
    real_before = datetime.now().year
    with freeze_time("2030-07-04 12:00:00"):
        frozen = datetime.now()
        assert frozen.year == 2030
        assert frozen.month == 7
    real_after = datetime.now().year
    assert real_before == real_after  # time unfrozen after 'with' block
    print(f"Context manager: frozen={frozen}, before/after real={real_before}")

test_partial_freeze()

# --- MODE 3: Manual start/stop (useful for class-level setUp/tearDown) ---
def test_manual():
    freezer = freeze_time("2024-06-15 08:00:00")
    freezer.start()
    print(f"Manual start: {datetime.now()}")
    freezer.stop()
    print(f"Manual stop: back to real time (year {datetime.now().year})")

test_manual()

Output:

Decorator mode: 2025-01-01 00:00:00
Context manager: frozen=2030-07-04 12:00:00, before/after real=2025
Manual start: 2024-06-15 08:00:00
Manual stop: back to real time (year 2025)

Use the decorator when the entire test function needs frozen time. Use the context manager when you need to test that code behaves differently before and after a specific moment -- for example, that a discount code is valid on day 1 but expired on day 8. Use manual start/stop in unittest.TestCase classes where you need to freeze time in setUp and restore it in tearDown.

Moving Time Forward with tick=True

By default, frozen time does not advance -- every call to datetime.now() returns the same value. Pass tick=True to start time at the frozen point but let it advance at real-world speed. This is useful when testing code that measures elapsed time with datetime.now().

# test_freezegun_tick.py
from freezegun import freeze_time
from datetime import datetime
import time

def measure_operation():
    """Returns elapsed time in seconds for an operation."""
    start = datetime.now()
    time.sleep(0.1)  # simulate 100ms work
    end = datetime.now()
    return (end - start).total_seconds()

# Without tick: start==end because time is frozen
@freeze_time("2025-03-01 10:00:00")
def test_no_tick():
    elapsed = measure_operation()
    print(f"No tick elapsed: {elapsed}s")  # 0.0 -- time never moves

# With tick=True: time advances from the frozen start point
@freeze_time("2025-03-01 10:00:00", tick=True)
def test_with_tick():
    start_now = datetime.now()
    elapsed = measure_operation()
    end_now = datetime.now()
    print(f"Tick elapsed: {elapsed:.3f}s (started at {start_now})")
    print(f"End time: {end_now}")

test_no_tick()
test_with_tick()

Output:

No tick elapsed: 0.0s
Tick elapsed: 0.101s (started at 2025-03-01 10:00:00)
End time: 2025-03-01 10:00:00.101232

With tick=True, time starts from 2025-03-01 10:00:00 and advances at the real wall-clock rate. This is the right mode when you want a deterministic start time but still need elapsed-time measurements to reflect reality. It does not help when you want to skip ahead: to test that a timeout fires after 30 seconds without actually waiting 30 seconds, keep the default frozen mode and advance the clock manually with tick() or move_to() on the object that freeze_time yields.

tick=True: deterministic start, real elapsed. For when 0.0s lies.

Testing Expiry and Scheduling Logic

The most valuable use of freezegun is testing business logic that depends on dates -- subscription expiry, token timeouts, scheduled jobs that only run on certain days. These tests are impossible to write reliably without time control.

# test_expiry_logic.py
from freezegun import freeze_time
from datetime import datetime, timedelta

class Subscription:
    def __init__(self, start_date: datetime, duration_days: int):
        self.start_date = start_date
        self.expiry_date = start_date + timedelta(days=duration_days)

    def is_active(self) -> bool:
        return datetime.now() < self.expiry_date

    def days_remaining(self) -> int:
        delta = self.expiry_date - datetime.now()
        return max(0, delta.days)

    def renew(self, extra_days: int):
        self.expiry_date += timedelta(days=extra_days)

# --- Tests ---

@freeze_time("2025-06-01 00:00:00")
def test_subscription_active():
    sub = Subscription(datetime(2025, 5, 1), duration_days=90)
    assert sub.is_active() is True
    assert sub.days_remaining() == 59
    print(f"Active test: {sub.days_remaining()} days remaining")

@freeze_time("2025-08-05 00:00:00")  # after 90 days
def test_subscription_expired():
    sub = Subscription(datetime(2025, 5, 1), duration_days=90)
    assert sub.is_active() is False
    assert sub.days_remaining() == 0
    print("Expired test: subscription correctly expired")

@freeze_time("2025-07-25 00:00:00")  # 5 days before expiry
def test_subscription_renewal():
    sub = Subscription(datetime(2025, 5, 1), duration_days=90)
    assert sub.days_remaining() == 5
    sub.renew(30)
    assert sub.days_remaining() == 35
    print(f"Renewal test: {sub.days_remaining()} days after renewal")

test_subscription_active()
test_subscription_expired()
test_subscription_renewal()

Output:

Active test: 59 days remaining
Expired test: subscription correctly expired
Renewal test: 35 days after renewal

Notice that Subscription.__init__ takes an explicit start_date parameter rather than calling datetime.now() internally -- this is good design that makes the class easier to test. The is_active() and days_remaining() methods call datetime.now() to get the current time, which freezegun intercepts. Each test is completely deterministic regardless of when you run it.

Integrating freezegun with pytest Fixtures

For project-wide time control in a pytest suite, define a reusable fixture instead of applying @freeze_time to every test individually.

# conftest.py (put this in your project root)
import pytest
from freezegun import freeze_time

@pytest.fixture
def frozen_time():
    """Freeze time at a known reference point for all tests that need it."""
    freezer = freeze_time("2025-09-01 09:00:00")
    frozen = freezer.start()
    yield frozen
    freezer.stop()

@pytest.fixture
def future_time():
    """Fast-forward to 1 year in the future."""
    freezer = freeze_time("2026-09-01 09:00:00")
    frozen = freezer.start()
    yield frozen
    freezer.stop()

# test_with_fixtures.py
from datetime import datetime

def test_uses_frozen_time(frozen_time):
    now = datetime.now()
    assert now.year == 2025
    assert now.month == 9
    print(f"Fixture frozen: {now}")

def test_uses_future_time(future_time):
    now = datetime.now()
    assert now.year == 2026
    print(f"Future fixture: {now}")

Output (when run with pytest):

PASSED test_uses_frozen_time -- Fixture frozen: 2025-09-01 09:00:00
PASSED test_uses_future_time -- Future fixture: 2026-09-01 09:00:00

The yield pattern in the fixture ensures freezer.stop() always runs even if the test raises an exception -- equivalent to a finally block. This is the safest pattern for freezegun in pytest, as it avoids leaving time frozen if a test fails unexpectedly.

Real-Life Example: Testing a Token Expiry System

Token TTL: make it work in a test without time.sleep(3600).
# test_token_system.py
from freezegun import freeze_time
from datetime import datetime, timedelta
import secrets

class AuthToken:
    TTL_SECONDS = 3600  # 1 hour

    def __init__(self, user_id: str):
        self.user_id = user_id
        self.token = secrets.token_hex(16)
        self.created_at = datetime.now()
        self.expires_at = self.created_at + timedelta(seconds=self.TTL_SECONDS)

    def is_valid(self) -> bool:
        return datetime.now() < self.expires_at

    def time_remaining(self) -> int:
        """Returns seconds remaining, or 0 if expired."""
        delta = self.expires_at - datetime.now()
        return max(0, int(delta.total_seconds()))

    def refresh(self):
        """Extend token lifetime from now."""
        self.expires_at = datetime.now() + timedelta(seconds=self.TTL_SECONDS)

# Test: token is valid immediately after creation
@freeze_time("2025-10-01 12:00:00")
def test_token_valid_at_creation():
    token = AuthToken("user123")
    assert token.is_valid() is True
    assert token.time_remaining() == 3600
    print(f"Created: {token.created_at}, expires: {token.expires_at}")

# Test: token is expired 1 hour and 1 second later
def test_token_expires():
    with freeze_time("2025-10-01 12:00:00") as frozen:
        token = AuthToken("user123")
        assert token.is_valid() is True

    with freeze_time("2025-10-01 13:00:01"):
        assert token.is_valid() is False
        assert token.time_remaining() == 0
    print("Expiry test: token correctly expired after TTL")

# Test: refreshing extends lifetime
def test_token_refresh():
    with freeze_time("2025-10-01 12:00:00"):
        token = AuthToken("user123")

    with freeze_time("2025-10-01 12:59:00"):  # 1 min before expiry
        assert token.time_remaining() == 60
        token.refresh()

    with freeze_time("2025-10-01 13:58:00"):  # 59 min after refresh, 1 min before the new expiry
        assert token.is_valid() is True
        print(f"Refreshed: {token.time_remaining()}s remaining")

test_token_valid_at_creation()
test_token_expires()
test_token_refresh()

Output:

Created: 2025-10-01 12:00:00, expires: 2025-10-01 13:00:00
Expiry test: token correctly expired after TTL
Refreshed: 60s remaining

The test_token_refresh test uses three separate freeze_time context managers to simulate creating a token, approaching expiry, refreshing it, and confirming the new expiry -- all without time.sleep(). This test runs in milliseconds and is completely deterministic.

Frequently Asked Questions

Does freezegun patch all modules that import datetime?

Yes -- this is freezegun's key feature over unittest.mock.patch. When you activate freeze_time, it patches datetime.datetime, datetime.date, datetime.datetime.now, datetime.datetime.utcnow, datetime.date.today, and time.time across all loaded modules. However, it does not patch C extensions that call the system clock directly (e.g., some database drivers or asyncio event loops). In those cases you may still need additional mocking.

Does freezegun work with timezone-aware datetimes?

Yes. Pass a timezone-aware string like freeze_time("2025-06-01 12:00:00+10:00") or use freeze_time(datetime(2025, 6, 1, 12, tzinfo=timezone.utc)). Freezegun will return timezone-aware datetimes from datetime.now(tz) when called with a timezone argument. Without a timezone argument, datetime.now() returns naive datetimes at the frozen local time, same as the real function.

Does freezegun work with async code?

Yes -- freezegun patches the underlying datetime module which is called by both synchronous and asynchronous code. However, freezegun does not mock asyncio.sleep or event loop time. If your async code relies on the event loop clock (common in timeout implementations using asyncio.wait_for), you will need pytest-anyio or time_machine (a faster alternative to freezegun for async scenarios) to control event loop time.

Can I freeze to dates in the past or far future?

Yes -- any valid date string or datetime object works. freeze_time("1985-10-26 01:21:00") is perfectly valid. Dates before the Unix epoch (1970-01-01) also work. One practical caveat: some systems and libraries behave unexpectedly with dates far outside their intended range -- for example, SSL certificate validation or JWT expiry checks that use hard-coded year limits.

When should I use time-machine instead of freezegun?

time-machine is a newer alternative that works at the C level, making it faster and more reliable for async code and C extensions. If you are starting a new project with Python 3.8+, time-machine is worth considering. But if you already use freezegun and it covers your needs, there is no pressing reason to migrate -- freezegun is stable, widely used, and covers 95% of time-mocking scenarios.

Conclusion

Freezegun turns time-dependent tests from flaky guesswork into deterministic, maintainable specs. You have learned the three usage modes -- decorator, context manager, and manual start/stop -- plus tick=True for advancing time at real speed, and the pytest fixture pattern for reusable time control across a test suite. The real-life token expiry example demonstrated how to test multi-step time progressions without any time.sleep() calls.

Extend the token system by testing edge cases: a token created one second before midnight, a refresh that should fail because the token is already expired, or a batch of tokens created at different times and checked simultaneously. Freezegun makes all of these easy to write and fast to run.

For the full API reference and advanced configuration options, see the freezegun GitHub repository. For async-heavy projects, also look at time-machine as a comparison.

How To Use Python more-itertools for Advanced Iterators


Intermediate

Python’s built-in itertools module is powerful, but it leaves out dozens of iterator recipes that professional Python developers reinvent constantly: chunking a list into fixed-size batches, sliding a window over a sequence, flattening nested iterables, or interleaving multiple iterators. The more-itertools library packages all of these — and more than 60 others — into clean, well-tested functions that you can drop into any project. If you have ever written a loop just to split a list into groups of N, this library will make you wonder why you did not install it sooner.

more-itertools is a third-party package that extends Python’s itertools with production-ready implementations of common iterator patterns. It has no external dependencies (only the Python standard library), supports Python 3.8+, and is used by large projects like pytest and pip. Install it with pip install more-itertools. All functions accept any iterable — lists, generators, files, database cursors — and return iterators by default, so they are memory-efficient even on large datasets.

This tutorial walks through the most practically useful functions in more-itertools: batching with chunked, sliding windows with windowed, flattening nested structures with flatten and collapse, grouping and transforming with advanced groupby tools, and a handful of utility functions that solve real everyday problems. By the end, you will have a toolkit of iterator patterns that eliminate entire categories of manual looping code.

more-itertools Quick Example

The most common reason people install more-itertools is chunked — splitting a list into fixed-size batches. Here it is alongside three other one-liners that replace 10-line loops:

# more_itertools_quick.py
from more_itertools import chunked, windowed, flatten, interleave_longest

items = list(range(1, 11))  # [1, 2, 3, ..., 10]

# Split into batches of 3
print("chunked:", list(chunked(items, 3)))

# Sliding window of width 4
print("windowed:", list(windowed(items, 4))[:3], "...")

# Flatten a nested list
nested = [[1, 2], [3, [4, 5]], [6]]
print("flatten (one level):", list(flatten(nested)))

# Interleave two iterables
print("interleave:", list(interleave_longest([1, 3, 5], [2, 4, 6, 8])))

Output:

chunked: [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10]]
windowed: [(1, 2, 3, 4), (2, 3, 4, 5), (3, 4, 5, 6)] ...
flatten (one level): [1, 2, 3, [4, 5], 6]
interleave: [1, 2, 3, 4, 5, 6, 8]

Each of these replaces a manual loop. Notice chunked handles the trailing partial batch gracefully (the last chunk has only one element). windowed returns overlapping tuples — each tuple contains N consecutive elements shifted by one position. flatten only goes one level deep; for deep flattening, use collapse (covered later).

chunked(range(1000), 3): because slice math is a war crime.

What Is more-itertools?

The standard library’s itertools module provides building blocks like chain, groupby, islice, and product. These are low-level primitives. more-itertools builds higher-level, ready-to-use functions on top of them — saving you from having to compose the primitives correctly every time.

Category | more-itertools functions | What they solve
Batching | chunked, batched, grouper | Split iterables into fixed-size groups
Windowing | windowed, sliding_window, pairwise | Sliding N-element views over a sequence
Flattening | flatten, collapse, roundrobin | Eliminate nested structures
Grouping | groupby_transform, bucket, partition | Split by predicate or key
Filtering | unique_everseen, distinct_permutations | Deduplicate preserving order
Combining | interleave, zip_broadcast, zip_equal | Merge iterables safely
Utility | first, last, one, only, exactly_n | Safe access to elements

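Most functions in more-itertools return lazy iterators, so nothing is computed until you iterate over the result or wrap it in list(). The element-access helpers (first, last, one, only) are the exception: they return a value directly. Because the iterator-returning functions are lazy, you can chain them into multi-step pipelines without intermediate lists consuming memory.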

Batching and Grouping: chunked, grouper, batched

Splitting data into batches is one of the most common patterns in data pipelines, API rate limiting, and database bulk inserts. more-itertools provides three variants covering different edge cases.

# more_itertools_batching.py
from more_itertools import chunked, grouper, batched

data = list(range(1, 12))  # 11 items

# chunked: trailing partial batch included
print("chunked into 4:", list(chunked(data, 4)))

# grouper: pads short last group with fillvalue
print("grouper fill=None:", list(grouper(data, 4, fillvalue=None)))

# batched (Python 3.12+ std, but available here for 3.8+)
print("batched into 4:", list(batched(data, 4)))

# Real-world use: bulk API inserts
def fake_api_call(batch):
    return f"Inserted {len(batch)} records: IDs {batch[0]}..{batch[-1]}"

records = list(range(1, 26))  # 25 records
for batch in chunked(records, 10):
    print(fake_api_call(batch))

Output:

chunked into 4: [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11]]
grouper fill=None: [(1, 2, 3, 4), (5, 6, 7, 8), (9, 10, 11, None)]
batched into 4: [(1, 2, 3, 4), (5, 6, 7, 8), (9, 10, 11)]
Inserted 10 records: IDs 1..10
Inserted 10 records: IDs 11..20
Inserted 5 records: IDs 21..25

Use chunked when you need lists and want the trailing partial batch as-is. Use grouper when you need all batches to be the same length (padded with a fill value) — common when passing to functions that require fixed-size tuples. batched behaves like chunked but returns tuples.
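To make the difference between "partial last batch" and "padded last batch" concrete, here is a stdlib-only sketch of both behaviours. The helper names chunk_lists and padded_groups are invented for this example, and the code is an approximation of the idea rather than the library's actual implementation.

```python
# batching_sketch.py
from itertools import islice, zip_longest

def chunk_lists(iterable, n):
    """Yield lists of up to n items; the last list may be shorter."""
    it = iter(iterable)
    while True:
        chunk = list(islice(it, n))
        if not chunk:
            return
        yield chunk

def padded_groups(iterable, n, fillvalue=None):
    """Yield n-tuples, padding the last one with fillvalue."""
    args = [iter(iterable)] * n  # n references to the same iterator
    return zip_longest(*args, fillvalue=fillvalue)

data = range(1, 12)  # 11 items
print(list(chunk_lists(data, 4)))    # [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11]]
print(list(padded_groups(data, 4)))  # [(1, 2, 3, 4), (5, 6, 7, 8), (9, 10, 11, None)]
print(list(chunk_lists([], 4)))      # []
```

Both helpers work on any iterable, including generators, because they only ever pull the next n items.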

Sliding Windows: windowed, sliding_window, pairwise

Sliding window operations appear in time-series analysis, rolling statistics, text processing (N-grams), and signal processing. The standard library has no built-in for this; before more-itertools most developers wrote fragile index-arithmetic loops.

# more_itertools_windows.py
from more_itertools import windowed, pairwise

temps = [18.5, 19.2, 20.1, 17.8, 16.4, 21.3, 22.0, 19.5]

# 3-day sliding average
print("3-day windows:")
for window in windowed(temps, 3):
    avg = sum(window) / len(window)
    print(f"  {window} -> avg {avg:.2f}")

print()

# pairwise: consecutive pairs (windowed with n=2)
print("pairwise changes:")
for prev, curr in pairwise(temps):
    change = curr - prev
    direction = "up" if change > 0 else "down"
    print(f"  {prev} -> {curr}: {change:+.1f} ({direction})")

Output:

3-day windows:
  (18.5, 19.2, 20.1) -> avg 19.27
  (19.2, 20.1, 17.8) -> avg 19.03
  (20.1, 17.8, 16.4) -> avg 18.10
  (17.8, 16.4, 21.3) -> avg 18.50
  (16.4, 21.3, 22.0) -> avg 19.90
  (21.3, 22.0, 19.5) -> avg 20.93

pairwise changes:
  18.5 -> 19.2: +0.7 (up)
  19.2 -> 20.1: +0.9 (up)
  20.1 -> 17.8: -2.3 (down)
  17.8 -> 16.4: -1.4 (down)
  16.4 -> 21.3: +4.9 (up)
  21.3 -> 22.0: +0.7 (up)
  22.0 -> 19.5: -2.5 (down)

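With the default step of 1, every window that windowed yields is full, as long as the iterable has at least n items. Padding with fillvalue (None by default) appears in only two cases: when the iterable is shorter than n, in which case a single padded window is returned, and when step is greater than 1 and the final window runs past the end of the data. pairwise behaves like windowed(iterable, 2) for inputs with two or more items and was added to the standard library's itertools in Python 3.10, but more-itertools provides it for older Python versions too.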
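For intuition, the core of a sliding window can be sketched with collections.deque from the standard library. This is a simplified illustration, not the library's code: the full_windows name is made up here, and unlike windowed it has no fillvalue or step and yields nothing when the input is shorter than the window.

```python
# window_sketch.py
from collections import deque

def full_windows(iterable, n):
    """Yield overlapping n-tuples, advancing one item at a time."""
    window = deque(maxlen=n)  # the oldest item drops off automatically
    for item in iterable:
        window.append(item)
        if len(window) == n:
            yield tuple(window)

print(list(full_windows([1, 2, 3, 4, 5], 3)))  # [(1, 2, 3), (2, 3, 4), (3, 4, 5)]
print(list(full_windows([1, 2], 3)))           # []
```

Only n items are held at any moment, which is why the same pattern works on generators and file handles.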

windowed(temps, 3): rolling average without reinventing the index loop.

Flattening Nested Structures: flatten and collapse

flatten removes one level of nesting from an iterable of iterables. collapse goes deeper — it recursively flattens an arbitrarily nested structure into a single flat iterator. Both are useful when working with API responses, nested JSON structures, or recursive data processing results.

# more_itertools_flatten.py
from more_itertools import flatten, collapse

# flatten: one level only
one_level = [[1, 2], [3, 4], [5, 6]]
print("flatten:", list(flatten(one_level)))

# collapse: arbitrary depth
deep_nested = [1, [2, [3, [4, [5]]]], 6, [7, 8]]
print("collapse (all):", list(collapse(deep_nested)))

# collapse never splits str/bytes into characters; base_type=str is redundant here but harmless
mixed = ["hello", ["world", ["python"]], 42]
print("collapse (stop at str):", list(collapse(mixed, base_type=str)))

# Real use: API that returns nested page results
pages = [
    [{"id": 1}, {"id": 2}],
    [{"id": 3}],
    [{"id": 4}, {"id": 5}, {"id": 6}]
]
all_records = list(flatten(pages))
print(f"All records: {[r['id'] for r in all_records]}")

Output:

flatten: [1, 2, 3, 4, 5, 6]
collapse (all): [1, 2, 3, 4, 5, 6, 7, 8]
collapse (stop at str): ['hello', 'world', 'python', 42]
All records: [1, 2, 3, 4, 5, 6]

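collapse treats text and byte strings as atomic values by default, so it never splits them character by character; base_type=str in the example is redundant but harmless. The base_type parameter matters for other iterable types you want to keep whole: pass base_type=tuple, for example, to stop collapse from descending into tuples. flatten has no such protection, because it simply chains one level of iterables, strings included.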
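The recursion behind deep flattening is short enough to sketch with the standard library alone. The deep_flatten function and its atoms parameter below are hypothetical names for this illustration (atoms plays a role similar to base_type); this is not how more-itertools implements collapse.

```python
# collapse_sketch.py
from collections.abc import Iterable

def deep_flatten(items, atoms=(str, bytes)):
    """Recursively yield leaves; values whose type is in atoms stay whole."""
    for item in items:
        if isinstance(item, Iterable) and not isinstance(item, atoms):
            yield from deep_flatten(item, atoms)
        else:
            yield item

mixed = ["hello", ["world", ["python"]], 42]
print(list(deep_flatten(mixed)))  # ['hello', 'world', 'python', 42]

# Keep tuples whole as well by adding them to atoms
points = [[(1, 2), (3, 4)], [(5, 6)]]
print(list(deep_flatten(points, atoms=(str, bytes, tuple))))  # [(1, 2), (3, 4), (5, 6)]
```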

Safe Element Access: first, last, one, only

Iterators do not support indexing. If you want the first element of a generator without consuming it all, you need next(iter(gen)) — which raises StopIteration if it is empty. more-itertools provides readable, defensive alternatives.

# more_itertools_safe_access.py
from more_itertools import first, last, one, only, first_true

items = [10, 20, 30, 40]
empty = []

# first and last with default value instead of raising
print("first:", first(items))
print("first empty:", first(empty, default=-1))
print("last:", last(items))

# one: asserts exactly one item exists
single = [42]
print("one:", one(single))

try:
    one([1, 2])  # raises ValueError: too many items
except ValueError as e:
    print("one error:", e)

# only: like one but returns None for empty (no error)
print("only single:", only([99]))
print("only empty:", only([]))

# first_true: first element satisfying a predicate
scores = [45, 58, 72, 88, 91]
passing = first_true(scores, default=None, pred=lambda s: s >= 70)
print("first passing score:", passing)

Output:

first: 10
first empty: -1
last: 40
one: 42
one error: Expected exactly one item in iterable, but got 1, 2, and perhaps more.
only single: 99
only empty: None
first passing score: 72

Use first and last with a default parameter any time the iterable might be empty — it eliminates try/except blocks and makes intent clear. Use one() as an assertion that a query should return exactly one result; it raises a helpful ValueError if the invariant is violated, which catches bugs early.
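For comparison, this is roughly what you would write by hand without the library. It is a stdlib-only sketch; first_or and exactly_one are made-up names, and the error messages are not the ones more-itertools produces.

```python
# safe_access_sketch.py
_MISSING = object()  # sentinel so that None can be a legitimate default

def first_or(iterable, default=_MISSING):
    """Return the first item, or default if the iterable is empty."""
    for item in iterable:
        return item
    if default is _MISSING:
        raise ValueError("first_or() was called on an empty iterable")
    return default

def exactly_one(iterable):
    """Return the only item; raise ValueError for zero or several items."""
    it = iter(iterable)
    try:
        value = next(it)
    except StopIteration:
        raise ValueError("expected one item, got none") from None
    for _ in it:
        raise ValueError("expected one item, got more")
    return value

print(first_or([10, 20, 30]))      # 10
print(first_or([], default=-1))    # -1
print(exactly_one([42]))           # 42
```

The sentinel object is the detail that is easy to get wrong by hand: using None as the "no default given" marker would make it impossible to ask for None as the default.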

first(results, default=None): because next(iter([])) spawns stack traces.

Real-Life Example: ETL Pipeline with Batched API Writes

This example builds a realistic ETL pipeline that reads records from a data source, applies a sliding window to detect consecutive anomalies, chunks the results into batches for API submission, and flattens multi-page responses — all using more-itertools functions.

# more_itertools_etl_pipeline.py
from more_itertools import chunked, windowed, flatten, first_true
import random

# Simulated sensor readings (would normally come from a database or file)
random.seed(42)
sensor_readings = [
    {"sensor_id": f"S{i:03d}", "value": random.uniform(0, 100), "ts": f"2025-05-{(i % 30) + 1:02d}"}
    for i in range(50)
]

# Step 1: Detect anomaly windows -- 3 consecutive readings all above threshold
THRESHOLD = 85.0

def is_anomaly_window(window):
    return all(r["value"] > THRESHOLD for r in window if r is not None)

anomaly_windows = [
    window for window in windowed(sensor_readings, 3)
    if is_anomaly_window(window)
]
print(f"Anomaly windows detected: {len(anomaly_windows)}")
if anomaly_windows:
    sample = anomaly_windows[0]
    print(f"  Sample: sensors {[r['sensor_id'] for r in sample]}, "
          f"values {[round(r['value'],1) for r in sample]}")

# Step 2: Extract unique anomalous sensor IDs (flatten windows, deduplicate)
all_anomalous = list({r["sensor_id"] for window in anomaly_windows for r in window})
print(f"Unique anomalous sensors: {len(all_anomalous)}")

# Step 3: Batch the affected sensor IDs for API calls
def mock_alert_api(batch):
    return {"status": "ok", "alerted": len(batch), "ids": batch}

print("\nSending alerts in batches of 5:")
api_responses = []
for batch in chunked(all_anomalous, 5):
    response = mock_alert_api(batch)
    api_responses.append(response)
    print(f"  Batch: {response}")

# Step 4: Flatten multi-page results back into a flat list
all_alerted_ids = list(flatten(r["ids"] for r in api_responses))
print(f"\nTotal sensors alerted: {len(all_alerted_ids)}")

# Step 5: Confirm a specific sensor was alerted using first_true
target = all_anomalous[0] if all_anomalous else "S000"
found = first_true(all_alerted_ids, pred=lambda sid: sid == target)
print(f"Sensor {target} alerted: {found is not None}")

Output (illustrative; the counts, sensor IDs, and values depend on the generated readings):

Anomaly windows detected: 3
  Sample: sensors ['S003', 'S004', 'S005'], values [87.5, 91.2, 88.9]
Unique anomalous sensors: 9

Sending alerts in batches of 5:
  Batch: {'status': 'ok', 'alerted': 5, 'ids': [...]}
  Batch: {'status': 'ok', 'alerted': 4, 'ids': [...]}

Total sensors alerted: 9
Sensor S003 alerted: True

This pipeline uses four more-itertools functions in place of hand-written index and slicing logic: windowed detects consecutive anomaly patterns, chunked batches alerts for the API, flatten merges the per-batch responses, and first_true confirms individual sensor alerts. Replace the mock data and API calls with your own sources and the same structure carries over.

Frequently Asked Questions

When should I use more-itertools vs standard itertools?

Use standard itertools when you need the most primitive building blocks — chain, islice, product, combinations — which are in the standard library and need no installation. Reach for more-itertools when you find yourself composing several itertools primitives to implement a pattern that has a name (like “sliding window” or “chunked”). The library is essentially a collection of well-tested implementations of those compositions.

Are more-itertools functions memory-efficient?

Mostly yes. The iterator-returning functions are lazy, so they do not load the entire input into memory: windowed(huge_file_lines, 3) keeps only 3 lines in memory at any moment. last() is a special case. On a plain iterator it must consume every item to reach the final one, so it takes O(n) time, but it holds only one item at a time because it is built on deque(iterable, maxlen=1). The practical caveat is that the iterator is exhausted afterwards.
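The deque trick mentioned above is easy to try with the standard library. In this sketch the last_item helper is a hypothetical name; it shows the idea of keeping only the most recent item while the iterator is consumed.

```python
# last_item_sketch.py
from collections import deque

def last_item(iterable, default=None):
    """Return the final item while holding only one item in memory."""
    tail = deque(iterable, maxlen=1)  # consumes everything, keeps the newest item
    return tail[0] if tail else default

squares = (x * x for x in range(1000))
print(last_item(squares))                     # 998001
print(last_item(iter([]), default="empty"))   # empty
print(next(squares, "exhausted"))             # exhausted
```

The last line shows the cost: finding the final element uses up the generator, so anything else that needs the data has to re-create it.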

What happens at the end of windowed when the sequence is too short?

windowed only pads when it cannot fill a window. If the iterable has fewer than n items, it yields a single window padded with fillvalue (None by default), so windowed([1, 2], 3) produces (1, 2, None). With the default step of 1 and at least n items, every window is full. If short inputs are possible and you want full windows only, filter out windows that contain the fill value, or check the input length first.

Does chunked handle empty iterables?

chunked([], 5) returns an empty iterator with no error — there is nothing to yield. Similarly, if the input has fewer items than the chunk size, chunked returns a single partial chunk. This makes it safe to use without guarding against empty inputs, unlike manual slicing which requires extra conditional logic.

Does more-itertools conflict with other packages?

No — more-itertools has zero external dependencies and only imports from the Python standard library. It is used by pytest, pip, and many other core Python tools, which means it is present in most Python environments already. You can verify this by running pip show more-itertools; you may find it is already installed as a transitive dependency of another package you use.

Conclusion

The more-itertools library eliminates entire categories of manual looping code that Python developers write over and over. You have seen the most useful functions: chunked and grouper for batching, windowed and pairwise for sliding views, flatten and collapse for flattening, and first, last, one, and first_true for safe element access. Each function handles edge cases — empty iterables, partial batches, variable-depth nesting — that manual implementations typically miss.

Extend the ETL pipeline example by connecting it to a real database cursor (which is just an iterable) or a CSV reader, and the chunked batch processing will scale without changes. Explore the full function list at the official more-itertools documentation — there are 60+ functions covering topics like permutations, combinatorics, and set operations on iterables.

How To Use Python Dask for Parallel and Distributed Computing


Intermediate

You have a CSV with 50 million rows and Pandas crashes with a MemoryError. Or you have a computation that takes 20 minutes running on one core, but your laptop has 8. Dask was built for exactly these situations — it lets you scale Python data workflows from your laptop to a cluster without changing the tools you already know. If you can write Pandas or NumPy code, you already know most of Dask.

Dask is an open-source parallel computing library that provides high-level collections mirroring the APIs of NumPy, Pandas, and Python’s built-in data structures. Instead of loading all data into memory at once, Dask breaks work into a task graph (a map of small operations) and executes the tasks lazily across multiple cores or machines. Installation is straightforward: pip install "dask[complete]" installs Dask along with its optional dependencies for distributed computing and diagnostics. The quotes keep shells such as zsh from interpreting the square brackets.

This tutorial covers the core Dask workflows you will use every day: delayed functions for parallelizing arbitrary Python code, Dask Arrays for large numerical computations, Dask DataFrames for big tabular data, and the Dask distributed scheduler for monitoring and controlling parallel jobs. By the end, you will be able to replace slow, memory-hungry single-core loops with parallel Dask pipelines and verify that the speedup is real.

Dask in Python: Quick Example

The fastest way to see Dask’s value is to compare a serial loop with its parallelized version. The example below simulates two slow I/O operations and shows how dask.delayed runs them concurrently instead of one after the other.

# dask_quick.py
import dask
import time

def slow_add(x, y):
    time.sleep(1)   # simulate a slow operation
    return x + y

# Serial: takes ~2 seconds
start = time.time()
result_a = slow_add(1, 2)
result_b = slow_add(3, 4)
total = result_a + result_b
print(f"Serial result: {total}, time: {time.time() - start:.2f}s")

# Parallel with dask.delayed: takes ~1 second
start = time.time()
lazy_a = dask.delayed(slow_add)(1, 2)
lazy_b = dask.delayed(slow_add)(3, 4)
lazy_total = dask.delayed(lambda a, b: a + b)(lazy_a, lazy_b)
result = lazy_total.compute()
print(f"Dask result: {result}, time: {time.time() - start:.2f}s")

Output:

Serial result: 10, time: 2.01s
Dask result: 10, time: 1.01s

The key idea: dask.delayed does not run slow_add immediately. It returns a lazy object that records what to compute. When you call .compute(), Dask builds a task graph and runs independent tasks in parallel. Both slow_add calls have no dependency on each other, so they run at the same time — cutting wall time in half. The sections below extend this pattern to NumPy arrays, Pandas DataFrames, and real-world file pipelines.
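To see what "records what to compute" means, here is a toy stand-in built only from the standard library. The Lazy class is invented for this illustration and is far simpler than Dask: it has no graph optimization, recomputes shared inputs, and starts a small thread pool per node.

```python
# lazy_sketch.py
from concurrent.futures import ThreadPoolExecutor

calls = []  # records when the wrapped function actually runs

class Lazy:
    """Toy delayed call: stores the function and arguments, runs nothing yet."""

    def __init__(self, func, *args):
        self.func = func
        self.args = args

    def compute(self):
        # Resolve Lazy arguments concurrently, then call the function itself.
        pending = [a for a in self.args if isinstance(a, Lazy)]
        results = {}
        if pending:
            with ThreadPoolExecutor(max_workers=len(pending)) as pool:
                futures = [(a, pool.submit(a.compute)) for a in pending]
                results = {id(a): f.result() for a, f in futures}
        resolved = [results[id(a)] if isinstance(a, Lazy) else a for a in self.args]
        return self.func(*resolved)

def add(x, y):
    calls.append((x, y))
    return x + y

a = Lazy(add, 1, 2)
b = Lazy(add, 3, 4)
total = Lazy(add, a, b)

calls_before = list(calls)
print(calls_before)   # [] because building the graph ran nothing
result = total.compute()
print(result)         # 10
```

Building a, b, and total only stores references; the three add calls happen inside compute(), with the two independent ones submitted to the pool together.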

Two tasks, one second. Serial thinking costs you half your lunch break.

What Is Dask and When Should You Use It?

Dask is a flexible parallel computing library for Python that scales analytics code from a single laptop to a cluster of hundreds of machines. It is not a replacement for Pandas or NumPy — it is a parallel wrapper around them. Dask’s high-level collections (dask.array, dask.dataframe) implement large subsets of their NumPy and Pandas APIs, so migrating existing code often requires changing just the import statement.

Under the hood, every Dask computation builds a directed acyclic graph (DAG) of tasks. The scheduler — either a local threaded/multiprocessing scheduler or the distributed scheduler — walks the graph, identifies tasks that can run simultaneously, and executes them in parallel. You get parallelism without writing threading or multiprocessing boilerplate.
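The "identify tasks that can run simultaneously" step can be illustrated with graphlib from the standard library (Python 3.9+). This is a sketch of the scheduling idea only: the task names are made up, and a real scheduler such as Dask's starts each task as soon as its own dependencies finish instead of waiting for whole waves.

```python
# task_graph_sketch.py
from graphlib import TopologicalSorter  # Python 3.9+

# Each key depends on the tasks listed in its value set.
graph = {
    "fetch_a": set(),
    "fetch_b": set(),
    "process_a": {"fetch_a"},
    "process_b": {"fetch_b"},
    "summarize": {"process_a", "process_b"},
}

sorter = TopologicalSorter(graph)
sorter.prepare()

waves = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())  # tasks with no unfinished dependencies
    waves.append(ready)
    sorter.done(*ready)                 # mark them finished to unlock the next wave

for number, wave in enumerate(waves, start=1):
    print(f"wave {number}: {wave}")
# wave 1: ['fetch_a', 'fetch_b']
# wave 2: ['process_a', 'process_b']
# wave 3: ['summarize']
```

Everything inside one wave is independent, so those tasks are the candidates for parallel execution.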

Scenario | Use Dask? | Why
Data fits in RAM, single machine | No, use Pandas | Dask overhead is unnecessary
Data too large for RAM | Yes, Dask DataFrame | Processes data in chunks
CPU-bound loops, embarrassingly parallel | Yes, dask.delayed | Uses all available cores
Large NumPy array operations | Yes, dask.array | Chunks array across memory/cores
Distributed cluster needed | Yes, dask.distributed | Scales to 100s of workers
Real-time streaming | No, use Kafka/Faust | Dask is batch-oriented

The threshold rule of thumb: if your dataset fits comfortably in RAM and your computation finishes in under a minute, stick with Pandas. When either of those constraints breaks, reach for Dask.

Parallelizing Arbitrary Code with dask.delayed

dask.delayed is the Swiss Army knife of Dask. It wraps any Python function to make it lazy — the function is not called immediately but is recorded as a node in a task graph. When you eventually call .compute(), Dask executes all the recorded tasks in parallel where possible.

The example below uses dask.delayed to parallelize a batch of file-processing operations — the kind of pattern that shows up constantly in data pipelines:

# dask_delayed_pipeline.py
import dask
import time
import random

def fetch_data(source_id):
    """Simulate fetching data from a source (e.g., an API or file)."""
    time.sleep(0.5)
    return {"id": source_id, "value": random.randint(1, 100)}

def process_record(record):
    """Simulate a CPU-bound transformation."""
    time.sleep(0.1)
    return record["value"] ** 2

def summarize(results):
    return {"count": len(results), "total": sum(results), "mean": sum(results) / len(results)}

# Build the task graph lazily
source_ids = list(range(8))
lazy_records = [dask.delayed(fetch_data)(sid) for sid in source_ids]
lazy_processed = [dask.delayed(process_record)(rec) for rec in lazy_records]
lazy_summary = dask.delayed(summarize)(lazy_processed)

# Execute -- all 8 fetch+process pipelines run in parallel
start = time.time()
summary = lazy_summary.compute()
elapsed = time.time() - start

print(f"Summary: {summary}")
print(f"Time: {elapsed:.2f}s  (serial would take ~{8 * 0.6:.1f}s)")

Output:

Summary: {'count': 8, 'total': 22781, 'mean': 2847.625}
Time: 0.72s  (serial would take ~4.8s)

The 8 fetch_data calls are independent, so Dask runs them concurrently. The process_record calls each depend on their corresponding fetch, but Dask handles that dependency automatically through the task graph. You never wrote a single threading.Thread or concurrent.futures.Executor.

Visualizing the Task Graph

One of Dask’s most useful debugging tools is graph visualization. You can inspect any delayed object before calling .compute() to understand the dependency structure:

# dask_visualize.py
import dask

double = dask.delayed(lambda x: x * 2)
add = dask.delayed(lambda x, y: x + y)

a = double(5)
b = double(10)
c = add(a, b)

# Print a text summary of the graph
print(c.__dask_graph__())

# Optionally save a visual PNG (requires the Graphviz system package plus the Python bindings: pip install graphviz)
# c.visualize(filename="task_graph.png")

Output:

HighLevelGraph with 3 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x...>

The .visualize() method generates a PNG showing boxes for each task and arrows for dependencies. This is invaluable when debugging why a pipeline is slower than expected — often you will spot unnecessary serial bottlenecks in the graph.

Your task graph: 3 layers. Your serial loop: 48 layers of regret.

Large Array Computing with dask.array

Dask arrays break a large NumPy array into chunks and process each chunk in parallel. The API mirrors NumPy almost exactly — most operations like mean(), std(), dot(), and slicing work identically. The critical difference is that Dask arrays are lazy: operations build the task graph but do not compute until you call .compute().

# dask_array_example.py
import dask.array as da
import numpy as np

# Create a large Dask array (10,000 x 10,000 floats, chunked into 1000x1000 blocks)
# This would be ~800MB as a full NumPy array
x = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))

print(f"Array shape: {x.shape}")
print(f"Chunk shape: {x.chunksize}")
print(f"Number of chunks: {x.npartitions}")

# Operations look exactly like NumPy -- but are lazy
row_means = x.mean(axis=1)        # mean of each row
std_dev = x.std()                  # overall std deviation

# Execute both in one compute call (Dask optimizes shared computation)
mean_result, std_result = da.compute(row_means, std_dev)

print(f"Row means shape: {mean_result.shape}")
print(f"Overall std dev: {std_result:.6f}")
print(f"First 5 row means: {mean_result[:5]}")

Output:

Array shape: (10000, 10000)
Chunk shape: (1000, 1000)
Number of chunks: 100
Row means shape: (10000,)
Overall std dev: 0.288686
First 5 row means: [0.50023 0.49987 0.50012 0.50031 0.49998]

The 10,000 x 10,000 array is split into 100 chunks of 1,000 x 1,000 each. Dask processes the chunks in parallel across your CPU cores. Notice the call to da.compute(row_means, std_dev): passing multiple lazy collections to a single compute() call lets Dask share intermediate results, which is more efficient than computing them separately.
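Why can a mean be computed chunk by chunk? Each chunk only needs to report its sum and its count, and those partial results combine into the exact overall mean. The stdlib-only sketch below illustrates that idea with a hypothetical chunked_mean helper; it is a conceptual illustration, not Dask's internal algorithm, and it runs the chunks one after another.

```python
# chunked_mean_sketch.py
import math
import random

def chunked_mean(values, chunk_size):
    """Combine per-chunk (sum, count) pairs into one overall mean."""
    partials = []
    for start in range(0, len(values), chunk_size):
        chunk = values[start:start + chunk_size]
        # Each chunk is processed independently of the others
        partials.append((math.fsum(chunk), len(chunk)))
    total = math.fsum(s for s, _ in partials)
    count = sum(n for _, n in partials)
    return total / count

random.seed(0)
data = [random.random() for _ in range(10_000)]
direct = math.fsum(data) / len(data)
print(math.isclose(chunked_mean(data, 1_000), direct))  # True
```

Because the per-chunk step never looks at another chunk, a parallel library is free to run those steps on different cores and combine the small partial results at the end.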

Slicing and Stacking Dask Arrays

Dask arrays support most NumPy indexing and stacking operations. The key rule: slicing returns another lazy Dask array, not a concrete result. You only trigger computation when you call .compute() or pass the array to a function that needs a concrete value.

# dask_array_ops.py
import dask.array as da
import numpy as np

# Stack two Dask arrays (like np.stack)
a = da.ones((5_000, 5_000), chunks=(1_000, 1_000))
b = da.zeros((5_000, 5_000), chunks=(1_000, 1_000))
stacked = da.stack([a, b], axis=0)   # shape: (2, 5000, 5000)

# Slice the first row of the first array
first_row = stacked[0, 0, :]

# Compute only what you need -- not the entire stacked array
print("First row values (sample):", first_row[:5].compute())
print("Stacked shape:", stacked.shape)

Output:

First row values (sample): [1. 1. 1. 1. 1.]
Stacked shape: (2, 5000, 5000)

This pattern — build the full array structure lazily, then compute only the slice you need — is how Dask avoids loading the entire dataset into memory even when performing complex transformations.

chunk_size=1000: loading a 10GB array at once is how you make enemies.

Big Tabular Data with dask.dataframe

Dask DataFrames are the most common reason people reach for Dask. When a CSV file is too large to load with pd.read_csv(), dd.read_csv() handles it by reading the file in partitions, each of which is a regular Pandas DataFrame (dd.read_parquet() does the same for Parquet files). GroupBy, merge, filter, and most other common Pandas operations use the same syntax on Dask DataFrames.

# dask_dataframe_example.py
import dask.dataframe as dd
import pandas as pd
import numpy as np
import tempfile, os

# Create a realistic large CSV (normally this would be a file on disk)
# We simulate 1 million rows across two CSV files
rows = 500_000
df1 = pd.DataFrame({
    "user_id": np.random.randint(1, 10_001, rows),
    "product_id": np.random.randint(1, 1_001, rows),
    "amount": np.round(np.random.uniform(5.0, 500.0, rows), 2),
    "date": pd.date_range("2025-01-01", periods=rows, freq="1min")
})
df2 = df1.copy()
df2["user_id"] = np.random.randint(1, 10_001, rows)

tmpdir = tempfile.mkdtemp()
df1.to_csv(f"{tmpdir}/sales_part1.csv", index=False)
df2.to_csv(f"{tmpdir}/sales_part2.csv", index=False)

# Read both files as a single Dask DataFrame
ddf = dd.read_csv(f"{tmpdir}/sales_*.csv")

print(f"Type: {type(ddf)}")
print(f"Partitions: {ddf.npartitions}")
print(f"Columns: {list(ddf.columns)}")

# GroupBy and aggregate -- looks just like Pandas
top_products = (
    ddf.groupby("product_id")["amount"]
       .sum()
       .nlargest(5)
       .compute()
)
print("\nTop 5 products by revenue:")
print(top_products)

Output:

Type: <class 'dask.dataframe.core.DataFrame'>
Partitions: 2
Columns: ['user_id', 'product_id', 'amount', 'date']

Top 5 products by revenue:
product_id
482    103421.52
715    102987.33
221    102451.78
834    101987.22
119    101542.67
dtype: float64

The dd.read_csv() call with a glob pattern reads multiple files as a single logical DataFrame without loading all data at once. The groupby().sum().nlargest() chain executes entirely in parallel across partitions. Only .compute() triggers actual data loading and computation.

Filtering and Joining Dask DataFrames

Filtering in Dask DataFrames uses the same boolean mask syntax as Pandas. One important difference: after filtering, calling .repartition() can consolidate skewed partitions and improve subsequent operation performance.

# dask_df_filter_join.py
import dask.dataframe as dd
import pandas as pd
import numpy as np
import tempfile

rows = 200_000
sales_df = pd.DataFrame({
    "user_id": np.random.randint(1, 1_001, rows),
    "amount": np.round(np.random.uniform(5.0, 200.0, rows), 2),
    "region": np.random.choice(["AU", "US", "UK", "CA"], rows)
})
users_df = pd.DataFrame({
    "user_id": range(1, 1_001),
    "tier": np.random.choice(["bronze", "silver", "gold"], 1000)
})

tmpdir = tempfile.mkdtemp()
sales_df.to_csv(f"{tmpdir}/sales.csv", index=False)
users_df.to_csv(f"{tmpdir}/users.csv", index=False)

ddf_sales = dd.read_csv(f"{tmpdir}/sales.csv")
ddf_users = dd.read_csv(f"{tmpdir}/users.csv")

# Filter: only AU region high-value orders
au_high = ddf_sales[(ddf_sales["region"] == "AU") & (ddf_sales["amount"] > 150)]

# Join with users to get tier information
merged = au_high.merge(ddf_users, on="user_id", how="left")

# Aggregate by tier
result = merged.groupby("tier")["amount"].agg(["count", "mean"]).compute()
print("High-value AU orders by tier:")
print(result.round(2))

Output:

High-value AU orders by tier:
        count    mean
tier
bronze  12487  174.82
gold     4189  174.91
silver   8312  174.88

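In this example both sides of the merge are Dask DataFrames, but ddf_users is small enough to sit in a single partition, so Dask can join it against each partition of the filtered sales data without shuffling rows between partitions. You can get the same effect by passing an in-memory Pandas DataFrame such as users_df directly to merge() (a “broadcast join”): Dask sends the small table to each partition of the larger Dask DataFrame, which is much faster than a distributed join between two large Dask DataFrames.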

grep -r ERROR -- running since yesterday. dask.delayed -- done.

Monitoring with the Dask Distributed Scheduler

The default Dask scheduler (threaded for arrays and DataFrames, multiprocessing for bags) works with no configuration. But for production workloads, or just for visibility into what Dask is doing, the dask.distributed scheduler provides a real-time web dashboard at http://localhost:8787 that shows task progress, memory usage, and worker utilization.

# dask_distributed_example.py
from dask.distributed import Client, LocalCluster
import dask.array as da

# Create a local cluster with 4 single-threaded worker processes
# (omit n_workers to let Dask pick a layout based on your CPU cores)
cluster = LocalCluster(n_workers=4, threads_per_worker=1)
client = Client(cluster)

print(f"Dashboard: {client.dashboard_link}")
print(f"Workers: {len(client.scheduler_info()['workers'])}")

# Now all Dask computations automatically use the distributed scheduler
x = da.random.random((5_000, 5_000), chunks=(1_000, 1_000))
result = (x ** 2 + x).mean().compute()

print(f"Computation result: {result:.6f}")

# Always close the client when done to release worker processes
client.close()
cluster.close()

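Output (the exact value varies slightly between runs; for uniform random input the mean of x ** 2 + x is close to 1/3 + 1/2):

Dashboard: http://127.0.0.1:8787/status
Workers: 4
Computation result: 0.833317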

Once a Client is created, all subsequent Dask operations in that Python session use the distributed scheduler automatically — no other code changes needed. The dashboard at http://localhost:8787 shows live task graphs, memory pressure per worker, and individual task durations. This is the tool that reveals whether your computation is actually running in parallel or has a serializing bottleneck.

Real-Life Example: Parallel Log File Analyzer

This example processes multiple large log files in parallel, extracting error counts per endpoint per day. The kind of job that takes 10+ minutes sequentially can finish in 2-3 minutes with Dask on a modern laptop.

# dask_log_analyzer.py
import dask
import dask.dataframe as dd
import pandas as pd
import numpy as np
import tempfile, os, re

# Generate realistic log data across multiple files
def generate_log_file(path, n_lines=100_000):
    endpoints = ["/api/users", "/api/products", "/api/orders", "/api/auth", "/api/search"]
    levels = ["INFO"] * 85 + ["WARNING"] * 10 + ["ERROR"] * 5
    # Format the dates as strings up front: np.random.choice() returns
    # numpy.datetime64 values, which have no strftime() method
    dates = pd.date_range("2025-05-01", periods=30, freq="1D").strftime("%Y-%m-%d")
    # Draw each column in one vectorized call instead of a Python loop
    pd.DataFrame({
        "timestamp": np.random.choice(dates, n_lines),
        "level": np.random.choice(levels, n_lines),
        "endpoint": np.random.choice(endpoints, n_lines),
        "response_ms": np.random.randint(10, 2000, n_lines),
    }).to_csv(path, index=False)

tmpdir = tempfile.mkdtemp()
for i in range(4):
    generate_log_file(f"{tmpdir}/app_log_{i}.csv")
print("Log files generated.")

# Read all log files as one Dask DataFrame
ddf = dd.read_csv(f"{tmpdir}/app_log_*.csv")

# Analysis 1: Error rate by endpoint
error_counts = (
    ddf[ddf["level"] == "ERROR"]
      .groupby("endpoint")
      .size()
      .compute()
      .sort_values(ascending=False)
)
print("\nError counts by endpoint:")
print(error_counts)

# Analysis 2: Slow requests (>1000ms) by date
slow_req = (
    ddf[ddf["response_ms"] > 1000]
      .groupby("timestamp")
      .size()
      .compute()
      .sort_index()
)
print(f"\nSlow requests: {slow_req.sum():,} total across {len(slow_req)} days")
print(f"Peak day: {slow_req.idxmax()} with {slow_req.max()} slow requests")

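Output (counts are random, so exact values vary between runs; with 5% of 400,000 lines at ERROR level, expect roughly 4,000 errors per endpoint):

Log files generated.

Error counts by endpoint:
endpoint
/api/products    4043
/api/auth        4021
/api/search      3998
/api/orders      3987
/api/users       3951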
dtype: int64

Slow requests: 200,341 total across 30 days
Peak day: 2025-05-17 with 6923 slow requests

This script processes 400,000 log lines across 4 files in parallel. Swap the synthetic generate_log_file calls for real log paths on your server, and the analysis pipeline stays identical. Extend it by adding a groupby(["endpoint", "timestamp"]) for per-endpoint daily trends, or filter by response time percentile to identify outlier days.

Frequently Asked Questions

How does Dask compare to Apache Spark?

Dask is Python-native and integrates directly with NumPy, Pandas, and scikit-learn — no JVM required. Spark is more mature for truly massive distributed workloads (petabyte scale) with built-in fault tolerance and many managed cloud offerings. For most Python data science workflows at the terabyte scale or below, Dask is simpler to set up and faster to iterate with. Spark’s Python API (PySpark) has more overhead and a less Pythonic feel than native Dask code.

How do I choose the right chunk size?

The rule of thumb: chunks should be between 100MB and 1GB in memory after loading. Too small (e.g., 1MB chunks) and you spend more time on scheduler overhead than computation. Too large (e.g., 10GB chunks) and you lose parallelism because only one chunk fits in memory at a time. For Dask DataFrames, aim for partitions that take 1-5 seconds to process. Use ddf.memory_usage(deep=True).sum().compute() divided by ddf.npartitions to measure actual partition size.

Why is my Dask code slower than Pandas?

Dask has overhead: building the task graph, serializing data between workers, and coordinating the scheduler all take time. For datasets that fit in RAM and operations that complete in seconds, Pandas is faster. Dask pays off when data exceeds RAM, when you have many independent computations that can run in parallel, or when processing takes more than a few minutes serially. Always benchmark before and after to confirm Dask actually helps your specific workload.

What does ddf.persist() do?

.persist() triggers computation and keeps the results in memory (spread across workers when you use the distributed scheduler). Unlike .compute(), it does not return a concrete Pandas or NumPy object. It returns a Dask collection of the same type whose partitions are already computed, so subsequent operations on it are much faster because they skip re-reading from disk and recomputing earlier steps. Use .persist() when you will perform multiple downstream operations on the same intermediate result, like filtering a large DataFrame and then running several different aggregations on the filtered result.

Can I use Dask with scikit-learn?

Yes, via the dask-ml package (pip install dask-ml). It provides parallel implementations of common preprocessing steps like StandardScaler and MinMaxScaler that work on Dask DataFrames, plus a ParallelPostFit wrapper that parallelizes prediction (but not training) of any scikit-learn estimator. For full distributed model training, dask-ml integrates with XGBoost and LightGBM which have native Dask support.

Conclusion

Dask brings parallel and out-of-core computing to Python without requiring you to abandon NumPy, Pandas, or your existing code patterns. You have seen the four core tools: dask.delayed for parallelizing arbitrary Python functions, dask.array for chunked numerical computation, dask.dataframe for big tabular data, and the distributed scheduler for production visibility. The consistent theme is laziness — build the computation graph first, execute later, compute only what you need.

Extend the real-life log analyzer by connecting it to a real Parquet file dataset on S3 using dd.read_parquet("s3://your-bucket/*.parquet") and deploying a multi-worker cluster with LocalCluster(n_workers=8). The code changes are minimal because Dask’s API is designed to scale transparently from your laptop to a cloud cluster.

For complete API documentation and distributed deployment guides, visit the official Dask documentation.

How To Use Python msgspec for Fast JSON Serialization

Intermediate

Your Python service processes thousands of JSON messages per second: API responses, message queue events, webhook payloads. Every message needs to be decoded, validated against a schema, and encoded again. You are using Pydantic, which is excellent, but its validation overhead starts to show at scale. You need something faster. msgspec is a C-extension library for Python that encodes and decodes JSON (and MessagePack and other formats) while validating against typed Python structs, and in decode-and-validate benchmarks it is typically several times faster than Pydantic V2, which is itself already fast. The exact ratio depends on the payload and the schema.

msgspec defines data models using its Struct class — similar to dataclasses but implemented in C for maximum performance. It handles encoding, decoding, and type validation in a single pass, with no intermediate Python dictionaries. The result is encode/decode cycles that are often faster than pure JSON parsing with the built-in json module, because msgspec validates and structures the data during parsing rather than after it.

In this article you will learn how to install msgspec, define Struct models, encode and decode JSON, handle nested structures and optional fields, validate incoming data with error handling, and benchmark msgspec against the standard json module. By the end you will know when msgspec is the right choice and how to integrate it into a real application.

msgspec Quick Example

Here is the minimal setup — define a Struct, encode it to JSON, and decode it back:

# msgspec_quick.py
import msgspec
import msgspec.json

class User(msgspec.Struct):
    id: int
    name: str
    email: str
    active: bool = True

# Create an instance
user = User(id=1, name="Alice Smith", email="alice@example.com")
print(user)

# Encode to JSON bytes
encoded = msgspec.json.encode(user)
print(encoded)

# Decode back from JSON bytes
decoded = msgspec.json.decode(encoded, type=User)
print(decoded)
print(decoded.name)

Output:

User(id=1, name='Alice Smith', email='alice@example.com', active=True)
b'{"id":1,"name":"Alice Smith","email":"alice@example.com","active":true}'
User(id=1, name='Alice Smith', email='alice@example.com', active=True)
Alice Smith

The Struct class uses standard Python type annotations. msgspec.json.encode() returns bytes (not a string) for maximum efficiency. msgspec.json.decode() takes the target type through its type= keyword argument, which is what enables the simultaneous parsing and validation in one pass. Keep reading to see how to handle optional fields, nested structs, and validation errors.

What Is msgspec and When To Use It?

msgspec is a C extension library that implements its own JSON encoder/decoder optimized specifically for Python type-annotated structs. It skips the intermediate Python dict that the standard json module creates and instead constructs the target type directly during parsing. This is why it is faster than approaches that first parse JSON into a dict and then validate or convert the result in a second step.

Library          | Relative Speed (decode+validate) | Best For
msgspec          | 1x (baseline)                    | High-throughput APIs, message queues
Pydantic V2      | ~3-8x slower                     | Complex validation rules, FastAPI
attrs + cattrs   | ~5-10x slower                    | Domain modeling with converters
json + dataclass | ~8-15x slower                    | Simple apps, no validation needed
json + dict      | ~2-4x slower                     | Quick scripts, no type safety needed

Use msgspec when you need type-safe JSON handling and performance is a concern — high-traffic APIs, message queue consumers, data pipeline ingestion. Use Pydantic when you need complex validation rules, custom validators, or FastAPI’s dependency injection integration.

Parse JSON. Validate type. Build the struct. One C-extension pass.

Installing msgspec

# terminal
pip install msgspec

Verify:

# verify_msgspec.py
import msgspec
print(msgspec.__version__)

Output:

0.18.6

Defining Structs

msgspec Struct supports all standard Python type annotations including Optional, list, dict, Union, Literal, and nested Structs:

# msgspec_structs.py
from typing import Optional
import msgspec

class Address(msgspec.Struct):
    street: str
    city: str
    country: str = "AU"

class Product(msgspec.Struct):
    sku: str
    name: str
    price: float
    tags: list[str] = []
    metadata: dict[str, str] = {}

class Order(msgspec.Struct):
    id: int
    customer_email: str
    items: list[Product]
    shipping_address: Address
    notes: Optional[str] = None
    discount_pct: float = 0.0

# Build a nested structure
order = Order(
    id=1001,
    customer_email="alice@example.com",
    items=[
        Product(sku="PY-001", name="Python Course", price=49.99, tags=["education", "tech"]),
        Product(sku="PY-002", name="VS Code Theme", price=9.99),
    ],
    shipping_address=Address(street="42 George Street", city="Sydney"),
    discount_pct=10.0,
)

encoded = msgspec.json.encode(order)
print(encoded.decode())

Output:

{"id":1001,"customer_email":"alice@example.com","items":[{"sku":"PY-001","name":"Python Course","price":49.99,"tags":["education","tech"],"metadata":{}},{"sku":"PY-002","name":"VS Code Theme","price":9.99,"tags":[],"metadata":{}}],"shipping_address":{"street":"42 George Street","city":"Sydney","country":"AU"},"notes":null,"discount_pct":10.0}

Default values work much like dataclasses, with one convenience: empty mutable defaults such as [] and {} are allowed, and msgspec gives each instance its own copy. Optional[str] means the field can be a string or None. Nested Structs are encoded and decoded recursively. Lists of Structs (list[Product]) are fully supported and validated, so each item in the JSON array must be a valid Product.

Decoding and Validation

The real power of msgspec is simultaneous decoding and validation. When a JSON input does not match the expected schema, msgspec raises a ValidationError with a precise error message:

# msgspec_validation.py
import msgspec
import msgspec.json
from typing import Optional

class Event(msgspec.Struct):
    event_type: str
    user_id: int
    payload: dict[str, str]
    version: int = 1

valid_json = b'{"event_type":"login","user_id":42,"payload":{"ip":"1.2.3.4"}}'
invalid_json_1 = b'{"event_type":"login","user_id":"not_an_int","payload":{}}'
invalid_json_2 = b'{"event_type":"login","payload":{}}'  # missing user_id

# Successful decode
event = msgspec.json.decode(valid_json, type=Event)
print(f"Decoded: {event}")

# Type mismatch
try:
    msgspec.json.decode(invalid_json_1, type=Event)
except msgspec.ValidationError as e:
    print(f"Type error: {e}")

# Missing required field
try:
    msgspec.json.decode(invalid_json_2, type=Event)
except msgspec.ValidationError as e:
    print(f"Missing field: {e}")

Output:

Decoded: Event(event_type='login', user_id=42, payload={'ip': '1.2.3.4'}, version=1)
Type error: Expected `int`, got `str` - at `$.user_id`
Missing field: Object missing required field `user_id`

The error message includes a JSONPath-like location ($.user_id) that tells you exactly where the validation failed. This is critical for debugging and for returning meaningful error responses in API handlers. A single ValidationError type covers both type errors and structural errors (missing required fields, plus unknown fields if the Struct is defined with forbid_unknown_fields=True).

Expected int, got str — at $.user_id. No stack trace archaeology required.

Using the Encoder and Decoder for Performance

For high-throughput code, create a reusable Encoder and Decoder instead of calling module-level functions. This avoids recreating type lookup tables on every call:

# msgspec_encoder_decoder.py
import msgspec
import msgspec.json
from typing import Optional
import time

class Metric(msgspec.Struct):
    name: str
    value: float
    timestamp: int
    labels: dict[str, str] = {}

# Create reusable encoder/decoder (do this once at module level)
encoder = msgspec.json.Encoder()
decoder = msgspec.json.Decoder(Metric)

# Sample JSON payloads
sample = b'{"name":"cpu_usage","value":72.5,"timestamp":1746201600,"labels":{"host":"web-01"}}'

# Benchmark: 100,000 decode+encode cycles
N = 100_000
start = time.perf_counter()

for _ in range(N):
    metric = decoder.decode(sample)
    encoded = encoder.encode(metric)

elapsed = time.perf_counter() - start
print(f"msgspec: {N:,} decode+encode cycles in {elapsed:.3f}s")
print(f"  Rate: {N/elapsed:,.0f} ops/sec")
print(f"  Per op: {elapsed/N*1000:.4f}ms")
print(f"\nSample output: {encoded[:80]}")

Output:

msgspec: 100,000 decode+encode cycles in 0.187s
  Rate: 534,759 ops/sec
  Per op: 0.0019ms

Sample output: b'{"name":"cpu_usage","value":72.5,"timestamp":1746201600,"labels":{"host":"web-01"}}'

At 500,000+ operations per second, a single Python process can handle extremely high message throughput. Using module-level Encoder and Decoder instances instead of calling msgspec.json.encode()/decode() directly gives another 10-20% speedup on hot paths. Always initialize them at module level, not inside request handlers.

Real-Life Example: API Event Processor

Webhook at 50k messages/sec. msgspec handles it.

Here is a complete webhook event processor that uses msgspec for fast, type-safe event handling:

# event_processor.py
import msgspec
import msgspec.json
from typing import Optional, Literal
from datetime import datetime, timezone

class WebhookUser(msgspec.Struct):
    id: int
    email: str
    name: Optional[str] = None

class WebhookEvent(msgspec.Struct):
    event: Literal["user.created", "user.deleted", "order.placed", "order.refunded"]
    data: WebhookUser
    timestamp: int
    webhook_id: str
    api_version: str = "2026-01"

# Typed event handlers
def handle_user_created(event: WebhookEvent):
    print(f"New user: {event.data.name or 'Unknown'} <{event.data.email}>")

def handle_order_placed(event: WebhookEvent):
    # Format in UTC so the output does not depend on the machine's time zone
    ts = datetime.fromtimestamp(event.timestamp, tz=timezone.utc).strftime("%Y-%m-%d %H:%M")
    print(f"Order placed by user {event.data.id} at {ts}")

HANDLERS = {
    "user.created": handle_user_created,
    "user.deleted": lambda e: print(f"User {e.data.id} deleted"),
    "order.placed": handle_order_placed,
    "order.refunded": lambda e: print(f"Order refunded for {e.data.email}"),
}

decoder = msgspec.json.Decoder(WebhookEvent)

def process_webhook(raw_json: bytes) -> bool:
    try:
        event = decoder.decode(raw_json)
        handler = HANDLERS.get(event.event)
        if handler:
            handler(event)
            return True
        else:
            print(f"Unknown event type: {event.event}")
            return False
    except msgspec.ValidationError as e:
        print(f"Invalid webhook payload: {e}")
        return False
    except Exception as e:
        print(f"Processing error: {e}")
        return False

# Test with sample payloads
payloads = [
    b'{"event":"user.created","data":{"id":1,"email":"alice@example.com","name":"Alice"},"timestamp":1746201600,"webhook_id":"wh_abc123"}',
    b'{"event":"order.placed","data":{"id":1,"email":"alice@example.com"},"timestamp":1746201700,"webhook_id":"wh_def456"}',
    b'{"event":"user.created","data":{"id":"not_int","email":"bad@example.com"},"timestamp":1746201800,"webhook_id":"wh_err"}',
]

print("Processing webhooks:\n")
for payload in payloads:
    success = process_webhook(payload)
    print(f"  -> {'OK' if success else 'FAILED'}\n")

Output:

Processing webhooks:

New user: Alice <alice@example.com>
  -> OK

Order placed by user 1 at 2025-05-02 16:01
  -> OK

Invalid webhook payload: Expected `int`, got `str` - at `$.data.id`
  -> FAILED

The Literal["user.created", "user.deleted", ...] annotation restricts the event field to exactly those four strings — msgspec validates this and raises a ValidationError for any other value. This eliminates a whole class of bugs where unexpected event types silently pass through to the wrong handler. The pattern — module-level decoder, try/except ValidationError, dispatch to typed handlers — is the production-ready approach for high-throughput webhook processing.

Pydantic for complex validation. msgspec for when you need the microseconds back.

Frequently Asked Questions

When should I use msgspec instead of Pydantic?

Use msgspec when performance is a primary concern and you do not need Pydantic’s advanced features: custom validators with @field_validator, computed fields, or FastAPI’s native Pydantic integration. (msgspec does cover field renaming and JSON Schema generation itself.) msgspec is typically several times faster than Pydantic V2 for encode/decode, but Pydantic has a richer ecosystem and more flexible validation. For most FastAPI projects, Pydantic is the right choice. For high-throughput background services, msgspec often wins.

Does msgspec support formats other than JSON?

Yes — msgspec supports MessagePack (msgspec.msgpack) which is a binary format that is both faster and more compact than JSON. It also supports YAML and TOML via optional extras. MessagePack is the right choice for internal service-to-service communication where human readability is not needed and you want maximum throughput and minimum bandwidth.

Can I use msgspec with FastAPI?

Yes, but with some effort. FastAPI uses Pydantic models natively for request/response validation and OpenAPI schema generation. You can bypass this by accepting Request objects directly and decoding manually with msgspec. The msgspec package provides a convert() function that works with dicts (from await request.json()). For new FastAPI projects where performance is critical, consider using the litestar framework which has native msgspec support.

How do I handle nullable fields and missing values?

Use Optional[Type] (equivalent to Type | None in Python 3.10+) for fields that can be null. The JSON value null maps to Python None. For fields that can be absent from the JSON entirely, set a default value: field: str = "" or field: Optional[str] = None. Fields without defaults are required — msgspec raises ValidationError if they are missing from the input.

How do I handle custom types like datetime?

msgspec has built-in support for datetime.datetime (encoded as ISO 8601 strings), uuid.UUID (encoded as strings), decimal.Decimal, and enum.Enum. For truly custom types, use msgspec.json.Encoder(enc_hook=...) and msgspec.json.Decoder(dec_hook=...) to provide custom encoding/decoding hooks. The enc_hook receives the unsupported object and must return a value msgspec can serialize. The dec_hook receives the target type together with the decoded value and must return an instance of that type.

Conclusion

msgspec delivers JSON encoding and decoding with type validation at speeds that make it practical for the highest-throughput Python services. The key patterns: define your schema with msgspec.Struct using standard type annotations; use Optional[T] for nullable fields and defaults for optional ones; create module-level Encoder and Decoder instances for hot paths; and always wrap decoder.decode() in a try/except for msgspec.ValidationError when handling external input.

The webhook processor example shows how to combine Literal type constraints, nested Structs, and a dispatch table for a clean, type-safe event processing pipeline. Extend it by adding a dead-letter queue for failed payloads and a metrics counter for validation error rates — those two additions turn it from an example into a production-ready processor.

For the full API reference, supported types, and MessagePack documentation, see the msgspec documentation.

How To Use Python Faker for Generating Test Data

Beginner

You are building a user registration system and need 500 realistic test users. Or you are demoing an e-commerce app and need product listings that do not look like “Product 1, Product 2, Product 3.” Or you need to populate a database before running performance tests. Writing fake data by hand is tedious and the result always looks fake. Python’s Faker library generates realistic test data — names, emails, phone numbers, addresses, company names, credit card numbers, and much more — in one line of code.

Faker is a Python library that generates random but realistic-looking data across 70+ data categories and 20+ locales. It is the standard tool for seeding test databases, creating fixture data for unit tests, and prototyping applications that need realistic content. It supports deterministic output via a seed for reproducible test cases, and you can extend it with custom providers for domain-specific data.

In this article you will learn how to install and use Faker for common data types, generate data in different locales, seed for reproducible output, build custom providers, and populate a SQLite test database with 1,000 realistic records. By the end you will be able to generate any kind of test data your application needs.

Faker Quick Example

Here is a minimal Faker usage — generate five random user profiles:

# faker_quick.py
from faker import Faker

fake = Faker()

for _ in range(5):
    print(f"Name:    {fake.name()}")
    print(f"Email:   {fake.email()}")
    print(f"Phone:   {fake.phone_number()}")
    print(f"City:    {fake.city()}, {fake.country()}")
    print()

Output (values are random; the first three of the five profiles are shown):

Name:    Patricia Johnson
Email:   david.roberts@example.org
Phone:   +1-555-234-8901
City:    New Springfield, United States

Name:    Marcus Chen
Email:   chen.lisa@email.com
Phone:   +1-555-876-3210
City:    Lake Emily, United States

Name:    Sandra Williams
Email:   swilliams1974@hotmail.com
Phone:   +1-555-445-6782
City:    East Jordan, United States

Each call to a Faker method generates fresh random data. The names look like real names, the emails look like real emails, and the cities look like real cities — because Faker pulls from curated lists of real-world values. Keep reading to see all the data categories available and how to control the output.

What Is Faker and When To Use It?

Faker is a test data generator that produces plausible-looking data by combining real-world word lists, name databases, and domain knowledge about data formats. It is not random nonsense — fake.email() generates a properly formatted email address with a real-looking username and a plausible domain. fake.address() generates a properly formatted mailing address for the target locale.

Data Category | Example Methods
Personal      | name(), first_name(), last_name(), email(), phone_number()
Address       | address(), city(), state(), postcode(), country()
Internet      | url(), domain_name(), ipv4(), user_agent(), slug()
Company       | company(), job(), catch_phrase(), bs()
Finance       | credit_card_number(), iban(), currency_code()
Date & Time   | date(), date_of_birth(), date_time(), time()
Text          | word(), sentence(), paragraph(), text()
Misc          | uuid4(), color_name(), file_name(), mime_type()

Fake.name() knows 10,000 real names. Your test fixture knows Test User 1.

Installing Faker

# terminal
pip install Faker

Verify:

# verify_faker.py
from faker import Faker
fake = Faker()
print(fake.__version__ if hasattr(fake, '__version__') else "Faker installed OK")
print(fake.name())

Output:

Faker installed OK
Jennifer Martinez

Common Data Providers

Faker organizes data into providers. Here are the most commonly used ones with examples:

# faker_providers.py
from faker import Faker
fake = Faker()

# Personal data
print("=== Personal ===")
print(f"Full name:     {fake.name()}")
print(f"First name:    {fake.first_name()}")
print(f"Last name:     {fake.last_name()}")
print(f"Email:         {fake.email()}")
print(f"Safe email:    {fake.safe_email()}")  # uses example.com
print(f"Username:      {fake.user_name()}")
print(f"Password:      {fake.password(length=12)}")

# Address
print("\n=== Address ===")
print(f"Full address:  {fake.address()}")
print(f"Street:        {fake.street_address()}")
print(f"City:          {fake.city()}")
print(f"State:         {fake.state()}")
print(f"Zip:           {fake.postcode()}")
print(f"Country:       {fake.country()}")

# Internet
print("\n=== Internet ===")
print(f"URL:           {fake.url()}")
print(f"Domain:        {fake.domain_name()}")
print(f"IPv4:          {fake.ipv4()}")
print(f"IPv6:          {fake.ipv6()}")
print(f"MAC address:   {fake.mac_address()}")

# Dates
print("\n=== Dates ===")
print(f"Date:          {fake.date()}")
print(f"Date of birth: {fake.date_of_birth(minimum_age=18, maximum_age=80)}")
print(f"Date this yr:  {fake.date_this_year()}")
print(f"Date past:     {fake.past_date(start_date='-30d')}")

Output (values are random; the Dates section is omitted here):

=== Personal ===
Full name:     Robert Davis
First name:    Michelle
Last name:     Thompson
Email:         patricia.johnson@example.net
Safe email:    jessica.brown@example.com
Username:      david_martinez1987
Password:      F$3kX9mPqR2v

=== Address ===
Full address:  742 Evergreen Terrace
               Springfield, IL 62701
Street:        1234 Oak Avenue
City:          Portland
State:         Oregon
Zip:           97201
Country:       United States

=== Internet ===
URL:           https://www.example-company.net/products/
Domain:        techcorp.com
IPv4:          192.168.24.157
IPv6:          2001:db8::1428:57ab
MAC address:   00:1B:44:11:3A:B7

Use fake.safe_email() instead of fake.email() in tests: safe_email() always uses example.com, example.net, or example.org, domains that are permanently reserved for documentation and testing. Depending on the Faker version and the arguments you pass, fake.email() can generate real-looking domains that actually exist, which can cause problems in automated email testing.

500 test users. None of them are user_001@test.test.

Locales: Generating Country-Specific Data

Faker supports dozens of locales, producing data appropriate for different countries and languages:

# faker_locales.py
from faker import Faker

locales = ["en_US", "de_DE", "ja_JP", "fr_FR", "pt_BR", "zh_CN"]

for locale in locales:
    fake = Faker(locale)
    print(f"\n--- {locale} ---")
    print(f"  Name:    {fake.name()}")
    print(f"  City:    {fake.city()}")
    address = fake.address().replace("\n", ", ")  # address() can span several lines
    print(f"  Address: {address[:50]}")

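Output (illustrative and transliterated; real output uses each locale's native script, and zh_CN is omitted):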

--- en_US ---
  Name:    James Wilson
  City:    Springfield
  Address: 742 Oak Street, Portland, OR 97201

--- de_DE ---
  Name:    Klaus Muller
  City:    Hamburg
  Address: Hauptstrasse 42, 20095 Hamburg

--- ja_JP ---
  Name:    Tanaka Hiroshi
  City:    Osaka
  Address: 1-2-3 Namba, Chuo-ku, Osaka

--- fr_FR ---
  Name:    Jean-Pierre Dubois
  City:    Lyon
  Address: 12 rue de la Paix, 69001 Lyon

--- pt_BR ---
  Name:    Maria Silva
  City:    Sao Paulo
  Address: Rua das Flores, 456, Sao Paulo, SP

You can also combine multiple locales by passing a list: fake = Faker(["en_US", "de_DE", "fr_FR"]). This creates a proxy that randomly selects a locale for each data generation, useful for creating globally diverse test datasets.

Reproducible Output with Seeding

For unit tests, you need the same fake data every run. Use Faker.seed() to set a deterministic seed:

# faker_seeded.py
from faker import Faker

Faker.seed(42)
fake = Faker()

users = [(fake.name(), fake.email()) for _ in range(3)]
print("Run 1:")
for name, email in users:
    print(f"  {name}: {email}")

# Reset with same seed -- same output guaranteed
Faker.seed(42)
fake2 = Faker()

users2 = [(fake2.name(), fake2.email()) for _ in range(3)]
print("\nRun 2 (same seed):")
for name, email in users2:
    print(f"  {name}: {email}")

Output:

Run 1:
  Lucy Cummings: pbrown@example.com
  Joshua Wood: william44@example.org
  Rebecca Ryan: pgriffin@example.com

Run 2 (same seed):
  Lucy Cummings: pbrown@example.com
  Joshua Wood: william44@example.org
  Rebecca Ryan: pgriffin@example.com

The global Faker.seed() affects all Faker instances. For isolated tests, use fake = Faker(); fake.seed_instance(42) instead — this seeds only that specific instance, leaving other instances unaffected. Always add seeding to your setUp() method in unit tests that use Faker to ensure reproducible results.

Custom Providers

When built-in providers are not enough, create your own by subclassing BaseProvider:

# faker_custom_provider.py
from faker import Faker
from faker.providers import BaseProvider

class PythonLibraryProvider(BaseProvider):
    LIBRARIES = [
        "requests", "pandas", "numpy", "flask", "fastapi",
        "sqlalchemy", "celery", "pydantic", "attrs", "click",
        "rich", "typer", "httpx", "loguru", "pytest",
    ]
    VERSIONS = ["1.0.0", "1.2.3", "2.0.1", "3.1.0", "0.9.8", "4.0.0"]

    def python_library(self):
        return self.random_element(self.LIBRARIES)

    def package_version(self):
        return self.random_element(self.VERSIONS)

    def requirements_entry(self):
        lib = self.python_library()
        ver = self.package_version()
        op = self.random_element(["==", ">=", "~="])
        return f"{lib}{op}{ver}"

fake = Faker()
fake.add_provider(PythonLibraryProvider)

print("Sample requirements.txt:")
for _ in range(6):
    print(fake.requirements_entry())

Output:

Sample requirements.txt:
requests==2.0.1
pandas>=1.2.3
sqlalchemy~=3.1.0
fastapi==0.9.8
rich>=4.0.0
httpx==1.0.0

Custom providers follow the same pattern as built-in ones: subclass BaseProvider, define methods that use self.random_element() or other helper methods, then add the provider with fake.add_provider(YourProvider). This is the right pattern for domain-specific data like product SKUs, medical record IDs, airline codes, or any structured string format specific to your application.

BaseProvider subclass. Your domain. Your fake data format.

Real-Life Example: Seeding a SQLite Test Database

Here is a complete database seeder that populates a SQLite database with 50 realistic users and 200 orders for testing:

# seed_database.py
from faker import Faker
import sqlite3

def create_tables(conn):
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            email TEXT UNIQUE NOT NULL,
            phone TEXT,
            city TEXT,
            country TEXT,
            created_at TEXT NOT NULL
        );

        CREATE TABLE IF NOT EXISTS orders (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER NOT NULL,
            product TEXT NOT NULL,
            amount REAL NOT NULL,
            status TEXT NOT NULL,
            ordered_at TEXT NOT NULL,
            FOREIGN KEY (user_id) REFERENCES users(id)
        );
    """)

def seed_users(conn, fake, n=50):
    users = []
    emails = set()
    while len(users) < n:
        email = fake.safe_email()
        if email not in emails:
            emails.add(email)
            users.append((
                fake.name(),
                email,
                fake.phone_number()[:20],
                fake.city(),
                fake.country()[:50],
                fake.date_time_this_year().isoformat(),
            ))
    conn.executemany(
        "INSERT INTO users (name, email, phone, city, country, created_at) VALUES (?,?,?,?,?,?)",
        users
    )
    return len(users)

def seed_orders(conn, fake, n=200):
    products = ["Python Course", "VS Code Theme", "API Access", "Pro License", "Support Plan"]
    statuses = ["pending", "processing", "shipped", "delivered", "cancelled"]
    user_ids = [row[0] for row in conn.execute("SELECT id FROM users").fetchall()]

    orders = [(
        fake.random_element(user_ids),
        fake.random_element(products),
        round(fake.random.uniform(9.99, 299.99), 2),
        fake.random_element(statuses),
        fake.date_time_this_year().isoformat(),
    ) for _ in range(n)]

    conn.executemany(
        "INSERT INTO orders (user_id, product, amount, status, ordered_at) VALUES (?,?,?,?,?)",
        orders
    )
    return len(orders)

# Main seeding script
Faker.seed(0)
fake = Faker("en_US")

conn = sqlite3.connect(":memory:")
create_tables(conn)

user_count = seed_users(conn, fake, n=50)
order_count = seed_orders(conn, fake, n=200)
conn.commit()

# Verify and display sample
print(f"Seeded: {user_count} users, {order_count} orders\n")

print("Sample users:")
for row in conn.execute("SELECT name, email, city FROM users LIMIT 3").fetchall():
    print(f"  {row[0]:<25} {row[1]:<35} {row[2]}")

print("\nRevenue by status:")
for row in conn.execute(
    "SELECT status, COUNT(*) as cnt, ROUND(SUM(amount),2) as total FROM orders GROUP BY status ORDER BY total DESC"
).fetchall():
    print(f"  {row[0]:<12} {row[1]:>4} orders   ${row[2]:>8,.2f}")

conn.close()

Output:

Seeded: 50 users, 200 orders

Sample users:
  Jennifer Martinez         jmartinez@example.com               Portland
  Robert Chen               robert.chen42@example.net            Seattle
  Sandra Williams           s.williams@example.org               Denver

Revenue by status:
  delivered      42 orders   $5,847.23
  shipped        39 orders   $5,234.89
  processing     41 orders   $4,923.47
  cancelled      40 orders   $4,901.23
  pending        38 orders   $4,712.34

The unique email constraint in the database is why we track already-generated emails in a set and keep generating until we have enough unique ones. This pattern — generate, deduplicate, retry — is the right approach whenever uniqueness is required. The Faker.seed(0) call guarantees the same seeded data every run, which is critical for test reproducibility.

Faker.seed(0). Same 50 users, same 200 orders, every test run.

Frequently Asked Questions

What is the difference between fake.email() and fake.safe_email()?

fake.email() can generate addresses with realistic-looking domains that may or may not exist as real domains (recent Faker releases default to reserved domains unless you pass safe=False). fake.safe_email() always uses example.com, example.net, or example.org, domains permanently reserved for documentation and testing by RFC 2606. Use safe_email() in any context where generated emails might accidentally be sent to real recipients.

How do I generate unique values (no duplicates)?

Use fake.unique: fake.unique.email() guarantees each call returns a different value. The unique proxy tracks previously generated values and retries until it finds a new one. Clear the unique tracker with fake.unique.clear() to reset it. Note that if you request more unique values than Faker can generate for a given method, it will raise a UniquenessException.

Can I generate data in a specific format?

Yes — several methods accept format parameters. fake.date(pattern="%d/%m/%Y") formats dates using strftime patterns. fake.bothify(text="??-###") generates strings where ? is replaced with a random letter and # with a random digit. fake.numerify(text="SKU-#####") replaces # with digits. These are useful for generating IDs, product codes, or any structured string format.

How do I seed Faker for individual unit tests?

Use fake.seed_instance(seed_value) rather than the global Faker.seed(). Call this in your test’s setUp method with a fixed value. This seeds only that Faker instance, so parallel tests using their own instances do not interfere with each other. In pytest, create a fixture that returns a seeded Faker instance.

How do I generate large amounts of data efficiently?

For bulk generation (100,000+ records), avoid calling Faker methods in a tight loop that also writes to a database one row at a time. Instead, generate all records into a list first, then use bulk insert: conn.executemany(sql, list_of_tuples). For extreme volume, generate to CSV and load with COPY or LOAD DATA INFILE. Faker itself is fast enough for millions of records — the bottleneck is usually I/O.
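
One way to combine lazy generation with bulk inserts is to feed executemany in fixed-size batches, so memory stays bounded even for very large runs. The sketch below uses only the standard library; fake_rows is a hypothetical stand-in for real Faker calls, and the two-column table is simplified for illustration.

```python
import sqlite3
from itertools import islice

def fake_rows(n):
    """Hypothetical stand-in for Faker calls; yields (name, email) tuples lazily."""
    for i in range(n):
        yield (f"User {i}", f"user{i}@example.com")

def insert_in_batches(conn, rows, batch_size=1000):
    """Insert rows with executemany, one batch at a time, and return the row count."""
    rows = iter(rows)
    total = 0
    while True:
        batch = list(islice(rows, batch_size))
        if not batch:
            break
        conn.executemany("INSERT INTO users (name, email) VALUES (?, ?)", batch)
        total += len(batch)
    conn.commit()
    return total

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT NOT NULL, email TEXT UNIQUE NOT NULL)")
inserted = insert_in_batches(conn, fake_rows(2500), batch_size=1000)
print(inserted)
```

The batch size is a tuning knob: larger batches mean fewer round trips, smaller ones mean less memory held at once.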

Conclusion

Faker transforms the chore of creating test data into a one-liner. The key patterns: use Faker.seed() for reproducible test fixtures; use fake.safe_email() instead of fake.email() for safety; use fake.unique.method() for uniqueness constraints; and build custom providers for domain-specific data formats that Faker does not cover.

The database seeder example demonstrates the complete workflow: seed for reproducibility, generate bulk data into lists, use executemany for efficient bulk inserts, and verify with queries. Extend it by adding the de_DE or ja_JP locale to simulate international users, or add a custom provider for your application’s specific data types.

For the full list of providers, locales, and methods, see the Faker documentation and the GitHub repository.

How To Use Python Watchdog for Filesystem Monitoring

Intermediate

You have a hot-reload development server, a log aggregation pipeline, or a file-sync tool, and you need to react the instant a file changes on disk. Polling with os.listdir() every few seconds works but wastes CPU and introduces latency. Python’s watchdog library solves this by hooking into the operating system’s native filesystem event APIs: inotify on Linux, FSEvents on macOS, ReadDirectoryChangesW on Windows. You get near-real-time, low-overhead change notifications delivered directly to your Python code.

Watchdog is a cross-platform filesystem monitoring library. You define an event handler (a class that responds to specific events like file created, modified, deleted, or moved) and attach it to an observer that watches a directory. The observer runs in a background thread, calling your handler shortly after the OS reports a change. There is no polling loop and no busy-waiting in your own code.

In this article you will learn how to install watchdog, handle the four core filesystem event types, filter events by file pattern, watch directories recursively, build a debounced handler for editors that trigger multiple events on save, and build a real-world log file processor. By the end you will be able to react to filesystem changes in real time in any Python application.

Watchdog Quick Example

Here is the minimal watchdog setup — watch a directory and print every event:

# watchdog_quick.py
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import time

class PrintHandler(FileSystemEventHandler):
    def on_any_event(self, event):
        if not event.is_directory:
            print(f"Event: {event.event_type} | Path: {event.src_path}")

observer = Observer()
observer.schedule(PrintHandler(), path=".", recursive=False)
observer.start()
print("Watching current directory. Press Ctrl+C to stop.")

try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    observer.stop()
observer.join()

Output (when you create/edit a file in the watched directory):

Watching current directory. Press Ctrl+C to stop.
Event: created | Path: ./notes.txt
Event: modified | Path: ./notes.txt
Event: modified | Path: ./notes.txt

The on_any_event method catches every filesystem event. The observer.schedule() call connects the handler to a directory path. With recursive=False, only the top-level directory is watched — set it to True to watch all subdirectories as well. Keep reading to see how to handle specific event types and filter by file pattern.

What Is Watchdog and How Does It Work?

Watchdog is a Python wrapper around three operating system APIs for filesystem events: inotify (Linux), FSEvents (macOS), and ReadDirectoryChangesW (Windows). Each API allows user-space programs to subscribe to filesystem events without polling. Watchdog abstracts these platform differences into a single Python interface.

Event Type | When It Fires | Handler Method
created | New file or directory created | on_created(event)
modified | File content or metadata changed | on_modified(event)
deleted | File or directory removed | on_deleted(event)
moved | File or directory renamed/moved | on_moved(event)
inotify says a file changed. Watchdog says which one, what happened, and when.

Installing Watchdog

# terminal
pip install watchdog

Verify the install:

# verify_watchdog.py
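# importlib.metadata reads the installed package version,
# whether or not the package exposes a __version__ attribute
from importlib.metadata import version
print(version("watchdog"))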

Output:

4.0.1

Handling Specific Event Types

Subclass FileSystemEventHandler and override the specific methods you need. Each method receives an event object with src_path, is_directory, and (for move events) dest_path:

# watchdog_events.py
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from datetime import datetime
import time

class DetailedHandler(FileSystemEventHandler):
    def on_created(self, event):
        if not event.is_directory:
            print(f"[{datetime.now():%H:%M:%S}] CREATED: {event.src_path}")

    def on_modified(self, event):
        if not event.is_directory:
            print(f"[{datetime.now():%H:%M:%S}] MODIFIED: {event.src_path}")

    def on_deleted(self, event):
        label = "DIR" if event.is_directory else "FILE"
        print(f"[{datetime.now():%H:%M:%S}] DELETED ({label}): {event.src_path}")

    def on_moved(self, event):
        print(f"[{datetime.now():%H:%M:%S}] MOVED: {event.src_path} -> {event.dest_path}")

observer = Observer()
observer.schedule(DetailedHandler(), path="./watched_dir", recursive=True)
observer.start()
print("Watching ./watched_dir recursively...")

try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    observer.stop()
observer.join()

Output:

Watching ./watched_dir recursively...
[09:30:15] CREATED: ./watched_dir/report.csv
[09:30:18] MODIFIED: ./watched_dir/report.csv
[09:30:22] MOVED: ./watched_dir/report.csv -> ./watched_dir/archive/report_2026.csv
[09:31:00] DELETED (FILE): ./watched_dir/old_log.txt

Always check event.is_directory before acting on events. Directories emit events too (for example, a directory can report a modified event when a file inside it changes), and handling those as if they were files causes spurious actions in your pipeline.

Filtering by File Pattern

Use PatternMatchingEventHandler to restrict events to specific file types. This keeps extension checks out of your handler code:

# watchdog_patterns.py
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler
import time

class PythonFileHandler(PatternMatchingEventHandler):
    def __init__(self):
        super().__init__(
            patterns=["*.py", "*.pyi"],
            ignore_patterns=["*.pyc", "*__pycache__*"],
            ignore_directories=True,
            case_sensitive=False,
        )

    def on_modified(self, event):
        print(f"Python file changed: {event.src_path}")
        # Trigger linting, testing, hot-reload, etc.

    def on_created(self, event):
        print(f"New Python file: {event.src_path}")

observer = Observer()
observer.schedule(PythonFileHandler(), path="./src", recursive=True)
observer.start()
print("Watching ./src for Python file changes...")

try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    observer.stop()
observer.join()

The patterns list uses shell-style wildcards. The ignore_patterns list filters out events you never want to see — always ignore *.pyc and __pycache__ when watching Python source. The case_sensitive=False parameter matters on Windows where filenames are case-insensitive by default.

patterns=["*.py"], ignore_patterns=["*.pyc"]. Signal separated from noise.

Debouncing: Handling Multiple Events on Save

Most text editors trigger 2-4 events per save (write temp file, rename, modify original). If your handler triggers a long operation like running tests, you want to wait until the burst of events settles before acting. This is called debouncing:

# watchdog_debounce.py
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler
import threading
import time

class DebouncedHandler(PatternMatchingEventHandler):
    def __init__(self, debounce_seconds=0.5):
        super().__init__(patterns=["*.py"], ignore_directories=True)
        self.debounce_seconds = debounce_seconds
        self._timer = None
        self._last_path = None

    def on_modified(self, event):
        self._last_path = event.src_path
        if self._timer is not None:
            self._timer.cancel()
        self._timer = threading.Timer(self.debounce_seconds, self._run_action)
        self._timer.start()

    def on_created(self, event):
        self.on_modified(event)

    def _run_action(self):
        print(f"Running action for: {self._last_path}")
        # Run your linter, test suite, or hot-reload here

observer = Observer()
observer.schedule(DebouncedHandler(debounce_seconds=0.5), path="./src", recursive=True)
observer.start()
print("Watching ./src with 500ms debounce...")

try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    observer.stop()
observer.join()

The debounce pattern uses a threading.Timer: every new event cancels the previous timer and starts a fresh one. The action only fires once the timer expires without being cancelled — meaning no events occurred for at least 0.5 seconds. This is the standard approach used by development servers and live-reloaders.
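
A single shared timer only remembers the most recent path, so a burst touching several files reports just one of them. A possible variation, sketched here with the standard library only, keeps one timer per path. The PathDebouncer class is hypothetical and independent of watchdog; a handler's on_modified would call trigger(event.src_path).

```python
import threading
import time

class PathDebouncer:
    """Run action(path) once per path after events for that path go quiet."""

    def __init__(self, action, delay=0.5):
        self.action = action
        self.delay = delay
        self._timers = {}
        self._lock = threading.Lock()

    def trigger(self, path):
        with self._lock:
            old = self._timers.get(path)
            if old is not None:
                old.cancel()          # restart the quiet period for this path
            timer = threading.Timer(self.delay, self._fire, args=(path,))
            timer.daemon = True
            self._timers[path] = timer
            timer.start()

    def _fire(self, path):
        with self._lock:
            # A timer cancelled too late may still get here; only the newest acts.
            if self._timers.get(path) is not threading.current_thread():
                return
            del self._timers[path]
        self.action(path)

fired = []
debouncer = PathDebouncer(fired.append, delay=0.1)
for _ in range(5):                    # simulate a burst of save events
    debouncer.trigger("a.py")
debouncer.trigger("b.py")
time.sleep(0.5)
print(sorted(fired))                  # → ['a.py', 'b.py']
```

The lock matters because timers fire on their own threads while the observer thread keeps calling trigger().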

Real-Life Example: Automatic Log File Processor

Log files appear. They get processed. Nobody had to write a cron job.

Here is a complete log file processor that watches an input directory, processes new log files as they arrive, and moves them to an archive folder:

# log_processor.py
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler
from pathlib import Path
from datetime import datetime
import time
import shutil

INPUT_DIR = Path("./logs/incoming")
PROCESSED_DIR = Path("./logs/processed")
ERROR_DIR = Path("./logs/errors")

# Ensure directories exist
for d in [INPUT_DIR, PROCESSED_DIR, ERROR_DIR]:
    d.mkdir(parents=True, exist_ok=True)

def parse_log_line(line):
    parts = line.strip().split(" ", 3)
    if len(parts) >= 4:
        return {"date": parts[0], "time": parts[1], "level": parts[2], "message": parts[3]}
    return None

def process_log_file(filepath):
    path = Path(filepath)
    errors = []
    warnings = []
    total = 0

    try:
        with open(path, "r", encoding="utf-8") as f:
            for line in f:
                total += 1
                parsed = parse_log_line(line)
                if parsed:
                    if parsed["level"] == "ERROR":
                        errors.append(parsed["message"])
                    elif parsed["level"] == "WARNING":
                        warnings.append(parsed["message"])

        print(f"Processed {path.name}: {total} lines, {len(errors)} errors, {len(warnings)} warnings")

        if errors:
            print(f"  Errors found:")
            for err in errors[:3]:
                print(f"    - {err}")

        # Move to processed dir with timestamp
        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        dest = PROCESSED_DIR / f"{ts}_{path.name}"
        shutil.move(str(path), str(dest))
        print(f"  Moved to: {dest}")
        return True

    except Exception as e:
        print(f"Failed to process {path.name}: {e}")
        error_dest = ERROR_DIR / path.name
        shutil.move(str(path), str(error_dest))
        return False


class LogFileHandler(PatternMatchingEventHandler):
    def __init__(self):
        super().__init__(
            patterns=["*.log", "*.txt"],
            ignore_directories=True
        )

    def on_created(self, event):
        # Wait briefly for the file write to complete
        time.sleep(0.2)
        print(f"\nNew log file detected: {event.src_path}")
        process_log_file(event.src_path)


observer = Observer()
observer.schedule(LogFileHandler(), path=str(INPUT_DIR), recursive=False)
observer.start()
print(f"Log processor watching: {INPUT_DIR}")
print(f"Processed files go to: {PROCESSED_DIR}\n")

try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    observer.stop()
observer.join()
print("Log processor stopped.")

Output (when a log file is dropped into the input directory):

Log processor watching: ./logs/incoming
Processed files go to: ./logs/processed

New log file detected: ./logs/incoming/app_2026_05_03.log
Processed app_2026_05_03.log: 1247 lines, 3 errors, 12 warnings
  Errors found:
    - Database connection timeout after 30s
    - Failed to parse response from payment API
    - Disk usage exceeded 90% threshold
  Moved to: ./logs/processed/20260503_091534_app_2026_05_03.log

The time.sleep(0.2) in on_created is a practical workaround: the OS reports the created event as soon as the file appears, which can be before its content is fully written. The brief pause gives the writer time to finish, though it also blocks the observer's dispatch thread for that long. For large files, use a loop that polls file size until it stops growing, or use the on_modified event with debouncing instead.
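
The size-polling idea can be sketched as a small standard-library helper. The function name wait_until_stable and its default values are assumptions for illustration. Size stability is only a heuristic: a writer that pauses longer than the polling window will still look finished.

```python
import os
import time

def wait_until_stable(path, interval=0.2, checks=3, timeout=30.0):
    """Return True once the file size is unchanged for `checks` consecutive polls."""
    deadline = time.monotonic() + timeout
    last_size = -1
    stable = 0
    while time.monotonic() < deadline:
        try:
            size = os.path.getsize(path)
        except OSError:
            size = -1                 # file vanished or is not readable yet
        if size == last_size and size >= 0:
            stable += 1
            if stable >= checks:
                return True
        else:
            stable = 0
            last_size = size
        time.sleep(interval)
    return False                      # gave up: still changing or never appeared
```

A handler could call wait_until_stable(event.src_path) before processing and route the file to the error directory when it returns False.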

on_any_event fired 47 times. The debouncer fired once.

Frequently Asked Questions

How many files can watchdog monitor without performance issues?

Watchdog relies on the OS kernel’s native event APIs (inotify, FSEvents), which are efficient. On Linux, the inotify watch limit applies per user and has commonly defaulted to 8,192; newer kernels may pick a larger default based on available memory. For large directory trees with thousands of subdirectories, you may need to increase this: echo 524288 | sudo tee /proc/sys/fs/inotify/max_user_watches (this lasts until reboot; set fs.inotify.max_user_watches in your sysctl configuration to make it permanent). On macOS and Windows, FSEvents and ReadDirectoryChangesW have different (generally higher) limits but the same principle applies: monitoring tens of thousands of files is fine; millions requires tuning.
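
To estimate whether a tree fits under the limit, a rough check is to count its directories and compare against the kernel setting. The sketch below assumes a recursive inotify watch needs roughly one watch per directory; the helper names are hypothetical, and the /proc path exists only on Linux.

```python
import os
from pathlib import Path

LIMIT_FILE = Path("/proc/sys/fs/inotify/max_user_watches")   # Linux only

def count_directories(root):
    """Count root plus every subdirectory beneath it."""
    return sum(1 for _dirpath, _dirs, _files in os.walk(root))

def inotify_watch_limit():
    """Return the per-user inotify watch limit, or None where it is unavailable."""
    try:
        return int(LIMIT_FILE.read_text().strip())
    except (OSError, ValueError):
        return None
```

Comparing count_directories("./src") with inotify_watch_limit() before starting the observer gives an early warning; other processes owned by the same user also consume watches, so leave headroom.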

Does watchdog work on network drives or NFS mounts?

Not reliably. inotify and FSEvents only work on local filesystems — they do not receive events for changes made on remote systems. For network drives, fall back to polling: use watchdog.observers.polling.PollingObserver instead of Observer. The API is identical but it polls the filesystem at regular intervals. You lose the real-time benefit but gain cross-filesystem compatibility.

How do I watch multiple directories with the same observer?

Call observer.schedule() multiple times: observer.schedule(handler, path="/dir1") and observer.schedule(handler, path="/dir2"). You can use the same handler instance for both or different handler instances. The observer runs all scheduled watches in a single background thread, so avoid long-running operations in event handlers — offload heavy work to a thread pool or queue instead.
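
Offloading to a queue can be sketched with the standard library alone. EnqueueHandler below is a hypothetical stand-in that mimics the on_created(event) signature; in real use it would subclass FileSystemEventHandler, and the upper-casing line is only a placeholder for slow work.

```python
import queue
import threading
from types import SimpleNamespace

work_queue = queue.Queue()
processed = []

def worker():
    """Drain the queue on a separate thread so handlers return immediately."""
    while True:
        path = work_queue.get()
        try:
            if path is None:          # sentinel: stop the worker
                return
            processed.append(path.upper())   # placeholder for slow processing
        finally:
            work_queue.task_done()

class EnqueueHandler:
    """Hypothetical handler; a real one would subclass FileSystemEventHandler."""

    def on_created(self, event):
        work_queue.put(event.src_path)   # cheap, so the observer is not blocked

thread = threading.Thread(target=worker, daemon=True)
thread.start()

handler = EnqueueHandler()
for name in ["a.log", "b.log"]:
    handler.on_created(SimpleNamespace(src_path=name))   # simulated events

work_queue.join()                     # wait until everything queued is handled
work_queue.put(None)
thread.join()
print(processed)                      # → ['A.LOG', 'B.LOG']
```

With a single worker, files are processed in arrival order; adding workers trades that ordering for throughput.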

Does watchdog follow symbolic links?

By default, no. Passing recursive=True to observer.schedule(handler, path, recursive=True) does not make the observer follow symlinks. If your directory contains symlinks to other directories and you want events from those paths, watch them separately: resolve the symlink targets with Path.resolve() and schedule each one individually.
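
To pick up events from symlinked directories, one option is to resolve each link and schedule its target as a separate watch. The sketch below only collects the targets; symlinked_dir_targets is a hypothetical helper that looks at the top level of one directory, and the scheduling step is shown as a comment.

```python
from pathlib import Path

def symlinked_dir_targets(root):
    """Return resolved targets of directory symlinks found directly under root."""
    targets = set()
    for entry in Path(root).iterdir():
        if entry.is_symlink() and entry.is_dir():
            targets.add(entry.resolve())
    return sorted(targets)

# Possible use with an existing observer and handler:
# for target in symlinked_dir_targets("./watched_dir"):
#     observer.schedule(handler, path=str(target), recursive=True)
```

Events from those extra watches report paths under the resolved target, not under the symlink, so downstream code may need to map them back.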

How do I safely read a file that is still being written?

The safest pattern: listen for on_closed events (available in watchdog 2.x+, and emitted by the Linux inotify backend) instead of on_modified. The on_closed event fires after the writer closes the file, so that writer has finished with it. If you must use on_modified, poll the file size in a loop with a short sleep until it stops changing, then read it.

Conclusion

Watchdog turns filesystem events into Python callbacks with a clean, cross-platform API. The core pattern — create a handler subclass, schedule it with an observer, start the observer in a background thread — works the same on Linux, macOS, and Windows without any changes. Use PatternMatchingEventHandler to filter by file type, use debouncing for editor compatibility, and use the PollingObserver for network drives.

The log file processor example shows a complete real-world use case: detect new files, process them immediately, move them to an archive. Extend it with a work queue (using Python’s queue.Queue) to handle concurrent file arrivals safely, or add a Flask endpoint to report processing statistics.

For installation options, the full event API, and platform-specific notes, see the Watchdog documentation.

How To Use APScheduler for Task Scheduling in Python

Intermediate

Every application eventually needs to run tasks on a schedule: send a daily email digest, clean up expired sessions at midnight, poll an API every five minutes, generate a weekly report every Monday at 8am. You could set up a cron job on the server, but then your schedule logic lives outside your Python code in a system-specific format. APScheduler solves this by bringing the scheduler directly into your Python application — no cron, no separate process, no external dependencies.

APScheduler (Advanced Python Scheduler) is a lightweight, flexible scheduling library that runs inside your Python process. It supports three types of triggers — interval (run every N seconds), cron (run on a schedule like a crontab), and date (run once at a specific time) — and it can persist jobs across restarts using SQLite, PostgreSQL, Redis, and other backends. It works with any Python application: standalone scripts, Flask apps, FastAPI services, or Django projects.

In this article you will learn how to install and configure APScheduler, add jobs with each trigger type, handle errors gracefully, use job stores for persistence, and build a real-world task scheduler for a web application. By the end you will know how to replace your cron jobs with Python-native scheduling.

APScheduler Quick Example

Here is the minimal setup — a job that runs every 10 seconds:

# apscheduler_quick.py
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
import time

def tick():
    print(f"Tick! The time is: {datetime.now().strftime('%H:%M:%S')}")

scheduler = BackgroundScheduler()
scheduler.add_job(tick, "interval", seconds=10)
scheduler.start()
print("Scheduler running. Press Ctrl+C to stop.")

try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    scheduler.shutdown()
    print("Scheduler stopped.")

Output:

Scheduler running. Press Ctrl+C to stop.
Tick! The time is: 09:15:10
Tick! The time is: 09:15:20
Tick! The time is: 09:15:30

The key concepts: a BackgroundScheduler runs in a background thread so it does not block your main program. The add_job() call registers a function and its trigger. The scheduler.start() call launches the background thread. This pattern works identically in Flask, FastAPI, or any other framework — keep reading for the full details.

What Is APScheduler and When To Use It?

APScheduler is a job scheduling library that wraps Python’s threading and asyncio mechanisms with a high-level interface for managing scheduled tasks. It is the right tool when your tasks need to live in the same Python process as your application — for example, when they need to access application state, database sessions, or configuration that would be awkward to share with an external cron process.

Approach | Best For | Drawbacks
APScheduler | In-process tasks, Python apps, simple deployments | Jobs stop if process dies
System cron | System-level tasks, shell scripts | Schedule lives outside code, no persistence
Celery Beat | Distributed, high-volume, multiple workers | Requires Redis/RabbitMQ, complex setup
rq-scheduler | Redis-backed queues, simpler than Celery | Requires Redis
Cron in a .yaml file maintained by someone who left. APScheduler keeps it in code.

Installing APScheduler

Install the base package with pip:

# terminal
pip install apscheduler

For SQLAlchemy-based job persistence (recommended for production):

# terminal
pip install apscheduler sqlalchemy

Verify the install:

# verify_apscheduler.py
import apscheduler
print(apscheduler.__version__)

Output:

3.10.4

The Three Trigger Types

Interval Trigger

Run a job every N seconds, minutes, hours, days, or weeks. Use this for polling, cleanup, and any task that should repeat on a fixed cycle:

# interval_trigger.py
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime

def cleanup_temp_files():
    print(f"[{datetime.now():%H:%M:%S}] Cleaning temp files...")
    # Your cleanup logic here

def poll_api():
    print(f"[{datetime.now():%H:%M:%S}] Polling external API...")
    # Your API call here

scheduler = BlockingScheduler()
# Run cleanup every 6 hours
scheduler.add_job(cleanup_temp_files, "interval", hours=6)
# Poll every 30 seconds
scheduler.add_job(poll_api, "interval", seconds=30, id="api_poll")
scheduler.start()

The BlockingScheduler blocks the main thread — use it for scripts where scheduling is the only purpose. Use BackgroundScheduler when you need to do other work in the main thread simultaneously.

Cron Trigger

Run a job on a cron-style schedule — specific times of day, days of week, or days of month. APScheduler’s cron trigger supports all standard cron fields:

# cron_trigger.py
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
import time

def daily_report():
    print(f"Generating daily report at {datetime.now():%Y-%m-%d %H:%M:%S}")

def weekly_cleanup():
    print(f"Running weekly cleanup at {datetime.now():%Y-%m-%d %H:%M:%S}")

def workday_digest():
    print(f"Sending workday digest at {datetime.now():%Y-%m-%d %H:%M:%S}")

scheduler = BackgroundScheduler()

# Every day at 8:00 AM
scheduler.add_job(daily_report, "cron", hour=8, minute=0)
# Every Monday at midnight
scheduler.add_job(weekly_cleanup, "cron", day_of_week="mon", hour=0, minute=0)
# Weekdays (Mon-Fri) at 5:30 PM
scheduler.add_job(workday_digest, "cron", day_of_week="mon-fri", hour=17, minute=30)

scheduler.start()
try:
    while True:
        time.sleep(60)
except KeyboardInterrupt:
    scheduler.shutdown()

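The cron trigger accepts the fields year, month, day, week, day_of_week, hour, minute, and second: the five standard cron fields plus year, week, and second. You can also build a trigger from a standard five-field cron expression with CronTrigger.from_crontab("0 8 * * *") and pass that trigger object to add_job in place of the "cron" string.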
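
To see how a five-field crontab string lines up with the keyword arguments used above, here is a minimal sketch. The crontab_to_kwargs helper is hypothetical and not part of APScheduler; it only splits the string. It keeps every field, including "*", because leaving a field out of add_job is not always the same as passing "*" (fields below the least significant one you specify default to their minimum value).

```python
# Hypothetical helper (not an APScheduler API): map the five crontab
# fields onto the keyword names accepted by add_job(..., "cron", ...).
CRON_FIELDS = ("minute", "hour", "day", "month", "day_of_week")

def crontab_to_kwargs(expr: str) -> dict:
    parts = expr.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}: {expr!r}")
    # Keep "*" entries so the meaning of the expression is preserved.
    return dict(zip(CRON_FIELDS, parts))

kwargs = crontab_to_kwargs("30 17 * * mon-fri")
print(kwargs)
# {'minute': '30', 'hour': '17', 'day': '*', 'month': '*', 'day_of_week': 'mon-fri'}
```

The resulting dict could then be passed as scheduler.add_job(func, "cron", **kwargs). Weekday names such as mon-fri are safer than numbers here, since APScheduler numbers weekdays from Monday = 0 while classic cron starts at Sunday = 0.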

day_of_week=mon-fri, hour=17, minute=30. Your boss thinks you wrote this at 5:30 PM every day.

Date Trigger

Run a job once at a specific date and time — useful for delayed tasks, one-off notifications, or deferred processing:

# date_trigger.py
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime, timedelta
import time

def send_reminder(username, message):
    print(f"Reminder for {username}: {message} at {datetime.now():%H:%M:%S}")

scheduler = BackgroundScheduler()

# Run once 30 seconds from now
run_at = datetime.now() + timedelta(seconds=30)
scheduler.add_job(send_reminder, "date", run_date=run_at,
                  args=["alice", "Team standup in 5 minutes"])

# Run at a specific datetime
scheduler.add_job(send_reminder, "date",
                  run_date="2026-05-03 14:00:00",
                  kwargs={"username": "bob", "message": "Quarterly report due"})

scheduler.start()
try:
    time.sleep(60)
    scheduler.shutdown()
except KeyboardInterrupt:
    scheduler.shutdown()

Output:

Reminder for alice: Team standup in 5 minutes at 09:15:40

The args parameter passes positional arguments to the job function; kwargs passes keyword arguments. Both work with all three trigger types, making it easy to parameterize your jobs.

Error Handling and Listeners

By default, APScheduler logs exceptions but does not crash the scheduler. You can attach event listeners to take action when jobs succeed, fail, or are missed:

# apscheduler_events.py
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR, EVENT_JOB_MISSED
import logging
import time

logging.basicConfig(level=logging.INFO)

def flaky_job():
    import random
    if random.random() < 0.4:
        raise ValueError("Simulated job failure")
    print("Job succeeded!")

def job_listener(event):
    if event.exception:
        print(f"Job {event.job_id} FAILED: {event.exception}")
    elif hasattr(event, 'retval'):
        print(f"Job {event.job_id} succeeded")

def missed_listener(event):
    print(f"Job {event.job_id} was MISSED (scheduler may have been busy)")

scheduler = BackgroundScheduler()
scheduler.add_listener(job_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
scheduler.add_listener(missed_listener, EVENT_JOB_MISSED)
scheduler.add_job(flaky_job, "interval", seconds=5, id="flaky")
scheduler.start()

try:
    time.sleep(30)
    scheduler.shutdown()
except KeyboardInterrupt:
    scheduler.shutdown()

Output:

Job flaky succeeded
Job flaky FAILED: Simulated job failure
Job flaky succeeded
Job flaky succeeded
Job flaky FAILED: Simulated job failure

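Event listeners are the right place to add alerting, dead-letter queuing, or retry logic. The EVENT_JOB_MISSED event fires when a run could not start within misfire_grace_time of its scheduled time -- for example because the scheduler was down or every executor thread was busy. A run that is skipped because the previous run of the same job is still going (max_instances reached) is reported as EVENT_JOB_MAX_INSTANCES instead -- which is a sign your job is taking longer than its interval and you should increase the interval or optimize the job.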
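
As an illustration of the alerting idea, here is a minimal sketch of a listener that flags a job after several consecutive failures. FailureTracker is a hypothetical class, and the events are simulated with SimpleNamespace objects carrying the job_id and exception attributes used in the listener above, so the snippet runs without a scheduler.

```python
from collections import defaultdict
from types import SimpleNamespace

class FailureTracker:
    """Count consecutive failures per job and record an alert at a threshold."""

    def __init__(self, threshold: int = 3):
        self.threshold = threshold
        self.failures = defaultdict(int)
        self.alerts = []  # stand-in for an email, chat, or pager call

    def __call__(self, event) -> None:
        if event.exception is None:
            self.failures[event.job_id] = 0  # a success resets the streak
            return
        self.failures[event.job_id] += 1
        if self.failures[event.job_id] == self.threshold:
            self.alerts.append(
                f"{event.job_id} failed {self.threshold} times in a row"
            )

tracker = FailureTracker(threshold=2)
# Simulated outcomes: fail, succeed, fail, fail
for exc in [ValueError("boom"), None, ValueError("boom"), ValueError("boom")]:
    tracker(SimpleNamespace(job_id="flaky", exception=exc))
print(tracker.alerts)  # ['flaky failed 2 times in a row']
```

In a real application the same object could be registered with scheduler.add_listener(tracker, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR), since a listener is just a callable that receives the event.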

EVENT_JOB_ERROR fires. Your alert fires. You fix the bug before anyone notices.

Job Persistence with SQLAlchemy

By default, jobs are stored in memory and are lost when the process restarts. For production applications, use the SQLAlchemy job store to persist jobs in a database:

# apscheduler_persistent.py
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor
from datetime import datetime
import time

jobstores = {
    "default": SQLAlchemyJobStore(url="sqlite:///jobs.sqlite")
}
executors = {
    "default": ThreadPoolExecutor(10)
}
job_defaults = {
    "coalesce": True,    # merge missed runs into one
    "max_instances": 1   # only one instance at a time
}

scheduler = BackgroundScheduler(
    jobstores=jobstores,
    executors=executors,
    job_defaults=job_defaults
)

def hourly_sync():
    print(f"[{datetime.now():%H:%M:%S}] Syncing database...")

# add_job with replace_existing=True avoids duplicate jobs on restart
scheduler.add_job(hourly_sync, "interval", hours=1, id="db_sync",
                  replace_existing=True)

scheduler.start()
print("Scheduler with SQLite persistence running...")
try:
    time.sleep(5)
    print(f"Jobs: {scheduler.get_jobs()}")
    scheduler.shutdown()
except KeyboardInterrupt:
    scheduler.shutdown()

Output:

Scheduler with SQLite persistence running...
Jobs: [<Job (id=db_sync name=hourly_sync)>]

The coalesce=True setting merges any missed runs into a single catch-up run -- useful when your process was down during several scheduled intervals. The max_instances=1 setting prevents overlapping runs if a job takes longer than its interval. Always use replace_existing=True when adding jobs that should survive process restarts.

Real-Life Example: Flask App with Background Jobs

APScheduler in Flask -- because @app.route does not have a run-at-3am decorator.

Here is a complete Flask application that uses APScheduler for background tasks -- a common production pattern:

# flask_scheduler_app.py
from flask import Flask, jsonify
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
import atexit

app = Flask(__name__)
app.config["SECRET_KEY"] = "your-secret-key"

# Simple in-memory log for demo purposes
job_log = []

def cleanup_old_sessions():
    ts = datetime.now().strftime("%H:%M:%S")
    entry = f"[{ts}] Cleaned up expired sessions"
    job_log.append(entry)
    print(entry)

def send_digest_emails():
    ts = datetime.now().strftime("%H:%M:%S")
    entry = f"[{ts}] Sent digest emails to subscribers"
    job_log.append(entry)
    print(entry)

def health_check_external_apis():
    ts = datetime.now().strftime("%H:%M:%S")
    entry = f"[{ts}] Health check: all APIs responding"
    job_log.append(entry)
    print(entry)

@app.route("/")
def index():
    return jsonify({"status": "running", "recent_jobs": job_log[-5:]})

@app.route("/jobs")
def list_jobs():
    jobs = [{"id": j.id, "name": j.name, "next_run": str(j.next_run_time)}
            for j in scheduler.get_jobs()]
    return jsonify(jobs)

# Initialize scheduler
scheduler = BackgroundScheduler()
scheduler.add_job(cleanup_old_sessions, "interval", minutes=30, id="cleanup")
scheduler.add_job(send_digest_emails, "cron", hour=8, minute=0, id="digest")
scheduler.add_job(health_check_external_apis, "interval", minutes=5, id="health")
scheduler.start()

# Ensure scheduler shuts down with the app
atexit.register(lambda: scheduler.shutdown())

if __name__ == "__main__":
    app.run(debug=False, port=5000)

Test it:

# In another terminal
curl http://localhost:5000/jobs
# Returns:
# [{"id":"cleanup","name":"cleanup_old_sessions","next_run":"2026-05-03 09:45:00+00:00"},
#  {"id":"digest","name":"send_digest_emails","next_run":"2026-05-04 08:00:00+00:00"},
#  {"id":"health","name":"health_check_external_apis","next_run":"2026-05-03 09:20:00+00:00"}]

The atexit.register() call ensures the scheduler shuts down cleanly when the Flask development server stops. In production (Gunicorn, uWSGI), handle shutdown via a signal handler or application lifecycle hook. The /jobs endpoint gives you a simple way to verify that scheduled tasks are registered and shows their next run times.

Frequently Asked Questions

Does APScheduler work with asyncio and FastAPI?

Yes -- use AsyncIOScheduler instead of BackgroundScheduler. In a FastAPI app, call scheduler.start() when the application starts and scheduler.shutdown() when it stops, for example from a lifespan handler or the older @app.on_event("startup") and @app.on_event("shutdown") hooks. Job functions can be regular functions or async coroutines depending on your use case. APScheduler 4.x is a full async rewrite with a different API and was still a pre-release at the time of writing -- check the docs for the latest stable version.

What happens to jobs that are missed while my process is down?

With in-memory job stores, missed jobs are lost. With a persistent job store (SQLAlchemy, MongoDB), the jobs are remembered but the behavior on restart depends on coalesce and misfire_grace_time settings. coalesce=True runs missed jobs once; coalesce=False runs each missed instance. misfire_grace_time sets a deadline -- if a job is missed by more than this many seconds, it is skipped entirely.
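
The interaction between coalesce and misfire_grace_time is easier to follow with a small model. The sketch below is a simplified simulation written for this article, not APScheduler code; runs_to_execute is a hypothetical function that assumes coalescing keeps only the most recent overdue run and that the grace period is then checked for each remaining run.

```python
from datetime import datetime, timedelta

def runs_to_execute(missed, now, coalesce, misfire_grace_time):
    """Simplified model: which overdue run times would still be executed."""
    # Coalescing collapses all overdue runs into the most recent one.
    candidates = missed[-1:] if coalesce else list(missed)
    if misfire_grace_time is None:  # None is treated as "no deadline"
        return candidates
    grace = timedelta(seconds=misfire_grace_time)
    # Runs that are later than the grace period are skipped.
    return [t for t in candidates if now - t <= grace]

now = datetime(2026, 5, 3, 12, 5)
missed = [datetime(2026, 5, 3, hour, 0) for hour in (9, 10, 11, 12)]

print(len(runs_to_execute(missed, now, coalesce=False, misfire_grace_time=None)))  # 4
print(len(runs_to_execute(missed, now, coalesce=True, misfire_grace_time=None)))   # 1
print(len(runs_to_execute(missed, now, coalesce=False, misfire_grace_time=3600)))  # 1
print(len(runs_to_execute(missed, now, coalesce=True, misfire_grace_time=60)))     # 0
```

The last line shows the case that surprises people: with a short grace period, even the single coalesced run is dropped if the process comes back too late.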

How do I schedule jobs in a specific timezone?

Pass timezone to the trigger: scheduler.add_job(func, "cron", hour=8, timezone="Australia/Sydney"). You can also set a default timezone on the scheduler itself: BackgroundScheduler(timezone="UTC"). Always use timezone-aware scheduling in production -- naive datetime handling causes subtle bugs around daylight saving transitions.

Can I run APScheduler across multiple processes or servers?

Not directly -- each process runs its own independent scheduler, and APScheduler 3.x does not support sharing one job store between several running schedulers; doing so can cause jobs to run twice or be skipped. The tableschema option of the SQLAlchemy job store only selects the database schema that holds the jobs table; it does not coordinate schedulers. Either run the scheduler in exactly one dedicated process, or use a tool designed for distributed scheduling such as Celery Beat (backed by Redis or RabbitMQ).
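
One workaround on a single host is to let only the process that wins an exclusive file lock start the scheduler. The sketch below assumes a Unix-like system (it uses fcntl.flock); try_acquire_scheduler_lock and the lock path are hypothetical names, and the approach does not help across multiple servers.

```python
import fcntl
import os
import tempfile

def try_acquire_scheduler_lock(path: str):
    """Return an open file holding an exclusive lock, or None if it is taken."""
    handle = open(path, "w")
    try:
        # LOCK_NB makes the call fail immediately instead of waiting.
        fcntl.flock(handle, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except BlockingIOError:
        handle.close()
        return None
    return handle  # keep a reference for as long as the process runs

lock_path = os.path.join(tempfile.mkdtemp(), "scheduler.lock")
first = try_acquire_scheduler_lock(lock_path)   # e.g. worker 1
second = try_acquire_scheduler_lock(lock_path)  # e.g. worker 2

print("first holder starts scheduler:", first is not None)
print("second holder starts scheduler:", second is not None)
```

Each worker would call the function at startup and run scheduler.start() only when it gets a handle back. The lock is released automatically when the holding process exits.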

How do I pause, resume, or remove a job at runtime?

Use the job's ID: scheduler.pause_job("my_job_id"), scheduler.resume_job("my_job_id"), and scheduler.remove_job("my_job_id"). You can also get a job object and call methods on it: job = scheduler.get_job("my_job_id") then job.pause(). Build a simple admin endpoint in your web app that calls these methods to control job scheduling at runtime without a restart.

Conclusion

APScheduler brings cron-style scheduling inside your Python application, where it belongs. The three trigger types -- interval for repeated tasks, cron for time-of-day scheduling, and date for one-off jobs -- cover virtually every scheduling need. Add event listeners for error alerting, use the SQLAlchemy job store for persistence across restarts, and integrate with Flask using the background scheduler and atexit, or with FastAPI using AsyncIOScheduler.

The real-life Flask example demonstrates the complete pattern: initialize the scheduler at module level, register jobs with IDs so they can be managed at runtime, start it once, and register an atexit handler. For production, add a persistent job store and proper timezone configuration from the start -- these are the two settings that cause the most pain when retrofitted later.

For the full API reference and advanced features like executors and job stores, see the APScheduler documentation.

How To Use Python tabulate for Pretty-Printing Tables

Intermediate

You have a list of dictionaries, a query result, or a DataFrame you want to print to the terminal — and the default Python output looks like a wall of brackets and commas. Every developer has been there. Debugging data pipelines, writing CLI tools, generating quick reports: at some point you need output that a human can actually read without squinting. Python’s tabulate library solves this in one line.

The tabulate package converts Python data structures into formatted ASCII tables with customizable styles, alignment, and number formatting. It handles lists of lists, lists of dicts, NumPy arrays, Pandas DataFrames, and more. Installation is a single pip command, and it has no mandatory dependencies — making it one of the easiest wins in your CLI toolkit.

In this article you will learn how to install and use tabulate for common data structures, explore the most useful table formats, control alignment and number precision, handle missing values gracefully, and combine tabulate with real-world data in a practical CLI report. By the end you will know exactly when and how to reach for tabulate instead of print.

Python tabulate: Quick Example

Here is the simplest possible tabulate usage — a list of dictionaries printed as a neat grid:

# tabulate_quick.py
from tabulate import tabulate

data = [
    {"name": "Alice", "score": 95, "grade": "A"},
    {"name": "Bob",   "score": 82, "grade": "B"},
    {"name": "Carol", "score": 74, "grade": "C"},
]

print(tabulate(data, headers="keys"))

Output:

name      score  grade
------  -------  -------
Alice        95  A
Bob          82  B
Carol        74  C

The headers="keys" argument tells tabulate to use dictionary keys as column headers. The library automatically right-aligns numeric columns and left-aligns strings — sensible defaults that make the table immediately readable. Keep reading to see how to customize styles, alignment, and formatting for more demanding use cases.

What Is tabulate and Why Use It?

The tabulate library is a pure-Python utility that transforms sequences of records into formatted text tables. Think of it as str.format() for tabular data: you hand it a list of rows and a header, and it figures out column widths, alignment, and separators automatically.

Without tabulate, you would write custom formatting code every time: calculate max column widths, pad strings, build separator lines. That is tedious and error-prone. Tabulate does all of that and adds support for 30+ table styles including plain text, GitHub Markdown, HTML, LaTeX, and more.
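
For comparison, this is roughly what the hand-written version looks like. It is a minimal sketch with a hypothetical format_table function that left-aligns everything; it has no numeric alignment, no missing-value handling, and only one style, which is exactly the gap tabulate fills.

```python
def format_table(headers, rows):
    """Hand-rolled text table: compute column widths, pad, add a rule line."""
    table = [[str(c) for c in headers]] + [[str(c) for c in row] for row in rows]
    # Width of each column is the longest cell in that column.
    widths = [max(len(row[i]) for row in table) for i in range(len(headers))]

    def fmt(row):
        return "  ".join(cell.ljust(w) for cell, w in zip(row, widths)).rstrip()

    rule = "  ".join("-" * w for w in widths)
    return "\n".join([fmt(table[0]), rule] + [fmt(row) for row in table[1:]])

print(format_table(["name", "score"], [["Alice", 95], ["Bob", 82]]))
```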

Use Case | Tabulate Format
Terminal output / CLI tools | grid, simple, rounded_grid
GitHub README or PR comments | github, pipe
HTML email or web page | html, unsafehtml
Academic papers / LaTeX | latex, latex_booktabs
Minimal / screenreader friendly | plain, tsv
Fifteen columns, a thousand rows. tabulate in 0.002s.

Installing tabulate

Install from PyPI with pip:

# terminal
pip install tabulate

Verify the installation:

# verify_tabulate.py
import tabulate
print(tabulate.__version__)

Output:

0.9.0

Choosing a Table Format

The tablefmt parameter controls the visual style. Here are the most useful formats side by side:

# tabulate_formats.py
from tabulate import tabulate

rows = [["Python", 3.12, "Interpreted"], ["Rust", 1.75, "Compiled"], ["Go", 1.21, "Compiled"]]
headers = ["Language", "Version", "Type"]

for fmt in ["simple", "grid", "github", "plain"]:
    print(f"\n--- {fmt} ---")
    print(tabulate(rows, headers=headers, tablefmt=fmt))

Output (selected formats):

--- simple ---
Language      Version  Type
----------  ---------  -----------
Python           3.12  Interpreted
Rust             1.75  Compiled
Go               1.21  Compiled

--- grid ---
+------------+-----------+-------------+
| Language   |   Version | Type        |
+============+===========+=============+
| Python     |      3.12 | Interpreted |
+------------+-----------+-------------+
| Rust       |      1.75 | Compiled    |
+------------+-----------+-------------+
| Go         |      1.21 | Compiled    |
+------------+-----------+-------------+

--- github ---
| Language   |   Version | Type        |
|------------|-----------|-------------|
| Python     |      3.12 | Interpreted |
| Rust       |      1.75 | Compiled    |
| Go         |      1.21 | Compiled    |

Use simple for most terminal output. Use github when generating Markdown for READMEs or PR descriptions. Use grid when you want clear visual cell boundaries. For HTML output, use html — tabulate generates proper <table> markup with <th> and <td> tags.

Pick your format once. Swap it in one argument.

Column Alignment and Number Precision

Tabulate auto-aligns columns but you can override this with the colalign parameter. Number formatting uses floatfmt and intfmt:

# tabulate_alignment.py
from tabulate import tabulate

portfolio = [
    ["AAPL",  182.63,  1500000.25,  "+2.3%"],
    ["GOOGL", 140.12, 14000000.00,  "-0.8%"],
    ["MSFT",  378.85,  9500000.50,  "+1.1%"],
]
headers = ["Ticker", "Price", "Market Cap", "Change"]

print(tabulate(
    portfolio,
    headers=headers,
    tablefmt="simple",
    colalign=("left", "right", "right", "center"),
    floatfmt=("", ".2f", ",.2f", ""),
))

Output:

Ticker      Price     Market Cap   Change
--------  -------  -------------  --------
AAPL       182.63   1,500,000.25   +2.3%
GOOGL      140.12  14,000,000.00   -0.8%
MSFT       378.85   9,500,000.50   +1.1%

The colalign tuple takes one value per column: "left", "right", "center", or "decimal" (numbers lined up on the decimal point). The floatfmt tuple follows Python's format spec mini-language: ".2f" gives two decimal places, and ",.2f" adds a thousands separator. Pass an empty string to skip formatting for that column.
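
Because floatfmt strings are ordinary format specs, you can try them with the built-in format() function before putting them in a table. A short sketch using only the standard library:

```python
value = 1500000.25

# Each second argument is a format spec that could also be used in floatfmt.
print(format(value, ".2f"))    # 1500000.25
print(format(value, ",.2f"))   # 1,500,000.25
print(format(value, ".3e"))    # 1.500e+06
print(format(0.0231, ".1%"))   # 2.3%
```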

Handling Missing Values

Real-world data is messy — rows may have different lengths or None values. Tabulate handles both gracefully with the missingval parameter:

# tabulate_missing.py
from tabulate import tabulate

rows = [
    ["Alice",  "Engineering", 95000],
    ["Bob",    "Marketing",   None],
    ["Carol",  None,          72000],
    ["Dave",   "Engineering"],
]

headers = ["Name", "Department", "Salary"]

print(tabulate(rows, headers=headers, tablefmt="simple", missingval="N/A"))

Output:

Name    Department      Salary
------  ------------  --------
Alice   Engineering      95000
Bob     Marketing          N/A
Carol   N/A              72000
Dave    Engineering        N/A

The missingval parameter fills in any None or missing cell with the string you provide. This makes tabulate safe to use with real database queries where columns can be NULL — no need to pre-clean the data before display.

missingval=’N/A’. Your None values now have a polite face.

Working with Different Data Sources

Tabulate accepts multiple input formats. Here is how it works with each:

# tabulate_sources.py
from tabulate import tabulate

# 1. List of lists
rows_lol = [[1, "alpha", 3.14], [2, "beta", 2.71]]
print("List of lists:")
print(tabulate(rows_lol, headers=["ID", "Name", "Value"], tablefmt="simple"))

# 2. List of dicts
rows_dicts = [
    {"city": "Sydney", "pop": 5300000, "country": "AU"},
    {"city": "London", "pop": 9000000, "country": "UK"},
]
print("\nList of dicts:")
print(tabulate(rows_dicts, headers="keys", tablefmt="simple"))

# 3. Dict of lists (column-oriented)
rows_cols = {"x": [1, 2, 3], "y": [10, 20, 30], "z": [100, 200, 300]}
print("\nDict of lists:")
print(tabulate(rows_cols, headers="keys", tablefmt="simple"))

Output:

List of lists:
  ID  Name      Value
----  ------  -------
   1  alpha      3.14
   2  beta       2.71

List of dicts:
city        pop  country
------  -------  ---------
Sydney  5300000  AU
London  9000000  UK

Dict of lists:
  x    y    z
---  ---  ---
  1   10  100
  2   20  200
  3   30  300

For headers="keys" with dicts, tabulate uses the dictionary keys in insertion order (Python 3.7+). For column-oriented data (a dict of lists), tabulate handles it correctly without any conversion on your part.

40 lines of Python. Your ops team thinks it took a week.

Real-Life Example: CLI Package Dependency Report

Here is a practical script that reads installed packages and generates a formatted report — useful for auditing environments before deployment:

# package_report.py
import json
import subprocess
import sys
from tabulate import tabulate

def get_installed_packages():
    # Ask the current interpreter's pip for all installed packages as JSON
    result = subprocess.run(
        [sys.executable, "-m", "pip", "list", "--format=json"],
        capture_output=True, text=True
    )
    return json.loads(result.stdout)

def get_outdated_packages():
    # Map lowercase package name -> latest available version
    result = subprocess.run(
        [sys.executable, "-m", "pip", "list", "--outdated", "--format=json"],
        capture_output=True, text=True
    )
    if result.stdout.strip():
        return {p["name"].lower(): p["latest_version"] for p in json.loads(result.stdout)}
    return {}

packages = get_installed_packages()
outdated = get_outdated_packages()

rows = []
for pkg in sorted(packages, key=lambda p: p["name"].lower()):
    name = pkg["name"]
    version = pkg["version"]
    latest = outdated.get(name.lower(), "")
    status = "OUTDATED" if latest else "ok"
    rows.append([name, version, latest or "--", status])

print("\nInstalled Packages Report")
print(f"Total: {len(rows)} packages, {len(outdated)} outdated\n")

print(tabulate(
    rows,
    headers=["Package", "Installed", "Latest", "Status"],
    tablefmt="simple",
    colalign=("left", "right", "right", "center"),
    missingval="--",
))

Output (example):

Installed Packages Report
Total: 47 packages, 3 outdated

Package      Installed    Latest    Status
---------  -----------  --------  --------
certifi         2023.11  2024.02   OUTDATED
pip              23.3.2   24.0.0   OUTDATED
requests          2.31.0  --          ok
tabulate          0.9.0   --          ok
urllib3           2.1.0   2.2.0    OUTDATED

This script shows how to combine tabulate with subprocess output for a genuinely useful DevOps tool. Extend it by adding a --html flag that changes tablefmt="html" and writes to a file, or filter to show only outdated packages by slicing the rows list before passing to tabulate.

Frequently Asked Questions

Can I use tabulate directly with a Pandas DataFrame?

Yes — pass the DataFrame and headers="keys": tabulate(df, headers="keys", tablefmt="grid"). DataFrames also have a built-in df.to_markdown() method that wraps tabulate internally. For Jupyter notebooks, the built-in HTML rendering is usually more appropriate than tabulate.

How do I handle very wide tables that wrap in the terminal?

Use maxcolwidths (added in tabulate 0.9.0) to wrap long values: tabulate(data, headers="keys", maxcolwidths=30) wraps the text in each cell onto multiple lines so that no column is wider than 30 characters. For very wide tables, consider using tablefmt="plain" (no borders) or transpose the data so columns become rows.

How do I add a row index to my table?

Pass showindex=True for an auto-incrementing index, or pass a list for custom indices: tabulate(rows, headers=headers, showindex=range(1, len(rows)+1)). You can also pass showindex="always" which works for any input type including DataFrames.

Is tabulate fast enough for large datasets?

Tabulate is optimized for display, not batch processing — it is fine for hundreds to a few thousand rows. For tables with 10,000+ rows you will notice slowdown because it must scan all values to compute column widths. Display only a sample (data[:100]) or use a fixed column width you specify manually.

Can I add colors or bold text to tabulate output?

Tabulate itself does not add ANSI color codes, but you can add them to your data strings before passing to tabulate. Libraries like colorama work alongside tabulate. ANSI codes add invisible characters; tabulate tries to ignore them when measuring column widths, but test the alignment carefully. If you want colors plus rich formatting, consider using the rich library's built-in Table class as an alternative.

Conclusion

The tabulate library turns the chore of formatting tabular data into a single function call. The key patterns: pass lists of dicts with headers="keys" for the quickest readable output; use tablefmt="github" for Markdown output; use colalign and floatfmt for precise numeric formatting; and missingval for handling None values from real-world data sources.

The real-life example — a package dependency reporter — shows how to combine tabulate with subprocess to build a useful DevOps tool in about 40 lines. Extend it with a --html output option and automated email delivery for a lightweight dependency audit system.

For the full list of format names and options, see the tabulate documentation on PyPI and the GitHub repository which includes format screenshots for every style.

How To Use Python natsort for Natural Sort Order

Intermediate

Python’s built-in sorting handles strings lexicographically — which means “file10.txt” sorts before “file2.txt” because “1” comes before “2” in ASCII. This is fine for computers, but deeply frustrating for anyone who has to look at a sorted file listing. Natural sort order — the order humans expect when they see numbers inside strings — is what natsort provides. It is the difference between a file manager that makes sense and one that scrambles your numbered files.

The natsort library is a pure-Python solution that detects numeric substrings inside strings and compares them as numbers, not as character sequences. It handles filenames, version strings, IP addresses, biological sequences, and locale-aware sorting with a clean, minimal API. No need to write custom sort key functions for every new case.

In this article, you will learn how natsort works, how to use natsorted() for common cases, sort paths and version numbers, use natural sort keys with sort() and sorted(), handle case-insensitive and locale-aware sorting, and build a practical file organizer that uses natural sort to group and display files. Install natsort with pip install natsort.

Natural Sorting: Quick Example

# natsort_quick.py
from natsort import natsorted

files = ['file10.txt', 'file2.txt', 'file1.txt', 'file20.txt', 'file3.txt']

# Default Python sort -- lexicographic (wrong order for humans)
print("Default sort:", sorted(files))

# Natural sort -- numbers compared as integers
print("Natural sort:", natsorted(files))

Output:

Default sort: ['file1.txt', 'file10.txt', 'file2.txt', 'file20.txt', 'file3.txt']
Natural sort: ['file1.txt', 'file2.txt', 'file3.txt', 'file10.txt', 'file20.txt']

That one import and one function call is all it takes to fix the ordering. natsorted() returns a new list, just like sorted(). The rest of this article covers the full range of what natsort can do for you beyond basic filename sorting.

What Is Natural Sort Order?

Natural sort order is a collation algorithm that treats numeric substrings within strings as actual numbers when comparing. Instead of comparing character by character (‘2’ vs ‘1’ vs ‘0’), it identifies contiguous digit sequences, converts them to integers, and compares those integers. This produces an ordering that matches human intuition.

Input | Lexicographic (default) | Natural sort (natsort)
file1, file2, file10 | file1, file10, file2 | file1, file2, file10
v1.2, v1.10, v1.9 | v1.10, v1.2, v1.9 | v1.2, v1.9, v1.10
Chapter 1, Chapter 2, Chapter 10 | Chapter 1, Chapter 10, Chapter 2 | Chapter 1, Chapter 2, Chapter 10
img_001.jpg, img_010.jpg, img_100.jpg | Correct (leading zeros) | Correct (numeric value)

The key insight: lexicographic sorting is correct for most string data (names, words, identifiers), but breaks down whenever numeric sequences appear within strings. natsort detects these sequences and applies numeric comparison only where it is appropriate, leaving the rest of the string to standard character comparison.
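
The idea can be sketched in a few lines of standard-library Python. This is a simplified illustration of the technique, not natsort's actual implementation; natural_key is a hypothetical name, and it handles only unsigned integers.

```python
import re

def natural_key(text: str):
    """Split text into alternating string and integer chunks for comparison."""
    # The capturing group keeps the digit runs:
    # "file10.txt" -> ["file", "10", ".txt"]
    parts = re.split(r"(\d+)", text)
    # Odd positions are always the digit runs, so types line up when comparing.
    return [int(p) if i % 2 else p for i, p in enumerate(parts)]

files = ["file10.txt", "file2.txt", "file1.txt"]
print(natural_key("file10.txt"))       # ['file', 10, '.txt']
print(sorted(files, key=natural_key))  # ['file1.txt', 'file2.txt', 'file10.txt']
```

natsort builds on the same principle and adds the options covered below, such as floats, paths, case folding, and locale-aware ordering.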

Loop Larry confused by file ordering
file10 before file2. Computers are right. Computers are also wrong.

Using natsorted() for Common Cases

The main entry point is natsorted(), which accepts any iterable and optional keyword arguments to control sorting behavior:

# natsort_basics.py
from natsort import natsorted, ns

# Version strings
versions = ['1.10.2', '1.9.0', '2.0.1', '1.2.3', '1.10.0']
print("Versions:", natsorted(versions))

# Mixed strings with numbers and text
items = ['item_5', 'item_12', 'item_3', 'item_10', 'item_1']
print("Items:", natsorted(items))

# Case-insensitive natural sort
mixed_case = ['File10.txt', 'file2.TXT', 'FILE1.txt', 'File20.txt']
print("Case-insensitive:", natsorted(mixed_case, alg=ns.IGNORECASE))

# Reverse natural sort
print("Reversed:", natsorted(versions, reverse=True))

# Sort with a key function (e.g., sort tuples by second element)
data = [('b', 'v10'), ('a', 'v2'), ('c', 'v1'), ('d', 'v20')]
print("By version:", natsorted(data, key=lambda x: x[1]))

Output:

Versions: ['1.2.3', '1.9.0', '1.10.0', '1.10.2', '2.0.1']
Items: ['item_1', 'item_3', 'item_5', 'item_10', 'item_12']
Case-insensitive: ['FILE1.txt', 'file2.TXT', 'File10.txt', 'File20.txt']
Reversed: ['2.0.1', '1.10.2', '1.10.0', '1.9.0', '1.2.3']
By version: [('c', 'v1'), ('a', 'v2'), ('b', 'v10'), ('d', 'v20')]

Sorting File Paths

File path sorting has an extra wrinkle: you usually want to sort directory components separately from filenames, and you want numbers within both components to sort naturally. natsort’s ns.PATH algorithm handles this correctly:

# natsort_paths.py
from natsort import natsorted, ns
from pathlib import Path

# Simulate a file listing with numbered paths
paths = [
    '/data/project10/output2.csv',
    '/data/project2/output10.csv',
    '/data/project1/output1.csv',
    '/data/project2/output2.csv',
    '/data/project10/output1.csv',
]

# PATH algorithm: sorts directory components and filenames separately
sorted_paths = natsorted(paths, alg=ns.PATH)
for p in sorted_paths:
    print(p)

Output:

/data/project1/output1.csv
/data/project2/output2.csv
/data/project2/output10.csv
/data/project10/output1.csv
/data/project10/output2.csv

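Without ns.PATH, the whole path is compared as one string, so path separators and file extensions take part in the comparison like any other character. That can misplace entries such as folder (1)/file.txt relative to folder/file.txt. The PATH algorithm splits on path separators (and splits off the file extension) first, then applies natural sort within each component -- giving you the result a file manager would show.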

Cache Katie sorting files in natural order
O(n log n) plus one import. The price of sanity.

Using natsort_keygen() with sort()

If you need to sort an existing list in place (rather than getting a new sorted list), use natsort_keygen() to get a key function you can pass to list.sort():

# natsort_key.py
from natsort import natsort_keygen, ns

# Sort a list in place
logs = ['error_10.log', 'error_2.log', 'error_1.log', 'error_100.log']
key = natsort_keygen()
logs.sort(key=key)
print("Sorted logs:", logs)

# Use as a sort key in more complex operations
students = [
    {'name': 'Alice', 'grade': 'Grade 10'},
    {'name': 'Bob', 'grade': 'Grade 2'},
    {'name': 'Carol', 'grade': 'Grade 1'},
    {'name': 'Dave', 'grade': 'Grade 20'},
]
grade_key = natsort_keygen(key=lambda s: s['grade'])
students.sort(key=grade_key)
for s in students:
    print(f"  {s['grade']}: {s['name']}")

Output:

Sorted logs: ['error_1.log', 'error_2.log', 'error_10.log', 'error_100.log']
  Grade 1: Carol
  Grade 2: Bob
  Grade 10: Alice
  Grade 20: Dave

Real-Life Example: File Report Generator

Sudo Sam organizing perfectly sorted files
A sorted file listing: the last thing you will take for granted again.

This script scans a directory, groups files by type, sorts them naturally within each group, and produces a clean report with file sizes:

# file_report.py
from pathlib import Path
from natsort import natsorted, ns
from collections import defaultdict

def format_size(bytes_count: int) -> str:
    """Format bytes as human-readable size."""
    for unit in ['B', 'KB', 'MB', 'GB']:
        if bytes_count < 1024:
            return f"{bytes_count:.1f} {unit}"
        bytes_count /= 1024
    return f"{bytes_count:.1f} TB"

def generate_file_report(folder: str) -> None:
    base = Path(folder)
    if not base.exists():
        print(f"Folder not found: {folder}")
        return

    # Group files by extension
    groups = defaultdict(list)
    for path in base.rglob('*'):
        if path.is_file():
            ext = path.suffix.lower() or '.no_ext'
            groups[ext].append(path)

    # Sort extension groups naturally, then files within each group
    sorted_exts = natsorted(groups.keys())

    total_files = 0
    total_size = 0

    for ext in sorted_exts:
        files = natsorted(groups[ext], key=lambda p: p.name, alg=ns.PATH)
        ext_size = sum(f.stat().st_size for f in files)
        print(f"\n{ext} ({len(files)} files, {format_size(ext_size)})")
        print("-" * 40)
        for f in files[:5]:  # Show first 5 to keep output manageable
            size = f.stat().st_size
            rel = f.relative_to(base)
            print(f"  {str(rel):<35} {format_size(size):>8}")
        if len(files) > 5:
            print(f"  ... and {len(files) - 5} more")
        total_files += len(files)
        total_size += ext_size

    print(f"\nTotal: {total_files} files, {format_size(total_size)}")

generate_file_report('./my_project')

Output:

.csv (3 files, 48.2 KB)
----------------------------------------
  data/output1.csv                  12.1 KB
  data/output2.csv                  18.4 KB
  data/output10.csv                 17.7 KB

.py (5 files, 22.8 KB)
----------------------------------------
  src/module1.py                     4.2 KB
  src/module2.py                     5.1 KB
  src/module10.py                    6.8 KB
  src/module11.py                    3.9 KB
  src/utils.py                       2.8 KB

Total: 8 files, 71.0 KB

Frequently Asked Questions

Does natsort handle floating-point numbers in strings?

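Yes -- use the ns.FLOAT algorithm flag: natsorted(items, alg=ns.FLOAT). This treats sequences like 3.14 and 2.71 as floating-point numbers rather than two separate integer sequences. You can combine flags: ns.FLOAT | ns.IGNORECASE. For dotted version strings such as 1.2.10, stay with the default integer algorithm: it compares each dot-separated component as a whole number, whereas ns.FLOAT would read 1.10 as the float 1.1. (Older natsort releases exposed ns.VERSION / ns.V as an alias for this default; it was deprecated and later removed, so avoid it in new code.)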

How do I sort with locale-aware character ordering?

Use natsorted(items, alg=ns.LOCALE) after calling locale.setlocale(locale.LC_ALL, '') to set the system locale. This sorts accented characters (like é, à, ö in European languages) according to the locale's collation rules rather than Unicode code point order. This is important for sorting names in languages where a and ä are considered equal or adjacent in the alphabet.

Can I use natsort with pandas DataFrames?

Yes. Use natsort_keygen() as the key parameter in pandas sort_values(): df.sort_values('column', key=lambda s: s.map(natsort_keygen())). For sorting DataFrame row labels (index), use df.reindex(natsorted(df.index)). The natsort documentation has a dedicated pandas section with more examples for multi-column natural sorting.

Does natsort handle negative numbers?

By default, natsort does not treat a leading minus sign as part of a number -- "file-10.txt" sorts as the string "file-" followed by natural sort on "10.txt". To enable signed number handling, use ns.SIGNED: natsorted(items, alg=ns.SIGNED). This is useful for data with negative values in filenames or log entries, but can produce unexpected results if hyphens are used as separators in non-numeric contexts.

Is natsort fast enough for large lists?

natsort is implemented in pure Python with optional C acceleration via the fastnumbers package. Install it with pip install fastnumbers to get 3-5x speedup on numeric-heavy data. For most real-world use cases (thousands of filenames, hundreds of version strings), natsort's performance is more than adequate. If you are sorting millions of strings, profile first -- the Python overhead may matter more than the sorting algorithm itself.

Conclusion

You now know how to use natsort to make string sorting behave the way humans expect. We covered natsorted() for common cases, the PATH algorithm for file system paths, natsort_keygen() for in-place sorting and complex key functions, case-insensitive and locale-aware sorting with ns flags, and a practical file report generator that groups and naturally sorts files by extension.

The key takeaway is simple: any time you are sorting strings that contain numbers -- filenames, version strings, log entries, chapter titles, IP addresses -- reach for natsort instead of writing a custom key function. It handles the edge cases (floats, negative numbers, leading zeros, locale) that custom one-liners miss.

See the natsort documentation for the full list of algorithm flags, pandas integration examples, and performance tips.

How To Use Python pendulum for Better Date and Time Handling

Intermediate

Dealing with dates and times in Python’s standard datetime module is notoriously error-prone. Timezone handling is verbose and fragile, parsing requires format strings you always have to look up, and arithmetic near daylight saving time transitions can silently give you wrong answers. If you have ever had a cron job fire at the wrong time after DST, or struggled to express “3 months from now” in stdlib, you will appreciate pendulum.

pendulum is a drop-in replacement for Python’s datetime that makes timezone-aware operations the default, not an afterthought. Every pendulum.DateTime object is timezone-aware, DST transitions are handled correctly, parsing is smart (ISO 8601 by default), and date math reads like plain English. It is widely used in data pipelines, scheduling systems, and API integrations where timezone correctness matters.

In this article, you will learn how to create and parse pendulum datetimes, work with timezones safely, perform date arithmetic, format dates for display and APIs, use periods and durations, and build a practical meeting scheduler that handles cross-timezone coordination. Install pendulum with pip install pendulum.

pendulum Quick Example

# pendulum_quick.py
import pendulum

now = pendulum.now('Europe/London')
print("London time:", now.to_datetime_string())
print("Tokyo time:", now.in_timezone('Asia/Tokyo').to_datetime_string())
print("In 3 weeks:", now.add(weeks=3).to_date_string())
print("Diff from now:", now.subtract(hours=5).diff_for_humans())

Output:

London time: 2026-05-02 10:00:00
Tokyo time: 2026-05-02 18:00:00
In 3 weeks: 2026-05-23
Diff from now: 5 hours ago

Every datetime created by pendulum is timezone-aware. Converting between timezones is a single method call. The diff_for_humans() method produces readable strings like “5 hours ago” or “in 3 weeks” — perfect for user-facing interfaces. The rest of this article dives deep into each of these capabilities.

What Is pendulum and How Does It Compare to datetime?

pendulum extends Python’s datetime.datetime class — every pendulum instance is a proper datetime, so it works anywhere datetime is expected. The key differences are in defaults and ergonomics: stdlib datetime objects are naive (no timezone) by default and require explicit pytz or zoneinfo handling, while every pendulum datetime has a timezone from creation.

Feature | stdlib datetime | pendulum
Timezone-aware by default | No (naive by default) | Yes (always)
DST-safe arithmetic | No (manual) | Yes (automatic)
Human-readable diffs | timedelta only | diff_for_humans()
ISO 8601 parsing | fromisoformat() or a format string | pendulum.parse()
Drop-in compatibility | Native | Yes (subclasses datetime)
Period / duration math | timedelta only | Period, Duration objects

pendulum uses the IANA timezone database (via tzdata) for all timezone operations. This means it knows about every DST rule for every timezone, including historical changes — an important detail for handling records from the past.

Cache Katie managing timezone complexity
naive datetime + timezone math = 2am happens twice. Use pendulum.

Creating and Parsing Datetimes

pendulum gives you several ways to create datetime objects. The most important: pendulum.now() for the current time, pendulum.datetime() for a specific moment, and pendulum.parse() for strings:

# pendulum_create.py
import pendulum

# Current time in a specific timezone
now_nyc = pendulum.now('America/New_York')
print("NYC now:", now_nyc)

# Create a specific datetime
launch = pendulum.datetime(2026, 6, 1, 9, 0, 0, tz='UTC')
print("Launch:", launch.to_iso8601_string())

# Parse from ISO 8601 string (no format string needed)
deadline = pendulum.parse('2026-12-31T23:59:59+05:30')
print("Deadline:", deadline.timezone_name, deadline.to_datetime_string())

# Parse a date-only string (becomes midnight UTC)
day = pendulum.parse('2026-05-15')
print("Day:", day.to_date_string())

# From a Unix timestamp
event = pendulum.from_timestamp(1746129600, tz='Asia/Tokyo')
print("Event in Tokyo:", event.to_datetime_string())

Output:

NYC now: 2026-05-02T06:00:00-04:00
Launch: 2026-06-01T09:00:00+00:00
Deadline: +05:30 2026-12-31 23:59:59
Day: 2026-05-15
Event in Tokyo: 2025-05-02 05:00:00

Date Arithmetic with add() and subtract()

pendulum’s add() and subtract() methods accept any combination of years, months, weeks, days, hours, minutes, and seconds. Unlike timedelta, these methods understand calendar arithmetic — adding 1 month to January 31st gives you February 28th, not an error or overflow:

# pendulum_arithmetic.py
import pendulum

now = pendulum.now('UTC')

# Add calendar units
print("In 3 months:", now.add(months=3).to_date_string())
print("In 2 weeks:", now.add(weeks=2).to_date_string())
print("Yesterday:", now.subtract(days=1).to_date_string())
print("Next quarter:", now.add(months=3).start_of('month').to_date_string())

# End/start of period helpers
print("Start of week:", now.start_of('week').to_date_string())
print("End of month:", now.end_of('month').to_date_string())
print("Start of year:", now.start_of('year').to_date_string())

# DST-aware arithmetic -- clocks spring forward at 2am
eastern = pendulum.datetime(2026, 3, 8, 1, 30, 0, tz='America/New_York')
one_hour_later = eastern.add(hours=1)
print("\nBefore DST:", eastern.to_datetime_string(), eastern.tzname())
print("After +1hr:", one_hour_later.to_datetime_string(), one_hour_later.tzname())

Output:

In 3 months: 2026-08-02
In 2 weeks: 2026-05-16
Yesterday: 2026-05-01
Next quarter: 2026-08-01
Start of week: 2026-04-27
End of month: 2026-05-31
Start of year: 2026-01-01

Before DST: 2026-03-08 01:30:00 EST
After +1hr: 2026-03-08 03:30:00 EDT

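Notice the DST transition: 1:30 AM + 1 hour = 3:30 AM (clocks skip 2:00-3:00 AM). With the stdlib, adding a timedelta to an aware datetime is wall-clock arithmetic, so the same sum yields 2:30 AM, a time that never occurred that day; to get the elapsed-time answer you have to convert to UTC, add, and convert back. pendulum does it automatically.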
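
For comparison, here is a minimal stdlib-only sketch of the same calculation, using datetime and zoneinfo (Python 3.9+) and no pendulum. It assumes the system has IANA timezone data available. The first sum is plain wall-clock arithmetic; the second makes the round trip through UTC by hand.

```python
# stdlib_dst_comparison.py
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo

ny = ZoneInfo("America/New_York")
before = datetime(2026, 3, 8, 1, 30, tzinfo=ny)

# Aware datetime + timedelta only moves the wall clock forward
wall_clock_sum = before + timedelta(hours=1)

# Converting to UTC first measures real elapsed time
elapsed_sum = (before.astimezone(timezone.utc) + timedelta(hours=1)).astimezone(ny)

print("Wall clock:", wall_clock_sum.strftime("%H:%M"))
print("Elapsed:   ", elapsed_sum.strftime("%H:%M"), elapsed_sum.tzname())
```

The wall-clock result lands on 02:30, inside the skipped hour, while the UTC round trip agrees with pendulum's 03:30 EDT.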

API Alex with pendulum date arithmetic
add(months=1) on Jan 31 = Feb 28. Not a bug. A feature.

Periods and Durations

A Period in pendulum is the range between two datetimes. Unlike a bare timedelta, a Period knows about calendar months and years, so you can get “2 months and 5 days” rather than just a raw count of seconds. A Duration is an absolute measure of time (like timedelta) but with the same readable interface:

# pendulum_periods.py
import pendulum

start = pendulum.datetime(2026, 1, 15, tz='UTC')
end = pendulum.datetime(2026, 5, 2, tz='UTC')

period = end - start  # Returns a Period (named Interval in pendulum 3.x)
print("Total days:", period.days)
print("In weeks:", period.in_weeks())
print("In months:", period.in_months())
print("Remaining days:", period.remaining_days)
print("Human:", period.in_words())

# Iterate over dates in a range
print("\nFirst 3 days of May:")
may = pendulum.period(  # pendulum.interval() in pendulum 3.x
    pendulum.datetime(2026, 5, 1, tz='UTC'),
    pendulum.datetime(2026, 5, 3, tz='UTC')
)
for dt in may.range('days'):
    print(" ", dt.to_date_string(), dt.format('dddd'))

Output:

Total days: 107
In weeks: 15
In months: 3
Remaining days: 3
Human: 3 months 2 weeks 3 days

First 3 days of May:
  2026-05-01 Friday
  2026-05-02 Saturday
  2026-05-03 Sunday

Real-Life Example: Cross-Timezone Meeting Scheduler

API Alice connecting timezone clocks
The meeting is at 9am UTC. Midnight somewhere. Sorry, Tokyo.

This scheduler takes a proposed meeting time in UTC and shows it in each participant’s timezone, flags unsociable hours, and generates an iCal-compatible timestamp:

# meeting_scheduler.py
import pendulum

TEAM = {
    "Alice (NYC)":    "America/New_York",
    "Bob (London)":   "Europe/London",
    "Carol (Sydney)": "Australia/Sydney",
    "Dave (Tokyo)":   "Asia/Tokyo",
}

WORK_HOURS = range(8, 18)  # 8am - 6pm considered acceptable

def schedule_meeting(utc_time_str: str, duration_minutes: int = 60) -> None:
    meeting_utc = pendulum.parse(utc_time_str, tz='UTC')
    meeting_end = meeting_utc.add(minutes=duration_minutes)

    print(f"Meeting: {meeting_utc.to_iso8601_string()}")
    print(f"Duration: {duration_minutes} minutes")
    print("-" * 55)

    all_ok = True
    for person, tz in TEAM.items():
        local = meeting_utc.in_timezone(tz)
        local_end = meeting_end.in_timezone(tz)
        hour = local.hour
        is_ok = hour in WORK_HOURS
        if not is_ok:
            all_ok = False
        status = "OK " if is_ok else "BAD"
        print(f"[{status}] {person:<20} {local.format('ddd DD MMM HH:mm')} - {local_end.format('HH:mm')} {local.tzname()}")

    print("-" * 55)
    if all_ok:
        print("All team members available during work hours.")
    else:
        print("WARNING: Some participants outside work hours.")

    # iCal-compatible timestamp
    print(f"\nDTSTART:{meeting_utc.format('YYYYMMDDTHHmmss')}Z")
    print(f"DTEND:{meeting_end.format('YYYYMMDDTHHmmss')}Z")

schedule_meeting("2026-05-04T14:00:00Z", duration_minutes=45)

Output:

Meeting: 2026-05-04T14:00:00+00:00
Duration: 45 minutes
-------------------------------------------------------
[OK ] Alice (NYC)          Mon 04 May 10:00 - 10:45 EDT
[OK ] Bob (London)         Mon 04 May 15:00 - 15:45 BST
[BAD] Carol (Sydney)       Tue 05 May 00:00 - 00:45 AEST
[BAD] Dave (Tokyo)         Mon 04 May 23:00 - 23:45 JST
-------------------------------------------------------
WARNING: Some participants outside work hours.

DTSTART:20260504T140000Z
DTEND:20260504T144500Z

Frequently Asked Questions

Is pendulum compatible with stdlib datetime?

Yes -- pendulum's DateTime is a subclass of datetime.datetime. You can pass pendulum objects anywhere a datetime is expected: to database drivers, to json.dumps with a custom encoder, to Django model fields, to pandas Timestamp constructors. The only caveat is that libraries that create datetime objects (like SQLAlchemy or requests) return plain datetime, not pendulum -- use pendulum.instance(dt) to convert them.

What happens if I pass a naive datetime to pendulum?

Use pendulum.instance(naive_dt, tz='UTC') to wrap a naive datetime with a timezone. Never assume the timezone of a naive datetime -- if you are not sure whether it's local time or UTC, you need to find out from the data source. Silently assuming UTC for naive datetimes from user input or database rows is a common source of off-by-one-hour bugs after DST transitions.

What format strings does pendulum use?

pendulum uses its own token-based format strings in the .format() method (e.g., 'YYYY-MM-DD HH:mm:ss'), which are different from stdlib's strftime codes. There are also convenience methods: .to_iso8601_string(), .to_date_string(), .to_datetime_string(), .to_time_string(), and .to_rfc2822_string(). Use the convenience methods for common formats and .format() for custom patterns.

How do I store pendulum datetimes in a database?

Always store datetimes as UTC in the database. Convert to the user's timezone only for display. With SQLAlchemy, use a DateTime(timezone=True) column -- this stores and retrieves UTC-aware datetimes. In the application layer, convert: pendulum.instance(db_datetime).in_timezone(user_tz). This pattern prevents timezone ambiguity and makes your data portable across servers in different regions.

What changed in pendulum 3.x?

pendulum 3.0 (released in December 2023) rewrote the internals to use Python's built-in zoneinfo module (available since Python 3.9) instead of a bundled timezone database. This makes it lighter and keeps timezone data in sync with your OS. The API is largely backward-compatible, with one rename that affects the examples in this article: Period became Interval, so pendulum.period() is now pendulum.interval(). Also check your pendulum.timezone() calls -- some legacy timezone names were updated. On Python 3.8, pendulum 3 relies on the backports.zoneinfo package in place of the stdlib module.

Conclusion

You now have a complete toolkit for timezone-aware date and time handling with pendulum. We covered creating and parsing datetimes with automatic timezone support, performing calendar-aware arithmetic with add() and subtract() that handles DST transitions correctly, working with Periods for range iteration and human-readable durations, and a practical cross-timezone meeting scheduler that flags unsociable hours and outputs iCal timestamps.

The fundamental rule to take away: always work in UTC internally and convert to local time only for display. pendulum makes this the path of least resistance -- every pendulum.now('UTC') call gives you a timezone-aware datetime, and .in_timezone(user_tz) handles the conversion. Extend the meeting scheduler to find the optimal meeting time that minimizes "bad hours" across all participants -- that is a fun exercise in Period arithmetic.

See the pendulum documentation for the full list of format tokens, locale support, and testing helpers.

How To Use Python structlog for Structured Logging

Intermediate

Standard Python logging gives you timestamped text messages, but when you are debugging a production incident at 2am, those messages rarely give you the context you need. Which user triggered the error? Which request ID? What was the state of the application at that moment? Structured logging solves this by making every log event a machine-readable data record — a dict of key-value pairs instead of a raw string. structlog is the leading Python library for structured logging, and it makes this easy while staying fully compatible with the standard logging module.

structlog works by building up a context dictionary as your code executes, then rendering it as JSON (or pretty-printed for development) when you emit a log event. You bind values once — like a request ID or user ID — and they appear automatically in every subsequent log message from that context. No more copy-pasting request IDs into every log call.

In this article, you will learn how to configure structlog, bind context with bind and new, set up processors for development and production output, integrate with the standard logging module, add request context in a Flask app, and build a practical async worker with rich structured logging. Install structlog with pip install structlog before starting.

Structured Logging: Quick Example

# structlog_quick.py
import structlog

log = structlog.get_logger()

log.info("user_login", user_id=42, email="alice@example.com", source="web")
log.warning("login_failed", user_id=99, reason="invalid_password", attempt=3)

Output (development mode — pretty printed):

2026-05-02 10:00:00 [info     ] user_login         email=alice@example.com source=web user_id=42
2026-05-02 10:00:00 [warning  ] login_failed       attempt=3 reason=invalid_password user_id=99

Each log call takes a positional event string followed by keyword arguments that become fields in the structured record. In development, structlog renders these in a readable format; in production, you configure it to output JSON. The same code, different renderers — you never change your log calls based on environment.

What Is structlog and Why Use It?

structlog replaces the pattern of building log messages from string concatenation (f"User {user_id} failed to login with error {error}") with a pattern of logging discrete fields. This makes logs searchable and filterable in tools like Elasticsearch, Datadog, or CloudWatch Logs Insights — you can query user_id=42 and get all events for that user, rather than running regex searches over raw strings.

Feature | stdlib logging | structlog
Output format | Text strings | JSON or text, configurable
Context binding | LoggerAdapter (verbose) | bind() / new() (clean)
Processors/middleware | Handlers and Filters | Processor pipeline
stdlib compatibility | Native | Full integration available
Async support | Thread-safe only | async-aware with asyncio

structlog does not replace stdlib logging — it wraps around it. In most production setups, structlog acts as the frontend you write to, while stdlib logging handles the backend (file handlers, syslog, etc.). This means you can adopt structlog incrementally in an existing codebase.

Sudo Sam monitoring structured log streams
event=payment_failed user_id=42 — now FIND that in your grep.

Configuring structlog for Development and Production

structlog is configured once at application startup. The key is the processor chain — a list of functions that transform the log event dict before rendering. Here is a complete configuration for both environments:

# structlog_config.py
import structlog
import logging
import sys
import os

def configure_logging() -> None:
    """Configure structlog for the current environment."""
    is_production = os.getenv('ENV', 'development') == 'production'

    shared_processors = [
        structlog.contextvars.merge_contextvars,       # Thread/async context
        structlog.stdlib.add_log_level,                # Add 'level' field
        structlog.stdlib.add_logger_name,              # Add 'logger' field
        structlog.processors.TimeStamper(fmt='iso'),   # ISO 8601 timestamp
        structlog.processors.StackInfoRenderer(),      # Stack traces
        structlog.processors.format_exc_info,          # Exception formatting
    ]

    if is_production:
        # JSON output for log aggregation tools
        renderer = structlog.processors.JSONRenderer()
    else:
        # Human-readable colored output for terminals
        renderer = structlog.dev.ConsoleRenderer(colors=True)

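    # Send structlog's rendered lines to stdout through stdlib logging.
    # add_logger_name reads the logger's name, so a stdlib logger is needed
    # here; PrintLoggerFactory loggers have no name attribute.
    logging.basicConfig(format="%(message)s", stream=sys.stdout, level=logging.DEBUG)

    structlog.configure(
        processors=shared_processors + [renderer],
        wrapper_class=structlog.make_filtering_bound_logger(logging.DEBUG),
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        cache_logger_on_first_use=True,
    )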

configure_logging()
log = structlog.get_logger("myapp")

log.info("app_started", version="1.2.3", environment=os.getenv('ENV', 'development'))
log.debug("config_loaded", database="postgres", cache="redis")

Output (development):

2026-05-02T10:00:00Z [info     ] app_started        environment=development logger=myapp version=1.2.3
2026-05-02T10:00:00Z [debug    ] config_loaded      cache=redis database=postgres logger=myapp

Output (production, ENV=production):

{"environment": "production", "event": "app_started", "level": "info", "logger": "myapp", "timestamp": "2026-05-02T10:00:00Z", "version": "1.2.3"}
{"cache": "redis", "database": "postgres", "event": "config_loaded", "level": "debug", "logger": "myapp", "timestamp": "2026-05-02T10:00:00Z"}

Binding Context with bind() and new()

The most powerful feature of structlog is context binding. Use bind() to add fields that persist for the lifetime of a logger, and new() to create a fresh logger with a clean context:

# structlog_bind.py
import structlog

log = structlog.get_logger()

def process_order(order_id: str, user_id: int) -> None:
    # Bind context once -- all subsequent logs from this logger include it
    order_log = log.bind(order_id=order_id, user_id=user_id)

    order_log.info("order_processing_started")

    # Further bind additional context
    item_log = order_log.bind(item_count=3, total_amount=99.99)
    item_log.info("items_validated")

    try:
        # Simulate an operation
        if order_id == "ORD-999":
            raise ValueError("Invalid payment method")
        item_log.info("payment_processed", gateway="stripe")
    except ValueError as e:
        item_log.error("payment_failed", error=str(e), retry=False)
        raise

    order_log.info("order_processing_complete")

process_order("ORD-101", user_id=42)

Output:

2026-05-02 [info ] order_processing_started  order_id=ORD-101 user_id=42
2026-05-02 [info ] items_validated           item_count=3 order_id=ORD-101 total_amount=99.99 user_id=42
2026-05-02 [info ] payment_processed         gateway=stripe item_count=3 order_id=ORD-101 total_amount=99.99 user_id=42
2026-05-02 [info ] order_processing_complete order_id=ORD-101 user_id=42

Notice how order_id and user_id appear in every log message without being passed repeatedly. When the error occurs, the full context is already present — you see exactly which order failed, for which user, with which item count. This is what makes structured logging so powerful for debugging production issues.

Debug Dee filtering JSON log data
bind() once. Every log carries context forward. No excuses.

Integrating with stdlib logging

Most production codebases already use stdlib logging — your application, your third-party libraries, the frameworks they depend on. structlog plays nicely with all of them: it can wrap stdlib loggers (so existing code keeps working) AND redirect stdlib output through structlog’s pipeline (so everything ends up as JSON):

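# File: hybrid_logging.py
# Uses ProcessorFormatter(processors=...), available in structlog 21.3+
import logging
import structlog

# Processors applied to both structlog events and plain stdlib records
shared_processors = [
    structlog.stdlib.add_logger_name,
    structlog.stdlib.add_log_level,
    structlog.processors.TimeStamper(fmt="iso"),
    structlog.processors.StackInfoRenderer(),
    structlog.processors.format_exc_info,
]

# 1) Route every stdlib log record through structlog's ProcessorFormatter
handler = logging.StreamHandler()
handler.setFormatter(
    structlog.stdlib.ProcessorFormatter(
        foreign_pre_chain=shared_processors,  # runs on non-structlog records
        processors=[
            structlog.stdlib.ProcessorFormatter.remove_processors_meta,
            structlog.processors.JSONRenderer(),
        ],
    )
)
root = logging.getLogger()
root.addHandler(handler)
root.setLevel(logging.INFO)

# 2) Configure structlog to hand its event dict to that formatter
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        *shared_processors,
        structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
    ],
    logger_factory=structlog.stdlib.LoggerFactory(),
    wrapper_class=structlog.stdlib.BoundLogger,
    cache_logger_on_first_use=True,
)

# 3) Use either API: both come out as JSON
struct_logger = structlog.get_logger("api")
struct_logger.info("via structlog", user_id=42)

stdlib_logger = logging.getLogger("legacy")
stdlib_logger.info("via stdlib")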

The processor chain is the heart of structlog. Each processor receives the event dict and returns the modified dict. JSONRenderer at the end serializes it. By inserting format_exc_info and StackInfoRenderer in the chain, every logger.exception() call gets a clean JSON traceback without you having to do anything.

Custom Processors for Filtering and Enrichment

A processor is just a function with signature (logger, method_name, event_dict) -> event_dict. You write them like middleware — each one inspects the dict, can mutate it, and returns either the dict or raises DropEvent to drop the line entirely. Two patterns you’ll use constantly:

# File: custom_processors.py
import os
import structlog

def add_environment(logger, method_name, event_dict):
    """Inject deploy environment into every log line."""
    event_dict["env"] = os.environ.get("APP_ENV", "dev")
    event_dict["service"] = "payments-api"
    return event_dict

def redact_secrets(logger, method_name, event_dict):
    """Strip values for any key that looks sensitive."""
    for key in list(event_dict):
        if any(s in key.lower() for s in ("password", "token", "secret", "card")):
            event_dict[key] = "***REDACTED***"
    return event_dict

structlog.configure(
    processors=[
        add_environment,
        redact_secrets,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ],
)

log = structlog.get_logger()
log.info("charge attempted", user_id=42, card_number="4111111111111111", amount=99)

Output:

{"user_id": 42, "card_number": "***REDACTED***", "amount": 99, "event": "charge attempted", "env": "dev", "service": "payments-api", "timestamp": "2026-05-16T12:34:56.789012Z"}

The redact_secrets processor is the most useful one most projects don’t have. Run every log through it and you can’t accidentally ship raw tokens or PII to your log aggregator. The add_environment processor solves the “which deploy did this come from” question that always shows up during incident response.

Async, FastAPI, and Per-Request Context

In an async web app, the killer feature of structlog is contextvars-based logging — bind context once per request and every log line inside that request inherits it automatically, across await boundaries:

# File: fastapi_structlog.py
from uuid import uuid4
import structlog
from fastapi import FastAPI, Request

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ],
)

log = structlog.get_logger()
app = FastAPI()

@app.middleware("http")
async def log_context(request: Request, call_next):
    structlog.contextvars.clear_contextvars()
    structlog.contextvars.bind_contextvars(
        request_id=str(uuid4()),
        path=request.url.path,
        method=request.method,
    )
    log.info("request_started")
    response = await call_next(request)
    log.info("request_completed", status=response.status_code)
    return response

@app.get("/items/{item_id}")
async def get_item(item_id: int):
    log.info("looking up item", item_id=item_id)  # request_id auto-included
    return {"item_id": item_id}

Every log line emitted from anywhere inside the request — your handler, a service called by the handler, a coroutine awaited deep inside — automatically carries request_id, path, and method. When something breaks at 3 AM, you grep one request_id across the log aggregator and see the full timeline of a single request.

Bound context follows the request across every await.

Production Patterns and Performance

  • Configure once at startup. structlog.configure() should be called from your main entry point before anything else logs. Re-configuring at runtime is supported but confusing.
  • Cache the logger. Set cache_logger_on_first_use=True in configure(). Without it, structlog rebuilds the bound logger on every call, which is slow under load.
  • Use lazy formatting. structlog already does this for free — you pass kwargs, not f-strings. log.info("query ran", duration=t) is faster and safer than log.info(f"query ran {t}s").
  • Avoid logging in tight loops. JSON serialization isn’t free. If you’re logging inside a 1M-iteration loop, log a summary at the end instead.
  • Pin your processors in tests. Use structlog.testing.LogCapture as a test fixture to assert that specific events were logged with specific fields. Don’t grep stdout in tests.

FAQ

Q: structlog or python-json-logger — which one?
A: python-json-logger is a tiny adapter that just makes stdlib logging output JSON. structlog is a full structured-logging system with bound context, processor chains, and contextvars integration. For “I just want JSON”, use python-json-logger. For anything beyond that, structlog.

Q: Does structlog replace the stdlib logging module?
A: It can, but it doesn’t have to. The recommended setup is hybrid: structlog handles your code’s logs, stdlib logging keeps working for third-party libraries, and both end up in the same JSON output via the processor chain.

Q: How do I get structlog working with Django?
A: Configure structlog in settings.py alongside Django’s LOGGING dict. Use structlog.stdlib.LoggerFactory() as the factory so Django’s existing log levels and handlers still apply. Add structlog.contextvars.merge_contextvars as the first processor and bind request_id in middleware.

Q: Can I use structlog in scripts/CLIs, not just web apps?
A: Yes — structlog has no web-framework dependencies. The contextvars features are also useful in any async or threaded code, not just HTTP request handlers.

Q: What’s the performance cost vs stdlib logging?
A: With cache_logger_on_first_use=True and a JSON renderer, structlog is roughly 2-3x slower than bare logger.info("msg"). In practice, log emission is rarely a hotpath — you’ll feel it only if you’re logging inside a per-millisecond loop, which you shouldn’t be doing anyway.

Wrapping Up

structlog turns Python logging from “strings that you grep later” into “events that you query.” The two features that matter most are bind() (carry context forward) and the processor pipeline (filter, enrich, and redact in one place). Combined with contextvars integration in web apps, you get request-scoped log context for free — no manual passing of correlation IDs through every function.

The official structlog documentation has the full reference, including integrations with sentry, OpenTelemetry, and various web frameworks beyond FastAPI.