How To Use Python chardet for Character Encoding Detection

Intermediate

If you have ever opened a CSV file and seen garbled characters like â€™ or Ã© where apostrophes and accented letters should be, you have hit an encoding problem. Character encoding is one of the most frustrating silent failures in data pipelines: files load without errors but the content is wrong. Most modern tooling assumes UTF-8, but a large share of legacy files, especially from Windows systems, are encoded in latin-1, Windows-1252, or Shift-JIS. The chardet library guesses the encoding from the bytes themselves, so you can decode these files without trial and error.

chardet is a port of the encoding detection algorithm from Mozilla Firefox. It analyzes the byte patterns in your data and returns the most likely encoding along with a confidence score. You install it with a single pip command and use it with just a few lines of code. No manual byte inspection needed.

In this article, you will learn how to detect encodings with chardet, use it to open files correctly, detect encodings in bulk across directories, build a file encoding converter, and handle the edge cases that trip up beginners. By the end, you will have a reliable toolkit for dealing with any encoding problem you encounter in real data work.

Detecting Encoding: Quick Example

# detect_encoding.py
import chardet

# Detect encoding of a byte string
data = b'\xc3\xa9l\xc3\xa8ve'  # "élève" encoded as UTF-8
result = chardet.detect(data)
print(result)

# Decode it correctly using the detected encoding
text = data.decode(result['encoding'])
print("Decoded text:", text)

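Output (the confidence value is illustrative; it varies with the chardet version and the length of the input):

{'encoding': 'utf-8', 'confidence': 0.99, 'language': ''}
Decoded text: élève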

The detect function returns a dict with three keys: encoding (the detected charset), confidence (0.0 to 1.0 — how certain chardet is), and language (a hint for multi-byte encodings like Chinese or Japanese). The confidence score is your reliability signal — anything above 0.8 is generally trustworthy for western encodings.
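As a minimal sketch of how the confidence score might be used, the helper below picks an encoding from a dict shaped like the detect() result. The function name pick_encoding, the 0.8 threshold, and the utf-8 fallback are assumptions made for illustration, not part of chardet, and the sample dicts are made up rather than real detection results.

```python
def pick_encoding(result: dict, min_confidence: float = 0.8,
                  fallback: str = "utf-8") -> str:
    """Return the detected encoding if it looks trustworthy, else the fallback."""
    encoding = result.get("encoding")
    confidence = result.get("confidence") or 0.0
    if encoding is None or confidence < min_confidence:
        return fallback
    return encoding


# Dicts shaped like chardet.detect() output (hypothetical values).
confident = {"encoding": "Windows-1252", "confidence": 0.95, "language": ""}
doubtful = {"encoding": "ISO-8859-1", "confidence": 0.55, "language": ""}
undetected = {"encoding": None, "confidence": 0.0, "language": None}

for sample in (confident, doubtful, undetected):
    print(sample["encoding"], "->", pick_encoding(sample))
```

Keeping the threshold in one place makes it easy to tighten or loosen later without touching the code that reads files.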

What Is Character Encoding and Why Does It Break?

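A character encoding is a mapping between characters (letters, symbols, emoji) and bytes. UTF-8 is the modern universal standard: it can encode every Unicode character, and it is Python 3's default for source files and for str.encode() and bytes.decode(), although open() without an explicit encoding argument has historically used the platform's locale encoding (often cp1252 on Windows). Hundreds of older encodings also exist, each covering a subset of characters in its own byte layout. When software writes a file in Windows-1252 but Python reads it as UTF-8, some byte sequences are invalid UTF-8 and Python raises a UnicodeDecodeError. The reverse mismatch is worse: reading UTF-8 bytes as Windows-1252 or latin-1 usually succeeds and silently produces the wrong characters.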
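The two failure modes can be reproduced with the standard library alone. The sketch below uses the sample word "café" (chosen here for illustration) to show one mismatch that fails loudly and one that fails silently.

```python
original = "café"

# Case 1: bytes written as Windows-1252, read as UTF-8 -> loud failure.
cp1252_bytes = original.encode("cp1252")  # b'caf\xe9'
try:
    cp1252_bytes.decode("utf-8")
    loud_failure = False
except UnicodeDecodeError:
    loud_failure = True

# Case 2: bytes written as UTF-8, read as Windows-1252 -> silent mojibake.
utf8_bytes = original.encode("utf-8")  # b'caf\xc3\xa9'
mojibake = utf8_bytes.decode("cp1252")

print("UTF-8 decode raised an error:", loud_failure)
print("cp1252 decode returned:", mojibake)
```

The second case is the dangerous one: no exception is raised, so the corrupted text flows on into whatever consumes it.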

| Encoding | Common Source | Problem Symptom |
| --- | --- | --- |
| UTF-8 | Linux, Mac, modern Windows | Usually fine; Python default |
| Windows-1252 (cp1252) | Windows apps, Excel exports | Smart quotes, euro sign garbled |
| latin-1 (iso-8859-1) | Western European legacy systems | Accented characters broken |
| Shift-JIS / EUC-JP | Japanese systems | Japanese text appears as symbols |
| GB2312 / GBK | Chinese systems | Chinese text scrambled |

chardet solves this by treating encoding detection as a statistical problem — it looks at byte frequency distributions and byte-order marks (BOMs) to make an educated guess. It works best with longer strings (100+ bytes) and can struggle with very short snippets that do not contain enough pattern data.

Opening Files with Auto-Detected Encoding

The most common real-world use is opening a file whose encoding you do not know. Read the raw bytes, detect the encoding, then decode:

# open_detected.py
import chardet

def read_file_auto(filepath: str) -> str:
    """Read a text file with automatic encoding detection."""
    with open(filepath, 'rb') as f:  # Read as raw bytes
        raw = f.read()

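    result = chardet.detect(raw)
    encoding = result['encoding']
    confidence = result['confidence']

    if encoding is None:
        # chardet returns None when it cannot guess (e.g. empty or binary data)
        print("Warning: no encoding detected, falling back to utf-8")
        encoding = 'utf-8'
    else:
        if confidence < 0.7:
            print(f"Warning: Low confidence ({confidence:.0%}) for {encoding}")
        print(f"Detected: {encoding} ({confidence:.0%} confidence)")

    return raw.decode(encoding, errors='replace')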

# Test with a windows-1252 encoded file
text = read_file_auto('legacy_report.csv')
print(text[:100])

Output:

Detected: Windows-1252 (98% confidence)
Name,Department,Salary
Francois Dupont,Finance,52000
Rene Müller,Engineering,67000

The errors='replace' argument in decode substitutes the Unicode replacement character (U+FFFD) for any bytes that still cannot be decoded, instead of raising an error. This is a safe fallback for production pipelines where you cannot afford crashes, though you should log any replacement occurrences for later review.
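One way to make those replacements visible is to count U+FFFD characters after decoding. This is a minimal sketch: the helper name decode_and_count is hypothetical, and it assumes the source text does not legitimately contain U+FFFD, which would be counted as well.

```python
REPLACEMENT_CHAR = "\ufffd"  # U+FFFD, inserted by errors='replace'


def decode_and_count(raw: bytes, encoding: str) -> tuple[str, int]:
    """Decode raw bytes and return (text, number of replacement characters)."""
    text = raw.decode(encoding, errors="replace")
    return text, text.count(REPLACEMENT_CHAR)


# b'\xe9' is 'é' in cp1252 but an invalid sequence in UTF-8.
text, lost = decode_and_count(b"caf\xe9 menu", "utf-8")
if lost:
    print(f"Warning: {lost} undecodable byte sequence(s) replaced")
```

In a pipeline, the count could go to a log alongside the file name so that damaged files can be reviewed later.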

Incremental Detection for Large Files

For large files, loading everything into memory just to detect encoding is wasteful. chardet's UniversalDetector lets you feed data in chunks and stop as soon as confidence is high enough:

# incremental_detect.py
from chardet.universaldetector import UniversalDetector

def detect_encoding_streaming(filepath: str, chunk_size: int = 4096) -> dict:
    """Detect encoding of a large file without loading it all into memory."""
    detector = UniversalDetector()
    with open(filepath, 'rb') as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            detector.feed(chunk)
            if detector.done:  # Confident enough -- stop early
                break
    detector.close()
    return detector.result

result = detect_encoding_streaming('large_log_file.txt')
print(f"Encoding: {result['encoding']}, Confidence: {result['confidence']:.1%}")

Output:

Encoding: UTF-8-SIG, Confidence: 100.0%

UTF-8-SIG means the file has a UTF-8 byte-order mark (BOM) -- a three-byte sequence at the start (\xef\xbb\xbf) that some Windows tools add. Python's utf-8-sig codec strips the BOM automatically when decoding, which is almost always what you want. Without chardet, you would have to check for the BOM manually.
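The difference between the two codecs is easy to see with a few bytes built by hand. The sketch below uses only the standard library; the sample CSV content and the helper name has_utf8_bom are made up for illustration.

```python
import codecs

# A tiny CSV payload with a UTF-8 BOM in front, as some Windows tools write it.
raw = codecs.BOM_UTF8 + "id,name\n1,Ana\n".encode("utf-8")

plain = raw.decode("utf-8")      # BOM survives as the character U+FEFF
clean = raw.decode("utf-8-sig")  # BOM is stripped

print(repr(plain[:3]))
print(repr(clean[:3]))


def has_utf8_bom(data: bytes) -> bool:
    """Manual BOM check: does the data start with EF BB BF?"""
    return data.startswith(codecs.BOM_UTF8)


print(has_utf8_bom(raw))
```

Decoding BOM-free data with utf-8-sig is also safe, because the codec only strips the marker when it is present.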

Bulk Encoding Converter

When cleaning up a directory of legacy files, you often need to convert them all to UTF-8. This script scans a folder, detects each file's encoding, and converts non-UTF-8 files in place:

# bulk_convert.py
import chardet
from pathlib import Path

def convert_to_utf8(folder: str, extensions: list) -> None:
    """Convert all matching files in a folder to UTF-8."""
    base = Path(folder)
    converted = 0
    skipped = 0
    errors = 0

    for ext in extensions:
        for filepath in base.rglob(f'*.{ext}'):
            try:
                raw = filepath.read_bytes()
                result = chardet.detect(raw)
                encoding = result['encoding']

                if encoding is None:
                    print(f"Could not detect: {filepath.name}")
                    errors += 1
                    continue

                if encoding.lower().replace('-', '') in ('utf8', 'utf8sig', 'ascii'):
                    skipped += 1
                    continue  # Already UTF-8 compatible

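                # Decode with detected encoding, re-encode as UTF-8.
                # write_bytes avoids the newline translation that text-mode
                # writing can apply (e.g. \r\n becoming \r\r\n on Windows).
                text = raw.decode(encoding, errors='replace')
                filepath.write_bytes(text.encode('utf-8'))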
                print(f"Converted {filepath.name}: {encoding} -> UTF-8")
                converted += 1

            except Exception as e:
                print(f"Error processing {filepath.name}: {e}")
                errors += 1

    print(f"\nDone: {converted} converted, {skipped} skipped, {errors} errors")

convert_to_utf8('./data', extensions=['csv', 'txt', 'log'])

Output:

Converted customers.csv: Windows-1252 -> UTF-8
Converted old_report.txt: ISO-8859-1 -> UTF-8

Done: 2 converted, 13 skipped, 0 errors

Real-Life Example: CSV Pipeline with Encoding Validation

This pipeline safely reads CSV files from multiple sources, handles encoding detection, logs any suspicious files, and outputs a clean UTF-8 combined CSV:

# csv_pipeline.py
import chardet
import csv
import io
from pathlib import Path
from dataclasses import dataclass

@dataclass
class FileReport:
    path: str
    encoding: str
    confidence: float
    row_count: int
    warning: str = ''

def process_csv_folder(input_folder: str, output_file: str) -> list:
    reports = []
    all_rows = []
    headers = None

    for csv_path in Path(input_folder).glob('*.csv'):
        raw = csv_path.read_bytes()
        result = chardet.detect(raw)
        encoding = result['encoding'] or 'utf-8'
        confidence = result['confidence']

        warning = ''
        if confidence < 0.8:
            warning = f'Low confidence ({confidence:.0%})'

        try:
            text = raw.decode(encoding, errors='replace')
            reader = csv.DictReader(io.StringIO(text))
            rows = list(reader)

            if headers is None and rows:
                headers = reader.fieldnames

            all_rows.extend(rows)
            reports.append(FileReport(
                path=csv_path.name,
                encoding=encoding,
                confidence=confidence,
                row_count=len(rows),
                warning=warning
            ))
        except Exception as e:
            reports.append(FileReport(
                path=csv_path.name,
                encoding=encoding,
                confidence=confidence,
                row_count=0,
                warning=f'Parse error: {e}'
            ))

    # Write combined output as UTF-8
    if all_rows and headers:
        with open(output_file, 'w', encoding='utf-8', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=headers)
            writer.writeheader()
            writer.writerows(all_rows)

    return reports

reports = process_csv_folder('./input_csvs', 'combined_output.csv')
for r in reports:
    status = f"WARNING: {r.warning}" if r.warning else "OK"
    print(f"{r.path}: {r.encoding} ({r.confidence:.0%}) | {r.row_count} rows | {status}")

Output:

sales_q1.csv: Windows-1252 (96%) | 142 rows | OK
customers.csv: UTF-8 (99%) | 89 rows | OK
legacy_data.csv: ISO-8859-1 (73%) | 31 rows | WARNING: Low confidence (73%)

Frequently Asked Questions

What if chardet detects the wrong encoding?

chardet is statistical, not deterministic -- it can be wrong, especially for short strings or files with predominantly ASCII content (since ASCII is a subset of many encodings, chardet cannot distinguish them). If you know the likely source of your files (e.g., all from a Windows 10 system in Germany), you can use the detected encoding as a hint and fall back to cp1252 or iso-8859-1 if the result looks wrong. Always validate the decoded output visually for critical data.
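A fallback chain like the one described might look like the sketch below. It uses only the standard library; the helper name decode_with_fallbacks and the candidate order (utf-8, then cp1252, then latin-1) are assumptions suited to Western European data, not a chardet feature.

```python
def decode_with_fallbacks(raw: bytes,
                          candidates=("utf-8", "cp1252", "latin-1")) -> tuple[str, str]:
    """Try each candidate encoding in order; return (text, encoding_used)."""
    for encoding in candidates:
        try:
            return raw.decode(encoding), encoding
        except UnicodeDecodeError:
            continue
    # latin-1 maps every byte value, so it always succeeds as a last resort.
    return raw.decode("latin-1"), "latin-1"


text, used = decode_with_fallbacks("Müller".encode("cp1252"))
print(used, text)
```

Strict UTF-8 goes first because it rejects most non-UTF-8 input, while latin-1 goes last because it accepts anything and therefore proves nothing.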

What confidence threshold should I use?

A confidence above 0.9 is generally reliable for common western encodings. For UTF-8 and ASCII, you typically see 0.99+. For ambiguous encodings like Windows-1252 vs latin-1 (which share most of their byte space), confidence may be 0.7-0.85. For production pipelines, log any detection below 0.85 and manually review those files. For interactive tools, warn the user and offer to retry with a different encoding.

Why does chardet fail on very short strings?

Encoding detection is statistical -- it needs enough bytes to observe characteristic patterns. A 10-byte string like a name or short code may not contain enough distinguishing bytes. As a rule of thumb, chardet needs at least 100-200 bytes for reliable detection, and 1KB+ for high confidence with multi-byte encodings. For short strings, you are better off knowing the source system's encoding and hardcoding it.

What is a BOM and should I remove it?

A Byte Order Mark (BOM) is a special marker at the start of a file that identifies the encoding. UTF-8-SIG files have a 3-byte BOM that tools such as Excel add when saving CSV files. Python's utf-8-sig codec handles this automatically -- it strips the BOM on read and adds it on write. If you use the plain utf-8 codec, the BOM shows up as the invisible character U+FEFF ('\ufeff') at the start of your string, which can cause problems when parsing CSV files (the first column header gets an unexpected prefix).

Is there a faster alternative to chardet?

Yes -- charset-normalizer is a modern, actively maintained alternative that is often more accurate and faster than chardet for modern encodings. It is included as a dependency of the requests library. The API is compatible: from charset_normalizer import detect. For new projects, consider charset-normalizer over chardet, as chardet has been less actively maintained in recent years.

Conclusion

You now know how to use chardet to handle the encoding chaos that comes with real-world data. We covered basic detection with chardet.detect(), opening files safely with auto-detected encodings, streaming detection with UniversalDetector for large files, bulk conversion of legacy file directories to UTF-8, and a complete CSV pipeline that reports and handles encoding issues gracefully.

The most important takeaway: always read files as raw bytes (open(path, 'rb')) before deciding the encoding, never assume UTF-8 for data you did not create yourself, and treat low confidence scores as a signal to inspect the output manually. Encoding bugs are silent -- a file that "opens fine" but has garbled special characters is worse than one that crashes with a clear error.

See the chardet documentation for the full list of supported encodings and language detection capabilities.

How To Use Python boto3 for AWS S3 File Operations

Intermediate

If you have ever built a Python app that needs to store files, images, or backups in the cloud, AWS S3 is one of the most reliable options available. Whether you are saving user uploads, archiving log files, or distributing static assets, S3 gives you virtually unlimited storage with simple, consistent access via the boto3 library. The challenge is knowing how to connect your Python code to S3 correctly — and that is exactly what this article covers.

The boto3 library is the official AWS SDK for Python. It handles authentication, request signing, and all the low-level HTTP details, so you can focus on your application logic. You will need an AWS account and an IAM user with S3 access, but once those are in place, boto3 makes S3 operations surprisingly straightforward.

In this article, you will learn how to configure boto3 credentials, create and list S3 buckets, upload and download files, manage object metadata, work with presigned URLs, and build a practical file sync script that mirrors a local folder to S3. By the end, you will have a working toolkit for integrating S3 into any Python project.

Using boto3 with S3: Quick Example

Before diving into setup, here is a minimal example that uploads a file to S3 and downloads it back — the two most common operations you will need in any project:

# s3_quick.py
import boto3

s3 = boto3.client('s3', region_name='us-east-1')
bucket = 'my-python-bucket-demo'

# Upload a file
s3.upload_file('hello.txt', bucket, 'hello.txt')
print("Uploaded hello.txt to S3")

# Download it back
s3.download_file(bucket, 'hello.txt', 'hello_downloaded.txt')
print("Downloaded hello.txt from S3")

Output:

Uploaded hello.txt to S3
Downloaded hello.txt from S3

The upload_file method takes three arguments: local file path, bucket name, and S3 object key (the file name in S3). The download_file method reverses this: bucket, S3 key, local destination. The rest of this article builds on these two core operations with more realistic scenarios.

What Is boto3 and Why Use It for S3?

boto3 is the official AWS SDK for Python, maintained by Amazon. It provides two levels of access: the low-level client interface, which maps almost directly to the AWS REST API, and the higher-level resource interface, which wraps common operations in convenient Python objects. For S3, both are widely used — clients give you full control; resources make simple tasks more readable.

| Interface | Style | Best For | Example |
| --- | --- | --- | --- |
| client | Low-level, dict-based | Full control, presigned URLs, metadata | s3.put_object(...) |
| resource | Object-oriented | Simple CRUD, bucket/object iteration | bucket.upload_file(...) |

For most projects, you will use the client interface — it gives you access to every S3 feature. The resource interface is convenient for quick scripts but does not support all operations. Throughout this article, we will use the client interface so the patterns work everywhere.

Setting Up boto3 and AWS Credentials

Install boto3 with pip:

# terminal
pip install boto3

Output:

Successfully installed boto3-1.34.x botocore-1.34.x

Next, configure your AWS credentials. The recommended approach for development is the AWS credentials file. boto3 automatically reads from ~/.aws/credentials — you never need to hardcode keys in your code:

# ~/.aws/credentials  (create this file manually or run: aws configure)
[default]
aws_access_key_id = YOUR_ACCESS_KEY_ID
aws_secret_access_key = YOUR_SECRET_ACCESS_KEY

# ~/.aws/config  (the default region belongs here, not in the credentials file)
[default]
region = us-east-1

For production, use IAM roles attached to your EC2 instance or Lambda function instead of hardcoded keys — boto3 picks these up automatically with no code changes. If you are testing locally without the credentials file, you can pass them directly to the client, but never commit keys to source control:

# s3_explicit_credentials.py
import boto3

# Only for local testing -- use IAM roles or credentials file in production
s3 = boto3.client(
    's3',
    region_name='us-east-1',
    aws_access_key_id='YOUR_KEY',
    aws_secret_access_key='YOUR_SECRET'
)
print("Client created:", s3.meta.endpoint_url or 'default AWS endpoint')

Output:

Client created: https://s3.amazonaws.com

Creating and Listing S3 Buckets

A bucket is a top-level container in S3 — like a root folder. Every object in S3 lives inside a bucket. Bucket names must be globally unique across all AWS accounts, lowercase, and between 3-63 characters. Here is how to create a bucket and verify it exists:

# s3_buckets.py
import boto3

s3 = boto3.client('s3', region_name='us-east-1')

# Create a new bucket (us-east-1 is the one region that needs no CreateBucketConfiguration)
bucket_name = 'my-python-demo-bucket-2026'

try:
    s3.create_bucket(Bucket=bucket_name)
    print(f"Created bucket: {bucket_name}")
except s3.exceptions.BucketAlreadyOwnedByYou:
    print(f"Bucket already exists and is yours: {bucket_name}")

# List all your buckets
response = s3.list_buckets()
print("\nYour S3 buckets:")
for bucket in response['Buckets']:
    print(f"  {bucket['Name']} (created: {bucket['CreationDate'].date()})")

Output:

Created bucket: my-python-demo-bucket-2026

Your S3 buckets:
  my-python-demo-bucket-2026 (created: 2026-05-02)

Note that for regions other than us-east-1, you must pass a CreateBucketConfiguration parameter specifying the region; otherwise the call fails with an IllegalLocationConstraintException error. For example: s3.create_bucket(Bucket=name, CreateBucketConfiguration={'LocationConstraint': 'eu-west-1'}).
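One way to keep bucket creation region-agnostic is to build the keyword arguments conditionally. The helper below is a sketch: create_bucket_kwargs is a hypothetical name, and it only builds the dict, so it runs without boto3 or credentials.

```python
def create_bucket_kwargs(bucket_name: str, region: str) -> dict:
    """Build keyword arguments for s3.create_bucket() that work in any region."""
    kwargs = {"Bucket": bucket_name}
    if region != "us-east-1":
        # Every region except us-east-1 needs an explicit LocationConstraint.
        kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}
    return kwargs


print(create_bucket_kwargs("my-python-demo-bucket-2026", "us-east-1"))
print(create_bucket_kwargs("my-python-demo-bucket-2026", "eu-west-1"))

# Intended use (requires boto3 and valid credentials):
#   s3 = boto3.client("s3", region_name=region)
#   s3.create_bucket(**create_bucket_kwargs(name, region))
```

The client's region_name and the LocationConstraint should match, which is why both are derived from the same region value in the intended use.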

Uploading and Downloading Files

boto3 provides three upload methods, each suited to different situations. upload_file reads from disk, upload_fileobj reads from a file-like object, and put_object uploads raw bytes or strings. For most use cases, upload_file is the right choice because it automatically uses multipart upload for large files:

# s3_upload.py
import io

import boto3

s3 = boto3.client('s3', region_name='us-east-1')
bucket = 'my-python-demo-bucket-2026'

# Upload a local file
s3.upload_file(
    Filename='report.csv',      # local path
    Bucket=bucket,
    Key='reports/2026/report.csv',  # S3 path (key)
    ExtraArgs={'ContentType': 'text/csv'}
)
print("Uploaded report.csv")

# Upload from an in-memory bytes object
data = io.BytesIO(b"name,score\nAlice,95\nBob,87\n")
s3.upload_fileobj(data, bucket, 'reports/2026/scores.csv')
print("Uploaded scores from memory")

# List objects in a prefix
response = s3.list_objects_v2(Bucket=bucket, Prefix='reports/')
for obj in response.get('Contents', []):
    print(f"  {obj['Key']} ({obj['Size']} bytes)")

Output:

Uploaded report.csv
Uploaded scores from memory
  reports/2026/report.csv (1024 bytes)
  reports/2026/scores.csv (27 bytes)

Downloading follows the same pattern with mirrored methods. Use download_file for disk, download_fileobj for streaming into memory, and get_object when you need the raw response body or metadata alongside the content:

# s3_download.py
import boto3
import io

s3 = boto3.client('s3', region_name='us-east-1')
bucket = 'my-python-demo-bucket-2026'

# Download to disk
s3.download_file(bucket, 'reports/2026/report.csv', 'local_report.csv')
print("Downloaded to local_report.csv")

# Download into memory (no temp file)
buffer = io.BytesIO()
s3.download_fileobj(bucket, 'reports/2026/scores.csv', buffer)
buffer.seek(0)
content = buffer.read().decode('utf-8')
print("In-memory content:", content[:40])

# Get object with metadata
response = s3.get_object(Bucket=bucket, Key='reports/2026/report.csv')
print("Content-Type:", response['ContentType'])
print("Last-Modified:", response['LastModified'])

Output:

Downloaded to local_report.csv
In-memory content: name,score
Alice,95
Bob,87

Content-Type: text/csv
Last-Modified: 2026-05-02 10:00:00+00:00

Deleting and Copying Objects

Deleting and copying objects are single API calls. Copying is particularly useful for server-side operations — moving objects between buckets, creating versioned backups, or changing storage classes without downloading and re-uploading:

# s3_delete_copy.py
import boto3

s3 = boto3.client('s3', region_name='us-east-1')
bucket = 'my-python-demo-bucket-2026'

# Copy an object within the same bucket
s3.copy_object(
    CopySource={'Bucket': bucket, 'Key': 'reports/2026/report.csv'},
    Bucket=bucket,
    Key='reports/archive/report_backup.csv'
)
print("Copied report.csv to archive/")

# Delete a single object
s3.delete_object(Bucket=bucket, Key='reports/2026/scores.csv')
print("Deleted scores.csv")

# Delete multiple objects at once (more efficient than one-by-one)
s3.delete_objects(
    Bucket=bucket,
    Delete={
        'Objects': [
            {'Key': 'reports/2026/report.csv'},
            {'Key': 'reports/archive/report_backup.csv'},
        ]
    }
)
print("Deleted 2 objects in batch")

Output:

Copied report.csv to archive/
Deleted scores.csv
Deleted 2 objects in batch

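Prefer delete_objects for batch deletes: it saves API calls and is significantly faster than calling delete_object in a loop when removing dozens or hundreds of files. A single delete_objects request accepts at most 1,000 keys, so larger batches must be split into chunks.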
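Because one delete_objects request accepts at most 1,000 keys, long key lists need to be split before they are sent. The sketch below only builds the Delete payloads, so it runs without boto3; the helper name delete_batches and the sample keys are made up for illustration.

```python
from typing import Iterator

MAX_KEYS_PER_REQUEST = 1000  # S3 limit for a single delete_objects call


def delete_batches(keys: list[str],
                   batch_size: int = MAX_KEYS_PER_REQUEST) -> Iterator[dict]:
    """Yield Delete payloads for s3.delete_objects(), batch_size keys at a time."""
    for start in range(0, len(keys), batch_size):
        chunk = keys[start:start + batch_size]
        yield {"Objects": [{"Key": key} for key in chunk], "Quiet": True}


keys = [f"logs/2026/file_{i}.log" for i in range(2500)]
batches = list(delete_batches(keys))
print([len(batch["Objects"]) for batch in batches])

# Intended use (requires boto3 and valid credentials):
#   for payload in delete_batches(keys):
#       s3.delete_objects(Bucket=bucket, Delete=payload)
```

Setting Quiet to True asks S3 to report only the keys that failed, which keeps responses small for large batches.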

Generating Presigned URLs

Presigned URLs allow you to grant temporary, time-limited access to private S3 objects without making them public. They are essential for scenarios like letting users download their own files, sharing reports with external stakeholders, or accepting uploads directly from browsers — all without exposing your AWS credentials:

# s3_presigned.py
import boto3
from botocore.exceptions import ClientError

s3 = boto3.client('s3', region_name='us-east-1')
bucket = 'my-python-demo-bucket-2026'

# Generate a presigned download URL (valid for 1 hour)
try:
    url = s3.generate_presigned_url(
        'get_object',
        Params={'Bucket': bucket, 'Key': 'reports/2026/report.csv'},
        ExpiresIn=3600  # seconds
    )
    print("Download URL (expires in 1 hour):")
    print(url[:80] + "...")
except ClientError as e:
    print(f"Error generating URL: {e}")

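# Generate a presigned POST URL for browser uploads.
# Each entry in Fields needs a matching entry in Conditions,
# because boto3 does not add it to the policy automatically.
post = s3.generate_presigned_post(
    bucket,
    'uploads/user_file.pdf',
    Fields={'Content-Type': 'application/pdf'},
    Conditions=[
        {'Content-Type': 'application/pdf'},
        ['content-length-range', 0, 10485760],  # max 10MB
    ],
    ExpiresIn=600  # 10 minutes
)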
print("\nPresigned POST fields:", list(post['fields'].keys()))

Output (the exact field names and their order depend on the signature version in use):

Download URL (expires in 1 hour):
https://my-python-demo-bucket-2026.s3.amazonaws.com/reports/2026/report.csv?X-Amz-Algorithm=AWS4...

Presigned POST fields: ['key', 'AWSAccessKeyId', 'policy', 'signature', 'Content-Type']

The presigned POST URL is different from the GET URL — it returns a dictionary with a URL and fields that your frontend must include in a multipart form submission. This lets you accept direct browser-to-S3 uploads without routing files through your server, which saves bandwidth and infrastructure cost.

Real-Life Example: Local Folder to S3 Sync Script

This script syncs a local folder to an S3 bucket, uploading only new or changed files based on file size. Comparing sizes is a cheap heuristic: an edit that leaves the size unchanged is missed. It is the kind of utility you would use for automated backups or deploying static websites:

# s3_sync.py
import os
from pathlib import Path

import boto3

def get_local_files(folder: str) -> dict:
    """Return {relative_path: file_size} for all files in folder."""
    result = {}
    base = Path(folder)
    for path in base.rglob('*'):
        if path.is_file():
            rel = str(path.relative_to(base)).replace('\\', '/')
            result[rel] = path.stat().st_size
    return result

def get_s3_files(s3_client, bucket: str, prefix: str) -> dict:
    """Return {key: size} for all objects under prefix."""
    result = {}
    paginator = s3_client.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get('Contents', []):
            key = obj['Key']
            if prefix:
                key = key[len(prefix):]
            result[key] = obj['Size']
    return result

def sync_folder_to_s3(local_folder: str, bucket: str, prefix: str = '') -> None:
    s3 = boto3.client('s3', region_name='us-east-1')
    local = get_local_files(local_folder)
    remote = get_s3_files(s3, bucket, prefix)

    uploaded = 0
    skipped = 0

    for rel_path, local_size in local.items():
        s3_key = (prefix + rel_path) if prefix else rel_path
        remote_size = remote.get(rel_path)

        if remote_size == local_size:
            skipped += 1
            continue  # File unchanged, skip

        local_full = os.path.join(local_folder, rel_path)
        s3.upload_file(local_full, bucket, s3_key)
        print(f"  Uploaded: {rel_path}")
        uploaded += 1

    print(f"\nSync complete: {uploaded} uploaded, {skipped} skipped")

# Run the sync
sync_folder_to_s3('./my_project', 'my-python-demo-bucket-2026', prefix='backups/')

Output:

  Uploaded: README.md
  Uploaded: src/main.py
  Uploaded: src/utils.py

Sync complete: 3 uploaded, 2 skipped

This script uses a paginator for the S3 listing, which is important — S3 list_objects_v2 returns at most 1000 objects per call, and the paginator handles the continuation tokens automatically. You can extend this script to add deletion of remote files that no longer exist locally, or to use ETag-based comparison for more accurate change detection.
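For the ETag-based comparison, a minimal sketch of the local half is shown below. It assumes the object was uploaded in a single part without KMS encryption, in which case the ETag is the hex MD5 of the content; multipart uploads produce a different ETag format, so this check does not apply to them. The helper names local_md5_hex and etag_matches are hypothetical.

```python
import hashlib
import tempfile
from pathlib import Path


def local_md5_hex(path: Path, chunk_size: int = 1024 * 1024) -> str:
    """Return the hex MD5 digest of a file, read in chunks to bound memory use."""
    digest = hashlib.md5()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def etag_matches(path: Path, etag: str) -> bool:
    """Compare a local file with an S3 ETag (S3 wraps the value in double quotes)."""
    return local_md5_hex(path) == etag.strip('"')


# Self-contained demonstration with a temporary file and a hand-built ETag.
with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "sample.txt"
    sample.write_bytes(b"hello s3\n")
    fake_etag = '"' + hashlib.md5(b"hello s3\n").hexdigest() + '"'
    print(etag_matches(sample, fake_etag))
```

In the sync script, the ETag would come from the 'ETag' field of each entry in the list_objects_v2 response, and a mismatch would trigger an upload even when the sizes are equal.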

Frequently Asked Questions

Why do I get a redirect error when creating buckets in non-us-east-1 regions?

S3 bucket creation in regions other than us-east-1 requires passing a CreateBucketConfiguration dict with the LocationConstraint set to your target region. Without it, you get an IllegalLocationConstraintException. The us-east-1 region is the exception: it does not accept the configuration parameter. To write region-agnostic code, conditionally add CreateBucketConfiguration only when the region is not us-east-1.

How does boto3 find credentials automatically?

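boto3 searches a chain of credential sources and uses the first one it finds. The main ones, in order, are: credentials passed explicitly to the client or session, environment variables (AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY), the shared ~/.aws/credentials file, the ~/.aws/config file, container credentials (ECS task roles), and finally EC2 instance metadata (IAM instance profiles). In production on AWS, always use IAM roles instead of hardcoded keys; the SDK picks them up automatically with zero configuration changes in your code.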

How do I upload large files efficiently?

boto3’s upload_file automatically uses multipart upload for files larger than 8MB (the default threshold). You can configure the threshold and part size via a TransferConfig object: config = boto3.s3.transfer.TransferConfig(multipart_threshold=8388608, max_concurrency=10), then pass it as s3.upload_file(..., Config=config). For very large files (GB+), increasing max_concurrency significantly improves throughput.

How do I make an object publicly accessible?

Set the ACL to 'public-read' in ExtraArgs during upload: s3.upload_file(file, bucket, key, ExtraArgs={'ACL': 'public-read'}). However, newer AWS accounts have “Block Public Access” enabled by default, and buckets created since April 2023 also have ACLs disabled by default, so the ACL argument is rejected until you change both settings under your bucket’s Permissions tab in the S3 console. For most use cases, presigned URLs are a better alternative to public objects because they are time-limited and do not require changing bucket policies.

How can I reduce S3 storage costs?

Use lifecycle rules to automatically transition objects to cheaper storage classes (Standard-IA after 30 days, Glacier after 90 days) or delete them after a retention period. Set these up via the S3 console or with s3.put_bucket_lifecycle_configuration(). Also enable S3 Intelligent-Tiering for data with unpredictable access patterns — it automatically moves objects between access tiers with no retrieval fees.
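The rule described above can be sketched as the dict that s3.put_bucket_lifecycle_configuration() expects for its LifecycleConfiguration argument. The helper name build_lifecycle_config, the rule ID format, and the 365-day expiration are assumptions made for illustration; the block only builds the dict, so it runs without boto3.

```python
def build_lifecycle_config(prefix: str, ia_days: int = 30,
                           glacier_days: int = 90, expire_days: int = 365) -> dict:
    """Build a lifecycle configuration with two transitions and an expiration."""
    return {
        "Rules": [
            {
                "ID": f"tiering-{prefix.strip('/') or 'all'}",
                "Filter": {"Prefix": prefix},
                "Status": "Enabled",
                "Transitions": [
                    {"Days": ia_days, "StorageClass": "STANDARD_IA"},
                    {"Days": glacier_days, "StorageClass": "GLACIER"},
                ],
                # Hypothetical retention period; adjust or drop as needed.
                "Expiration": {"Days": expire_days},
            }
        ]
    }


config = build_lifecycle_config("logs/")
print(config["Rules"][0]["ID"])

# Intended use (requires boto3 and valid credentials):
#   s3.put_bucket_lifecycle_configuration(
#       Bucket=bucket, LifecycleConfiguration=config)
```

Note that this call replaces the bucket's whole lifecycle configuration, so any existing rules need to be included in the Rules list as well.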

Conclusion

You now have a complete toolkit for S3 operations in Python using boto3. We covered authentication via credentials files and IAM roles, bucket creation and listing, uploading and downloading files with all three method variants, batch delete and server-side copy, generating presigned URLs for secure temporary access, and a practical folder sync script that handles both new and changed files efficiently with pagination support.

The next step is to extend the sync script with deletion support (remove remote objects not present locally) and ETag-based change detection using MD5 checksums for more accurate diffing. You can also add progress callbacks to track upload progress for large files using boto3’s callback parameter in upload_file.

For the full boto3 S3 documentation including all available operations, storage classes, and IAM policy examples, see the official boto3 S3 reference.

How To Use Python msgspec for Fast JSON Serialization

Intermediate

JSON serialization is in the hot path of almost every Python web service: every API response encodes data to JSON, every incoming request decodes it. If your service handles thousands of requests per second, JSON encoding time adds up fast. Python’s built-in json module is correct and convenient, but it was not built for speed, and libraries like orjson are fast but decode into plain dicts and lists without validating them against a schema. What if you want both speed and type safety?

msgspec is a high-performance serialization library that encodes and decodes JSON (and MessagePack) 5-10x faster than the standard library while also providing automatic type validation. You define your data shape once using msgspec.Struct — like a Pydantic model but with a smaller memory footprint and faster instantiation — and msgspec handles encoding, decoding, and validation in a single C-extension call.

This article covers installing msgspec, defining Struct classes, encoding and decoding JSON, using type annotations for automatic validation, handling optional and nested fields, working with MessagePack, and benchmarking against the standard library. By the end you will have a complete toolkit for high-performance, type-safe JSON serialization in Python.

msgspec Quick Example

# quick_msgspec.py
import msgspec
import msgspec.json

class User(msgspec.Struct):
    name: str
    email: str
    age: int
    is_active: bool = True

# Encode to JSON bytes
user = User(name="Alice", email="alice@example.com", age=30)
encoded = msgspec.json.encode(user)
print(encoded)
print(type(encoded))

# Decode from JSON bytes -- returns a User instance with validation
raw = b'{"name":"Bob","email":"bob@example.com","age":25}'
decoded = msgspec.json.decode(raw, type=User)
print(decoded)
print(type(decoded))

Output:

b'{"name":"Alice","email":"alice@example.com","age":30,"is_active":true}'
<class 'bytes'>
User(name='Bob', email='bob@example.com', age=25, is_active=True)
<class '__main__.User'>

The encode() function serializes a Struct (or any supported Python type) to JSON bytes. The decode() function deserializes JSON bytes and validates them against a type — if the JSON does not match the expected shape, a ValidationError is raised. Unlike the stdlib json module, the output is bytes, not str, which is what HTTP servers and most network libraries expect anyway.

5x faster than json.dumps(). Your API deserves better.

What Is msgspec and When Should You Use It?

msgspec is a C-extension library by Jim Crist-Harif that provides both serialization performance and type safety. It is designed as a faster, lower-footprint alternative to Pydantic for use cases where you need validated deserialization at high throughput.

Library       | JSON Speed (vs. stdlib) | Validation | Memory   | Decoded Type
json (stdlib) | Baseline                | No         | Low      | dict/list
orjson        | 5-10x faster            | No         | Low      | dict/list
msgspec       | 5-10x faster            | Yes        | Very low | Struct
Pydantic v2   | 2-3x faster             | Yes (rich) | Higher   | BaseModel

Use msgspec when you need high-throughput JSON encoding/decoding with type safety and low memory usage — particularly in web APIs (FastAPI, Starlette), event streaming, and data pipeline code where JSON parsing is in the hot path. Use Pydantic when you need rich validators, field aliases, custom serializers, or deep ecosystem integration. Use orjson when you need maximum speed with no schema requirements.

Installation

pip install msgspec
Successfully installed msgspec-0.18.6

msgspec ships as a pre-compiled C extension for CPython on Linux, macOS, and Windows, so no compiler is needed for a normal install. Import with import msgspec for Struct definitions and import msgspec.json for JSON operations.

Defining Structs

A msgspec.Struct is a fast, memory-efficient data class. It uses Python type annotations to define its fields and generates optimized __init__, __repr__, __eq__, and encoder/decoder hooks automatically.

# struct_basics.py
import msgspec
from typing import Optional, List
from datetime import datetime

class Address(msgspec.Struct):
    street: str
    city: str
    country: str
    postal_code: str = ""          # Default value

class Order(msgspec.Struct):
    order_id: str
    customer_name: str
    items: List[str]
    total: float
    shipping_address: Address       # Nested Struct
    created_at: datetime            # datetime is supported natively
    notes: Optional[str] = None    # Optional field

# Create instances
addr = Address(street="123 Main St", city="Austin", country="US", postal_code="78701")
order = Order(
    order_id="ORD-001",
    customer_name="Alice Smith",
    items=["Widget A", "Gadget B"],
    total=89.99,
    shipping_address=addr,
    created_at=datetime(2026, 4, 30, 10, 0, 0),
)

print(order)
print()
print(f"Order ID: {order.order_id}")
print(f"City: {order.shipping_address.city}")
print(f"Notes: {order.notes}")   # None -- the default

Output:

Order(order_id='ORD-001', customer_name='Alice Smith', items=['Widget A', 'Gadget B'], total=89.99, shipping_address=Address(street='123 Main St', city='Austin', country='US', postal_code='78701'), created_at=datetime.datetime(2026, 4, 30, 10, 0), notes=None)

Order ID: ORD-001
City: Austin
Notes: None

Structs are mutable by default; pass frozen=True in the class definition to make them immutable and hashable. They have no __dict__, which makes them significantly more memory-efficient than regular Python objects or default dataclasses. Access fields as attributes. Nested Structs, lists, dicts, and Python’s standard types (datetime, UUID, Decimal) are all supported as field types.

Encoding: Struct to JSON

Use msgspec.json.encode() to serialize any Struct or supported Python value to JSON bytes:

# encoding.py
import msgspec
import msgspec.json
from datetime import datetime

class Product(msgspec.Struct):
    id: int
    name: str
    price: float
    in_stock: bool
    tags: list

product = Product(id=1, name="Widget Pro", price=29.99,
                  in_stock=True, tags=["electronics", "gadgets"])

# encode returns bytes
json_bytes = msgspec.json.encode(product)
print(json_bytes)
print(type(json_bytes))

# Decode bytes to string if needed
json_str = json_bytes.decode("utf-8")
print(json_str)

# Encode a plain Python dict (msgspec handles these too)
data = {"key": "value", "num": 42, "flag": True}
print(msgspec.json.encode(data))

# Encode a list of Structs
products = [
    Product(id=i, name=f"Product {i}", price=9.99 * i,
            in_stock=i % 2 == 0, tags=[])
    for i in range(1, 4)
]
print(msgspec.json.encode(products))

Output:

b'{"id":1,"name":"Widget Pro","price":29.99,"in_stock":true,"tags":["electronics","gadgets"]}'
<class 'bytes'>
{"id":1,"name":"Widget Pro","price":29.99,"in_stock":true,"tags":["electronics","gadgets"]}
b'{"key":"value","num":42,"flag":true}'
b'[{"id":1,...},{"id":2,...},{"id":3,...}]'

Field names in the JSON output match the Struct attribute names exactly. If you need different JSON field names (e.g., camelCase in JSON, snake_case in Python), use msgspec.Struct with the rename option or rename individual fields with msgspec.field(name="camelCaseName").

Decoding with Validation

The decode() function deserializes JSON and validates it against your type annotation simultaneously. Invalid data raises a descriptive msgspec.ValidationError:

# decoding_validation.py
import msgspec
import msgspec.json
from typing import Optional, List

class UserProfile(msgspec.Struct):
    user_id: int
    username: str
    email: str
    score: float
    tags: List[str]
    bio: Optional[str] = None

# Decode valid JSON
valid_json = b'''
{
  "user_id": 42,
  "username": "alice",
  "email": "alice@example.com",
  "score": 9.5,
  "tags": ["python", "developer"],
  "bio": "Python enthusiast"
}
'''

profile = msgspec.json.decode(valid_json, type=UserProfile)
print(f"User: {profile.username}, Score: {profile.score}")
print(f"Tags: {profile.tags}")
print()

# Decode with missing optional field -- OK
minimal = b'{"user_id":1,"username":"bob","email":"b@b.com","score":7.0,"tags":[]}'
p2 = msgspec.json.decode(minimal, type=UserProfile)
print(f"Bio (optional): {p2.bio}")   # None
print()

# Decode with wrong type -- raises ValidationError
try:
    bad = b'{"user_id":"not_an_int","username":"x","email":"x@x.com","score":1.0,"tags":[]}'
    msgspec.json.decode(bad, type=UserProfile)
except msgspec.ValidationError as e:
    print(f"Validation error: {e}")

# Decode with missing required field -- raises ValidationError
try:
    missing = b'{"user_id":1,"username":"alice"}'
    msgspec.json.decode(missing, type=UserProfile)
except msgspec.ValidationError as e:
    print(f"Missing field error: {e}")

Output:

User: alice, Score: 9.5
Tags: ['python', 'developer']

Bio (optional): None

Validation error: Expected `int`, got `str` - at `$.user_id`
Missing field error: Object missing required field `email`

The validation error messages include a JSON path ($.user_id) that tells you exactly which field failed and why. This replaces the typical pattern of calling json.loads() and then manually checking types — msgspec does both in one step, at C speed. The path notation ($ for the root, $.field for a specific field, $.list[0] for a list element) makes debugging API input validation errors straightforward.

ValidationError at $.user_id — caught at the door, not in production.

Reusable Encoder and Decoder Objects

For high-throughput code, create Encoder and Decoder objects once and reuse them. This avoids per-call setup overhead and enables encoder customization:

# encoder_decoder.py
import msgspec
import msgspec.json

class Event(msgspec.Struct):
    event_type: str
    payload: dict
    timestamp: float

# Create once at module level -- reuse for every request
encoder = msgspec.json.Encoder()
decoder = msgspec.json.Decoder(Event)

# Encode
event = Event(event_type="user_login", payload={"user_id": 42}, timestamp=1714483200.0)
data = encoder.encode(event)
print(data)

# Decode
raw = b'{"event_type":"purchase","payload":{"order_id":"123"},"timestamp":1714483260.0}'
decoded = decoder.decode(raw)
print(decoded)
print(f"Type: {decoded.event_type}, Order: {decoded.payload.get('order_id')}")

Output:

b'{"event_type":"user_login","payload":{"user_id":42},"timestamp":1714483200.0}'
Event(event_type='purchase', payload={'order_id': '123'}, timestamp=1714483260.0)
Type: purchase, Order: 123

Reusable encoder/decoder objects are the pattern to use in web frameworks (FastAPI, Flask) where the same type is encoded or decoded on every request. Create them at module level (outside of request handlers) so the per-type setup cost is paid once at startup, not on every request.

Real-Life Example: FastAPI Response Serialization

Here is how to use msgspec for high-performance JSON responses in a FastAPI application:

# fastapi_msgspec.py
# Install: pip install fastapi uvicorn msgspec
from fastapi import FastAPI
from fastapi.responses import Response
import msgspec
import msgspec.json
from typing import List
from datetime import datetime

app = FastAPI()

class Product(msgspec.Struct):
    id: int
    name: str
    price: float
    in_stock: bool
    updated_at: datetime

# Prebuilt encoder -- created once at module load time
encoder = msgspec.json.Encoder()

# Sample data (in a real app, this comes from a database)
PRODUCTS = [
    Product(id=i, name=f"Product {i}", price=round(9.99 * i, 2),
            in_stock=i % 3 != 0, updated_at=datetime(2026, 4, 30))
    for i in range(1, 101)
]

@app.get("/products")
def list_products() -> Response:
    """Return all products as JSON -- using msgspec for fast encoding."""
    return Response(
        content=encoder.encode(PRODUCTS),
        media_type="application/json"
    )

@app.get("/products/{product_id}")
def get_product(product_id: int) -> Response:
    """Return a single product by ID."""
    product = next((p for p in PRODUCTS if p.id == product_id), None)
    if product is None:
        return Response(content=b'{"error":"not found"}',
                        media_type="application/json", status_code=404)
    return Response(
        content=encoder.encode(product),
        media_type="application/json"
    )

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
# Run with: python fastapi_msgspec.py
# GET /products returns 100 products as JSON
# GET /products/1 returns a single product

Returning a pre-encoded Response bypasses FastAPI’s default serialization path (jsonable_encoder followed by the stdlib json module) and sends the msgspec output to the client unchanged, which can make response serialization several times faster for large bodies. The key pattern: return fastapi.responses.Response with pre-encoded bytes instead of returning a Python dict and letting FastAPI encode it. The tradeoff is that FastAPI no longer validates the response or documents its schema in OpenAPI for these routes.

Faster serialization on every request. The savings scale with your request rate.

Frequently Asked Questions

When should I use msgspec instead of Pydantic?

Use msgspec when performance is the primary concern and you do not need Pydantic’s ecosystem features: validators on individual fields, aliases, computed fields, custom serializers, or deep framework integration. msgspec Structs are faster to create, use less memory, and encode/decode faster than Pydantic models. Use Pydantic when you need its rich validation API or when your framework is built around it (FastAPI’s request validation and OpenAPI generation use Pydantic models).

What is MessagePack and when should I use it instead of JSON?

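MessagePack is a binary serialization format that is typically more compact and faster to parse than JSON. Use msgspec.msgpack.encode() and msgspec.msgpack.decode() for internal service-to-service communication where you control both ends. MessagePack is language-agnostic, so the other side does not have to use Python or msgspec; it only needs a MessagePack library. For typical data the encoded size is noticeably smaller than the equivalent JSON (around 30% is a common ballpark, though the ratio depends on the payload). Stick with JSON when the data needs to be human-readable, logged, or consumed by clients that only speak JSON, such as browsers.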

Are Structs mutable, and how do I make one immutable?

Structs are mutable by default, so field assignment (obj.field = value) works without any extra options. To make a Struct immutable, pass frozen=True in the class definition: class MyStruct(msgspec.Struct, frozen=True). Frozen Structs reject field assignment after construction and are hashable (can be used as dict keys or in sets); mutable Structs are not hashable by default.

Does msgspec support Struct inheritance?

Yes, with limitations. A Struct subclass inherits all parent fields. However, a subclass cannot override parent fields or change their types. Struct inheritance is useful for adding fields to a base type without redefining everything, but it is not as flexible as Python class inheritance. For polymorphic types, use typing.Union with a tag field to distinguish subtypes during decoding.

msgspec encodes to bytes, but I need a str. How do I convert?

Call .decode() on the bytes: json_str = msgspec.json.encode(obj).decode("utf-8"). This adds one small allocation, but the total is still faster than json.dumps(). Most HTTP servers (WSGI/ASGI) accept bytes directly in the response body, so the conversion is often unnecessary in practice.

Conclusion

msgspec gives you the best of both worlds: the speed of a C-extension JSON library and the type safety of a schema validation library. You have seen how to define Struct classes with type annotations, encode and decode JSON with automatic validation, use reusable Encoder/Decoder objects for high-throughput code, handle optional and nested fields, and wire msgspec into FastAPI for fast API responses. The official documentation at jcristharif.com/msgspec covers advanced topics including custom hooks, YAML/TOML support, and the full type system.

How To Use Python Faker for Generating Test Data

Intermediate

Testing with real user data is a privacy nightmare. Testing with obviously fake data like “test@test.com” and “John Doe” makes your UI look broken in screenshots and demos. You need realistic-looking data: proper names, valid-format emails, real city names, plausible phone numbers, and dates that fall in sensible ranges — without ever touching a real person’s information.

The Faker library generates realistic synthetic data for exactly this purpose. It produces names, addresses, emails, phone numbers, company names, job titles, credit card numbers, UUIDs, dates, lorem ipsum text, and dozens of other data types — all in 70+ locales so you can generate German addresses, Japanese names, or Brazilian phone numbers with a simple parameter change.

This article covers installing Faker, generating basic data types, using locales, seeding for reproducibility, generating bulk data efficiently, creating custom providers, and building a complete test data factory. By the end you will be able to populate any test database, seed any demo environment, and write fixtures for any data shape your tests need.

Python Faker: Quick Example

# quick_faker.py
from faker import Faker

fake = Faker()

print(fake.name())
print(fake.email())
print(fake.address())
print(fake.phone_number())
print(fake.date_of_birth(minimum_age=18, maximum_age=65))

Output (values are randomly generated and will differ):

Jennifer Smith
jennifer.smith@example.org
742 Evergreen Terrace
Springfield, IL 62701
(555) 867-5309
1988-07-14

Create a Faker() instance once, then call any provider method on it. Each call generates a new random value. The interface is designed to be readable — fake.name() returns a full name, fake.email() returns an email — so test data generation code reads almost like plain English.

Fake data that looks real. Real data that gets you fired. Choose wisely.

What Is Faker and When Should You Use It?

Faker is a Python library inspired by PHP Faker, Perl’s Data::Faker, and the Ruby Faker gem. It provides “fake but realistic” data. This is different from random data (which looks obviously artificial) and different from real data (which carries privacy and compliance risks).

Data Source          | Realistic | Privacy Safe | Reproducible
Faker (seeded)       | Yes       | Yes          | Yes
Faker (unseeded)     | Yes       | Yes          | No
random strings       | No        | Yes          | With seed
Real production data | Yes       | No           | Yes

Use Faker in unit tests that need realistic-looking input data, when seeding a development database, when creating demo environments for client presentations, and when writing test fixtures that need varied but consistently-shaped data. Faker is not suitable for generating security tokens, cryptographic keys, or data that needs to satisfy complex business logic constraints — for that, use factory libraries like factory_boy or model_bakery that integrate with your ORM.

Installation

pip install Faker
Successfully installed Faker-24.3.0

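pip treats package names case-insensitively, so pip install Faker and pip install faker are equivalent. In code, the module name is lowercase and the class name is capitalized: from faker import Faker.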

Core Providers: What Faker Can Generate

Faker organizes its data generators into “providers” — groups of related methods. Here are the most commonly used ones:

# core_providers.py
from faker import Faker

fake = Faker()

# Person data
print("=== Person ===")
print(fake.first_name(), fake.last_name())
print(fake.name())
print(fake.prefix(), fake.suffix())
print(fake.job())
print(fake.company())
print()

# Contact data
print("=== Contact ===")
print(fake.email())
print(fake.phone_number())
print(fake.url())
print(fake.user_name())
print()

# Location data
print("=== Location ===")
print(fake.address())
print(fake.city(), fake.state(), fake.postcode())
print(fake.country())
print(fake.latitude(), fake.longitude())
print()

# Date and time data
print("=== Dates ===")
print(fake.date_this_year())
print(fake.date_of_birth(minimum_age=21, maximum_age=60))
print(fake.date_time_between(start_date="-1y", end_date="now"))
print()

# Internet data
print("=== Internet ===")
print(fake.ipv4())
print(fake.mac_address())
print(fake.user_agent())
print()

# Finance data
print("=== Finance ===")
print(fake.credit_card_number())
print(fake.currency_code())
print(fake.pricetag())

Output (values are randomly generated and will differ):

=== Person ===
Jennifer Smith
Michael Johnson
Dr. Jr.
Data Scientist
Tech Solutions Inc.

=== Contact ===
m.smith@example.com
(555) 123-4567
https://example.org/path
jennifer_s

=== Location ===
123 Main St
Springfield, IL 62701
Springfield Illinois 62701
United States
41.8781 -87.6298

=== Dates ===
2026-03-15
1985-04-22
2025-11-03 14:32:01

=== Internet ===
192.168.1.47
aa:bb:cc:dd:ee:ff
Mozilla/5.0 (Windows NT 10.0)

=== Finance ===
4532015112830366
USD
$47.99

Faker’s built-in providers expose over 100 methods. The complete list is in the official documentation. Providers are grouped by theme: faker.providers.person, faker.providers.address, faker.providers.internet, and so on. When you call fake.email(), Faker dispatches the call to the internet provider.

Using Locales for International Data

Pass a locale string to Faker() to generate data that matches a specific region’s conventions: Japanese names, French addresses, German phone number formats, etc.

# locales.py
from faker import Faker

# Single locale
fake_de = Faker("de_DE")   # German
fake_ja = Faker("ja_JP")   # Japanese
fake_br = Faker("pt_BR")   # Brazilian Portuguese

print("=== German ===")
print(fake_de.name())
print(fake_de.address())
print(fake_de.phone_number())
print()

print("=== Japanese ===")
print(fake_ja.name())
print(fake_ja.address())
print()

print("=== Brazilian ===")
print(fake_br.name())
print(fake_br.cpf())   # CPF is a Brazilian ID number

# Multiple locales -- randomly picks from all of them
fake_multi = Faker(["en_US", "de_DE", "fr_FR", "ja_JP"])
print("\n=== Multi-locale (random each call) ===")
for _ in range(5):
    print(fake_multi.name())

Output (values are randomly generated and will differ):

=== German ===
Klaus Muller
Hauptstrasse 15, 80331 Munchen
+49 89 12345678

=== Japanese ===
Yamamoto Taro
Tokyo-to, Shinjuku-ku

=== Brazilian ===
Carlos Silva
123.456.789-09

=== Multi-locale (random each call) ===
Alice Dupont
Taro Yamamoto
Heinrich Braun
Jennifer Smith
Marie Martin

The multi-locale mode selects randomly from the provided locales on each call. This is useful when testing an application that serves international users — your test data will naturally include a mix of name formats, address styles, and character sets, revealing encoding bugs and layout issues that single-locale test data would miss.

Faker(‘ja_JP’) — your app has an international bug you haven’t found yet.

Seeding for Reproducible Test Data

By default, Faker generates different data on every run. For tests where you need the same data every time (stable test fixtures, snapshot tests, regression tests), use a seed:

# seeded_faker.py
from faker import Faker

# Seed makes output deterministic
Faker.seed(42)
fake = Faker()

print("Run 1:")
for _ in range(3):
    print(f"  {fake.name()} | {fake.email()}")

# Reset and re-seed -- same output
Faker.seed(42)
fake2 = Faker()

print("\nRun 2 (same seed):")
for _ in range(3):
    print(f"  {fake2.name()} | {fake2.email()}")

Output:

Run 1:
  Jennifer Smith | j.smith@example.org
  Michael Johnson | m.johnson@example.com
  Sarah Davis | sarah.d@example.net

Run 2 (same seed):
  Jennifer Smith | j.smith@example.org
  Michael Johnson | m.johnson@example.com
  Sarah Davis | sarah.d@example.net

Faker.seed() is a class-level call that sets the seed for all Faker instances. Use a fixed seed in your test setup to guarantee that test data is identical across runs, environments, and CI servers. Use no seed (or time-based seed) when you want different data every run to catch more edge cases. The tradeoff: seeded tests are stable and debuggable; unseeded tests provide broader coverage but can have intermittent failures.

Generating Bulk Data for Database Seeding

Generating large datasets for database seeding or performance testing requires generating thousands of records efficiently:

# bulk_seeding.py
from faker import Faker
import json
import time

fake = Faker()
Faker.seed(0)

def generate_users(count):
    """Generate a list of fake user records."""
    users = []
    for _ in range(count):
        users.append({
            "id":         fake.uuid4(),
            "username":   fake.user_name(),
            "email":      fake.email(),
            "full_name":  fake.name(),
            "job":        fake.job(),
            "city":       fake.city(),
            "country":    fake.country_code(),
            "joined_at":  fake.date_time_between(
                              start_date="-2y", end_date="now"
                          ).isoformat(),
            "is_active":  fake.boolean(chance_of_getting_true=80),
            "score":      round(fake.pyfloat(min_value=0, max_value=100, right_digits=2), 2),
        })
    return users

start = time.monotonic()
users = generate_users(10_000)
elapsed = time.monotonic() - start

print(f"Generated {len(users):,} users in {elapsed:.3f}s")
print(f"Sample record:")
print(json.dumps(users[0], indent=2))

Output (timing and values will differ):

Generated 10,000 users in 1.842s
Sample record:
{
  "id": "a3f2c1d4-...",
  "username": "jennifer_s42",
  "email": "j.smith@example.com",
  "full_name": "Jennifer Smith",
  "job": "Data Scientist",
  "city": "Austin",
  "country": "US",
  "joined_at": "2025-03-15T14:32:01",
  "is_active": true,
  "score": 73.45
}

In this run, Faker generated 10,000 records in about 2 seconds, which is fast enough for most seeding use cases; your timing will vary with hardware and the providers you call. For performance-critical bulk generation (millions of records), consider using Faker in a multiprocessing pool or pre-generating data to a file. Note that fake.boolean(chance_of_getting_true=80) gives you weighted random booleans (80% True, 20% False), which is often more realistic than a 50/50 split.

10,000 test users in 1.8 seconds. Your QA team has no more excuses.

Real-Life Example: Pytest Fixture Factory

Here is how to wire Faker into a pytest fixture factory — a reusable pattern that generates test objects for different scenarios:

# test_user_service.py
import pytest
from faker import Faker
from dataclasses import dataclass
from typing import Optional

fake = Faker()
Faker.seed(999)   # Stable test data across runs

@dataclass
class User:
    id: int
    name: str
    email: str
    age: int
    is_active: bool = True

class UserFactory:
    """Factory for creating fake User objects in tests."""
    _id_counter = 1

    @classmethod
    def create(cls, **overrides) -> User:
        """Create a User with fake data. Pass kwargs to override specific fields."""
        defaults = {
            "id":        cls._id_counter,
            "name":      fake.name(),
            "email":     fake.email(),
            "age":       fake.random_int(min=18, max=70),
            "is_active": True,
        }
        cls._id_counter += 1
        return User(**{**defaults, **overrides})

    @classmethod
    def create_batch(cls, count: int, **overrides):
        return [cls.create(**overrides) for _ in range(count)]

# Simple service to test
def get_active_users(users):
    return [u for u in users if u.is_active]

def get_adult_users(users):
    return [u for u in users if u.age >= 18]

# Tests
def test_get_active_users():
    active = UserFactory.create_batch(3, is_active=True)
    inactive = UserFactory.create_batch(2, is_active=False)
    all_users = active + inactive

    result = get_active_users(all_users)
    assert len(result) == 3
    assert all(u.is_active for u in result)

def test_get_adult_users():
    adults = UserFactory.create_batch(4, age=25)
    minors = UserFactory.create_batch(2, age=16)
    all_users = adults + minors

    result = get_adult_users(all_users)
    assert len(result) == 4

def test_single_user_override():
    # Override just the email -- all other fields are fake
    user = UserFactory.create(email="specific@test.com")
    assert user.email == "specific@test.com"
    assert len(user.name) > 0   # name is still fake

if __name__ == "__main__":
    # Quick sanity check without pytest
    test_get_active_users()
    test_get_adult_users()
    test_single_user_override()
    print("All tests passed!")

Output:

All tests passed!

The factory pattern with **overrides is the standard approach: each test specifies only the fields it cares about and lets Faker fill in the rest. This keeps tests focused and readable. When you need to test a specific email format, you pass email="..."; everything else stays realistic. The _id_counter ensures each factory-created object has a unique ID across all tests in the session.

Frequently Asked Questions

How do I generate unique values (no duplicates)?

Use fake.unique.email(). The unique proxy ensures no value is repeated within the current Faker instance. It tracks all previously returned values and retries until it finds a new one, raising a UniquenessException if it cannot find one after many attempts (for example, when the pool of possible values is exhausted). Call fake.unique.clear() to reset the tracking if you need to generate fresh values. Note that uniqueness is tracked per instance and per method: fake.unique.email() tracks emails separately from fake.unique.name().

Can I add my own custom data generators to Faker?

Yes. Subclass BaseProvider and add it: fake.add_provider(MyProvider). Your provider methods become available as fake.my_method(). This is useful for domain-specific data: product SKUs, internal ID formats, company-specific status codes. The Faker documentation has a full example of a custom provider.

How do I generate data in a specific format like a US SSN?

Use fake.ssn() from the en_US locale. Faker has locale-specific providers for national ID formats, postal codes, phone number formats, and currency. If a provider does not exist for your format, use fake.numerify("###-##-####") to generate a numeric pattern where # is replaced by a random digit, or fake.bothify("??##") which replaces ? with a letter and # with a digit.

Is Faker fast enough for generating millions of records?

Faker generates roughly 5,000-10,000 records per second for typical use cases, depending on the hardware and which providers you call. For millions of records, use multiprocessing.Pool to parallelize across CPU cores, or generate data in batches and write to disk incrementally. For very large datasets, consider mimesis, an alternative library that advertises faster generation than Faker.

Why does my seeded Faker still produce different results between runs?

Faker.seed(n) seeds a random generator that is shared by every Faker instance, so any Faker call that runs between seeding and your code advances the sequence and shifts the values you get. If your tests run in a different order between sessions, the number of Faker calls before a given test varies, and so does its data. Seed immediately before generating data, or use a fixture that resets the seed before each test: @pytest.fixture(autouse=True) def seed_faker(): Faker.seed(42). Output for a given seed can also change between Faker releases as the underlying datasets are updated, so pin the Faker version if you depend on exact values.

Conclusion

Faker is the standard tool for generating realistic synthetic test data in Python. You have seen how to use core providers for names, emails, addresses, and dates; how locales let you generate international data; how seeding makes test data deterministic; how to generate bulk datasets for database seeding; and how the factory pattern with **overrides integrates cleanly into pytest. The official Faker documentation at faker.readthedocs.io has the complete list of providers across all 70+ supported locales.

How To Use Python Watchdog for Filesystem Monitoring

Intermediate

Your application needs to react when a file changes: reload configuration when config.yaml is updated, trigger processing when a new CSV appears in a watched folder, or invalidate a cache when a template file is modified. Polling with a loop and os.stat() works but wastes CPU, misses rapid changes, and is painful to write correctly. There is a better way.

The watchdog library provides cross-platform filesystem monitoring using native OS APIs (inotify on Linux, FSEvents on macOS, kqueue on BSD, ReadDirectoryChangesW on Windows) with a clean Python event handler interface. You define what to watch and what to do, and watchdog calls your handler shortly after a change happens, with no polling loop for you to write.

This article covers installing watchdog, creating a basic file watcher, filtering events by file type and path, using the pattern-based PatternMatchingEventHandler, watching multiple directories, debouncing rapid events, and building a real-world auto-processing pipeline. By the end you will have a complete toolkit for reacting to filesystem changes in any Python application.

Python watchdog: Quick Example

Here is a minimal watchdog script that prints a message whenever any file in the current directory changes:

# quick_watch.py
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import time

class MyHandler(FileSystemEventHandler):
    def on_modified(self, event):
        if not event.is_directory:
            print(f"Modified: {event.src_path}")

    def on_created(self, event):
        if not event.is_directory:
            print(f"Created:  {event.src_path}")

observer = Observer()
observer.schedule(MyHandler(), path=".", recursive=False)
observer.start()

print("Watching current directory. Press Ctrl+C to stop.")
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    observer.stop()
observer.join()

Output:

Watching current directory. Press Ctrl+C to stop.
Created:  ./test.txt
Modified: ./test.txt
Modified: ./config.json

The pattern is always: subclass FileSystemEventHandler, override the event methods you care about, create an Observer, schedule the handler with observer.schedule(), and call observer.start(). The observer runs in a background thread; your main thread stays free for other work.

Python watchdog tutorial illustration 1
inotify under the hood. Your CPU says thank you.

What Is watchdog and How Does It Work?

watchdog is a Python library that wraps OS-level filesystem notification APIs to detect file and directory changes in real time. Unlike a polling approach that checks file modification times on a timer, watchdog is event-driven: the OS notifies watchdog the instant a change occurs.

Platform | Watchdog Backend | API Used
Linux | InotifyObserver | inotify
macOS | FSEventsObserver | FSEvents / kqueue
Windows | WindowsApiObserver | ReadDirectoryChangesW
All (fallback) | PollingObserver | stat() polling

When you use from watchdog.observers import Observer, watchdog automatically selects the best backend for your platform. The PollingObserver is the cross-platform fallback and works everywhere, but it uses stat() polling so it is less efficient. The native backends are event-driven and have essentially zero CPU overhead when files are not changing.
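To make the difference concrete, here is a rough sketch of what stat()-based polling boils down to: take a snapshot of file metadata, take another one later, and compare the two. This is an illustration only, not watchdog's actual PollingObserver code; the snapshot() and diff() helpers are hypothetical names invented for this example.

```python
# polling_sketch.py
import os
import tempfile

def snapshot(directory):
    """Map each regular file in `directory` to its (mtime_ns, size)."""
    result = {}
    with os.scandir(directory) as entries:
        for entry in entries:
            if entry.is_file():
                info = entry.stat()
                result[entry.name] = (info.st_mtime_ns, info.st_size)
    return result

def diff(old, new):
    """Return (created, deleted, modified) file names between two snapshots."""
    created = sorted(new.keys() - old.keys())
    deleted = sorted(old.keys() - new.keys())
    modified = sorted(n for n in new.keys() & old.keys() if new[n] != old[n])
    return created, deleted, modified

with tempfile.TemporaryDirectory() as tmp:
    with open(os.path.join(tmp, "a.txt"), "w") as f:
        f.write("one")
    before = snapshot(tmp)

    with open(os.path.join(tmp, "a.txt"), "w") as f:
        f.write("one two")          # size changes, so the diff notices it
    with open(os.path.join(tmp, "b.txt"), "w") as f:
        f.write("new")
    after = snapshot(tmp)

    changes = diff(before, after)
    print(changes)
```

Every snapshot touches every file, whether or not anything changed. That per-cycle cost is what the native backends avoid.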

Installation

pip install watchdog

Output:

Successfully installed watchdog-4.0.1

On Linux, watchdog uses the kernel’s inotify API which has limits on the number of watched files. If you watch large directory trees, you may need to increase the inotify limit: echo fs.inotify.max_user_watches=524288 | sudo tee -a /etc/sysctl.conf && sudo sysctl -p.

Understanding Event Types

watchdog fires different event types depending on what happened. Override the corresponding method in your handler:

# event_types.py
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import time

class VerboseHandler(FileSystemEventHandler):
    """Override all event methods to see what watchdog reports."""

    def on_created(self, event):
        kind = "directory" if event.is_directory else "file"
        print(f"CREATED  [{kind}]: {event.src_path}")

    def on_deleted(self, event):
        kind = "directory" if event.is_directory else "file"
        print(f"DELETED  [{kind}]: {event.src_path}")

    def on_modified(self, event):
        if not event.is_directory:   # Ignore directory modify events (very noisy)
            print(f"MODIFIED [file]: {event.src_path}")

    def on_moved(self, event):
        print(f"MOVED    : {event.src_path} -> {event.dest_path}")

    def on_any_event(self, event):
        # Called for every event -- useful for logging
        pass   # Left as a no-op here to avoid duplicating the output above

observer = Observer()
observer.schedule(VerboseHandler(), path="./watched", recursive=True)
observer.start()
print("Watching ./watched recursively...")
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    observer.stop()
observer.join()

Output:

Watching ./watched recursively...
CREATED  [file]: ./watched/report.csv
MODIFIED [file]: ./watched/report.csv
CREATED  [file]: ./watched/subdir/data.json
DELETED  [file]: ./watched/old_file.txt
MOVED    : ./watched/draft.txt -> ./watched/final.txt

Directory modification events fire whenever a file inside the directory is created or deleted (because the directory’s metadata changes). They are almost always noise for file-processing use cases, which is why filtering with if not event.is_directory is the standard pattern. The on_moved event includes both event.src_path (original path) and event.dest_path (new path), making it easy to handle renames.

Filtering by File Pattern with PatternMatchingEventHandler

Instead of writing if/else logic in your handler to filter file types, use PatternMatchingEventHandler. It handles pattern filtering for you and only calls your methods for files that match.

# pattern_handler.py
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler
import time

class CSVHandler(PatternMatchingEventHandler):
    def __init__(self):
        super().__init__(
            patterns=["*.csv", "*.tsv"],      # Only watch CSV and TSV files
            ignore_patterns=["*.tmp", "~*"],  # Skip temp files
            ignore_directories=True,          # Skip directory events
            case_sensitive=False              # Case-insensitive on all platforms
        )

    def on_created(self, event):
        print(f"New data file: {event.src_path}")
        self.process_file(event.src_path)

    def on_modified(self, event):
        print(f"Updated data file: {event.src_path}")
        self.process_file(event.src_path)

    def process_file(self, filepath):
        """Simulate processing the file."""
        print(f"  -> Processing {filepath}...")

observer = Observer()
observer.schedule(CSVHandler(), path="./data_inbox", recursive=False)
observer.start()
print("Watching ./data_inbox for CSV/TSV files...")
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    observer.stop()
observer.join()

Output:

Watching ./data_inbox for CSV/TSV files...
New data file: ./data_inbox/sales_april.csv
  -> Processing ./data_inbox/sales_april.csv...
New data file: ./data_inbox/inventory.tsv
  -> Processing ./data_inbox/inventory.tsv...

PatternMatchingEventHandler uses Unix shell-style wildcards (*, ?, [seq]), the same glob syntax that Python’s fnmatch module understands. The ignore_patterns parameter prevents processing of temporary files that editors create while saving (many editors write to a ~file.txt or file.txt.tmp before renaming to the final name, which would otherwise trigger your handler multiple times per save).
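If you want to see how patterns and ignore_patterns interact before wiring up an observer, you can approximate the filtering with the standard library. The sketch below is not watchdog's own matching code: should_handle() is a hypothetical helper, and it assumes that matching on the file name alone is good enough for simple patterns like the ones above.

```python
# pattern_sketch.py
import fnmatch
import os

def should_handle(path, patterns, ignore_patterns=(), case_sensitive=False):
    """Return True if the file name matches a pattern and no ignore pattern."""
    name = os.path.basename(path)
    if not case_sensitive:
        name = name.lower()
        patterns = [p.lower() for p in patterns]
        ignore_patterns = [p.lower() for p in ignore_patterns]
    # Ignore patterns win: check them first
    if any(fnmatch.fnmatchcase(name, p) for p in ignore_patterns):
        return False
    return any(fnmatch.fnmatchcase(name, p) for p in patterns)

patterns = ["*.csv", "*.tsv"]
ignore = ["*.tmp", "~*"]
candidates = [
    "./data_inbox/sales_april.csv",
    "./data_inbox/REPORT.CSV",
    "./data_inbox/~sales_april.csv",
    "./data_inbox/notes.txt",
]
results = {path: should_handle(path, patterns, ignore) for path in candidates}
for path, ok in results.items():
    print(f"{'handle' if ok else 'skip  '}  {path}")
```

The two .csv files are handled (the second one only because matching is case-insensitive), the ~ backup is skipped by the ignore list, and notes.txt never matches in the first place.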

Python watchdog tutorial illustration 2
ignore_patterns=[‘*.tmp’] — because editors are messy and so are you.

Debouncing Rapid Events

A common problem: when a user saves a large file, watchdog may fire on_modified several times in rapid succession as the OS writes data in chunks. If your handler does expensive work (like reloading a config or triggering a build), you want to process it only once after the writes settle down.

# debounce_handler.py
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler
import threading
import time

class DebouncedHandler(PatternMatchingEventHandler):
    def __init__(self, debounce_seconds=0.5):
        super().__init__(patterns=["*.json", "*.yaml"],
                         ignore_directories=True)
        self.debounce_seconds = debounce_seconds
        self._timers = {}   # path -> threading.Timer

    def on_any_event(self, event):
        if event.is_directory:
            return
        path = event.src_path
        # Cancel existing timer for this path (reset the debounce window)
        if path in self._timers:
            self._timers[path].cancel()
        # Start a new timer
        timer = threading.Timer(
            self.debounce_seconds,
            self._process,
            args=[path, event.event_type]
        )
        timer.start()
        self._timers[path] = timer

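    def _process(self, path, event_type):
        """Called once after rapid events settle down."""
        print(f"[{event_type}] Processing settled: {path}")
        # pop() instead of del, so a missing key (possible when events and
        # timers race) does not raise KeyError
        self._timers.pop(path, None)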

observer = Observer()
observer.schedule(DebouncedHandler(debounce_seconds=0.5), path="./config", recursive=False)
observer.start()
print("Watching ./config with 500ms debounce...")
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    observer.stop()
observer.join()

Output:

Watching ./config with 500ms debounce...
[modified] Processing settled: ./config/settings.json

The debounce pattern uses threading.Timer to delay execution. Each time an event fires for the same path, the existing timer is cancelled and a new one is started. Only after the events stop for 500ms (no new events in the debounce window) does _process() actually run. As long as the gaps between write events stay shorter than the debounce window, your handler runs once per “save session” no matter how many OS-level write events the save generates.
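The debounce logic does not depend on watchdog at all, so you can pull it into a small reusable class and test it on its own. The sketch below is one possible shape, assuming a hypothetical Debouncer class; it adds a lock and a per-call token so that a timer that was superseded at the last moment cannot fire by accident.

```python
# debouncer_sketch.py
import itertools
import threading
import time

class Debouncer:
    """Call `callback(key)` once after `delay` seconds without new triggers."""

    def __init__(self, delay, callback):
        self.delay = delay
        self.callback = callback
        self._pending = {}                 # key -> (token, Timer)
        self._tokens = itertools.count()   # unique id for each trigger() call
        self._lock = threading.Lock()

    def trigger(self, key):
        with self._lock:
            pending = self._pending.get(key)
            if pending is not None:
                pending[1].cancel()        # reset the debounce window
            token = next(self._tokens)
            timer = threading.Timer(self.delay, self._fire, args=(key, token))
            timer.daemon = True
            self._pending[key] = (token, timer)
            timer.start()

    def _fire(self, key, token):
        with self._lock:
            pending = self._pending.get(key)
            if pending is None or pending[0] != token:
                return                     # a newer trigger replaced this one
            del self._pending[key]
        self.callback(key)                 # run outside the lock

calls = []
debouncer = Debouncer(0.2, calls.append)

# Simulate five rapid "modified" events for the same file
for _ in range(5):
    debouncer.trigger("settings.json")
    time.sleep(0.02)

time.sleep(0.6)   # wait for the debounce window to pass
print(calls)
```

Inside a watchdog handler you would call debouncer.trigger(event.src_path) from on_modified and put the expensive work in the callback.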

Real-Life Example: Auto-Processing Data Inbox

Here is a production-style script that watches an inbox folder for new CSV files, processes them, and moves them to a processed archive — a common pattern in data pipeline and ETL workflows.

# data_inbox_processor.py
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler
import os
import shutil
import time
import threading
from datetime import datetime

INBOX_DIR  = "./inbox"
DONE_DIR   = "./processed"
ERROR_DIR  = "./errors"

# Create directories if they don't exist
for d in [INBOX_DIR, DONE_DIR, ERROR_DIR]:
    os.makedirs(d, exist_ok=True)

class InboxHandler(PatternMatchingEventHandler):
    def __init__(self):
        super().__init__(
            patterns=["*.csv"],
            ignore_patterns=["*.tmp", ".~*"],
            ignore_directories=True
        )
        # Debounce: wait 1 second after last event before processing
        self._pending = {}

    def on_created(self, event):
        self._schedule(event.src_path)

    def on_moved(self, event):
        # File was renamed/moved into the watched folder
        if event.dest_path.endswith(".csv"):
            self._schedule(event.dest_path)

    def _schedule(self, path):
        if path in self._pending:
            self._pending[path].cancel()
        t = threading.Timer(1.0, self._process, args=[path])
        t.start()
        self._pending[path] = t

    def _process(self, filepath):
        """Process a single CSV file."""
        self._pending.pop(filepath, None)
        if not os.path.exists(filepath):
            return  # File disappeared before we could process it

        filename = os.path.basename(filepath)
        print(f"[{datetime.now():%H:%M:%S}] Processing: {filename}")
        try:
            row_count = self._count_rows(filepath)
            print(f"  Rows: {row_count}")

            # Archive to processed folder with timestamp
            ts = datetime.now().strftime("%Y%m%d_%H%M%S")
            dest = os.path.join(DONE_DIR, f"{ts}_{filename}")
            shutil.move(filepath, dest)
            print(f"  Archived to: {dest}")

        except Exception as e:
            print(f"  ERROR: {e}")
            dest = os.path.join(ERROR_DIR, filename)
            shutil.move(filepath, dest)
            print(f"  Moved to errors: {dest}")

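    def _count_rows(self, filepath):
        """Count non-header rows in a CSV file (0 for an empty file)."""
        # Explicit encoding so the count does not depend on the platform default
        with open(filepath, "r", encoding="utf-8", errors="replace") as f:
            return max(sum(1 for _ in f) - 1, 0)   # Subtract header row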

if __name__ == "__main__":
    observer = Observer()
    observer.schedule(InboxHandler(), path=INBOX_DIR, recursive=False)
    observer.start()
    print("Data inbox processor running.")
    print(f"  Watching: {os.path.abspath(INBOX_DIR)}")
    print(f"  Archive : {os.path.abspath(DONE_DIR)}")
    print(f"  Errors  : {os.path.abspath(ERROR_DIR)}")
    print("Drop CSV files into the inbox folder to process them.")
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
    print("Processor stopped.")

Output:

Data inbox processor running.
  Watching: /home/user/project/inbox
  Archive : /home/user/project/processed
  Errors  : /home/user/project/errors
Drop CSV files into the inbox folder to process them.
[10:15:33] Processing: sales_data.csv
  Rows: 847
  Archived to: ./processed/20260430_101534_sales_data.csv

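This is the pattern used in ETL pipelines, document processing systems, and data ingestion services. The 1-second delay gives the writer a moment to finish before the file is read, the on_moved override catches files that are written under a temporary name and then renamed to .csv, and the error directory ensures bad files are preserved for investigation rather than silently dropped. Because only created and moved events are handled, a file that takes longer than a second to write can still be picked up half-written; for large uploads, have the producer write to a temporary name and rename the file when it is complete.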
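When you cannot control how the producer writes its files, one common workaround is to wait until the file size stops changing before you process it. The helper below is a sketch under that assumption; wait_until_stable() and its parameters are hypothetical names, and a size check is a heuristic, not a guarantee that the writer is finished.

```python
# stable_file_sketch.py
import os
import tempfile
import time

def wait_until_stable(path, interval=0.2, stable_checks=3, timeout=30.0):
    """Return True once the size of `path` is unchanged for
    `stable_checks` consecutive re-checks, False on timeout or if it vanishes."""
    deadline = time.monotonic() + timeout
    last_size = -1
    unchanged = 0
    while time.monotonic() < deadline:
        try:
            size = os.path.getsize(path)
        except OSError:
            return False              # file disappeared or is unreadable
        if size == last_size:
            unchanged += 1
            if unchanged >= stable_checks:
                return True
        else:
            unchanged = 0             # still growing: start counting again
            last_size = size
        time.sleep(interval)
    return False

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "sales.csv")
    with open(path, "w", encoding="utf-8") as f:
        f.write("id,amount\n1,10\n")

    ready = wait_until_stable(path, interval=0.05)
    missing = wait_until_stable(os.path.join(tmp, "nope.csv"), interval=0.05)
    print(ready, missing)
```

In the inbox processor you could call a helper like this at the top of _process() and return early (or reschedule) when it reports False.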

Python watchdog tutorial illustration 3
Filesystem events are free. Your polling loop costs you every second.

Frequently Asked Questions

What does recursive=True do and when should I use it?

When recursive=True is passed to observer.schedule(), watchdog monitors all subdirectories under the watched path, not just the top-level directory. Use it when files may arrive in subdirectories. Be careful with very large directory trees — on Linux, each subdirectory consumes an inotify watch slot. For deep trees with thousands of subdirectories, increase the inotify limit or use a more targeted path.

When should I use PollingObserver instead of Observer?

Use PollingObserver when watching network-mounted filesystems (NFS, SMB/CIFS) or Docker volume mounts, because native notification APIs such as inotify and FSEvents often do not report changes made on the remote side or on the container host. Import it with from watchdog.observers.polling import PollingObserver. It polls every second by default; pass a timeout argument to change the interval.

Does watchdog follow symbolic links?

No, watchdog watches the path as given and does not follow symlinks by default. If you watch a symlink, you watch the link itself, not the target directory. To watch the real path, resolve the symlink first with os.path.realpath(path) before passing it to observer.schedule().

Can I watch multiple directories with one observer?

Yes. Call observer.schedule() multiple times with different paths and handlers. One Observer can manage dozens of watches efficiently. You can also use the same handler for multiple paths. The event.src_path attribute always tells you which file triggered the event, so you can differentiate between watched directories in a single handler if needed.
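Here is a small sketch of one handler shared between two watched folders. RoutingHandler and its label_for() method are hypothetical names made up for this example. To keep it runnable without touching the disk, it feeds synthetic events to the handler through dispatch() instead of starting an observer; the commented lines show where the real schedule() calls would go.

```python
# multi_dir_sketch.py
import os
from watchdog.events import FileCreatedEvent, FileSystemEventHandler

class RoutingHandler(FileSystemEventHandler):
    """One handler for several watched roots; tags each event with a label."""

    def __init__(self, routes):
        super().__init__()
        # Normalize once so the prefix comparison below is reliable
        self.routes = {os.path.abspath(root): label
                       for root, label in routes.items()}
        self.seen = []

    def label_for(self, path):
        path = os.path.abspath(path)
        for root, label in self.routes.items():
            if os.path.commonpath([root, path]) == root:
                return label
        return None

    def on_created(self, event):
        if not event.is_directory:
            name = os.path.basename(event.src_path)
            self.seen.append((self.label_for(event.src_path), name))

handler = RoutingHandler({"./inbox": "inbox", "./config": "config"})

# With a real observer you would schedule the same handler once per root:
#   observer = Observer()
#   for root in handler.routes:
#       observer.schedule(handler, root, recursive=False)

# Synthetic events, dispatched the same way the observer thread would do it
handler.dispatch(FileCreatedEvent("./inbox/sales.csv"))
handler.dispatch(FileCreatedEvent("./config/app.yaml"))
print(handler.seen)
```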

Why does my handler fire twice when I save a file in a text editor?

Most text editors use an atomic save pattern: write to a temporary file, then rename it to the final name. This generates two events: a created event for the temp file and a moved event for the rename. Some editors also fire an extra modified event. Use debouncing (as shown above) or filter by filename to ignore temp files (ignore_patterns=["*.tmp", ".~*", "#*"]). The debounce approach is more robust because temp file naming conventions vary by editor.

Conclusion

watchdog gives you efficient, event-driven filesystem monitoring using the native OS APIs on every platform. You have seen how to subclass FileSystemEventHandler for full control, use PatternMatchingEventHandler for clean file-type filtering, implement debouncing to handle rapid successive events, and build a complete data inbox pipeline that watches, processes, archives, and handles errors. The official watchdog documentation at python-watchdog.readthedocs.io covers advanced topics like custom observers and event queue management.

How To Use APScheduler for Task Scheduling in Python

Intermediate

Every application eventually needs to run tasks on a schedule: send a nightly report, clean up expired cache entries every hour, poll an API every 30 seconds, or run a database backup at 2 AM every Sunday. Python’s built-in sched module is too low-level, and spinning up a cron job for every small task in a Python app adds operational overhead. APScheduler fills this gap.

APScheduler (Advanced Python Scheduler) is a mature, battle-tested scheduling library that runs inside your Python process. It supports three trigger types — interval, cron, and date — and three execution modes: blocking, background thread, and async. It works in any Python application without needing an external daemon or message broker.

This article covers installing APScheduler, running your first interval job, using cron expressions for time-based scheduling, running one-off delayed jobs with the date trigger, managing jobs at runtime (pause, resume, remove), persisting jobs across restarts with job stores, and combining everything into a production-ready task scheduler. By the end you will have a complete scheduling toolkit that fits inside any Python application.

APScheduler: Quick Example

Here is a working APScheduler script that runs a job every 5 seconds in the background while your main program continues:

# quick_scheduler.py
from apscheduler.schedulers.background import BackgroundScheduler
import time

def my_job():
    print("Job running!")

scheduler = BackgroundScheduler()
scheduler.add_job(my_job, "interval", seconds=5)
scheduler.start()

print("Scheduler started. Press Ctrl+C to stop.")
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    scheduler.shutdown()
    print("Scheduler stopped.")

Output:

Scheduler started. Press Ctrl+C to stop.
Job running!
Job running!
Job running!
^CScheduler stopped.

The BackgroundScheduler runs jobs in a daemon thread, so your main thread stays free. The add_job() method takes the function to call, the trigger type ("interval"), and trigger-specific keyword arguments. The start() and shutdown() calls control the scheduler lifecycle.

Python apscheduler tutorial illustration 1
scheduler.add_job() — because hardcoded sleep() loops are a cry for help.

What Is APScheduler and When Should You Use It?

APScheduler is an in-process task scheduler for Python applications. “In-process” means the scheduler lives inside your Python program — no Redis, no Celery worker, no cron daemon required. This makes it ideal for lightweight automation tasks that belong inside your application.

ToolBest ForRequiresIn-Process
APSchedulerApp-embedded schedulingNothing externalYes
Celery BeatDistributed task queuesRedis or RabbitMQNo
cron (OS)System-level scriptsUnix/Linux OSNo
schedule libSimple interval jobsNothing externalYes
rq-schedulerQueue-backed schedulingRedisNo

Use APScheduler when you want scheduling built into your Python application without external dependencies. It is the right choice for a Flask/FastAPI web app that needs background jobs, a data pipeline script that runs tasks on varying schedules, or any service that needs more power than the simple schedule library but does not need the full Celery stack.

Installation

Install APScheduler with pip:

pip install apscheduler

Output:

Successfully installed APScheduler-3.10.4

APScheduler 3.x is the stable release as of 2026. APScheduler 4.x is in development and has a different API. This article uses 3.x. Verify your version with python -c "import apscheduler; print(apscheduler.__version__)".

Choosing a Scheduler Type

APScheduler provides different scheduler classes depending on how you want to run jobs:

# scheduler_types.py
from apscheduler.schedulers.background import BackgroundScheduler   # runs in a daemon thread
from apscheduler.schedulers.blocking import BlockingScheduler        # takes over the main thread, good for standalone scripts
# from apscheduler.schedulers.asyncio import AsyncIOScheduler        # for asyncio apps

# For most apps: BackgroundScheduler
scheduler = BackgroundScheduler()

# For a standalone scheduler script with nothing else to do:
# scheduler = BlockingScheduler()
# scheduler.start()  # This blocks until scheduler.shutdown() is called

print("BackgroundScheduler and BlockingScheduler imported OK")

Output:

BackgroundScheduler and BlockingScheduler imported OK

BackgroundScheduler is the most flexible choice: it runs in a daemon thread so your main thread stays free to do other work (serve HTTP requests, wait for user input, etc.). BlockingScheduler makes sense for a script whose only job is running scheduled tasks. AsyncIOScheduler is for FastAPI or any asyncio-based application where you need to await the scheduled functions.

The Three Trigger Types

APScheduler has three built-in trigger types. Understanding when to use each one is the key to using APScheduler effectively.

Interval Trigger — Run Every N Units of Time

The interval trigger runs a job repeatedly with a fixed time gap between executions. Use it for polling, heartbeats, and periodic cleanup tasks.

# interval_trigger.py
from apscheduler.schedulers.background import BackgroundScheduler
import time
from datetime import datetime

def cleanup_expired_sessions():
    print(f"[{datetime.now():%H:%M:%S}] Cleaning up expired sessions...")

def poll_api():
    print(f"[{datetime.now():%H:%M:%S}] Polling external API...")

scheduler = BackgroundScheduler()

# Run every 10 seconds
scheduler.add_job(cleanup_expired_sessions, "interval", seconds=10)

# Run every 2 minutes 30 seconds -- combine time units
scheduler.add_job(poll_api, "interval", minutes=2, seconds=30)

# interval with start_date -- delay first run
scheduler.add_job(
    poll_api,
    "interval",
    hours=1,
    start_date="2026-05-01 08:00:00"
)

scheduler.start()
print("Scheduler running...")
time.sleep(25)
scheduler.shutdown()
print("Done.")

Output:

Scheduler running...
[10:00:10] Cleaning up expired sessions...
[10:00:20] Cleaning up expired sessions...
Done.

Interval trigger arguments include weeks, days, hours, minutes, seconds, and microseconds. You can combine them: hours=1, minutes=30 runs the job every 90 minutes. The first run happens one full interval after the scheduler starts, not immediately. The optional start_date delays the first run until a specific datetime.

Python apscheduler tutorial illustration 2
Every 10 seconds, forever. Your server’s new roommate.

Cron Trigger — Run on a Schedule

The cron trigger uses cron-style expressions to run jobs at specific times, days of the week, or calendar positions. It is the right tool for “run at 3 AM every Sunday” or “run on the 1st of every month”.

# cron_trigger.py
from apscheduler.schedulers.background import BackgroundScheduler
import time
from datetime import datetime

def generate_weekly_report():
    print(f"[{datetime.now():%Y-%m-%d %H:%M}] Generating weekly sales report...")

def send_daily_digest():
    print(f"[{datetime.now():%Y-%m-%d %H:%M}] Sending daily digest email...")

def monthly_db_backup():
    print(f"[{datetime.now():%Y-%m-%d %H:%M}] Running monthly database backup...")

scheduler = BackgroundScheduler()

# Every Monday at 8:00 AM
scheduler.add_job(generate_weekly_report, "cron",
                  day_of_week="mon", hour=8, minute=0)

# Every day at 6:30 PM
scheduler.add_job(send_daily_digest, "cron",
                  hour=18, minute=30)

# First day of every month at 2:00 AM
scheduler.add_job(monthly_db_backup, "cron",
                  day=1, hour=2, minute=0)

# Every weekday (Mon-Fri) at noon
scheduler.add_job(
    send_daily_digest, "cron",
    day_of_week="mon-fri", hour=12, minute=0
)

scheduler.start()
print("Cron scheduler running. Ctrl+C to stop.")
try:
    while True:
        time.sleep(60)
except KeyboardInterrupt:
    scheduler.shutdown()

Output:

Cron scheduler running. Ctrl+C to stop.
[2026-05-01 02:00] Running monthly database backup...
[2026-05-01 12:00] Sending daily digest email...
[2026-05-01 18:30] Sending daily digest email...

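Cron trigger fields include year, month, day, week, day_of_week, hour, minute, and second. Each field accepts integers, ranges ("1-5"), lists ("1,15"), steps ("*/2" for every 2 units), and names ("mon-fri"). In APScheduler 3.x, omitted fields that come before the least significant field you set default to * (every value), while omitted fields after it default to their minimum value. So hour=12 alone means “every day at 12:00:00”, and day=1, minute=20 means “at minute 20 of every hour on the 1st of the month”. Spelling out minute=0 alongside hour is still worthwhile because it makes the intent obvious to the next reader.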

Date Trigger — Run Once at a Specific Time

The date trigger schedules a job to run exactly once at a specified datetime. Use it for deferred execution, one-off reminders, or delayed notifications.

# date_trigger.py
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime, timedelta
import time

def send_welcome_email(user_email):
    print(f"Sending welcome email to {user_email}")

def process_payment(order_id, amount):
    print(f"Processing payment for order {order_id}: ${amount}")

scheduler = BackgroundScheduler()

# Run in 5 seconds (simulate a delayed notification)
run_at = datetime.now() + timedelta(seconds=5)
scheduler.add_job(send_welcome_email, "date",
                  run_date=run_at,
                  args=["alice@example.com"])

# Run in 10 seconds with keyword arguments
scheduler.add_job(process_payment, "date",
                  run_date=datetime.now() + timedelta(seconds=10),
                  kwargs={"order_id": "ORD-001", "amount": 49.99})

scheduler.start()
print("Jobs scheduled. Waiting...")
time.sleep(15)
scheduler.shutdown()
print("Done.")

Output:

Jobs scheduled. Waiting...
Sending welcome email to alice@example.com
Processing payment for order ORD-001: $49.99
Done.

The args parameter passes positional arguments to the job function; kwargs passes keyword arguments. This lets you schedule parameterized jobs dynamically — for example, scheduling a welcome email when a user registers, with their email address baked into the scheduled call.

Managing Jobs at Runtime

APScheduler gives you full control over jobs after they are scheduled. You can pause, resume, modify, and remove jobs while the scheduler is running.

# job_management.py
from apscheduler.schedulers.background import BackgroundScheduler
import time

def counter_job():
    print("Tick!")

scheduler = BackgroundScheduler()

# Add job and capture its ID
job = scheduler.add_job(counter_job, "interval", seconds=3, id="my_counter")
print(f"Added job: {job.id}")

scheduler.start()
time.sleep(7)

# Pause the job
scheduler.pause_job("my_counter")
print("Job paused.")
time.sleep(5)  # Nothing fires during pause

# Resume the job
scheduler.resume_job("my_counter")
print("Job resumed.")
time.sleep(7)

# Modify the interval to every 1 second
scheduler.reschedule_job("my_counter", trigger="interval", seconds=1)
print("Job rescheduled to every 1 second.")
time.sleep(5)

# List all jobs
for j in scheduler.get_jobs():
    print(f"Job: {j.id} | Next run: {j.next_run_time}")

# Remove the job
scheduler.remove_job("my_counter")
print("Job removed.")

scheduler.shutdown()

Output:

Added job: my_counter
Tick!
Tick!
Job paused.
Job resumed.
Tick!
Tick!
Job rescheduled to every 1 second.
Tick!
Tick!
Tick!
Job: my_counter | Next run: 2026-04-30 10:00:15+00:00
Job removed.

Always use explicit id strings when adding jobs you plan to manage later. Without an ID, APScheduler generates a UUID, making it hard to reference the job later. The get_jobs() method returns a list of all active Job objects with properties like id, next_run_time, and func.

Python apscheduler tutorial illustration 3
pause_job(), resume_job(), remove_job() — the runtime controls cron never gave you.

Real-Life Example: Automated Site Health Monitor

Here is a complete monitoring script that checks a website’s health on multiple schedules — a fast heartbeat check every minute, a detailed check every 15 minutes, and a daily summary report.

# site_monitor.py
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
import time
import urllib.request
import urllib.error

SITE_URL = "https://httpbin.org/status/200"   # Use httpbin for testing
stats = {"checks": 0, "failures": 0, "last_status": None}

def heartbeat_check():
    """Quick ping every 60 seconds."""
    stats["checks"] += 1   # Count every attempt so failures are part of the uptime math
    try:
        with urllib.request.urlopen(SITE_URL, timeout=5) as resp:
            stats["last_status"] = resp.getcode()
        print(f"[{datetime.now():%H:%M:%S}] Heartbeat OK ({stats['last_status']})")
    except urllib.error.URLError as e:
        stats["failures"] += 1
        print(f"[{datetime.now():%H:%M:%S}] Heartbeat FAILED: {e.reason}")

def detailed_check():
    """Measure response time every 15 minutes."""
    start = time.monotonic()   # time is already imported at module level
    try:
        urllib.request.urlopen(SITE_URL, timeout=10).close()
        elapsed = time.monotonic() - start
        print(f"[{datetime.now():%H:%M:%S}] Detailed check: {elapsed:.3f}s response time")
    except urllib.error.URLError as e:
        print(f"[{datetime.now():%H:%M:%S}] Detailed check FAILED: {e.reason}")

def daily_summary():
    """Log daily stats at midnight."""
    uptime = 0.0
    if stats["checks"] > 0:
        uptime = (stats["checks"] - stats["failures"]) / stats["checks"] * 100
    print(f"\n=== DAILY SUMMARY [{datetime.now():%Y-%m-%d}] ===")
    print(f"  Total checks  : {stats['checks']}")
    print(f"  Failures      : {stats['failures']}")
    print(f"  Uptime        : {uptime:.2f}%")
    print(f"  Last status   : {stats['last_status']}\n")
    # Reset stats for the next day
    stats.update({"checks": 0, "failures": 0})

scheduler = BackgroundScheduler()
scheduler.add_job(heartbeat_check, "interval", seconds=60,  id="heartbeat")
scheduler.add_job(detailed_check,  "interval", minutes=15,  id="detailed")
scheduler.add_job(daily_summary,   "cron",     hour=0, minute=0, id="daily")

scheduler.start()
print(f"Site monitor started for {SITE_URL}")
print("Running checks... (Ctrl+C to stop)")

try:
    # Run heartbeat immediately on start
    heartbeat_check()
    detailed_check()
    while True:
        time.sleep(30)
except KeyboardInterrupt:
    daily_summary()
    scheduler.shutdown()
    print("Monitor stopped.")

Output:

Site monitor started for https://httpbin.org/status/200
Running checks... (Ctrl+C to stop)
[10:00:00] Heartbeat OK (200)
[10:00:00] Detailed check: 0.312s response time
[10:01:00] Heartbeat OK (200)
...
^C
=== DAILY SUMMARY [2026-04-30] ===
  Total checks  : 3
  Failures      : 0
  Uptime        : 100.00%
  Last status   : 200
Monitor stopped.

This script uses httpbin.org as a real test endpoint: its /status/200 URL answers with HTTP 200 whenever the service is reachable, so you can verify the monitoring logic works without setting up your own server. Replace SITE_URL with your actual site and extend daily_summary() to write to a log file or send an email via smtplib.

Python apscheduler tutorial illustration 4
Uptime: 100%. Stress: 0%. Cron job: replaced.

Frequently Asked Questions

What happens if a job misfires (the scheduler was down when it should have run)?

By default, APScheduler has a misfire_grace_time of 1 second. If a job is more than 1 second late, it is skipped and logged as a misfire. You can increase this: scheduler.add_job(my_job, "cron", hour=8, misfire_grace_time=3600) gives the job a 1-hour window to run after its scheduled time. Set misfire_grace_time=None to always run the job, no matter how late.

How do I prevent a job from running again if the previous run is still going?

Use max_instances=1 in add_job(): scheduler.add_job(my_job, "interval", seconds=30, max_instances=1). This prevents a second instance of the job from starting if the previous one has not finished; the overlapping run is skipped and a warning is logged. The default is already 1, but setting it explicitly makes your intent clear. For CPU-bound jobs, also consider setting executor to a process pool executor.

How do I make jobs survive application restarts?

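Use a job store. APScheduler 3.x ships job stores for SQLAlchemy (which covers SQLite, PostgreSQL, and other SQL databases), MongoDB, and Redis. For SQLite, import the store with from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore (this requires the SQLAlchemy package) and configure it: scheduler = BackgroundScheduler(jobstores={'default': SQLAlchemyJobStore(url='sqlite:///jobs.db')}). Jobs stored in the database survive restarts. If your startup code calls add_job() on every launch, give each job an explicit id and pass replace_existing=True so restarts do not pile up duplicates. Note that the job function must still exist and be importable when the scheduler starts, because the job store saves the schedule, not the code.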

How do I use APScheduler with FastAPI or asyncio?

Use AsyncIOScheduler instead of BackgroundScheduler. Start it in FastAPI’s lifespan function: scheduler.start() at startup and scheduler.shutdown() at shutdown. If your scheduled function is a coroutine (async def), APScheduler will await it automatically when using AsyncIOScheduler.

How do I schedule jobs in a specific timezone?

Pass the timezone argument to the scheduler or to the job: BackgroundScheduler(timezone="America/New_York") sets the default timezone. For a specific job: scheduler.add_job(my_job, "cron", hour=8, timezone="Europe/London"). APScheduler 3.10 handles timezones through the pytz library, which pip installs automatically as a dependency, so IANA names like the ones above work without extra setup.

Conclusion

APScheduler gives you three powerful scheduling primitives — interval for repeating tasks, cron for calendar-based schedules, and date for one-off deferred execution — all running inside your Python process without any external infrastructure. You have seen how to choose between BackgroundScheduler and BlockingScheduler, manage jobs at runtime with pause/resume/remove, handle misfires and overlapping runs, and build a complete site monitoring application. The official APScheduler documentation at apscheduler.readthedocs.io covers advanced topics including distributed job stores and custom executors.

How To Use Python tabulate for Pretty-Printing Tables

Intermediate

You have a Python list of dicts — maybe query results, CSV data, or API responses — and you need to show it to someone. Printing raw lists looks like a data dump. Writing custom string-formatting code for every project wastes time. What you actually want is a table: aligned columns, clear headers, and readable rows.

The tabulate library solves this in one function call. It takes any list of rows (lists, dicts, or dataframe-style data) and turns it into a clean table in any of 30+ formats: plain text, GitHub Markdown, HTML, LaTeX, and more. It is a zero-dependency pure-Python package installable in seconds.

This article covers everything you need to know: installing tabulate, formatting basic tables, controlling column alignment and number formatting, choosing output formats, using headers with dict data, handling missing values, and building a real-world reporting script. By the end you will be able to output professional-looking tables from any Python data structure with a single line of code.

Python tabulate: Quick Example

Before diving into details, here is tabulate in action — a complete working script you can run immediately:

# quick_tabulate.py
from tabulate import tabulate

data = [
    ["Alice",  "Engineering", 95000],
    ["Bob",    "Marketing",   72000],
    ["Carol",  "Engineering", 105000],
    ["Dave",   "HR",          68000],
]
headers = ["Name", "Department", "Salary"]

print(tabulate(data, headers=headers, tablefmt="grid"))

Output:

+--------+--------------+----------+
| Name   | Department   |   Salary |
+========+==============+==========+
| Alice  | Engineering  |    95000 |
+--------+--------------+----------+
| Bob    | Marketing    |    72000 |
+--------+--------------+----------+
| Carol  | Engineering  |   105000 |
+--------+--------------+----------+
| Dave   | HR           |    68000 |
+--------+--------------+----------+

The tabulate() function takes your data as the first argument, an optional headers list, and a tablefmt string that controls the visual style. Numbers are right-aligned automatically, strings are left-aligned, and all columns are padded to match the widest value.

The sections below cover every format option, alignment control, and real-world use case you will encounter.

Raw print() statements: for people who enjoy suffering.

What Is tabulate and Why Use It?

tabulate is a Python library that converts tabular data (rows and columns) into formatted text tables. It was created by Sergey Astanin and has been downloaded hundreds of millions of times because it solves a universal problem: Python makes it easy to collect data but tedious to display it cleanly.

Compare the alternatives:

Approach             Code Required    Output Quality           Format Options
-------------------  ---------------  -----------------------  --------------
print() + f-strings  Many lines       Poor (manual alignment)  None
pandas to_string()   Requires pandas  Good                     Limited
tabulate             1 line           Excellent                30+ formats
rich Table           Several lines    Excellent (colors)       Terminal only

tabulate shines when you need output that can go into a terminal report, a Markdown README, an HTML email, or a LaTeX document — all from the same data structure with just a format string change. When you need colored terminal output, the rich library is a better choice; for everything else, tabulate is the fastest path to a readable table.

Installing tabulate

tabulate is a pure-Python package with no dependencies. Install it with pip:

pip install tabulate

Output:

Successfully installed tabulate-0.9.0

For correct alignment of wide (fullwidth CJK) characters, install the optional widechars extra, which adds the wcwidth package:

pip install tabulate[widechars]

Import it with a single line: from tabulate import tabulate. The main function is also accessible as tabulate.tabulate if you prefer the module-style import.

Basic Usage: Lists, Tuples, and Headers

The simplest input is a list of lists (or list of tuples). Each inner list is one row, and each element in that list is a cell value. Pass headers as a list of column names to add a header row.

# basic_table.py
from tabulate import tabulate

# List of lists -- most basic input format
inventory = [
    ["Widget A", 150, 4.99],
    ["Widget B", 30,  12.50],
    ["Gadget X", 5,   99.00],
]

# Plain text -- no borders, just spacing
print(tabulate(inventory, headers=["Product", "Qty", "Price"]))

Output:

Product      Qty    Price
---------  -----  -------
Widget A     150     4.99
Widget B      30    12.5
Gadget X       5    99

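Without a tablefmt argument, tabulate defaults to the "simple" format: dashes separate the header from the data, and columns are space-padded. Numeric columns are right-aligned automatically, with floats lined up on the decimal point, and string columns are left-aligned. Floats are printed in general format by default, so 12.50 appears as 12.5 and 99.00 as 99; use floatfmt (covered in the number formatting section) to force a fixed number of decimals. This default behavior handles mixed data types correctly without any configuration.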

Choosing a Table Format

The tablefmt parameter is where tabulate becomes powerful. Here are the most useful formats:

# formats_demo.py
from tabulate import tabulate

data = [["Python", 1991, "Guido van Rossum"],
        ["Rust",   2010, "Graydon Hoare"],
        ["Go",     2009, "Google"]]
h = ["Language", "Year", "Creator"]

# GitHub Markdown -- paste directly into README files
print("--- github ---")
print(tabulate(data, headers=h, tablefmt="github"))
print()

# Grid -- heavy borders, great for terminals
print("--- grid ---")
print(tabulate(data, headers=h, tablefmt="grid"))
print()

# HTML -- paste into web pages or emails
print("--- html ---")
print(tabulate(data, headers=h, tablefmt="html"))
print()

# Pipe -- GitHub/GitLab compatible Markdown
print("--- pipe ---")
print(tabulate(data, headers=h, tablefmt="pipe"))

Output:

--- github ---
| Language   |   Year | Creator          |
|------------|--------|------------------|
| Python     |   1991 | Guido van Rossum |
| Rust       |   2010 | Graydon Hoare    |
| Go         |   2009 | Google           |

--- grid ---
+------------+--------+------------------+
| Language   |   Year | Creator          |
+============+========+==================+
| Python     |   1991 | Guido van Rossum |
+------------+--------+------------------+
| Rust       |   2010 | Graydon Hoare    |
+------------+--------+------------------+
| Go         |   2009 | Google           |
+------------+--------+------------------+

--- html ---
<table>
<thead>
<tr><th>Language</th><th style="text-align: right;">  Year</th><th>Creator</th></tr>
</thead>
<tbody>
<tr><td>Python  </td><td style="text-align: right;">  1991</td><td>Guido van Rossum</td></tr>
...
</tbody>
</table>

--- pipe ---
| Language   |   Year | Creator          |
|:-----------|-------:|:-----------------|
| Python     |   1991 | Guido van Rossum |
| Rust       |   2010 | Graydon Hoare    |
| Go         |   2009 | Google           |

The "html" format produces valid HTML table markup ready to paste into an email or web page. The "github" and "pipe" formats produce Markdown tables that render on GitHub, GitLab, and most Markdown renderers; "pipe" additionally encodes column alignment in the separator row. The "grid" format uses + and - border characters and is the most readable in terminal output. Other available formats include "rst" (reStructuredText), "latex", "tsv", "mediawiki", and many more. To see the full list, import the tabulate_formats list and print it: from tabulate import tabulate_formats.

30+ table formats. One function call. Your boss thinks you worked hard.

Using Dictionaries as Input

If your data comes from a database query or API call, it is likely a list of dicts rather than a list of lists. tabulate handles this with the headers="keys" shortcut, which uses the dictionary keys as column headers automatically.

# dict_table.py
from tabulate import tabulate

employees = [
    {"name": "Alice",  "role": "Engineer",  "years": 4},
    {"name": "Bob",    "role": "Designer",  "years": 2},
    {"name": "Carol",  "role": "Manager",   "years": 8},
    {"name": "Dave",   "role": "Engineer",  "years": 1},
]

# headers="keys" extracts dict keys as column headers
print(tabulate(employees, headers="keys", tablefmt="simple"))
print()

# headers="firstrow" treats the first item as the header row
rows = [
    ["Name", "Role", "Years"],     # header row
    ["Alice", "Engineer", 4],
    ["Bob",   "Designer", 2],
]
print(tabulate(rows, headers="firstrow", tablefmt="grid"))

Output:

name    role        years
------  --------  -------
Alice   Engineer        4
Bob     Designer        2
Carol   Manager         8
Dave    Engineer        1

+--------+----------+---------+
| Name   | Role     |   Years |
+========+==========+=========+
| Alice  | Engineer |       4 |
+--------+----------+---------+
| Bob    | Designer |       2 |
+--------+----------+---------+

The headers="keys" option is the most convenient for real-world data because it eliminates the need to maintain a separate header list. The column order follows the dict insertion order (guaranteed in Python 3.7+). If you need a different column order, build the rows as lists and pass an explicit headers list instead.

Number Formatting and Alignment

By default, integers and floats are right-aligned and displayed as-is. For financial or scientific data you often need specific decimal places or thousands separators. The floatfmt and intfmt parameters control this.

# number_format.py
from tabulate import tabulate

financial_data = [
    ["Q1", 1245890.50,  0.0834],
    ["Q2", 2312045.25,  0.1521],
    ["Q3", 987654.00,   0.0612],
    ["Q4", 3102450.75,  0.2341],
]

# A single floatfmt string applies to every float column;
# a tuple of standard Python format specs sets one per column
print(tabulate(
    financial_data,
    headers=["Quarter", "Revenue", "Growth"],
    tablefmt="grid",
    floatfmt=("", ",.2f", ".2%")   # tuple: different format per column
))

Output:

+-----------+--------------+----------+
| Quarter   |      Revenue |   Growth |
+===========+==============+==========+
| Q1        | 1,245,890.50 |    8.34% |
+-----------+--------------+----------+
| Q2        | 2,312,045.25 |   15.21% |
+-----------+--------------+----------+
| Q3        |   987,654.00 |    6.12% |
+-----------+--------------+----------+
| Q4        | 3,102,450.75 |   23.41% |
+-----------+--------------+----------+

When floatfmt is a tuple, each element applies to the corresponding column. An empty string "" means “use the default format”. This gives you per-column control: currency columns get comma separators and 2 decimal places, percentage columns get the % suffix. The format strings follow Python’s standard format() mini-language, so any valid format spec works here.
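
Since the format strings are ordinary Python format specs, you can try them with the built-in format() function before putting them into floatfmt. A short standard-library sketch, using sample values borrowed from the table above:

```python
# format_specs.py
revenue = 1245890.5
growth = 0.0834

samples = {
    ",.2f": format(revenue, ",.2f"),  # thousands separator, 2 decimals
    ".2%": format(growth, ".2%"),     # multiply by 100 and append %
    ".3e": format(growth, ".3e"),     # scientific notation, 3 decimals
}

for spec, text in samples.items():
    print(f"{spec:>5} -> {text}")
```

Output:

 ,.2f -> 1,245,890.50
  .2% -> 8.34%
  .3e -> 8.340e-02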

floatfmt=".2f": because 1.0000000001 is not a number, it's a cry for help.

Handling Missing Values and None

Real data is messy. tabulate handles None and missing values gracefully with the missingval parameter, which lets you substitute a display string for any cell that is None.

# missing_values.py
from tabulate import tabulate

# Some cells are None -- common in database results
survey_data = [
    {"respondent": "User A", "age": 28,   "city": "Austin",   "score": 8.5},
    {"respondent": "User B", "age": None, "city": "Boston",   "score": None},
    {"respondent": "User C", "age": 34,   "city": None,       "score": 7.2},
    {"respondent": "User D", "age": 45,   "city": "Chicago",  "score": 9.0},
]

print(tabulate(
    survey_data,
    headers="keys",
    tablefmt="simple",
    missingval="N/A",      # replace None with "N/A"
    floatfmt=".1f"
))

Output:

respondent      age  city       score
------------  -----  -------  -------
User A           28  Austin       8.5
User B          N/A  Boston       N/A
User C           34  N/A          7.2
User D           45  Chicago      9.0

Without missingval, None cells render as the empty string, which can make columns look misaligned or confuse readers. Setting missingval="N/A" (or "--" or any string you prefer) makes the absence of data explicit and keeps the table easy to read. This is especially important when outputting to Markdown or HTML where empty cells can be visually ambiguous.

Controlling Column Alignment

While tabulate auto-aligns based on data type, you can override alignment explicitly with the colalign parameter. Valid values per column are "left", "right", "center", and "decimal".

# alignment.py
from tabulate import tabulate

scores = [
    ["Alice",   95,  "A"],
    ["Bob",     82,  "B"],
    ["Carol",   91,  "A"],
    ["Dave",    67,  "D"],
]

print(tabulate(
    scores,
    headers=["Student", "Score", "Grade"],
    tablefmt="pipe",
    colalign=("left", "right", "center")
))

Output:

| Student   |   Score |  Grade  |
|:----------|--------:|:-------:|
| Alice     |      95 |    A    |
| Bob       |      82 |    B    |
| Carol     |      91 |    A    |
| Dave      |      67 |    D    |

In "pipe" format, the alignment is encoded in the separator line using :--, --:, and :-: syntax — the standard Markdown table alignment syntax. In other formats like "grid" or "simple", spaces are used for visual alignment. The "decimal" alignment option aligns numbers on the decimal point, which is useful for mixing integers and floats in the same column.

colalign=("left", "right", "center"): the alignment you needed three sprints ago.

Real-Life Example: Git-Style Commit Log Reporter

Here is a practical script that simulates a commit log display tool — the kind of utility you would use in a CI pipeline script, a code review tool, or a command-line dashboard.

# commit_reporter.py
from tabulate import tabulate
from datetime import datetime, timedelta
import random

def generate_commit_log(num_commits=10):
    """Simulate a git commit log as a list of dicts."""
    authors = ["alice", "bob", "carol", "dave"]
    messages = [
        "Fix null pointer in auth module",
        "Add unit tests for parser",
        "Refactor database connection pool",
        "Update dependencies",
        "Implement retry logic for API calls",
        "Fix race condition in worker thread",
        "Add logging to file upload handler",
        "Optimize query for dashboard endpoint",
    ]
    commits = []
    base_date = datetime(2026, 4, 20)
    for i in range(num_commits):
        commit_date = base_date + timedelta(hours=i * 3)
        commits.append({
            "hash":    f"{random.randint(0, 0xFFFFFF):06x}",
            "author":  random.choice(authors),
            "date":    commit_date.strftime("%Y-%m-%d %H:%M"),
            "message": random.choice(messages)[:40],
            "files":   random.randint(1, 12),
        })
    return commits

def print_commit_summary(commits):
    """Print commit log in multiple formats."""
    print("=== RECENT COMMITS (terminal view) ===")
    print(tabulate(
        commits,
        headers="keys",
        tablefmt="simple",
        colalign=("left", "left", "left", "left", "right")
    ))
    print()

    # Stats summary
    by_author = {}
    for c in commits:
        by_author[c["author"]] = by_author.get(c["author"], 0) + 1

    stats = [[author, count, f"{count/len(commits)*100:.0f}%"]
             for author, count in sorted(by_author.items(), key=lambda x: -x[1])]

    print("=== AUTHOR BREAKDOWN ===")
    print(tabulate(stats,
                   headers=["Author", "Commits", "Share"],
                   tablefmt="github",
                   colalign=("left", "right", "right")))
    print()

    # Markdown export
    print("=== MARKDOWN (for README or PR description) ===")
    print(tabulate(commits, headers="keys", tablefmt="pipe"))

if __name__ == "__main__":
    commits = generate_commit_log(8)
    print_commit_summary(commits)

Output (the values vary between runs because the commits are generated randomly):

=== RECENT COMMITS (terminal view) ===
hash      author    date              message                                    files
--------  --------  ----------------  -----------------------------------------  -------
a3f2c1    alice     2026-04-20 00:00  Fix null pointer in auth module                   3
7b8e90    bob       2026-04-20 03:00  Add unit tests for parser                         7
...

=== AUTHOR BREAKDOWN ===
| Author   |   Commits |   Share |
|----------|-----------|---------|
| alice    |         3 |     38% |
| bob      |         2 |     25% |
| carol    |         2 |     25% |
| dave     |         1 |     12% |

=== MARKDOWN (for README or PR description) ===
| hash   | author   | date             | message                          |   files |
|:-------|:---------|:-----------------|:---------------------------------|--------:|
| a3f2c1 | alice    | 2026-04-20 00:00 | Fix null pointer in auth module  |       3 |
...

This script demonstrates three key strengths of tabulate: switching between terminal and Markdown formats without changing your data, using headers="keys" for dict input, and combining column alignment control with numeric formatting. Extend it to read from a real git log via subprocess.run(["git", "log", "--format=..."]) or a GitHub API response to build a fully functional commit reporter.

Three lines of code, one professional-looking report. Nobody needs to know.

Frequently Asked Questions

Can I use tabulate directly with a pandas DataFrame?

Yes. Pass the DataFrame with headers="keys" to use the column names as headers: tabulate(df, headers="keys"). pandas also provides df.to_markdown(), which calls tabulate internally (tabulate must be installed). For DataFrames the row index is shown by default; pass showindex=False to hide it. The showindex parameter controls whether the row index column appears in the output.

How do I truncate long cell values so the table does not overflow?

tabulate does not have a built-in truncation option. Pre-process your data before passing it in: use a list comprehension like [(row[0][:30], row[1]) for row in data] to clip string values to a maximum width. For terminal output, consider using the rich library’s Table class which has a built-in overflow setting per column.
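
One way to do this pre-processing is a small helper applied to every cell. The clip function below is a hypothetical helper written for this illustration, not part of tabulate; it uses only the standard library:

```python
# truncate_cells.py
def clip(value, width=30, marker="..."):
    """Shorten strings longer than `width`; return other values unchanged."""
    if isinstance(value, str) and len(value) > width:
        return value[: width - len(marker)] + marker
    return value


rows = [
    ["a3f2c1", "Refactor the database connection pool to reuse idle connections", 7],
    ["7b8e90", "Fix typo", 1],
]

# Apply the helper to every cell before handing the rows to tabulate()
clipped = [[clip(cell, width=24) for cell in row] for row in rows]
for row in clipped:
    print(row)
```

Output:

['a3f2c1', 'Refactor the database...', 7]
['7b8e90', 'Fix typo', 1]

Non-string cells pass through untouched, so numeric columns keep their type and tabulate still right-aligns them.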

Can I add color to tabulate output?

tabulate itself does not handle color. You can add ANSI color codes to cell values before passing them to tabulate (e.g., f"\033[91m{value}\033[0m" for red). tabulate ignores these invisible escape sequences when it measures column widths, so terminal alignment is normally preserved, but the raw codes also end up in Markdown or HTML output, where they are just noise. For colored terminal tables, the rich library is a better choice. tabulate’s strength is format portability (Markdown and HTML), not terminal styling.

Does tabulate sort data automatically?

No, tabulate outputs rows exactly as it receives them. Sorting is your responsibility before calling tabulate(). Use Python’s built-in sorted(data, key=lambda row: row[1], reverse=True) for lists, or sorted(data, key=lambda d: d["score"], reverse=True) for dicts. This separation of concerns is intentional — tabulate handles formatting only.

How do I save a tabulate table to a file?

tabulate returns a string, so save it like any string: with open("report.txt", "w", encoding="utf-8") as f: f.write(tabulate(data, headers=headers)). The with block closes the file for you. For HTML output, wrap the HTML table string in a full HTML document template and save as .html. For Markdown, save as .md. For CSV export, use Python’s built-in csv module instead, because tabulate is not designed for CSV.

Is there a way to set maximum column widths in tabulate?

Yes, in tabulate 0.9.0+ you can use the maxcolwidths parameter: tabulate(data, maxcolwidths=30) limits all columns to 30 characters and wraps longer content onto additional lines within the same cell. Pass a list to set per-column widths: maxcolwidths=[20, None, 15] where None means unlimited. This is one of the most useful recent additions to the library.

Conclusion

tabulate is one of those libraries that solves a small but constantly recurring problem elegantly. You have seen how to format basic list and dict data, pick from 30+ output formats including plain text, Markdown, HTML, and LaTeX, control number formatting per column with floatfmt, handle missing values with missingval, and override alignment with colalign. The real-world commit reporter shows how these features combine into practical reporting tools.

The logical next step is to wire tabulate into a script that generates a report on a schedule — combine it with the csv module to read data, tabulate to format it, and email or Slack API to deliver it automatically. The official documentation at github.com/astanin/python-tabulate has a complete list of format names and options.

How To Use Python Pydash for Functional Programming Utilities


Intermediate

If you’ve ever written the same list-filtering loop for the hundredth time, or stared at a nested dictionary wondering how to safely pluck a value three levels deep, you already know the problem Pydash solves. Python’s standard library is powerful, but working with collections and data pipelines often produces verbose, repetitive code that buries your intent under implementation details. Pydash is a utility library — inspired by Lodash from JavaScript — that gives you a clean, consistent set of functions for transforming data the way you think about it, not the way Python’s builtins happen to expose it.

Pydash is a pure Python library with no mandatory dependencies, and installing it takes one command: pip install pydash. The 8.x releases used in this article require Python 3.8 or newer. The functions cover six main areas: lists, dictionaries, strings, numbers, functions (higher-order utilities), and chaining. Most functions are forgiving by design: they return sensible defaults instead of raising exceptions when data is missing or the wrong shape.

In this article, we cover the most useful Pydash utilities for everyday Python work: deep key access with get and set_, list operations with chunk, flatten, and group_by, string utilities, functional tools like curry and partial, and Pydash’s method chaining API. By the end you’ll have a working data pipeline that processes a collection of records using only Pydash functions — no manual loops required.

Working with Pydash: Quick Example

To see how Pydash changes the shape of everyday code, here is a complete example that deep-reads nested dictionary keys, filters a list, and groups results — all without writing a single for-loop or try/except block:

# quick_example.py
import pydash as _

records = [
    {"user": {"name": "Alice", "role": "admin"}, "score": 91},
    {"user": {"name": "Bob",   "role": "member"}, "score": 74},
    {"user": {"name": "Carol", "role": "admin"}, "score": 88},
    {"user": {"name": "Dan",   "role": "member"}, "score": 55},
]

# Deep-get a nested key with a fallback default
first_name = _.get(records[0], "user.name", "Unknown")
print("First user:", first_name)

# Filter records where score >= 80
high_scorers = _.filter_(records, lambda r: r["score"] >= 80)
print("High scorers:", _.map_(high_scorers, "user.name"))

# Group all records by role
by_role = _.group_by(records, "user.role")
print("Admins:", [r["user"]["name"] for r in by_role["admin"]])

Output:

First user: Alice
High scorers: ['Alice', 'Carol']
Admins: ['Alice', 'Carol']

Notice that _.map_(high_scorers, "user.name") uses a path string instead of a lambda — Pydash accepts dot-notation paths wherever it accepts iteratee functions. That single convention eliminates a large category of boilerplate lambdas. The trailing underscore on filter_ and map_ avoids shadowing Python’s built-in names.

The sections below go deeper into each function group, with realistic examples you can run immediately.

Six levels deep. One dot-separated string. Zero KeyErrors.

What Is Pydash and Why Use It?

Pydash is a port of Lodash — one of JavaScript’s most-downloaded utility libraries — to Python. Where Python’s standard library organizes utilities by data type (itertools, functools, collections, str methods), Pydash organizes everything under a single consistent namespace: import pydash as _ and every utility is one call away.

The library solves a specific problem: Python’s builtins are not composable at the data level. You cannot safely read data["user"]["address"]["city"] without guarding every bracket with a try/except or a chain of .get() calls. Pydash’s _.get(data, "user.address.city", "Unknown") does the same thing in one line and never raises an exception. This matters most when you’re processing API responses, configuration files, or any JSON-shaped data where fields may or may not exist.

Here is a quick comparison of what Pydash replaces:

Task                    Vanilla Python                            Pydash
----------------------  ----------------------------------------  ---------------------------------
Safe nested key access  data.get("a", {}).get("b", default)       _.get(data, "a.b", default)
Flatten nested list     list(itertools.chain.from_iterable(...))  _.flatten(nested)
Group list by field     Manual defaultdict loop                   _.group_by(items, "field")
Split list into pages   Slice arithmetic in a loop                _.chunk(items, size)
Unique by field         Seen-set + loop                           _.uniq_by(items, "id")
Partial application     functools.partial(fn, ...)                _.partial(fn, ...) or _.curry(fn)

Pydash is not a replacement for pandas or numpy — it doesn’t do vectorized math or DataFrames. It’s the missing middle ground between raw Python and a full data science stack: a clean toolbox for transforming ordinary Python dicts and lists.
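
To make the safe-access idea concrete, here is a simplified pure-Python sketch of what a dot-path getter does. It is not Pydash's implementation: the deep_get name is hypothetical, and the sketch handles only nested dicts, not list indexes such as "scores[0]":

```python
# deep_get_sketch.py
def deep_get(obj, path, default=None):
    """Follow a dot-separated path through nested dicts, or return default."""
    current = obj
    for key in path.split("."):
        if isinstance(current, dict) and key in current:
            current = current[key]
        else:
            return default  # missing key or non-dict value on the way down
    return current


data = {"user": {"address": {"city": "Melbourne"}}}

print(deep_get(data, "user.address.city", "Unknown"))     # Melbourne
print(deep_get(data, "user.address.country", "Unknown"))  # Unknown
print(deep_get(data, "user.name.first", "Unknown"))       # Unknown
```

Every lookup goes through the same guard, which is why a single call can replace a chain of .get() calls or a try/except block.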

Installing Pydash

Install Pydash from PyPI. No other dependencies are required:

# install_pydash.sh
pip install pydash

Output:

Successfully installed pydash-8.0.3

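Throughout this article we import Pydash as _, which mirrors the Lodash convention and makes function calls read naturally. If the underscore alias conflicts with something in your codebase (for example, gettext translations or the interactive interpreter's last-result variable), use a plain import pydash or import functions individually: from pydash import get, flatten, group_by. Avoid aliasing it as pd, which is the conventional name for pandas.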

Dictionary Utilities: get, set_, has, omit, pick

Safe Deep Access with get and set_

The two most-used Pydash functions are _.get() and _.set_(). They read and write nested keys using dot-notation paths without raising exceptions on missing keys. This is invaluable when consuming API responses where any field might be absent:

# dict_get_set.py
import pydash as _

user = {
    "profile": {
        "name": "Alice",
        "address": {
            "city": "Melbourne",
            "postcode": "3000"
        }
    },
    "scores": [91, 85, 78]
}

# Safe nested read — returns default if path is missing
city = _.get(user, "profile.address.city", "Unknown")
country = _.get(user, "profile.address.country", "Australia")  # key missing
print("City:", city)
print("Country (default):", country)

# Array index access in paths
first_score = _.get(user, "scores[0]", 0)
print("First score:", first_score)

# Deep write — creates intermediate keys if needed
_.set_(user, "profile.settings.theme", "dark")
print("Theme set:", _.get(user, "profile.settings.theme"))

Output:

City: Melbourne
Country (default): Australia
First score: 91
Theme set: dark

_.get() never raises a KeyError or TypeError — if any segment of the path is missing or None, it returns your default value. _.set_() mutates the original dict in place and creates intermediate dicts automatically, so you never need to pre-initialize nested structures.

Selecting and Filtering Keys: pick, omit, has

When you need a subset of a dictionary’s keys — for serialization, logging, or passing to an API — _.pick() and _.omit() do the job cleanly without dictionary comprehensions:

# dict_pick_omit.py
import pydash as _

record = {
    "id": 42,
    "name": "Alice",
    "email": "alice@example.com",
    "password_hash": "abc123",
    "created_at": "2024-01-15",
    "internal_notes": "VIP customer"
}

# Keep only safe fields for API response
public = _.pick(record, ["id", "name", "email", "created_at"])
print("Public record:", public)

# Remove sensitive fields before logging
loggable = _.omit(record, ["password_hash", "internal_notes"])
print("Loggable record:", loggable)

# Check if a nested key exists
print("Has email?", _.has(record, "email"))
print("Has address?", _.has(record, "address.city"))  # nested path

Output:

Public record: {'id': 42, 'name': 'Alice', 'email': 'alice@example.com', 'created_at': '2024-01-15'}
Loggable record: {'id': 42, 'name': 'Alice', 'email': 'alice@example.com', 'created_at': '2024-01-15'}
Has email? True
Has address? False

Both _.pick() and _.omit() return new dicts — the original is untouched. _.has() accepts dot-notation paths just like _.get(), so you can check for deeply nested keys before trying to access them.

pick() and omit() — faster than writing 'if key in dict' for the thousandth time.

List Utilities: chunk, flatten, group_by, uniq_by, zip_

Splitting Lists with chunk

When sending items to an API in batches, paginating results, or splitting data for parallel processing, you need to divide a list into fixed-size groups. _.chunk() handles this in one call:

# list_chunk.py
import pydash as _

item_ids = [101, 102, 103, 104, 105, 106, 107, 108, 109, 110]

# Split into batches of 3 for API calls
batches = _.chunk(item_ids, 3)
print("Batches:", batches)

# Simulate batch API calls
for i, batch in enumerate(batches, 1):
    print(f"  Batch {i}: processing IDs {batch}")

Output:

Batches: [[101, 102, 103], [104, 105, 106], [107, 108, 109], [110]]
  Batch 1: processing IDs [101, 102, 103]
  Batch 2: processing IDs [104, 105, 106]
  Batch 3: processing IDs [107, 108, 109]
  Batch 4: processing IDs [110]

The last batch contains whatever items remain — Pydash never drops items to make batches uniform. No slice arithmetic, no off-by-one errors.
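
For comparison, the slicing pattern that _.chunk() replaces looks roughly like this in vanilla Python. The chunk helper below is a hand-written stand-in for illustration, not Pydash's own code:

```python
# chunk_vanilla.py
def chunk(items, size):
    """Split a list into consecutive slices of at most `size` items."""
    return [items[i:i + size] for i in range(0, len(items), size)]


item_ids = list(range(101, 111))  # 101 through 110
batches = chunk(item_ids, 3)
print(batches)
```

Output:

[[101, 102, 103], [104, 105, 106], [107, 108, 109], [110]]

The range step does the same job as Pydash's size argument; the library call simply saves you from rewriting this comprehension in every project.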

Flattening Nested Lists

API responses frequently return nested arrays: lists of lists of items, or lists of dicts containing lists. Pydash provides three levels of flattening: _.flatten() for one level deep, _.flatten_deep() for all levels, and _.flatten_depth(array, depth) for a specific depth:

# list_flatten.py
import pydash as _

# One level of nesting (common from paginated API results)
pages = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
all_items = _.flatten(pages)
print("Flatten one level:", all_items)

# Deeply nested structure
nested = [1, [2, [3, [4, [5]]]]]
print("Flatten deep:", _.flatten_deep(nested))
print("Flatten 2 levels:", _.flatten_depth(nested, 2))

Output:

Flatten one level: [1, 2, 3, 4, 5, 6, 7, 8, 9]
Flatten deep: [1, 2, 3, 4, 5]
Flatten 2 levels: [1, 2, 3, [4, [5]]]

Grouping Records with group_by

Grouping a list of dicts by a shared field is one of the most common data transformation tasks. With vanilla Python you build a defaultdict and a loop. With Pydash it’s a single call, and it accepts both a field name string and a lambda for computed groupings:

# list_group_by.py
import pydash as _

orders = [
    {"id": 1, "status": "shipped",   "amount": 120.00},
    {"id": 2, "status": "pending",   "amount":  45.50},
    {"id": 3, "status": "shipped",   "amount":  89.99},
    {"id": 4, "status": "cancelled", "amount":  30.00},
    {"id": 5, "status": "pending",   "amount": 210.75},
]

# Group by field name string
by_status = _.group_by(orders, "status")
for status, items in by_status.items():
    total = sum(o["amount"] for o in items)
    print(f"  {status}: {len(items)} orders, ${total:.2f} total")

# Group by computed value (lambda)
by_size = _.group_by(orders, lambda o: "large" if o["amount"] > 100 else "small")
print("\nLarge orders:", len(by_size.get("large", [])))
print("Small orders:", len(by_size.get("small", [])))

Output:

  shipped: 2 orders, $209.99 total
  pending: 2 orders, $256.25 total
  cancelled: 1 orders, $30.00 total

Large orders: 2
Small orders: 3

Deduplication with uniq_by

When merging datasets or deduplicating records from multiple sources, _.uniq_by() keeps the first occurrence of each unique key value — no seen-set bookkeeping required:

# list_uniq.py
import pydash as _

# Raw records with duplicates (e.g., merged from two data sources)
contacts = [
    {"id": 1, "name": "Alice", "source": "CRM"},
    {"id": 2, "name": "Bob",   "source": "CRM"},
    {"id": 1, "name": "Alice", "source": "CSV"},  # duplicate ID 1
    {"id": 3, "name": "Carol", "source": "CSV"},
]

unique_contacts = _.uniq_by(contacts, "id")
print("Unique contacts:", [c["name"] for c in unique_contacts])
print("Sources kept:", [c["source"] for c in unique_contacts])

Output:

Unique contacts: ['Alice', 'Bob', 'Carol']
Sources kept: ['CRM', 'CRM', 'CSV']

The first occurrence wins — so when merging CRM data over CSV data, put the preferred source first in the input list before calling _.uniq_by().

group_by: because a defaultdict loop is a crime against readability.

String Utilities: camel_case, snake_case, truncate, words

Pydash includes a full set of string case converters that are useful when normalizing data from APIs (which often use camelCase) into Python conventions (snake_case), or formatting output for display:

# string_utils.py
import pydash as _

# Case conversion — common when consuming REST APIs
api_key = "getUserProfileData"
print("Snake case:", _.snake_case(api_key))      # get_user_profile_data
print("Kebab case:", _.kebab_case(api_key))      # get-user-profile-data
print("Title case:", _.start_case(api_key))      # Get User Profile Data

# Reverse: snake_case to camelCase for sending back to API
field_name = "user_created_at"
print("Camel case:", _.camel_case(field_name))   # userCreatedAt

# String truncation for display
long_text = "Python is a versatile programming language used in web development, data science, and automation."
print("Truncated:", _.truncate(long_text, 60))

# Split into words (handles camelCase and snake_case)
print("Words:", _.words("getUserData"))          # ['get', 'User', 'Data']
print("Words:", _.words("get_user_data"))        # ['get', 'user', 'data']

# Pad strings for tabular output
print(_.pad("OK", 10))                           # '    OK    '
print(_.pad_end("Loading", 12, "."))             # 'Loading.....'

Output:

Snake case: get_user_profile_data
Kebab case: get-user-profile-data
Title case: Get User Profile Data
Camel case: userCreatedAt
Truncated: Python is a versatile programming language used in web de...
Words: ['get', 'User', 'Data']
Words: ['get', 'user', 'data']
    OK    
Loading.....

These functions are particularly valuable when writing API adapters that translate between external naming conventions and your internal Python code. Calling _.snake_case() on every key in an API response dict is faster to read and less error-prone than a regex substitution.

Functional Utilities: curry, partial, flow

Currying Functions

Currying transforms a multi-argument function into a chain of single-argument functions. This is useful for creating specialized functions from general ones without repeating arguments everywhere:

# functional_curry.py
import pydash as _

# A general function with multiple parameters
def multiply(a, b):
    return a * b

# Curry it — now each call takes one argument at a time
curried_multiply = _.curry(multiply)

double = curried_multiply(2)   # fix the first argument
triple = curried_multiply(3)

print("Double 5:", double(5))   # 10
print("Triple 5:", triple(5))   # 15

# Practical use: apply a tax rate to a list of prices
add_tax = _.curry(lambda rate, price: round(price * (1 + rate), 2))
add_gst = add_tax(0.10)  # 10% GST

prices = [19.99, 49.99, 9.95, 149.00]
with_tax = list(map(add_gst, prices))
print("Prices with GST:", with_tax)

Output:

Double 5: 10
Triple 5: 15
Prices with GST: [21.99, 54.99, 10.94, 163.9]

Building Pipelines with flow

_.flow() composes a series of single-argument functions into a pipeline — the output of each function becomes the input of the next. This is the cleanest way to express multi-step data transformations without deeply nested function calls:

# functional_flow.py
import pydash as _

# Define individual transformation steps
def normalize_text(text):
    return text.strip().lower()

def remove_punctuation(text):
    return ''.join(c for c in text if c.isalnum() or c.isspace())

def split_words(text):
    return text.split()

def count_words(words):
    return len(words)

# Compose into a pipeline
word_count_pipeline = _.flow(
    normalize_text,
    remove_punctuation,
    split_words,
    count_words
)

samples = [
    "  Hello, World! This is Python.  ",
    "Pydash makes functional programming easy!",
    "  One.  ",
]

for sample in samples:
    count = word_count_pipeline(sample)
    print(f"'{sample.strip()}' -> {count} words")

Output:

'Hello, World! This is Python.' -> 5 words
'Pydash makes functional programming easy!' -> 5 words
'One.' -> 1 words

_.flow() makes the transformation sequence explicit and readable. Adding a new step is one line — insert the function anywhere in the chain. Compare this to the equivalent nested call: count_words(split_words(remove_punctuation(normalize_text(text)))), which you have to read right-to-left to understand the execution order.
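As a rough mental model of left-to-right composition, the same idea can be sketched in plain Python with functools.reduce. This is an illustration only, not Pydash's implementation; compose_left_to_right and the two helper steps are hypothetical names standing in for the functions defined above.

```python
from functools import reduce

def compose_left_to_right(*funcs):
    """Return a function that feeds its argument through funcs in order."""
    def pipeline(value):
        # Each function receives the previous function's return value.
        return reduce(lambda acc, fn: fn(acc), funcs, value)
    return pipeline

# Hypothetical steps, standing in for the ones defined above
def strip_and_lower(text):
    return text.strip().lower()

def letters_and_spaces_only(text):
    return "".join(c for c in text if c.isalnum() or c.isspace())

count_words = compose_left_to_right(
    strip_and_lower, letters_and_spaces_only, str.split, len
)

print(count_words("  Hello, World! This is Python.  "))  # 5
```

Any callable that accepts one argument can be a step, which is why built-ins such as str.split and len slot in next to hand-written functions.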

flow() — because nested function calls read inside-out and pipelines don't.

Method Chaining with _.chain()

Pydash’s chaining API lets you apply multiple operations to a collection in sequence without intermediate variables or nested calls. The chain is lazy — nothing executes until you call .value():

# chaining.py
import pydash as _

employees = [
    {"name": "Alice",   "dept": "Engineering", "salary": 95000, "years": 4},
    {"name": "Bob",     "dept": "Marketing",   "salary": 72000, "years": 2},
    {"name": "Carol",   "dept": "Engineering", "salary": 110000, "years": 7},
    {"name": "Dan",     "dept": "Marketing",   "salary": 68000, "years": 1},
    {"name": "Eve",     "dept": "Engineering", "salary": 88000, "years": 3},
    {"name": "Frank",   "dept": "HR",          "salary": 65000, "years": 5},
]

# Chain: filter engineers -> sort by salary desc -> take top 2 -> extract names
top_engineers = (
    _.chain(employees)
    .filter_(lambda e: e["dept"] == "Engineering")
    .sort_by("salary", reverse=True)
    .take(2)
    .map_("name")
    .value()
)

print("Top 2 engineers by salary:", top_engineers)

# Chain: group by dept -> map to dept summary stats
dept_summary = (
    _.chain(employees)
    .group_by("dept")
    .map_values(lambda members: {
        "count": len(members),
        "avg_salary": round(sum(m["salary"] for m in members) / len(members))
    })
    .value()
)

for dept, stats in dept_summary.items():
    print(f"  {dept}: {stats['count']} people, avg ${stats['avg_salary']:,}")

Output:

Top 2 engineers by salary: ['Carol', 'Alice']
  Engineering: 3 people, avg $97,667
  Marketing: 2 people, avg $70,000
  HR: 1 people, avg $65,000

Each method in the chain wraps the previous result. The chain object accumulates operations without executing them — execution happens only when .value() is called. This means you can build reusable chain templates and conditionally add operations before calling .value().

Real-Life Example: Employee Report Pipeline

This project combines everything from the article into a self-contained pipeline that ingests raw employee records, cleans and validates them, computes department statistics, and produces a formatted summary report — all using Pydash functions.

Real data pipelines: because manual loops are where maintainability goes to die.
# employee_report.py
import pydash as _

RAW_DATA = [
    {"id": 1,  "full_name": "  alice chen  ", "department": "engineering", "annual_salary": 95000, "tenure_years": 4, "active": True},
    {"id": 2,  "full_name": "BOB SMITH",       "department": "marketing",   "annual_salary": 72000, "tenure_years": 2, "active": True},
    {"id": 3,  "full_name": "Carol Ng",        "department": "engineering", "annual_salary": 110000,"tenure_years": 7, "active": True},
    {"id": 4,  "full_name": "dan jones",       "department": "marketing",   "annual_salary": 68000, "tenure_years": 1, "active": False},
    {"id": 5,  "full_name": "Eve Rodrigo",     "department": "engineering", "annual_salary": 88000, "tenure_years": 3, "active": True},
    {"id": 6,  "full_name": " FRANK LEE ",     "department": "hr",          "annual_salary": 65000, "tenure_years": 5, "active": True},
    {"id": 7,  "full_name": "Grace Kim",       "department": "engineering", "annual_salary": 102000,"tenure_years": 6, "active": True},
    {"id": 8,  "full_name": "henry park",      "department": "hr",          "annual_salary": 61000, "tenure_years": 2, "active": False},
]

# Step 1: Clean and normalize records
def normalize_record(rec):
    return _.assign({}, rec, {
        "full_name": _.start_case(rec["full_name"].strip().lower()),
        "department": _.start_case(rec["department"]),
    })

# Step 2: Build the report pipeline
report = (
    _.chain(RAW_DATA)
    # Normalize names and departments
    .map_(normalize_record)
    # Active employees only
    .filter_(lambda e: e["active"])
    # Group by department
    .group_by("department")
    # Compute stats per department
    .map_values(lambda members: {
        "headcount": len(members),
        "avg_salary": round(sum(m["annual_salary"] for m in members) / len(members)),
        "max_salary": max(m["annual_salary"] for m in members),
        "avg_tenure": round(sum(m["tenure_years"] for m in members) / len(members), 1),
        "top_earner": _.max_by(members, "annual_salary")["full_name"],
    })
    .value()
)

# Step 3: Print the report
print("=" * 54)
print("EMPLOYEE REPORT — ACTIVE STAFF BY DEPARTMENT")
print("=" * 54)
for dept, stats in sorted(report.items()):
    print(f"\n  {dept}")
    print(f"    Headcount  : {stats['headcount']}")
    print(f"    Avg Salary : ${stats['avg_salary']:,}")
    print(f"    Max Salary : ${stats['max_salary']:,}")
    print(f"    Avg Tenure : {stats['avg_tenure']} years")
    print(f"    Top Earner : {stats['top_earner']}")

# Step 4: Global stats
all_active = _.filter_(_.map_(RAW_DATA, normalize_record), lambda e: e["active"])
print(f"\n{'=' * 54}")
print(f"  Total active employees : {len(all_active)}")
print(f"  Company avg salary     : ${round(_.mean(_.map_(all_active, 'annual_salary'))):,}")
print(f"  Highest paid overall   : {_.max_by(all_active, 'annual_salary')['full_name']}")
print("=" * 54)

Output:

======================================================
EMPLOYEE REPORT — ACTIVE STAFF BY DEPARTMENT
======================================================

  Engineering
    Headcount  : 4
    Avg Salary : $98,750
    Max Salary : $110,000
    Avg Tenure : 5.0 years
    Top Earner : Carol Ng

  Hr
    Headcount  : 1
    Avg Salary : $65,000
    Max Salary : $65,000
    Avg Tenure : 5.0 years
    Top Earner : Frank Lee

  Marketing
    Headcount  : 1
    Avg Salary : $72,000
    Max Salary : $72,000
    Avg Tenure : 2.0 years
    Top Earner : Bob Smith

======================================================
  Total active employees : 6
  Company avg salary     : $88,667
  Highest paid overall   : Carol Ng
======================================================

The pipeline reads as a clear sequence of intentions: normalize, filter, group, aggregate. To add a new transformation — say, flagging departments with average tenure under 2 years — you add one .map_values() step to the chain. No refactoring, no new loop variables, no off-by-one concerns.
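The tenure flag mentioned above could be sketched as a small per-department function. This is a minimal illustration, assuming stats dicts shaped like the ones in the report; flag_low_tenure, threshold_years, and the sample values are hypothetical and not part of Pydash.

```python
def flag_low_tenure(stats, *, threshold_years=2):
    """Return a copy of a department's stats with a low_tenure flag added."""
    return {**stats, "low_tenure": stats["avg_tenure"] < threshold_years}

# Sample stats in the same shape as the report (values are illustrative)
sample_report = {
    "Engineering": {"headcount": 4, "avg_tenure": 5.0},
    "Support": {"headcount": 2, "avg_tenure": 1.5},
}

flagged = {dept: flag_low_tenure(stats) for dept, stats in sample_report.items()}
for dept, stats in flagged.items():
    print(dept, stats["low_tenure"])
```

In the chain, a step such as .map_values(lambda stats: flag_low_tenure(stats)) placed before .value() should apply it to every department; wrapping it in a one-argument lambda keeps the call shape the same as the other steps in the pipeline.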

Frequently Asked Questions

How does Pydash compare to toolz or cytoolz?

Toolz and cytoolz (the Cython-accelerated version) focus on purely functional composition — they’re excellent for pipeline-heavy code with large datasets and prioritize performance. Pydash covers a broader surface area including string utilities, nested dict access, and a chaining API, and is more beginner-friendly because its function names mirror Lodash. For data pipelines that process millions of records, toolz may be faster; for everyday JSON and dict manipulation, Pydash’s ergonomics usually win.

What path string formats does _.get() support?

Pydash’s _.get() accepts dot-notation for nested dicts ("user.address.city"), bracket notation for list indices ("scores[0]"), and combinations of both ("users[2].profile.name"). If a key itself contains a dot (rare but possible), you can pass a list of key segments instead: _.get(data, ["key.with.dot", "nested"]). This covers virtually all real-world JSON structures.

Does Pydash mutate the original data?

Most Pydash functions return new objects and do not mutate their inputs: _.filter_(), _.map_(), _.pick(), _.omit(), and so on. The exceptions are functions that explicitly write: _.set_() and _.assign() mutate their first argument by design. If you need immutable behavior from these, pass a copy: _.set_(dict(original), "key", value). Note that dict(original) is a shallow copy, so for nested paths use copy.deepcopy(original) to keep inner dicts from being shared with the original. The trailing underscore on function names does not indicate mutation; it is only there to avoid shadowing Python builtins like filter and map.
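One caveat with the copy approach is that dict(original) copies only the top level. The sketch below shows the difference using only the standard library; set_in_place is a hypothetical stand-in for any setter that writes into a nested structure in place, and the data is made up for illustration.

```python
import copy

def set_in_place(obj, keys, value):
    """Hypothetical stand-in for an in-place nested setter."""
    for key in keys[:-1]:
        obj = obj.setdefault(key, {})
    obj[keys[-1]] = value

original = {"user": {"address": {"city": "Oslo"}}}

shallow = dict(original)              # copies only the top level
set_in_place(shallow, ["user", "address", "city"], "Bergen")
print(original["user"]["address"]["city"])  # Bergen -- the original changed too

original = {"user": {"address": {"city": "Oslo"}}}
deep = copy.deepcopy(original)        # copies every nested level
set_in_place(deep, ["user", "address", "city"], "Bergen")
print(original["user"]["address"]["city"])  # Oslo
```

For flat, top-level keys the shallow copy is enough; the deep copy matters only when the path reaches into nested dicts or lists.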

Is Pydash performant enough for production?

Pydash is pure Python, so it will be slower than numpy, pandas, or C-extension libraries for large-scale data processing. For typical web application work — processing API responses, transforming configuration data, building report summaries — performance is more than adequate. The library’s functions are implemented straightforwardly and don’t introduce significant overhead over vanilla Python. If you’re processing millions of records in a tight loop, profile first; for everything else, developer productivity gains from clean Pydash code usually outweigh the marginal speed difference.

Is the _.chain() API truly lazy?

Yes, in the sense that execution is deferred: _.chain() creates a wrapper that records operations without running them, and no iteration happens until you call .value(). This means you can build a chain object, conditionally add steps based on runtime conditions, and then execute it. When .value() is called, the recorded steps run in order, each on the full result of the previous step; deferral does not by itself merge them into a single pass over the data. It also means a bug in a late chain step won’t be revealed until .value() is called, so testing individual chain steps in isolation during development is good practice.
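To make the deferral concrete, here is a toy wrapper that records steps and runs them only on value(). It is a simplified illustration of the idea, not Pydash's implementation; LazyChain, then, and the two step functions are hypothetical names.

```python
class LazyChain:
    """Toy deferred pipeline: records steps, runs them only in value()."""

    def __init__(self, data, steps=()):
        self._data = data
        self._steps = tuple(steps)

    def then(self, func):
        # Return a new chain with one more recorded step; nothing runs yet.
        return LazyChain(self._data, self._steps + (func,))

    def value(self):
        result = self._data
        for step in self._steps:
            result = step(result)
        return result

calls = []

def double_all(items):
    calls.append("double_all")
    return [x * 2 for x in items]

def broken_step(items):
    calls.append("broken_step")
    return items[0]["missing"]  # bug: items are ints, not dicts

chain = LazyChain([1, 2, 3]).then(double_all).then(broken_step)
print(calls)  # [] -- building the chain ran nothing

try:
    chain.value()
except TypeError as exc:
    print("Failed only at value():", exc)
print(calls)  # ['double_all', 'broken_step']
```

The bug in broken_step surfaces only when value() runs, which is the practical reason to test each step on its own before chaining it.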

Conclusion

Pydash fills a genuine gap in Python’s utility landscape. We covered the most useful parts of the library: safe nested dict access with _.get() and _.set_(), targeted key selection with _.pick() and _.omit(), list operations including _.chunk(), _.flatten(), _.group_by(), and _.uniq_by(), string case converters, functional tools like _.curry() and _.flow(), and the full method chaining API with _.chain().

The best way to extend the real-life example is to connect it to a real data source — a CSV export, a REST API response, or a database query result. Try replacing the RAW_DATA list with records from a requests.get() call to jsonplaceholder.typicode.com/users and applying the same pipeline to normalize and summarize that data. The chain won’t change; only the source does.

For the full API reference, including the 150+ functions not covered here, see the official Pydash documentation.

How To Use Python Paramiko for SSH Connections and SFTP

Intermediate

You have a remote server that needs daily log files collected, configuration files deployed, or database backups transferred. You could write shell scripts, but then you have no Python integration, no error handling, and no ability to build on the results programmatically. The paramiko library lets you do all of this in pure Python — SSH into a remote host, run commands, capture output, and transfer files with SFTP — without leaving your Python environment.

Paramiko is a pure-Python implementation of the SSH2 protocol. Install it with pip install paramiko. It provides two main classes: SSHClient for executing remote commands and SFTPClient for file transfers. Both support password authentication and key-based authentication (RSA, Ed25519, ECDSA). For testing without a real server, you can use a local Docker container or SSH into localhost if OpenSSH is installed.

In this article, you will learn how to connect to SSH servers with password and key authentication, run remote commands and capture stdout/stderr, transfer files with SFTP (upload, download, list, delete), handle common SSH errors defensively, and build a real-life deployment script that uses all of these features together.

Quick Example: Run a Remote Command

# quick_ssh.py
import os
import paramiko

client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

# Connect -- replace with your actual server details
client.connect(
    hostname="your-server.example.com",
    username="ubuntu",
    # Expand ~ explicitly, as the connection helper later in this article does
    key_filename=os.path.expanduser("~/.ssh/id_rsa")
)

stdin, stdout, stderr = client.exec_command("uname -a && df -h /")
output = stdout.read().decode()
error = stderr.read().decode()
print(output)

client.close()

Output:

Linux my-server 5.15.0-1034-aws #38-Ubuntu SMP Mon Apr 3 16:14:06 UTC 2023 x86_64 x86_64 x86_64 GNU/Linux
Filesystem      Size  Used Avail Use% Mounted on
/dev/xvda1       30G   12G   17G  42% /

The three streams returned by exec_command work like file objects — call .read() to get all output, or iterate line by line for long-running commands. Always call client.close() when done, or better yet, use a with statement to ensure cleanup.

What Is Paramiko and Why Use It?

Paramiko is a pure-Python SSH library that lets your programs speak the SSH2 protocol directly, without calling the system ssh binary. Fabric is built on it, and Ansible can use it as a connection plugin. Because it does not shell out to ssh, it works anywhere Python and its dependencies can be installed.

Feature               subprocess ssh              paramiko
Programmatic output   Parse text output           Structured streams
SFTP support          External scp/sftp binary    Built-in SFTPClient
Key management        System ssh-agent            Full Python API
Multiple commands     Multiple subprocesses       Persistent connection
Error handling        Parse exit codes            Python exceptions
Portability           Requires ssh binary         Pure Python, any OS

subprocess.run(['ssh', ...]) works. Until it doesn't. paramiko gives you the socket.

Connecting to SSH Servers

Paramiko supports three authentication methods: password, private key file, and private key object. In production, always prefer key-based authentication — passwords in code are a security risk.

# ssh_connect.py
import paramiko
import os

def create_ssh_client(hostname: str, username: str,
                       key_path: str = None, password: str = None) -> paramiko.SSHClient:
    """Create and return a connected SSHClient."""
    client = paramiko.SSHClient()
    # AutoAddPolicy: automatically accept the host key on first connect
    # For production, use RejectPolicy and pre-populate known_hosts
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

    connect_kwargs = {
        "hostname": hostname,
        "username": username,
        "port": 22,
        "timeout": 10,
    }

    if key_path:
        # Expand ~ to home directory
        key_path = os.path.expanduser(key_path)
        connect_kwargs["key_filename"] = key_path
    elif password:
        connect_kwargs["password"] = password
    else:
        # Use SSH agent or default keys (~/.ssh/id_rsa, ~/.ssh/id_ed25519)
        connect_kwargs["allow_agent"] = True
        connect_kwargs["look_for_keys"] = True

    try:
        client.connect(**connect_kwargs)
        print(f"Connected to {username}@{hostname}")
        return client
    except paramiko.AuthenticationException:
        raise RuntimeError(f"Authentication failed for {username}@{hostname}")
    except paramiko.SSHException as e:
        raise RuntimeError(f"SSH error: {e}")
    except Exception as e:
        raise RuntimeError(f"Cannot connect to {hostname}: {e}")

# Test with a real server:
# client = create_ssh_client("myserver.example.com", "ubuntu", key_path="~/.ssh/id_rsa")
print("Connection helper defined -- replace hostname to test")

Output:

Connection helper defined -- replace hostname to test

The set_missing_host_key_policy(paramiko.AutoAddPolicy()) call automatically accepts unknown host keys on first connection. This is convenient but less secure than using RejectPolicy with a pre-populated known_hosts file. For production systems, load known_hosts with client.load_system_host_keys() and use RejectPolicy.

Running Remote Commands

The exec_command() method runs a command in a new channel. It is non-blocking — you call it and get back three stream objects. The actual command runs asynchronously on the server; you read the output by calling .read() on the streams.

# remote_commands.py
import paramiko

def run_command(client: paramiko.SSHClient, command: str,
                timeout: int = 30) -> tuple[str, str, int]:
    """Run a command and return (stdout, stderr, exit_code)."""
    stdin, stdout, stderr = client.exec_command(command, timeout=timeout)
    stdout_text = stdout.read().decode("utf-8").strip()
    stderr_text = stderr.read().decode("utf-8").strip()
    exit_code = stdout.channel.recv_exit_status()
    return stdout_text, stderr_text, exit_code

def run_commands_safely(client: paramiko.SSHClient, commands: list) -> list:
    """Run multiple commands and collect results."""
    results = []
    for cmd in commands:
        out, err, code = run_command(client, cmd)
        status = "OK" if code == 0 else f"FAILED (exit {code})"
        results.append({"command": cmd, "output": out, "error": err,
                         "exit_code": code, "status": status})
        if code != 0:
            print(f"  WARNING: '{cmd}' {status}")
            if err:
                print(f"    stderr: {err}")
    return results

# Simulate output without a live server
print("Example results structure:")
example = [
    {"command": "hostname", "output": "web-server-01", "error": "", "exit_code": 0, "status": "OK"},
    {"command": "free -m", "output": "              total  used  free\nMem: 4096  2048  2048", "error": "", "exit_code": 0, "status": "OK"},
    {"command": "ls /nonexistent", "output": "", "error": "ls: cannot access '/nonexistent'", "exit_code": 2, "status": "FAILED (exit 2)"}
]
for r in example:
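    # Show only the first line of output (or of the error) so each result stays on one row
    summary = (r['output'] or r['error']).splitlines()[0][:40]
    print(f"  [{r['status']}] {r['command']}: {summary}")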

Output:

Example results structure:
  [OK] hostname: web-server-01
  [OK] free -m:               total  used  free
  [FAILED (exit 2)] ls /nonexistent: ls: cannot access '/nonexistent'

Always check the exit code — exec_command does not raise an exception if the remote command fails. Call stdout.channel.recv_exit_status() after reading all output to get the command’s return code. A non-zero exit code indicates failure, just like in a shell script.

stdout and stderr are separate streams. Reading them in the wrong order can deadlock you.

File Transfers with SFTPClient

Paramiko’s SFTPClient provides a file-system-like API for the remote server. You can upload files, download files, list directories, create directories, and delete files — all over the same SSH connection.

# sftp_operations.py
import paramiko
import os
from pathlib import Path

def sftp_operations_demo(client: paramiko.SSHClient):
    """Demonstrate SFTP operations."""
    sftp = client.open_sftp()

    try:
        # List remote directory
        files = sftp.listdir("/tmp")
        print(f"Files in /tmp: {files[:5]}")

        # Upload a local file to the remote server
        local_file = "/tmp/deploy_config.json"
        remote_file = "/tmp/deploy_config.json"

        # Create a test file locally
        with open(local_file, "w") as f:
            f.write('{"version": "1.2.0", "env": "production"}')

        sftp.put(local_file, remote_file)
        print(f"Uploaded {local_file} -> {remote_file}")

        # Check remote file size
        stat = sftp.stat(remote_file)
        print(f"Remote file size: {stat.st_size} bytes")

        # Download it back
        downloaded = "/tmp/downloaded_config.json"
        sftp.get(remote_file, downloaded)
        print(f"Downloaded to {downloaded}")
        with open(downloaded) as f:
            print(f"Content: {f.read()}")

        # Create a remote directory
        try:
            sftp.mkdir("/tmp/deploy_backup")
            print("Created /tmp/deploy_backup")
        except OSError:
            print("/tmp/deploy_backup already exists")

        # Delete the remote file
        sftp.remove(remote_file)
        print(f"Deleted {remote_file}")

    finally:
        sftp.close()

# Simulate without live server
print("SFTP operations:")
print("  sftp.put(local, remote)  -- upload file")
print("  sftp.get(remote, local)  -- download file")
print("  sftp.listdir(path)       -- list directory")
print("  sftp.mkdir(path)         -- create directory")
print("  sftp.stat(path)          -- get file metadata")
print("  sftp.remove(path)        -- delete file")
print("  sftp.rename(old, new)    -- move/rename file")

Output:

SFTP operations:
  sftp.put(local, remote)  -- upload file
  sftp.get(remote, local)  -- download file
  sftp.listdir(path)       -- list directory
  sftp.mkdir(path)         -- create directory
  sftp.stat(path)          -- get file metadata
  sftp.remove(path)        -- delete file
  sftp.rename(old, new)    -- move/rename file

For large file transfers, use the callback parameter of sftp.put() and sftp.get() to track progress: sftp.put(local, remote, callback=lambda sent, total: print(f"{sent}/{total}")). Always use the SFTP client inside a try/finally block to ensure sftp.close() is called even if an exception occurs.

Real-Life Example: Automated Deployment Script

# deploy.py
import paramiko
import json
import os
from datetime import datetime

class Deployer:
    """Deploy application files to a remote server via SSH/SFTP."""

    def __init__(self, hostname: str, username: str, key_path: str):
        self.hostname = hostname
        self.username = username
        self.key_path = os.path.expanduser(key_path)
        self.client = None
        self.sftp = None

    def connect(self):
        self.client = paramiko.SSHClient()
        self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.client.connect(self.hostname, username=self.username,
                            key_filename=self.key_path, timeout=15)
        self.sftp = self.client.open_sftp()
        print(f"Connected to {self.username}@{self.hostname}")

    def run(self, command: str) -> tuple[str, int]:
        _, stdout, stderr = self.client.exec_command(command, timeout=60)
        out = stdout.read().decode().strip()
        err = stderr.read().decode().strip()
        code = stdout.channel.recv_exit_status()
        if code != 0 and err:
            print(f"  stderr: {err[:200]}")
        return out, code

    def upload(self, local_path: str, remote_path: str):
        self.sftp.put(local_path, remote_path)
        print(f"  Uploaded: {os.path.basename(local_path)} -> {remote_path}")

    def deploy(self, local_dir: str, remote_dir: str, app_name: str):
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_dir = f"/tmp/{app_name}_backup_{timestamp}"

        print(f"\n=== Deploying {app_name} ===")

        # Step 1: Backup existing deployment
        out, code = self.run(f"test -d {remote_dir} && cp -r {remote_dir} {backup_dir} && echo backed_up || echo fresh_deploy")
        print(f"Backup: {out}")

        # Step 2: Create remote directory
        self.run(f"mkdir -p {remote_dir}")

        # Step 3: Upload files
        print("Uploading files:")
        for filename in os.listdir(local_dir):
            local_path = os.path.join(local_dir, filename)
            remote_path = f"{remote_dir}/{filename}"
            if os.path.isfile(local_path):
                self.upload(local_path, remote_path)

        # Step 4: Verify deployment
        out, code = self.run(f"ls -la {remote_dir}")
        print(f"Remote directory:\n{out}")

        print(f"=== Deployment complete ===\n")

    def close(self):
        if self.sftp:
            self.sftp.close()
        if self.client:
            self.client.close()

# Usage (replace with real server details):
print("Deployer class ready.")
print("Usage:")
print("  deployer = Deployer('myserver.com', 'ubuntu', '~/.ssh/id_rsa')")
print("  deployer.connect()")
print("  deployer.deploy('./dist/', '/var/www/myapp', 'myapp')")
print("  deployer.close()")

Output:

Deployer class ready.
Usage:
  deployer = Deployer('myserver.com', 'ubuntu', '~/.ssh/id_rsa')
  deployer.connect()
  deployer.deploy('./dist/', '/var/www/myapp', 'myapp')
  deployer.close()

This deployer pattern — connect once, run multiple commands and file transfers over the same connection — is much more efficient than opening a new SSH connection for each operation. Extend it by adding application restart logic (self.run("sudo systemctl restart myapp")), health checks, and rollback capability using the backup directory created in step 1.
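One hardening step worth considering when extending the deployer: the commands above interpolate paths straight into a shell string, so a path containing a space or shell metacharacter would be misparsed on the server. A minimal sketch using the standard library's shlex.quote follows, assuming the remote shell is POSIX-compatible; build_backup_command is a hypothetical helper, not part of paramiko.

```python
import shlex

def build_backup_command(remote_dir: str, backup_dir: str) -> str:
    """Build the step-1 backup command with both paths shell-quoted."""
    src = shlex.quote(remote_dir)
    dst = shlex.quote(backup_dir)
    return f"test -d {src} && cp -r {src} {dst} && echo backed_up || echo fresh_deploy"

cmd = build_backup_command("/var/www/my app", "/tmp/myapp_backup_20240101_000000")
print(cmd)
```

The returned string could then be passed to self.run() in place of the hand-built f-string in step 1. Paths made only of safe characters come back unchanged, so existing deployments would see the same command as before.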

One SSH connection. Multiple commands. Zero subprocess.run(['ssh', ...]) calls.

Frequently Asked Questions

Is AutoAddPolicy safe to use?

AutoAddPolicy automatically accepts any host key on first connection, making it vulnerable to man-in-the-middle attacks. For interactive development and trusted networks it is acceptable. For production, use paramiko.RejectPolicy() combined with client.load_system_host_keys() (loads from ~/.ssh/known_hosts) or client.load_host_keys(path) to use a specific known_hosts file. Only connect to hosts whose keys are already in the file.

Why does exec_command sometimes hang?

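Deadlocks happen when a command writes more to one stream than fits in the channel buffer while you are blocked reading the other. For example, the remote process blocks waiting to write stderr while you wait for stdout to reach end-of-file. Reading stdout to the end and then stderr is fine for commands with modest output, but it is exactly the pattern that can hang when stderr is large. To avoid it, read both streams concurrently using threads, or merge stderr into stdout by calling stdout.channel.set_combine_stderr(True) before reading. Setting a timeout in exec_command does not remove the cause, but it turns an indefinite hang into an exception you can handle.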
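Reading both streams concurrently can be sketched with two threads. The helper below works on any pair of file-like objects with a read() method; the assumption is that the stdout and stderr objects returned by exec_command can be passed in the same way. read_both is a hypothetical helper name, and io.BytesIO stands in for the real streams so the sketch runs without a server.

```python
import io
import threading

def read_both(stdout, stderr):
    """Drain two file-like byte streams at the same time.

    Returns (stdout_bytes, stderr_bytes).
    """
    results = {}

    def drain(name, stream):
        results[name] = stream.read()

    threads = [
        threading.Thread(target=drain, args=("out", stdout)),
        threading.Thread(target=drain, args=("err", stderr)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results["out"], results["err"]

# Stand-in streams so the sketch runs without a server
out, err = read_both(io.BytesIO(b"line 1\nline 2\n"), io.BytesIO(b"warning\n"))
print(out.decode(), err.decode())
```

Because neither read waits for the other, a full stderr buffer can no longer stall the stdout read. An exception raised inside a thread (a read timeout, for instance) is not propagated by this sketch, so a production version would need to capture and re-raise it.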

How do I run sudo commands with paramiko?

Use exec_command("sudo -S command", get_pty=True) and write the password to stdin: stdin.write(password + "\n"); stdin.flush(). A more reliable approach is to configure passwordless sudo for the deployment user via /etc/sudoers with the specific commands they need. Storing passwords in code or passing them via stdin is a security risk in production environments.

How does paramiko compare to Fabric?

Fabric is built on top of paramiko and provides a higher-level API specifically for deployment tasks: running commands on multiple hosts, grouping hosts, and managing connections automatically. If you are doing deployment automation, Fabric is worth evaluating. Paramiko gives you lower-level control and is better when you need to integrate SSH into a larger Python program that is not specifically about deployment.

How do I show upload progress?

Pass a callback function to sftp.put(): sftp.put(local, remote, callback=lambda bytes_transferred, total_bytes: print(f"{bytes_transferred}/{total_bytes}")). The callback receives the bytes transferred so far and the total file size. For a terminal progress bar, use tqdm: wrap the callback to update a tqdm progress bar instance.
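A slightly more practical callback reports only when the percentage changes, rather than on every chunk. The sketch below is one way to build such a callback, assuming the (bytes_transferred, total_bytes) signature described above; make_progress_callback and its report parameter are hypothetical names, and the transfer is simulated so the code runs without a connection.

```python
def make_progress_callback(label, report=print):
    """Build a callback taking (bytes_transferred, total_bytes)."""
    last_percent = -1

    def callback(bytes_transferred, total_bytes):
        nonlocal last_percent
        # Guard against empty files to avoid dividing by zero.
        percent = 100 if total_bytes == 0 else bytes_transferred * 100 // total_bytes
        if percent != last_percent:  # report only when the whole percentage changes
            last_percent = percent
            report(f"{label}: {percent}% ({bytes_transferred}/{total_bytes} bytes)")

    return callback

# Simulated transfer; with a live connection this would be
# sftp.put(local, remote, callback=make_progress_callback("upload"))
messages = []
cb = make_progress_callback("upload", report=messages.append)
for sent in (0, 512, 512, 1024):
    cb(sent, 1024)
print(messages)
```

Passing a different report function (a logger method, or a function that updates a tqdm bar) changes where the progress goes without touching the transfer code.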

Conclusion

Paramiko gives Python programs direct SSH and SFTP capabilities without spawning system processes. You have seen how to connect with key and password authentication using SSHClient, execute remote commands and read stdout/stderr/exit codes with exec_command, transfer files with SFTPClient.put() and get(), and structure a real deployment script that uses a persistent connection for efficiency.

As a next step, explore the fabric library (pip install fabric) which builds on paramiko with a higher-level deployment API, or use paramiko’s Transport class directly for advanced use cases like port forwarding and custom channels. Official documentation is at paramiko.org.

How To Implement Retry Logic in Python with Tenacity

Intermediate

Every API call can fail. Networks drop packets, rate limits kick in, services restart, databases time out. The naive fix is a bare time.sleep(5); retry(), but that retries on every error (including bugs that will never succeed), uses fixed delays (which hammer a struggling server), and has no limit on attempts. The result is a program that hangs forever or spams a broken endpoint. The tenacity library solves all of this with a decorator-based retry system that is both flexible and production-safe.

Install tenacity with pip install tenacity. Its API is a single decorator, @retry, that wraps any function and controls retry behavior through composable strategy objects. You specify WHAT to retry (which exceptions or return values), HOW LONG to wait between attempts (fixed, exponential backoff, random jitter), and WHEN to stop (after N attempts, after N seconds, or a combination). Everything is declarative and testable.

In this article, you will learn to use the @retry decorator with stop conditions, wait strategies, retry predicates, and callbacks. You will see the difference between retrying on exceptions vs. return values, how to add jitter to prevent thundering herd problems, how to log retry attempts, and a real-life example that builds a resilient HTTP client for an unreliable API.

Quick Example: Retry on Exception

The minimal pattern: make up to 3 attempts in total, retrying on any exception, with exponential backoff:

# quick_retry.py
import random
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10))
def call_flaky_api(url: str) -> dict:
    if random.random() < 0.7:  # Simulate 70% failure rate
        raise ConnectionError(f"Failed to connect to {url}")
    return {"status": "ok", "data": [1, 2, 3]}

try:
    result = call_flaky_api("https://api.example.com/data")
    print(f"Success: {result}")
except Exception as e:
    print(f"All retries failed: {e}")

Output (example run):

Success: {'status': 'ok', 'data': [1, 2, 3]}

The decorator handles all the retry logic -- waiting, counting, and raising once every attempt has failed (by default a RetryError that wraps the last exception). Your function body stays clean and focused on the happy path. The wait_exponential strategy doubles the wait after each failed attempt: 1s, 2s, 4s, up to the max cap.

What Is Tenacity and Why Use It?

Tenacity is the maintained successor to the popular retrying library. It provides a decorator and context-manager API for adding retry logic to any function. The key design principle is composability -- you mix and match stop conditions, wait strategies, and predicates to express exactly the retry policy you need.

Feature                      Manual retry loop            tenacity
Exponential backoff          Manual calculation           wait_exponential()
Jitter                       Manual random.uniform()      wait_random_exponential()
Multiple stop conditions     Nested if statements         stop_after_attempt | stop_after_delay
Retry on specific exception  except SomeError: retry()    retry=retry_if_exception_type()
Retry on bad return value    while result != ok: retry()  retry=retry_if_result(predicate)
Before/after callbacks       Manual logging               before_sleep=before_sleep_log()
Retry #1: wait 1s. Retry #2: wait 2s. Retry #3: wait 4s. Retry #4: give up with dignity.

Stop Conditions

Stop conditions tell tenacity when to give up. You can stop after a number of attempts, after a total elapsed time, or combine conditions with | (stop if either is true) and & (stop only if both are true).

# stop_conditions.py
from tenacity import (retry, stop_after_attempt, stop_after_delay,
                       stop_never, RetryError)
import time

# Stop after 5 attempts
@retry(stop=stop_after_attempt(5))
def max_5_tries():
    raise ValueError("always fails")

# Stop after 10 seconds total
@retry(stop=stop_after_delay(10))
def max_10_seconds():
    raise ConnectionError("always fails")

# Stop after 3 attempts OR 5 seconds -- whichever comes first
@retry(stop=(stop_after_attempt(3) | stop_after_delay(5)))
def combined_stop():
    raise IOError("always fails")

# Test: should fail after 3 attempts
try:
    combined_stop()
except RetryError as e:
    last_exc = e.last_attempt.exception()
    print(f"Gave up after retries. Last error: {last_exc}")

Output:

Gave up after retries. Last error: always fails

When all retries are exhausted, tenacity raises a RetryError (not the original exception) unless you pass reraise=True. With reraise=True, the original exception from the last attempt is re-raised, which is usually what you want in production code so your error handling sees the actual error type.

Wait Strategies

The wait strategy controls the delay between retry attempts. Choosing the right strategy is important: too short and you hammer a struggling server, too long and your users wait unnecessarily. Adding jitter (randomness) is critical in distributed systems to prevent the "thundering herd" problem where many clients retry simultaneously.

# wait_strategies.py
from tenacity import (retry, stop_after_attempt, wait_fixed,
                       wait_exponential, wait_random, wait_random_exponential)

# Fixed: always wait 2 seconds
@retry(stop=stop_after_attempt(3), wait=wait_fixed(2))
def fixed_wait():
    raise ConnectionError("fail")

# Exponential: 1s, 2s, 4s, 8s... up to 60s max
@retry(stop=stop_after_attempt(6), wait=wait_exponential(multiplier=1, min=1, max=60))
def exponential_wait():
    raise ConnectionError("fail")

# Random jitter: wait 0-3 seconds randomly
@retry(stop=stop_after_attempt(3), wait=wait_random(min=0, max=3))
def random_wait():
    raise ConnectionError("fail")

# Random exponential (RECOMMENDED for production API calls):
# Exponential base with added randomness -- prevents thundering herd
@retry(stop=stop_after_attempt(5),
       wait=wait_random_exponential(multiplier=1, max=60))
def production_ready():
    raise ConnectionError("fail")

# Show the wait times tenacity would use (without actually sleeping)
from tenacity import Retrying, RetryError

def show_wait(retry_state):
    print(f"Attempt {retry_state.attempt_number}: "
          f"would wait ~{retry_state.next_action.sleep:.1f}s")

try:
    for attempt in Retrying(stop=stop_after_attempt(5),
                            wait=wait_random_exponential(multiplier=1, max=60),
                            before_sleep=show_wait,
                            sleep=lambda seconds: None):  # skip the real sleep
        with attempt:
            raise ConnectionError("fail")  # force a retry on every attempt
except RetryError:
    pass  # all 5 attempts used, so 4 waits were printed

Output (example -- values vary due to randomness):

Attempt 1: would wait ~0.7s
Attempt 2: would wait ~2.1s
Attempt 3: would wait ~5.4s
Attempt 4: would wait ~12.8s
Thundering herd: when 500 services retry at exactly the same second. Jitter fixes this.
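
The effect of jitter can be illustrated without tenacity at all. The sketch below is a plain-Python approximation of the idea, not tenacity's implementation: it assumes 500 hypothetical clients that all failed at the same moment and are about to make their third retry. The helper names backoff_cap and jittered_wait are invented for this example.

```python
import random

def backoff_cap(attempt, multiplier=1.0, max_wait=60.0):
    """Upper bound for the wait after a given attempt (1-based)."""
    return min(max_wait, multiplier * 2 ** (attempt - 1))

def jittered_wait(attempt, rng, multiplier=1.0, max_wait=60.0):
    """Pick a uniformly random wait between 0 and the exponential cap."""
    return rng.uniform(0, backoff_cap(attempt, multiplier, max_wait))

rng = random.Random(42)  # seeded only so repeated runs match
clients = 500
attempt = 3              # the cap for attempt 3 is 4 seconds

# Without jitter every client computes the same wait and retries together
no_jitter = {backoff_cap(attempt) for _ in range(clients)}
# With jitter each client draws its own wait below the cap
with_jitter = [jittered_wait(attempt, rng) for _ in range(clients)]

print(f"Distinct retry times without jitter: {len(no_jitter)}")
print(f"Distinct retry times with jitter:    {len(set(with_jitter))}")
print(f"Jittered waits span {min(with_jitter):.2f}s to {max(with_jitter):.2f}s")
```

Without jitter all 500 clients land on a single instant. With jitter the same load is spread across the whole 0 to 4 second window.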

Retry Predicates: What to Retry

By default, tenacity retries on any exception. Often you need more specific behavior: retry only on network errors (not on validation errors that will never succeed), or retry when a function returns a specific "not ready" value.

# retry_predicates.py
import requests
from tenacity import (retry, stop_after_attempt, wait_exponential,
                       retry_if_exception_type, retry_if_result,
                       retry_if_not_result)

# Only retry on network-related exceptions, not on ValueError
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, max=10),
    retry=retry_if_exception_type((ConnectionError, TimeoutError))
)
def network_only_retry(url: str):
    # This would NOT be retried (ValueError is not in the tuple)
    if "bad" in url:
        raise ValueError("Invalid URL format")
    # This WOULD be retried (ConnectionError IS in the tuple)
    raise ConnectionError("Network down")

# Retry while the function returns None (other falsy values such as 0 or {} are accepted)
@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=0.5, max=5),
    retry=retry_if_result(lambda result: result is None)
)
def fetch_job_result(job_id: str):
    """Poll an async job until it returns a result."""
    import random
    # Simulate a job that takes a few polls to complete
    if random.random() < 0.8:
        return None  # Job still running -- will retry
    return {"job_id": job_id, "status": "complete", "output": 42}

# Combine: retry on exception OR on None result
from tenacity import retry_if_exception_type, retry_if_result

@retry(
    stop=stop_after_attempt(5),
    retry=(retry_if_exception_type(ConnectionError) |
           retry_if_result(lambda r: r is None))
)
def robust_fetch(url: str):
    import random
    r = random.random()
    if r < 0.3:
        raise ConnectionError("timeout")
    if r < 0.6:
        return None  # Not ready yet
    return {"data": "success"}

result = robust_fetch("https://api.jsonplaceholder.typicode.com/todos/1")
print(f"Got result: {result}")

Output:

Got result: {'data': 'success'}

The | operator combines predicates with OR logic. When you combine an exception predicate with a result predicate, tenacity retries if either condition is true. This pattern is perfect for polling APIs that return status codes or None until a job completes.

Logging and Callbacks

In production, silent retries are a debugging nightmare. Tenacity provides before_sleep and after callbacks so you can log every retry attempt with the wait time, attempt number, and exception details.

# retry_callbacks.py
import logging
from tenacity import (retry, stop_after_attempt, wait_exponential,
                       before_sleep_log, after_log, RetryCallState)

logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')
logger = logging.getLogger(__name__)

def custom_before_sleep(retry_state: RetryCallState):
    logger.warning(
        f"Retry #{retry_state.attempt_number} for {retry_state.fn.__name__}() "
        f"-- sleeping {retry_state.next_action.sleep:.1f}s "
        f"after: {retry_state.outcome.exception()}"
    )

@retry(
    stop=stop_after_attempt(4),
    wait=wait_exponential(multiplier=1, min=1, max=8),
    before_sleep=custom_before_sleep,
    reraise=True
)
def flaky_database_write(record: dict) -> bool:
    import random
    if random.random() < 0.75:
        raise TimeoutError("DB write timeout after 30s")
    return True

try:
    flaky_database_write({"id": 42, "name": "test"})
    print("Write succeeded")
except TimeoutError as e:
    print(f"Failed after all retries: {e}")

Output (example):

WARNING: Retry #1 for flaky_database_write() -- sleeping 1.0s after: DB write timeout after 30s
WARNING: Retry #2 for flaky_database_write() -- sleeping 2.0s after: DB write timeout after 30s
Write succeeded

Note the reraise=True -- without it, tenacity wraps the final exception in RetryError. With it, the original TimeoutError propagates to your caller, which is the right behavior for most production code where the caller needs to handle specific exception types.

Real-Life Example: Resilient HTTP API Client

# resilient_client.py
import requests
import logging
from tenacity import (retry, stop_after_attempt, wait_random_exponential,
                       retry_if_exception_type, retry_if_result,
                       before_sleep_log, RetryError)

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format='%(levelname)s %(message)s')

RETRYABLE_STATUS = {429, 500, 502, 503, 504}

class APIError(Exception):
    pass

class RateLimitError(APIError):
    pass

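def should_retry_response(result):
    """Retry only when api_get handed back a Response with a retryable status."""
    # On success api_get returns parsed JSON (a dict or list), which has no status_code
    return isinstance(result, requests.Response) and result.status_code in RETRYABLE_STATUS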

@retry(
    stop=stop_after_attempt(5),
    wait=wait_random_exponential(multiplier=1, max=30),
    retry=(
        retry_if_exception_type((requests.ConnectionError, requests.Timeout)) |
        retry_if_result(should_retry_response)
    ),
    before_sleep=before_sleep_log(logger, logging.WARNING),
    reraise=True
)
def api_get(url: str, params: dict = None) -> dict:
    """Make a GET request with retry logic."""
    resp = requests.get(url, params=params, timeout=10)
    if resp.status_code in RETRYABLE_STATUS:
        return resp  # Return the response object -- retry_if_result handles it
    resp.raise_for_status()  # Raise on 4xx/5xx (not retried)
    return resp.json()

# Usage
BASE = "https://jsonplaceholder.typicode.com"

try:
    user = api_get(f"{BASE}/users/1")
    print(f"User: {user['name']} ({user['email']})")

    todos = api_get(f"{BASE}/todos", params={"userId": 1, "_limit": 3})
    print(f"First 3 todos for user 1:")
    for t in todos:
        status = "done" if t["completed"] else "pending"
        print(f"  [{status}] {t['title'][:50]}")

except RetryError as e:
    print(f"All retries exhausted: {e}")
except requests.HTTPError as e:
    print(f"HTTP error (not retried): {e}")

Output:

User: Leanne Graham (Sincere@april.biz)
First 3 todos for user 1:
  [pending] delectus aut autem
  [pending] quis ut nam facilis et officia qui
  [pending] fugiat veniam minus

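This client retries on network errors and on the HTTP status codes listed in RETRYABLE_STATUS (429 rate limit, 500, 502, 503, 504), but not on other 4xx client errors (bad request, unauthorized, not found), which indicate problems that retrying will not fix. The wait_random_exponential strategy adds jitter to prevent synchronized retries across multiple client instances. Note that when the attempts run out on a retryable status code rather than an exception, there is no exception to re-raise, so tenacity raises RetryError even with reraise=True. That is why the usage code catches both RetryError and requests.HTTPError.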

503? Come back later. 404? Stop knocking. Tenacity knows the difference.

Frequently Asked Questions

Should I use reraise=True?

In most production code, yes. Without reraise=True, tenacity wraps the final exception in a RetryError, and your callers must catch RetryError instead of the original exception type. With reraise=True, the last exception propagates directly, which integrates cleanly with existing error handling. Use the default (no reraise) only when you want to distinguish "all retries failed" from "raised on first attempt".

Does tenacity support async functions?

Yes. Use @retry directly on async def functions -- tenacity detects async functions automatically and uses asyncio.sleep between retries instead of time.sleep. This means your async retry code does not block the event loop during wait periods, making it safe to use in FastAPI, aiohttp, and other async frameworks.

When should I use the Retrying context manager instead of the decorator?

Use the context manager when you need to retry a block of code rather than a function, or when retry parameters need to be dynamic (computed at runtime). For example: for attempt in Retrying(stop=stop_after_attempt(3)): with attempt: risky_operation(). The Retrying class also gives you access to attempt.retry_state for detailed control.

What does the multiplier parameter do in wait_exponential?

The multiplier scales the exponential formula: wait time = multiplier * 2^(attempt - 1). With multiplier=1: 1s, 2s, 4s, 8s. With multiplier=2: 2s, 4s, 8s, 16s. The min and max parameters clamp the result so the first wait is at least min seconds and never exceeds max seconds.
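
The formula is easy to check by hand. The function below is a plain-Python restatement of it for illustration, not tenacity's own code, and the parameter names min_wait and max_wait are chosen for this sketch.

```python
def exponential_wait(attempt, multiplier=1.0, min_wait=0.0, max_wait=float("inf")):
    """wait = multiplier * 2 ** (attempt - 1), clamped to [min_wait, max_wait]."""
    raw = multiplier * 2 ** (attempt - 1)
    return max(min_wait, min(raw, max_wait))

for multiplier in (1, 2):
    schedule = [exponential_wait(n, multiplier=multiplier, max_wait=10)
                for n in range(1, 6)]
    print(f"multiplier={multiplier}: {schedule}")
# multiplier=1: [1, 2, 4, 8, 10]
# multiplier=2: [2, 4, 8, 10, 10]
```

With a cap of 10 seconds, the larger multiplier reaches the cap one attempt earlier.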

Is tenacity a circuit breaker?

No -- tenacity is a retry library, not a circuit breaker. It retries independently on each function call. A circuit breaker tracks failure rates across many calls over time and stops sending requests to a failing service entirely (opens the circuit) until it recovers. For circuit breaker functionality in Python, look at the pybreaker library. In practice, combining tenacity (for per-call retries) with pybreaker (for service-level protection) gives you the best of both.

Conclusion

Tenacity makes retry logic declarative and composable. You have seen how to use stop_after_attempt and stop_after_delay to limit retries, wait_exponential and wait_random_exponential for backoff strategies, retry_if_exception_type and retry_if_result to control what gets retried, and the before_sleep callback to log retry events. Combined, these tools let you express sophisticated retry policies without writing manual retry loops.

For a next step, add circuit breaker logic with pybreaker and integrate with structlog for structured retry logging. The official tenacity documentation is at tenacity.readthedocs.io.

Basic Retry Pattern

from tenacity import retry, stop_after_attempt, wait_exponential
import requests

@retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=10))
def fetch_user(user_id):
    response = requests.get("https://api.example.com/users/" + str(user_id), timeout=5)
    response.raise_for_status()
    return response.json()

user = fetch_user(42)

Three building blocks: stop= when to give up, wait= how long between retries, retry= what to retry on (default: any exception).

Stop and Wait Strategies

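from tenacity import retry, stop_after_attempt, stop_after_delay, stop_any
from tenacity import wait_fixed, wait_exponential, wait_random, wait_random_exponential

# Stop strategies
@retry(stop=stop_after_attempt(5))   # give up after 5 attempts
def f(): ...
@retry(stop=stop_after_delay(60))    # give up after 60 seconds
def f(): ...
@retry(stop=stop_any(stop_after_attempt(5), stop_after_delay(60)))  # whichever comes first
def f(): ...

# Wait strategies
@retry(wait=wait_fixed(2))                                  # always 2 seconds
def f(): ...
@retry(wait=wait_exponential(multiplier=1, max=60))         # 1s, 2s, 4s, ... capped at 60s
def f(): ...
@retry(wait=wait_random(min=1, max=5))                      # uniform between 1 and 5 seconds
def f(): ...
@retry(wait=wait_random_exponential(multiplier=1, max=60))  # exponential cap with jitter
def f(): ...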

For external APIs, wait_random_exponential is the sweet spot — exponential backoff prevents hammering, jitter prevents thundering-herd retries.

Retry on Specific Exceptions

from tenacity import retry, retry_if_exception_type, retry_if_result, stop_after_attempt
import requests

@retry(retry=retry_if_exception_type(requests.RequestException), stop=stop_after_attempt(5))
def fetch_data():
    r = requests.get("https://api.example.com/data", timeout=5)
    r.raise_for_status()
    return r.json()

@retry(retry=retry_if_exception_type((TimeoutError, ConnectionError)))
def f(): ...

@retry(retry=retry_if_result(lambda r: r is None or r.get("retry") == True))
def f(): ...

Logging Retries

from tenacity import retry, before_sleep_log, stop_after_attempt
import logging
log = logging.getLogger(__name__)

@retry(stop=stop_after_attempt(5), before_sleep=before_sleep_log(log, logging.WARNING))
def fetch():
    ...

Every retry attempt logs a warning with exception and wait time — essential for debugging flaky external services in production.

Programmatic API

from tenacity import Retrying, stop_after_attempt, wait_fixed
for attempt in Retrying(stop=stop_after_attempt(3), wait=wait_fixed(2)):
    with attempt:
        result = risky_operation()
        print(result)

Common Pitfalls

  • Retrying on permanent errors. Retrying a 401 Unauthorized 5 times is just 5 failures with delay. Filter exceptions to transient ones.
  • No max delay cap. Pure exponential backoff can wait minutes. Always set max=.
  • Missing jitter. Many clients with identical backoff hammer the service in sync — thundering herd. Always use wait_random_exponential.
  • Silent retries. Without before_sleep logging, you have no idea retries are happening. Production logging is essential.
  • Decorator on wrong layer. Decorate the HTTP call, not your business logic — otherwise you retry expensive computation, not just the network bit.

FAQ

Q: tenacity or backoff library?
A: Both great. tenacity has more features (predicates, sync+async, stop+wait composition).

Q: Async support?
A: @retry(...) on async functions works natively.

Q: See how many attempts were made?
A: f.retry.statistics gives counters.

Q: What about idempotency?
A: Tenacity doesn't enforce it. Retrying a non-idempotent POST can cause duplicates. Use idempotency keys or retry only on connection errors before the server sees the request.
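
A minimal sketch of the idempotency-key idea, using only the standard library. FakeServer is a hypothetical server that does the work but loses its first response, and the plain for loop stands in for a retry decorator. The important detail is that the key is generated once, outside the retried call.

```python
import uuid

class FakeServer:
    """Hypothetical server that drops its first reply after doing the work."""

    def __init__(self):
        self.orders = {}            # idempotency key -> order id
        self.drop_next_reply = True

    def create_order(self, item, idempotency_key):
        if idempotency_key not in self.orders:   # first time: do the work
            self.orders[idempotency_key] = f"order-{len(self.orders) + 1}"
        if self.drop_next_reply:                 # simulate a lost response
            self.drop_next_reply = False
            raise ConnectionError("response lost")
        return self.orders[idempotency_key]

server = FakeServer()
key = str(uuid.uuid4())  # generated once, reused by every attempt

for attempt in range(3):
    try:
        order_id = server.create_order("book", key)
        break
    except ConnectionError:
        continue

print(order_id, len(server.orders))  # order-1 1
```

The second attempt reuses the key, so the server returns the existing order instead of creating a duplicate.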

Q: Retry on HTTP 5xx?
A: r.raise_for_status() turns 5xx into HTTPError, which tenacity retries by default. It does the same for 4xx, so filter on the status code if only 5xx should be retried, or use retry_if_result.

Wrapping Up

Tenacity is the right answer to "make this network call resilient" in Python. The right policy for most external APIs: 3-5 attempts, exponential backoff with jitter, retry only on transient exceptions, log each retry.

How To Parse Dates and Times with Python dateutil

Intermediate

Parsing dates from real-world sources is deceptively hard. Your users type “March 5th”, your API returns “2024-03-05T14:30:00+05:30”, and your legacy database has “05/03/24”. Python’s built-in datetime.strptime() requires you to specify the exact format string — get it wrong by one character and you get a ValueError. The dateutil library solves this with a smart parser that figures out the format automatically, plus tools for date arithmetic that would take 50 lines in pure Python.

The python-dateutil package extends Python’s standard datetime module. Install it with pip install python-dateutil. It builds on top of datetime objects you already know, so there is no new type system to learn — just more powerful tools. The two most-used features are parser.parse() (smart date parsing) and relativedelta (calendar-aware date arithmetic).

In this article, you will learn how to parse date strings automatically with parser.parse(), compute date differences with relativedelta, generate recurring date sequences with rrule, handle timezones with tzlocal and gettz(), and work through a real-life example that builds an invoice due-date calculator. By the end, you will handle any date format your data throws at you.

Quick Example: Parse Any Date String

Here is the core value proposition of dateutil in 10 lines — no format strings required:

# quick_parse.py
from dateutil import parser

dates = [
    "March 5, 2024",
    "05/03/24",
    "2024-03-05T14:30:00",
    "5th March 2024",
    "Mar 5 2024 2:30PM",
]

for s in dates:
    dt = parser.parse(s)
    print(f"{s!r:35} -> {dt.strftime('%Y-%m-%d %H:%M')}")

Output:

'March 5, 2024'                     -> 2024-03-05 00:00
'05/03/24'                          -> 2024-05-03 00:00
'2024-03-05T14:30:00'               -> 2024-03-05 14:30
'5th March 2024'                    -> 2024-03-05 00:00
'Mar 5 2024 2:30PM'                 -> 2024-03-05 14:30

Notice that "05/03/24" is parsed as May 3 (US date order by default). You can override this with dayfirst=True for European-style dates. The key insight is that parser.parse() always returns a proper datetime.datetime object, so all the standard methods (strftime, timedelta, comparisons) work immediately.

What Is dateutil and Why Use It?

The python-dateutil library is an extension to Python’s built-in datetime module. While datetime gives you the data structures and strptime for format-exact parsing, dateutil adds a fuzzy parser, calendar-aware arithmetic, recurrence rules, and better timezone support.

Task                  datetime stdlib                  dateutil
Parse unknown format  ValueError (need exact format)   parser.parse() auto-detects
Add 1 month           Manual (30-day approximation)    relativedelta(months=1)
Recurring dates       Write your own loop              rrule() declarative API
Timezone from name    zoneinfo (Python 3.9+ only)      gettz("America/New_York")
Years/months diff     Complex manual calculation       relativedelta(dt1, dt2)

The install is tiny (pip install python-dateutil) and it has no heavy dependencies. It is a dependency of pandas, botocore (which Boto3 builds on), and many other production libraries, so there is a good chance it is already in your environment.

strptime needs an exact format. parser.parse just works.

Parsing Date Strings with parser.parse()

The parser.parse() function accepts almost any human-readable date string and returns a datetime object. For ambiguous dates (where day and month could be swapped), use the dayfirst or yearfirst keyword arguments to specify the expected order.

# parse_options.py
from dateutil import parser

# Default: month-first (US style)
us_date = parser.parse("03/05/24")
print(f"US style: {us_date.date()}")  # March 5

# European style: day comes first
eu_date = parser.parse("03/05/24", dayfirst=True)
print(f"EU style: {eu_date.date()}")  # May 3

# ISO 8601 with timezone
iso_tz = parser.parse("2024-03-05T14:30:00+05:30")
print(f"ISO with tz: {iso_tz}")

# Fuzzy parsing: extract date from surrounding text
text = "Meeting scheduled for Wednesday, March 5, 2024 at 3pm."
fuzzy_dt = parser.parse(text, fuzzy=True)
print(f"Fuzzy: {fuzzy_dt.strftime('%Y-%m-%d %H:%M')}")

# Get both the datetime and the tokens that were ignored
dt, tokens = parser.parse(text, fuzzy_with_tokens=True)
print(f"Parsed: {dt.date()}, Ignored: {tokens}")

Output:

US style: 2024-03-05
EU style: 2024-05-03
ISO with tz: 2024-03-05 14:30:00+05:30
Fuzzy: 2024-03-05 15:00
Parsed: 2024-03-05, Ignored: ('Meeting scheduled for Wednesday, ', ' at ', '.')

The fuzzy=True parameter is particularly useful when parsing dates embedded in unstructured text, like email subjects or log entries. It skips non-date words and extracts whatever date information is present. Always wrap parser.parse() in a try/except block when parsing untrusted input — if the string contains no recognizable date, it raises a ValueError.
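
A minimal sketch of that try/except wrapper is shown here. The helper name parse_or_none is made up for this example. It also catches OverflowError, which dateutil documents for dates too large for the platform.

```python
from dateutil import parser

def parse_or_none(text):
    """Return a datetime, or None when dateutil cannot find a date in text."""
    try:
        return parser.parse(text)
    except (ValueError, OverflowError):
        return None

print(parse_or_none("2024-03-05"))   # 2024-03-05 00:00:00
print(parse_or_none("hello world"))  # None
```

Returning None keeps a batch job running past bad rows. Raise a custom error instead if a missing date should stop processing.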

fuzzy=True: because dates hide in the strangest sentences.

Calendar Arithmetic with relativedelta

Python’s timedelta works in days and seconds — it has no concept of months or years. Adding “one month” with timedelta(days=30) is wrong for January (31 days) and February (28/29 days). The relativedelta class from dateutil handles calendar-aware arithmetic correctly.

# relativedelta_examples.py
from datetime import datetime
from dateutil.relativedelta import relativedelta

base = datetime(2024, 1, 31)  # January 31

# Add one month -- dateutil handles end-of-month correctly
next_month = base + relativedelta(months=1)
print(f"Jan 31 + 1 month = {next_month.date()}")  # Feb 29 (2024 is leap year)

# Add 2 years, 3 months, 5 days at once
future = base + relativedelta(years=2, months=3, days=5)
print(f"Jan 31 + 2y 3m 5d = {future.date()}")

# Difference between two dates in years, months, days
birth = datetime(1990, 6, 15)
today = datetime(2024, 3, 5)
age = relativedelta(today, birth)
print(f"Age: {age.years} years, {age.months} months, {age.days} days")

# Next birthday
this_year_birthday = birth.replace(year=today.year)
if this_year_birthday < today:
    next_birthday = this_year_birthday + relativedelta(years=1)
else:
    next_birthday = this_year_birthday
days_to_birthday = (next_birthday - today).days
print(f"Days to next birthday: {days_to_birthday}")

Output:

Jan 31 + 1 month = 2024-02-29
Jan 31 + 2y 3m 5d = 2026-05-05
Age: 33 years, 8 months, 19 days
Days to next birthday: 102

The key difference from timedelta: when you add one month to January 31, relativedelta gives you the last valid day of February rather than overflowing into March. This matches how humans think about "one month later". The two-argument form relativedelta(date1, date2) gives the exact calendar difference in years, months, and days -- what timedelta cannot do.
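
To see what the clamping rule saves you from, the sketch below uses only the standard library. The helper add_one_month is a hand-written approximation of the "same day next month, or the last valid day" rule described above. It is not dateutil's implementation and handles only the one-month case.

```python
import calendar
from datetime import date, timedelta

def add_one_month(d):
    """Move to the same day next month, clamping to that month's last day."""
    year = d.year + (d.month // 12)   # roll over to next year after December
    month = d.month % 12 + 1
    last_day = calendar.monthrange(year, month)[1]
    return d.replace(year=year, month=month, day=min(d.day, last_day))

start = date(2024, 1, 31)
print(start + timedelta(days=30))         # 2024-03-01
print(add_one_month(start))               # 2024-02-29
print(add_one_month(date(2023, 12, 31)))  # 2024-01-31
```

The fixed 30-day step skips February entirely, while the calendar-aware step lands on its last day.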

Generating Recurring Dates with rrule

The rrule module implements the iCalendar (RFC 5545) recurrence rule specification. It generates sequences of dates based on rules like "every Monday", "the last business day of each month", or "the third Thursday of every quarter". This is exactly what calendar applications use.

# rrule_examples.py
from dateutil.rrule import rrule, WEEKLY, MONTHLY, MO, FR
from datetime import datetime

start = datetime(2024, 1, 1)

# Every Monday for 5 weeks
mondays = list(rrule(WEEKLY, count=5, byweekday=MO, dtstart=start))
print("Next 5 Mondays:")
for d in mondays:
    print(f"  {d.strftime('%Y-%m-%d (%A)')}")

# First Friday of each month for 4 months
first_fridays = list(rrule(MONTHLY, count=4, byweekday=FR(1), dtstart=start))
print("\nFirst Friday of each month:")
for d in first_fridays:
    print(f"  {d.strftime('%Y-%m-%d (%A)')}")

# Every 2 weeks on Tuesday and Thursday, 6 occurrences
biweekly = list(rrule(WEEKLY, interval=2, count=6,
                       byweekday=[1, 3], dtstart=start))  # 1=Tue, 3=Thu
print("\nBiweekly Tue/Thu:")
for d in biweekly:
    print(f"  {d.strftime('%Y-%m-%d (%A)')}")

Output:

Next 5 Mondays:
  2024-01-01 (Monday)
  2024-01-08 (Monday)
  2024-01-15 (Monday)
  2024-01-22 (Monday)
  2024-01-29 (Monday)

First Friday of each month:
  2024-01-05 (Friday)
  2024-02-02 (Friday)
  2024-03-01 (Friday)
  2024-04-05 (Friday)

Biweekly Tue/Thu:
  2024-01-02 (Tuesday)
  2024-01-04 (Thursday)
  2024-01-16 (Tuesday)
  2024-01-18 (Thursday)
  2024-01-30 (Tuesday)
  2024-02-01 (Thursday)

rrule is lazy by default -- it generates dates on demand. For large sequences, iterate over the rule object instead of calling list(). The between() method is useful for finding all occurrences in a date range: rule.between(start, end) returns them as a list and, by default, excludes occurrences that fall exactly on start or end.
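
As a small sketch of lazy iteration, the rule below has no count or until, so it never ends. The example takes a few items with itertools.islice and then queries a window with between(). The dates are arbitrary.

```python
from datetime import datetime
from itertools import islice
from dateutil.rrule import rrule, DAILY

# No count= or until=, so this rule is unbounded: never call list() on it
every_day = rrule(DAILY, dtstart=datetime(2024, 1, 1))

first_three = list(islice(every_day, 3))
print([d.strftime("%Y-%m-%d") for d in first_three])
# ['2024-01-01', '2024-01-02', '2024-01-03']

# between() excludes both endpoints unless inc=True is passed
in_range = every_day.between(datetime(2024, 3, 1), datetime(2024, 3, 4))
print([d.strftime("%Y-%m-%d") for d in in_range])
# ['2024-03-02', '2024-03-03']
```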

rrule speaks iCalendar. Your calendar app would be proud.

Timezone Handling with tzlocal and gettz

Timezone handling is where many date libraries fall short. dateutil provides gettz() to look up any IANA timezone by name, and tzlocal() to get the system's local timezone. These integrate directly with Python's datetime.replace() and astimezone().

# timezone_handling.py
from datetime import datetime
from dateutil import parser, tz

# Get timezone objects by IANA name
eastern = tz.gettz("America/New_York")
utc = tz.UTC
mumbai = tz.gettz("Asia/Kolkata")

# Create a timezone-aware datetime in Eastern Time
eastern_dt = datetime(2024, 3, 5, 9, 0, tzinfo=eastern)
print(f"Eastern: {eastern_dt}")

# Convert to UTC
utc_dt = eastern_dt.astimezone(utc)
print(f"UTC:     {utc_dt}")

# Convert to Mumbai time
mumbai_dt = eastern_dt.astimezone(mumbai)
print(f"Mumbai:  {mumbai_dt}")

# Parse a timezone-aware string and convert
raw = "2024-03-05T14:30:00+05:30"
parsed = parser.parse(raw)
in_ny = parsed.astimezone(eastern)
print(f"\n+05:30 time in New York: {in_ny.strftime('%Y-%m-%d %H:%M %Z')}")

# Check if currently in DST
is_dst = bool(eastern_dt.dst())
print(f"Eastern DST active: {is_dst}")

Output:

Eastern: 2024-03-05 09:00:00-05:00
UTC:     2024-03-05 14:00:00+00:00
Mumbai:  2024-03-05 19:30:00+05:30
+05:30 time in New York: 2024-03-05 04:00 EST
Eastern DST active: False

Always use timezone-aware datetimes when storing or comparing times across different regions. The string "2024-03-05 09:00" is ambiguous -- 9am where? Attach a timezone when you create the datetime and conversions become automatic.
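
The sketch below shows why this matters, using only the standard library. The fixed offsets are stand-ins for the tz.gettz() zones used above, chosen so the example needs no timezone database. Unlike real zones, they ignore daylight saving time.

```python
from datetime import datetime, timedelta, timezone

ist = timezone(timedelta(hours=5, minutes=30))  # fixed +05:30 offset
est = timezone(timedelta(hours=-5))             # fixed -05:00 offset

in_india = datetime(2024, 3, 5, 19, 30, tzinfo=ist)
in_new_york = datetime(2024, 3, 5, 9, 0, tzinfo=est)
print(in_india == in_new_york)  # True: same instant, different wall clocks

naive = datetime(2024, 3, 5, 9, 0)
try:
    naive < in_new_york  # ordering naive against aware is refused
except TypeError as exc:
    print(f"Cannot compare: {exc}")
```

Python refuses to order a naive datetime against an aware one, so mixing the two in a sort or a range check fails with a TypeError.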

Real-Life Example: Invoice Due-Date Calculator

Here is a practical example that uses parser.parse, relativedelta, and rrule together to build an invoice due-date calculator with payment reminder schedules:

# invoice_calculator.py
from dateutil import parser as dtparser
from dateutil.relativedelta import relativedelta
from dateutil.rrule import rrule, WEEKLY, MO, TU, WE, TH, FR
from datetime import datetime

BUSINESS_DAYS = [MO, TU, WE, TH, FR]

def next_business_day(dt):
    """Advance dt to the next business day if it falls on a weekend."""
    if dt.weekday() >= 5:  # Saturday=5, Sunday=6
        dt = dt + relativedelta(weekday=MO)
    return dt

def calculate_invoice(invoice_date_str: str, net_days: int = 30, today: datetime = None):
    # Accept a fixed "today" so the output is reproducible
    today = today or datetime.now()
    invoice_date = dtparser.parse(invoice_date_str)
    due_date = next_business_day(invoice_date + relativedelta(days=net_days))

    # Weekly reminders: every Monday, from two weeks after the invoice date
    # until the day before the due date
    reminder_start = invoice_date + relativedelta(weeks=2)
    reminders = list(rrule(
        WEEKLY,
        dtstart=reminder_start,
        until=due_date - relativedelta(days=1),
        byweekday=MO
    ))

    print(f"Invoice Date : {invoice_date.strftime('%B %d, %Y')}")
    print(f"Net Terms    : {net_days} days")
    print(f"Due Date     : {due_date.strftime('%B %d, %Y (%A)')}")
    print(f"\nPayment Reminders ({len(reminders)} total):")
    for r in reminders:
        status = "upcoming" if r > today else "past"
        print(f"  {r.strftime('%Y-%m-%d (%A)')} -- {status}")

    # Count days with plain subtraction: relativedelta(...).days holds only
    # the days left over after whole months are removed
    if today > due_date:
        print(f"\nOVERDUE by {(today - due_date).days} days!")
    else:
        print(f"\nDays remaining: {(due_date - today).days}")

    return due_date

# Test with different invoice formats, as of a fixed date
as_of = datetime(2024, 3, 5)

print("=== Invoice 1 ===")
calculate_invoice("January 15, 2024", net_days=30, today=as_of)

print("\n=== Invoice 2 ===")
calculate_invoice("2024-02-01", net_days=60, today=as_of)

Output:

=== Invoice 1 ===
Invoice Date : January 15, 2024
Net Terms    : 30 days
Due Date     : February 14, 2024 (Wednesday)

Payment Reminders (3 total):
  2024-01-29 (Monday) -- past
  2024-02-05 (Monday) -- past
  2024-02-12 (Monday) -- past

OVERDUE by 20 days!

=== Invoice 2 ===
Invoice Date : February 01, 2024
Net Terms    : 60 days
Due Date     : April 01, 2024 (Monday)

Payment Reminders (6 total):
  2024-02-19 (Monday) -- past
  2024-02-26 (Monday) -- past
  2024-03-04 (Monday) -- past
  2024-03-11 (Monday) -- upcoming
  2024-03-18 (Monday) -- upcoming
  2024-03-25 (Monday) -- upcoming

Days remaining: 27

This calculator handles several common edge cases: weekend due dates (via next_business_day), flexible input formats (via parser.parse), and recurring reminders (via rrule). You can extend it by adding late fee calculation, reading invoice dates from a CSV with pandas.read_csv(), or emailing reminders with Python's smtplib.

relativedelta knows February only has 28 days. timedelta(days=30) does not.

Frequently Asked Questions

When should I use strptime vs parser.parse?

Use strptime when you have a fixed, known format and need maximum performance (it is faster than parser.parse). Use parser.parse when the format varies or comes from user input. In batch processing of millions of records where the format is consistent, the speed difference matters. For typical web application or data pipeline usage, parser.parse is more convenient and the performance difference is negligible.

What is the difference between relativedelta and timedelta?

timedelta only works in days and seconds -- it has no concept of months or years. relativedelta understands calendar units. Adding timedelta(days=30) always adds exactly 30 days regardless of the month, while relativedelta(months=1) adds one calendar month and adjusts for the actual length of that month. For anything involving months or years, always use relativedelta.
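A small sketch makes the difference concrete. It assumes python-dateutil is installed; the start date is an arbitrary choice, picked because it sits on a month boundary.

```python
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta

start = datetime(2024, 1, 31)

# timedelta counts exactly 30 elapsed days and ignores month lengths
fixed = start + timedelta(days=30)

# relativedelta moves one calendar month and clamps to the last valid day
calendar = start + relativedelta(months=1)

print(fixed.date())     # 2024-03-01
print(calendar.date())  # 2024-02-29 (2024 is a leap year)
```

The two results differ by a day here, which is exactly the kind of drift that accumulates in billing or subscription code.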

How reliable is fuzzy parsing?

Fuzzy parsing is good at extracting dates from natural language text but is not perfect. It can make wrong guesses when text contains numbers that look like dates but are not (e.g., "Revenue grew 12/5 percent"). Always validate the result with business logic -- check that the parsed date is in a reasonable range. Use fuzzy_with_tokens=True to see what parts of the string were ignored, which helps you catch parsing errors.

Should I use count or until with rrule?

Use count when you know how many occurrences you need ("next 10 Mondays"). Use until when you know the end date ("every Monday through December 31"). Never call list() on an rrule without one of these limits -- it will try to generate an infinite sequence. The between(start, end) method is also useful when you need all occurrences in a specific date range.
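The three options can be compared side by side. This is a minimal sketch assuming python-dateutil is installed; the dates are illustrative only.

```python
from datetime import datetime
from dateutil.rrule import rrule, WEEKLY, MO

start = datetime(2024, 1, 1)  # a Monday

# count: stop after a fixed number of occurrences
next_three = list(rrule(WEEKLY, dtstart=start, byweekday=MO, count=3))

# until: stop at an end date (inclusive)
january = list(rrule(WEEKLY, dtstart=start, byweekday=MO,
                     until=datetime(2024, 1, 31)))

# between(): query a window on an unbounded rule without calling list() on it
every_monday = rrule(WEEKLY, dtstart=start, byweekday=MO)
february = every_monday.between(datetime(2024, 2, 1), datetime(2024, 2, 29))

print(len(next_three), len(january), len(february))  # 3 5 4
```

between() excludes the window endpoints unless you pass inc=True, which matters when an occurrence falls exactly on the boundary.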

How do I make a naive datetime timezone-aware?

Use datetime.replace(tzinfo=tz.gettz("America/New_York")) to attach a timezone to a naive datetime. Do NOT use astimezone() on a naive datetime -- it will interpret the datetime as local system time, which may not be what you want. Always be explicit about what timezone a naive datetime represents before converting it to an aware datetime.
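As a rough sketch (assuming python-dateutil is installed and can find time zone data for America/New_York), attaching a zone and then converting looks like this:

```python
from datetime import datetime, timedelta
from dateutil import tz

naive = datetime(2024, 3, 5, 14, 30)        # no tzinfo attached yet
eastern = tz.gettz("America/New_York")

# State explicitly which zone the naive value represents
aware = naive.replace(tzinfo=eastern)

# Only now is it safe to convert to another zone
as_utc = aware.astimezone(tz.UTC)

print(aware.isoformat())   # 2024-03-05T14:30:00-05:00
print(as_utc.isoformat())  # 2024-03-05T19:30:00+00:00
```

replace() changes only the label on the datetime, while astimezone() changes the clock reading to match the new zone.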

How do I serialize a datetime with timezone to a string?

Use dt.isoformat() to get an ISO 8601 string like "2024-03-05T14:30:00-05:00", or dt.strftime('%Y-%m-%d %H:%M:%S %Z') for a human-readable format. For databases, ISO 8601 is the safest format. For JSON APIs, isoformat() is standard and parser.parse() can round-trip it back.

Conclusion

The python-dateutil library closes the gap between Python's standard datetime module and the messy reality of real-world date data. You have seen how parser.parse() handles arbitrary date string formats without requiring a format specifier, how relativedelta performs calendar-correct arithmetic with months and years, how rrule generates recurring date sequences using iCalendar syntax, and how gettz() and tzlocal() simplify timezone conversions.

For your next project, try extending the invoice calculator to read from a CSV file and output an Excel report using openpyxl, or use rrule to generate a meeting schedule and export it as an ICS file that any calendar app can import. The official documentation is at dateutil.readthedocs.io.

How To Use Python attrs Library for Data Classes Without Boilerplate


Intermediate

You have a class that holds data — a User, a Product, an APIResponse. Without attrs, you write __init__, __repr__, __eq__, add type validation manually, and somehow end up with forty lines of boilerplate before a single line of real logic. The attrs library fixes all of this with a single decorator and a handful of field definitions.

The attrs library (installed as the attrs package and imported as attr or, in newer releases, attrs) generates everything you need automatically: constructors, comparison methods, string representations, and more. It also gives you built-in validators and converters that run when an instance is created, which the standard-library dataclasses module does not provide out of the box. You install it with pip install attrs.

In this article, you will learn how to define attrs classes with the @define and @attrs decorators, how to add validators and converters to fields, how to use frozen classes for immutability, how slots work for memory efficiency, and how to compare attrs to Python’s built-in dataclasses. A real-life example at the end builds a validated configuration loader using attrs classes.

Quick Example: attrs in 30 Seconds

Here is the minimal attrs pattern — a data class defined in four lines with auto-generated __init__, __repr__, and __eq__:

# quick_attrs.py
import attr

@attr.s(auto_attribs=True)
class Point:
    x: float
    y: float

p1 = Point(1.0, 2.0)
p2 = Point(1.0, 2.0)
print(p1)
print(p1 == p2)
print(p1.x)

Output:

Point(x=1.0, y=2.0)
True
1.0

The @attr.s(auto_attribs=True) decorator reads the class-level type annotations and turns them into attrs fields automatically. attrs then generates __init__, __repr__, and __eq__ for you. With the newer API (attrs >= 20.1.0), you can use @attr.define instead, which is the recommended approach for new code.

The sections below cover validators, converters, defaults, slots, frozen classes, and the differences between the old @attr.s API and the modern @attr.define API.

What Is attrs and Why Use It?

The attrs library is a Python package that takes the tedium out of writing data classes. When you decorate a class with @attr.define or @attr.s, it inspects the class body, identifies the fields you have declared, and automatically generates all the standard dunder methods your class needs.

Python 3.7 introduced dataclasses as a stdlib alternative, but attrs predates it and offers features that dataclasses still lack: built-in validators and converters. It also offered one-keyword __slots__ support long before dataclasses gained slots=True in Python 3.10. Many production libraries, including Hypothesis and cattrs, are built on attrs.

Feature | attrs | dataclasses | plain class
Auto __init__ | Yes | Yes | No
Auto __repr__ | Yes | Yes | No
Auto __eq__ | Yes | Yes | No
Built-in validators | Yes | No | No
Built-in converters | Yes | No | No
__slots__ support | Yes (one kwarg) | slots=True on Python 3.10+, verbose before | Manual
Frozen (immutable) | Yes | Yes | Manual
Third-party | Yes (pip install) | No (stdlib) | No

The choice comes down to your project’s needs. For simple value objects, dataclasses is fine and avoids a dependency. When you need runtime validation, type coercion, or slots for large object collections, attrs is the better tool.

Forty lines of boilerplate. One decorator. The math is straightforward.

Defining Classes with @attr.define

The modern attrs API uses @attr.define (or equivalently @attrs.define from the attrs package). This is the recommended approach for all new code. It enables __slots__, disables hash generation by default (since mutable objects should not be hashed), and uses the field() function for advanced configuration.

# define_class.py
import attr

@attr.define
class User:
    name: str
    email: str
    age: int = 0

u = User(name="Alice", email="alice@example.com", age=30)
print(u)
print(repr(u))

Output:

User(name='Alice', email='alice@example.com', age=30)
User(name='Alice', email='alice@example.com', age=30)

Fields without a default must come before fields with defaults, just like function parameters. The age=0 default means you can construct a User without providing an age. The @attr.define decorator uses slots internally by default, which is why you cannot add arbitrary attributes after construction — only declared fields are allowed.

Using attr.field() for Advanced Field Options

When a simple annotation is not enough, attr.field() gives you full control over a field’s behavior. You can set the default, mark a field as not included in __repr__, exclude it from comparisons, or add a factory default for mutable defaults:

# advanced_fields.py
import attr
from typing import List

@attr.define
class Team:
    name: str
    members: List[str] = attr.Factory(list)
    _internal_id: int = attr.field(default=0, repr=False, alias="_internal_id")

t = Team(name="Backend")
t.members.append("Alice")
t.members.append("Bob")
print(t)
print(t.members)

Output:

Team(name='Backend', members=['Alice', 'Bob'])
['Alice', 'Bob']

Notice attr.Factory(list) — you must use this instead of default=[] for mutable defaults. Using a bare list as a default would share the same list across all instances, which is a classic Python gotcha. The repr=False on _internal_id keeps internal fields out of the printed representation.
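To see why the factory matters, here is a minimal sketch (assuming attrs is installed) showing that each instance gets its own list:

```python
import attr

@attr.define
class Team:
    name: str
    members: list = attr.Factory(list)   # a new list is built per instance

backend = Team("Backend")
frontend = Team("Frontend")
backend.members.append("Alice")

print(backend.members)                      # ['Alice']
print(frontend.members)                     # []
print(backend.members is frontend.members)  # False
```

attr.field(factory=list) is an equivalent spelling when you also need other field options such as repr=False.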

Adding Validators to Fields

Validators are the killer feature that separates attrs from plain dataclasses. A validator is a callable that runs when an instance is constructed (and, for classes created with @attr.define, again whenever the field is reassigned) and raises an exception if the value is invalid. You attach a validator to a field with attr.field(validator=...) or the @field_name.validator decorator.

# validators.py
import attr

@attr.define
class Product:
    name: str = attr.field()
    price: float = attr.field()
    quantity: int = attr.field()

    @name.validator
    def _validate_name(self, attribute, value):
        if not value or len(value.strip()) == 0:
            raise ValueError(f"{attribute.name} cannot be empty")

    @price.validator
    def _validate_price(self, attribute, value):
        if value < 0:
            raise ValueError(f"{attribute.name} must be non-negative, got {value}")

    @quantity.validator
    def _validate_quantity(self, attribute, value):
        if not isinstance(value, int) or value < 0:
            raise ValueError(f"{attribute.name} must be a non-negative integer")

# Valid usage
p = Product(name="Widget", price=9.99, quantity=100)
print(p)

# Invalid usage -- triggers validator
try:
    bad = Product(name="", price=9.99, quantity=5)
except ValueError as e:
    print(f"Error: {e}")

try:
    bad2 = Product(name="Widget", price=-5.0, quantity=5)
except ValueError as e:
    print(f"Error: {e}")

Output:

Product(name='Widget', price=9.99, quantity=100)
Error: name cannot be empty
Error: price must be non-negative, got -5.0

The validator method receives three arguments: self (the instance being created), attribute (an Attribute object with metadata like attribute.name), and value (the value being assigned). Raise any exception you like -- ValueError and TypeError are the most common choices. attrs also ships several built-in validators in attr.validators: instance_of(), in_(), matches_re(), and more.

Validators run before the object exists. Bad data never makes it in.

Using Built-In Validators

Writing custom validator functions every time gets repetitive for common cases. The attr.validators module ships a set of composable validators you can combine with attr.validators.and_():

# builtin_validators.py
import attr

@attr.define
class Config:
    host: str = attr.field(validator=attr.validators.instance_of(str))
    port: int = attr.field(
        validator=[
            attr.validators.instance_of(int),
            attr.validators.in_(range(1, 65536))
        ]
    )
    log_level: str = attr.field(
        validator=attr.validators.in_(["DEBUG", "INFO", "WARNING", "ERROR"])
    )

c = Config(host="localhost", port=8080, log_level="INFO")
print(c)

try:
    bad = Config(host="localhost", port=99999, log_level="INFO")
except ValueError as e:
    print(f"Error: {e}")

Output:

Config(host='localhost', port=8080, log_level='INFO')
Error: ("'port' must be in range(1, 65536) (got 99999)", ...)

Passing a list to the validator argument runs all validators in sequence -- all must pass. attr.validators.in_ works with any container: lists, sets, ranges, enums. This composability makes it easy to express complex constraints without writing boilerplate.
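The same composition can be written with attr.validators.and_(). The sketch below assumes attrs is installed; the Account class and its username pattern are made up for illustration.

```python
import attr
from attr import validators as v

@attr.define
class Account:
    # and_() passes only if every wrapped validator passes
    username: str = attr.field(
        validator=v.and_(
            v.instance_of(str),
            v.matches_re(r"[a-z][a-z0-9_]{2,15}"),  # must match the whole string
        )
    )
    role: str = attr.field(
        default="viewer",
        validator=v.in_({"viewer", "editor", "admin"}),
    )

ok = Account("alice_01")
print(ok)

errors = []
for bad_kwargs in ({"username": "Al"}, {"username": "alice", "role": "root"}):
    try:
        Account(**bad_kwargs)
    except (TypeError, ValueError) as exc:
        errors.append(type(exc).__name__)

print(errors)  # ['ValueError', 'ValueError']
```

instance_of() raises TypeError while matches_re() and in_() raise ValueError, so catching both is the safer default when validating external input.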

Using Converters to Coerce Input Types

A converter is a function that transforms the value before it is stored on the instance. Unlike validators (which reject bad input), converters reshape input into the form you need. This is invaluable when your class receives data from JSON, config files, or web APIs where types are not guaranteed.

# converters.py
import attr

@attr.define
class Measurement:
    value: float = attr.field(converter=float)
    unit: str = attr.field(converter=str.lower)
    label: str = attr.field(converter=str.strip)

# The string "42" is converted to the float 42.0
m = Measurement(value="42", unit="KG", label="  body weight  ")
print(m)
print(type(m.value))

Output:

Measurement(value=42.0, unit='kg', label='body weight')
<class 'float'>

The converter runs before validators, so you can combine both: convert the raw input first, then validate the clean result. In the example above, "42" (a string) is converted to 42.0 (a float), and the unit is normalized to lowercase. This makes your class robust to the messiness of real-world input without cluttering your business logic with type-coercion code.
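A short sketch of that ordering, assuming attrs is installed (the Listener class is hypothetical): the validator only ever sees the value the converter produced.

```python
import attr

@attr.define
class Listener:
    port: int = attr.field(
        converter=int,                                   # step 1: coerce the raw input
        validator=attr.validators.in_(range(1, 65536)),  # step 2: check the coerced value
    )

print(Listener("8080"))   # Listener(port=8080)

try:
    Listener("99999")     # converts cleanly to 99999, then fails the range check
except ValueError:
    print("rejected after conversion")
```

If the converter itself fails (for example int("abc")), its own exception propagates and the validator never runs.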

Converters run first. By the time the validator sees the value, it is already the right type.

Frozen Classes and __slots__

Two optimizations in attrs that you get almost for free: frozen=True makes instances immutable (useful for dictionary keys and thread safety), and slots (enabled by default in @attr.define) dramatically reduce memory usage for classes with many instances.

# frozen_slots.py
import attr

@attr.define(frozen=True)
class Color:
    red: int = attr.field(validator=attr.validators.in_(range(256)))
    green: int = attr.field(validator=attr.validators.in_(range(256)))
    blue: int = attr.field(validator=attr.validators.in_(range(256)))

    def as_hex(self):
        return f"#{self.red:02x}{self.green:02x}{self.blue:02x}"

white = Color(255, 255, 255)
print(white)
print(white.as_hex())

# Frozen -- mutation raises FrozenInstanceError
try:
    white.red = 0
except attr.exceptions.FrozenInstanceError as e:
    print(f"Error: {e}")

# Can be used as a dictionary key because it is hashable
palette = {white: "background"}
print(palette[Color(255, 255, 255)])

Output:

Color(red=255, green=255, blue=255)
#ffffff
Error: can't set attribute
background

Frozen attrs classes auto-generate __hash__, making them suitable as dict keys or set members. The __slots__ mechanism (used by @attr.define by default) tells Python to store instance attributes in a fixed array rather than a per-instance __dict__, reducing memory usage by roughly 30--40% for classes with many instances. This is meaningful when you're creating thousands of objects (e.g., parsing large datasets).
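You can observe the slots behavior directly without measuring memory. This is a minimal sketch assuming attrs is installed; both class names are illustrative.

```python
import attr

@attr.define                 # slots=True is the default here
class SlottedPoint:
    x: int
    y: int

@attr.define(slots=False)    # opt out to get a regular per-instance __dict__
class DictPoint:
    x: int
    y: int

print(hasattr(SlottedPoint(1, 2), "__dict__"))  # False
print(hasattr(DictPoint(1, 2), "__dict__"))     # True

try:
    SlottedPoint(1, 2).z = 3   # z is not a declared field
except AttributeError:
    print("slotted classes reject undeclared attributes")
```

The missing __dict__ is where the memory saving comes from; the actual percentage depends on the Python version and the number of fields.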

Evolving Instances with attr.evolve()

Frozen instances cannot be mutated, but you often need a modified copy. attr.evolve() creates a new instance with selected fields changed, leaving all other fields identical to the original. It is attrs' equivalent of dataclasses' replace().

# evolve.py
import attr

@attr.define(frozen=True)
class ServerConfig:
    host: str
    port: int
    debug: bool = False

prod = ServerConfig(host="prod.example.com", port=443, debug=False)
dev = attr.evolve(prod, host="localhost", port=8080, debug=True)

print(prod)
print(dev)
print(prod is dev)  # Different objects

Output:

ServerConfig(host='prod.example.com', port=443, debug=False)
ServerConfig(host='localhost', port=8080, debug=True)
False

This pattern is extremely useful in functional-style code where you want immutable data structures but need to derive updated versions of them. Because evolve() builds the copy by calling the class's __init__, converters and validators run again, so you cannot accidentally create an invalid instance through evolve.
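The following sketch (assuming attrs is installed; Endpoint is a hypothetical class) shows evolve() rejecting an invalid change while leaving the original untouched:

```python
import attr

@attr.define(frozen=True)
class Endpoint:
    host: str
    port: int = attr.field(validator=attr.validators.in_(range(1, 65536)))

prod = Endpoint("prod.example.com", 443)

staging = attr.evolve(prod, host="staging.example.com")
print(staging)  # Endpoint(host='staging.example.com', port=443)

try:
    attr.evolve(prod, port=70000)   # goes through __init__, so the validator runs
except ValueError:
    print("evolve() refused the invalid port")

print(prod)     # Endpoint(host='prod.example.com', port=443)
```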

Frozen doesn't mean final. It means you need a new copy to make changes.

Real-Life Example: Validated Configuration Loader

Here is a practical example that ties together attrs classes, validators, converters, and attr.evolve() to build a configuration loader that reads settings from a dictionary (as you would get from a JSON or YAML file) and validates them on load:

# config_loader.py
import attr
import json
from typing import Optional

@attr.define(frozen=True)
class DatabaseConfig:
    host: str = attr.field(converter=str.strip)
    port: int = attr.field(
        converter=int,
        validator=[attr.validators.instance_of(int),
                   attr.validators.in_(range(1, 65536))]
    )
    name: str = attr.field(converter=str.strip)
    pool_size: int = attr.field(
        default=5,
        converter=int,
        validator=attr.validators.in_(range(1, 101))
    )

    @host.validator
    def _check_host(self, attribute, value):
        if not value:
            raise ValueError("Database host cannot be empty")

@attr.define(frozen=True)
class AppConfig:
    database: DatabaseConfig
    debug: bool = attr.field(converter=bool, default=False)
    log_level: str = attr.field(
        default="INFO",
        validator=attr.validators.in_(["DEBUG", "INFO", "WARNING", "ERROR"])
    )

def load_config(raw: dict) -> AppConfig:
    db_raw = raw.get("database", {})
    db = DatabaseConfig(
        host=db_raw.get("host", ""),
        port=db_raw.get("port", 5432),
        name=db_raw.get("name", ""),
        pool_size=db_raw.get("pool_size", 5),
    )
    return AppConfig(
        database=db,
        debug=raw.get("debug", False),
        log_level=raw.get("log_level", "INFO"),
    )

# Simulate loading from a JSON file
json_input = '''
{
    "database": {"host": "  db.prod.example.com  ", "port": "5432", "name": "myapp"},
    "debug": false,
    "log_level": "WARNING"
}
'''
config = load_config(json.loads(json_input))
print(config)
print(config.database.host)  # Stripped by converter

# Create a dev variant
dev_config = attr.evolve(
    config,
    debug=True,
    log_level="DEBUG",
    database=attr.evolve(config.database, host="localhost", port=5432)
)
print(dev_config)

# Bad config raises immediately
try:
    bad = load_config({"database": {"host": "", "port": 5432, "name": "myapp"}})
except ValueError as e:
    print(f"Config error: {e}")

Output:

AppConfig(database=DatabaseConfig(host='db.prod.example.com', port=5432, name='myapp', pool_size=5), debug=False, log_level='WARNING')
db.prod.example.com
AppConfig(database=DatabaseConfig(host='localhost', port=5432, name='myapp', pool_size=5), debug=True, log_level='DEBUG')
Config error: Database host cannot be empty

This pattern -- loading raw dicts into validated attrs classes -- is the core of any robust configuration layer. Bad configuration fails immediately at startup rather than causing mysterious runtime errors later. You can extend this by adding a from_file() classmethod that reads JSON or YAML and calls load_config(), or by adding environment variable overrides with attr.evolve().

attrs: dataclasses but more flexible.

Frequently Asked Questions

Should I use attrs or dataclasses?

Use dataclasses if you want zero additional dependencies and only need auto-generated __init__, __repr__, and __eq__. Use attrs when you need built-in validators, converters, or memory-efficient slots without extra boilerplate. attrs also predates dataclasses and has a more mature ecosystem (cattrs for serialization, Hypothesis integration, etc.).

What is the difference between @attr.s and @attr.define?

@attr.s (or @attr.attrs) is the legacy API. @attr.define is the modern API introduced in attrs 20.1.0 and is recommended for all new code. Key differences: @attr.define enables __slots__ by default, picks up annotated fields without needing auto_attribs=True, and runs converters and validators on attribute assignment as well as in __init__. The attr.ib() function is the legacy equivalent of attr.field().

Do validators run when I set an attribute after construction?

With @attr.define, validators (and converters) run at construction time and again on every assignment, because define enables on_setattr hooks by default. With the legacy @attr.s decorator they run at construction time only; to validate on assignment there, pass on_setattr=attr.setters.validate to the decorator or to an individual field: attr.ib(on_setattr=attr.setters.validate). For fully immutable classes, use frozen=True -- mutation is then impossible, so the question is moot.
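The per-field hook can be sketched with the classic @attr.s API, where assignment-time validation is off unless you ask for it. The Order class and the positive() validator are made up for illustration; the example assumes attrs 20.1.0 or newer.

```python
import attr

def positive(instance, attribute, value):
    """Hypothetical validator: reject zero and negative numbers."""
    if value <= 0:
        raise ValueError(f"{attribute.name} must be positive, got {value}")

@attr.s(auto_attribs=True)
class Order:
    # Validated in __init__ only (the classic-API default)
    quantity: int = attr.ib(validator=positive)
    # Validated in __init__ and on every later assignment
    price: float = attr.ib(validator=positive, on_setattr=attr.setters.validate)

order = Order(quantity=1, price=9.99)

order.quantity = -5      # no error: this field has no on_setattr hook
try:
    order.price = -1.0
except ValueError as exc:
    print(f"Error: {exc}")   # Error: price must be positive, got -1.0

print(order)
```

The rejected assignment leaves price at its previous value, because the hook raises before the attribute is written.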

How do I serialize an attrs class to JSON?

Use attr.asdict(instance) to convert an attrs instance to a plain dictionary, then pass that to json.dumps(). For the reverse (dict to attrs), use MyClass(**data_dict) or the cattrs library for more complex type conversions. cattrs (companion library, separate package) handles nested attrs classes, lists, and optional fields automatically.
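A minimal round trip might look like the sketch below. It assumes attrs is installed and rebuilds the nested object by hand, which is the step cattrs would automate; the Customer and Address classes are illustrative.

```python
import json
import attr

@attr.define
class Address:
    city: str
    country: str

@attr.define
class Customer:
    name: str
    address: Address

alice = Customer("Alice", Address("Lisbon", "PT"))

# asdict() recurses into nested attrs classes and returns plain dicts
payload = json.dumps(attr.asdict(alice))
print(payload)

# Going back: nested classes have to be rebuilt explicitly
raw = json.loads(payload)
restored = Customer(name=raw["name"], address=Address(**raw["address"]))
print(restored == alice)  # True
```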

Does attrs support inheritance?

Yes, attrs classes can inherit from each other. Subclass fields are appended after the parent's fields in the generated __init__. However, mixing @attr.define (slots-based) with non-slots parent classes can cause issues. For clean inheritance, use @attr.define consistently throughout the class hierarchy or pass slots=False if you need to inherit from a plain class.

How do I handle Optional fields with attrs?

Declare the field type as Optional[str] and set the default to None: name: Optional[str] = attr.field(default=None). If you want to validate that it is either None or a string, use attr.validators.optional(attr.validators.instance_of(str)) -- the optional() wrapper short-circuits when the value is None.
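Put together, an optional validated field could look like this (a sketch assuming attrs is installed; Profile is a hypothetical class):

```python
from typing import Optional
import attr

@attr.define
class Profile:
    name: str
    nickname: Optional[str] = attr.field(
        default=None,
        # optional() skips the inner validator when the value is None
        validator=attr.validators.optional(attr.validators.instance_of(str)),
    )

print(Profile("Alice"))                  # Profile(name='Alice', nickname=None)
print(Profile("Bob", nickname="bobby"))  # Profile(name='Bob', nickname='bobby')

try:
    Profile("Eve", nickname=42)
except TypeError:
    print("nickname must be a str or None")
```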

Conclusion

The attrs library eliminates the boilerplate that comes with writing data classes in Python while adding features that the stdlib's dataclasses module does not offer. You have seen how to define classes with @attr.define, configure fields with attr.field(), enforce constraints with validators (@field.validator, attr.validators.instance_of(), attr.validators.in_()), coerce types with converters, create immutable instances with frozen=True, and derive modified copies with attr.evolve().

To go further, try extending the configuration loader example to read from environment variables or YAML files, and use cattrs to handle serialization and deserialization of nested attrs objects. The real power of attrs shows up in large codebases where hundreds of small data classes would otherwise each require their own boilerplate methods.

Official documentation: attrs.org. The cattrs library is worth exploring as a companion for serialization.

How To Use Python zipapp to Create Executable Python Archives


Intermediate

Sharing a Python script with a colleague is easy until it has dependencies. Suddenly you need to say “first install Python, then create a virtual environment, then pip install these five packages, then run the script.” Half the time something breaks on their machine because they are on a different OS version, have conflicting packages, or simply misread one of the steps. You end up spending more time on setup instructions than on the actual tool you built.

Python’s built-in zipapp module solves the distribution problem for CLI tools and internal utilities. It bundles your application code — and optionally its dependencies — into a single .pyz file that runs with python myapp.pyz on any machine that has Python installed. No virtual environment, no pip install, no path configuration. One file, one command.

In this article you will learn how to create a basic zipapp from the command line, bundle dependencies into the archive, set a custom entry point, add a shebang line for Unix direct execution, work around the limitations of compiled extensions, and finish with a real-life example: packaging a CSV-to-JSON converter as a portable .pyz file. By the end you will be distributing Python tools as single files the way Go developers take for granted.

zipapp Quick Example

Here is the minimal workflow — a Python script bundled into a .pyz archive that runs on any machine with Python 3:

# Step 1: create the source directory
# myapp/
#   __main__.py
#   utils.py

# myapp/__main__.py
def main():
    # The contents of myapp/ become the archive root, so utils is a top-level module
    from utils import greet
    greet("World")

if __name__ == "__main__":
    main()

# myapp/utils.py
def greet(name: str) -> None:
    print(f"Hello, {name}! Running from a .pyz archive.")

# quick_build.py -- run this once, from the directory that contains myapp/
import zipapp
zipapp.create_archive("myapp", target="myapp.pyz", interpreter="/usr/bin/env python3")
print("Created myapp.pyz")

Then run it:

$ python myapp.pyz
Hello, World! Running from a .pyz archive.

The .pyz file is a ZIP archive with a Python shebang prepended. Python’s import system knows how to find modules inside ZIP archives (the zipimport mechanism, built on the PEP 302 import hooks), so imports of pure-Python modules work as if the files were on disk. The __main__.py file is the entry point that runs when you execute the archive.
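You can verify that structure yourself with nothing but the standard library. The sketch below builds a throwaway archive in a temporary directory (the hello_src name is arbitrary), then inspects and runs it:

```python
import subprocess
import sys
import tempfile
import zipapp
import zipfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    src = Path(tmp) / "hello_src"            # throwaway source directory
    src.mkdir()
    (src / "__main__.py").write_text('print("hello from a .pyz")\n')

    target = Path(tmp) / "hello.pyz"
    zipapp.create_archive(src, target=target, interpreter="/usr/bin/env python3")

    # The first line is the shebang; the rest is an ordinary ZIP
    first_line = target.read_bytes().splitlines()[0]
    is_zip = zipfile.is_zipfile(target)
    with zipfile.ZipFile(target) as zf:
        names = zf.namelist()

    # Run it with the current interpreter, as "python hello.pyz" would
    run = subprocess.run([sys.executable, str(target)],
                         capture_output=True, text=True, check=True)

print(first_line)          # b'#!/usr/bin/env python3'
print(is_zip, names)       # True ['__main__.py']
print(run.stdout.strip())  # hello from a .pyz
```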

What Is zipapp and When Should You Use It?

A Python zip application (.pyz) is a ZIP file that contains a __main__.py as its entry point and, optionally, starts with a shebang line (#!/usr/bin/env python3). Python has been able to execute ZIP archives directly since Python 2.6. PEP 441 later standardized the .pyz extension and added the zipapp module in Python 3.5, which automates the creation process so you do not have to build the ZIP manually.

Distribution method | Single file | Dependencies bundled | Python required | Cross-platform
zipapp (.pyz) | Yes | Yes (pure Python) | Yes | Yes
PyInstaller | Yes | Yes (incl. C extensions) | No | No (per-OS build)
pip package | No | Via dependencies | Yes | Yes
Docker image | No | Yes (everything) | No | Partial

zipapp is the right choice for internal tools, CI scripts, and developer utilities where you know the target machine has Python and you want to share one file over Slack or email. It is not suitable for end-user consumer apps (use PyInstaller or packaging) or for tools that require C extension dependencies (use Docker).

Creating Archives from the Command Line

The zipapp module has a command-line interface that handles the most common cases without writing any Python:

# Command-line usage (run in your terminal, not Python)

# Basic: package a directory into a .pyz
$ python -m zipapp myapp -o myapp.pyz

# With a custom entry point (module:callable syntax)
$ python -m zipapp myapp -o myapp.pyz -m "myapp.cli:main"

# With a shebang for direct Unix execution (zipapp marks the archive executable on POSIX)
$ python -m zipapp myapp -o myapp.pyz -p "/usr/bin/env python3" -m "myapp.cli:main"

# Inspect an existing archive's interpreter line
$ python -m zipapp --info myapp.pyz
Interpreter: /usr/bin/env python3

The -m "module:callable" flag writes a __main__.py shim that imports and calls the specified function, so you do not need to write a __main__.py yourself. (zipapp refuses the flag if the source directory already contains one.) The generated __main__.py looks like:

# Generated __main__.py (created by -m flag)
# -*- coding: utf-8 -*-
import myapp.cli
myapp.cli.main()

Note that the shim calls the function without passing its return value to sys.exit(), so a main() that signals failure through its return value should call sys.exit() itself.
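If you want to see exactly what your Python version generates, you can build an archive programmatically and read the shim back out. This sketch uses only the standard library; the staging layout is a stand-in for a real project.

```python
import tempfile
import zipapp
import zipfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp) / "build"               # staging directory = archive root
    pkg = root / "myapp"
    pkg.mkdir(parents=True)
    (pkg / "__init__.py").write_text("")
    (pkg / "cli.py").write_text("def main():\n    print('hi')\n")

    target = Path(tmp) / "myapp.pyz"
    # main= is the programmatic counterpart of the -m flag
    zipapp.create_archive(root, target=target, main="myapp.cli:main")

    # The shim is stored as __main__.py at the archive root
    with zipfile.ZipFile(target) as zf:
        shim = zf.read("__main__.py").decode("utf-8")

print(shim)
```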

Bundling Dependencies into the Archive

To bundle third-party pure-Python packages, install them into a subdirectory, then include that directory when building the archive. The key is that everything inside the archive ZIP is available on the import path when the archive runs:

# Project layout for a CLI that uses 'requests'
myapp/
├── myapp/
│   ├── __init__.py
│   └── cli.py
└── deps/                # we'll install pure-Python deps here

# Install dependencies INTO the project, not site-packages
python -m pip install --target deps requests

# Copy our app and its dependencies into one staging directory
mkdir -p build
cp -r myapp build/
cp -r deps/* build/

# Build the archive — 'build/' becomes the zipapp root
python -m zipapp build -p "/usr/bin/env python3" -m "myapp.cli:main" -o myapp.pyz

# Run it
./myapp.pyz

The trick is that --target deps installs packages as plain folders (no virtualenv wrapper), so they can be copied alongside your code into the staging directory. At runtime, Python’s import system finds them inside the ZIP because the archive itself is on sys.path.

Important caveat: this works for pure-Python packages only. Anything that ships a compiled C extension (numpy, lxml, psycopg2, Pillow, etc.) needs the binary .so/.pyd files on disk — they can’t be imported from inside a ZIP without extracting first. For those, you need a heavier tool like PyInstaller, or you bundle a stub that extracts to a temp dir on first run.
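One way to catch this before shipping is to scan the staging directory for binary modules. The helper below is a hypothetical sketch, not part of zipapp, and the suffix list is an assumption that covers the common cases:

```python
import tempfile
from pathlib import Path

# File suffixes typically used by compiled extension modules and shared libraries
BINARY_SUFFIXES = {".so", ".pyd", ".dylib"}

def find_compiled_extensions(staging_dir):
    """Return sorted relative paths of binary files under staging_dir."""
    root = Path(staging_dir)
    return sorted(
        p.relative_to(root).as_posix()
        for p in root.rglob("*")
        if p.is_file() and p.suffix in BINARY_SUFFIXES
    )

# Demo on a fake staging directory with one pure package and one binary module
with tempfile.TemporaryDirectory() as tmp:
    stage = Path(tmp)
    (stage / "purepkg").mkdir()
    (stage / "purepkg" / "__init__.py").write_text("")
    (stage / "fastpkg").mkdir()
    (stage / "fastpkg" / "_speedups.cpython-311-x86_64-linux-gnu.so").write_bytes(b"")

    offenders = find_compiled_extensions(stage)

print(offenders)  # ['fastpkg/_speedups.cpython-311-x86_64-linux-gnu.so']
```

Running a check like this in the build script and failing on a non-empty result turns a confusing runtime ImportError into a clear build-time message.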

zipapp vs PyInstaller vs Other Single-File Tools

If you’re shipping Python to a machine, here’s how the major options stack up:

Tool | Output | Needs Python on target? | Handles C extensions? | Typical size
zipapp (stdlib) | Single .pyz file | Yes | No (pure Python only) | 10–500 KB
PyInstaller | Standalone .exe / binary | No | Yes | 10–80 MB
Shiv | Single .pyz (better than zipapp) | Yes | Limited (extracts on first run) | 1–20 MB
PEX | Single .pex (built on zipapp) | Yes | Yes (extracts on first run) | 1–50 MB
Nuitka | Compiled C binary | No | Yes | 20–100 MB

Rule of thumb: zipapp for internal tools and dev-environment scripts. PyInstaller or Nuitka when the target machine doesn’t have Python. Shiv when you want zipapp’s simplicity but need real dependency bundling with C extensions.

A Real Example: Shipping a Self-Contained CLI

Here’s an end-to-end build of a small command-line tool. The CLI fetches a URL and prints the response body length:

# File: myapp/cli.py
import sys
import urllib.request

def main():
    if len(sys.argv) != 2:
        print("Usage: myapp.pyz URL", file=sys.stderr)
        return 1
    url = sys.argv[1]
    with urllib.request.urlopen(url, timeout=10) as r:
        body = r.read()
    print(f"{len(body)} bytes from {url}")
    return 0

if __name__ == "__main__":
    sys.exit(main())

#!/usr/bin/env bash
# build.sh -- build script
set -e

rm -rf build myapp.pyz
mkdir build
cp -r myapp build/

python -m zipapp build \
    -p "/usr/bin/env python3" \
    -m "myapp.cli:main" \
    -c \
    -o myapp.pyz

# wc -c works on both Linux and macOS (stat -c%s is GNU-only)
echo "Built myapp.pyz ($(wc -c < myapp.pyz) bytes)"

# Test it
./myapp.pyz https://example.com

The -c flag (available since Python 3.7) enables compression, which is useful for shipping over slow networks. For internal use you can leave it off and gain a small speed boost since the archive doesn’t have to decompress at import time.
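The same build can be driven from Python, which also makes it easy to compare compressed and uncompressed sizes. This is a sketch using only the standard library; the build() helper and the padded cli.py are stand-ins for a real project.

```python
import tempfile
import zipapp
from pathlib import Path

def build(staging_dir, target, compress):
    """Build a .pyz from staging_dir and return its size in bytes."""
    zipapp.create_archive(
        staging_dir,
        target=target,
        interpreter="/usr/bin/env python3",   # same as -p
        main="myapp.cli:main",                # same as -m
        compressed=compress,                  # same as -c (Python 3.7+)
    )
    return Path(target).stat().st_size

with tempfile.TemporaryDirectory() as tmp:
    pkg = Path(tmp) / "build" / "myapp"
    pkg.mkdir(parents=True)
    (pkg / "__init__.py").write_text("")
    # Repetitive filler so that compression has something to work with
    (pkg / "cli.py").write_text("def main():\n    return 0\n" + "# padding line\n" * 500)

    stored = build(pkg.parent, Path(tmp) / "stored.pyz", compress=False)
    deflated = build(pkg.parent, Path(tmp) / "deflated.pyz", compress=True)

print(f"uncompressed: {stored} bytes, compressed: {deflated} bytes")
```

Real code compresses less dramatically than this filler, so measure your own archive before deciding whether -c is worth it.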

Common Pitfalls

  • Forgetting the shebang. Without -p "/usr/bin/env python3" the archive isn’t directly executable on Unix, so ./myapp.pyz fails and you have to run it as python myapp.pyz instead.
  • Trying to bundle compiled wheels. If you see ImportError: cannot import name '_speedups' at runtime, you have a C extension in your dependency tree. Switch to Shiv, PEX, or PyInstaller — zipapp can’t help.
  • Relative file paths inside the archive. Code that does open("data.txt") assumes a regular filesystem. Inside a zipapp, you need importlib.resources.files(__package__).joinpath("data.txt").read_text(), which works whether you’re zipped or not.
  • Confusing __file__. os.path.dirname(__file__) inside a zipapp returns a path that LOOKS like a directory but isn’t a real filesystem location. Refactor anything that walks __file__ to use importlib.resources instead.
  • The archive isn’t a wheel. You can’t pip install a .pyz file — it’s a runnable bundle, not a package. If you want pip-installable, build a wheel separately.
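The importlib.resources advice from the list above can be tried end to end. The sketch below stages a hypothetical demo_pkg with a data file, zips it, and runs the archive; it assumes Python 3.9 or newer, where importlib.resources.files() is available.

```python
import subprocess
import sys
import tempfile
import textwrap
import zipapp
from pathlib import Path

# Source of the module that will live inside the archive
READER = textwrap.dedent('''
    from importlib import resources

    def main():
        # Resolves data.txt relative to the package, zipped or not
        data = resources.files(__package__).joinpath("data.txt")
        print(data.read_text(encoding="utf-8").strip())
''')

with tempfile.TemporaryDirectory() as tmp:
    pkg = Path(tmp) / "stage" / "demo_pkg"       # hypothetical package name
    pkg.mkdir(parents=True)
    (pkg / "__init__.py").write_text("")
    (pkg / "data.txt").write_text("hello from inside the archive\n", encoding="utf-8")
    (pkg / "show.py").write_text(READER, encoding="utf-8")

    archive = Path(tmp) / "demo.pyz"
    zipapp.create_archive(pkg.parent, target=archive, main="demo_pkg.show:main")

    result = subprocess.run([sys.executable, str(archive)],
                            capture_output=True, text=True, check=True)

print(result.stdout.strip())  # hello from inside the archive
```

The same show.py works unchanged when the package is run from a normal directory, which is the main reason to prefer this over open() with a path built from __file__.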

FAQ

Q: Does the target machine need the same Python version?
A: It needs a Python that can run your code, which in practice means a compatible MINOR version, not just the same major version. A .pyz built using 3.10 syntax (match statements, etc.) won’t run on 3.9. The shebang line /usr/bin/env python3 just picks “any Python 3”, so be specific with python3.11 if you rely on newer features.

Q: Can I edit the contents of a .pyz file?
A: Yes — it’s a regular ZIP file. unzip -l myapp.pyz lists contents, and standard ZIP tools can update individual files. Useful for hot-fixes when you can’t rebuild from source.
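Because the archive is an ordinary ZIP file with a shebang line in front, the standard library can inspect it too. The following is a minimal sketch; the archive name tiny.pyz and the helper inspect_archive() are made up for the example:

```python
# inspect_pyz.py
import tempfile
import zipapp
import zipfile
from pathlib import Path

def inspect_archive():
    """Build a one-file archive, then read back its shebang and file list."""
    with tempfile.TemporaryDirectory() as tmp:
        src = Path(tmp) / "src"
        src.mkdir()
        (src / "__main__.py").write_text('print("hi")\n', encoding="utf-8")

        target = Path(tmp) / "tiny.pyz"
        zipapp.create_archive(src, target, interpreter="/usr/bin/env python3")

        # get_interpreter() returns the shebang without the leading "#!"
        shebang = zipapp.get_interpreter(target)
        # zipfile tolerates the shebang line that precedes the ZIP data
        with zipfile.ZipFile(target) as zf:
            names = zf.namelist()
        return shebang, names

shebang, names = inspect_archive()
print(shebang)
print(names)
```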

Q: Is zipapp faster or slower than running from source?
A: Slightly slower on the first run because Python has to read the ZIP table of contents, and slightly faster on subsequent runs since the OS caches the single file in memory. The difference is in milliseconds for normal apps.

Q: How big can a zipapp get?
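A: Practically, anything up to a few hundred MB works. The classic ZIP format tops out at 4 GB per archive; the ZIP64 extension lifts that limit, but whether Python can import from a ZIP64 archive depends on the interpreter version. For larger payloads, keep the data outside the archive and reference it from a known location.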

Q: Why use zipapp over a single-file Python script?
A: Two reasons: (1) you have multiple modules, (2) you want to bundle a few pure-Python dependencies. For genuinely single-file scripts, just ship the .py — that’s the simplest deployment in existence.

Wrapping Up

Python’s zipapp module is one of those tools that does exactly one thing well: bundle pure-Python code into a single executable file. It’s been in the standard library since 3.5 and there’s no setup, no extra dependencies, no hidden magic. For internal tools, build artifacts, and dev environments where Python is already installed, .pyz files are the simplest way to ship code that “just runs”. For everything more complex — C extensions, target machines without Python, true single-binary distribution — reach for Shiv, PyInstaller, or Nuitka instead.

The official zipapp documentation covers the API in full, including the programmatic interface (zipapp.create_archive()) for build scripts.

How To Use Python string Module for Templates and Constants

Beginner

Most Python developers know f-strings and str.format() for string interpolation. But Python’s standard library also ships a string module with three tools that solve problems f-strings cannot: safe templating for user-facing text where untrusted users supply the template, a clean set of character constants for validation and generation tasks, and a Formatter class for building custom format mini-languages. These tools fill specific gaps that show up in real production code, and knowing they exist saves you from reinventing them.

The string module is part of the Python standard library — no installation required. It is a compact module with no heavy dependencies, and every class in it can be imported and used in two or three lines. The module is not something you reach for every day, but when the need arises it solves the problem in exactly the right way.

In this article you will learn all three components of the string module in depth. We will start with string.Template and its safe substitution method, then walk through the character constant strings, build a password generator with them, implement a custom Formatter subclass, and finish with a real-life example: a notification template engine that assembles user-facing messages safely. Every section includes runnable code and expected output.

string Module Quick Example

Here is the short version, with template substitution and a character-constant check in one script:

# quick_string_module.py
import string

# Template substitution -- safer than f-strings for user-supplied templates
tmpl = string.Template("Hello $name, your order #$order_id is ready.")
print(tmpl.substitute(name="Alice", order_id=7842))

# Check that a token uses only URL-safe characters
safe_chars = string.ascii_letters + string.digits + "-_"
token = "user-abc_123"
print("Token valid:", all(c in safe_chars for c in token))

Output:

Hello Alice, your order #7842 is ready.
Token valid: True

Two useful operations in six lines, no imports beyond the standard library. The sections below unpack each component with more realistic examples and edge cases.

What Is the string Module?

The string module is a small but focused standard library module with three main components:

Component | What it provides | When to use it
string.Template | $-based substitution with safe_substitute() | User-supplied templates, config files, untrusted input
Character constants | ascii_letters, digits, punctuation, etc. | Validation, generation, char-class membership tests
string.Formatter | Subclassable custom format engine | Domain-specific format mini-languages

Think of it as a toolkit for string operations that are not quite “manipulation” (that is str methods) and not quite “parsing” (that is re or textwrap). The module fills the gap between raw string operations and format strings when you need safe, controlled substitution or well-defined character sets.

string.ascii_lowercase + string.digits. The basics, named.

string.Template for Safe Substitution

F-strings evaluate arbitrary Python expressions, and str.format() lets a format string reach into the attributes and items of its arguments ("{user.__class__}", "{config[secret]}"), so applying str.format() to a template that comes from user input can expose data you never meant to show. string.Template supports only simple variable substitution: no method calls, no attribute access, no expressions. That restriction is exactly what makes it safe for user-supplied templates:

# template_demo.py
import string

# Basic substitution with $variable syntax
tmpl = string.Template("Hi $name! You have $count new messages.")
result = tmpl.substitute(name="Bob", count=3)
print(result)

# safe_substitute() leaves missing variables intact instead of raising KeyError
partial = string.Template("Dear $title $surname, your invoice for $$amount is due.")
# Only supply title -- surname is missing, amount uses $$ (literal dollar sign)
print(partial.safe_substitute(title="Dr"))

# ${variable} syntax for disambiguation
ambiguous = string.Template("${user}name is not the same as $username")
print(ambiguous.substitute(user="super", username="admin"))

Output:

Hi Bob! You have 3 new messages.
Dear Dr $surname, your invoice for $amount is due.
supername is not the same as admin

The key distinction is substitute() vs safe_substitute(). The former raises KeyError if a placeholder is missing from the mapping; the latter leaves missing placeholders in the output as literal text. Use substitute() when you control the template and know all variables will be present. Use safe_substitute() when the template comes from a config file or user input and you want to fill in what you can without crashing. The $$ escape sequence produces a literal dollar sign in the output.
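The difference is easiest to see with one template and an incomplete context. This is a small sketch; the template text and the variable names are invented for the illustration:

```python
# substitute_vs_safe.py
import string

tmpl = string.Template("Hi $name, your balance is $balance.")
context = {"name": "Dana"}   # "balance" is deliberately missing

# substitute() is strict: a missing key raises KeyError
try:
    strict = tmpl.substitute(context)
except KeyError as exc:
    strict = f"missing placeholder: {exc.args[0]}"

# safe_substitute() is lenient: the placeholder stays in the text
lenient = tmpl.safe_substitute(context)

print(strict)
print(lenient)
```

Catching the KeyError gives you the name of the first missing key, which is handy for error messages when you do control the template.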

Customising the Template Delimiter

You can subclass string.Template to change the delimiter from $ to anything that suits your domain — useful for templates that already contain dollar signs (SQL, shell scripts, Markdown pricing tables):

# custom_template.py
import string

class BraceTemplate(string.Template):
    """Use {{variable}} syntax instead of $variable."""
    delimiter = "{{"
    pattern = r"""
        \{\{(?:
          (?P<escaped>\{\{) |   # Escape sequence: {{{{
          (?P<named>[_a-z][_a-z0-9]*)\}\} |   # {{varname}}
          (?P<braced>[_a-z][_a-z0-9]*)\}\} |
          (?P<invalid>)
        )
    """

# Simpler subclass: just change the delimiter character
class HashTemplate(string.Template):
    delimiter = "#"

tmpl = HashTemplate("Hello #name, total: #amount")
print(tmpl.substitute(name="Alice", amount="$99.95"))

Output:

Hello Alice, total: $99.95

The delimiter class attribute is the single character (or string) that signals the start of a placeholder. By switching to #, dollar signs in the template are now ordinary text. This pattern comes up when you are generating shell scripts (where $VAR is used by the shell itself), SQL queries, or LaTeX documents where every dollar sign has a different meaning.
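As a sketch of the shell-script case, the subclass below switches the delimiter to @ so that $PATH passes through untouched. AtTemplate, the script text, and the values are hypothetical and exist only for this example:

```python
# shell_template.py
import string

class AtTemplate(string.Template):
    """Placeholders start with @, so $VAR is left for the shell."""
    delimiter = "@"

script = AtTemplate(
    'export PATH="$PATH:@bin_dir"\n'
    'echo "Deploying @{app}-v@version"\n'
)

rendered = script.substitute(bin_dir="/opt/demo/bin", app="demo", version="1.2")
print(rendered)
```

With this delimiter a literal @ in the template has to be written as @@, so the approach suits templates where @ is rare.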

string.Template: simple substitution without the f-string firepower.

Character Constants

The string module exposes a set of pre-built character strings that cover all the common character classes:

# constants_demo.py
import string

print("ascii_letters:", string.ascii_letters[:20], "...")
print("ascii_lowercase:", string.ascii_lowercase)
print("ascii_uppercase:", string.ascii_uppercase)
print("digits:", string.digits)
print("hexdigits:", string.hexdigits)
print("octdigits:", string.octdigits)
print("punctuation:", string.punctuation)
print("whitespace repr:", repr(string.whitespace))
print("printable (first 30):", string.printable[:30])

Output:

ascii_letters: abcdefghijklmnopqrst ...
ascii_lowercase: abcdefghijklmnopqrstuvwxyz
ascii_uppercase: ABCDEFGHIJKLMNOPQRSTUVWXYZ
digits: 0123456789
hexdigits: 0123456789abcdefABCDEF
octdigits: 01234567
punctuation: !"#$%&'()*+,-./:;<=>?@[\]^_`{|}~
whitespace repr: ' \t\n\r\x0b\x0c'
printable (first 30): 0123456789abcdefghijklmnopqrst

These are plain strings — you can iterate over them, test membership with in, use them in re.escape(), or pass them directly to random.choices(). They save you from hard-coding character ranges and from subtle bugs like forgetting that hexdigits includes uppercase letters or that octdigits stops at 7.
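One way to combine a constant with re.escape() is to build a character class from string.punctuation. The helper name strip_punctuation() is made up for this sketch:

```python
# punctuation_regex.py
import re
import string

# re.escape() neutralises characters such as ], ^, - and \ inside the class
PUNCT_RE = re.compile(f"[{re.escape(string.punctuation)}]")

def strip_punctuation(text: str) -> str:
    """Remove every ASCII punctuation character from text."""
    return PUNCT_RE.sub("", text)

print(strip_punctuation("Hello, world! (test)"))
```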

Password Generator with Constants

The character constants become genuinely useful in generation and validation tasks. Here is a cryptographically strong password generator that uses them:

# password_gen.py
import string
import secrets   # cryptographically secure RNG

def generate_password(
    length: int = 16,
    use_upper: bool = True,
    use_digits: bool = True,
    use_symbols: bool = True,
) -> str:
    """
    Generate a cryptographically secure password.
    Guarantees at least one character from each enabled character class.
    """
    charset = string.ascii_lowercase
    required = [secrets.choice(string.ascii_lowercase)]

    if use_upper:
        charset += string.ascii_uppercase
        required.append(secrets.choice(string.ascii_uppercase))
    if use_digits:
        charset += string.digits
        required.append(secrets.choice(string.digits))
    if use_symbols:
        # Use a safer subset -- exclude visually ambiguous chars
        symbols = "!@#$%^&*()-_=+"
        charset += symbols
        required.append(secrets.choice(symbols))

    # Fill remaining length with random chars from full charset
    if length < len(required):
        raise ValueError(f"length must be at least {len(required)}")
    remaining = length - len(required)
    password_chars = required + [secrets.choice(charset) for _ in range(remaining)]

    # Shuffle to avoid predictable positions for required chars
    secrets.SystemRandom().shuffle(password_chars)
    return "".join(password_chars)


def validate_password(password: str, min_length: int = 12) -> tuple[bool, list[str]]:
    """Return (is_valid, list_of_failures)."""
    failures = []
    if len(password) < min_length:
        failures.append(f"Too short: {len(password)} < {min_length}")
    if not any(c in string.ascii_uppercase for c in password):
        failures.append("Missing uppercase letter")
    if not any(c in string.digits for c in password):
        failures.append("Missing digit")
    if not any(c in string.punctuation for c in password):
        failures.append("Missing special character")
    return len(failures) == 0, failures


# Generate and validate three passwords
for _ in range(3):
    pwd = generate_password(length=16)
    valid, issues = validate_password(pwd)
    print(f"{pwd}  |  valid={valid}")

Output (example -- passwords are random):

k!R8mP2@qXzV#nLw  |  valid=True
$4uHjW*eNbYv!9Td  |  valid=True
mQ3@rKpZ#2XsVtEa  |  valid=True

The validator uses string.ascii_uppercase, string.digits, and string.punctuation directly in membership tests instead of hard-coding character ranges. If you ever need to adjust which characters count as "special", you change the constant or define your own subset -- the logic stays the same. Using secrets instead of random ensures the generator is cryptographically suitable for security-sensitive passwords.
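To get a rough feel for how strong such a password is, you can estimate its entropy as length times log2 of the alphabet size. The sketch below is an approximation under the assumption that every character is drawn uniformly and independently; the guarantee of one character per class makes the true figure slightly lower. entropy_bits() is a hypothetical helper:

```python
# password_entropy.py
import math
import string

symbols = "!@#$%^&*()-_=+"   # same subset the generator uses
charset = string.ascii_lowercase + string.ascii_uppercase + string.digits + symbols

def entropy_bits(alphabet_size: int, length: int) -> float:
    """Upper-bound entropy estimate for a uniformly random password."""
    return length * math.log2(alphabet_size)

print("alphabet size:", len(charset))
print("bits for 16 chars:", round(entropy_bits(len(charset), 16), 1))
```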

Formatter class: when you need substitution rules of your own.

string.Formatter for Custom Format Mini-Languages

string.Formatter exposes the formatting machinery behind str.format() as a class you can subclass. Override format_field() to add custom conversions to your format strings:

# custom_formatter.py
import string

class ReportFormatter(string.Formatter):
    """
    Extends str.format() with custom format codes:
    :curr  -- format as currency   e.g. 1234.5 -> "$1,234.50"
    :pct   -- format as percentage e.g. 0.857  -> "85.7%"
    :yn    -- bool to Yes/No       e.g. True   -> "Yes"
    """

    def format_field(self, value, format_spec):
        if format_spec == "curr":
            return f"${value:,.2f}"
        elif format_spec == "pct":
            return f"{value * 100:.1f}%"
        elif format_spec == "yn":
            return "Yes" if value else "No"
        # Fall back to standard formatting for anything else
        return super().format_field(value, format_spec)


fmt = ReportFormatter()

report_template = (
    "Customer: {name}\n"
    "Total spent: {total:curr}\n"
    "Conversion rate: {conv_rate:pct}\n"
    "Active subscriber: {is_subscriber:yn}\n"
    "Plan: {plan:>10}"    # standard right-align still works
)

print(fmt.format(
    report_template,
    name="Alice Chen",
    total=1234.5,
    conv_rate=0.857,
    is_subscriber=True,
    plan="Pro"
))

Output:

Customer: Alice Chen
Total spent: $1,234.50
Conversion rate: 85.7%
Active subscriber: Yes
Plan:        Pro

Custom format codes coexist with all standard format specs -- :>10 right-align still works alongside your :curr and :pct codes. This pattern is useful for report generation, invoice templates, and any domain where you want a consistent formatting vocabulary that non-developers can use without knowing Python format syntax.

Real-Life Example: Notification Template Engine

Here is a complete notification engine that loads message templates from a config dict, fills them safely with string.Template.safe_substitute(), and validates that all expected placeholders were filled:

# notification_engine.py
import string
from dataclasses import dataclass
from typing import Any

# Message templates -- in production, load from a YAML/JSON file
TEMPLATES = {
    "order_confirmed": string.Template(
        "Hi $first_name, your order #$order_id has been confirmed. "
        "Estimated delivery: $delivery_date."
    ),
    "password_reset": string.Template(
        "Hi $first_name, use this link to reset your password: $reset_link "
        "(expires in $expiry_minutes minutes)."
    ),
    "subscription_renewal": string.Template(
        "Hi $first_name, your $plan_name subscription renews on $renewal_date "
        "for $renewal_amount."
    ),
}

@dataclass
class Notification:
    template_name: str
    context: dict[str, Any]
    channel: str = "email"   # "email", "sms", "push"

    def render(self) -> str:
        tmpl = TEMPLATES.get(self.template_name)
        if tmpl is None:
            raise ValueError(f"Unknown template: {self.template_name}")
        # Use safe_substitute so partial context does not crash
        rendered = tmpl.safe_substitute(self.context)
        # Check for any unresolved $placeholders
        remaining = [w for w in rendered.split() if w.startswith("$")]
        if remaining:
            print(f"  Warning: unresolved placeholders: {remaining}")
        return rendered

# Send a batch of notifications
notifications = [
    Notification(
        "order_confirmed",
        {"first_name": "Alice", "order_id": "ORD-7842", "delivery_date": "May 2"},
    ),
    Notification(
        "password_reset",
        {"first_name": "Bob", "reset_link": "https://app.example.com/reset/abc123",
         "expiry_minutes": 30},
        channel="sms",
    ),
    Notification(
        "subscription_renewal",
        {"first_name": "Carol", "plan_name": "Pro", "renewal_date": "May 15"},
        # renewal_amount deliberately missing to show safe_substitute in action
    ),
]

for notif in notifications:
    message = notif.render()
    print(f"[{notif.channel.upper()}] {message}")
    print()

Output:

[EMAIL] Hi Alice, your order #ORD-7842 has been confirmed. Estimated delivery: May 2.

[SMS] Hi Bob, use this link to reset your password: https://app.example.com/reset/abc123 (expires in 30 minutes).

  Warning: unresolved placeholders: ['$renewal_amount.']
[EMAIL] Hi Carol, your Pro subscription renews on May 15 for $renewal_amount.

The engine uses safe_substitute() so a missing value never crashes production. The warning check flags unresolved placeholders so the failure is visible instead of silent; in production you would log the warning or hold the message back rather than print it. The check is a simple heuristic: any word that begins with $ is reported, including a legitimate value such as a price. Because the templates live in a dict (loaded from config in production), non-developers can edit message copy without touching Python code -- the template syntax is simple enough for a product manager to understand.
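A stricter alternative to scanning the rendered text is to ask the template which names it needs and compare them with the context before rendering. The sketch below does that with the compiled pattern attribute that every Template class carries; missing_placeholders() is a hypothetical helper and is not part of the engine above:

```python
# missing_placeholders.py
import string

def missing_placeholders(tmpl: string.Template, context: dict) -> list[str]:
    """Return placeholder names used by tmpl that context does not supply."""
    missing = []
    for match in tmpl.pattern.finditer(tmpl.template):
        # "named" is $name, "braced" is ${name}; $$ escapes match neither group
        name = match.group("named") or match.group("braced")
        if name is not None and name not in context and name not in missing:
            missing.append(name)
    return missing

demo = string.Template(
    "Hi $first_name, your $plan_name plan renews on ${renewal_date} "
    "for $$5 or $amount."
)
print(missing_placeholders(demo, {"first_name": "Carol", "plan_name": "Pro"}))
```

Because the check runs on the template rather than on the output, a substituted value that happens to start with a dollar sign is never mistaken for a placeholder.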

Frequently Asked Questions

When should I use string.Template instead of f-strings?

Use string.Template when the template itself comes from outside your codebase -- config files, database records, user input, or CMS content. F-strings are part of your source code: their expressions are compiled with the program, so a template stored as data cannot be an f-string unless you pass it to eval(), and at that point text such as {os.system('rm -rf /')} would run as code. str.format() does work on strings loaded at runtime, but it still allows attribute and index access on its arguments. string.Template only performs simple key substitution against a provided mapping -- no expressions, no attribute access, no function calls.

Should I use string constants or regex for character class checks?

For simple membership tests (does this string contain only digits?), string constants are faster and more readable: all(c in string.digits for c in s). For pattern matching (does this string match a specific digit pattern?), use re. For large strings, str.translate() with a translation table built from string.digits can be significantly faster than a generator expression because it is implemented in C.
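The str.translate() approach can look like the following sketch: delete every ASCII digit and see whether anything is left. is_ascii_digits() is a made-up name, and the empty-string rule is a choice made for this example:

```python
# digits_translate.py
import string

# Translation table that deletes the ten ASCII digits
_DROP_DIGITS = str.maketrans("", "", string.digits)

def is_ascii_digits(s: str) -> bool:
    """True if s is non-empty and consists only of the characters 0-9."""
    return bool(s) and not s.translate(_DROP_DIGITS)

print(is_ascii_digits("20240131"))
print(is_ascii_digits("2024-01-31"))
```

Unlike str.isdigit(), this check rejects digits from other scripts, which matters when the value is later parsed by code that expects plain ASCII.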

What can Formatter do that str.format() cannot?

str.format() and string.Formatter share the same formatting behavior; the class exists so that you can customise it. Subclassing lets you add custom format specs (like :curr above), override how keys are looked up in the mapping (useful for case-insensitive keys or computed properties), implement format validation before output, and add logging or auditing around format calls. You can also override vformat() to control the entire rendering pipeline.
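As a sketch of the key-lookup hook, the subclass below overrides get_value() to match keyword names case-insensitively and to fall back to a marker for unknown keys. LenientFormatter and the "<missing>" marker are assumptions made for this example:

```python
# lenient_formatter.py
import string

class LenientFormatter(string.Formatter):
    """Case-insensitive keyword lookup with a fallback for unknown keys."""

    def __init__(self, missing="<missing>"):
        super().__init__()
        self.missing = missing

    def get_value(self, key, args, kwargs):
        if isinstance(key, str):
            lowered = {k.lower(): v for k, v in kwargs.items()}
            return lowered.get(key.lower(), self.missing)
        # Integer keys are positional arguments: keep the default behavior
        return super().get_value(key, args, kwargs)

fmt = LenientFormatter()
line = fmt.format("{Name} owes {AMOUNT:.2f}; ref {ticket}", name="Ada", amount=12.5)
print(line)
```

The fallback marker is a string, so a missing key combined with a numeric format spec such as :.2f would still raise an error.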

What is string.capwords()?

string.capwords(s) splits on whitespace, capitalises each word, and rejoins with single spaces. It differs from str.title() in that it handles apostrophes correctly: "it's" becomes "It's" with capwords, but "It'S" with title() (which capitalises after any non-letter, including apostrophes). Use capwords for human names and titles; use title() only when you want the technical definition of title case.
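A two-line comparison shows the apostrophe difference; the sample phrase is arbitrary:

```python
# capwords_demo.py
import string

phrase = "it's a dog's life"
print(string.capwords(phrase))   # It's A Dog's Life
print(phrase.title())            # It'S A Dog'S Life
```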

Can I use string.Template with nested or chained templates?

string.Template does not support nested substitution natively -- the result of a substitution is a plain string and is not re-processed for more $ placeholders. If you need two-pass templating (a template whose substituted values contain further $ placeholders), substitute twice: once with the outer context, then wrap the result in a new string.Template and substitute again with the inner context. This is deliberate -- one-pass substitution prevents infinite loops from recursive templates.
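A two-pass render can be sketched in three lines; the names outer, first_pass, and second_pass are illustrative:

```python
# two_pass_template.py
import string

outer = string.Template("Dear $name, $body")

# Pass 1: the body value itself contains a placeholder
first_pass = outer.substitute(name="Eve", body="your code is $code.")

# Pass 2: wrap the intermediate string in a new Template
second_pass = string.Template(first_pass).substitute(code="X7Q2")

print(first_pass)
print(second_pass)
```

Only run a second pass over values you trust: a user-supplied value containing $ would otherwise be treated as a placeholder in the second round.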

Conclusion

The string module solves three specific problems cleanly: Template gives you safe substitution for user-supplied templates, the character constants give you pre-built character sets for validation and generation, and Formatter lets you build custom format mini-languages without monkey-patching the str type. None of these replace f-strings for everyday use -- they fill the gaps where f-strings are either unsafe or inflexible.

Take the notification engine from the real-life example and extend it by loading the TEMPLATES dict from a YAML file with yaml.safe_load(). Your product team can now edit notification copy in YAML without touching Python, and the engine validates that every required placeholder is filled before sending. Read the full string module documentation at https://docs.python.org/3/library/string.html.

How To Use Python cachetools for Flexible Caching Strategies

Intermediate

Python’s built-in functools.lru_cache is great for memoizing pure functions, but it has one fixed eviction strategy (Least Recently Used) and no expiry. Once an entry is cached, it stays until the cache fills up or the process restarts. For most scripts that is fine. For a web application that calls a slow external API, caches database query results, or serves pricing data that changes every few minutes, you need caches that expire old data, evict entries by frequency rather than recency, or give you a bounded size you can tune at runtime.

cachetools is a small third-party library (install with pip install cachetools) that provides several ready-made cache classes, including LRU, LFU, TTL, and RR. Each one is a plain Python MutableMapping that you can use like a dict or attach to a function with the cachetools.cached() decorator, and the cachetools.func module offers decorators modeled on the standard library’s @lru_cache. The cache classes themselves are not thread-safe; the decorators accept a lock argument for use from multiple threads.

This article walks through each cache type with concrete code examples, explains which eviction strategy to choose for which scenario, shows how to use cachetools as a decorator, covers thread-safe caching in multi-threaded servers, and closes with a real-life example caching OpenWeather API responses with TTL expiry. By the end you will have a caching toolkit that covers every common scenario.

cachetools Quick Example

Here is a function that calls a slow external API, cached with a TTL so results expire after 60 seconds:

# quick_cachetools.py
import time
import cachetools
import cachetools.func

@cachetools.func.ttl_cache(maxsize=128, ttl=60)
def get_user_profile(user_id: int) -> dict:
    """Fetch user profile -- cached for 60 seconds per user_id."""
    print(f"  Fetching user {user_id} from API...")
    time.sleep(0.5)   # simulate network delay
    return {"id": user_id, "name": f"User {user_id}", "plan": "pro"}

print("First call (cache miss):")
print(get_user_profile(42))

print("\nSecond call (cache hit -- no fetch):")
print(get_user_profile(42))

print("\nDifferent user (cache miss):")
print(get_user_profile(99))

Output:

First call (cache miss):
  Fetching user 42 from API...
{'id': 42, 'name': 'User 42', 'plan': 'pro'}

Second call (cache hit -- no fetch):
{'id': 42, 'name': 'User 42', 'plan': 'pro'}

Different user (cache miss):
  Fetching user 99 from API...
{'id': 99, 'name': 'User 99', 'plan': 'pro'}

The second call for user 42 returns instantly from cache. After 60 seconds, the next call for user 42 will hit the API again automatically. Each unique user_id is a separate cache key. You get expiry, bounded size, and memoization in one decorator — no manual cache management.

Cache Types: LRU, LFU, TTL, and RR

Choosing the right eviction strategy matters because it determines which entries survive under memory pressure. Here is the full comparison:

Cache | Evicts | Best for | Class
LRU | Least Recently Used | General memoization, recent-access patterns | LRUCache
LFU | Least Frequently Used | Popular-item retention (top-N queries) | LFUCache
TTL | Entries older than TTL seconds | External API data, time-sensitive results | TTLCache
RR | Random entry | Uniform access patterns, simple bounded cache | RRCache

LRU is the right default if you have no other information. Use LFU when you know some keys are requested far more often than others (e.g., top-10 products on an e-commerce site) and you want those to stay cached even if other keys were accessed more recently. Use TTL whenever the underlying data changes on a known schedule. Use RR only if you need a very simple bounded dict and do not care about eviction quality.
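To make the LRU versus LFU difference concrete, here is a standard-library-only simulation of both policies on the same access trace. It is a teaching sketch, not how cachetools implements its classes, and simulate_lru() and simulate_lfu() are hypothetical helpers:

```python
# eviction_sim.py
from collections import Counter, OrderedDict

def simulate_lru(trace, capacity):
    """Return the keys left after replaying trace with LRU eviction."""
    cache = OrderedDict()
    for key in trace:
        if key in cache:
            cache.move_to_end(key)          # mark as most recently used
        else:
            if len(cache) >= capacity:
                cache.popitem(last=False)   # drop the least recently used key
            cache[key] = True
    return list(cache)

def simulate_lfu(trace, capacity):
    """Return the keys left after replaying trace with LFU eviction."""
    counts = Counter()
    cache = {}
    for key in trace:
        if key not in cache:
            if len(cache) >= capacity:
                # Evict the key with the fewest uses (first one on ties)
                victim = min(cache, key=lambda k: counts[k])
                del cache[victim]
                del counts[victim]
            cache[key] = True
        counts[key] += 1
    return list(cache)

trace = ["a", "a", "a", "b", "c", "d"]   # "a" is popular but not recent
print("LRU keeps:", simulate_lru(trace, 3))
print("LFU keeps:", simulate_lfu(trace, 3))
```

On this trace LRU throws away the popular key "a" because it was touched longest ago, while LFU keeps it and evicts "b" instead.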

TTLCache: items expire on a clock. The freshest cache.

LRU and LFU Caches

Both LRUCache and LFUCache are plain Python mappings — you use them like dicts, or wrap them with the @cached() decorator:

# lru_lfu_demo.py
from cachetools import LRUCache, LFUCache, cached

# --- LRU as a decorator ---
lru = LRUCache(maxsize=3)

@cached(cache=lru)
def compute_square(n: int) -> int:
    print(f"  Computing {n}^2...")
    return n * n

for x in [1, 2, 3, 1, 4]:   # 4 causes eviction (cache full after 1,2,3)
    print(f"square({x}) = {compute_square(x)}")

print(f"\nLRU cache now holds: {list(lru.keys())}")

# --- LFU as a plain dict ---
lfu = LFUCache(maxsize=3)
lfu["a"] = 1
lfu["b"] = 2
lfu["c"] = 3
_ = lfu["a"]   # access 'a' twice so it is most frequent
_ = lfu["a"]
_ = lfu["b"]   # access 'b' once
# Adding 'd' should evict 'c' (frequency=0, least frequent)
lfu["d"] = 4
print("LFU keys after inserting 'd':", list(lfu.keys()))

Output:

  Computing 1^2...
square(1) = 1
  Computing 2^2...
square(2) = 4
  Computing 3^2...
square(3) = 9
square(1) = 1          <-- cache hit
  Computing 4^2...     <-- 4 inserted, 2 evicted (LRU)
square(4) = 16

LRU cache now holds: [(1,), (3,), (4,)]   <-- keys are argument tuples, listed in insertion order
LFU keys after inserting 'd': ['a', 'b', 'd']   # 'c' evicted (never accessed)

The @cached(cache=lru) pattern separates the cache instance from the decorated function, which means you can inspect or clear the cache by name, unlike @lru_cache where the cache is hidden inside the function’s closure. compute_square.cache_info() is not available by default (recent cachetools releases add it when you pass info=True to @cached), but you can check len(lru), lru.currsize, and lru.maxsize directly.

TTL Cache for Time-Sensitive Data

TTL (Time To Live) cache entries expire automatically after a set number of seconds. This is the right choice whenever your data has a known freshness window — exchange rates, weather data, feature flags, session tokens:

# ttl_demo.py
import time
from cachetools import TTLCache, cached

ttl = TTLCache(maxsize=256, ttl=5)   # entries expire after 5 seconds

@cached(cache=ttl)
def fetch_exchange_rate(currency: str) -> float:
    print(f"  Hitting API for {currency}...")
    # In production: return requests.get(f"https://api.exchangerate.host/latest?base={currency}").json()["rates"]["AUD"]
    rates = {"USD": 1.58, "EUR": 1.72, "GBP": 2.01}
    return rates.get(currency, 1.0)

print("First call:")
print(fetch_exchange_rate("USD"))   # cache miss

print("\nImmediate second call:")
print(fetch_exchange_rate("USD"))   # cache hit

print("\nWaiting 6 seconds for TTL expiry...")
time.sleep(6)

print("Call after expiry:")
print(fetch_exchange_rate("USD"))   # cache miss again -- TTL expired

Output:

First call:
  Hitting API for USD...
1.58

Immediate second call:
1.58

Waiting 6 seconds for TTL expiry...
Call after expiry:
  Hitting API for USD...
1.58

The TTL clock starts when the entry is inserted, not when it was last accessed. This means even a highly popular key expires on schedule — exactly what you want for data that becomes stale by age, not by usage. If you want expiry-on-last-access semantics, you will need to implement a custom __getitem__ that refreshes the TTL on each read, or reconsider whether LRU is actually the right choice for your use case.
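The insertion-time rule can be illustrated with a tiny standard-library sketch that takes an injectable clock, so the example runs without sleeping. TinyTTLCache is a hypothetical class written for this illustration; it is not the cachetools implementation and has no size limit:

```python
# tiny_ttl.py
import time

class TinyTTLCache:
    """Minimal TTL store: each entry expires ttl seconds after insertion."""

    def __init__(self, ttl, timer=time.monotonic):
        self.ttl = ttl
        self.timer = timer
        self._data = {}   # key -> (value, expires_at)

    def set(self, key, value):
        self._data[key] = (value, self.timer() + self.ttl)

    def get(self, key, default=None):
        item = self._data.get(key)
        if item is None:
            return default
        value, expires_at = item
        if self.timer() >= expires_at:
            del self._data[key]   # lazily drop the stale entry
            return default
        return value              # reading does not extend the lifetime

# Demo with a fake clock instead of time.sleep()
fake_now = [0.0]
rates = TinyTTLCache(ttl=5, timer=lambda: fake_now[0])
rates.set("USD", 1.58)

fake_now[0] = 4.0
print(rates.get("USD"))   # still cached one second before expiry
fake_now[0] = 5.0
print(rates.get("USD"))   # expired five seconds after insertion
```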

LRU: keep the recent, evict the oldest. The classic strategy.

Thread-Safe Caching

None of the cachetools cache classes is thread-safe by default: if two threads insert or evict entries at the same time, the cache’s internal state can be corrupted. For multi-threaded servers (Flask with threads, Django, FastAPI with sync endpoints), pass a lock to the decorator with the @cached(cache, lock=RLock()) pattern:

# thread_safe_demo.py
import threading
from cachetools import TTLCache, cached

# Thread-safe TTL cache using an RLock
cache = TTLCache(maxsize=512, ttl=120)
lock = threading.RLock()

@cached(cache=cache, lock=lock)
def get_user_permissions(user_id: int) -> list[str]:
    print(f"  Loading permissions for user {user_id} from DB...")
    # Simulate a slow DB query
    permissions_db = {
        1: ["read", "write", "admin"],
        2: ["read"],
        3: ["read", "write"],
    }
    return permissions_db.get(user_id, [])

# Simulate concurrent requests from multiple threads
def simulate_request(user_id: int):
    perms = get_user_permissions(user_id)
    print(f"Thread {threading.current_thread().name}: user {user_id} -> {perms}")

threads = [threading.Thread(target=simulate_request, args=(1,), name=f"T{i}") for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

Output:

  Loading permissions for user 1 from DB...
Thread T0: user 1 -> ['read', 'write', 'admin']
Thread T1: user 1 -> ['read', 'write', 'admin']
Thread T2: user 1 -> ['read', 'write', 'admin']

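In this run only one DB query fires even though three threads request the same data, because the function returns almost instantly and the first thread has typically filled the cache before the others look it up. The lock serialises cache lookups and inserts so no two threads modify the cache at the same time. It does not necessarily serialise the function call itself: depending on the cachetools version, the lock may be released while the wrapped function runs, so threads that miss at the same moment can each run a slow query. threading.RLock() is a safe default because the same thread can re-acquire it without deadlocking.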
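If you need a hard guarantee that a slow query runs once per key, one option is a per-key lock with a second cache check after acquiring it. The following is a standard-library sketch of that idea, not a cachetools feature; SingleFlightCache and slow_lookup() are hypothetical names, and the sketch never removes entries or locks:

```python
# single_flight.py
import threading
import time

class SingleFlightCache:
    """Unbounded cache that computes each key at most once across threads."""

    def __init__(self):
        self._values = {}
        self._key_locks = {}
        self._guard = threading.Lock()   # protects both dicts

    def get_or_compute(self, key, compute):
        with self._guard:
            if key in self._values:
                return self._values[key]
            key_lock = self._key_locks.setdefault(key, threading.Lock())
        with key_lock:
            # Re-check: another thread may have finished while we waited
            with self._guard:
                if key in self._values:
                    return self._values[key]
            value = compute(key)         # runs outside the global guard
            with self._guard:
                self._values[key] = value
            return value

calls = []

def slow_lookup(user_id):
    calls.append(user_id)
    time.sleep(0.05)                     # simulate a slow DB query
    return ["read", "write"]

cache = SingleFlightCache()
results = []
threads = [
    threading.Thread(target=lambda: results.append(cache.get_or_compute(1, slow_lookup)))
    for _ in range(5)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"{len(calls)} query for {len(results)} requests")
```

Lookups for different keys still run in parallel, because each key has its own lock and the global guard is held only for dictionary access.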

Real-Life Example: Caching OpenWeather API Responses

Here is a complete weather client that caches current conditions for 10 minutes and the 5-day forecast for 30 minutes — matching OpenWeather’s update frequency so you never pay for stale data:

# weather_client.py
import threading
import time
from cachetools import TTLCache, cached
import urllib.parse
import urllib.request
import json

# Separate caches for current vs forecast (different TTLs)
current_cache = TTLCache(maxsize=100, ttl=600)    # 10 min
forecast_cache = TTLCache(maxsize=50, ttl=1800)   # 30 min
current_lock = threading.RLock()
forecast_lock = threading.RLock()

BASE_URL = "https://api.openweathermap.org/data/2.5"

# Replace with your free key from https://openweathermap.org/api
API_KEY = "YOUR_OPENWEATHER_API_KEY"

@cached(cache=current_cache, lock=current_lock)
def get_current_weather(city: str) -> dict:
    # quote() keeps city names with spaces or accents valid in the URL
    url = f"{BASE_URL}/weather?q={urllib.parse.quote(city)}&appid={API_KEY}&units=metric"
    with urllib.request.urlopen(url) as resp:
        data = json.loads(resp.read())
    return {
        "city": data["name"],
        "temp_c": data["main"]["temp"],
        "feels_like": data["main"]["feels_like"],
        "description": data["weather"][0]["description"],
        "humidity": data["main"]["humidity"],
    }

@cached(cache=forecast_cache, lock=forecast_lock)
def get_forecast(city: str) -> list[dict]:
    # quote() keeps city names with spaces or accents valid in the URL
    url = f"{BASE_URL}/forecast?q={urllib.parse.quote(city)}&appid={API_KEY}&units=metric&cnt=5"
    with urllib.request.urlopen(url) as resp:
        data = json.loads(resp.read())
    return [
        {
            "time": item["dt_txt"],
            "temp_c": item["main"]["temp"],
            "description": item["weather"][0]["description"],
        }
        for item in data["list"]
    ]

def weather_report(city: str) -> None:
    print(f"\n=== Weather Report: {city} ===")
    current = get_current_weather(city)
    print(f"Now: {current['temp_c']:.1f}C, {current['description']}, "
          f"humidity {current['humidity']}%")
    forecast = get_forecast(city)
    print("Forecast (next 5 periods):")
    for slot in forecast:
        print(f"  {slot['time']}: {slot['temp_c']:.1f}C, {slot['description']}")
    print(f"Cache sizes: current={current_cache.currsize}, "
          f"forecast={forecast_cache.currsize}")

if __name__ == "__main__":
    weather_report("Sydney")
    print("\n-- Second call (cache hit) --")
    start = time.monotonic()
    weather_report("Sydney")
    elapsed = time.monotonic() - start
    print(f"Second call took {elapsed:.3f}s (should be near 0)")

Calling weather_report("Sydney") a second time returns instantly from both caches, with no network requests. After 10 minutes the current weather cache expires and the next call re-fetches, while the forecast stays cached for another 20 minutes. To extend this, add an LFUCache for city lookups (geocoding), whose results rarely change: popular cities stay cached while obscure ones get evicted.

Eviction strategies matter. Match them to your access pattern.

Frequently Asked Questions

When should I use cachetools instead of functools.lru_cache?

Use functools.lru_cache when you need simple memoization with no expiry and the cache lives for the entire process lifetime. Switch to cachetools when you need time-based expiry (TTL), frequency-based eviction (LFU), thread-safe access with an explicit lock, per-instance caching on class methods (lru_cache on a method keeps self in its cache keys, which keeps instances alive), or a cache object you can inspect and clear directly. cachetools gives you explicit control; lru_cache is more convenient but less flexible.

How do I choose a maxsize?

Start with a size proportional to your memory budget divided by average entry size. A dict with a few string keys and int values is roughly 200-500 bytes. A cache holding API response dicts with 20 keys might be 2-5KB per entry. For a 10MB cache budget and 2KB entries, maxsize=5000 is reasonable. Monitor cache.currsize and eviction rates in production, then tune up or down. When in doubt, set maxsize to the number of unique keys you realistically expect to see per TTL window.
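The sizing rule above is plain arithmetic. Here is a minimal sketch, assuming decimal units (1 MB = 1,000,000 bytes); the helper name estimate_maxsize is hypothetical and not part of cachetools:

```python
def estimate_maxsize(budget_bytes: int, avg_entry_bytes: int) -> int:
    """Return how many entries of the given average size fit in the budget."""
    if avg_entry_bytes <= 0:
        raise ValueError("avg_entry_bytes must be positive")
    # Never return 0: a zero-sized cache cannot store anything
    return max(1, budget_bytes // avg_entry_bytes)

# 10 MB budget, roughly 2 KB per cached API response
print(estimate_maxsize(10_000_000, 2_000))  # 5000
```

Treat the result as a starting point only; real entry sizes vary, so keep monitoring currsize in production.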

How do I cache class instance methods?

Do not use a module-level cache for instance methods: the instance itself is the first argument (self), so it becomes part of every cache key and stays referenced by the cache until the entry is evicted, which prevents garbage collection in the meantime. Instead, create the cache (and a lock, if you need one) as instance attributes in __init__ and decorate the method with cachetools.cachedmethod, passing a callable that returns the instance's cache, for example operator.attrgetter("_cache"). Alternatively, use methodtools.lru_cache (a separate small library), which handles instance method caching correctly.
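To see why a shared cache keeps instances alive, here is a small standard-library sketch using functools.lru_cache on a method. The Report class is a made-up example; the same effect applies to any module-level cache whose keys include self:

```python
import functools
import gc
import weakref

class Report:
    @functools.lru_cache(maxsize=None)   # one cache shared by all instances
    def render(self, n: int) -> int:
        return n * 2

report = Report()
report.render(21)
ref = weakref.ref(report)   # lets us observe whether the instance is still alive

del report
gc.collect()
print(ref() is not None)    # True: the cache key still references the instance

Report.render.cache_clear()
gc.collect()
print(ref() is None)        # True: clearing the cache releases it
```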

Does cachetools work with async functions?

The standard @cached() decorator does not work with async functions: it calls the wrapped function synchronously, so it would store the coroutine object rather than its result, and a coroutine can only be awaited once. For async caching, use asyncache (a separate companion library), which provides its own async-aware cached decorator. It works with the same cachetools cache classes, but awaits the underlying coroutine before storing the result and accepts an asyncio.Lock in place of a threading lock.

How does cachetools generate cache keys from arguments?

By default, @cached() uses a hash of all positional and keyword arguments — the same strategy as lru_cache. Arguments must be hashable (strings, ints, tuples, frozensets — not lists or dicts). If your function accepts unhashable arguments like dicts or lists, pass a custom key function: @cached(cache, key=lambda d: json.dumps(d, sort_keys=True)). The key function receives the same args and kwargs as the cached function and must return a hashable value.
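A key function for unhashable arguments could look like the sketch below. It uses only the standard library; json_key is a hypothetical name, and it assumes every argument is JSON-serializable (anything else falls back to str()):

```python
import json

def json_key(*args, **kwargs) -> str:
    """Build a hashable cache key from JSON-serializable arguments."""
    # sort_keys makes the key independent of dict insertion order
    return json.dumps([args, kwargs], sort_keys=True, default=str)

a = json_key({"city": "Sydney", "units": "metric"})
b = json_key({"units": "metric", "city": "Sydney"})
print(a == b)   # True: same content, same key
```

Because the function takes *args and **kwargs, it can be passed as key=json_key to @cached() regardless of the cached function's signature.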

Conclusion

You now have a complete caching toolkit: LRUCache for general memoization, LFUCache for popular-item retention, TTLCache for time-sensitive external data, thread-safe patterns with RLock, and the @cached() decorator for clean function-level caching. The real-life weather client shows how to combine multiple cache types with different TTLs to match the freshness windows of different data sources — a pattern that directly reduces API costs and latency in any production system.

Extend the weather client by adding an LFUCache for city geocoding (city name to lat/lon) so popular cities like “Sydney” and “London” stay cached forever while obscure ones are evicted when memory pressure grows. Check the full cachetools API at https://cachetools.readthedocs.io/ for the full list of cache classes and the keys(), values(), and eviction callback options.

The Four Eviction Strategies

from cachetools import LRUCache, LFUCache, TTLCache, RRCache

# LRU — evict the least-recently-used item
cache = LRUCache(maxsize=100)

# LFU — evict the least-frequently-used (counts accesses)
cache = LFUCache(maxsize=100)

# TTL — items expire after N seconds
cache = TTLCache(maxsize=100, ttl=300)   # 5 minutes

# Random — evict any item (lowest overhead)
cache = RRCache(maxsize=100)

# All behave as dicts
cache["user:1"] = {"name": "Alice"}
print(cache.get("user:1"))

LRU is the default for most use cases: recency tracks “still hot” effectively. LFU wins for caches where popularity matters more than recency (CDN-style). TTL is essential for cached external data (rates, API responses). RRCache has the lowest overhead, but which item it evicts is unpredictable.

@cached: Function-Level Caching

from cachetools import cached, TTLCache

cache = TTLCache(maxsize=1000, ttl=60)

@cached(cache)
def fetch_user(user_id):
    return slow_database_lookup(user_id)

print(fetch_user(42))   # actual DB call
print(fetch_user(42))   # cache hit, instant

Like functools.lru_cache but with TTL and richer eviction. The cache object can be inspected, cleared, or shared across functions.

Per-Method Caching

from cachetools import cachedmethod, TTLCache
from operator import attrgetter

class UserService:
    def __init__(self, db):
        self.db = db   # any object with a query(user_id) method
        self.cache = TTLCache(maxsize=1000, ttl=60)

    @cachedmethod(attrgetter("cache"))
    def get_user(self, user_id):
        return self.db.query(user_id)

# Cache lives on the instance and is freed when the instance is garbage-collected
service = UserService(db)   # db: your database client (placeholder)
service.get_user(42)
service.get_user(42)   # cached

@cachedmethod stores cache on the instance, not as global state. Each instance has its own cache; instances can be created and destroyed without leaking memory like @lru_cache on methods would.

Thread Safety with RLock

from cachetools import LRUCache, cached
from threading import RLock

cache = LRUCache(maxsize=1000)
lock = RLock()

@cached(cache, lock=lock)
def fetch_user(user_id):
    return slow_lookup(user_id)

# Multiple threads can call concurrently; lock serializes cache access

Vanilla cachetools caches aren’t thread-safe. Pass a lock= to @cached for concurrent access. For async code, use the asyncache library (cachetools-compatible).

Custom TTLs Per Item

from cachetools import TLRUCache
import time

def expire_in(key, value, now):
    # Static TTL: 60 seconds
    return now + 60

# More sophisticated: read from the value
def expire_per_value(key, value, now):
    return now + value.get("ttl", 60)

cache = TLRUCache(maxsize=100, timer=time.time, ttu=expire_per_value)
cache["foo"] = {"data": "...", "ttl": 30}    # expires in 30s
cache["bar"] = {"data": "...", "ttl": 600}   # expires in 10 min

Asynchronous Caching (asyncache)

# pip install asyncache cachetools aiohttp
import aiohttp
from asyncache import cached
from cachetools import TTLCache

API_BASE = "https://api.example.com"   # placeholder: aiohttp needs an absolute base URL

cache = TTLCache(maxsize=100, ttl=60)

@cached(cache)
async def fetch_user(user_id):
    async with aiohttp.ClientSession(base_url=API_BASE) as session:
        async with session.get(f"/users/{user_id}") as resp:
            return await resp.json()

Common Pitfalls

  • Caching mutable values. A cached dict returned and mutated by the caller corrupts the cache. Either return frozen copies or document that callers must not mutate.
  • Cache key not hashable. Arguments must be hashable. Pass tuples/frozensets instead of lists/dicts, or write a custom key function.
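  • TTL feels stale. Expired items are removed lazily (for example when the cache is next written to, or when you call cache.expire()), not by a background thread. A long-idle cache full of expired items still occupies memory until then.
  • maxsize=0. Effectively disables caching: no value fits, so @cached stores nothing and calls the function every time. Easy typo. Be sure your maxsize is positive.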
  • Forgetting to pass the lock. No lock = race conditions on cache writes under concurrency. Pass lock=RLock() to @cached.
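The first pitfall, mutable cached values, can be avoided by handing each caller its own copy. The sketch below uses a plain dict as a stand-in for any cache; get_settings and its contents are made up for illustration:

```python
import copy

_cache: dict[str, dict] = {}

def get_settings(name: str) -> dict:
    """Return cached settings, handing each caller an independent copy."""
    if name not in _cache:
        _cache[name] = {"name": name, "retries": 3}   # stand-in for a slow lookup
    return copy.deepcopy(_cache[name])

first = get_settings("api")
first["retries"] = 99                    # the caller mutates its own copy
print(get_settings("api")["retries"])    # 3: the cached entry is untouched
```

Copying costs time on every hit, so for large values it may be cheaper to return an immutable structure instead.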

FAQ

Q: cachetools or functools.lru_cache?
A: lru_cache for the simplest case (one function, no TTL, no metrics). cachetools when you need TTL, custom eviction, thread safety, or class-method caches.

Q: cachetools or Redis?
A: cachetools for in-process caching (one Python process). Redis when multiple processes need to share the cache, or you need persistence.

Q: How do I clear the cache?
A: cache.clear(), or for a decorated function, decorated_func.cache.clear().

Q: How big should the cache be?
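A: Profile your workload. Start with maxsize=1000 and double it until the hit rate plateaus (recent cachetools versions expose cache_info() on functions decorated with @cached(..., info=True)). Memory is usually the bottleneck.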

Q: Can I cache exceptions?
A: No, cachetools only caches return values. Exceptions re-raise on every call. For sticky failures, return a sentinel value instead.

Wrapping Up

cachetools is the right Python library for in-process caching beyond what functools.lru_cache provides. Pick the eviction strategy that matches your access pattern (LRU for most, TTL for external data, LFU for popularity-skewed). Use @cachedmethod on instances, @cached for functions, and always pass a lock when you have concurrent access. For cross-process or persistent caches, graduate to Redis.

How To Use Python Loguru for Modern Application Logging

Beginner

Every serious Python project needs logging, but the standard logging module requires you to configure handlers, formatters, and levels before you can write a single line to a file. By the time you have a working setup, you have written more boilerplate than actual code. Worse, exceptions logged with the standard module show a flat traceback — no colours, no local variable values, no easy way to tell which frame caused the crash. Debugging a production issue from a plain-text traceback feels like reading a crash report by torchlight.

Loguru is a third-party library that replaces all of that boilerplate with one import. Install it with pip install loguru and you get a pre-configured logger with coloured output, automatic exception tracing with variable values, file rotation, structured logging, and async support — all driven by a single logger.add() call instead of five classes and a dozen method calls.

In this article we will set up Loguru from scratch, learn its core logging levels and message formatting, configure file sinks with rotation and retention, capture rich exception tracebacks, log structured data, and build a real-life application logger for a FastAPI service. You will finish with a drop-in logging setup you can reuse across any project.

Loguru Quick Example

The fastest way to appreciate Loguru is to see how little setup it needs compared to the standard library:

# quick_loguru.py
from loguru import logger

logger.debug("Checking config...")
logger.info("Application started")
logger.warning("Rate limit approaching: 95/100 requests used")
logger.error("Database connection failed")
logger.success("Cache warmed successfully")   # Loguru-only level

Output (colourised in terminal):

2026-04-27 08:12:34.501 | DEBUG    | __main__:<module>:4 - Checking config...
2026-04-27 08:12:34.502 | INFO     | __main__:<module>:5 - Application started
2026-04-27 08:12:34.502 | WARNING  | __main__:<module>:6 - Rate limit approaching: 95/100 requests used
2026-04-27 08:12:34.503 | ERROR    | __main__:<module>:7 - Database connection failed
2026-04-27 08:12:34.503 | SUCCESS  | __main__:<module>:8 - Cache warmed successfully

No setup, no basicConfig(), no getLogger(). Just import and log. Loguru automatically writes to stderr with timestamps, level names, the calling module and line number, and full colour coding per level. The extra SUCCESS level sits between INFO and WARNING — useful for confirming that an important step completed cleanly.

What Is Loguru and How Does It Differ?

The standard logging module works fine but was designed for maximum configurability, which means minimum convenience. You must create a logger, attach a handler, set a formatter, and set levels — four separate objects just to write a message to a file. Loguru’s philosophy is the opposite: one global logger object, all configuration through logger.add(), and sensible defaults for everything.

Feature | logging (stdlib) | Loguru
Setup lines for file logging | 6-10 | 1
Coloured terminal output | Needs extra lib | Built-in
Exception tracing with variables | No | Yes
File rotation by size/time | RotatingFileHandler | logger.add(…, rotation=)
Structured logging | Extra JSON formatter | logger.add(…, serialize=True)
SUCCESS level | No | Yes

Loguru does not replace the standard library in production systems that integrate with third-party logging infrastructure (like Sentry or Datadog handlers). For those, you often bridge Loguru back to the stdlib. But for greenfield projects, scripts, microservices, and anything you write from scratch, Loguru is simply faster to ship.

loguru: print() but with timestamps, levels, and color.

Configuring Sinks: Terminal, File, and Rotation

In Loguru, a “sink” is any destination for log messages — a file, a terminal stream, a network endpoint, or a function. You add sinks with logger.add() and remove the default stderr sink if you want complete control:

# sinks_demo.py
import sys
from loguru import logger

# Remove the default stderr sink
logger.remove()

# Add a clean stdout sink showing only INFO and above
logger.add(sys.stdout, level="INFO", colorize=True,
           format="{time:HH:mm:ss} | {level} | {message}")

# Add a rotating file sink -- new file every day, keep 7 days
logger.add(
    "logs/app_{time:YYYY-MM-DD}.log",
    level="DEBUG",
    rotation="00:00",        # rotate at midnight
    retention="7 days",      # delete logs older than 7 days
    compression="zip",       # compress old logs
    enqueue=True,            # queue writes: non-blocking and multiprocess-safe
)

logger.debug("This goes only to the file (below INFO threshold for stdout)")
logger.info("This goes to both stdout and file")
logger.error("Error logged to both sinks")

Output (stdout):

08:12:34 | INFO | This goes to both stdout and file
08:12:34 | ERROR | Error logged to both sinks

The rotation parameter accepts time strings like "00:00" (midnight), size strings like "100 MB", or a timedelta. The retention parameter automatically deletes old files. enqueue=True makes writes happen in a background thread, which is essential for high-throughput applications where file I/O should not block the main thread. compression="zip" saves disk space by compressing rotated files immediately.

Rich Exception Tracing

Loguru’s most impressive feature is exception logging. Use logger.exception() inside an except block, or use logger.opt(exception=True) — Loguru prints the full traceback including local variable values at every frame:

# exception_demo.py
from loguru import logger

def parse_config(data: dict) -> dict:
    required = ["host", "port", "db_name"]
    result = {}
    for key in required:
        value = data[key]    # KeyError if missing
        result[key] = value
    return result

config_input = {"host": "localhost", "port": 5432}  # missing db_name

try:
    config = parse_config(config_input)
except KeyError:
    logger.exception("Config parsing failed -- check your .env file")

Output (simplified; Loguru draws the variable annotations with box-drawing characters):

2026-04-27 08:12:35.100 | ERROR | __main__:<module>:17 - Config parsing failed -- check your .env file
Traceback (most recent call last):
  File "exception_demo.py", line 15, in <module>
    config = parse_config(config_input)
    -- config_input = {'host': 'localhost', 'port': 5432}
  File "exception_demo.py", line 8, in parse_config
    value = data[key]
    -- data = {'host': 'localhost', 'port': 5432}
    -- key = 'db_name'
KeyError: 'db_name'

The -- variable = value lines after each frame are Loguru’s signature feature. When you are debugging a crash that happened in production six hours ago, seeing exactly what values were in scope when it crashed is the difference between a five-minute fix and a two-hour investigation. This alone is worth the dependency.

logger.add("file.log"). That’s the whole config.

Structured Logging and Context Binding

For machine-readable logs consumed by log aggregators like Elasticsearch or Datadog, enable JSON output with serialize=True. Use logger.bind() to attach context that flows through every subsequent log call in a request lifecycle:

# structured_logging.py
import sys
import json
from loguru import logger

# Add a JSON-output sink
logger.add(sys.stdout, serialize=True, level="INFO")
logger.remove(0)   # remove the default coloured handler (id=0)

# Bind context for a specific request
request_logger = logger.bind(request_id="req-abc-123", user_id=42)

request_logger.info("Processing payment")
request_logger.info("Payment authorised", amount=99.95, currency="AUD")

Output (JSON, one object per line):

{"text": "Processing payment\n", "record": {"elapsed": {...}, "level": {"name": "INFO", ...}, "extra": {"request_id": "req-abc-123", "user_id": 42}, "message": "Processing payment", ...}}
{"text": "Payment authorised\n", "record": {..., "extra": {"request_id": "req-abc-123", "user_id": 42, "amount": 99.95, "currency": "AUD"}, ...}}

Every JSON log line contains the full record with the bound context values. Log aggregators can index extra.request_id and extra.user_id to give you per-request drill-down in your observability dashboard. The logger.bind() call returns a new logger instance — it does not modify the global logger, so you can safely use it inside async handlers or threads without affecting other request contexts.

Real-Life Example: FastAPI Request Logger

Here is a complete logging setup for a FastAPI application with per-request context, file rotation, and exception capturing:

# app_logger.py
import sys
from loguru import logger
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
import uuid

# ------------------------------------------------------------------
# Logging setup -- call once at startup
# ------------------------------------------------------------------
def setup_logging():
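    logger.remove()   # clear defaults
    # Default req_id so log calls made outside a request still format correctly
    logger.configure(extra={"req_id": "-"})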
    # Human-readable stdout for development
    logger.add(
        sys.stdout,
        level="DEBUG",
        colorize=True,
        format="{time:HH:mm:ss} | {level:<8} | {extra[req_id]:.8} | {message}",
    )
    # Rotating JSON file for production / log aggregation
    logger.add(
        "logs/api_{time:YYYY-MM-DD}.jsonl",
        level="INFO",
        serialize=True,
        rotation="100 MB",
        retention="30 days",
        compression="gz",
        enqueue=True,
    )

# ------------------------------------------------------------------
# FastAPI app with middleware for per-request logging
# ------------------------------------------------------------------
setup_logging()
app = FastAPI()

@app.middleware("http")
async def log_requests(request: Request, call_next):
    req_id = str(uuid.uuid4())[:8]
    req_logger = logger.bind(req_id=req_id, path=request.url.path)
    req_logger.info(f"START {request.method} {request.url.path}")
    try:
        response = await call_next(request)
        req_logger.info(f"END {response.status_code}")
        return response
    except Exception:
        req_logger.exception("Unhandled exception during request")
        return JSONResponse({"error": "internal server error"}, status_code=500)

@app.get("/items/{item_id}")
async def get_item(item_id: int):
    if item_id < 1:
        raise ValueError(f"item_id must be positive, got {item_id}")
    return {"item_id": item_id, "name": f"Item {item_id}"}

Sample stdout output for a request to /items/5:

08:12:35 | INFO     | 1a2b3c4d | START GET /items/5
08:12:35 | INFO     | 1a2b3c4d | END 200

Each request gets a short UUID prefix so you can grep the logs for a single request across multiple log lines. The except Exception in the middleware catches any unhandled error, logs the full traceback with local variables, and returns a clean 500 response to the client. To extend the system, call logger.add() again with a Slack or PagerDuty webhook sink that fires only on CRITICAL messages.

logger.exception: traceback included, no thinking required.

Frequently Asked Questions

How do I use Loguru alongside third-party libraries that use stdlib logging?

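Most third-party libraries log via the stdlib logging module. To forward those messages into Loguru, intercept the stdlib root logger with a custom handler: create a class that subclasses logging.Handler, override emit() to call logger.log() with the record's level and record.getMessage(), and attach it with logging.basicConfig(handlers=[YourHandler()], level=0). Map record.levelname to the matching Loguru level where one exists; passing the bare numeric record.levelno also works, but the level is then displayed as "Level 20" rather than "INFO". This pipes all stdlib log calls through your Loguru sinks automatically.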

Can I add custom log levels beyond the built-ins?

Yes. Use logger.level("AUDIT", no=38, color="<yellow>", icon="@") to register a new level between WARNING (30) and ERROR (40). After registering, call it with logger.log("AUDIT", "User deleted account"). Custom levels appear in all sinks that have a level number at or below your custom level's number. This is useful for security audit logs that need to persist even when general INFO logging is disabled.

Is Loguru safe to use in async code?

Yes. Loguru sinks are thread-safe by default, so log lines from concurrent tasks are not interleaved. Adding enqueue=True to your file sinks routes all writes through an internal queue processed by a dedicated thread, so slow file I/O does not block the event loop, and it also makes the sink safe to share across processes. The logger.bind() context is also async-safe because it returns a new logger object rather than mutating global state.

How do I capture Loguru output in pytest tests?

Add a Loguru sink that writes to a list or a StringIO buffer before the test, and remove it after. A clean pattern is a pytest fixture that calls logger.add(string_buffer) with level="DEBUG", yields the buffer for assertions, then calls logger.remove(handler_id) in teardown. You can then assert on string_buffer.getvalue() to verify that specific log messages were emitted during the test.

What does logger.remove(0) actually do?

Loguru starts with one default sink — stderr with id 0. Calling logger.remove(0) removes that sink. After this call, no output goes anywhere until you add a new sink with logger.add(). This is the standard pattern when you want complete control over where logs go. Calling logger.remove() without an argument removes all sinks at once, which is useful during test teardown.

Conclusion

Loguru reduces production logging from a configuration ceremony to a design decision. You have learned how to use the pre-configured global logger, add and customise sinks for terminal and rotating files, capture rich exception tracebacks with local variable values, bind per-request context for structured JSON output, and wire the whole thing into a FastAPI middleware. Every project from one-off scripts to microservices benefits from replacing print() and stdlib boilerplate with these patterns.

The natural next step is adding a Loguru sink that fires on ERROR and above and sends a Telegram or Slack notification. Grab the real-life FastAPI example, add a sink with level="ERROR" that posts to the Telegram Bot API, and you have on-call alerting in under 20 lines. The official Loguru documentation at https://loguru.readthedocs.io/ has a comprehensive recipe section for sink integrations.

How To Create a Python Plugin System with importlib

Intermediate

You have a working Python application, and you want users or team members to extend it without touching the core codebase. Maybe you are building a data pipeline that accepts custom processors, a testing framework that supports user-defined reporters, or a CLI tool where third-party developers can drop in new commands. Every time someone needs a new feature, you do not want to modify, retest, and redeploy the entire application. What you need is a plugin system.

Python’s built-in importlib module makes this surprisingly straightforward. It lets you load Python modules dynamically at runtime — by name, by file path, or by scanning a directory — without knowing anything about those modules at write time. No third-party libraries required; importlib ships with every Python 3 installation.

In this article you will learn how to design and build a complete plugin system from scratch. We will cover the core importlib API, build a plugin interface using Abstract Base Classes, implement auto-discovery that scans a folder and loads every plugin it finds, and tie it all together with a real-life example — an extensible data formatter that accepts output plugins for CSV, JSON, and any future format someone dreams up. By the end you will have a pattern you can drop straight into your own projects.

Python Plugin System: Quick Example

Before diving into the full design, here is the simplest possible working plugin loader — a function that accepts a module path as a string and returns the loaded module object:

# quick_plugin_loader.py
import importlib

def load_plugin(module_path: str):
    """Load a plugin module by its dotted path (e.g. 'plugins.csv_writer')."""
    return importlib.import_module(module_path)

# Usage: load the built-in json module as a "plugin"
plugin = load_plugin("json")
data = {"name": "Alice", "score": 42}
print(plugin.dumps(data))

Output:

{"name": "Alice", "score": 42}

importlib.import_module() accepts a dotted module path, exactly like a regular import statement, and returns the module object. You can then call any function or class on it just as you would after a normal import. This single function is the engine behind every plugin system we will build below.

The real power comes when you combine this with a plugin directory and a shared interface. Keep reading to see how.

What Is importlib and Why Use It?

A regular import statement names its module in the source code: the name is fixed when you write the file, even though the import itself executes at run time. importlib is Python’s own import machinery exposed as a public API. It lets you do the same thing with a name chosen at run time, driven by data rather than hard-coded names.

Think of it like a library’s card catalog. A regular import says “go get the book called json from the shelf.” importlib says “go get whatever book name is written on this card I am holding right now.” The name on the card can come from a config file, a user’s input, or a directory scan. You do not decide at write time — you decide at run time.

Approach | When name is known | Runtime flexibility | Requires restart to add plugins
Regular import | At write time | None | Yes
importlib.import_module() | At run time | High | No
importlib.util.spec_from_file_location() | At run time (file path) | Highest | No

The third option — loading from a file path — is what makes truly external plugins possible: a user can drop a .py file anywhere on disk and your app loads it by path, even if it is not on the Python path at all.
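To make "driven by data" concrete, here is a small sketch that imports modules from a list of names, as if they had been read from a config file. The helper name load_by_name and the deliberately missing module name are made up for the example:

```python
import importlib

def load_by_name(module_name: str):
    """Import a module whose name is only known at run time; None if missing."""
    try:
        return importlib.import_module(module_name)
    except ModuleNotFoundError:
        return None

# These names could come from a config file or user input
requested = ["json", "csv", "no_such_plugin_xyz"]
loaded = {name: load_by_name(name) for name in requested}

for name, module in loaded.items():
    status = "loaded" if module else "not found"
    print(f"{name}: {status}")
```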

Plugins: third parties extend your app without forking it.

Designing the Plugin Interface with ABC

A plugin system without a contract is chaos — each plugin can expose completely different APIs and your loader has no idea what to call. The solution is an Abstract Base Class (ABC) that defines the interface every plugin must implement. Think of it as a job description: any plugin that wants to work in your system must provide these specific methods.

Create a base class in a file all plugins can import:

# plugin_base.py
from abc import ABC, abstractmethod

class FormatterPlugin(ABC):
    """All output formatter plugins must inherit from this class."""

    @abstractmethod
    def name(self) -> str:
        """Return the plugin's display name (e.g. 'CSV', 'JSON')."""

    @abstractmethod
    def extension(self) -> str:
        """Return the file extension this plugin produces (e.g. '.csv')."""

    @abstractmethod
    def format(self, data: list[dict]) -> str:
        """
        Convert a list of dicts to a formatted string.
        Args:
            data: list of row dicts, e.g. [{'name': 'Alice', 'score': 42}]
        Returns:
            Formatted string ready to write to a file.
        """

Now write two concrete plugins, each in its own file inside a plugins/ directory:

# plugins/csv_plugin.py
import io
import csv
from plugin_base import FormatterPlugin

class CsvPlugin(FormatterPlugin):
    def name(self) -> str:
        return "CSV"

    def extension(self) -> str:
        return ".csv"

    def format(self, data: list[dict]) -> str:
        if not data:
            return ""
        output = io.StringIO()
        writer = csv.DictWriter(output, fieldnames=data[0].keys())
        writer.writeheader()
        writer.writerows(data)
        return output.getvalue()

# plugins/json_plugin.py
import json
from plugin_base import FormatterPlugin

class JsonPlugin(FormatterPlugin):
    def name(self) -> str:
        return "JSON"

    def extension(self) -> str:
        return ".json"

    def format(self, data: list[dict]) -> str:
        return json.dumps(data, indent=2)

Each plugin is a self-contained file. Adding a new output format means creating a new file — nothing else in the system changes. That is the plugin pattern paying off.
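The ABC is what enforces the contract: a plugin that forgets a required method cannot be instantiated. The sketch below uses a compact copy of the interface so it runs on its own; HalfFinishedPlugin is a hypothetical incomplete plugin:

```python
from abc import ABC, abstractmethod

class FormatterPlugin(ABC):   # compact copy of the interface, for a standalone demo
    @abstractmethod
    def name(self) -> str: ...

    @abstractmethod
    def extension(self) -> str: ...

    @abstractmethod
    def format(self, data: list[dict]) -> str: ...

class HalfFinishedPlugin(FormatterPlugin):   # implements only one of three methods
    def name(self) -> str:
        return "Half"

try:
    HalfFinishedPlugin()
except TypeError as exc:
    # Python refuses to instantiate a class with unimplemented abstract methods
    print(f"Rejected: {exc}")
```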

Loading Plugins from a File Path

When plugins live inside your own package, importlib.import_module() works perfectly because Python already knows where to look. But when a user drops a plugin file into an arbitrary directory outside your package, you need importlib.util.spec_from_file_location() to load it by absolute path:

# loader_by_path.py
import importlib.util
import sys

def load_plugin_from_path(name: str, filepath: str):
    """
    Load a Python module from an absolute file path.
    Args:
        name:     A unique name for this module (used as sys.modules key).
        filepath: Absolute path to the .py file.
    Returns:
        The loaded module object.
    """
    spec = importlib.util.spec_from_file_location(name, filepath)
    if spec is None:
        raise ImportError(f"Cannot load spec from {filepath}")
    module = importlib.util.module_from_spec(spec)
    sys.modules[name] = module           # register so it is importable later
    spec.loader.exec_module(module)      # actually execute the module code
    return module

# Example: load the csv plugin by path
import os
plugin_path = os.path.abspath("plugins/csv_plugin.py")
mod = load_plugin_from_path("csv_plugin", plugin_path)

# Instantiate the class inside the module
instance = mod.CsvPlugin()
print(instance.name(), instance.extension())

Output:

CSV .csv

The three-step dance — spec_from_file_location, module_from_spec, exec_module — is always the same pattern. Adding the module to sys.modules before executing it prevents circular import issues when the plugin tries to import something from your main package.

importlib.import_module: load modules by string at runtime.

Auto-Discovery: Scanning a Plugin Directory

Manually specifying each plugin path defeats the purpose of a plugin system. Real plugin systems auto-discover: you point them at a directory and they find and load everything inside. Here is a complete auto-discovery function that scans a folder, loads every .py file that is not __init__.py, and returns an instance of every class that implements your plugin interface:

# plugin_discovery.py
import importlib.util
import inspect
import os
import sys
from plugin_base import FormatterPlugin

def discover_plugins(plugin_dir: str) -> list[FormatterPlugin]:
    """
    Scan plugin_dir for .py files and return instances of all
    classes that subclass FormatterPlugin.
    """
    plugins = []
    plugin_dir = os.path.abspath(plugin_dir)

    for filename in sorted(os.listdir(plugin_dir)):   # sorted for a stable load order
        if not filename.endswith(".py") or filename == "__init__.py":
            continue

        filepath = os.path.join(plugin_dir, filename)
        module_name = filename[:-3]   # strip .py

        # Load the module from its file path
        spec = importlib.util.spec_from_file_location(module_name, filepath)
        if spec is None:
            continue
        module = importlib.util.module_from_spec(spec)
        sys.modules[module_name] = module
        try:
            spec.loader.exec_module(module)
        except Exception as e:
            sys.modules.pop(module_name, None)   # drop the half-loaded module
            print(f"  Warning: could not load {filename}: {e}")
            continue

        # Inspect the module for subclasses of FormatterPlugin
        for attr_name in dir(module):
            obj = getattr(module, attr_name)
            if (
                inspect.isclass(obj)
                and issubclass(obj, FormatterPlugin)
                and obj is not FormatterPlugin   # exclude the base class itself
            ):
                plugins.append(obj())   # instantiate and collect

    return plugins


if __name__ == "__main__":
    found = discover_plugins("plugins")
    for p in found:
        print(f"Loaded plugin: {p.name()} -> writes {p.extension()} files")

Output:

Loaded plugin: CSV -> writes .csv files
Loaded plugin: JSON -> writes .json files

The inspect.isclass() and issubclass() checks do the filtering — you get back only the classes that honour the contract. The try/except around exec_module means a broken plugin does not crash the entire app; it prints a warning and continues. Defensive plugin loading is just as important as defensive parsing.
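One refinement worth considering: dir(module) also lists classes the plugin merely imported, and an abstract intermediate class cannot be instantiated. The sketch below is one possible filter, not part of the original loader. FormatterPlugin here is a stand-in for the class in plugin_base (its real definition is assumed), and concrete_plugin_classes, Json and the in-memory demo module are hypothetical names used only for illustration.

```python
import inspect
import types
from abc import ABC, abstractmethod


class FormatterPlugin(ABC):
    """Stand-in for plugin_base.FormatterPlugin (assumed interface)."""

    @abstractmethod
    def name(self) -> str: ...


class Json(FormatterPlugin):
    """Defined outside the demo module, to simulate an imported class."""

    def name(self) -> str:
        return "JSON"


def concrete_plugin_classes(module: types.ModuleType) -> list:
    """Return instantiable FormatterPlugin subclasses defined in `module` itself."""
    found = []
    for _, obj in inspect.getmembers(module, inspect.isclass):
        if not issubclass(obj, FormatterPlugin) or obj is FormatterPlugin:
            continue
        if obj.__module__ != module.__name__:
            continue      # imported from elsewhere, not defined in this plugin
        if inspect.isabstract(obj):
            continue      # still has unimplemented abstract methods
        found.append(obj)
    return found


# Build a throwaway module in memory to exercise the helper.
source = (
    "class Partial(FormatterPlugin):\n"
    "    pass\n"
    "class Csv(FormatterPlugin):\n"
    "    def name(self):\n"
    "        return 'CSV'\n"
)
demo = types.ModuleType("demo_plugin_module")
demo.FormatterPlugin = FormatterPlugin   # as if the plugin had imported it
demo.Json = Json                         # as if the plugin had imported it
exec(source, demo.__dict__)

names = [cls.__name__ for cls in concrete_plugin_classes(demo)]
print(names)   # ['Csv']
```

Partial is skipped because it is still abstract, and Json is skipped because its __module__ does not match the plugin module.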

Config-Driven Plugin Loading

Sometimes you want the user to specify exactly which plugins to activate, rather than loading everything in a directory. A config file approach gives users explicit control:

# config_plugins.yaml  (user edits this)
plugins:
  - module: plugins.csv_plugin
    class: CsvPlugin
  - module: plugins.json_plugin
    class: JsonPlugin

# load_from_config.py
import importlib
import yaml     # pip install pyyaml

def load_plugins_from_config(config_path: str) -> list:
    """
    Load plugins specified in a YAML config file.
    Each entry has 'module' (dotted path) and 'class' (class name).
    """
    with open(config_path) as f:
        config = yaml.safe_load(f) or {}   # an empty file loads as None

    plugins = []
    for entry in config.get("plugins", []):
        module_path = entry.get("module")
        class_name = entry.get("class")
        if not module_path or not class_name:
            continue
        try:
            mod = importlib.import_module(module_path)
            cls = getattr(mod, class_name)
            plugins.append(cls())
        except (ImportError, AttributeError) as e:
            print(f"  Warning: could not load {module_path}.{class_name}: {e}")

    return plugins

if __name__ == "__main__":
    loaded = load_plugins_from_config("config_plugins.yaml")
    for p in loaded:
        print(f"Active plugin: {p.name()}")

Output:

Active plugin: CSV
Active plugin: JSON

The config-driven approach works well when you have a stable plugin set you want to version-control alongside your app. The auto-discovery approach works better when you want users to just drop files into a folder and have them “just work”. Many real systems support both.

Entry points: setup.py declares 'I am pluggable'.
Real-Life Example: Extensible Data Exporter

Let us tie everything together with a complete data exporter that auto-discovers plugins and lets the user pick the output format at runtime:

# exporter.py
import os
import sys
import importlib.util
import inspect
from plugin_base import FormatterPlugin

PLUGIN_DIR = os.path.join(os.path.dirname(__file__), "plugins")
OUTPUT_DIR = os.path.join(os.path.dirname(__file__), "output")


def discover_plugins(plugin_dir: str) -> dict[str, FormatterPlugin]:
    """Return a name->instance mapping of all discovered plugins."""
    found = {}
    for filename in os.listdir(plugin_dir):
        if not filename.endswith(".py") or filename == "__init__.py":
            continue
        path = os.path.join(plugin_dir, filename)
        name = filename[:-3]
        spec = importlib.util.spec_from_file_location(name, path)
        if spec is None:
            continue
        mod = importlib.util.module_from_spec(spec)
        sys.modules[name] = mod
        try:
            spec.loader.exec_module(mod)
        except Exception as e:
            print(f"  Skipping {filename}: {e}")
            continue
        for attr in dir(mod):
            obj = getattr(mod, attr)
            if (inspect.isclass(obj)
                    and issubclass(obj, FormatterPlugin)
                    and obj is not FormatterPlugin):
                instance = obj()
                found[instance.name()] = instance
    return found


def export(data: list[dict], format_name: str) -> None:
    plugins = discover_plugins(PLUGIN_DIR)
    if format_name not in plugins:
        available = ", ".join(plugins.keys())
        raise ValueError(f"Unknown format '{format_name}'. Available: {available}")
    plugin = plugins[format_name]
    result = plugin.format(data)
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    outfile = os.path.join(OUTPUT_DIR, f"export{plugin.extension()}")
    with open(outfile, "w") as f:
        f.write(result)
    print(f"Exported {len(data)} rows to {outfile} using {plugin.name()} plugin.")


if __name__ == "__main__":
    sample_data = [
        {"name": "Alice", "score": 95, "grade": "A"},
        {"name": "Bob",   "score": 82, "grade": "B"},
        {"name": "Carol", "score": 77, "grade": "C"},
    ]
    format_choice = sys.argv[1] if len(sys.argv) > 1 else "JSON"
    export(sample_data, format_choice)

Output (running with CSV):

Exported 3 rows to output/export.csv using CSV plugin.

Output (running with JSON):

Exported 3 rows to output/export.json using JSON plugin.

To add a new format — say, Markdown — you create a single file plugins/markdown_plugin.py that subclasses FormatterPlugin and implements name(), extension(), and format(). The exporter picks it up automatically on the next run. Zero changes to exporter.py, zero changes to existing plugins. That moment of “it worked!” is the plugin pattern delivering on its promise.
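As a rough illustration, plugins/markdown_plugin.py could look like the sketch below. It assumes the FormatterPlugin interface is exactly the three methods named above; the stand-in base class is included only so the snippet runs on its own (in the project you would write from plugin_base import FormatterPlugin instead), and the table layout is one arbitrary choice.

```python
class FormatterPlugin:
    """Stand-in for plugin_base.FormatterPlugin so the sketch runs alone."""

    def name(self) -> str:
        raise NotImplementedError

    def extension(self) -> str:
        raise NotImplementedError

    def format(self, data: list) -> str:
        raise NotImplementedError


class MarkdownPlugin(FormatterPlugin):
    def name(self) -> str:
        return "Markdown"

    def extension(self) -> str:
        return ".md"

    def format(self, data: list) -> str:
        """Render a list of dicts as a Markdown table (columns from the first row)."""
        if not data:
            return ""
        headers = list(data[0].keys())
        lines = [
            "| " + " | ".join(headers) + " |",
            "| " + " | ".join("---" for _ in headers) + " |",
        ]
        for row in data:
            cells = (str(row.get(h, "")) for h in headers)
            lines.append("| " + " | ".join(cells) + " |")
        return "\n".join(lines) + "\n"


table = MarkdownPlugin().format([{"name": "Alice", "score": 95}])
print(table)
```

With a file like this in the plugins folder, running the exporter with Markdown as the format argument should select it, since the lookup key is whatever name() returns.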

For more on the importlib internals, read the official docs at https://docs.python.org/3/library/importlib.html. The importlib.metadata submodule is worth exploring next if you move toward installable plugin packages.

How To Use Python importlib for Dynamic Module Loading

Advanced

You are building a data processing tool and you want users to be able to drop their own processor modules into a plugins/ folder without touching your core code. Or you have a CLI that loads formatters by name from config: formatter = "json" should load your formatters.json_formatter module, and switching to "csv" should swap it out without a code change. Static import statements cannot do either of these things — you need runtime module loading, and that is exactly what importlib provides.

Python’s importlib module is the programmatic interface to Python’s import system. Everything the import statement does, importlib can do — plus things the import statement cannot, like loading modules from arbitrary file paths, reloading live modules during development, and inspecting the import machinery itself.

In this article you will learn how to use importlib.import_module() for dynamic imports, importlib.reload() for hot-reloading, importlib.util.spec_from_file_location() for loading modules from arbitrary paths, and how to combine these into a working plugin system. By the end, you will be able to build applications that discover and load user-provided code at runtime.

Dynamic Imports with importlib: Quick Example

Here is the core use case: import a module by name when the name is only known at runtime.

# importlib_quick.py
import importlib

# Same as: import json
module_name = 'json'
json = importlib.import_module(module_name)

data = {'user': 'alice', 'score': 99}
print(json.dumps(data, indent=2))

# Same as: from pathlib import Path
Path = importlib.import_module('pathlib').Path
p = Path('/tmp/example.txt')
print(f"Path stem: {p.stem}")

# Load a submodule: same as import os.path
os_path = importlib.import_module('os.path')
print(f"os.path.sep: {os_path.sep}")

Output:

{
  "user": "alice",
  "score": 99
}
Path stem: example
os.path.sep: /

importlib.import_module(name) accepts a fully qualified module name as a string and returns the module object it names. The module is cached in sys.modules after the first import, so subsequent calls cost little more than a dictionary lookup. For submodule access, pass the full dotted name: importlib.import_module('os.path') returns os.path itself, whereas the statement import os.path binds the top-level name os.
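The caching behaviour is easy to confirm for yourself. This short check uses only standard-library modules; the variable names are arbitrary.

```python
import importlib
import sys

first = importlib.import_module("json")
second = importlib.import_module("json")

# Both calls return the one cached module object
same_object = first is second is sys.modules["json"]

# A dotted name returns the submodule itself, not the top-level package
decoder = importlib.import_module("json.decoder")
returns_submodule = decoder.__name__ == "json.decoder"

print(same_object, returns_submodule)   # True True
```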

What Is importlib and Why Use It?

Every import statement in Python is backed by the importlib machinery. The importlib package was added in Python 3.1, and since Python 3.3 the import statement itself is implemented by importlib, largely in Python code, which makes it inspectable and overridable. As an application developer, you interact with it when you need imports that cannot be expressed as static source code.

Use case                         | Static import          | importlib solution
---------------------------------|------------------------|-----------------------------
Module name from config          | Not possible           | import_module(name)
Load file outside sys.path       | Not possible           | spec_from_file_location
Reload changed module in dev     | Not possible           | reload(module)
Conditional import with fallback | Try/except ImportError | import_module + try/except
Plugin discovery from directory  | Not possible           | spec_from_file_location loop
Check if module exists           | try: import X          | util.find_spec(name)

The key mental model: import foo is just syntactic sugar for importlib.import_module('foo') with the result bound to the name foo in the current namespace. Once you see it this way, dynamic imports feel natural rather than magical.

import_module in Practice

The most common patterns for import_module are configuration-driven dispatch and safe optional imports with fallback.

# importlib_dispatch.py
import importlib

# --- Pattern 1: Config-driven module loading ---
SERIALIZERS = {
    'json':   'json',
    'pickle': 'pickle',
    'csv':    'csv',
}

def get_serializer(fmt: str):
    """Load a serializer module by format name from config."""
    module_name = SERIALIZERS.get(fmt)
    if not module_name:
        raise ValueError(f"Unknown format: {fmt}. Valid: {list(SERIALIZERS)}")
    return importlib.import_module(module_name)

for fmt in ['json', 'csv']:
    mod = get_serializer(fmt)
    print(f"Loaded {fmt}: {mod.__name__} v{getattr(mod, '__version__', 'built-in')}")

# --- Pattern 2: Optional import with fallback ---
def load_optional(preferred: str, fallback: str):
    try:
        return importlib.import_module(preferred)
    except ImportError:
        print(f"  {preferred} not installed, using {fallback}")
        return importlib.import_module(fallback)

# Try ujson first (faster), fall back to stdlib json
json_mod = load_optional('ujson', 'json')
print(f"JSON module: {json_mod.__name__}")

# --- Pattern 3: Relative import equivalent ---
# import_module('..utils', package='myapp.sub') resolves to 'myapp.utils',
# the module that `from .. import utils` would load inside myapp.sub.
# Only useful inside a package; shown here as pattern reference
def import_relative(module_path: str, from_package: str):
    return importlib.import_module(module_path, package=from_package)

Output (without ujson installed):

Loaded json: json v2.0.9
Loaded csv: csv v1.0
  ujson not installed, using json
JSON module: json

The optional-import pattern is far cleaner than wrapping every optional dependency in a try/except at the top of the file. You can centralize all optional-dependency handling in one utility function and use it throughout the codebase. The relative import equivalent (package= parameter) is only meaningful inside an actual package structure and is used by framework internals.
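To see the package= parameter resolve a relative name without building a package of your own, you can point it at packages from the standard library. This is only a demonstration of the name resolution; json and xml are used here purely because they are always available.

```python
import importlib

# ".decoder" relative to the package "json" resolves to "json.decoder"
decoder = importlib.import_module(".decoder", package="json")
print(decoder.__name__)   # json.decoder

# Two dots climb one level: relative to "xml.dom" this resolves to "xml.etree"
etree = importlib.import_module("..etree", package="xml.dom")
print(etree.__name__)     # xml.etree

# A relative name without package= cannot be resolved
try:
    importlib.import_module(".decoder")
    relative_without_package = "allowed"
except TypeError:
    relative_without_package = "TypeError"
print(relative_without_package)   # TypeError
```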

importlib.reload for Hot Reloading

During development, you sometimes want to reload a module after editing it without restarting the Python process — for example in a long-running REPL session or an interactive development loop. importlib.reload(module) re-executes the module’s code in place.

# importlib_reload.py
import importlib
import json as json_mod

# Keep a direct reference to a function, as "from json import dumps" would
old_dumps = json_mod.dumps
print(f"Initial id: {id(json_mod)}")

# Simulate "re-importing" after a change
importlib.reload(json_mod)
print(f"After reload id: {id(json_mod)}")  # same module object, re-executed

# Check that it still works
data = json_mod.loads('{"x": 1}')
print(f"After reload, json.loads works: {data}")

# Important caveat: module aliases see the reloaded code (same object),
# but names copied out of the module before the reload are NOT updated
print(f"old_dumps is json_mod.dumps: {old_dumps is json_mod.dumps}")
# To pick up the new function, re-bind: old_dumps = json_mod.dumps

Output (the id value will differ on your machine):

Initial id: 140234567890
After reload id: 140234567890
After reload, json.loads works: {'x': 1}
old_dumps is json_mod.dumps: False

The critical gotcha: reload() re-executes the module's code inside the existing module object, so plain module references (import mymodule) see the new definitions, but it does NOT update names that were copied out of the module before the reload. A name bound with from mymodule import MyClass still points to the old class, and instances created earlier fail isinstance checks against the new one. After reloading, you must re-import those names to get the fresh objects. This is why hot reloading in production is risky: reload is primarily a development convenience, not a zero-downtime deployment mechanism.
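The stale-reference problem is easiest to see with a class. The sketch below writes a throwaway module to a temporary directory, edits it, and reloads it. The module name reload_demo_shapes and the Shape class are hypothetical, and bytecode writing is switched off so that a cached .pyc cannot mask the quick edit.

```python
import importlib
import sys
import tempfile
from pathlib import Path

sys.dont_write_bytecode = True   # keep a cached .pyc from hiding the edit below

with tempfile.TemporaryDirectory() as tmp:
    source = Path(tmp) / "reload_demo_shapes.py"   # hypothetical module
    source.write_text("class Shape:\n    version = 1\n")
    sys.path.insert(0, tmp)
    importlib.invalidate_caches()
    try:
        shapes = importlib.import_module("reload_demo_shapes")
        OldShape = shapes.Shape        # like: from reload_demo_shapes import Shape
        old_instance = OldShape()

        # Simulate editing the file, then reload in place
        source.write_text("class Shape:\n    version = 2  # edited\n")
        importlib.reload(shapes)

        module_sees = shapes.Shape.version                        # new class
        alias_sees = OldShape.version                             # old class
        still_instance = isinstance(old_instance, shapes.Shape)   # old vs new
    finally:
        sys.path.remove(tmp)
        sys.modules.pop("reload_demo_shapes", None)

print(module_sees, alias_sees, still_instance)   # 2 1 False
```

Re-binding OldShape = shapes.Shape after the reload fixes the alias, but objects created from the old class remain instances of the old class.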

Loading Modules from File Paths

The most powerful (and most careful-use) feature of importlib is loading a module from an arbitrary file path — one that is not on sys.path and has no package structure. This is the foundation of plugin systems.

# importlib_from_file.py
import importlib.util
import os
import sys
import tempfile

# Create a temporary plugin file to demonstrate
PLUGIN_CODE = '''
PLUGIN_NAME = "demo_plugin"
VERSION = "1.0.0"

def process(data):
    """Example plugin: uppercase all string values in a dict."""
    return {k: v.upper() if isinstance(v, str) else v
            for k, v in data.items()}

def describe():
    return f"{PLUGIN_NAME} v{VERSION}: uppercases string values"
'''

# Write the plugin to a temp file
with tempfile.NamedTemporaryFile(
    mode='w', suffix='.py', delete=False, prefix='plugin_demo_'
) as f:
    f.write(PLUGIN_CODE)
    plugin_path = f.name

try:
    # Load the module from the file path
    spec = importlib.util.spec_from_file_location("demo_plugin", plugin_path)
    module = importlib.util.module_from_spec(spec)

    # Register in sys.modules so other imports can find it
    sys.modules["demo_plugin"] = module

    # Execute the module (runs all top-level code)
    spec.loader.exec_module(module)

    # Use it like any other module
    print(module.describe())
    result = module.process({'name': 'alice', 'role': 'admin', 'score': 99})
    print(f"Processed: {result}")
    print(f"Plugin name: {module.PLUGIN_NAME}")

finally:
    os.unlink(plugin_path)

Output:

demo_plugin v1.0.0: uppercases string values
Processed: {'name': 'ALICE', 'role': 'ADMIN', 'score': 99}
Plugin name: demo_plugin

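The three-step pattern (spec_from_file_location, module_from_spec, exec_module) is the canonical way to load a module from a path. Adding it to sys.modules is optional but recommended: it prevents the module from being loaded twice if something else tries to import it by name, and it keeps tools that look modules up by name (pickle, for example) working for objects the plugin defines. Note that a single file loaded this way has no parent package, so relative imports inside it only work if you load it as part of a package.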

Real-Life Example: Plugin Discovery System

Here is a complete plugin discovery system that scans a directory for Python files, loads each one as a plugin, validates it against a required interface, and runs them in a pipeline.

# importlib_plugins.py
import importlib.util
import importlib
import sys
import os
import tempfile
from pathlib import Path

# --- Define the plugin interface ---
REQUIRED_FUNCTIONS = ['transform', 'describe']

def load_plugin(path: Path) -> object:
    """Load a Python file as a plugin module. Returns module or None."""
    name = f"plugin_{path.stem}"
    spec = importlib.util.spec_from_file_location(name, path)
    if spec is None:
        return None
    module = importlib.util.module_from_spec(spec)
    sys.modules[name] = module
    try:
        spec.loader.exec_module(module)
    except Exception as e:
        print(f"  [SKIP] {path.name}: load error -- {e}")
        return None

    # Validate interface
    missing = [fn for fn in REQUIRED_FUNCTIONS if not hasattr(module, fn)]
    if missing:
        print(f"  [SKIP] {path.name}: missing functions {missing}")
        return None

    return module

def discover_plugins(plugin_dir: Path) -> list:
    """Scan a directory and load all valid plugins."""
    plugins = []
    for path in sorted(plugin_dir.glob('*.py')):
        if path.name.startswith('_'):
            continue   # skip __init__.py etc
        plugin = load_plugin(path)
        if plugin:
            plugins.append(plugin)
            print(f"  [OK] Loaded: {plugin.describe()}")
    return plugins


# Create a temporary plugin directory with sample plugins
with tempfile.TemporaryDirectory(prefix='plugins_') as plugin_dir:
    pd = Path(plugin_dir)

    # Plugin 1: uppercase transformer
    (pd / 'upper_plugin.py').write_text('''
def transform(data):
    return {k: v.upper() if isinstance(v, str) else v for k, v in data.items()}
def describe(): return "upper_plugin: converts string values to uppercase"
''')

    # Plugin 2: trim whitespace
    (pd / 'trim_plugin.py').write_text('''
def transform(data):
    return {k: v.strip() if isinstance(v, str) else v for k, v in data.items()}
def describe(): return "trim_plugin: strips whitespace from string values"
''')

    # Plugin 3: bad plugin (missing interface)
    (pd / 'bad_plugin.py').write_text('''
VERSION = "1.0"
# Missing transform and describe
''')

    print("=== Discovering plugins ===")
    plugins = discover_plugins(pd)

    print(f"\n=== Running {len(plugins)} plugins ===")
    data = {'name': '  Alice  ', 'role': '  admin  ', 'score': 95}
    print(f"Input:  {data}")

    for plugin in plugins:
        data = plugin.transform(data)
        print(f"After {plugin.__name__.split('_')[1]}: {data}")

Output:

=== Discovering plugins ===
  [SKIP] bad_plugin.py: missing functions ['transform', 'describe']
  [OK] Loaded: trim_plugin: strips whitespace from string values
  [OK] Loaded: upper_plugin: converts string values to uppercase

=== Running 2 plugins ===
Input:  {'name': '  Alice  ', 'role': '  admin  ', 'score': 95}
After trim: {'name': 'Alice', 'role': 'admin', 'score': 95}
After upper: {'name': 'ALICE', 'role': 'ADMIN', 'score': 95}

Variations of this pattern appear across the ecosystem: Airflow scans a plugins folder, pytest loads conftest.py files and installed plugins, and Django imports the apps named in INSTALLED_APPS. Users drop Python files into the plugins directory, the system discovers and validates them, and the application gains new capabilities without a code change. The interface validation step (checking for required functions) is what separates a robust plugin system from one that crashes mysteriously on malformed plugins.

Frequently Asked Questions

When should I use importlib.import_module vs a regular import?

Use a regular import statement whenever the module name is known at write time. Use importlib.import_module when the module name is determined at runtime — from a config file, command-line argument, database record, or environment variable. Also use it for optional-dependency patterns where you want to try a fast implementation (like ujson) and fall back to the stdlib version. Static imports are always clearer and slightly faster; dynamic imports should only be used when static ones cannot express the required behavior.

How can I check if a module is available without importing it?

Use importlib.util.find_spec('module_name'). It returns a ModuleSpec if the module is findable, or None if it is not. This lets you check for optional dependencies in a guard clause: if importlib.util.find_spec('numpy') is None: raise RuntimeError("numpy is required"). Unlike a try/except import, find_spec does not execute the target module's code, so it is cheaper for availability checks. One caveat: for a dotted name such as 'pkg.sub', the parent package is imported in order to locate the submodule.
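A small wrapper makes the check reusable. This is a minimal sketch: is_available is a hypothetical helper name, and the second module name is deliberately made up so the lookup fails.

```python
import importlib.util


def is_available(module_name: str) -> bool:
    """Return True if the import system can locate module_name."""
    try:
        return importlib.util.find_spec(module_name) is not None
    except (ImportError, ValueError):
        # A dotted name imports its parent first, which can itself fail;
        # ValueError is raised for a cached module without a usable __spec__.
        return False


print(is_available("json"))                         # True
print(is_available("surely_not_installed_pkg_xyz")) # False
print(is_available("surely_not_installed_pkg_xyz.sub"))  # False
```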

What are the dangers of importlib.reload in production?

Several. First, existing references (variables that already hold objects from the old module) are not updated by reload — they keep pointing to old class definitions, which causes isinstance checks to fail and creates hard-to-debug type mismatch errors. Second, module-level side effects (registering signal handlers, opening database connections, starting background threads) run again. Third, C extension modules generally cannot be reloaded at all. Use reload only in development REPLs and hot-reload frameworks that are specifically designed to handle the reference-update problem.

Should I add dynamically loaded plugins to sys.modules?

Yes, as a best practice. Adding to sys.modules prevents the module from being loaded twice if anything else imports it by name, allows the plugin to use Python’s import machinery (relative imports, package detection), and makes the module visible to debugging and profiling tools. Use a unique, namespaced key like "plugins.my_plugin_name" to avoid collisions with existing modules.

Is loading plugins from arbitrary paths a security risk?

Yes, significantly. A malicious .py file in the plugins directory will execute arbitrary Python code with full access to your process’s permissions. Mitigations include: only loading plugins from trusted, access-controlled directories; running plugins in a subprocess with restricted permissions; using a sandboxing approach for untrusted code (though Python sandboxing is notoriously hard to do correctly); and validating plugin files with a linter or AST checker before loading. Never load plugins from user-supplied file paths without thorough sanitization.
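As an illustration of the AST-checking idea, the sketch below parses plugin source without executing it and reports imports of modules on a blocklist. The blocklist and the helper name blocked_imports are hypothetical. Treat this as a lint for honest mistakes rather than a sandbox: code can still reach those modules through __import__ or importlib, so it does not make untrusted plugins safe.

```python
import ast

# Hypothetical policy: modules a plugin is not expected to import
BLOCKED_MODULES = {"os", "subprocess", "socket", "ctypes"}


def blocked_imports(source: str) -> set:
    """Return the blocked top-level modules that `source` imports."""
    found = set()
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            names = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom):
            names = [node.module or ""]   # module is None for "from . import x"
        else:
            continue
        for name in names:
            root = name.split(".")[0]
            if root in BLOCKED_MODULES:
                found.add(root)
    return found


sample = "import json\nimport os.path\nfrom subprocess import run\n"
flagged = blocked_imports(sample)
print(sorted(flagged))   # ['os', 'subprocess']
```

A loader could call this on the file's text before spec_from_file_location and skip any plugin for which the result is non-empty.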

Conclusion

The importlib module gives you programmatic control over Python’s import system. You learned how import_module() replaces static imports when the module name is only known at runtime, how reload() re-executes a module for development hot-reloading (with its important caveats), how spec_from_file_location loads modules from arbitrary file paths, and how these tools combine into a working plugin discovery system.

To extend your learning, add error isolation to the plugin system: run each plugin’s transform in a try/except block so a crashing plugin does not abort the entire pipeline. Then add a version_check() validation step that reads a REQUIRED_API_VERSION attribute from each plugin and skips incompatible ones. These two additions will take the example from a demonstration to something you could ship.
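One way those two additions might look is sketched below. Everything here is illustrative: HOST_API_VERSION, make_plugin and run_pipeline are hypothetical names, plugins are built in memory instead of loaded from files, and treating a plugin without REQUIRED_API_VERSION as version 1 is an assumption you may want to change.

```python
import types

HOST_API_VERSION = 2   # hypothetical API version the host application implements


def make_plugin(name, transform, required_api_version=None):
    """Build an in-memory stand-in for a loaded plugin module."""
    mod = types.ModuleType(name)
    mod.transform = transform
    if required_api_version is not None:
        mod.REQUIRED_API_VERSION = required_api_version
    return mod


def version_check(plugin) -> bool:
    """Accept plugins that need at most the host's API version (default: 1)."""
    return getattr(plugin, "REQUIRED_API_VERSION", 1) <= HOST_API_VERSION


def run_pipeline(plugins, data):
    """Apply each compatible plugin; a crashing plugin is logged and skipped."""
    for plugin in plugins:
        if not version_check(plugin):
            print(f"  [SKIP] {plugin.__name__}: needs a newer API version")
            continue
        try:
            data = plugin.transform(data)
        except Exception as e:
            print(f"  [FAIL] {plugin.__name__}: {e}")   # keep the previous data
    return data


def trim(data):
    return {k: v.strip() if isinstance(v, str) else v for k, v in data.items()}


def upper(data):
    return {k: v.upper() if isinstance(v, str) else v for k, v in data.items()}


def crash(data):
    raise RuntimeError("boom")


plugins = [
    make_plugin("plugin_trim", trim),
    make_plugin("plugin_crash", crash),
    make_plugin("plugin_future", upper, required_api_version=3),
    make_plugin("plugin_upper", upper, required_api_version=2),
]
result = run_pipeline(plugins, {"name": "  Alice  ", "score": 95})
print(result)   # {'name': 'ALICE', 'score': 95}
```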

Official documentation: https://docs.python.org/3/library/importlib.html.

Importing by String Name

The killer use case for importlib: load a module whose name isn’t known until runtime. Plugin systems, configuration-driven loaders, lazy imports — all enabled by one function:

import importlib

module = importlib.import_module("myapp.handlers.email")
result = module.send(to="alice@example.com", subject="hi")

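# The dotted name can be assembled at runtime, for example from configuration
config = {"plugin": "email"}   # hypothetical config, normally read from a file
plugin_name = config["plugin"]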
plugin = importlib.import_module("myapp.plugins." + plugin_name)
plugin.register()

The function returns the module object — exactly what you’d get from a static import. The dynamic part is just that the string name was computed.

Reloading Modules

import importlib
import myapp.config

# Edit myapp/config.py externally...
importlib.reload(myapp.config)
# Now myapp.config reflects the new file

Reload is useful for development tools, REPL workflows, and hot-reload servers. It is NOT for production: names imported with from myapp.config import X before the reload keep pointing at the old objects, leading to confusing “but I changed it!” bugs.

Loading from a File Path

For plugin systems where the file isn’t on sys.path:

import importlib.util
import sys

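def load_from_path(name, path):
    spec = importlib.util.spec_from_file_location(name, path)
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot load {name!r} from {path}")
    module = importlib.util.module_from_spec(spec)
    sys.modules[name] = module       # register before executing
    try:
        spec.loader.exec_module(module)
    except BaseException:
        sys.modules.pop(name, None)  # do not leave a half-initialised module behind
        raise
    return module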

# Load a Python file by absolute path
my_plugin = load_from_path("user_plugin", "/home/user/plugins/myplugin.py")
my_plugin.run()

Entry Points: Discovering Installed Plugins

For pip-installable plugins, use the entry-points mechanism. Each plugin declares itself in setup.cfg / pyproject.toml; your app discovers them at runtime:

from importlib.metadata import entry_points

# Find every plugin that registered itself as "myapp.plugins"
# (the group= keyword requires Python 3.10 or newer)
for ep in entry_points(group="myapp.plugins"):
    plugin_class = ep.load()
    plugin = plugin_class()
    plugin.register()

# In the plugin's pyproject.toml:
[project.entry-points."myapp.plugins"]
my_cool_plugin = "my_pkg.module:CoolPlugin"

This is how setuptools console_scripts work, how pytest discovers installed plugins (the pytest11 group), and how Flask picks up extra CLI commands (the flask.commands group). It’s the Python ecosystem’s standard plugin-discovery mechanism.
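In a host application you would usually wrap the lookup so that one broken plugin cannot stop the rest from loading. The following is a sketch under a few assumptions: iter_group and load_plugins are hypothetical helper names, "myapp.plugins" is the example group from above, and the fallback branch covers the older dict-style API of Python 3.8 and 3.9.

```python
from importlib.metadata import entry_points


def iter_group(group: str) -> list:
    """Return the entry points registered under `group`."""
    try:
        return list(entry_points(group=group))        # Python 3.10+
    except TypeError:
        return list(entry_points().get(group, []))    # Python 3.8 / 3.9


def load_plugins(group: str) -> dict:
    """Load every entry point in `group`, skipping the ones that fail."""
    loaded = {}
    for ep in iter_group(group):
        try:
            loaded[ep.name] = ep.load()
        except Exception as e:
            print(f"  Warning: entry point {ep.name} failed to load: {e}")
    return loaded


plugins = load_plugins("myapp.plugins")
print(f"Found {len(plugins)} plugin(s)")
```

The count depends entirely on which packages are installed in the current environment.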

Reading Package Resources

For non-Python files shipped inside a package (templates, CSS, sample data), use importlib.resources instead of hardcoded paths:

from importlib.resources import files

# Get a Path-like object pointing into the package
config_text = files("myapp.config").joinpath("default.yaml").read_text()
template_html = files("myapp.templates").joinpath("email.html").read_text()

# For binary files
icon_bytes = files("myapp.assets").joinpath("icon.png").read_bytes()

This works whether the package is installed as a directory, as a wheel, or zipped as a .pyz — no __file__ path tricks needed.
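If you want to try files() without an installed package at hand, you can build a throwaway package in a temporary directory. The package name resource_demo_pkg and its contents are made up for this demonstration, and files() needs Python 3.9 or newer.

```python
import importlib
import sys
import tempfile
from importlib.resources import files
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    # Create a minimal package with one data file next to __init__.py
    pkg = Path(tmp) / "resource_demo_pkg"   # hypothetical package name
    pkg.mkdir()
    (pkg / "__init__.py").write_text("")
    (pkg / "default.yaml").write_text("debug: false\n", encoding="utf-8")

    sys.path.insert(0, tmp)
    importlib.invalidate_caches()
    try:
        resource = files("resource_demo_pkg").joinpath("default.yaml")
        found = resource.is_file()
        config_text = resource.read_text(encoding="utf-8")
    finally:
        sys.path.remove(tmp)
        sys.modules.pop("resource_demo_pkg", None)

print(found, repr(config_text))   # True 'debug: false\n'
```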

Lazy Imports

For large dependencies you don’t always use, defer the import until first call:

def generate_pdf(html):
    # weasyprint is huge and slow to import — load lazily
    import importlib
    weasyprint = importlib.import_module("weasyprint")
    return weasyprint.HTML(string=html).write_pdf()

# Or use the standard pattern
_weasyprint = None
def get_weasyprint():
    global _weasyprint
    if _weasyprint is None:
        import weasyprint
        _weasyprint = weasyprint
    return _weasyprint
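The standard library also offers importlib.util.LazyLoader, which postpones executing a module until one of its attributes is first accessed. The helper below follows the recipe from the importlib documentation, with a guard for modules that are already imported; lazy_import is just the name chosen here, and colorsys stands in for a heavy dependency.

```python
import importlib.util
import sys


def lazy_import(name: str):
    """Return a module whose code runs on first attribute access."""
    if name in sys.modules:
        return sys.modules[name]          # already imported, nothing to defer
    spec = importlib.util.find_spec(name)
    if spec is None or spec.loader is None:
        raise ModuleNotFoundError(f"No module named {name!r}")
    loader = importlib.util.LazyLoader(spec.loader)
    spec.loader = loader
    module = importlib.util.module_from_spec(spec)
    sys.modules[name] = module
    loader.exec_module(module)            # sets up the deferral, does not run the module yet
    return module


colorsys = lazy_import("colorsys")        # small stdlib module used as a stand-in
hue, sat, val = colorsys.rgb_to_hsv(1.0, 0.0, 0.0)   # first access triggers the real load
print(hue, sat, val)   # 0.0 1.0 1.0
```

One trade-off to keep in mind: with a lazy module, an import error surfaces at first use instead of at startup.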

Common Pitfalls

  • importlib.reload on packages. Reload only reloads the named module, not its sub-modules. For packages, walk and reload each submodule manually.
  • Holding references to reloaded modules. If you imported from foo import bar, reloading foo doesn’t update bar. Re-import after reload.
  • sys.path not updated. import_module looks at sys.path. If the plugin lives elsewhere, either add to sys.path first or use spec_from_file_location.
  • Forgetting sys.modules[name] = module. When loading by file path, register the module in sys.modules before calling exec_module; otherwise anything that looks the module up by name (pickle, or a later import of the same name) will not find it.
  • Entry points in editable installs. Entry points are read from the metadata written at install time, so after adding or renaming one you need to re-run pip install -e . before entry_points() sees the change.
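For the first pitfall, a rough helper for reloading a package together with its already-imported submodules might look like this. reload_package is a hypothetical name, and the deepest-first ordering is a simplification: it does not analyse which submodule depends on which, so packages with from-imports between siblings may need a second pass. The wsgiref package is used only because it is small and always available.

```python
import importlib
import sys


def reload_package(package) -> list:
    """Reload every already-imported submodule of `package`, then the package."""
    prefix = package.__name__ + "."
    # Snapshot the names first, deepest modules first
    names = sorted(
        (n for n in sys.modules if n.startswith(prefix)),
        key=lambda n: n.count("."),
        reverse=True,
    )
    for name in names:
        module = sys.modules.get(name)
        if module is not None:            # entries can be None placeholders
            importlib.reload(module)
    importlib.reload(package)
    return names + [package.__name__]


import wsgiref
import wsgiref.headers
import wsgiref.util

reloaded = reload_package(wsgiref)
print(reloaded[-1])   # wsgiref
```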

FAQ

Q: importlib.import_module vs __import__?
A: import_module is the public, friendly version. __import__ is the low-level builtin called by the import statement — clunky API. Always use importlib.import_module.

Q: How do plugin systems handle errors?
A: Wrap each plugin load in try/except. Log failures, continue loading others. A broken plugin should not crash the host app.

Q: How do I find all submodules of a package?
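A: pkgutil.iter_modules(my_package.__path__) scans the package directory and yields a ModuleInfo tuple (module_finder, name, ispkg) for each direct submodule. Use pkgutil.walk_packages to recurse into subpackages.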
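Here is a short illustration using two standard-library packages (chosen only because they are always present). Note that walk_packages has to import each subpackage it descends into.

```python
import json
import pkgutil
import xml

# Direct submodules of a package: names only, nothing is imported
direct = sorted(info.name for info in pkgutil.iter_modules(json.__path__))
print(direct)

# Recursive walk: pass a prefix to get fully qualified names
full = [
    info.name
    for info in pkgutil.walk_packages(xml.__path__, prefix="xml.")
]
print("xml.etree.ElementTree" in full)   # True
```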

Q: Can I unload a module?
A: del sys.modules[name] removes it from cache. Existing references still work; future imports re-execute the module.

Q: importlib.resources or pkg_resources?
A: importlib.resources is the modern stdlib answer. pkg_resources is legacy (from setuptools) — slower, more dependencies. Migrate.

Wrapping Up

importlib is the foundation of every Python plugin system, dynamic loader, and framework extension mechanism. import_module for dynamic by-name imports; spec_from_file_location for loading off sys.path; entry_points for pip-installable plugins; files for package resources. Pick the right one and your plugin architecture stays clean.