How To Use Python Regular Expressions (re Module) with Examples

Intermediate

You need to find, extract, or replace text patterns in strings — email addresses in a document, phone numbers in a log file, or URLs scattered across a web page. Python’s built-in re module gives you regular expressions, a powerful pattern-matching language that handles all of these tasks with concise, reusable patterns.

Regular expressions look intimidating at first, but they follow logical rules. Once you learn the core patterns, you will reach for them constantly — for data validation, log parsing, text cleanup, and search-and-replace operations. Python’s re module is part of the standard library, so there is nothing to install.

In this tutorial, you will learn the essential regex patterns, how to use re.search(), re.findall(), re.sub(), and re.compile(), work with groups and named groups, handle common real-world patterns, and build a complete log parser project.

Pattern Matching: Quick Example

Here is a minimal example that extracts all email addresses from a block of text.

# quick_regex.py
import re

text = "Contact us at support@example.com or sales@company.org for help."
pattern = r'[\w.+-]+@[\w-]+\.[\w.]+'

emails = re.findall(pattern, text)
for email in emails:
    print(email)

Output:

support@example.com
sales@company.org

The r prefix creates a raw string, which prevents Python from interpreting backslashes as escape characters. Always use raw strings for regex patterns.
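
To make the raw-string point concrete, here is a minimal sketch (the sample text and variable names are illustrative, not part of the tutorial's scripts). It compares the same pattern written with and without the r prefix:

```python
import re

# In a normal string, "\b" is the backspace character (a single character).
# In a raw string, r"\b" stays as backslash + "b", which regex reads as a word boundary.
plain = "\bcat\b"
raw = r"\bcat\b"

text = "cat concat cats"
plain_hits = re.findall(plain, text)  # looks for backspace + "cat" + backspace
raw_hits = re.findall(raw, text)      # looks for the whole word "cat"

print(len(plain), len(raw))  # 5 7
print(plain_hits)            # []
print(raw_hits)              # ['cat']
```

The non-raw version silently matches nothing, which is why this mistake is easy to miss.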

Pattern matching is just string searching with superpowers — and a learning curve.

What Are Regular Expressions and Why Use Them?

Regular expressions (regex) are sequences of characters that define search patterns. Think of them as advanced wildcards — where a file search might use *.txt to match all text files, regex gives you the precision to match patterns like “any string that looks like a phone number” or “every word followed by a colon.”

Pattern   Matches                                       Example
\d        Any digit (0-9)                               \d{3} matches “123”
\w        Word character (letter, digit, underscore)    \w+ matches “hello_world”
\s        Whitespace (space, tab, newline)              \s+ matches “   ”
.         Any character except newline                  a.c matches “abc”, “a1c”
+         One or more of previous                       \d+ matches “42”, “1000”
*         Zero or more of previous                      \d* matches “”, “5”, “99”
?         Zero or one of previous                       colou?r matches “color”, “colour”
[abc]     Any character in the set                      [aeiou] matches vowels
^         Start of string                               ^Hello matches “Hello world”
$         End of string                                 world$ matches “Hello world”

Core Functions: search, match, findall, sub

The re module provides four main functions. Each serves a different use case.

# core_functions.py
import re

text = "Order #12345 was placed on 2024-03-15 for $49.99"

# search() - find first match anywhere in string
match = re.search(r'#(\d+)', text)
if match:
    print(f"Order number: {match.group(1)}")

# findall() - find all matches, return as list
prices = re.findall(r'\$[\d.]+', text)
print(f"Prices found: {prices}")

# sub() - search and replace
cleaned = re.sub(r'\$[\d.]+', '[PRICE]', text)
print(f"Cleaned: {cleaned}")

# match() - match at the START of string only
result = re.match(r'Order', text)
print(f"Starts with Order: {result is not None}")

Output:

Order number: 12345
Prices found: ['$49.99']
Cleaned: Order #12345 was placed on 2024-03-15 for [PRICE]
Starts with Order: True

Use search() when you want to find a pattern anywhere in the string. Use match() only when you specifically need to check the beginning of the string.
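
The difference is easiest to see side by side. The sketch below also uses re.fullmatch(), which this tutorial does not otherwise cover; it succeeds only when the pattern matches the entire string. The variable names are illustrative.

```python
import re

text = "Order #12345 was placed"

at_start = re.match(r'\d+', text)      # None: the string does not start with a digit
anywhere = re.search(r'\d+', text)     # finds the digits wherever they are
partial = re.fullmatch(r'Order', text) # None: pattern covers only part of the string
whole = re.fullmatch(r'Order #\d+ was placed', text)

print(at_start)             # None
print(anywhere.group())     # 12345
print(partial)              # None
print(whole is not None)    # True
```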

Groups and Named Groups

Parentheses in regex create capture groups that let you extract specific parts of a match. Named groups make your code more readable.

# groups.py
import re

log_line = '2024-03-15 14:30:22 ERROR [auth] Failed login for user admin from 192.168.1.100'

# Numbered groups
pattern = r'(\d{4}-\d{2}-\d{2}) (\d{2}:\d{2}:\d{2}) (\w+) \[(\w+)\] (.+)'
match = re.search(pattern, log_line)
if match:
    print(f"Date: {match.group(1)}")
    print(f"Time: {match.group(2)}")
    print(f"Level: {match.group(3)}")
    print(f"Module: {match.group(4)}")
    print(f"Message: {match.group(5)}")

# Named groups (much more readable)
pattern = r'(?P<date>\d{4}-\d{2}-\d{2}) (?P<time>\d{2}:\d{2}:\d{2}) (?P<level>\w+)'
match = re.search(pattern, log_line)
if match:
    print(f"\nNamed: {match.group('date')} at {match.group('time')} [{match.group('level')}]")

Output:

Date: 2024-03-15
Time: 14:30:22
Level: ERROR
Module: auth
Message: Failed login for user admin from 192.168.1.100

Named: 2024-03-15 at 14:30:22 [ERROR]

Named groups use the (?P<name>...) syntax. They are especially valuable in complex patterns where numbered groups become confusing.
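
When a pattern has several named groups, the match object's groupdict() method returns them all at once as a dictionary. A minimal sketch, reusing the log line from above (the module group is added here for illustration):

```python
import re

log_line = '2024-03-15 14:30:22 ERROR [auth] Failed login for user admin from 192.168.1.100'

pattern = re.compile(
    r'(?P<date>\d{4}-\d{2}-\d{2}) '
    r'(?P<time>\d{2}:\d{2}:\d{2}) '
    r'(?P<level>\w+) \[(?P<module>\w+)\]'
)

match = pattern.search(log_line)
fields = match.groupdict() if match else {}

print(fields)
# {'date': '2024-03-15', 'time': '14:30:22', 'level': 'ERROR', 'module': 'auth'}
print(match['level'])  # ERROR (subscript access is shorthand for match.group('level'))
```

A dictionary like this is convenient to pass straight into a CSV writer or a JSON dump.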

Capture groups let you grab exactly what you need from the chaos.

Compiled Patterns with re.compile()

When you use the same regex pattern multiple times, compile it first for better performance and cleaner code.

# compiled.py
import re

# Compile once, use many times
email_pattern = re.compile(r'[\w.+-]+@[\w-]+\.[\w.]+')
phone_pattern = re.compile(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b')

contacts = [
    "Call John at 555-123-4567 or email john@example.com",
    "Reach Jane at jane@company.org or 555.987.6543",
    "No contact info here, sorry!",
]

for text in contacts:
    emails = email_pattern.findall(text)
    phones = phone_pattern.findall(text)
    print(f"Emails: {emails}, Phones: {phones}")

Output:

Emails: ['john@example.com'], Phones: ['555-123-4567']
Emails: ['jane@company.org'], Phones: ['555.987.6543']
Emails: [], Phones: []

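Compiled patterns have the same methods as the re module itself: search(), findall(), sub(), and so on. The re module already caches recently used patterns internally, so the speedup is usually modest; the main gains are skipping the cache lookup inside hot loops and giving each pattern a descriptive name.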

Common Real-World Patterns

Here are battle-tested patterns for common data extraction tasks.

# common_patterns.py
import re

# URL extraction
text = "Visit https://example.com or http://docs.python.org/3/ for details"
urls = re.findall(r'https?://[\w./\-?=&]+', text)
print("URLs:", urls)

# Date extraction (YYYY-MM-DD)
text = "Events on 2024-03-15 and 2024-12-25"
dates = re.findall(r'\d{4}-\d{2}-\d{2}', text)
print("Dates:", dates)

# IP address
text = "Server 192.168.1.100 responded, backup at 10.0.0.1"
ips = re.findall(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b', text)
print("IPs:", ips)

# HTML tag stripping
html = "<p>Hello <b>world</b></p>"
clean = re.sub(r'<[^>]+>', '', html)
print("Stripped:", clean)

Output:

URLs: ['https://example.com', 'http://docs.python.org/3/']
Dates: ['2024-03-15', '2024-12-25']
IPs: ['192.168.1.100', '10.0.0.1']
Stripped: Hello world

These patterns handle the most common extraction tasks. For production use, consider edge cases: the email pattern shown earlier works for most addresses but does not cover every valid format defined in the RFC, and the IP pattern accepts out-of-range octets such as 999.
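
One way to tighten a loose pattern is to let the regex find candidates and hand validation to a dedicated parser. The sketch below does this for IPv4 addresses with the standard-library ipaddress module; extract_ipv4 is a hypothetical helper name and the sample text is made up.

```python
import ipaddress
import re

# The regex only finds things shaped like an IPv4 address.
candidate_pattern = re.compile(r'\b\d{1,3}(?:\.\d{1,3}){3}\b')

def extract_ipv4(text):
    """Return the candidates that ipaddress accepts as valid IPv4 addresses."""
    valid = []
    for candidate in candidate_pattern.findall(text):
        try:
            ipaddress.IPv4Address(candidate)
        except ipaddress.AddressValueError:
            continue  # e.g. an octet above 255
        valid.append(candidate)
    return valid

text = "Server 192.168.1.100 responded, bogus 999.300.1.1, backup at 10.0.0.1"
found = extract_ipv4(text)
print(found)  # ['192.168.1.100', '10.0.0.1']
```

This keeps the regex short and readable instead of encoding the 0-255 range rule in the pattern itself.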

re.sub(), re.split(), re.findall() — pick your weapon wisely.

Real-Life Example: Building a Log File Parser

# log_parser.py
import re
from collections import Counter

# Sample log data
log_data = """
2024-03-15 14:30:22 ERROR [auth] Failed login for user admin from 192.168.1.100
2024-03-15 14:30:25 INFO [auth] Successful login for user alice from 10.0.0.50
2024-03-15 14:31:00 WARNING [db] Slow query detected: 2.5s
2024-03-15 14:31:05 ERROR [api] Timeout connecting to payment service
2024-03-15 14:31:10 INFO [auth] Successful login for user bob from 10.0.0.51
2024-03-15 14:31:15 ERROR [auth] Failed login for user admin from 192.168.1.100
2024-03-15 14:31:20 INFO [api] Health check passed
2024-03-15 14:31:30 ERROR [db] Connection pool exhausted
""".strip()

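log_pattern = re.compile(
    r'(?P<date>\d{4}-\d{2}-\d{2}) '
    r'(?P<time>\d{2}:\d{2}:\d{2}) '
    r'(?P<level>\w+) \[(?P<module>\w+)\] (?P<message>.+)'
)
ip_pattern = re.compile(r'Failed login .* from (?P<ip>[\d.]+)')
# The source listing is cut off inside log_pattern; the rest is a reconstruction that reproduces the output below.
levels, failed_ips = Counter(), Counter()
for line in log_data.splitlines():
    entry = log_pattern.match(line)
    if entry:
        levels[entry['level']] += 1
        failed = ip_pattern.search(entry['message'])
        if failed:
            failed_ips[failed['ip']] += 1
print("Log level counts:")
for level, count in levels.most_common():
    print(f"  {level}: {count}")
print(f"\nFailed login attempts: {sum(failed_ips.values())}")
print("Suspicious IPs:")
for ip, count in failed_ips.items():
    print(f"  {ip}: {count} failed attempts")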

Output:

Log level counts:
  ERROR: 4
  INFO: 3
  WARNING: 1

Failed login attempts: 2
Suspicious IPs:
  192.168.1.100: 2 failed attempts

This parser combines compiled regex patterns, named groups, and Python’s Counter class to analyze log files efficiently. You can extend it to read from actual log files, send alerts for suspicious activity, or export metrics to a dashboard.

Frequently Asked Questions

What is the difference between greedy and lazy matching?

By default, quantifiers like + and * are greedy — they match as much as possible. Adding ? makes them lazy, matching as little as possible. For example, <.+> on “<b>bold</b>” matches the entire string, while <.+?> matches just “<b>”.
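
A quick check of that example, as a minimal sketch:

```python
import re

html = "<b>bold</b>"

greedy = re.findall(r'<.+>', html)   # .+ grabs as much as it can before the last >
lazy = re.findall(r'<.+?>', html)    # .+? stops at the first > it can

print(greedy)  # ['<b>bold</b>']
print(lazy)    # ['<b>', '</b>']
```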

How do I match across multiple lines?

Use the re.DOTALL flag to make . match newlines, or re.MULTILINE to make ^ and $ match at line boundaries instead of string boundaries. Pass these as the flags parameter: re.search(pattern, text, flags=re.MULTILINE).
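
The sketch below shows both flags on a small three-line string (the sample text is illustrative):

```python
import re

text = "first line\nsecond line\nthird line"

# Without MULTILINE, ^ matches only at the very start of the string.
default_starts = re.findall(r'^\w+', text)
line_starts = re.findall(r'^\w+', text, flags=re.MULTILINE)

# Without DOTALL, . refuses to cross the newline between the lines.
no_dotall = re.search(r'first.+second', text)
with_dotall = re.search(r'first.+second', text, flags=re.DOTALL)

print(default_starts)              # ['first']
print(line_starts)                 # ['first', 'second', 'third']
print(no_dotall)                   # None
print(repr(with_dotall.group()))   # 'first line\nsecond'
```

The flags can be combined with the | operator, for example flags=re.MULTILINE | re.DOTALL.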

When do I need to escape special characters?

Escape regex metacharacters (. * + ? [ ] ( ) { } ^ $ | \) with a backslash when you want to match them literally. Use re.escape() to automatically escape a string: re.escape("price is $9.99") returns "price\ is\ \$9\.99".
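
Escaping matters most when part of the pattern comes from user input. A minimal sketch, with a made-up search term and text:

```python
import re

user_input = "$9.99"
text = "Sale price: $9.99 (was $19.99), code 9x99"

# Unescaped, $ means "end of string", so the pattern can never match here.
naive = re.findall(user_input, text)

# Escaped, every metacharacter is matched literally.
safe = re.findall(re.escape(user_input), text)

print(naive)  # []
print(safe)   # ['$9.99']
```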

Are regular expressions slow?

For simple pattern matching, string methods like str.startswith(), str.endswith(), and in are faster. Use regex when you need pattern matching that string methods cannot handle. Compile patterns with re.compile() if you use them repeatedly.

When should I NOT use regex?

Do not use regex to parse HTML or XML — use BeautifulSoup or lxml instead. Do not use regex for JSON — use the json module. Regex is best for flat text patterns, not nested structures. If your pattern needs recursive matching, you need a proper parser.

Conclusion

Regular expressions are one of the most powerful tools in a Python developer’s toolkit. We covered the core syntax, the main re module functions, capture groups, compiled patterns, common real-world patterns, and building a complete log parser.

For the full re module documentation, visit docs.python.org/3/library/re. For interactive regex testing, try regex101.com which provides real-time explanations of your patterns.

How To Use SQLite with Python: A Complete Guide

Beginner

You need a database for your Python project, but setting up MySQL or PostgreSQL feels like overkill. SQLite is a lightweight, serverless database that comes built into Python — no installation, no configuration, no separate server process. Just import sqlite3 and start storing data in a single file.

Python’s built-in sqlite3 module provides a full-featured interface to SQLite databases. It supports SQL queries, transactions, parameterized statements, and works identically on every platform. Your database is just a file on disk that you can copy, backup, or share like any other file.

In this tutorial, you will learn how to create databases and tables, insert and query data, use parameterized queries to prevent SQL injection, handle transactions, work with context managers, and build a complete project. By the end, you will be able to add persistent data storage to any Python project in minutes.

SQLite in Python: Quick Example

Here is a complete working example that creates a database, inserts data, and queries it — all in under 15 lines.

# quick_sqlite.py
import sqlite3

conn = sqlite3.connect("example.db")
cursor = conn.cursor()

cursor.execute("CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, name TEXT, email TEXT)")
cursor.execute("INSERT INTO users (name, email) VALUES (?, ?)", ("Alice", "alice@example.com"))
cursor.execute("INSERT INTO users (name, email) VALUES (?, ?)", ("Bob", "bob@example.com"))
conn.commit()

cursor.execute("SELECT * FROM users")
for row in cursor.fetchall():
    print(row)

conn.close()

Output:

(1, 'Alice', 'alice@example.com')
(2, 'Bob', 'bob@example.com')

The ? placeholders in the INSERT statement are parameterized queries — they prevent SQL injection by keeping data separate from SQL code. Never use f-strings or string concatenation to build SQL queries.

What Is SQLite and When Should You Use It?

SQLite is a self-contained, file-based relational database engine. Unlike MySQL or PostgreSQL, it does not require a separate server process. The entire database lives in a single file on your filesystem.

Feature              SQLite                      PostgreSQL            MySQL
Server required      No                          Yes                   Yes
Setup complexity     Zero                        Medium                Medium
Concurrent writers   Limited                     Excellent             Good
Max database size    281 TB                      Unlimited             Unlimited
Best for             Apps, prototypes, scripts   Web apps, analytics   Web apps, CMS

Use SQLite when you need persistent storage for a single-user application, prototype, CLI tool, data processing script, or any project where simplicity matters more than concurrent access. Switch to PostgreSQL when you need multiple simultaneous writers or a web application with many users.

SQLite: a database in a file. Embedded. Reliable.

Creating Databases and Tables

Connecting to a database file creates it automatically if it does not exist. Use CREATE TABLE IF NOT EXISTS to make your scripts idempotent.

# create_tables.py
import sqlite3

conn = sqlite3.connect("bookstore.db")
cursor = conn.cursor()

cursor.execute("""
    CREATE TABLE IF NOT EXISTS books (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        title TEXT NOT NULL,
        author TEXT NOT NULL,
        price REAL DEFAULT 0.0,
        published_year INTEGER,
        in_stock INTEGER DEFAULT 1
    )
""")

cursor.execute("""
    CREATE TABLE IF NOT EXISTS categories (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT UNIQUE NOT NULL
    )
""")

conn.commit()
print("Tables created successfully")

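# Verify tables exist (AUTOINCREMENT also creates an internal sqlite_sequence table, so filter it out)
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'")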
tables = cursor.fetchall()
print("Tables:", [t[0] for t in tables])
conn.close()

Output:

Tables created successfully
Tables: ['books', 'categories']

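An INTEGER PRIMARY KEY column already makes SQLite assign a unique ID to each new row. Adding AUTOINCREMENT additionally guarantees that IDs from deleted rows are never reused, at a small performance cost. NOT NULL prevents inserting rows with missing required fields.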
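
The practical effect of AUTOINCREMENT shows up only after rows are deleted. The sketch below uses two throwaway tables in an in-memory database (plain_ids and auto_ids are illustrative names) to compare ID assignment with and without the keyword:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE plain_ids (id INTEGER PRIMARY KEY, name TEXT)")
conn.execute("CREATE TABLE auto_ids (id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT)")

rows = [("a",), ("b",), ("c",)]  # these get IDs 1, 2, 3 in both tables

# Without AUTOINCREMENT: delete the highest ID, then insert again.
conn.executemany("INSERT INTO plain_ids (name) VALUES (?)", rows)
conn.execute("DELETE FROM plain_ids WHERE id = 3")
plain_id = conn.execute("INSERT INTO plain_ids (name) VALUES (?)", ("d",)).lastrowid

# With AUTOINCREMENT: same steps.
conn.executemany("INSERT INTO auto_ids (name) VALUES (?)", rows)
conn.execute("DELETE FROM auto_ids WHERE id = 3")
auto_id = conn.execute("INSERT INTO auto_ids (name) VALUES (?)", ("d",)).lastrowid

print(plain_id)  # 3 (the deleted ID is reused)
print(auto_id)   # 4 (the deleted ID is never reused)
conn.close()
```

If other tables or external systems hold on to old IDs, the never-reuse guarantee is what you want; otherwise plain INTEGER PRIMARY KEY is enough.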

Inserting Data Safely

Always use parameterized queries (the ? placeholder pattern) when inserting data. This prevents SQL injection and handles special characters in your data automatically.

# insert_data.py
import sqlite3

conn = sqlite3.connect("bookstore.db")
cursor = conn.cursor()

# Single insert with parameterized query
cursor.execute(
    "INSERT INTO books (title, author, price, published_year) VALUES (?, ?, ?, ?)",
    ("Python Crash Course", "Eric Matthes", 35.99, 2023)
)

# Bulk insert with executemany
books = [
    ("Fluent Python", "Luciano Ramalho", 55.99, 2022),
    ("Automate the Boring Stuff", "Al Sweigart", 29.99, 2019),
    ("Clean Code", "Robert C. Martin", 39.99, 2008),
]
cursor.executemany(
    "INSERT INTO books (title, author, price, published_year) VALUES (?, ?, ?, ?)",
    books
)

conn.commit()
# rowcount after executemany() covers only the bulk insert, so count explicitly
print(f"Inserted {len(books) + 1} books")
conn.close()

Output:

Inserted 4 books

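The executemany() method is usually faster than calling execute() in a Python loop because the loop over rows runs inside the sqlite3 module instead of in Python code. Either way, the inserts belong to one transaction until you call commit().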

SQLite for prototypes. Postgres for production. Usually.

Querying Data

SQLite supports full SQL query syntax. Use fetchall() for all results, fetchone() for a single row, or iterate directly over the cursor.

# query_data.py
import sqlite3

conn = sqlite3.connect("bookstore.db")
conn.row_factory = sqlite3.Row  # Enable column name access
cursor = conn.cursor()

# Select all books
cursor.execute("SELECT * FROM books ORDER BY price DESC")
print("All books by price:")
for row in cursor.fetchall():
    print(f"  {row['title']} by {row['author']} - ${row['price']}")

# Filtered query with parameter
cursor.execute("SELECT * FROM books WHERE price < ? ORDER BY title", (40.0,))
print("\nBooks under $40:")
for row in cursor.fetchall():
    print(f"  {row['title']} (${row['price']})")

# Aggregate query
cursor.execute("SELECT COUNT(*) as count, AVG(price) as avg_price FROM books")
stats = cursor.fetchone()
print(f"\nTotal: {stats['count']} books, Average price: ${stats['avg_price']:.2f}")
conn.close()

Output:

All books by price:
  Fluent Python by Luciano Ramalho - $55.99
  Clean Code by Robert C. Martin - $39.99
  Python Crash Course by Eric Matthes - $35.99
  Automate the Boring Stuff by Al Sweigart - $29.99

Books under $40:
  Automate the Boring Stuff ($29.99)
  Clean Code ($39.99)
  Python Crash Course ($35.99)

Total: 4 books, Average price: $40.49

Setting conn.row_factory = sqlite3.Row lets you access columns by name instead of index. This makes your code more readable and resilient to schema changes.

Using Context Managers

Used as a context manager, a sqlite3 connection commits the transaction when the block succeeds and rolls it back when an exception is raised. It does not close the connection, so you still need to call conn.close() when you are done.

# context_manager.py
import sqlite3

# The connection as context manager handles commits/rollbacks
with sqlite3.connect("bookstore.db") as conn:
    conn.row_factory = sqlite3.Row
    cursor = conn.cursor()

    cursor.execute(
        "INSERT INTO books (title, author, price, published_year) VALUES (?, ?, ?, ?)",
        ("Learning Python", "Mark Lutz", 59.99, 2013)
    )
    # Commit happens automatically when exiting the with block

    cursor.execute("SELECT COUNT(*) as count FROM books")
    print(f"Total books: {cursor.fetchone()['count']}")
# The transaction is committed here, but the connection is still open
conn.close()

Output:

Total books: 5

The with statement commits the transaction when the block exits normally and rolls it back if an exception occurs. This is the recommended pattern for all database operations.
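
A connection used in a with block is not closed when the block ends. One way to get both behaviours is to wrap the connection in contextlib.closing() and use an inner with conn: for the transaction. The sketch below assumes an in-memory database and a deliberately failing insert to show the rollback:

```python
import sqlite3
from contextlib import closing

with closing(sqlite3.connect(":memory:")) as conn:
    conn.execute("CREATE TABLE books (title TEXT NOT NULL)")
    conn.commit()

    try:
        with conn:  # commits on success, rolls back on exception
            conn.execute("INSERT INTO books (title) VALUES (?)", ("Learning Python",))
            conn.execute("INSERT INTO books (title) VALUES (?)", (None,))  # violates NOT NULL
    except sqlite3.IntegrityError as exc:
        print("Rolled back:", exc)

    # The first insert was undone together with the failing one.
    count = conn.execute("SELECT COUNT(*) FROM books").fetchone()[0]
    print("Rows after rollback:", count)  # Rows after rollback: 0
# closing() has closed the connection at this point
```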

Single-process, zero-network. Faster than you think.

Real-Life Example: Building a Contact Book CLI

# contact_book.py
import sqlite3

class ContactBook:
    def __init__(self, db_path="contacts.db"):
        self.conn = sqlite3.connect(db_path)
        self.conn.row_factory = sqlite3.Row
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS contacts (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                phone TEXT,
                email TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        self.conn.commit()

    def add(self, name, phone=None, email=None):
        self.conn.execute(
            "INSERT INTO contacts (name, phone, email) VALUES (?, ?, ?)",
            (name, phone, email)
        )
        self.conn.commit()
        print(f"Added: {name}")

    def search(self, query):
        cursor = self.conn.execute(
            "SELECT * FROM contacts WHERE name LIKE ? OR email LIKE ?",
            (f"%{query}%", f"%{query}%")
        )
        return cursor.fetchall()

    def list_all(self):
        cursor = self.conn.execute("SELECT * FROM contacts ORDER BY name")
        return cursor.fetchall()

    def delete(self, contact_id):
        self.conn.execute("DELETE FROM contacts WHERE id = ?", (contact_id,))
        self.conn.commit()

    def close(self):
        self.conn.close()

# Demo usage
book = ContactBook(":memory:")
book.add("Alice Smith", "555-0101", "alice@example.com")
book.add("Bob Jones", "555-0102", "bob@example.com")
book.add("Charlie Brown", "555-0103", "charlie@example.com")

print("\nAll contacts:")
for c in book.list_all():
    print(f"  {c['name']} - {c['phone']} - {c['email']}")

print("\nSearch 'ali':")
for c in book.search("ali"):
    print(f"  {c['name']} - {c['email']}")

book.close()

Output:

Added: Alice Smith
Added: Bob Jones
Added: Charlie Brown

All contacts:
  Alice Smith - 555-0101 - alice@example.com
  Bob Jones - 555-0102 - bob@example.com
  Charlie Brown - 555-0103 - charlie@example.com

Search 'ali':
  Alice Smith - alice@example.com

Using ":memory:" creates an in-memory database for testing. Replace it with a filename like "contacts.db" for persistent storage. The class encapsulates all database operations, making it easy to extend with update, export, or import features.

Frequently Asked Questions

Is SQLite thread-safe in Python?

By default, a SQLite connection should only be used in the thread that created it. Set check_same_thread=False if you need to share a connection across threads, but be aware that concurrent writes can cause issues. For multi-threaded applications, give each thread its own connection.

How much data can SQLite handle?

SQLite can handle databases up to 281 terabytes. In practice, it works well for databases up to a few gigabytes. Performance depends more on query patterns than raw size -- proper indexing makes a bigger difference than database engine choice for most workloads.

How do I back up a SQLite database?

The simplest backup is to copy the database file when no writes are happening. For live databases, use Python's conn.backup() method which creates a consistent snapshot even during active writes. Schedule backups with cron or the schedule library.
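
A minimal sketch of the backup() call follows. Both databases are in-memory here so the example leaves no files behind; in a real backup the destination would be a file path such as "backup.db" (a hypothetical name).

```python
import sqlite3

# Source database with some data in it
src = sqlite3.connect(":memory:")
src.execute("CREATE TABLE notes (body TEXT)")
src.execute("INSERT INTO notes (body) VALUES (?)", ("remember the milk",))
src.commit()

# Destination connection; backup() copies the whole database into it
dst = sqlite3.connect(":memory:")
src.backup(dst)

copied = dst.execute("SELECT body FROM notes").fetchall()
print(copied)  # [('remember the milk',)]

src.close()
dst.close()
```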

How do I change the schema of an existing table?

SQLite has limited ALTER TABLE support. You can add columns with ALTER TABLE books ADD COLUMN rating REAL, but renaming a column requires SQLite 3.25 or later and dropping one requires 3.35 or later. For complex changes, or on older versions, create a new table, copy the data over, drop the old table, and rename the new one.
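
The create-copy-drop-rename procedure can be sketched as follows. The legacy_code column and the books_new table name are made up for the example, and a real migration would also need to recreate any indexes and triggers on the old table.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE books (id INTEGER PRIMARY KEY, title TEXT, legacy_code TEXT)")
conn.execute("INSERT INTO books (title, legacy_code) VALUES (?, ?)", ("Fluent Python", "X1"))
conn.commit()

# 1. Create the table with the schema you want
conn.execute("CREATE TABLE books_new (id INTEGER PRIMARY KEY, title TEXT NOT NULL)")
# 2. Copy over the columns you are keeping
conn.execute("INSERT INTO books_new (id, title) SELECT id, title FROM books")
# 3. Drop the old table and 4. give the new one its name
conn.execute("DROP TABLE books")
conn.execute("ALTER TABLE books_new RENAME TO books")
conn.commit()

columns = [row[1] for row in conn.execute("PRAGMA table_info(books)")]
rows = conn.execute("SELECT id, title FROM books").fetchall()
print(columns)  # ['id', 'title']
print(rows)     # [(1, 'Fluent Python')]
conn.close()
```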

Should I use SQLite directly or through an ORM?

For simple scripts and small projects, the built-in sqlite3 module is perfect. For larger projects with complex queries and relationships, consider SQLAlchemy which provides an ORM layer. Start with raw SQL to understand the fundamentals, then add an ORM if the project grows.

Conclusion

SQLite with Python gives you a full relational database with zero configuration. We covered creating databases and tables, safe data insertion with parameterized queries, querying with filtering and aggregation, context managers for clean resource handling, and building a complete contact book application.

For the full sqlite3 module documentation, visit docs.python.org/3/library/sqlite3. For SQLite SQL syntax, see sqlite.org/lang.

Parameterized Queries (Never Concatenate)

SQL injection is the #1 security mistake in database code. Never build queries with f-strings or string concatenation. Use parameterized queries — sqlite3 substitutes safely:

import sqlite3

con = sqlite3.connect("app.db")
cur = con.cursor()

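# WRONG: injection vulnerability (left commented out so the snippet runs;
# execute() rejects this stacked statement, but a payload like ' OR '1'='1 would get through)
user_input = "alice'; DROP TABLE users; --"
# cur.execute(f"SELECT * FROM users WHERE name = '{user_input}'")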

# RIGHT — placeholder + tuple
cur.execute("SELECT * FROM users WHERE name = ?", (user_input,))
print(cur.fetchall())

# Named parameters work too
cur.execute(
    "INSERT INTO users (name, email, age) VALUES (:name, :email, :age)",
    {"name": "Bob", "email": "bob@x.com", "age": 30},
)
con.commit()

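The ? (qmark) style is what sqlite3 uses, but placeholder syntax is not uniform across DB-API drivers: psycopg2 and most MySQL drivers use %s instead, so check each driver's paramstyle. The named :name style also works in sqlite3 and reads better for queries with many parameters.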

Context Managers for Transactions

Forgetting commit() after writes is the #2 mistake. The connection object is a context manager that commits on a clean exit and rolls back on an exception:

import sqlite3

con = sqlite3.connect("app.db")

# Auto-commit on clean exit, auto-rollback on error
with con:
    con.execute("INSERT INTO users (name, age) VALUES (?, ?)", ("Carol", 25))
    con.execute("UPDATE accounts SET balance = balance - 100 WHERE user_id = 1")
    con.execute("UPDATE accounts SET balance = balance + 100 WHERE user_id = 2")
    # If any line raises, all three are rolled back atomically

# The with block does not close the connection, so close it explicitly
con.close()

This is the safest write pattern — either the whole transaction commits, or nothing changes. Use it for any multi-statement update.

Row Factory for Named Columns

By default, fetchall() returns tuples, which is fine for two columns and painful for ten. Set row_factory = sqlite3.Row to access columns by name as well as by index:

import sqlite3

con = sqlite3.connect("app.db")
con.row_factory = sqlite3.Row     # rows support access by index and by column name

cur = con.execute("SELECT id, name, email, created_at FROM users LIMIT 5")
for row in cur:
    # Access by column name OR index
    print(row["name"], row["email"])
    print(row[1], row[2])

    # Convert to dict if needed
    print(dict(row))

SQLite-Specific Features

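SQLite has a few convenient features worth knowing. The JSON functions and FTS5 are compile-time options, so availability depends on the SQLite build, though most modern builds include both:

# INSERT OR REPLACE: upsert in one statement (replaces the whole row, so unlisted columns reset to their defaults)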
cur.execute(
    "INSERT OR REPLACE INTO users (id, name) VALUES (?, ?)",
    (1, "Alice"),
)

# JSON support (built into modern SQLite)
cur.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, data TEXT)")
cur.execute(
    "INSERT INTO events (data) VALUES (json(?))",
    ('{"user_id": 42, "action": "login"}',),
)
cur.execute("SELECT json_extract(data, '$.user_id') FROM events WHERE id = 1")

# Full-text search via FTS5
cur.executescript('''
    CREATE VIRTUAL TABLE docs USING fts5(title, body);
    INSERT INTO docs VALUES ('Python', 'A programming language');
    INSERT INTO docs VALUES ('SQLite', 'An embedded database');
''')
cur.execute("SELECT title FROM docs WHERE docs MATCH ?", ("programming",))

Performance: Indexes and EXPLAIN QUERY PLAN

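SQLite is fast, but only with the right indexes. Columns you frequently filter, join, or sort on are good index candidates; each index adds a small cost to writes, so index what your queries actually use: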

# Create indexes
cur.execute("CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)")
cur.execute("CREATE INDEX IF NOT EXISTS idx_orders_user ON orders(user_id, created_at)")

# Diagnose slow queries
cur.execute("EXPLAIN QUERY PLAN SELECT * FROM users WHERE email = ?", ("a@b.com",))
print(cur.fetchall())
# Look for "SEARCH USING INDEX" — good
# Look for "SCAN" — slow, missing index

WAL Mode for Concurrent Reads

The default SQLite journaling mode (rollback) blocks readers during writes. Switch to WAL (Write-Ahead Logging) for better concurrency:

con = sqlite3.connect("app.db")
con.execute("PRAGMA journal_mode = WAL")
con.execute("PRAGMA synchronous = NORMAL")    # slightly less durable, much faster
con.execute("PRAGMA foreign_keys = ON")        # enforce FK constraints

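# journal_mode = WAL is stored in the database file and persists across connections;
# synchronous and foreign_keys are per-connection settings and must be set each time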

WAL mode lets readers and writers operate without blocking each other (within reasonable limits). This is the single biggest performance unlock for read-heavy SQLite workloads.
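
To see which of these settings stick, the sketch below sets them on one connection, then opens a second connection to the same file and reads them back. It uses a temporary directory and a hypothetical app.db file name; the foreign_keys value on the second connection reflects the build default, which is normally off.

```python
import os
import sqlite3
import tempfile

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "app.db")  # hypothetical file name

    con = sqlite3.connect(path)
    # The PRAGMA returns the journal mode that is now in effect
    mode = con.execute("PRAGMA journal_mode = WAL").fetchone()[0]
    con.execute("PRAGMA foreign_keys = ON")
    con.execute("CREATE TABLE t (x INTEGER)")
    con.commit()
    con.close()

    # A fresh connection: WAL is remembered by the file, foreign_keys is not
    con2 = sqlite3.connect(path)
    persisted_mode = con2.execute("PRAGMA journal_mode").fetchone()[0]
    fk_setting = con2.execute("PRAGMA foreign_keys").fetchone()[0]
    con2.close()

print(mode, persisted_mode)  # wal wal
print(fk_setting)            # the build default, normally 0
```

A small connection helper that applies the per-connection PRAGMAs every time is a common way to avoid forgetting them.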

Common Pitfalls

  • Forgetting commit() without context manager. If you don't use with con:, you must call con.commit() after writes — or they're lost when the connection closes.
  • Using one connection across threads. sqlite3 connections aren't thread-safe by default. Open one connection per thread, or use check_same_thread=False with a lock.
  • String concatenation in queries. SQL injection is a real risk even in single-user apps — never build queries with user input. Parameterize everything.
  • No foreign-key enforcement. SQLite doesn't enforce FK constraints by default. Set PRAGMA foreign_keys = ON on every connection.
  • Path issues. A relative path to connect() resolves against the cwd, not the script's directory. Use an absolute path or pathlib.Path(__file__).parent / "app.db".
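
The foreign-key pitfall is easy to demonstrate. This sketch uses two made-up tables in an in-memory database and switches enforcement off explicitly first, so the result does not depend on how the local SQLite library was built:

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE authors (id INTEGER PRIMARY KEY)")
con.execute(
    "CREATE TABLE books (id INTEGER PRIMARY KEY, author_id INTEGER REFERENCES authors(id))"
)

# Enforcement off (the usual default): a book pointing at a missing author is accepted
con.execute("PRAGMA foreign_keys = OFF")
con.execute("INSERT INTO books (author_id) VALUES (?)", (999,))
con.commit()  # the PRAGMA below has no effect inside an open transaction
orphans_allowed = con.execute("SELECT COUNT(*) FROM books").fetchone()[0]

# Enforcement on: the same insert is rejected
con.execute("PRAGMA foreign_keys = ON")
try:
    con.execute("INSERT INTO books (author_id) VALUES (?)", (999,))
    rejected = False
except sqlite3.IntegrityError:
    rejected = True

print(orphans_allowed)  # 1
print(rejected)         # True
con.close()
```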

FAQ

Q: SQLite or PostgreSQL?
A: SQLite for embedded, single-process, prototype, or low-write apps. Postgres for multi-process, concurrent writes, or anything needing real network access. SQLite is shockingly fast for read-heavy workloads.

Q: How big can a SQLite database get?
A: 281 TB theoretical maximum, but practically anything over 100 GB is pushing it. Multi-GB databases work fine; multi-TB is when you should evaluate PostgreSQL or DuckDB.

Q: How do I back up a SQLite database?
A: While the DB is in use, run VACUUM INTO 'backup.db' for a consistent snapshot. Or use the built-in backup API: src.backup(dst) in the sqlite3 module.

Q: Can multiple processes write to the same SQLite file?
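A: Yes. SQLite locks the file so that writes from different processes serialize, and WAL mode keeps readers from being blocked while that happens. For high write concurrency, switch to Postgres. For occasional concurrent writes (a few per second), SQLite is fine.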

Q: SQLAlchemy or raw sqlite3?
A: Raw sqlite3 for small scripts where SQL is the simplest expression of intent. SQLAlchemy when you have many tables, migrations, complex relationships, or want portability across databases.

Wrapping Up

SQLite is the most-deployed database in the world (every smartphone has hundreds of SQLite databases). For Python, it's built in: no install, no service to run, no network. Master parameterized queries, transactions via with con:, row factories for ergonomics, indexes for speed, and WAL mode for concurrency. Those five skills cover most day-to-day SQLite work and prevent the most common foot-guns.

How To Handle Anti-Scraping Measures with Python

Advanced

You have built a scraper that works perfectly on practice sites, but real-world websites fight back. Rate limiting, CAPTCHAs, IP blocking, and browser fingerprinting are all common defenses designed to stop automated access. Understanding these measures — and the ethical ways to work around them — is essential for any serious scraping project.

Python gives you several tools to handle anti-scraping defenses ethically: rotating User-Agent headers, managing request timing, using sessions to maintain cookies, and respecting robots.txt files. The key principle is to make your scraper behave like a polite human visitor rather than an aggressive bot.

In this tutorial, you will learn about common anti-scraping techniques websites use, how to add proper headers and User-Agents, manage request timing and rate limits, handle cookies and sessions, check and respect robots.txt, and deal with common blocking scenarios. By the end, you will know how to build scrapers that work reliably without abusing the websites you scrape.

Handling Blocks: Quick Example

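Many websites block requests whose User-Agent does not look like a browser. The requests library identifies itself as python-requests/<version> by default, so sending browser-like headers is the simplest fix for “403 Forbidden” responses.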

# quick_headers.py
import requests

url = "https://httpbin.org/headers"

# Default python-requests User-Agent (might get blocked on real sites)
response_bare = requests.get(url)
print("Status:", response_bare.status_code)

# With proper headers
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
    "Accept": "text/html,application/xhtml+xml",
    "Accept-Language": "en-US,en;q=0.9",
}
response = requests.get(url, headers=headers)
data = response.json()
print("Sent headers:", list(data["headers"].keys()))

Output:

Status: 200
Sent headers: ['Accept', 'Accept-Encoding', 'Accept-Language', 'Host', 'User-Agent', 'X-Amzn-Trace-Id']

The httpbin.org/headers endpoint echoes back the headers your request sent, which is perfect for verifying that your headers are being sent correctly.

Common Anti-Scraping Measures

Understanding what defenses websites use helps you build scrapers that handle them correctly. Here are the most common anti-scraping measures you will encounter.

Measure                    How It Works                                    Your Response
Missing User-Agent block   Rejects requests without browser-like headers   Add realistic headers
Rate limiting              Limits requests per time period per IP          Add delays between requests
IP blocking                Bans IPs making too many requests               Slow down, use proxies ethically
CAPTCHA                    Requires human verification                     Respect it; do not automate CAPTCHA solving
JavaScript rendering       Content loads only via JS                       Use Playwright or Selenium
robots.txt                 Declares which paths bots should avoid          Always respect it

CAPTCHAs, rate limits, fingerprinting. The web fights back.

Respecting robots.txt

Every ethical scraper should check robots.txt before scraping. Python’s built-in urllib.robotparser module handles this for you.

# check_robots.py
from urllib.robotparser import RobotFileParser

rp = RobotFileParser()
rp.set_url("https://quotes.toscrape.com/robots.txt")
rp.read()

# Check if we can scrape specific paths
paths = ["/", "/page/2/", "/login", "/admin/"]
for path in paths:
    allowed = rp.can_fetch("*", path)
    print(f"{path}: {'Allowed' if allowed else 'Blocked'}")

# Check crawl delay
delay = rp.crawl_delay("*")
print(f"Crawl delay: {delay or 'Not specified'}")

Output:

/: Allowed
/page/2/: Allowed
/login: Allowed
/admin/: Allowed
Crawl delay: Not specified

The can_fetch() method returns True if the specified user agent is allowed to access the path according to the site’s robots.txt rules. Always check this before scraping any new website.
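The run above reports every path as allowed, so the blocked branch never appears. The sketch below shows that case by feeding RobotFileParser a made-up robots.txt through parse() instead of fetching one. The rules, the example.com host, and the is_allowed helper are assumptions for illustration, not anything a real site publishes.

```python
# robots_offline.py
from urllib.robotparser import RobotFileParser

# Hypothetical rules for illustration only; a real site publishes its own.
SAMPLE_ROBOTS_TXT = """\
User-agent: *
Disallow: /admin/
Disallow: /login
Crawl-delay: 2
"""

rp = RobotFileParser()
rp.parse(SAMPLE_ROBOTS_TXT.splitlines())  # parse() accepts an iterable of lines


def is_allowed(path, user_agent="*"):
    """Return True if the sample rules let user_agent fetch the given path."""
    return rp.can_fetch(user_agent, f"https://example.com{path}")


for path in ["/", "/page/2/", "/login", "/admin/users"]:
    print(f"{path}: {'Allowed' if is_allowed(path) else 'Blocked'}")

# crawl_delay() returns the Crawl-delay value for the agent, or None if absent
print(f"Crawl delay: {rp.crawl_delay('*')}")
```

Parsing from a string like this is also a convenient way to unit-test scraper logic without touching the network.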

Managing Request Timing

The most common reason scrapers get blocked is making too many requests too quickly. Smart timing makes your scraper both polite and resilient.

# rate_limiting.py
import requests
import time
import random

def polite_get(url, session, min_delay=1.0, max_delay=3.0):
    """Make a request with random delay to avoid detection."""
    delay = random.uniform(min_delay, max_delay)
    time.sleep(delay)
    response = session.get(url, timeout=10)  # fail fast instead of hanging on a dead server
    print(f"[{response.status_code}] {url} (waited {delay:.1f}s)")
    return response

session = requests.Session()
session.headers.update({
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
})

urls = [
    "https://httpbin.org/get",
    "https://httpbin.org/headers",
    "https://httpbin.org/ip",
]

for url in urls:
    response = polite_get(url, session)

Output:

[200] https://httpbin.org/get (waited 1.7s)
[200] https://httpbin.org/headers (waited 2.3s)
[200] https://httpbin.org/ip (waited 1.2s)

Random delays between 1-3 seconds mimic human browsing patterns. Using requests.Session() maintains cookies across requests, which also makes your scraper behave more like a real browser.
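polite_get() always sleeps for the full random delay, even when parsing the previous page already took a while. One alternative is to enforce a minimum gap between requests and sleep only for whatever part of that gap is left. The sketch below assumes a hypothetical Throttle class (not part of requests); the clock and sleep parameters exist only so the behavior can be checked without real waiting.

```python
# min_interval.py
import time


class Throttle:
    """Enforce a minimum gap between consecutive calls to wait()."""

    def __init__(self, min_interval, clock=time.monotonic, sleep=time.sleep):
        self.min_interval = min_interval
        self._clock = clock        # injectable so tests can use a fake clock
        self._sleep = sleep        # injectable so tests do not really sleep
        self._last_call = None     # time of the previous wait(), if any

    def wait(self):
        """Sleep just long enough to keep calls min_interval seconds apart.

        Returns the number of seconds actually slept.
        """
        slept = 0.0
        if self._last_call is not None:
            elapsed = self._clock() - self._last_call
            remaining = self.min_interval - elapsed
            if remaining > 0:
                self._sleep(remaining)
                slept = remaining
        self._last_call = self._clock()
        return slept
```

In a scraping loop this would be used as throttle = Throttle(2.0), then throttle.wait() immediately before each session.get(url).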

Browser fingerprints: 32 dimensions of who-you-are.

Implementing Retry Logic

Network errors and temporary blocks happen. Retry logic with exponential backoff handles these gracefully without hammering the server.

# retry_logic.py
import requests
import time
from requests.exceptions import RequestException

def fetch_with_retry(url, max_retries=3, backoff_factor=2):
    """Fetch URL with exponential backoff retry."""
    session = requests.Session()
    session.headers.update({
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
    })

    for attempt in range(max_retries):
        try:
            response = session.get(url, timeout=10)
            if response.status_code == 200:
                return response
            elif response.status_code == 429:
                wait = backoff_factor ** attempt
                print(f"Rate limited. Waiting {wait}s before retry...")
                time.sleep(wait)
            else:
                print(f"Got status {response.status_code}")
                return response
        except RequestException as e:
            wait = backoff_factor ** attempt
            print(f"Error: {e}. Retrying in {wait}s...")
            time.sleep(wait)

    print("All retries exhausted")
    return None

result = fetch_with_retry("https://httpbin.org/get")
if result:
    print(f"Success: {result.status_code}")

Output:

Success: 200

Exponential backoff doubles the wait time with each retry (1s, 2s, 4s). This prevents your scraper from making the situation worse when a server is already under load. The timeout=10 parameter prevents hanging on unresponsive servers.
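To make the schedule concrete, here is a small sketch that only computes the wait times, without sending any requests. The backoff_delays name, the cap, and the jitter option are assumptions added for illustration; fetch_with_retry above uses neither a cap nor jitter.

```python
# backoff_schedule.py
import random


def backoff_delays(max_retries, base=1.0, factor=2.0, cap=30.0, jitter=0.0, rng=random):
    """Yield one wait time per retry: base * factor**attempt, limited to cap.

    If jitter is non-zero, up to jitter * delay extra seconds are added so
    that many clients do not all retry at the same instant.
    """
    for attempt in range(max_retries):
        delay = min(cap, base * factor ** attempt)
        if jitter:
            delay += rng.uniform(0, jitter * delay)
        yield delay


print(list(backoff_delays(3)))   # → [1.0, 2.0, 4.0], the schedule described above
print(list(backoff_delays(8)))   # → [1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0, 30.0]
```

The cap matters for long-running jobs: without it, the tenth retry would wait over eight minutes.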

Session and Cookie Management

Using sessions properly is critical. Sessions maintain cookies across requests, which many websites require for normal browsing. Without proper session management, websites may treat each request as a new visitor and trigger anti-bot measures.

# session_management.py
import requests

session = requests.Session()
session.headers.update({
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9",
    "Accept-Language": "en-US,en;q=0.9",
})

# First request sets cookies
response = session.get("https://httpbin.org/cookies/set/session_id/abc123")
print(f"Cookies after first request: {dict(session.cookies)}")

# Subsequent requests send cookies automatically
response = session.get("https://httpbin.org/cookies")
print(f"Server sees cookies: {response.json()['cookies']}")

Output:

Cookies after first request: {'session_id': 'abc123'}
Server sees cookies: {'session_id': 'abc123'}

Always use requests.Session() instead of bare requests.get() for multi-page scraping. It handles cookies, connection pooling, and header persistence automatically.

Respect robots.txt. Rotate user agents. Throttle. Be a good citizen.

Real-Life Example: Building a Resilient Scraper

# resilient_scraper.py
import requests
import time
import random
from urllib.robotparser import RobotFileParser
from bs4 import BeautifulSoup

class ResilientScraper:
    def __init__(self, base_url, min_delay=1.0, max_delay=3.0):
        self.base_url = base_url
        self.min_delay = min_delay
        self.max_delay = max_delay
        self.session = requests.Session()
        self.session.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
            "Accept": "text/html,application/xhtml+xml",
            "Accept-Language": "en-US,en;q=0.9",
        })
        self.robot_parser = RobotFileParser()
        self.robot_parser.set_url(f"{base_url}/robots.txt")
        self.robot_parser.read()

    def can_scrape(self, path):
        return self.robot_parser.can_fetch("*", path)

    def get_page(self, path, max_retries=3):
        if not self.can_scrape(path):
            print(f"Blocked by robots.txt: {path}")
            return None
        url = f"{self.base_url}{path}"
        for attempt in range(max_retries):
            delay = random.uniform(self.min_delay, self.max_delay)
            time.sleep(delay)
            try:
                response = self.session.get(url, timeout=10)
                if response.status_code == 200:
                    return BeautifulSoup(response.text, "html.parser")
                elif response.status_code == 429:
                    wait = 2 ** attempt
                    print(f"Rate limited on {path}. Waiting {wait}s...")
                    time.sleep(wait)
            except Exception as e:
                print(f"Error on {path}: {e}")
        return None

scraper = ResilientScraper("https://quotes.toscrape.com")
all_quotes = []
for page_num in range(1, 4):
    soup = scraper.get_page(f"/page/{page_num}/")
    if soup:
        quotes = soup.find_all("div", class_="quote")
        for q in quotes:
            text = q.find("span", class_="text")
            author = q.find("small", class_="author")
            if text and author:
                all_quotes.append({"text": text.text, "author": author.text})
        print(f"Page {page_num}: {len(quotes)} quotes")

print(f"Total scraped: {len(all_quotes)} quotes")

Output:

Page 1: 10 quotes
Page 2: 10 quotes
Page 3: 10 quotes
Total scraped: 30 quotes

This ResilientScraper class combines every technique from this tutorial: robots.txt checking, session management, random delays, retry logic with exponential backoff, and defensive parsing.

Frequently Asked Questions

Is it ethical to bypass anti-scraping measures?

Adding headers and managing timing are standard practices that mimic normal browser behavior. However, bypassing CAPTCHAs, breaking authentication, or ignoring robots.txt crosses ethical lines. Always respect the website’s terms of service and only scrape publicly available data.

Should I use proxy rotation?

Proxy rotation can help distribute requests across multiple IPs, but it should not be used to circumvent explicit blocking. If a website is actively blocking you, that is a signal to stop rather than escalate. Proxies are more appropriate for geographic content access.

What does HTTP 429 mean?

HTTP 429 means “Too Many Requests.” The server is rate limiting you. The correct response is to slow down with exponential backoff, not to try harder. Check the Retry-After header if present — it tells you exactly how long to wait.
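Retry-After can carry either a number of seconds or an HTTP date, so it needs a little parsing before it can be passed to time.sleep(). The sketch below is one way to do that with the standard library; retry_after_seconds and its default parameter are hypothetical names, not part of requests.

```python
# retry_after.py
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime


def retry_after_seconds(header_value, now=None, default=1.0):
    """Convert a Retry-After header value into a number of seconds to wait.

    Falls back to `default` when the header is missing or unparseable.
    """
    if not header_value:
        return default
    value = header_value.strip()
    if value.isdigit():
        return float(value)                  # delay-seconds form, e.g. "120"
    try:
        when = parsedate_to_datetime(value)  # HTTP-date form
    except (TypeError, ValueError):
        return default
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return max(0.0, (when - now).total_seconds())  # never return a negative wait
```

Inside a retry loop, the 429 branch could then use retry_after_seconds(response.headers.get("Retry-After"), default=backoff_factor ** attempt), so the exponential schedule is used only when the server gives no hint.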

Do I need to rotate User-Agent strings?

For most scraping projects, a single realistic User-Agent string is sufficient. Rotating User-Agents is only necessary for high-volume scraping where a single agent string might get flagged. The more important factor is request timing.

Is web scraping legal?

Web scraping laws vary by jurisdiction. In the US, scraping publicly available data is generally legal, but violating a website’s terms of service could have legal consequences. In the EU, GDPR adds restrictions around personal data. When in doubt, consult a legal professional.

Conclusion

Handling anti-scraping measures responsibly is about being a good citizen of the web. We covered checking robots.txt, adding proper headers, managing request timing, implementing retry logic, session management, and building a resilient scraper class that combines all these techniques.

The ResilientScraper class gives you a production-ready foundation. Extend it with logging, database storage, or email alerts for long-running scraping projects.

How To Use Playwright for Web Scraping in Python

Intermediate

Some websites load their content dynamically with JavaScript, which means a simple HTTP request with requests only gets you an empty shell. Playwright solves this by controlling a real browser — Chromium, Firefox, or WebKit — letting you scrape pages that rely on JavaScript rendering, single-page applications, and content loaded behind user interactions.

Microsoft’s Playwright library for Python provides a clean async and sync API for browser automation. It installs its own browser binaries, handles waits automatically, and runs headless by default. Combined with BeautifulSoup for HTML parsing, it gives you the power to scrape virtually any website.

In this tutorial, you will learn how to install Playwright, launch a browser, navigate to pages, wait for dynamic content, extract data from JavaScript-rendered pages, handle clicks and scrolling, and build a complete scraper for dynamic websites.

Scraping a Dynamic Page: Quick Example

Here is a minimal example that scrapes quotes from a JavaScript-rendered page that returns empty HTML to regular HTTP requests.

# quick_playwright.py
from playwright.sync_api import sync_playwright

with sync_playwright() as p:
    browser = p.chromium.launch(headless=True)
    page = browser.new_page()
    page.goto("https://quotes.toscrape.com/js/")
    page.wait_for_selector("div.quote")

    quotes = page.query_selector_all("div.quote span.text")
    for quote in quotes[:5]:
        print(quote.text_content())
    browser.close()

Output:

"The world as we have created it is a process of our thinking. It cannot be changed without changing our thinking."
"It is our choices, Harry, that show what we truly are, far more than our abilities."
"There are only two ways to live your life. One is as though nothing is a miracle. The other is as though everything is a miracle."
"The person, be it gentleman or lady, who has not pleasure in a good novel, must be intolerably stupid."
"Imperfection is beauty, madness is genius and it's better to be absolutely ridiculous than absolutely boring."

The key difference from BeautifulSoup is that Playwright actually executes the page’s JavaScript before you extract data. The wait_for_selector() call ensures the dynamic content has loaded before scraping.

What Is Playwright and When Do You Need It?

Playwright is a browser automation library that controls real browsers programmatically. Unlike requests + BeautifulSoup, which only work with static HTML, Playwright can handle any page a human can see in a browser, including single-page applications built with React, Vue, or Angular.

Feature | requests + BeautifulSoup | Playwright
JavaScript rendering | No | Yes
Speed | Very fast | Slower (browser overhead)
Memory usage | Low | Higher
Login/cookies | Manual | Automatic
Click/scroll | No | Yes
Best for | Static HTML pages | Dynamic/JS-heavy pages

Use Playwright when the page you need to scrape loads content with JavaScript. If the page works with JavaScript disabled, stick with requests and BeautifulSoup — it will be much faster.
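A quick way to apply this rule is to look at the raw HTML the server sends and check whether the elements you want are already in it. The sketch below does that with the standard library's HTMLParser; the ClassCounter class, the count_in_raw_html helper, and both sample documents are made up for illustration.

```python
# needs_browser_check.py
from html.parser import HTMLParser


class ClassCounter(HTMLParser):
    """Count start tags of a given name that carry a given CSS class."""

    def __init__(self, target_tag, target_class):
        super().__init__()
        self.target_tag = target_tag
        self.target_class = target_class
        self.count = 0

    def handle_starttag(self, tag, attrs):
        # attrs is a list of (name, value) pairs; value can be None
        classes = (dict(attrs).get("class") or "").split()
        if tag == self.target_tag and self.target_class in classes:
            self.count += 1


def count_in_raw_html(html, tag, class_name):
    """Return how many matching elements exist before any JavaScript runs."""
    counter = ClassCounter(tag, class_name)
    counter.feed(html)
    return counter.count


# Two made-up responses: one server-rendered, one filled in later by a script.
STATIC_HTML = '<div class="quote"><span class="text">Hello</span></div>'
JS_SHELL_HTML = '<div id="app"></div><script>var data = [{"text": "Hello"}];</script>'

for name, html in [("static", STATIC_HTML), ("js shell", JS_SHELL_HTML)]:
    found = count_in_raw_html(html, "div", "quote")
    print(f"{name}: {found} quote element(s) in raw HTML")
```

In practice the html argument would be the text of a plain HTTP response. A count of zero for content you can see in the browser suggests the page is rendered by JavaScript and needs Playwright.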

Playwright: Selenium's modern younger sibling. Faster, better.

Installing Playwright for Python

Playwright requires two installation steps: the Python package and the browser binaries.

# install_playwright.sh
pip install playwright
playwright install chromium

Output:

Successfully installed playwright-1.42.0
Downloading Chromium 123.0.6312.4 - 140.2 Mb
Chromium downloaded to /home/user/.cache/ms-playwright/

The playwright install chromium command downloads a specific Chromium build. You can also install firefox or webkit if you need cross-browser testing.

Sync vs Async API

Playwright offers both synchronous and asynchronous APIs. The sync API is simpler for scripts and scraping. The async API is better for high-concurrency applications.

# sync_example.py
from playwright.sync_api import sync_playwright

with sync_playwright() as p:
    browser = p.chromium.launch()
    page = browser.new_page()
    page.goto("https://quotes.toscrape.com/js/")
    title = page.title()
    print(f"Page title: {title}")
    browser.close()

Output:

Page title: Quotes to Scrape

# async_example.py
import asyncio
from playwright.async_api import async_playwright

async def main():
    async with async_playwright() as p:
        browser = await p.chromium.launch()
        page = await browser.new_page()
        await page.goto("https://quotes.toscrape.com/js/")
        title = await page.title()
        print(f"Page title: {title}")
        await browser.close()

asyncio.run(main())

Output:

Page title: Quotes to Scrape

For most scraping projects, the sync API is perfectly fine. Use async only when you need to scrape multiple pages concurrently.

Headless Chrome, Firefox, WebKit. One API, three browsers.

Waiting for Dynamic Content

The most common mistake with browser-based scraping is trying to extract data before it has loaded. Playwright provides several waiting strategies to handle this.

# waiting_strategies.py
from playwright.sync_api import sync_playwright

with sync_playwright() as p:
    browser = p.chromium.launch()
    page = browser.new_page()
    page.goto("https://quotes.toscrape.com/js/")

    # Wait for a specific element to appear
    page.wait_for_selector("div.quote", timeout=10000)

    # Wait for network to be idle (all AJAX calls done)
    page.wait_for_load_state("networkidle")

    quotes = page.query_selector_all("div.quote")
    print(f"Found {len(quotes)} quotes after waiting")
    browser.close()

Output:

Found 10 quotes after waiting

The wait_for_selector() method pauses execution until the element appears in the DOM. The timeout parameter (in milliseconds) prevents infinite waiting if the element never appears.

Extracting Data from the Page

Playwright gives you two approaches for extracting data: using Playwright’s built-in selectors, or passing the rendered HTML to BeautifulSoup.

# extract_with_playwright.py
from playwright.sync_api import sync_playwright
from bs4 import BeautifulSoup

with sync_playwright() as p:
    browser = p.chromium.launch()
    page = browser.new_page()
    page.goto("https://quotes.toscrape.com/js/")
    page.wait_for_selector("div.quote")

    # Method 1: Playwright selectors
    first_quote = page.query_selector("span.text")
    print("Playwright:", first_quote.text_content()[:60] + "...")

    # Method 2: Pass HTML to BeautifulSoup
    html = page.content()
    soup = BeautifulSoup(html, "html.parser")
    bs_quote = soup.select_one("span.text")
    print("BeautifulSoup:", bs_quote.text[:60] + "...")

    browser.close()

Output:

Playwright: "The world as we have created it is a process of our thinkin...
BeautifulSoup: "The world as we have created it is a process of our thinkin...

The BeautifulSoup approach is often better for complex parsing because BeautifulSoup has richer navigation methods. Use page.content() to get the fully rendered HTML and parse it with BeautifulSoup.

Auto-waiting selectors: no more sleep(5) hacks.

Real-Life Example: Scraping a JavaScript-Rendered Quote Page

# js_quote_scraper.py
from playwright.sync_api import sync_playwright
from bs4 import BeautifulSoup
import json

def scrape_js_quotes():
    all_quotes = []
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        page = browser.new_page()

        for page_num in range(1, 11):
            url = f"https://quotes.toscrape.com/js/page/{page_num}/"
            page.goto(url)
            page.wait_for_selector("div.quote", timeout=5000)

            html = page.content()
            soup = BeautifulSoup(html, "html.parser")

            for quote_div in soup.find_all("div", class_="quote"):
                text = quote_div.find("span", class_="text")
                author = quote_div.find("small", class_="author")
                tags_div = quote_div.find("div", class_="tags")
                tags = [t.text for t in tags_div.find_all("a")] if tags_div else []

                all_quotes.append({
                    "text": text.text if text else "Unknown",
                    "author": author.text if author else "Unknown",
                    "tags": tags
                })
            print(f"Page {page_num}: {len(soup.find_all('div', class_='quote'))} quotes")
        browser.close()
    return all_quotes

quotes = scrape_js_quotes()
print(f"Total: {len(quotes)} quotes from JS-rendered pages")
with open("js_quotes.json", "w", encoding="utf-8") as f:
    json.dump(quotes, f, indent=2, ensure_ascii=False)
print("Saved to js_quotes.json")

Output:

Page 1: 10 quotes
Page 2: 10 quotes
...
Page 10: 10 quotes
Total: 100 quotes from JS-rendered pages
Saved to js_quotes.json

This scraper combines Playwright for rendering with BeautifulSoup for parsing. Reusing the same browser instance across pages is significantly faster than launching a new browser for each page.

Frequently Asked Questions

What does headless mode mean?

Headless mode means the browser runs without a visible window. This is the default and preferred mode for scraping because it uses less memory and runs faster. Set headless=False during development to see what the browser is doing.

How does Playwright compare to Selenium?

Playwright is newer and generally faster than Selenium. It has better auto-waiting, built-in support for multiple browser contexts, and a cleaner API. Selenium has a larger community and longer track record. For new scraping projects, Playwright is the recommended choice.

Can websites detect Playwright?

Yes, some websites use bot detection that can identify automated browsers. Playwright itself has no stealth mode; third-party plugins such as playwright-stealth mask some automation signals, but sophisticated anti-bot systems can still detect automation. Always respect website terms of service and consider whether scraping a particular site is appropriate.

How can I make Playwright scraping faster?

Disable images and CSS loading with route interception to speed up page loads. Reuse browser instances across multiple pages. Use networkidle wait state only when necessary. For multiple pages, use async mode with concurrent page objects.
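A minimal sketch of the route-interception idea follows. The handler checks each request's resource type and aborts the heavy ones. The BLOCKED_RESOURCE_TYPES set and both function names are choices made for this example; which types are safe to block depends on the site, since some pages need their stylesheets before content becomes visible.

```python
# block_resources.py
BLOCKED_RESOURCE_TYPES = {"image", "stylesheet", "font", "media"}


def block_heavy_resources(route):
    """Route handler: abort requests for heavy assets, let everything else through."""
    if route.request.resource_type in BLOCKED_RESOURCE_TYPES:
        route.abort()
    else:
        route.continue_()


def scrape_quotes_fast(url="https://quotes.toscrape.com/js/"):
    """Load a page with heavy assets blocked and return the number of quotes found."""
    # Imported here so the handler above can be read and tested without Playwright installed.
    from playwright.sync_api import sync_playwright

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        page = browser.new_page()
        page.route("**/*", block_heavy_resources)  # intercept every request
        page.goto(url)
        page.wait_for_selector("div.quote")
        count = len(page.query_selector_all("div.quote"))
        browser.close()
    return count
```

Calling scrape_quotes_fast() needs Playwright and its Chromium build installed; the handler on its own is plain Python and can be exercised with a stand-in route object.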

Does Playwright use a lot of memory?

Yes, each browser instance uses 50-200MB of RAM. For large scraping jobs, process pages sequentially and close browser contexts when done. Monitor memory usage and restart the browser periodically for long-running scrapers.

Conclusion

Playwright opens up scraping possibilities that static HTTP libraries cannot touch. We covered installation, sync and async APIs, waiting for dynamic content, extracting data with both Playwright selectors and BeautifulSoup, and building a complete scraper for JavaScript-rendered pages.

For the full Playwright Python documentation, visit playwright.dev/python.

Playwright Setup

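Playwright downloads and manages its own browser builds, so there is no separate driver to match to your browser version as with Selenium. Installation takes two commands: pip installs the Python package, and playwright install downloads Chromium, Firefox, and WebKit: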

# pip install playwright
# python -m playwright install   (downloads browser binaries)

from playwright.sync_api import sync_playwright

with sync_playwright() as p:
    browser = p.chromium.launch(headless=True)
    page = browser.new_page()
    page.goto("https://example.com")
    print(page.title())
    browser.close()

The sync_playwright() context manager handles the entire browser-process lifecycle. Use browser.close() in production code to free resources promptly.

Async API for Concurrent Scraping

Playwright’s killer feature for scrapers: native async. Crawl 100 URLs concurrently without spinning up 100 browser processes:

import asyncio
from playwright.async_api import async_playwright

async def fetch_title(url, page):
    await page.goto(url, wait_until="networkidle")
    return url, await page.title()

async def main(urls):
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        context = await browser.new_context()
        # One page per concurrent task
        results = await asyncio.gather(*[
            fetch_title(url, await context.new_page())
            for url in urls
        ])
        await browser.close()
        for url, title in results:
            print(url, "->", title)

asyncio.run(main([
    "https://example.com",
    "https://python.org",
    "https://github.com",
]))

The shared context reuses cookies, local storage, and authentication state across all tabs — efficient when scraping a logged-in site.
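The example above opens one tab per URL all at once, which is fine for three URLs but heavy for a hundred. One common way to bound this is an asyncio.Semaphore. The sketch below shows the pattern with a stand-in fake_fetch coroutine instead of a real page, so it runs without a browser; gather_limited, demo, and the limit of 3 are illustrative assumptions.

```python
# bounded_gather.py
import asyncio


async def gather_limited(items, worker, limit=5):
    """Run worker(item) for every item, with at most `limit` running at once."""
    semaphore = asyncio.Semaphore(limit)

    async def run_one(item):
        async with semaphore:          # blocks while `limit` workers are active
            return await worker(item)

    # gather() returns results in the same order as the inputs
    return await asyncio.gather(*(run_one(item) for item in items))


async def demo():
    running = 0
    peak = 0

    async def fake_fetch(url):
        # Stand-in for "open a page, goto(url), read the title, close the page"
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)
        running -= 1
        return url.upper()

    urls = [f"https://example.com/{i}" for i in range(10)]
    results = await gather_limited(urls, fake_fetch, limit=3)
    return results, peak


results, peak = asyncio.run(demo())
print(f"{len(results)} results, peak concurrency {peak}")
```

With Playwright, the worker would create its page inside the semaphore block (await context.new_page()), navigate, extract, and close the page before returning, so no more than limit tabs exist at any moment.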

Auto-Waiting Selectors

Playwright’s selectors auto-wait for the target to be ready before acting. No more WebDriverWait + EC.presence_of_element_located:

page.goto("https://shop.example.com")

# Click: waits for the element to be visible and enabled
page.click("button#add-to-cart")

# Fill: waits for the input to be visible, enabled, and editable
page.fill("input[name='email']", "user@example.com")

# Select option: waits for the dropdown to be visible and enabled
page.select_option("select#country", "US")

# Wait for navigation after click
with page.expect_navigation():
    page.click("a.checkout")

# Wait for a specific selector to appear
page.wait_for_selector("text=Order confirmed", timeout=10000)

The implicit waits are why Playwright tests are dramatically less flaky than Selenium tests. No time.sleep(2) hacks.

Locator API and Best Practices

For complex pages, use the locator() API instead of raw selectors. Locators are lazy and re-resolve on each action — perfect for SPAs that re-render constantly:

# Reusable locator
cart_button = page.locator("button#add-to-cart")
cart_button.click()
cart_button.wait_for(state="hidden")  # waits until the button is invisible or detached from the DOM

# Chained locators (parent -> child)
form = page.locator("form#checkout")
form.locator("input[name='email']").fill("alice@example.com")
form.locator("input[name='zip']").fill("90210")
form.locator("button[type='submit']").click()

# Text-based selector — robust against class-name changes
page.get_by_text("Sign in").click()
page.get_by_role("button", name="Submit").click()
page.get_by_label("Email address").fill("alice@example.com")

The get_by_role and get_by_label helpers are accessibility-first — they target the same semantic elements that screen readers find, making them resilient to CSS / class changes.

Network Interception

Playwright lets you intercept and modify network requests — useful for blocking ads, mocking APIs, or extracting JSON responses:

# Block all images (faster scraping)
page.route("**/*.{png,jpg,jpeg,gif,svg}", lambda route: route.abort())

# Mock an API response
def handle(route):
    route.fulfill(json={"status": "ok", "data": [1, 2, 3]})

page.route("**/api/users", handle)

# Log the URL and status of every response whose URL contains "api"
page.on("response", lambda resp: print(resp.url, resp.status) if "api" in resp.url else None)

Common Pitfalls

  • Forgetting to install browsers. pip install playwright doesn’t bring the browser binaries. Run python -m playwright install once after install.
  • Using time.sleep instead of wait_for. Sleeps make tests flaky and slow. page.wait_for_selector, page.wait_for_load_state, and locator’s implicit waits handle every case.
  • One context per page when you should share. Each context costs memory. For 100 pages of the same site, use one context and 100 pages.
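  • Mishandling page.on("dialog"). With no handler registered, Playwright dismisses JavaScript alert() / confirm() dialogs automatically. Once you register a handler, it must call dialog.accept() or dialog.dismiss(); otherwise the action that opened the dialog stalls.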
  • Headless != real. Some sites detect headless Chrome by missing browser features. Use headless=False for debugging, then test in headless mode separately.

FAQ

Q: Playwright or Selenium?
A: Playwright for new projects — faster, less flaky, better API. Selenium when you need broader browser support (Edge, IE legacy) or you have existing Selenium tests.

Q: Sync or async API?
A: Async for scrapers that hit many URLs concurrently. Sync for test suites that already use synchronous frameworks (pytest, unittest). The APIs are nearly identical; switching later is easy.

Q: Does Playwright handle bot protection?
A: Better than Selenium, but not perfectly. For Cloudflare / DataDome, use the playwright-stealth plugin and rotate proxies. Some sites you simply can’t scrape reliably without residential IPs.

Q: How do I take a screenshot for debugging?
A: page.screenshot(path="screenshot.png", full_page=True). For video recording, set context = browser.new_context(record_video_dir="videos/").

Q: Can I reuse browser state across runs?
A: Yes — save context state with context.storage_state(path="state.json") and load with browser.new_context(storage_state="state.json"). Cookies and local storage persist between sessions.

Wrapping Up

Playwright fixes nearly every Selenium pain point — driver dance, flaky waits, slow startup, awkward async. For new web-scraping or browser-automation projects in Python, it’s the right default. The locator API + auto-waiting + native async covers 95% of use cases. The remaining 5% (Cloudflare bypass, mobile emulation, headless detection) needs specialty tools regardless of framework.

How To Scrape Websites with Python and BeautifulSoup

Intermediate

You need data from a website, but there is no API available. Maybe you want to track product prices, collect research data, or aggregate job listings from multiple sources. Web scraping with Python and BeautifulSoup lets you extract structured data from HTML pages quickly and reliably — and in 2026, the fundamentals remain as relevant as ever.

Python’s requests library handles HTTP connections while BeautifulSoup (from the bs4 package) parses the HTML and lets you navigate the document tree with simple, readable methods. Both are pure Python, install in seconds with pip, and work on every platform.

In this tutorial, you will learn how to fetch web pages, parse HTML with BeautifulSoup, extract text and attributes, work with tables, handle multiple pages, and build a complete scraping project. By the end, you will have a reusable scraping toolkit you can adapt to any static website.

Scraping a Web Page: Quick Example

Let us start with a minimal working example that fetches quotes from a practice website and prints them out.

# quick_scrape.py
import requests
from bs4 import BeautifulSoup

url = "https://quotes.toscrape.com"
response = requests.get(url)
soup = BeautifulSoup(response.text, "html.parser")

quotes = soup.find_all("span", class_="text")
for quote in quotes[:5]:
    print(quote.text)

Output:

"The world as we have created it is a process of our thinking. It cannot be changed without changing our thinking."
"It is our choices, Harry, that show what we truly are, far more than our abilities."
"There are only two ways to live your life. One is as though nothing is a miracle. The other is as though everything is a miracle."
"The person, be it gentleman or lady, who has not pleasure in a good novel, must be intolerably stupid."
"Imperfection is beauty, madness is genius and it's better to be absolutely ridiculous than absolutely boring."

Three lines do the heavy lifting: requests.get() fetches the page, BeautifulSoup() parses the HTML, and find_all() extracts matching elements. The rest of this tutorial builds on these patterns.

What Is BeautifulSoup and Why Use It?

BeautifulSoup is a Python library that parses HTML and XML documents into a tree structure you can search and navigate with Python code. Think of it like a smart search tool for web pages — instead of working with raw text strings, you work with structured elements that know about their tags, attributes, parents, and children.

The library handles malformed HTML gracefully, which matters because real-world web pages are often messy. Missing closing tags, inconsistent indentation, and mixed encoding are all things BeautifulSoup handles without crashing.

Feature | BeautifulSoup | Regular Expressions | lxml
Learning curve | Low | High | Medium
Handles broken HTML | Yes | No | Partially
Speed | Moderate | Fast | Very fast
CSS selectors | Yes | No | Yes
Best for | Most scraping tasks | Simple patterns | Large documents

For most web scraping projects, BeautifulSoup with the html.parser backend gives you the best balance of simplicity and reliability.

Finding Elements: find() and find_all()

The two methods you will use most are find() (returns the first match) and find_all() (returns all matches). Both accept tag names, CSS classes, IDs, and attribute dictionaries.

# finding_elements.py
import requests
from bs4 import BeautifulSoup

url = "https://quotes.toscrape.com"
response = requests.get(url)
soup = BeautifulSoup(response.text, "html.parser")

first_quote = soup.find("span", class_="text")
print("First quote:", first_quote.text[:60] + "...")

authors = soup.find_all("small", class_="author")
print("Authors on page:", [a.text for a in authors[:5]])

tags_div = soup.find("div", attrs={"class": "tags"})
if tags_div:
    tag_links = tags_div.find_all("a", class_="tag")
    print("Tags:", [t.text for t in tag_links])

Output:

First quote: "The world as we have created it is a process of our thinkin...
Authors on page: ['Albert Einstein', 'J.K. Rowling', 'Albert Einstein', 'Jane Austen', 'Marilyn Monroe']
Tags: ['change', 'deep-thoughts', 'thinking', 'world']

Notice the defensive pattern with if tags_div: before calling find_all(). Real websites are messy — elements might be missing, classes might change, or content might be empty. Defensive parsing separates a scraper that crashes on page 3 from one that runs reliably across thousands of pages.

Using CSS Selectors with select()

If you already know CSS, the select() method lets you use CSS selector syntax to find elements. This is often more concise than chaining find() calls.

# css_selectors.py
import requests
from bs4 import BeautifulSoup

url = "https://quotes.toscrape.com"
response = requests.get(url)
soup = BeautifulSoup(response.text, "html.parser")

quotes = soup.select("div.quote span.text")
print(f"Found {len(quotes)} quotes using CSS selector")

author_links = soup.select("div.quote span a")
print("Author pages:", [a["href"] for a in author_links[:3]])

first_tag = soup.select_one("a.tag")
print("First tag:", first_tag.text if first_tag else "None found")

Output:

Found 10 quotes using CSS selector
Author pages: ['/author/Albert-Einstein', '/author/J-K-Rowling', '/author/Albert-Einstein']
First tag: change

The select() method returns a list while select_one() returns the first match or None. CSS selectors are particularly useful when elements are deeply nested.

Extracting Text, Attributes, and Links

Once you have found an element, you need to extract useful data from it. BeautifulSoup gives you .text for visible text content, .get() for attributes, and .attrs for the full attribute dictionary.

# extracting_data.py
import requests
from bs4 import BeautifulSoup

url = "https://quotes.toscrape.com"
response = requests.get(url)
soup = BeautifulSoup(response.text, "html.parser")

quote_div = soup.find("div", class_="quote")
if quote_div:
    quote_text = quote_div.find("span", class_="text")
    print("Quote:", quote_text.text if quote_text else "Unknown")

    author_link = quote_div.find("a")
    href = author_link.get("href", "#") if author_link else "#"
    print("Author page:", href)

    tags_container = quote_div.find("div", class_="tags")
    if tags_container:
        tags = [tag.text for tag in tags_container.find_all("a")]
        print("Tags:", tags)

Output:

Quote: "The world as we have created it is a process of our thinking. It cannot be changed without changing our thinking."
Author page: /author/Albert-Einstein
Tags: ['change', 'deep-thoughts', 'thinking', 'world']

The .get() method with a default value is safer than direct dictionary access like element["href"] — it returns your default instead of raising a KeyError if the attribute is missing.

Handling Pagination

Most websites split content across multiple pages. To scrape all pages, find the “next page” link and follow it until there are no more pages.

# pagination.py
import requests
from bs4 import BeautifulSoup
import time

base_url = "https://quotes.toscrape.com"
all_quotes = []
url = base_url + "/page/1/"

while url:
    print(f"Scraping: {url}")
    response = requests.get(url)
    soup = BeautifulSoup(response.text, "html.parser")

    for quote_div in soup.find_all("div", class_="quote"):
        text = quote_div.find("span", class_="text")
        author = quote_div.find("small", class_="author")
        if text and author:
            all_quotes.append({"text": text.text, "author": author.text})

    next_btn = soup.find("li", class_="next")
    if next_btn:
        next_link = next_btn.find("a")
        url = base_url + next_link["href"] if next_link else None
    else:
        url = None
    time.sleep(1)

print(f"Total quotes scraped: {len(all_quotes)}")
print(f"Sample: {all_quotes[0]['author']}: {all_quotes[0]['text'][:50]}...")

Output:

Scraping: https://quotes.toscrape.com/page/1/
Scraping: https://quotes.toscrape.com/page/2/
...
Scraping: https://quotes.toscrape.com/page/10/
Total quotes scraped: 100
Sample: Albert Einstein: "The world as we have created it is a process of o...

The time.sleep(1) between requests is essential. Without it, you risk overwhelming the server and getting your IP blocked.
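
The loop above builds the next URL by concatenating base_url with the link's href. As a minimal sketch, assuming a site might return either relative or absolute hrefs, urllib.parse.urljoin from the standard library resolves the link against the page you are currently on. The href values below are illustrative and are not taken from a live response.

```python
from urllib.parse import urljoin

current_url = "https://quotes.toscrape.com/page/1/"

# Hypothetical href values that a "next" link might carry
hrefs = [
    "/page/2/",                             # root-relative
    "page/2/",                              # relative to the current directory
    "https://quotes.toscrape.com/page/2/",  # already absolute
]

# urljoin resolves each href the way a browser would
resolved = [urljoin(current_url, href) for href in hrefs]
for href, full_url in zip(hrefs, resolved):
    print(f"{href!r} -> {full_url}")
```

The second case shows why it helps to resolve against the current page rather than a fixed base: a href without a leading slash is interpreted relative to the current path.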

Saving Scraped Data to CSV and JSON

Once you have extracted data, you typically want to save it for analysis. Python’s built-in csv and json modules handle this without extra dependencies.

# save_data.py
import csv
import json

quotes = [
    {"text": "Be yourself; everyone else is already taken.", "author": "Oscar Wilde"},
    {"text": "Two things are infinite: the universe and human stupidity.", "author": "Albert Einstein"},
    {"text": "Be the change that you wish to see in the world.", "author": "Mahatma Gandhi"},
]

with open("quotes.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.DictWriter(f, fieldnames=["text", "author"])
    writer.writeheader()
    writer.writerows(quotes)
print("Saved to quotes.csv")

with open("quotes.json", "w", encoding="utf-8") as f:
    json.dump(quotes, f, indent=2, ensure_ascii=False)
print("Saved to quotes.json")

Output:

Saved to quotes.csv
Saved to quotes.json

Always specify encoding="utf-8" when opening files for scraped data. Web content often contains special characters that will cause errors with the default system encoding.
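
To check that non-ASCII text survives the trip to disk, one option is to write the data and read it straight back. The sketch below uses made-up sample rows and a temporary directory so it leaves no files behind; the variable names are illustrative.

```python
import csv
import json
import os
import tempfile

# Illustrative rows containing non-ASCII characters
quotes = [
    {"text": "Café déjà vu", "author": "Zoë"},
    {"text": "Plain ASCII", "author": "Anon"},
]

with tempfile.TemporaryDirectory() as tmp:
    csv_path = os.path.join(tmp, "quotes.csv")
    json_path = os.path.join(tmp, "quotes.json")

    # Write both formats with an explicit encoding
    with open(csv_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=["text", "author"])
        writer.writeheader()
        writer.writerows(quotes)
    with open(json_path, "w", encoding="utf-8") as f:
        json.dump(quotes, f, ensure_ascii=False)

    # Read them back with the same encoding
    with open(csv_path, newline="", encoding="utf-8") as f:
        csv_rows = list(csv.DictReader(f))
    with open(json_path, encoding="utf-8") as f:
        json_rows = json.load(f)

print("CSV round trip intact:", csv_rows == quotes)
print("JSON round trip intact:", json_rows == quotes)
```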

Real-Life Example: Building a Job Listing Scraper

Let us build a complete scraper that extracts job listings from the Fake Jobs practice site, processes the data, and saves it as both CSV and JSON.

# job_scraper.py
import requests
from bs4 import BeautifulSoup
import csv
import json

def scrape_jobs(url):
    response = requests.get(url)
    response.raise_for_status()
    soup = BeautifulSoup(response.text, "html.parser")
    jobs = []
    cards = soup.find_all("div", class_="card-content")

    for card in cards:
        title_elem = card.find("h2", class_="title")
        company_elem = card.find("h3", class_="company")
        location_elem = card.find("p", class_="location")
        date_elem = card.find("time")

        job = {
            "title": title_elem.text.strip() if title_elem else "Unknown",
            "company": company_elem.text.strip() if company_elem else "Unknown",
            "location": location_elem.text.strip() if location_elem else "Unknown",
            "date_posted": date_elem.text.strip() if date_elem else "Unknown",
        }
        apply_link = card.find("a", string="Apply")
        job["apply_url"] = apply_link["href"] if apply_link else "N/A"
        jobs.append(job)
    return jobs

def save_results(jobs, csv_path, json_path):
    if not jobs:
        # Nothing was scraped: jobs[0] below would raise IndexError
        print("No jobs to save")
        return
    with open(csv_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=list(jobs[0].keys()))
        writer.writeheader()
        writer.writerows(jobs)
    with open(json_path, "w", encoding="utf-8") as f:
        json.dump(jobs, f, indent=2, ensure_ascii=False)

url = "https://realpython.github.io/fake-jobs/"
jobs = scrape_jobs(url)
print(f"Scraped {len(jobs)} job listings")
save_results(jobs, "jobs.csv", "jobs.json")
print("Data saved to jobs.csv and jobs.json")

Output:

Scraped 100 job listings
Data saved to jobs.csv and jobs.json

This scraper uses defensive parsing throughout — every find() result is checked before accessing .text, and default values handle missing elements. You can extend this by adding pagination, filtering by location, or scheduling it to run daily.

How To Use Python Itertools for Efficient Looping

Intermediate

Python’s itertools module is a powerhouse for creating efficient iterators that save memory and speed up your data processing. Instead of building entire lists in memory, itertools generates values on the fly, making it perfect for working with large datasets or creating complex iteration patterns.

If you have ever needed to combine multiple lists, group data by a key, or generate all possible permutations of a set, itertools has a clean, optimized solution ready to go. These tools are implemented in C under the hood, so they run significantly faster than equivalent pure Python code.

In this tutorial, you will learn the most practical itertools functions through real examples. We will cover infinite iterators, combinatoric generators, data grouping, chaining, and filtering — everything you need to write more Pythonic and memory-efficient code.

Quick Answer

The itertools module provides fast, memory-efficient iterator building blocks. Key functions include chain() for combining iterables, groupby() for grouping data, product() for cartesian products, combinations() and permutations() for combinatorics, and islice() for slicing iterators. Import with from itertools import chain, groupby, product, combinations.

Quick Example

from itertools import chain, islice, count

# Chain multiple iterables together seamlessly
combined = chain([1, 2, 3], ['a', 'b'], [True, False])
print(list(combined))

# Take the first 5 even numbers from an infinite counter
evens = (x for x in count(0, 2))
first_five = list(islice(evens, 5))
print(first_five)

Output:

[1, 2, 3, 'a', 'b', True, False]
[0, 2, 4, 6, 8]

The chain() function combines three separate iterables into one seamless stream without creating a new list in memory. The islice() function safely takes a slice from an infinite iterator, something you cannot do with regular list slicing.

What Is the Itertools Module?

The itertools module is part of Python’s standard library and provides a collection of fast, memory-efficient tools for creating and working with iterators. The module is inspired by constructs from functional programming languages like APL, Haskell, and SML.

The key advantage of itertools is lazy evaluation. Instead of building a complete list in memory, each function produces values one at a time as they are requested. This means you can work with datasets much larger than your available RAM, process infinite sequences, and build complex data pipelines that remain efficient.
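
Lazy evaluation is easy to observe directly. In this small sketch, the numbers() generator and the produced list are illustrative helpers that record each value at the moment it is generated, so you can see that nothing runs until a consumer asks for it.

```python
from itertools import islice

produced = []  # records every value the generator actually creates

def numbers():
    """Yield 0, 1, 2, ... and log each value as it is produced."""
    n = 0
    while True:
        produced.append(n)
        yield n
        n += 1

# Building the pipeline does no work yet
squares = map(lambda x: x * x, numbers())
before = list(produced)

# Only now are values pulled through, and only three of them
first_three = list(islice(squares, 3))

print("Produced before consuming:", before)
print("First three squares:", first_three)
print("Produced after consuming:", produced)
```

Although numbers() is infinite, only the three values requested by islice() are ever generated.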

The functions in itertools fall into three categories: infinite iterators that produce values forever, finite iterators that process input sequences, and combinatoric generators that produce arrangements of elements. All of them are implemented in C for maximum performance.

itertools.chain, cycle, repeat. The infinite generators you didn’t know you needed.

Infinite Iterators

count — Infinite Counter

The count() function generates an endless sequence of numbers starting from a given value with a specified step. You must always use it with something that limits the output, like islice() or a break in a loop.

from itertools import count, islice

# Count from 10 with step 5
counter = count(10, 5)
print(list(islice(counter, 6)))

# Useful for generating IDs
def id_generator(prefix="ID"):
    for num in count(1):
        yield f"{prefix}-{num:04d}"

ids = id_generator("ORD")
print([next(ids) for _ in range(4)])

Output:

[10, 15, 20, 25, 30, 35]
['ORD-0001', 'ORD-0002', 'ORD-0003', 'ORD-0004']
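
Besides islice() and break, two other common ways to bound count() are takewhile(), which stops on a condition, and zip(), which stops at its shortest input. A short sketch with illustrative values:

```python
from itertools import count, takewhile

# Stop the counter with a condition instead of a fixed length
below_30 = list(takewhile(lambda n: n < 30, count(10, 5)))
print(below_30)

# zip() ends when the shortest input ends, so count() can number items safely
labels = list(zip(count(1), ["alpha", "beta", "gamma"]))
print(labels)
```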

cycle — Repeat a Sequence Forever

The cycle() function takes an iterable and repeats it infinitely. This is perfect for round-robin scheduling, alternating patterns, or rotating through a fixed set of options.

from itertools import cycle, islice

# Alternate between teams for task assignment
teams = cycle(["Alpha", "Beta", "Gamma"])
tasks = ["Deploy v2.1", "Fix login bug", "Update docs",
         "Database migration", "API refactor", "Write tests", "Code review"]

assignments = {task: next(teams) for task in tasks}
for task, team in assignments.items():
    print(f"  {team}: {task}")

Output:

  Alpha: Deploy v2.1
  Beta: Fix login bug
  Gamma: Update docs
  Alpha: Database migration
  Beta: API refactor
  Gamma: Write tests
  Alpha: Code review

repeat — Repeat a Value

The repeat() function produces the same value over and over, either infinitely or a specified number of times. It is commonly used with map() or zip() to provide a constant value alongside changing data.

from itertools import repeat

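# Create a list of default configurations
# Caution: repeat() yields the same dict object each time, so these four
# entries are shared references; copy each one before mutating it
defaults = list(repeat({"enabled": True, "retries": 3}, 4))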
print(defaults)

# Use with map for element-wise operations
import operator
bases = [2, 3, 4, 5]
squared = list(map(operator.pow, bases, repeat(2)))
print(f"Squared: {squared}")
cubed = list(map(operator.pow, bases, repeat(3)))
print(f"Cubed: {cubed}")

Output:

[{'enabled': True, 'retries': 3}, {'enabled': True, 'retries': 3}, {'enabled': True, 'retries': 3}, {'enabled': True, 'retries': 3}]
Squared: [4, 9, 16, 25]
Cubed: [8, 27, 64, 125]

Finite Iterators

chain — Combine Multiple Iterables

The chain() function links multiple iterables together into a single continuous stream. It processes the first iterable completely, then moves to the second, and so on — all without creating an intermediate list.

from itertools import chain

# Merge data from multiple sources
database_users = ["alice", "bob"]
api_users = ["charlie", "diana"]
file_users = ["eve"]

all_users = list(chain(database_users, api_users, file_users))
print(f"All users: {all_users}")

# chain.from_iterable flattens a list of lists
nested_data = [["python", "java"], ["rust", "go"], ["ruby"]]
flat = list(chain.from_iterable(nested_data))
print(f"Flattened: {flat}")

Output:

All users: ['alice', 'bob', 'charlie', 'diana', 'eve']
Flattened: ['python', 'java', 'rust', 'go', 'ruby']

groupby — Group Consecutive Elements

The groupby() function groups consecutive elements that share the same key. The data must be sorted by the grouping key first, or you will get unexpected results.

from itertools import groupby
from operator import itemgetter

# Group transactions by category
transactions = [
    {"category": "food", "amount": 25.50},
    {"category": "food", "amount": 12.00},
    {"category": "transport", "amount": 35.00},
    {"category": "transport", "amount": 15.50},
    {"category": "entertainment", "amount": 45.00},
    {"category": "food", "amount": 8.75},
]

# Sort first, then group
sorted_trans = sorted(transactions, key=itemgetter("category"))
for category, group in groupby(sorted_trans, key=itemgetter("category")):
    items = list(group)
    total = sum(t["amount"] for t in items)
    print(f"  {category}: {len(items)} transactions, ${total:.2f}")

Output:

  entertainment: 1 transactions, $45.00
  food: 3 transactions, $46.25
  transport: 2 transactions, $50.50

Important: The data must be sorted by the same key you pass to groupby. If unsorted, groupby will create a new group every time the key changes, resulting in multiple groups for the same key value.
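
The effect of skipping the sort is easiest to see side by side. This sketch uses a small illustrative list of category names and counts the size of each group with and without sorting first.

```python
from itertools import groupby

categories = ["food", "food", "transport", "food"]

# Without sorting: "food" appears as two separate groups
unsorted_groups = [(key, len(list(group))) for key, group in groupby(categories)]
print("Unsorted:", unsorted_groups)

# With sorting: one group per distinct key
sorted_groups = [(key, len(list(group))) for key, group in groupby(sorted(categories))]
print("Sorted:", sorted_groups)
```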

islice — Slice Any Iterator

The islice() function works like regular list slicing but on any iterator, including infinite ones and generators. Unlike list slicing, it does not support negative indices because iterators cannot go backwards.

from itertools import islice

# Slice a generator (can't use regular slicing)
def fibonacci():
    a, b = 0, 1
    while True:
        yield a
        a, b = b, a + b

# Get fibonacci numbers 10 through 15
fib_slice = list(islice(fibonacci(), 10, 16))
print(f"Fibonacci 10-15: {fib_slice}")

# Read first 3 lines from a large dataset (simulated)
data_lines = (f"Row {i}: data_{i}" for i in range(1000000))
preview = list(islice(data_lines, 3))
print(f"Preview: {preview}")

Output:

Fibonacci 10-15: [55, 89, 144, 233, 377, 610]
Preview: ['Row 0: data_0', 'Row 1: data_1', 'Row 2: data_2']
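
islice() also accepts a step argument, and calling it repeatedly on the same iterator is one way to read data in fixed-size batches. In the sketch below, chunked() is a hypothetical helper written for illustration, not a function provided by itertools.

```python
from itertools import islice

def chunked(iterable, size):
    """Yield lists of up to `size` items (illustrative helper)."""
    iterator = iter(iterable)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return  # the underlying iterator is exhausted
        yield chunk

# start=0, stop=None (no upper bound), step=3
every_third = list(islice(range(20), 0, None, 3))
print(every_third)

# Each call to islice() continues where the previous one stopped
batches = list(chunked(range(7), 3))
print(batches)
```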

Lazy iteration. Memory stays constant, time stays linear.

Combinatoric Iterators

product — Cartesian Product

The product() function computes the cartesian product of input iterables, equivalent to nested for loops. This is perfect for generating all combinations of options.

from itertools import product

# Generate all t-shirt variants
sizes = ["S", "M", "L"]
colors = ["Red", "Blue"]
styles = ["V-neck", "Crew"]

variants = list(product(sizes, colors, styles))
print(f"Total variants: {len(variants)}")
for v in variants[:6]:
    print(f"  {v[0]} {v[1]} {v[2]}")

# Generate grid coordinates
grid = list(product(range(3), range(3)))
print(f"\n3x3 grid: {grid}")

Output:

Total variants: 12
  S Red V-neck
  S Red Crew
  S Blue V-neck
  S Blue Crew
  M Red V-neck
  M Red Crew

3x3 grid: [(0, 0), (0, 1), (0, 2), (1, 0), (1, 1), (1, 2), (2, 0), (2, 1), (2, 2)]
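
When the same iterable should be combined with itself, product() takes a repeat argument instead of requiring you to pass the iterable several times. Because the result is lazy, you can also stop at the first match without generating every tuple. A brief sketch with illustrative inputs:

```python
from itertools import product

# All 3-character strings over the alphabet "01"
bit_strings = ["".join(bits) for bits in product("01", repeat=3)]
print(bit_strings)

# Stop at the first pair of dice rolls that sums to 7
first_seven = next(
    pair for pair in product(range(1, 7), repeat=2) if sum(pair) == 7
)
print(first_seven)
```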

combinations and permutations

The combinations() function generates all unique groups of a given size where order does not matter. The permutations() function generates all arrangements where order does matter.

from itertools import combinations, permutations

# Pick 2-person teams from 4 candidates
candidates = ["Alice", "Bob", "Charlie", "Diana"]

teams = list(combinations(candidates, 2))
print(f"Possible teams ({len(teams)}):")
for team in teams:
    print(f"  {team[0]} & {team[1]}")

# Order matters for race finish positions
runners = ["A", "B", "C"]
finishes = list(permutations(runners))
print(f"\nPossible race finishes ({len(finishes)}):")
for f in finishes:
    print(f"  1st: {f[0]}, 2nd: {f[1]}, 3rd: {f[2]}")

Output:

Possible teams (6):
  Alice & Bob
  Alice & Charlie
  Alice & Diana
  Bob & Charlie
  Bob & Diana
  Charlie & Diana

Possible race finishes (6):
  1st: A, 2nd: B, 3rd: C
  1st: A, 2nd: C, 3rd: B
  1st: B, 2nd: A, 3rd: C
  1st: B, 2nd: C, 3rd: A
  1st: C, 2nd: A, 3rd: B
  1st: C, 2nd: B, 3rd: A
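
Both functions take a length argument, and the number of results can be predicted with math.comb() and math.perm() (available in Python 3.8 and later) before generating anything. A small sketch reusing the candidate names from above:

```python
from itertools import combinations, permutations
from math import comb, perm

candidates = ["Alice", "Bob", "Charlie", "Diana"]

# Unordered pairs: ("Alice", "Bob") and ("Bob", "Alice") count once
pairs = list(combinations(candidates, 2))

# Ordered pairs, for example (winner, runner-up)
podiums = list(permutations(candidates, 2))

print(f"Pairs: {len(pairs)} (expected {comb(4, 2)})")
print(f"Ordered pairs: {len(podiums)} (expected {perm(4, 2)})")
```

Checking the expected count first is a cheap way to decide whether materializing the results in a list is reasonable.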

Real-Life Project: Building a Data Pipeline with Itertools

Let us build a practical data processing pipeline that uses multiple itertools functions to efficiently process log data. This pipeline chains, filters, groups, and summarizes data while keeping memory usage minimal.

from itertools import chain, groupby, islice, accumulate
from operator import itemgetter
import operator

# Simulated log data from multiple servers
server1_logs = [
    {"timestamp": "2024-01-15 10:01", "level": "INFO", "service": "auth", "message": "User login"},
    {"timestamp": "2024-01-15 10:02", "level": "ERROR", "service": "auth", "message": "Invalid token"},
    {"timestamp": "2024-01-15 10:05", "level": "INFO", "service": "api", "message": "Request processed"},
    {"timestamp": "2024-01-15 10:07", "level": "WARNING", "service": "db", "message": "Slow query"},
]

server2_logs = [
    {"timestamp": "2024-01-15 10:01", "level": "INFO", "service": "api", "message": "Health check"},
    {"timestamp": "2024-01-15 10:03", "level": "ERROR", "service": "api", "message": "Timeout"},
    {"timestamp": "2024-01-15 10:04", "level": "ERROR", "service": "db", "message": "Connection lost"},
    {"timestamp": "2024-01-15 10:06", "level": "INFO", "service": "auth", "message": "Token refreshed"},
]

# Step 1: Chain all logs together
all_logs = list(chain(server1_logs, server2_logs))
print(f"Total log entries: {len(all_logs)}")

# Step 2: Sort and group by service
sorted_by_service = sorted(all_logs, key=itemgetter("service"))
print("\nLogs by service:")
for service, logs in groupby(sorted_by_service, key=itemgetter("service")):
    log_list = list(logs)
    error_count = sum(1 for l in log_list if l["level"] == "ERROR")
    print(f"  {service}: {len(log_list)} entries ({error_count} errors)")

# Step 3: Sort and group by log level
sorted_by_level = sorted(all_logs, key=itemgetter("level"))
print("\nLogs by level:")
for level, logs in groupby(sorted_by_level, key=itemgetter("level")):
    count = len(list(logs))
    print(f"  {level}: {count}")

# Step 4: Running error count using accumulate
error_flags = [1 if log["level"] == "ERROR" else 0 for log in all_logs]
running_errors = list(accumulate(error_flags, operator.add))
print(f"\nRunning error count: {running_errors}")
print(f"Total errors: {running_errors[-1]}")

# Step 5: Get the latest 3 entries
latest = list(islice(sorted(all_logs, key=itemgetter("timestamp"), reverse=True), 3))
print("\nLatest 3 entries:")
for entry in latest:
    print(f"  [{entry['level']}] {entry['timestamp']} - {entry['service']}: {entry['message']}")

Output:

Total log entries: 8

Logs by service:
  api: 3 entries (1 errors)
  auth: 3 entries (1 errors)
  db: 2 entries (1 errors)

Logs by level:
  ERROR: 3
  INFO: 4
  WARNING: 1

Running error count: [0, 1, 1, 1, 1, 2, 3, 3]
Total errors: 3

Latest 3 entries:
  [WARNING] 2024-01-15 10:07 - db: Slow query
  [INFO] 2024-01-15 10:06 - auth: Token refreshed
  [INFO] 2024-01-15 10:05 - api: Request processed

groupby: structure your iterator by neighboring keys.

Common Pitfalls and Troubleshooting

| Problem | Cause | Solution |
| --- | --- | --- |
| groupby returns unexpected groups | Data not sorted by grouping key | Always sort by the same key before calling groupby |
| Iterator exhausted after first use | Iterators can only be consumed once | Convert to list if you need multiple passes, or use tee() |
| Memory error with product() | Cartesian product of large sets creates huge output | Use islice() to limit output, or process items one at a time |
| Infinite loop with count() or cycle() | No termination condition | Always pair infinite iterators with islice(), takewhile(), or break |
| accumulate gives wrong type | Initial value type mismatch | Pass an explicit initial value matching your expected type |
| combinations_with_replacement unexpected | Confused with regular combinations | Use combinations() for no repeats, combinations_with_replacement() when repeats are allowed |
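
As one illustration of the accumulate row, the initial keyword (Python 3.8 and later) seeds the running result with a value you choose. The balance figures here are made up for the example.

```python
from itertools import accumulate

deposits = [10, -3, 5]

# Start the running total from an opening balance of 100
balances = list(accumulate(deposits, initial=100))
print(balances)

# With initial set, even an empty input yields one value
empty_case = list(accumulate([], initial=0))
print(empty_case)
```

Note that the output is one item longer than the input, because the initial value itself is emitted first.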

Frequently Asked Questions

What is the difference between itertools.chain and list concatenation?

chain() creates a lazy iterator that yields elements one at a time from the original iterables without copying them into a new list. List concatenation with + creates an entirely new list containing all elements from both sources. For large datasets, chain is significantly more memory-efficient because the only extra memory it needs is the iterator object itself.

Can I use itertools with pandas DataFrames?

Yes, but with some caveats. Itertools functions work with any iterable, so you can use them on DataFrame columns, rows from iterrows(), or index values. However, pandas has its own optimized methods like groupby, merge, and concat that are usually faster for DataFrame operations. Use itertools when working with pure Python iterables or when pandas does not have an equivalent function.

How do I restart an exhausted iterator?

You cannot restart a consumed iterator. Instead, use itertools.tee() to create independent copies before consuming, store the data in a list if it fits in memory, or recreate the iterator from the original source. For generators, call the generator function again to get a fresh iterator.
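
A short sketch of these options, using an illustrative readings() generator: the first pass shows exhaustion, tee() gives two independent iterators over one source, and calling the generator function again produces a fresh stream.

```python
from itertools import tee

def readings():
    """Illustrative generator standing in for a one-shot data source."""
    yield from [3, 8, 5]

# A consumed iterator yields nothing on a second pass
first_pass = readings()
total = sum(first_pass)
leftover = list(first_pass)

# tee() splits one iterator into independent copies
copy_a, copy_b = tee(readings())
total_a = sum(copy_a)
max_b = max(copy_b)

# Calling the generator function again gives a fresh iterator
fresh = list(readings())

print(total, leftover, total_a, max_b, fresh)
```

Keep in mind that tee() buffers items one copy has seen and the other has not, so if one copy runs far ahead of the other, a plain list may be the simpler choice.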

When should I use product() versus nested for loops?

product() is cleaner and more readable than nested loops when you need all combinations of multiple iterables. It also makes it easy to dynamically change the number of nested dimensions. Use nested for loops when you need complex logic between iterations, early exits, or when only some combinations should be processed.
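
The point about a dynamic number of dimensions can be sketched by unpacking a collection of iterables into product(). The options dictionary below is illustrative; adding or removing a key changes the nesting depth without touching the loop.

```python
from itertools import product

# Illustrative configuration: each key is one "dimension"
options = {
    "size": ["S", "M"],
    "color": ["Red", "Blue"],
    "fit": ["Slim"],
}

names = list(options)
# product(*iterables) behaves like one nested loop per dimension
variants = [dict(zip(names, combo)) for combo in product(*options.values())]

print(len(variants))
print(variants[0])
```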

Is itertools faster than list comprehensions?

For simple operations, list comprehensions and itertools have similar speed. The main advantage of itertools is memory efficiency rather than raw speed. When processing millions of items, itertools avoids building large intermediate lists, which prevents memory issues and can actually be faster due to reduced memory allocation overhead. For small datasets, the difference is negligible.

Conclusion

The itertools module transforms how you handle iteration in Python. You learned how infinite iterators like count() and cycle() create endless streams, how chain() and groupby() organize data efficiently, how combinatoric tools like product() and combinations() generate arrangements, and how islice() safely limits output from any iterator.

The data pipeline example showed how these tools compose naturally to build efficient processing chains. Start by replacing list concatenation with chain() and nested loops with product() in your existing code — those two changes will immediately make your code more readable and memory-efficient. As you get comfortable, explore groupby() and accumulate() for more advanced data processing patterns.

How To Use Python Functools for Higher-Order Functions

Intermediate

Python’s functools module is a treasure chest of higher-order functions that transform how you write and compose functions. Whether you need to cache expensive computations, create partial function applications, or build powerful decorators, functools provides elegant solutions that make your code cleaner and faster.

If you have ever written the same wrapper logic around multiple functions, or wished you could “freeze” some arguments into a function call, functools is exactly what you need. It sits in the standard library, so there is nothing extra to install — just import and go.

In this tutorial, you will master the most practical functools tools with hands-on examples you can use in real projects today. We will cover caching, partial application, function composition, comparison helpers, and more.

Quick Answer

The functools module provides higher-order functions for working with callable objects. The most commonly used tools are @lru_cache for memoization, partial() for freezing function arguments, reduce() for cumulative operations, and @wraps for building proper decorators. Import with from functools import lru_cache, partial, reduce, wraps.

Quick Example

from functools import lru_cache

@lru_cache(maxsize=128)
def fibonacci(n):
    if n < 2:
        return n
    return fibonacci(n - 1) + fibonacci(n - 2)

print(fibonacci(50))
print(fibonacci.cache_info())

Output:

12586269025
CacheInfo(hits=48, misses=51, maxsize=128, currsize=51)

Without the cache, computing fibonacci(50) this way would require tens of billions of recursive calls, because the same subproblems are recomputed over and over. With @lru_cache, it returns almost instantly because each unique input is computed only once.

What Is the Functools Module?

The functools module is part of Python's standard library and provides functions that act on or return other functions. The name stands for "function tools," and every utility in the module helps you work with callables more effectively.

Higher-order functions are functions that take other functions as arguments or return functions as results. This is a core concept in functional programming, and functools brings these ideas into Python in a practical, Pythonic way. You do not need to adopt a fully functional style — you can sprinkle these tools into your existing object-oriented or procedural code wherever they help.

The module has been part of Python since version 2.5 and has gained powerful additions over the years. Python 3.8 added cached_property, and Python 3.9 added cache as a simpler alias for unbounded LRU caching.

@lru_cache: when the same call returns the same result, remember it.

Core Functools Tools

lru_cache — Automatic Memoization

The @lru_cache decorator caches function results based on the arguments passed. LRU stands for "Least Recently Used," meaning that when the cache reaches its maximum size, the entry that has gone longest without being accessed is evicted first.

from functools import lru_cache
import time

@lru_cache(maxsize=256)
def expensive_lookup(user_id):
    # Simulate a slow database query
    time.sleep(0.5)
    return {"id": user_id, "name": f"User_{user_id}", "active": True}

# First call takes 0.5 seconds
start = time.time()
result = expensive_lookup(42)
print(f"First call: {time.time() - start:.3f}s -> {result}")

# Second call is instant (cached)
start = time.time()
result = expensive_lookup(42)
print(f"Cached call: {time.time() - start:.6f}s -> {result}")

# Check cache statistics
print(expensive_lookup.cache_info())

# Clear the cache when needed
expensive_lookup.cache_clear()

Output:

First call: 0.501s -> {'id': 42, 'name': 'User_42', 'active': True}
Cached call: 0.000002s -> {'id': 42, 'name': 'User_42', 'active': True}
CacheInfo(hits=1, misses=1, maxsize=256, currsize=1)

Important: All arguments to a cached function must be hashable (strings, numbers, tuples). You cannot cache functions that accept lists or dictionaries as arguments. Convert them to tuples or frozensets first.
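
The hashability requirement can be seen in a few lines. In this sketch, total_price() is an illustrative function: passing a list raises TypeError, while converting the list to a tuple first works and is cached.

```python
from functools import lru_cache

@lru_cache(maxsize=32)
def total_price(prices):
    """Sum a sequence of prices (illustrative cached function)."""
    return sum(prices)

cart = [4.0, 6.0]

# A list is unhashable, so it cannot be used as a cache key
try:
    total_price(cart)
    error = None
except TypeError as exc:
    error = str(exc)
print("List argument:", error)

# Converting to a tuple makes the argument hashable
total = total_price(tuple(cart))
print("Tuple argument:", total)
```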

cache — Unbounded Memoization

Python 3.9 introduced @cache as a simpler alternative to @lru_cache(maxsize=None). It caches every unique call forever, which is perfect when you know the set of possible inputs is bounded.

from functools import cache

@cache
def factorial(n):
    if n <= 1:
        return 1
    return n * factorial(n - 1)

print(factorial(10))
print(factorial(20))  # Recurses from 20 down to 11, then reuses the cached factorial(10)
print(factorial.cache_info())

Output:

3628800
2432902008176640000
CacheInfo(hits=1, misses=20, maxsize=None, currsize=20)

partial — Freeze Function Arguments

The partial() function creates a new callable with some arguments pre-filled. This is incredibly useful when you need to pass a function somewhere that expects fewer arguments than your function takes.

from functools import partial

def power(base, exponent):
    return base ** exponent

# Create specialized functions
square = partial(power, exponent=2)
cube = partial(power, exponent=3)

print(square(5))
print(cube(4))

# Practical example: configuring a logger
def log_message(level, component, message):
    print(f"[{level}] {component}: {message}")

# Create component-specific loggers
auth_log = partial(log_message, component="AUTH")
db_log = partial(log_message, component="DATABASE")

auth_log("INFO", message="User logged in")
auth_log("WARNING", message="Failed login attempt")
db_log("ERROR", message="Connection timeout")

Output:

25
64
[INFO] AUTH: User logged in
[WARNING] AUTH: Failed login attempt
[ERROR] DATABASE: Connection timeout
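
partial() is especially handy with functions such as map() that call their argument with a single value. As a small sketch, freezing the base argument of the built-in int() produces a one-argument binary parser; the name parse_binary is illustrative.

```python
from functools import partial

# int("101", base=2) parses a binary string; freeze the base argument
parse_binary = partial(int, base=2)

values = list(map(parse_binary, ["101", "1111", "10"]))
print(values)

# A partial object remembers what it wraps
print(parse_binary.func, parse_binary.keywords)
```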

reduce — Cumulative Operations

The reduce() function applies a two-argument function cumulatively to the items of a sequence, reducing it to a single value. It processes items left to right, carrying the result forward at each step.

from functools import reduce

# Sum of all numbers (same as built-in sum)
numbers = [1, 2, 3, 4, 5]
total = reduce(lambda a, b: a + b, numbers)
print(f"Sum: {total}")

# Find maximum value (same as built-in max)
largest = reduce(lambda a, b: a if a > b else b, numbers)
print(f"Max: {largest}")

# Flatten nested lists
nested = [[1, 2], [3, 4], [5, 6]]
flat = reduce(lambda a, b: a + b, nested)
print(f"Flattened: {flat}")

# Build a dictionary from pairs
pairs = [("name", "Alice"), ("age", 30), ("city", "Melbourne")]
result = reduce(lambda d, pair: {**d, pair[0]: pair[1]}, pairs, {})
print(f"Dict: {result}")

Output:

Sum: 15
Max: 5
Flattened: [1, 2, 3, 4, 5, 6]
Dict: {'name': 'Alice', 'age': 30, 'city': 'Melbourne'}

Tip: While reduce is powerful, Python's built-in functions like sum(), max(), min(), and any() are more readable for common cases. Use reduce when you need a custom accumulation pattern that does not have a built-in equivalent.
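
One accumulation pattern without a built-in equivalent is function composition: feeding a value through a series of functions. The compose() helper below is a hypothetical sketch written for this example, not something functools provides.

```python
from functools import reduce

def compose(*funcs):
    """Return a function that applies funcs left to right (illustrative helper)."""
    # The third argument to reduce() is the starting value of the accumulator
    return lambda value: reduce(lambda acc, func: func(acc), funcs, value)

# Build a small text-cleaning pipeline
slugify = compose(str.strip, str.lower, lambda s: s.replace(" ", "-"))

slug = slugify("  Hello World  ")
print(slug)
```

With no functions supplied, reduce() simply returns the starting value, so compose() acts as an identity function.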

wraps — Build Proper Decorators

When you write a decorator, the wrapper function replaces the original function's metadata (name, docstring, signature). The @wraps decorator preserves this metadata, which is essential for debugging and documentation tools.

from functools import wraps
import time

def timing_decorator(func):
    @wraps(func)  # Preserves func's __name__, __doc__, etc.
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        print(f"{func.__name__} took {elapsed:.4f}s")
        return result
    return wrapper

@timing_decorator
def process_data(items):
    """Process a list of items and return the total."""
    return sum(items)

# The function metadata is preserved
print(f"Name: {process_data.__name__}")
print(f"Doc: {process_data.__doc__}")
print(f"Result: {process_data(range(1000000))}")

Output:

Name: process_data
Doc: Process a list of items and return the total.
process_data took 0.0234s
Result: 499999500000
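
To see what @wraps actually changes, compare two otherwise identical decorators. The names plain, preserving, load_a, and load_b are illustrative.

```python
from functools import wraps

def plain(func):
    # No @wraps: the wrapper keeps its own name and docstring
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

def preserving(func):
    @wraps(func)  # copies __name__, __doc__, and more from func
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

@plain
def load_a():
    """Load dataset A."""

@preserving
def load_b():
    """Load dataset B."""

print(load_a.__name__, load_a.__doc__)
print(load_b.__name__, load_b.__doc__)
```

@wraps also sets a __wrapped__ attribute that points to the original function, which introspection tools can use to look through the decorator.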

cached_property — One-Time Computed Attributes

The @cached_property decorator turns a method into a property that is computed once and then cached as a normal attribute. This is perfect for expensive calculations that do not change after the object is created.

from functools import cached_property
import statistics

class DataAnalysis:
    def __init__(self, data):
        self._data = list(data)

    @cached_property
    def mean(self):
        print("Computing mean...")
        return statistics.mean(self._data)

    @cached_property
    def std_dev(self):
        print("Computing standard deviation...")
        return statistics.stdev(self._data)

    @cached_property
    def summary(self):
        print("Building summary...")
        return {
            "count": len(self._data),
            "mean": self.mean,
            "std_dev": self.std_dev,
            "min": min(self._data),
            "max": max(self._data)
        }

analysis = DataAnalysis(range(1, 10001))
print(analysis.mean)      # Computes and caches
print(analysis.mean)      # Returns cached value (no "Computing..." message)
print(analysis.summary)   # Triggers mean cache hit, computes std_dev

Output:

Computing mean...
5000.5
5000.5
Building summary...
Computing standard deviation...
{'count': 10000, 'mean': 5000.5, 'std_dev': 2886.896, 'min': 1, 'max': 10000}
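
Because the cached value is stored as an ordinary instance attribute, it does not update when the underlying data changes. Deleting the attribute clears the cache so the next access recomputes it. The Report class below is an illustrative sketch.

```python
from functools import cached_property

class Report:
    def __init__(self, values):
        self.values = list(values)
        self.computations = 0  # counts how often total is really computed

    @cached_property
    def total(self):
        self.computations += 1
        return sum(self.values)

report = Report([1, 2, 3])
first = report.total        # computed and cached

report.values.append(4)
stale = report.total        # still the cached value

del report.total            # drop the cached attribute
fresh = report.total        # recomputed from current data

print(first, stale, fresh, report.computations)
```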

total_ordering — Complete Comparison Methods

The @total_ordering class decorator lets you define just __eq__ and one ordering method (__lt__, __le__, __gt__, or __ge__), and it fills in the rest automatically.

from functools import total_ordering

@total_ordering
class Version:
    def __init__(self, major, minor, patch):
        self.major = major
        self.minor = minor
        self.patch = patch

    def __eq__(self, other):
        return (self.major, self.minor, self.patch) == \
               (other.major, other.minor, other.patch)

    def __lt__(self, other):
        return (self.major, self.minor, self.patch) < \
               (other.major, other.minor, other.patch)

    def __repr__(self):
        return f"Version({self.major}.{self.minor}.{self.patch})"

versions = [Version(2, 1, 0), Version(1, 9, 5), Version(2, 0, 1), Version(1, 9, 5)]
print(sorted(versions))
print(Version(2, 0, 0) >= Version(1, 9, 9))
print(Version(1, 0, 0) <= Version(1, 0, 0))

Output:

[Version(1.9.5), Version(1.9.5), Version(2.0.1), Version(2.1.0)]
True
True

Real-Life Project: Building a Plugin System with Functools

Let us build a practical plugin system for a web application that uses several functools features together. This system registers handler functions, caches their results, and supports partial configuration.

from functools import wraps, partial, lru_cache, reduce
from collections import defaultdict
import time
import json

class PluginRegistry:
    """A plugin system using functools for caching and composition."""

    def __init__(self):
        self._plugins = defaultdict(list)
        self._middleware = []

    def register(self, event_type, priority=0):
        """Decorator to register a handler for an event type."""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                return func(*args, **kwargs)
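            wrapper._priority = priority
            self._plugins[event_type].append(wrapper)
            self._plugins[event_type].sort(
                key=lambda f: f._priority, reverse=True
            )
            # Drop cached handler tuples so get_handlers() never returns a stale list
            self.get_handlers.cache_clear()
            return wrapper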
        return decorator

    def add_middleware(self, middleware_func):
        """Add a middleware function that processes all events."""
        self._middleware.append(middleware_func)

    @lru_cache(maxsize=64)
    def get_handlers(self, event_type):
        """Get cached tuple of handlers for an event type."""
        return tuple(self._plugins.get(event_type, []))

    def emit(self, event_type, data):
        """Emit an event through middleware then to handlers."""
        # Apply middleware chain using reduce
        processed = reduce(
            lambda d, mw: mw(event_type, d),
            self._middleware,
            data
        )

        handlers = self._plugins.get(event_type, [])
        results = []
        for handler in handlers:
            result = handler(processed)
            if result is not None:
                results.append(result)
        return results


# Create registry and register plugins
registry = PluginRegistry()

# Middleware: add timestamp to all events
def timestamp_middleware(event_type, data):
    return {**data, "timestamp": time.time()}

# Middleware: log all events
def logging_middleware(event_type, data):
    print(f"  [LOG] Event '{event_type}' with keys: {list(data.keys())}")
    return data

registry.add_middleware(timestamp_middleware)
registry.add_middleware(logging_middleware)

@registry.register("user.login", priority=10)
def validate_login(data):
    """Check if the user credentials are valid."""
    if data.get("username") and data.get("password"):
        return {"status": "validated", "user": data["username"]}
    return {"status": "invalid"}

@registry.register("user.login", priority=5)
def record_login(data):
    """Record the login attempt for analytics."""
    return {"recorded": True, "user": data.get("username")}

@registry.register("data.transform", priority=10)
def normalize_data(data):
    """Normalize string fields to lowercase."""
    return {
        k: v.lower() if isinstance(v, str) else v
        for k, v in data.items()
    }

# Using partial to create pre-configured event emitters
emit_login = partial(registry.emit, "user.login")
emit_transform = partial(registry.emit, "data.transform")

# Emit events
print("Login event:")
results = emit_login({"username": "alice", "password": "secret123"})
for r in results:
    print(f"  Result: {r}")

print("\nTransform event:")
results = emit_transform({"Name": "BOB", "City": "MELBOURNE", "age": 25})
for r in results:
    print(f"  Result: {r}")

Output:

Login event:
  [LOG] Event 'user.login' with keys: ['username', 'password', 'timestamp']
  Result: {'status': 'validated', 'user': 'alice'}
  Result: {'recorded': True, 'user': 'alice'}

Transform event:
  [LOG] Event 'data.transform' with keys: ['Name', 'City', 'age', 'timestamp']
  Result: {'Name': 'bob', 'City': 'melbourne', 'age': 25, 'timestamp': 1712745600.0}

partial(): freeze arguments and pass the function. Currying, almost.

Common Pitfalls and Troubleshooting

Problem | Cause | Solution
TypeError: unhashable type with lru_cache | Passing a list or dict as an argument to a cached function | Convert to tuple or frozenset before passing
Memory growing with @cache | Unbounded cache stores every unique call | Use @lru_cache(maxsize=N) to limit cache size
cached_property not updating | Value computed once and stored as attribute | Delete the attribute with del obj.prop to force recompute
Decorated function loses metadata | Missing @wraps in decorator | Add @wraps(func) to every decorator wrapper
reduce gives unexpected result | Missing initial value argument | Pass initializer as third argument to reduce()
partial kwargs overridden | Caller passes same keyword argument | Document which args are frozen or use positional args
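
Two of the rows above are easy to reproduce. The following is a minimal sketch; the connect function and its parameters are hypothetical names chosen for illustration only.

```python
from functools import partial, reduce

# reduce without an initializer fails on an empty sequence
try:
    reduce(lambda a, b: a + b, [])
except TypeError as exc:
    print(f"TypeError: {exc}")

# With an initializer, the empty case simply returns the initializer
empty_total = reduce(lambda a, b: a + b, [], 0)
print(empty_total)   # 0

# Hypothetical function used to show keyword overriding
def connect(host, port=5432, timeout=10):
    return (host, port, timeout)

connect_fast = partial(connect, timeout=1)
print(connect_fast("db.local"))               # ('db.local', 5432, 1)
print(connect_fast("db.local", timeout=30))   # ('db.local', 5432, 30)
```

A keyword frozen by partial acts as a default rather than a hard constraint, which is why the last call silently replaces timeout=1.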

Frequently Asked Questions

What is the difference between @cache and @lru_cache?

@cache is equivalent to @lru_cache(maxsize=None). It stores every unique call result until cache_clear() is called and never evicts entries. Use @lru_cache(maxsize=N) when you want to limit memory usage by keeping only the N most recently used results. For long-running applications that see many distinct arguments, @lru_cache with a reasonable maxsize is the safer choice.
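
The difference is easiest to see with a deliberately tiny maxsize. This is a small sketch; the two square functions are hypothetical stand-ins for something expensive.

```python
from functools import cache, lru_cache

@lru_cache(maxsize=2)
def square_bounded(n):
    return n * n

@cache
def square_unbounded(n):
    return n * n

for n in (1, 2, 3):
    square_bounded(n)
    square_unbounded(n)

# The bounded cache kept only the two most recently used entries
print(square_bounded.cache_info().currsize)     # 2
# The unbounded cache kept all three
print(square_unbounded.cache_info().currsize)   # 3

# 1 was evicted from the bounded cache, so this call is a miss again
square_bounded(1)
print(square_bounded.cache_info().misses)       # 4
```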

Can I use lru_cache on class methods?

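Yes, but with a caveat. The self parameter becomes part of the cache key, so each instance gets its own cache entries, the instance must be hashable, and the cache keeps a strong reference to every instance it has seen, which delays garbage collection. For a per-instance value that takes no arguments, consider @cached_property instead. For caching shared across instances, use a module-level function or a custom descriptor.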
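
The reference-holding behavior can be observed with a weak reference. This sketch assumes CPython's reference counting; Loader and load are hypothetical names.

```python
import gc
import weakref
from functools import lru_cache

class Loader:
    @lru_cache(maxsize=None)
    def load(self, key):
        return key.upper()

obj = Loader()
obj.load("a")
ref = weakref.ref(obj)

del obj
gc.collect()
alive_after_del = ref() is not None
print(alive_after_del)      # True: the cache key still references the instance

Loader.load.cache_clear()   # dropping the cache entries releases the instance
gc.collect()
alive_after_clear = ref() is not None
print(alive_after_clear)    # False
```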

Is functools.reduce the same as a for loop?

Functionally, yes — reduce performs the same cumulative operation you could write with a for loop. However, reduce expresses the intent more declaratively. Use reduce when the accumulation pattern is clear and concise. If the logic is complex or needs multiple lines, a regular for loop is more readable and Pythonic.
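
As a quick illustration with made-up data, the two forms below compute the same running total:

```python
from functools import reduce

quantities = [3, 4, 5]   # example data, not from the article

# reduce: fold the list into a single value, starting from 0
total_reduce = reduce(lambda acc, q: acc + q, quantities, 0)

# Equivalent explicit loop
total_loop = 0
for q in quantities:
    total_loop = total_loop + q

print(total_reduce, total_loop)   # 12 12
```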

How do I clear a cached_property value?

Delete the attribute using del instance.property_name. The next access will recompute and re-cache the value. This works because cached_property stores the result as a regular instance attribute that shadows the descriptor.
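
A minimal sketch of that invalidation pattern, using a hypothetical Report class that counts how often the property body runs:

```python
from functools import cached_property

class Report:
    """Hypothetical class used only for this illustration."""
    def __init__(self, values):
        self.values = list(values)
        self.computations = 0

    @cached_property
    def total(self):
        self.computations += 1
        return sum(self.values)

report = Report([1, 2, 3])
print(report.total)          # 6, computed on first access
report.values.append(4)
print(report.total)          # still 6: the cached value is now stale

del report.total             # remove the cached instance attribute
print(report.total)          # 10, recomputed on the next access
print(report.computations)   # 2
```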

When should I use partial instead of a lambda?

Use partial() when you want to freeze arguments of an existing function. It states the intent directly, exposes the wrapped function and frozen arguments through its func, args, and keywords attributes, and can be pickled when the underlying function can. Use a lambda when you need a quick inline expression or when the logic goes beyond simple argument freezing. In general, partial is preferred for configuration-style argument freezing.
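
The sketch below compares the two on a small example. The names parse_binary and parse_binary_lambda are illustrative, and the exact exception raised when pickling a lambda is caught broadly because it can differ between Python versions.

```python
import pickle
from functools import partial

parse_binary = partial(int, base=2)
parse_binary_lambda = lambda s: int(s, base=2)

print(parse_binary("1010"))          # 10
print(parse_binary_lambda("1010"))   # 10

# A partial object records what it wraps
print(parse_binary.func is int)      # True
print(parse_binary.keywords)         # {'base': 2}

# A partial of an importable function survives a pickle round trip
restored = pickle.loads(pickle.dumps(parse_binary))
print(restored("111"))               # 7

# A lambda does not
try:
    pickle.dumps(parse_binary_lambda)
    lambda_pickled = True
except (pickle.PicklingError, AttributeError):
    lambda_pickled = False
print(lambda_pickled)
```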

@wraps: keep the metadata when you wrap a function. Use it always.

Conclusion

The functools module gives you powerful tools that make Python functions more flexible and efficient. You learned how @lru_cache and @cache can dramatically speed up expensive or recursive functions, how partial() creates specialized versions of general functions, how reduce() handles cumulative operations, and how @wraps keeps your decorators well-behaved.

These tools work beautifully together, as the plugin system example showed. Start by adding @lru_cache to your most expensive functions and @wraps to your decorators — those two changes alone will improve most Python projects. From there, explore partial() and cached_property as your needs grow.

partial(): Pre-Fill Arguments

from functools import partial

def power(base, exponent):
    return base ** exponent

square = partial(power, exponent=2)
cube = partial(power, exponent=3)

print(square(5))   # 25
print(cube(3))     # 27

# Useful for callbacks and higher-order functions
import threading
def task(user_id, action):
    print("Running:", action, "for", user_id)

threading.Timer(5, partial(task, "user123", "cleanup")).start()

partial freezes some arguments; the resulting callable accepts the rest. Perfect for adapting a function's signature to a callback that expects fewer arguments.

@lru_cache: Memoization for Free

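from functools import lru_cache
import time

def slow_database_lookup(query):
    # Hypothetical stand-in for a slow call, added so the example runs
    time.sleep(0.1)
    return f"result for {query}"

@lru_cache(maxsize=1000)
def expensive(query):
    return slow_database_lookup(query)

expensive("users")   # miss: runs the lookup
expensive("users")   # hit: returned from the cache

# Cache stats
print(expensive.cache_info())     # CacheInfo(hits=1, misses=1, maxsize=1000, currsize=1)
expensive.cache_clear()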

@lru_cache turns any pure function with hashable arguments into a memoized one. Calls with the same arguments return the cached result. Pass maxsize=None for an unbounded cache; a finite maxsize evicts the least-recently-used entries once the cache is full.

@cache: lru_cache With Unbounded Memory

from functools import cache

@cache
def fibonacci(n):
    if n < 2:
        return n
    return fibonacci(n-1) + fibonacci(n-2)

print(fibonacci(100))   # instant, even though recursive

@cache (Python 3.9+) is shorthand for @lru_cache(maxsize=None). Because it never evicts entries, it is smaller and faster than lru_cache with a size limit, and the API is simpler.

reduce(): Fold a Sequence

from functools import reduce

# Sum of a list (just use sum())
total = reduce(lambda acc, x: acc + x, [1, 2, 3, 4, 5])   # 15

# Product
import operator
product = reduce(operator.mul, [1, 2, 3, 4, 5])   # 120

# Max
maximum = reduce(lambda a, b: a if a > b else b, [5, 3, 8, 1])   # 8

# Build a dict from key-value pairs
pairs = [("a", 1), ("b", 2), ("c", 3)]
d = reduce(lambda acc, kv: {**acc, kv[0]: kv[1]}, pairs, {})

reduce is powerful but often overkill. For common operations (sum, max, min) Python has builtins. Reach for reduce only when the operation doesn't have a standard equivalent.

@wraps: Preserve Decorated Function Metadata

from functools import wraps
import time

def timed(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        t0 = time.perf_counter()
        result = func(*args, **kwargs)
        print(f"{func.__name__} took {time.perf_counter()-t0:.4f}s")
        return result
    return wrapper

@timed
def slow_function(x):
    """Does something slow."""
    time.sleep(0.1)
    return x

slow_function(5)
print(slow_function.__name__)   # 'slow_function' (not 'wrapper')
print(slow_function.__doc__)    # 'Does something slow.' (preserved)

Without @wraps, your decorated function's __name__ and __doc__ become 'wrapper' and None, which misleads help(), Sphinx, and inspect-based tooling. Always use @wraps on decorator inner functions.
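
A side-by-side sketch makes the difference concrete. The decorators plain and wrapped are hypothetical names for the same pass-through decorator written without and with @wraps.

```python
from functools import wraps

def plain(func):
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

def wrapped(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

@plain
def greet_plain():
    """Say hello."""
    return "hello"

@wrapped
def greet_wrapped():
    """Say hello."""
    return "hello"

print(greet_plain.__name__, greet_plain.__doc__)       # wrapper None
print(greet_wrapped.__name__, greet_wrapped.__doc__)   # greet_wrapped Say hello.

# @wraps also stores the original function on __wrapped__
print(greet_wrapped.__wrapped__())                     # hello
```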

singledispatch: Type-Based Function Overloading

from functools import singledispatch

@singledispatch
def render(item):
    raise NotImplementedError(f"No renderer for {type(item)}")

@render.register
def _(item: int):
    return f"Integer: {item}"

@render.register
def _(item: str):
    return f"String: {item!r}"

@render.register
def _(item: list):
    return "List of " + str(len(item)) + " items"

print(render(42))         # 'Integer: 42'
print(render("hello"))    # "String: 'hello'"
print(render([1, 2, 3]))  # 'List of 3 items'

Python's answer to method overloading. The right implementation runs based on the FIRST argument's type. Useful for serializers, formatters, anything that's polymorphic.

Common Pitfalls

  • lru_cache on mutable arguments. Arguments must be hashable. Passing a list, dict, or set raises TypeError: unhashable type. Convert to a tuple, frozenset, or string before calling.
  • Method caching memory leak. @lru_cache on a method stores self in the cache key, which keeps instances alive until their entries are evicted or the cache is cleared. For per-instance values that take no arguments, use cached_property.
  • Decorator without @wraps. Loses function metadata. Always use @wraps inside decorator definitions.
  • Mutating cached return. @cache returns a reference, not a copy. Modifying the result corrupts the cache. Make defensive copies if the caller might mutate.
  • singledispatch on positional args. Only dispatches on the type of the FIRST argument. For multi-argument dispatch, use a third-party library such as plum or multipledispatch.
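
The "mutating cached return" pitfall is the least obvious of these, so here is a minimal sketch of it. The default_settings function is a hypothetical example, and copy.deepcopy is one possible defensive-copy strategy among several.

```python
import copy
from functools import cache

@cache
def default_settings():
    # Hypothetical function that returns a mutable object
    return {"retries": 3}

settings = default_settings()
settings["retries"] = 99                 # mutates the dict stored in the cache
corrupted = default_settings()["retries"]
print(corrupted)                         # 99

default_settings.cache_clear()
safe = copy.deepcopy(default_settings()) # defensive copy
safe["retries"] = 0
intact = default_settings()["retries"]
print(intact)                            # 3: the cached dict is untouched
```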

FAQ

Q: lru_cache or cachetools?
A: lru_cache for simple cases (one function, in-memory, no TTL). cachetools when you need TTLs, custom eviction policies, or thread safety beyond stdlib.

Q: Will lru_cache leak memory?
A: With finite maxsize, no — evicts LRU entries. With maxsize=None (or @cache), can grow unbounded. Monitor with cache_info().

Q: Use partial vs lambda?
A: partial when you want a picklable, named-keyword version (useful for multiprocessing). lambda when it's a one-liner used immediately.

Q: reduce vs explicit loop?
A: Loop is usually clearer. reduce shines for chain operations where the loop body is short and the accumulator is the whole story.

Q: cached_property when?
A: For per-instance memoization of expensive properties — first access computes, subsequent accesses return cached. Cleanly garbage-collects with the instance.

Wrapping Up

functools is a toolbox of higher-order primitives. partial for argument freezing; lru_cache/cache for memoization; wraps for decorator hygiene; singledispatch for type-based polymorphism; reduce for folds. Master these and your Python becomes more expressive without becoming clever-for-clever's-sake.

Related Articles

How To Use Python Contextlib for Resource Management


Intermediate

Every Python developer learns to use with open('file.txt') as f early on, but few explore the full power of context managers beyond file handling. If you have ever needed to manage database connections, acquire locks, temporarily change settings, or ensure cleanup code always runs, the contextlib module is your toolkit. It turns complex resource management into clean, readable code.

The contextlib module is part of Python’s standard library, so there is nothing to install. It provides decorators and utilities that let you create context managers without writing a full class with __enter__ and __exit__ methods. The most powerful tool is @contextmanager, which turns a simple generator function into a fully functional context manager.

In this tutorial, you will learn how to create custom context managers with @contextmanager, manage multiple resources with ExitStack, suppress specific exceptions cleanly, redirect output streams, and build reusable resource management patterns. By the end, you will write context managers as naturally as you write regular functions.

Custom Context Manager: Quick Example

Here is how to create a timing context manager that measures how long a block of code takes to run:

# quick_timer.py
from contextlib import contextmanager
import time

@contextmanager
def timer(label="Block"):
    start = time.perf_counter()
    yield
    elapsed = time.perf_counter() - start
    print(f"{label} took {elapsed:.4f} seconds")

# Use it
with timer("Data processing"):
    total = sum(range(1_000_000))
    print(f"Sum: {total}")

Output:

Sum: 499999500000
Data processing took 0.0312 seconds

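The @contextmanager decorator transforms the generator function into a context manager. Everything before yield runs on entry (like __enter__), and everything after yield runs on exit (like __exit__). The yield itself is where your with block executes. If the with block raises, the exception is re-raised at the yield, so the code after it runs only when the yield is wrapped in try/finally.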

What is contextlib and Why Use It?

Context managers in Python follow the protocol defined by the __enter__ and __exit__ magic methods. Any object with these two methods can be used with the with statement. The contextlib module provides shortcuts that save you from writing boilerplate class definitions for simple use cases.

Without contextlib, creating a context manager requires a full class with two methods. With it, you can do the same thing in a few lines using a generator function. This matters because context managers are everywhere in professional Python code — they manage database transactions, HTTP sessions, temporary files, thread locks, and any resource that needs guaranteed cleanup.

Approach | Lines of Code | Best For
Class with __enter__/__exit__ | 10-20 lines | Complex state management, reusable libraries
@contextmanager decorator | 5-10 lines | Simple setup/teardown, one-off utilities
ExitStack | 3-5 lines | Dynamic number of resources, conditional cleanup

The rule of thumb: if your context manager is straightforward setup-yield-cleanup, use @contextmanager. If it needs to manage dynamic or conditional resources, reach for ExitStack.
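
To make the comparison concrete, here is one way the same timing idea could be written in both styles. This is a sketch only; the class name Timer and the choice to report the result through a dict in the generator version are assumptions made for illustration.

```python
import time
from contextlib import contextmanager

class Timer:
    """Class-based version: explicit __enter__ and __exit__."""
    def __enter__(self):
        self.start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc, tb):
        self.elapsed = time.perf_counter() - self.start
        return False   # do not suppress exceptions

@contextmanager
def timer():
    """Generator-based version of the same idea."""
    result = {}
    start = time.perf_counter()
    try:
        yield result
    finally:
        result["elapsed"] = time.perf_counter() - start

with Timer() as t:
    sum(range(100_000))

with timer() as r:
    sum(range(100_000))

print(f"class: {t.elapsed:.6f}s, generator: {r['elapsed']:.6f}s")
```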

with-blocks: clean up always happens, even when things blow up.

The @contextmanager Decorator

The @contextmanager decorator is the workhorse of the module. It takes a generator that yields exactly once and turns it into a context manager. Here is a practical example that temporarily changes the working directory:

# change_dir.py
from contextlib import contextmanager
import os

@contextmanager
def change_directory(path):
    """Temporarily change the working directory."""
    original = os.getcwd()
    try:
        os.chdir(path)
        yield path
    finally:
        os.chdir(original)

# Usage
print(f"Before: {os.getcwd()}")
with change_directory("/tmp") as new_dir:
    print(f"Inside: {os.getcwd()}")
    print(f"Yielded: {new_dir}")
print(f"After: {os.getcwd()}")

Output:

Before: /home/user/project
Inside: /tmp
Yielded: /tmp
After: /home/user/project

The try/finally block inside the generator ensures cleanup happens even if an exception occurs within the with block. Whatever you pass to yield becomes the value bound by as in the with statement.

Handling Errors in Context Managers

When an exception occurs inside a with block, it gets thrown into the generator at the yield point. You can catch it, handle it, or let it propagate:

# error_handling.py
from contextlib import contextmanager

@contextmanager
def safe_operation(name):
    """Context manager that logs errors without suppressing them."""
    print(f"Starting: {name}")
    try:
        yield
    except Exception as e:
        print(f"Error in {name}: {type(e).__name__}: {e}")
        raise  # Re-raise to let caller handle it
    finally:
        print(f"Finished: {name}")

# Normal usage
with safe_operation("calculation"):
    result = 42 / 2
    print(f"Result: {result}")

print("---")

# With an error
try:
    with safe_operation("bad calculation"):
        result = 42 / 0
except ZeroDivisionError:
    print("Caught the error outside")

Output:

Starting: calculation
Result: 21.0
Finished: calculation
---
Starting: bad calculation
Error in bad calculation: ZeroDivisionError: division by zero
Finished: bad calculation
Caught the error outside

The finally block guarantees cleanup runs whether the operation succeeds or fails. If you want to suppress the exception (prevent it from propagating), do not re-raise it — but be careful, as silently swallowing exceptions makes debugging difficult.
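
For comparison, a context manager that deliberately swallows one exception type might look like the sketch below. The name ignore_missing_key and the log list are hypothetical; recording the suppressed error keeps it visible during debugging.

```python
from contextlib import contextmanager

@contextmanager
def ignore_missing_key(log):
    """Swallow KeyError only, and record that it happened."""
    try:
        yield
    except KeyError as exc:
        log.append(f"missing key: {exc}")
        # No re-raise here, so the KeyError stops at the with block

events = []
config = {"host": "localhost"}

with ignore_missing_key(events):
    port = config["port"]        # raises KeyError
    print("never reached")

print(events)   # ["missing key: 'port'"]

# Other exception types still propagate
propagated = False
try:
    with ignore_missing_key(events):
        1 / 0
except ZeroDivisionError:
    propagated = True
print(propagated)   # True
```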

Managing Multiple Resources with ExitStack

When you need to manage a variable number of resources — like opening multiple files determined at runtime — ExitStack is the right tool:

# exitstack_demo.py
from contextlib import ExitStack
import tempfile
import os

def process_multiple_files(filenames):
    """Open and process multiple files safely."""
    with ExitStack() as stack:
        # Open all files -- ExitStack closes them all on exit
        files = [
            stack.enter_context(open(f, 'w'))
            for f in filenames
        ]
        
        # Write to each file
        for i, f in enumerate(files):
            f.write(f"Content for file {i}\n")
            print(f"Wrote to {filenames[i]}")
        
        print(f"All {len(files)} files open simultaneously")
    # All files are closed here, even if an error occurred
    print("All files closed")

# Create temp files for demo
temp_dir = tempfile.mkdtemp()
filenames = [os.path.join(temp_dir, f"file_{i}.txt") for i in range(3)]
process_multiple_files(filenames)

# Verify files are written and closed
for f in filenames:
    with open(f) as fh:
        print(f"{os.path.basename(f)}: {fh.read().strip()}")

Output:

Wrote to /tmp/tmpXXXXXX/file_0.txt
Wrote to /tmp/tmpXXXXXX/file_1.txt
Wrote to /tmp/tmpXXXXXX/file_2.txt
All 3 files open simultaneously
All files closed
file_0.txt: Content for file 0
file_1.txt: Content for file 1
file_2.txt: Content for file 2

The key advantage of ExitStack is that it handles cleanup for all registered resources, even if opening a later resource fails. Without it, you would need deeply nested with statements or manual cleanup logic.
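
The failure case can be simulated without touching the file system. In this sketch, resource is a hypothetical context manager that records acquire and release events, and the third one is made to fail on entry.

```python
from contextlib import ExitStack, contextmanager

log = []

@contextmanager
def resource(name, fail=False):
    """Hypothetical resource that records acquire/release events."""
    if fail:
        raise RuntimeError(f"could not acquire {name}")
    log.append(f"acquire {name}")
    try:
        yield name
    finally:
        log.append(f"release {name}")

try:
    with ExitStack() as stack:
        stack.enter_context(resource("a"))
        stack.enter_context(resource("b"))
        stack.enter_context(resource("c", fail=True))   # fails on entry
except RuntimeError as exc:
    log.append(f"error: {exc}")

for line in log:
    print(line)
```

The log shows a and b being released in reverse order of acquisition before the error reaches the except clause.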

@contextmanager: a decorator that turns a generator into a with-block.

Suppressing Exceptions with suppress()

Sometimes you want to ignore specific exceptions cleanly. Instead of writing try/except: pass, use contextlib.suppress():

# suppress_demo.py
from contextlib import suppress
import os

# Without suppress (verbose)
try:
    os.remove("nonexistent_file.txt")
except FileNotFoundError:
    pass

# With suppress (clean)
with suppress(FileNotFoundError):
    os.remove("nonexistent_file.txt")

# Multiple exception types
with suppress(FileNotFoundError, PermissionError):
    os.remove("/protected/file.txt")

print("Cleanup complete -- no crashes")

Output:

Cleanup complete -- no crashes

Use suppress() when you genuinely do not care about the exception — like deleting a file that might not exist, or disconnecting a client that might already be disconnected. Do not use it to hide errors you should be handling.

Redirecting Output with redirect_stdout

The redirect_stdout and redirect_stderr context managers temporarily redirect output streams. This is useful for capturing output from third-party libraries or silencing noisy functions:

# redirect_demo.py
from contextlib import redirect_stdout, redirect_stderr
import io

# Capture stdout to a string
buffer = io.StringIO()
with redirect_stdout(buffer):
    print("This goes to the buffer")
    print("So does this")

captured = buffer.getvalue()
print(f"Captured {len(captured)} characters:")
print(repr(captured))

# Silence stderr
with redirect_stderr(io.StringIO()):
    import warnings
    warnings.warn("This warning is silenced")

Output:

Captured 37 characters:
'This goes to the buffer\nSo does this\n'

This pattern is especially useful in testing, where you need to verify what a function prints without modifying the function itself.
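
A test built on this pattern could look like the following sketch. The greet function is a hypothetical function under test, and the test is written as a plain function so it runs with or without a test framework.

```python
import io
from contextlib import redirect_stdout

def greet(name):
    """Hypothetical function under test; it prints instead of returning."""
    print(f"Hello, {name}!")

def test_greet_prints_greeting():
    buffer = io.StringIO()
    with redirect_stdout(buffer):
        greet("Alice")
    assert buffer.getvalue() == "Hello, Alice!\n"

test_greet_prints_greeting()
print("test passed")
```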

ExitStack: stack a dozen context managers without nested ifs.

Real-Life Example: Database Transaction Manager

Let us build a practical transaction manager that handles database connections, commits on success, and rolls back on failure — all with proper resource cleanup:

# transaction_manager.py
from contextlib import contextmanager, ExitStack
import sqlite3
import os
import tempfile

@contextmanager
def database_connection(db_path):
    """Manage a database connection lifecycle."""
    conn = sqlite3.connect(db_path)
    try:
        yield conn
    finally:
        conn.close()

@contextmanager
def transaction(conn):
    """Manage a database transaction with auto-commit/rollback."""
    cursor = conn.cursor()
    try:
        yield cursor
        conn.commit()
        print("Transaction committed")
    except Exception as e:
        conn.rollback()
        print(f"Transaction rolled back: {e}")
        raise

@contextmanager
def managed_database(db_path):
    """Complete database session with connection and transaction."""
    with ExitStack() as stack:
        conn = stack.enter_context(database_connection(db_path))
        cursor = stack.enter_context(transaction(conn))
        yield cursor

# Demo
db_path = os.path.join(tempfile.mkdtemp(), "demo.db")

# Successful transaction
with managed_database(db_path) as cursor:
    cursor.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, email TEXT)")
    cursor.execute("INSERT INTO users (name, email) VALUES (?, ?)", ("Alice", "alice@example.com"))
    cursor.execute("INSERT INTO users (name, email) VALUES (?, ?)", ("Bob", "bob@example.com"))

# Read back
with managed_database(db_path) as cursor:
    cursor.execute("SELECT * FROM users")
    for row in cursor.fetchall():
        print(f"  User: {row}")

# Failed transaction (rollback)
print("\nAttempting bad insert:")
try:
    with managed_database(db_path) as cursor:
        cursor.execute("INSERT INTO users (name, email) VALUES (?, ?)", ("Charlie", "charlie@example.com"))
        raise ValueError("Simulated error -- transaction should rollback")
except ValueError:
    pass

# Verify rollback worked
with managed_database(db_path) as cursor:
    cursor.execute("SELECT COUNT(*) FROM users")
    count = cursor.fetchone()[0]
    print(f"Users after rollback: {count} (Charlie was NOT added)")

Output:

Transaction committed
  User: (1, 'Alice', 'alice@example.com')
  User: (2, 'Bob', 'bob@example.com')
Transaction committed

Attempting bad insert:
Transaction rolled back: Simulated error -- transaction should rollback
Users after rollback: 2 (Charlie was NOT added)
Transaction committed

This example composes three context managers: database_connection handles the connection lifecycle, transaction handles commit/rollback logic, and managed_database combines both using ExitStack. The composition pattern keeps each context manager focused on a single responsibility while providing a convenient combined interface.

Frequently Asked Questions

When should I use a class-based context manager instead of @contextmanager?

Use a class when you need to store state between __enter__ and __exit__, when the context manager will be reused as a library component, or when you need the __exit__ method’s exception arguments to make decisions about exception handling. Use @contextmanager for simple setup-teardown patterns where a generator function is more readable.

Does contextlib work with async code?

Yes. Python 3.7+ includes contextlib.asynccontextmanager for async generators, and AsyncExitStack for managing async resources. The patterns are identical — just use async with instead of with and async def instead of def.
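
A minimal async sketch, assuming a hypothetical connection resource where asyncio.sleep(0) stands in for real I/O:

```python
import asyncio
from contextlib import asynccontextmanager

events = []

@asynccontextmanager
async def connection(name):
    """Hypothetical async resource; asyncio.sleep stands in for real I/O."""
    await asyncio.sleep(0)
    events.append(f"open {name}")
    try:
        yield name
    finally:
        await asyncio.sleep(0)
        events.append(f"close {name}")

async def main():
    async with connection("db") as conn:
        events.append(f"using {conn}")

asyncio.run(main())
print(events)   # ['open db', 'using db', 'close db']
```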

Can I nest context managers?

Yes, and there are multiple ways: nested with statements, comma-separated managers in a single with statement (with A() as a, B() as b:), or ExitStack for dynamic nesting. The comma syntax is preferred for a fixed number of managers; ExitStack is preferred when the number is determined at runtime.

Can I reuse a context manager instance?

It depends. @contextmanager-based managers are single-use: entering the same instance a second time raises an exception (RuntimeError or AttributeError, depending on the Python version). Class-based managers can be made reusable by resetting state in __enter__. If you need to use a generator-based manager more than once, call the factory function again to get a fresh instance each time.
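
The single-use behavior is easy to check. In this sketch, session is a hypothetical manager, and both exception types are caught because the one raised on reuse is not the same in every Python version.

```python
from contextlib import contextmanager

@contextmanager
def session():
    yield "session"

manager = session()
with manager:
    pass

# Entering the same instance a second time fails
try:
    with manager:
        pass
    reused = True
except (RuntimeError, AttributeError) as exc:
    reused = False
    print(f"{type(exc).__name__}: {exc}")

# Calling the factory again gives a fresh, usable manager
with session() as s:
    print(s)   # session
```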

What is contextlib.closing() for?

The closing() wrapper calls .close() on an object when the with block exits. Use it for objects that have a close() method but do not implement the context manager protocol natively — like urllib.request.urlopen() in older Python versions or custom connection objects.
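
For example, with a hypothetical Connection class that has close() but no __enter__ or __exit__:

```python
from contextlib import closing

class Connection:
    """Hypothetical object with close() but no context manager protocol."""
    def __init__(self):
        self.closed = False

    def query(self):
        return "result"

    def close(self):
        self.closed = True

with closing(Connection()) as conn:
    print(conn.query())   # result

print(conn.closed)        # True
```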

Conclusion

The contextlib module transforms resource management from boilerplate-heavy class definitions into clean, expressive patterns. We covered @contextmanager for generator-based managers, ExitStack for dynamic resource composition, suppress() for clean exception ignoring, and redirect_stdout for output capture. The transaction manager example showed how these tools compose into production-ready patterns.

Try extending the database transaction manager with connection pooling, nested savepoints, or retry logic. For the complete API reference, see the official contextlib documentation.

How To Build a WebSocket Server with Python


Intermediate

If you have ever built a chat application, a live dashboard, or a multiplayer game, you know that regular HTTP requests fall short when you need real-time, bidirectional communication. Polling the server every few seconds wastes bandwidth and adds latency. WebSockets solve this by keeping a persistent connection open between client and server, allowing both sides to send messages at any time.

Python’s websockets library makes building WebSocket servers and clients remarkably straightforward. It is built on top of asyncio, so it integrates naturally with Python’s async ecosystem. You only need to install one package — pip install websockets — and you are ready to go.

In this tutorial, you will learn how to create a WebSocket server, build a client that connects to it, implement broadcast messaging for chat-style applications, handle connection lifecycle events, and add basic authentication. By the end, you will have a working real-time chat server that you can extend for your own projects.

WebSocket Echo Server: Quick Example

Here is the simplest possible WebSocket server — it echoes back whatever the client sends:

# echo_server.py
import asyncio
import websockets

async def echo(websocket):
    async for message in websocket:
        print(f"Received: {message}")
        await websocket.send(f"Echo: {message}")

async def main():
    async with websockets.serve(echo, "localhost", 8765):
        print("Echo server running on ws://localhost:8765")
        await asyncio.Future()  # Run forever

asyncio.run(main())

Output (server):

Echo server running on ws://localhost:8765
Received: Hello WebSocket!
Received: How are you?

The websockets.serve() function starts a server that calls the echo handler for every new connection. The async for message in websocket pattern automatically handles the connection lifecycle — it reads messages until the client disconnects, then exits cleanly. We will build on this pattern throughout the tutorial.

What Are WebSockets and Why Use Them?

WebSockets are a communication protocol that provides full-duplex (two-way) communication channels over a single TCP connection. Unlike HTTP, where the client must initiate every request, WebSockets allow either side to send data at any time after the initial handshake.

The WebSocket protocol starts with an HTTP upgrade request. The client sends a regular HTTP request with an Upgrade: websocket header, and if the server agrees, the connection is upgraded from HTTP to WebSocket. From that point on, both sides communicate using lightweight WebSocket frames instead of full HTTP requests.
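
One detail of that upgrade, not covered above and taken from RFC 6455 rather than from this tutorial, is how the server proves it understood the request: it hashes the client's Sec-WebSocket-Key header together with a fixed GUID and returns the result as Sec-WebSocket-Accept. The websockets library does this for you; the sketch below only illustrates the calculation, and accept_key is a hypothetical helper name.

```python
import base64
import hashlib

# Fixed GUID defined by RFC 6455 for the opening handshake
WEBSOCKET_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"

def accept_key(sec_websocket_key: str) -> str:
    """Compute the Sec-WebSocket-Accept value for a client's key."""
    digest = hashlib.sha1((sec_websocket_key + WEBSOCKET_GUID).encode("ascii")).digest()
    return base64.b64encode(digest).decode("ascii")

# Sample key from RFC 6455, section 1.3
print(accept_key("dGhlIHNhbXBsZSBub25jZQ=="))   # s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
```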

Feature | HTTP | WebSocket
Direction | Client to server only | Bidirectional
Connection | New connection per request | Persistent single connection
Overhead | Headers on every request | Minimal frame overhead (2-14 bytes)
Latency | Round-trip per request | Near real-time
Use case | REST APIs, page loads | Chat, live feeds, gaming

Use WebSockets when you need low-latency, real-time updates. Use HTTP when you need stateless request-response patterns (like REST APIs).

WebSockets: HTTP grew tired of one-way conversations.

Setting Up the websockets Library

Install the library with pip:

# install.sh
pip install websockets

Output:

Successfully installed websockets-13.1

The websockets library requires Python 3.8 or later. It has no dependencies beyond the standard library’s asyncio module, keeping your dependency tree clean.

Building a WebSocket Client

To test our servers, we need a client. Here is a simple interactive client that sends messages and prints responses:

# client.py
import asyncio
import websockets

async def chat_client():
    uri = "ws://localhost:8765"
    async with websockets.connect(uri) as websocket:
        print("Connected to server. Type messages (Ctrl+C to quit):")
        
        # Send a greeting
        await websocket.send("Hello from Python client!")
        response = await websocket.recv()
        print(f"Server says: {response}")
        
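        # Interactive loop
        loop = asyncio.get_running_loop()
        while True:
            # input() blocks, so run it in a worker thread to keep the
            # event loop free to answer the server's keepalive pings
            message = await loop.run_in_executor(None, input, "> ")
            await websocket.send(message)
            response = await websocket.recv()
            print(f"Server says: {response}")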

asyncio.run(chat_client())

Output:

Connected to server. Type messages (Ctrl+C to quit):
Server says: Echo: Hello from Python client!
> Testing 123
Server says: Echo: Testing 123

The websockets.connect() context manager handles the connection lifecycle automatically. When you exit the async with block (or the program ends), the connection closes cleanly with a proper WebSocket close handshake.

Push notifications, live dashboards, chat. WebSockets does it all.

Building a Broadcast Chat Server

A real chat server needs to broadcast messages from one client to all connected clients. This requires tracking active connections in a set:

# chat_server.py
import asyncio
import websockets
import json
from datetime import datetime

connected_clients = set()

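async def broadcast(message, sender=None):
    """Send a message to all connected clients except the sender."""
    disconnected = set()
    # Iterate over a snapshot: the set can change while we await send()
    for client in list(connected_clients):
        if client != sender:
            try:
                await client.send(message)
            except websockets.ConnectionClosed:
                disconnected.add(client)
    # Clean up disconnected clients in place; "connected_clients -= ..."
    # would rebind the name as a local and raise UnboundLocalError
    connected_clients.difference_update(disconnected)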

async def chat_handler(websocket):
    """Handle a single client connection."""
    # Register client
    connected_clients.add(websocket)
    client_id = f"User-{id(websocket) % 1000}"
    print(f"{client_id} connected. Total clients: {len(connected_clients)}")
    
    # Notify others
    join_msg = json.dumps({
        "type": "system",
        "message": f"{client_id} joined the chat",
        "timestamp": datetime.now().isoformat()
    })
    await broadcast(join_msg, sender=websocket)
    
    try:
        async for message in websocket:
            # Wrap message with metadata
            chat_msg = json.dumps({
                "type": "chat",
                "sender": client_id,
                "message": message,
                "timestamp": datetime.now().isoformat()
            })
            print(f"{client_id}: {message}")
            await broadcast(chat_msg)
    except websockets.ConnectionClosed:
        pass
    finally:
        # Unregister client
        connected_clients.discard(websocket)
        leave_msg = json.dumps({
            "type": "system",
            "message": f"{client_id} left the chat",
            "timestamp": datetime.now().isoformat()
        })
        await broadcast(leave_msg)
        print(f"{client_id} disconnected. Total clients: {len(connected_clients)}")

async def main():
    async with websockets.serve(chat_handler, "localhost", 8765):
        print("Chat server running on ws://localhost:8765")
        await asyncio.Future()

asyncio.run(main())

Output (server with two clients):

Chat server running on ws://localhost:8765
User-142 connected. Total clients: 1
User-857 connected. Total clients: 2
User-142: Hello everyone!
User-857: Hey there!
User-142 disconnected. Total clients: 1

The connected_clients set tracks all active connections. When a client sends a message, broadcast() forwards it to every connected client, including the sender; pass sender=websocket, as the join notification does, to skip the originator. The try/finally block ensures we clean up the client set even if the connection drops unexpectedly.
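The broadcast pattern can be tried without opening any sockets. The following is a minimal sketch, assuming a hypothetical FakeClient class that stands in for a WebSocket connection and raises ConnectionError when it is "dead" (with the real library you would catch websockets.ConnectionClosed instead). It sends to a snapshot of the set concurrently and then drops every client whose send failed.

```python
# broadcast_sketch.py
import asyncio

class FakeClient:
    """Stand-in for a WebSocket connection (hypothetical, for illustration)."""

    def __init__(self, name, alive=True):
        self.name = name
        self.alive = alive
        self.inbox = []

    async def send(self, message):
        if not self.alive:
            raise ConnectionError(f"{self.name} is gone")
        self.inbox.append(message)

async def broadcast(clients, message, sender=None):
    """Send to every client except sender; drop clients whose send fails."""
    # Snapshot first so the set can be modified safely afterwards
    targets = [c for c in clients if c is not sender]
    results = await asyncio.gather(
        *(c.send(message) for c in targets), return_exceptions=True
    )
    for client, result in zip(targets, results):
        if isinstance(result, Exception):
            clients.discard(client)

async def demo():
    alice = FakeClient("alice")
    bob = FakeClient("bob")
    ghost = FakeClient("ghost", alive=False)
    clients = {alice, bob, ghost}
    await broadcast(clients, "hi", sender=alice)
    return alice.inbox, bob.inbox, ghost in clients

print(asyncio.run(demo()))
```

This prints ([], ['hi'], False): the sender is skipped, the live client receives the message, and the dead client is removed from the set.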

Handling Connection Errors Gracefully

Real-world WebSocket connections drop unexpectedly due to network issues, client crashes, or timeouts. Robust error handling is essential:

# robust_server.py
import asyncio
import websockets
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("websocket-server")

async def robust_handler(websocket):
    """Handler with comprehensive error management."""
    remote = websocket.remote_address
    logger.info(f"New connection from {remote}")
    
    try:
        # Keepalive pings are configured in websockets.serve() below
        async for message in websocket:
            if len(message) > 10000:
                await websocket.send("Error: Message too long (max 10000 chars)")
                continue
            
            await websocket.send(f"Processed: {message}")
            
    except websockets.ConnectionClosedError as e:
        logger.warning(f"Connection closed with error: {e.code} {e.reason}")
    except websockets.ConnectionClosedOK:
        logger.info(f"Connection closed normally from {remote}")
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
    finally:
        logger.info(f"Cleanup complete for {remote}")

async def main():
    async with websockets.serve(
        robust_handler,
        "localhost",
        8765,
        ping_interval=20,     # Send ping every 20 seconds
        ping_timeout=10,      # Wait 10 seconds for pong
        max_size=2**20,       # Max message size: 1MB
        close_timeout=5       # Wait 5 seconds for close handshake
    ):
        logger.info("Robust server running on ws://localhost:8765")
        await asyncio.Future()

asyncio.run(main())

Output:

INFO:websocket-server:Robust server running on ws://localhost:8765
INFO:websocket-server:New connection from ('127.0.0.1', 54321)
WARNING:websocket-server:Connection closed with error: 1006 
INFO:websocket-server:Cleanup complete for ('127.0.0.1', 54321)

The ping_interval and ping_timeout parameters enable automatic keepalive detection. If a client stops responding to pings within the timeout, the server closes the connection and triggers cleanup. The max_size parameter prevents clients from sending oversized messages that could exhaust server memory.

Heartbeats: prove the socket is alive, not just open.

Adding Basic Authentication

You can authenticate WebSocket connections using query parameters, headers, or an initial authentication message. Here is a token-based approach:

# auth_server.py
import asyncio
import websockets
import json
import secrets

# Valid tokens (in production, use a database)
VALID_TOKENS = {
    "demo-token-abc123": "alice",
    "demo-token-def456": "bob"
}

async def authenticate(websocket):
    """Authenticate the client using the first message."""
    try:
        auth_msg = await asyncio.wait_for(websocket.recv(), timeout=5.0)
        data = json.loads(auth_msg)
        token = data.get("token", "")
        
        for valid_token, username in VALID_TOKENS.items():
            if secrets.compare_digest(token, valid_token):
                await websocket.send(json.dumps({
                    "type": "auth", 
                    "status": "ok",
                    "username": username
                }))
                return username
        
        await websocket.send(json.dumps({
            "type": "auth",
            "status": "error", 
            "message": "Invalid token"
        }))
        return None
        
    except asyncio.TimeoutError:
        await websocket.send(json.dumps({
            "type": "auth",
            "status": "error",
            "message": "Authentication timeout"
        }))
        return None

async def secure_handler(websocket):
    """Handler that requires authentication."""
    username = await authenticate(websocket)
    if not username:
        await websocket.close(1008, "Authentication failed")
        return
    
    print(f"{username} authenticated successfully")
    
    async for message in websocket:
        response = f"[{username}] {message}"
        await websocket.send(response)

async def main():
    async with websockets.serve(secure_handler, "localhost", 8765):
        print("Secure server running on ws://localhost:8765")
        await asyncio.Future()

asyncio.run(main())

Output:

Secure server running on ws://localhost:8765
alice authenticated successfully

The server expects the first message to contain a JSON object with a token field. It uses secrets.compare_digest() for timing-safe comparison and gives the client 5 seconds to authenticate before timing out. If authentication fails, the connection is closed with WebSocket status code 1008 (Policy Violation).
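The authenticate() function above assumes the first message is well-formed JSON. As a hedged sketch of a stricter check, the helper below (username_for is a hypothetical name, reusing the demo tokens from the listing) treats malformed JSON, non-object payloads, and non-string tokens as failed logins instead of letting an exception end the handler. It compares bytes rather than str because compare_digest() rejects non-ASCII strings.

```python
# auth_parse.py
import json
import secrets

# Same demo tokens as in auth_server.py
VALID_TOKENS = {
    "demo-token-abc123": "alice",
    "demo-token-def456": "bob",
}

def username_for(raw_message, valid_tokens=VALID_TOKENS):
    """Return the username for a valid auth message, or None."""
    try:
        data = json.loads(raw_message)
    except (ValueError, TypeError):
        return None               # not JSON at all
    if not isinstance(data, dict):
        return None               # JSON, but not an object
    token = data.get("token")
    if not isinstance(token, str):
        return None               # missing or wrongly typed token
    candidate = token.encode("utf-8")
    for valid_token, username in valid_tokens.items():
        # Timing-safe comparison on bytes
        if secrets.compare_digest(candidate, valid_token.encode("utf-8")):
            return username
    return None

print(username_for('{"token": "demo-token-abc123"}'))
print(username_for("not json"))
```

A handler could call this on the first received message and close the connection with code 1008 whenever it returns None.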

Real-Life Example: Live Notification System

Let us build a notification system where a server pushes real-time alerts to connected clients based on their subscribed topics:

# notification_server.py
import asyncio
import websockets
import json
from datetime import datetime
from collections import defaultdict

class NotificationServer:
    """Real-time notification server with topic subscriptions."""
    
    def __init__(self):
        self.subscribers = defaultdict(set)  # topic -> set of websockets
        self.clients = {}  # websocket -> client info
    
    async def handle_subscribe(self, websocket, topics):
        """Subscribe a client to one or more topics."""
        for topic in topics:
            self.subscribers[topic].add(websocket)
        self.clients[websocket]["topics"] = topics
        await websocket.send(json.dumps({
            "type": "subscribed",
            "topics": topics
        }))
    
    async def notify(self, topic, message):
        """Send a notification to all subscribers of a topic."""
        if topic not in self.subscribers:
            return 0
        
        notification = json.dumps({
            "type": "notification",
            "topic": topic,
            "message": message,
            "timestamp": datetime.now().isoformat()
        })
        
        sent = 0
        disconnected = set()
        for client in list(self.subscribers[topic]):  # snapshot: the set may change while awaiting
            try:
                await client.send(notification)
                sent += 1
            except websockets.ConnectionClosed:
                disconnected.add(client)
        
        # Cleanup
        self.subscribers[topic] -= disconnected
        return sent
    
    async def handler(self, websocket):
        """Main connection handler."""
        self.clients[websocket] = {"topics": [], "connected": datetime.now()}
        
        try:
            async for raw_message in websocket:
                data = json.loads(raw_message)
                action = data.get("action")
                
                if action == "subscribe":
                    await self.handle_subscribe(
                        websocket, data.get("topics", [])
                    )
                elif action == "publish":
                    topic = data.get("topic")
                    message = data.get("message")
                    count = await self.notify(topic, message)
                    await websocket.send(json.dumps({
                        "type": "published",
                        "topic": topic,
                        "recipients": count
                    }))
        except websockets.ConnectionClosed:
            pass
        finally:
            # Remove from all subscriptions
            for topic_subs in self.subscribers.values():
                topic_subs.discard(websocket)
            self.clients.pop(websocket, None)

server = NotificationServer()

async def main():
    async with websockets.serve(server.handler, "localhost", 8765):
        print("Notification server running on ws://localhost:8765")
        
        # Simulate periodic system notifications
        async def system_alerts():
            while True:
                await asyncio.sleep(30)
                await server.notify("system", "Heartbeat check - all systems normal")
        
        await asyncio.gather(
            asyncio.Future(),  # Run forever
            system_alerts()
        )

asyncio.run(main())

Output:

Notification server running on ws://localhost:8765

This server supports topic-based subscriptions. Clients subscribe to topics like “alerts”, “updates”, or “system”, and only receive notifications for their subscribed topics. The notify() method handles broadcasting to subscribers and automatically cleans up disconnected clients. You could extend this with persistent message queues, delivery confirmation, or priority levels.
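A client talks to this server with two JSON message shapes, one per action. The helpers below are a small sketch of those shapes as the handler reads them (the function names subscribe_message and publish_message are made up for illustration); a client would pass the resulting strings to websocket.send().

```python
# notification_messages.py
import json

def subscribe_message(*topics):
    """Build the JSON text that subscribes a client to the given topics."""
    return json.dumps({"action": "subscribe", "topics": list(topics)})

def publish_message(topic, message):
    """Build the JSON text that publishes a message to one topic."""
    return json.dumps({"action": "publish", "topic": topic, "message": message})

print(subscribe_message("alerts", "system"))
print(publish_message("alerts", "Disk usage above 90%"))
```

The server replies to the first with a "subscribed" message and to the second with a "published" message that includes the recipient count.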

Frequently Asked Questions

How many concurrent connections can a Python WebSocket server handle?

A single Python process using asyncio can typically handle 10,000-50,000 concurrent WebSocket connections, depending on message frequency and server resources. The websockets library is efficient with memory, using roughly 10KB per connection. For higher scale, use multiple processes behind a load balancer like Nginx.

How do I add TLS/SSL encryption (wss://)?

Create an SSL context with ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER), load your certificate with ssl_context.load_cert_chain(certfile, keyfile), and pass the context to websockets.serve() through its ssl argument. In production, a common alternative is to let a reverse proxy like Nginx handle TLS termination instead.

How do I handle client reconnection?

The websockets library does not auto-reconnect. On the client side, wrap your connection in a retry loop with exponential backoff. The websockets.connect() context manager closes cleanly, so you can simply reconnect in a while True loop with a try/except block catching ConnectionClosed errors.
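One way to structure that retry loop is sketched below. It is deliberately library-agnostic: session is any coroutine function you supply that connects and runs until the connection ends, and the names run_with_reconnect and backoff_delay are hypothetical. By default it retries on OSError (which covers refused and reset connections); websockets.ConnectionClosed is a separate exception type, so with the real library you would add it to retry_on.

```python
# reconnect_sketch.py
import asyncio
import random

def backoff_delay(attempt, base=1.0, cap=30.0):
    """Delay before retry number `attempt` (0-based): base * 2**attempt, capped."""
    return min(cap, base * (2 ** attempt))

async def run_with_reconnect(session, max_attempts=5, base=1.0, cap=30.0,
                             retry_on=(OSError,), sleep=asyncio.sleep):
    """Await session() until it returns; retry with exponential backoff on failure."""
    for attempt in range(max_attempts):
        try:
            return await session()
        except retry_on:
            if attempt == max_attempts - 1:
                raise             # out of attempts: surface the last error
            delay = backoff_delay(attempt, base, cap)
            # Up to 10% jitter so many clients do not reconnect in lockstep
            await sleep(delay + random.uniform(0, delay / 10))

print([backoff_delay(n) for n in range(7)])
```

The delays grow as 1, 2, 4, 8, 16 seconds and then stay at the 30-second cap. The sleep parameter exists only so the loop can be exercised in tests without real waiting.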

Can I send binary data over WebSockets?

Yes. The websockets library automatically detects whether you send str (text frames) or bytes (binary frames). Use await websocket.send(b"\x00\x01\x02") for binary data. This is useful for sending images, audio streams, or protobuf-encoded messages.
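For structured binary payloads, the standard library's struct module is one option. This is a minimal sketch under an assumed wire format (a 16-bit sensor id followed by a 64-bit float, in network byte order); the format and the function names are illustrative, not something the websockets library prescribes. The bytes returned by encode_reading() can be passed directly to websocket.send().

```python
# binary_frames.py
import struct

# Assumed layout: unsigned 16-bit sensor id + 64-bit float, network byte order
READING = struct.Struct("!Hd")

def encode_reading(sensor_id, value):
    """Pack one sensor reading into a 10-byte binary payload."""
    return READING.pack(sensor_id, value)

def decode_reading(frame):
    """Unpack a payload produced by encode_reading()."""
    return READING.unpack(frame)

payload = encode_reading(7, 21.5)
print(len(payload), decode_reading(payload))
```

On the receiving side, a message that arrives as bytes rather than str is a binary frame and can be handed to decode_reading().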

Can I use WebSockets with Django or Flask?

Django supports WebSockets through Django Channels, which adds an ASGI layer. Flask does not natively support WebSockets, but you can use Flask-SocketIO or run a separate websockets server alongside Flask. For new projects needing WebSocket support, consider FastAPI, which has native WebSocket support built on Starlette.

Conclusion

You now have a solid foundation for building WebSocket applications in Python. We covered creating echo servers, building broadcast chat systems, handling connection errors gracefully, adding authentication, and implementing topic-based notification systems. The websockets library’s async-first design makes it easy to handle thousands of concurrent connections with clean, readable code.

Try extending the notification server with features like message persistence, delivery acknowledgments, or a web-based admin dashboard. For the complete API reference, see the official websockets documentation.

Bare websockets Library Server

The websockets library is the canonical pure-Python implementation: modern async, type-hinted, RFC-compliant. The minimal echo server takes about ten lines:

# pip install websockets

import asyncio
from websockets.asyncio.server import serve

async def echo(ws):
    async for message in ws:
        await ws.send(f"Echo: {message}")

async def main():
    async with serve(echo, "0.0.0.0", 8765) as server:
        await server.serve_forever()

asyncio.run(main())

Connect from anywhere — browser JavaScript (new WebSocket("ws://localhost:8765")), wscat, or another Python client via websockets.connect. The handler runs once per connection; async for message reads incoming frames until the client disconnects.

Broadcasting to Multiple Clients

For chat rooms or live dashboards, track all connected clients in a set and broadcast each message to all of them:

import asyncio
from websockets.asyncio.server import serve
import json

clients = set()

async def handler(ws):
    clients.add(ws)
    try:
        async for raw in ws:
            payload = json.loads(raw)
            outbound = json.dumps({"user": payload["user"], "text": payload["text"]})
            # Send to everyone, ignore disconnected clients
            await asyncio.gather(
                *[c.send(outbound) for c in clients],
                return_exceptions=True,
            )
    finally:
        clients.remove(ws)

async def main():
    async with serve(handler, "0.0.0.0", 8765):
        await asyncio.Future()  # run forever

asyncio.run(main())

Auth, Subprotocols, and Path Routing

Production WebSocket servers need auth and routing. Use the process_request hook for HTTP-level checks before upgrading to WebSocket:

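import asyncio
from http import HTTPStatus

from websockets.asyncio.server import serve

# is_valid() and get_user_id_from_token() are application-defined helpers
# (not part of websockets); supply your own token checks.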

async def auth_handler(connection, request):
    # Reject before upgrade
    token = request.headers.get("Authorization", "").removeprefix("Bearer ")
    if not is_valid(token):
        return connection.respond(HTTPStatus.UNAUTHORIZED, "Bad token\n")
    # Attach user info for the handler
    connection.user_id = get_user_id_from_token(token)

async def handler(ws):
    user_id = ws.user_id
    async for msg in ws:
        # use user_id in messages
        pass

async def main():
    async with serve(handler, "0.0.0.0", 8765, process_request=auth_handler):
        await asyncio.Future()

WebSockets in FastAPI

If you already have a FastAPI app, FastAPI ships its own WebSocket support — no second framework needed:

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()
active = []

@app.websocket("/ws")
async def ws_endpoint(websocket: WebSocket):
    await websocket.accept()
    active.append(websocket)
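    try:
        while True:
            data = await websocket.receive_text()
            # Iterate over a copy so removals during the awaits are safe
            for client in list(active):
                await client.send_text(f"[broadcast] {data}")
    except WebSocketDisconnect:
        pass
    finally:
        # Always unregister, whatever ended the loop
        if websocket in active:
            active.remove(websocket)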

Heartbeats and Reconnection

WebSockets across the public internet drop silently — proxies and NAT timeouts kill idle connections after 30-120 seconds. Send pings to keep them alive:

async with serve(handler, "0.0.0.0", 8765, ping_interval=20, ping_timeout=10):
    ...

For clients, implement exponential-backoff reconnect on the JS side — bare browsers don’t auto-reconnect.

Common Pitfalls

  • Blocking inside the handler. A synchronous time.sleep(5) blocks the entire event loop. Use await asyncio.sleep(5). Long CPU work belongs in asyncio.to_thread.
  • Forgetting to remove disconnected clients. Without cleanup, every broadcast tries to send on a dead socket and raises ConnectionClosed; if nothing catches it, the handler doing the broadcast dies. Use try/finally with remove.
  • Treating each message as JSON without try/except. A malformed message kills the handler. Wrap json.loads in a try/except and send an error message back.
  • Holding references in the broadcast list. Long-lived sets of WebSocket objects can leak memory if a handler exits without removing itself. Always remove in finally.
  • Skipping wss:// in production. Plain ws:// sends data unencrypted. Always terminate TLS at your reverse proxy and use wss://.
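The first pitfall is easy to see in a small experiment. In this sketch, slow_checksum is a hypothetical blocking function standing in for heavy work; running it through asyncio.to_thread (available from Python 3.9) lets a background ticker task keep running, which is what allows keepalive pings and other handlers to make progress in a real server.

```python
# to_thread_demo.py
import asyncio
import time

def slow_checksum(data: bytes) -> int:
    """Blocking stand-in for CPU-heavy or blocking work (hypothetical)."""
    time.sleep(0.2)
    return sum(data) % 256

async def ticker(ticks):
    """Record a tick every 50 ms for as long as the event loop is free."""
    while True:
        await asyncio.sleep(0.05)
        ticks.append(time.monotonic())

async def demo():
    ticks = []
    task = asyncio.create_task(ticker(ticks))
    # The blocking call runs in a worker thread; the loop keeps ticking
    result = await asyncio.to_thread(slow_checksum, b"abc")
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return result, len(ticks)

print(asyncio.run(demo()))
```

Calling slow_checksum() directly inside demo() would return the same checksum, but the ticker would record nothing during those 200 ms because the event loop would be stalled.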

FAQ

Q: websockets library or FastAPI WebSockets?
A: FastAPI if you already have a FastAPI app. The websockets library when you want a dedicated, low-overhead WebSocket-only server.

Q: How many concurrent connections can one server handle?
A: Thousands on a single Python process, tens of thousands with uvloop. Beyond that, scale horizontally with a load balancer that supports sticky sessions or use a pub/sub layer (Redis) for cross-instance broadcasts.

Q: How do I scale broadcasts across multiple server instances?
A: Use Redis pub/sub as the message bus. Each instance subscribes to a channel; broadcasts go through Redis; every connected client gets the message regardless of which instance they’re on.

Q: Polling vs WebSockets vs SSE?
A: WebSockets for bidirectional real-time (chat, multiplayer games). Server-Sent Events for one-way push (dashboards, notifications). Polling as a fallback when neither is available. Each has its place.

Q: Behind nginx — what’s the config?
A: Add proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; to your location block. Raise proxy_read_timeout well beyond its 60s default (for example, 3600s) so idle connections are not cut off.

Wrapping Up

WebSockets are the right answer for any real-time, bidirectional, low-latency need: chat, multiplayer, live cursors, streaming logs, collaborative editing. The websockets library is the modern Python implementation; FastAPI bundles equivalent functionality if you’re already using it. Pings, auth at upgrade time, and TLS termination at a reverse proxy are the production essentials.

How To Use Python Secrets Module for Secure Tokens


Intermediate

You have probably written code that generates random tokens for password resets, API keys, or session identifiers. If you used the random module for this, your tokens are predictable — an attacker who knows the seed can reproduce every token you generate. This is not a theoretical risk; it has been the root cause of real security breaches.

Python 3.6 introduced the secrets module specifically for generating cryptographically secure random values. It is part of the standard library, so there is nothing to install. It uses the operating system’s best source of randomness (the same source as os.urandom(): for example, getrandom() or /dev/urandom on Linux and the system cryptographic API on Windows) and produces tokens that are safe for security-sensitive applications.

In this tutorial, you will learn how to generate secure tokens in multiple formats (hex, URL-safe, bytes), create safe passwords, build one-time password reset links, and compare tokens securely. By the end, you will have a complete toolkit for handling secrets in any Python application.

Generating a Secure Token: Quick Example

Here is the fastest way to generate a cryptographically secure token in Python:

# quick_token.py
import secrets

# Generate a 32-byte URL-safe token
token = secrets.token_urlsafe(32)
print(f"Token: {token}")
print(f"Length: {len(token)} characters")

Output:

Token: x7Kj2mN9pQrS1tUvWxYz3aB4cD5eF6gH7iJ8kL0mNoP
Length: 43 characters

The secrets.token_urlsafe() function generates random bytes and encodes them as a URL-safe base64 string. The 32-byte input produces a 43-character token with enough entropy (256 bits) to resist brute-force attacks. We will explore all the token types and their use cases in the sections below.
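The 43-character figure follows from base64 arithmetic: every 3 bytes become 4 characters, and token_urlsafe() strips the trailing padding. A quick sketch to check this (urlsafe_length is a helper invented for this illustration):

```python
# token_length.py
import secrets

def urlsafe_length(nbytes):
    """Characters in an unpadded base64 encoding of nbytes bytes."""
    return (4 * nbytes + 2) // 3   # integer form of ceil(4 * nbytes / 3)

for nbytes in (16, 24, 32):
    token = secrets.token_urlsafe(nbytes)
    print(nbytes, len(token), urlsafe_length(nbytes))
```

For 16, 24, and 32 bytes this gives 22, 32, and 43 characters. The argument to token_urlsafe() is therefore the number of random bytes, not the length of the resulting string.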

What is the Secrets Module and Why Use It?

The secrets module provides functions for generating cryptographically strong random numbers suitable for managing secrets such as account authentication, tokens, and similar. Think of it as the security-focused counterpart to random.

The key difference is the source of randomness. The random module uses the Mersenne Twister algorithm, which is fast and statistically uniform but deterministic. If someone recovers the internal state (observing 624 consecutive 32-bit outputs is enough), they can predict all future values. The secrets module draws from the operating system's cryptographically secure generator, whose output is designed to be unpredictable even to someone who has seen earlier values.
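The determinism is easy to observe. In this short sketch, two random.Random instances seeded with the same value (2024 is an arbitrary choice) produce identical sequences, while secrets offers no seed to set at all.

```python
# determinism_demo.py
import random
import secrets

first = random.Random(2024)
second = random.Random(2024)

seq_a = [first.getrandbits(32) for _ in range(5)]
seq_b = [second.getrandbits(32) for _ in range(5)]
print("Same seed, same sequence:", seq_a == seq_b)

# secrets has no seed function; each call reads fresh OS randomness
token_a = secrets.token_hex(16)
token_b = secrets.token_hex(16)
print("secrets tokens differ:", token_a != token_b)
```

Reproducibility is exactly what you want in a simulation or a test, and exactly what you do not want in a password reset token.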

Feature | random | secrets
Algorithm | Mersenne Twister (PRNG) | OS entropy pool (CSPRNG)
Predictable? | Yes, if seed is known | No
Speed | Very fast | Slightly slower
Use case | Simulations, games, testing | Passwords, tokens, API keys
Thread-safe? | Yes, but threads share one seeded generator | Yes

The rule is simple: if the value protects something, use secrets. If it does not, random is fine.

secrets.token_urlsafe(): the right way to generate a token.

Generating Tokens in Different Formats

The secrets module offers three token functions, each producing a different encoding of the same underlying random bytes. The format you choose depends on where you plan to use the token.

Hex Tokens with token_hex()

Hex tokens produce a string of hexadecimal characters (0-9, a-f). They are commonly used for database identifiers, session IDs, and anywhere you need a clean alphanumeric string.

# hex_tokens.py
import secrets

# Generate hex tokens of different lengths
token_16 = secrets.token_hex(16)  # 16 bytes = 32 hex chars
token_32 = secrets.token_hex(32)  # 32 bytes = 64 hex chars

print(f"16-byte hex: {token_16}")
print(f"32-byte hex: {token_32}")
print(f"Lengths: {len(token_16)}, {len(token_32)} characters")

Output:

16-byte hex: a3b7c9d1e5f20a4b6c8d0e2f4a6b8c0d
32-byte hex: f1a2b3c4d5e6f7089a0b1c2d3e4f50617a8b9c0d1e2f3a4b5c6d7e8f90a1b2c3
Lengths: 32, 64 characters

Each byte produces two hex characters, so token_hex(16) returns a 32-character string. For most applications, 32 bytes (256 bits) provides sufficient entropy.

URL-Safe Tokens with token_urlsafe()

URL-safe tokens use base64 encoding with - and _ instead of + and /. This makes them safe to include in URLs, query parameters, and HTTP headers without encoding issues.

# urlsafe_tokens.py
import secrets

token = secrets.token_urlsafe(32)
reset_link = f"https://example-app.com/reset?token={token}"

print(f"Token: {token}")
print(f"Reset link: {reset_link}")

Output:

Token: Rk9PQkFSLVRoaXMtaXMtYS1zYWZlLXRva2VuLXlheg
Reset link: https://example-app.com/reset?token=Rk9PQkFSLVRoaXMtaXMtYS1zYWZlLXRva2VuLXlheg

This is the most commonly used format for password reset links, email verification tokens, and API keys because it is compact and URL-compatible.

Raw Bytes with token_bytes()

When you need raw binary data — for encryption keys, HMACs, or feeding into other cryptographic functions — use token_bytes().

# raw_bytes.py
import secrets

raw = secrets.token_bytes(32)
print(f"Bytes: {raw}")
print(f"Length: {len(raw)} bytes")
print(f"As hex: {raw.hex()}")

Output:

Bytes: b'\xd4\x8a\x1f...(32 bytes of binary data)'
Length: 32 bytes
As hex: d48a1f3b7c9e2d...

Raw bytes are not human-readable, but they are the right choice when the token feeds directly into a cryptographic function like hmac.new() or a symmetric encryption library.

Generating Secure Passwords

The secrets module includes secrets.choice(), which selects a random element from a sequence using cryptographically secure randomness. Combined with Python’s string module, you can build password generators that meet any complexity requirement.

# password_gen.py
import secrets
import string

def generate_password(length=16):
    """Generate a secure password with mixed character types."""
    alphabet = string.ascii_letters + string.digits + string.punctuation
    
    # Ensure at least one of each required type
    password = [
        secrets.choice(string.ascii_uppercase),
        secrets.choice(string.ascii_lowercase),
        secrets.choice(string.digits),
        secrets.choice(string.punctuation),
    ]
    
    # Fill remaining length with random choices
    password += [secrets.choice(alphabet) for _ in range(length - 4)]
    
    # Shuffle to avoid predictable positions
    # SystemRandom draws from the same OS source as the rest of secrets
    secrets.SystemRandom().shuffle(password)
    
    return ''.join(password)

# Generate 5 passwords
for i in range(5):
    pwd = generate_password(20)
    print(f"Password {i+1}: {pwd}")

Output:

Password 1: kQ7#mR2$nP9!xW4&bT6@
Password 2: jF3*hL8^vC1!yN5&dK7#
Password 3: wS6@tH4$pB9#mX2!qR8&
Password 4: gD1#zE5$cA3!oI7&uY9@
Password 5: fV8*aJ2^lG6!eM4&nW0#

The key detail is the shuffle step. Without it, the first four characters always follow the same type pattern (uppercase, lowercase, digit, punctuation). secrets.SystemRandom().shuffle() performs an unbiased shuffle backed by the same OS randomness. Sorting by small random keys is best avoided here, because elements that draw the same key keep their original order and skew the result.
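An alternative that needs no shuffle at all is to draw every character from the full alphabet and retry until the result happens to satisfy the rules, which is the approach the Python documentation's secrets recipes take. The sketch below assumes the same four character classes as above; generate_password_retry is a name chosen for this example.

```python
# password_retry.py
import secrets
import string

ALPHABET = string.ascii_letters + string.digits + string.punctuation

def generate_password_retry(length=16):
    """Draw whole passwords until one contains all four character classes."""
    if length < 4:
        raise ValueError("length must be at least 4 to fit every class")
    while True:
        password = "".join(secrets.choice(ALPHABET) for _ in range(length))
        if (any(c.islower() for c in password)
                and any(c.isupper() for c in password)
                and any(c.isdigit() for c in password)
                and any(c in string.punctuation for c in password)):
            return password

print(generate_password_retry(20))
```

Because each candidate is drawn uniformly, every password that meets the rules is equally likely, and at length 20 the loop rarely needs more than one or two attempts.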

random is for games. secrets is for security.

Comparing Tokens Securely

When verifying a token (checking if a submitted reset token matches the stored one), you must use constant-time comparison to prevent timing attacks. A timing attack measures how long the comparison takes — a regular == comparison returns False as soon as it finds the first mismatched character, leaking information about how many characters were correct.

# secure_compare.py
import secrets
import hmac

stored_token = "abc123def456"
submitted_token = "abc123def456"

# WRONG: vulnerable to timing attacks
if stored_token == submitted_token:
    print("Match (but insecure comparison)")

# RIGHT: constant-time comparison
if secrets.compare_digest(stored_token, submitted_token):
    print("Match (secure comparison)")

# Also works with bytes
stored_bytes = b"secret_token_value"
submitted_bytes = b"secret_token_value"
if secrets.compare_digest(stored_bytes, submitted_bytes):
    print("Bytes match (secure comparison)")

Output:

Match (but insecure comparison)
Match (secure comparison)
Bytes match (secure comparison)

The secrets.compare_digest() function is designed so that its running time does not depend on where (or whether) the two values differ, although it may still reveal whether their lengths match. It is the same function as hmac.compare_digest(); the secrets module simply re-exports it.
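One practical wrinkle: compare_digest() accepts str arguments only when they are pure ASCII, and raises TypeError otherwise. If tokens can come from user input, a small wrapper that encodes both sides to bytes avoids that surprise. This is a sketch, with tokens_match as a hypothetical helper name:

```python
# compare_wrapper.py
import hmac
import secrets

def tokens_match(stored: str, submitted: str) -> bool:
    """Timing-safe comparison that also tolerates non-ASCII input."""
    return secrets.compare_digest(
        stored.encode("utf-8"), submitted.encode("utf-8")
    )

print(tokens_match("abc123def456", "abc123def456"))
print(tokens_match("abc123def456", "abc123def457"))
# The secrets function is the hmac function under another name
print(secrets.compare_digest is hmac.compare_digest)
```

Encoding first means a stray accented character in a submitted token produces a clean False instead of an unhandled exception.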

Practical Security Patterns

Building a Password Reset Flow

Here is how to generate a time-limited password reset token with proper security practices:

# reset_flow.py
import secrets
import hashlib
import time

class TokenManager:
    """Manages secure, time-limited tokens."""
    
    def __init__(self, expiry_seconds=3600):
        self.tokens = {}  # In production, use a database
        self.expiry = expiry_seconds
    
    def create_token(self, user_id):
        """Generate a reset token for a user."""
        raw_token = secrets.token_urlsafe(32)
        # Store only the hash -- never store raw tokens
        token_hash = hashlib.sha256(raw_token.encode()).hexdigest()
        self.tokens[token_hash] = {
            'user_id': user_id,
            'created': time.time()
        }
        return raw_token  # Send this to the user via email
    
    def verify_token(self, submitted_token):
        """Verify a submitted token and return user_id if valid."""
        token_hash = hashlib.sha256(submitted_token.encode()).hexdigest()
        
        # Compare against each stored hash with a timing-safe comparison
        for stored_hash, data in self.tokens.items():
            if secrets.compare_digest(stored_hash, token_hash):
                # Check expiry
                if time.time() - data['created'] > self.expiry:
                    del self.tokens[stored_hash]
                    return None  # Expired
                return data['user_id']
        return None  # Not found

# Usage
manager = TokenManager(expiry_seconds=3600)
token = manager.create_token("user_42")
print(f"Reset token: {token}")
print(f"Verified: {manager.verify_token(token)}")
print(f"Invalid token: {manager.verify_token('wrong-token')}")

Output:

Reset token: aB3cD4eF5gH6iJ7kL8mN9oP0qR1sT2uV3wX4yZ5
Verified: user_42
Invalid token: None

The critical detail is that we store only the hash of the token, never the raw token itself. If the database is breached, attackers get useless hashes. The raw token only exists in the email sent to the user and in memory during verification.

Generating API Keys

API keys typically need a prefix for identification and a random body for security:

# api_keys.py
import secrets

def generate_api_key(prefix="pk"):
    """Generate a prefixed API key."""
    random_part = secrets.token_urlsafe(24)
    return f"{prefix}_{random_part}"

# Generate different key types
public_key = generate_api_key("pk")
secret_key = generate_api_key("sk")
test_key = generate_api_key("tk")

print(f"Public key:  {public_key}")
print(f"Secret key:  {secret_key}")
print(f"Test key:    {test_key}")

Output:

Public key:  pk_Rk9PQkFSLVRoaXMtaXMtYQ
Secret key:  sk_c2VjcmV0LWtleS12YWx1ZQ
Test key:    tk_dGVzdC1rZXktdmFsdWUtaGVyZQ

The prefix makes it easy to identify key types in logs and configuration files without revealing the secret portion. This is the same pattern used by services like Stripe (sk_live_, pk_test_).
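When reading such a key back, note that token_urlsafe() output can itself contain underscores, so splitting on every underscore would mangle the random part. The sketch below assumes a prefix without underscores, as in the generator above, and splits on the first underscore only; split_api_key is a hypothetical helper.

```python
# api_key_parse.py
import secrets

def generate_api_key(prefix="pk"):
    """Generate a prefixed API key (same shape as in api_keys.py)."""
    return f"{prefix}_{secrets.token_urlsafe(24)}"

def split_api_key(key):
    """Split a key into (prefix, random_part) at the first underscore."""
    prefix, _, body = key.partition("_")
    if not prefix or not body:
        raise ValueError("expected a key of the form <prefix>_<random part>")
    return prefix, body

key = generate_api_key("sk")
prefix, body = split_api_key(key)
print(prefix, len(body))
```

The random part is 32 characters long here, since 24 bytes encode to 32 base64 characters. Multi-part prefixes such as sk_live would need a different delimiter or a known list of prefixes.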

Using random for passwords? You're rolling dice for the wolf.

Real-Life Example: Secure Invitation System

Let us build a complete invitation system that generates secure invite codes, tracks their usage, and enforces expiration and single-use constraints.

# invitation_system.py
import secrets
import hashlib
import time
from datetime import datetime

class InvitationSystem:
    """Secure invitation code manager with expiry and usage tracking."""
    
    def __init__(self):
        self.invitations = {}
    
    def create_invite(self, creator, role="member", max_uses=1, 
                      expiry_hours=48):
        """Create a new invitation with constraints."""
        code = secrets.token_urlsafe(16)
        code_hash = hashlib.sha256(code.encode()).hexdigest()
        
        self.invitations[code_hash] = {
            'creator': creator,
            'role': role,
            'max_uses': max_uses,
            'current_uses': 0,
            'created': time.time(),
            'expiry': time.time() + (expiry_hours * 3600),
            'used_by': []
        }
        return code
    
    def redeem_invite(self, code, user_name):
        """Attempt to redeem an invitation code."""
        code_hash = hashlib.sha256(code.encode()).hexdigest()
        
        for stored_hash, invite in self.invitations.items():
            if not secrets.compare_digest(stored_hash, code_hash):
                continue
            
            if time.time() > invite['expiry']:
                return {"success": False, "error": "Invitation expired"}
            
            if invite['current_uses'] >= invite['max_uses']:
                return {"success": False, "error": "Invitation fully used"}
            
            invite['current_uses'] += 1
            invite['used_by'].append(user_name)
            return {
                "success": True,
                "role": invite['role'],
                "message": f"Welcome {user_name}! Role: {invite['role']}"
            }
        
        return {"success": False, "error": "Invalid invitation code"}
    
    def get_stats(self):
        """Get invitation usage statistics."""
        active = sum(1 for inv in self.invitations.values() 
                     if time.time() < inv['expiry'] 
                     and inv['current_uses'] < inv['max_uses'])
        expired = sum(1 for inv in self.invitations.values() 
                      if time.time() >= inv['expiry'])
        return {"active": active, "expired": expired, 
                "total": len(self.invitations)}

# Demo
system = InvitationSystem()

# Create invitations
admin_code = system.create_invite("alice", role="admin", max_uses=1)
team_code = system.create_invite("bob", role="editor", max_uses=5, 
                                  expiry_hours=72)

print(f"Admin invite: {admin_code}")
print(f"Team invite:  {team_code}")
print()

# Redeem invitations
print(system.redeem_invite(admin_code, "charlie"))
print(system.redeem_invite(admin_code, "dave"))  # Should fail
print(system.redeem_invite(team_code, "eve"))
print(system.redeem_invite("invalid-code", "mallory"))
print()

print(f"Stats: {system.get_stats()}")

Output:

Admin invite: xK7mN2pQ9rS1tUvW
Team invite:  aB3cD4eF5gH6iJ7k

{'success': True, 'role': 'admin', 'message': 'Welcome charlie! Role: admin'}
{'success': False, 'error': 'Invitation fully used'}
{'success': True, 'role': 'editor', 'message': 'Welcome eve! Role: editor'}
{'success': False, 'error': 'Invalid invitation code'}

Stats: {'active': 1, 'expired': 0, 'total': 2}

This system demonstrates several security best practices: hashed storage of codes, constant-time comparison, usage limits, and time-based expiration. In production, you would replace the in-memory dictionary with a database and add rate limiting to prevent brute-force code guessing.

Frequently Asked Questions

Can I just use random.SystemRandom() instead of secrets?

Yes, random.SystemRandom() also uses the OS entropy pool and is cryptographically secure. However, secrets is the recommended module since Python 3.6 because it provides a cleaner API specifically designed for security tasks. It also includes compare_digest() and token_urlsafe(), which SystemRandom does not.
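To make the comparison concrete, here is a minimal sketch that builds a 16-character password both ways. The alphabet and the length are arbitrary choices for illustration.

```python
import random
import secrets
import string

alphabet = string.ascii_letters + string.digits

# Both approaches draw from the operating system's secure random source.
sys_rng = random.SystemRandom()
password_a = "".join(sys_rng.choice(alphabet) for _ in range(16))
password_b = "".join(secrets.choice(alphabet) for _ in range(16))

# secrets additionally bundles token helpers and a constant-time comparison.
token = secrets.token_urlsafe(32)
same = secrets.compare_digest(token, token)

print(len(password_a), len(password_b), same)  # 16 16 True
```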

How many bytes should my tokens be?

The Python documentation considers 32 bytes (256 bits) of randomness sufficient for typical security tokens. This provides enough entropy that brute-force guessing is infeasible even with massive computing resources. For lower-stakes use cases like email verification, 16 bytes (128 bits) is still very strong.
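Note that the byte count is not the same as the length of the resulting string. The short sketch below shows how 32 bytes of entropy look in each output format.

```python
import secrets

nbytes = 32  # 256 bits of entropy

raw = secrets.token_bytes(nbytes)          # raw bytes object
hex_token = secrets.token_hex(nbytes)      # two hex digits per byte
url_token = secrets.token_urlsafe(nbytes)  # Base64, roughly 1.3 characters per byte

print(len(raw))        # 32
print(len(hex_token))  # 64
print(len(url_token))  # 43
```

When a database column or URL has a length limit, size it for the encoded string, not for the byte count.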

What happens if I call token_hex() with no arguments?

If you omit the byte count, secrets uses a default that is “reasonable for most use cases” — currently 32 bytes. However, it is better to specify the length explicitly so your code is self-documenting and immune to future default changes.

Is secrets slower than random?

Yes, but the difference is negligible for token generation. As a rough illustration, generating 10,000 tokens with secrets takes on the order of 50 milliseconds compared to about 15 milliseconds with random; exact figures depend on your machine and Python version. Since you typically generate tokens one at a time (not in bulk), the performance difference is irrelevant.
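If you want to check the cost on your own machine, a quick timeit comparison along these lines is enough. The token size and iteration count are arbitrary, and the `with_random` function exists only as a speed baseline, never as a way to produce real tokens.

```python
import random
import secrets
import timeit

N = 10_000

def with_secrets():
    return secrets.token_hex(16)

def with_random():
    # Not secure: shown only as a speed baseline.
    return f"{random.getrandbits(128):032x}"

secrets_time = timeit.timeit(with_secrets, number=N)
random_time = timeit.timeit(with_random, number=N)

print(f"secrets: {secrets_time * 1000:.1f} ms for {N} tokens")
print(f"random:  {random_time * 1000:.1f} ms for {N} tokens")
```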

Should I use uuid4() or secrets for unique identifiers?

In CPython, uuid.uuid4() draws its randomness from os.urandom(), so the values are unpredictable, although a version 4 UUID carries 122 random bits rather than 128. Use UUIDs when you need a standardized format (like database primary keys). Use secrets when you need a security token with a specific format (URL-safe, hex) or when you need compare_digest() for safe verification.

Conclusion

The secrets module gives you everything you need to generate cryptographically secure tokens, passwords, and random values in Python. We covered token_hex() for hexadecimal strings, token_urlsafe() for URL-compatible tokens, token_bytes() for raw binary data, and compare_digest() for timing-attack-resistant comparisons. We also built practical systems for password resets, API key generation, and invitation codes.

Try extending the invitation system with features like rate limiting, audit logging, or integration with a real database using SQLite or PostgreSQL. The secrets module is small but foundational — once you understand it, you can build secure authentication flows for any Python application.

For the full API reference, see the official Python secrets documentation.

How To Profile and Optimize Python Code Performance

Skill Level: Intermediate

Introduction to Modern HTTP Requests

For years, the Python requests library has been the go-to solution for making HTTP requests. While requests is powerful and user-friendly, it has limitations that modern Python developers encounter daily. It doesn’t support async operations natively, lacks HTTP/2 support, and can feel sluggish when you’re handling dozens of concurrent requests. If you’ve found yourself wrestling with requests’ synchronous nature or spinning up ThreadPoolExecutor just to manage multiple requests, you’re not alone. Many developers hit a wall where they need something more capable.

Enter httpx — a modern HTTP client that drops in as a replacement for requests while adding powerful features like native async/await support, HTTP/2 capabilities, and streaming responses. The best part? If you already know requests, you’ll feel right at home with httpx. The API is remarkably similar, which means you can start using it immediately without relearning everything. We’ll cover installation, basic usage, async patterns, and real-world examples that show why httpx is becoming the preferred choice for new Python projects.

In this guide, we’ll walk through everything you need to know about httpx. We’ll start with a quick example to get you up and running, explore what makes httpx special compared to other HTTP libraries, and then dive into practical patterns you can use in your own projects. Whether you’re building a simple API client or managing complex async workflows, httpx has the tools you need. Let’s get started.

Quick Example: GET Request in 5 Lines

# quick_get.py
import httpx

response = httpx.get("https://jsonplaceholder.typicode.com/posts/1")
print(response.status_code)
print(response.json())

Output:

200
{'userId': 1, 'id': 1, 'title': 'sunt aut facere repellat…', 'body': '…'}

That’s it. If you’ve used requests before, you already know httpx. The synchronous API is nearly identical, but under the hood, httpx brings modern features to the table. Now let’s explore what makes httpx more powerful than its predecessor.

Timing reveals performance truths — measure twice, optimize once

What is httpx and Why Use It?

httpx is a modern HTTP client library for Python that combines the simplicity of requests with advanced features like async support, HTTP/2, and more sophisticated timeout handling. Created by Tom Christie (the developer behind Starlette and Django REST framework), httpx is built on a foundation that understands modern Python development patterns. It’s not just a requests replacement; it’s a redesign based on everything we’ve learned about making HTTP libraries in the 2020s.

The key differences matter when you’re building real applications. Unlike requests, httpx supports both synchronous and asynchronous code from the same library. You don’t need to install separate packages or maintain multiple code paths. It supports HTTP/2 as an opt-in feature, which means better performance for services that support it. It has built-in connection pooling, proper async context managers, and a cleaner API that feels more Pythonic.

Let’s compare httpx to other HTTP libraries in the Python ecosystem:

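| Feature                      | httpx        | requests | aiohttp | urllib3 |
|------------------------------|--------------|----------|---------|---------|
| Synchronous API              | Yes          | Yes      | No      | Yes     |
| Async/Await Support          | Yes          | No       | Yes     | No      |
| HTTP/2                       | Yes (opt-in) | No       | No      | No      |
| Connection Pooling           | Yes          | Yes      | Yes     | Yes     |
| Streaming Support            | Yes          | Yes      | Yes     | Yes     |
| API Complexity               | Low          | Low      | Medium  | High    |
| Drop-in requests Replacement | Mostly yes   | N/A      | No      | No      |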

httpx shines when you need synchronous and asynchronous code in the same project. Unlike aiohttp, which requires asyncio from the start, httpx lets you start simple and scale to async when you need it. Unlike requests, httpx doesn’t force you into threading patterns when you want to handle multiple requests concurrently. It’s the bridge between requests’ simplicity and aiohttp’s power.

Installing httpx

Installation is straightforward. httpx is available on PyPI and installs cleanly without forcing dependencies on you. For basic functionality, you only need one command.

# install_httpx.sh
pip install httpx

Output:

Collecting httpx
  Downloading httpx-0.25.2-py3-none-any.whl (75 kB)
Installing collected packages: httpx, certifi, sniffio, anyio
Successfully installed httpx-0.25.2

If you want HTTP/2 support, install the optional http2 extra. It pulls in the h2 library, which handles HTTP/2 protocol details. httpcore, which provides the underlying transport layer, is already installed as a core dependency of httpx.

# install_httpx_with_http2.sh
# The quotes stop shells such as zsh from treating the brackets as a glob pattern
pip install "httpx[http2]"

Output:

Collecting httpx[http2]
  Downloading httpx-0.25.2-py3-none-any.whl
Collecting h2
  Downloading h2-4.1.0-py3-none-any.whl (65 kB)
Successfully installed httpx, h2

That’s all you need. Unlike some HTTP libraries, httpx doesn’t require compiling C extensions or installing system dependencies. It’s pure Python with optional performance enhancements.

Repeated benchmarks beat single measurements — variance hides optimization truths

Making GET Requests

GET requests are the foundation of HTTP. They retrieve data without side effects, and httpx makes them effortless. The basic pattern is identical to requests, but httpx adds subtle improvements like better error handling and automatic timeout management.

# get_requests.py
import httpx

# Simple GET request
response = httpx.get("https://jsonplaceholder.typicode.com/users")
print(f"Status: {response.status_code}")
print(f"Content-Type: {response.headers['content-type']}")
print(f"First user: {response.json()[0]['name']}")

# GET with query parameters
params = {"userId": 1}
response = httpx.get("https://jsonplaceholder.typicode.com/posts", params=params)
print(f"Posts for user 1: {len(response.json())}")

# GET with custom headers
headers = {"User-Agent": "MyApp/1.0"}
response = httpx.get("https://httpbin.org/headers", headers=headers)
print(response.json())

Output:

Status: 200
Content-Type: application/json
First user: Leanne Graham
Posts for user 1: 10
{'headers': {'User-Agent': 'MyApp/1.0', …}}

Notice how httpx handles query parameters naturally through the `params` dictionary. You don’t manually construct query strings or worry about URL encoding — httpx handles that behind the scenes. Headers work the same way, accepting a dictionary that httpx merges with the default headers. This consistent API means you can focus on your application logic instead of HTTP bookkeeping.

Making POST Requests

POST requests send data to the server. httpx supports multiple ways to send data: form-encoded, JSON, raw bytes, or streaming. Let’s explore the most common patterns.

# post_requests.py
import httpx

# POST with JSON data
client = httpx.Client()
data = {
    "title": "New Post",
    "body": "This is a test post",
    "userId": 1
}
response = client.post(
    "https://jsonplaceholder.typicode.com/posts",
    json=data
)
print(f"Created post with ID: {response.json()['id']}")

# POST with form data
form_data = {"username": "john", "password": "secret123"}
response = client.post(
    "https://httpbin.org/post",
    data=form_data
)
print(f"Form post status: {response.status_code}")

# POST with custom timeout
try:
    response = client.post(
        "https://httpbin.org/delay/10",
        timeout=5.0
    )
except httpx.TimeoutException:
    print("Request timed out after 5 seconds")

client.close()

Output:

Created post with ID: 101
Form post status: 200
Request timed out after 5 seconds

When making POST requests, use the `json` parameter for JSON data and the `data` parameter for form-encoded data. httpx automatically sets the correct Content-Type header for you. A `Client` instance keeps a connection pool alive across requests, which is more efficient than using module-level functions for repeated requests. The example closes it explicitly with `client.close()`; writing `with httpx.Client() as client:` performs the same cleanup automatically. Timeouts are crucial for production code because they prevent your application from hanging if a server stops responding.

Sort by cumtime — the wall-clock time users actually experience

Using Async with httpx

This is where httpx truly shines. Async support is built in from the ground up, not bolted on as an afterthought. When you need to handle multiple concurrent requests, async/await patterns let you handle hundreds of concurrent connections with minimal memory overhead — something that would require threading or multiprocessing with requests.

# async_requests.py
import asyncio
import httpx

async def fetch_posts(user_id):
    """Fetch posts for a specific user"""
    async with httpx.AsyncClient() as client:
        response = await client.get(
            f"https://jsonplaceholder.typicode.com/posts?userId={user_id}"
        )
        return response.json()

async def fetch_multiple_users():
    """Fetch posts for multiple users concurrently"""
    tasks = [
        fetch_posts(user_id)
        for user_id in range(1, 6)
    ]
    results = await asyncio.gather(*tasks)

    for i, posts in enumerate(results, 1):
        print(f"User {i}: {len(posts)} posts")

# Run the async function
asyncio.run(fetch_multiple_users())

Output:

User 1: 10 posts
User 2: 10 posts
User 3: 10 posts
User 4: 10 posts
User 5: 10 posts

The `AsyncClient()` context manager handles resource cleanup automatically. Using `asyncio.gather()`, we fetch posts for five users concurrently in roughly the time it takes to fetch one. This pattern scales to thousands of concurrent requests without the overhead of creating threads. The key difference from synchronous code is minimal — just add `async` and `await` keywords.

# async_with_timeout.py
import asyncio
import httpx

async def fetch_with_timeout():
    """Fetch with explicit timeout configuration"""
    timeout = httpx.Timeout(10.0)  # 10 second timeout for all operations

    async with httpx.AsyncClient(timeout=timeout) as client:
        try:
            response = await client.get(
                "https://jsonplaceholder.typicode.com/posts/1"
            )
            print(f"Success: {response.status_code}")
        except httpx.TimeoutException:
            print("Request timed out")
        except httpx.RequestError as e:
            print(f"Network error: {e}")

asyncio.run(fetch_with_timeout())

Output:

Success: 200

Timeout handling in async contexts is critical. The `Timeout` object lets you set different timeouts for connection, read, write, and pool operations. This fine-grained control prevents your async application from hanging on unresponsive servers.

HTTP/2 Support

HTTP/2 can be faster than HTTP/1.1 because it multiplexes multiple requests over a single connection and compresses headers. In httpx, HTTP/2 is opt-in: install the h2 library (via the http2 extra) and pass `http2=True` when creating a client. The client then negotiates HTTP/2 with servers that support it and falls back to HTTP/1.1 otherwise. The top-level helpers such as `httpx.get()` always use HTTP/1.1.

# http2_support.py
import httpx

# Top-level helpers use HTTP/1.1
response = httpx.get("https://httpbin.org/get")
print(f"HTTP Version (default): {response.http_version}")

# A client without http2=True also uses HTTP/1.1 (this is the default)
client = httpx.Client(http2=False)
response = client.get("https://httpbin.org/get")
print(f"HTTP Version (forced 1.1): {response.http_version}")
client.close()

# Create client with explicit HTTP/2 support (requires the h2 package)
client = httpx.Client(http2=True)
response = client.get("https://httpbin.org/get")
print(f"HTTP Version (with HTTP/2): {response.http_version}")
client.close()

Output:

HTTP Version (default): HTTP/1.1
HTTP Version (forced 1.1): HTTP/1.1
HTTP Version (with HTTP/2): HTTP/2

The performance difference is subtle for single requests but can become significant with many concurrent requests to the same host. Since HTTP/2 multiplexes requests on a single connection, you avoid the overhead of establishing multiple TCP connections. For APIs that support it, improvements of 20-50% are plausible for request-heavy workloads, but the real gain depends on the server, the network, and the workload, so measure your own case.

Local variables beat namespace lookups — milliseconds add up in loops

Timeouts and Error Handling

Production code needs robust error handling. httpx provides clear exceptions for different failure scenarios, making it easy to distinguish between network problems, timeouts, and server errors.

# error_handling.py
import httpx

def fetch_with_fallback(url, fallback_url):
    """Fetch from primary URL with fallback"""
    try:
        response = httpx.get(url, timeout=5.0)
        response.raise_for_status()  # Raise exception for bad status codes
        return response.json()
    except httpx.TimeoutException:
        print(f"Timeout on {url}, trying fallback")
        return httpx.get(fallback_url).json()
    except httpx.HTTPStatusError as e:
        print(f"HTTP error: {e.response.status_code}")
        raise
    except httpx.RequestError as e:
        print(f"Request error: {e}")
        raise

try:
    data = fetch_with_fallback(
        "https://jsonplaceholder.typicode.com/posts/1",
        "https://jsonplaceholder.typicode.com/posts/2"
    )
    print(f"Fetched post: {data['title']}")
except Exception as e:
    print(f"All attempts failed: {e}")

Output:

Fetched post: sunt aut facere repellat provident…

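httpx organizes exceptions in a clear hierarchy with `HTTPError` at the top. `RequestError` is the base class for failures that happen while sending a request, such as network problems. `TimeoutException` is a subclass of `RequestError` and indicates a timeout (connection, read, write, or pool). `HTTPStatusError` sits in a separate branch: it is raised by `raise_for_status()` when the server responded with an error status code (4xx or 5xx), so an `except httpx.RequestError` clause does not catch it. Catch `httpx.HTTPError` when you want both families handled in one place.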

# advanced_timeouts.py
import httpx

# Granular timeout control
timeout = httpx.Timeout(
    timeout=10.0,  # Default timeout
    connect=5.0,   # Connection timeout
    read=10.0,     # Read timeout
    write=10.0,    # Write timeout
    pool=10.0      # Connection pool timeout
)

client = httpx.Client(timeout=timeout)

# Override timeout for specific request
try:
    response = client.get(
        "https://httpbin.org/delay/2",
        timeout=5.0  # Override client timeout (must exceed the 2-second server delay)
    )
    print(f"Response: {response.status_code}")
except httpx.TimeoutException:
    print("Custom timeout exceeded")
finally:
    client.close()

Output:

Response: 200

Different operations need different timeout values. Connection timeouts should be shorter (5-10 seconds), while read timeouts depend on the expected response size. For large downloads, you might need 30+ second read timeouts. httpx lets you configure each separately.

Streaming Responses

When dealing with large files or streaming APIs, loading the entire response into memory is inefficient. httpx supports streaming, letting you process responses chunk by chunk.

# streaming_responses.py
import httpx

# Stream a large response
with httpx.stream("GET", "https://httpbin.org/bytes/1024") as response:
    print(f"Status: {response.status_code}")
    print(f"Content-Length: {response.headers.get('content-length')}")

    # Process response in chunks
    for chunk in response.iter_bytes(chunk_size=256):
        print(f"Received {len(chunk)} bytes")

# Stream with iterator
with httpx.stream("GET", "https://httpbin.org/json") as response:
    for line in response.iter_lines():
        if line:
            print(f"Line: {line[:50]}...")

# Async streaming
import asyncio

async def async_stream():
    async with httpx.AsyncClient() as client:
        async with client.stream("GET", "https://httpbin.org/bytes/512") as response:
            async for chunk in response.aiter_bytes(chunk_size=128):
                print(f"Async received {len(chunk)} bytes")

asyncio.run(async_stream())

Output:

Status: 200
Content-Length: 1024
Received 256 bytes
Received 256 bytes
Received 256 bytes
Received 256 bytes
Line: {"slideshow": {"author": "Yours Truly", "title": …
Async received 128 bytes
Async received 128 bytes
Async received 128 bytes
Async received 128 bytes

Streaming is essential for production applications that download large files or handle streaming APIs. The `iter_bytes()` method gives you raw bytes, while `iter_lines()` automatically splits on newlines — useful for newline-delimited JSON APIs. Async streaming with `aiter_bytes()` and `aiter_lines()` works the same way in async contexts.

Real-Life Example: Async API Data Aggregator

Let’s build a practical example that combines async requests, error handling, and structured data processing. Imagine you’re aggregating data from multiple APIs and want to do it efficiently.

# api_aggregator.py
import asyncio
import httpx
from datetime import datetime

class APIAggregator:
    """Aggregates data from multiple APIs concurrently"""

    def __init__(self, max_concurrent=5):
        self.max_concurrent = max_concurrent
        self.timeout = httpx.Timeout(10.0)

    async def fetch_post(self, client, post_id):
        """Fetch a single post"""
        try:
            response = await client.get(
                f"https://jsonplaceholder.typicode.com/posts/{post_id}",
                timeout=self.timeout
            )
            response.raise_for_status()
            return response.json()
        except httpx.HTTPError as e:  # also covers HTTPStatusError from raise_for_status()
            print(f"Error fetching post {post_id}: {e}")
            return None

    async def fetch_user_comments(self, client, user_id):
        """Fetch comments for a user"""
        try:
            response = await client.get(
                f"https://jsonplaceholder.typicode.com/comments?email=*@{user_id}.com",
                timeout=self.timeout
            )
            response.raise_for_status()
            return response.json()
        except httpx.HTTPError as e:  # also covers HTTPStatusError from raise_for_status()
            print(f"Error fetching comments: {e}")
            return []

    async def aggregate(self):
        """Aggregate data from multiple endpoints"""
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            # Fetch posts concurrently
            post_tasks = [
                self.fetch_post(client, i)
                for i in range(1, 6)
            ]
            posts = await asyncio.gather(*post_tasks)

            # Fetch comments concurrently
            comment_tasks = [
                self.fetch_user_comments(client, i)
                for i in range(1, 3)
            ]
            comments = await asyncio.gather(*comment_tasks)

            return {
                "timestamp": datetime.now().isoformat(),
                "posts_fetched": len([p for p in posts if p]),
                "total_comments": sum(len(c) for c in comments if c),
                "sample_post": posts[0] if posts else None
            }

# Run the aggregator
async def main():
    aggregator = APIAggregator()
    results = await aggregator.aggregate()

    print(f"Aggregation completed at {results['timestamp']}")
    print(f"Posts fetched: {results['posts_fetched']}")
    print(f"Total comments: {results['total_comments']}")
    print(f"First post title: {results['sample_post']['title']}")

asyncio.run(main())

Output:

Aggregation completed at 2026-04-09T14:23:45.123456
Posts fetched: 5
Total comments: 10
First post title: sunt aut facere repellat provident occaecati excepturi optio reprehenderit

This example demonstrates several httpx patterns: creating an async client, managing concurrent requests with proper error handling, using consistent timeouts across all requests, and returning structured data. The `APIAggregator` class is reusable and extensible — you could add caching, retry logic, or progress tracking. In production, you’d likely add logging and more sophisticated error recovery.

Frequently Asked Questions

Q: Is httpx a drop-in replacement for requests?
A: Mostly yes. The synchronous API is nearly identical, so much requests code works with httpx after changing the import. There are differences, though: httpx does not follow redirects unless you pass `follow_redirects=True`, it applies a default 5-second timeout, and some parameter names differ. Test thoroughly before migrating production code.

Q: Do I need to use async?
A: No. httpx works great synchronously, and you only need async when handling many concurrent requests. Start with synchronous code and migrate to async if profiling shows it’s beneficial.

Q: What’s the performance difference between httpx and requests?
A: For single requests, performance is similar. For concurrent requests, httpx with async is dramatically faster because it avoids thread overhead. HTTP/2 support also improves performance for compatible servers.

Q: How do I handle cookies and sessions?
A: httpx clients maintain cookies automatically. Use a persistent client for multiple requests to the same host to keep cookies and connection pooling active across requests.

Q: Can I use httpx with Django or Flask?
A: Yes, but use the synchronous API in request handlers since WSGI is synchronous. Use async httpx in ASGI applications like FastAPI or Django Async Views.

Q: How do I set up authentication?
A: httpx supports multiple auth methods. For basic auth: `httpx.get(url, auth=("user", "pass"))`. For bearer tokens: `headers={"Authorization": "Bearer token"}`. For custom auth, create a subclass of `httpx.Auth`.

Conclusion

httpx represents the future of HTTP requests in Python. It combines the simplicity of requests with modern features like async/await, HTTP/2, and better timeout handling. Whether you’re building a simple API client or managing complex concurrent request workflows, httpx has the tools you need without unnecessary complexity.

Start by installing httpx and replacing requests in a non-critical project. You’ll quickly discover why developers are switching. For more details, comprehensive documentation is available at httpx.readthedocs.io.

How To Use the Python Walrus Operator := with Examples

Skill Level: Intermediate

Assignment that makes code cooler

Assignment that makes code cooler

What is httpx and Why Use It?

httpx is a modern HTTP client library for Python that combines the simplicity of requests with advanced features like async support, HTTP/2, and more sophisticated timeout handling. Created by Tom Christie (the same developer behind Starlette and FastAPI), httpx is built on a foundation that understands modern Python development patterns. It’s not just a requests replacement — it’s a redesign based on everything we’ve learned about making HTTP libraries in the 2020s.

The key differences matter when you’re building real applications. Unlike requests, httpx supports both synchronous and asynchronous code from the same library. You don’t need to install separate packages or maintain multiple code paths. It supports HTTP/2 by default, which means better performance for services that support it. It has built-in connection pooling, proper async context managers, and a cleaner API that feels more Pythonic.

Let’s compare httpx to other HTTP libraries in the Python ecosystem:

Feature httpx requests aiohttp urllib3
Synchronous API Yes Yes No Yes
Async/Await Support Yes No Yes No
HTTP/2 Yes No Yes No
Connection Pooling Yes Yes Yes Yes
Streaming Support Yes Yes Yes Yes
API Complexity Low Low Medium High
Drop-in requests Replacement Mostly Yes N/A No No

httpx shines when you need synchronous and asynchronous code in the same project. Unlike aiohttp, which requires AsyncIO from the start, httpx lets you start simple and scale to async when you need it. Unlike requests, httpx doesn’t force you into threading patterns when you want to handle multiple requests concurrently. It’s the bridge between requests’ simplicity and aiohttp’s power.

Installing httpx

Installation is straightforward. httpx is available on PyPI and installs cleanly without forcing dependencies on you. For basic functionality, you only need one command.

# install_httpx.sh
pip install httpx
Collecting httpx Downloading httpx-0.25.2-py3-none-any.whl (75 kB) Installing collected packages: httpx, certifi, sniffio, anyio Successfully installed httpx-0.25.2

If you want HTTP/2 support with better performance, you can install the optional dependencies. The h2 library handles HTTP/2 protocol details, while httpcore provides the underlying transport layer.

# install_httpx_with_http2.sh
# Quote the requirement so shells such as zsh do not expand the brackets
pip install "httpx[http2]"

Output:

Collecting httpx[http2]
  Downloading httpx-0.25.2-py3-none-any.whl
Collecting h2
  Downloading h2-4.1.0-py3-none-any.whl (65 kB)
Successfully installed httpx, h2

That’s all you need. Unlike some HTTP libraries, httpx doesn’t require compiling C extensions or installing system dependencies. It’s pure Python with optional performance enhancements.

Assignment expressions unlock comprehension superpowers

Making GET Requests

GET requests are the foundation of HTTP. They retrieve data without side effects, and httpx makes them effortless. The basic pattern is identical to requests, but httpx adds improvements such as a default 5-second timeout, whereas requests waits indefinitely unless you set one.

# get_requests.py
import httpx

# Simple GET request
response = httpx.get("https://jsonplaceholder.typicode.com/users")
print(f"Status: {response.status_code}")
print(f"Content-Type: {response.headers['content-type']}")
print(f"First user: {response.json()[0]['name']}")

# GET with query parameters
params = {"userId": 1}
response = httpx.get("https://jsonplaceholder.typicode.com/posts", params=params)
print(f"Posts for user 1: {len(response.json())}")

# GET with custom headers
headers = {"User-Agent": "MyApp/1.0"}
response = httpx.get("https://httpbin.org/headers", headers=headers)
print(response.json())

Output:

Status: 200
Content-Type: application/json
First user: Leanne Graham
Posts for user 1: 10
{'headers': {'User-Agent': 'MyApp/1.0', …}}

Notice how httpx handles query parameters naturally through the `params` dictionary. You don’t manually construct query strings or worry about URL encoding — httpx handles that behind the scenes. Headers work the same way, accepting a dictionary that httpx merges with the default headers. This consistent API means you can focus on your application logic instead of HTTP bookkeeping.
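
To make the “URL encoding behind the scenes” point concrete, here is a minimal sketch of what building the query string by hand would involve, using only the standard library. `build_url` is a hypothetical helper written for this illustration, not part of httpx, and the exact percent-encoding choices (for example `%20` versus `+` for spaces) can differ between libraries and versions.

```python
# manual_query_string.py
from urllib.parse import quote, urlencode


def build_url(base_url, params):
    """Append percent-encoded query parameters to base_url (hypothetical helper)."""
    # quote_via=quote encodes spaces as %20 rather than "+"
    query = urlencode(params, quote_via=quote)
    return f"{base_url}?{query}" if query else base_url


url = build_url(
    "https://example.com/search",
    {"q": "hello world", "tag": "a&b", "page": 2},
)
print(url)
```

Characters such as the space and the `&` inside a value have to be escaped so they are not mistaken for separators. Passing `params` to httpx takes care of this step for you.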

Making POST Requests

POST requests send data to the server. httpx supports multiple ways to send data: form-encoded, JSON, raw bytes, or streaming. Let’s explore the most common patterns.

# post_requests.py
import httpx

# POST with JSON data
client = httpx.Client()
data = {
    "title": "New Post",
    "body": "This is a test post",
    "userId": 1
}
response = client.post(
    "https://jsonplaceholder.typicode.com/posts",
    json=data
)
print(f"Created post with ID: {response.json()['id']}")

# POST with form data
form_data = {"username": "john", "password": "secret123"}
response = client.post(
    "https://httpbin.org/post",
    data=form_data
)
print(f"Form post status: {response.status_code}")

# POST with custom timeout
try:
    response = client.post(
        "https://httpbin.org/delay/10",
        timeout=5.0
    )
except httpx.TimeoutException:
    print("Request timed out after 5 seconds")

client.close()

Output:

Created post with ID: 101
Form post status: 200
Request timed out after 5 seconds

When making POST requests, use the `json` parameter for JSON data and the `data` parameter for form-encoded data. httpx automatically sets the correct Content-Type header for you. A `Client` instance maintains a connection pool across multiple requests, which is more efficient than calling the module-level functions repeatedly. The example closes the client explicitly with `client.close()`; writing `with httpx.Client() as client:` instead closes it automatically, even if a request raises. Timeouts are crucial for production code because they prevent your application from hanging if a server stops responding.
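
The difference between `json=` and `data=` is easier to see when you look at the bytes each one puts on the wire. The sketch below reproduces the two encodings with the standard library only. `encode_json` and `encode_form` are hypothetical helpers for illustration; httpx’s own serialization may differ in details such as whitespace inside the JSON.

```python
# body_encodings.py
import json
from urllib.parse import urlencode


def encode_json(payload):
    """Return (content_type, body) for a JSON request body."""
    return "application/json", json.dumps(payload).encode("utf-8")


def encode_form(fields):
    """Return (content_type, body) for a form-encoded request body."""
    body = urlencode(fields).encode("utf-8")
    return "application/x-www-form-urlencoded", body


payload = {"title": "New Post", "userId": 1}
for content_type, body in (encode_json(payload), encode_form(payload)):
    print(content_type, body)
```

The same dictionary produces two different bodies and two different Content-Type values, which is why choosing the wrong parameter usually shows up as the server failing to parse the request.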

When to walrus: regex matches and stream reading

Using Async with httpx

This is where httpx truly shines. Async support is built in from the ground up, not bolted on as an afterthought. When you need to handle multiple concurrent requests, async/await patterns let you handle hundreds of concurrent connections with minimal memory overhead — something that would require threading or multiprocessing with requests.

# async_requests.py
import asyncio
import httpx

async def fetch_posts(user_id):
    """Fetch posts for a specific user"""
    async with httpx.AsyncClient() as client:
        response = await client.get(
            f"https://jsonplaceholder.typicode.com/posts?userId={user_id}"
        )
        return response.json()

async def fetch_multiple_users():
    """Fetch posts for multiple users concurrently"""
    tasks = [
        fetch_posts(user_id)
        for user_id in range(1, 6)
    ]
    results = await asyncio.gather(*tasks)

    for i, posts in enumerate(results, 1):
        print(f"User {i}: {len(posts)} posts")

# Run the async function
asyncio.run(fetch_multiple_users())

Output:

User 1: 10 posts
User 2: 10 posts
User 3: 10 posts
User 4: 10 posts
User 5: 10 posts

Each `AsyncClient()` context manager closes its connections automatically on exit. Using `asyncio.gather()`, we fetch posts for five users concurrently in roughly the time it takes to fetch one. Note that `fetch_posts` opens a new client on every call, which is fine for a handful of requests; for hundreds or thousands, create one `AsyncClient` and pass it to each task so connections are pooled and reused. The difference from synchronous code is small: mostly the `async` and `await` keywords.

# async_with_timeout.py
import asyncio
import httpx

async def fetch_with_timeout():
    """Fetch with explicit timeout configuration"""
    timeout = httpx.Timeout(10.0)  # 10 second timeout for all operations

    async with httpx.AsyncClient(timeout=timeout) as client:
        try:
            response = await client.get(
                "https://jsonplaceholder.typicode.com/posts/1"
            )
            print(f"Success: {response.status_code}")
        except httpx.TimeoutException:
            print("Request timed out")
        except httpx.RequestError as e:
            print(f"Network error: {e}")

asyncio.run(fetch_with_timeout())

Output:

Success: 200

Timeout handling in async contexts is critical. The `Timeout` object lets you set different timeouts for connection, read, write, and pool operations. This fine-grained control prevents your async application from hanging on unresponsive servers.

HTTP/2 Support

HTTP/2 can be faster than HTTP/1.1 because it multiplexes multiple requests over a single connection and compresses headers. With httpx, HTTP/2 is opt-in: install the `http2` extra (which pulls in the h2 library) and create a client with `http2=True`. The client then negotiates HTTP/2 with servers that support it and falls back to HTTP/1.1 otherwise, so the last line of the output shows HTTP/2 only when the server agrees to it.

# http2_support.py
import httpx

# Module-level functions use HTTP/1.1; HTTP/2 is not enabled by default
response = httpx.get("https://httpbin.org/get")
print(f"HTTP Version: {response.http_version}")

# A client without http2=True also stays on HTTP/1.1
client = httpx.Client(http2=False)
response = client.get("https://httpbin.org/get")
print(f"HTTP Version (forced 1.1): {response.http_version}")
client.close()

# Create client with explicit HTTP/2 support (requires the http2 extra)
client = httpx.Client(http2=True)
response = client.get("https://httpbin.org/get")
print(f"HTTP Version (with HTTP/2): {response.http_version}")
client.close()

Output:

HTTP Version: HTTP/1.1
HTTP Version (forced 1.1): HTTP/1.1
HTTP Version (with HTTP/2): HTTP/2

The performance difference is subtle for single requests but can become significant with many concurrent requests to the same host. Since HTTP/2 multiplexes requests on a single connection, you avoid the overhead of establishing multiple TCP connections. The actual gain depends on the server, the network, and the request pattern, so measure it against your own workload.

Understanding when to skip the walrus keeps code maintainable

Timeouts and Error Handling

Production code needs robust error handling. httpx provides clear exceptions for different failure scenarios, making it easy to distinguish between network problems, timeouts, and server errors.

# error_handling.py
import httpx

def fetch_with_fallback(url, fallback_url):
    """Fetch from primary URL with fallback"""
    try:
        response = httpx.get(url, timeout=5.0)
        response.raise_for_status()  # Raise exception for bad status codes
        return response.json()
    except httpx.TimeoutException:
        print(f"Timeout on {url}, trying fallback")
        return httpx.get(fallback_url).json()
    except httpx.HTTPStatusError as e:
        print(f"HTTP error: {e.response.status_code}")
        raise
    except httpx.RequestError as e:
        print(f"Request error: {e}")
        raise

try:
    data = fetch_with_fallback(
        "https://jsonplaceholder.typicode.com/posts/1",
        "https://jsonplaceholder.typicode.com/posts/2"
    )
    print(f"Fetched post: {data['title']}")
except Exception as e:
    print(f"All attempts failed: {e}")

Output:

Fetched post: sunt aut facere repellat provident…

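httpx organizes exceptions in a clear hierarchy with `HTTPError` at the top. `RequestError` is the base class for failures that occur while sending the request or receiving the response; its subclasses include `TimeoutException` (connection, read, write, or pool timeouts) and `ConnectError`. `HTTPStatusError` means the server responded with an error status code (4xx or 5xx). It is raised by `raise_for_status()`, similar to requests, and it is a sibling of `RequestError` rather than a subclass, so catching `RequestError` alone will not catch it. To handle both in one clause, catch `httpx.HTTPError`.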

# advanced_timeouts.py
import httpx

# Granular timeout control
timeout = httpx.Timeout(
    timeout=10.0,  # Default timeout
    connect=5.0,   # Connection timeout
    read=10.0,     # Read timeout
    write=10.0,    # Write timeout
    pool=10.0      # Connection pool timeout
)

client = httpx.Client(timeout=timeout)

# Override timeout for specific request
try:
    response = client.get(
        "https://httpbin.org/delay/2",
        timeout=5.0  # Override client timeout; must exceed the 2-second server delay
    )
    print(f"Response: {response.status_code}")
except httpx.TimeoutException:
    print("Custom timeout exceeded")
finally:
    client.close()

Output:

Response: 200

Different operations need different timeout values. Connection timeouts should be shorter (5-10 seconds), while read timeouts depend on the expected response size. For large downloads, you might need 30+ second read timeouts. httpx lets you configure each separately.
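
Timeouts are often transient, so a common companion to the settings above is a small retry loop with exponential backoff. The following is a minimal sketch under stated assumptions, not an httpx feature: `retry` and `flaky` are hypothetical names, and the demo uses the built-in `TimeoutError` with a recorded (not real) sleep so it runs instantly and without network access. With httpx you might call it as `retry(lambda: client.get(url), retry_on=httpx.TimeoutException)`.

```python
# retry_with_backoff.py
import time


def retry(operation, retry_on, attempts=3, base_delay=0.5, sleep=time.sleep):
    """Call operation(); on a retry_on exception, wait and try again.

    The delay doubles after each failure: base_delay, 2 * base_delay, ...
    The final failure is re-raised to the caller.
    """
    for attempt in range(attempts):
        try:
            return operation()
        except retry_on:
            if attempt == attempts - 1:
                raise
            sleep(base_delay * 2 ** attempt)


# Demo: a stand-in operation that times out twice before succeeding
calls = {"count": 0}
delays = []  # records the waits instead of actually sleeping


def flaky():
    calls["count"] += 1
    if calls["count"] < 3:
        raise TimeoutError("simulated timeout")
    return "ok"


result = retry(flaky, retry_on=TimeoutError, sleep=delays.append)
print(result, delays)
```

Only retry requests that are safe to repeat, such as GET, and keep the attempt count small so a failing server is not flooded with traffic.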

Streaming Responses

When dealing with large files or streaming APIs, loading the entire response into memory is inefficient. httpx supports streaming, letting you process responses chunk by chunk.

# streaming_responses.py
import httpx

# Stream a large response
with httpx.stream("GET", "https://httpbin.org/bytes/1024") as response:
    print(f"Status: {response.status_code}")
    print(f"Content-Length: {response.headers.get('content-length')}")

    # Process response in chunks
    for chunk in response.iter_bytes(chunk_size=256):
        print(f"Received {len(chunk)} bytes")

# Stream with iterator
with httpx.stream("GET", "https://httpbin.org/json") as response:
    for line in response.iter_lines():
        if line:
            print(f"Line: {line[:50]}...")

# Async streaming
import asyncio

async def async_stream():
    async with httpx.AsyncClient() as client:
        async with client.stream("GET", "https://httpbin.org/bytes/512") as response:
            async for chunk in response.aiter_bytes(chunk_size=128):
                print(f"Async received {len(chunk)} bytes")

asyncio.run(async_stream())

Output:

Status: 200
Content-Length: 1024
Received 256 bytes
Received 256 bytes
Received 256 bytes
Received 256 bytes
Line: {"slideshow": {"author": "Yours Truly", "title": ...
Async received 128 bytes
Async received 128 bytes
Async received 128 bytes
Async received 128 bytes

Streaming is essential for production applications that download large files or handle streaming APIs. The `iter_bytes()` method gives you raw bytes, while `iter_lines()` automatically splits on newlines — useful for newline-delimited JSON APIs. Async streaming with `aiter_bytes()` and `aiter_lines()` works the same way in async contexts.
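
For the newline-delimited JSON case mentioned above, the per-line handling can be sketched as a small generator. `parse_ndjson` is a hypothetical helper, and the sample lines are made up so the example runs offline; in real use you would pass it `response.iter_lines()` from inside a `with httpx.stream(...)` block.

```python
# ndjson_lines.py
import json


def parse_ndjson(lines):
    """Yield one decoded object per non-blank line."""
    for line in lines:
        line = line.strip()
        if line:  # skip keep-alive blank lines
            yield json.loads(line)


# Stand-in for response.iter_lines(); the records are illustrative only
sample_lines = ['{"id": 1, "event": "start"}', "", '{"id": 2, "event": "stop"}']

for record in parse_ndjson(sample_lines):
    print(record["id"], record["event"])
```

Because the function is a generator, each record is decoded as its line arrives, so memory use stays flat no matter how long the stream runs.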

Real-Life Example: Async API Data Aggregator

Let’s build a practical example that combines async requests, error handling, and structured data processing. Imagine you’re aggregating data from multiple APIs and want to do it efficiently.

# api_aggregator.py
import asyncio
import httpx
from datetime import datetime

class APIAggregator:
    """Aggregates data from multiple APIs concurrently"""

    def __init__(self, max_concurrent=5):
        self.max_concurrent = max_concurrent
        self.timeout = httpx.Timeout(10.0)

    async def fetch_post(self, client, post_id):
        """Fetch a single post"""
        try:
            response = await client.get(
                f"https://jsonplaceholder.typicode.com/posts/{post_id}",
                timeout=self.timeout
            )
            response.raise_for_status()
            return response.json()
        except httpx.HTTPError as e:  # also catches HTTPStatusError from raise_for_status()
            print(f"Error fetching post {post_id}: {e}")
            return None

    async def fetch_post_comments(self, client, post_id):
        """Fetch comments for a post"""
        try:
            response = await client.get(
                "https://jsonplaceholder.typicode.com/comments",
                params={"postId": post_id},  # filter comments by post
                timeout=self.timeout
            )
            response.raise_for_status()
            return response.json()
        except httpx.HTTPError as e:  # covers network errors and bad status codes
            print(f"Error fetching comments: {e}")
            return []

    async def aggregate(self):
        """Aggregate data from multiple endpoints"""
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            # Fetch posts concurrently
            post_tasks = [
                self.fetch_post(client, i)
                for i in range(1, 6)
            ]
            posts = await asyncio.gather(*post_tasks)

            # Fetch comments for the first two posts concurrently
            comment_tasks = [
                self.fetch_post_comments(client, i)
                for i in range(1, 3)
            ]
            comments = await asyncio.gather(*comment_tasks)

            return {
                "timestamp": datetime.now().isoformat(),
                "posts_fetched": len([p for p in posts if p]),
                "total_comments": sum(len(c) for c in comments if c),
                "sample_post": posts[0] if posts else None
            }

# Run the aggregator
async def main():
    aggregator = APIAggregator()
    results = await aggregator.aggregate()

    print(f"Aggregation completed at {results['timestamp']}")
    print(f"Posts fetched: {results['posts_fetched']}")
    print(f"Total comments: {results['total_comments']}")
    print(f"First post title: {results['sample_post']['title']}")

asyncio.run(main())

Output:

Aggregation completed at 2026-04-09T14:23:45.123456
Posts fetched: 5
Total comments: 10
First post title: sunt aut facere repellat provident occaecati excepturi optio reprehenderit

This example demonstrates several httpx patterns: creating an async client, managing concurrent requests with proper error handling, using consistent timeouts across all requests, and returning structured data. The `APIAggregator` class is reusable and extensible — you could add caching, retry logic, or progress tracking. In production, you’d likely add logging and more sophisticated error recovery.
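
The `max_concurrent` value passed to `APIAggregator` is stored, but nothing in the class enforces it. One common way to cap the number of requests in flight is an `asyncio.Semaphore`. The following is a minimal sketch, not part of the original class: `gather_limited` and `fake_fetch` are hypothetical names, and `fake_fetch` stands in for an `await client.get(...)` call so the example runs without network access.

```python
# bounded_gather.py
import asyncio


async def gather_limited(jobs, limit):
    """Run the coroutine functions in jobs with at most `limit` in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def run(job):
        async with semaphore:  # waits here while `limit` jobs are running
            return await job()

    return await asyncio.gather(*(run(job) for job in jobs))


async def demo():
    in_flight = 0
    peak = 0

    async def fake_fetch(post_id):
        # Stand-in for a real request; records how many run at once
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)
        in_flight -= 1
        return post_id

    jobs = [lambda i=i: fake_fetch(i) for i in range(1, 11)]
    results = await gather_limited(jobs, limit=3)
    return results, peak


results, peak = asyncio.run(demo())
print(results, peak)
```

Results come back in the same order as the jobs, as with plain `asyncio.gather()`, while `peak` shows that no more than three ran at the same time. In the aggregator, the semaphore could be created from `self.max_concurrent` and acquired around each `client.get()` call.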

Frequently Asked Questions

Q: Is httpx a drop-in replacement for requests?
A: Mostly yes. The synchronous API is nearly identical, so much requests code works with few changes. There are differences, though: for example, httpx does not follow redirects by default (pass `follow_redirects=True` to enable them) and it applies a default timeout. Test thoroughly before migrating production code.

Q: Do I need to use async?
A: No. httpx works great synchronously, and you only need async when handling many concurrent requests. Start with synchronous code and migrate to async if profiling shows it’s beneficial.

Q: What’s the performance difference between httpx and requests?
A: For single requests, performance is similar. For concurrent requests, httpx with async is dramatically faster because it avoids thread overhead. HTTP/2 support also improves performance for compatible servers.

Q: How do I handle cookies and sessions?
A: httpx clients maintain cookies automatically. Use a persistent client for multiple requests to the same host to keep cookies and connection pooling active across requests.

Q: Can I use httpx with Django or Flask?
A: Yes, but use the synchronous API in request handlers since WSGI is synchronous. Use async httpx in ASGI applications like FastAPI or Django Async Views.

Q: How do I set up authentication?
A: httpx supports multiple auth methods. For basic auth: `httpx.get(url, auth=("user", "pass"))`. For bearer tokens: `headers={"Authorization": "Bearer token"}`. For custom auth, create a subclass of `httpx.Auth`.

Conclusion

httpx represents the future of HTTP requests in Python. It combines the simplicity of requests with modern features like async/await, HTTP/2, and better timeout handling. Whether you’re building a simple API client or managing complex concurrent request workflows, httpx has the tools you need without unnecessary complexity.

Start by installing httpx and replacing requests in a non-critical project. You’ll quickly discover why developers are switching. For more details, comprehensive documentation is available at www.python-httpx.org.

  • How To Handle Asynchronous Programming in Python with asyncio
  • Understanding HTTP Status Codes: A Complete Guide
  • Building REST APIs with FastAPI and Python
  • Web Scraping with Python: Best Practices and Tools
  • Error Handling and Logging in Production Python Applications
How To Handle Rate Limiting in Python API Calls

Skill Level: Intermediate

Introduction to Modern HTTP Requests

For years, the Python requests library has been the go-to solution for making HTTP requests. While requests is powerful and user-friendly, it has limitations that modern Python developers encounter daily. It doesn’t support async operations natively, lacks HTTP/2 support, and can feel sluggish when you’re handling dozens of concurrent requests. If you’ve found yourself wrestling with requests’ synchronous nature or spinning up ThreadPoolExecutor just to manage multiple requests, you’re not alone. Many developers hit a wall where they need something more capable.

Enter httpx, a modern HTTP client that works as a near drop-in replacement for requests while adding native async/await support, optional HTTP/2, and streaming responses. The best part? If you already know requests, you’ll feel right at home with httpx. The API is remarkably similar, which means you can start using it immediately without relearning everything. We’ll cover installation, basic usage, async patterns, and real-world examples that show why httpx is becoming the preferred choice for new Python projects.

In this guide, we’ll walk through everything you need to know about httpx. We’ll start with a quick example to get you up and running, explore what makes httpx special compared to other HTTP libraries, and then dive into practical patterns you can use in your own projects. Whether you’re building a simple API client or managing complex async workflows, httpx has the tools you need. Let’s get started.

Quick Example: GET Request in 5 Lines

# quick_get.py
import httpx

response = httpx.get("https://jsonplaceholder.typicode.com/posts/1")
print(response.status_code)
print(response.json())

Output:

200
{'userId': 1, 'id': 1, 'title': 'sunt aut facere repellat…', 'body': '…'}

That’s it. If you’ve used requests before, you already know httpx. The synchronous API is nearly identical, but under the hood, httpx brings modern features to the table. Now let’s explore what makes httpx more powerful than its predecessor.

When the 429 wave hits, read the headers

When the 429 wave hits, read the headers

What is httpx and Why Use It?

httpx is a modern HTTP client library for Python that combines the simplicity of requests with advanced features like async support, HTTP/2, and more sophisticated timeout handling. Created by Tom Christie (the same developer behind Starlette and FastAPI), httpx is built on a foundation that understands modern Python development patterns. It’s not just a requests replacement — it’s a redesign based on everything we’ve learned about making HTTP libraries in the 2020s.

The key differences matter when you’re building real applications. Unlike requests, httpx supports both synchronous and asynchronous code from the same library. You don’t need to install separate packages or maintain multiple code paths. It supports HTTP/2 by default, which means better performance for services that support it. It has built-in connection pooling, proper async context managers, and a cleaner API that feels more Pythonic.

Let’s compare httpx to other HTTP libraries in the Python ecosystem:

Feature httpx requests aiohttp urllib3
Synchronous API Yes Yes No Yes
Async/Await Support Yes No Yes No
HTTP/2 Yes No Yes No
Connection Pooling Yes Yes Yes Yes
Streaming Support Yes Yes Yes Yes
API Complexity Low Low Medium High
Drop-in requests Replacement Mostly Yes N/A No No

httpx shines when you need synchronous and asynchronous code in the same project. Unlike aiohttp, which requires AsyncIO from the start, httpx lets you start simple and scale to async when you need it. Unlike requests, httpx doesn’t force you into threading patterns when you want to handle multiple requests concurrently. It’s the bridge between requests’ simplicity and aiohttp’s power.

Installing httpx

Installation is straightforward. httpx is available on PyPI and installs cleanly without forcing dependencies on you. For basic functionality, you only need one command.

# install_httpx.sh
pip install httpx
Collecting httpx Downloading httpx-0.25.2-py3-none-any.whl (75 kB) Installing collected packages: httpx, certifi, sniffio, anyio Successfully installed httpx-0.25.2

If you want HTTP/2 support with better performance, you can install the optional dependencies. The h2 library handles HTTP/2 protocol details, while httpcore provides the underlying transport layer.

# install_httpx_with_http2.sh
pip install httpx[http2]
Collecting httpx[http2] Downloading httpx-0.25.2-py3-none-any.whl Collecting h2 Downloading h2-4.1.0-py3-none-any.whl (65 kB) Successfully installed httpx, h2

That’s all you need. Unlike some HTTP libraries, httpx doesn’t require compiling C extensions or installing system dependencies. It’s pure Python with optional performance enhancements.

Headers are the bread crumbs out of the rate limit forest

Headers are the bread crumbs out of the rate limit forest

Making GET Requests

GET requests are the foundation of HTTP. They retrieve data without side effects, and httpx makes them effortless. The basic pattern is identical to requests, but httpx adds subtle improvements like better error handling and automatic timeout management.

# get_requests.py
import httpx

# Simple GET request
response = httpx.get("https://jsonplaceholder.typicode.com/users")
print(f"Status: {response.status_code}")
print(f"Content-Type: {response.headers['content-type']}")
print(f"First user: {response.json()[0]['name']}")

# GET with query parameters
params = {"userId": 1}
response = httpx.get("https://jsonplaceholder.typicode.com/posts", params=params)
print(f"Posts for user 1: {len(response.json())}")

# GET with custom headers
headers = {"User-Agent": "MyApp/1.0"}
response = httpx.get("https://httpbin.org/headers", headers=headers)
print(response.json())
Status: 200 Content-Type: application/json First user: Leanne Graham Posts for user 1: 10 {‘headers’: {‘User-Agent’: ‘MyApp/1.0’, …}}

Notice how httpx handles query parameters naturally through the `params` dictionary. You don’t manually construct query strings or worry about URL encoding — httpx handles that behind the scenes. Headers work the same way, accepting a dictionary that httpx merges with the default headers. This consistent API means you can focus on your application logic instead of HTTP bookkeeping.

Making POST Requests

POST requests send data to the server. httpx supports multiple ways to send data: form-encoded, JSON, raw bytes, or streaming. Let’s explore the most common patterns.

# post_requests.py
import httpx

# POST with JSON data
client = httpx.Client()
data = {
    "title": "New Post",
    "body": "This is a test post",
    "userId": 1
}
response = client.post(
    "https://jsonplaceholder.typicode.com/posts",
    json=data
)
print(f"Created post with ID: {response.json()['id']}")

# POST with form data
form_data = {"username": "john", "password": "secret123"}
response = client.post(
    "https://httpbin.org/post",
    data=form_data
)
print(f"Form post status: {response.status_code}")

# POST with custom timeout
try:
    response = client.post(
        "https://httpbin.org/delay/10",
        timeout=5.0
    )
except httpx.TimeoutException:
    print("Request timed out after 5 seconds")

client.close()
Created post with ID: 101 Form post status: 200 Request timed out after 5 seconds

When making POST requests, use the `json` parameter for JSON data and the `data` parameter for form-encoded data. httpx automatically sets the correct Content-Type header for you. The `Client()` context manager maintains connection pooling across multiple requests, which is more efficient than using module-level functions for repeated requests. Timeouts are crucial for production code — they prevent your application from hanging if a server stops responding.

Exponential backoff: be patient, then exponentially more patient

Exponential backoff: be patient, then exponentially more patient

Using Async with httpx

This is where httpx truly shines. Async support is built in from the ground up, not bolted on as an afterthought. When you need to handle multiple concurrent requests, async/await patterns let you handle hundreds of concurrent connections with minimal memory overhead — something that would require threading or multiprocessing with requests.

# async_requests.py
import asyncio
import httpx

async def fetch_posts(user_id):
    """Fetch posts for a specific user"""
    async with httpx.AsyncClient() as client:
        response = await client.get(
            f"https://jsonplaceholder.typicode.com/posts?userId={user_id}"
        )
        return response.json()

async def fetch_multiple_users():
    """Fetch posts for multiple users concurrently"""
    tasks = [
        fetch_posts(user_id)
        for user_id in range(1, 6)
    ]
    results = await asyncio.gather(*tasks)

    for i, posts in enumerate(results, 1):
        print(f"User {i}: {len(posts)} posts")

# Run the async function
asyncio.run(fetch_multiple_users())
User 1: 10 posts User 2: 10 posts User 3: 9 posts User 4: 9 posts User 5: 9 posts

The `AsyncClient()` context manager handles resource cleanup automatically. Using `asyncio.gather()`, we fetch posts for five users concurrently in roughly the time it takes to fetch one. This pattern scales to thousands of concurrent requests without the overhead of creating threads. The key difference from synchronous code is minimal — just add `async` and `await` keywords.

# async_with_timeout.py
import asyncio
import httpx

async def fetch_with_timeout():
    """Fetch with explicit timeout configuration"""
    timeout = httpx.Timeout(10.0)  # 10 second timeout for all operations

    async with httpx.AsyncClient(timeout=timeout) as client:
        try:
            response = await client.get(
                "https://jsonplaceholder.typicode.com/posts/1"
            )
            print(f"Success: {response.status_code}")
        except httpx.TimeoutException:
            print("Request timed out")
        except httpx.RequestError as e:
            print(f"Network error: {e}")

asyncio.run(fetch_with_timeout())
Success: 200

Timeout handling in async contexts is critical. The `Timeout` object lets you set different timeouts for connection, read, write, and pool operations. This fine-grained control prevents your async application from hanging on unresponsive servers.

HTTP/2 Support

HTTP/2 is faster than HTTP/1.1 because it multiplexes multiple requests over a single connection and compresses headers. With httpx, HTTP/2 support is automatic for servers that support it. You don’t need to change your code — just have the h2 library installed.

# http2_support.py
import httpx

# HTTP/2 is automatic when available
response = httpx.get("https://httpbin.org/get")
print(f"HTTP Version: {response.http_version}")

# Force HTTP/1.1 if needed
client = httpx.Client(http2=False)
response = client.get("https://httpbin.org/get")
print(f"HTTP Version (forced 1.1): {response.http_version}")
client.close()

# Create client with explicit HTTP/2 support
client = httpx.Client(http2=True)
response = client.get("https://httpbin.org/get")
print(f"HTTP Version (with HTTP/2): {response.http_version}")
client.close()
HTTP Version: HTTP/2 HTTP Version (forced 1.1): HTTP/1.1 HTTP Version (with HTTP/2): HTTP/2

The performance difference is subtle for single requests but becomes dramatic with concurrent requests over HTTP/2. Since HTTP/2 multiplexes requests on a single connection, you avoid the overhead of establishing multiple TCP connections. For APIs that support it, this can mean 20-50% faster performance in real-world scenarios.

Decorators beat manual retry loops every time

Decorators beat manual retry loops every time

Timeouts and Error Handling

Production code needs robust error handling. httpx provides clear exceptions for different failure scenarios, making it easy to distinguish between network problems, timeouts, and server errors.

# error_handling.py
import httpx

def fetch_with_fallback(url, fallback_url):
    """Fetch from primary URL with fallback"""
    try:
        response = httpx.get(url, timeout=5.0)
        response.raise_for_status()  # Raise exception for bad status codes
        return response.json()
    except httpx.TimeoutException:
        print(f"Timeout on {url}, trying fallback")
        return httpx.get(fallback_url).json()
    except httpx.HTTPStatusError as e:
        print(f"HTTP error: {e.response.status_code}")
        raise
    except httpx.RequestError as e:
        print(f"Request error: {e}")
        raise

try:
    data = fetch_with_fallback(
        "https://jsonplaceholder.typicode.com/posts/1",
        "https://jsonplaceholder.typicode.com/posts/2"
    )
    print(f"Fetched post: {data['title']}")
except Exception as e:
    print(f"All attempts failed: {e}")
Fetched post: sunt aut facere repellat provident…

httpx organizes exceptions in a clear hierarchy. `RequestError` is the base class for all request-related errors. `TimeoutException` indicates a timeout (connection, read, write, or pool). `HTTPStatusError` means the server responded with an error status code (4xx or 5xx). Using `raise_for_status()` automatically raises an exception for bad status codes, similar to requests.

# advanced_timeouts.py
import httpx

# Granular timeout control
timeout = httpx.Timeout(
    timeout=10.0,  # Default timeout
    connect=5.0,   # Connection timeout
    read=10.0,     # Read timeout
    write=10.0,    # Write timeout
    pool=10.0      # Connection pool timeout
)

client = httpx.Client(timeout=timeout)

# Override timeout for specific request
try:
    response = client.get(
        "https://httpbin.org/delay/2",
        timeout=2.0  # Override client timeout
    )
    print(f"Response: {response.status_code}")
except httpx.TimeoutException:
    print("Custom timeout exceeded")
finally:
    client.close()
Response: 200

Different operations need different timeout values. Connection timeouts should be shorter (5-10 seconds), while read timeouts depend on the expected response size. For large downloads, you might need 30+ second read timeouts. httpx lets you configure each separately.

Streaming Responses

When dealing with large files or streaming APIs, loading the entire response into memory is inefficient. httpx supports streaming, letting you process responses chunk by chunk.

# streaming_responses.py
import httpx

# Stream a large response
with httpx.stream("GET", "https://httpbin.org/bytes/1024") as response:
    print(f"Status: {response.status_code}")
    print(f"Content-Length: {response.headers.get('content-length')}")

    # Process response in chunks
    for chunk in response.iter_bytes(chunk_size=256):
        print(f"Received {len(chunk)} bytes")

# Stream with iterator
with httpx.stream("GET", "https://httpbin.org/json") as response:
    for line in response.iter_lines():
        if line:
            print(f"Line: {line[:50]}...")

# Async streaming
import asyncio

async def async_stream():
    async with httpx.AsyncClient() as client:
        async with client.stream("GET", "https://httpbin.org/bytes/512") as response:
            async for chunk in response.aiter_bytes(chunk_size=128):
                print(f"Async received {len(chunk)} bytes")

asyncio.run(async_stream())

Output:

Status: 200
Content-Length: 1024
Received 256 bytes
Received 256 bytes
Received 256 bytes
Received 256 bytes
Line: {"slideshow": {"author": "Yours Truly", "title": …
Async received 128 bytes
Async received 128 bytes
Async received 128 bytes
Async received 128 bytes

Streaming is essential for production applications that download large files or handle streaming APIs. The `iter_bytes()` method gives you raw bytes, while `iter_lines()` automatically splits on newlines — useful for newline-delimited JSON APIs. Async streaming with `aiter_bytes()` and `aiter_lines()` works the same way in async contexts.

Know your limits, respect the headers, fetch responsibly

Real-Life Example: Async API Data Aggregator

Let’s build a practical example that combines async requests, error handling, and structured data processing. Imagine you’re aggregating data from multiple APIs and want to do it efficiently.

# api_aggregator.py
import asyncio
import httpx
from datetime import datetime

class APIAggregator:
    """Aggregates data from multiple APIs concurrently"""

    def __init__(self, max_concurrent=5):
        self.max_concurrent = max_concurrent
        self.timeout = httpx.Timeout(10.0)

    async def fetch_post(self, client, post_id):
        """Fetch a single post"""
        try:
            response = await client.get(
                f"https://jsonplaceholder.typicode.com/posts/{post_id}",
                timeout=self.timeout
            )
            response.raise_for_status()
            return response.json()
        except httpx.HTTPError as e:
            # HTTPError covers both transport failures and the
            # HTTPStatusError raised by raise_for_status()
            print(f"Error fetching post {post_id}: {e}")
            return None

    async def fetch_post_comments(self, client, post_id):
        """Fetch comments for a post"""
        try:
            response = await client.get(
                "https://jsonplaceholder.typicode.com/comments",
                params={"postId": post_id},
                timeout=self.timeout
            )
            response.raise_for_status()
            return response.json()
        except httpx.HTTPError as e:
            print(f"Error fetching comments: {e}")
            return []

    async def aggregate(self):
        """Aggregate data from multiple endpoints"""
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            # Fetch posts concurrently
            post_tasks = [
                self.fetch_post(client, i)
                for i in range(1, 6)
            ]
            posts = await asyncio.gather(*post_tasks)

            # Fetch comments for the first two posts concurrently
            comment_tasks = [
                self.fetch_post_comments(client, i)
                for i in range(1, 3)
            ]
            comments = await asyncio.gather(*comment_tasks)

            return {
                "timestamp": datetime.now().isoformat(),
                "posts_fetched": len([p for p in posts if p]),
                "total_comments": sum(len(c) for c in comments if c),
                "sample_post": posts[0] if posts else None
            }

# Run the aggregator
async def main():
    aggregator = APIAggregator()
    results = await aggregator.aggregate()

    print(f"Aggregation completed at {results['timestamp']}")
    print(f"Posts fetched: {results['posts_fetched']}")
    print(f"Total comments: {results['total_comments']}")
    print(f"First post title: {results['sample_post']['title']}")

asyncio.run(main())

Output:

Aggregation completed at 2026-04-09T14:23:45.123456
Posts fetched: 5
Total comments: 10
First post title: sunt aut facere repellat provident occaecati excepturi optio reprehenderit

This example demonstrates several httpx patterns: creating an async client, managing concurrent requests with proper error handling, using consistent timeouts across all requests, and returning structured data. The `APIAggregator` class is reusable and extensible — you could add caching, retry logic, or progress tracking. In production, you’d likely add logging and more sophisticated error recovery.

Frequently Asked Questions

Q: Is httpx a drop-in replacement for requests?
A: Mostly yes. The synchronous API is nearly identical, so most requests code works with httpx after few changes. Some defaults differ, though: httpx does not follow redirects unless you pass `follow_redirects=True`, and it applies a 5-second default timeout where requests has none. Test thoroughly before migrating production code.

Q: Do I need to use async?
A: No. httpx works great synchronously, and you only need async when handling many concurrent requests. Start with synchronous code and migrate to async if profiling shows it’s beneficial.

Q: What’s the performance difference between httpx and requests?
A: For single requests, performance is similar. For concurrent requests, httpx with async is dramatically faster because it avoids thread overhead. HTTP/2 support also improves performance for compatible servers.

Q: How do I handle cookies and sessions?
A: httpx clients maintain cookies automatically. Use a persistent client for multiple requests to the same host to keep cookies and connection pooling active across requests.

Q: Can I use httpx with Django or Flask?
A: Yes, but use the synchronous API in request handlers since WSGI is synchronous. Use async httpx in ASGI applications like FastAPI or Django Async Views.

Q: How do I set up authentication?
A: httpx supports multiple auth methods. For basic auth: `httpx.get(url, auth=("user", "pass"))`. For bearer tokens: `headers={"Authorization": "Bearer token"}`. For custom auth, create a subclass of `httpx.Auth`.

Conclusion

httpx represents the future of HTTP requests in Python. It combines the simplicity of requests with modern features like async/await, HTTP/2, and better timeout handling. Whether you’re building a simple API client or managing complex concurrent request workflows, httpx has the tools you need without unnecessary complexity.

Start by installing httpx and replacing requests in a non-critical project. You’ll quickly discover why developers are switching. For more details, comprehensive documentation is available at www.python-httpx.org.

  • How To Handle Asynchronous Programming in Python with asyncio
  • Understanding HTTP Status Codes: A Complete Guide
  • Building REST APIs with FastAPI and Python
  • Web Scraping with Python: Best Practices and Tools
  • Error Handling and Logging in Production Python Applications
How To Use Python httpx for Modern HTTP Requests

Skill Level: Intermediate

Introduction to Modern HTTP Requests

For years, the Python requests library has been the go-to solution for making HTTP requests. While requests is powerful and user-friendly, it has limitations that modern Python developers encounter daily. It doesn’t support async operations natively, lacks HTTP/2 support, and can feel sluggish when you’re handling dozens of concurrent requests. If you’ve found yourself wrestling with requests’ synchronous nature or spinning up ThreadPoolExecutor just to manage multiple requests, you’re not alone. Many developers hit a wall where they need something more capable.

Enter httpx, a modern HTTP client with a largely requests-compatible API that adds native async/await support, optional HTTP/2, and streaming responses. The best part? If you already know requests, you’ll feel right at home with httpx. The API is remarkably similar, which means you can start using it immediately without relearning everything.

In this guide, we’ll walk through everything you need to know about httpx. We’ll start with a quick example to get you up and running, explore what makes httpx special compared to other HTTP libraries, and then dive into practical patterns you can use in your own projects. Whether you’re building a simple API client or managing complex async workflows, httpx has the tools you need. Let’s get started.

Quick Example: GET Request in 5 Lines

# quick_get.py
import httpx

response = httpx.get("https://jsonplaceholder.typicode.com/posts/1")
print(response.status_code)
print(response.json())

Output:

200
{'userId': 1, 'id': 1, 'title': 'sunt aut facere repellat…', 'body': '…'}

That’s it. If you’ve used requests before, you already know httpx. The synchronous API is nearly identical, but under the hood, httpx brings modern features to the table. Now let’s explore what makes httpx more powerful than its predecessor.

httpx handles synchronous and asynchronous requests from a single library

What is httpx and Why Use It?

httpx is a modern HTTP client library for Python that combines the simplicity of requests with advanced features like async support, HTTP/2, and more sophisticated timeout handling. Created by Tom Christie (the developer behind Starlette and Django REST framework), httpx is built on a foundation that understands modern Python development patterns. It’s not just a requests replacement; it’s a redesign based on what the community has learned about building HTTP libraries.

The key differences matter when you’re building real applications. Unlike requests, httpx supports both synchronous and asynchronous code from the same library. You don’t need to install separate packages or maintain multiple code paths. It supports HTTP/2 as an opt-in feature, which can mean better performance for services that support it. It has built-in connection pooling, proper async context managers, and a cleaner API that feels more Pythonic.

Let’s compare httpx to other HTTP libraries in the Python ecosystem:

| Feature | httpx | requests | aiohttp | urllib3 |
|---|---|---|---|---|
| Synchronous API | Yes | Yes | No | Yes |
| Async/Await Support | Yes | No | Yes | No |
| HTTP/2 | Yes (opt-in) | No | No | No |
| Connection Pooling | Yes | Yes | Yes | Yes |
| Streaming Support | Yes | Yes | Yes | Yes |
| API Complexity | Low | Low | Medium | High |
| Drop-in requests Replacement | Mostly | N/A | No | No |

httpx shines when you need synchronous and asynchronous code in the same project. Unlike aiohttp, which requires asyncio from the start, httpx lets you start simple and scale to async when you need it. Unlike requests, httpx doesn’t force you into threading patterns when you want to handle multiple requests concurrently. It’s the bridge between requests’ simplicity and aiohttp’s power.

Installing httpx

Installation is straightforward. httpx is available on PyPI and pulls in only a handful of pure-Python dependencies. For basic functionality, you only need one command.

# install_httpx.sh
pip install httpx

Output:

Collecting httpx
  Downloading httpx-0.25.2-py3-none-any.whl (75 kB)
Installing collected packages: httpx, certifi, sniffio, anyio
Successfully installed httpx-0.25.2

If you want HTTP/2 support with better performance, you can install the optional dependencies. The h2 library handles HTTP/2 protocol details, while httpcore provides the underlying transport layer.

# install_httpx_with_http2.sh
# Quotes keep shells such as zsh from interpreting the square brackets
pip install "httpx[http2]"

Output:

Collecting httpx[http2]
  Downloading httpx-0.25.2-py3-none-any.whl
Collecting h2
  Downloading h2-4.1.0-py3-none-any.whl (65 kB)
Successfully installed httpx, h2

That’s all you need. Unlike some HTTP libraries, httpx doesn’t require compiling C extensions or installing system dependencies. It’s pure Python with optional performance enhancements.

POST, GET, PUT, DELETE -- all methods work seamlessly with httpx

Making GET Requests

GET requests are the foundation of HTTP. They retrieve data without side effects, and httpx makes them effortless. The basic pattern is identical to requests, with a few different defaults: for example, httpx applies a 5-second timeout unless you configure one, whereas requests waits indefinitely.

# get_requests.py
import httpx

# Simple GET request
response = httpx.get("https://jsonplaceholder.typicode.com/users")
print(f"Status: {response.status_code}")
print(f"Content-Type: {response.headers['content-type']}")
print(f"First user: {response.json()[0]['name']}")

# GET with query parameters
params = {"userId": 1}
response = httpx.get("https://jsonplaceholder.typicode.com/posts", params=params)
print(f"Posts for user 1: {len(response.json())}")

# GET with custom headers
headers = {"User-Agent": "MyApp/1.0"}
response = httpx.get("https://httpbin.org/headers", headers=headers)
print(response.json())

Output:

Status: 200
Content-Type: application/json
First user: Leanne Graham
Posts for user 1: 10
{'headers': {'User-Agent': 'MyApp/1.0', …}}

Notice how httpx handles query parameters naturally through the `params` dictionary. You don’t manually construct query strings or worry about URL encoding — httpx handles that behind the scenes. Headers work the same way, accepting a dictionary that httpx merges with the default headers. This consistent API means you can focus on your application logic instead of HTTP bookkeeping.
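To make the encoding step concrete, here is a small sketch that uses `urllib.parse` from the standard library. It is not httpx's internal code, only an illustration of the kind of URL a `params` dictionary turns into. The `search` parameter is invented for the example, and the exact escaping httpx applies (for instance `+` versus `%20` for spaces) may differ.

```python
from urllib.parse import urlencode

base_url = "https://jsonplaceholder.typicode.com/posts"
# Hypothetical parameters; "search" is invented to show escaping
params = {"userId": 1, "search": "hello world & more"}

# urlencode escapes spaces and reserved characters such as "&"
query = urlencode(params)
full_url = f"{base_url}?{query}"
print(full_url)
```

Building the query string by hand with string concatenation would leave the `&` inside the value unescaped and silently split it into a second parameter, which is the mistake the `params` argument avoids.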

Making POST Requests

POST requests send data to the server. httpx supports multiple ways to send data: form-encoded, JSON, raw bytes, or streaming. Let’s explore the most common patterns.

# post_requests.py
import httpx

# POST with JSON data
client = httpx.Client()
data = {
    "title": "New Post",
    "body": "This is a test post",
    "userId": 1
}
response = client.post(
    "https://jsonplaceholder.typicode.com/posts",
    json=data
)
print(f"Created post with ID: {response.json()['id']}")

# POST with form data
form_data = {"username": "john", "password": "secret123"}
response = client.post(
    "https://httpbin.org/post",
    data=form_data
)
print(f"Form post status: {response.status_code}")

# POST with custom timeout
try:
    response = client.post(
        "https://httpbin.org/delay/10",
        timeout=5.0
    )
except httpx.TimeoutException:
    print("Request timed out after 5 seconds")

client.close()

Output:

Created post with ID: 101
Form post status: 200
Request timed out after 5 seconds

When making POST requests, use the `json` parameter for JSON data and the `data` parameter for form-encoded data. httpx automatically sets the correct Content-Type header for you. A `Client` instance reuses connections across requests (connection pooling), which is more efficient than the module-level functions when you make repeated requests. The example closes the client explicitly with `client.close()`; wrapping it in `with httpx.Client() as client:` performs the same cleanup automatically. Timeouts are crucial for production code because they prevent your application from hanging if a server stops responding.

Connection pooling keeps your HTTP clients fast and memory-efficient

Using Async with httpx

This is where httpx truly shines. Async support is built in from the ground up, not bolted on as an afterthought. When you need to handle multiple concurrent requests, async/await patterns let you handle hundreds of concurrent connections with minimal memory overhead — something that would require threading or multiprocessing with requests.

# async_requests.py
import asyncio
import httpx

async def fetch_posts(user_id):
    """Fetch posts for a specific user"""
    async with httpx.AsyncClient() as client:
        response = await client.get(
            f"https://jsonplaceholder.typicode.com/posts?userId={user_id}"
        )
        return response.json()

async def fetch_multiple_users():
    """Fetch posts for multiple users concurrently"""
    tasks = [
        fetch_posts(user_id)
        for user_id in range(1, 6)
    ]
    results = await asyncio.gather(*tasks)

    for i, posts in enumerate(results, 1):
        print(f"User {i}: {len(posts)} posts")

# Run the async function
asyncio.run(fetch_multiple_users())

Output:

User 1: 10 posts
User 2: 10 posts
User 3: 10 posts
User 4: 10 posts
User 5: 10 posts

The `AsyncClient()` context manager handles resource cleanup automatically. Using `asyncio.gather()`, we fetch posts for five users concurrently in roughly the time it takes to fetch one. This pattern scales to thousands of concurrent requests without the overhead of creating threads. The key difference from synchronous code is minimal — just add `async` and `await` keywords.
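The "roughly the time it takes to fetch one" claim can be checked without any network access. The sketch below replaces the HTTP call with a hypothetical `fake_fetch` coroutine that just sleeps, then times a sequential loop against `asyncio.gather()`. The 0.1-second delay and the payload shape are assumptions made for the demonstration.

```python
import asyncio
import time

async def fake_fetch(user_id, delay=0.1):
    """Stand-in for an HTTP call: waits, then returns a fake payload."""
    await asyncio.sleep(delay)
    return {"userId": user_id, "posts": 10}

async def run_sequential(ids):
    # Each await finishes before the next call starts
    return [await fake_fetch(i) for i in ids]

async def run_concurrent(ids):
    # All coroutines are scheduled together and wait in parallel
    return await asyncio.gather(*(fake_fetch(i) for i in ids))

def timed(coro_factory):
    """Run a coroutine to completion and return (result, elapsed seconds)."""
    start = time.perf_counter()
    result = asyncio.run(coro_factory())
    return result, time.perf_counter() - start

ids = range(1, 6)
seq_result, seq_time = timed(lambda: run_sequential(ids))
con_result, con_time = timed(lambda: run_concurrent(ids))

print(f"sequential: {seq_time:.2f}s, concurrent: {con_time:.2f}s")
```

The sequential run should take about five times the single-call delay, while the concurrent run stays close to one delay. Real requests add network variance, so treat the ratio as an upper bound on the benefit.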

# async_with_timeout.py
import asyncio
import httpx

async def fetch_with_timeout():
    """Fetch with explicit timeout configuration"""
    timeout = httpx.Timeout(10.0)  # 10 second timeout for all operations

    async with httpx.AsyncClient(timeout=timeout) as client:
        try:
            response = await client.get(
                "https://jsonplaceholder.typicode.com/posts/1"
            )
            print(f"Success: {response.status_code}")
        except httpx.TimeoutException:
            print("Request timed out")
        except httpx.RequestError as e:
            print(f"Network error: {e}")

asyncio.run(fetch_with_timeout())

Output:

Success: 200

Timeout handling in async contexts is critical. The `Timeout` object lets you set different timeouts for connection, read, write, and pool operations. This fine-grained control prevents your async application from hanging on unresponsive servers.

HTTP/2 Support

HTTP/2 can be faster than HTTP/1.1 because it multiplexes multiple requests over a single connection and compresses headers. In httpx, HTTP/2 is opt-in: install the optional dependency with `pip install "httpx[http2]"` and create the client with `http2=True`. The client then uses HTTP/2 with servers that support it and falls back to HTTP/1.1 otherwise.

# http2_support.py
import httpx

# Module-level functions and default clients use HTTP/1.1
response = httpx.get("https://httpbin.org/get")
print(f"HTTP Version (default): {response.http_version}")

# Explicitly disable HTTP/2 (this is also the default)
client = httpx.Client(http2=False)
response = client.get("https://httpbin.org/get")
print(f"HTTP Version (forced 1.1): {response.http_version}")
client.close()

# Enable HTTP/2; requires the h2 package from httpx[http2]
client = httpx.Client(http2=True)
response = client.get("https://httpbin.org/get")
# Reports HTTP/2 only if the server negotiates it
print(f"HTTP Version (with HTTP/2): {response.http_version}")
client.close()

Output:

HTTP Version (default): HTTP/1.1
HTTP Version (forced 1.1): HTTP/1.1
HTTP Version (with HTTP/2): HTTP/2

The performance difference is small for single requests and grows when you send many concurrent requests to the same host. Since HTTP/2 multiplexes requests on a single connection, you avoid the overhead of establishing multiple TCP connections. The actual gain depends on the server, network latency, and request pattern, so measure it for your own workload.

HTTP/2 multiplexing lets httpx handle multiple requests on one connection

Timeouts and Error Handling

Production code needs robust error handling. httpx provides clear exceptions for different failure scenarios, making it easy to distinguish between network problems, timeouts, and server errors.

# error_handling.py
import httpx

def fetch_with_fallback(url, fallback_url):
    """Fetch from primary URL with fallback"""
    try:
        response = httpx.get(url, timeout=5.0)
        response.raise_for_status()  # Raise exception for bad status codes
        return response.json()
    except httpx.TimeoutException:
        print(f"Timeout on {url}, trying fallback")
        return httpx.get(fallback_url).json()
    except httpx.HTTPStatusError as e:
        print(f"HTTP error: {e.response.status_code}")
        raise
    except httpx.RequestError as e:
        print(f"Request error: {e}")
        raise

try:
    data = fetch_with_fallback(
        "https://jsonplaceholder.typicode.com/posts/1",
        "https://jsonplaceholder.typicode.com/posts/2"
    )
    print(f"Fetched post: {data['title']}")
except Exception as e:
    print(f"All attempts failed: {e}")

Output:

Fetched post: sunt aut facere repellat provident…

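httpx organizes exceptions in a clear hierarchy. `HTTPError` is the common base class. Beneath it, `RequestError` covers failures that occur while sending the request or receiving the response, including `TimeoutException` (connect, read, write, or pool timeouts). `HTTPStatusError` is raised by `raise_for_status()` when the server responds with a 4xx or 5xx status code. Because `HTTPStatusError` is not a subclass of `RequestError`, it needs its own `except` clause (or a catch on `HTTPError`). The `raise_for_status()` call itself works much as it does in requests.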

# advanced_timeouts.py
import httpx

# Granular timeout control
timeout = httpx.Timeout(
    timeout=10.0,  # Default timeout
    connect=5.0,   # Connection timeout
    read=10.0,     # Read timeout
    write=10.0,    # Write timeout
    pool=10.0      # Connection pool timeout
)

client = httpx.Client(timeout=timeout)

# Override timeout for specific request
try:
    response = client.get(
        "https://httpbin.org/delay/1",  # server waits 1 second before replying
        timeout=2.0  # Override client timeout for this request only
    )
    print(f"Response: {response.status_code}")
except httpx.TimeoutException:
    print("Custom timeout exceeded")
finally:
    client.close()

Output:

Response: 200

Different operations need different timeout values. Connection timeouts should be shorter (5-10 seconds), while read timeouts depend on the expected response size. For large downloads, you might need 30+ second read timeouts. httpx lets you configure each separately.
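Note that these values apply to individual phases of a request; the read timeout, for instance, bounds the wait for each chunk of data rather than the whole response. If you also need a cap on total elapsed time in async code, one option is to wrap the call in `asyncio.wait_for`. The sketch below assumes a hypothetical `slow_operation` coroutine that stands in for `await client.get(...)` and sleeps instead of doing I/O.

```python
import asyncio

async def slow_operation(seconds):
    """Stand-in for `await client.get(...)`; sleeps instead of doing I/O."""
    await asyncio.sleep(seconds)
    return "done"

async def with_deadline(seconds, deadline):
    """Return the result, or None if the overall deadline passes first."""
    try:
        return await asyncio.wait_for(slow_operation(seconds), timeout=deadline)
    except asyncio.TimeoutError:
        # wait_for cancels the inner coroutine before raising
        return None

fast = asyncio.run(with_deadline(0.01, deadline=0.5))
slow = asyncio.run(with_deadline(1.0, deadline=0.05))
print(fast, slow)
```

The first call finishes well inside its deadline and returns its result; the second is cancelled and yields `None`. Whether a cancelled request should be retried or reported is a design decision this sketch leaves open.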

Streaming Responses

When dealing with large files or streaming APIs, loading the entire response into memory is inefficient. httpx supports streaming, letting you process responses chunk by chunk.

# streaming_responses.py
import httpx

# Stream a large response
with httpx.stream("GET", "https://httpbin.org/bytes/1024") as response:
    print(f"Status: {response.status_code}")
    print(f"Content-Length: {response.headers.get('content-length')}")

    # Process response in chunks
    for chunk in response.iter_bytes(chunk_size=256):
        print(f"Received {len(chunk)} bytes")

# Stream with iterator
with httpx.stream("GET", "https://httpbin.org/json") as response:
    for line in response.iter_lines():
        if line:
            print(f"Line: {line[:50]}...")

# Async streaming
import asyncio

async def async_stream():
    async with httpx.AsyncClient() as client:
        async with client.stream("GET", "https://httpbin.org/bytes/512") as response:
            async for chunk in response.aiter_bytes(chunk_size=128):
                print(f"Async received {len(chunk)} bytes")

asyncio.run(async_stream())

Output:

Status: 200
Content-Length: 1024
Received 256 bytes
Received 256 bytes
Received 256 bytes
Received 256 bytes
Line: {"slideshow": {"author": "Yours Truly", "title": …
Async received 128 bytes
Async received 128 bytes
Async received 128 bytes
Async received 128 bytes

Streaming is essential for production applications that download large files or handle streaming APIs. The `iter_bytes()` method gives you raw bytes, while `iter_lines()` automatically splits on newlines — useful for newline-delimited JSON APIs. Async streaming with `aiter_bytes()` and `aiter_lines()` works the same way in async contexts.
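For the newline-delimited JSON case, the processing side can be sketched independently of the HTTP layer. The `parse_ndjson` helper below is a hypothetical name, and the list of strings stands in for what `response.iter_lines()` would yield; the event records are invented for the example.

```python
import json

def parse_ndjson(lines):
    """Yield one decoded object per non-empty line."""
    for line in lines:
        if line.strip():  # skip blank keep-alive lines
            yield json.loads(line)

# Stand-in for `response.iter_lines()`; the records are invented
fake_lines = iter([
    '{"event": "start", "id": 1}',
    '',
    '{"event": "progress", "id": 2}',
    '{"event": "done", "id": 3}',
])

events = [record["event"] for record in parse_ndjson(fake_lines)]
print(events)
```

Because `parse_ndjson` is a generator, each record is decoded as its line arrives, so memory use stays flat no matter how long the stream runs.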

Streaming responses keep memory usage constant regardless of file size

Real-Life Example: Async API Data Aggregator

Let’s build a practical example that combines async requests, error handling, and structured data processing. Imagine you’re aggregating data from multiple APIs and want to do it efficiently.

# api_aggregator.py
import asyncio
import httpx
from datetime import datetime

class APIAggregator:
    """Aggregates data from multiple APIs concurrently"""

    def __init__(self, max_concurrent=5):
        self.max_concurrent = max_concurrent
        self.timeout = httpx.Timeout(10.0)

    async def fetch_post(self, client, post_id):
        """Fetch a single post"""
        try:
            response = await client.get(
                f"https://jsonplaceholder.typicode.com/posts/{post_id}",
                timeout=self.timeout
            )
            response.raise_for_status()
            return response.json()
        except httpx.HTTPError as e:
            # HTTPError covers both transport failures and the
            # HTTPStatusError raised by raise_for_status()
            print(f"Error fetching post {post_id}: {e}")
            return None

    async def fetch_post_comments(self, client, post_id):
        """Fetch comments for a post"""
        try:
            response = await client.get(
                "https://jsonplaceholder.typicode.com/comments",
                params={"postId": post_id},
                timeout=self.timeout
            )
            response.raise_for_status()
            return response.json()
        except httpx.HTTPError as e:
            print(f"Error fetching comments: {e}")
            return []

    async def aggregate(self):
        """Aggregate data from multiple endpoints"""
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            # Fetch posts concurrently
            post_tasks = [
                self.fetch_post(client, i)
                for i in range(1, 6)
            ]
            posts = await asyncio.gather(*post_tasks)

            # Fetch comments for the first two posts concurrently
            comment_tasks = [
                self.fetch_post_comments(client, i)
                for i in range(1, 3)
            ]
            comments = await asyncio.gather(*comment_tasks)

            return {
                "timestamp": datetime.now().isoformat(),
                "posts_fetched": len([p for p in posts if p]),
                "total_comments": sum(len(c) for c in comments if c),
                "sample_post": posts[0] if posts else None
            }

# Run the aggregator
async def main():
    aggregator = APIAggregator()
    results = await aggregator.aggregate()

    print(f"Aggregation completed at {results['timestamp']}")
    print(f"Posts fetched: {results['posts_fetched']}")
    print(f"Total comments: {results['total_comments']}")
    print(f"First post title: {results['sample_post']['title']}")

asyncio.run(main())

Output:

Aggregation completed at 2026-04-09T14:23:45.123456
Posts fetched: 5
Total comments: 10
First post title: sunt aut facere repellat provident occaecati excepturi optio reprehenderit

This example demonstrates several httpx patterns: creating an async client, managing concurrent requests with proper error handling, using consistent timeouts across all requests, and returning structured data. The `APIAggregator` class is reusable and extensible — you could add caching, retry logic, or progress tracking. In production, you’d likely add logging and more sophisticated error recovery.
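The class stores `max_concurrent` but never applies it, so every request starts at once. One common way to enforce such a limit is an `asyncio.Semaphore`. The sketch below is an assumption about how that could look rather than part of the original class: `limited_fetch`, `fetch_all`, and `ConcurrencyTracker` are hypothetical names, and `asyncio.sleep` stands in for the real `client.get` call.

```python
import asyncio

class ConcurrencyTracker:
    """Records how many simulated requests run at the same time."""
    def __init__(self):
        self.active = 0
        self.peak = 0

async def limited_fetch(semaphore, tracker, item_id):
    async with semaphore:  # waits here once the limit is reached
        tracker.active += 1
        tracker.peak = max(tracker.peak, tracker.active)
        await asyncio.sleep(0.01)  # stand-in for `await client.get(...)`
        tracker.active -= 1
        return item_id

async def fetch_all(item_ids, max_concurrent=5):
    """Run all fetches, never more than max_concurrent at a time."""
    semaphore = asyncio.Semaphore(max_concurrent)
    tracker = ConcurrencyTracker()
    results = await asyncio.gather(
        *(limited_fetch(semaphore, tracker, i) for i in item_ids)
    )
    return results, tracker.peak

results, peak = asyncio.run(fetch_all(range(20), max_concurrent=5))
print(f"fetched {len(results)} items, peak concurrency {peak}")
```

`asyncio.gather` still returns results in submission order, so the limit changes only when each request starts, not how the caller consumes the output.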

Frequently Asked Questions

Q: Is httpx a drop-in replacement for requests?
A: Mostly yes. The synchronous API is nearly identical, so most requests code works with httpx after few changes. Some defaults differ, though: httpx does not follow redirects unless you pass `follow_redirects=True`, and it applies a 5-second default timeout where requests has none. Test thoroughly before migrating production code.

Q: Do I need to use async?
A: No. httpx works great synchronously, and you only need async when handling many concurrent requests. Start with synchronous code and migrate to async if profiling shows it’s beneficial.

Q: What’s the performance difference between httpx and requests?
A: For single requests, performance is similar. For concurrent requests, httpx with async is dramatically faster because it avoids thread overhead. HTTP/2 support also improves performance for compatible servers.

Q: How do I handle cookies and sessions?
A: httpx clients maintain cookies automatically. Use a persistent client for multiple requests to the same host to keep cookies and connection pooling active across requests.

Q: Can I use httpx with Django or Flask?
A: Yes, but use the synchronous API in request handlers since WSGI is synchronous. Use async httpx in ASGI applications like FastAPI or Django Async Views.

Q: How do I set up authentication?
A: httpx supports multiple auth methods. For basic auth: `httpx.get(url, auth=("user", "pass"))`. For bearer tokens: `headers={"Authorization": "Bearer token"}`. For custom auth, create a subclass of `httpx.Auth`.

Conclusion

httpx represents the future of HTTP requests in Python. It combines the simplicity of requests with modern features like async/await, HTTP/2, and better timeout handling. Whether you’re building a simple API client or managing complex concurrent request workflows, httpx has the tools you need without unnecessary complexity.

Start by installing httpx and replacing requests in a non-critical project. You’ll quickly discover why developers are switching. For more details, comprehensive documentation is available at www.python-httpx.org.

  • How To Handle Asynchronous Programming in Python with asyncio
  • Understanding HTTP Status Codes: A Complete Guide
  • Building REST APIs with FastAPI and Python
  • Web Scraping with Python: Best Practices and Tools
  • Error Handling and Logging in Production Python Applications
How To Hash Passwords Safely in Python with bcrypt

Skill Level: Intermediate

Why Password Security Matters in Your Applications

Every year, millions of passwords are exposed in data breaches. When companies store passwords in plain text or with weak encryption methods, attackers who gain access to the database can immediately use those credentials against users. A single security oversight can compromise not just one user, but your entire application’s integrity and your reputation. The good news? Protecting passwords isn’t complicated — bcrypt makes it remarkably straightforward to implement enterprise-grade password security in your Python applications today.

Bcrypt is a purpose-built password hashing library that automatically handles the complexity of secure password storage. Unlike generic hashing functions, bcrypt incorporates salt generation, adaptive work factors, and built-in protections against common attacks. Whether you’re building a small project or scaling to millions of users, bcrypt gives you the same security guarantees with just a few lines of code.

In this guide, we’ll explore how bcrypt works from first principles, then move through practical implementations including user registration, password verification, and real-world patterns you can use immediately. By the end, you’ll understand why bcrypt is the industry standard and how to integrate it into your projects with confidence.

Quick Example: Hash and Verify in 10 Lines

Before diving into the theory, let’s see bcrypt in action. This example demonstrates the complete workflow — hashing a password and then verifying a user’s input against that stored hash. It’s this simple and this secure.

# quick_hash.py
import bcrypt

password = "MySecurePassword123!"
hashed = bcrypt.hashpw(password.encode(), bcrypt.gensalt())
print("Hashed:", hashed)

user_input = "MySecurePassword123!"
is_valid = bcrypt.checkpw(user_input.encode(), hashed)
print("Password match:", is_valid)

Output:

Hashed: b'$2b$12$abcdefghijklmnopqrstuvwxyzAbCdEfGhIjKlMnOpQrStUvWxYz'
Password match: True

That’s the essence of password hashing with bcrypt. The hashpw() function handles salt generation and hashing internally, while checkpw() verifies inputs without needing to decrypt anything. This simplicity masks sophisticated security mechanisms working behind the scenes.

What is Password Hashing and Why Use bcrypt?

Password hashing is a one-way cryptographic function that transforms a plaintext password into an irreversible string of characters. The database stores only the hash, not the original password. When a user logs in, the application hashes their input and compares it to the stored hash — if they match, the password is correct. If attackers compromise the database, they get hashes, not passwords.
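The store-then-compare flow described above can be sketched with nothing but the standard library. To be clear about the substitution: this example uses PBKDF2 (`hashlib.pbkdf2_hmac`) as a stand-in, not bcrypt, and the iteration count and helper names are illustrative assumptions rather than recommendations.

```python
import hashlib
import hmac
import os

ITERATIONS = 100_000  # illustrative work factor, not a recommendation

def hash_password(password: str) -> tuple[bytes, bytes]:
    """Return (salt, derived_key); both are stored, the password is not."""
    salt = os.urandom(16)
    key = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, ITERATIONS)
    return salt, key

def verify_password(password: str, salt: bytes, stored_key: bytes) -> bool:
    """Re-derive the key from the login attempt and compare in constant time."""
    candidate = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, ITERATIONS)
    return hmac.compare_digest(candidate, stored_key)

salt, stored = hash_password("MySecurePassword123!")
print(verify_password("MySecurePassword123!", salt, stored))
print(verify_password("wrong-guess", salt, stored))
```

Nothing is ever decrypted: verification repeats the one-way derivation and compares the results. bcrypt follows the same flow but packs the salt and cost into the hash string, so there is no separate salt column to manage.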

Not all hashing functions are suitable for passwords. General-purpose algorithms like MD5, SHA-1, and even SHA-256 are too fast: an attacker with access to your password hashes can run billions of guesses per second using GPU clusters. Bcrypt solves this by intentionally being slow and computationally expensive, making brute-force attacks impractical. Here’s how different approaches compare:

| Method | Speed | Salt | Adaptive | Security Level |
|---|---|---|---|---|
| Plaintext | Instant | None | No | Catastrophic |
| MD5 | Billions of hashes/sec on a GPU | Optional | No | Broken |
| SHA-256 | Billions of hashes/sec on a GPU | Optional | No | Weak for passwords |
| Bcrypt | ~5 hashes/sec (configurable) | Automatic | Yes | Industry standard |

Bcrypt’s adaptive cost factor is crucial: as hardware becomes faster, you can increase the work factor to keep password cracking slow. A hash tuned to take 0.2 seconds today would become cheaper to compute on faster hardware if the cost stayed fixed, but each increment of the cost doubles the work, so you can hold hashing time near 0.2 seconds as computers improve. This forward-looking design is why bcrypt remains relevant decades after its creation.
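The doubling rule makes it easy to estimate how a cost change affects hashing time. The sketch below is plain arithmetic, not part of the bcrypt API; the function name and the 0.25-second baseline are assumptions for illustration, and real timings should always be measured on your own hardware.

```python
def estimate_hash_seconds(base_cost: int, base_seconds: float, target_cost: int) -> float:
    """Estimate hashing time at target_cost from one measured baseline.

    Assumes the work doubles with each +1 in cost (2**cost iterations).
    """
    return base_seconds * 2 ** (target_cost - base_cost)

# Hypothetical baseline: cost 12 measured at 0.25 s on some machine
for cost in (10, 12, 14):
    print(f"cost {cost}: ~{estimate_hash_seconds(12, 0.25, cost):.4f} s")
# cost 10: ~0.0625 s
# cost 12: ~0.2500 s
# cost 14: ~1.0000 s
```

An estimate like this is useful for narrowing down candidate cost factors before benchmarking them for real.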

Installing bcrypt

Bcrypt requires a simple pip installation. It ships as a compiled extension for performance (implemented in Rust since version 4.0), and prebuilt wheels are published for the common operating systems. Most Python environments will install it without issues, though on platforms without a prebuilt wheel pip has to build it from source, which requires a Rust toolchain.

# install_bcrypt.py
import subprocess
import sys

# Install bcrypt
subprocess.check_call([sys.executable, "-m", "pip", "install", "bcrypt"])

# Verify installation
import bcrypt
print("Bcrypt version:", bcrypt.__version__)
print("Installation successful!")
Output (the wheel name and version depend on your platform and Python version):
Collecting bcrypt
  Downloading bcrypt-4.1.2-cp312-cp312-linux_x86_64.whl
Installing collected packages: bcrypt
Successfully installed bcrypt-4.1.2
Bcrypt version: 4.1.2
Installation successful!

Once installed, bcrypt is immediately available for import. We recommend adding bcrypt to your project’s requirements.txt or pyproject.toml file so it’s installed automatically for anyone who clones your repository. The package is maintained actively and released regularly with security updates, so keep it current with pip install --upgrade bcrypt periodically.

Investigating hash functions like a crime scene — following the trail that only goes one way

Hashing a Password

The hashing process in bcrypt is deceptively simple from a user perspective, but implements several sophisticated techniques internally. When you call hashpw(), bcrypt takes the cryptographically random salt produced by gensalt(), runs the expensive Blowfish key setup 2^cost times, and returns a single string containing all the information needed to verify passwords later.

Here’s the standard pattern for creating password hashes when users register or change their password:

# hash_password_demo.py
import bcrypt

def hash_password(password: str) -> str:
    """
    Hash a plaintext password using bcrypt.

    Args:
        password: The user's plaintext password

    Returns:
        The bcrypt hash as a string (can be stored in database)
    """
    # Encode string to bytes (bcrypt requires bytes)
    password_bytes = password.encode('utf-8')

    # Generate salt and hash in one operation
    # Default cost factor is 12
    hashed = bcrypt.hashpw(password_bytes, bcrypt.gensalt())

    # Return as string for database storage
    return hashed.decode('utf-8')

# Example usage
user_password = "SecurePass123!"
stored_hash = hash_password(user_password)
print(f"Original: {user_password}")
print(f"Stored:   {stored_hash}")
print(f"Length:   {len(stored_hash)} characters")
Output:
Original: SecurePass123!
Stored:   $2b$12$QaNu.rBvNp8zXrLbSKc.MOaVfVL6qfKfhIU3SYvKzGdpLp4I8TS6O
Length:   60 characters

Every hash is exactly 60 characters long, so it fits comfortably in a CHAR(60) or VARCHAR(60) column. The string embeds everything needed for verification: the algorithm identifier ($2b$), the cost factor (12), the 22-character salt, and the 31-character digest. You never need to store these separately, because everything is self-contained in that single string.
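To make the layout concrete, here is a minimal sketch that splits a hash string into its fields using only string operations. The helper name split_bcrypt_hash and the BcryptHashParts type are hypothetical, not part of the bcrypt package, and this is for inspection only: verification should still go through checkpw().

```python
from typing import NamedTuple

class BcryptHashParts(NamedTuple):
    version: str
    cost: int
    salt: str
    digest: str

def split_bcrypt_hash(stored_hash: str) -> BcryptHashParts:
    """Split a 60-character bcrypt hash string into its visible fields."""
    # Layout: $<version>$<cost>$<22-char salt><31-char digest>
    _, version, cost, salt_and_digest = stored_hash.split("$")
    if len(salt_and_digest) != 53:
        raise ValueError("unexpected bcrypt hash length")
    return BcryptHashParts(version, int(cost), salt_and_digest[:22], salt_and_digest[22:])

# The example hash string shown in this section
parts = split_bcrypt_hash("$2b$12$QaNu.rBvNp8zXrLbSKc.MOaVfVL6qfKfhIU3SYvKzGdpLp4I8TS6O")
print(parts.version, parts.cost, len(parts.salt), len(parts.digest))  # 2b 12 22 31
```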

A critical consideration: passwords must be encoded to bytes before hashing. Python 3 strings are Unicode by default, but bcrypt operates on bytes. Always use .encode('utf-8') or specify the encoding explicitly. Different encodings will produce different hashes, which is why consistency matters — the verification step must use the same encoding.
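The encoding point is easy to check without bcrypt at all. This small sketch shows the same password producing different bytes under two encodings, and routes every conversion through one helper (password_to_bytes is a hypothetical name) so that hashing and verification cannot drift apart.

```python
def password_to_bytes(password: str) -> bytes:
    """Single place where passwords become bytes, for hashing and verifying alike."""
    return password.encode("utf-8")

pw = "p\u00e4ssword"  # "pässword", written with an escape to stay ASCII-safe

utf8_bytes = password_to_bytes(pw)
latin1_bytes = pw.encode("latin-1")

print(utf8_bytes)                  # b'p\xc3\xa4ssword'
print(latin1_bytes)                # b'p\xe4ssword'
print(utf8_bytes == latin1_bytes)  # False: different input bytes, so different hashes
```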

Verifying a Password

When a user logs in, you don’t decrypt the stored hash or compare strings yourself. Instead, checkpw() re-hashes the incoming password using the salt and cost embedded in the stored hash and compares the two results with a timing-safe comparison. This prevents timing attacks where attackers measure response times to gain information about the hash.

Here’s how to verify passwords securely:

# verify_password_demo.py
import bcrypt

def verify_password(provided_password: str, stored_hash: str) -> bool:
    """
    Verify a plaintext password against a bcrypt hash.

    Args:
        provided_password: Password the user provided at login
        stored_hash: The bcrypt hash from the database

    Returns:
        True if the password matches, False otherwise
    """
    # Encode both to bytes
    provided_bytes = provided_password.encode('utf-8')
    stored_bytes = stored_hash.encode('utf-8')

    # checkpw handles timing-safe comparison internally
    return bcrypt.checkpw(provided_bytes, stored_bytes)

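# Example: Simulating a login attempt
# Create the stored hash here so the demo is reproducible; a real app loads it from the database
stored_hash = bcrypt.hashpw("SecurePass123!".encode('utf-8'), bcrypt.gensalt()).decode('utf-8')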

# Correct password
attempt1 = "SecurePass123!"
print(f"Attempt 1 ('{attempt1}'): {verify_password(attempt1, stored_hash)}")

# Wrong password
attempt2 = "WrongPassword123"
print(f"Attempt 2 ('{attempt2}'): {verify_password(attempt2, stored_hash)}")

# Close but not exact
attempt3 = "SecurePass123"
print(f"Attempt 3 ('{attempt3}'): {verify_password(attempt3, stored_hash)}")
Output:
Attempt 1 ('SecurePass123!'): True
Attempt 2 ('WrongPassword123'): False
Attempt 3 ('SecurePass123'): False

Notice that even a single character difference results in verification failure. Bcrypt’s comparison is exact — there’s no “close enough” or partial matches. This is intentional: password verification must be binary.

The checkpw() function uses constant-time comparison, meaning it takes the same amount of time regardless of where a mismatch occurs. A naive string comparison might return immediately upon finding the first different character, but bcrypt compares the entire hash every time. This prevents attackers from using timing measurements to guess parts of the hash.
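The two comparison styles can be contrasted with the standard library. The sketch below is only an analogy for what checkpw() does internally, not bcrypt’s actual implementation; naive_equal is a hypothetical helper and the byte strings are made-up placeholders.

```python
import hmac

def naive_equal(a: bytes, b: bytes) -> bool:
    """Early-exit comparison: running time depends on where the first mismatch is."""
    if len(a) != len(b):
        return False
    for x, y in zip(a, b):
        if x != y:
            return False  # returns as soon as one byte differs
    return True

expected = b"$2b$12$example-hash-value"
guess_bad_start = b"X2b$12$example-hash-value"
guess_bad_end = b"$2b$12$example-hash-valuX"

# Both styles agree on the answer; only their timing behaviour differs.
for guess in (guess_bad_start, guess_bad_end, expected):
    print(naive_equal(expected, guess), hmac.compare_digest(expected, guess))
# False False
# False False
# True True
```

hmac.compare_digest() is designed so that its running time does not depend on where the inputs differ, which is the property that matters when one side of the comparison is secret.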

Following the principle of least privilege — storing only what you absolutely need

Understanding Salt and Work Factor

Bcrypt’s security relies on two mechanisms working together: salt and cost factor. The salt is a random string added to each password before hashing, ensuring identical passwords produce different hashes. Without salt, an attacker could create a rainbow table — a precomputed mapping of common passwords to their hashes — and look up compromised passwords instantly.
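The effect of a salt can be seen with any hash function. The sketch below deliberately uses SHA-256 from the standard library as a stand-in, purely to show why salting defeats precomputed tables; it is not how bcrypt works internally, and SHA-256 alone is not suitable for storing passwords. The helper names are hypothetical.

```python
import hashlib
import os

def unsalted_digest(password: str) -> str:
    return hashlib.sha256(password.encode("utf-8")).hexdigest()

def salted_digest(password: str, salt: bytes) -> str:
    return hashlib.sha256(salt + password.encode("utf-8")).hexdigest()

password = "hunter2"

# Without a salt, two users with the same password get the same digest,
# so one precomputed table entry cracks both accounts.
print(unsalted_digest(password) == unsalted_digest(password))  # True

# With a fresh random salt per user, the digests differ.
salt_a, salt_b = os.urandom(16), os.urandom(16)
print(salted_digest(password, salt_a) == salted_digest(password, salt_b))  # False
```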

The cost factor (work factor) determines how many rounds of hashing bcrypt performs. A higher cost factor makes hashing slower, which also makes brute-force attacks slower. Each increment doubles the computational cost. Let’s explore these concepts:

# salt_cost_demo.py
import bcrypt
import time

password = "TestPassword123"
password_bytes = password.encode('utf-8')

# Demonstrate different cost factors
print("Cost Factor Performance Analysis:")
print("-" * 50)

for cost in [10, 12, 14]:
    start = time.perf_counter()  # monotonic, high-resolution timer for benchmarks
    hashed = bcrypt.hashpw(password_bytes, bcrypt.gensalt(rounds=cost))
    elapsed = time.perf_counter() - start

    print(f"Cost {cost}: {elapsed:.3f}s - Hash: {hashed.decode()[:30]}...")

print("\nDifferent salts, same password:")
print("-" * 50)

# Same password hashed multiple times produces different results
for i in range(3):
    hashed = bcrypt.hashpw(password_bytes, bcrypt.gensalt())
    print(f"Hash {i+1}: {hashed.decode()}")
Output (timings and hash values are illustrative and vary by machine and run):
Cost Factor Performance Analysis:
--------------------------------------------------
Cost 10: 0.089s - Hash: $2b$10$abc123defghijklmnopqrst...
Cost 12: 0.324s - Hash: $2b$12$def456ghijklmnopqrstuv...
Cost 14: 1.274s - Hash: $2b$14$ghi789jklmnopqrstuvwx...

Different salts, same password:
--------------------------------------------------
Hash 1: $2b$12$aBcDeFgHiJkLmNoPqRsT.uVwXyZ123456789012345678901a
Hash 2: $2b$12$BcDeFgHiJkLmNoPqRsT.uVwXyZ123456789012345678901aB
Hash 3: $2b$12$CdEfGhIjKlMnOpQrStUv.wXyZ123456789012345678901aBc

Notice how cost factor 10 is fast but cost factor 14 takes over a second. In production, you need to balance security with user experience, since a login taking 5 seconds would frustrate users. The default cost of 12 (0.2-0.3 seconds in this run; your hardware will differ) is a reasonable starting point for most applications. As hardware improves over time, you can increase the cost factor in your code to maintain the same hashing duration.

Each hash includes its salt and cost factor as part of the output string. The format is: $2b$COST$SALT_AND_HASH. When you verify a password with checkpw(), bcrypt extracts this information from the stored hash automatically, ensuring verification uses the same parameters as the original hashing. You never need to manage salt or cost separately.

Adjusting the Cost Factor

While the default cost of 12 works well, production applications often adjust this based on their specific needs. Services with high login volume might use cost 10 to keep latency low, while security-critical applications might use cost 13 or 14. The key is measuring and balancing performance against security.

Here’s how to implement configurable cost factors and test performance in your application:

# configurable_cost.py
import bcrypt
import time

class PasswordManager:
    """Manages password hashing with configurable cost factor."""

    def __init__(self, cost_factor: int = 12):
        """
        Initialize with a specific cost factor.

        Args:
            cost_factor: Bcrypt cost factor (10-12 recommended, max 31)
        """
        if not 4 <= cost_factor <= 31:
            raise ValueError("Cost factor must be between 4 and 31")
        self.cost_factor = cost_factor

    def hash_password(self, password: str) -> str:
        """Hash a password with the configured cost factor."""
        password_bytes = password.encode('utf-8')
        salt = bcrypt.gensalt(rounds=self.cost_factor)
        hashed = bcrypt.hashpw(password_bytes, salt)
        return hashed.decode('utf-8')

    def verify_password(self, password: str, stored_hash: str) -> bool:
        """Verify a password against a stored hash."""
        try:
            password_bytes = password.encode('utf-8')
            stored_bytes = stored_hash.encode('utf-8')
            return bcrypt.checkpw(password_bytes, stored_bytes)
        except ValueError:
            # Invalid hash format
            return False

    def benchmark(self, test_password: str = "TestPassword123") -> float:
        """Measure how long hashing takes with current cost factor."""
        start = time.perf_counter()  # monotonic, high-resolution timer
        self.hash_password(test_password)
        return time.perf_counter() - start

# Test different cost factors
print("Finding optimal cost factor:")
print("-" * 50)

for cost in [10, 11, 12, 13]:
    manager = PasswordManager(cost_factor=cost)
    elapsed = manager.benchmark()
    print(f"Cost {cost}: {elapsed:.3f} seconds")

# Use production setting
print("\nProduction PasswordManager:")
print("-" * 50)
pm = PasswordManager(cost_factor=12)
pwd = "MySecurePassword!"
hashed = pm.hash_password(pwd)
print(f"Hash: {hashed}")
print(f"Verify: {pm.verify_password(pwd, hashed)}")
Output (timings and hash values vary by machine and run):
Finding optimal cost factor:
--------------------------------------------------
Cost 10: 0.087 seconds
Cost 11: 0.175 seconds
Cost 12: 0.334 seconds
Cost 13: 0.672 seconds

Production PasswordManager:
--------------------------------------------------
Hash: $2b$12$XyZ789aBcDeFgHiJkLmNo.pQrStUvWxYzAbCdEfGhIjKlMnOpQrSt
Verify: True

This PasswordManager class wraps bcrypt operations and makes cost factor configurable without changing application logic everywhere. You can update the default cost factor in one place, and all password operations use the new value. For applications with mixed cost factors in the database (from previous versions), bcrypt’s automatic parameter extraction handles this — older hashes with cost 10 verify correctly even if your application now uses cost 12.

Measuring twice, hashing once — balancing security with real-world performance constraints

Real-Life Example: User Registration CLI

Let’s build a practical user registration system that demonstrates password hashing in context. This example includes input validation, database simulation, and proper error handling — everything you’d need to adapt for a real application:

# user_registration_system.py
import bcrypt
import json
import os
from typing import Any, Dict

class UserDatabase:
    """Simulates a user database with file-based storage."""

    def __init__(self, db_file: str = "users.json"):
        self.db_file = db_file
        self._load_database()

    def _load_database(self) -> None:
        """Load users from file or create new database."""
        if os.path.exists(self.db_file):
            with open(self.db_file, 'r') as f:
                self.users = json.load(f)
        else:
            self.users = {}

    def _save_database(self) -> None:
        """Persist users to file."""
        with open(self.db_file, 'w') as f:
            json.dump(self.users, f, indent=2)

    def user_exists(self, username: str) -> bool:
        """Check if username already exists."""
        return username in self.users

    def register_user(self, username: str, email: str,
                     password: str) -> Dict[str, Any]:
        """
        Register a new user with password hashing.

        Returns:
            Dict with status and message
        """
        # Validation
        if self.user_exists(username):
            return {"success": False, "message": "Username already taken"}

        if len(password) < 8:
            return {"success": False,
                   "message": "Password must be 8+ characters"}

        if not email or '@' not in email:
            return {"success": False, "message": "Invalid email address"}

        # Hash password with cost factor 12
        password_bytes = password.encode('utf-8')
        salt = bcrypt.gensalt(rounds=12)
        password_hash = bcrypt.hashpw(password_bytes, salt).decode('utf-8')

        # Store user (never store plaintext password)
        self.users[username] = {
            "email": email,
            "password_hash": password_hash,
            "created": "2026-04-09"
        }
        self._save_database()

        return {"success": True,
               "message": f"User {username} registered successfully"}

    def authenticate_user(self, username: str,
                         password: str) -> Dict[str, Any]:
        """
        Verify username and password.

        Returns:
            Dict with authentication result
        """
        if not self.user_exists(username):
            return {"success": False, "message": "Invalid credentials"}

        user = self.users[username]
        password_bytes = password.encode('utf-8')
        stored_hash = user["password_hash"].encode('utf-8')

        # Use bcrypt to verify
        if bcrypt.checkpw(password_bytes, stored_hash):
            return {"success": True,
                   "message": f"Welcome back, {username}!"}
        else:
            return {"success": False, "message": "Invalid credentials"}

# Interactive CLI registration demo
if __name__ == "__main__":
    db = UserDatabase("demo_users.json")

    print("User Registration System")
    print("=" * 50)

    # Register a new user
    result = db.register_user(
        username="alice_dev",
        email="alice@example.com",
        password="AliceSecure123!"
    )
    print(f"Register: {result['message']}")

    # Attempt duplicate registration
    result = db.register_user(
        username="alice_dev",
        email="alice2@example.com",
        password="DifferentPass123"
    )
    print(f"Duplicate attempt: {result['message']}")

    # Correct login
    result = db.authenticate_user("alice_dev", "AliceSecure123!")
    print(f"Login (correct): {result['message']}")

    # Incorrect login
    result = db.authenticate_user("alice_dev", "WrongPassword123")
    print(f"Login (wrong): {result['message']}")

    # Cleanup
    if os.path.exists("demo_users.json"):
        os.remove("demo_users.json")
Output:
User Registration System
==================================================
Register: User alice_dev registered successfully
Duplicate attempt: Username already taken
Login (correct): Welcome back, alice_dev!
Login (wrong): Invalid credentials

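This registration system demonstrates critical security patterns. Passwords are never stored; only their bcrypt hashes are persisted. The validation enforces a minimum password length before hashing, which is a starting point rather than a full strength check. Authentication goes through bcrypt.checkpw(), so the comparison itself does not leak timing information about the hash, and both failure cases return the same “Invalid credentials” message. Note that an unknown username returns before any hashing happens, so response time can still hint at whether an account exists. You can adapt this structure directly into web frameworks like Flask, Django, or FastAPI by replacing the file-based database with a proper database connection.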

Catching attackers in the act — timing-safe comparisons leave no fingerprints

Frequently Asked Questions

Q: Can I decrypt a bcrypt hash to see the original password?

No. Bcrypt is a one-way hash function by design, so there is no decryption function. This is a feature, not a limitation: if a password is forgotten, the user must reset it, not retrieve it. The only way to recover a password from a bcrypt hash is to guess candidates and hash each one, which the cost factor makes slow. Weak or common passwords can still be found this way, which is why password strength matters even with bcrypt.

Q: How long does a bcrypt hash stay valid?

The hash itself never expires. A hash created in 2015 will still verify passwords correctly in 2026. However, you can re-hash passwords when users log in with a higher cost factor. For example, migrate users to cost 13 only when they authenticate, avoiding the need to rehash all passwords at once.
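A minimal sketch of the check behind that migration is shown below. It reads the cost directly from the stored hash string; needs_rehash is a hypothetical helper rather than a bcrypt function, and the hashes are shape-correct placeholders, not real hashes.

```python
def needs_rehash(stored_hash: str, target_cost: int) -> bool:
    """Return True if stored_hash was created with a lower cost than target_cost."""
    # bcrypt hashes look like $2b$<cost>$<salt+digest>; the cost is the third field
    current_cost = int(stored_hash.split("$")[2])
    return current_cost < target_cost

old_hash = "$2b$10$" + "x" * 53  # placeholder with the right shape, not a real hash
new_hash = "$2b$13$" + "x" * 53

print(needs_rehash(old_hash, target_cost=13))  # True
print(needs_rehash(new_hash, target_cost=13))  # False
```

In a login flow, this check would run only after checkpw() succeeds, because that is the one moment the plaintext password is available to hash again at the new cost.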

Q: Is cost factor 12 always the right choice?

Cost 12 is a solid default that takes about 0.2-0.3 seconds on modern hardware. However, measure your specific use case: if you have thousands of concurrent logins, cost 10 might be necessary; if you’re a high-security application with low login volume, cost 13 or 14 is justified. OWASP’s Password Storage Cheat Sheet recommends a bcrypt work factor of at least 10; beyond that, pick the highest cost whose hashing time your login flow can tolerate on your target hardware.

Q: What if my bcrypt hash contains non-ASCII characters?

Bcrypt hashes contain only ASCII characters: a prefix such as $2a$, $2b$, or $2y$, the cost, and then the salt and digest written in bcrypt’s own base64 alphabet (., /, digits, and letters). If you’re seeing non-ASCII, something went wrong during encoding. Because the hash is pure ASCII, decoding it as UTF-8 or as ASCII gives the same string; just encode and decode consistently at the boundaries with your database and external systems.

Q: Can I use the same salt for multiple passwords?

No. Reusing a salt would let attackers see when two users share a password, because identical passwords would then produce identical hashes. Every call to bcrypt.gensalt() returns a new random salt, so the usual hashpw(password, gensalt()) pattern avoids the problem automatically. Just never cache a salt and pass it to hashpw() for more than one password.

Q: Should I add my own salt on top of bcrypt’s salt?

No, that’s unnecessary and unlikely to improve security. Bcrypt’s salt is cryptographically sound. Adding additional layers of salting adds complexity without meaningful security benefit, and it could introduce vulnerabilities. Trust bcrypt’s design and focus on protecting your source code and deployment infrastructure instead.

Conclusion

Password hashing with bcrypt is one of the few areas where security and simplicity align perfectly. In just a few lines of code, you get enterprise-grade protection against the most common password attacks. The key patterns — always use bcrypt.hashpw() for storage and bcrypt.checkpw() for verification — should become second nature in any application handling user accounts.

Start with a cost factor of 12 and monitor your application’s performance. As hardware improves in the coming years, you can increase the cost factor to maintain security without redesigning your system. Test your implementation with common passwords and verify that both hashing and verification complete in acceptable timeframes for your users.

For deeper technical details and the latest version of bcrypt, visit the official bcrypt repository on GitHub. The PyCA project maintains bcrypt as part of Python’s cryptographic toolkit, with regular security audits and updates.

Understanding Python Memory Management and Garbage Collection

Skill Level: Intermediate

Introduction to Python Memory Management

Memory management is one of the most critical yet often overlooked aspects of Python programming. Unlike languages such as C or C++, Python abstracts away manual memory allocation and deallocation, but understanding how Python manages memory under the hood is essential for writing efficient, scalable applications. Whether you’re building web services, data processing pipelines, or long-running applications, inefficient memory management can lead to performance degradation, excessive resource consumption, and even application crashes.

Python employs several sophisticated mechanisms to manage memory, including reference counting as its primary garbage collection strategy, supplemented by a cyclic garbage collector to handle circular references. Additionally, Python provides memory profiling tools and optimization techniques that allow developers to monitor and improve memory usage. This comprehensive guide explores how Python allocates and manages memory, how garbage collection works, and practical strategies for optimizing memory consumption in your applications.

By the end of this tutorial, you’ll understand how Python’s memory management system operates at a fundamental level, be able to identify and fix memory leaks, profile your applications to find memory hotspots, and implement best practices for memory-efficient Python code. These skills are particularly valuable for developing production-grade applications where resource efficiency directly impacts cost, performance, and user experience.

Quick Example: Memory Tracking in Python

Python memory tracking with tracemalloc
Tracking memory like a hawk because trusting Python to clean up after itself is bold

Before diving deep, let’s see a quick example of how to track memory usage:

import sys
import tracemalloc

# Start tracking memory
tracemalloc.start()

# Create objects
data_list = [i for i in range(1000)]
data_dict = {i: i**2 for i in range(1000)}

# Get memory snapshot
current, peak = tracemalloc.get_traced_memory()
print(f"Current memory usage: {current / 1024:.2f} KB")
print(f"Peak memory usage: {peak / 1024:.2f} KB")

tracemalloc.stop()

# Check object size
print(f"List size: {sys.getsizeof(data_list)} bytes")
print(f"Dict size: {sys.getsizeof(data_dict)} bytes")
Output (exact values vary by Python version and platform):
Current memory usage: 45.23 KB
Peak memory usage: 45.67 KB
List size: 9016 bytes
Dict size: 49264 bytes

How Python Allocates Memory

How Python allocates memory on the private heap
The private heap called — it wants you to stop allocating objects you never delete

Python memory allocation happens at multiple levels, from the operating system level down to individual object allocation. When Python starts, it reserves a block of memory from the operating system. This memory is divided into different pools and arenas for efficient allocation and deallocation.

Memory Pools and Arenas: CPython uses a memory pool architecture where small objects (512 bytes or smaller) are allocated from pre-allocated pools. These pools are organized into arenas, which are allocated from the system heap. This approach reduces fragmentation and improves allocation speed compared to direct system calls for every object.

Object Structure: Every Python object has a reference count and type information stored alongside the actual data. The PyObject structure in CPython includes:

# Conceptual representation of a Python object
class PyObject:
    def __init__(self, value):
        self.ob_refcnt = 1  # Reference count
        self.ob_type = type(value)  # Type information
        self.value = value  # Actual data

Memory Layout Example:

import sys

# Demonstrate object memory layout
my_list = [1, 2, 3]
my_dict = {"a": 1, "b": 2}
my_string = "Hello, World!"

print(f"List object size: {sys.getsizeof(my_list)} bytes")
print(f"Dict object size: {sys.getsizeof(my_dict)} bytes")
print(f"String object size: {sys.getsizeof(my_string)} bytes")

# getsizeof() is shallow: it counts the list's own storage, not the objects it points to,
# so add the sizes of the elements to approximate the total
print(f"\nActual memory for list contents: {sys.getsizeof(my_list) + sum(sys.getsizeof(x) for x in my_list)} bytes")
Output (exact sizes vary by Python version and platform):
List object size: 56 bytes
Dict object size: 240 bytes
String object size: 32 bytes

Actual memory for list contents: 116 bytes

Understanding Reference Counting

Python reference counting mechanism explained
Reference count hits zero and the object vanishes faster than your weekend plans

Python’s primary garbage collection mechanism is reference counting. Each object maintains a count of how many references point to it. When the reference count drops to zero, the memory is immediately freed. This happens automatically and transparently, but it has one blind spot: objects that reference each other in a cycle never reach a count of zero on their own.

How Reference Counting Works:

import sys

# Create an object
my_list = [1, 2, 3]
# getrefcount() reports one extra reference: its own temporary argument
print(f"Initial ref count: {sys.getrefcount(my_list)}")  # 2 (variable + argument)

# Create another reference
another_list = my_list
print(f"After assignment: {sys.getrefcount(my_list)}")  # Now 3

# Function call increments ref count temporarily
def check_refcount(obj):
    return sys.getrefcount(obj)

refcount = check_refcount(my_list)
print(f"Inside function: {refcount}")  # Higher due to function parameter

# Delete reference
del another_list
print(f"After deletion: {sys.getrefcount(my_list)}")  # Back to 2

# Variable goes out of scope
del my_list  # Memory is freed here
Output:
Initial ref count: 2
After assignment: 3
Inside function: 4
After deletion: 2
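The “freed immediately” behaviour can be observed directly on CPython by registering a callback that fires when an object is destroyed. This is a minimal sketch; the Resource class and the events list are hypothetical names used only to watch the object’s lifetime.

```python
import weakref

class Resource:
    """Hypothetical class used only to observe object lifetime."""

events = []

obj = Resource()
# Register a callback that runs when the object is destroyed
weakref.finalize(obj, events.append, "freed")

alias = obj
del obj
print(events)  # []: 'alias' still references the object

del alias
print(events)  # ['freed']: the last reference is gone, so the object was freed at once
```

Other Python implementations may delay the callback, since immediate destruction is a property of CPython’s reference counting rather than of the language.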

Reference Counting Advantages and Limitations:

| Aspect | Advantages | Limitations |
|---|---|---|
| Memory Freeing | Immediate, deterministic | Overhead on every assignment |
| Circular References | Simple for non-circular data | Cannot handle cycles automatically |
| Pause Time | No stop-the-world pauses | Continuous overhead |
| Performance | Predictable for most cases | Reference count updates can be slow |

Python’s Garbage Collection Module

Python garbage collection handling circular references
Circular references thinking they’re safe until the garbage collector shows up uninvited

While reference counting handles most memory management, Python includes a garbage collector to detect and clean up circular references—situations where objects reference each other and create a cycle that reference counting cannot break.

Understanding Circular References:

# Circular reference example
class Node:
    def __init__(self, value):
        self.value = value
        self.next = None

# Create circular reference
node1 = Node(1)
node2 = Node(2)
node1.next = node2
node2.next = node1  # Creates a cycle

# Even after deleting variables, memory isn't freed without gc
del node1
del node2  # Memory may still be held due to circular reference
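To confirm that the cycle really does survive until the cyclic collector runs, the sketch below uses a weak reference as a probe. Automatic collection is switched off for the duration so the result does not depend on timing; the probe variable is an illustrative name, and the behaviour shown is that of CPython.

```python
import gc
import weakref

class Node:
    def __init__(self, value):
        self.value = value
        self.next = None

gc.disable()  # keep automatic collection from running mid-demo

a, b = Node(1), Node(2)
a.next, b.next = b, a       # build the cycle
probe = weakref.ref(a)      # lets us check whether the object is still alive

del a, b
print(probe() is not None)  # True: the cycle keeps both nodes alive

gc.collect()                # the cyclic collector finds and frees the cycle
print(probe() is None)      # True

gc.enable()
```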

How the Garbage Collector Works:

import gc

# Check garbage collection status
print(f"Garbage collection enabled: {gc.isenabled()}")

# Get collection statistics (one dict per generation, youngest first)
stats = gc.get_stats()
for generation, stat in enumerate(stats):
    print(f"Generation {generation}: {stat}")

# Manually trigger garbage collection
collected = gc.collect()
print(f"Objects collected: {collected}")

# Disable automatic garbage collection for performance-critical code
gc.disable()
# ... performance-critical code ...
gc.collect()  # Manual collection
gc.enable()
Output:
Garbage collection enabled: True
Generation 0: {'collections': 142, 'collected': 3256, 'uncollectable': 0}
Generation 1: {'collections': 12, 'collected': 256, 'uncollectable': 0}
Generation 2: {'collections': 1, 'collected': 45, 'uncollectable': 0}
Objects collected: 0

Generational Garbage Collection: Python’s garbage collector uses generational collection, based on the hypothesis that younger objects are more likely to be garbage than older objects. Objects are divided into three generations (0, 1, and 2), with more frequent collection of younger generations.

import gc

# Get garbage collection thresholds
thresholds = gc.get_threshold()
print(f"Collection thresholds: {thresholds}")

# Set custom thresholds
# gen0_threshold, gen1_threshold, gen2_threshold
gc.set_threshold(700, 10, 10)

# Get the current per-generation counters
# (count0 tracks allocations minus deallocations since the last collection;
#  count1 and count2 track how many younger-generation collections have run)
counts = gc.get_count()
for i, count in enumerate(counts):
    print(f"Generation {i} count: {count}")

# Force collection of specific generation
gc.collect(generation=0)
print("Generation 0 collection completed")
Output:
Collection thresholds: (700, 10, 10)
Generation 0 count: 432
Generation 1 count: 8
Generation 2 count: 2
Generation 0 collection completed

Detecting and Preventing Memory Leaks

Memory leaks in Python happen when objects remain referenced long after they are useful. This is common with global caches, circular references in custom data structures, and forgotten event listeners. The tracemalloc module and objgraph library are your best tools for tracking these down.

A practical approach is to take memory snapshots at different points in your application and compare them. If certain object types keep growing between snapshots, you have found your leak. Combined with proper debugging techniques, you can isolate the exact line of code responsible.

import tracemalloc

tracemalloc.start()

# Take first snapshot
snapshot1 = tracemalloc.take_snapshot()

# Simulate work that might leak
leaked_objects = []
for i in range(10000):
    leaked_objects.append({'data': 'x' * 100, 'index': i})

# Take second snapshot and compare
snapshot2 = tracemalloc.take_snapshot()
top_stats = snapshot2.compare_to(snapshot1, 'lineno')

print("Top memory changes:")
for stat in top_stats[:5]:
    print(stat)

You should also be aware that __del__ finalizer methods could prevent garbage collection of circular references in Python versions before 3.4. In those versions, the cyclic garbage collector could not determine a safe order to destroy objects with __del__ that referenced each other, so it left them uncollected. PEP 442 removed that limitation in Python 3.4, but finalizers still make object lifetimes harder to reason about. The modern best practice is to use weakref for breaking circular references and avoid __del__ entirely when possible.
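One common way to apply the weakref advice is a tree whose children point back to their parent. The sketch below, with a hypothetical TreeNode class, holds the parent weakly so that no strong cycle is formed and reference counting alone can free the tree.

```python
import weakref

class TreeNode:
    def __init__(self, name, parent=None):
        self.name = name
        self.children = []
        # Hold the parent weakly so child -> parent does not form a strong cycle
        self._parent = weakref.ref(parent) if parent is not None else None
        if parent is not None:
            parent.children.append(self)

    @property
    def parent(self):
        # Dereference the weak reference; returns None once the parent is gone
        return self._parent() if self._parent is not None else None

root = TreeNode("root")
child = TreeNode("child", parent=root)
print(child.parent.name)  # root

del root                  # no strong references to the root remain
print(child.parent)       # None on CPython: freed by reference counting alone
```

The trade-off is that code using child.parent has to handle None, since the parent can disappear while a child is still alive.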

Detecting and fixing memory leaks in Python
Hunting memory leaks at 2 AM because past you thought gc.collect() was optional

Real-Life Example: Memory-Efficient Data Pipeline

Suppose you need to process a 2 GB CSV file, but your server only has 512 MB of RAM. Loading the entire file into a list would exhaust memory long before the file finished loading. Instead, you can use generators and careful memory management to process it in chunks:

import csv
import gc
import tracemalloc

tracemalloc.start()

def process_csv_in_chunks(filepath, chunk_size=1000):
    """Process a large CSV file without loading it all into memory."""
    chunk = []
    processed = 0

    # newline='' lets the csv module handle quoted fields and embedded newlines
    with open(filepath, 'r', newline='') as f:
        # DictReader reads the header row and yields one dict per data row
        for row in csv.DictReader(f):
            chunk.append(row)

            if len(chunk) >= chunk_size:
                yield chunk
                chunk = []
                processed += chunk_size

                # Optional: force garbage collection every 10k rows
                if processed % 10000 == 0:
                    gc.collect()

        if chunk:
            yield chunk

# Usage
for batch in process_csv_in_chunks('transactions.csv'):
    # Process each batch - only chunk_size rows in memory at a time
    totals = sum(float(row.get('amount', 0)) for row in batch)
    print(f"Batch total: {totals}")

# Check peak memory usage
current, peak = tracemalloc.get_traced_memory()
print(f"Current memory: {current / 1024 / 1024:.1f} MB")
print(f"Peak memory: {peak / 1024 / 1024:.1f} MB")
tracemalloc.stop()

This pattern keeps memory usage roughly constant regardless of file size. The generator yields one chunk at a time, and once the consumer moves on to the next batch the previous chunk is no longer referenced and can be freed by reference counting. The periodic gc.collect() calls are a safeguard against reference cycles accumulating; plain dicts of strings like these do not form cycles, so the calls matter mainly when your processing code builds more complex object graphs. Chunked streaming of this kind is a common approach in data pipelines that handle millions of records. For even better performance, you can combine this with Python profiling and optimization techniques to identify exactly where your memory bottlenecks occur.
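Splitting lines on commas breaks as soon as a field contains a quoted comma. A variation on the same chunking idea, sketched here with the standard library's csv.DictReader and itertools.islice, handles quoting while still holding only one chunk in memory. The function name iter_csv_chunks and the sample data are made up for illustration.

```python
import csv
import io
from itertools import islice


def iter_csv_chunks(file_obj, chunk_size=1000):
    """Yield lists of row dicts, at most chunk_size rows per list."""
    reader = csv.DictReader(file_obj)  # first line is used as the header
    while True:
        chunk = list(islice(reader, chunk_size))
        if not chunk:
            return
        yield chunk


# In-memory stand-in for a real file; note the quoted comma in the first row
sample = io.StringIO('name,amount\n"Smith, Jane",10.5\nBob,4.5\nEve,1\n')

batch_totals = [
    sum(float(row["amount"]) for row in batch)
    for batch in iter_csv_chunks(sample, chunk_size=2)
]
print(batch_totals)
```

With a real file you would pass the object returned by open(path, newline=""), which is what the csv module documentation recommends.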

Frequently Asked Questions

Does Python have manual memory management?

No, Python handles memory allocation and deallocation automatically through its memory manager and garbage collector. However, you can influence the process using gc.collect() to trigger manual garbage collection, gc.disable() to turn off automatic collection, and weakref to create references that do not prevent garbage collection. You cannot directly allocate or free memory like in C or C++.

What causes memory leaks in Python?

The most common causes are: global variables or caches that grow unbounded, circular references between objects with __del__ finalizers (which, before Python 3.4, prevented the cyclic garbage collector from cleaning them up), closures that capture large objects unintentionally, and C extension modules that do not properly release memory. Using tracemalloc to compare snapshots is the most reliable way to track down leaks.

How does Python’s garbage collector handle circular references?

Python’s cyclic garbage collector uses a generational approach with three generations (0, 1, and 2). New objects start in generation 0, and objects that survive collection are promoted to older generations. The collector detects reference cycles by temporarily removing all internal references between container objects and checking which objects become unreachable. This process runs automatically when the threshold for a generation is exceeded.
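To see the cycle detector at work, the sketch below builds a two-object reference cycle, drops the names that pointed to it, and uses a weak reference as a probe to check whether the objects are still alive. Automatic collection is disabled for the duration so the timing is predictable; the Node class is purely illustrative.

```python
import gc
import weakref


class Node:
    pass


gc.disable()  # keep automatic collection from running mid-demo

a, b = Node(), Node()
a.other, b.other = b, a      # reference cycle: a -> b -> a
probe = weakref.ref(a)       # lets us observe a without keeping it alive

del a, b
alive_before = probe() is not None  # counts never hit zero, so still alive

unreachable = gc.collect()          # run a full collection by hand
alive_after = probe() is not None

gc.enable()
thresholds = gc.get_threshold()     # per-generation collection thresholds

print(alive_before, alive_after, unreachable, thresholds)
```

The value returned by gc.collect() is the number of unreachable objects it found, which varies by Python version because of how instance attributes are stored.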

Should I call gc.collect() manually?

In most applications, you do not need to call gc.collect() manually — Python’s automatic collection works well for typical workloads. However, calling it manually is useful in specific scenarios: after deleting a large number of objects, during natural pauses in a long-running process, or when you need predictable memory usage in a memory-constrained environment. Avoid calling it in tight loops, as collection itself has a performance cost.

What is the difference between reference counting and garbage collection?

Reference counting is Python’s primary memory management mechanism — every object tracks how many references point to it, and the object is immediately freed when the count reaches zero. Garbage collection is the secondary mechanism that handles cases reference counting cannot: specifically, circular references where two or more objects reference each other and their counts never reach zero. Both work together to keep memory usage efficient. Understanding these mechanisms is also useful when working with Python 3.13’s free-threaded mode, which changes how reference counting works in concurrent code.

How can I monitor memory usage in a Python application?

Python provides several built-in and third-party tools: tracemalloc (built-in) traces memory allocations with source file and line information, sys.getsizeof() returns the size of individual objects, gc.get_objects() lists all tracked objects, and the third-party objgraph library visualizes object reference graphs. For production monitoring, psutil tracks overall process memory from outside the Python runtime. Combining tracemalloc snapshots with proper exception handling ensures you capture memory data even when errors occur.

Conclusion

Python’s memory management system is a carefully designed partnership between reference counting, the cyclic garbage collector, and the private heap allocator. Reference counting handles the majority of object cleanup instantly, while the generational garbage collector sweeps up circular references that reference counting misses. Together, they free you from manual memory management while still giving you tools like tracemalloc, gc, and weakref to monitor and control memory when performance demands it.

The key takeaways are: use generators and iterators for large datasets instead of loading everything into lists, watch out for circular references especially when defining __del__ methods, use tracemalloc to diagnose memory issues before they become production incidents, and understand that type hints combined with static analysis tools can help catch patterns that lead to memory issues early. Memory management might happen behind the scenes, but understanding how it works makes you a fundamentally better Python developer.

How To Debug Python Code Like a Pro

Skill Level: Intermediate

Debugging is an essential skill for every Python developer. Whether you’re tracking down a subtle logic error, identifying memory leaks, or understanding why your code behaves unexpectedly in production, having a solid debugging toolkit can save you hours of frustration. This comprehensive guide walks you through professional-grade debugging techniques, from simple print statements to advanced IDE features and logging strategies.

The journey from casual to professional debugging involves understanding the right tool for each situation. Some developers rely entirely on print statements, while others prefer strategic logging. Effective debugging requires a multi-faceted approach: knowing when to use print statements for quick checks, when to deploy the interactive debugger for deep inspection, and when logging is your best friend for production issues.

Throughout this article, we’ll explore practical examples using Python’s standard library tools, popular IDEs, and battle-tested patterns that professional development teams use daily.

Debug Dee in home office
Your debugger is worth a thousand print statements. Learn to use it.

Print Debugging vs. Professional Debuggers

The Case for Print Statements

Print debugging is often dismissed by purists, but it’s actually a legitimate technique for certain scenarios. When you need quick answers about variable values at specific points in your code, a strategically placed print statement can give you instant feedback.

def calculate_discount(price, discount_rate):
    print(f"Input price: {price}, discount_rate: {discount_rate}")
    discounted = price * (1 - discount_rate)
    return discounted

result = calculate_discount(100, 0.2)

Output:

Input price: 100, discount_rate: 0.2

Why Professional Debuggers Matter

Professional debuggers offer capabilities that print statements simply cannot provide. They allow you to pause execution, inspect the entire program state, step through code line by line, and modify variables on the fly.

Python’s Built-in pdb Debugger

Python includes the pdb (Python Debugger) module, a powerful interactive debugging tool. Use pdb.set_trace() to insert breakpoints. Once paused, inspect variables, step through code with ‘n’ (next), ‘s’ (step), ‘c’ (continue), or ‘p variable’ (print).

Sudo Sam at vintage computer terminal
pdb turns debugging from guesswork into systematic exploration.

IDE Debugging: VS Code and PyCharm

Visual Studio Code

VS Code’s Python extension provides full-featured graphical debugging. Create a launch configuration in .vscode/launch.json and set breakpoints by clicking line margins. VS Code displays variables in the sidebar and allows inline inspection via hover.

PyCharm’s Advanced Features

PyCharm offers Data Inspector for viewing complex structures, Evaluate Expression for running code immediately, Conditional Breakpoints, Logpoint for non-stopping messages, and Remote Debugging support.

Loop Larry confused with errors
Get systematic with your debugging approach.

The Logging Module for Production

Python’s logging module is far superior to print statements for production code. Use appropriate log levels: DEBUG for diagnostics, INFO for confirmation, WARNING for unexpected events, ERROR for serious problems, CRITICAL for system failures.

Structured logging (JSON format) enables easier analysis in log aggregation systems, turning a million log lines into actionable insights.
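One way to get JSON output from the standard logging module is a custom Formatter. The sketch below is a minimal, hypothetical JsonFormatter (the class name, field names, and the "orders" logger are assumptions, not a standard API); production systems often use a dedicated library instead.

```python
import io
import json
import logging


class JsonFormatter(logging.Formatter):
    """Illustrative formatter: renders each record as one JSON object."""

    def format(self, record):
        payload = {
            "time": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        if record.exc_info:
            # Include the formatted traceback when one is attached
            payload["exception"] = self.formatException(record.exc_info)
        return json.dumps(payload)


stream = io.StringIO()  # stands in for a file or log shipper
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter())

logger = logging.getLogger("orders")
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.propagate = False  # keep the demo output out of the root logger

logger.info("order %s shipped", 1042)
entry = json.loads(stream.getvalue())
print(entry["level"], entry["message"])
```

Because every line is a self-contained JSON object, a log aggregator can filter on fields such as level or logger without regex parsing.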

Pyro Pete with logging dashboard
Structured logging turns chaos into searchable truth.

Debugging Patterns

Rubber Duck Debugging

Explain your code line-by-line to an inanimate object. This forces you to articulate assumptions and often reveals logical errors you missed.

Assertion-Based Debugging

Assertions validate preconditions and catch violations early. Remember that Python strips assert statements when run with the -O flag, so use them for development-time checks only, never for validating input in production.
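A short sketch of precondition assertions follows. The function apply_discount and its limits are made up for the example; the point is that each assert carries a message describing the violated assumption.

```python
def apply_discount(price, discount_rate):
    # Precondition checks: fail fast while developing
    assert price >= 0, f"price must be non-negative, got {price}"
    assert 0 <= discount_rate <= 1, (
        f"discount_rate must be in [0, 1], got {discount_rate}"
    )
    return price * (1 - discount_rate)


ok = apply_discount(100, 0.2)

try:
    apply_discount(100, 1.5)
except AssertionError as exc:
    failure = str(exc)
else:
    failure = None  # reached only when assertions are disabled (python -O)

print(ok, failure)
```

For checks that must always run, such as validating user input, raise ValueError or a custom exception instead of relying on assert.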

Context Managers

Create reusable debugging utilities using context managers to log execution time without cluttering main code.
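As a sketch of that idea, the timed() helper below is a hypothetical context manager built with contextlib.contextmanager. It logs the elapsed time of whatever runs inside the with block; the optional sink list is an assumption added here so the measurement can also be inspected programmatically.

```python
import logging
import time
from contextlib import contextmanager

logger = logging.getLogger("timing")


@contextmanager
def timed(label, sink=None):
    """Log how long the with-block took; optionally record it in sink."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Runs whether the block finished normally or raised
        elapsed = time.perf_counter() - start
        logger.debug("%s took %.4f s", label, elapsed)
        if sink is not None:
            sink.append((label, elapsed))


durations = []
with timed("sum of squares", sink=durations):
    total = sum(i * i for i in range(100_000))
```

The timing code stays out of the function being measured, so removing the instrumentation later is a matter of deleting one with line.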

Cache Katie with magnifying glass
Right patterns turn logs into insights.

Reading Tracebacks

Always read tracebacks from bottom to top. The innermost frame usually indicates the actual error. Tracebacks show error type, message, and execution chain with line numbers pointing to the problem.
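The ordering is easy to confirm with the traceback module. In this sketch (function names are invented for the example), the frames are listed outermost first, so the last entry is where the exception was actually raised, and the final line of the formatted exception holds the error type and message.

```python
import traceback


def parse_port(value):
    return int(value)  # the exception is raised here


def load_settings(raw):
    return parse_port(raw["port"])


try:
    load_settings({"port": "eighty"})
except ValueError as exc:
    frames = traceback.extract_tb(exc.__traceback__)
    call_chain = [frame.name for frame in frames]  # outermost -> innermost
    error_line = traceback.format_exception_only(type(exc), exc)[-1].strip()

print(call_chain)
print(error_line)
```

Reading from the bottom means starting at error_line, then the last entry in call_chain, and working outward only if that frame does not explain the failure.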

Different Environments

Local Development

Use full debugging capabilities, verbose logging, and interactive debuggers for maximum visibility.

Production Safety

You usually can’t attach an interactive debugger to a running production server. Rely on comprehensive logging to files and external services. Avoid logging sensitive data.

FAQ

Q1: Debug multi-threaded code?

Use thread-safe logging with thread identifiers. Avoid print statements. Consider using threading.Lock() to control execution order during debugging.

Q2: Debug remote code?

Most IDEs support remote debugging. VS Code and PyCharm allow connecting to Python processes on other machines.

Q3: Find memory leaks?

Use tracemalloc module to track memory allocation and show top allocators in your code.

Q4: Debugging vs profiling?

Debugging finds why code is broken. Profiling measures performance. Use debugging when code fails; profiling when code is correct but slow.

Q5: Debug in Docker?

Run Python with unbuffered output (-u flag), map debugger ports for IDE debugging, or rely on logging. VS Code Remote Containers extension helps locally.

Q6: Leave debug code in production?

Never leave breakpoint() calls. Remove pdb.set_trace(). Logging is appropriate but use right levels and never log sensitive data.

Conclusion

Professional debugging separates junior from experienced engineers. Master the spectrum: print statements for iteration, pdb for exploration, logging for production visibility, and assertions for early issue catching. Develop intuition through practice on real projects. Debugging isn’t failure—it’s inevitable in development. Efficient debugging means more time building features.

pdb: The Built-In Debugger

Python ships with an interactive debugger you don’t have to install. Drop breakpoint() into any line and the script pauses there, dropping you into a REPL with full access to local variables:

# example.py
def transform(items):
    result = []
    for i in items:
        breakpoint()       # script stops here, pdb prompt appears
        result.append(i * 2)
    return result

transform([1, 2, 3])

The pdb prompt accepts: n (next line), s (step into function), c (continue to next breakpoint), p var (print value), l (list source around current line), w (show stack trace), q (quit). Just typing a variable name evaluates it.

For Python 3.7+, breakpoint() is preferred over the older import pdb; pdb.set_trace() — it respects the PYTHONBREAKPOINT env var, so you can swap debuggers without editing source. Set PYTHONBREAKPOINT=ipdb.set_trace to use ipdb (better tab completion, colors); set PYTHONBREAKPOINT=0 to disable all breakpoints in production.
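The effect of PYTHONBREAKPOINT=0 can be checked without a shell. The default sys.breakpointhook reads the environment variable each time it is called, so this sketch sets it from inside the script purely for demonstration; in practice you would set it in the deployment environment.

```python
import os

# Demonstration only: normally set in the shell or service configuration
os.environ["PYTHONBREAKPOINT"] = "0"


def transform(items):
    result = []
    for i in items:
        breakpoint()  # becomes a no-op while PYTHONBREAKPOINT=0
        result.append(i * 2)
    return result


doubled = transform([1, 2, 3])
print(doubled)
```

The script runs straight through without ever showing a pdb prompt, which is the behavior you want if a stray breakpoint() reaches a server.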

Post-Mortem Debugging

When a script crashes, you can drop into pdb at the moment of failure to inspect what went wrong — without modifying the source:

# Run the script normally
python script.py
# (it crashes)

# Now run it under post-mortem pdb
python -m pdb script.py
# at the prompt: c (continue) — runs until exception
# then automatically lands you at the crash point with the stack intact

# Or trigger post-mortem from inside code:
import pdb, sys, traceback
try:
    risky_operation()
except Exception:
    traceback.print_exc()
    pdb.post_mortem()

Post-mortem is the single most underused debugging technique. It beats adding print statements after the fact and re-running.

logging vs print()

For anything more complex than a script, replace print() with logging. The difference matters: logging has levels (DEBUG, INFO, WARNING, ERROR, CRITICAL), can be routed to files / syslog / cloud, includes timestamps and call sites, and can be tuned per module without code changes:

import logging

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s [%(levelname)s] %(name)s:%(lineno)d - %(message)s",
)
logger = logging.getLogger(__name__)

def fetch_users(account_id: int):
    logger.debug("fetch_users called with account_id=%s", account_id)
    try:
        users = db.query(...)
        logger.info("fetched %s users", len(users))
        return users
    except Exception:
        logger.exception("fetch_users failed")  # auto-includes traceback
        raise

logger.exception() inside an except block is the killer feature — it records the message AND the full stack trace, ready to ship to your log aggregator.
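To see what logger.exception() actually records, this sketch routes a logger to an in-memory buffer and inspects the result. The "billing" logger name and the average() function are assumptions made up for the example.

```python
import io
import logging

stream = io.StringIO()  # stands in for a log file
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("[%(levelname)s] %(name)s - %(message)s"))

logger = logging.getLogger("billing")
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)
logger.propagate = False  # keep the demo output out of the root logger


def average(values):
    try:
        return sum(values) / len(values)
    except ZeroDivisionError:
        # Logs at ERROR level and appends the current traceback
        logger.exception("average failed for %r", values)
        return None


result = average([])
captured = stream.getvalue()
print(captured)
```

The captured text contains the formatted message followed by the full "Traceback (most recent call last)" block, with no extra code needed to format the exception.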

IDE Debuggers: VS Code and PyCharm

Graphical debuggers beat pdb for complex sessions. VS Code’s Python extension and PyCharm both give you:

  • Click-to-set breakpoints in the gutter
  • Conditional breakpoints (“break only if x > 100”)
  • Step over / into / out buttons
  • Variable inspector showing all locals (and globals)
  • Watch expressions that auto-evaluate as you step
  • Call stack with click-to-jump-to-frame

For a one-off bug, pdb at the terminal is faster. For a meaty multi-hour debugging session, an IDE debugger pays for itself.

Tracing and Profiling: Beyond Debugging

Sometimes “where is this slow?” or “why is this called 1000 times?” matters more than “what’s the bug?”. For those questions, profiling tools are the right answer:

# Built-in cProfile — function call counts and timings
import cProfile
cProfile.run("expensive_function()", sort="cumulative")

# tracemalloc — memory allocation tracking
import tracemalloc
tracemalloc.start()
result = process_data()
snapshot = tracemalloc.take_snapshot()
for stat in snapshot.statistics("lineno")[:10]:
    print(stat)

# py-spy — sampling profiler that runs against a live process
# pip install py-spy
# Then: py-spy top --pid 12345
# Or:   py-spy record -o profile.svg -- python myscript.py

For production performance debugging, py-spy is unmatched — it attaches to a running process without restarting it, so you can profile a real workload as it happens.

Common Pitfalls

  • Leaving breakpoints in production. A forgotten breakpoint() in a server will block the request thread until somebody types c at the (invisible) prompt. Set PYTHONBREAKPOINT=0 in production env to neutralize them.
  • print() debugging on large data. print(huge_list) prints thousands of lines, scrolling away the useful info. Use pprint with a depth limit, or print just len(huge_list).
  • Catching exceptions too broadly. try: ... except: pass hides bugs forever. Catch specific exceptions and re-raise unexpected ones.
  • Ignoring warnings. Python’s warning system flags deprecated APIs and likely bugs. Run with python -W error in dev to surface them.
  • Debugging multi-threaded code without thread names. Logs from 10 threads interleave into chaos unless you include the thread name in the format: %(threadName)s.
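The thread-name pitfall in the list above can be sketched as follows. The worker names and the "workers" logger are invented for the example; the relevant part is the %(threadName)s field in the format string and the name= argument to threading.Thread.

```python
import io
import logging
import threading

stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(
    logging.Formatter("%(threadName)s %(levelname)s %(message)s")
)

logger = logging.getLogger("workers")
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.propagate = False


def work(item):
    logger.info("processing item %s", item)


threads = [
    threading.Thread(target=work, args=(n,), name=f"worker-{n}")
    for n in range(3)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

lines = stream.getvalue().splitlines()
print("\n".join(lines))
```

The order of the lines depends on thread scheduling, but each one starts with the name of the thread that wrote it, so interleaved output can still be untangled.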

FAQ

Q: pdb, ipdb, pudb, or PyCharm?
A: pdb when you can’t install anything. ipdb for tab completion and colors. pudb for a TUI experience. PyCharm/VS Code when you want a real GUI and have the tooling installed.

Q: How do I debug a script that runs in production?
A: Don’t drop into pdb. Use structured logging + log aggregation + py-spy attach. Production debugging is reading logs and traces, not stepping through code.

Q: How do I debug async code?
A: breakpoint() works inside async functions. The pdb prompt suspends the coroutine. For more complex async debugging, the IDE debuggers handle await stepping much better than pdb.

Q: How do I find which test is slow?
A: pytest --durations=10 shows the 10 slowest tests. Combine with pytest -k "test_name" + cProfile for a function-by-function breakdown.

Q: How do I debug memory leaks?
A: tracemalloc with snapshots before/after the suspect code, compare allocation sites. For production, install memray for a heap profile across a real workload.

Wrapping Up

The single biggest debugging improvement most Python developers can make: stop using print() and start using logging + breakpoint(). The second-biggest: learn post-mortem pdb — it turns every crash into a diagnostic session. The IDE debugger is the heavyweight option when those aren’t enough. And for “why is this slow?”, profilers (cProfile, py-spy) answer questions that no amount of stepping through code will reveal.

Python Exception Handling Best Practices

Skill Level: Intermediate

Writing Python That Doesn’t Crash at 3 AM

Every Python developer has written code that works perfectly in testing and then explodes in production. The difference between amateur and professional Python code often comes down to one thing: how you handle exceptions. Proper exception handling isn’t about wrapping everything in try/except blocks–it’s about anticipating failure modes, recovering gracefully, and giving yourself enough information to fix problems quickly.

This guide covers everything from basic try/except mechanics to advanced patterns used in production systems. You’ll learn when to catch exceptions, when to let them propagate, how to create custom exception hierarchies, and patterns that will save you hours of debugging. We’re not just covering syntax–we’re covering the thinking behind robust error handling.

Here’s what we’ll work through: the exception hierarchy in Python, try/except/else/finally blocks, catching specific vs broad exceptions, raising and re-raising exceptions, custom exception classes, context managers for cleanup, logging strategies, and real-world patterns for API error handling. By the end, your code will fail gracefully instead of catastrophically.

Quick Example: The Right Way vs The Wrong Way

Before diving deep, here’s the difference between amateur and professional exception handling:

# Bad: Catches everything, hides bugs
try:
    result = process_data(user_input)
except:
    print("Something went wrong")

# Good: Specific, informative, recoverable
try:
    result = process_data(user_input)
except ValidationError as e:
    logger.warning(f"Invalid input from user: {e}")
    return {"error": str(e), "field": e.field_name}, 400
except DatabaseError as e:
    logger.error(f"Database failure processing request: {e}", exc_info=True)
    return {"error": "Service temporarily unavailable"}, 503
except Exception as e:
    logger.critical(f"Unexpected error: {e}", exc_info=True)
    raise

The first example catches every error and replaces it with a generic message, hiding what actually failed. The second catches specific exceptions, logs useful context, returns appropriate HTTP status codes, and re-raises unexpected errors so they don’t go unnoticed. That’s the pattern we’re building toward.

Debug Dee examining error message
Reading the actual error message is the most underrated debugging technique in programming.

Understanding Python’s Exception Hierarchy

Python’s exceptions form a class hierarchy rooted at BaseException. Understanding this hierarchy is critical because your except clauses catch the specified exception and all its subclasses.

# The key parts of the hierarchy
BaseException
├── SystemExit          # sys.exit() calls
├── KeyboardInterrupt   # Ctrl+C
├── GeneratorExit       # Generator cleanup
└── Exception           # All "normal" exceptions
    ├── StopIteration
    ├── ArithmeticError
    │   ├── ZeroDivisionError
    │   └── OverflowError
    ├── LookupError
    │   ├── IndexError
    │   └── KeyError
    ├── OSError
    │   ├── FileNotFoundError
    │   ├── PermissionError
    │   └── ConnectionError
    ├── ValueError
    ├── TypeError
    ├── AttributeError
    └── RuntimeError

This is why you should almost never write except BaseException or a bare except: clause. Both catch KeyboardInterrupt and SystemExit, preventing users from stopping your program with Ctrl+C. Treat Exception as the broadest class you catch.
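The hierarchy can be verified directly with issubclass, and a small experiment shows the practical consequence. In this sketch (the run() helper and the two task functions are made up), a handler for Exception catches a KeyError but lets a simulated Ctrl+C pass through.

```python
control_flow = (SystemExit, KeyboardInterrupt, GeneratorExit)
outside_exception = [not issubclass(exc, Exception) for exc in control_flow]
inside_exception = (
    issubclass(KeyError, LookupError) and issubclass(LookupError, Exception)
)


def run(task):
    try:
        task()
    except Exception as exc:  # does not catch KeyboardInterrupt or SystemExit
        return f"handled {type(exc).__name__}"
    return "ok"


def bad_lookup():
    {}["missing"]  # raises KeyError


def user_interrupt():
    raise KeyboardInterrupt  # simulates the user pressing Ctrl+C


handled = run(bad_lookup)

try:
    run(user_interrupt)
    interrupted = False
except KeyboardInterrupt:
    interrupted = True  # the interrupt escaped run(), as intended

print(handled, interrupted)
```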

Exception | When It Occurs | Common Cause
ValueError | Right type, wrong value | int("abc"), invalid arguments
TypeError | Wrong type entirely | len(42), "hello" + 5
KeyError | Dict key missing | my_dict["nonexistent"]
IndexError | List index out of range | my_list[999]
AttributeError | Object lacks attribute | None.split()
FileNotFoundError | File doesn’t exist | open("missing.txt")
ConnectionError | Network connection failure | Refused, reset, or aborted socket connection

Mastering try/except/else/finally

Most developers know try/except. Fewer use else and finally correctly. Each block has a specific purpose, and using all four makes your intent crystal clear.

import json

def load_config(filepath):
    """Load and parse a JSON config file with proper error handling"""
    try:
        # Only put code that might raise the expected exception here
        with open(filepath, 'r') as f:
            raw_data = f.read()
    except FileNotFoundError:
        print(f"Config file not found: {filepath}")
        return get_default_config()
    except PermissionError:
        print(f"No permission to read: {filepath}")
        raise
    else:
        # Runs only if try block succeeded (no exception)
        # Put code here that depends on try's success
        # but shouldn't be protected by the except
        try:
            config = json.loads(raw_data)
        except json.JSONDecodeError as e:
            print(f"Invalid JSON in {filepath}: {e}")
            return get_default_config()
        return config
    finally:
        # Always runs, even if an exception was raised
        # Use for cleanup that must happen regardless
        print(f"Config loading attempt for {filepath} complete")


def get_default_config():
    return {"debug": False, "log_level": "INFO"}


# Usage
config = load_config("settings.json")
print(config)

Output (file exists with valid JSON):

Config loading attempt for settings.json complete
{'debug': True, 'log_level': 'DEBUG', 'max_retries': 3}

Output (file missing):

Config file not found: settings.json
Config loading attempt for settings.json complete
{'debug': False, 'log_level': 'INFO'}

The else block is often overlooked but serves an important purpose: it separates “code that might fail” from “code that should run only on success.” This prevents accidentally catching exceptions you didn’t intend to handle.

Catching Specific Exceptions (And Why Order Matters)

Python evaluates except clauses top to bottom and executes the first matching one. Since exceptions form a hierarchy, order matters–put specific exceptions before general ones.

import requests
import json

def fetch_api_data(url):
    """Fetch data from an API with granular error handling"""
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        data = response.json()
        return data

    except requests.exceptions.Timeout:
        # Most specific: request timed out
        print(f"Request to {url} timed out after 10 seconds")
        return None

    except requests.exceptions.ConnectionError:
        # Network-level failure
        print(f"Could not connect to {url}")
        return None

    except requests.exceptions.HTTPError as e:
        # Server returned error status code
        status = e.response.status_code
        if status == 404:
            print(f"Resource not found: {url}")
        elif status == 429:
            print(f"Rate limited. Retry after: {e.response.headers.get('Retry-After', 'unknown')}")
        elif status >= 500:
            print(f"Server error ({status}) at {url}")
        return None

    except json.JSONDecodeError:
        # Response wasn't valid JSON
        print(f"Invalid JSON response from {url}")
        return None

    except requests.exceptions.RequestException as e:
        # Catch-all for requests library (parent class)
        print(f"Unexpected request error: {e}")
        return None


# Test with various URLs
data = fetch_api_data("https://api.github.com/users/python")
print(f"Got data: {type(data)}")

Output:

Got data: <class 'dict'>

If you reversed the order and put RequestException first, it would catch Timeout, ConnectionError, and HTTPError before their specific handlers ever ran. Always go from most specific to most general.
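The same ordering rule can be shown with built-in exceptions and no network access. In this sketch (both function names are invented), KeyError is a subclass of LookupError, so putting the general handler first makes the specific one unreachable.

```python
def lookup_general_first(mapping, key):
    try:
        return mapping[key]
    except LookupError:
        return "generic lookup failure"
    except KeyError:  # never reached: LookupError above already matches
        return "missing key"


def lookup_specific_first(mapping, key):
    try:
        return mapping[key]
    except KeyError:
        return "missing key"
    except LookupError:  # still catches IndexError and other lookups
        return "generic lookup failure"


general = lookup_general_first({}, "host")
specific = lookup_specific_first({}, "host")
index_case = lookup_specific_first([], 0)

print(general, specific, index_case, sep=" | ")
```

Python does not warn about the unreachable handler in the first function, which is why the ordering mistake tends to go unnoticed until the specific handling is needed.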

Sudo Sam drawing exception hierarchy
Understanding the exception hierarchy saves you from catching errors you never meant to handle.

Raising and Re-Raising Exceptions

Sometimes you need to raise exceptions yourself, or catch one and re-raise it after logging. Python gives you three patterns for this.

import logging

logger = logging.getLogger(__name__)

def validate_age(age):
    """Validate age with descriptive error messages"""
    if not isinstance(age, (int, float)):
        raise TypeError(f"Age must be a number, got {type(age).__name__}")
    if age < 0:
        raise ValueError(f"Age cannot be negative: {age}")
    if age > 150:
        raise ValueError(f"Age seems unrealistic: {age}")
    return int(age)


def process_user_registration(data):
    """Process registration with re-raising pattern"""
    try:
        age = validate_age(data.get('age'))
        # ... more processing
        return {"status": "success", "age": age}

    except (TypeError, ValueError) as e:
        # Log and re-raise: preserves original traceback
        logger.error(f"Validation failed for user data: {e}")
        raise  # Re-raises the SAME exception with original traceback

    except Exception as e:
        # Wrap in a new exception: chain for context
        logger.critical(f"Unexpected error during registration: {e}")
        raise RuntimeError("Registration system failure") from e


# Exception chaining with 'from'
def connect_to_database(config):
    """Demonstrate exception chaining"""
    try:
        # Simulating a connection attempt
        if not config.get('host'):
            raise KeyError('host')
    except KeyError as e:
        # 'from e' chains the original exception.
        # e.args[0] is the bare key name; str(e) would add its own quotes.
        raise ConnectionError(
            f"Cannot connect: missing config key '{e.args[0]}'"
        ) from e


# Test it
try:
    validate_age(-5)
except ValueError as e:
    print(f"Caught: {e}")

try:
    validate_age("twenty")
except TypeError as e:
    print(f"Caught: {e}")

try:
    connect_to_database({})
except ConnectionError as e:
    print(f"Caught: {e}")
    print(f"Caused by: {e.__cause__}")

Output:

Caught: Age cannot be negative: -5
Caught: Age must be a number, got str
Caught: Cannot connect: missing config key 'host'
Caused by: 'host'

The raise without arguments re-raises the current exception with its original traceback intact. The raise X from Y syntax creates an exception chain, so the traceback shows both the new error and what caused it.
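Both behaviors can be inspected on the exception object itself. In this sketch (function names are made up), raise ... from sets __cause__ on the new exception, while a bare raise hands back the original exception with no cause and with the original function still present in its traceback.

```python
import traceback


def parse_timeout(raw):
    try:
        return float(raw)
    except ValueError as exc:
        # New exception type, original attached as __cause__
        raise RuntimeError(f"bad timeout setting: {raw!r}") from exc


def reraise_unchanged(raw):
    try:
        return float(raw)
    except ValueError:
        raise  # same exception object, same traceback


try:
    parse_timeout("soon")
except RuntimeError as err:
    chained_cause = type(err.__cause__).__name__
    chained_message = str(err)

try:
    reraise_unchanged("soon")
except ValueError as err:
    reraised_cause = err.__cause__
    frame_names = [f.name for f in traceback.extract_tb(err.__traceback__)]

print(chained_cause, chained_message, reraised_cause, frame_names)
```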

Creating Custom Exception Hierarchies

For any non-trivial application, create custom exceptions. They make your code self-documenting and let callers handle specific error conditions without parsing error messages.

class AppError(Exception):
    """Base exception for our application"""
    def __init__(self, message, code=None, details=None):
        super().__init__(message)
        self.code = code
        self.details = details or {}


class ValidationError(AppError):
    """Input validation failed"""
    def __init__(self, message, field=None, **kwargs):
        super().__init__(message, code="VALIDATION_ERROR", **kwargs)
        self.field = field


class NotFoundError(AppError):
    """Requested resource doesn't exist"""
    def __init__(self, resource_type, resource_id):
        message = f"{resource_type} with id '{resource_id}' not found"
        super().__init__(message, code="NOT_FOUND")
        self.resource_type = resource_type
        self.resource_id = resource_id


class AuthenticationError(AppError):
    """User authentication failed"""
    def __init__(self, message="Authentication required"):
        super().__init__(message, code="AUTH_ERROR")


class RateLimitError(AppError):
    """Too many requests"""
    def __init__(self, retry_after=60):
        message = f"Rate limit exceeded. Retry after {retry_after} seconds"
        super().__init__(message, code="RATE_LIMIT")
        self.retry_after = retry_after


# Using custom exceptions
def get_user(user_id):
    users = {"1": "Alice", "2": "Bob"}
    if not isinstance(user_id, str):
        raise ValidationError("User ID must be a string", field="user_id")
    if user_id not in users:
        raise NotFoundError("User", user_id)
    return users[user_id]


def handle_request(user_id):
    """Handler showing how custom exceptions simplify error responses"""
    try:
        user = get_user(user_id)
        return {"status": "success", "user": user}
    except ValidationError as e:
        return {"error": e.code, "message": str(e), "field": e.field}
    except NotFoundError as e:
        return {"error": e.code, "message": str(e)}
    except AppError as e:
        return {"error": e.code, "message": str(e)}


# Test
print(handle_request("1"))
print(handle_request("99"))
print(handle_request(42))

Output:

{'status': 'success', 'user': 'Alice'}
{'error': 'NOT_FOUND', 'message': "User with id '99' not found"}
{'error': 'VALIDATION_ERROR', 'message': 'User ID must be a string', 'field': 'user_id'}

Loop Larry tangled in exception blocks
Custom exception hierarchies turn “something broke” into “this specific thing broke for this specific reason.”

Context Managers for Guaranteed Cleanup

Context managers (the with statement) are Python’s best tool for ensuring cleanup happens even when exceptions occur. They’re cleaner than try/finally for resource management.

import sqlite3
from contextlib import contextmanager

@contextmanager
def database_connection(db_path):
    \"\"\"Context manager for database connections with automatic rollback\"\"\"
    conn = sqlite3.connect(db_path)
    try:
        yield conn
        conn.commit()  # Only commits if no exception occurred
    except Exception:
        conn.rollback()  # Rollback on any error
        raise  # Re-raise so caller knows something failed
    finally:
        conn.close()  # Always close the connection


@contextmanager
def temporary_file(filepath, mode='w'):
    \"\"\"Write to a temp file, then atomically rename on success\"\"\"
    import os
    temp_path = filepath + '.tmp'
    f = open(temp_path, mode)
    try:
        yield f
        f.close()
        os.replace(temp_path, filepath)  # Atomic rename
    except Exception:
        f.close()
        if os.path.exists(temp_path):
            os.remove(temp_path)  # Clean up temp file on failure
        raise


# Usage
with database_connection(":memory:") as conn:
    cursor = conn.cursor()
    cursor.execute("CREATE TABLE users (name TEXT, age INTEGER)")
    cursor.execute("INSERT INTO users VALUES ('Alice', 30)")
    cursor.execute("SELECT * FROM users")
    print(cursor.fetchall())

# The connection is guaranteed to be closed, committed on success
# or rolled back on failure

Output:

[('Alice', 30)]

The @contextmanager decorator from contextlib lets you write context managers as generator functions. Everything before yield is your setup, and everything after is your cleanup. The try/except/finally inside ensures proper handling regardless of what happens.
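
For comparison, the same setup/cleanup split can be written as a class with __enter__ and __exit__. The following is a minimal sketch rather than part of the original example; the ManagedConnection name is hypothetical, and the commit/rollback behavior simply mirrors the generator version.

```python
import sqlite3


class ManagedConnection:
    """Class-based counterpart of a generator context manager (hypothetical name)."""

    def __init__(self, db_path):
        self.db_path = db_path
        self.conn = None

    def __enter__(self):
        # Setup: corresponds to the code before `yield`
        self.conn = sqlite3.connect(self.db_path)
        return self.conn

    def __exit__(self, exc_type, exc_value, tb):
        # Cleanup: corresponds to the code after `yield`
        try:
            if exc_type is None:
                self.conn.commit()
            else:
                self.conn.rollback()
        finally:
            self.conn.close()
        return False  # do not suppress the exception


with ManagedConnection(":memory:") as conn:
    conn.execute("CREATE TABLE t (x INTEGER)")
    conn.execute("INSERT INTO t VALUES (1)")
    rows = conn.execute("SELECT x FROM t").fetchall()

print(rows)  # [(1,)]
```

Returning False from __exit__ plays the same role as the bare raise in the generator version: the caller still sees the original exception.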

Logging Exceptions Effectively

Print statements aren’t sufficient for production. Use Python’s logging module with structured information that helps you diagnose issues quickly.

import logging
import traceback
import sys

# Configure logging with useful format
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',  # omit milliseconds so output matches the sample below
    handlers=[logging.StreamHandler(sys.stdout)]
)
logger = logging.getLogger("myapp")


def process_payment(user_id, amount):
    """Demonstrate different logging levels for exceptions"""
    try:
        if amount <= 0:
            raise ValueError(f"Invalid amount: {amount}")

        if amount > 10000:
            # Warning: not an error, but worth noting
            logger.warning(
                "Large payment detected",
                extra={"user_id": user_id, "amount": amount}
            )

        # Simulate processing
        if user_id == "blocked":
            raise PermissionError("User account is blocked")

        logger.info(f"Payment processed: user={user_id}, amount=${amount:.2f}")
        return True

    except ValueError as e:
        # Client error: log as warning, don't need full traceback
        logger.warning(f"Invalid payment request: {e}")
        return False

    except PermissionError as e:
        # Expected business logic error
        logger.error(f"Payment blocked: user={user_id}, reason={e}")
        return False

    except Exception:
        # Unexpected error: log as critical WITH traceback
        logger.critical(
            f"Payment system failure: user={user_id}, amount={amount}",
            exc_info=True  # This includes the full traceback
        )
        raise


# Test scenarios
process_payment("user123", 50.00)
process_payment("user456", -10)
process_payment("blocked", 100)

Output:

2026-04-08 10:30:00 - myapp - INFO - Payment processed: user=user123, amount=$50.00
2026-04-08 10:30:00 - myapp - WARNING - Invalid payment request: Invalid amount: -10
2026-04-08 10:30:00 - myapp - ERROR - Payment blocked: user=blocked, reason=User account is blocked

The key insight: use exc_info=True for unexpected exceptions where you need the full stack trace. For expected exceptions (validation errors, business logic), a simple message is sufficient. Don’t log full tracebacks for every caught exception; doing so creates noise that obscures real problems.
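
To see the difference in the log output itself, here is a small sketch that writes to an in-memory stream instead of stdout. The logger name traceback_demo and the failing int() call are illustrative assumptions, not part of the payment example.

```python
import io
import logging

stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(levelname)s - %(message)s"))

demo_logger = logging.getLogger("traceback_demo")  # hypothetical logger name
demo_logger.setLevel(logging.INFO)
demo_logger.addHandler(handler)
demo_logger.propagate = False  # keep the demo output out of the root logger

try:
    int("not a number")
except ValueError as e:
    # Expected-style failure: one line, no traceback
    demo_logger.warning("Expected failure: %s", e)
    # Unexpected-style failure: message followed by the full traceback
    demo_logger.error("Unexpected failure", exc_info=True)

output = stream.getvalue()
print(output)
```

The first record is a single line, while the second is followed by the full "Traceback (most recent call last)" block. Inside an except block, logger.exception("...") is shorthand for logger.error("...", exc_info=True).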

Good logging turns a mystery crash into a five-minute fix.

Real-World Example: Resilient API Client with Retry Logic

Here’s a production-grade pattern combining everything we’ve covered: custom exceptions, context managers, logging, and retry logic for an API client.

import time
import logging
import json
from functools import wraps

logger = logging.getLogger(__name__)


class APIError(Exception):
    """Base API exception"""
    def __init__(self, message, status_code=None, response_body=None):
        super().__init__(message)
        self.status_code = status_code
        self.response_body = response_body


class RetryableError(APIError):
    """Error that should trigger a retry"""
    pass


class FatalError(APIError):
    """Error that should NOT be retried"""
    pass


def retry_on_failure(max_retries=3, base_delay=1, backoff_factor=2):
    """Decorator that retries on RetryableError with exponential backoff"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None

            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except RetryableError as e:
                    last_exception = e
                    if attempt < max_retries:
                        delay = base_delay * (backoff_factor ** attempt)
                        logger.warning(
                            f"Attempt {attempt + 1}/{max_retries + 1} failed: {e}. "
                            f"Retrying in {delay}s..."
                        )
                        time.sleep(delay)
                    else:
                        logger.error(
                            f"All {max_retries + 1} attempts failed for {func.__name__}"
                        )
                except FatalError:
                    # Don't retry fatal errors
                    raise

            raise last_exception

        return wrapper
    return decorator


class APIClient:
    """Resilient API client with proper exception handling"""

    def __init__(self, base_url, api_key):
        self.base_url = base_url
        self.api_key = api_key
        self._session = None

    def _classify_error(self, status_code, response_body):
        """Classify HTTP errors as retryable or fatal"""
        if status_code in (429, 502, 503, 504):
            raise RetryableError(
                f"Server returned {status_code}",
                status_code=status_code,
                response_body=response_body
            )
        elif status_code in (400, 401, 403, 404, 422):
            raise FatalError(
                f"Client error {status_code}: {response_body}",
                status_code=status_code,
                response_body=response_body
            )
        else:
            raise APIError(
                f"Unexpected status {status_code}",
                status_code=status_code,
                response_body=response_body
            )

    @retry_on_failure(max_retries=3, base_delay=1)
    def get(self, endpoint):
        """GET request with automatic retry on transient failures"""
        import urllib.request
        import urllib.error

        url = f"{self.base_url}/{endpoint}"
        req = urllib.request.Request(url)
        req.add_header('Authorization', f'Bearer {self.api_key}')

        try:
            with urllib.request.urlopen(req, timeout=10) as response:
                data = json.loads(response.read().decode())
                logger.info(f"GET {endpoint}: success")
                return data

        except urllib.error.HTTPError as e:
            body = e.read().decode() if e.fp else ""
            self._classify_error(e.code, body)

        except urllib.error.URLError as e:
            raise RetryableError(f"Connection failed: {e.reason}")

        except json.JSONDecodeError as e:
            raise FatalError(f"Invalid JSON response: {e}")

    @retry_on_failure(max_retries=2, base_delay=2)
    def post(self, endpoint, data):
        """POST request with retry logic"""
        import urllib.request
        import urllib.error

        url = f"{self.base_url}/{endpoint}"
        payload = json.dumps(data).encode('utf-8')
        req = urllib.request.Request(url, data=payload, method='POST')
        req.add_header('Authorization', f'Bearer {self.api_key}')
        req.add_header('Content-Type', 'application/json')

        try:
            with urllib.request.urlopen(req, timeout=30) as response:
                result = json.loads(response.read().decode())
                logger.info(f"POST {endpoint}: success")
                return result

        except urllib.error.HTTPError as e:
            body = e.read().decode() if e.fp else ""
            self._classify_error(e.code, body)

        except urllib.error.URLError as e:
            raise RetryableError(f"Connection failed: {e.reason}")


# Usage example
client = APIClient("https://api.example.com", "my-api-key")

try:
    users = client.get("users")
    print(f"Fetched {len(users)} users")
except RetryableError as e:
    print(f"Service unavailable after retries: {e}")
except FatalError as e:
    print(f"Request invalid: {e}")
except APIError as e:
    print(f"API error: {e}")

This pattern handles transient failures (network issues, rate limits, server errors) with automatic retry and exponential backoff. Fatal errors like 401 or 404 fail immediately since retrying won't help. The decorator separates retry logic from business logic, keeping your code clean.
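
To make the backoff schedule concrete, the sketch below computes the waits that the formula base_delay * (backoff_factor ** attempt) produces. The helper name backoff_delays is hypothetical and exists only for illustration.

```python
def backoff_delays(max_retries=3, base_delay=1, backoff_factor=2):
    """Delays (in seconds) slept between attempts, one per retry."""
    return [base_delay * (backoff_factor ** attempt) for attempt in range(max_retries)]


get_delays = backoff_delays(max_retries=3, base_delay=1)   # settings used by get()
post_delays = backoff_delays(max_retries=2, base_delay=2)  # settings used by post()

print(get_delays, sum(get_delays))    # [1, 2, 4] 7
print(post_delays, sum(post_delays))  # [2, 4] 6
```

With these settings a GET that keeps failing waits 7 seconds in total across its retries, and a POST waits 6 seconds, before the final error is raised.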

Exponential backoff is the polite way of saying "I will keep trying, but I will wait longer each time."

Frequently Asked Questions

Should I use except Exception or except BaseException?

Almost always except Exception. The BaseException class includes SystemExit, KeyboardInterrupt, and GeneratorExit, which you almost never want to catch. Catching KeyboardInterrupt prevents Ctrl+C from working. Only use BaseException in top-level cleanup code where you truly need to catch everything.

When should I use EAFP vs LBYL style?

EAFP (Easier to Ask Forgiveness than Permission) means using try/except. LBYL (Look Before You Leap) means checking conditions first. Python idiomatically prefers EAFP. Use try/except when the check would be expensive or racy (like file existence checks). Use LBYL when the check is cheap and makes code clearer, like if key in dict.
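
A short side-by-side sketch of the two styles, using a hypothetical settings dictionary and helper names chosen for this illustration:

```python
config = {"timeout": "30", "retries": "three", "offset": "-5"}


def get_int_lbyl(settings, key, default):
    """LBYL: check every precondition before acting."""
    if key in settings and settings[key].isdigit():
        return int(settings[key])
    return default


def get_int_eafp(settings, key, default):
    """EAFP: attempt the conversion and handle the specific failures."""
    try:
        return int(settings[key])
    except (KeyError, ValueError):
        return default


print(get_int_lbyl(config, "timeout", 10), get_int_eafp(config, "timeout", 10))  # 30 30
print(get_int_lbyl(config, "retries", 3), get_int_eafp(config, "retries", 3))    # 3 3
print(get_int_lbyl(config, "offset", 0), get_int_eafp(config, "offset", 0))      # 0 -5
```

The last line shows a common LBYL hazard: the pre-check ("-5".isdigit() is False) does not exactly mirror what int() accepts, so the two versions disagree. The EAFP version lets the operation itself decide what is valid.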

Is it bad to use bare except clauses?

Yes, except: without specifying an exception type catches everything including SystemExit and KeyboardInterrupt. It also makes debugging harder because you don't know what went wrong. Always specify at least except Exception, and prefer more specific exception types.

How do I handle exceptions in async code?

Async exception handling uses the same try/except syntax inside async functions. For gathering multiple coroutines, use asyncio.gather(return_exceptions=True) to collect exceptions instead of failing on the first one. For task groups in Python 3.11+, use asyncio.TaskGroup which raises ExceptionGroup containing all child exceptions.
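
As a minimal sketch of the gather approach, the example below runs four hypothetical tasks, two of which fail, and separates results from exceptions afterwards. The fetch coroutine and its odd-id failure rule are assumptions made for the demonstration.

```python
import asyncio


async def fetch(item_id):
    """Hypothetical task: fails for odd ids."""
    await asyncio.sleep(0)
    if item_id % 2:
        raise ValueError(f"bad id {item_id}")
    return item_id * 10


async def main():
    # With return_exceptions=True, exceptions are returned in the result list
    # (in task order) instead of being raised at the await
    results = await asyncio.gather(
        *(fetch(i) for i in range(4)), return_exceptions=True
    )
    successes = [r for r in results if not isinstance(r, Exception)]
    failures = [r for r in results if isinstance(r, Exception)]
    return successes, failures


successes, failures = asyncio.run(main())
print(successes)                   # [0, 20]
print([str(e) for e in failures])  # ['bad id 1', 'bad id 3']
```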

What's the performance cost of try/except?

In Python, entering a try block costs virtually nothing when no exception occurs; since Python 3.11, exceptions are implemented as "zero-cost" on the happy path. Actually raising and catching an exception, however, is considerably more expensive than a simple if/else check. Don't use exceptions for routine control flow in hot code paths (for example, indexing until IndexError instead of iterating directly). Reserve them for truly exceptional conditions.

How do I test exception handling code?

Use pytest.raises as a context manager to verify exceptions are raised correctly. For testing retry logic, use unittest.mock.patch to simulate failures. Test both the happy path and each exception path. Verify that the right exception type, message, and attributes are set.
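
The sketch below shows the mock-based half of that advice using only the standard library. TransientError and call_with_retry are hypothetical stand-ins for your own retryable exception and retry helper; in a pytest suite the try/except at the end would typically be written as with pytest.raises(TransientError): instead.

```python
from unittest import mock


class TransientError(Exception):
    """Hypothetical retryable error used only for this sketch."""


def call_with_retry(func, max_retries=2):
    """Call func, retrying on TransientError up to max_retries times."""
    for attempt in range(max_retries + 1):
        try:
            return func()
        except TransientError:
            if attempt == max_retries:
                raise


# Fails twice, then succeeds: the third call returns "ok"
flaky = mock.Mock(side_effect=[TransientError("1"), TransientError("2"), "ok"])
assert call_with_retry(flaky) == "ok"
assert flaky.call_count == 3

# Always fails: the error surfaces after max_retries + 1 calls
broken = mock.Mock(side_effect=TransientError("down"))
try:
    call_with_retry(broken)
except TransientError as e:
    assert str(e) == "down"
else:
    raise AssertionError("expected TransientError")
assert broken.call_count == 3
```

Passing a list to side_effect makes the mock raise or return each item in turn, which is a convenient way to script "fail N times, then succeed" scenarios without real network calls or sleeps.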

Wrapping Up

Exception handling separates production-ready code from scripts that work on your laptop. The patterns we covered (specific exception catching, custom hierarchies, context managers, logging strategies, and retry decorators) form the foundation of resilient Python applications.

The key principles to remember: catch specific exceptions rather than broad ones, use else and finally blocks intentionally, create custom exception classes for your domain, log with appropriate severity levels, and design your error recovery strategy before writing the happy path code.

Start applying these patterns incrementally. Pick one codebase and replace bare except clauses with specific ones. Add custom exceptions to your next project. Build a retry decorator for your API calls. Each improvement makes your code more debuggable and your production systems more reliable.

The Basic try/except Pattern

import logging

import requests  # third-party package

logger = logging.getLogger(__name__)


def fetch_json(url):  # hypothetical wrapper so the return statements are valid
    try:
        response = requests.get(url, timeout=5)
        response.raise_for_status()
        data = response.json()
    except requests.Timeout:
        logger.warning("Request timed out")
        return None
    except requests.HTTPError as e:
        logger.error("HTTP %s for %s", e.response.status_code, url)
        return None
    except requests.RequestException:
        logger.exception("Network error fetching %s", url)
        return None
    return data

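Order the handlers from most specific to most general: requests.Timeout and requests.HTTPError are both subclasses of requests.RequestException, so the broader handler must come last or it would shadow them. A bare except: catches everything, including KeyboardInterrupt, and is almost always wrong.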

The Three Anti-Patterns

  • Bare except. except: with no class catches every exception, including system-level ones like KeyboardInterrupt. Use except Exception: as the fallback if you genuinely don't know.
  • Silently swallowing. except Exception: pass hides bugs forever. At minimum, log the exception. Better, re-raise after cleanup.
  • Catching too broadly. try: do_a(); do_b(); do_c() except Exception: can't tell you which call failed. Wrap each risky operation separately.
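
As an illustration of wrapping each risky operation separately, here is a small sketch with two steps that can fail for different reasons. The load_settings function and the "port" key are hypothetical.

```python
import json
import logging

logger = logging.getLogger(__name__)


def load_settings(raw_text):
    """Parse JSON settings and read the port; each step has its own handler."""
    try:
        settings = json.loads(raw_text)
    except json.JSONDecodeError as e:
        logger.error("Settings are not valid JSON: %s", e)
        return None

    try:
        port = int(settings["port"])
    except (KeyError, TypeError, ValueError) as e:
        logger.error("Missing or invalid port: %r", e)
        return None

    return {"port": port}


print(load_settings('{"port": "8080"}'))  # {'port': 8080}
print(load_settings("not json"))          # None
print(load_settings('{"host": "x"}'))     # None
```

Because each try block covers exactly one operation, the log message always names the step that failed, and each handler lists only the exceptions that step can plausibly raise.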

The else and finally Clauses

def handle_submission(raw):  # hypothetical wrapper so the return statement is valid
    try:
        data = parse_user_input(raw)
    except ValueError as e:
        logger.error("Invalid input: %s", e)
        return error_response(400)
    else:
        # Runs only if the try block raised nothing; errors raised here
        # are NOT caught by the except clause above
        save_to_db(data)
        notify_user(data)
    finally:
        # Always runs: on success, on a handled error, and on an unhandled one
        close_connection()

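The else block runs only when the try block completes without raising, and exceptions raised inside else are not caught by the preceding except clauses. That makes it the right place for code that should run on success but should not be covered by the handler. The finally block always runs, which makes it suitable for cleanup that must happen regardless of the outcome.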
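
A self-contained sketch that records the order in which the clauses execute may help; the run function and its step labels are made up for this demonstration.

```python
def run(value):
    """Return the order in which the clauses execute for a given input."""
    steps = []
    try:
        steps.append("try")
        number = int(value)
    except ValueError:
        steps.append("except")
    else:
        steps.append("else")
        steps.append(f"parsed {number}")
    finally:
        steps.append("finally")
    return steps


print(run("42"))    # ['try', 'else', 'parsed 42', 'finally']
print(run("oops"))  # ['try', 'except', 'finally']
```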

Raising With Context

# Re-raise the same exception
try:
    risky()
except ValueError:
    log_diagnostics()
    raise  # re-raises the current exception

# Wrap with a higher-level exception
try:
    response = requests.get(url)
    return response.json()
except requests.RequestException as e:
    raise APIError("Could not fetch user data") from e

# Chain explicitly (preserves original traceback)
raise ValidationError("bad input") from original_exception

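The from keyword sets the new exception's __cause__, so the traceback shows the original error followed by "The above exception was the direct cause of the following exception". Without from, Python still chains implicitly and reports "During handling of the above exception, another exception occurred". Either way, the root cause stays visible, which is invaluable for debugging.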
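
The chain can also be inspected programmatically. In this sketch, ConfigError and read_port are hypothetical names used only to show where the original exception ends up.

```python
class ConfigError(Exception):
    """Hypothetical domain-level exception."""


def read_port(settings):
    try:
        return int(settings["port"])
    except KeyError as e:
        # Explicit chaining: the KeyError becomes ConfigError.__cause__
        raise ConfigError("port is not configured") from e


caught = None
try:
    read_port({})
except ConfigError as err:
    caught = err

print(repr(caught))            # ConfigError('port is not configured')
print(repr(caught.__cause__))  # KeyError('port')
```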

Custom Exception Hierarchies

class APIError(Exception):
    pass

class APIConnectionError(APIError):
    pass

class APIRateLimit(APIError):
    def __init__(self, retry_after):
        self.retry_after = retry_after
        super().__init__("Rate limited, retry in %s seconds" % retry_after)

# Callers can catch broadly OR narrowly
try:
    call_api()
except APIRateLimit as e:
    time.sleep(e.retry_after)
except APIError:
    raise

# Or in one catch with different handling
try:
    call_api()
except APIError as e:
    if isinstance(e, APIRateLimit):
        retry(after=e.retry_after)
    else:
        raise

Exception Groups (Python 3.11+)

try:
    asyncio.run(parallel_tasks())
except* ValueError as eg:
    for e in eg.exceptions:
        logger.warning("Validation failed: %s", e)
except* ConnectionError as eg:
    logger.error("%s connections failed", len(eg.exceptions))

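except* handles multiple errors raised together, for example by asyncio.TaskGroup, which wraps failures from its child tasks in an ExceptionGroup. Each except* clause receives a group containing only the matching exceptions, and more than one clause can run for the same group. This is essential for async code where several tasks may fail at the same time.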

Common Pitfalls

  • except Exception as e: print(e). Loses the traceback. Use logger.exception(...) or traceback.print_exc().
  • Catching to silence linter warnings. A try/except that does nothing isn't a fix — it's hiding a bug.
  • Raising strings. raise "error" is a TypeError. Always raise an exception instance: raise ValueError("...").
  • Not closing resources on exception. Use with blocks (context managers) instead of try/finally for file handles, locks, DB sessions.
  • Catching system-exit exceptions. KeyboardInterrupt, SystemExit, GeneratorExit don't inherit from Exception. except Exception skips them — usually what you want.
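
Two of these pitfalls are easy to verify directly. The following sketch checks the class hierarchy and shows what actually happens when a string is raised:

```python
# The system-exit exceptions derive from BaseException, not Exception
hierarchy = {
    exc_type.__name__: issubclass(exc_type, Exception)
    for exc_type in (KeyboardInterrupt, SystemExit, GeneratorExit)
}
print(hierarchy)  # every value is False

# Raising a string fails with a TypeError instead of the intended error
try:
    raise "error"
except TypeError as e:
    message = str(e)

print(message)  # exceptions must derive from BaseException
```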

FAQ

Q: Look before you leap, or ask forgiveness?
A: Python idiom is EAFP — try the operation, catch on failure. LBYL has race conditions (the file might be deleted between exists() and open()).

Q: When to raise vs return None?
A: Raise for errors the current function cannot handle itself (bad arguments, missing files, network failures). Return None for "expected absence" (key not in dict, optional config value).

Q: How do I see the full traceback in production?
A: logger.exception("msg") inside the except block. Or set up a global handler with sys.excepthook.
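
A minimal sketch of such a global handler follows. The logger name "uncaught" and the function name log_uncaught are assumptions for illustration; a real application would attach its own handlers and formatting.

```python
import logging
import sys

logger = logging.getLogger("uncaught")  # hypothetical logger name


def log_uncaught(exc_type, exc_value, exc_tb):
    """Log any exception that reaches the top level, with its traceback."""
    if issubclass(exc_type, KeyboardInterrupt):
        # Let Ctrl+C keep its default behavior
        sys.__excepthook__(exc_type, exc_value, exc_tb)
        return
    logger.critical("Uncaught exception", exc_info=(exc_type, exc_value, exc_tb))


# Install the hook once, early in program startup
sys.excepthook = log_uncaught
```

Note that sys.excepthook only sees exceptions that escape the main thread; exceptions in other threads are reported through threading.excepthook instead.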

Q: Should I catch and ignore in tests?
A: Use pytest.raises(ExpectedError) to assert errors. Don't catch errors in tests — they're meant to fail loudly.

Q: Performance cost of try/except?
A: Negligible if no exception fires. Raising is expensive — don't use exceptions for normal control flow.

Wrapping Up

Good exception handling is mostly about NOT catching things you shouldn't. Catch specific exceptions, never bare except, always log or re-raise. Use context managers for cleanup, custom exceptions for domain errors, and raise X from Y to preserve diagnostic chains. These patterns prevent the silent failures that cause production headaches.