How To Use Python graphlib for Dependency Resolution

Intermediate

You have a list of tasks where some must run before others — database migrations before app startup, CSS before JavaScript, package A before package B. Keeping track of these dependencies manually is a recipe for mysterious bugs and circular import nightmares. What you really need is a topological sort: an ordering of nodes in a directed graph such that every dependency comes before the thing that depends on it.

Python 3.9 added the graphlib module to the standard library, giving you a clean, built-in way to solve exactly this problem. No third-party libraries, no reimplementing Kahn’s algorithm from scratch — just import TopologicalSorter and describe your dependency graph.

In this article, we will cover how graphlib.TopologicalSorter works, how to build and traverse dependency graphs, how to detect circular dependencies, and how to parallelize independent tasks. By the end you will have a solid understanding of topological sorting and a reusable pattern for dependency-driven task execution.

Python graphlib: Quick Example

Here is the fastest path from zero to a sorted dependency order using graphlib:

# quick_graphlib.py
from graphlib import TopologicalSorter

# Define a dependency graph: key depends on all values
graph = {
    "build": {"test"},
    "test": {"lint", "compile"},
    "lint": set(),
    "compile": {"lint"},
}

sorter = TopologicalSorter(graph)
order = list(sorter.static_order())
print("Build order:", order)

Output:

Build order: ['lint', 'compile', 'test', 'build']

The TopologicalSorter takes a dictionary where each key depends on its value set. static_order() returns a single valid execution order — no parallelism, just a flat sequence. This is the simplest mode and is perfect when you want a deterministic list.

The sections below cover more advanced use: dynamic mode for parallel scheduling, cycle detection, and real-world patterns for build systems and package managers.

What Is graphlib and Why Use It?

Topological sorting answers the question: “given a set of tasks with dependencies, in what order should I run them so every dependency is satisfied before the task that needs it?” It only works on directed acyclic graphs (DAGs), that is, directed graphs with no cycles, because a cycle leaves no task that can safely go first.

Think of it like a university course planner. You cannot take Algorithms without taking Data Structures first, and you cannot take Data Structures without completing Introduction to Programming. A topological sort finds the valid sequence of courses to take.
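As a minimal sketch of the course-planner analogy, the prerequisites can be written as a dictionary and handed to TopologicalSorter. The course names come from the paragraph above; the variable names are only illustrative.

```python
from graphlib import TopologicalSorter

# Each course maps to the set of courses that must be completed first.
prerequisites = {
    "Algorithms": {"Data Structures"},
    "Data Structures": {"Introduction to Programming"},
    "Introduction to Programming": set(),
}

course_order = list(TopologicalSorter(prerequisites).static_order())
print(course_order)
# ['Introduction to Programming', 'Data Structures', 'Algorithms']
```

Because the three courses form a single chain, this is the only valid order; graphs with independent branches usually have several.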

| Approach | Pros | Cons |
| --- | --- | --- |
| Manual ordering | Simple for small graphs | Error-prone, doesn’t scale |
| Roll your own DFS/BFS | Full control | 30+ lines, subtle bugs |
| graphlib.TopologicalSorter | Standard library, clean API, cycle detection | Python 3.9+ only |
| NetworkX | Full graph algorithms | Third-party dependency, heavy |

For most dependency resolution use cases, graphlib hits the sweet spot: it is built in, readable, and handles cycle detection out of the box.

Building Dependency Graphs

The TopologicalSorter accepts a dictionary where each key is a node and each value is a set (or any iterable) of nodes that the key depends on. You can also build the graph incrementally using the add() method, which is useful when you discover dependencies at runtime.

# build_graph.py
from graphlib import TopologicalSorter

sorter = TopologicalSorter()

# Add nodes one at a time: add(node, *dependencies)
sorter.add("deploy", "build", "test")
sorter.add("build", "compile", "lint")
sorter.add("compile", "setup")
sorter.add("lint", "setup")
sorter.add("test", "build")
sorter.add("setup")  # no dependencies

order = list(sorter.static_order())
print("Deployment pipeline:", order)

Output:

Deployment pipeline: ['setup', 'compile', 'lint', 'build', 'test', 'deploy']

The add(node, *deps) signature is convenient when you are reading dependency data from a config file or database. You do not need to construct the full dictionary upfront — just call add() as you discover each dependency relationship.
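Here is one way that could look when the dependency data arrives as text. The "task: dep1 dep2" line format, the CONFIG_TEXT constant, and the sorter_from_config helper are all hypothetical, chosen only to show add() being called once per record.

```python
from graphlib import TopologicalSorter

# Hypothetical config format: "task: dep1 dep2 ...", one task per line.
CONFIG_TEXT = """
deploy: build test
build: compile lint
compile: setup
lint: setup
test: build
setup:
"""

def sorter_from_config(text):
    """Build a TopologicalSorter by calling add() once per config line."""
    sorter = TopologicalSorter()
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        task, _, deps = line.partition(":")
        # deps.split() is empty for tasks with no dependencies
        sorter.add(task.strip(), *deps.split())
    return sorter

pipeline = list(sorter_from_config(CONFIG_TEXT).static_order())
print(pipeline)
```

The same loop works for rows from a database query: anything that yields a task and its dependencies can feed add().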

Dependency graphs: when the order matters and your build system needs to know it.

Detecting Circular Dependencies

A topological sort is only possible on a directed acyclic graph — if you have a cycle (A depends on B depends on A), there is no valid ordering. graphlib raises a CycleError when it detects one, giving you the list of nodes involved in the cycle.

# cycle_detection.py
from graphlib import TopologicalSorter, CycleError

graph = {
    "A": {"B"},
    "B": {"C"},
    "C": {"A"},  # creates a cycle: A -> B -> C -> A
}

sorter = TopologicalSorter(graph)

try:
    order = list(sorter.static_order())
    print("Order:", order)
except CycleError as e:
    print(f"Circular dependency detected: {e}")
    print(f"Cycle involves: {e.args[1]}")

Output:

Circular dependency detected: ('nodes are in a cycle', ['A', 'C', 'B', 'A'])
Cycle involves: ['A', 'C', 'B', 'A']

The CycleError exception includes the list of nodes forming the cycle as e.args[1]. This makes it straightforward to surface a useful error message to the user — “Task A and Task C have a circular dependency” is far more actionable than a generic crash.
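A small helper can turn that list into a message for the user. This is a sketch; describe_cycle is a hypothetical name, not part of graphlib.

```python
from graphlib import TopologicalSorter, CycleError

def describe_cycle(graph):
    """Return a readable message if the graph has a cycle, else None."""
    try:
        # static_order() is lazy; list() forces it so the cycle check runs here.
        list(TopologicalSorter(graph).static_order())
    except CycleError as exc:
        cycle = exc.args[1]  # first and last entries are the same node
        return "Circular dependency: " + " -> ".join(str(n) for n in cycle)
    return None

message = describe_cycle({"A": {"B"}, "B": {"C"}, "C": {"A"}})
print(message)
```

The helper returns None for a valid graph, so callers can treat any non-empty result as a configuration error to report.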

Dynamic Mode for Parallel Execution

The static_order() method gives you one valid sequence, but it is inherently serial. If some tasks can run in parallel (because they have no dependency between them), you are leaving performance on the table. graphlib provides a dynamic mode for exactly this use case.

In dynamic mode, you call prepare() first, then iterate using get_ready() and done(). The sorter yields batches of tasks that are currently ready to execute (all their dependencies have been marked done), allowing you to dispatch them concurrently.

# parallel_sorter.py
from graphlib import TopologicalSorter
import concurrent.futures
import time

def run_task(task_name):
    """Simulate a task with a short delay."""
    print(f"  Running: {task_name}")
    time.sleep(0.1)
    return task_name

graph = {
    "deploy": {"build", "test"},
    "build": {"lint", "compile"},
    "test": {"compile"},
    "compile": {"setup"},
    "lint": {"setup"},
    "setup": set(),
}

sorter = TopologicalSorter(graph)
sorter.prepare()

with concurrent.futures.ThreadPoolExecutor() as pool:
    futures = {}
    while sorter.is_active():
        # Get all tasks whose dependencies are done
        for task in sorter.get_ready():
            future = pool.submit(run_task, task)
            futures[future] = task

        # Wait for any task to finish, then mark it done
        done_futures, _ = concurrent.futures.wait(
            futures, return_when=concurrent.futures.FIRST_COMPLETED
        )
        for future in done_futures:
            task = futures.pop(future)
            sorter.done(task)
            print(f"  Completed: {task}")

print("All tasks finished.")

Output (the exact interleaving may differ from run to run):

  Running: setup
  Completed: setup
  Running: lint
  Running: compile
  Completed: lint
  Completed: compile
  Running: build
  Running: test
  Completed: test
  Completed: build
  Running: deploy
  Completed: deploy
All tasks finished.

Dynamic mode dispatches lint and compile in parallel once setup finishes. test becomes ready as soon as compile finishes, and build becomes ready once both lint and compile are done, so build and test can overlap as well. The available parallelism falls out of the graph structure automatically.
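To see which tasks the sorter releases together, the sketch below drains get_ready() one full batch at a time, with no threads involved. The execution_waves helper is a hypothetical name. Waiting for a whole batch is simpler but slightly less parallel than the FIRST_COMPLETED loop above.

```python
from graphlib import TopologicalSorter

graph = {
    "deploy": {"build", "test"},
    "build": {"lint", "compile"},
    "test": {"compile"},
    "compile": {"setup"},
    "lint": {"setup"},
    "setup": set(),
}

def execution_waves(graph):
    """Group tasks into waves; every task in a wave can run concurrently."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorter.get_ready()
        waves.append(sorted(ready))  # sorted only to make the printout stable
        sorter.done(*ready)          # mark the whole wave as finished
    return waves

waves = execution_waves(graph)
for number, wave in enumerate(waves, 1):
    print(number, wave)
# 1 ['setup']
# 2 ['compile', 'lint']
# 3 ['build', 'test']
# 4 ['deploy']
```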

get_ready() hands you independent tasks in batches. ThreadPoolExecutor handles the rest.

Real-Life Example: Package Dependency Resolver

Let’s build a simplified package dependency resolver — the kind of logic that sits at the heart of pip, npm, and apt. Given a list of packages with their dependencies, it outputs a valid installation order and detects any circular dependencies.

# package_resolver.py
from graphlib import TopologicalSorter, CycleError
from dataclasses import dataclass, field
from typing import Dict, Set, List

@dataclass
class Package:
    name: str
    version: str
    requires: Set[str] = field(default_factory=set)

def resolve_install_order(packages: List[Package]) -> List[str]:
    """
    Given a list of packages, return a valid installation order
    that satisfies all dependencies.

    Raises CycleError if circular dependencies are detected.
    Raises ValueError if an unknown package is referenced.
    """
    known = {pkg.name for pkg in packages}

    # Validate: check all dependencies exist in the package list
    for pkg in packages:
        for dep in pkg.requires:
            if dep not in known:
                raise ValueError(
                    f"Package '{pkg.name}' depends on '{dep}', "
                    f"which is not in the package list"
                )

    # Build graph: each package depends on its requires set
    graph: Dict[str, Set[str]] = {
        pkg.name: pkg.requires for pkg in packages
    }

    try:
        sorter = TopologicalSorter(graph)
        return list(sorter.static_order())
    except CycleError as e:
        cycle_nodes = e.args[1]
        # Re-raise with a readable message, keeping the cycle list in args[1]
        raise CycleError(
            f"Circular dependency detected involving: "
            f"{' -> '.join(cycle_nodes)}",
            cycle_nodes,
        ) from e

# --- Example usage ---
packages = [
    Package("myapp",     "1.0.0", requires={"requests", "sqlalchemy"}),
    Package("requests",  "2.31.0", requires={"urllib3", "certifi"}),
    Package("sqlalchemy","2.0.0",  requires={"greenlet"}),
    Package("urllib3",   "2.0.7",  requires=set()),
    Package("certifi",   "2024.2", requires=set()),
    Package("greenlet",  "3.0.0",  requires=set()),
]

order = resolve_install_order(packages)
print("Installation order:")
for i, pkg in enumerate(order, 1):
    print(f"  {i}. {pkg}")

Output (urllib3 and certifi may swap places between runs, because they come from a set):

Installation order:
  1. urllib3
  2. certifi
  3. greenlet
  4. requests
  5. sqlalchemy
  6. myapp

This resolver validates that all referenced packages exist, builds the dependency graph from the package metadata, and uses TopologicalSorter to produce a valid installation order. The same pattern applies to build systems, CI pipeline stages, database migration runners, and microservice startup orchestrators. Extend it by adding version conflict detection or by switching to dynamic mode for parallel installs.

Dependency resolution: urllib3 before requests before myapp. Every time.

Frequently Asked Questions

What Python version does graphlib require?

The graphlib module was added in Python 3.9. If you are on Python 3.8 or earlier, you will need to either upgrade or implement topological sorting manually (using DFS or Kahn’s algorithm). On Python 3.9+, it is part of the standard library with no installation needed.
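If upgrading is not an option, a manual fallback could look like the sketch below. It is a plain implementation of Kahn’s algorithm that accepts the same graph shape as TopologicalSorter; kahn_toposort is a hypothetical helper name, and it raises ValueError instead of CycleError.

```python
from collections import deque

def kahn_toposort(graph):
    """Order nodes so each appears after everything it depends on.

    graph maps node -> iterable of nodes it depends on.
    """
    # Collect every node, including ones that only appear as dependencies.
    remaining = {node: set(deps) for node, deps in graph.items()}
    for deps in graph.values():
        for dep in deps:
            remaining.setdefault(dep, set())

    # Reverse edges: for each node, which nodes are waiting on it?
    dependents = {node: [] for node in remaining}
    for node, deps in remaining.items():
        for dep in deps:
            dependents[dep].append(node)

    queue = deque(node for node, deps in remaining.items() if not deps)
    order = []
    while queue:
        node = queue.popleft()
        order.append(node)
        for waiting in dependents[node]:
            remaining[waiting].discard(node)
            if not remaining[waiting]:
                queue.append(waiting)

    if len(order) != len(remaining):
        raise ValueError("graph contains a cycle")
    return order

order = kahn_toposort({
    "build": {"test"},
    "test": {"lint", "compile"},
    "lint": set(),
    "compile": {"lint"},
})
print(order)
# ['lint', 'compile', 'test', 'build']
```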

When should I use static_order() vs dynamic mode?

Use static_order() when you need a simple serial execution order and do not care about parallelism — it is one line and very readable. Use dynamic mode (prepare() + get_ready() + done()) when you want to run independent tasks concurrently. Dynamic mode requires more code but gives you maximum throughput on multi-core systems.

Are there multiple valid topological orders?

Yes. For most graphs there are several valid orderings. For example, if A and B both depend only on C, you can run A before B or B before A; both are correct. graphlib does not guarantee which valid order it returns, so do not write code that relies on a specific tie-breaking behavior. The result can depend on the order in which nodes and dependencies were inserted, so if you want repeatable output among independent nodes, add the nodes in sorted order and pass their dependencies as sorted lists rather than sets.
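A sketch of that idea follows. It assumes the result depends only on insertion order, which is an observation about the current implementation rather than a documented guarantee; deterministic_order is a hypothetical helper name.

```python
from graphlib import TopologicalSorter

def deterministic_order(graph):
    """Insert nodes and dependencies in sorted order before sorting."""
    sorter = TopologicalSorter()
    for node in sorted(graph):
        # Sorted dependencies avoid relying on set iteration order.
        sorter.add(node, *sorted(graph[node]))
    return list(sorter.static_order())

graph = {"b": {"c"}, "a": {"c"}, "d": set()}
order = deterministic_order(graph)
print(order)
```

Whatever order comes back, it still satisfies every dependency; the sorting only pins down the choice among equally valid answers.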

How do I handle CycleError gracefully?

Catch graphlib.CycleError and read e.args[1] to get the list of nodes forming the cycle. Format this into a human-readable error message that tells the user which tasks are in a circular dependency. Never swallow the exception silently — a circular dependency is a configuration bug that must be fixed, not ignored.

Does graphlib scale to large graphs?

Yes. TopologicalSorter uses Kahn’s algorithm internally, which runs in O(V + E) time where V is the number of nodes and E is the number of edges. This scales linearly and handles graphs with thousands of nodes efficiently. For extremely large graphs (millions of nodes), consider a dedicated graph library like NetworkX, but for typical build system and dependency resolution use cases, graphlib is more than adequate.

Conclusion

The graphlib module gives you a clean, standard-library solution to dependency resolution. We covered TopologicalSorter with both static_order() for simple serial ordering and dynamic mode for parallel task scheduling. We used CycleError to catch circular dependencies before they cause subtle runtime bugs, and built a complete package dependency resolver as a real-world example.

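Try extending the package resolver to detect version conflicts, or wire the parallel scheduler to asyncio tasks instead of threads. The dynamic mode API maps naturally to async coroutines: replace the ThreadPoolExecutor with asyncio.create_task() and wait on the pending tasks with asyncio.wait(..., return_when=asyncio.FIRST_COMPLETED). A plain asyncio.gather also works, but it waits for a whole batch to finish before releasing the next one.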
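As a starting point for the asyncio variant, here is a minimal sketch that mirrors the threaded scheduler. The run_task coroutine is a stand-in for real async work, and run_graph is a hypothetical helper name.

```python
import asyncio
from graphlib import TopologicalSorter

GRAPH = {
    "deploy": {"build", "test"},
    "build": {"lint", "compile"},
    "test": {"compile"},
    "compile": {"setup"},
    "lint": {"setup"},
    "setup": set(),
}

async def run_task(name):
    """Stand-in for real async work such as an HTTP call."""
    await asyncio.sleep(0.01)
    return name

async def run_graph(graph):
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    pending = set()
    finished = []
    while sorter.is_active():
        # Start a task for everything whose dependencies are done.
        for name in sorter.get_ready():
            pending.add(asyncio.create_task(run_task(name)))
        # Resume as soon as any running task completes.
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        for task in done:
            name = task.result()
            sorter.done(name)
            finished.append(name)
    return finished

finished = asyncio.run(run_graph(GRAPH))
print(finished)
```

The completion order can vary between runs, but a task is only started after all of its dependencies have been marked done.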

For the full API reference, see the Python graphlib documentation.

How To Build a Dashboard with Python and Dash

Intermediate

You have a pandas DataFrame full of sales data, website metrics, or sensor readings, and you want to share it as an interactive dashboard — not a static chart, but something your team can filter, zoom, and explore. Dash is the Python framework for exactly this: it lets you build fully interactive web dashboards entirely in Python, no JavaScript required, and run them with a single command. The charts come from Plotly, the interactivity from Dash callbacks, and the layout from Dash’s HTML component library.

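Install Dash with pip install dash, which also installs Plotly and the Flask server it runs on. The examples in this tutorial also use pandas and NumPy, so install those too if you do not already have them (pip install pandas numpy). Every example runs as a plain Python script, for example python quick_dash.py, and opens in your browser at http://127.0.0.1:8050.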

This tutorial covers Dash layouts with HTML and core components, building Plotly figures, connecting controls to charts with callbacks, chained callbacks for dependent dropdowns, and deploying a complete interactive sales dashboard. By the end you will have a working dashboard you can adapt to any dataset.

Your First Dash Dashboard: Quick Example

Here is the minimum Dash app — a bar chart with a dropdown to switch between datasets:

# quick_dash.py
import dash
from dash import dcc, html
from dash.dependencies import Input, Output
import plotly.express as px
import pandas as pd

# Sample data
data = {
    'Month': ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun'],
    'Sales': [120, 145, 132, 178, 156, 194],
    'Expenses': [90, 105, 98, 120, 115, 135],
}
df = pd.DataFrame(data)

app = dash.Dash(__name__)

app.layout = html.Div([
    html.H1("Monthly Metrics", style={'textAlign': 'center'}),
    dcc.Dropdown(
        id='metric-dropdown',
        options=[{'label': col, 'value': col} for col in ['Sales', 'Expenses']],
        value='Sales',
        clearable=False,
        style={'width': '300px', 'margin': '0 auto 20px'}
    ),
    dcc.Graph(id='metric-chart'),
])

@app.callback(Output('metric-chart', 'figure'), Input('metric-dropdown', 'value'))
def update_chart(metric):
    fig = px.bar(df, x='Month', y=metric, title=f'Monthly {metric}', color_discrete_sequence=['#636EFA'])
    fig.update_layout(plot_bgcolor='white', paper_bgcolor='white')
    return fig

if __name__ == '__main__':
    app.run(debug=True)

Run with python quick_dash.py then open http://127.0.0.1:8050. Output: A bar chart appears. Selecting “Expenses” in the dropdown immediately updates the chart with no page reload.

Every Dash app has the same three-part structure: app.layout defines the HTML structure using component objects, @app.callback decorators wire inputs to outputs, and app.run() starts the Flask development server. The callback receives the current dropdown value and returns a Plotly figure — Dash handles the JavaScript communication automatically.

Dash Architecture: Layout and Callbacks

Understanding how Dash separates layout from logic makes building complex dashboards straightforward:

| Layer | What It Does | Key Components |
| --- | --- | --- |
| Layout | Defines the HTML structure | html.Div, html.H1, html.P, html.Table |
| Core Components | Interactive input/output widgets | dcc.Graph, dcc.Dropdown, dcc.Slider, dcc.DatePickerRange |
| Callbacks | Python functions that update components | @app.callback with Input(), Output(), State() |
| Figures | Plotly chart objects returned by callbacks | px.bar, px.line, px.scatter, go.Figure |

The key design rule: every interactive element has an id, and callbacks reference those IDs. When an Input component’s value changes, Dash automatically calls the decorated function with the new value and updates the Output component with the return value. This pattern scales from simple single-input charts to complex dashboards with dozens of linked components.

Cache Katie at the controls — real-time metrics at a glance.

Building Layouts

Dash layouts use Python objects that mirror HTML elements. Wrap multiple components in rows and columns for a grid layout:

# layout_demo.py
import dash
from dash import dcc, html
import plotly.express as px

app = dash.Dash(__name__)

app.layout = html.Div(style={'fontFamily': 'Arial, sans-serif', 'maxWidth': '1200px', 'margin': '0 auto'}, children=[
    # Header row
    html.Div(style={'background': '#1a1a2e', 'color': 'white', 'padding': '20px', 'marginBottom': '20px'}, children=[
        html.H1("Sales Dashboard", style={'margin': 0}),
        html.P("H1 2026 Performance Overview", style={'margin': 0, 'opacity': 0.7}),
    ]),

    # KPI row -- three summary cards
    html.Div(style={'display': 'flex', 'gap': '20px', 'marginBottom': '20px'}, children=[
        html.Div(style={'flex': 1, 'background': '#f0f8ff', 'padding': '20px', 'borderRadius': '8px', 'border': '1px solid #cce5ff'}, children=[
            html.H2("$284,500", style={'margin': 0, 'color': '#0066cc'}),
            html.P("Total Revenue", style={'margin': 0, 'color': '#666'}),
        ]),
        html.Div(style={'flex': 1, 'background': '#f0fff0', 'padding': '20px', 'borderRadius': '8px', 'border': '1px solid #b3ffb3'}, children=[
            html.H2("1,842", style={'margin': 0, 'color': '#009900'}),
            html.P("Total Orders", style={'margin': 0, 'color': '#666'}),
        ]),
        html.Div(style={'flex': 1, 'background': '#fff8f0', 'padding': '20px', 'borderRadius': '8px', 'border': '1px solid #ffd9b3'}, children=[
            html.H2("$154.45", style={'margin': 0, 'color': '#cc6600'}),
            html.P("Avg Order Value", style={'margin': 0, 'color': '#666'}),
        ]),
    ]),

    # Chart area
    dcc.Graph(id='main-chart', figure=px.bar(
        x=['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun'],
        y=[42000, 48000, 51000, 49000, 46000, 52000],
        labels={'x': 'Month', 'y': 'Revenue ($)'},
        title='Monthly Revenue H1 2026',
    )),
])

if __name__ == '__main__':
    app.run(debug=True)

Inline styles use Python dicts instead of CSS strings — style={'fontSize': '14px', 'color': '#333'} — because Dash components are Python objects. For production dashboards, move styles to an external CSS file placed in the assets/ folder next to your script; Dash automatically serves files from that folder.

Multi-Input Callbacks and Filtering

Callbacks can take multiple inputs and update multiple outputs simultaneously. Here is a dashboard with a category selector and a region selector that both affect the same chart:

# multi_input_dash.py
import dash
from dash import dcc, html
from dash.dependencies import Input, Output
import plotly.express as px
import pandas as pd
import numpy as np

np.random.seed(42)
dates = pd.date_range('2026-01-01', periods=180, freq='D')
df = pd.DataFrame({
    'date': dates,
    'revenue': np.random.randint(1000, 5000, 180),
    'category': np.random.choice(['Electronics', 'Clothing', 'Books'], 180),
    'region': np.random.choice(['North', 'South', 'East', 'West'], 180),
})

app = dash.Dash(__name__)

app.layout = html.Div([
    html.H2("Sales Explorer"),
    html.Div(style={'display': 'flex', 'gap': '20px', 'marginBottom': '20px'}, children=[
        html.Div([
            html.Label("Category"),
            dcc.Dropdown(
                id='category-filter',
                options=[{'label': 'All', 'value': 'All'}] + [{'label': c, 'value': c} for c in df['category'].unique()],
                value='All', clearable=False,
            )
        ], style={'flex': 1}),
        html.Div([
            html.Label("Region"),
            dcc.Dropdown(
                id='region-filter',
                options=[{'label': 'All', 'value': 'All'}] + [{'label': r, 'value': r} for r in df['region'].unique()],
                value='All', clearable=False,
            )
        ], style={'flex': 1}),
    ]),
    dcc.Graph(id='revenue-chart'),
    html.Div(id='stats-box', style={'padding': '10px', 'background': '#f9f9f9', 'borderRadius': '4px'}),
])

@app.callback(
    Output('revenue-chart', 'figure'),
    Output('stats-box', 'children'),
    Input('category-filter', 'value'),
    Input('region-filter', 'value'),
)
def update_dashboard(category, region):
    filtered = df.copy()
    if category != 'All':
        filtered = filtered[filtered['category'] == category]
    if region != 'All':
        filtered = filtered[filtered['region'] == region]

    daily = filtered.groupby('date')['revenue'].sum().reset_index()
    fig = px.line(daily, x='date', y='revenue', title=f'Daily Revenue -- {category} / {region}')
    fig.update_traces(fill='tozeroy', line_color='#636EFA')

    stats = f"Total: ${filtered['revenue'].sum():,.0f} | Orders: {len(filtered)} | Avg/day: ${daily['revenue'].mean():,.0f}"
    return fig, stats

if __name__ == '__main__':
    app.run(debug=True)

When multiple Input components are listed, the callback receives all their current values as arguments in the same order. The callback fires whenever any input changes. Returning a tuple from the callback populates the multiple Output components in matching order — the figure goes to revenue-chart and the stats string goes to stats-box.

Pete pulls the levers — callbacks fire and charts update live.

Real-Life Example: Interactive Sales Dashboard

Here is a complete sales dashboard with linked charts, KPI cards that update with filters, and a data table, all in under 100 lines:

# sales_dashboard.py
import dash
from dash import dcc, html, dash_table
from dash.dependencies import Input, Output
import plotly.express as px
import plotly.graph_objects as go
import pandas as pd
import numpy as np

np.random.seed(0)
months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun']
categories = ['Electronics', 'Clothing', 'Books', 'Toys']
rows = []
for m in months:
    for c in categories:
        rows.append({'Month': m, 'Category': c, 'Revenue': np.random.randint(5000, 20000), 'Units': np.random.randint(50, 300)})
df = pd.DataFrame(rows)

app = dash.Dash(__name__)

app.layout = html.Div(style={'fontFamily': 'Arial', 'padding': '20px'}, children=[
    html.H1("Sales Dashboard 2026", style={'textAlign': 'center'}),
    dcc.Dropdown(id='cat-filter', options=[{'label': 'All Categories', 'value': 'All'}] +
        [{'label': c, 'value': c} for c in categories], value='All', clearable=False,
        style={'width': '300px', 'marginBottom': '20px'}),
    html.Div(id='kpi-row', style={'display': 'flex', 'gap': '15px', 'marginBottom': '20px'}),
    html.Div(style={'display': 'flex', 'gap': '20px'}, children=[
        dcc.Graph(id='revenue-bar', style={'flex': 1}),
        dcc.Graph(id='units-pie', style={'flex': 1}),
    ]),
    dash_table.DataTable(id='data-table', page_size=8,
        style_header={'backgroundColor': '#1a1a2e', 'color': 'white'},
        style_data_conditional=[{'if': {'row_index': 'odd'}, 'backgroundColor': '#f9f9f9'}]),
])

@app.callback(
    Output('kpi-row', 'children'), Output('revenue-bar', 'figure'),
    Output('units-pie', 'figure'), Output('data-table', 'data'),
    Output('data-table', 'columns'), Input('cat-filter', 'value')
)
def update_all(category):
    d = df if category == 'All' else df[df['Category'] == category]
    total_rev = d['Revenue'].sum()
    total_units = d['Units'].sum()
    avg_rev = d.groupby('Month')['Revenue'].sum().mean()

    kpi_style = {'flex': 1, 'background': '#eef2ff', 'padding': '15px', 'borderRadius': '8px', 'textAlign': 'center'}
    kpis = [
        html.Div([html.H3(f"${total_rev:,.0f}"), html.P("Total Revenue")], style=kpi_style),
        html.Div([html.H3(f"{total_units:,}"), html.P("Total Units")], style=kpi_style),
        html.Div([html.H3(f"${avg_rev:,.0f}"), html.P("Avg Monthly Revenue")], style=kpi_style),
    ]

    monthly = d.groupby('Month')['Revenue'].sum().reindex(months).reset_index()
    bar_fig = px.bar(monthly, x='Month', y='Revenue', title='Revenue by Month', color_discrete_sequence=['#636EFA'])

    cat_totals = d.groupby('Category')['Units'].sum().reset_index()
    pie_fig = px.pie(cat_totals, names='Category', values='Units', title='Units by Category')

    cols = [{'name': c, 'id': c} for c in d.columns]
    return kpis, bar_fig, pie_fig, d.to_dict('records'), cols

if __name__ == '__main__':
    app.run(debug=True)

This dashboard updates all five outputs (KPIs, bar chart, pie chart, table data, table columns) from a single callback triggered by the category dropdown. The dash_table.DataTable component provides paginated tabular data through page_size alone; pass sort_action='native' if you also want clickable column sorting. Run it, select “Electronics”, and every component immediately filters to show only electronics data.

Frequently Asked Questions

How do I deploy a Dash app to production?

Dash runs on Flask, so any Flask deployment method works: Gunicorn (gunicorn app:server), Docker, or platform-as-a-service hosts like Heroku, Render, or Railway. Export the Flask server with server = app.server and point Gunicorn at it. For production, set debug=False and use environment variables for any credentials. Dash Enterprise (commercial) provides managed hosting with authentication built in.

How do I read an input value without triggering a callback?

Use State instead of Input for components you want to read without triggering updates: from dash.dependencies import State. A State value is passed to the callback only when an Input changes, not when the State component itself changes. This is useful for “submit” button patterns where you want to read several fields only when the button is clicked.

How do I update a chart with live data?

Use dcc.Interval as a callback input: dcc.Interval(id='timer', interval=5000) fires an event every 5 seconds. Add it to your layout, then use Input('timer', 'n_intervals') in your callback. The n_intervals value increments each time the interval fires, triggering your data refresh callback. This pattern works for polling a database, an API endpoint, or reading a log file.

How do I build a multi-page Dash app?

Use Dash Pages (built into Dash 2.5+): create a pages/ folder and add one Python file per page, each calling dash.register_page(__name__, path='/my-page'). Set app = dash.Dash(__name__, use_pages=True) in your main file and include dash.page_container in the layout. Dash automatically generates routing for all registered pages without any URL configuration.

How do I apply a consistent theme across my dashboard?

Use the dash-bootstrap-components library (pip install dash-bootstrap-components) for Bootstrap-based grid layouts and pre-styled components: app = dash.Dash(__name__, external_stylesheets=[dbc.themes.BOOTSTRAP]). Alternatively, put a CSS file in the assets/ folder — Dash serves everything in that folder automatically. For consistent Plotly chart theming, set plotly.io.templates.default = 'plotly_white' once at the top of your script.

Conclusion

Dash lets you build fully interactive data dashboards in pure Python. In this tutorial you built a dropdown-driven bar chart, applied multi-input filtering to a line chart with stats, and assembled a complete sales dashboard with KPI cards, linked charts, and a data table — all updated by a single callback. The entire dashboard works without writing a single line of JavaScript.

The sales dashboard is ready to extend: add a date range picker with dcc.DatePickerRange, a live refresh interval, an export-to-CSV button, or user authentication with the separately installed dash-auth package (pip install dash-auth). Each extension adds one new component and one new callback, following the same Input/Output pattern you learned here.

Official documentation: Dash — Python framework for building analytical web apps.

How To Use Python aiosqlite for Async SQLite

Intermediate

SQLite is the perfect database for small to medium applications — no server to run, no configuration, just a file. But Python’s built-in sqlite3 module is synchronous: every query blocks until the database responds. In an async application built with asyncio, aiohttp, or FastAPI, that blocking call halts the entire event loop for every database operation. aiosqlite wraps Python’s sqlite3 module in an async interface, letting your database queries run without freezing everything else.

Install aiosqlite with pip install aiosqlite. There is nothing else to configure — it uses the same SQLite file your synchronous code already uses. The API closely mirrors sqlite3, so if you have used that module before, the transition is mostly adding async def, await, and async with in the right places.

This tutorial covers opening async connections, executing queries, fetching results, using transactions, handling row factories for dict-like access, and building a complete async task tracker backed by SQLite. By the end, you will know how to integrate aiosqlite into any asyncio-based project without blocking the event loop.

Async SQLite in 15 Lines: Quick Example

Here is the minimum aiosqlite script — create a table, insert a row, and read it back, all asynchronously:

# quick_aiosqlite.py
import asyncio
import aiosqlite

async def main():
    async with aiosqlite.connect('quick_demo.db') as db:
        await db.execute('''
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                email TEXT UNIQUE NOT NULL
            )
        ''')
        # OR IGNORE keeps re-runs from failing on the UNIQUE email constraint
        await db.execute("INSERT OR IGNORE INTO users (name, email) VALUES (?, ?)", ('Alice', 'alice@example.com'))
        await db.commit()

        async with db.execute("SELECT id, name, email FROM users") as cursor:
            async for row in cursor:
                print(f"ID: {row[0]}, Name: {row[1]}, Email: {row[2]}")

asyncio.run(main())

Output:

ID: 1, Name: Alice, Email: alice@example.com

Three patterns appear in every aiosqlite script: async with aiosqlite.connect() opens and auto-closes the connection, await db.execute() runs SQL without blocking, and await db.commit() persists changes. Queries that return rows use async with db.execute() to get a cursor you iterate with async for.
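To make the "without blocking" part concrete, here is a standard-library-only sketch of the underlying idea: blocking sqlite3 calls are moved to a worker thread so other coroutines keep running. This is not aiosqlite's own code (aiosqlite manages a worker thread for each connection for you); the function names are hypothetical, and asyncio.to_thread requires Python 3.9+.

```python
import asyncio
import sqlite3

def count_rows_blocking():
    """Ordinary synchronous sqlite3 work, done entirely in one thread."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
        conn.executemany(
            "INSERT INTO users (name) VALUES (?)", [("Alice",), ("Bob",)]
        )
        return conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
    finally:
        conn.close()

async def heartbeat(ticks):
    """Another coroutine that keeps running while the query is in flight."""
    for i in range(3):
        ticks.append(i)
        await asyncio.sleep(0)

async def main():
    ticks = []
    # The database work runs in a worker thread; the event loop stays free.
    count, _ = await asyncio.gather(
        asyncio.to_thread(count_rows_blocking), heartbeat(ticks)
    )
    return count, ticks

row_count, ticks = asyncio.run(main())
print(row_count, ticks)
# 2 [0, 1, 2]
```

With aiosqlite you get the same effect through await db.execute(...) without managing threads or per-thread connections yourself.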

aiosqlite vs sqlite3 vs Other Async Databases

| Library | Database | I/O Model | Best For |
| --- | --- | --- | --- |
| aiosqlite | SQLite (file) | asyncio | Async apps needing local/embedded DB |
| sqlite3 | SQLite (file) | Synchronous | Scripts, Django, Flask |
| databases | SQLite/Postgres/MySQL | asyncio | SQLAlchemy Core queries, async |
| asyncpg | PostgreSQL only | asyncio | High-performance async Postgres |

aiosqlite is the right choice when you want SQLite’s zero-configuration simplicity in an async application. For production systems that need concurrent writes, PostgreSQL with asyncpg will outperform SQLite’s single-writer model (motor fills the same async-driver role for MongoDB). But for bots, CLI tools, personal apps, and microservices with low write concurrency, aiosqlite gives you async SQLite without any infrastructure setup.

Async CRUD Operations

Here is a complete CRUD implementation with a row factory for dict-style access and proper error handling:

# aiosqlite_crud.py
import asyncio
import aiosqlite

DB_PATH = 'products.db'

async def init_db(db):
    await db.execute('''
        CREATE TABLE IF NOT EXISTS products (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            price REAL NOT NULL,
            stock INTEGER DEFAULT 0
        )
    ''')
    await db.commit()

async def create_product(db, name, price, stock=0):
    cursor = await db.execute(
        "INSERT INTO products (name, price, stock) VALUES (?, ?, ?)",
        (name, price, stock)
    )
    await db.commit()
    return cursor.lastrowid

async def get_product(db, product_id):
    async with db.execute("SELECT * FROM products WHERE id = ?", (product_id,)) as cur:
        row = await cur.fetchone()
        if row is None:
            return None
        return dict(zip([col[0] for col in cur.description], row))

async def list_products(db, max_price=None):
    sql = "SELECT * FROM products"
    params = ()
    if max_price is not None:
        sql += " WHERE price <= ?"
        params = (max_price,)
    sql += " ORDER BY name"
    async with db.execute(sql, params) as cur:
        cols = [c[0] for c in cur.description]
        return [dict(zip(cols, row)) async for row in cur]

async def update_price(db, product_id, new_price):
    await db.execute("UPDATE products SET price = ? WHERE id = ?", (new_price, product_id))
    await db.commit()

async def delete_product(db, product_id):
    await db.execute("DELETE FROM products WHERE id = ?", (product_id,))
    await db.commit()

async def main():
    async with aiosqlite.connect(DB_PATH) as db:
        await init_db(db)

        id1 = await create_product(db, 'Widget', 9.99, 100)
        id2 = await create_product(db, 'Gadget', 24.99, 50)
        id3 = await create_product(db, 'Doohickey', 4.49, 200)
        print(f"Created products: {id1}, {id2}, {id3}")

        product = await get_product(db, id1)
        print(f"Fetched: {product}")

        cheap = await list_products(db, max_price=10.0)
        print(f"Products under $10: {[p['name'] for p in cheap]}")

        await update_price(db, id1, 7.99)
        updated = await get_product(db, id1)
        print(f"Updated Widget price: ${updated['price']}")

        await delete_product(db, id3)
        all_products = await list_products(db)
        print(f"Remaining products: {[p['name'] for p in all_products]}")

asyncio.run(main())

Output:

Created products: 1, 2, 3
Fetched: {'id': 1, 'name': 'Widget', 'price': 9.99, 'stock': 100}
Products under $10: ['Doohickey', 'Widget']
Updated Widget price: $7.99
Remaining products: ['Gadget', 'Widget']

Transactions and Batch Inserts

aiosqlite handles transactions explicitly. For batch inserts, wrapping everything in a single transaction is dramatically faster than committing after each row:

# aiosqlite_transactions.py
import asyncio
import aiosqlite
import time

async def insert_batch(db, records):
    """Insert a list of (name, price, stock) tuples in one transaction."""
    await db.executemany(
        "INSERT INTO products (name, price, stock) VALUES (?, ?, ?)",
        records
    )
    await db.commit()

async def main():
    async with aiosqlite.connect(':memory:') as db:  # in-memory for demo
        await db.execute('''
            CREATE TABLE products (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT, price REAL, stock INTEGER
            )
        ''')

        # Generate 10,000 sample products
        records = [(f'Product_{i}', round(i * 0.99, 2), i % 100) for i in range(1, 10001)]

        start = time.perf_counter()
        await insert_batch(db, records)
        elapsed = time.perf_counter() - start

        async with db.execute("SELECT COUNT(*) FROM products") as cur:
            count = (await cur.fetchone())[0]
        print(f"Inserted {count:,} rows in {elapsed:.3f}s")

        # Transaction with rollback on error
        try:
            async with db.execute("BEGIN"):
                await db.execute("UPDATE products SET price = price * 0.9 WHERE stock > 50")
                # Simulate an error condition
                raise ValueError("Simulated error -- rolling back")
                await db.commit()
        except ValueError as e:
            await db.rollback()
            print(f"Transaction rolled back: {e}")

asyncio.run(main())

Output:

Inserted 10,000 rows in 0.041s
Transaction rolled back: Simulated error -- rolling back

executemany() followed by a single commit is the standard pattern for bulk inserts: all 10,000 rows are written inside one transaction, so SQLite pays the commit cost once instead of once per row. In the rollback example, db.execute("BEGIN") opens an explicit transaction; note that the async with around it only closes the cursor and does not commit or roll back on its own. The raise skips the await db.commit() line, and await db.rollback() in the except clause undoes the UPDATE.
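The example above prints that the transaction was rolled back but never checks the data. A minimal sketch that verifies it, using the synchronous sqlite3 module (which aiosqlite wraps) so it runs without extra dependencies; the two sample rows are made up for illustration:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (name TEXT, price REAL, stock INTEGER)")
conn.executemany(
    "INSERT INTO products VALUES (?, ?, ?)",
    [("Widget", 10.0, 80), ("Gadget", 20.0, 10)],  # illustrative sample data
)
conn.commit()

try:
    # This UPDATE opens a transaction and changes Widget's price to 9.0 ...
    conn.execute("UPDATE products SET price = price * 0.9 WHERE stock > 50")
    raise ValueError("simulated failure before commit")
except ValueError:
    conn.rollback()  # ... and the rollback discards that uncommitted change

price_after = conn.execute(
    "SELECT price FROM products WHERE name = 'Widget'"
).fetchone()[0]
print(price_after)
conn.close()
```

With aiosqlite the same statements apply; each call simply gains an await.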

Real-Life Example: Async Task Tracker

Here is a practical async task tracker built on aiosqlite, showing concurrent queries and the row factory pattern in a complete working application:

# async_task_tracker.py
import asyncio
import aiosqlite
from datetime import datetime

DB = 'tasks.db'

async def setup(db):
    await db.execute('''
        CREATE TABLE IF NOT EXISTS tasks (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            title TEXT NOT NULL,
            status TEXT DEFAULT 'pending',
            created_at TEXT NOT NULL,
            completed_at TEXT
        )
    ''')
    db.row_factory = aiosqlite.Row  # dict-like row access
    await db.commit()

async def add_task(db, title):
    cur = await db.execute(
        "INSERT INTO tasks (title, created_at) VALUES (?, ?)",
        (title, datetime.now().isoformat())
    )
    await db.commit()
    return cur.lastrowid

async def complete_task(db, task_id):
    await db.execute(
        "UPDATE tasks SET status='done', completed_at=? WHERE id=?",
        (datetime.now().isoformat(), task_id)
    )
    await db.commit()

async def get_summary(db):
    async with db.execute('''
        SELECT status, COUNT(*) as count FROM tasks GROUP BY status
    ''') as cur:
        return {row['status']: row['count'] async for row in cur}

async def list_pending(db):
    async with db.execute("SELECT id, title, created_at FROM tasks WHERE status='pending'") as cur:
        return [dict(row) async for row in cur]

async def main():
    async with aiosqlite.connect(DB) as db:
        await setup(db)

        # Add tasks concurrently
        ids = await asyncio.gather(
            add_task(db, 'Write unit tests'),
            add_task(db, 'Fix login bug'),
            add_task(db, 'Update README'),
            add_task(db, 'Deploy to staging'),
        )
        print(f"Added tasks: {ids}")

        # Complete two tasks
        await asyncio.gather(complete_task(db, ids[0]), complete_task(db, ids[2]))

        pending = await list_pending(db)
        summary = await get_summary(db)

        print(f"\nSummary: {summary}")
        print("Pending tasks:")
        for t in pending:
            print(f"  [{t['id']}] {t['title']}")

asyncio.run(main())

Output:

Added tasks: [1, 2, 3, 4]

Summary: {'pending': 2, 'done': 2}
Pending tasks:
  [2] Fix login bug
  [4] Deploy to staging

Frequently Asked Questions

How does aiosqlite avoid blocking the event loop?

aiosqlite runs each connection's SQLite operations on a single dedicated background thread. Every call such as db.execute() is placed on a queue for that thread, and the await yields control back to the event loop until the blocking sqlite3 call finishes. This means aiosqlite does not use true async I/O at the OS level; it uses thread offloading. For most SQLite use cases this is perfectly adequate, but it does mean that operations on one connection run one at a time, so a slow query delays everything queued behind it on that connection.
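The thread hand-off idea can be sketched with the standard library alone. This illustrates the general technique, not aiosqlite's actual source; the class name ThreadedSQLite and its methods are hypothetical:

```python
import asyncio
import queue
import sqlite3
import threading


class ThreadedSQLite:
    """Hypothetical illustration: one worker thread owns the sqlite3 connection."""

    def __init__(self, path):
        self._path = path
        self._jobs = queue.Queue()
        self._thread = threading.Thread(target=self._worker, daemon=True)
        self._thread.start()

    def _worker(self):
        # The connection is created and used only on this thread.
        conn = sqlite3.connect(self._path)
        while True:
            job = self._jobs.get()
            if job is None:  # sentinel: shut down
                conn.close()
                return
            loop, future, sql, params = job
            try:
                rows = conn.execute(sql, params).fetchall()
                conn.commit()
                # Hand the result back to the event loop's thread.
                loop.call_soon_threadsafe(future.set_result, rows)
            except Exception as exc:
                loop.call_soon_threadsafe(future.set_exception, exc)

    async def execute(self, sql, params=()):
        loop = asyncio.get_running_loop()
        future = loop.create_future()
        self._jobs.put((loop, future, sql, params))
        return await future  # the event loop stays free while the thread works

    def close(self):
        self._jobs.put(None)
        self._thread.join()


async def demo():
    db = ThreadedSQLite(":memory:")
    await db.execute("CREATE TABLE t (n INTEGER)")
    await db.execute("INSERT INTO t VALUES (?)", (42,))
    rows = await db.execute("SELECT n FROM t")
    db.close()
    return rows


result = asyncio.run(demo())
print(result)
```

Because a single thread drains the queue, statements on one connection always run in the order they were submitted.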

How do I get dict-like row access instead of tuples?

Set db.row_factory = aiosqlite.Row after opening the connection. Then rows support both index access (row[0]) and key access (row['name']). Convert to a plain dict with dict(row) when you need JSON serialization. Alternatively, build the dict manually from cursor.description: dict(zip([c[0] for c in cursor.description], row)).

Can multiple coroutines share one aiosqlite connection?

Yes, but with caution. aiosqlite serializes all operations through its internal thread, so concurrent writes are safe from a thread-safety perspective. However, SQLite itself only allows one writer at a time -- concurrent write operations will serialize at the database level. For read-heavy workloads this is fine; for write-heavy concurrent workloads, consider PostgreSQL instead. Keep one connection per event loop rather than opening connections per coroutine.

Can I use an in-memory database with aiosqlite?

Yes: aiosqlite.connect(':memory:') creates an in-memory database that exists only for the lifetime of the connection. This is useful for tests -- each test function opens its own ':memory:' connection for a clean slate with no file cleanup needed. Note that an in-memory database cannot be shared between connections; each aiosqlite.connect(':memory:') call creates a completely separate database.
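A quick way to see that separation, sketched with the synchronous sqlite3 module that aiosqlite wraps (the table name notes is made up for the example):

```python
import sqlite3

# Two separate ':memory:' connections get two unrelated databases.
first = sqlite3.connect(":memory:")
second = sqlite3.connect(":memory:")

first.execute("CREATE TABLE notes (body TEXT)")
first.execute("INSERT INTO notes VALUES ('only in first')")
first.commit()

# The second connection has no tables at all.
tables_in_second = second.execute(
    "SELECT name FROM sqlite_master WHERE type = 'table'"
).fetchall()
print(tables_in_second)

first.close()
second.close()
```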

How do I handle schema migrations with aiosqlite?

For simple projects, use CREATE TABLE IF NOT EXISTS in your startup script. SQLite has no ADD COLUMN IF NOT EXISTS, so before running ALTER TABLE ... ADD COLUMN, check PRAGMA table_info(your_table) to see whether the column already exists. For production apps with complex schemas, use the alembic migration tool with the synchronous sqlite3 driver (alembic does not need to be async -- run migrations at startup before launching the async event loop). Store the current schema version in a schema_version table and apply pending migrations sequentially.
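Here is a minimal sketch of sequential migrations run synchronously before the event loop starts. As a simplification it stores the version in SQLite's built-in PRAGMA user_version rather than a schema_version table; the MIGRATIONS list and the migrate() helper are hypothetical names for illustration:

```python
import sqlite3

# Hypothetical migration list: position + 1 is the schema version each step produces.
MIGRATIONS = [
    "CREATE TABLE tasks (id INTEGER PRIMARY KEY, title TEXT NOT NULL)",
    "ALTER TABLE tasks ADD COLUMN priority INTEGER DEFAULT 0",
]


def migrate(conn):
    """Apply pending migrations in order and return the resulting version."""
    current = conn.execute("PRAGMA user_version").fetchone()[0]
    for version, statement in enumerate(MIGRATIONS, start=1):
        if version > current:
            conn.execute(statement)
            # PRAGMA does not accept bound parameters; version is a trusted int.
            conn.execute(f"PRAGMA user_version = {version}")
            conn.commit()
    return conn.execute("PRAGMA user_version").fetchone()[0]


conn = sqlite3.connect(":memory:")
version_after_first_run = migrate(conn)
version_after_second_run = migrate(conn)  # nothing left to apply
columns = [row[1] for row in conn.execute("PRAGMA table_info(tasks)")]
print(version_after_first_run, version_after_second_run, columns)
conn.close()
```

Running migrate() a second time is a no-op, which is what makes it safe to call on every startup.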

Conclusion

aiosqlite brings SQLite into the async Python world with minimal friction. In this tutorial you used aiosqlite.connect() as an async context manager, performed CRUD with parameterized queries, ran batch inserts with executemany(), managed transactions with commit and rollback, and built a concurrent async task tracker with dict-like row access via aiosqlite.Row.

The task tracker is a solid base to extend -- add priority columns, due dates, a web interface via FastAPI, or push notifications when tasks are completed. All of these fit naturally into the async CRUD patterns you learned here without any synchronous blocking calls.

Official documentation: aiosqlite -- async bridge to sqlite3.

Why Async SQLite

sqlite3 in Python is synchronous — every query blocks the calling thread. In an async web framework (FastAPI, aiohttp), that blocks the event loop and stalls every other request. aiosqlite wraps sqlite3 with async APIs that yield to the loop during disk I/O:

# pip install aiosqlite

import asyncio
import aiosqlite

async def main():
    async with aiosqlite.connect("app.db") as db:
        await db.execute("CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, name TEXT)")
        await db.execute("INSERT INTO users (name) VALUES (?)", ("Alice",))
        await db.commit()

        async with db.execute("SELECT * FROM users") as cursor:
            async for row in cursor:
                print(row)

asyncio.run(main())

The API mirrors sqlite3 — same methods, same parameter style, just await-able. Migration from sync code is mechanical.

Connection Pooling Pattern

For an async web app, opening a new connection per request is wasteful. Use a connection pool — there's no official pool, but the pattern is straightforward with asyncio.Queue:

import asyncio
import aiosqlite
from contextlib import asynccontextmanager

class SQLitePool:
    def __init__(self, path: str, size: int = 5):
        self.path = path
        self.size = size
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=size)
        self._closed = False

    async def init(self):
        for _ in range(self.size):
            con = await aiosqlite.connect(self.path)
            await con.execute("PRAGMA journal_mode=WAL")
            await self._queue.put(con)

    @asynccontextmanager
    async def acquire(self):
        con = await self._queue.get()
        try:
            yield con
        finally:
            await self._queue.put(con)

    async def close(self):
        self._closed = True
        while not self._queue.empty():
            con = await self._queue.get()
            await con.close()

# Usage
pool = SQLitePool("app.db", size=5)

async def get_user(user_id: int):
    async with pool.acquire() as con:
        async with con.execute("SELECT * FROM users WHERE id = ?", (user_id,)) as cur:
            return await cur.fetchone()

Row Factories

Same row_factory trick as sync sqlite3 — set it for dict-like access:

async with aiosqlite.connect("app.db") as db:
    db.row_factory = aiosqlite.Row
    async with db.execute("SELECT id, name, email FROM users") as cur:
        rows = await cur.fetchall()
        for row in rows:
            print(row["name"], row["email"])

Transactions

aiosqlite supports the same transaction pattern as sqlite3, with awaitable commit() and rollback():

async with aiosqlite.connect("app.db") as db:
    try:
        await db.execute("INSERT INTO accounts (id, balance) VALUES (?, ?)", (1, 1000))
        await db.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
        await db.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
        await db.commit()
    except Exception:
        await db.rollback()
        raise

FastAPI Integration

The classic pattern: pool initialized at startup, injected via Depends, released after the request:

from fastapi import FastAPI, Depends
from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.pool = SQLitePool("app.db", size=10)
    await app.state.pool.init()
    yield
    await app.state.pool.close()

app = FastAPI(lifespan=lifespan)

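async def get_db():
    async with app.state.pool.acquire() as con:
        # aiosqlite is imported alongside SQLitePool in the pooling section;
        # without a Row factory, dict(row) in the handler fails on plain tuples.
        con.row_factory = aiosqlite.Row
        yield con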

@app.get("/users/{user_id}")
async def get_user(user_id: int, db=Depends(get_db)):
    async with db.execute("SELECT * FROM users WHERE id = ?", (user_id,)) as cur:
        row = await cur.fetchone()
        return dict(row) if row else {"error": "not found"}

Common Pitfalls

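  • One connection shared by unrelated units of work. aiosqlite serializes calls on a connection, so sharing it will not corrupt anything, but every coroutine shares the same transaction: one coroutine's commit() or rollback() also applies to another's pending writes. Use a pool or a per-task connection when coroutines need independent transactions.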
  • Forgetting commit() in async. Same as sync sqlite3 — writes don't persist without await db.commit().
  • WAL mode confusion. WAL helps when reads and writes happen on different connections. With a single connection, you don't benefit much.
  • Blocking pragmas at connect time. Some PRAGMAs are slow (full integrity check). Set fast ones (journal_mode, foreign_keys) at connect; defer expensive ones to maintenance windows.
  • aiosqlite vs SQLAlchemy async. SQLAlchemy 2.0 has async support — use it for complex queries with ORM benefits. aiosqlite is leaner for raw SQL.

FAQ

Q: aiosqlite or async SQLAlchemy?
A: aiosqlite for raw SQL with minimal overhead. SQLAlchemy async when you have models, relationships, and migrations.

Q: Is aiosqlite faster than sqlite3?
A: No. Individual queries run at about the same speed, or slightly slower because of the thread hand-off, since it wraps the same SQLite engine. The win is concurrency: while one query is on disk, the event loop runs other work.

Q: Does it work with WAL mode?
A: Yes — enable PRAGMA journal_mode=WAL at connect time. Multiple connections to the same DB can read while a writer is active.
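A small sketch of that behaviour, using the synchronous sqlite3 module and a temporary file (WAL needs a file-backed database; the events table is made up for the example):

```python
import os
import sqlite3
import tempfile

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "app.db")

    writer = sqlite3.connect(path)
    # The PRAGMA returns the journal mode that is now in effect.
    journal_mode = writer.execute("PRAGMA journal_mode=WAL").fetchone()[0]
    writer.execute("CREATE TABLE events (msg TEXT)")
    writer.execute("INSERT INTO events VALUES ('committed')")
    writer.commit()

    # The writer opens a new transaction and leaves it uncommitted.
    writer.execute("INSERT INTO events VALUES ('pending')")

    # A second connection can still read; it sees only committed rows.
    reader = sqlite3.connect(path)
    visible = reader.execute("SELECT COUNT(*) FROM events").fetchone()[0]

    writer.commit()
    reader.close()
    writer.close()

print(journal_mode, visible)
```

The journal mode is stored in the database file, so it only needs to be set once, although repeating the PRAGMA at connect time is harmless.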

Q: Can I use it with Django async?
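A: Django's ORM provides its own async query methods, which is usually the better fit. aiosqlite is already awaitable, so inside an async view you can await it directly; sync_to_async is only needed for synchronous code such as plain sqlite3 calls. Running SQLAlchemy alongside Django is another option.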

Q: What's the performance ceiling?
A: Several thousand queries/sec on commodity hardware. For higher concurrent writes, move to PostgreSQL.

Wrapping Up

aiosqlite is the right SQLite library for async Python applications. Same API as sqlite3, same correctness guarantees, just non-blocking. Add a small connection pool, enable WAL mode, and you have a database layer that handles thousands of concurrent FastAPI requests without breaking a sweat.

How To Use Python motor for Async MongoDB Operations

Intermediate

You are building an async Python service with FastAPI or aiohttp and you need MongoDB as your database. The standard PyMongo driver is synchronous: every database call blocks the event loop, so requests that should overlap end up running one at a time. Motor is the official async MongoDB driver for Python. It wraps PyMongo with a non-blocking interface designed for asyncio, so your database queries and your API handlers can run concurrently without blocking each other.

Install motor with pip install motor. You will also need a running MongoDB instance: docker run -d -p 27017:27017 mongo:7 starts one locally with no authentication. For cloud MongoDB, create a free Atlas cluster at mongodb.com; the connection string works identically in motor. PyMongo is installed automatically as motor’s dependency.

This tutorial covers motor’s async CRUD operations, querying with filters and projections, aggregation pipelines, indexes, bulk writes, and change streams. You will finish by building an async product catalogue API with FastAPI and motor that handles concurrent reads without blocking.

Async Insert and Query: Quick Example

Here is the minimal motor workflow — connect, insert a document, and query it back, all with async/await:

# quick_motor.py
import asyncio
import motor.motor_asyncio

async def main():
    client = motor.motor_asyncio.AsyncIOMotorClient('mongodb://localhost:27017')
    db = client['mydb']
    collection = db['users']

    # Insert one document
    result = await collection.insert_one({'name': 'Alice', 'age': 30, 'role': 'admin'})
    print(f"Inserted ID: {result.inserted_id}")

    # Find it back
    user = await collection.find_one({'name': 'Alice'})
    print(f"Found: {user['name']}, age {user['age']}, role {user['role']}")

    client.close()

asyncio.run(main())

Output:

Inserted ID: 664f3a1b2c8d4e5f6a7b8c9d
Found: Alice, age 30, role admin

Motor’s API mirrors PyMongo almost exactly — the key difference is that every I/O operation returns a coroutine you must await. The client and collection objects are synchronous Python objects; only the database operations require await. This design means migrating from PyMongo to motor is usually straightforward: add async def and await to the functions that call the database.

Motor vs PyMongo vs Other Drivers

Choosing the right MongoDB driver depends on whether your application is async or sync:

Driver | I/O Model | Best For | Install
motor | asyncio (non-blocking) | FastAPI, aiohttp, async services | pip install motor
pymongo | Synchronous (blocking) | Django, Flask, scripts, CLIs | pip install pymongo
beanie | asyncio + ODM (models) | When you want Pydantic-based models | pip install beanie
mongoengine | Synchronous + ODM | Django-like ORM on MongoDB | pip install mongoengine

Use motor whenever your web framework is async. Using PyMongo in a FastAPI handler does not raise an error immediately, but it blocks the event loop during every database call, effectively serializing all requests through a single-threaded bottleneck. Motor keeps each query non-blocking, letting your async framework handle hundreds of concurrent connections efficiently.

CRUD Operations

Motor’s CRUD methods are direct async equivalents of PyMongo’s methods. Here is a complete reference implementation covering insert, find, update, and delete:

# motor_crud.py
import asyncio
import motor.motor_asyncio

async def crud_demo():
    client = motor.motor_asyncio.AsyncIOMotorClient('mongodb://localhost:27017')
    col = client['demo']['products']

    # --- CREATE ---
    # insert_one: single document
    r = await col.insert_one({'name': 'Widget', 'price': 9.99, 'stock': 100})
    widget_id = r.inserted_id

    # insert_many: multiple documents at once
    await col.insert_many([
        {'name': 'Gadget', 'price': 24.99, 'stock': 50},
        {'name': 'Doohickey', 'price': 4.49, 'stock': 200},
    ])
    print("Inserted 3 products")

    # --- READ ---
    # find_one: returns first match or None
    product = await col.find_one({'name': 'Widget'})
    print(f"Widget price: ${product['price']}")

    # find: returns an async cursor
    async for p in col.find({'price': {'$lt': 10}}):
        print(f"  Under $10: {p['name']} @ ${p['price']}")

    # find with projection (only return name and price)
    async for p in col.find({}, {'_id': 0, 'name': 1, 'price': 1}):
        print(f"  {p['name']}: ${p['price']}")

    # --- UPDATE ---
    # update_one: modify first matching document
    result = await col.update_one({'name': 'Widget'}, {'$set': {'price': 7.99}, '$inc': {'stock': -1}})
    print(f"Modified: {result.modified_count} document(s)")

    # update_many: modify all matching documents
    result = await col.update_many({'stock': {'$gt': 100}}, {'$set': {'on_sale': True}})
    print(f"Sale items updated: {result.modified_count}")

    # --- DELETE ---
    result = await col.delete_one({'name': 'Doohickey'})
    print(f"Deleted: {result.deleted_count} document(s)")

    # cleanup
    await col.drop()
    client.close()

asyncio.run(crud_demo())

Output:

Inserted 3 products
Widget price: $9.99
  Under $10: Widget @ $9.99
  Under $10: Doohickey @ $4.49
  Widget: $9.99
  Gadget: $24.99
  Doohickey: $4.49
Modified: 1 document(s)
Sale items updated: 1
Deleted: 1 document(s)

Aggregation Pipelines

MongoDB’s aggregation pipeline is its standout feature for analytics, and motor exposes it through collection.aggregate(): you can group, filter, sort, and reshape data entirely in the database, returning only the results your application needs:

# motor_aggregation.py
import asyncio
import motor.motor_asyncio

async def aggregation_demo():
    client = motor.motor_asyncio.AsyncIOMotorClient('mongodb://localhost:27017')
    col = client['store']['orders']

    # Seed data
    await col.drop()
    await col.insert_many([
        {'product': 'Widget', 'qty': 3, 'price': 9.99, 'region': 'North'},
        {'product': 'Gadget', 'qty': 1, 'price': 24.99, 'region': 'South'},
        {'product': 'Widget', 'qty': 2, 'price': 9.99, 'region': 'South'},
        {'product': 'Gadget', 'qty': 4, 'price': 24.99, 'region': 'North'},
        {'product': 'Widget', 'qty': 5, 'price': 9.99, 'region': 'North'},
    ])

    # Pipeline: total revenue and units sold per product
    pipeline = [
        {'$group': {
            '_id': '$product',
            'total_revenue': {'$sum': {'$multiply': ['$qty', '$price']}},
            'units_sold': {'$sum': '$qty'},
            'order_count': {'$sum': 1},
        }},
        {'$sort': {'total_revenue': -1}},
        {'$project': {
            'product': '$_id',
            'total_revenue': {'$round': ['$total_revenue', 2]},
            'units_sold': 1,
            'order_count': 1,
            '_id': 0,
        }},
    ]

    print("Sales summary:")
    async for doc in col.aggregate(pipeline):
        print(f"  {doc['product']}: ${doc['total_revenue']:.2f} ({doc['units_sold']} units, {doc['order_count']} orders)")

    await col.drop()
    client.close()

asyncio.run(aggregation_demo())

Output:

Sales summary:
  Gadget: $124.95 (5 units, 2 orders)
  Widget: $99.90 (10 units, 3 orders)

The aggregation cursor is async, so you iterate it with async for exactly like the regular find cursor. Pipelines run entirely inside MongoDB — no Python-side grouping or sorting needed. For large datasets this is dramatically faster than fetching all documents and processing them in Python.
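To make the pipeline's semantics concrete, here is roughly what the $group, $sort, and $project stages compute, written as plain Python over the same seed data. This is only an aid to understanding; in practice the work happens inside MongoDB:

```python
from collections import defaultdict

orders = [
    {"product": "Widget", "qty": 3, "price": 9.99, "region": "North"},
    {"product": "Gadget", "qty": 1, "price": 24.99, "region": "South"},
    {"product": "Widget", "qty": 2, "price": 9.99, "region": "South"},
    {"product": "Gadget", "qty": 4, "price": 24.99, "region": "North"},
    {"product": "Widget", "qty": 5, "price": 9.99, "region": "North"},
]

# $group: accumulate per-product totals.
groups = defaultdict(lambda: {"total_revenue": 0.0, "units_sold": 0, "order_count": 0})
for order in orders:
    g = groups[order["product"]]
    g["total_revenue"] += order["qty"] * order["price"]
    g["units_sold"] += order["qty"]
    g["order_count"] += 1

# $sort by total_revenue descending, then $project into the output shape.
summary = [
    {
        "product": product,
        "total_revenue": round(g["total_revenue"], 2),
        "units_sold": g["units_sold"],
        "order_count": g["order_count"],
    }
    for product, g in sorted(
        groups.items(), key=lambda item: item[1]["total_revenue"], reverse=True
    )
]

for row in summary:
    print(row)
```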

Real-Life Example: Async Product Catalogue API

Let us build a FastAPI product catalogue with motor as the database backend. This shows how motor integrates with a real async web framework, including startup/shutdown lifecycle and concurrent query handling:

# catalogue_api.py
# Install: pip install fastapi uvicorn motor
# Run: uvicorn catalogue_api:app --reload
# Test: http://localhost:8000/docs

from fastapi import FastAPI, HTTPException
from contextlib import asynccontextmanager
import motor.motor_asyncio
from bson import ObjectId

MONGO_URL = "mongodb://localhost:27017"

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: create client and ensure index
    app.state.client = motor.motor_asyncio.AsyncIOMotorClient(MONGO_URL)
    app.state.col = app.state.client["catalogue"]["products"]
    await app.state.col.create_index("sku", unique=True)
    yield
    # Shutdown: close connection
    app.state.client.close()

app = FastAPI(lifespan=lifespan)

@app.get("/products")
async def list_products(category: str = None, max_price: float = None):
    query = {}
    if category:
        query["category"] = category
    if max_price is not None:
        query["price"] = {"$lte": max_price}
    products = []
    async for p in app.state.col.find(query, {"_id": 0}):
        products.append(p)
    return {"count": len(products), "products": products}

@app.post("/products", status_code=201)
async def create_product(product: dict):
    if not all(k in product for k in ["sku", "name", "price"]):
        raise HTTPException(400, "sku, name, and price are required")
    result = await app.state.col.insert_one(product)
    return {"id": str(result.inserted_id), "sku": product["sku"]}

@app.patch("/products/{sku}")
async def update_product(sku: str, updates: dict):
    result = await app.state.col.update_one({"sku": sku}, {"$set": updates})
    if result.matched_count == 0:
        raise HTTPException(404, f"Product '{sku}' not found")
    return {"updated": result.modified_count}

@app.delete("/products/{sku}")
async def delete_product(sku: str):
    result = await app.state.col.delete_one({"sku": sku})
    if result.deleted_count == 0:
        raise HTTPException(404, f"Product '{sku}' not found")
    return {"deleted": True}

The lifespan context manager is the modern FastAPI pattern for managing the motor client — it creates the connection on startup and closes it cleanly on shutdown. The unique index on sku prevents duplicate products at the database level. Because motor is async, all four endpoints can handle concurrent requests without one query blocking another.

Frequently Asked Questions

How do I handle MongoDB’s ObjectId in API responses?

MongoDB’s _id field is a bson.ObjectId object, not a plain string. JSON serializers do not know how to handle it and will raise a TypeError. The cleanest fix is to either exclude _id from your queries using a projection ({"_id": 0}), or convert it when building your response: doc["id"] = str(doc.pop("_id")). If you use Pydantic models with beanie or a custom model, configure the JSON encoder to handle ObjectId automatically.
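The conversion approach can be wrapped in a small helper. In this sketch, serialize_doc is a hypothetical name, and FakeObjectId is a stand-in for bson.ObjectId so the example runs without MongoDB installed (a real ObjectId also stringifies to its 24-character hex form):

```python
import json


def serialize_doc(doc):
    """Return a copy of doc with '_id' replaced by a string 'id' field."""
    out = dict(doc)
    if "_id" in out:
        out["id"] = str(out.pop("_id"))
    return out


class FakeObjectId:
    """Stand-in for bson.ObjectId, used only to keep this sketch self-contained."""

    def __init__(self, hex_value):
        self._hex = hex_value

    def __str__(self):
        return self._hex


doc = {"_id": FakeObjectId("664f3a1b2c8d4e5f6a7b8c9d"), "name": "Alice"}
payload = json.dumps(serialize_doc(doc))
print(payload)
```

Copying the document first leaves the original dict untouched, which matters if the same document is reused after the response is built.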

When should I create indexes?

Create indexes for any field you filter or sort on frequently. Without an index, MongoDB scans every document in the collection for each query. Use await collection.create_index("field_name") for single-field indexes, or create_index([("a", 1), ("b", -1)]) for compound indexes. Call index creation during application startup (in the lifespan handler), not on every request. Check your index usage with await collection.find(query).explain(), which returns the query plan as a dict.

How does motor handle connection pooling?

Motor uses PyMongo’s connection pool internally. The default pool size is 100 connections. One AsyncIOMotorClient instance is meant to be shared across your entire application — do not create a new client per request. The client is thread-safe and coroutine-safe. Set the pool size with AsyncIOMotorClient(url, maxPoolSize=50) to reduce memory usage on low-traffic services.

Does motor support multi-document transactions?

Yes, starting with MongoDB 4.0 (replica sets) and 4.2 (sharded clusters). Use async with await client.start_session() as session: async with session.start_transaction(): ... to wrap multiple operations in a transaction. All motor CRUD methods accept an optional session parameter. Transactions have a performance cost — only use them when you genuinely need atomicity across multiple collections or documents.

How do I store files in MongoDB with motor?

Use motor.motor_asyncio.AsyncIOMotorGridFSBucket for files larger than 16MB (MongoDB’s document size limit). For smaller files (images, PDFs under a few MB), storing them as base64-encoded strings in a document field is simpler. GridFS splits large files into 255KB chunks and stores them across two collections, making upload and download resumable. In most modern setups, object storage (S3, GCS) is a better fit for files than GridFS.

Conclusion

Motor is the right MongoDB driver whenever your Python application uses asyncio. In this tutorial you performed async CRUD with insert_one, find, update_one, and delete_one, ran aggregation pipelines for analytics, created indexes for performance, and built a FastAPI product catalogue API that handles concurrent requests without blocking.

The catalogue API is ready to extend — add authentication middleware, implement pagination with skip() and limit(), add full-text search with a MongoDB text index, or add a change stream listener that notifies clients over WebSocket when products are updated. All of these patterns use the same async CRUD primitives you learned here.

Official documentation: motor — Async Python driver for MongoDB.

How To Use Python pika for RabbitMQ Message Queuing

Intermediate

Your web app needs to send a thousand email notifications, resize uploaded images, and kick off reports, all without making the user wait. If you are handling this with threads or background tasks inside the same process, you are one busy period away from a frozen server. Message queues solve this by decoupling the code that produces work from the code that does the work. RabbitMQ is the most widely deployed open-source message broker, and pika is the Python client used in RabbitMQ’s own tutorials.

Install pika with pip install pika. You also need a running RabbitMQ instance — the fastest way to get one locally is docker run -d -p 5672:5672 -p 15672:15672 rabbitmq:3-management, which starts RabbitMQ with the management UI at http://localhost:15672 (login: guest/guest). Once that is running, every example in this tutorial will work as-is.

This tutorial covers the core pika workflow: connecting to RabbitMQ, declaring queues and exchanges, publishing messages, consuming them with worker processes, acknowledging delivery, handling errors, and building a real parallel task queue. By the end, you will be able to distribute work across multiple Python workers reliably.

Publishing and Consuming a Message: Quick Example

Here is the smallest possible producer-consumer pair. Two scripts — one that sends, one that receives — showing the entire lifecycle of a RabbitMQ message:

# producer.py
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello', durable=True)
channel.basic_publish(
    exchange='',
    routing_key='hello',
    body='Hello, World!',
    properties=pika.BasicProperties(delivery_mode=2)  # make message persistent
)
print("[x] Sent 'Hello, World!'")
connection.close()

# consumer.py
import pika

def callback(ch, method, properties, body):
    print(f"[x] Received: {body.decode()}")
    ch.basic_ack(delivery_tag=method.delivery_tag)  # acknowledge receipt

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello', durable=True)
channel.basic_consume(queue='hello', on_message_callback=callback)
print("[*] Waiting for messages. CTRL+C to exit")
channel.start_consuming()

Run producer.py, then consumer.py in a separate terminal. Output:

# producer.py output:
[x] Sent 'Hello, World!'

# consumer.py output:
[*] Waiting for messages. CTRL+C to exit
[x] Received: Hello, World!

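Three things to notice: queue_declare(durable=True) creates a queue that survives a RabbitMQ restart, delivery_mode=2 marks the message as persistent so the broker writes it to disk, and basic_ack() tells the broker the message was processed successfully. Persistence alone does not confirm that the broker has stored the message; for that guarantee you would also enable publisher confirms. If a consumer's channel or connection closes before it sends the ack, RabbitMQ requeues the message and redelivers it, possibly to another consumer.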

Core Concepts: Exchanges, Queues, and Routing

RabbitMQ’s routing model is more flexible than a simple queue. Messages are always published to an exchange, and the exchange decides which queues receive copies based on routing rules.

| Exchange Type | How Routing Works | Use Case |
| --- | --- | --- |
| direct | Routes to queues whose binding key exactly matches the routing key | Task distribution, worker pools |
| fanout | Routes to all bound queues, ignoring routing key | Broadcast notifications, pub/sub |
| topic | Routes using wildcard patterns (* = exactly one word, # = zero or more words) | Log routing, category-based filtering |
| headers | Routes based on message header attributes | Complex multi-attribute filtering |

The empty string exchange='' in the quick example is the default direct exchange — it routes messages directly to the queue named in routing_key. For most task queue use cases, the default exchange is all you need. The other exchange types shine when you have multiple consumers that need different subsets of the same message stream.
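
To make the wildcard rules for topic exchanges concrete, here is a small pure-Python sketch of how a binding pattern is compared with a routing key. It only illustrates the matching rules from the table; it is not RabbitMQ's actual implementation, and topic_matches is a hypothetical helper name.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if a topic binding pattern matches a routing key.

    Words are separated by dots. '*' stands for exactly one word,
    '#' stands for zero or more words.
    """
    p = pattern.split(".")
    k = routing_key.split(".")

    def match(i: int, j: int) -> bool:
        if i == len(p):
            return j == len(k)          # pattern used up: key must be too
        if p[i] == "#":
            # '#' may absorb any number of the remaining words, including none
            return any(match(i + 1, jj) for jj in range(j, len(k) + 1))
        if j == len(k):
            return False                # pattern still needs a word, key has none
        if p[i] == "*" or p[i] == k[j]:
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


print(topic_matches("logs.*.error", "logs.app.error"))  # True
print(topic_matches("logs.*", "logs.app.error"))        # False: '*' is one word only
print(topic_matches("logs.#", "logs"))                  # True: '#' can match nothing
```

A consumer bound with logs.# would therefore see every log message, while one bound with logs.*.error would see only errors that carry exactly one word between logs and error.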

API Alice connecting to RabbitMQ with pika switchboard
Every message finds its queue — pika handles the wiring.

Publishing Messages

For production use, always declare the queue as durable and messages as persistent. Here is a more complete publisher with error handling and connection retry:

# robust_publisher.py
import pika
import json
import time

def get_channel(host='localhost', retries=5):
    """Connect with retry logic for container startup delays."""
    for attempt in range(retries):
        try:
            conn = pika.BlockingConnection(pika.ConnectionParameters(
                host=host,
                heartbeat=600,
                blocked_connection_timeout=300,
            ))
            return conn, conn.channel()
        except pika.exceptions.AMQPConnectionError as e:
            if attempt < retries - 1:
                print(f"Connection failed ({e}), retrying in 2s...")
                time.sleep(2)
            else:
                raise

connection, channel = get_channel()

# Declare a durable queue -- safe to call multiple times (idempotent)
channel.queue_declare(queue='tasks', durable=True)

# Publish 10 tasks
for i in range(10):
    task = {'id': i, 'action': 'resize_image', 'path': f'/uploads/img_{i}.jpg'}
    channel.basic_publish(
        exchange='',
        routing_key='tasks',
        body=json.dumps(task),
        properties=pika.BasicProperties(
            delivery_mode=2,       # persist to disk
            content_type='application/json',
        )
    )
    print(f"[x] Published task {i}")

connection.close()
print("[x] Done")

Output:

[x] Published task 0
[x] Published task 1
...
[x] Published task 9
[x] Done

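The heartbeat=600 parameter sets the heartbeat timeout negotiated with the broker to 600 seconds. RabbitMQ's default timeout is 60 seconds, and a BlockingConnection only exchanges heartbeat frames while it is processing I/O, so a task that blocks the connection's thread for longer than the timeout can cause the broker to drop the connection. That is a common source of confusing ConnectionResetError bugs in long-running services. blocked_connection_timeout=300 makes pika give up if the broker keeps the connection blocked (for example, under a memory alarm) for more than five minutes.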
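
The publisher above waits a fixed two seconds between attempts. If you would rather back off exponentially, the retry loop can be pulled out into a small generic helper. This is a sketch under assumptions: connect_with_backoff and its parameters are hypothetical names, and the connect argument stands in for whatever opens your connection.

```python
import time


def connect_with_backoff(connect, retries=5, base_delay=1.0, max_delay=30.0,
                         retry_on=(Exception,), sleep=time.sleep):
    """Call connect() until it succeeds, doubling the wait after each failure."""
    for attempt in range(retries):
        try:
            return connect()
        except retry_on:
            if attempt == retries - 1:
                raise  # out of attempts: let the caller see the error
            # 1s, 2s, 4s, ... capped at max_delay
            delay = min(base_delay * 2 ** attempt, max_delay)
            sleep(delay)
```

With pika you would pass something like lambda: pika.BlockingConnection(params) as connect and retry_on=(pika.exceptions.AMQPConnectionError,), so that unrelated errors are not retried. The sleep parameter exists only so the helper can be tested without real delays.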

Consuming Messages with Workers

A worker process consumes messages one at a time, acknowledges each after successful processing, and rejects (nacks) messages that fail so they get requeued:

# worker.py
import pika
import json
import time
import random

def process_task(task):
    """Simulate work that takes a variable amount of time."""
    delay = random.uniform(0.1, 0.5)
    time.sleep(delay)
    return {'status': 'ok', 'processed': task['id'], 'time': round(delay, 2)}

def on_message(channel, method, properties, body):
    task = json.loads(body)
    print(f"[>] Processing task {task['id']}: {task['action']}")
    try:
        result = process_task(task)
        print(f"[+] Done: {result}")
        channel.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        print(f"[!] Failed: {e} -- requeuing")
        channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='tasks', durable=True)

# basic_qos limits to 1 unacked message per worker -- prevents overload
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='tasks', on_message_callback=on_message)

print("[*] Worker ready. CTRL+C to exit")
try:
    channel.start_consuming()
except KeyboardInterrupt:
    channel.stop_consuming()
connection.close()

Output (running two worker processes simultaneously):

[*] Worker ready. CTRL+C to exit
[>] Processing task 0: resize_image
[+] Done: {'status': 'ok', 'processed': 0, 'time': 0.23}
[>] Processing task 2: resize_image
[+] Done: {'status': 'ok', 'processed': 2, 'time': 0.41}

basic_qos(prefetch_count=1) is critical for fair work distribution. Without a prefetch limit, RabbitMQ pushes queued messages to consumers as fast as it can, without regard to how many each one still has unacknowledged: a worker that connects first can be handed the entire backlog while workers that start later sit idle. With prefetch_count=1, each worker receives the next message only after acknowledging the previous one, so work goes to whichever worker is free.
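
The effect of the prefetch limit is easy to see in a toy simulation. The sketch below involves no broker at all: it compares handing jobs out in turn up front against letting each worker pull the next job when it becomes free, and reports how long the slowest worker takes. The function names and job durations are made up for illustration.

```python
import heapq


def makespan_round_robin(durations, workers):
    """Jobs are dealt out in turn up front, however busy each worker is."""
    busy = [0.0] * workers
    for i, d in enumerate(durations):
        busy[i % workers] += d
    return max(busy)


def makespan_prefetch_one(durations, workers):
    """Each worker takes the next job only after finishing its current one."""
    free_at = [(0.0, w) for w in range(workers)]
    heapq.heapify(free_at)
    for d in durations:
        t, w = heapq.heappop(free_at)        # the worker that frees up first
        heapq.heappush(free_at, (t + d, w))
    return max(t for t, _ in free_at)


jobs = [5, 1, 5, 1, 5, 1]                    # alternating slow and fast jobs
print(makespan_round_robin(jobs, 2))         # 15.0
print(makespan_prefetch_one(jobs, 2))        # 11.0
```

With alternating slow and fast jobs, dealing them out blindly leaves one worker with all the slow ones, while pull-based dispatch finishes sooner with the same two workers.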

Loop Larry watching messages flow on RabbitMQ conveyor belt
Messages flow independently — the consumer sets its own pace.

Real-Life Example: Parallel Image Resizer

Let us build a practical task queue that resizes uploaded images in parallel. A publisher queues resize jobs, and multiple worker processes consume them concurrently:

# image_queue_publisher.py
import pika
import json
import os

QUEUE = 'image_resize'

# Sample jobs -- in production these come from your web app
jobs = [
    {'input': f'uploads/photo_{i}.jpg', 'width': 800, 'height': 600}
    for i in range(20)
]

conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
ch.queue_declare(queue=QUEUE, durable=True)

for job in jobs:
    ch.basic_publish(
        exchange='',
        routing_key=QUEUE,
        body=json.dumps(job),
        properties=pika.BasicProperties(delivery_mode=2)
    )
    print(f"Queued: {job['input']}")

conn.close()
print(f"Published {len(jobs)} resize jobs")

# image_queue_worker.py
import pika
import json
import os
import time
import random

QUEUE = 'image_resize'

def resize_image(job):
    """Simulate image resize (replace with Pillow in real use)."""
    time.sleep(random.uniform(0.05, 0.2))
    return f"resized/{os.path.basename(job['input'])}"

def on_job(ch, method, properties, body):
    job = json.loads(body)
    try:
        output = resize_image(job)
        print(f"[worker] Resized {job['input']} -> {output} ({job['width']}x{job['height']})")
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        print(f"[worker] ERROR on {job['input']}: {e}")
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)  # drop, or dead-letter if a DLX is configured

conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
ch.queue_declare(queue=QUEUE, durable=True)
ch.basic_qos(prefetch_count=1)
ch.basic_consume(queue=QUEUE, on_message_callback=on_job)

print(f"[worker] Listening on '{QUEUE}'...")
try:
    ch.start_consuming()
except KeyboardInterrupt:
    ch.stop_consuming()
conn.close()

Start 3--5 worker processes in separate terminals, then run the publisher. Each worker picks up jobs as they become available. Because RabbitMQ persists both the queue and the messages, you can kill and restart workers at any time without losing jobs. This is the key advantage over in-process background threads -- the work queue survives process crashes, deployments, and restarts.

Cache Katie at message persistence loading dock confirming delivery
Durable queues survive restarts — Cache Katie confirms.

Frequently Asked Questions

What happens if a worker crashes before acknowledging a message?

RabbitMQ tracks which messages are unacknowledged. If the consumer's channel or connection closes without sending an ack, the broker automatically requeues the message and delivers it to the next available worker. This gives you at-least-once delivery, not exactly-once: if your worker finishes processing a message but crashes before the ack, the message gets processed twice, so your task handlers should be idempotent (safe to run multiple times with the same input).
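
One simple way to get idempotency is to remember which job ids have already completed and skip repeats. The sketch below assumes each job carries a unique id field and keeps the record in an in-memory set purely for illustration; handle_once and processed_ids are hypothetical names, and a real deployment would need shared storage such as a database table.

```python
processed_ids = set()  # assumption: stand-in for a shared, persistent store


def handle_once(job, do_work):
    """Run do_work(job) unless a job with this id has already completed."""
    job_id = job["id"]
    if job_id in processed_ids:
        return "skipped"        # duplicate delivery: safe to ack without redoing it
    do_work(job)
    processed_ids.add(job_id)   # record only after the work succeeded
    return "processed"


done = []
print(handle_once({"id": 1, "action": "resize_image"}, done.append))  # processed
print(handle_once({"id": 1, "action": "resize_image"}, done.append))  # skipped
```

The consumer callback would ack in both cases, since a skipped duplicate needs no further delivery.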

How do I handle messages that keep failing?

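Configure a dead-letter exchange (DLX) on the queue. When a message is nacked or rejected with requeue=False, expires via its TTL, or is dropped because the queue hit its length limit, RabbitMQ forwards it to the DLX rather than discarding it. RabbitMQ does not count retries on a classic queue for you, so a handler that nacks with requeue=True will see the same failing message again immediately. You can then inspect dead-lettered messages separately, alert on them, or replay them manually after fixing the bug. Set it up via queue_declare(arguments={'x-dead-letter-exchange': 'dlx_name'}), and declare the exchange plus a queue bound to it so the messages have somewhere to land.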
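
Put together, the declarations might look like the sketch below. It takes an already-open pika channel as an argument; the function name, queue names, and the choice of a fanout exchange are assumptions made for illustration, not something this tutorial's earlier scripts define.

```python
def declare_queue_with_dlx(channel, queue="image_resize_dlx_demo",
                           dlx="dlx_name", dead_queue="image_resize_dead"):
    """Declare a work queue whose rejected messages are routed to dead_queue."""
    # A fanout DLX copies every dead-lettered message to all queues bound to it.
    channel.exchange_declare(exchange=dlx, exchange_type="fanout", durable=True)
    channel.queue_declare(queue=dead_queue, durable=True)
    channel.queue_bind(queue=dead_queue, exchange=dlx)
    # The work queue points at the DLX through the x-dead-letter-exchange argument.
    channel.queue_declare(
        queue=queue,
        durable=True,
        arguments={"x-dead-letter-exchange": dlx},
    )
```

A new queue name is used here on purpose: redeclaring an existing queue with different arguments makes the broker reject the declaration, so an existing queue has to be deleted first or configured through a policy instead.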

How do I connect to a cloud RabbitMQ instance?

Use pika.URLParameters('amqps://user:password@host:5671/vhost') for TLS connections (cloud providers use port 5671 for TLS, not 5672). CloudAMQP, Amazon MQ, and other hosted services provide the full connection URL in their dashboard. For credential security, load the URL from an environment variable: pika.URLParameters(os.environ['RABBITMQ_URL']).

How do I broadcast a message to all consumers?

Declare a fanout exchange and bind each consumer's queue to it. The publisher sends to the exchange name with an empty routing key: basic_publish(exchange='notifications', routing_key='', body=msg). Each bound queue receives a copy regardless of routing key. This is the pattern for broadcasting events like "cache invalidated" or "config updated" to all running instances of a service.

Can I use pika with asyncio or threading?

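Yes. For asyncio applications, use aio-pika (pip install aio-pika), which provides an async/await API over the same AMQP protocol; pika itself also ships an AsyncioConnection adapter. For threaded applications, give each thread its own connection: pika connections and channels are not thread-safe and should not be shared across threads. The one documented exception is connection.add_callback_threadsafe(), which lets another thread schedule a callback (such as an ack) on the connection's own thread. Alternatively, pika.SelectConnection provides an event-driven callback API built around pika's own I/O loop.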
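
A common threaded pattern is to do the slow work on a worker thread and hand the ack back to the thread that owns the connection. The sketch below shows the shape of that hand-off using pika's add_callback_threadsafe(); process_in_thread and the work argument are hypothetical names, and the connection and channel are passed in rather than created here.

```python
import functools
import threading


def process_in_thread(connection, channel, delivery_tag, body, work):
    """Run work(body) on a worker thread, then ack on the connection's thread."""
    def run():
        work(body)
        # Do not call channel.basic_ack from this thread. Ask the connection
        # to run it on the thread that owns the connection instead.
        ack = functools.partial(channel.basic_ack, delivery_tag=delivery_tag)
        connection.add_callback_threadsafe(ack)

    thread = threading.Thread(target=run)
    thread.start()
    return thread
```

You would call this from the consumer callback with method.delivery_tag. The scheduled ack runs the next time the blocking connection processes events, which start_consuming() does continuously.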

Conclusion

Python's pika library gives you direct access to RabbitMQ's AMQP protocol for building reliable, scalable message pipelines. In this tutorial you connected to a broker with retry logic, published persistent messages, consumed them with fair dispatch using prefetch_count=1, handled acknowledgements and nacks, and built a parallel image-resize task queue that survives crashes and restarts.

The image resizer is ready to extend -- add a dead-letter queue for failed jobs, implement an exponential backoff retry policy, or wire the publisher into a Flask endpoint so web requests enqueue jobs instead of processing them synchronously. Each of these extensions follows the same publish/consume/ack pattern you learned here.

Official documentation: pika -- AMQP 0-9-1 client library.

How To Use Python timeit for Accurate Code Benchmarking


Intermediate

You have two ways to do the same thing in Python — a list comprehension and a for-loop, str.join() and repeated concatenation, sorted() and list.sort() — and you want to know which one is faster. Guessing is wrong, and timing with time.time() gives unreliable one-shot measurements that vary wildly with OS scheduling. Python’s timeit module was built to solve exactly this problem.

timeit is part of Python’s standard library — nothing to install. It works by running your code snippet many thousands of times in a tight loop, averaging out random OS noise so the result actually reflects the code’s performance. You can use it from the command line for quick checks or from Python code for systematic benchmarks you can embed in your test suite.

This tutorial covers everything you need to benchmark Python accurately: the CLI interface, the timeit.timeit() and timeit.repeat() functions, benchmarking callable functions with setup code, comparing multiple implementations, and avoiding the common gotchas that give misleading results. By the end, you will have a reusable benchmarking harness you can apply to any performance question in your codebase.

Benchmarking Two Approaches: Quick Example

Here is the fastest way to settle a performance debate — comparing list comprehension against map() for squaring numbers:

# quick_timeit.py
import timeit

# Benchmark list comprehension
t1 = timeit.timeit('[x**2 for x in range(1000)]', number=10000)

# Benchmark map()
t2 = timeit.timeit('list(map(lambda x: x**2, range(1000)))', number=10000)

print(f"List comprehension: {t1:.4f}s over 10,000 runs")
print(f"map() equivalent:   {t2:.4f}s over 10,000 runs")
print(f"Winner: {'list comprehension' if t1 < t2 else 'map()'} by {abs(t1-t2):.4f}s")

Output:

List comprehension: 0.8231s over 10,000 runs
map() equivalent:   0.9847s over 10,000 runs
Winner: list comprehension by 0.1616s

The number=10000 argument runs the snippet 10,000 times and returns the total elapsed time. Dividing by number gives the per-call cost. Running many repetitions is what makes timeit reliable -- a single execution is too noisy to trust.

What Is timeit and When Should You Use It?

The timeit module provides a simple way to time small bits of Python code with high accuracy. It works by disabling garbage collection during the measurement and running the snippet inside a tight loop, both of which reduce measurement noise significantly compared to a manual time.time() wrapper.

| Tool | Use When | Granularity | Setup Required |
| --- | --- | --- | --- |
| timeit | Comparing small snippets, functions | Microseconds | None (stdlib) |
| time.time() | Coarse script timing | Milliseconds | None (stdlib) |
| cProfile | Finding bottlenecks in whole programs | Per-function | None (stdlib) |
| line_profiler | Line-by-line profiling | Per-line | pip install |

Use timeit when you have a specific performance question: "Is approach A faster than approach B?" or "Is this optimisation actually an improvement?" Use cProfile when you need to find where time is being spent across a larger codebase.

Using timeit from the Command Line

The fastest way to benchmark a one-liner is the python -m timeit command. It automatically chooses a sensible number of repetitions and reports the best time across multiple runs:

# Run from your terminal (not a Python file)
python -m timeit "'-'.join(str(i) for i in range(100))"

Output:

50000 loops, best of 5: 7.48 usec per loop

The CLI prints the loop count and the best time per loop across five timing runs. "Best of 5" means timeit ran the measurement 5 times and took the minimum -- this filters out noise from OS interrupts and context switches. A higher loop count with a lower per-loop time is a sign of a fast snippet; a lower loop count with higher time signals a slower one.

For multi-line setups, use multiple -s flags for setup code and the main statement as the final argument:

# Multi-line benchmark with setup
python -m timeit -s "import json; data = {'key': 'value', 'num': 42}" "json.dumps(data)"

Output:

500000 loops, best of 5: 0.612 usec per loop

Sudo Sam benchmarking Python code with timeit
Trust the loop count, not your gut.

Using timeit Programmatically

For benchmark scripts you want to commit and rerun, the Python API gives you full control over repetitions, setup code, and result formatting:

# programmatic_timeit.py
import timeit

# timeit.timeit() runs the stmt number times and returns total seconds
result = timeit.timeit(
    stmt="[i**2 for i in range(500)]",
    number=50000
)
print(f"Total for 50,000 runs: {result:.3f}s")
print(f"Per run: {result/50000*1e6:.2f} microseconds")

# Use setup= to import modules or define variables used in stmt
result2 = timeit.timeit(
    stmt="sorted(data)",
    setup="data = list(range(1000, 0, -1))",  # reversed list, set up once
    number=10000
)
print(f"\nsorted() on 1000-item reversed list:")
print(f"Total for 10,000 runs: {result2:.3f}s")
print(f"Per run: {result2/10000*1e6:.2f} microseconds")

Output:

Total for 50,000 runs: 0.834s
Per run: 16.68 microseconds

sorted() on 1000-item reversed list:
Total for 10,000 runs: 0.621s
Per run: 62.10 microseconds

The setup parameter is key -- it runs once before the timing loop starts, so expensive operations like imports or data creation don't pollute your measurements. Anything that must be repeated each iteration goes in stmt; anything that only needs to happen once goes in setup.
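
You can check the once-versus-every-iteration split directly by counting calls. In this small sketch, prepare and work are throwaway names that do nothing except increment a counter.

```python
import timeit

calls = {"setup": 0, "stmt": 0}


def prepare():
    calls["setup"] += 1


def work():
    calls["stmt"] += 1


timeit.timeit("work()", setup="prepare()", number=1000,
              globals={"work": work, "prepare": prepare})
print(calls)  # {'setup': 1, 'stmt': 1000}
```

The setup code ran a single time while the statement ran once per loop iteration, so anything placed in setup stays out of the measured time.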

Using timeit.repeat() for Robust Statistics

timeit.repeat() runs the full timing measurement multiple times, giving you a list of results you can analyse statistically. This is more rigorous than a single run:

# repeat_benchmark.py
import timeit
import statistics

def benchmark(stmt, setup="pass", number=10000, repeat=7):
    """Run a benchmark and return min, mean, stdev."""
    times = timeit.repeat(stmt=stmt, setup=setup, number=number, repeat=repeat)
    per_run = [t / number * 1e6 for t in times]  # convert to microseconds per call
    return {
        "min_us": min(per_run),
        "mean_us": statistics.mean(per_run),
        "stdev_us": statistics.stdev(per_run),
        "runs": number,
        "repeats": repeat,
    }

# Compare two string join approaches
r1 = benchmark("''.join(str(i) for i in range(100))", number=20000)
r2 = benchmark("''.join(map(str, range(100)))", number=20000)

print("Generator expression join:")
print(f"  Min: {r1['min_us']:.2f} us  Mean: {r1['mean_us']:.2f} us  Stdev: {r1['stdev_us']:.2f} us")
print("\nmap(str, ...) join:")
print(f"  Min: {r2['min_us']:.2f} us  Mean: {r2['mean_us']:.2f} us  Stdev: {r2['stdev_us']:.2f} us")

faster = "map(str)" if r2['min_us'] < r1['min_us'] else "generator"
speedup = max(r1['min_us'], r2['min_us']) / min(r1['min_us'], r2['min_us'])
print(f"\nWinner: {faster} ({speedup:.1f}x faster)")

Output:

Generator expression join:
  Min: 8.34 us  Mean: 8.51 us  Stdev: 0.18 us

map(str, ...) join:
  Min: 6.92 us  Mean: 7.08 us  Stdev: 0.14 us

Winner: map(str) (1.2x faster)

The standard deviation tells you how noisy the measurement is. A low stdev means your results are reliable; a high stdev means something is interfering -- background processes, thermal throttling, or GC pressure. The minimum is often reported as the "true" performance because it represents the run with the least OS interference.

Debug Dee comparing two benchmark timings
repeat=7 and take the min — one data point is not a benchmark.

Benchmarking Functions

Timing functions is slightly different from timing string snippets because you need to reference the function object. Use a lambda or pass a callable directly to the Timer class:

# function_benchmark.py
import timeit

def approach_a(n):
    """Build a list of squares using a loop."""
    result = []
    for i in range(n):
        result.append(i * i)
    return result

def approach_b(n):
    """Build a list of squares using a comprehension."""
    return [i * i for i in range(n)]

def approach_c(n):
    """Build a list of squares using map."""
    return list(map(lambda x: x * x, range(n)))

N = 1000

# Wrap each call in a zero-argument lambda so timeit can invoke it directly
t_a = timeit.timeit(lambda: approach_a(N), number=10000)
t_b = timeit.timeit(lambda: approach_b(N), number=10000)
t_c = timeit.timeit(lambda: approach_c(N), number=10000)

results = [("loop + append", t_a), ("list comprehension", t_b), ("map + lambda", t_c)]
results.sort(key=lambda x: x[1])

print(f"Benchmarking list-of-squares for n={N} (10,000 runs each):\n")
fastest_time = results[0][1]
for name, t in results:
    speedup = t / fastest_time
    print(f"  {name:<22} {t:.4f}s total  {t/10000*1e6:.2f} us/call  {speedup:.2f}x")

Output:

Benchmarking list-of-squares for n=1000 (10,000 runs each):

  list comprehension     0.4123s total  41.23 us/call  1.00x
  map + lambda           0.5014s total  50.14 us/call  1.22x
  loop + append          0.5892s total  58.92 us/call  1.43x

The lambda wrapper lets timeit call your function without string snippets or setup imports. It does not remove any lookups: each call still resolves the function name and N, and the lambda adds one extra function call. That overhead is the same small constant for every approach, so the ranking stays fair, but the absolute per-call numbers are slightly inflated. Keep the wrapper and the arguments identical across the approaches you compare.
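
The Timer class accepts any zero-argument callable, not only a lambda. As a minimal sketch, here is the same kind of measurement using functools.partial together with Timer.repeat(); squares is a stand-in function defined just for this example.

```python
import functools
import timeit


def squares(n):
    """Build a list of squares using a comprehension."""
    return [i * i for i in range(n)]


# partial() binds the argument up front and yields a zero-argument callable.
timer = timeit.Timer(functools.partial(squares, 1000))
times = timer.repeat(repeat=3, number=200)

best_us = min(times) / 200 * 1e6
print(f"best of 3: {best_us:.2f} us per call")
```

Like the lambda, partial() adds a small constant cost per call, so use the same kind of wrapper for every candidate you compare.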

Real-Life Example: Benchmarking a String Formatter

Let us build a systematic benchmark that tests five approaches to string formatting in Python and produces a ranked summary report -- the kind of micro-benchmark you would run before deciding which formatting style to standardize on for a hot code path:

# string_format_benchmark.py
import timeit
import statistics

NAME = "Alice"
AGE = 30
SCORE = 98.6

def bench(fn, number=50000, repeat=5):
    times = timeit.repeat(fn, number=number, repeat=repeat)
    per_us = [t / number * 1e6 for t in times]
    return min(per_us), statistics.mean(per_us)

approaches = {
    "%-formatting":    lambda: "%s is %d years old, score: %.1f" % (NAME, AGE, SCORE),
    "str.format()":    lambda: "{} is {} years old, score: {:.1f}".format(NAME, AGE, SCORE),
    "f-string":        lambda: f"{NAME} is {AGE} years old, score: {SCORE:.1f}",
    "str concat":      lambda: NAME + " is " + str(AGE) + " years old, score: " + str(round(SCORE, 1)),
    "Template":        None,  # set up below
}

from string import Template
tmpl = Template("$name is $age years old, score: $score")
approaches["Template"] = lambda: tmpl.substitute(name=NAME, age=AGE, score=SCORE)

print(f"{'Approach':<20} {'Min (us)':>10} {'Mean (us)':>10}")
print("-" * 42)

results = []
for name, fn in approaches.items():
    mn, avg = bench(fn)
    results.append((name, mn, avg))

results.sort(key=lambda x: x[1])
for name, mn, avg in results:
    print(f"{name:<20} {mn:>10.3f} {avg:>10.3f}")

Output:

Approach             Min (us)  Mean (us)
------------------------------------------
f-string                0.082      0.085
%-formatting            0.098      0.101
str.format()            0.124      0.128
str concat              0.143      0.148
Template                0.621      0.641

F-strings come out fastest in this run, which is consistent with CPython's implementation: an f-string compiles to dedicated formatting and string-building bytecodes (FORMAT_VALUE and BUILD_STRING up to Python 3.12) rather than a method call. Template.substitute() is the slowest by a wide margin because it runs a regular-expression substitution on every call. The exact numbers depend on your machine and Python version, but a benchmark like this gives you concrete data to justify your team's style guide choice, not just "f-strings feel faster."

Loop Larry choosing between two Python approaches on a race track
Guessing performance is always wrong — measure it.

Frequently Asked Questions

How do I choose the right number of repetitions?

Aim for a total measurement time between roughly 0.2 and 2 seconds. If your snippet takes 1 microsecond per call, use number=1000000. If it takes 1 millisecond, use number=1000. The CLI auto-selects number for you by increasing it until a run takes at least 0.2 seconds. For manual selection, start with number=10000 and adjust until the total lands in that range; very short runs are dominated by timer and loop overhead.
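
The same calibration the CLI performs is available from Python through Timer.autorange(). A short sketch, reusing the reversed-list sort from earlier as the statement:

```python
import timeit

timer = timeit.Timer("sorted(data)", setup="data = list(range(1000, 0, -1))")

# autorange() increases the loop count until one run takes at least 0.2 seconds
number, total = timer.autorange()
print(f"{number} loops took {total:.3f}s ({total / number * 1e6:.2f} us per loop)")
```

You can then pass the returned number to timeit.repeat() so every repeat uses the same calibrated loop count.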

Why does timeit disable garbage collection?

timeit temporarily disables Python's cyclic garbage collector during measurement because GC pauses can add unpredictable milliseconds to individual runs. This gives more consistent results but means your benchmark does not reflect real-world performance if your code produces a lot of cyclic garbage. If GC behavior matters for your use case, use timeit.Timer directly and call gc.enable() inside your setup string to re-enable it.
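
To see the collector being switched off and back on, you can record gc.isenabled() from inside the timed statement. This is a small sketch; gc_state_during_timing is a hypothetical helper written for the demonstration.

```python
import gc
import timeit


def gc_state_during_timing(setup):
    """Return what gc.isenabled() reports inside the timed statement."""
    seen = []
    timeit.timeit("seen.append(gc.isenabled())", setup=setup, number=1,
                  globals={"gc": gc, "seen": seen})
    return seen[0]


print(gc_state_during_timing("pass"))         # False: timeit turned the collector off
print(gc_state_during_timing("gc.enable()"))  # True: the setup code turned it back on
```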

Why does my benchmark fail with a NameError about my functions?

String snippets run in timeit's own namespace, which does not include your module's globals. Either pass globals=globals() as a keyword argument to timeit.timeit(), or use the setup parameter to import what you need: setup="from __main__ import my_function". The lambda wrapper approach sidesteps this entirely because the lambda is defined in your module and looks names up there.
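
Here is the failure and the fix side by side, as a minimal sketch with a throwaway my_function:

```python
import timeit


def my_function():
    return sum(range(100))


# Without globals, the snippet cannot see my_function
try:
    timeit.timeit("my_function()", number=10)
except NameError as exc:
    print(f"Without globals: {exc}")

# Passing this module's globals makes the name visible to the snippet
elapsed = timeit.timeit("my_function()", number=10, globals=globals())
print(f"With globals: {elapsed:.6f}s for 10 runs")
```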

Can timeit measure memory usage?

timeit only measures time, not memory. For memory profiling, use the memory_profiler package (pip install memory-profiler) which provides a @profile decorator and line-by-line memory usage reports. For quick peak memory checks, tracemalloc from the standard library can take memory snapshots before and after running code to show allocation deltas.

What is the difference between wall time and CPU time?

timeit uses time.perf_counter(), which measures wall-clock time: the elapsed real time including any waits for I/O, locks, or sleep. For CPU-bound code on an otherwise idle machine this is close to CPU time. For I/O-bound code (network requests, file reads), wall time includes the wait for the I/O operation to complete, which is not a reflection of code efficiency. For I/O benchmarks, wall time is usually what you want anyway since that is what the user experiences. To measure CPU time instead, pass timer=time.process_time.
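
The difference shows up clearly when the statement mostly waits. In this sketch a short sleep stands in for an I/O wait, and the same statement is timed once with the default timer and once with time.process_time passed through timeit's timer parameter.

```python
import time
import timeit

stmt = "time.sleep(0.02)"  # stand-in for an I/O wait

wall = timeit.timeit(stmt, setup="import time", number=5)
cpu = timeit.timeit(stmt, setup="import time", number=5, timer=time.process_time)

print(f"wall-clock: {wall:.3f}s   CPU: {cpu:.3f}s")
```

The wall-clock figure includes the time spent sleeping, while the CPU figure stays close to zero because the process did almost no work.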

Conclusion

Python's timeit module gives you repeatable, noise-resistant microbenchmarks without any external dependencies. In this tutorial you used timeit.timeit() for single measurements, timeit.repeat() for statistical analysis, the CLI for quick command-line comparisons, and lambda wrappers to benchmark real functions with arguments. The string formatter comparison project tied all of these together into a systematic benchmarking harness.

A good next step is to add the benchmark() helper function to your project's test utilities and call it from your CI pipeline to catch performance regressions before they reach production. Even a loose assertion like assert bench(my_fn)[0] < 100 catches catastrophic slowdowns automatically.

Official documentation: timeit -- Measure execution time of small code snippets.

How To Use Python inspect to Examine Live Objects and Code


Intermediate

You’re debugging a 3 AM production issue, staring at a stack trace from a function whose source file is unfamiliar. You wonder: what arguments does it actually accept? Which module did Python import it from? What does its docstring say? You could grep through the repo or open the file in your editor — or you could just ask Python directly with the inspect module.

inspect is Python’s reflection toolkit: it answers questions about live objects without leaving the interpreter. This guide covers the practical bits — getting function signatures, source code, callers, type hints, and class hierarchies — plus how those features power test frameworks, ORMs, and CLI argument parsers under the hood.

inspect Quick Example

Five lines that show why inspect is useful:

# File: quick.py
import inspect
import requests

print(inspect.signature(requests.get))
print(inspect.getsource(requests.get)[:200])
print(inspect.getfile(requests.get))
print(inspect.getdoc(requests.get).split("\n")[0])

Output:

(url, params=None, **kwargs)
def get(url, params=None, **kwargs):
    r"""Sends a GET request.

    :param url: URL for the new :class:`Request` object.
/Users/you/.venv/lib/python3.12/site-packages/requests/api.py
Sends a GET request.

You just asked Python: what’s the signature of requests.get? Show me its source. Tell me where it lives. Read its docstring. No browsing through GitHub or installing source maps — Python answers from its own runtime.

What Is the inspect Module?

Every Python object carries metadata at runtime — its type, its attributes, its source location, the values of its closure variables, the signature of its methods. inspect is the official way to read that metadata without touching the private double-underscore attributes directly.

It’s the foundation of dozens of frameworks you already use:

  • FastAPI inspects route handler signatures to build OpenAPI docs and validate input.
  • pytest inspects fixture signatures to figure out what each test needs.
  • argparse alternatives like Click and Typer inspect your CLI function signatures to generate help text.
  • SQLAlchemy inspects class attributes to map them to database columns.
  • Debuggers and IDEs use inspect to render local variables and frames during stepping.

You’ll reach for it directly when writing tools that work with arbitrary user code: decorators that need to know what they’re wrapping, test utilities that need argument names, or debugging helpers that need a clean view of the current frame.

Reflection: ask Python about itself.

Getting Function Signatures with signature()

inspect.signature() returns a Signature object describing a function’s parameters: their names, defaults, kinds (positional, keyword-only, *args, **kwargs), and type annotations:

# File: signatures.py
import inspect

def transfer(from_account: str, to_account: str, amount: float, *, currency: str = "USD", note: str | None = None) -> dict:
    """Transfer funds between accounts."""
    return {"status": "ok"}

sig = inspect.signature(transfer)
print(f"Signature: {sig}")
print(f"Return type: {sig.return_annotation}")

for name, param in sig.parameters.items():
    kind = param.kind.name
    annot = param.annotation if param.annotation is not inspect.Parameter.empty else "(no annotation)"
    default = param.default if param.default is not inspect.Parameter.empty else "(required)"
    print(f"  {name:<14} kind={kind:<22} type={annot} default={default}")

Output:

Signature: (from_account: str, to_account: str, amount: float, *, currency: str = 'USD', note: str | None = None) -> dict
Return type: <class 'dict'>
  from_account   kind=POSITIONAL_OR_KEYWORD  type=<class 'str'> default=(required)
  to_account     kind=POSITIONAL_OR_KEYWORD  type=<class 'str'> default=(required)
  amount         kind=POSITIONAL_OR_KEYWORD  type=<class 'float'> default=(required)
  currency       kind=KEYWORD_ONLY           type=<class 'str'> default=USD
  note           kind=KEYWORD_ONLY           type=str | None default=None

This is the same data CLI generators and validators use. kind tells you whether a parameter can be passed positionally, by keyword only, or as part of *args/**kwargs. annotation gives you the type hint, ready for runtime validation. default distinguishes required from optional arguments.
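
As a rough idea of how a CLI generator or validator might consume that data, the sketch below turns a signature into a list of plain dictionaries. describe_params and greet are hypothetical names for this example; real frameworks do considerably more.

```python
import inspect


def describe_params(func):
    """Summarise each parameter as a dict a CLI or validator could consume."""
    empty = inspect.Parameter.empty
    spec = []
    for name, param in inspect.signature(func).parameters.items():
        variadic = param.kind in (param.VAR_POSITIONAL, param.VAR_KEYWORD)
        spec.append({
            "name": name,
            # *args and **kwargs never count as required
            "required": param.default is empty and not variadic,
            "keyword_only": param.kind is param.KEYWORD_ONLY,
            "type": None if param.annotation is empty else param.annotation,
        })
    return spec


def greet(name: str, *, shout: bool = False):
    return name.upper() if shout else name


for entry in describe_params(greet):
    print(entry)
```

From a structure like this, a tool could map required parameters to positional CLI arguments and keyword-only ones to optional flags.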

Binding Arguments with bind() and bind_partial()

Signature.bind() is the runtime equivalent of "would this function call work?". It takes positional/keyword arguments and either returns a BoundArguments object — showing exactly which parameter got which value — or raises a clean TypeError:

import inspect

def render_email(to: str, subject: str, body: str, cc: list[str] | None = None) -> str:
    return f"To: {to}\nSubject: {subject}\n\n{body}"

sig = inspect.signature(render_email)

# Successful bind — figure out what each value maps to
bound = sig.bind("alice@example.com", "Hello", body="Welcome to our platform")
print(bound.arguments)
# {'to': 'alice@example.com', 'subject': 'Hello', 'body': 'Welcome to our platform'}

# Apply defaults explicitly
bound.apply_defaults()
print(bound.arguments)
# {'to': '...', 'subject': '...', 'body': '...', 'cc': None}

# Failed bind raises TypeError with a clear message
try:
    sig.bind("alice@example.com")  # missing subject + body
except TypeError as e:
    print(f"Bind failed: {e}")
# Bind failed: missing a required argument: 'subject'

Bind is how middleware and decorators answer "did the caller provide all the required arguments?" without actually invoking the wrapped function. It's also useful for testing — pre-validate calls before running them.
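
A decorator built on bind() could look like the following sketch. It assumes parameters are annotated with plain classes such as str or int and makes no attempt to handle unions or generics; check_types and send are hypothetical names.

```python
import functools
import inspect


def check_types(func):
    """Reject calls whose arguments do not match simple class annotations."""
    sig = inspect.signature(func)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        bound = sig.bind(*args, **kwargs)  # raises TypeError on a bad call shape
        bound.apply_defaults()
        for name, value in bound.arguments.items():
            param = sig.parameters[name]
            expected = param.annotation
            # Skip unannotated parameters and *args/**kwargs
            if expected is param.empty or param.kind in (
                param.VAR_POSITIONAL, param.VAR_KEYWORD
            ):
                continue
            # Assumption: annotations are plain classes such as str or int
            if isinstance(expected, type) and not isinstance(value, expected):
                raise TypeError(
                    f"{name} must be {expected.__name__}, got {type(value).__name__}"
                )
        return func(*args, **kwargs)

    return wrapper


@check_types
def send(to: str, retries: int = 3):
    return f"sent to {to} ({retries} retries)"


print(send("alice@example.com"))  # sent to alice@example.com (3 retries)
```

Calling send("alice@example.com", retries="many") raises TypeError before the wrapped function runs, and so does calling send() with no arguments.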

signature(): the runtime version of go-to-definition.

Reading Source Code with getsource() and getsourcelines()

When you want the actual Python text behind an object — function, class, or module — getsource() reads it from the file the object was loaded from:

import inspect
import json

# Get a function's source
print(inspect.getsource(json.dumps))

# Get source with line numbers (useful for showing context)
lines, start_line = inspect.getsourcelines(json.dumps)
print(f"\nStarts at line {start_line}")
for i, line in enumerate(lines[:5], start=start_line):
    print(f"  {i}: {line.rstrip()}")

# Just the file path
print(f"\nFile: {inspect.getfile(json.dumps)}")

This is how IDE "Jump to Definition" works on objects whose source is available. It fails on built-in C functions (len, print) because they have no Python source — wrap the call in a try/except TypeError.
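
A minimal sketch of that try/except guard follows. The helper name safe_getsource is an assumption for illustration. It also catches OSError, which getsource() raises when the source file cannot be located.

```python
import inspect
import json

def safe_getsource(obj):
    """Return the source text of obj, or None when no Python source is available."""
    try:
        return inspect.getsource(obj)
    except (TypeError, OSError):
        # TypeError: built-in / C-implemented object
        # OSError: the source file cannot be found or read
        return None

print(safe_getsource(len))                          # None
print(safe_getsource(json.dumps).splitlines()[0])   # first line of the def
```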

Class Introspection: Members, MRO, Inheritance

For classes, inspect gives you the method resolution order (MRO), the list of members, and predicates to filter them:

import inspect

class Animal:
    def speak(self): ...
    def eat(self): ...

class Dog(Animal):
    def speak(self):
        return "Woof"
    def fetch(self):
        return "Got the ball!"

# Method resolution order — Python's inheritance chain
print(inspect.getmro(Dog))
# (<class 'Dog'>, <class 'Animal'>, <class 'object'>)

# All methods defined on the class (and inherited ones)
methods = inspect.getmembers(Dog, predicate=inspect.isfunction)
print("Methods:", [name for name, _ in methods])
# Methods: ['eat', 'fetch', 'speak']

# Just the ones defined directly on Dog (not inherited)
own_methods = [name for name, fn in methods if fn.__qualname__.startswith("Dog.")]
print("Own methods:", own_methods)
# Own methods: ['fetch', 'speak']

# Check if a class is a subclass — works on uninstantiated classes
print(inspect.isclass(Dog), inspect.isclass(Dog()))    # True, False
print(issubclass(Dog, Animal))                          # True

The inspect.is* predicates (isfunction, isclass, ismethod, isgenerator, iscoroutinefunction) are how frameworks decide what to do with each attribute they encounter — register coroutines as async routes, treat regular functions as sync routes, mount classes as resource handlers, etc.
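
The sketch below shows the kind of dispatch a framework might do with these predicates. The classify function and the sample objects are hypothetical. Note that it uses isgeneratorfunction() because it is inspecting functions rather than running generator objects.

```python
import inspect

def classify(obj):
    """Return a rough label for obj, checking the most specific predicate first."""
    if inspect.isclass(obj):
        return "class"
    if inspect.iscoroutinefunction(obj):
        return "async function"
    if inspect.isgeneratorfunction(obj):
        return "generator function"
    if inspect.isfunction(obj) or inspect.ismethod(obj):
        return "sync function"
    return "other"

async def fetch_user(): ...
def list_users(): ...
def stream_users():
    yield "alice"
class UserResource: ...

for thing in (fetch_user, list_users, stream_users, UserResource, 42):
    print(getattr(thing, "__name__", thing), "->", classify(thing))
# fetch_user -> async function
# list_users -> sync function
# stream_users -> generator function
# UserResource -> class
# 42 -> other
```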

FastAPI inspects your handler signatures to build OpenAPI for free.

Stack Frames: Knowing Who Called You

Sometimes a function needs to know who called it — for logging, for security checks, for context-aware behavior. inspect.stack() returns the chain of frames leading to the current call:

import inspect

def caller_info():
    frame = inspect.stack()[2]  # [0] is caller_info, [1] is buggy, [2] is buggy's caller
    return {
        "function": frame.function,
        "filename": frame.filename,
        "lineno": frame.lineno,
        "code": frame.code_context[0].strip() if frame.code_context else None,
    }

def buggy():
    info = caller_info()
    print(f"Called from {info['function']} at {info['filename']}:{info['lineno']}")
    print(f"  Line: {info['code']}")

def my_handler():
    buggy()

my_handler()
# Called from my_handler at example.py:18  (the path is shown as Python recorded it)
#   Line: buggy()

This is the mechanism deprecation warnings use to point at the caller's line, not the deprecated function's own line. Use it sparingly — frame inspection is slow and creates a tight coupling between functions.
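
For deprecation warnings specifically, you rarely need to walk frames yourself: warnings.warn() accepts a stacklevel argument that does it for you. The sketch below uses made-up function names (old_api, client_code) to show the warning being attributed to the caller.

```python
import warnings

def old_api():
    # stacklevel=2 attributes the warning to whoever called old_api()
    warnings.warn("old_api() is deprecated; use new_api()",
                  DeprecationWarning, stacklevel=2)
    return "result"

def client_code():
    return old_api()

# Record warnings instead of printing them so we can look at the details
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    client_code()

w = caught[0]
print(w.category.__name__, "reported at line", w.lineno)
```

The reported line number is the line inside client_code() that calls old_api(), not the warnings.warn() line.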

A Real Example: Auto-Generating CLI Args from a Function

Combining signature inspection with argparse gives you a 30-line decorator that turns any function into a CLI:

# File: autoargs.py
import argparse
import inspect
from typing import get_type_hints

def cli(func):
    """Run func as a CLI, deriving arguments from its signature."""
    parser = argparse.ArgumentParser(description=inspect.getdoc(func))
    sig = inspect.signature(func)
    hints = get_type_hints(func)

    for name, param in sig.parameters.items():
        param_type = hints.get(name, str)
        if param.default is inspect.Parameter.empty:
            parser.add_argument(name, type=param_type)
        else:
            parser.add_argument(f"--{name}", type=param_type, default=param.default)

    args = parser.parse_args()
    return func(**vars(args))

@cli
def greet(name: str, *, greeting: str = "Hello", times: int = 1):
    """Greet someone, possibly multiple times."""
    for _ in range(times):
        print(f"{greeting}, {name}!")

# Run: python autoargs.py Pubs --greeting Howdy --times 3

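This is the same idea that powers Typer and Fire. You annotate types on your function, the library inspects the signature, builds a command-line parser from it, and dispatches to your function. (Typer builds on Click and Fire uses its own parser rather than argparse, but the introspection step is the same.) No boilerplate.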

Common Pitfalls

  • getsource() on built-ins. C-implemented functions (len, dict methods, most of itertools) have no Python source — getsource() raises TypeError. Check with inspect.isbuiltin() first, or catch the exception.
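  • Signature on C functions. Some C-implemented callables don't expose their parameter information in a way inspect.signature() can read (print() on older Python releases is a common example; the exact set varies by version). You'll see ValueError: no signature found. Catch the ValueError and fall back to docstring parsing; follow_wrapped=False only controls whether __wrapped__ chains are followed and does not help here.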
  • Frame leaks. inspect.currentframe() returns a reference to the frame object. If you store that reference (or anything from frame.f_locals) past the function's return, you keep the frame's local variables alive — a memory leak. Always copy out only the bits you need before returning.
  • Type hints as strings. If a module uses from __future__ import annotations (PEP 563), all annotations are stored as strings, not actual types. Use typing.get_type_hints(func) instead of reading param.annotation directly — it evaluates the strings into real types.
  • Decorator transparency. A decorated function's signature is the inner wrapper's signature (typically *args, **kwargs) unless the decorator uses @functools.wraps. Always apply functools.wraps inside your decorators; otherwise inspect sees the wrong thing.
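
The decorator-transparency pitfall is easy to see side by side. In this small sketch, plain and transparent are hypothetical decorators that differ only in whether they apply functools.wraps.

```python
import functools
import inspect

def plain(func):
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

def transparent(func):
    @functools.wraps(func)   # copies metadata and sets __wrapped__
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

@plain
def add_a(x: int, y: int = 0) -> int:
    return x + y

@transparent
def add_b(x: int, y: int = 0) -> int:
    return x + y

print(inspect.signature(add_a))   # (*args, **kwargs)
print(inspect.signature(add_b))   # (x: int, y: int = 0) -> int
```

inspect.signature() follows the __wrapped__ attribute that functools.wraps sets, which is why the second call reports the original parameters.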

FAQ

Q: How is inspect different from dir() and vars()?
A: dir() returns a list of attribute names; vars() returns the __dict__. inspect gives you semantically rich data: parameter kinds and defaults, source location, type annotations, MRO. For listing attributes use dir; for understanding what they ARE use inspect.

Q: Can inspect read the source of code typed interactively in a REPL?
A: No — interactively-defined functions have no source file backing them, so getsource() raises OSError. Their signatures still work (inspect.signature(f)), but the source text was never persisted to disk for Python to recover.

Q: How do I introspect an async function?
A: Use inspect.iscoroutinefunction() to detect that it's async, and inspect.signature() to read its signature — both work the same as for sync functions. To inspect an awaited coroutine object (not the function), use inspect.iscoroutine() and inspect.getcoroutinestate().

Q: Is it safe to use inspect.stack() in production?
A: Yes for occasional use (logging, deprecation warnings), no for hot paths. stack() walks every frame on the call stack and, by default, reads source lines for context, so it is measurably slow. If you need caller info often, prefer contextvars or pass an explicit identifier.

Q: What about typing? When do I use inspect vs typing.get_type_hints()?
A: They complement each other. inspect.signature() gives you the structural shape (which parameters exist, which are required). typing.get_type_hints() resolves the type annotations, including forward references in from __future__ import annotations code. For full introspection of a typed function, use both.
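
A short sketch of using the two together. The function scale is made up for this example, and its annotations are written as strings to mimic what from __future__ import annotations produces.

```python
import inspect
from typing import get_type_hints

# String annotations behave like those produced by
# "from __future__ import annotations"
def scale(value: "float", factor: "int" = 2) -> "float":
    return value * factor

sig = inspect.signature(scale)      # structure: names, kinds, defaults
hints = get_type_hints(scale)       # resolved types

for name, param in sig.parameters.items():
    required = param.default is inspect.Parameter.empty
    print(name, repr(param.annotation), hints[name],
          "required" if required else "optional")
# value 'float' <class 'float'> required
# factor 'int' <class 'int'> optional
```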

Wrapping Up

The inspect module is one of those tools that quietly powers most of the Python ecosystem. You may never use it directly until you're writing a framework, a debugger, or an admin tool — and then it's indispensable. Start with signature(), getsource(), and getmembers(): those three cover the bulk of practical reflection work. Add stack() and the is* predicates as you encounter advanced cases.

The full inspect module documentation has the complete reference, including the lesser-known utilities like getclosurevars, BoundArguments.arguments, and the AST-level helpers.

How To Use Python tkinter to Build Desktop GUI Apps


Beginner

You have written Python scripts that run in the terminal — but what if you want a real window with buttons, text fields, and menus? Python ships with tkinter, a fully capable GUI toolkit that lets you build desktop applications without installing anything. Whether you want a quick utility tool, an admin interface for your scripts, or your first ever desktop app, tkinter is the fastest path from idea to a working window on your screen.

tkinter is part of the Python standard library on Windows and macOS. On Linux you may need to install the python3-tk package via your package manager (sudo apt install python3-tk). Once that is done, you need nothing else — no pip, no virtual environments, no framework setup.

This tutorial covers everything you need to know to build real desktop apps: the widget system, geometry managers (pack, grid, place), event handling, entry forms, listboxes, message dialogs, and the main event loop. You will finish by building a task manager GUI you can use every day.

Your First tkinter Window: Quick Example

Here is the minimum viable tkinter app — a window with a label and a button that changes text when clicked:

# hello_tkinter.py
import tkinter as tk

def on_click():
    label.config(text="You clicked it!")

root = tk.Tk()
root.title("My First App")
root.geometry("300x150")   # width x height in pixels

label = tk.Label(root, text="Hello, tkinter!", font=("Arial", 14))
label.pack(pady=20)

button = tk.Button(root, text="Click Me", command=on_click)
button.pack()

root.mainloop()   # starts the event loop -- blocks until window closes

Output: A 300×150 window appears with a label and button. Clicking the button changes the label text to “You clicked it!”

Every tkinter app has the same three-part structure: create the root window with tk.Tk(), add widgets and bind callbacks, then call root.mainloop() to start the event loop. The event loop listens for mouse clicks, keyboard presses, and other user actions and dispatches them to your callback functions. The sections below show you how to build complex layouts with this same pattern.

What Is tkinter and When Should You Use It?

tkinter is Python’s wrapper around the Tk GUI toolkit, which originated in the Tcl programming language. It provides a set of widgets (visual components) and a geometry manager (layout system) that together let you describe how your interface looks and behaves in pure Python.

Library | Best For | Install Required | Look and Feel
tkinter | Simple utilities, quick tools, learning | No (stdlib) | Native on each OS
PyQt / PySide | Professional commercial apps | Yes | Consistent, polished
wxPython | Windows-heavy desktop apps | Yes | Very native Windows
Dear PyGui | Data dashboards, GPU-accelerated | Yes | Custom dark-mode

tkinter shines for internal tools, quick prototypes, and educational projects where “works everywhere with no install” matters more than pixel-perfect polish. For a commercial product with a design team, PyQt or PySide are better choices. But for the vast majority of Python developer use cases — wrapping a script in a GUI, building a team utility, or learning GUI fundamentals — tkinter is exactly the right tool.

Core Widgets

tkinter provides over 20 widget types. These are the ones you will use in almost every app:

# core_widgets.py
import tkinter as tk
from tkinter import ttk   # themed widgets (look more modern)

root = tk.Tk()
root.title("Widget Gallery")
root.geometry("400x500")

# Label -- display text or images
tk.Label(root, text="I am a Label", font=("Arial", 12)).pack(pady=5)

# Button -- triggers a callback
tk.Button(root, text="I am a Button", bg="#4CAF50", fg="white").pack(pady=5)

# Entry -- single-line text input
entry = tk.Entry(root, width=30)
entry.insert(0, "Type something here")
entry.pack(pady=5)

# Text -- multi-line text area
text = tk.Text(root, width=40, height=4)
text.insert("1.0", "Multi-line\ntext widget")
text.pack(pady=5)

# Checkbutton -- boolean toggle
var = tk.BooleanVar(value=True)
tk.Checkbutton(root, text="Enable feature", variable=var).pack(pady=5)

# Radiobutton -- choose one from many
choice = tk.StringVar(value="option1")
for opt in ["option1", "option2", "option3"]:
    tk.Radiobutton(root, text=opt, variable=choice, value=opt).pack()

# Combobox (ttk) -- dropdown menu
combo = ttk.Combobox(root, values=["Python", "Go", "Rust", "TypeScript"])
combo.set("Python")
combo.pack(pady=5)

# Listbox -- scrollable list of items
lb = tk.Listbox(root, height=3)
for item in ["Item A", "Item B", "Item C"]:
    lb.insert(tk.END, item)
lb.pack(pady=5)

root.mainloop()

Output: A window displaying each widget type, all functioning and interactive.

The ttk submodule provides themed versions of many widgets that look more native on modern operating systems, plus a few widgets such as ttk.Combobox, ttk.Treeview, and ttk.Progressbar that classic tk does not offer. Where a ttk version exists (ttk.Button, ttk.Entry, ttk.Label), it usually looks noticeably more polished than its tk counterpart, especially on Windows.

tk.Button, tk.Entry, tk.Listbox — the whole dashboard in one import.

Layout Management

tkinter has three geometry managers for positioning widgets: pack, grid, and place. Do not mix pack and grid for widgets that share the same container. With current Tk versions this raises a TclError, and older versions could hang while the two managers fought over the layout. The example below uses grid.

# layout_grid.py
import tkinter as tk

root = tk.Tk()
root.title("Grid Layout")
root.geometry("300x200")

# grid() arranges widgets in rows and columns
tk.Label(root, text="Name:").grid(row=0, column=0, sticky="w", padx=10, pady=8)
tk.Entry(root, width=20).grid(row=0, column=1, padx=10, pady=8)

tk.Label(root, text="Email:").grid(row=1, column=0, sticky="w", padx=10, pady=8)
tk.Entry(root, width=20).grid(row=1, column=1, padx=10, pady=8)

tk.Label(root, text="Role:").grid(row=2, column=0, sticky="w", padx=10, pady=8)
tk.Entry(root, width=20).grid(row=2, column=1, padx=10, pady=8)

# columnspan stretches a widget across multiple columns
tk.Button(root, text="Submit", width=20).grid(
    row=3, column=0, columnspan=2, pady=15
)

root.mainloop()

Output: A clean form layout with labels and entry fields aligned in two columns and a full-width submit button.

sticky controls alignment within the grid cell — "w" means align to the left (west), "ew" means stretch to fill the full width. padx and pady add spacing around the widget. For most forms and dialogs, grid() is the clearest layout manager because row and column positions are explicit and predictable.

Event Handling

Beyond button clicks, tkinter can respond to keyboard input, mouse movement, window resize, and more. You bind event listeners to widgets using the bind() method:

# event_handling.py
import tkinter as tk

root = tk.Tk()
root.title("Event Demo")
root.geometry("350x200")

output = tk.Label(root, text="Press a key or click", font=("Arial", 12))
output.pack(pady=30)

# Keyboard event -- fires when any key is pressed
def on_key(event):
    output.config(text=f"Key pressed: '{event.char}' (keycode {event.keycode})")

# Mouse click event
def on_click(event):
    output.config(text=f"Clicked at ({event.x}, {event.y})")

# Double-click event
def on_double(event):
    output.config(text="Double-clicked!")

root.bind("<KeyPress>", on_key)
root.bind("<Button-1>", on_click)       # left mouse button
root.bind("<Double-Button-1>", on_double)

# Entry validation: block non-numeric input
validate_cmd = root.register(lambda s: s.isdigit() or s == '')
num_entry = tk.Entry(root, validate="key",
                     validatecommand=(validate_cmd, '%S'), width=15)
num_entry.pack(pady=10)
tk.Label(root, text="(numbers only entry above)").pack()

root.mainloop()

Output: A window that displays which key was pressed or where the mouse clicked. The entry field rejects any non-digit input as you type.

Event strings follow Tk’s binding syntax: "<KeyPress>", "<Button-1>", "<Return>", "<Configure>" (window resize), "<FocusIn>". The event object passed to callbacks contains context-specific attributes like event.char, event.keycode, event.x, and event.y.

Tkinter is ugly. Tkinter ships with Python. That's the trade.

Message Boxes and File Dialogs

tkinter’s messagebox and filedialog submodules give you native OS dialogs for confirmations, errors, and file selection without writing a single layout line:

# dialogs.py
import tkinter as tk
from tkinter import messagebox, filedialog, simpledialog

root = tk.Tk()
root.title("Dialogs Demo")
root.geometry("300x200")

def show_info():
    messagebox.showinfo("Done", "Your file has been saved successfully.")

def ask_confirm():
    result = messagebox.askyesno("Confirm", "Delete selected items?")
    status.config(text=f"You chose: {'Yes' if result else 'No'}")

def pick_file():
    path = filedialog.askopenfilename(
        title="Select a file",
        filetypes=[("Text files", "*.txt"), ("All files", "*.*")]
    )
    status.config(text=f"Selected: {path}" if path else "Cancelled")

def ask_name():
    name = simpledialog.askstring("Input", "Enter your name:")
    if name:
        status.config(text=f"Hello, {name}!")

tk.Button(root, text="Show Info",   command=show_info,   width=20).pack(pady=5)
tk.Button(root, text="Ask Yes/No",  command=ask_confirm, width=20).pack(pady=5)
tk.Button(root, text="Pick File",   command=pick_file,   width=20).pack(pady=5)
tk.Button(root, text="Ask Name",    command=ask_name,    width=20).pack(pady=5)

status = tk.Label(root, text="", wraplength=280)
status.pack(pady=10)

root.mainloop()

Output: Each button opens a native OS dialog. The selected result is displayed in the label below the buttons.

These dialogs block execution until the user responds: askyesno() returns True or False, and askopenfilename() returns a file path string (or an empty string if cancelled). Using native dialogs means your app automatically matches the operating system’s look and feel without any custom theming work.

Real-Life Example: Task Manager App

Let us build a practical task manager with a full CRUD interface — add tasks, mark them complete, and delete them — all in one tkinter window:

# task_manager.py
import tkinter as tk
from tkinter import messagebox, simpledialog

class TaskManagerApp:
    def __init__(self, root):
        self.root = root
        self.root.title("Task Manager")
        self.root.geometry("480x400")
        self.root.resizable(True, True)
        self.tasks = []   # list of {'text': str, 'done': bool}
        self._build_ui()

    def _build_ui(self):
        # Top frame: entry + add button
        top = tk.Frame(self.root)
        top.pack(fill="x", padx=10, pady=8)

        self.task_var = tk.StringVar()
        entry = tk.Entry(top, textvariable=self.task_var, font=("Arial", 12))
        entry.pack(side="left", fill="x", expand=True, padx=(0, 8))
        entry.bind("<Return>", lambda e: self.add_task())

        tk.Button(top, text="Add", width=8, command=self.add_task,
                  bg="#4CAF50", fg="white").pack(side="left")

        # Middle frame: listbox with scrollbar
        mid = tk.Frame(self.root)
        mid.pack(fill="both", expand=True, padx=10, pady=4)

        scrollbar = tk.Scrollbar(mid)
        scrollbar.pack(side="right", fill="y")

        self.listbox = tk.Listbox(mid, font=("Arial", 12),
                                  yscrollcommand=scrollbar.set,
                                  selectmode="single", activestyle="none")
        self.listbox.pack(fill="both", expand=True)
        scrollbar.config(command=self.listbox.yview)

        # Bottom frame: action buttons
        bot = tk.Frame(self.root)
        bot.pack(fill="x", padx=10, pady=8)

        tk.Button(bot, text="Mark Done",  command=self.mark_done,
                  width=12, bg="#2196F3", fg="white").pack(side="left", padx=4)
        tk.Button(bot, text="Edit",       command=self.edit_task,
                  width=12).pack(side="left", padx=4)
        tk.Button(bot, text="Delete",     command=self.delete_task,
                  width=12, bg="#f44336", fg="white").pack(side="left", padx=4)

        self.count_label = tk.Label(bot, text="0 tasks")
        self.count_label.pack(side="right", padx=4)

    def _refresh(self):
        self.listbox.delete(0, tk.END)
        for task in self.tasks:
            prefix = "[x] " if task['done'] else "[ ] "
            self.listbox.insert(tk.END, prefix + task['text'])
        total = len(self.tasks)
        done  = sum(1 for t in self.tasks if t['done'])
        self.count_label.config(text=f"{done}/{total} done")

    def add_task(self):
        text = self.task_var.get().strip()
        if not text:
            return
        self.tasks.append({'text': text, 'done': False})
        self.task_var.set("")
        self._refresh()

    def mark_done(self):
        sel = self.listbox.curselection()
        if not sel:
            messagebox.showwarning("No selection", "Please select a task first.")
            return
        idx = sel[0]
        self.tasks[idx]['done'] = not self.tasks[idx]['done']
        self._refresh()

    def edit_task(self):
        sel = self.listbox.curselection()
        if not sel:
            messagebox.showwarning("No selection", "Please select a task first.")
            return
        idx = sel[0]
        new_text = simpledialog.askstring(
            "Edit Task", "Update task text:",
            initialvalue=self.tasks[idx]['text']
        )
        if new_text and new_text.strip():   # ignore cancel and whitespace-only input
            self.tasks[idx]['text'] = new_text.strip()
            self._refresh()

    def delete_task(self):
        sel = self.listbox.curselection()
        if not sel:
            messagebox.showwarning("No selection", "Please select a task first.")
            return
        idx = sel[0]
        task_text = self.tasks[idx]['text']
        if messagebox.askyesno("Delete", f"Delete '{task_text}'?"):
            self.tasks.pop(idx)
            self._refresh()

if __name__ == "__main__":
    root = tk.Tk()
    app = TaskManagerApp(root)
    root.mainloop()

Output: A fully functional task manager window. Add tasks with the entry field, mark them done (toggles [x]), edit task text, or delete with a confirmation dialog. The counter in the bottom-right tracks completed vs total tasks.

This class-based structure is the recommended pattern for real apps — it keeps state (self.tasks), layout (_build_ui), and update logic (_refresh) cleanly separated. You can extend it by adding persistent storage with json or sqlite3, due-date columns using ttk.Treeview, or keyboard shortcuts with additional root.bind() calls.
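
As one possible starting point for the json persistence idea, the sketch below adds two standalone helpers. The function names and the file location are assumptions, not part of the app above.

```python
import json
from pathlib import Path

def save_tasks(tasks, path):
    """Write the task list (dicts with 'text' and 'done') as JSON."""
    Path(path).write_text(json.dumps(tasks, indent=2), encoding="utf-8")

def load_tasks(path):
    """Return the saved task list, or an empty list if nothing usable is stored."""
    try:
        data = json.loads(Path(path).read_text(encoding="utf-8"))
    except (FileNotFoundError, json.JSONDecodeError):
        return []
    # Normalise each entry so the UI can rely on both keys being present
    return [{"text": str(t["text"]), "done": bool(t["done"])} for t in data]
```

To wire it in, you could set self.tasks = load_tasks("tasks.json") in __init__ (followed by self._refresh() once the UI is built) and call save_tasks(self.tasks, "tasks.json") at the end of _refresh().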

mainloop() never sleeps. Your to-do list should.

Frequently Asked Questions

Why does my program freeze when I call a long function from a button?

tkinter is single-threaded. If your callback does anything slow (network request, file processing, heavy calculation), it blocks the event loop and the window appears frozen. The fix is to run the slow work in a background thread using Python’s threading.Thread, then use root.after(0, update_ui_function) to push the result back to the main thread safely. Never update tkinter widgets from a background thread directly — only from the main thread.
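
A common variation on this advice keeps every Tk call on the main thread: the worker only puts results on a queue.Queue, and the main thread polls that queue with root.after(). The sketch below shows the two helpers. The names start_background and poll_results are made up, and root can be any object with an after(ms, func, *args) method, such as a tk.Tk instance.

```python
import queue
import threading

def start_background(work, results):
    """Run work() in a daemon thread and put its return value on the queue."""
    def runner():
        results.put(work())
    thread = threading.Thread(target=runner, daemon=True)
    thread.start()
    return thread

def poll_results(root, results, on_result, interval_ms=100):
    """Call on_result for every finished item, then reschedule on the main thread."""
    try:
        while True:
            on_result(results.get_nowait())
    except queue.Empty:
        pass
    root.after(interval_ms, poll_results, root, results, on_result, interval_ms)

# Wiring inside a tkinter app (sketch, not executed here):
#   results = queue.Queue()
#   tk.Button(root, text="Run",
#             command=lambda: start_background(slow_job, results)).pack()
#   poll_results(root, results, lambda value: label.config(text=str(value)))
#   root.mainloop()
```

Because the worker thread never touches a widget, the only cross-thread object is the queue, which is designed for exactly that.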

Can I use pack() and grid() in the same window?

Not in the same container — mixing them inside the same frame or window will cause a TclError. The workaround is to use nested Frame widgets: the outer frame uses pack() to position major sections (header, body, footer), and each inner frame uses grid() for its own contents. This separation keeps your layout code clean and avoids the constraint conflict.

How do I make tkinter look more modern?

Use the ttk module instead of basic tk widgets where available, and apply a theme with style = ttk.Style(); style.theme_use('clam') (available themes vary by OS). The ttkthemes third-party package provides polished options like arc and equilux. For a fully custom look, configure widget colors and fonts using widget.config(bg='#2b2b2b', fg='#ffffff').

How do I display images in tkinter?

Use the PIL (Pillow) library: from PIL import Image, ImageTk. Open the image, convert it to a PhotoImage with ImageTk.PhotoImage(img), and set it on a Label with label.config(image=photo). Keep a reference to the PhotoImage object (e.g., label.image = photo) — Python’s garbage collector will delete it and your image will disappear without this step.

How do I package a tkinter app for distribution?

Use PyInstaller to bundle your app into a standalone executable: pyinstaller --onefile --windowed myapp.py. The --windowed flag prevents a terminal window from opening alongside the GUI on Windows. For macOS, py2app creates a proper .app bundle. Both tools include the Python interpreter and all dependencies, so end users do not need Python installed.

Conclusion

Python’s tkinter module puts a complete GUI toolkit at your fingertips with zero installation overhead. In this tutorial, you worked with the core widgets (Label, Button, Entry, Text, Listbox, Checkbutton, Combobox), organised layouts with grid() and pack(), handled keyboard and mouse events with bind(), used native OS dialogs from messagebox and filedialog, and built a full task manager with class-based architecture.

The task manager is a solid foundation to extend — add file persistence, drag-and-drop reordering, or a priority column to make it genuinely useful in your daily workflow. tkinter’s simplicity is its superpower: you can prototype and ship a GUI utility in an afternoon without learning a web framework or a complex GUI toolkit.

Explore the full widget reference and geometry manager documentation in the official Python docs: tkinter — Python interface to Tcl/Tk.

How To Use Python imaplib to Read and Manage Emails


Intermediate

Your inbox is a goldmine of structured data — receipts, order confirmations, alerts, reports — and most developers are leaving it completely untouched. If you have ever wanted to automatically archive invoices from a particular sender, extract tracking numbers from shipping emails, or build a lightweight email-to-ticket pipeline, Python’s built-in imaplib module is what you need. No third-party packages, no complicated setup — just the IMAP protocol wrapped in a clean Python interface.

The imaplib module ships with Python’s standard library, so there is nothing to install. You will need an email account with IMAP access enabled and, for Gmail, an App Password (not your regular password). That is the only setup step. Everything else is Python.

This tutorial walks you through connecting to an IMAP server, authenticating securely, listing and selecting mailboxes, searching for messages by sender or date, fetching full email bodies, and parsing headers with the companion email module. By the end, you will have a working email reader that you can adapt for any automation task.

Reading Your Inbox: Quick Example

Here is the shortest path to reading your 5 most recent unread emails. This script connects, searches, fetches, and prints subject lines — all in about 20 lines:

# quick_email_reader.py
import imaplib
import email

IMAP_HOST = 'imap.gmail.com'
USERNAME  = 'you@gmail.com'
PASSWORD  = 'your-app-password'   # Gmail App Password, not account password

with imaplib.IMAP4_SSL(IMAP_HOST) as mail:
    mail.login(USERNAME, PASSWORD)
    mail.select('INBOX')

    _, uids = mail.search(None, 'UNSEEN')
    recent = uids[0].split()[-5:]          # last 5 unread message sequence numbers (not true UIDs)

    for uid in recent:
        _, data = mail.fetch(uid, '(RFC822)')
        msg = email.message_from_bytes(data[0][1])
        print(f"From:    {msg['From']}")
        print(f"Subject: {msg['Subject']}")
        print(f"Date:    {msg['Date']}")
        print('---')

Output:

From:    "GitHub" <noreply@github.com>
Subject: [python-project] Pull request merged
Date:    Wed, 13 May 2026 09:14:32 +0000
---
From:    orders@amazon.com
Subject: Your order has shipped
Date:    Wed, 13 May 2026 08:03:11 +0000
---

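The IMAP4_SSL context manager handles the secure connection and automatic logout. mail.search(None, 'UNSEEN') returns the message sequence numbers that match the IMAP search criterion. Despite the variable name uids, these are not true UIDs: sequence numbers shift when messages are expunged, so use mail.uid('SEARCH', ...) and mail.uid('FETCH', ...) when you need stable identifiers. Fetching with '(RFC822)' also marks each message as read; request '(BODY.PEEK[])' instead to leave the \Seen flag untouched. The email module then parses the raw RFC 822 message bytes into a navigable object with named header access. The sections below break down each part of this pattern so you can customise it for your use case.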
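
One detail the quick example glosses over is that headers with non-ASCII characters arrive RFC 2047-encoded, so msg['Subject'] can look like =?utf-8?q?...?=. The sketch below decodes such a header using a hand-written sample message instead of a live server. The helper name header_text and the sample content are assumptions for illustration.

```python
import email
from email.header import decode_header, make_header

# A tiny raw message standing in for data[0][1] from mail.fetch()
raw = (
    b'From: "GitHub" <noreply@github.com>\r\n'
    b"Subject: =?utf-8?q?Rechnung_f=C3=BCr_Mai?=\r\n"
    b"Date: Wed, 13 May 2026 09:14:32 +0000\r\n"
    b"\r\n"
    b"Body text\r\n"
)
msg = email.message_from_bytes(raw)

def header_text(msg, name):
    """Return a header as readable text, decoding any encoded words."""
    value = msg[name]
    return str(make_header(decode_header(value))) if value else ""

print(msg["Subject"])                 # =?utf-8?q?Rechnung_f=C3=BCr_Mai?=
print(header_text(msg, "Subject"))    # Rechnung für Mai
```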

What Is IMAP and Why Use imaplib?

IMAP (Internet Message Access Protocol) is the standard protocol for reading email from a server. Unlike POP3, which downloads and deletes messages, IMAP keeps mail on the server and lets you query it without permanently removing it — making it safe to run scripts against your real inbox. Almost every email provider supports IMAP: Gmail, Outlook, Yahoo Mail, Fastmail, and most self-hosted mail servers.

Approach | Best For | Setup Required | Library
imaplib (IMAP) | Reading, searching, archiving email | IMAP enabled + App Password | Built-in
smtplib (SMTP) | Sending email only | SMTP credentials | Built-in
Gmail API | Full Gmail integration, OAuth | Google Cloud project | google-api-python-client
Microsoft Graph | Outlook/365 integration | Azure app registration | msal

imaplib wins when you want something that works with any provider, has zero dependencies, and gives you full control over search queries. The Gmail API is better if you need push notifications, label management, or you are already in the Google ecosystem and want OAuth instead of passwords.

Connecting and Authenticating

Before you can read any email, you need a live, authenticated IMAP connection. The two main connection classes are IMAP4 (plain-text, port 143) and IMAP4_SSL (TLS-encrypted, port 993). Always use IMAP4_SSL in production — never send credentials over an unencrypted connection.

# connect_imap.py
import imaplib

# Gmail IMAP settings
IMAP_HOST = 'imap.gmail.com'
IMAP_PORT = 993

mail = imaplib.IMAP4_SSL(IMAP_HOST, IMAP_PORT)

# Returns ('OK', [b'...capability string...'])
status, caps = mail.capability()
print(f"Status: {status}")
print(f"Server supports: {caps[0].decode()[:80]}...")

# Authenticate -- use App Password for Gmail (Settings > Security > App Passwords)
mail.login('you@gmail.com', 'your-app-password')
print("Logged in successfully")

mail.logout()

Output:

Status: OK
Server supports: CAPABILITY IMAP4rev1 UNSELECT IDLE NAMESPACE QUOTA ID XLIST CHILDREN...
Logged in successfully

The capability() call tells you what extensions the server supports. Most of the time you do not need to inspect it, but it is useful for debugging connection problems. Most imaplib commands return a (status, data) tuple, where status is 'OK' on success or 'NO' when the server refuses the request; a 'BAD' response or a failed login raises imaplib.IMAP4.error instead. Check the status and catch that exception in production scripts before proceeding.
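
One way to make the status check hard to forget is a tiny helper that unpacks the tuple and raises when the status is not 'OK'. The name expect_ok is hypothetical, and the demo feeds it hand-made tuples in the shape imaplib returns instead of talking to a server.

```python
def expect_ok(response, action):
    """Unpack an imaplib (status, data) tuple, raising if status is not 'OK'."""
    status, data = response
    if status != "OK":
        raise RuntimeError(f"{action} failed: {status} {data!r}")
    return data

# Simulated responses in the shape imaplib returns
print(expect_ok(("OK", [b"1482"]), "select INBOX"))   # [b'1482']

try:
    expect_ok(("NO", [b"Mailbox does not exist"]), "select Archive")
except RuntimeError as exc:
    print(exc)
```

In a real script this would read data = expect_ok(mail.select('INBOX'), 'select INBOX').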

IMAP4_SSL: because plaintext auth over port 143 is a crime against your users.

Listing and Selecting Mailboxes

An IMAP account contains multiple mailboxes — INBOX, Sent, Drafts, Trash, and any custom folders or Gmail labels you have created. Before you can read messages, you must select a mailbox. Use list() to discover what is available:

# list_mailboxes.py
import imaplib

with imaplib.IMAP4_SSL('imap.gmail.com') as mail:
    mail.login('you@gmail.com', 'your-app-password')

    _, mailboxes = mail.list()
    for mb in mailboxes:
        print(mb.decode())

Output:

(\HasNoChildren) "/" "INBOX"
(\HasNoChildren) "/" "[Gmail]/Sent Mail"
(\HasNoChildren) "/" "[Gmail]/Drafts"
(\HasNoChildren) "/" "[Gmail]/Spam"
(\HasNoChildren) "/" "[Gmail]/Starred"
(\HasNoChildren) "/" "[Gmail]/All Mail"

Each line shows the mailbox flags, the hierarchy delimiter (/), and the mailbox name. Gmail prefixes its system folders with [Gmail]/. Once you know the name, select it with mail.select() before running any search() or fetch() calls. select() returns the number of messages in that mailbox:

# select_mailbox.py
import imaplib

with imaplib.IMAP4_SSL('imap.gmail.com') as mail:
    mail.login('you@gmail.com', 'your-app-password')

    status, data = mail.select('INBOX')
    count = int(data[0])
    print(f"INBOX has {count} messages")

    # Names containing spaces must be wrapped in double quotes
    status, data = mail.select('"[Gmail]/Sent Mail"')
    count = int(data[0])
    print(f"Sent Mail has {count} messages")

Output:

INBOX has 1482 messages
Sent Mail has 247 messages
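If you want to work with the list() output programmatically, each line can be split into its three parts. The sketch below is an illustration only: parse_list_line and quote_mailbox are hypothetical helpers, the regular expression covers the simple quoted form shown in the output above, and it does not handle literal strings or the modified UTF-7 encoding some servers use for non-ASCII folder names.

```python
# parse_mailboxes.py
import re

# Flags in parentheses, a quoted delimiter (or NIL), then the mailbox name
LIST_LINE = re.compile(rb'\((?P<flags>[^)]*)\) (?P<delim>"[^"]*"|NIL) (?P<name>.+)')

def parse_list_line(line):
    """Split one LIST response line into (flags, delimiter, name)."""
    match = LIST_LINE.match(line)
    if match is None:
        raise ValueError(f"Unrecognised LIST line: {line!r}")
    flags = match.group('flags').decode().split()
    delim = match.group('delim').decode().strip('"')
    name = match.group('name').decode().strip('"')
    return flags, delim, name

def quote_mailbox(name):
    """Wrap a mailbox name in double quotes so names with spaces survive SELECT."""
    escaped = name.replace('\\', '\\\\').replace('"', '\\"')
    return f'"{escaped}"'

flags, delim, name = parse_list_line(b'(\\HasNoChildren) "/" "[Gmail]/Sent Mail"')
print(flags, delim, name)
print(quote_mailbox(name))   # "[Gmail]/Sent Mail"
```

Quoting matters because imaplib passes the mailbox argument to the server as written, so a name containing a space needs its own double quotes.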

Searching for Messages

IMAP search criteria are powerful and composable. You can combine conditions like FROM, SUBJECT, SINCE, BEFORE, UNSEEN, and SEEN to build precise queries without downloading thousands of messages first.

# search_emails.py
import imaplib

with imaplib.IMAP4_SSL('imap.gmail.com') as mail:
    mail.login('you@gmail.com', 'your-app-password')
    mail.select('INBOX')

    # Search by sender
    _, uids = mail.search(None, 'FROM', '"github.com"')
    print(f"GitHub messages: {len(uids[0].split())}")

    # Search by subject keyword
    _, uids = mail.search(None, 'SUBJECT', '"invoice"')
    print(f"Invoice messages: {len(uids[0].split())}")

    # Combine: unread emails from a specific sender since a date
    _, uids = mail.search(None, '(UNSEEN FROM "noreply@github.com" SINCE "01-May-2026")')
    print(f"Unread GitHub since May 1: {len(uids[0].split())}")

    # All messages this month
    _, uids = mail.search(None, 'SINCE', '01-May-2026')
    print(f"Messages since May 1: {len(uids[0].split())}")

Output:

GitHub messages: 143
Invoice messages: 22
Unread GitHub since May 1: 8
Messages since May 1: 94

The first argument to search() is the character set (use None for US-ASCII, which covers most searches). The search criteria string follows IMAP search syntax — criteria are AND-ed together by default, and you wrap a group in parentheses to nest them. Dates must be in the DD-Mon-YYYY format like '13-May-2026'.
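One detail worth knowing: search() returns message sequence numbers, which can shift when messages are expunged, whereas IMAP UIDs stay stable within a mailbox. imaplib exposes the UID variants through its uid() method. The sketch below assumes a hypothetical helper named search_uids and uses a FakeConnection stand-in so it runs without a server; with a real connection you would pass the logged-in IMAP4_SSL object instead.

```python
# uid_search.py
def search_uids(conn, *criteria):
    """Run UID SEARCH on an imaplib-style connection and return a list of UIDs."""
    status, data = conn.uid('SEARCH', None, *criteria)
    if status != 'OK':
        raise RuntimeError(f"UID SEARCH failed: {data!r}")
    return data[0].split()

class FakeConnection:
    """Stand-in for imaplib.IMAP4_SSL so the sketch runs offline."""
    def uid(self, command, *args):
        return 'OK', [b'4021 4025 4030']

print(search_uids(FakeConnection(), 'UNSEEN'))   # [b'4021', b'4025', b'4030']
```

Values returned this way must be used with the matching UID commands, for example conn.uid('FETCH', uid, '(RFC822)'), rather than with plain fetch().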

Debug Dee searching through email messages
UNSEEN FROM "noreply@" SINCE "01-May-2026": finally, inbox zero as a query.

Fetching and Parsing Email Content

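Once you have a list of message numbers from search() (these are sequence numbers, despite the uids variable name used in these examples), use fetch() to download the actual message. The fetch specification controls what parts you retrieve: RFC822 fetches the full raw message, while RFC822.HEADER fetches only headers (much faster when you only need subject and sender). Note that fetching RFC822 also marks the message as read; request BODY.PEEK[] instead if you want to leave the \Seen flag untouched.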

# fetch_emails.py
import imaplib
import email
from email.header import decode_header

def decode_str(value):
    """Decode encoded email header values (handles UTF-8, base64, etc.)."""
    if value is None:
        return ''
    parts = decode_header(value)
    result = []
    for part, charset in parts:
        if isinstance(part, bytes):
            result.append(part.decode(charset or 'utf-8', errors='replace'))
        else:
            result.append(part)
    return ''.join(result)

with imaplib.IMAP4_SSL('imap.gmail.com') as mail:
    mail.login('you@gmail.com', 'your-app-password')
    mail.select('INBOX')

    _, uids = mail.search(None, 'UNSEEN')
    uid_list = uids[0].split()

    for uid in uid_list[:3]:          # process first 3 unread
        _, data = mail.fetch(uid, '(RFC822)')
        raw_email = data[0][1]
        msg = email.message_from_bytes(raw_email)

        subject = decode_str(msg['Subject'])
        sender  = decode_str(msg['From'])
        date    = msg['Date']

        print(f"From:    {sender}")
        print(f"Subject: {subject}")
        print(f"Date:    {date}")

        # Extract plain text body
        if msg.is_multipart():
            for part in msg.walk():
                if part.get_content_type() == 'text/plain':
                    body = part.get_payload(decode=True).decode('utf-8', errors='replace')
                    print(f"Body:    {body[:120].strip()}...")
                    break
        else:
            body = msg.get_payload(decode=True).decode('utf-8', errors='replace')
            print(f"Body:    {body[:120].strip()}...")
        print('---')

Output:

From:    GitHub <noreply@github.com>
Subject: [my-repo] New issue opened by user123
Date:    Wed, 13 May 2026 10:21:05 +0000
Body:    A new issue has been opened in your repository. View it at https://github.com/...
---
From:    "AWS Billing" <aws-billing@amazon.com>
Subject: Your AWS bill for April 2026 is now available
Date:    Wed, 13 May 2026 08:00:44 +0000
Body:    Your AWS monthly bill for April 2026 is now available. Total charges: $14.72...
---

The decode_header() call from email.header handles encoded subject lines like =?UTF-8?B?SGVsbG8gV29ybGQ=?= that appear when senders use non-ASCII characters. The is_multipart() check is essential — HTML emails bundle a plain text part and an HTML part together; msg.walk() lets you pick the one you want.
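As an alternative to decoding headers by hand, the email package can do it for you when you parse with policy.default. The sketch below builds a small multipart message locally (standing in for the bytes returned by fetch()), so the addresses and content are made up for illustration:

```python
# parse_with_policy.py
import email
from email import policy
from email.message import EmailMessage

# Build a small multipart message locally so the sketch needs no server
original = EmailMessage()
original['Subject'] = 'Café invoice'
original['From'] = 'Billing <billing@example.com>'
original.set_content('Total due: $14.72')
original.add_alternative('<p>Total due: <b>$14.72</b></p>', subtype='html')
raw_email = original.as_bytes()   # stands in for data[0][1] from fetch()

# policy.default returns EmailMessage objects whose headers are already decoded
msg = email.message_from_bytes(raw_email, policy=policy.default)
subject = str(msg['Subject'])
plain_part = msg.get_body(preferencelist=('plain',))
body = plain_part.get_content().strip() if plain_part else ''

print(subject)   # Café invoice
print(body)      # Total due: $14.72
```

get_body() picks the preferred part for you and get_content() honours the part's declared charset, which removes the need for the manual walk() loop in simple cases.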

Marking, Moving, and Deleting Messages

Reading email is only half the job. Production scripts often need to mark messages as read, move them to a folder, or delete them after processing. IMAP handles all of this through flags and the copy() + store() pair:

# manage_emails.py
import imaplib

with imaplib.IMAP4_SSL('imap.gmail.com') as mail:
    mail.login('you@gmail.com', 'your-app-password')
    mail.select('INBOX')

    # Find messages from a specific sender
    _, uids = mail.search(None, 'FROM', '"invoices@acme.com"')
    uid_list = uids[0].split()
    print(f"Found {len(uid_list)} invoice emails")

    for uid in uid_list:
        # Mark as read (add the \Seen flag)
        mail.store(uid, '+FLAGS', '\\Seen')

        # Copy to an archive folder
        mail.copy(uid, 'Archive')

        # Flag the original for deletion
        mail.store(uid, '+FLAGS', '\\Deleted')

    # Permanently remove all messages flagged for deletion
    mail.expunge()
    print(f"Archived and expunged {len(uid_list)} messages")

Output:

Found 7 invoice emails
Archived and expunged 7 messages

The store() method modifies IMAP flags. '+FLAGS' adds a flag, '-FLAGS' removes it. The standard flags are \Seen, \Deleted, \Answered, \Flagged, and \Draft (written with a doubled backslash, such as '\\Seen', inside Python string literals). Marking a message \Deleted does not remove it immediately; it only disappears when you call expunge(), which gives you a safety window to undo mistakes.
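Before calling expunge() it can be reassuring to check which flags a message actually carries. imaplib ships a small ParseFlags() function for this. The sketch below uses a hand-written sample line shaped like the data returned by mail.fetch(num, '(FLAGS)'), and has_flag is a hypothetical helper added for illustration:

```python
# check_flags.py
import imaplib

def has_flag(fetch_line, flag):
    """Return True if a FETCH (FLAGS) response line contains the given flag."""
    return flag.encode() in imaplib.ParseFlags(fetch_line)

# Sample line shaped like data[0] from mail.fetch(num, '(FLAGS)')
sample = b'1 (FLAGS (\\Seen \\Deleted))'
print(imaplib.ParseFlags(sample))      # (b'\\Seen', b'\\Deleted')
print(has_flag(sample, '\\Deleted'))   # True
print(has_flag(sample, '\\Flagged'))   # False
```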

Loop Larry archiving emails on conveyor belt
mail.store(uid, '+FLAGS', '\\Deleted'): archived, processed, gone. As it should be.

Real-Life Example: Invoice Extractor

Let us build a practical tool that scans your inbox for invoice emails, extracts key details from each one, and saves them to a CSV file for accounting purposes. This is a pattern you can adapt for any structured-data extraction task.

# invoice_extractor.py
import imaplib
import email
from email.header import decode_header
import csv
import re
from datetime import datetime

IMAP_HOST = 'imap.gmail.com'
USERNAME  = 'you@gmail.com'
PASSWORD  = 'your-app-password'

def decode_str(value):
    if value is None:
        return ''
    parts = decode_header(value)
    result = []
    for part, charset in parts:
        if isinstance(part, bytes):
            result.append(part.decode(charset or 'utf-8', errors='replace'))
        else:
            result.append(part)
    return ''.join(result)

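def get_body(msg):
    """Return the first text/plain part as a string, or '' if there is none."""
    if msg.is_multipart():
        for part in msg.walk():
            if part.get_content_type() == 'text/plain':
                return part.get_payload(decode=True).decode('utf-8', errors='replace')
        return ''  # multipart message with no plain-text part (e.g. HTML only)
    payload = msg.get_payload(decode=True)
    return payload.decode('utf-8', errors='replace') if payload else ''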

def extract_amount(body):
    # Look for dollar amounts like "$14.72" or "$1,250.00"
    match = re.search(r'\$[\d,]+\.\d{2}', body)
    return match.group(0) if match else 'Unknown'

invoices = []

with imaplib.IMAP4_SSL(IMAP_HOST) as mail:
    mail.login(USERNAME, PASSWORD)
    mail.select('INBOX')

    # Search for likely invoice emails received since 1 April 2026 (adjust the date as needed)
    _, uids = mail.search(None, '(SUBJECT "invoice" SINCE "01-Apr-2026")')
    uid_list = uids[0].split()
    print(f"Found {len(uid_list)} potential invoice emails...")

    for uid in uid_list:
        _, data = mail.fetch(uid, '(RFC822)')
        msg = email.message_from_bytes(data[0][1])

        subject = decode_str(msg['Subject'])
        sender  = decode_str(msg['From'])
        date    = msg['Date']
        body    = get_body(msg)
        amount  = extract_amount(body)

        invoices.append({
            'date':    date,
            'from':    sender,
            'subject': subject,
            'amount':  amount,
        })
        # Mark as read once processed
        mail.store(uid, '+FLAGS', '\\Seen')

with open('invoices.csv', 'w', newline='') as f:
    writer = csv.DictWriter(f, fieldnames=['date','from','subject','amount'])
    writer.writeheader()
    writer.writerows(invoices)

print(f"Saved {len(invoices)} invoices to invoices.csv")

Output:

Found 12 potential invoice emails...
Saved 12 invoices to invoices.csv

This script ties together everything from the tutorial: SSL connection, search by subject and date, full message fetch, multipart body extraction, regex parsing, and flag management. You can extend it by adding PDF attachment extraction with the email module’s get_payload(decode=True) on application/pdf parts, or by pushing rows directly to a Google Sheet instead of a CSV file.
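The rows written above keep the raw Date header and the amount as text. If the CSV is headed for a spreadsheet or database, you may want to normalise both first. Here is a minimal sketch, assuming a hypothetical normalise_row helper and a made-up sample row in the same shape as the dictionaries built by the extractor:

```python
# normalise_invoice.py
from decimal import Decimal
from email.utils import parsedate_to_datetime

def normalise_row(row):
    """Convert the Date header to ISO 8601 and the amount to a Decimal (or None)."""
    when = parsedate_to_datetime(row['date'])
    amount = row['amount']
    value = None if amount == 'Unknown' else Decimal(amount.lstrip('$').replace(',', ''))
    return {**row, 'date': when.isoformat(), 'amount': value}

row = {
    'date': 'Wed, 13 May 2026 08:00:44 +0000',
    'from': 'AWS Billing <aws-billing@amazon.com>',
    'subject': 'Your AWS bill for April 2026 is now available',
    'amount': '$1,014.72',
}
clean = normalise_row(row)
print(clean['date'])     # 2026-05-13T08:00:44+00:00
print(clean['amount'])   # 1014.72
```

Decimal is used rather than float so that currency values are not subject to binary rounding.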

API Alice processing email receipts
regex + imaplib: your accountant’s worst nightmare, your best script.

Frequently Asked Questions

Why can’t I log in with my Gmail password?

Google requires App Passwords for IMAP access when 2-Step Verification is enabled (and it is required for most accounts). Go to your Google Account > Security > 2-Step Verification > App Passwords, create a password for “Mail” on “Other device”, and use that 16-character string instead of your regular password. Never store credentials in source code — use environment variables or a secrets manager.

How do I search for emails within a date range?

Combine SINCE and BEFORE in your search string. Dates must be in the DD-Mon-YYYY format: mail.search(None, '(SINCE "01-May-2026" BEFORE "13-May-2026")'). Note that SINCE is inclusive but BEFORE is exclusive, so the example above returns May 1 through May 12.
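Building these strings from date objects avoids typos. The sketch below uses a fixed table of English month abbreviations because strftime('%b') depends on the current locale; imap_date and date_range_criteria are hypothetical helper names chosen for this example:

```python
# date_range_search.py
from datetime import date, timedelta

MONTHS = ('Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
          'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec')

def imap_date(day):
    """Format a date as DD-Mon-YYYY using fixed English month names."""
    return f"{day.day:02d}-{MONTHS[day.month - 1]}-{day.year}"

def date_range_criteria(first_day, last_day):
    """Build a search string covering first_day through last_day inclusive."""
    # BEFORE is exclusive, so step one day past the last day we want
    end = last_day + timedelta(days=1)
    return f'(SINCE "{imap_date(first_day)}" BEFORE "{imap_date(end)}")'

criteria = date_range_criteria(date(2026, 5, 1), date(2026, 5, 12))
print(criteria)   # (SINCE "01-May-2026" BEFORE "13-May-2026")
```

The resulting string can be passed straight to mail.search(None, criteria).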

How do I save email attachments?

Walk the message parts and check part.get_content_disposition() == 'attachment'. Then call part.get_payload(decode=True) to get the raw bytes, and write them to a file using the filename from part.get_filename(). Always sanitise the filename with os.path.basename() before writing to disk to prevent directory traversal attacks.
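Those steps can be sketched as follows. The message is built locally with a deliberately hostile filename so the example runs without a server; save_attachments is a hypothetical helper, and the fallback name attachment.bin is an assumption for parts that carry no filename:

```python
# save_attachments.py
import os
import tempfile
from email.message import EmailMessage

def save_attachments(msg, target_dir):
    """Write every attachment in msg to target_dir and return the saved paths."""
    saved = []
    for part in msg.walk():
        if part.get_content_disposition() != 'attachment':
            continue
        # basename() strips any directory components from the sender's filename
        filename = os.path.basename(part.get_filename() or 'attachment.bin')
        payload = part.get_payload(decode=True)
        if payload is None:
            continue
        path = os.path.join(target_dir, filename)
        with open(path, 'wb') as fh:
            fh.write(payload)
        saved.append(path)
    return saved

demo = EmailMessage()
demo['Subject'] = 'Invoice'
demo.set_content('See attached.')
demo.add_attachment(b'%PDF-1.4 demo', maintype='application',
                    subtype='pdf', filename='../../invoice.pdf')

with tempfile.TemporaryDirectory() as tmp:
    names = [os.path.basename(p) for p in save_attachments(demo, tmp)]
print(names)   # ['invoice.pdf']
```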

Can imaplib watch for new emails in real time?

Before Python 3.14, imaplib had no built-in support for IMAP IDLE (push notifications); Python 3.14 adds an IMAP4.idle() method. On older versions, use the imapclient third-party library, which wraps imaplib and adds IDLE support. Alternatively, poll every 60 seconds using a while True loop with time.sleep(60), which is less efficient but works with any server.

How do I handle connection drops mid-batch?

Wrap your main loop in a try/except that catches imaplib.IMAP4.abort and imaplib.IMAP4.error. On abort, re-connect and re-select the mailbox, then resume from the last processed UID. Store processed UIDs in a set so you do not reprocess messages on reconnect. Long-running scripts should also send a mail.noop() every few minutes to keep the connection alive.
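Here is a rough sketch of that reconnect pattern. It takes the connection factory and the per-message handler as arguments, both hypothetical, so the control flow can be shown with a simulated drop instead of a real server:

```python
# reconnect_loop.py
import imaplib

def process_all(connect, uids, handle, max_reconnects=3):
    """Call handle(conn, uid) for every UID, reconnecting when the link drops."""
    processed = set()
    reconnects = 0
    conn = connect()
    for uid in uids:
        while uid not in processed:
            try:
                handle(conn, uid)
                processed.add(uid)
            except imaplib.IMAP4.abort:
                reconnects += 1
                if reconnects > max_reconnects:
                    raise
                conn = connect()   # a real connect() would also log in and select

    return processed

# Simulated connection that drops once, so the sketch runs without a server
state = {'connections': 0, 'dropped': False}

def fake_connect():
    state['connections'] += 1
    return object()

def fake_handle(conn, uid):
    if uid == b'102' and not state['dropped']:
        state['dropped'] = True
        raise imaplib.IMAP4.abort('socket error: EOF')

done = process_all(fake_connect, [b'101', b'102', b'103'], fake_handle)
print(sorted(done), state['connections'])   # [b'101', b'102', b'103'] 2
```

The cap on reconnects is a design choice: without it, a server that is down for good would keep the script looping forever.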

Conclusion

Python’s imaplib module gives you direct, scriptable access to any IMAP email account without needing any third-party packages. In this tutorial, you connected over TLS with IMAP4_SSL, listed and selected mailboxes, searched with criteria like UNSEEN, FROM, SUBJECT, and SINCE, fetched full messages and parsed headers with the email module’s decode_header(), and managed messages with store() and expunge().

The invoice extractor project shows how these primitives combine into a real automation tool. Extend it further by adding PDF attachment parsing, pushing data to a database, or sending notifications, for example by email with imaplib's sibling smtplib or to a Slack channel. The IMAP protocol is remarkably capable once you learn the query syntax.

For deeper reading, the official Python documentation covers every method and flag in detail: imaplib — IMAP4 protocol client. Pair it with the email package documentation for full parsing power.

How To Use Python Textual for Modern Terminal UIs


Intermediate

Terminal applications have come a long way from blinking cursors and plain text. Today’s Python developers can build dashboards, file explorers, database browsers, and data pipelines with rich color, interactive widgets, and keyboard navigation — all running inside a standard terminal window. The challenge has always been that building these UIs from scratch with curses is tedious, platform-inconsistent, and extremely difficult to maintain.

Python’s Textual framework solves this with a modern, batteries-included TUI (terminal user interface) toolkit. It brings CSS-inspired styling, a widget library, reactive data binding, and an async event system to the terminal. Install it with pip install textual and you are writing rich TUIs in minutes.

In this article we cover installing Textual and creating your first app, using built-in widgets (Button, Input, DataTable), applying CSS styles to your layout, handling keyboard and click events, and building a real-world task manager TUI. By the end you will have the skills to build professional terminal applications entirely in Python.

Textual TUI: Quick Example

Here is the minimal Textual app — a clickable button that updates a label. Run it with python quick_textual.py and use your keyboard or mouse to interact.

# quick_textual.py
from textual.app import App, ComposeResult
from textual.widgets import Button, Static

class QuickApp(App):
    def compose(self) -> ComposeResult:
        yield Static("Hello from Textual!", id="message")
        yield Button("Click Me", id="btn")

    def on_button_pressed(self, event: Button.Pressed) -> None:
        self.query_one("#message", Static).update("Button clicked!")

if __name__ == "__main__":
    QuickApp().run()

Output:

[Terminal renders a centered message and a clickable button]
Hello from Textual!
[ Click Me ]

After clicking:
Button clicked!

The compose() method yields the widgets that make up your app; this is where you declare the UI structure. The on_button_pressed method is an event handler that fires whenever any Button is pressed. The query_one method finds a widget by CSS-style selector, and update() changes its content. Everything runs asynchronously inside Textual’s built-in event loop.

What Is Textual and Why Use It?

Textual is a Python framework built by the creators of Rich for building feature-rich TUI applications that run in any terminal. It uses an async-first architecture, a CSS-inspired styling system, and a reactive data model that automatically updates the UI when your data changes.

Feature          curses    urwid    Textual
CSS styling      No        No       Yes
Mouse support    Limited   Yes      Yes
Async-native     No        No       Yes
Widget library   None      Basic    Rich
Testing tools    None      None     Built-in

Textual is the right choice when you want a professional-quality TUI without the complexity of curses, especially if your app involves real-time data updates, interactive forms, or complex layouts.

Pyro Pete excited about Textual terminal dashboard
curses.initscr() vs. Textual. One of these is a trauma response.

Installing Textual

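Textual is on PyPI. This article assumes Python 3.8 or later; newer Textual releases may require a more recent Python, so check the PyPI page for the current minimum. Installing it also pulls in the Rich library for rendering: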

# Install Textual
pip install textual

# For development -- includes the Textual DevTools for live CSS editing
pip install textual-dev

The textual-dev package adds the textual CLI with a live development mode: run textual run --dev your_app.py and changes to your CSS reload instantly without restarting the app. This speeds up UI development significantly.

Built-in Widgets

Textual ships with a comprehensive widget library. Here is a form using Input, Button, and Label together:

# widgets_demo.py
from textual.app import App, ComposeResult
from textual.widgets import Button, Input, Label
from textual.containers import Vertical

class FormApp(App):
    CSS = """
    Vertical { align: center middle; }
    Input { margin: 1; width: 40; }
    Button { margin: 1; }
    Label { margin: 1; color: green; }
    """

    def compose(self) -> ComposeResult:
        with Vertical():
            yield Label("Enter your name:", id="prompt")
            yield Input(placeholder="Type here...", id="name_input")
            yield Button("Submit", variant="primary", id="submit")
            yield Label("", id="result")

    def on_button_pressed(self, event: Button.Pressed) -> None:
        name = self.query_one("#name_input", Input).value
        result = self.query_one("#result", Label)
        if name:
            result.update(f"Hello, {name}!")
        else:
            result.update("[red]Please enter a name.[/red]")

if __name__ == "__main__":
    FormApp().run()

Output:

[Terminal shows a centered form]
Enter your name:
[_____________________ ]
[     Submit          ]

After typing "Alice" and clicking Submit:
Hello, Alice!

The CSS class variable lets you write inline CSS that controls layout, dimensions, colors, and alignment. The Vertical container stacks widgets top-to-bottom, while align: center middle centers the column both horizontally and vertically in the terminal. Rich markup like [red]...[/red] works inside Label text for quick inline styling.

Debug Dee studying Textual CSS layout
align: center middle. Finally, CSS that actually centers things.

Interactive DataTable

The DataTable widget renders sortable, scrollable tabular data with keyboard navigation — ideal for displaying structured data like database results or log files:

# datatable_demo.py
from textual.app import App, ComposeResult
from textual.widgets import DataTable

ROWS = [
    ("Name", "Language", "Stars"),
    ("FastAPI", "Python", "80k"),
    ("Textual", "Python", "25k"),
    ("htmx", "JavaScript", "38k"),
    ("Rust", "Rust", "95k"),
]

class TableApp(App):
    def compose(self) -> ComposeResult:
        yield DataTable()

    def on_mount(self) -> None:
        table = self.query_one(DataTable)
        table.add_columns(*ROWS[0])
        for row in ROWS[1:]:
            table.add_row(*row)

if __name__ == "__main__":
    TableApp().run()

Output:

Name       Language    Stars
FastAPI    Python      80k
Textual    Python      25k
htmx       JavaScript  38k
Rust       Rust        95k

The on_mount event fires once when the app starts and the widget tree is ready, which makes it the right place to populate data. add_columns takes column labels as arguments, and add_row takes cell values. Press the arrow keys to move the cursor and press Enter to select. With the default cell cursor this fires DataTable.CellSelected; set table.cursor_type = "row" to move by whole rows and receive DataTable.RowSelected instead.

Reactive Attributes

Textual’s reactive system automatically re-renders parts of the UI whenever a tracked attribute changes — no manual refresh calls needed:

# reactive_demo.py
from textual.app import App, ComposeResult
from textual.reactive import reactive
from textual.widgets import Button, Static

class CounterApp(App):
    count: reactive[int] = reactive(0)

    def compose(self) -> ComposeResult:
        yield Static(id="counter")
        yield Button("+1", id="inc")
        yield Button("Reset", id="reset")

    def watch_count(self, value: int) -> None:
        self.query_one("#counter", Static).update(f"Count: {value}")

    def on_button_pressed(self, event: Button.Pressed) -> None:
        if event.button.id == "inc":
            self.count += 1
        elif event.button.id == "reset":
            self.count = 0

if __name__ == "__main__":
    CounterApp().run()

Output:

[Terminal shows counter updating in real time]
Count: 0
[+1]  [Reset]

After clicking +1 three times:
Count: 3

The reactive descriptor declares a tracked attribute. Whenever self.count changes, Textual automatically calls the corresponding watch_count watcher method, which updates the Static widget. This pattern keeps your UI in sync with your data without any manual glue code — similar to how React state works, but for the terminal.

Loop Larry watching reactive counter in Textual
watch_count fires on every change. No polling. No callbacks. Just vibes.

Real-Life Example: Terminal Task Manager

Here is a complete task manager TUI that lets you add tasks and toggle them between done and not done, all from the keyboard:

# task_manager.py
from textual.app import App, ComposeResult
from textual.widgets import Button, DataTable, Input, Label
from textual.containers import Horizontal, Vertical

class TaskApp(App):
    CSS = """
    #top { height: 5; }
    #input { width: 40; margin: 1; }
    #add { margin: 1; }
    #tasks { height: 1fr; }
    #status { color: green; margin: 1; }
    """

    def __init__(self):
        super().__init__()
        self.tasks = []
        self.next_id = 1

    def compose(self) -> ComposeResult:
        with Vertical():
            with Horizontal(id="top"):
                yield Input(placeholder="New task...", id="input")
                yield Button("Add", variant="primary", id="add")
                yield Label("", id="status")
            yield DataTable(id="tasks")

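    def on_mount(self) -> None:
        table = self.query_one("#tasks", DataTable)
        table.cursor_type = "row"  # Enter then fires DataTable.RowSelected
        for name in ("ID", "Task", "Done"):
            table.add_column(name, key=name)  # keys make get_cell(..., "ID") work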

    def on_button_pressed(self, event: Button.Pressed) -> None:
        inp = self.query_one("#input", Input)
        text = inp.value.strip()
        if not text:
            return
        task = {"id": self.next_id, "text": text, "done": False}
        self.tasks.append(task)
        table = self.query_one("#tasks", DataTable)
        table.add_row(str(task["id"]), task["text"], "[ ]")
        inp.value = ""
        self.next_id += 1
        self.query_one("#status", Label).update(f"Task {task['id']} added")

    def on_data_table_row_selected(self, event: DataTable.RowSelected) -> None:
        table = self.query_one("#tasks", DataTable)
        row_key = event.row_key
        task_id = int(table.get_cell(row_key, "ID"))
        task = next((t for t in self.tasks if t["id"] == task_id), None)
        if task:
            task["done"] = not task["done"]
            # Escape the bracket so "[x]" is not parsed as a markup tag
            done_str = "\\[x]" if task["done"] else "[ ]"
            table.update_cell(row_key, "Done", done_str)

if __name__ == "__main__":
    TaskApp().run()

Output:

[Terminal shows a task manager interface]
[___New task____________] [Add]
ID  Task              Done
1   Write tests       [ ]
2   Update README     [x]
3   Deploy to prod    [ ]
Task 3 added

This app demonstrates several Textual patterns working together: a composed layout using Horizontal and Vertical containers, input handling through a button event, DataTable row selection with on_data_table_row_selected, and real-time status updates. You can extend it by adding a delete button, persistent storage to a JSON file, or priority sorting.

Frequently Asked Questions

Does Textual work in all terminals?

Textual works best in modern terminals that support 256-color or true-color output and Unicode. It runs well in iTerm2, Windows Terminal, GNOME Terminal, and most other modern terminals. The classic Windows cmd.exe has limited support. You can check compatibility by running textual diagnose from the command line, which reports your terminal’s capabilities.

Do I need to know asyncio to use Textual?

Basic Textual apps work fine without explicit asyncio knowledge because Textual’s event system handles concurrency automatically. When you need to run slow operations (like network requests or file I/O) without blocking the UI, use async def for your handlers and await for async operations. Textual’s worker API also lets you run blocking code in a background thread, either with the @work(thread=True) decorator from the textual package or by calling self.run_worker(..., thread=True).

Can I put CSS in a separate file?

Yes. Set the class variable CSS_PATH = "app.tcss" to load Textual CSS from an external file. This is the recommended approach for larger apps and enables the live-reload feature in textual-dev, where CSS changes apply instantly to the running app without restarting. Textual CSS uses a subset of CSS syntax with additional terminal-specific properties like scrollbar-gutter.

How do I test a Textual app?

Textual includes testing support that works with pytest (plus an async plugin such as pytest-asyncio). Create an app instance and use async with app.run_test() as pilot to get a Pilot object that can simulate clicks (await pilot.click("#button")), key presses (await pilot.press("enter")), and pause for pending messages (await pilot.pause()). You can then assert on widget states by querying the live DOM.

How do I distribute a Textual app?

Package it as any standard Python script or library. For a standalone executable, use PyInstaller to bundle the app with its dependencies. Textual’s pure-Python nature means no C extensions are required, making cross-platform distribution straightforward. You can also publish to PyPI and let users install with pip install yourapp.

Conclusion

Textual brings the ergonomics of modern web UI development to the terminal. We covered creating a basic app with compose(), using built-in widgets including Button, Input, Label, and DataTable, writing inline CSS for layout and styling, using reactive attributes for automatic UI updates, and handling events with named listener methods. The task manager example showed how these pieces combine into a complete, interactive application.

Try extending the task manager with priority levels, due dates, and JSON file persistence to save tasks between sessions. Or build a log viewer that tails a file in real time using a background worker and reactive updates.

The official Textual documentation is at textual.textualize.io, with full widget reference, CSS property docs, and a gallery of example apps.

How To Use Python stamina for Resilient Retry Logic


Intermediate

You have built a Python script that calls a third-party API, fetches data from a remote server, or writes to a cloud database. Everything works perfectly in testing — until it doesn’t. The network blips for half a second, the API returns a 503, or the database connection times out. Without retry logic, your script crashes and you lose the entire run. Writing retry loops by hand is tedious, error-prone, and looks different every time.

Python’s stamina library solves this cleanly. It wraps functions with production-grade retry logic using a single decorator, handling exponential backoff, jitter, and configurable limits out of the box. You get resilience without boilerplate. Install it with pip install stamina and you are ready to add retries to any function.

In this article we cover how to install stamina and apply the @stamina.retry decorator, how to configure attempt counts and wait times, how to retry only on specific exception types, how to monitor retry events with structured logging, and how to build a resilient API poller as a real-world project. By the end you will have a reusable pattern for making any Python function retry-safe.

Retrying a Flaky Function: Quick Example

Here is the fastest way to add retry logic to a function that might fail due to network issues. The @stamina.retry decorator handles everything automatically.

# quick_stamina.py
import stamina
import httpx

@stamina.retry(on=httpx.HTTPError, attempts=3)
def fetch_user(user_id: int) -> dict:
    response = httpx.get(f"https://jsonplaceholder.typicode.com/users/{user_id}")
    response.raise_for_status()
    return response.json()

user = fetch_user(1)
print(user["name"])
print(user["email"])

Output:

Leanne Graham
Sinclair@april.biz

The @stamina.retry decorator wraps fetch_user so that if it raises an httpx.HTTPError, stamina automatically waits and tries again, making up to 3 attempts in total before re-raising the exception. The first call that succeeds returns normally, so the caller never sees the retries. If all 3 attempts fail, the original exception propagates as if the decorator were not there.

The sections below go deeper: configuring wait times, scoping retries to specific errors, and integrating stamina’s built-in instrumentation into your observability stack.

What Is stamina and Why Use It?

stamina is a Python library built on top of tenacity that provides a simple, opinionated retry decorator for production code. It was designed by Hynek Schlawack (author of attrs and structlog) to be safe by default and integrated with modern observability tools like structlog and Prometheus.

The core idea is that many failure modes in distributed systems are transient — a brief network partition, a rate limit that clears in a second, a momentary database overload. A function that retries with exponential backoff and jitter handles these automatically, making your code resilient without human intervention.

Approach            Code volume   Backoff    Jitter     Observability
Manual retry loop   15-30 lines   Manual     Manual     None
tenacity            5-10 lines    Built-in   Built-in   None
stamina             1-2 lines     Built-in   Built-in   structlog + Prometheus

stamina is the right tool when you want retry logic that is safe by default, readable at a glance, and observable in production — not just in development.
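For comparison, here is roughly what the manual retry loop from the first row of the table looks like. This is a hand-written sketch using only the standard library, not stamina's implementation; retry_call and its parameters are names chosen for this example.

```python
# manual_retry.py
import random
import time

def retry_call(func, on, attempts=3, wait_initial=0.1, wait_max=5.0,
               wait_jitter=1.0, sleep=time.sleep):
    """Call func(), retrying on the given exception types with backoff and jitter."""
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except on:
            if attempt == attempts:
                raise   # out of attempts: surface the original error
            delay = wait_initial * 2 ** (attempt - 1) + random.uniform(0, wait_jitter)
            sleep(min(wait_max, delay))

calls = {'n': 0}

def flaky():
    """Fail twice, then succeed, to simulate a transient error."""
    calls['n'] += 1
    if calls['n'] < 3:
        raise ConnectionError("transient failure")
    return "ok"

# sleep is swapped for a no-op so the demo finishes instantly
result = retry_call(flaky, on=ConnectionError, attempts=5, sleep=lambda s: None)
print(result, calls['n'])   # ok 3
```

Every function that needs retries would have to be routed through a helper like this, which is the boilerplate the decorator removes.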

Transient errors are just weather. stamina is the umbrella.

Installing stamina

stamina is available on PyPI. Install it with pip:

# Install stamina and httpx for the examples
pip install stamina httpx

stamina requires Python 3.10 or later and has no mandatory dependencies beyond tenacity. The structlog and prometheus-client integrations are optional — stamina detects them automatically if they are installed.

Basic Retry: Attempts and Wait

The simplest configuration is specifying how many times to try and which exceptions to retry on. stamina uses exponential backoff by default, starting at 0.1 seconds and doubling on each failure, with added jitter to prevent thundering herd problems when many callers retry simultaneously.

# basic_retry.py
import stamina
import httpx

@stamina.retry(
    on=httpx.HTTPStatusError,
    attempts=5,
    wait_initial=0.1,   # Start with 100ms wait
    wait_max=10.0,      # Cap at 10 seconds
    wait_jitter=1.0,    # Add up to 1s random jitter
)
def fetch_post(post_id: int) -> dict:
    response = httpx.get(
        f"https://jsonplaceholder.typicode.com/posts/{post_id}",
        timeout=5.0
    )
    response.raise_for_status()
    return response.json()

post = fetch_post(1)
print(post["title"])

Output:

sunt aut facere repellat provident occaecati excepturi optio reprehenderit

The wait_initial, wait_max, and wait_jitter parameters give you precise control over the retry schedule. The exponential formula means the wait sequence is roughly: 0.1s, 0.2s, 0.4s, 0.8s — each doubled from the last, plus a random jitter up to wait_jitter seconds. Setting wait_max prevents indefinite growth in long retry chains.
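To see the schedule as numbers, the sketch below computes the waits in plain Python. It is modelled on the backoff formula as we understand it from stamina's documentation (initial wait times the exponential base raised to the retry index, plus jitter, capped at wait_max); treat it as an approximation for building intuition rather than the library's exact code. backoff_schedule is a hypothetical function name.

```python
# backoff_schedule.py
import random

def backoff_schedule(attempts, wait_initial=0.1, wait_max=10.0,
                     wait_jitter=1.0, wait_exp_base=2.0, rng=random.random):
    """Return the wait before each retry (attempts - 1 waits in total)."""
    waits = []
    for attempt in range(1, attempts):
        delay = wait_initial * wait_exp_base ** (attempt - 1) + rng() * wait_jitter
        waits.append(min(wait_max, delay))
    return waits

# With jitter switched off the doubling pattern is easy to see
no_jitter = [round(w, 3) for w in backoff_schedule(5, wait_jitter=0.0)]
print(no_jitter)   # [0.1, 0.2, 0.4, 0.8]

# With jitter, each wait gains a random extra of up to wait_jitter seconds
with_jitter = backoff_schedule(5)
print([round(w, 3) for w in with_jitter])
```

Note that with the settings from the example above, the jitter (up to 1 second) is larger than the first few exponential steps, so early waits are dominated by the random component.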

Timeout-Based Retry

Instead of counting attempts, you can tell stamina to keep retrying for a fixed duration using the timeout parameter. This is useful for background jobs where you want to retry for up to N seconds regardless of how many individual attempts it takes.

# timeout_retry.py
import stamina
import httpx

@stamina.retry(
    on=httpx.HTTPError,
    timeout=30.0,   # Keep retrying for up to 30 seconds total
)
def fetch_comments(post_id: int) -> list:
    response = httpx.get(
        f"https://jsonplaceholder.typicode.com/comments?postId={post_id}"
    )
    response.raise_for_status()
    return response.json()

comments = fetch_comments(1)
print(f"Fetched {len(comments)} comments")

Output:

Fetched 5 comments

You can combine both attempts and timeout — stamina stops retrying when either limit is reached first. This is the safest production configuration: cap both the number of attempts and the total wall-clock time.

Attempt 2 of 5. The server will blink first.

Retrying on Multiple Exception Types

Real applications fail in multiple ways: connection refused, timeout, HTTP 5xx, rate limit 429. Pass a tuple of exception types to on to retry on any of them.

# multi_exception_retry.py
import stamina
import httpx

RETRYABLE = (
    httpx.ConnectError,
    httpx.TimeoutException,
    httpx.HTTPStatusError,
)

@stamina.retry(on=RETRYABLE, attempts=4, timeout=20.0)
def resilient_get(url: str) -> dict:
    response = httpx.get(url, timeout=5.0)
    # Raise for 4xx/5xx so stamina can catch HTTPStatusError
    response.raise_for_status()
    return response.json()

data = resilient_get("https://jsonplaceholder.typicode.com/todos/1")
print(data["title"], "-- done:", data["completed"])

Output:

delectus aut autem -- done: False

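Notice that we define the retryable exceptions as a tuple bound to a module-level constant at the top of the file. This is a good practice: it documents exactly which errors are considered transient and makes it easy to add new ones. Non-retryable errors (like ValueError or KeyError) are not listed and therefore propagate immediately without retries. Keep in mind that httpx.HTTPStatusError covers every 4xx and 5xx response, so this configuration also retries client errors such as 404.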

Monitoring Retries with Logging

stamina emits structured log events on every retry attempt: through structlog when it is installed, and through the standard library's logging module otherwise. These events include the function name, attempt number, wait time, and exception details, which is everything you need to spot patterns in a production log aggregator.

# monitored_retry.py
import logging
import stamina
import httpx

# Set up basic logging so we can see retry events
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s %(levelname)s %(name)s -- %(message)s"
)

@stamina.retry(on=httpx.HTTPError, attempts=3, wait_initial=0.5)
def fetch_album(album_id: int) -> dict:
    response = httpx.get(
        f"https://jsonplaceholder.typicode.com/albums/{album_id}"
    )
    response.raise_for_status()
    return response.json()

album = fetch_album(1)
print(album["title"])

Output (no errors):

quidem molestiae enim

If the first call fails, stamina logs a stamina.retry_scheduled event at WARNING level with the exception class, message, and wait time before the next attempt. If the final attempt also fails, stamina re-raises the exception normally. This two-level signaling (warnings during retries, an exception on exhaustion) integrates cleanly with any log monitoring system.

Loop Larry watching retry attempts on a whiteboard
structlog sees every retry. Your 3am on-call alert won’t.

Disabling Retries in Tests

In unit tests you usually want functions to fail fast — not wait for exponential backoff. stamina provides stamina.set_testing() to globally disable all retries in test environments.

# test_with_stamina.py
import stamina
import httpx
import unittest

@stamina.retry(on=httpx.HTTPError, attempts=5, timeout=30.0)
def fetch_todo(todo_id: int) -> dict:
    response = httpx.get(
        f"https://jsonplaceholder.typicode.com/todos/{todo_id}"
    )
    response.raise_for_status()
    return response.json()

class TestFetchTodo(unittest.TestCase):
    def setUp(self):
        # Disable retries so tests run fast
        stamina.set_testing(True)

    def tearDown(self):
        stamina.set_testing(False)

    def test_fetch_success(self):
        todo = fetch_todo(1)
        self.assertIn("title", todo)
        self.assertIn("completed", todo)

if __name__ == "__main__":
    unittest.main()

Output:

.
----------------------------------------------------------------------
Ran 1 test in 0.312s

OK

With stamina.set_testing(True) active, any decorated function that fails raises the exception immediately on the first attempt, bypassing all retry logic and wait times. This is exactly what you want in tests — deterministic, fast failures that don’t slow down your test suite.

Real-Life Example: Resilient API Poller

Here is a complete poller that fetches paginated data from a REST API with full retry logic. It retries on network errors, logs every failure, and stops cleanly if it exhausts all retries.

Pyro Pete juggling retry tokens
Paginate. Retry. Repeat. Ship.

# resilient_poller.py
import time
import logging
import stamina
import httpx

logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s %(levelname)s -- %(message)s")

RETRYABLE = (httpx.ConnectError, httpx.TimeoutException, httpx.HTTPStatusError)

@stamina.retry(on=RETRYABLE, attempts=5, wait_initial=0.5, wait_max=8.0, timeout=60.0)
def fetch_page(url: str, page: int, page_size: int = 10) -> list:
    params = {"_page": page, "_limit": page_size}
    response = httpx.get(url, params=params, timeout=10.0)
    response.raise_for_status()
    items = response.json()
    return items

def poll_all_posts(base_url: str, page_size: int = 10) -> list:
    all_posts = []
    page = 1
    while True:
        logging.info(f"Fetching page {page}...")
        items = fetch_page(base_url, page=page, page_size=page_size)
        if not items:
            logging.info("No more pages. Done.")
            break
        all_posts.extend(items)
        logging.info(f"  Got {len(items)} posts (total: {len(all_posts)})")
        page += 1
        if page > 3:   # Limit to first 3 pages for demo
            break
        time.sleep(0.2)   # Polite delay between pages
    return all_posts

if __name__ == "__main__":
    posts = poll_all_posts("https://jsonplaceholder.typicode.com/posts")
    print(f"\nFetched {len(posts)} posts total")
    print("First post title:", posts[0]["title"])
    print("Last post title:", posts[-1]["title"])

Output:

2026-05-13 10:00:01 INFO -- Fetching page 1...
2026-05-13 10:00:01 INFO --   Got 10 posts (total: 10)
2026-05-13 10:00:01 INFO -- Fetching page 2...
2026-05-13 10:00:02 INFO --   Got 10 posts (total: 20)
2026-05-13 10:00:02 INFO -- Fetching page 3...
2026-05-13 10:00:02 INFO --   Got 10 posts (total: 30)

Fetched 30 posts total
First post title: sunt aut facere repellat provident occaecati...
Last post title: at nam consequatur ea labore ea harum

The poller retries any individual page fetch up to 5 times with exponential backoff, but the outer pagination loop continues normally after each success. This architecture means a brief network hiccup on page 7 does not abort the entire 100-page crawl — it just adds a few extra seconds of retry delay before resuming. You can extend this to write results to a database, send to a queue, or write to a file at each page boundary for incremental progress tracking.

Frequently Asked Questions

What is the difference between stamina and tenacity?

stamina is built on top of tenacity and exposes a simpler, more opinionated API. tenacity gives you full control over every retry strategy through a rich configuration DSL. stamina trades some of that flexibility for safer defaults — specifically, it adds jitter by default (tenacity does not), integrates with structlog and Prometheus automatically, and provides set_testing() for test isolation. If you are starting a new project, stamina is the easier choice. For complex custom retry strategies, tenacity may be more appropriate.

Does stamina work with async functions?

Yes. The @stamina.retry decorator works transparently with both sync and async functions. Simply apply the same decorator to an async def function and it will use asyncio.sleep internally instead of time.sleep, ensuring your async event loop is never blocked during retry waits.

How do I make some errors non-retryable?

Only list exceptions in the on parameter that represent transient failures. Any exception not listed propagates immediately. Because httpx.HTTPStatusError covers both transient responses (5xx, 429) and permanent ones (404), you can check the status code inside the decorated function before calling response.raise_for_status() and raise a custom exception instead, for example converting a 404 to a NotFoundError. This only works if the custom exception is neither listed in on nor a subclass of a listed type, so it cannot be combined with on=Exception.

How does the Prometheus integration work?

If prometheus-client is installed, stamina automatically increments a counter named stamina_retries_total on each retry, labelled by callable name and exception type. This gives you a dashboard metric showing which functions are retrying most and what errors they are seeing, without any extra instrumentation code on your part.

Should I always call set_testing in tests?

Yes, if your test suite runs decorated functions. Without stamina.set_testing(True), a test that intentionally triggers an error will wait for the full retry sequence — potentially several seconds of time.sleep calls — before failing. This is especially painful in CI environments with many tests. Use setUp/tearDown or a pytest fixture to enable and disable testing mode around each test that exercises retry-decorated code.

Conclusion

stamina brings production-grade retry logic to Python with a single decorator and zero boilerplate. We covered the core @stamina.retry decorator, configuring attempt counts and wait strategies, scoping retries to specific exception types, combining attempts and timeout for belt-and-suspenders limits, and using set_testing() to keep tests fast. The real-life poller example shows how retry logic at the page-fetch level makes an entire multi-page crawl resilient to transient failures.

Try extending the poller to write each page to a JSON file before moving to the next — that way a crash mid-crawl loses at most one page, not everything. Or add the Prometheus integration and build a Grafana panel showing which functions retry most in your production environment.

The official documentation is at stamina.hynek.me. The companion tenacity docs at tenacity.readthedocs.io are also worth reading if you need to understand what is happening under the hood.

How To Use Python difflib for Comparing Text and Sequences

Intermediate

You need to show what changed between two versions of a config file. Or find the closest matching product name from a misspelled user query. Or detect whether two documents are the same despite minor formatting differences. These are all sequence comparison problems, and Python’s standard library has a dedicated module for them: difflib. It is the same engine that powers many code review tools, fuzzy finders, and spell checkers.

difflib is a pure-Python standard library module (no installation needed) that computes differences between sequences. A sequence can be a list of strings (lines of a file), a list of characters, or any other ordered collection. The module provides several tools: SequenceMatcher for computing similarity ratios, Differ for human-readable line-by-line diffs, HtmlDiff for side-by-side HTML diffs, and get_close_matches() for fuzzy string matching.

This article covers the full toolkit: computing similarity scores with SequenceMatcher, generating diffs in unified and context formats, finding close matches, building an HTML diff viewer, and applying everything in a real-world config file auditor. By the end, you will have practical tools for text comparison tasks that previously required external libraries.

difflib Quick Example

# quick_difflib.py
import difflib

# Compare two versions of a config file
old = ["host = localhost\n", "port = 5432\n", "debug = False\n"]
new = ["host = db.prod.example.com\n", "port = 5432\n", "debug = False\n", "pool_size = 10\n"]

# Generate a unified diff (like git diff)
diff = difflib.unified_diff(old, new, fromfile="config.old", tofile="config.new", n=1)
print("".join(diff))

# Similarity ratio between two strings
matcher = difflib.SequenceMatcher(None, "python", "pytohn")
print(f"\nSimilarity: {matcher.ratio():.2%}")  # 0.833 = 83.3%

# Fuzzy close matches
matches = difflib.get_close_matches("pythno", ["python", "java", "ruby", "perl"])
print(f"Close matches: {matches}")

Output:

--- config.old
+++ config.new
@@ -1,3 +1,4 @@
-host = localhost
+host = db.prod.example.com
 port = 5432
 debug = False
+pool_size = 10

Similarity: 83.33%
Close matches: ['python']

Three distinct tools shown in one example: unified diff for change tracking, SequenceMatcher for similarity scoring, and get_close_matches() for fuzzy lookup. Each addresses a different comparison need, and together they cover the majority of text comparison tasks you will encounter.

What Is difflib and What Can It Do?

difflib implements Ratcliff/Obershelp pattern matching (also known as Gestalt pattern matching): it finds the longest contiguous matching block in two sequences, then recursively repeats the search to the left and right of that block. It does not compute a minimal edit distance (Levenshtein). The resulting diffs are not always the shortest possible, but they tend to align on long unchanged runs, which makes them more natural for comparing text files than character-level edit distance metrics.
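A short sketch makes the matching-block idea tangible. It prints the blocks SequenceMatcher finds for two short strings and recomputes the similarity ratio by hand as 2 * M / T, where M is the number of matched elements and T is the combined length of both sequences (the formula given in the difflib documentation). The input strings are just illustrative values.

```python
from difflib import SequenceMatcher

a, b = "python", "pytohn"
matcher = SequenceMatcher(None, a, b)

# Each block is (start in a, start in b, length); the last one is a
# zero-length sentinel that marks the end of both sequences.
blocks = matcher.get_matching_blocks()
for block in blocks:
    print(block, repr(a[block.a:block.a + block.size]))

# Recompute the ratio manually from the matched element count.
matched = sum(block.size for block in blocks)
manual_ratio = 2 * matched / (len(a) + len(b))
print(matched, manual_ratio, matcher.ratio())
```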

Function / Class | Input | Output | Use Case
SequenceMatcher | Two sequences | Similarity ratio, opcodes | Similarity scoring, change detection
unified_diff() | Two line lists | Unified diff lines | git-style change display
context_diff() | Two line lists | Context diff lines | Traditional diff format
Differ | Two line lists | Human-readable diff | Readable change display
HtmlDiff | Two line lists | HTML table | Side-by-side web display
get_close_matches() | String + word list | List of close matches | Fuzzy search, spell check

unified_diff shows changes between code versions
unified_diff() — because “it changed somewhere” is not a code review.

SequenceMatcher: Similarity Ratios and Change Blocks

SequenceMatcher is the core engine underlying all other difflib tools. Instantiate it with two sequences and call ratio() for a 0.0-to-1.0 similarity score, or get_opcodes() to get a list of edit operations that transforms sequence A into sequence B.

# sequence_matcher.py
from difflib import SequenceMatcher

# Compare two strings character by character
a = "The quick brown fox jumps over the lazy dog"
b = "The quick brown cat jumps over the lazy dog"

matcher = SequenceMatcher(None, a, b)
print(f"Ratio: {matcher.ratio():.4f}")         # 0.9302
print(f"Quick ratio: {matcher.quick_ratio():.4f}")  # Upper bound, faster to compute

# Get the exact changes
for tag, i1, i2, j1, j2 in matcher.get_opcodes():
    if tag != "equal":
        print(f"  {tag}: '{a[i1:i2]}' -> '{b[j1:j2]}'")

print()

# Compare lists of lines (more natural for text files)
lines_a = ["def greet(name):", "    print(f'Hello {name}')", "    return True"]
lines_b = ["def greet(name):", "    print(f'Hi {name}!')", "    return True"]

matcher2 = SequenceMatcher(None, lines_a, lines_b)
print(f"Line similarity: {matcher2.ratio():.4f}")

for tag, i1, i2, j1, j2 in matcher2.get_opcodes():
    if tag == "replace":
        print(f"  Changed line {i1+1}:")
        print(f"    FROM: {lines_a[i1:i2]}")
        print(f"    TO:   {lines_b[j1:j2]}")

Output:

Ratio: 0.9302
Quick ratio: 0.9302
  replace: 'fox' -> 'cat'

Line similarity: 0.6667
  Changed line 2:
    FROM: ["    print(f'Hello {name}')"]
    TO:   ["    print(f'Hi {name}!')"]

The opcodes are a low-level but powerful API. The four tags are "equal" (no change), "insert" (add from B), "delete" (remove from A), and "replace" (swap a block of A for a block of B). Build a custom diff renderer by iterating opcodes and coloring each section, which is the same idea code editors use for inline change highlighting.
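As a minimal sketch of such a renderer, the function below walks the opcodes and wraps removed text in [-...-] and added text in {+...+}. The function name inline_diff and the marker style are illustrative choices, not part of difflib; a real tool would swap the markers for ANSI colors or HTML spans.

```python
from difflib import SequenceMatcher


def inline_diff(old: str, new: str) -> str:
    """Render character-level changes as [-removed-] and {+added+}."""
    parts = []
    matcher = SequenceMatcher(None, old, new, autojunk=False)
    for tag, i1, i2, j1, j2 in matcher.get_opcodes():
        if tag == "equal":
            parts.append(old[i1:i2])
        if tag in ("delete", "replace"):
            parts.append(f"[-{old[i1:i2]}-]")
        if tag in ("insert", "replace"):
            parts.append(f"{{+{new[j1:j2]}+}}")
    return "".join(parts)


print(inline_diff("The quick brown fox", "The quick brown cat"))
# The quick brown [-fox-]{+cat+}
```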

The autojunk Parameter

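By default, SequenceMatcher applies an automatic junk heuristic: when the second sequence has at least 200 elements, any element that makes up more than 1% of it is treated as junk and ignored when searching for matches. For large code files this is usually helpful (blank lines, common boilerplate), but on long sequences where repeated elements carry meaning it can produce surprisingly low ratios. Sequences shorter than 200 elements are not affected. Pass autojunk=False to disable the heuristic.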
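The effect is easiest to see on a deliberately extreme input. In this sketch the second sequence consists of 200 copies of a single element, which is long enough for the heuristic to activate (it only applies once the second sequence has 200 or more elements). The data is artificial and chosen purely to expose the behaviour.

```python
from difflib import SequenceMatcher

# Hypothetical worst case: b is 200 copies of one element, so that element
# counts as "popular" and is dropped from the match index.
a = ["header"] + ["row"] * 200
b = ["row"] * 200

with_heuristic = SequenceMatcher(None, a, b).ratio()
without_heuristic = SequenceMatcher(None, a, b, autojunk=False).ratio()
print(f"autojunk=True:  {with_heuristic:.4f}")
print(f"autojunk=False: {without_heuristic:.4f}")

# Below 200 elements the heuristic is inactive, so both settings agree.
short_a, short_b = ["header"] + ["row"] * 50, ["row"] * 50
assert (SequenceMatcher(None, short_a, short_b).ratio()
        == SequenceMatcher(None, short_a, short_b, autojunk=False).ratio())
```

If you see an implausibly low ratio on long, repetitive input, rerunning with autojunk=False is a cheap first diagnostic.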

Generating Diffs: unified_diff and context_diff

For human-readable change reports, unified_diff() produces the familiar ---/+++/@@ format used by git, patch, and most code review tools.

# unified_diff_demo.py
import difflib

def compare_files(file_a: str, file_b: str, context_lines: int = 3) -> str:
    """Compare two file contents and return a unified diff string."""
    lines_a = file_a.splitlines(keepends=True)
    lines_b = file_b.splitlines(keepends=True)
    
    diff = difflib.unified_diff(
        lines_a, lines_b,
        fromfile="original.py",
        tofile="modified.py",
        n=context_lines
    )
    return "".join(diff)

original = """def calculate_discount(price, percent):
    discount = price * percent
    return discount

def apply_coupon(order, code):
    if code == "SAVE10":
        return order - 10
    return order
"""

modified = """def calculate_discount(price, percent):
    if percent > 100:
        raise ValueError("Discount cannot exceed 100%")
    discount = price * (percent / 100)
    return discount

def apply_coupon(order, code, user_id=None):
    if code == "SAVE10":
        return order - 10
    if code == "SAVE20":
        return order - 20
    return order
"""

result = compare_files(original, modified)
print(result if result else "No differences found.")

Output:

--- original.py
+++ modified.py
@@ -1,8 +1,12 @@
 def calculate_discount(price, percent):
-    discount = price * percent
+    if percent > 100:
+        raise ValueError("Discount cannot exceed 100%")
+    discount = price * (percent / 100)
     return discount
 
-def apply_coupon(order, code):
+def apply_coupon(order, code, user_id=None):
     if code == "SAVE10":
         return order - 10
+    if code == "SAVE20":
+        return order - 20
     return order

The n=context_lines parameter controls how many unchanged lines to show around each change. The default is 3. Use n=0 to show only changed lines (like a “what changed” summary), or n=999 to show the full file with changes highlighted.

get_close_matches: Fuzzy String Lookup

get_close_matches() is the simplest path to fuzzy matching: give it a word and a vocabulary list, and it returns the best matches above a similarity threshold.

# close_matches.py
from difflib import get_close_matches

vocabulary = [
    "python", "javascript", "typescript", "java", "kotlin",
    "swift", "rust", "golang", "ruby", "scala",
]

# Basic usage
print(get_close_matches("pyhton", vocabulary))     # ['python']
print(get_close_matches("jvascript", vocabulary))  # ['javascript', 'typescript']
print(get_close_matches("xyz", vocabulary))        # [] -- no close match

# Control sensitivity with n and cutoff
print(get_close_matches("java", vocabulary, n=3, cutoff=0.4))
# ['java', 'javascript', 'scala'] -- lower cutoff finds more results

# Practical use: did-you-mean suggestion
def did_you_mean(word: str, options: list[str]) -> str | None:
    matches = get_close_matches(word, options, n=1, cutoff=0.6)
    return matches[0] if matches else None

commands = ["start", "stop", "restart", "status", "reload"]
user_input = "reestart"
suggestion = did_you_mean(user_input, commands)
if suggestion:
    print(f"Unknown command '{user_input}'. Did you mean '{suggestion}'?")

Output:

['python']
['javascript', 'typescript']
[]
['java', 'javascript', 'scala']
Unknown command 'reestart'. Did you mean 'restart'?

The cutoff parameter (default 0.6) controls how similar a match must be to be included. A lower cutoff (0.4) catches more distant matches but produces more false positives. A higher cutoff (0.8) is stricter but misses matches with more typos. For command-line “did you mean?” suggestions, 0.6 is a reasonable starting point.
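When you are tuning the cutoff, it helps to look at the raw scores rather than only the filtered result. The sketch below scores every candidate with SequenceMatcher directly; score_candidates is a hypothetical helper, and it passes the candidate as the first sequence and the query as the second, mirroring what get_close_matches() does internally.

```python
from difflib import SequenceMatcher


def score_candidates(word: str, options: list[str]) -> list[tuple[float, str]]:
    """Return (ratio, option) pairs, best match first."""
    scored = [
        (SequenceMatcher(None, option, word).ratio(), option)
        for option in options
    ]
    return sorted(scored, reverse=True)


commands = ["start", "stop", "restart", "status", "reload"]
for ratio, option in score_candidates("reestart", commands):
    marker = "keep" if ratio >= 0.6 else "drop"
    print(f"{option:<8} {ratio:.3f}  {marker}")
```

Running this against a sample of real typos from your users shows where true matches and false positives separate, which is a better basis for a cutoff than guessing.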

get_close_matches finds approximate string matches
get_close_matches() — 83% is close enough to be “python”.

Real-Life Example: Config File Auditor

This project compares a deployed config file against a template to detect drift, showing exactly what changed in a human-readable report.

# config_auditor.py
import difflib
from dataclasses import dataclass, field
from typing import List

@dataclass
class ConfigAuditResult:
    similarity: float
    added_lines: List[str] = field(default_factory=list)
    removed_lines: List[str] = field(default_factory=list)
    changed_sections: List[str] = field(default_factory=list)
    
    @property
    def has_drift(self) -> bool:
        return self.similarity < 1.0
    
    def summary(self) -> str:
        if not self.has_drift:
            return "No drift detected. Config matches template."
        return (
            f"Config drift detected (similarity: {self.similarity:.1%})\n"
            f"  Added lines:   {len(self.added_lines)}\n"
            f"  Removed lines: {len(self.removed_lines)}"
        )

def audit_config(template: str, deployed: str) -> ConfigAuditResult:
    """Compare deployed config against template and return audit result."""
    template_lines = template.splitlines(keepends=True)
    deployed_lines = deployed.splitlines(keepends=True)
    
    matcher = difflib.SequenceMatcher(None, template_lines, deployed_lines)
    result = ConfigAuditResult(similarity=matcher.ratio())
    
    for tag, i1, i2, j1, j2 in matcher.get_opcodes():
        if tag == "delete":
            result.removed_lines.extend(template_lines[i1:i2])
        elif tag == "insert":
            result.added_lines.extend(deployed_lines[j1:j2])
        elif tag == "replace":
            result.removed_lines.extend(template_lines[i1:i2])
            result.added_lines.extend(deployed_lines[j1:j2])
            result.changed_sections.append(template_lines[i1].strip())
    
    return result

def print_diff(template: str, deployed: str) -> None:
    """Print a unified diff between template and deployed config."""
    diff = difflib.unified_diff(
        template.splitlines(keepends=True),
        deployed.splitlines(keepends=True),
        fromfile="template.conf",
        tofile="deployed.conf",
        n=2
    )
    diff_text = "".join(diff)
    if diff_text:
        print(diff_text)
    else:
        print("Files are identical.")

# Example usage
TEMPLATE = """[server]
host = 0.0.0.0
port = 8080
workers = 4
timeout = 30

[database]
host = db.internal
port = 5432
pool_size = 10
"""

DEPLOYED = """[server]
host = 0.0.0.0
port = 9090
workers = 4
timeout = 30

[database]
host = db.internal
port = 5432
pool_size = 20
max_overflow = 5
"""

result = audit_config(TEMPLATE, DEPLOYED)
print(result.summary())
print()
print("--- Unified Diff ---")
print_diff(TEMPLATE, DEPLOYED)
print()
print(f"Changed sections: {result.changed_sections}")

Output:

Config drift detected (similarity: 76.2%)
  Added lines:   3
  Removed lines: 2

--- Unified Diff ---
--- template.conf
+++ deployed.conf
@@ -1,5 +1,5 @@
 [server]
 host = 0.0.0.0
-port = 8080
+port = 9090
 workers = 4
 timeout = 30
@@ -8,3 +8,4 @@
 host = db.internal
 port = 5432
-pool_size = 10
+pool_size = 20
+max_overflow = 5

Changed sections: ['port = 8080', 'pool_size = 10']

The ConfigAuditResult dataclass separates the raw diff data (added, removed lines) from the derived properties (has_drift, summary()). This structure makes the auditor easy to extend: add a critical_fields list to flag specific settings (like host changes) as high-severity drift.
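One possible shape for that extension is sketched below. It takes the removed and added lines that the auditor already collects and reports only the keys you consider critical. Everything here is an assumption for illustration: the helper name critical_drift, the simple "key = value" line format, and the fact that keys repeated across sections (such as host) are not distinguished.

```python
def critical_drift(removed_lines, added_lines, critical_fields):
    """Return {key: (template_value, deployed_value)} for critical keys that changed.

    Hypothetical helper: assumes simple "key = value" lines and ignores
    which [section] a key belongs to.
    """
    def to_pairs(lines):
        pairs = {}
        for line in lines:
            key, sep, value = line.partition("=")
            if sep:  # skip lines without an "=" (section headers, blanks)
                pairs[key.strip()] = value.strip()
        return pairs

    before, after = to_pairs(removed_lines), to_pairs(added_lines)
    return {
        key: (before.get(key), after.get(key))
        for key in critical_fields
        if before.get(key) != after.get(key)
    }


# Sample input matching the removed/added lines from the example above.
removed = ["port = 8080\n", "pool_size = 10\n"]
added = ["port = 9090\n", "pool_size = 20\n", "max_overflow = 5\n"]
print(critical_drift(removed, added, critical_fields=["port", "host"]))
# {'port': ('8080', '9090')}
```

In a CI job you could fail the build whenever this dictionary is non-empty and merely warn on any other drift.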

Side-by-side diff: the smallest possible debugger.

Frequently Asked Questions

What is the difference between ratio() and quick_ratio()?

ratio() computes the exact similarity ratio by performing the full sequence comparison. quick_ratio() returns an upper bound on ratio() that is cheaper to compute, so it can overestimate the true value but never underestimate it. real_quick_ratio() is faster still, but its upper bound is looser because it looks only at the lengths of the two sequences. Use quick_ratio() as a preliminary filter when you have many candidates: if quick_ratio() < threshold, skip the expensive ratio() call. This optimisation is built into get_close_matches() internally.
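The filtering pattern can be sketched as follows. The helper name best_match and its return shape are illustrative choices; the three ratio methods and the set_seq1()/set_seq2() calls are standard SequenceMatcher API.

```python
from difflib import SequenceMatcher


def best_match(word: str, candidates: list[str], threshold: float = 0.6):
    """Return (ratio, candidate) for the best match at or above threshold, else None."""
    matcher = SequenceMatcher()
    matcher.set_seq2(word)  # seq2 details are cached, so set the fixed word once
    best = None
    for candidate in candidates:
        matcher.set_seq1(candidate)
        # Cheap upper bounds first; skip ratio() when they already fail.
        if matcher.real_quick_ratio() < threshold:
            continue
        if matcher.quick_ratio() < threshold:
            continue
        score = matcher.ratio()
        if score >= threshold and (best is None or score > best[0]):
            best = (score, candidate)
    return best


print(best_match("pythno", ["python", "java", "ruby", "perl"]))
```

Because both quick methods return upper bounds, a candidate rejected by either one could never have passed the full ratio() check, so the filter changes speed but not results.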

When should I use autojunk=False?

Disable autojunk when comparing long sequences in which frequently repeated elements are meaningful. The heuristic only activates when the second sequence has 200 or more elements, and it then treats any element that accounts for more than 1% of that sequence as junk. In a 500-line log file, for example, a line repeated more than six times is skipped when SequenceMatcher looks for matching blocks. Shorter inputs are unaffected. Pass SequenceMatcher(isjunk=None, a=a, b=b, autojunk=False) to disable this behaviour.

How do I generate a side-by-side HTML diff?

Use difflib.HtmlDiff(). Call HtmlDiff().make_file(a_lines, b_lines, fromdesc, todesc) to get a complete HTML page with side-by-side tables, highlighting, and legend. Save it to a .html file and open it in a browser. This is useful for generating code review reports: just call make_file() in a loop for each changed file and write the outputs to a folder.
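A minimal sketch of that call, using two small hand-written line lists as stand-ins for real file contents. The wrapcolumn, context, and numlines values are arbitrary choices for the example.

```python
import difflib

old = ["host = localhost", "port = 5432", "debug = False"]
new = ["host = db.prod.example.com", "port = 5432", "debug = False", "pool_size = 10"]

html = difflib.HtmlDiff(wrapcolumn=60).make_file(
    old, new,
    fromdesc="config.old",
    todesc="config.new",
    context=True,   # show only changed regions plus surrounding lines
    numlines=2,     # number of context lines around each change
)
print(len(html), "characters of HTML")
```

To view the result, write the string to a file (for example with pathlib.Path("diff.html").write_text(html, encoding="utf-8")) and open it in a browser.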

How does difflib compare to Levenshtein distance?

Levenshtein distance counts the minimum character-level edits (insert, delete, substitute) between two strings. difflib uses the Ratcliff/Obershelp algorithm, which finds the longest matching substrings recursively. Edit distance is the more common choice for single-word spell-checking, and its Damerau-Levenshtein variant counts a transposition of adjacent characters as a single edit. difflib is better suited to multi-line text comparison because it aligns on long unchanged blocks. For production spell-checkers, use the python-Levenshtein or rapidfuzz library; both are implemented in compiled code (C or C++) and are significantly faster than difflib.

Is difflib fast enough for large files?

For files up to a few thousand lines, difflib is fast enough for interactive use. For larger files (tens of thousands of lines), consider calling SequenceMatcher once and caching the result, or use the C-based python-Levenshtein library for pure string comparison. The main performance lever is the autojunk heuristic — it is on by default and significantly speeds up comparison of files with repeated lines (like log files).

Conclusion

Python’s difflib module provides a complete text comparison toolkit without any external dependencies. You have learned how to compute similarity ratios with SequenceMatcher, generate unified and context diffs for change tracking, use get_close_matches() for fuzzy string lookup, and build a config auditor that detects and reports configuration drift. The Ratcliff/Obershelp algorithm at difflib’s core aligns on long unchanged blocks, making it a natural fit for file comparison tasks.

Extend the config auditor by adding an HTML diff output (using HtmlDiff().make_file()), integrating it into a CI pipeline to fail builds when critical settings drift from the template, or adapting the fuzzy matcher into a search autocomplete feature for a command-line tool. All three extensions build directly on what you have learned here.

For the full API reference, see the official difflib documentation.

How To Use Python xmltodict for XML to Dictionary Conversion

Beginner

XML is everywhere you do not want it to be: legacy payment gateways, government data feeds, enterprise software exports, and RSS/Atom feeds. Working with Python’s built-in xml.etree.ElementTree means navigating tree nodes, iterating children, and checking for None everywhere — for what is often a simple data extraction task. What if you could treat XML like JSON and access values with dictionary keys instead?

The xmltodict library does exactly that: it parses an XML string or file into a standard Python dictionary using a single function call. Going the other way — from a dictionary back to XML — is equally simple. The library is small (one file), fast, and handles attributes, namespaces, and nested elements gracefully.

This article covers installing xmltodict, parsing XML strings and files, handling attributes and namespaces, dealing with repeated elements that may be lists or single items, converting back to XML with unparse(), and processing real-world RSS feeds. By the end, you will be able to consume any XML data source with the same ease as a JSON API.

xmltodict Quick Example

Here is the core workflow in six lines:

# quick_xmltodict.py
import xmltodict

xml_data = """
<book>
    <title>Python Tricks</title>
    <author>Dan Bader</author>
    <price currency="USD">29.99</price>
</book>
"""

data = xmltodict.parse(xml_data)
print(data["book"]["title"])                    # Python Tricks
print(data["book"]["price"]["#text"])           # 29.99
print(data["book"]["price"]["@currency"])       # USD

Output:

Python Tricks
29.99
USD

That is the entire API for basic use. xmltodict.parse() takes a string and returns a dictionary (a plain dict in xmltodict 0.13.0 on Python 3.7+, an OrderedDict in older versions, which behaves the same way). XML attributes are prefixed with @ and text content mixed with attributes uses #text. The sections below cover more complex scenarios including lists, namespaces, and real-world feeds.

What Is xmltodict and How Does It Compare to ElementTree?

XML has two common parsing approaches in Python: tree-based (ElementTree, lxml) and event-based (SAX). Both require navigating the tree structure explicitly. xmltodict adds a third option: dictionary conversion, which is the simplest approach for small-to-medium XML documents where you just need to extract data.

Library | API Style | Large Files | Namespace Support | Best For
xmltodict | Dict access | Streaming mode | Yes | Simple extraction, quick scripts
ElementTree | Tree nodes | Yes (iterparse) | With prefix | Standard library, tree traversal
lxml | Tree + XPath | Yes | Full | Complex queries, fast parsing
BeautifulSoup | CSS/tag selectors | No | Partial | HTML + XML web scraping

For most data engineering tasks — parsing API responses, reading config files, processing RSS feeds — xmltodict is the simplest solution. Reach for lxml when you need XPath queries or when performance on large documents matters, and for truly massive files (gigabytes), use ElementTree’s iterparse().

xmltodict.parse converts XML to Python dict
xmltodict.parse() — because nobody enjoys .find(‘child’).text

Installation

Install xmltodict with pip. It has no dependencies beyond the Python standard library.

pip install xmltodict

Verify the installation:

python -c "import xmltodict; print(xmltodict.__version__)"

Output:

0.13.0

Parsing XML: Strings and Files

The primary entry point is xmltodict.parse(), which accepts either a string or a file-like object.

Parsing XML Strings

# parsing_strings.py
import xmltodict
import json

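xml = """
<catalog>
    <book id="001">
        <title>Learning Python</title>
        <author>Mark Lutz</author>
        <year>2013</year>
        <price>49.99</price>
    </book>
    <book id="002">
        <title>Fluent Python</title>
        <author>Luciano Ramalho</author>
        <year>2022</year>
        <price>59.99</price>
    </book>
</catalog>
"""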

data = xmltodict.parse(xml)

# Navigate the structure
catalog = data["catalog"]
books = catalog["book"]  # This is a LIST when there are multiple <book> elements

for book in books:
    book_id = book["@id"]          # Attribute (prefixed with @)
    title = book["title"]          # Child element text
    price = float(book["price"])
    print(f"[{book_id}] {title}: ${price:.2f}")

# Pretty-print the full dict as JSON for inspection
print("\nFull structure:")
print(json.dumps(data, indent=2))

Output:

[001] Learning Python: $49.99
[002] Fluent Python: $59.99

Full structure:
{
  "catalog": {
    "book": [
      {
        "@id": "001",
        "title": "Learning Python",
        "author": "Mark Lutz",
        "year": "2013",
        "price": "49.99"
      },
      {
        "@id": "002",
        "title": "Fluent Python",
        "author": "Luciano Ramalho",
        "year": "2022",
        "price": "59.99"
      }
    ]
  }
}

Important: all values from xmltodict.parse() are strings by default, even numbers. Always convert explicitly with float(), int(), or a type-coercion step after parsing. The library deliberately avoids type inference to preserve exact values from the source XML.
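One way to organise that type-coercion step is a small mapping from field name to target type. The sketch below works on a hand-written dictionary shaped like one parsed book record, so it runs without xmltodict installed. The names FIELD_TYPES and coerce are hypothetical, and Decimal is used for the price as a design choice to avoid float rounding; float() works the same way if exactness does not matter.

```python
from decimal import Decimal

# Hypothetical record shaped like one parsed <book> element:
# every value, including numbers, arrives as a string.
raw_book = {"@id": "001", "title": "Learning Python", "year": "2013", "price": "49.99"}

# Map each field that needs conversion to its target type.
FIELD_TYPES = {"year": int, "price": Decimal}


def coerce(record: dict, field_types: dict) -> dict:
    """Return a copy of record with the listed fields converted."""
    return {
        key: field_types[key](value) if key in field_types else value
        for key, value in record.items()
    }


book = coerce(raw_book, FIELD_TYPES)
print(book["year"] + 1, book["price"] * 2)
```

Keeping the conversions in one table means a malformed value fails loudly at the parsing boundary instead of surfacing later as a confusing string comparison.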

Parsing XML Files

# parsing_files.py
import xmltodict

# Method 1: Pass a file object directly
with open("config.xml", "rb") as f:
    data = xmltodict.parse(f)

# Method 2: Read and parse in one step (for small files)
with open("config.xml", "r", encoding="utf-8") as f:
    data = xmltodict.parse(f.read())

# Method 3: Parse from a URL response (using requests)
import requests
response = requests.get("https://feeds.bbci.co.uk/news/rss.xml")
feed = xmltodict.parse(response.content)
channel = feed["rss"]["channel"]
print(f"Feed title: {channel['title']}")
print(f"Items: {len(channel['item'])}")

Output:

Feed title: BBC News
Items: 20

Use open(file, "rb") (binary mode) when the XML file might have a BOM (byte order mark) or unusual encoding declaration — xmltodict handles encoding detection automatically from binary input. For text mode, make sure to specify the correct encoding explicitly.

Handling the List vs Single-Item Problem

The trickiest part of xmltodict is that a repeated element becomes a list, but a single element becomes a dict. This inconsistency can cause KeyError or TypeError in production when the data volume varies.

# list_problem.py
import xmltodict

# One book: data["catalog"]["book"] is a DICT
one_book_xml = "<catalog><book><title>A</title></book></catalog>"
data_one = xmltodict.parse(one_book_xml)
print(type(data_one["catalog"]["book"]))  # <class 'dict'>

# Two books: data["catalog"]["book"] is a LIST
two_books_xml = """<catalog>
    <book><title>A</title></book>
    <book><title>B</title></book>
</catalog>"""
data_two = xmltodict.parse(two_books_xml)
print(type(data_two["catalog"]["book"]))  # <class 'list'>

# Solution 1: force_list parameter
data_forced = xmltodict.parse(one_book_xml, force_list={"book"})
print(type(data_forced["catalog"]["book"]))  # <class 'list'> -- always a list!

# Solution 2: defensive helper
def ensure_list(value):
    if value is None:
        return []
    return value if isinstance(value, list) else [value]

books = ensure_list(data_one["catalog"].get("book"))
print(f"Books (always iterable): {len(books)}")  # 1

Output:

<class 'dict'>
<class 'list'>
<class 'list'>
Books (always iterable): 1

The force_list={"book"} parameter is the cleanest solution when you know in advance which element names should always be lists. Pass a set of tag names and xmltodict wraps them in a list even when only one is present. This eliminates the type inconsistency at the source and removes the need for defensive isinstance checks throughout your code.

force_list={"item"}: because one result should not crash your loop.

Converting Back to XML with unparse()

The reverse operation — dictionary to XML — uses xmltodict.unparse(). This is useful for generating XML config files, building API request bodies, or transforming data between formats.

# unparse_demo.py
import xmltodict

data = {
    "config": {
        "@version": "1.0",
        "database": {
            "host": "localhost",
            "port": "5432",
            "name": "myapp",
            "credentials": {
                "@encrypted": "true",
                "username": "admin",
                "password": "s3cr3t",
            },
        },
        "cache": {
            "backend": "redis",
            "ttl": "3600",
        },
        "features": {
            "feature": ["dark_mode", "beta_api", "experimental_ui"]
        },
    }
}

# Convert to formatted XML
xml_output = xmltodict.unparse(data, pretty=True, indent="    ")
print(xml_output)

Output:

<?xml version="1.0" encoding="utf-8"?>
<config version="1.0">
    <database>
        <host>localhost</host>
        <port>5432</port>
        <name>myapp</name>
        <credentials encrypted="true">
            <username>admin</username>
            <password>s3cr3t</password>
        </credentials>
    </database>
    <cache>
        <backend>redis</backend>
        <ttl>3600</ttl>
    </cache>
    <features>
        <feature>dark_mode</feature>
        <feature>beta_api</feature>
        <feature>experimental_ui</feature>
    </features>
</config>

The list ["dark_mode", "beta_api", "experimental_ui"] is correctly serialised as three repeated <feature> elements. Dictionary keys starting with @ become XML attributes and the #text key becomes the element’s text content — the same conventions as parsing, in reverse.

XML in. Dict out. xmltodict does the heavy lifting.

Real-Life Example: RSS Feed Parser

This project parses a real RSS feed, extracts article metadata, and formats it as a structured report.

# rss_parser.py
import xmltodict
import requests
from datetime import datetime
import re

def parse_rss_feed(url: str) -> list[dict]:
    """Fetch and parse an RSS feed, returning a list of article dicts."""
    response = requests.get(url, timeout=10, headers={"User-Agent": "Mozilla/5.0"})
    response.raise_for_status()
    
    feed = xmltodict.parse(response.content, force_list={"item"})
    channel = feed["rss"]["channel"]
    
    articles = []
    items = channel.get("item") or []
    
    for item in items:
        # Defensive parsing -- not all feeds have all fields
        title = item.get("title", "Untitled")
        link = item.get("link", "")
        description = item.get("description", "")
        pub_date_raw = item.get("pubDate", "")
        
        # Strip HTML from description (an empty <description/> parses to None)
        clean_desc = re.sub(r"<[^>]+>", "", description or "")[:200]
        
        articles.append({
            "title": title,
            "link": link,
            "summary": clean_desc.strip(),
            "published": pub_date_raw,
        })
    
    return articles

def print_feed_report(articles: list[dict], max_items: int = 5) -> None:
    print(f"\nLatest {min(max_items, len(articles))} articles:")
    print("-" * 60)
    for article in articles[:max_items]:
        print(f"TITLE: {article['title'][:70]}")
        print(f"  URL: {article['link'][:70]}")
        if article["summary"]:
            print(f"  SUMMARY: {article['summary'][:100]}...")
        print(f"  PUBLISHED: {article['published']}")
        print()

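# Real public feeds for testing
# Note: the Real Python URL serves an Atom feed (root <feed>, entries in
# <entry>), so feed["rss"] raises KeyError for it and the loop below
# reports an error instead of printing articles.
FEEDS = {
    "Python Blog": "https://feeds.feedburner.com/PythonInsider",
    "Real Python": "https://realpython.com/atom.xml",
}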

for name, url in FEEDS.items():
    try:
        articles = parse_rss_feed(url)
        print(f"\n=== {name} ({len(articles)} articles) ===")
        print_feed_report(articles)
    except Exception as e:
        print(f"Error fetching {name}: {e}")

Output:

=== Python Blog (10 articles) ===

Latest 5 articles:
------------------------------------------------------------
TITLE: Python 3.14 alpha 7 released
  URL: http://feedproxy.google.com/~r/PythonInsider/~3/...
  SUMMARY: We are pleased to announce the release of Python 3.14...
  PUBLISHED: Tue, 18 Feb 2025 15:30:00 +0000

TITLE: Python 3.13.1 released
  ...

The force_list={"item"} ensures the items loop always works, whether the feed has one article or hundreds. The defensive item.get("description", "") pattern handles feeds that omit optional fields. Real RSS feeds from different publishers vary significantly in which optional fields they include — defensive parsing is what keeps this script running across all of them.

Frequently Asked Questions

How do I handle XML namespaces with xmltodict?

By default, xmltodict leaves namespace prefixes exactly as they appear in the document, so keys look like "atom:link". Pass process_namespaces=True to xmltodict.parse() to expand each prefix into its full namespace URI in the key names, and combine it with namespaces={"http://ns.uri": "short"} to map those URIs to your own short prefixes. For most practical use cases (SOAP APIs, Atom feeds, Office XML) the default prefix approach works fine, since the prefixes are fixed by the xmlns declarations in the documents you receive.

Can xmltodict handle large XML files?

For files larger than ~10 MB, use the streaming API: xmltodict.parse(f, item_depth=2, item_callback=my_fn). The item_depth parameter specifies the depth of the elements you want to stream, and item_callback is called with (path, item) for each one as it is parsed. Return True from the callback to continue; a falsy return value stops parsing by raising xmltodict.ParsingInterrupted. Because streamed items are handed to the callback instead of being accumulated in the result, memory usage stays roughly constant regardless of file size.

What encodings does xmltodict support?

xmltodict relies on the underlying expat parser (via Python’s xml.parsers.expat), which supports all standard XML encodings: UTF-8, UTF-16, ISO-8859-1, and more. When reading from a file, open in binary mode ("rb") and let xmltodict detect the encoding from the XML declaration. When parsing a string, ensure it is already decoded to Unicode first.

How do I access CDATA sections?

CDATA sections (<![CDATA[...]]>) are automatically decoded and their content is accessible as the element’s text content, just like regular text. No special handling is needed — xmltodict merges CDATA content with regular text content transparently. The #text key holds the combined value when an element has both attributes and text content (whether that text comes from CDATA or regular character data).

xmltodict converts everything to strings — how do I get proper types?

Pass a postprocessor function to xmltodict.parse(). The function receives (path, key, value) for every element and can return a modified (key, value) pair. Use it to convert numeric strings to int or float, parse date strings with datetime.fromisoformat(), or apply any other transformation. Alternatively, use Pydantic to validate and type-coerce the dictionary after parsing — this approach also handles nested structure validation cleanly.
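A postprocessor along those lines might look like the sketch below. The coerce_numbers name is hypothetical, and the function is called directly here so the example runs without xmltodict installed; with the library you would pass it as xmltodict.parse(xml_text, postprocessor=coerce_numbers).

```python
def coerce_numbers(path, key, value):
    """Postprocessor sketch: turn numeric-looking text into int or float.

    Keys starting with "@" are left as strings so identifiers such as
    "001" keep their leading zeros.
    """
    if isinstance(value, str) and not key.startswith("@"):
        for cast in (int, float):
            try:
                return key, cast(value)
            except ValueError:
                continue
    return key, value

# Direct calls standing in for what the parser would do per element.
print(coerce_numbers([], "year", "2013"))
print(coerce_numbers([], "price", "49.99"))
print(coerce_numbers([], "title", "Fluent Python"))
print(coerce_numbers([], "@id", "001"))
```

Trying int before float keeps whole numbers as integers; anything that neither cast accepts is returned unchanged.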

Conclusion

xmltodict makes XML feel like JSON. You have learned how to parse XML strings and files with xmltodict.parse(), access attributes with the @ prefix and mixed content with #text, handle the single-item vs list inconsistency with force_list, convert dictionaries back to XML with xmltodict.unparse(), and apply it all to a real RSS feed parser. The defensive parsing patterns — item.get(), force_list, and stripping HTML from descriptions — are what make the RSS parser work reliably across the varied real-world feeds you will encounter.

From here, extend the RSS parser to write articles to a database (use SQLite via the sqlite3 module), add deduplication based on the article URL, or schedule it to run hourly and alert you to new articles on topics you care about. All of these extensions require only standard Python libraries alongside xmltodict.

See the xmltodict GitHub repository for the full API documentation including streaming mode and postprocessor examples.

How To Use Python unittest.mock for Mocking Dependencies


Intermediate

You have written a function that sends emails, calls a third-party API, or reads from a database. You want to write unit tests for it, but running those tests would actually send emails, hit the live API, and modify real data. Worse, the tests would be slow, flaky (depending on network availability), and expensive (API calls cost money). The solution is mocking: replacing the real dependency with a controlled fake that behaves exactly how you tell it to.

Python’s unittest.mock module is the standard library’s built-in mocking toolkit. It ships with Python 3.3+ at no extra installation cost and integrates seamlessly with both unittest and pytest. The module provides Mock and MagicMock objects that record all calls made to them, plus a patch() decorator and context manager that temporarily swaps real objects with mocks in your code’s namespace.

This article covers the complete mocking workflow: creating basic mocks, configuring return values and side effects, using patch() to intercept dependencies, mocking HTTP requests, mocking time, and asserting on mock call history. By the end, you will be able to test any function in isolation, no matter what external systems it depends on.

unittest.mock Quick Example

Here is the pattern in its simplest form: a function that calls an external API, tested without hitting any network.

# quick_mock_example.py
import unittest
from unittest.mock import patch, MagicMock

import requests  # module-level import so patch() can resolve quick_mock_example.requests

def get_user_name(user_id: int) -> str:
    """Fetches a user name from an external API (real implementation)."""
    response = requests.get(f"https://jsonplaceholder.typicode.com/users/{user_id}")
    response.raise_for_status()
    return response.json()["name"]

class TestGetUserName(unittest.TestCase):
    
    @patch("quick_mock_example.requests.get")
    def test_returns_user_name(self, mock_get):
        # Configure the fake response
        mock_response = MagicMock()
        mock_response.json.return_value = {"id": 1, "name": "Leanne Graham"}
        mock_response.raise_for_status.return_value = None
        mock_get.return_value = mock_response
        
        # Call the function under test
        result = get_user_name(1)
        
        # Assertions
        self.assertEqual(result, "Leanne Graham")
        mock_get.assert_called_once_with(
            "https://jsonplaceholder.typicode.com/users/1"
        )

if __name__ == "__main__":
    unittest.main()

Output:

python -m pytest quick_mock_example.py -v

quick_mock_example.py::TestGetUserName::test_returns_user_name PASSED

1 passed in 0.04s

The test completes in 40 milliseconds with zero network calls. The @patch() decorator intercepts requests.get in the module under test and replaces it with a MagicMock. The mock records the call and returns exactly what we told it to. The deeper sections explain each component in detail.

Mock and MagicMock: The Foundation

A Mock object accepts any attribute access and any method call without raising errors. Every call is recorded. MagicMock is a subclass that additionally supports Python’s magic methods (__len__, __iter__, __enter__, etc.), making it suitable for mocking context managers and containers.

# mock_basics.py
from unittest.mock import Mock, MagicMock

# Any attribute access returns another Mock
m = Mock()
print(m.anything)           # <Mock name='mock.anything' id='...'>
print(m.foo.bar.baz())      # <Mock name='mock.foo.bar.baz()' id='...'>

# Configure return values
m.get_price.return_value = 9.99
print(m.get_price())        # 9.99

# Raise an exception
m.failing_call.side_effect = ValueError("Something went wrong")
try:
    m.failing_call()
except ValueError as e:
    print(e)                # Something went wrong

# Check call history
m.calculate(10, 20)
print(m.calculate.called)               # True
print(m.calculate.call_count)           # 1
print(m.calculate.call_args)            # call(10, 20)

# MagicMock supports __len__, __iter__, context manager
mm = MagicMock()
mm.__len__.return_value = 5
print(len(mm))              # 5

mm.__iter__.return_value = iter([1, 2, 3])
print(list(mm))             # [1, 2, 3]

Output:

<Mock name='mock.anything' id='...'>
<Mock name='mock.foo.bar.baz()' id='...'>
9.99
Something went wrong
True
1
call(10, 20)
5
[1, 2, 3]

The key distinction between Mock and MagicMock: use MagicMock by default (it covers everything Mock does plus magic methods), and only drop to Mock when you want operations such as len() or iteration to fail on the mock rather than silently succeed with default values. Mock does not configure magic methods, so len(Mock()) raises TypeError.

No network. No database. No problem. The mock says what we need it to say.

The patch() Function: Intercepting Dependencies

patch() is the workhorse of unittest.mock. It temporarily replaces an object in a specific module’s namespace for the duration of a test, then restores it afterward. The target string must be the full dotted path to the object as it is imported in the code under test — not where it is originally defined.

patch() as a Decorator

# patch_decorator.py
import unittest
from unittest.mock import patch, MagicMock

# --- Code under test ---
import json  # module-level import so patch("patch_decorator.json.load") resolves
import os

def get_home_directory() -> str:
    return os.path.expanduser("~")

def read_config_file(path: str) -> dict:
    with open(path) as f:
        return json.load(f)

# --- Tests ---
class TestWithPatch(unittest.TestCase):
    
    @patch("patch_decorator.os.path.expanduser")
    def test_get_home(self, mock_expand):
        mock_expand.return_value = "/home/testuser"
        result = get_home_directory()
        self.assertEqual(result, "/home/testuser")
        mock_expand.assert_called_once_with("~")
    
    @patch("builtins.open", new_callable=MagicMock)
    @patch("patch_decorator.json.load")
    def test_read_config(self, mock_json_load, mock_open):
        mock_json_load.return_value = {"debug": True, "port": 8080}
        result = read_config_file("/fake/config.json")
        self.assertEqual(result["port"], 8080)
        mock_open.assert_called_once_with("/fake/config.json")

if __name__ == "__main__":
    unittest.main()

Output:

..
----------------------------------------------------------------------
Ran 2 tests in 0.003s
OK

When stacking multiple @patch() decorators, the mocks are passed to the test function in bottom-up order — the bottom-most decorator’s mock is the first argument. This is a common source of confusion. In the example above, mock_json_load comes from the lower @patch("patch_decorator.json.load") and mock_open from the upper @patch("builtins.open").
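The bottom-up ordering is easy to check for yourself. This small sketch patches two standard-library functions (chosen only for illustration) and confirms which mock lands in which parameter:

```python
import os
from unittest.mock import patch

@patch("os.getcwd")         # top decorator -> second mock argument
@patch("os.path.exists")    # bottom decorator -> first mock argument
def check_order(mock_exists, mock_getcwd):
    # While the patches are active, the module attributes ARE the mocks.
    return (os.path.exists is mock_exists, os.getcwd is mock_getcwd)

order_ok = check_order()
print(order_ok)  # (True, True)
```

If the parameter names were swapped, both comparisons would be False, which is a quick way to diagnose a test whose mocks seem to ignore their configuration.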

patch() as a Context Manager

Use the context manager form when you only need the mock for part of a test, or when you are not using a test class.

# patch_context.py
import smtplib  # module-level import so patch("patch_context.smtplib.SMTP") resolves
from unittest.mock import patch

def send_notification(message: str, email: str) -> bool:
    """Sends an email notification. Returns True on success."""
    with smtplib.SMTP("smtp.gmail.com", 587) as server:
        server.starttls()
        server.sendmail("noreply@example.com", email, message)
    return True

def test_send_notification_success():
    with patch("patch_context.smtplib.SMTP") as mock_smtp_class:
        mock_server = mock_smtp_class.return_value.__enter__.return_value
        mock_server.sendmail.return_value = {}
        
        result = send_notification("Hello!", "user@example.com")
        
        assert result is True
        mock_server.starttls.assert_called_once()
        mock_server.sendmail.assert_called_once_with(
            "noreply@example.com", "user@example.com", "Hello!"
        )

Output:

pytest patch_context.py -v

patch_context.py::test_send_notification_success PASSED

Mocking a context manager requires one extra step: you need to mock the object returned by __enter__. In the example above, mock_smtp_class.return_value is what smtplib.SMTP("smtp.gmail.com", 587) returns, and .__enter__.return_value is the server variable inside the with block. This chain is the standard pattern for any with statement mock.
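The same chain works without patch() at all, which makes it easier to see in isolation. In this sketch, read_greeting and the "example.invalid" host are hypothetical stand-ins for any code that uses a with statement on an injected dependency:

```python
from unittest.mock import MagicMock

def read_greeting(connect):
    """Code under test: uses whatever `connect` returns as a context manager."""
    with connect("example.invalid") as conn:
        return conn.readline()

connect = MagicMock()                                # stands in for a class like smtplib.SMTP
conn = connect.return_value.__enter__.return_value   # the object bound by `as conn`
conn.readline.return_value = "220 ready"

greeting = read_greeting(connect)
print(greeting)  # 220 ready

connect.assert_called_once_with("example.invalid")
connect.return_value.__exit__.assert_called_once()   # the with block was exited
```

Asserting on __exit__ is optional, but it is a cheap way to confirm the code under test really used the object as a context manager.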

Side Effects: Sequences, Exceptions, and Callables

The side_effect attribute gives you fine-grained control over what happens when a mock is called. You can make it raise exceptions, return different values on successive calls, or run a custom function.

# side_effects.py
from unittest.mock import Mock, patch
import unittest

class TestSideEffects(unittest.TestCase):
    
    def test_raise_on_third_call(self):
        """Simulate a flaky API that fails on the 3rd attempt."""
        mock_api = Mock()
        mock_api.side_effect = [
            {"status": "ok", "data": "first"},
            {"status": "ok", "data": "second"},
            ConnectionError("API unreachable"),
        ]
        
        self.assertEqual(mock_api()["data"], "first")
        self.assertEqual(mock_api()["data"], "second")
        with self.assertRaises(ConnectionError):
            mock_api()
    
    def test_dynamic_response(self):
        """Return different responses based on the input."""
        def dynamic(url):
            if "users" in url:
                return {"type": "user", "id": 1}
            return {"type": "unknown"}
        
        mock_get = Mock(side_effect=dynamic)
        self.assertEqual(mock_get("https://api.example.com/users/1")["type"], "user")
        self.assertEqual(mock_get("https://api.example.com/other")["type"], "unknown")

if __name__ == "__main__":
    unittest.main()

Output:

..
----------------------------------------------------------------------
Ran 2 tests in 0.001s
OK

The list-based side_effect is extremely useful for testing retry logic: set it to a list where the first N elements are exceptions and the last element is the successful response. Your retry function should exhaust the exceptions and succeed on the final call. If the mock runs out of values in the list, it raises StopIteration on the next call.
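To make the retry case concrete, here is a minimal sketch. The call_with_retry helper is hypothetical and deliberately simple (no backoff, one exception type); it is only there to give the mock something to exercise.

```python
from unittest.mock import Mock

def call_with_retry(func, attempts=3):
    """Hypothetical retry helper: call func until it succeeds or attempts run out."""
    last_error = None
    for _ in range(attempts):
        try:
            return func()
        except ConnectionError as exc:
            last_error = exc
    raise last_error

# Two failures followed by a success, consumed one item per call.
flaky_api = Mock(side_effect=[
    ConnectionError("timeout"),
    ConnectionError("timeout"),
    {"status": "ok"},
])

result = call_with_retry(flaky_api)
print(result, flaky_api.call_count)  # {'status': 'ok'} 3
```

Checking call_count alongside the result verifies that the helper actually retried rather than succeeding on the first call.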

side_effect=[error, error, success]. Retry logic tested.

Asserting on Calls

After running the code under test, verify that the mock was called correctly. The assertion methods are specific enough to catch argument order mistakes and missing calls.

# assert_calls.py
from unittest.mock import Mock, call

mock_logger = Mock()

# Simulate some calls
mock_logger.info("Server started on port 8080")
mock_logger.warning("Disk usage at 85%")
mock_logger.info("Request received from 192.168.1.1")

# Basic call assertions
mock_logger.info.assert_called()                   # Was it called at all?
mock_logger.warning.assert_called_once()           # Called exactly once?
mock_logger.error.assert_not_called()              # Was it NOT called?

# Argument assertions
mock_logger.warning.assert_called_with("Disk usage at 85%")

# Assert all calls in order
mock_logger.info.assert_has_calls([
    call("Server started on port 8080"),
    call("Request received from 192.168.1.1"),
])

# Get full call history
print(mock_logger.info.call_args_list)
# [call('Server started on port 8080'), call('Request received from 192.168.1.1')]

print(len(mock_logger.method_calls))   # Total calls across all methods
print(mock_logger.method_calls)        # All method calls in order

Output:

[call('Server started on port 8080'), call('Request received from 192.168.1.1')]
3
[call.info('Server started on port 8080'), call.warning('Disk usage at 85%'), call.info('Request received from 192.168.1.1')]

Use assert_called_once_with() (not assert_called_with()) when you need to verify both the arguments AND that the function was called exactly once. assert_called_with() only checks the most recent call’s arguments — if the mock was called 10 times, it passes as long as the last call matches.
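A short sketch of the difference, using a hypothetical notify mock that is called twice:

```python
from unittest.mock import Mock

notify = Mock()
notify("first")
notify("second")

notify.assert_called_with("second")        # passes: only the last call is compared

try:
    notify.assert_called_once_with("second")
except AssertionError:
    once_check_failed = True               # fails: the mock was called twice
else:
    once_check_failed = False

print(once_check_failed)  # True
```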

Real-Life Example: Testing a Weather Service Client

This project tests a complete weather service client that fetches data from an external API and caches results — no network required.

# weather_service.py
import requests
from datetime import datetime

class WeatherService:
    """Fetches weather data from an external API."""
    
    BASE_URL = "https://api.open-meteo.com/v1/forecast"
    
    def __init__(self):
        self._cache = {}
    
    def get_temperature(self, latitude: float, longitude: float) -> float:
        """Returns current temperature in Celsius. Raises on HTTP error."""
        cache_key = (round(latitude, 2), round(longitude, 2))
        if cache_key in self._cache:
            return self._cache[cache_key]
        
        params = {
            "latitude": latitude,
            "longitude": longitude,
            "current_weather": True,
        }
        response = requests.get(self.BASE_URL, params=params)
        response.raise_for_status()
        
        data = response.json()
        temp = data["current_weather"]["temperature"]
        self._cache[cache_key] = temp
        return temp
    
    def is_warm(self, latitude: float, longitude: float) -> bool:
        """Returns True if temperature is above 20 C."""
        return self.get_temperature(latitude, longitude) > 20.0

# --- Tests ---
import unittest
from unittest.mock import patch, MagicMock

class TestWeatherService(unittest.TestCase):
    
    def setUp(self):
        self.service = WeatherService()
    
    @patch("weather_service.requests.get")
    def test_get_temperature_success(self, mock_get):
        mock_response = MagicMock()
        mock_response.json.return_value = {
            "current_weather": {"temperature": 23.5, "windspeed": 12.0}
        }
        mock_response.raise_for_status.return_value = None
        mock_get.return_value = mock_response
        
        temp = self.service.get_temperature(48.85, 2.35)
        self.assertEqual(temp, 23.5)
    
    @patch("weather_service.requests.get")
    def test_caches_result(self, mock_get):
        mock_response = MagicMock()
        mock_response.json.return_value = {"current_weather": {"temperature": 18.0}}
        mock_response.raise_for_status.return_value = None
        mock_get.return_value = mock_response
        
        self.service.get_temperature(51.5, -0.12)
        self.service.get_temperature(51.5, -0.12)  # Second call should use cache
        
        mock_get.assert_called_once()  # API called only once
    
    @patch("weather_service.requests.get")
    def test_raises_on_http_error(self, mock_get):
        mock_response = MagicMock()
        mock_response.raise_for_status.side_effect = requests.exceptions.HTTPError("404")
        mock_get.return_value = mock_response
        
        with self.assertRaises(requests.exceptions.HTTPError):
            self.service.get_temperature(0, 0)
    
    @patch.object(WeatherService, "get_temperature", return_value=22.0)
    def test_is_warm_true(self, mock_temp):
        self.assertTrue(self.service.is_warm(48.85, 2.35))
    
    @patch.object(WeatherService, "get_temperature", return_value=15.0)
    def test_is_warm_false(self, mock_temp):
        self.assertFalse(self.service.is_warm(48.85, 2.35))

if __name__ == "__main__":
    unittest.main(verbosity=2)

Output:

test_caches_result ... ok
test_get_temperature_success ... ok
test_is_warm_false ... ok
test_is_warm_true ... ok
test_raises_on_http_error ... ok

----------------------------------------------------------------------
Ran 5 tests in 0.008s
OK

Note the use of @patch.object() in the last two tests. It takes the target object itself (here the WeatherService class) plus an attribute name, rather than a dotted string path, which is cleaner when the class is already in scope in the test file. This pattern is especially useful when testing methods that call other methods on the same object, letting you test is_warm() independently of get_temperature().

Mock the dependency. Test the unit. Forever.

Frequently Asked Questions

Where exactly should I patch — the import source or the import location?

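Always patch where the name is looked up, not where it is defined. If mymodule.py does from requests import get and calls get(), patch "mymodule.get", not "requests.get": mymodule holds its own reference to the function, and replacing the attribute on requests leaves that reference untouched. If mymodule.py does import requests and calls requests.get(), both "mymodule.requests.get" and "requests.get" replace the same attribute on the requests module, so either target works. Getting this wrong is the single most common mocking mistake. The rule is: patch the name in the namespace of the code you are testing, because Python resolves names in that module’s namespace at call time.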

What is autospec and when should I use it?

Pass spec=SomeClass or autospec=True to patch() to create a mock that enforces the real object’s interface. If you call the mock with wrong arguments or access an attribute that does not exist on the real object, the mock raises AttributeError or TypeError immediately. This catches tests that pass because the mock accepts anything, even when the real code would fail. Use autospec=True for production code; skip it for quick exploratory tests.
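The sketch below shows the effect using create_autospec(), the function behind autospec=True. The Mailer class is a hypothetical example, not part of any library.

```python
from unittest.mock import Mock, create_autospec

class Mailer:
    """Hypothetical class standing in for a real dependency."""
    def send(self, to, subject):
        raise RuntimeError("real network call")

strict = create_autospec(Mailer, instance=True)
strict.send("a@example.com", "Welcome")    # matches the real signature

errors = []
try:
    strict.send("a@example.com")           # missing `subject`
except TypeError as exc:
    errors.append(type(exc).__name__)
try:
    strict.sned                            # typo: no such attribute on Mailer
except AttributeError as exc:
    errors.append(type(exc).__name__)

loose = Mock()
loose.sned("anything")                     # a plain Mock accepts the typo silently

print(errors)  # ['TypeError', 'AttributeError']
```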

Should I use unittest.mock or pytest-mock?

Both wrap the same underlying unittest.mock machinery. The pytest-mock package provides a mocker fixture that cleans up automatically and integrates more naturally with pytest’s fixture system. If your project uses pytest, pytest-mock is slightly more ergonomic: mocker.patch("module.thing") vs the @patch decorator. Either works — the choice is stylistic.

How do I mock datetime.now()?

You cannot patch datetime.datetime.now directly because datetime.datetime is a built-in type implemented in C, and its attributes cannot be reassigned. Instead, either use the freezegun library (@freeze_time("2024-01-01"), covered in the freezegun article) or wrap datetime.now() in your own function and mock that wrapper. For most projects, freezegun is the cleaner solution since it handles all time-related calls in your code automatically.
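The wrapper approach can be sketched with nothing but the standard library. The Clock class and is_expired function are hypothetical names; the point is only that the code under test asks the wrapper for the time, so the wrapper is what gets patched.

```python
from datetime import datetime
from unittest.mock import patch

class Clock:
    """Hypothetical thin wrapper so tests have something patchable."""
    @staticmethod
    def now() -> datetime:
        return datetime.now()

def is_expired(deadline: datetime) -> bool:
    """Code under test: asks Clock for the time instead of calling datetime.now()."""
    return Clock.now() > deadline

frozen = datetime(2024, 1, 1, 12, 0, 0)
with patch.object(Clock, "now", return_value=frozen):
    expired_past = is_expired(datetime(2023, 12, 31))
    expired_future = is_expired(datetime(2024, 1, 2))

print(expired_past, expired_future)  # True False
```

Once the with block exits, Clock.now() returns the real time again.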

How do I reset a mock between tests?

Call mock.reset_mock() to clear call history, or simply create a new Mock() in each test’s setUp method. If you are using @patch as a decorator, the mock is automatically fresh for each test because the decorator creates a new mock on each test run. Only manually reset mocks when you are reusing the same mock object across multiple assertions within a single test.

Conclusion

Mocking transforms slow, flaky integration tests into fast, reliable unit tests. You have covered creating Mock and MagicMock objects, configuring return values and side effects, using patch() as both a decorator and context manager, asserting on call history, and applying all of it to a realistic weather service client with five passing tests in 8 milliseconds. The key rule to remember: always patch where the name is used, not where it is defined.

Extend the weather service tests by adding a test for the cache key rounding behaviour (try coordinates that differ only in the third decimal place), and add a test that verifies the HTTP parameters passed to requests.get() using assert_called_once_with(). These edge cases are exactly what mocking was designed to cover cheaply and reliably.

For the complete API reference, see the official unittest.mock documentation.

Why Mock Dependencies?

Tests should be fast, deterministic, and isolated. A test that hits a real database is slow; one that hits an external API is flaky; one that depends on the current time is non-deterministic. Mocks replace those dependencies with predictable stand-ins:

from unittest.mock import Mock

# Create a mock object
api_client = Mock()
api_client.fetch_user.return_value = {"id": 1, "name": "Alice"}

# Use it in your code — looks like the real thing
user = api_client.fetch_user(42)
print(user)   # {'id': 1, 'name': 'Alice'}

# Inspect what happened
print(api_client.fetch_user.called)            # True
print(api_client.fetch_user.call_args)         # call(42)
print(api_client.fetch_user.call_count)        # 1

Mock auto-creates attributes — mock.anything.you.access just works. The mock records every interaction so you can assert what happened.

patch() Decorator and Context Manager

from unittest.mock import patch

# Decorator form
@patch("myapp.module.send_email")
def test_signup(mock_send):
    mock_send.return_value = True
    result = signup_user("alice@example.com")
    mock_send.assert_called_once_with("alice@example.com", "Welcome")
    assert result == "signed up"

# Context manager form
def test_other():
    with patch("myapp.module.send_email") as mock_send:
        mock_send.return_value = True
        signup_user("alice@example.com")

patch() replaces an attribute during the test, restores it after. The argument is the dotted path to where the function is LOOKED UP, not where it’s defined — a common source of confusion.

Where to Patch: The Most Common Gotcha

# myapp/services.py
from myapp.email import send_email

def signup_user(email):
    send_email(email, "Welcome")

# Test module (hypothetical path: tests/test_services.py)
from unittest.mock import patch
from myapp.services import signup_user

# CORRECT: patch where send_email is USED
@patch("myapp.services.send_email")
def test_signup(mock_send):
    signup_user("alice@example.com")
    mock_send.assert_called_once()

# WRONG: this patches the definition, but services.py has its own reference
@patch("myapp.email.send_email")
def test_signup_wrong_target(mock_send):
    signup_user("alice@example.com")    # runs the REAL send_email
    mock_send.assert_not_called()       # surprise: the mock was never called

The rule: patch the module where the function is REFERENCED, not where it’s defined.

Mocking Side Effects

from unittest.mock import Mock, patch

# Return different values for successive calls
mock = Mock()
mock.side_effect = [1, 2, 3]
print(mock())   # 1
print(mock())   # 2
print(mock())   # 3

# Raise an exception
mock = Mock()
mock.side_effect = ConnectionError("network down")
try:
    mock()
except ConnectionError as e:
    print(e)

# Side effect as a function
def custom_logic(value):
    return value * 2 if value > 0 else 0
mock = Mock(side_effect=custom_logic)
print(mock(5))    # 10
print(mock(-3))   # 0

MagicMock vs Mock

from unittest.mock import MagicMock

# MagicMock auto-implements magic methods (__len__, __iter__, __enter__, etc.)
m = MagicMock()
m.__len__.return_value = 3
print(len(m))    # 3

# Useful for mocking context managers
with MagicMock() as m:
    pass    # __enter__ and __exit__ work

# Iteration
m.__iter__.return_value = iter([1, 2, 3])
for x in m:
    print(x)

MagicMock is Mock + sensible defaults for dunder methods. Use it for objects you’ll use with with, for, len(), etc. The plain Mock doesn’t auto-support those.

Assertions on Mock Calls

from unittest.mock import Mock

mock = Mock()
mock(1, 2, x="hi")
mock(3, 4)

# Was it called at all?
mock.assert_called()

# Was it called with specific arguments?
mock.assert_called_with(3, 4)               # the LAST call
mock.assert_any_call(1, 2, x="hi")          # ANY of the calls

# Was it called exactly once? Both checks would raise AssertionError here,
# because the mock was called twice, so they are left commented out.
# mock.assert_called_once()
# mock.assert_called_once_with(3, 4)

# Call counts and arg history
print(mock.call_count)                      # 2
print(mock.call_args_list)                  # [call(1, 2, x='hi'), call(3, 4)]

Common Pitfalls

  • Patching the wrong path. Patch where the function is USED, not where defined. Single most common mock mistake.
  • Forgetting to reset. Mock state persists across tests if you reuse module-level mocks. Use patch() so cleanup is automatic, or call mock.reset_mock().
  • Asserting on a typo. Accessing any attribute auto-creates a child mock, so a misspelled name such as mock.fooo.assert_not_called() passes even when the code under test did call foo. Use spec=Class to lock down the interface.
  • Mock not iterable. Plain Mock isn’t. Use MagicMock or set mock.__iter__.return_value = iter(...).
  • Side effect with return_value. If both are set, side_effect wins. Pick one.
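Three of these pitfalls can be reproduced in a few lines. This is a small sketch, not a test-suite pattern: PaymentClient is a hypothetical class standing in for real production code, and the misspelled method name is deliberate.

```python
from unittest.mock import Mock


class PaymentClient:
    """Hypothetical class standing in for real production code."""

    def charge(self, amount):
        raise NotImplementedError


# Without spec, a misspelled attribute is silently created as a child mock.
loose = Mock()
loose.chrage(10)                      # typo goes unnoticed

# With spec, only attributes that exist on PaymentClient are allowed.
strict = Mock(spec=PaymentClient)
try:
    strict.chrage(10)
    typo_caught = False
except AttributeError:
    typo_caught = True

# When both are set, side_effect decides the result.
both = Mock(return_value="from return_value",
            side_effect=lambda: "from side_effect")
result = both()

# reset_mock() clears recorded calls on a mock that is reused across tests.
both.reset_mock()
calls_after_reset = both.call_count

print(typo_caught, result, calls_after_reset)
```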

FAQ

Q: unittest.mock or pytest-mock?
A: pytest-mock wraps unittest.mock with a fixture (mocker) that integrates cleanly with pytest. Same engine; cleaner ergonomics for pytest users.

Q: Mock vs MagicMock vs AsyncMock?
A: Mock for basic; MagicMock for dunder methods; AsyncMock (3.8+) for async callables that return coroutines.

Q: How do I mock an async function?
A: AsyncMock(return_value=...). The patched function becomes an awaitable returning your value.

Q: spec= argument — why use it?
A: Mock(spec=SomeClass) rejects attribute access not on SomeClass. Prevents typos that pass silently. Always recommended.

Q: How do I verify call order across mocks?
A: from unittest.mock import call; mock_a.assert_has_calls([call(1), call(2)]); for cross-mock order use a manager: mgr = Mock(); mgr.attach_mock(mock_a, 'a').
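Two of the answers above, async mocking and cross-mock call order, can be sketched together with the standard library alone. The names fetch_user, load_profile, open_conn, and close_conn are hypothetical.

```python
import asyncio
from unittest.mock import AsyncMock, Mock, call

# Async: calling an AsyncMock returns an awaitable that resolves to return_value.
fetch_user = AsyncMock(return_value={"id": 1, "name": "Alice"})


async def load_profile(user_id):
    return await fetch_user(user_id)


profile = asyncio.run(load_profile(1))
fetch_user.assert_awaited_once_with(1)

# Call order across mocks: attach both to one manager and inspect mock_calls.
manager = Mock()
open_conn, close_conn = Mock(), Mock()
manager.attach_mock(open_conn, "open_conn")
manager.attach_mock(close_conn, "close_conn")

open_conn("db")
close_conn()

manager.assert_has_calls([call.open_conn("db"), call.close_conn()])
recorded = manager.mock_calls
print(profile, recorded)
```

The manager records calls in the order they happened, so comparing manager.mock_calls against an expected list checks both the arguments and the sequence.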

Wrapping Up

Mocking is essential for fast, deterministic, isolated tests. Master patch() with the right target path, the differences between Mock, MagicMock, and AsyncMock, and the assertion family. Use spec= to lock interfaces and catch typos. For pytest users, pytest-mock's mocker fixture offers cleaner ergonomics. With these tools, your test suite stays fast and your mocks don’t lie.

How To Use Python Gradio for Machine Learning Demos

Intermediate

You have trained a machine learning model. It runs beautifully in a Jupyter notebook, produces accurate predictions, and you are proud of it. But when your product manager asks “can I try it?” the answer is an awkward “let me email you the results.” Building a proper web interface used to mean learning Flask, writing HTML forms, handling AJAX requests, and deploying a server. Gradio eliminates all of that. You describe your function’s inputs and outputs, and Gradio generates the UI automatically.

Gradio is an open-source Python library that wraps any Python function in a web interface within minutes. You define what goes in (text, images, audio, dataframes) and what comes out, and Gradio creates the appropriate input and output components automatically. It also generates a shareable public URL with one extra parameter, so you can send a link to anyone in the world without touching server configuration.

This article covers the core Gradio concepts: creating basic interfaces, using the Blocks API for custom layouts, handling images and audio, building tabbed multi-model demos, and deploying to Hugging Face Spaces. By the end, you will have a fully functional image classifier interface and a text processing pipeline — both shareable via a public URL.

Gradio in Python: Quick Example

The simplest Gradio app wraps a single Python function. Here is a keyword-based sentiment classifier in under 20 lines:

# quick_gradio.py
import gradio as gr

def analyze_text(text: str) -> str:
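    # Strip surrounding punctuation so that "great!" still matches "great"
    words = [w.strip(".,!?;:") for w in text.lower().split()]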
    positives = {"good", "great", "excellent", "happy", "love", "amazing"}
    negatives = {"bad", "terrible", "awful", "hate", "horrible", "poor"}
    pos = sum(1 for w in words if w in positives)
    neg = sum(1 for w in words if w in negatives)
    if pos > neg:
        return f"Positive ({pos} positive words found)"
    elif neg > pos:
        return f"Negative ({neg} negative words found)"
    return "Neutral (no strong signals)"

demo = gr.Interface(fn=analyze_text, inputs="text", outputs="text",
                    title="Simple Sentiment Analyser")
demo.launch()

To run it:

pip install gradio
python quick_gradio.py

Output (terminal + browser):

Running on local URL: http://127.0.0.1:7860
A browser tab opens with a text input field, a "Submit" button,
and a text output box. Type "This is great!" and click -- you get:
"Positive (1 positive words found)"

This is the core Gradio pattern: any Python function in, web UI out. The inputs="text" and outputs="text" shorthand auto-creates a text area and a text output box. The sections below cover more specific component types, the Blocks API for custom layouts, and real model integration.

What Is Gradio and How Does It Differ from Streamlit?

Gradio is designed specifically around the input-process-output pattern. You give it a function, describe what types flow in and out, and it renders a form-based UI. Streamlit takes a different approach — it renders a script line by line, making it better for dashboards with complex layouts and state. Gradio excels at model demos.

Feature                  | Gradio                               | Streamlit
Best use case            | Model demos, input-output UIs        | Dashboards, data exploration
UI model                 | Function-centric (inputs -> outputs) | Script-centric (top to bottom)
Sharing                  | Built-in share=True public URL       | Requires deployment
Hugging Face integration | First-class (Spaces)                 | Supported but not native
Custom layouts           | Blocks API                           | Columns, containers
State management         | State components                     | st.session_state

The share=True parameter is Gradio’s killer feature for demos. It creates a temporary public URL that tunnels through Gradio’s servers, so a model running on your laptop becomes accessible worldwide without any port forwarding or cloud deployment. The link expires after a fixed period (72 hours at the time of writing; the message printed at launch states the current limit).

Interface Components: Inputs and Outputs

Gradio has specialised component classes for different data types. Using the full component class (instead of the shorthand string) gives you fine-grained control over labels, placeholders, file types, and styling.

Text and Number Components

# text_number_demo.py
import gradio as gr

def process(name: str, age: int, bio: str) -> str:
    return f"Hello {name}, age {age}.\nBio: {bio[:100]}..."

demo = gr.Interface(
    fn=process,
    inputs=[
        gr.Textbox(label="Your Name", placeholder="Enter full name"),
        gr.Number(label="Age", value=25, minimum=1, maximum=120),
        gr.Textbox(label="Short Bio", lines=4, placeholder="Tell us about yourself..."),
    ],
    outputs=gr.Textbox(label="Result", lines=3),
    title="User Profile Demo",
    description="Fill in the form and click Submit.",
    examples=[["Alice", 30, "Python developer who loves data science."]],
)
demo.launch()

Output (browser):

Form with name field, number stepper, and multi-line bio area
A "Result" output box below
An "Examples" row at the bottom -- click any example to auto-fill the form

The examples parameter is particularly useful for demos: it pre-populates the form with sample inputs so visitors can see your model working immediately without having to type anything. Each item in the list corresponds to one set of inputs in the same order as the inputs list.
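The positional rule above can be checked in plain Python before any UI is built. The sketch below assumes the process function from this section; check_examples and the second example row are hypothetical additions for illustration, not part of Gradio.

```python
import inspect


def process(name: str, age: int, bio: str) -> str:
    return f"Hello {name}, age {age}.\nBio: {bio[:100]}..."


examples = [
    ["Alice", 30, "Python developer who loves data science."],
    ["Bob", 41, "Backend engineer."],   # extra row added for illustration
]


def check_examples(fn, rows):
    """Return the indexes of rows whose length differs from fn's parameter count."""
    expected = len(inspect.signature(fn).parameters)
    return [i for i, row in enumerate(rows) if len(row) != expected]


bad_rows = check_examples(process, examples)
preview = process(*examples[0])   # each row unpacks in the same order as the inputs
print(bad_rows)
print(preview)
```

Running a check like this in a unit test catches an example row that drifted out of sync after an input was added or removed.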

Image and Audio Components

Gradio shines when your function processes images or audio. The gr.Image() component accepts uploads, webcam captures, or clipboard pastes and delivers a NumPy array to your function automatically.

# image_demo.py
import gradio as gr
import numpy as np
from PIL import Image


def apply_filter(image: np.ndarray, filter_type: str) -> np.ndarray:
    """Apply a simple image filter and return the result."""
    img = Image.fromarray(image)
    
    if filter_type == "Grayscale":
        img = img.convert("L").convert("RGB")
    elif filter_type == "Flip Horizontal":
        img = img.transpose(Image.FLIP_LEFT_RIGHT)
    elif filter_type == "Rotate 90":
        img = img.rotate(90, expand=True)
    elif filter_type == "Invert":
        arr = np.array(img)
        img = Image.fromarray(255 - arr)
    
    return np.array(img)

demo = gr.Interface(
    fn=apply_filter,
    inputs=[
        gr.Image(label="Input Image", type="numpy"),
        gr.Dropdown(
            choices=["Grayscale", "Flip Horizontal", "Rotate 90", "Invert"],
            label="Filter",
            value="Grayscale"
        ),
    ],
    outputs=gr.Image(label="Result"),
    title="Image Filter Demo",
)
demo.launch()

Output (browser):

Image upload zone (drag-and-drop or click to browse)
Dropdown with 4 filter options
Result image panel that updates after you click Submit
Upload any JPEG or PNG to see the filter applied

The type="numpy" parameter tells Gradio to convert the uploaded image to a NumPy array before passing it to your function. The alternative is type="pil" for a PIL Image object or type="filepath" to get the saved path. Always specify the type explicitly — the default may change between Gradio versions.

The Blocks API: Custom Layouts

The gr.Interface() shortcut is great for single-function demos, but when you need multiple functions, tabs, or custom column layouts, use the gr.Blocks() API. Blocks gives you full control over the layout with a context manager syntax.

# blocks_demo.py
import gradio as gr

def uppercase(text: str) -> str:
    return text.upper()

def count_words(text: str) -> str:
    words = text.split()
    return f"Words: {len(words)}, Characters: {len(text)}, Sentences: {text.count('.')}"

def reverse_text(text: str) -> str:
    return text[::-1]

with gr.Blocks(title="Text Tools") as demo:
    gr.Markdown("# Text Processing Toolkit")
    gr.Markdown("Three text utilities in one app.")
    
    with gr.Tab("Uppercase"):
        inp1 = gr.Textbox(label="Input", placeholder="Type something...")
        out1 = gr.Textbox(label="Uppercase Output")
        btn1 = gr.Button("Convert")
        btn1.click(fn=uppercase, inputs=inp1, outputs=out1)
    
    with gr.Tab("Word Count"):
        inp2 = gr.Textbox(label="Input", lines=5)
        out2 = gr.Textbox(label="Stats")
        btn2 = gr.Button("Count")
        btn2.click(fn=count_words, inputs=inp2, outputs=out2)
    
    with gr.Tab("Reverse"):
        with gr.Row():
            inp3 = gr.Textbox(label="Original Text")
            out3 = gr.Textbox(label="Reversed Text")
        btn3 = gr.Button("Reverse")
        btn3.click(fn=reverse_text, inputs=inp3, outputs=out3)

demo.launch()

Output (browser):

App with three tabs: Uppercase, Word Count, Reverse
Each tab has its own input, output, and button
Reverse tab uses a side-by-side Row layout
Clicking the button triggers only that tab's function -- no full page re-run

In Blocks, the trigger is explicit: btn.click(fn=..., inputs=..., outputs=...). This event-driven model is more efficient than Streamlit’s full-script-rerun approach — only the specific function connected to the clicked button executes. This matters for slow model inference where you want button 1 to trigger model A and button 2 to trigger model B independently.

Integrating a Hugging Face Model

Gradio was designed to work seamlessly with the Hugging Face ecosystem. The transformers pipeline API produces a callable that Gradio can wrap directly. Here is a complete sentiment analysis demo using a real pre-trained model.

# hf_sentiment_demo.py
import gradio as gr
from transformers import pipeline

# Load once at startup -- not inside the function
classifier = pipeline(
    "sentiment-analysis",
    model="distilbert-base-uncased-finetuned-sst-2-english"
)

def classify_sentiment(text: str) -> dict:
    """Return a {label: confidence} mapping for the gr.Label component."""
    if not text.strip():
        return {"No input": 0.0}
    result = classifier(text)[0]
    # SST-2 has two classes, so the other label gets the remaining probability
    other = "NEGATIVE" if result["label"] == "POSITIVE" else "POSITIVE"
    return {result["label"]: result["score"], other: 1 - result["score"]}

demo = gr.Interface(
    fn=classify_sentiment,
    inputs=gr.Textbox(label="Enter a sentence", placeholder="This movie was fantastic!"),
    outputs=gr.Label(num_top_classes=2, label="Sentiment"),
    title="Sentiment Analysis",
    description="Powered by DistilBERT fine-tuned on SST-2.",
    examples=[
        ["I absolutely love this product, it works great!"],
        ["This was a waste of money, terrible quality."],
        ["The package arrived on time."],
    ],
    allow_flagging="never",
)
demo.launch(share=False)  # Set share=True for a public URL

Output (browser):

Text input with example sentences below
Label component showing a bar chart of POSITIVE vs NEGATIVE confidence
Clicking an example auto-fills the input
First run downloads the DistilBERT model (~260 MB) from Hugging Face Hub

Loading the model outside the function is critical for performance. If classifier = pipeline(...) were inside classify_sentiment(), the 260 MB model would be reloaded from the local cache on every single button click. Loading at module startup means it happens once and stays in memory for all subsequent requests.
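The cost difference can be seen without downloading anything. In this sketch, load_model is a hypothetical stand-in for an expensive call such as pipeline(...), and a counter records how often it runs.

```python
stats = {"loads": 0}


def load_model():
    """Hypothetical stand-in for an expensive model load."""
    stats["loads"] += 1
    return lambda text: {"label": "POSITIVE", "score": 0.9}


def classify_slow(text):
    model = load_model()        # reloads on every request
    return model(text)


MODEL = load_model()            # loaded once, at import time


def classify_fast(text):
    return MODEL(text)          # reuses the object already in memory


for sentence in ["one", "two", "three"]:
    classify_fast(sentence)
loads_fast = stats["loads"]                 # only the module-level load

for sentence in ["one", "two", "three"]:
    classify_slow(sentence)
loads_slow = stats["loads"] - loads_fast    # one load per call

print(loads_fast, loads_slow)
```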

Real-Life Example: Multi-Model Text Analysis Dashboard

This project combines three NLP tasks in a single Blocks app: language detection, readability scoring, and keyword extraction — all without requiring a GPU or paid API.

# text_analysis_dashboard.py
import gradio as gr
import re
from collections import Counter

def detect_language_heuristic(text: str) -> str:
    """Simple heuristic language detection (no external library needed)."""
    common = {
        "en": {"the", "and", "is", "in", "it", "of", "to", "a"},
        "es": {"el", "la", "los", "en", "es", "de", "que", "un"},
        "fr": {"le", "la", "les", "un", "une", "est", "en", "de"},
        "de": {"der", "die", "das", "ist", "in", "und", "ein", "zu"},
    }
    words = set(text.lower().split())
    scores = {lang: len(words & vocab) for lang, vocab in common.items()}
    best = max(scores, key=scores.get)
    return f"Detected: {best.upper()} (score: {scores[best]})"

def readability_score(text: str) -> str:
    """Flesch Reading Ease approximation."""
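    # Ignore the empty string that re.split leaves after the final punctuation mark
    sentences = max(len([s for s in re.split(r'[.!?]+', text) if s.strip()]), 1)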
    words = text.split()
    word_count = max(len(words), 1)
    syllables = sum(max(len(re.findall(r'[aeiouAEIOU]', w)), 1) for w in words)
    score = 206.835 - 1.015 * (word_count / sentences) - 84.6 * (syllables / word_count)
    score = max(0, min(100, score))
    if score >= 70:
        level = "Easy (suitable for most readers)"
    elif score >= 50:
        level = "Moderate (some education required)"
    else:
        level = "Difficult (academic or technical audience)"
    return f"Flesch Score: {score:.1f}/100 -- {level}"

def extract_keywords(text: str, top_n: int = 10) -> str:
    """Extract top keywords by frequency (excluding stopwords)."""
    stopwords = {"the","a","an","is","in","it","of","to","and","or","for",
                 "was","be","with","as","at","by","from","on","are","this"}
    words = re.findall(r'\b[a-zA-Z]{4,}\b', text.lower())
    filtered = [w for w in words if w not in stopwords]
    counts = Counter(filtered).most_common(top_n)
    return "\n".join(f"{word}: {count}" for word, count in counts) or "No keywords found"

with gr.Blocks(title="Text Analysis Dashboard", theme=gr.themes.Soft()) as demo:
    gr.Markdown("## Text Analysis Dashboard")
    gr.Markdown("Paste any text to analyse its language, readability, and keywords.")
    
    with gr.Row():
        text_input = gr.Textbox(
            label="Input Text",
            lines=8,
            placeholder="Paste any text here...",
            scale=2
        )
        with gr.Column(scale=1):
            lang_output = gr.Textbox(label="Language")
            read_output = gr.Textbox(label="Readability")
    
    keywords_output = gr.Textbox(label="Top Keywords", lines=6)
    analyse_btn = gr.Button("Analyse", variant="primary")
    
    def analyse_all(text):
        return detect_language_heuristic(text), readability_score(text), extract_keywords(text)
    
    analyse_btn.click(
        fn=analyse_all,
        inputs=text_input,
        outputs=[lang_output, read_output, keywords_output]
    )
    
    gr.Examples(
        examples=["Python is a high-level programming language known for its clear syntax."],
        inputs=text_input
    )

demo.launch()

Output (browser):

Large text input on the left, two output boxes stacked on the right
Full-width keywords box below
Primary blue "Analyse" button
Example text pre-loaded at the bottom

Paste a passage beginning "Python is a high-level programming language..." and click Analyse. The three boxes fill in this form (exact numbers depend on the text):
Language: Detected: EN (score: 3)
Readability: Flesch Score: 54.2/100 -- Moderate
Keywords: language: 2 / python: 1 / high: 1 / level: 1

The key pattern here is a single function analyse_all() that calls the three individual functions and returns a tuple. Gradio maps each element of the return tuple to the corresponding output component in the outputs list. This avoids three separate button clicks — one analysis run populates all three panels simultaneously.
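The positional mapping can be illustrated without Gradio. This is a conceptual sketch only: route_outputs, the three small functions, and the box names are hypothetical, and the real pairing happens inside Gradio.

```python
def word_count(text):
    return f"Words: {len(text.split())}"


def char_count(text):
    return f"Characters: {len(text)}"


def shout(text):
    return text.upper()


def analyse_all(text):
    # One call returns one value per output component, in order
    return word_count(text), char_count(text), shout(text)


def route_outputs(result, outputs):
    """Pair each returned value with the output slot at the same position."""
    if len(result) != len(outputs):
        raise ValueError(f"expected {len(outputs)} values, got {len(result)}")
    return dict(zip(outputs, result))


routed = route_outputs(analyse_all("Gradio is fun"),
                       ["words_box", "chars_box", "upper_box"])
print(routed)
```

The length check mirrors the practical rule: the function must return exactly as many values as there are components in the outputs list.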

Frequently Asked Questions

How do I share a Gradio app publicly?

Pass share=True to demo.launch(). Gradio creates a temporary tunnel through its servers and prints a URL like https://abc123.gradio.live. The link is accessible from any device with internet access and expires after a fixed period (72 hours at the time of writing; the launch message shows the current limit). For a permanent public URL, deploy to Hugging Face Spaces: create a new Space, select “Gradio” as the SDK, upload your script and requirements.txt, and the Space handles hosting for free.

Can I add authentication to a Gradio app?

Yes. Pass auth=("username", "password") to demo.launch() to put a username and password login page in front of the app, or pass a list of tuples for multiple users: auth=[("alice", "pass1"), ("bob", "pass2")]. For OAuth or token-based auth, deploy to Hugging Face Spaces, which supports organisation-level access restrictions through the Spaces settings panel without any code changes.

Can Gradio handle asynchronous functions?

Yes — Gradio supports async def functions natively. If your function calls an async API (like aiohttp or an async database client), define it as async def my_fn(text): ... and Gradio runs it in its event loop automatically. This is useful for long-running model inference where you want the UI to remain responsive while waiting for the result.
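A minimal sketch of what such a handler looks like, runnable without Gradio. fetch_summary is a hypothetical function, and asyncio.sleep stands in for an awaited network call; asyncio.gather here only simulates two requests arriving at once.

```python
import asyncio
import time


async def fetch_summary(text: str) -> str:
    """Hypothetical async handler; the sleep stands in for an awaited API call."""
    await asyncio.sleep(0.2)
    return f"Summary of {len(text.split())} words"


async def main():
    start = time.perf_counter()
    # Two requests wait concurrently instead of one after the other
    results = await asyncio.gather(
        fetch_summary("first request text"),
        fetch_summary("second request"),
    )
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(main())
print(results, f"{elapsed:.2f}s")
```

The benefit applies to time spent awaiting I/O. CPU-bound work inside an async def function still occupies the event loop while it runs.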

How do I handle file downloads in Gradio?

Return a file path string from your function and use gr.File() as the output component. Gradio serves the file and gives the user a download link. For example, if your function generates a PDF report and saves it to a temp file, return the temp file path and the output component handles the rest. Use Python’s tempfile module to create the temp path: with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as f: ....
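The temp-file half of that pattern can be sketched with the standard library. build_report is a hypothetical function, and it writes CSV rather than PDF so that no third-party library is needed; the returned path is what the function would hand back to the file output component.

```python
import csv
import tempfile


def build_report(rows):
    """Write rows to a temporary CSV file and return its path."""
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".csv", delete=False, newline="", encoding="utf-8"
    ) as f:
        writer = csv.writer(f)
        writer.writerow(["word", "count"])
        writer.writerows(rows)
        return f.name   # the file survives the with block because delete=False


report_path = build_report([("python", 3), ("gradio", 2)])
print(report_path)
```

Because delete=False leaves the file on disk, long-running apps may want to clean up old reports themselves.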

My model is slow — how do I handle concurrent requests?

Call demo.queue() before demo.launch() (recent Gradio releases enable the queue by default). The queue serialises requests, shows users their position in the queue, and prevents timeouts on long-running functions. Without queueing, a slow inference function blocks all other requests. For GPU models, consider setting batch=True with max_batch_size on the event listener or gr.Interface so that concurrent requests are processed together.

Conclusion

Gradio transforms the “it works in my notebook” problem into “here is a link to try it yourself” in minutes. You have covered the full workflow: wrapping simple functions with gr.Interface(), building custom multi-tab layouts with gr.Blocks(), handling images and audio components, integrating Hugging Face models, and deploying with share=True. The text analysis dashboard demonstrates how to combine multiple functions into a single clean interface.

The next logical extensions are deploying the dashboard to Hugging Face Spaces for permanent hosting, connecting a real Hugging Face model for language detection, and adding a gr.State() component to keep a history of previous analyses in the session. All three require only small additions to the code you have already written.

Explore the full component library and layout options in the official Gradio documentation.

How To Use Python Streamlit for Interactive Data Apps

Intermediate

You’ve built a useful Python script — maybe a data cleaner, a forecasting model, or an internal report. Now your team wants to use it without typing a single command. Traditionally that means picking up Flask or Django, learning a templating language, writing HTML and CSS, and deploying a real web app. Streamlit is the antidote: a single Python file becomes a polished web UI with widgets, charts, and state — no frontend, no templates, no JavaScript.

This guide covers the practical parts of Streamlit you actually need: laying out widgets, handling user input, displaying dataframes and charts, controlling reactivity with session state and caching, and shipping the app somewhere your team can reach it. By the end you’ll know how to turn any analysis script into an interactive tool in a couple of hundred lines.

Streamlit Quick Example

Here’s a working Streamlit app in 15 lines. Install with pip install streamlit, save the file, and run with streamlit run app.py:

# File: app.py
import streamlit as st
import pandas as pd

st.title("Sales Explorer")

uploaded = st.file_uploader("Upload a CSV", type="csv")
if uploaded:
    df = pd.read_csv(uploaded)
    region = st.selectbox("Filter by region", ["All"] + sorted(df["region"].unique()))
    if region != "All":
        df = df[df["region"] == region]
    st.metric("Total sales", f"${df['amount'].sum():,.0f}")
    st.dataframe(df)
    st.bar_chart(df.groupby("product")["amount"].sum())

This single file gives you: a title, file upload, a dropdown filter, a metric tile, a sortable dataframe view, and a bar chart — all auto-styled and responsive. Every interaction (uploading a file, picking a region) reruns the script top-to-bottom with the new inputs. That rerun model is the core mental model of Streamlit.

What Is Streamlit and Why Use It?

Streamlit turns any Python script into a web app by intercepting calls like st.title(), st.button(), and st.dataframe() and rendering them as HTML on a tiny built-in server. The framework runs your script from top to bottom every time a widget value changes, so you write code as if it were a synchronous batch job — no callbacks, no event handlers, no state machines.

It’s a fit when you want to:

  • Wrap a notebook into a shareable tool that non-developers can use.
  • Build an internal dashboard backed by SQL queries or model inference.
  • Demo a machine-learning model with a UI for tweaking inputs.
  • Replace an Excel spreadsheet that your team passes around via email.

It’s not a fit when you need:

  • Multi-user state (auth, per-user data, real-time collaboration).
  • SEO-friendly public pages (Streamlit apps are client-rendered and don’t index well).
  • Fine-grained UI control (custom CSS animations, complex form validation, drag-and-drop).

For internal tools and quick analytical UIs, Streamlit is unmatched in time-to-prototype. For consumer-facing apps, reach for FastAPI + a real frontend.

Core Widgets: Inputs, Buttons, Sliders

Every widget returns its current value. You wire up logic by branching on those values — no event handlers required:

# File: widgets_demo.py
import streamlit as st

st.header("Widget Sampler")

# Text inputs
name = st.text_input("Your name", value="Pubs")
notes = st.text_area("Notes", height=120)

# Numeric input + slider
age = st.number_input("Age", min_value=0, max_value=120, value=30)
budget = st.slider("Budget", 0, 10000, value=2500, step=100)

# Selection
plan = st.selectbox("Plan", ["Free", "Pro", "Team", "Enterprise"])
features = st.multiselect("Features", ["API", "Webhooks", "SSO", "SLA"])

# Boolean inputs
agree = st.checkbox("I agree to the terms")
priority = st.radio("Priority", ["Low", "Medium", "High"], horizontal=True)

# Date + time
event_date = st.date_input("Event date")

# Action buttons
if st.button("Submit"):
    if not agree:
        st.error("You must agree to the terms")
    else:
        st.success(f"Submitted: {name}, {plan}, ${budget}, {features}")

Notice how the entire form is just a sequence of variable assignments and an if st.button(): block. There’s no onClick and no form submission handler; Streamlit handles all of that. Widgets of the same type need distinct labels, or an explicit key= argument, because Streamlit derives each widget's internal identifier from its type and parameters.

Layout: Columns, Tabs, and Sidebars

The default layout is a single vertical stream of content. For anything more interesting, use columns, tabs, and the sidebar:

# File: layout_demo.py
import streamlit as st

st.title("Inventory Dashboard")

# Sidebar — controls
with st.sidebar:
    st.header("Filters")
    category = st.selectbox("Category", ["Beverages", "Snacks", "Frozen"])
    show_low_stock = st.checkbox("Only low stock")

# Main area — two columns of metrics
col1, col2, col3 = st.columns(3)
col1.metric("Total SKUs", 1248, "+12 vs last week")
col2.metric("Out of Stock", 47, "-3", delta_color="inverse")
col3.metric("Avg Margin", "23.4%", "+1.2pp")

# Tabbed views below
tab1, tab2, tab3 = st.tabs(["Sales", "Inventory", "Suppliers"])

with tab1:
    st.subheader("Sales Trend")
    # One series with four weekly data points
    st.line_chart({"weekly_sales": [4200, 4800, 5100, 4900]})

with tab2:
    st.subheader("Stock Levels")
    st.write("...")

with tab3:
    st.subheader("Supplier Performance")
    st.write("...")

The with blocks scope subsequent calls to that container — anything inside with tab1: shows up only on the Sales tab. st.columns(3) returns three column objects; calling col1.metric(...) places the metric in that column. The sidebar is global — anything inside with st.sidebar: appears in the persistent left panel regardless of which main-area tab is active.

Displaying DataFrames, Tables, and Charts

Streamlit recognizes pandas DataFrames and renders them as sortable, scrollable, search-friendly grids. Same with charts — pass a DataFrame to st.line_chart, st.bar_chart, or st.area_chart and you get an interactive plot with zero configuration:

# File: dataframes_demo.py
import streamlit as st
import pandas as pd
import numpy as np

# Generate sample data
np.random.seed(42)
df = pd.DataFrame({
    "date": pd.date_range("2026-01-01", periods=90),
    "revenue": np.random.normal(5000, 800, 90).cumsum(),
    "users": np.random.poisson(120, 90),
    "region": np.random.choice(["NA", "EU", "APAC"], 90),
})

st.header("Q1 Performance")

# Interactive dataframe — sortable, searchable, column-resizable
st.dataframe(df, use_container_width=True, hide_index=True)

# Static table — no interactivity but renders faster
st.subheader("Top 5 Days")
st.table(df.nlargest(5, "revenue")[["date", "revenue", "users"]])

# Charts auto-detect numeric columns
st.subheader("Revenue Over Time")
st.line_chart(df.set_index("date")["revenue"])

st.subheader("Users by Region")
st.bar_chart(df.groupby("region")["users"].sum())

For richer plots, drop in matplotlib, plotly, or altair — Streamlit accepts all three via st.pyplot(), st.plotly_chart(), and st.altair_chart(). Plotly and altair are interactive (zoom, pan, hover tooltips) and integrate seamlessly with the Streamlit layout system.

Session State: Persisting Data Across Reruns

Streamlit reruns your entire script on every interaction. That’s clean for stateless UIs but tricky for anything that needs to persist — a shopping cart, an undo history, an authentication token. The solution is st.session_state, a dict-like object that survives between reruns:

# File: counter.py
import streamlit as st

# Initialize state (only runs once per session)
if "count" not in st.session_state:
    st.session_state.count = 0
    st.session_state.history = []

st.title("Counter with History")

col1, col2, col3 = st.columns(3)
if col1.button("➖ Decrement"):
    st.session_state.count -= 1
    st.session_state.history.append(("dec", st.session_state.count))
if col2.button("➕ Increment"):
    st.session_state.count += 1
    st.session_state.history.append(("inc", st.session_state.count))
if col3.button("Reset"):
    st.session_state.count = 0
    st.session_state.history.append(("reset", 0))

st.metric("Current Value", st.session_state.count)
st.write("Recent actions:")
for action, value in reversed(st.session_state.history[-10:]):
    st.text(f"  {action} -> {value}")

The if "count" not in st.session_state: block is the standard idiom for initialization — it runs once when the session starts, never again. After that, treat st.session_state like a regular dict that magically persists.
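The difference between ordinary variables and session state can be simulated without Streamlit. In this sketch, run_script plays the role of one top-to-bottom rerun and a plain dict stands in for st.session_state; both are illustrative assumptions, not Streamlit internals.

```python
def run_script(session_state, clicked=None):
    """One simulated rerun of the counter script."""
    if "count" not in session_state:      # true only on the first run
        session_state["count"] = 0
        session_state["history"] = []

    local_runs = 0                        # ordinary local: reset on every rerun
    local_runs += 1

    if clicked == "inc":
        session_state["count"] += 1
        session_state["history"].append(("inc", session_state["count"]))
    elif clicked == "dec":
        session_state["count"] -= 1
        session_state["history"].append(("dec", session_state["count"]))

    return local_runs, session_state["count"]


state = {}
run_script(state)                 # initial page load
run_script(state, "inc")
run_script(state, "inc")
final = run_script(state, "dec")
print(final, state["history"])
```

The local variable never grows past 1 because every rerun starts from scratch, while the values stored in the state dict accumulate across runs.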

Caching: Fast Reruns with @st.cache_data

Because the entire script reruns on every interaction, expensive operations (file loads, SQL queries, ML inference) would re-execute repeatedly without caching. Streamlit’s @st.cache_data decorator memoizes function results based on arguments:

# File: caching_demo.py
import streamlit as st
import pandas as pd
import time

@st.cache_data
def load_sales_data(year: int) -> pd.DataFrame:
    """Slow data load; only runs when year changes."""
    time.sleep(2)  # simulating slow database query
    return pd.read_parquet(f"sales_{year}.parquet")

@st.cache_data(ttl=300)  # invalidate after 5 minutes
def fetch_live_prices(symbol: str) -> dict:
    """Refetch prices at most every 5 minutes."""
    import requests
    r = requests.get(f"https://api.example.com/quote/{symbol}", timeout=5)
    return r.json()

st.title("Sales Dashboard")
year = st.selectbox("Year", [2024, 2025, 2026])
df = load_sales_data(year)              # 2s the first time, instant after
prices = fetch_live_prices("AAPL")      # cached for 5 minutes
st.dataframe(df)
st.metric("AAPL Price", f"${prices['price']:.2f}")

Use @st.cache_data for serializable return values (DataFrames, dicts, primitives); each call receives its own copy of the stored value. Use @st.cache_resource for non-serializable resources like database connections or loaded ML models; those are created once and the same object is shared across all sessions and reruns.
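To make the memoization and ttl behaviour concrete, here is a conceptual sketch in plain Python. It is not Streamlit's implementation: cache_data_sketch, its clock parameter, and load_prices are hypothetical, and deepcopy approximates the fact that st.cache_data hands each caller a copy of the stored value.

```python
import copy
import time
from functools import wraps


def cache_data_sketch(ttl=None, clock=time.monotonic):
    """Memoize by positional arguments; entries older than ttl seconds are recomputed."""
    def decorator(fn):
        store = {}

        @wraps(fn)
        def wrapper(*args):
            now = clock()
            hit = store.get(args)
            if hit is not None and (ttl is None or now - hit[0] < ttl):
                return copy.deepcopy(hit[1])      # cache hit: return a copy
            value = fn(*args)
            store[args] = (now, value)
            return copy.deepcopy(value)
        return wrapper
    return decorator


calls = []
fake_now = [0.0]                                  # controllable clock for the demo


@cache_data_sketch(ttl=300, clock=lambda: fake_now[0])
def load_prices(symbol):
    calls.append(symbol)
    return {"symbol": symbol, "price": 100.0}


first = load_prices("AAPL")
first["price"] = -1                # mutating the copy leaves the cache intact
second = load_prices("AAPL")       # served from the cache
fake_now[0] = 301.0
third = load_prices("AAPL")        # entry expired, so the function runs again
print(len(calls), second["price"])
```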

Forms: Batch Multiple Inputs Before Rerunning

By default, every widget change triggers a script rerun. That’s wasteful when you have a form with 8 fields — you don’t want to rerun 8 times as the user fills it in. Wrap the inputs in st.form() and Streamlit batches them until the submit button is clicked:

# File: forms_demo.py
import streamlit as st

st.title("New Order")

with st.form("order_form"):
    customer = st.text_input("Customer name")
    email = st.text_input("Email")
    product = st.selectbox("Product", ["Widget", "Gadget", "Gizmo"])
    quantity = st.number_input("Quantity", min_value=1, value=1)
    notes = st.text_area("Order notes")
    submitted = st.form_submit_button("Submit Order")

    if submitted:
        if not customer or not email:
            st.error("Customer name and email are required")
        else:
            st.success(f"Order placed: {quantity} × {product} for {customer}")

Inside the form, widget values DON’T trigger reruns when changed. Only the form_submit_button click does. This is the right pattern for any input set where partial submissions would be confusing or expensive.

Deploying a Streamlit App

Three common deployment paths:

  • Streamlit Community Cloud (free for public repos). Push your code to GitHub, link the repo at share.streamlit.io, and you get a public URL in minutes. Best for demos and small internal tools.
  • Self-hosted on a small VPS. Run streamlit run app.py --server.port 8501 --server.address 0.0.0.0 behind nginx or Caddy with TLS. Add a basic auth header if it’s an internal tool. ~$5/month.
  • Containerize and deploy on Cloud Run / Fly.io / Render. Write a tiny Dockerfile (FROM python:3.12-slim + pip install + CMD streamlit run app.py) and deploy as a single-container service. Scales automatically.

For multi-user authentication, put a reverse proxy in front (Cloudflare Access, Pomerium, oauth2-proxy). Older Streamlit releases have no built-in auth; newer ones add st.login() for OpenID Connect providers.

Common Pitfalls

  • Forgetting that the script reruns top-to-bottom. Any code outside a function executes on every interaction. If you have a heavy import or model load at module level, it’s re-checked every time — use @st.cache_resource to load once.
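  • Mutating cached return values. @st.cache_resource hands every caller the same object, not a copy, so modifying it changes what every session sees. @st.cache_data returns a fresh copy on each call, which makes mutation safe but also means in-place edits never reach the cache. Treat cached resources as read-only, and copy anything you need to change.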
  • Using widget keys for state. Each widget has an auto-generated key based on its label. If you have two st.text_input("name") calls, Streamlit raises a DuplicateWidgetID error. Add an explicit key= argument or change the labels.
  • Logging into st.write. st.write is for user-facing output. Use Python’s logging module (with a real handler) for application logs — they appear in the terminal where you ran streamlit run.
  • Long-running computations blocking the UI. Streamlit is single-threaded per session. A 30-second computation blocks all widgets until it finishes. Use st.spinner() for a loading indicator and consider st.write_stream() for chunked output.
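
The logging pitfall above can be handled with a small helper. This is a minimal sketch using only the standard library; the helper name get_app_logger and the logger name "dashboard" are hypothetical. Because the script is re-executed on every interaction while the logging module keeps its loggers alive in the process, the helper attaches a handler only once.

```python
import logging
import sys


def get_app_logger(name: str = "dashboard") -> logging.Logger:
    """Return a logger that writes to stderr, configured at most once."""
    logger = logging.getLogger(name)
    if not logger.handlers:  # a rerun must not stack a second handler
        handler = logging.StreamHandler(sys.stderr)
        handler.setFormatter(
            logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s")
        )
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
        logger.propagate = False  # avoid duplicate lines via the root logger
    return logger


log = get_app_logger()
log.info("Order form submitted")  # goes to the terminal, not the page
```

Calling get_app_logger() at the top of the script is safe on every rerun, since the second and later calls return the already-configured logger.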

FAQ

Q: Is Streamlit production-ready?
A: For internal tools and dashboards, absolutely. Plenty of companies run Streamlit apps in production for analytics, ops dashboards, and ML model demos. For public-facing apps with thousands of concurrent users, you’d want to put it behind a reverse proxy with rate-limiting, and benchmark carefully — each session ties up a Python thread.

Q: Streamlit vs Dash vs Gradio?
A: Streamlit has the shortest learning curve and prettiest defaults. Dash gives you fine-grained control via a callback system (steeper learning curve, more like writing React). Gradio is purpose-built for ML model demos — auto-generates an interface from a function signature. Pick Streamlit for data dashboards, Gradio for ML demos, Dash when you need exact control.

Q: Can I customize the look beyond the default theme?
A: Yes. Set a .streamlit/config.toml with custom theme colors, fonts, and dark mode. For deeper changes, inject CSS via st.markdown(custom_css, unsafe_allow_html=True) — but this is unsupported and breaks across Streamlit upgrades.

Q: How do I handle file uploads bigger than the default limit?
A: Increase server.maxUploadSize in .streamlit/config.toml (default is 200 MB). For genuinely large files (multi-GB), use a presigned URL flow — upload directly to S3 from the browser and pass the S3 key to Streamlit.

Q: Does Streamlit support WebSockets / real-time updates?
A: Limited. Streamlit uses WebSockets internally, but you don’t get push-from-server primitives. For live data, call st.rerun() on a timer (it replaced the older st.experimental_rerun()), or use streamlit-autorefresh for declarative polling. For true real-time streams, consider a separate WebSocket server alongside Streamlit.

Wrapping Up

Streamlit’s superpower is collapsing the front-end stack into a single Python file. You write a script, you get a web app. For the 80% of internal tools and analytics dashboards where that’s all you need, it’s hard to beat. The 20% where you need fine-grained UI control or true multi-user state — that’s where Dash, FastAPI + a real frontend, or a full SaaS framework start to win.

The official Streamlit documentation has the complete API reference and a gallery of community apps you can study.

How To Use Python trio for Structured Concurrency

Advanced

Python’s asyncio is powerful but has a well-known problem: it is easy to accidentally “fire and forget” tasks that run in the background with no guarantee they will be waited for, no guarantee their errors will be caught, and no easy way to cancel them when something goes wrong. Tasks can leak, exceptions can silently disappear, and cancellation can leave your program in an inconsistent state. These are not just theoretical problems — they cause real bugs in production async code.

trio is an alternative async library built around a concept called structured concurrency. In trio, all concurrent tasks are managed through a nursery — a scope that guarantees all tasks it spawns will be finished before the nursery exits. No task can outlive the nursery that created it. Errors always propagate to the right place. Cancellation is clean and predictable. The result is async code that is much easier to reason about and debug.

This article covers how to install and run trio programs, how to use nurseries for concurrent tasks, how to handle errors and cancellation, how trio’s memory channels replace asyncio queues, and how trio compares to asyncio. By the end you will understand structured concurrency and be able to write trio programs that handle concurrency correctly from day one.

Concurrent Tasks with trio: Quick Example

Here is the simplest trio program that runs two tasks concurrently using a nursery:

# quick_trio.py
import trio

async def task_a():
    print("Task A: starting")
    await trio.sleep(1)
    print("Task A: done after 1 second")

async def task_b():
    print("Task B: starting")
    await trio.sleep(0.5)
    print("Task B: done after 0.5 seconds")

async def main():
    async with trio.open_nursery() as nursery:
        nursery.start_soon(task_a)
        nursery.start_soon(task_b)
    print("Both tasks complete!")

trio.run(main)

Output:

Task A: starting
Task B: starting
Task B: done after 0.5 seconds
Task A: done after 1 second
Both tasks complete!

trio.open_nursery() creates a scope where both tasks run concurrently. The async with block does not exit until both tasks are done. This is the core guarantee of structured concurrency: the nursery always waits for its children. The sections below go deeper into error handling, cancellation, channels, and real-world patterns.

What Is trio and Why Use It?

trio is a Python async library designed around the principle that concurrent code should be structured the same way sequential code is: with clear entry and exit points, predictable control flow, and reliable error propagation. It was created as a response to the implicit complexity in asyncio task management.

| Feature | asyncio | trio |
|---|---|---|
| Concurrent tasks | asyncio.create_task() | nursery.start_soon() |
| Task lifetime | Can outlive their creator | Always bounded by nursery |
| Error propagation | May be silently dropped | Always propagated to nursery |
| Cancellation | Complex, error-prone | Clean, scope-based |
| Communication | Queue, Event, Condition | Memory channels (send/receive) |
| Timeout | asyncio.wait_for() | trio.move_on_after(), trio.fail_after() |

Install with pip:

# pip install trio

import trio
print(trio.__version__)
0.25.0

Understanding Nurseries

A nursery is trio’s central concept. It is a context manager that owns a group of concurrent tasks. When you enter the nursery block, you can spawn tasks. When the block exits (the body of the async with finishes), trio waits for all spawned tasks to finish before continuing. If any task raises an exception, the nursery cancels all remaining tasks and re-raises the exception.

# trio_nurseries.py
import trio

async def fetch_data(url, delay, results):
    """Simulate fetching data from a URL and record the result."""
    print(f"Fetching {url}...")
    await trio.sleep(delay)  # simulate network delay
    print(f"Done: {url}")
    # start_soon() discards return values, so collect results in a shared list
    results.append(f"data from {url}")

async def main():
    results = []

    async with trio.open_nursery() as nursery:
        # Spawn three concurrent "fetches"
        nursery.start_soon(fetch_data, "https://api.example.com/users", 1.0, results)
        nursery.start_soon(fetch_data, "https://api.example.com/posts", 0.5, results)
        nursery.start_soon(fetch_data, "https://api.example.com/comments", 0.8, results)

    # Code here runs AFTER all three tasks finish, so results is complete
    print("All fetches complete. Continuing with results.")

trio.run(main)

Output:

Fetching https://api.example.com/users...
Fetching https://api.example.com/posts...
Fetching https://api.example.com/comments...
Done: https://api.example.com/posts
Done: https://api.example.com/comments
Done: https://api.example.com/users
All fetches complete. Continuing with results.

All three fetches start immediately and run concurrently. They complete in order of their delay, not the order they were started. The “All fetches complete” line only prints after the slowest task (users, 1.0s) finishes. This guarantee — that the nursery always waits for all children — is what makes trio programs safe to reason about.

Error Handling in Nurseries

In asyncio, an exception in a background task can be silently lost if you do not explicitly await the task and check for errors. In trio, any exception in a child task immediately cancels all sibling tasks and propagates to the nursery scope. You cannot accidentally swallow errors.

# trio_errors.py
import trio

async def good_task():
    print("Good task: running")
    await trio.sleep(2)
    print("Good task: done")

async def failing_task():
    print("Failing task: about to fail")
    await trio.sleep(0.5)
    raise ValueError("Something went wrong in the task!")

async def main():
    try:
        async with trio.open_nursery() as nursery:
            nursery.start_soon(good_task)
            nursery.start_soon(failing_task)
    except* ValueError as eg:
        print(f"Caught error group: {eg.exceptions}")

trio.run(main)

Output:

Good task: running
Failing task: about to fail
Caught error group: (ValueError('Something went wrong in the task!'),)

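When failing_task raises ValueError, trio immediately cancels good_task (which is why “Good task: done” never prints) and wraps the error in an ExceptionGroup, so eg.exceptions is a tuple of the collected errors. The except* syntax (Python 3.11+) handles exception groups. On older Python versions, trio uses the ExceptionGroup class from the exceptiongroup backport package, which you can catch with a regular except clause; trio.MultiError was deprecated in trio 0.22 and later removed. The key insight is that no exception disappears silently: trio ensures every error is seen and handled.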
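
For contrast, the asyncio behaviour described at the start of this section can be reproduced with the standard library alone. This is a minimal sketch, not part of the trio example: the task fails, main() keeps running, and the error only becomes visible because the code explicitly asks the task for it.

```python
import asyncio


async def failing_task():
    raise ValueError("Something went wrong in the task!")


async def main():
    # Fire-and-forget: nothing awaits this task directly.
    task = asyncio.create_task(failing_task())
    await asyncio.sleep(0.1)  # main() carries on; no exception surfaces here
    # The error is stored on the task and stays there until someone asks.
    return task.done(), repr(task.exception())


done, error = asyncio.run(main())
print(done, error)
```

If the call to task.exception() were left out, asyncio would at most log a "Task exception was never retrieved" message when the task is garbage-collected, which is easy to miss.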


Timeouts and Cancellation Scopes

Cancellation in trio is handled through cancellation scopes. Every nursery is itself a cancellation scope. You can also create explicit scopes with trio.move_on_after() (continue after timeout) or trio.fail_after() (raise exception after timeout).

# trio_cancellation.py
import trio

async def slow_operation():
    print("Starting slow operation...")
    await trio.sleep(10)  # Would take 10 seconds
    print("This line will never print if cancelled")

async def main():
    # move_on_after: cancel the block after N seconds, then continue
    print("-- move_on_after example --")
    with trio.move_on_after(2) as cancel_scope:
        await slow_operation()

    if cancel_scope.cancelled_caught:
        print("Operation timed out -- continuing with partial result")

    # fail_after: cancel and raise TooSlowError after N seconds
    print("\n-- fail_after example --")
    try:
        with trio.fail_after(1):
            await slow_operation()
    except trio.TooSlowError:
        print("Operation failed: took too long")

trio.run(main)

Output:

-- move_on_after example --
Starting slow operation...
Operation timed out -- continuing with partial result

-- fail_after example --
Starting slow operation...
Operation failed: took too long

Use move_on_after when a timeout is acceptable — for example, fetching optional metadata that you will skip if it is slow. Use fail_after when the operation is required and a timeout means something is wrong. The cancel_scope.cancelled_caught attribute tells you whether the timeout actually fired, so you can distinguish a normal exit from a cancelled exit.

Memory Channels for Task Communication

Tasks in a nursery often need to pass data to each other. trio provides memory channels as the safe, built-in way to do this. A channel has a send end and a receive end. send() waits while the buffer is full, and receive() waits while the channel is empty. This gives you backpressure: a fast producer cannot run arbitrarily far ahead of a slow consumer.

# trio_channels.py
import trio

async def producer(send_channel, items):
    """Produce items and send them through the channel."""
    async with send_channel:
        for item in items:
            print(f"Producing: {item}")
            await send_channel.send(item)
            await trio.sleep(0.1)  # simulate work

async def consumer(receive_channel, name):
    """Receive and process items from the channel."""
    async with receive_channel:
        async for item in receive_channel:
            print(f"Consumer {name} processing: {item}")
            await trio.sleep(0.2)  # simulate processing

async def main():
    send_channel, receive_channel = trio.open_memory_channel(max_buffer_size=5)

    async with trio.open_nursery() as nursery:
        nursery.start_soon(producer, send_channel, range(6))
        nursery.start_soon(consumer, receive_channel, "A")

    print("All items processed!")

trio.run(main)

Output:

Producing: 0
Consumer A processing: 0
Producing: 1
Producing: 2
Consumer A processing: 1
Producing: 3
Producing: 4
Consumer A processing: 2
Producing: 5
Consumer A processing: 3
Consumer A processing: 4
Consumer A processing: 5
All items processed!

The async with send_channel and async with receive_channel context managers ensure the channel is properly closed when the task finishes. When the send end is closed, the receiver’s async for loop exits cleanly. Use trio.open_memory_channel(0) for a rendezvous channel (send blocks until a receiver is ready) or a positive integer for a buffered channel.


Real-Life Example: Concurrent URL Health Checker

# trio_url_checker.py
import trio
import urllib.request
import urllib.error
from dataclasses import dataclass
from typing import List

@dataclass
class HealthResult:
    url: str
    status: int = 0
    ok: bool = False
    error: str = ""
    latency_ms: float = 0.0

async def check_url(url: str, results: list, timeout: float = 5.0):
    """Check a single URL and record the result."""
    start = trio.current_time()
    try:
        # trio doesn't have built-in HTTP, use thread for blocking call
        response_code = await trio.to_thread.run_sync(
            lambda: urllib.request.urlopen(url, timeout=timeout).getcode()
        )
        latency = (trio.current_time() - start) * 1000
        results.append(HealthResult(
            url=url, status=response_code,
            ok=(200 <= response_code < 300), latency_ms=latency
        ))
    except urllib.error.HTTPError as e:
        results.append(HealthResult(url=url, status=e.code, ok=False, error=str(e)))
    except Exception as e:
        results.append(HealthResult(url=url, ok=False, error=str(e)))

async def check_all(urls: List[str], concurrency: int = 5) -> List[HealthResult]:
    """Check all URLs concurrently with a limit on parallel requests."""
    results = []
    limiter = trio.CapacityLimiter(concurrency)

    async def bounded_check(url):
        async with limiter:
            await check_url(url, results)

    async with trio.open_nursery() as nursery:
        for url in urls:
            nursery.start_soon(bounded_check, url)

    return sorted(results, key=lambda r: r.url)

async def main():
    urls = [
        "https://httpbin.org/status/200",
        "https://httpbin.org/status/404",
        "https://httpbin.org/delay/1",
        "https://jsonplaceholder.typicode.com/posts/1",
        "https://jsonplaceholder.typicode.com/users/1",
    ]

    print(f"Checking {len(urls)} URLs...\n")
    with trio.fail_after(15):
        results = await check_all(urls, concurrency=3)

    print(f"{'URL':<50} {'Status':>8} {'OK':>5} {'Latency':>10}")
    print("-" * 78)
    for r in results:
        status = r.status if r.status else "ERR"
        latency = f"{r.latency_ms:.0f}ms" if r.ok else r.error[:15]
        print(f"{r.url:<50} {str(status):>8} {'Yes' if r.ok else 'No':>5} {latency:>10}")

trio.run(main)

Output:

Checking 5 URLs...

URL                                                Status    OK    Latency
------------------------------------------------------------------------------
https://httpbin.org/delay/1                           200   Yes      1043ms
https://httpbin.org/status/200                        200   Yes        89ms
https://httpbin.org/status/404                        404    No  HTTP Error
https://jsonplaceholder.typicode.com/posts/1          200   Yes       134ms
https://jsonplaceholder.typicode.com/users/1          200   Yes       128ms

This checker runs all URL checks concurrently, limited to 3 at a time by trio.CapacityLimiter. The trio.to_thread.run_sync() call offloads the blocking HTTP call to a thread without blocking the trio event loop. The trio.fail_after(15) scope raises TooSlowError if the batch takes more than 15 seconds; because a worker thread cannot be interrupted, trio by default waits for in-flight urlopen calls to return first, which the 5-second per-request timeout keeps short. You could extend this to send Slack alerts, write results to a database, or retry failed URLs with backoff.

Frequently Asked Questions

Should I use trio or asyncio?

For new projects where you want the cleanest possible async code and do not need compatibility with existing asyncio libraries, trio is excellent. For projects that use FastAPI, aiohttp, or other asyncio-based frameworks, stick with asyncio — trio is not compatible with the asyncio event loop. The anyio library provides an abstraction that works on both trio and asyncio backends if you need portability.

How do I make HTTP requests in trio?

trio does not include an HTTP client. httpx supports trio as an async backend: install httpx in the same environment as trio and use httpx.AsyncClient() inside your trio program. For simple cases, trio.to_thread.run_sync() offloads any blocking HTTP call to a thread without blocking the event loop, as shown in the real-life example above.

How do I return values from nursery tasks?

trio tasks cannot directly return values to the nursery (unlike asyncio’s gather() which collects return values). The idiomatic approach is to pass a shared list or use a memory channel. Tasks append results to a shared list (as in the URL checker example), and the caller reads from the list after the nursery exits. Alternatively, use a send channel inside tasks and a receive loop outside the nursery.

What does “cancel-safe” mean and why does it matter?

A function is cancel-safe if it behaves correctly even when cancelled mid-execution. trio can deliver a cancellation at any checkpoint, which in practice means almost any await of a trio function. If your code holds a lock, writes to a file, or modifies shared state across multiple awaits, cancellation mid-way can leave things in an inconsistent state. trio’s built-in primitives (channels, locks, events) are cancel-safe by design. When writing your own code, avoid multi-step operations that span several awaits unless they clean up with try/finally or run inside a shielded cancel scope (trio.CancelScope(shield=True)).
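
The try/finally advice can be illustrated with a short sketch. It uses asyncio only so that it runs without extra dependencies; the same structure applies to a trio task cancelled by a cancel scope. The function name update_record and the log messages are hypothetical.

```python
import asyncio


async def update_record(log):
    log.append("lock acquired")
    try:
        await asyncio.sleep(10)        # cancellation arrives while waiting here
        log.append("record written")   # never reached in this run
    finally:
        log.append("lock released")    # runs on success, error, and cancellation


async def main():
    log = []
    task = asyncio.create_task(update_record(log))
    await asyncio.sleep(0.05)  # let the task reach its first await
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        log.append("cancelled")
    return log


events = asyncio.run(main())
print(events)
```

The cleanup step runs even though the body was interrupted half-way, which is the property a cancel-safe function needs.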

Can I use trio with regular threading?

Yes. trio.to_thread.run_sync(func) runs a blocking function in a worker thread without blocking the event loop. In the other direction, trio.from_thread.run(async_func) calls an async trio function from such a thread, and trio.from_thread.run_sync(func) calls a synchronous function in the trio thread. These bridge the sync/async boundary cleanly and are the recommended way to use blocking libraries (like database drivers or legacy HTTP clients) inside trio programs.

Conclusion

The trio library brings structured concurrency to Python async programming. You learned how nurseries guarantee task lifetime and error propagation, how cancellation scopes handle timeouts cleanly, how memory channels enable safe producer-consumer patterns, and how CapacityLimiter controls concurrency. The URL health checker showed all these concepts working together in a realistic scenario.

The structured concurrency model takes some getting used to, but the payoff is async code that behaves predictably even in error and cancellation scenarios. The next step is to convert one small asyncio program to trio and observe how the error handling and task lifetime guarantees change your debugging experience. The trio documentation is exceptionally detailed and includes explanations of the design decisions behind each API choice.

How To Use Python textdistance for String Similarity

Intermediate

Comparing strings for similarity is one of those problems that sounds simple until you actually need to do it in production. Are “colour” and “color” the same? How similar are “python” and “pyhon” after a typo? Should “McDonald’s” match “McDonalds” in a search? The naive approach — checking for exact equality — handles none of these cases. You need distance metrics: mathematical measures of how different two strings are.

The textdistance library gives you over 30 string distance and similarity algorithms in one package with a consistent API. Levenshtein, Jaro-Winkler, Hamming, Jaccard, Cosine similarity, and many more are all accessible with the same interface. You can compare the results of multiple algorithms side-by-side, choose the right one for your use case, and even use optional C-accelerated backends for performance.

This article covers how to install and use textdistance, the most important algorithms and when to use each, how to normalize scores to 0-1 similarity, how to pick the right metric for your problem, and how to build a fuzzy search system. By the end you will be able to implement string matching for autocomplete, deduplication, spell checking, and data cleaning tasks.

String Similarity with textdistance: Quick Example

Here is a minimal example comparing two strings using Levenshtein distance:

# quick_textdistance.py
import textdistance

# Levenshtein: edit distance (how many changes to go from a to b)
dist = textdistance.levenshtein("python", "pyhton")
print(f"Edit distance: {dist}")

# Normalized similarity: 0.0 (completely different) to 1.0 (identical)
sim = textdistance.levenshtein.normalized_similarity("python", "pyhton")
print(f"Similarity: {sim:.2f}")

# Compare multiple algorithms at once
algorithms = [
    textdistance.levenshtein,
    textdistance.jaro_winkler,
    textdistance.jaccard,
]
a, b = "colour", "color"
for alg in algorithms:
    sim = alg.normalized_similarity(a, b)
    print(f"{alg.__class__.__name__:20s}: {sim:.3f}")

Output:

Edit distance: 2
Similarity: 0.67
Levenshtein          : 0.833
JaroWinkler          : 0.967
Jaccard              : 0.833

The edit distance of 2 reflects that Levenshtein has no transposition operation, so the swapped “t” and “h” in “pyhton” count as two single-character edits. Normalized to 0-1, that is 1 - 2/6, or 67% similar. The sections below cover each algorithm family, when to use them, and practical application patterns.

What Is textdistance and Why Use It?

textdistance is a Python library that implements text distance algorithms in a unified interface. Every algorithm exposes the same methods: .distance(a, b) for the raw distance, .similarity(a, b) for the raw similarity, and .normalized_similarity(a, b) for a 0-1 score. This makes it easy to experiment with multiple metrics using the same code.

| Algorithm Family | Best For | Example Use Case |
|---|---|---|
| Edit distance (Levenshtein, Hamming) | Typos, OCR errors | Spell checking, fuzzy search |
| Sequence (Jaro, Jaro-Winkler) | Name matching | Deduplicating customer records |
| Token (Jaccard, Sorensen) | Document similarity | Detecting duplicate content |
| Phonetic (Soundex, Metaphone) | Sound-alike matching | Name search ignoring spelling |
| Compression-based (NCD) | Arbitrary similarity | Language detection |

Install with pip:

# pip install textdistance

import textdistance
print(textdistance.__version__)
4.6.3

Edit Distance Algorithms

Edit distance measures how many character-level operations are needed to transform one string into another. Levenshtein allows insertions, deletions, and substitutions. Hamming only counts substitutions (strings must be the same length). Damerau-Levenshtein also allows transpositions (swapping adjacent characters), making it better for catching typing errors.

# textdistance_edit.py
import textdistance

pairs = [
    ("kitten", "sitting"),    # classic example
    ("python", "pyhton"),     # transposition typo
    ("colour", "color"),      # British/American spelling
    ("algorithm", "altgorithm"),  # insertion typo
]

print(f"{'Pair':<35} {'Levenshtein':>12} {'Damerau':>10} {'Similarity':>12}")
print("-" * 72)
for a, b in pairs:
    lev = textdistance.levenshtein.distance(a, b)
    dam = textdistance.damerau_levenshtein.distance(a, b)
    sim = textdistance.levenshtein.normalized_similarity(a, b)
    print(f"'{a}' vs '{b}'{'':>{35 - len(a) - len(b) - 9}} {lev:>12} {dam:>10} {sim:>12.3f}")

Output:

Pair                                Levenshtein    Damerau   Similarity
------------------------------------------------------------------------
'kitten' vs 'sitting'                         3          3        0.571
'python' vs 'pyhton'                          2          1        0.667
'colour' vs 'color'                           1          1        0.833
'algorithm' vs 'altgorithm'                   1          1        0.900

Notice how Damerau-Levenshtein scores “pyhton” as distance 1 (one transposition) while Levenshtein scores it as 2 (one deletion + one insertion). For spell checking where typos often involve swapped adjacent characters, Damerau-Levenshtein is the better choice. For OCR correction where characters are independently misread (not swapped), standard Levenshtein is usually sufficient.
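
To see where the Levenshtein numbers come from, the distance can be computed by hand with the classic dynamic-programming recurrence. This is a plain-Python sketch for illustration, not how textdistance is implemented internally; the function names are chosen here.

```python
def levenshtein(a: str, b: str) -> int:
    """Edit distance allowing insertions, deletions, and substitutions."""
    previous = list(range(len(b) + 1))  # distances from "" to each prefix of b
    for i, ch_a in enumerate(a, start=1):
        current = [i]  # distance from a[:i] to ""
        for j, ch_b in enumerate(b, start=1):
            cost = 0 if ch_a == ch_b else 1
            current.append(min(
                previous[j] + 1,          # delete ch_a
                current[j - 1] + 1,       # insert ch_b
                previous[j - 1] + cost,   # substitute, or keep on a match
            ))
        previous = current
    return previous[-1]


def normalized_similarity(a: str, b: str) -> float:
    """Scale the distance by the longer length to get a 0-1 score."""
    longest = max(len(a), len(b))
    return 1.0 if longest == 0 else 1 - levenshtein(a, b) / longest


for a, b in [("kitten", "sitting"), ("python", "pyhton"), ("colour", "color")]:
    print(a, b, levenshtein(a, b), round(normalized_similarity(a, b), 3))
```

Because the recurrence has no transposition step, a swapped pair of adjacent characters always costs 2 here, which is exactly the gap Damerau-Levenshtein closes.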

Jaro-Winkler for Name Matching

Jaro-Winkler is designed for short strings such as personal names. It gives extra weight to a shared prefix (up to four characters), on the assumption that typos are less common at the start of a name. It returns a value between 0 and 1 directly (no normalization needed).

# textdistance_jaro.py
import textdistance

# Name matching examples
name_pairs = [
    ("John Smith", "Jon Smith"),      # first name variant
    ("McDonald", "MacDonald"),         # prefix variant
    ("Williams", "Williamson"),        # suffix addition
    ("Catherine", "Katherine"),        # spelling variant
    ("Robert", "Bob"),                 # nickname (low similarity expected)
]

jaro = textdistance.jaro
jw = textdistance.jaro_winkler

print(f"{'Name pair':<40} {'Jaro':>8} {'JaroWinkler':>13}")
print("-" * 64)
for a, b in name_pairs:
    print(f"'{a}' vs '{b}'"[:40].ljust(40),
          f"{jaro.normalized_similarity(a, b):>8.3f}",
          f"{jw.normalized_similarity(a, b):>13.3f}")

Output:

Name pair                                    Jaro   JaroWinkler
----------------------------------------------------------------
'John Smith' vs 'Jon Smith'                 0.967         0.973
'McDonald' vs 'MacDonald'                   0.963         0.967
'Williams' vs 'Williamson'                  0.933         0.960
'Catherine' vs 'Katherine'                  0.926         0.926
'Robert' vs 'Bob'                           0.667         0.667

Jaro-Winkler rates “McDonald” and “MacDonald” as very similar (0.967) and lifts “Williams” vs “Williamson” from 0.933 to 0.960 because of the shared prefix, while “Robert” vs “Bob” scores far lower (0.667). “Catherine” vs “Katherine” gets no boost because the first characters differ. This matches human intuition about spelling variants, though not about nicknames. For customer record deduplication, a threshold of 0.90+ on Jaro-Winkler is a common starting point for flagging likely duplicates for human review.
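
The prefix boost is easier to see with the arithmetic written out. The following is a plain-Python sketch of Jaro and Jaro-Winkler as commonly defined (matching window of half the longer length minus one, prefix weight 0.1, boost applied only above 0.7); it is meant as an illustration, not as the library's own code.

```python
def jaro(s1: str, s2: str) -> float:
    if s1 == s2:
        return 1.0
    len1, len2 = len(s1), len(s2)
    if not len1 or not len2:
        return 0.0
    window = max(max(len1, len2) // 2 - 1, 0)
    matched1 = [False] * len1
    matched2 = [False] * len2
    matches = 0
    for i, ch in enumerate(s1):
        lo = max(0, i - window)
        hi = min(i + window + 1, len2)
        for j in range(lo, hi):
            if not matched2[j] and s2[j] == ch:
                matched1[i] = matched2[j] = True
                matches += 1
                break
    if matches == 0:
        return 0.0
    # Matched characters that line up in a different order are transpositions.
    seq1 = [c for c, m in zip(s1, matched1) if m]
    seq2 = [c for c, m in zip(s2, matched2) if m]
    transpositions = sum(c1 != c2 for c1, c2 in zip(seq1, seq2)) // 2
    return (matches / len1 + matches / len2
            + (matches - transpositions) / matches) / 3


def jaro_winkler(s1: str, s2: str, prefix_weight: float = 0.1) -> float:
    score = jaro(s1, s2)
    if score <= 0.7:  # dissimilar strings get no prefix boost
        return score
    prefix = 0
    for c1, c2 in zip(s1[:4], s2[:4]):  # at most four prefix characters count
        if c1 != c2:
            break
        prefix += 1
    return score + prefix * prefix_weight * (1 - score)


print(round(jaro("MARTHA", "MARHTA"), 3), round(jaro_winkler("MARTHA", "MARHTA"), 3))
```

The boost closes a fraction of the remaining gap to 1.0, so it matters most for pairs that are already close and share their first few letters.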


Token-Based Similarity for Longer Text

For longer strings — sentences, documents, product descriptions — character-level distance metrics become slow and less meaningful. Token-based metrics split strings into words (or character n-grams) and measure set overlap. Jaccard similarity is the most common: it divides the size of the intersection by the size of the union.

# textdistance_jaccard.py
import textdistance

# Product description deduplication
desc1 = "Python programming for beginners complete guide 2024"
desc2 = "Complete Python programming guide for beginners 2024"
desc3 = "Java programming for beginners complete guide 2024"
desc4 = "Cooking pasta with fresh tomatoes Italian style"

pairs = [
    (desc1, desc2, "reordered words"),
    (desc1, desc3, "different language"),
    (desc1, desc4, "different topic"),
]

# qval=None splits on whitespace (word tokens); the default qval=1 compares characters
jac = textdistance.Jaccard(qval=None)
cos = textdistance.Cosine(qval=None)

print(f"{'Comparison':<25} {'Jaccard':>10} {'Cosine':>10}")
print("-" * 47)
for a, b, label in pairs:
    a, b = a.lower(), b.lower()  # compare tokens case-insensitively
    print(f"{label:<25} {jac.normalized_similarity(a, b):>10.3f} {cos.normalized_similarity(a, b):>10.3f}")

Output:

Comparison                   Jaccard     Cosine
-----------------------------------------------
reordered words                1.000      1.000
different language             0.750      0.857
different topic                0.000      0.000

With word tokens and lowercasing, the reordered description scores 1.000 (same words, different order), the Java version shares six of eight distinct words (Jaccard 0.750), and the pasta description shares nothing. Note that the module-level textdistance.jaccard and textdistance.cosine use qval=1 and would compare characters instead of words. For content deduplication or finding near-duplicate product listings, a token-based threshold around 0.7-0.8 is a reasonable starting point; validate it against your own data.
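
The set-overlap arithmetic behind these scores is short enough to write out. This is a plain-Python sketch over word tokens using made-up example strings; the "cosine" here is the overlap divided by the geometric mean of the two token counts, and the helper names are chosen for illustration.

```python
import math
from collections import Counter


def word_counts(text: str) -> Counter:
    """Lowercase and split on whitespace, keeping repeated words."""
    return Counter(text.lower().split())


def jaccard(a: str, b: str) -> float:
    ca, cb = word_counts(a), word_counts(b)
    union = sum((ca | cb).values())   # max count per word
    shared = sum((ca & cb).values())  # min count per word
    return shared / union if union else 1.0


def cosine(a: str, b: str) -> float:
    ca, cb = word_counts(a), word_counts(b)
    denom = math.sqrt(sum(ca.values()) * sum(cb.values()))
    shared = sum((ca & cb).values())
    return shared / denom if denom else 1.0


a = "Red apple pie"
b = "apple pie recipe"
print(round(jaccard(a, b), 3), round(cosine(a, b), 3))
```

Two of the four distinct words are shared, so Jaccard is 2/4, while the cosine-style score divides the same overlap by 3 and comes out higher.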

Choosing the Right Algorithm

The most common mistake when implementing fuzzy matching is picking the most well-known algorithm (usually Levenshtein) for every use case. Here is a practical guide:

| Problem | Recommended Algorithm | Why |
|---|---|---|
| Spell checking | Damerau-Levenshtein | Catches transpositions (common typos) |
| Name deduplication | Jaro-Winkler | Prefix-weighted, good for names |
| Product search | Jaro-Winkler or Jaccard | Handles abbreviations and word order |
| Document similarity | Jaccard or Cosine | Token-based, order-independent |
| DNA/code sequences | Hamming | Fixed-length substitution counting |
| Phonetic matching | Soundex or Metaphone | Sound-alike rather than spelling |

When in doubt, compare multiple algorithms on a sample of your actual data and measure how often each one agrees with a human judgment. No algorithm is universally best — the right choice depends on your specific strings, language, and error patterns.

Real-Life Example: Fuzzy Product Search Engine

# fuzzy_search.py
import textdistance

# Product catalog
CATALOG = [
    "Python Programming Complete Guide",
    "Learning JavaScript for Beginners",
    "Data Science with Pandas and NumPy",
    "Machine Learning Fundamentals",
    "Web Development with Flask",
    "Advanced SQL Database Design",
    "Docker and Kubernetes DevOps",
    "React Frontend Development",
]

def fuzzy_search(query, catalog, threshold=0.4, top_n=3):
    """Find catalog items similar to query using multiple algorithms."""
    results = []
    for item in catalog:
        # Jaro-Winkler: character-level similarity over the whole string
        jw_sim = textdistance.jaro_winkler.normalized_similarity(
            query.lower(), item.lower()
        )
        # Jaccard: with the default qval=1 this compares character multisets;
        # use textdistance.Jaccard(qval=None) to compare word tokens instead
        jac_sim = textdistance.jaccard.normalized_similarity(
            query.lower(), item.lower()
        )
        # Average the two scores
        combined = (jw_sim + jac_sim) / 2
        if combined >= threshold:
            results.append((item, combined))

    # Sort by score descending
    results.sort(key=lambda x: x[1], reverse=True)
    return results[:top_n]

# Test with various query types
queries = [
    "pythn programming",      # typo
    "machne learning",         # typo
    "docker kubernetes",       # exact keywords
    "sql databse",             # typo
]

for query in queries:
    print(f"\nQuery: '{query}'")
    matches = fuzzy_search(query, CATALOG)
    if matches:
        for item, score in matches:
            print(f"  {score:.3f}  {item}")
    else:
        print("  No matches found")

Output:

Query: 'pythn programming'
  0.812  Python Programming Complete Guide

Query: 'machne learning'
  0.734  Machine Learning Fundamentals

Query: 'docker kubernetes'
  0.821  Docker and Kubernetes DevOps

Query: 'sql databse'
  0.681  Advanced SQL Database Design

The combined score averages two views of the same pair. Jaro-Winkler rewards strings that agree character by character from the start, which tolerates typos. Jaccard measures overlap, here over characters, because the module-level textdistance.jaccard uses the default qval=1; construct textdistance.Jaccard(qval=None) if you want word-token overlap instead. Adjust the threshold parameter to trade off between recall (catching more but with more false positives) and precision (fewer results but more accurate). For production, you would pre-compute scores against the full catalog and cache results for common queries.

Figure: Loop Larry types a misspelled query and sees the fuzzy matches. fuzzy_search(‘pythn’): because users never spell things right, and that’s fine.

Frequently Asked Questions

Is textdistance fast enough for large datasets?

It depends on the scale. For searching a catalog of a few thousand items, computing distances in pure Python is usually fast enough. textdistance can also use C-accelerated backends like python-Levenshtein or jellyfish if they are installed, so install them alongside textdistance for a significant speedup on larger comparisons. For millions of comparisons (e.g., deduplicating a full customer database), use vectorized approaches with libraries like PolyFuzz, or pre-index with a tool like Elasticsearch that has built-in fuzzy matching.

How do I choose the right similarity threshold?

There is no universal threshold — it depends on your data and algorithm. The practical approach: collect a sample of 50-100 pairs that should match and 50-100 that should not, compute similarity scores, and plot the distribution. The ideal threshold sits in the gap between the two distributions. Start at 0.85 for name matching with Jaro-Winkler, 0.70 for document similarity with Jaccard, and 0.80 for general fuzzy search. Adjust based on how many false positives and false negatives you observe.
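The procedure above can be sketched in a few lines. This is one possible way to turn labeled scores into a threshold, not a standard recipe: pick_threshold is a hypothetical helper, and the score lists are made-up numbers used purely for illustration. It tries every observed score as a candidate threshold and keeps the one that misclassifies the fewest pairs.

```python
def pick_threshold(match_scores, non_match_scores):
    """Return (threshold, errors) minimizing misclassified labeled pairs.

    A pair counts as a predicted match when its score >= threshold.
    Ties go to the lowest candidate, which favors recall.
    """
    candidates = sorted(set(match_scores) | set(non_match_scores))
    best_threshold, best_errors = None, None
    for threshold in candidates:
        false_negatives = sum(score < threshold for score in match_scores)
        false_positives = sum(score >= threshold for score in non_match_scores)
        errors = false_negatives + false_positives
        if best_errors is None or errors < best_errors:
            best_threshold, best_errors = threshold, errors
    return best_threshold, best_errors


# Made-up similarity scores for illustration only.
should_match = [0.91, 0.88, 0.95, 0.79, 0.86]
should_not_match = [0.42, 0.65, 0.58, 0.81, 0.30]

threshold, errors = pick_threshold(should_match, should_not_match)
print(f"threshold={threshold}, misclassified={errors}")
```

Because the two distributions overlap here (a non-match scored 0.81 while a true match scored 0.79), no threshold is error-free. If false positives cost more than false negatives in your application, weight the two error counts differently instead of summing them.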

When should I use phonetic algorithms like Soundex?

Use phonetic algorithms when you want to match strings that sound the same regardless of spelling. “Smith” and “Smyth” are phonetically identical, and “Cathy” and “Kathy” sound alike. Phonetic matching is valuable for name search in databases where historical records have inconsistent spelling, or in voice-to-text applications where the input is a transcription. textdistance's own phonetic measures are MRA and Editex; for Soundex, Metaphone, and NYSIIS codes, use a library such as jellyfish.
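To make the idea concrete, here is a simplified, ASCII-oriented sketch of the classic American Soundex encoding in plain Python. It is written for illustration only and is not the implementation used by any particular library; the soundex function and the _CODES table are defined here.

```python
# Consonant groups of classic Soundex; vowels and Y map to no code at all.
_CODES = {
    letter: digit
    for digit, letters in {
        "1": "BFPV", "2": "CGJKQSXZ", "3": "DT",
        "4": "L", "5": "MN", "6": "R",
    }.items()
    for letter in letters
}


def soundex(name):
    """Return a 4-character Soundex code (simplified sketch)."""
    letters = [ch for ch in name.upper() if ch.isalpha()]
    if not letters:
        return ""
    first = letters[0]
    digits = []
    previous = _CODES.get(first, "")
    for ch in letters[1:]:
        if ch in "HW":
            continue  # H and W do not separate letters with the same code
        code = _CODES.get(ch, "")
        if code and code != previous:
            digits.append(code)
        previous = code  # a vowel resets this, so a repeated code counts again
    return (first + "".join(digits) + "000")[:4]


for name in ("Smith", "Smyth", "Cathy", "Kathy"):
    print(name, soundex(name))
```

Smith and Smyth both encode to S530. Cathy and Kathy encode to C300 and K300, because Soundex always keeps the first letter; matching that pair needs an algorithm that also normalizes the leading sound, which is one reason to try more than one phonetic scheme.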

Does textdistance work with non-English text?

Yes. Edit distance algorithms (Levenshtein, Hamming) work on any Unicode string regardless of language. Token-based algorithms (Jaccard, Cosine) compare whatever units the qval parameter selects: in textdistance the default qval=1 compares individual characters, qval=2 or higher compares character n-grams, and qval=None splits on whitespace to compare words. Word splitting works for space-separated languages but not for languages like Chinese or Japanese that do not use spaces. For those languages, stay with character n-grams: pass qval=1 or qval=2 to the constructor to compare character unigrams or bigrams instead of words.
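The difference between word tokens and character n-grams is easy to see in plain Python. The sketch below uses set-based Jaccard on character bigrams; char_ngrams, ngram_jaccard, and word_jaccard are illustrative helpers defined here, not textdistance functions, and the library's own scores may differ because it can count repeated elements.

```python
def char_ngrams(text, n=2):
    """Return the set of character n-grams in text, ignoring whitespace."""
    text = "".join(text.split())
    if len(text) < n:
        return {text} if text else set()
    return {text[i:i + n] for i in range(len(text) - n + 1)}


def ngram_jaccard(a, b, n=2):
    """Set-based Jaccard similarity over character n-grams."""
    grams_a, grams_b = char_ngrams(a, n), char_ngrams(b, n)
    if not grams_a and not grams_b:
        return 1.0
    return len(grams_a & grams_b) / len(grams_a | grams_b)


def word_jaccard(a, b):
    """Set-based Jaccard similarity over whitespace-separated words."""
    words_a, words_b = set(a.split()), set(b.split())
    if not words_a and not words_b:
        return 1.0
    return len(words_a & words_b) / len(words_a | words_b)


a, b = "東京都渋谷区", "東京都新宿区"  # two Tokyo ward names, no spaces
print(word_jaccard(a, b))   # each string is a single "word", so no overlap
print(ngram_jaccard(a, b))  # the shared bigrams 東京 and 京都 are detected
```

With whitespace splitting, the two addresses share nothing and score 0.0; with bigrams they share 2 of 8 distinct bigrams and score 0.25, which reflects the common prefix.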

What is the difference between similarity and normalized similarity?

.similarity() returns a raw score whose range depends on the algorithm (Levenshtein returns an integer; Jaro returns 0-1). .normalized_similarity() always returns a float between 0.0 and 1.0, where 1.0 means identical. Use normalized similarity when you want to compare scores across different algorithms or set a consistent threshold. Use the raw .distance() value when you need the actual edit count for display or downstream calculations.
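A small worked example shows how the raw and normalized values relate for Levenshtein. The sketch below implements the distance in plain Python and assumes the common convention that the normalized similarity is 1 minus the distance divided by the longer string's length; levenshtein_distance and normalized_similarity are defined here for illustration and are not the library's code.

```python
def levenshtein_distance(a, b):
    """Raw edit count: insertions, deletions, and substitutions."""
    previous = list(range(len(b) + 1))
    for i, char_a in enumerate(a, start=1):
        current = [i]
        for j, char_b in enumerate(b, start=1):
            cost = 0 if char_a == char_b else 1
            current.append(min(
                previous[j] + 1,         # delete char_a
                current[j - 1] + 1,      # insert char_b
                previous[j - 1] + cost,  # substitute, or keep when equal
            ))
        previous = current
    return previous[-1]


def normalized_similarity(a, b):
    """Scale the distance into [0.0, 1.0], where 1.0 means identical."""
    longest = max(len(a), len(b))
    if longest == 0:
        return 1.0  # two empty strings are identical
    return 1 - levenshtein_distance(a, b) / longest


distance = levenshtein_distance("kitten", "sitting")
print(distance)                                               # 3 edits
print(round(normalized_similarity("kitten", "sitting"), 3))   # 1 - 3/7 = 0.571
```

The raw value (3) is what you would show a user as "three edits away", while the normalized value (about 0.571) is what you would compare against a threshold shared with other algorithms.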

Conclusion

The textdistance library gives you over 30 string similarity algorithms with a consistent, easy-to-use interface. You learned how to use edit distance algorithms for spell checking, Jaro-Winkler for name matching, Jaccard for document similarity, and how to combine multiple metrics for robust fuzzy search. The product search example showed how to build a practical fuzzy search engine in under 30 lines of Python.

The next step is to apply textdistance to a real deduplication or search problem in your own data. Start by sampling your data, trying two or three algorithms, and measuring which one best matches your human judgment on what should and should not match. The textdistance documentation on GitHub includes a comparison table of all 30+ algorithms with complexity information and typical use cases.