You need to find, extract, or replace text patterns in strings — email addresses in a document, phone numbers in a log file, or URLs scattered across a web page. Python’s built-in re module gives you regular expressions, a powerful pattern-matching language that handles all of these tasks with concise, reusable patterns.
Regular expressions look intimidating at first, but they follow logical rules. Once you learn the core patterns, you will reach for them constantly — for data validation, log parsing, text cleanup, and search-and-replace operations. Python’s re module is part of the standard library, so there is nothing to install.
In this tutorial, you will learn the essential regex patterns, how to use re.search(), re.findall(), re.sub(), and re.compile(), work with groups and named groups, handle common real-world patterns, and build a complete log parser project.
Pattern Matching: Quick Example
Here is a minimal example that extracts all email addresses from a block of text.
# quick_regex.py
import re
text = "Contact us at support@example.com or sales@company.org for help."
pattern = r'[\w.+-]+@[\w-]+\.[\w.]+'
emails = re.findall(pattern, text)
for email in emails:
    print(email)
Output:
support@example.com
sales@company.org
The r prefix creates a raw string, which prevents Python from interpreting backslashes as escape characters. Always use raw strings for regex patterns.
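A minimal sketch of why the raw-string prefix matters, using the word-boundary token \b. The sample sentence is made up for illustration:

```python
import re

text = "cat concatenate cat"

# In a normal string literal, "\b" is the backspace character (ASCII 8),
# so this pattern looks for backspace + "cat" + backspace and finds nothing.
without_raw = re.findall("\bcat\b", text)

# In a raw string, the backslash reaches the regex engine untouched,
# where \b means "word boundary".
with_raw = re.findall(r"\bcat\b", text)

print(without_raw)  # []
print(with_raw)     # ['cat', 'cat']
```

The "cat" inside "concatenate" is skipped in the second call because it is not surrounded by word boundaries.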
Pattern matching is just string searching with superpowers — and a learning curve.
What Are Regular Expressions and Why Use Them?
Regular expressions (regex) are sequences of characters that define search patterns. Think of them as advanced wildcards — where a file search might use *.txt to match all text files, regex gives you the precision to match patterns like “any string that looks like a phone number” or “every word followed by a colon.”
Pattern | Matches | Example
\d | Any digit (0-9) | \d{3} matches “123”
\w | Word character (letter, digit, underscore) | \w+ matches “hello_world”
\s | Whitespace (space, tab, newline) | \s+ matches “   ” (a run of spaces)
. | Any character except newline | a.c matches “abc”, “a1c”
+ | One or more of previous | \d+ matches “42”, “1000”
* | Zero or more of previous | \d* matches “”, “5”, “99”
? | Zero or one of previous | colou?r matches “color”, “colour”
[abc] | Any character in the set | [aeiou] matches vowels
^ | Start of string | ^Hello matches “Hello world”
$ | End of string | world$ matches “Hello world”
Core Functions: search, match, findall, sub
The re module provides four main functions. Each serves a different use case.
# core_functions.py
import re
text = "Order #12345 was placed on 2024-03-15 for $49.99"
# search() - find first match anywhere in string
match = re.search(r'#(\d+)', text)
if match:
    print(f"Order number: {match.group(1)}")
# findall() - find all matches, return as list
prices = re.findall(r'\$[\d.]+', text)
print(f"Prices found: {prices}")
# sub() - search and replace
cleaned = re.sub(r'\$[\d.]+', '[PRICE]', text)
print(f"Cleaned: {cleaned}")
# match() - match at the START of string only
result = re.match(r'Order', text)
print(f"Starts with Order: {result is not None}")
Output:
Order number: 12345
Prices found: ['$49.99']
Cleaned: Order #12345 was placed on 2024-03-15 for [PRICE]
Starts with Order: True
Use search() when you want to find a pattern anywhere in the string. Use match() only when you specifically need to check the beginning of the string.
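The difference is easiest to see side by side. The following sketch reuses a shortened version of the order string from above and also shows re.fullmatch(), which is not covered in this section but is a common choice when validating a whole string:

```python
import re

text = "Order #12345 was placed on 2024-03-15"
date_pattern = r'\d{4}-\d{2}-\d{2}'

# match() only tries at position 0, so a date later in the string is missed.
at_start = re.match(date_pattern, text)
print(at_start)              # None

# search() scans forward until it finds the first match.
anywhere = re.search(date_pattern, text)
print(anywhere.group())      # 2024-03-15

# fullmatch() requires the entire string to fit the pattern.
valid = re.fullmatch(date_pattern, "2024-03-15") is not None
invalid = re.fullmatch(date_pattern, "2024-03-15x") is not None
print(valid, invalid)        # True False
```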
Groups and Named Groups
Parentheses in regex create capture groups that let you extract specific parts of a match. Named groups make your code more readable.
# groups.py
import re
log_line = '2024-03-15 14:30:22 ERROR [auth] Failed login for user admin from 192.168.1.100'
# Numbered groups
pattern = r'(\d{4}-\d{2}-\d{2}) (\d{2}:\d{2}:\d{2}) (\w+) \[(\w+)\] (.+)'
match = re.search(pattern, log_line)
if match:
    print(f"Date: {match.group(1)}")
    print(f"Time: {match.group(2)}")
    print(f"Level: {match.group(3)}")
    print(f"Module: {match.group(4)}")
    print(f"Message: {match.group(5)}")
# Named groups (much more readable)
pattern = r'(?P<date>\d{4}-\d{2}-\d{2}) (?P<time>\d{2}:\d{2}:\d{2}) (?P<level>\w+)'
match = re.search(pattern, log_line)
if match:
    print(f"\nNamed: {match.group('date')} at {match.group('time')} [{match.group('level')}]")
Output:
Date: 2024-03-15
Time: 14:30:22
Level: ERROR
Module: auth
Message: Failed login for user admin from 192.168.1.100
Named: 2024-03-15 at 14:30:22 [ERROR]
Named groups use the (?P<name>...) syntax. They are especially valuable in complex patterns where numbered groups become confusing.
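When a pattern has several named groups, match.groupdict() returns all of them at once as a dictionary. A short sketch, using a shortened log line for illustration:

```python
import re

log_line = '2024-03-15 14:30:22 ERROR [auth] Failed login'
pattern = re.compile(
    r'(?P<date>\d{4}-\d{2}-\d{2}) '
    r'(?P<time>\d{2}:\d{2}:\d{2}) '
    r'(?P<level>\w+)'
)

match = pattern.search(log_line)
# groupdict() maps each group name to the text it captured
fields = match.groupdict() if match else {}
print(fields)
# {'date': '2024-03-15', 'time': '14:30:22', 'level': 'ERROR'}
```

A dictionary like this can be passed straight to code that expects keyword-style records, such as csv.DictWriter or a JSON encoder.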
Capture groups let you grab exactly what you need from the chaos.
Compiled Patterns with re.compile()
When you use the same regex pattern multiple times, compile it first for better performance and cleaner code.
# compiled.py
import re
# Compile once, use many times
email_pattern = re.compile(r'[\w.+-]+@[\w-]+\.[\w.]+')
phone_pattern = re.compile(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b')
contacts = [
    "Call John at 555-123-4567 or email john@example.com",
    "Reach Jane at jane@company.org or 555.987.6543",
    "No contact info here, sorry!",
]
for text in contacts:
    emails = email_pattern.findall(text)
    phones = phone_pattern.findall(text)
    print(f"Emails: {emails}, Phones: {phones}")
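Compiled patterns have the same methods as the re module itself: search(), findall(), sub(), and so on. Compiling once avoids re-processing the pattern string on every call. Because the re module also caches recently used patterns internally, the speed gain is often modest, and the larger win is a named, reusable pattern object.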
Common Real-World Patterns
Here are battle-tested patterns for common data extraction tasks.
# common_patterns.py
import re
# URL extraction
text = "Visit https://example.com or http://docs.python.org/3/ for details"
urls = re.findall(r'https?://[\w./\-?=&]+', text)
print("URLs:", urls)
# Date extraction (YYYY-MM-DD)
text = "Events on 2024-03-15 and 2024-12-25"
dates = re.findall(r'\d{4}-\d{2}-\d{2}', text)
print("Dates:", dates)
# IP address
text = "Server 192.168.1.100 responded, backup at 10.0.0.1"
ips = re.findall(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b', text)
print("IPs:", ips)
# HTML tag stripping
html = "<p>Hello <b>world</b></p>"
clean = re.sub(r'<[^>]+>', '', html)
print("Stripped:", clean)
These patterns handle the most common extraction tasks. For production use, consider edge cases — the email pattern above works for most addresses but does not cover every valid format defined in the RFC.
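One edge case worth illustrating: the IP pattern accepts any one-to-three-digit groups, so it also matches impossible addresses such as 999.10.0.1. A possible approach, sketched here with the standard-library ipaddress module, is to use the regex to find candidates and a stricter check to validate them. The helper name extract_valid_ips and the sample text are assumptions for this example:

```python
import ipaddress
import re

IP_CANDIDATE = re.compile(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b')

def extract_valid_ips(text):
    """Return only the candidates that parse as real IPv4 addresses."""
    valid = []
    for candidate in IP_CANDIDATE.findall(text):
        try:
            ipaddress.IPv4Address(candidate)
        except ValueError:
            continue  # e.g. an octet above 255
        valid.append(candidate)
    return valid

sample = "Server 192.168.1.100 responded, bogus 999.10.0.1 ignored"
found_ips = extract_valid_ips(sample)
print(found_ips)  # ['192.168.1.100']
```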
re.sub(), re.split(), re.findall() — pick your weapon wisely.
Real-Life Example: Building a Log File Parser
# log_parser.py
import re
from collections import Counter
# Sample log data
log_data = """
2024-03-15 14:30:22 ERROR [auth] Failed login for user admin from 192.168.1.100
2024-03-15 14:30:25 INFO [auth] Successful login for user alice from 10.0.0.50
2024-03-15 14:31:00 WARNING [db] Slow query detected: 2.5s
2024-03-15 14:31:05 ERROR [api] Timeout connecting to payment service
2024-03-15 14:31:10 INFO [auth] Successful login for user bob from 10.0.0.51
2024-03-15 14:31:15 ERROR [auth] Failed login for user admin from 192.168.1.100
2024-03-15 14:31:20 INFO [api] Health check passed
2024-03-15 14:31:30 ERROR [db] Connection pool exhausted
""".strip()
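log_pattern = re.compile(
    r'(?P<date>\d{4}-\d{2}-\d{2}) '
    r'(?P<time>\d{2}:\d{2}:\d{2}) '
    r'(?P<level>\w+) '
    r'\[(?P<module>\w+)\] '
    r'(?P<message>.+)'
)
# Minimal sketch of the analysis step: count entries per level and errors per module
level_counts = Counter()
module_errors = Counter()
for line in log_data.splitlines():
    match = log_pattern.match(line)
    if not match:
        continue  # skip lines that do not fit the expected format
    level_counts[match.group('level')] += 1
    if match.group('level') == 'ERROR':
        module_errors[match.group('module')] += 1
print("Entries by level:", dict(level_counts))
print("Errors by module:", dict(module_errors))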
This parser combines compiled regex patterns, named groups, and Python’s Counter class to analyze log files efficiently. You can extend it to read from actual log files, send alerts for suspicious activity, or export metrics to a dashboard.
Frequently Asked Questions
What is the difference between greedy and lazy matching?
By default, quantifiers like + and * are greedy — they match as much as possible. Adding ? makes them lazy, matching as little as possible. For example, <.+> on “<b>bold</b>” matches the entire string, while <.+?> matches just “<b>”.
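The same comparison as runnable code:

```python
import re

html = "<b>bold</b>"

# Greedy: .+ grabs as much as it can, so the match runs to the last ">"
greedy = re.findall(r'<.+>', html)
# Lazy: .+? stops at the first ">" that lets the pattern succeed
lazy = re.findall(r'<.+?>', html)

print(greedy)  # ['<b>bold</b>']
print(lazy)    # ['<b>', '</b>']
```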
How do I match across multiple lines?
Use the re.DOTALL flag to make . match newlines, or re.MULTILINE to make ^ and $ match at line boundaries instead of string boundaries. Pass these as the flags parameter: re.search(pattern, text, flags=re.MULTILINE).
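A short sketch of both flags on a three-line string (the sample text is invented for the example):

```python
import re

text = "first line\nsecond line\nthird line"

# Without MULTILINE, ^ only matches at the very start of the string.
default_starts = re.findall(r'^\w+', text)
# With MULTILINE, ^ also matches right after each newline.
line_starts = re.findall(r'^\w+', text, flags=re.MULTILINE)

# Without DOTALL, . stops at a newline; with it, . crosses line breaks.
no_dotall = re.search(r'first.*third', text)
with_dotall = re.search(r'first.*third', text, flags=re.DOTALL)

print(default_starts)          # ['first']
print(line_starts)             # ['first', 'second', 'third']
print(no_dotall)               # None
print(with_dotall is not None) # True
```

The two flags can be combined with the | operator, for example flags=re.MULTILINE | re.DOTALL.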
When do I need to escape special characters?
Escape regex metacharacters (. * + ? [ ] ( ) { } ^ $ | \) with a backslash when you want to match them literally. Use re.escape() to automatically escape a string: re.escape("price is $9.99") returns "price\ is\ \$9\.99".
Are regular expressions slow?
For simple pattern matching, string methods like str.startswith(), str.endswith(), and in are faster. Use regex when you need pattern matching that string methods cannot handle. Compile patterns with re.compile() if you use them repeatedly.
When should I NOT use regex?
Do not use regex to parse HTML or XML — use BeautifulSoup or lxml instead. Do not use regex for JSON — use the json module. Regex is best for flat text patterns, not nested structures. If your pattern needs recursive matching, you need a proper parser.
Conclusion
Regular expressions are one of the most powerful tools in a Python developer’s toolkit. We covered the core syntax, the main re module functions, capture groups, compiled patterns, common real-world patterns, and building a complete log parser.
For the full re module documentation, visit docs.python.org/3/library/re. For interactive regex testing, try regex101.com, which provides real-time explanations of your patterns.
You need a database for your Python project, but setting up MySQL or PostgreSQL feels like overkill. SQLite is a lightweight, serverless database that comes built into Python — no installation, no configuration, no separate server process. Just import sqlite3 and start storing data in a single file.
Python’s built-in sqlite3 module provides a full-featured interface to SQLite databases. It supports SQL queries, transactions, parameterized statements, and works identically on every platform. Your database is just a file on disk that you can copy, backup, or share like any other file.
In this tutorial, you will learn how to create databases and tables, insert and query data, use parameterized queries to prevent SQL injection, handle transactions, work with context managers, and build a complete project. By the end, you will be able to add persistent data storage to any Python project in minutes.
SQLite in Python: Quick Example
Here is a complete working example that creates a database, inserts data, and queries it — all in under 15 lines.
# quick_sqlite.py
import sqlite3
conn = sqlite3.connect("example.db")
cursor = conn.cursor()
cursor.execute("CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, name TEXT, email TEXT)")
cursor.execute("INSERT INTO users (name, email) VALUES (?, ?)", ("Alice", "alice@example.com"))
cursor.execute("INSERT INTO users (name, email) VALUES (?, ?)", ("Bob", "bob@example.com"))
conn.commit()
cursor.execute("SELECT * FROM users")
for row in cursor.fetchall():
    print(row)
conn.close()
The ? placeholders in the INSERT statement are parameterized queries — they prevent SQL injection by keeping data separate from SQL code. Never use f-strings or string concatenation to build SQL queries.
What Is SQLite and When Should You Use It?
SQLite is a self-contained, file-based relational database engine. Unlike MySQL or PostgreSQL, it does not require a separate server process. The entire database lives in a single file on your filesystem.
Feature | SQLite | PostgreSQL | MySQL
Server required | No | Yes | Yes
Setup complexity | Zero | Medium | Medium
Concurrent writers | Limited | Excellent | Good
Max database size | 281 TB | Unlimited | Unlimited
Best for | Apps, prototypes, scripts | Web apps, analytics | Web apps, CMS
Use SQLite when you need persistent storage for a single-user application, prototype, CLI tool, data processing script, or any project where simplicity matters more than concurrent access. Switch to PostgreSQL when you need multiple simultaneous writers or a web application with many users.
SQLite: a database in a file. Embedded. Reliable.
Creating Databases and Tables
Connecting to a database file creates it automatically if it does not exist. Use CREATE TABLE IF NOT EXISTS to make your scripts idempotent.
# create_tables.py
import sqlite3
conn = sqlite3.connect("bookstore.db")
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS books (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
author TEXT NOT NULL,
price REAL DEFAULT 0.0,
published_year INTEGER,
in_stock INTEGER DEFAULT 1
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS categories (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL
)
""")
conn.commit()
print("Tables created successfully")
# Verify tables exist
cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
tables = cursor.fetchall()
print("Tables:", [t[0] for t in tables])
conn.close()
Output:
Tables created successfully
Tables: ['books', 'categories']
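In SQLite, an INTEGER PRIMARY KEY column is assigned a unique ID automatically whenever you omit it from an INSERT. Adding AUTOINCREMENT additionally guarantees that IDs from deleted rows are never reused. NOT NULL prevents inserting rows with missing required fields.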
Inserting Data Safely
Always use parameterized queries (the ? placeholder pattern) when inserting data. This prevents SQL injection and handles special characters in your data automatically.
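The executemany() method is typically faster than calling execute() in a Python loop because the statement is prepared once and the iteration happens inside the sqlite3 module. In both cases, commit once after the whole batch rather than after every row.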
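A minimal sketch of a batch insert with executemany(). The four rows are the books that appear in the query output later in this tutorial. The in-memory database and the trimmed-down table are assumptions made so the example runs on its own:

```python
import sqlite3

# Assumption: an in-memory database and a simplified books table,
# so the sketch does not touch bookstore.db.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE books (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        title TEXT NOT NULL,
        author TEXT NOT NULL,
        price REAL DEFAULT 0.0
    )
""")

books = [
    ("Fluent Python", "Luciano Ramalho", 55.99),
    ("Clean Code", "Robert C. Martin", 39.99),
    ("Python Crash Course", "Eric Matthes", 35.99),
    ("Automate the Boring Stuff", "Al Sweigart", 29.99),
]

# One parameterized statement, executed once per tuple in the list
conn.executemany(
    "INSERT INTO books (title, author, price) VALUES (?, ?, ?)",
    books,
)
conn.commit()  # a single commit for the whole batch

count = conn.execute("SELECT COUNT(*) FROM books").fetchone()[0]
print(f"Inserted {count} books")  # Inserted 4 books
conn.close()
```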
SQLite for prototypes. Postgres for production. Usually.
Querying Data
SQLite supports full SQL query syntax. Use fetchall() for all results, fetchone() for a single row, or iterate directly over the cursor.
# query_data.py
import sqlite3
conn = sqlite3.connect("bookstore.db")
conn.row_factory = sqlite3.Row # Enable column name access
cursor = conn.cursor()
# Select all books
cursor.execute("SELECT * FROM books ORDER BY price DESC")
print("All books by price:")
for row in cursor.fetchall():
    print(f" {row['title']} by {row['author']} - ${row['price']}")
# Filtered query with parameter
cursor.execute("SELECT * FROM books WHERE price < ? ORDER BY title", (40.0,))
print("\nBooks under $40:")
for row in cursor.fetchall():
    print(f" {row['title']} (${row['price']})")
# Aggregate query
cursor.execute("SELECT COUNT(*) as count, AVG(price) as avg_price FROM books")
stats = cursor.fetchone()
print(f"\nTotal: {stats['count']} books, Average price: ${stats['avg_price']:.2f}")
conn.close()
Output:
All books by price:
Fluent Python by Luciano Ramalho - $55.99
Clean Code by Robert C. Martin - $39.99
Python Crash Course by Eric Matthes - $35.99
Automate the Boring Stuff by Al Sweigart - $29.99
Books under $40:
Automate the Boring Stuff ($29.99)
Clean Code ($39.99)
Python Crash Course ($35.99)
Total: 4 books, Average price: $40.49
Setting conn.row_factory = sqlite3.Row lets you access columns by name instead of index. This makes your code more readable and resilient to schema changes.
Using Context Managers
A connection used as a context manager wraps your statements in a transaction: it commits automatically on success and rolls back if an exception occurs. It does not close the connection, so call conn.close() yourself when you are done.
# context_manager.py
import sqlite3
# The connection as a context manager handles commits/rollbacks
with sqlite3.connect("bookstore.db") as conn:
    conn.row_factory = sqlite3.Row
    cursor = conn.cursor()
    cursor.execute(
        "INSERT INTO books (title, author, price, published_year) VALUES (?, ?, ?, ?)",
        ("Learning Python", "Mark Lutz", 59.99, 2013)
    )
    # Commit happens automatically when exiting the with block
    cursor.execute("SELECT COUNT(*) as count FROM books")
    print(f"Total books: {cursor.fetchone()['count']}")
# The transaction is committed here, but the connection is still open
conn.close()
Output:
Total books: 5
The with statement commits the transaction when the block exits normally and rolls it back if an exception occurs. This is the recommended pattern for all database operations.
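A minimal sketch of the rollback behaviour, assuming an in-memory database and a one-column table invented for the demo. It also wraps the connection in contextlib.closing, because the connection's own with block manages the transaction but does not close the connection:

```python
import sqlite3
from contextlib import closing

# closing() guarantees conn.close(); the inner "with conn" handles the transaction.
with closing(sqlite3.connect(":memory:")) as conn:
    conn.execute("CREATE TABLE books (title TEXT NOT NULL)")

    try:
        with conn:  # commits on success, rolls back on exception
            conn.execute("INSERT INTO books (title) VALUES (?)", ("Learning Python",))
            # This row violates NOT NULL and raises IntegrityError
            conn.execute("INSERT INTO books (title) VALUES (?)", (None,))
    except sqlite3.IntegrityError as exc:
        print(f"Rolled back: {exc}")

    remaining = conn.execute("SELECT COUNT(*) FROM books").fetchone()[0]
    print(f"Rows after rollback: {remaining}")  # Rows after rollback: 0
```

The first INSERT succeeded on its own, but because it shared a transaction with the failing statement, it was undone as well.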
Single-process, zero-network. Faster than you think.
Real-Life Example: Building a Contact Book CLI
# contact_book.py
import sqlite3
class ContactBook:
    def __init__(self, db_path="contacts.db"):
        self.conn = sqlite3.connect(db_path)
        self.conn.row_factory = sqlite3.Row
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS contacts (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                phone TEXT,
                email TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        self.conn.commit()

    def add(self, name, phone=None, email=None):
        self.conn.execute(
            "INSERT INTO contacts (name, phone, email) VALUES (?, ?, ?)",
            (name, phone, email)
        )
        self.conn.commit()
        print(f"Added: {name}")

    def search(self, query):
        cursor = self.conn.execute(
            "SELECT * FROM contacts WHERE name LIKE ? OR email LIKE ?",
            (f"%{query}%", f"%{query}%")
        )
        return cursor.fetchall()

    def list_all(self):
        cursor = self.conn.execute("SELECT * FROM contacts ORDER BY name")
        return cursor.fetchall()

    def delete(self, contact_id):
        self.conn.execute("DELETE FROM contacts WHERE id = ?", (contact_id,))
        self.conn.commit()

    def close(self):
        self.conn.close()
# Demo usage
book = ContactBook(":memory:")
book.add("Alice Smith", "555-0101", "alice@example.com")
book.add("Bob Jones", "555-0102", "bob@example.com")
book.add("Charlie Brown", "555-0103", "charlie@example.com")
print("\nAll contacts:")
for c in book.list_all():
    print(f" {c['name']} - {c['phone']} - {c['email']}")
print("\nSearch 'ali':")
for c in book.search("ali"):
    print(f" {c['name']} - {c['email']}")
book.close()
Output:
Added: Alice Smith
Added: Bob Jones
Added: Charlie Brown
All contacts:
Alice Smith - 555-0101 - alice@example.com
Bob Jones - 555-0102 - bob@example.com
Charlie Brown - 555-0103 - charlie@example.com
Search 'ali':
Alice Smith - alice@example.com
Using ":memory:" creates an in-memory database for testing. Replace it with a filename like "contacts.db" for persistent storage. The class encapsulates all database operations, making it easy to extend with update, export, or import features.
Frequently Asked Questions
Is SQLite thread-safe in Python?
By default, a SQLite connection should only be used in the thread that created it. Set check_same_thread=False if you need to share a connection across threads, but be aware that concurrent writes can cause issues. For multi-threaded applications, give each thread its own connection.
How much data can SQLite handle?
SQLite can handle databases up to 281 terabytes. In practice, it works well for databases up to a few gigabytes. Performance depends more on query patterns than raw size -- proper indexing makes a bigger difference than database engine choice for most workloads.
How do I back up a SQLite database?
The simplest backup is to copy the database file when no writes are happening. For live databases, use Python's conn.backup() method which creates a consistent snapshot even during active writes. Schedule backups with cron or the schedule library.
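A minimal sketch of the backup API. Temporary files stand in for a real database and its backup target, and the notes table is invented for the demo:

```python
import os
import sqlite3
import tempfile

# Assumption: throwaway paths in a temp directory
workdir = tempfile.mkdtemp()
src_path = os.path.join(workdir, "live.db")
dst_path = os.path.join(workdir, "backup.db")

src = sqlite3.connect(src_path)
src.execute("CREATE TABLE notes (body TEXT)")
src.execute("INSERT INTO notes (body) VALUES (?)", ("remember the milk",))
src.commit()

dst = sqlite3.connect(dst_path)
src.backup(dst)  # copies the contents of src into dst
dst.close()
src.close()

# Reopen the backup to confirm the data arrived
check = sqlite3.connect(dst_path)
copied = check.execute("SELECT body FROM notes").fetchall()
check.close()
print(copied)  # [('remember the milk',)]
```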
How do I change the schema of an existing table?
SQLite has limited ALTER TABLE support. You can add columns with ALTER TABLE books ADD COLUMN rating REAL, but you cannot remove or rename columns in older SQLite versions. For complex changes, create a new table, copy data over, drop the old table, and rename the new one.
Should I use SQLite directly or through an ORM?
For simple scripts and small projects, the built-in sqlite3 module is perfect. For larger projects with complex queries and relationships, consider SQLAlchemy which provides an ORM layer. Start with raw SQL to understand the fundamentals, then add an ORM if the project grows.
Conclusion
SQLite with Python gives you a full relational database with zero configuration. We covered creating databases and tables, safe data insertion with parameterized queries, querying with filtering and aggregation, context managers for clean resource handling, and building a complete contact book application.
SQL injection is the #1 security mistake in database code. Never build queries with f-strings or string concatenation. Use parameterized queries — sqlite3 substitutes safely:
import sqlite3
con = sqlite3.connect("app.db")
cur = con.cursor()
# WRONG — injection vulnerability
user_input = "alice'; DROP TABLE users; --"
cur.execute(f"SELECT * FROM users WHERE name = '{user_input}'")
# RIGHT — placeholder + tuple
cur.execute("SELECT * FROM users WHERE name = ?", (user_input,))
print(cur.fetchall())
# Named parameters work too
cur.execute(
"INSERT INTO users (name, email, age) VALUES (:name, :email, :age)",
{"name": "Bob", "email": "bob@x.com", "age": 30},
)
con.commit()
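The ? (qmark) style is what sqlite3 uses, but it is not universal across DB-API drivers: psycopg2 and the common MySQL drivers expect %s placeholders instead, so check each driver's paramstyle. The named :name style also works in sqlite3 and reads better for queries with many parameters.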
Context Managers for Transactions
Forgetting commit() after writes is the #2 mistake. The connection object is a context manager that auto-commits on exit, auto-rollbacks on exception:
import sqlite3
con = sqlite3.connect("app.db")
# Auto-commit on clean exit, auto-rollback on error
with con:
    con.execute("INSERT INTO users (name, age) VALUES (?, ?)", ("Carol", 25))
    con.execute("UPDATE accounts SET balance = balance - 100 WHERE user_id = 1")
    con.execute("UPDATE accounts SET balance = balance + 100 WHERE user_id = 2")
    # If any line raises, all three are rolled back atomically
# Standalone connection cleanup
con.close()
This is the safest write pattern — either the whole transaction commits, or nothing changes. Use it for any multi-statement update.
Row Factory for Named Columns
By default, fetchall() returns tuples — fine for two columns, painful for ten. Set row_factory = sqlite3.Row to get dict-like access:
import sqlite3
con = sqlite3.connect("app.db")
con.row_factory = sqlite3.Row # rows now support access by column name or index
cur = con.execute("SELECT id, name, email, created_at FROM users LIMIT 5")
for row in cur:
    # Access by column name OR index
    print(row["name"], row["email"])
    print(row[1], row[2])
    # Convert to dict if needed
    print(dict(row))
SQLite-Specific Features
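SQLite ships with several features that are easy to overlook and worth knowing:
# INSERT OR REPLACE: upsert-style write in one statement. On a key conflict the
# old row is deleted and a new one inserted, so columns you omit fall back to defaults.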
cur.execute(
"INSERT OR REPLACE INTO users (id, name) VALUES (?, ?)",
(1, "Alice"),
)
# JSON support (built into modern SQLite)
cur.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, data TEXT)")
cur.execute(
"INSERT INTO events (data) VALUES (json(?))",
('{"user_id": 42, "action": "login"}',),
)
cur.execute("SELECT json_extract(data, '$.user_id') FROM events WHERE id = 1")
# Full-text search via FTS5
cur.executescript('''
CREATE VIRTUAL TABLE docs USING fts5(title, body);
INSERT INTO docs VALUES ('Python', 'A programming language');
INSERT INTO docs VALUES ('SQLite', 'An embedded database');
''')
cur.execute("SELECT title FROM docs WHERE docs MATCH ?", ("programming",))
Performance: Indexes and EXPLAIN QUERY PLAN
SQLite is fast — but only with the right indexes. Every column you filter or join on should be indexed:
# Create indexes
cur.execute("CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)")
cur.execute("CREATE INDEX IF NOT EXISTS idx_orders_user ON orders(user_id, created_at)")
# Diagnose slow queries
cur.execute("EXPLAIN QUERY PLAN SELECT * FROM users WHERE email = ?", ("a@b.com",))
print(cur.fetchall())
# Look for "SEARCH USING INDEX" — good
# Look for "SCAN" — slow, missing index
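To see the plan change, the following sketch runs the same query before and after creating an index. It assumes an in-memory database with a simplified users table, and the helper name plan_for is made up for this example. The exact wording of the plan text varies between SQLite versions, so the comments describe what to look for rather than quoting output:

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, email TEXT)")

def plan_for(sql, params=()):
    """Return the human-readable detail column of each plan row."""
    rows = con.execute("EXPLAIN QUERY PLAN " + sql, params).fetchall()
    return [row[-1] for row in rows]

query = "SELECT * FROM users WHERE email = ?"

before = plan_for(query, ("a@b.com",))
con.execute("CREATE INDEX idx_users_email ON users(email)")
after = plan_for(query, ("a@b.com",))

print(before)  # expect a SCAN of the users table
print(after)   # expect a SEARCH that names idx_users_email
con.close()
```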
WAL Mode for Concurrent Reads
The default SQLite journaling mode (rollback) blocks readers during writes. Switch to WAL (Write-Ahead Logging) for better concurrency:
con = sqlite3.connect("app.db")
con.execute("PRAGMA journal_mode = WAL")
con.execute("PRAGMA synchronous = NORMAL") # slightly less durable, much faster
con.execute("PRAGMA foreign_keys = ON") # enforce FK constraints
# journal_mode = WAL is stored in the database file and persists across connections;
# synchronous and foreign_keys are per-connection and must be set each time
WAL mode lets readers and writers operate without blocking each other (within reasonable limits). This is the single biggest performance unlock for read-heavy SQLite workloads.
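A small sketch that checks which settings survive a reconnect. It assumes a throwaway file in a temp directory, since in-memory databases cannot use WAL:

```python
import os
import sqlite3
import tempfile

# Assumption: a temporary database file created just for this check
db_path = os.path.join(tempfile.mkdtemp(), "app.db")

con = sqlite3.connect(db_path)
con.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
con.commit()
# The PRAGMA returns the journal mode that is now in effect
mode = con.execute("PRAGMA journal_mode = WAL").fetchone()[0]
con.execute("PRAGMA foreign_keys = ON")
fk_first = con.execute("PRAGMA foreign_keys").fetchone()[0]
con.close()

# A fresh connection: journal_mode is read back from the file,
# while foreign_keys starts again from the library default.
con2 = sqlite3.connect(db_path)
mode_again = con2.execute("PRAGMA journal_mode").fetchone()[0]
fk_again = con2.execute("PRAGMA foreign_keys").fetchone()[0]
con2.close()

print(mode, mode_again)    # wal wal
print(fk_first, fk_again)  # the second value is the build default, usually 0
```

In practice this means a small connection helper that applies the per-connection PRAGMAs every time is a reasonable design.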
Common Pitfalls
Forgetting commit() without context manager. If you don't use with con:, you must call con.commit() after writes — or they're lost when the connection closes.
Using one connection across threads. sqlite3 connections aren't thread-safe by default. Open one connection per thread, or use check_same_thread=False with a lock.
String concatenation in queries. SQL injection is a real risk even in single-user apps — never build queries with user input. Parameterize everything.
No foreign-key enforcement. SQLite doesn't enforce FK constraints by default. Set PRAGMA foreign_keys = ON on every connection.
Path issues. A relative path to connect() resolves against the cwd, not the script's directory. Use an absolute path or pathlib.Path(__file__).parent / "app.db".
FAQ
Q: SQLite or PostgreSQL?
A: SQLite for embedded, single-process, prototype, or low-write apps. Postgres for multi-process, concurrent writes, or anything needing real network access. SQLite is shockingly fast for read-heavy workloads.
Q: How big can a SQLite database get?
A: 281 TB theoretical maximum, but practically anything over 100 GB is pushing it. Multi-GB databases work fine; multi-TB is when you should evaluate PostgreSQL or DuckDB.
Q: How do I back up a SQLite database?
A: While the DB is in use, run VACUUM INTO 'backup.db' for a consistent snapshot. Or use the built-in backup API: src.backup(dst) in the sqlite3 module.
Q: Can multiple processes write to the same SQLite file?
A: Yes, with WAL mode, but writes serialize. For high write concurrency, switch to Postgres. For occasional concurrent writes (a few per second), SQLite is fine.
Q: SQLAlchemy or raw sqlite3?
A: Raw sqlite3 for small scripts where SQL is the simplest expression of intent. SQLAlchemy when you have many tables, migrations, complex relationships, or want portability across databases.
Wrapping Up
SQLite is the most-deployed database in the world (every smartphone has hundreds of SQLite databases). For Python, it's built in — no install, no service to run, no network. Master parameterized queries, transactions via with con:, row factories for ergonomics, indexes for speed, and WAL mode for concurrency. Those five skills cover 90% of SQLite work and prevent 99% of the foot-guns.
You have built a scraper that works perfectly on practice sites, but real-world websites fight back. Rate limiting, CAPTCHAs, IP blocking, and browser fingerprinting are all common defenses designed to stop automated access. Understanding these measures — and the ethical ways to work around them — is essential for any serious scraping project.
Python gives you several tools to handle anti-scraping defenses ethically: rotating User-Agent headers, managing request timing, using sessions to maintain cookies, and respecting robots.txt files. The key principle is to make your scraper behave like a polite human visitor rather than an aggressive bot.
In this tutorial, you will learn about common anti-scraping techniques websites use, how to add proper headers and User-Agents, manage request timing and rate limits, handle cookies and sessions, check and respect robots.txt, and deal with common blocking scenarios. By the end, you will know how to build scrapers that work reliably without abusing the websites you scrape.
Handling Blocks: Quick Example
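Many websites block requests that do not carry a browser-like User-Agent header. By default, requests identifies itself as python-requests/<version>, which is easy for a server to reject. Sending a realistic User-Agent is the simplest fix for many “403 Forbidden” responses.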
# quick_headers.py
import requests
url = "https://httpbin.org/headers"
# Default headers: requests sends its own python-requests User-Agent (might get blocked on real sites)
response_bare = requests.get(url)
print("Status:", response_bare.status_code)
# With proper headers
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
"Accept": "text/html,application/xhtml+xml",
"Accept-Language": "en-US,en;q=0.9",
}
response = requests.get(url, headers=headers)
data = response.json()
print("Sent headers:", list(data["headers"].keys()))
Output:
Status: 200
Sent headers: ['Accept', 'Accept-Language', 'Host', 'User-Agent', 'X-Amzn-Trace-Id']
The httpbin.org/headers endpoint echoes back the headers your request sent, which is perfect for verifying that your headers are being sent correctly.
Common Anti-Scraping Measures
Understanding what defenses websites use helps you build scrapers that handle them correctly. Here are the most common anti-scraping measures you will encounter.
Measure | How It Works | Your Response
Missing User-Agent block | Rejects requests without browser-like headers | Add realistic headers
Rate limiting | Limits requests per time period per IP | Add delays between requests
IP blocking | Bans IPs making too many requests | Slow down, use proxies ethically
CAPTCHA | Requires human verification | Respect it; do not automate CAPTCHA solving
JavaScript rendering | Content loads only via JS | Use Playwright or Selenium
robots.txt | Declares which paths bots should avoid | Always respect it
CAPTCHAs, rate limits, fingerprinting. The web fights back.
Respecting robots.txt
Every ethical scraper should check robots.txt before scraping. Python’s built-in urllib.robotparser module handles this for you.
# check_robots.py
from urllib.robotparser import RobotFileParser
rp = RobotFileParser()
rp.set_url("https://quotes.toscrape.com/robots.txt")
rp.read()
# Check if we can scrape specific paths
paths = ["/", "/page/2/", "/login", "/admin/"]
for path in paths:
    allowed = rp.can_fetch("*", path)
    print(f"{path}: {'Allowed' if allowed else 'Blocked'}")
# Check crawl delay
delay = rp.crawl_delay("*")
print(f"Crawl delay: {delay or 'Not specified'}")
The can_fetch() method returns True if the specified user agent is allowed to access the path according to the site’s robots.txt rules. Always check this before scraping any new website.
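To experiment with the rules without making a network request, RobotFileParser can also parse robots.txt content you already hold in memory. In this sketch the robots.txt text, the example.com base URL, and the MyScraperBot user agent are all hypothetical:

```python
from urllib.robotparser import RobotFileParser

# Hypothetical robots.txt content, parsed from memory instead of fetched
robots_txt = """\
User-agent: *
Disallow: /admin/
Disallow: /login
Crawl-delay: 5
"""

rp = RobotFileParser()
rp.parse(robots_txt.splitlines())

base = "https://example.com"
results = {
    path: rp.can_fetch("MyScraperBot", base + path)
    for path in ["/", "/page/2/", "/login", "/admin/users"]
}
delay = rp.crawl_delay("MyScraperBot")

print(results)
# {'/': True, '/page/2/': True, '/login': False, '/admin/users': False}
print(delay)  # 5
```

Disallow rules are prefix matches, which is why /admin/users is blocked by the /admin/ rule.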
Managing Request Timing
The most common reason scrapers get blocked is making too many requests too quickly. Smart timing makes your scraper both polite and resilient.
# rate_limiting.py
import requests
import time
import random
def polite_get(url, session, min_delay=1.0, max_delay=3.0):
    """Make a request with random delay to avoid detection."""
    delay = random.uniform(min_delay, max_delay)
    time.sleep(delay)
    response = session.get(url)
    print(f"[{response.status_code}] {url} (waited {delay:.1f}s)")
    return response

session = requests.Session()
session.headers.update({
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
})
urls = [
    "https://httpbin.org/get",
    "https://httpbin.org/headers",
    "https://httpbin.org/ip",
]
for url in urls:
    response = polite_get(url, session)
Random delays between 1-3 seconds mimic human browsing patterns. Using requests.Session() maintains cookies across requests, which also makes your scraper behave more like a real browser.
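A possible refinement is to sleep only for the part of the delay that has not already passed, so time spent parsing a page counts toward the gap. The Throttle class below is a hypothetical sketch of that idea, not part of requests. The clock and sleep functions are injectable so the demo can run instantly with a fake clock:

```python
import random
import time

class Throttle:
    """Enforce a randomized minimum gap between consecutive requests."""

    def __init__(self, min_delay=1.0, max_delay=3.0,
                 clock=time.monotonic, sleep=time.sleep):
        self.min_delay = min_delay
        self.max_delay = max_delay
        self._clock = clock
        self._sleep = sleep
        self._last_request = None

    def wait(self):
        """Sleep for whatever remains of the target gap; return seconds slept."""
        target_gap = random.uniform(self.min_delay, self.max_delay)
        waited = 0.0
        if self._last_request is not None:
            elapsed = self._clock() - self._last_request
            waited = max(0.0, target_gap - elapsed)
            if waited:
                self._sleep(waited)
        self._last_request = self._clock()
        return waited

# Demo with a fake clock so the example finishes instantly
fake_now = [0.0]

def fake_sleep(seconds):
    fake_now[0] += seconds

throttle = Throttle(1.0, 3.0, clock=lambda: fake_now[0], sleep=fake_sleep)
first = throttle.wait()   # no previous request, so nothing to wait for
second = throttle.wait()  # called immediately, so the full gap applies
print(first, round(second, 1))
```

In a real scraper you would create Throttle() with the default clock and call throttle.wait() right before each session.get().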
Browser fingerprints: 32 dimensions of who-you-are.
Implementing Retry Logic
Network errors and temporary blocks happen. Retry logic with exponential backoff handles these gracefully without hammering the server.
# retry_logic.py
import requests
import time
from requests.exceptions import RequestException
def fetch_with_retry(url, max_retries=3, backoff_factor=2):
    """Fetch URL with exponential backoff retry."""
    session = requests.Session()
    session.headers.update({
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
    })
    for attempt in range(max_retries):
        try:
            response = session.get(url, timeout=10)
            if response.status_code == 200:
                return response
            elif response.status_code == 429:
                wait = backoff_factor ** attempt
                print(f"Rate limited. Waiting {wait}s before retry...")
                time.sleep(wait)
            else:
                print(f"Got status {response.status_code}")
                return response
        except RequestException as e:
            wait = backoff_factor ** attempt
            print(f"Error: {e}. Retrying in {wait}s...")
            time.sleep(wait)
    print("All retries exhausted")
    return None

result = fetch_with_retry("https://httpbin.org/get")
if result:
    print(f"Success: {result.status_code}")
Output:
Success: 200
Exponential backoff doubles the wait time with each retry (1s, 2s, 4s). This prevents your scraper from making the situation worse when a server is already under load. The timeout=10 parameter prevents hanging on unresponsive servers.
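Two common refinements are random jitter, so many clients do not retry in lockstep, and honoring a Retry-After header when the server sends one with a 429 response. The helper below is a sketch under those assumptions. The name backoff_delay and its parameters are invented for illustration, and it handles only the numeric form of Retry-After:

```python
import random

def backoff_delay(attempt, backoff_factor=2, max_delay=60.0, retry_after=None):
    """Seconds to wait before retry number `attempt` (0-based)."""
    if retry_after is not None:
        try:
            # Retry-After may be a number of seconds; honor it when it parses.
            return min(float(retry_after), max_delay)
        except ValueError:
            pass  # the HTTP-date form is not handled in this sketch
    ceiling = min(backoff_factor ** attempt, max_delay)
    # "Full jitter": pick a random point between 0 and the exponential ceiling
    return random.uniform(0, ceiling)

# The ceilings grow exponentially until they hit max_delay
ceilings = [min(2 ** attempt, 60.0) for attempt in range(4)]
print(ceilings)                            # [1, 2, 4, 8]
print(backoff_delay(0, retry_after="7"))   # 7.0
```

Inside a retry loop, the value could be passed to time.sleep() with retry_after=response.headers.get("Retry-After").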
Session and Cookie Management
Using sessions properly is critical. Sessions maintain cookies across requests, which many websites require for normal browsing. Without proper session management, websites may treat each request as a new visitor and trigger anti-bot measures.
# session_management.py
import requests
session = requests.Session()
session.headers.update({
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9",
"Accept-Language": "en-US,en;q=0.9",
})
# First request sets cookies
response = session.get("https://httpbin.org/cookies/set/session_id/abc123")
print(f"Cookies after first request: {dict(session.cookies)}")
# Subsequent requests send cookies automatically
response = session.get("https://httpbin.org/cookies")
print(f"Server sees cookies: {response.json()['cookies']}")
Output:
Cookies after first request: {'session_id': 'abc123'}
Server sees cookies: {'session_id': 'abc123'}
Always use requests.Session() instead of bare requests.get() for multi-page scraping. It handles cookies, connection pooling, and header persistence automatically.
Real-Life Example: Building a Resilient Scraper
# resilient_scraper.py
import requests
import time
import random
from urllib.robotparser import RobotFileParser
from bs4 import BeautifulSoup


class ResilientScraper:
    def __init__(self, base_url, min_delay=1.0, max_delay=3.0):
        self.base_url = base_url
        self.min_delay = min_delay
        self.max_delay = max_delay
        self.session = requests.Session()
        self.session.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
            "Accept": "text/html,application/xhtml+xml",
            "Accept-Language": "en-US,en;q=0.9",
        })
        self.robot_parser = RobotFileParser()
        self.robot_parser.set_url(f"{base_url}/robots.txt")
        self.robot_parser.read()

    def can_scrape(self, path):
        return self.robot_parser.can_fetch("*", path)

    def get_page(self, path, max_retries=3):
        if not self.can_scrape(path):
            print(f"Blocked by robots.txt: {path}")
            return None
        url = f"{self.base_url}{path}"
        for attempt in range(max_retries):
            delay = random.uniform(self.min_delay, self.max_delay)
            time.sleep(delay)
            try:
                response = self.session.get(url, timeout=10)
                if response.status_code == 200:
                    return BeautifulSoup(response.text, "html.parser")
                elif response.status_code == 429:
                    wait = 2 ** attempt
                    print(f"Rate limited on {path}. Waiting {wait}s...")
                    time.sleep(wait)
            except Exception as e:
                print(f"Error on {path}: {e}")
        return None


scraper = ResilientScraper("https://quotes.toscrape.com")
all_quotes = []
for page_num in range(1, 4):
    soup = scraper.get_page(f"/page/{page_num}/")
    if soup:
        quotes = soup.find_all("div", class_="quote")
        for q in quotes:
            text = q.find("span", class_="text")
            author = q.find("small", class_="author")
            if text and author:
                all_quotes.append({"text": text.text, "author": author.text})
        print(f"Page {page_num}: {len(quotes)} quotes")
print(f"Total scraped: {len(all_quotes)} quotes")
This ResilientScraper class combines every technique from this tutorial: robots.txt checking, session management, random delays, retry logic with exponential backoff, and defensive parsing.
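The robots.txt check can be tried offline before pointing the scraper at a real site. The sketch below feeds RobotFileParser a made-up rules file (SAMPLE_ROBOTS is purely illustrative) through its parse() method instead of fetching one over the network.

```python
from urllib.robotparser import RobotFileParser

# Hypothetical robots.txt content, used only for illustration
SAMPLE_ROBOTS = """\
User-agent: *
Disallow: /private/
Crawl-delay: 2
"""

parser = RobotFileParser()
parser.parse(SAMPLE_ROBOTS.splitlines())  # parse rules without a network call

allowed = parser.can_fetch("*", "/page/1/")
blocked = parser.can_fetch("*", "/private/report")
delay = parser.crawl_delay("*")  # None when the file has no Crawl-delay line

print(allowed, blocked, delay)
```

If a site publishes a Crawl-delay, using it as the minimum delay is a reasonable extension of the class above.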
Frequently Asked Questions
Is it ethical to bypass anti-scraping measures?
Adding headers and managing timing are standard practices that mimic normal browser behavior. However, bypassing CAPTCHAs, breaking authentication, or ignoring robots.txt crosses ethical lines. Always respect the website’s terms of service and only scrape publicly available data.
Should I use proxy rotation?
Proxy rotation can help distribute requests across multiple IPs, but it should not be used to circumvent explicit blocking. If a website is actively blocking you, that is a signal to stop rather than escalate. Proxies are more appropriate for geographic content access.
What does HTTP 429 mean?
HTTP 429 means “Too Many Requests.” The server is rate limiting you. The correct response is to slow down with exponential backoff, not to try harder. Check the Retry-After header if present — it tells you exactly how long to wait.
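As a rough sketch of how a scraper might honor Retry-After, the helper below accepts the header value in either of its two standard forms (a number of seconds or an HTTP date) and falls back to exponential backoff otherwise. The function name seconds_to_wait and its parameters are assumptions made for illustration.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime


def seconds_to_wait(retry_after, attempt, backoff_factor=2, now=None):
    """Pick a wait time from a Retry-After value, else fall back to backoff."""
    fallback = backoff_factor ** attempt
    if not retry_after:
        return fallback
    value = retry_after.strip()
    if value.isdigit():  # form 1: delay in seconds, e.g. "120"
        return int(value)
    try:  # form 2: an HTTP date, e.g. "Wed, 21 Oct 2015 07:28:00 GMT"
        target = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return fallback  # unparseable header: use plain backoff
    if target.tzinfo is None:
        target = target.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return max(0, (target - now).total_seconds())


print(seconds_to_wait("120", attempt=0))  # 120
print(seconds_to_wait(None, attempt=2))   # 4
```

Inside a retry loop this could be called as seconds_to_wait(response.headers.get("Retry-After"), attempt) before time.sleep().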
Do I need to rotate User-Agent strings?
For most scraping projects, a single realistic User-Agent string is sufficient. Rotating User-Agents is only necessary for high-volume scraping where a single agent string might get flagged. The more important factor is request timing.
Can I get in legal trouble for scraping?
Web scraping laws vary by jurisdiction. In the US, scraping publicly available data is generally legal, but violating a website’s terms of service could have legal consequences. In the EU, GDPR adds restrictions around personal data. When in doubt, consult a legal professional.
Conclusion
Handling anti-scraping measures responsibly is about being a good citizen of the web. We covered checking robots.txt, adding proper headers, managing request timing, implementing retry logic, session management, and building a resilient scraper class that combines all these techniques.
The ResilientScraper class gives you a production-ready foundation. Extend it with logging, database storage, or email alerts for long-running scraping projects.
Some websites load their content dynamically with JavaScript, which means a simple HTTP request with requests only gets you an empty shell. Playwright solves this by controlling a real browser — Chromium, Firefox, or WebKit — letting you scrape pages that rely on JavaScript rendering, single-page applications, and content loaded behind user interactions.
Microsoft’s Playwright library for Python provides a clean async and sync API for browser automation. It installs its own browser binaries, handles waits automatically, and runs headless by default. Combined with BeautifulSoup for HTML parsing, it gives you the power to scrape virtually any website.
In this tutorial, you will learn how to install Playwright, launch a browser, navigate to pages, wait for dynamic content, extract data from JavaScript-rendered pages, handle clicks and scrolling, and build a complete scraper for dynamic websites.
Scraping a Dynamic Page: Quick Example
Here is a minimal example that scrapes quotes from a JavaScript-rendered page that returns empty HTML to regular HTTP requests.
# quick_playwright.py
from playwright.sync_api import sync_playwright
with sync_playwright() as p:
    browser = p.chromium.launch(headless=True)
    page = browser.new_page()
    page.goto("https://quotes.toscrape.com/js/")
    page.wait_for_selector("div.quote")
    quotes = page.query_selector_all("div.quote span.text")
    for quote in quotes[:5]:
        print(quote.text_content())
    browser.close()
Output:
"The world as we have created it is a process of our thinking. It cannot be changed without changing our thinking."
"It is our choices, Harry, that show what we truly are, far more than our abilities."
"There are only two ways to live your life. One is as though nothing is a miracle. The other is as though everything is a miracle."
"The person, be it gentleman or lady, who has not pleasure in a good novel, must be intolerably stupid."
"Imperfection is beauty, madness is genius and it's better to be absolutely ridiculous than absolutely boring."
The key difference from BeautifulSoup is that Playwright actually executes the page’s JavaScript before you extract data. The wait_for_selector() call ensures the dynamic content has loaded before scraping.
What Is Playwright and When Do You Need It?
Playwright is a browser automation library that controls real browsers programmatically. Unlike requests + BeautifulSoup which only works with static HTML, Playwright can handle any page a human can see in a browser — including single-page applications built with React, Vue, or Angular.
Feature | requests + BeautifulSoup | Playwright
JavaScript rendering | No | Yes
Speed | Very fast | Slower (browser overhead)
Memory usage | Low | Higher
Login/cookies | Manual | Automatic
Click/scroll | No | Yes
Best for | Static HTML pages | Dynamic/JS-heavy pages
Use Playwright when the page you need to scrape loads content with JavaScript. If the page works with JavaScript disabled, stick with requests and BeautifulSoup — it will be much faster.
Installing Playwright for Python
Playwright requires two installation steps: the Python package and the browser binaries.
pip install playwright
playwright install chromium
The playwright install chromium command downloads a specific Chromium build. You can also install firefox or webkit if you need cross-browser testing.
Sync vs Async API
Playwright offers both synchronous and asynchronous APIs. The sync API is simpler for scripts and scraping. The async API is better for high-concurrency applications.
# sync_example.py
from playwright.sync_api import sync_playwright
with sync_playwright() as p:
    browser = p.chromium.launch()
    page = browser.new_page()
    page.goto("https://quotes.toscrape.com/js/")
    title = page.title()
    print(f"Page title: {title}")
    browser.close()
Output:
Page title: Quotes to Scrape
# async_example.py
import asyncio
from playwright.async_api import async_playwright
async def main():
    async with async_playwright() as p:
        browser = await p.chromium.launch()
        page = await browser.new_page()
        await page.goto("https://quotes.toscrape.com/js/")
        title = await page.title()
        print(f"Page title: {title}")
        await browser.close()


asyncio.run(main())
Output:
Page title: Quotes to Scrape
For most scraping projects, the sync API is perfectly fine. Use async only when you need to scrape multiple pages concurrently.
Waiting for Dynamic Content
The most common mistake with browser-based scraping is trying to extract data before it has loaded. Playwright provides several waiting strategies to handle this.
# waiting_strategies.py
from playwright.sync_api import sync_playwright
with sync_playwright() as p:
    browser = p.chromium.launch()
    page = browser.new_page()
    page.goto("https://quotes.toscrape.com/js/")
    # Wait for a specific element to appear
    page.wait_for_selector("div.quote", timeout=10000)
    # Wait for network to be idle (all AJAX calls done)
    page.wait_for_load_state("networkidle")
    quotes = page.query_selector_all("div.quote")
    print(f"Found {len(quotes)} quotes after waiting")
    browser.close()
Output:
Found 10 quotes after waiting
The wait_for_selector() method pauses execution until the element appears in the DOM. The timeout parameter (in milliseconds) prevents infinite waiting if the element never appears.
Extracting Data from the Page
Playwright gives you two approaches for extracting data: using Playwright’s built-in selectors, or passing the rendered HTML to BeautifulSoup.
# extract_with_playwright.py
from playwright.sync_api import sync_playwright
from bs4 import BeautifulSoup
with sync_playwright() as p:
    browser = p.chromium.launch()
    page = browser.new_page()
    page.goto("https://quotes.toscrape.com/js/")
    page.wait_for_selector("div.quote")
    # Method 1: Playwright selectors
    first_quote = page.query_selector("span.text")
    print("Playwright:", first_quote.text_content()[:60] + "...")
    # Method 2: Pass HTML to BeautifulSoup
    html = page.content()
    soup = BeautifulSoup(html, "html.parser")
    bs_quote = soup.select_one("span.text")
    print("BeautifulSoup:", bs_quote.text[:60] + "...")
    browser.close()
Output:
Playwright: "The world as we have created it is a process of our thinking...
BeautifulSoup: "The world as we have created it is a process of our thinking...
The BeautifulSoup approach is often better for complex parsing because BeautifulSoup has richer navigation methods. Use page.content() to get the fully rendered HTML and parse it with BeautifulSoup.
Real-Life Example: Scraping a JavaScript-Rendered Quote Page
# js_quote_scraper.py
from playwright.sync_api import sync_playwright
from bs4 import BeautifulSoup
import json
def scrape_js_quotes():
    all_quotes = []
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        page = browser.new_page()
        for page_num in range(1, 11):
            url = f"https://quotes.toscrape.com/js/page/{page_num}/"
            page.goto(url)
            page.wait_for_selector("div.quote", timeout=5000)
            html = page.content()
            soup = BeautifulSoup(html, "html.parser")
            for quote_div in soup.find_all("div", class_="quote"):
                text = quote_div.find("span", class_="text")
                author = quote_div.find("small", class_="author")
                tags_div = quote_div.find("div", class_="tags")
                tags = [t.text for t in tags_div.find_all("a")] if tags_div else []
                all_quotes.append({
                    "text": text.text if text else "Unknown",
                    "author": author.text if author else "Unknown",
                    "tags": tags
                })
            print(f"Page {page_num}: {len(soup.find_all('div', class_='quote'))} quotes")
        browser.close()
    return all_quotes


quotes = scrape_js_quotes()
print(f"Total: {len(quotes)} quotes from JS-rendered pages")
with open("js_quotes.json", "w", encoding="utf-8") as f:
    json.dump(quotes, f, indent=2, ensure_ascii=False)
print("Saved to js_quotes.json")
This scraper combines Playwright for rendering with BeautifulSoup for parsing. Reusing the same browser instance across pages is significantly faster than launching a new browser for each page.
Frequently Asked Questions
What does headless mode mean?
Headless mode means the browser runs without a visible window. This is the default and preferred mode for scraping because it uses less memory and runs faster. Set headless=False during development to see what the browser is doing.
How does Playwright compare to Selenium?
Playwright is newer and generally faster than Selenium. It has better auto-waiting, built-in support for multiple browser contexts, and a cleaner API. Selenium has a larger community and longer track record. For new scraping projects, Playwright is the recommended choice.
Can websites detect Playwright?
Yes, some websites use bot detection that can identify automated browsers. Third-party stealth plugins exist, but sophisticated anti-bot systems can still detect automation. Always respect website terms of service and consider whether scraping a particular site is appropriate.
How can I make Playwright scraping faster?
Disable images and CSS loading with route interception to speed up page loads. Reuse browser instances across multiple pages. Use networkidle wait state only when necessary. For multiple pages, use async mode with concurrent page objects.
Does Playwright use a lot of memory?
Yes, each browser instance uses 50-200MB of RAM. For large scraping jobs, process pages sequentially and close browser contexts when done. Monitor memory usage and restart the browser periodically for long-running scrapers.
Conclusion
Playwright opens up scraping possibilities that static HTTP libraries cannot touch. We covered installation, sync and async APIs, waiting for dynamic content, extracting data with both Playwright selectors and BeautifulSoup, and building a complete scraper for JavaScript-rendered pages.
Playwright manages its own browser binaries, so there is no separate driver dance like Selenium. Installing the Python package is the first step; a second command then downloads Chromium, Firefox, and WebKit:
# pip install playwright
# python -m playwright install (downloads browser binaries)
from playwright.sync_api import sync_playwright
with sync_playwright() as p:
    browser = p.chromium.launch(headless=True)
    page = browser.new_page()
    page.goto("https://example.com")
    print(page.title())
    browser.close()
The sync_playwright() context manager handles the entire browser-process lifecycle. Use browser.close() in production code to free resources promptly.
Async API for Concurrent Scraping
Playwright’s killer feature for scrapers: native async. Crawl 100 URLs concurrently without spinning up 100 browser processes:
import asyncio
from playwright.async_api import async_playwright
async def fetch_title(url, page):
    await page.goto(url, wait_until="networkidle")
    return url, await page.title()


async def main(urls):
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        context = await browser.new_context()
        # One page per concurrent task
        results = await asyncio.gather(*[
            fetch_title(url, await context.new_page())
            for url in urls
        ])
        await browser.close()
        for url, title in results:
            print(url, "->", title)


asyncio.run(main([
    "https://example.com",
    "https://python.org",
    "https://github.com",
]))
The shared context reuses cookies, local storage, and authentication state across all tabs — efficient when scraping a logged-in site.
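Opening one tab per URL works for a handful of pages, but for larger batches you may want to cap how many run at once. The following is a minimal sketch using asyncio.Semaphore; gather_bounded and fake_fetch are hypothetical names, and fake_fetch stands in for a coroutine that would drive a Playwright page.

```python
import asyncio


async def gather_bounded(urls, fetch, limit=5):
    """Run fetch(url) for every URL, with at most `limit` in flight at once."""
    semaphore = asyncio.Semaphore(limit)

    async def worker(url):
        async with semaphore:  # blocks while `limit` workers are already active
            return await fetch(url)

    return await asyncio.gather(*(worker(url) for url in urls))


async def fake_fetch(url):
    """Placeholder for real page work (for example goto() plus title())."""
    await asyncio.sleep(0.01)
    return url, len(url)


results = asyncio.run(
    gather_bounded(["https://example.com", "https://python.org"], fake_fetch, limit=1)
)
print(results)
```

asyncio.gather() returns results in input order, so the output lines up with the URL list even when tasks finish out of order.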
Auto-Waiting Selectors
Playwright’s selectors auto-wait for the target to be ready before acting. No more WebDriverWait + EC.presence_of_element_located:
page.goto("https://shop.example.com")
# Click — waits for element to be visible AND enabled
page.click("button#add-to-cart")
# Fill — waits for the input to be present
page.fill("input[name='email']", "user@example.com")
# Select option — waits for dropdown
page.select_option("select#country", "US")
# Wait for navigation after click
with page.expect_navigation():
    page.click("a.checkout")
# Wait for a specific selector to appear
page.wait_for_selector("text=Order confirmed", timeout=10000)
The implicit waits are why Playwright tests are dramatically less flaky than Selenium tests. No time.sleep(2) hacks.
Locator API and Best Practices
For complex pages, use the locator() API instead of raw selectors. Locators are lazy and re-resolve on each action — perfect for SPAs that re-render constantly:
# Reusable locator
cart_button = page.locator("button#add-to-cart")
cart_button.click()
cart_button.wait_for(state="hidden")  # waits until the button is hidden or detached from the DOM
# Chained locators (parent -> child)
form = page.locator("form#checkout")
form.locator("input[name='email']").fill("alice@example.com")
form.locator("input[name='zip']").fill("90210")
form.locator("button[type='submit']").click()
# Text-based selector — robust against class-name changes
page.get_by_text("Sign in").click()
page.get_by_role("button", name="Submit").click()
page.get_by_label("Email address").fill("alice@example.com")
The get_by_role and get_by_label helpers are accessibility-first — they target the same semantic elements that screen readers find, making them resilient to CSS / class changes.
Network Interception
Playwright lets you intercept and modify network requests — useful for blocking ads, mocking APIs, or extracting JSON responses:
# Block all images (faster scraping)
page.route("**/*.{png,jpg,jpeg,gif,svg}", lambda route: route.abort())
# Mock an API response
def handle(route):
    route.fulfill(json={"status": "ok", "data": [1, 2, 3]})
page.route("**/api/users", handle)
# Capture all XHR responses
page.on("response", lambda resp: print(resp.url, resp.status) if "api" in resp.url else None)
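As a variation on extension-based blocking, a route handler can decide by resource type instead of URL pattern. This is a sketch under assumptions: the BLOCKED_TYPES set and the handler name block_heavy_resources are illustrative, and the handler expects the route object that Playwright passes to page.route() callbacks.

```python
BLOCKED_TYPES = {"image", "media", "font"}  # illustrative choice, adjust per site


def block_heavy_resources(route):
    """Route handler: abort heavy resource types, let everything else through."""
    if route.request.resource_type in BLOCKED_TYPES:
        route.abort()
    else:
        route.continue_()


# Usage with a real Playwright page (assumes `page` already exists):
# page.route("**/*", block_heavy_resources)
```

Matching on resource type also catches images served from URLs without a file extension, which a glob such as **/*.png would miss.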
Common Pitfalls
Forgetting to install browsers. pip install playwright doesn’t bring the browser binaries. Run python -m playwright install once after install.
Using time.sleep instead of wait_for. Sleeps make tests flaky and slow. page.wait_for_selector, page.wait_for_load_state, and locator’s implicit waits handle every case.
One context per page when you should share. Each context costs memory. For 100 pages of the same site, use one context and 100 pages.
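Mishandling page.on("dialog"). Playwright auto-dismisses JavaScript alert() / confirm() dialogs when no handler is registered. Once you register a handler, it must call dialog.accept() or dialog.dismiss(), or the page will stall waiting on the dialog.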
Headless != real. Some sites detect headless Chrome by missing browser features. Use headless=False for debugging, then test in headless mode separately.
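For the dialog pitfall, a handler that resolves each dialog explicitly might look like the sketch below. The function name auto_dismiss_dialogs and the seen list are assumptions for illustration; the handler relies only on the type, message, and dismiss() members of Playwright's dialog object.

```python
def auto_dismiss_dialogs(page, seen=None):
    """Register a dialog handler that records each dialog and dismisses it."""
    seen = [] if seen is None else seen

    def handle_dialog(dialog):
        seen.append((dialog.type, dialog.message))
        dialog.dismiss()  # a registered handler should accept() or dismiss()

    page.on("dialog", handle_dialog)
    return seen


# Usage with a real Playwright page (assumes `page` already exists):
# dialogs = auto_dismiss_dialogs(page)
# page.goto("https://example.com")
# print(dialogs)
```

Recording the messages is optional, but it makes it easier to notice when a site starts showing an unexpected confirm() prompt.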
FAQ
Q: Playwright or Selenium?
A: Playwright for new projects: faster, less flaky, better API. Selenium when you need legacy browser support (such as Internet Explorer) or you have existing Selenium tests.
Q: Sync or async API?
A: Async for scrapers that hit many URLs concurrently. Sync for test suites that already use synchronous frameworks (pytest, unittest). The APIs are nearly identical; switching later is easy.
Q: Does Playwright handle bot protection?
A: Better than Selenium, but not perfectly. For Cloudflare / DataDome, use the playwright-stealth plugin and rotate proxies. Some sites you simply can’t scrape reliably without residential IPs.
Q: How do I take a screenshot for debugging?
A: page.screenshot(path="screenshot.png", full_page=True). For video recording, set context = browser.new_context(record_video_dir="videos/").
Q: Can I reuse browser state across runs?
A: Yes — save context state with context.storage_state(path="state.json") and load with browser.new_context(storage_state="state.json"). Cookies and local storage persist between sessions.
Wrapping Up
Playwright fixes nearly every Selenium pain point: driver dance, flaky waits, slow startup, awkward async. For new web-scraping or browser-automation projects in Python, it’s the right default. The locator API, auto-waiting, and native async cover the large majority of use cases. The remainder (Cloudflare bypass, headless detection) needs specialty tools regardless of framework.
You need data from a website, but there is no API available. Maybe you want to track product prices, collect research data, or aggregate job listings from multiple sources. Web scraping with Python and BeautifulSoup lets you extract structured data from HTML pages quickly and reliably — and in 2026, the fundamentals remain as relevant as ever.
Python’s requests library handles HTTP connections while BeautifulSoup (from the bs4 package) parses the HTML and lets you navigate the document tree with simple, readable methods. Both are pure Python, install in seconds with pip, and work on every platform.
In this tutorial, you will learn how to fetch web pages, parse HTML with BeautifulSoup, extract text and attributes, work with tables, handle multiple pages, and build a complete scraping project. By the end, you will have a reusable scraping toolkit you can adapt to any static website.
Scraping a Web Page: Quick Example
Let us start with a minimal working example that fetches quotes from a practice website and prints them out.
# quick_scrape.py
import requests
from bs4 import BeautifulSoup
url = "https://quotes.toscrape.com"
response = requests.get(url)
soup = BeautifulSoup(response.text, "html.parser")
quotes = soup.find_all("span", class_="text")
for quote in quotes[:5]:
    print(quote.text)
Output:
"The world as we have created it is a process of our thinking. It cannot be changed without changing our thinking."
"It is our choices, Harry, that show what we truly are, far more than our abilities."
"There are only two ways to live your life. One is as though nothing is a miracle. The other is as though everything is a miracle."
"The person, be it gentleman or lady, who has not pleasure in a good novel, must be intolerably stupid."
"Imperfection is beauty, madness is genius and it's better to be absolutely ridiculous than absolutely boring."
Three lines do the heavy lifting: requests.get() fetches the page, BeautifulSoup() parses the HTML, and find_all() extracts matching elements. The rest of this tutorial builds on these patterns.
What Is BeautifulSoup and Why Use It?
BeautifulSoup is a Python library that parses HTML and XML documents into a tree structure you can search and navigate with Python code. Think of it like a smart search tool for web pages — instead of working with raw text strings, you work with structured elements that know about their tags, attributes, parents, and children.
The library handles malformed HTML gracefully, which matters because real-world web pages are often messy. Missing closing tags, inconsistent indentation, and mixed encoding are all things BeautifulSoup handles without crashing.
Feature | BeautifulSoup | Regular Expressions | lxml
Learning curve | Low | High | Medium
Handles broken HTML | Yes | No | Partially
Speed | Moderate | Fast | Very fast
CSS selectors | Yes | No | Yes
Best for | Most scraping tasks | Simple patterns | Large documents
For most web scraping projects, BeautifulSoup with the html.parser backend gives you the best balance of simplicity and reliability.
Finding Elements: find() and find_all()
The two methods you will use most are find() (returns the first match) and find_all() (returns all matches). Both accept tag names, CSS classes, IDs, and attribute dictionaries.
# finding_elements.py
import requests
from bs4 import BeautifulSoup
url = "https://quotes.toscrape.com"
response = requests.get(url)
soup = BeautifulSoup(response.text, "html.parser")
first_quote = soup.find("span", class_="text")
print("First quote:", first_quote.text[:60] + "...")
authors = soup.find_all("small", class_="author")
print("Authors on page:", [a.text for a in authors[:5]])
tags_div = soup.find("div", attrs={"class": "tags"})
if tags_div:
    tag_links = tags_div.find_all("a", class_="tag")
    print("Tags:", [t.text for t in tag_links])
Output:
First quote: "The world as we have created it is a process of our...
Authors on page: ['Albert Einstein', 'J.K. Rowling', 'Albert Einstein', 'Jane Austen', 'Marilyn Monroe']
Tags: ['change', 'deep-thoughts', 'thinking', 'world']
Notice the defensive pattern with if tags_div: before calling find_all(). Real websites are messy — elements might be missing, classes might change, or content might be empty. Defensive parsing separates a scraper that crashes on page 3 from one that runs reliably across thousands of pages.
Using CSS Selectors with select()
If you already know CSS, the select() method lets you use CSS selector syntax to find elements. This is often more concise than chaining find() calls.
# css_selectors.py
import requests
from bs4 import BeautifulSoup
url = "https://quotes.toscrape.com"
response = requests.get(url)
soup = BeautifulSoup(response.text, "html.parser")
quotes = soup.select("div.quote span.text")
print(f"Found {len(quotes)} quotes using CSS selector")
author_links = soup.select("div.quote span a")
print("Author pages:", [a["href"] for a in author_links[:3]])
first_tag = soup.select_one("a.tag")
print("First tag:", first_tag.text if first_tag else "None found")
Output:
Found 10 quotes using CSS selector
Author pages: ['/author/Albert-Einstein', '/author/J-K-Rowling', '/author/Albert-Einstein']
First tag: change
The select() method returns a list while select_one() returns the first match or None. CSS selectors are particularly useful when elements are deeply nested.
Extracting Text, Attributes, and Links
Once you have found an element, you need to extract useful data from it. BeautifulSoup gives you .text for visible text content, .get() for attributes, and .attrs for the full attribute dictionary.
# extracting_data.py
import requests
from bs4 import BeautifulSoup
url = "https://quotes.toscrape.com"
response = requests.get(url)
soup = BeautifulSoup(response.text, "html.parser")
quote_div = soup.find("div", class_="quote")
if quote_div:
    quote_text = quote_div.find("span", class_="text")
    print("Quote:", quote_text.text if quote_text else "Unknown")
    author_link = quote_div.find("a")
    href = author_link.get("href", "#") if author_link else "#"
    print("Author page:", href)
    tags_container = quote_div.find("div", class_="tags")
    if tags_container:
        tags = [tag.text for tag in tags_container.find_all("a")]
        print("Tags:", tags)
Output:
Quote: "The world as we have created it is a process of our thinking. It cannot be changed without changing our thinking."
Author page: /author/Albert-Einstein
Tags: ['change', 'deep-thoughts', 'thinking', 'world']
The .get() method with a default value is safer than direct dictionary access like element["href"] — it returns your default instead of raising a KeyError if the attribute is missing.
Handling Pagination
Most websites split content across multiple pages. To scrape all pages, find the “next page” link and follow it until there are no more pages.
# pagination.py
import requests
from bs4 import BeautifulSoup
import time
base_url = "https://quotes.toscrape.com"
all_quotes = []
url = base_url + "/page/1/"
while url:
    print(f"Scraping: {url}")
    response = requests.get(url)
    soup = BeautifulSoup(response.text, "html.parser")
    for quote_div in soup.find_all("div", class_="quote"):
        text = quote_div.find("span", class_="text")
        author = quote_div.find("small", class_="author")
        if text and author:
            all_quotes.append({"text": text.text, "author": author.text})
    next_btn = soup.find("li", class_="next")
    if next_btn:
        next_link = next_btn.find("a")
        url = base_url + next_link["href"] if next_link else None
    else:
        url = None
    time.sleep(1)
print(f"Total quotes scraped: {len(all_quotes)}")
print(f"Sample: {all_quotes[0]['author']}: {all_quotes[0]['text'][:50]}...")
Output:
Scraping: https://quotes.toscrape.com/page/1/
Scraping: https://quotes.toscrape.com/page/2/
...
Scraping: https://quotes.toscrape.com/page/10/
Total quotes scraped: 100
Sample: Albert Einstein: "The world as we have created it is a process of o...
The time.sleep(1) between requests is essential. Without it, you risk overwhelming the server and getting your IP blocked.
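The pagination loop builds the next URL by concatenating base_url with the link's href, which works here because the practice site uses root-relative links. On sites that mix link styles, urllib.parse.urljoin is a more forgiving option. The sketch below uses one href form from the practice site and two hypothetical ones.

```python
from urllib.parse import urljoin

current_page = "https://quotes.toscrape.com/page/1/"

# Root-relative href, as used on the practice site
next_root = urljoin(current_page, "/page/2/")
# Relative href (hypothetical), resolved against the current page
next_relative = urljoin(current_page, "../2/")
# Absolute href (hypothetical), returned unchanged
next_absolute = urljoin(current_page, "https://example.com/next")

print(next_root)      # https://quotes.toscrape.com/page/2/
print(next_relative)  # https://quotes.toscrape.com/page/2/
print(next_absolute)  # https://example.com/next
```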
Saving Scraped Data to CSV and JSON
Once you have extracted data, you typically want to save it for analysis. Python’s built-in csv and json modules handle this without extra dependencies.
# save_data.py
import csv
import json
quotes = [
{"text": "Be yourself; everyone else is already taken.", "author": "Oscar Wilde"},
{"text": "Two things are infinite: the universe and human stupidity.", "author": "Albert Einstein"},
{"text": "Be the change that you wish to see in the world.", "author": "Mahatma Gandhi"},
]
with open("quotes.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.DictWriter(f, fieldnames=["text", "author"])
    writer.writeheader()
    writer.writerows(quotes)
print("Saved to quotes.csv")
with open("quotes.json", "w", encoding="utf-8") as f:
    json.dump(quotes, f, indent=2, ensure_ascii=False)
print("Saved to quotes.json")
Output:
Saved to quotes.csv
Saved to quotes.json
Always specify encoding="utf-8" when opening files for scraped data. Web content often contains special characters that can cause errors or garbled output when the default system encoding is not UTF-8.
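A quick way to check that nothing was lost is to read the files back with the same encoding. This sketch writes a made-up row containing non-ASCII characters to a temporary directory and compares what comes back; the file names and sample data are illustrative.

```python
import csv
import json
import os
import tempfile

rows = [{"text": "Café au lait, s'il vous plaît", "author": "Zoë"}]  # made-up sample row

with tempfile.TemporaryDirectory() as tmp:
    csv_path = os.path.join(tmp, "sample.csv")
    json_path = os.path.join(tmp, "sample.json")

    with open(csv_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=["text", "author"])
        writer.writeheader()
        writer.writerows(rows)
    with open(json_path, "w", encoding="utf-8") as f:
        json.dump(rows, f, ensure_ascii=False)

    # Read both files back using the same encoding they were written with
    with open(csv_path, newline="", encoding="utf-8") as f:
        csv_rows = list(csv.DictReader(f))
    with open(json_path, encoding="utf-8") as f:
        json_rows = json.load(f)

print(csv_rows == rows, json_rows == rows)
```

Keep in mind that CSV stores every value as a string, so numeric fields would need converting after reading.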
Real-Life Example: Building a Job Listing Scraper
Let us build a complete scraper that extracts job listings from the Fake Jobs practice site, processes the data, and saves it as both CSV and JSON.
# job_scraper.py
import requests
from bs4 import BeautifulSoup
import csv
import json
def scrape_jobs(url):
    response = requests.get(url)
    response.raise_for_status()
    soup = BeautifulSoup(response.text, "html.parser")
    jobs = []
    cards = soup.find_all("div", class_="card-content")
    for card in cards:
        title_elem = card.find("h2", class_="title")
        company_elem = card.find("h3", class_="company")
        location_elem = card.find("p", class_="location")
        date_elem = card.find("time")
        job = {
            "title": title_elem.text.strip() if title_elem else "Unknown",
            "company": company_elem.text.strip() if company_elem else "Unknown",
            "location": location_elem.text.strip() if location_elem else "Unknown",
            "date_posted": date_elem.text.strip() if date_elem else "Unknown",
        }
        apply_link = card.find("a", string="Apply")
        job["apply_url"] = apply_link["href"] if apply_link else "N/A"
        jobs.append(job)
    return jobs


def save_results(jobs, csv_path, json_path):
    if not jobs:
        # jobs[0] below would raise IndexError on an empty result
        print("No jobs to save")
        return
    with open(csv_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=jobs[0].keys())
        writer.writeheader()
        writer.writerows(jobs)
    with open(json_path, "w", encoding="utf-8") as f:
        json.dump(jobs, f, indent=2, ensure_ascii=False)


url = "https://realpython.github.io/fake-jobs/"
jobs = scrape_jobs(url)
print(f"Scraped {len(jobs)} job listings")
save_results(jobs, "jobs.csv", "jobs.json")
print("Data saved to jobs.csv and jobs.json")
Output:
Scraped 100 job listings
Data saved to jobs.csv and jobs.json
This scraper uses defensive parsing throughout — every find() result is checked before accessing .text, and default values handle missing elements. You can extend this by adding pagination, filtering by location, or scheduling it to run daily.
Python’s itertools module is a powerhouse for creating efficient iterators that save memory and speed up your data processing. Instead of building entire lists in memory, itertools generates values on the fly, making it perfect for working with large datasets or creating complex iteration patterns.
If you have ever needed to combine multiple lists, group data by a key, or generate all possible permutations of a set, itertools has a clean, optimized solution ready to go. These tools are implemented in C under the hood, so they run significantly faster than equivalent pure Python code.
In this tutorial, you will learn the most practical itertools functions through real examples. We will cover infinite iterators, combinatoric generators, data grouping, chaining, and filtering — everything you need to write more Pythonic and memory-efficient code.
Quick Answer
The itertools module provides fast, memory-efficient iterator building blocks. Key functions include chain() for combining iterables, groupby() for grouping data, product() for cartesian products, combinations() and permutations() for combinatorics, and islice() for slicing iterators. Import with from itertools import chain, groupby, product, combinations.
Quick Example
from itertools import chain, islice, count
# Chain multiple iterables together seamlessly
combined = chain([1, 2, 3], ['a', 'b'], [True, False])
print(list(combined))
# Take the first 5 even numbers from an infinite counter
# count(0, 2) is already a lazy iterator, so no generator wrapper is needed
evens = count(0, 2)
first_five = list(islice(evens, 5))
print(first_five)
Output:
[1, 2, 3, 'a', 'b', True, False]
[0, 2, 4, 6, 8]
The chain() function combines three separate iterables into one seamless stream without creating a new list in memory. The islice() function safely takes a slice from an infinite iterator, something you cannot do with regular list slicing.
What Is the Itertools Module?
The itertools module is part of Python’s standard library and provides a collection of fast, memory-efficient tools for creating and working with iterators. The module is inspired by constructs from functional programming languages like APL, Haskell, and SML.
The key advantage of itertools is lazy evaluation. Instead of building a complete list in memory, each function produces values one at a time as they are requested. This means you can work with datasets much larger than your available RAM, process infinite sequences, and build complex data pipelines that remain efficient.
The functions in itertools fall into three categories: infinite iterators that produce values forever, finite iterators that process input sequences, and combinatoric generators that produce arrangements of elements. All of them are implemented in C for maximum performance.
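Lazy evaluation is easiest to see with an infinite source. The following is a minimal sketch, with illustrative variable names that are not from the original text: nothing is computed until a value is requested, and the stream picks up where it left off.

```python
from itertools import count, islice

# An endless stream of squares; no value is computed at this point
squares = map(lambda n: n * n, count(1))

# Only the five values requested here are actually produced
first_squares = list(islice(squares, 5))
print(first_squares)

# The stream resumes right after the last value that was consumed
next_square = next(squares)
print(next_square)
```

Because islice() stops pulling values once it has enough, the infinite counter never runs away.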
itertools.count, cycle, repeat. The infinite generators you didn’t know you needed.
Infinite Iterators
count — Infinite Counter
The count() function generates an endless sequence of numbers starting from a given value with a specified step. You must always use it with something that limits the output, like islice() or a break in a loop.
from itertools import count, islice
# Count from 10 with step 5
counter = count(10, 5)
print(list(islice(counter, 6)))
# Useful for generating IDs
def id_generator(prefix="ID"):
    for num in count(1):
        yield f"{prefix}-{num:04d}"
ids = id_generator("ORD")
print([next(ids) for _ in range(4)])
The cycle() function takes an iterable and repeats it infinitely. This is perfect for round-robin scheduling, alternating patterns, or rotating through a fixed set of options.
from itertools import cycle, islice
# Alternate between teams for task assignment
teams = cycle(["Alpha", "Beta", "Gamma"])
tasks = ["Deploy v2.1", "Fix login bug", "Update docs",
         "Database migration", "API refactor", "Write tests", "Code review"]
assignments = {task: next(teams) for task in tasks}
for task, team in assignments.items():
    print(f" {team}: {task}")
The repeat() function produces the same value over and over, either infinitely or a specified number of times. It is commonly used with map() or zip() to provide a constant value alongside changing data.
from itertools import repeat
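# Create a list of default configurations
# repeat() yields the same dict object every time, so copy each one
# to keep the four configurations independent of each other
defaults = [dict(cfg) for cfg in repeat({"enabled": True, "retries": 3}, 4)]
print(defaults)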
# Use with map for element-wise operations
import operator
bases = [2, 3, 4, 5]
squared = list(map(operator.pow, bases, repeat(2)))
print(f"Squared: {squared}")
cubed = list(map(operator.pow, bases, repeat(3)))
print(f"Cubed: {cubed}")
The chain() function links multiple iterables together into a single continuous stream. It processes the first iterable completely, then moves to the second, and so on — all without creating an intermediate list.
from itertools import chain
# Merge data from multiple sources
database_users = ["alice", "bob"]
api_users = ["charlie", "diana"]
file_users = ["eve"]
all_users = list(chain(database_users, api_users, file_users))
print(f"All users: {all_users}")
# chain.from_iterable flattens a list of lists
nested_data = [["python", "java"], ["rust", "go"], ["ruby"]]
flat = list(chain.from_iterable(nested_data))
print(f"Flattened: {flat}")
The groupby() function groups consecutive elements that share the same key. The data must be sorted by the grouping key first, or you will get unexpected results.
from itertools import groupby
from operator import itemgetter
# Group transactions by category
transactions = [
    {"category": "food", "amount": 25.50},
    {"category": "food", "amount": 12.00},
    {"category": "transport", "amount": 35.00},
    {"category": "transport", "amount": 15.50},
    {"category": "entertainment", "amount": 45.00},
    {"category": "food", "amount": 8.75},
]
# Sort first, then group
sorted_trans = sorted(transactions, key=itemgetter("category"))
for category, group in groupby(sorted_trans, key=itemgetter("category")):
    items = list(group)
    total = sum(t["amount"] for t in items)
    print(f" {category}: {len(items)} transactions, ${total:.2f}")
Important: The data must be sorted by the same key you pass to groupby. If unsorted, groupby will create a new group every time the key changes, resulting in multiple groups for the same key value.
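The effect of skipping the sort is easy to reproduce. This is a small sketch with a made-up list of category names (not data from the example above) that counts the members of each group with and without sorting:

```python
from itertools import groupby

categories = ["food", "transport", "food"]  # illustrative, deliberately unsorted

# Without sorting, "food" appears as two separate groups
unsorted_groups = [(key, len(list(group))) for key, group in groupby(categories)]

# After sorting, equal keys are adjacent and form a single group
sorted_groups = [(key, len(list(group))) for key, group in groupby(sorted(categories))]

print(unsorted_groups)
print(sorted_groups)
```

If the original order matters and sorting is not an option, a dictionary of lists keyed by category is usually a better fit than groupby().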
islice — Slice Any Iterator
The islice() function works like regular list slicing but on any iterator, including infinite ones and generators. Unlike list slicing, it does not support negative indices because iterators cannot go backwards.
from itertools import islice
# Slice a generator (can't use regular slicing)
def fibonacci():
    a, b = 0, 1
    while True:
        yield a
        a, b = b, a + b
# Get fibonacci numbers 10 through 15
fib_slice = list(islice(fibonacci(), 10, 16))
print(f"Fibonacci 10-15: {fib_slice}")
# Read first 3 lines from a large dataset (simulated)
data_lines = (f"Row {i}: data_{i}" for i in range(1000000))
preview = list(islice(data_lines, 3))
print(f"Preview: {preview}")
Lazy iteration. Memory stays constant, time stays linear.
Combinatoric Iterators
product — Cartesian Product
The product() function computes the cartesian product of input iterables, equivalent to nested for loops. This is perfect for generating all combinations of options.
from itertools import product
# Generate all t-shirt variants
sizes = ["S", "M", "L"]
colors = ["Red", "Blue"]
styles = ["V-neck", "Crew"]
variants = list(product(sizes, colors, styles))
print(f"Total variants: {len(variants)}")
for v in variants[:6]:
    print(f" {v[0]} {v[1]} {v[2]}")
# Generate grid coordinates
grid = list(product(range(3), range(3)))
print(f"\n3x3 grid: {grid}")
Output:
Total variants: 12
S Red V-neck
S Red Crew
S Blue V-neck
S Blue Crew
M Red V-neck
M Red Crew
The combinations() function generates all unique groups of a given size where order does not matter. The permutations() function generates all arrangements where order does matter.
from itertools import combinations, permutations
# Pick 2-person teams from 4 candidates
candidates = ["Alice", "Bob", "Charlie", "Diana"]
teams = list(combinations(candidates, 2))
print(f"Possible teams ({len(teams)}):")
for team in teams:
    print(f" {team[0]} & {team[1]}")
# Order matters for race finish positions
runners = ["A", "B", "C"]
finishes = list(permutations(runners))
print(f"\nPossible race finishes ({len(finishes)}):")
for f in finishes:
    print(f" 1st: {f[0]}, 2nd: {f[1]}, 3rd: {f[2]}")
Output:
Possible teams (6):
Alice & Bob
Alice & Charlie
Alice & Diana
Bob & Charlie
Bob & Diana
Charlie & Diana
Possible race finishes (6):
1st: A, 2nd: B, 3rd: C
1st: A, 2nd: C, 3rd: B
1st: B, 2nd: A, 3rd: C
1st: B, 2nd: C, 3rd: A
1st: C, 2nd: A, 3rd: B
1st: C, 2nd: B, 3rd: A
Real-Life Project: Building a Data Pipeline with Itertools
Let us build a practical data processing pipeline that uses multiple itertools functions to efficiently process log data. This pipeline chains, filters, groups, and summarizes data while keeping memory usage minimal.
from itertools import chain, groupby, islice, accumulate
from operator import itemgetter
import operator
# Simulated log data from multiple servers
server1_logs = [
{"timestamp": "2024-01-15 10:01", "level": "INFO", "service": "auth", "message": "User login"},
{"timestamp": "2024-01-15 10:02", "level": "ERROR", "service": "auth", "message": "Invalid token"},
{"timestamp": "2024-01-15 10:05", "level": "INFO", "service": "api", "message": "Request processed"},
{"timestamp": "2024-01-15 10:07", "level": "WARNING", "service": "db", "message": "Slow query"},
]
server2_logs = [
{"timestamp": "2024-01-15 10:01", "level": "INFO", "service": "api", "message": "Health check"},
{"timestamp": "2024-01-15 10:03", "level": "ERROR", "service": "api", "message": "Timeout"},
{"timestamp": "2024-01-15 10:04", "level": "ERROR", "service": "db", "message": "Connection lost"},
{"timestamp": "2024-01-15 10:06", "level": "INFO", "service": "auth", "message": "Token refreshed"},
]
# Step 1: Chain all logs together
all_logs = list(chain(server1_logs, server2_logs))
print(f"Total log entries: {len(all_logs)}")
# Step 2: Sort and group by service
sorted_by_service = sorted(all_logs, key=itemgetter("service"))
print("\nLogs by service:")
for service, logs in groupby(sorted_by_service, key=itemgetter("service")):
    log_list = list(logs)
    error_count = sum(1 for entry in log_list if entry["level"] == "ERROR")
    print(f" {service}: {len(log_list)} entries ({error_count} errors)")
# Step 3: Sort and group by log level
sorted_by_level = sorted(all_logs, key=itemgetter("level"))
print("\nLogs by level:")
for level, logs in groupby(sorted_by_level, key=itemgetter("level")):
    count = len(list(logs))
    print(f" {level}: {count}")
# Step 4: Running error count using accumulate
error_flags = [1 if log["level"] == "ERROR" else 0 for log in all_logs]
running_errors = list(accumulate(error_flags, operator.add))
print(f"\nRunning error count: {running_errors}")
print(f"Total errors: {running_errors[-1]}")
# Step 5: Get the latest 3 entries
latest = list(islice(sorted(all_logs, key=itemgetter("timestamp"), reverse=True), 3))
print("\nLatest 3 entries:")
for entry in latest:
    print(f" [{entry['level']}] {entry['timestamp']} - {entry['service']}: {entry['message']}")
groupby: structure your iterator by neighboring keys.
Common Pitfalls and Troubleshooting
Problem | Cause | Solution
groupby returns unexpected groups | Data not sorted by grouping key | Always sort by the same key before calling groupby
Iterator exhausted after first use | Iterators can only be consumed once | Convert to list if you need multiple passes, or use tee()
Memory error with product() | Cartesian product of large sets creates huge output | Use islice() to limit output, or process items one at a time
Infinite loop with count() or cycle() | No termination condition | Always pair infinite iterators with islice(), takewhile(), or break
accumulate gives wrong type | Initial value type mismatch | Pass an explicit initial value matching your expected type
combinations_with_replacement unexpected | Confused with regular combinations | Use combinations() for no repeats, combinations_with_replacement() when repeats are allowed
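Two of the fixes listed above can be sketched in a few lines. The values are illustrative only; takewhile() bounds an infinite counter with a condition instead of a fixed length, and the initial keyword of accumulate() (available since Python 3.8) seeds the running total with the type you want.

```python
from itertools import accumulate, count, takewhile

# takewhile() stops the infinite counter as soon as the predicate fails
multiples = list(takewhile(lambda n: n < 20, count(0, 5)))
print(multiples)

# initial= seeds the running total, so every result here is a float
running = list(accumulate([1, 2, 3], initial=0.0))
print(running)
```

Note that accumulate() with an initial value yields one more item than the input, because the seed itself is emitted first.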
Frequently Asked Questions
What is the difference between itertools.chain and list concatenation?
chain() creates a lazy iterator that processes elements one at a time without creating a new list in memory. List concatenation with + creates an entirely new list containing all elements from both sources. For large datasets, chain is significantly more memory-efficient because it never holds more than one element at a time.
Can I use itertools with pandas DataFrames?
Yes, but with some caveats. Itertools functions work with any iterable, so you can use them on DataFrame columns, rows from iterrows(), or index values. However, pandas has its own optimized methods like groupby, merge, and concat that are usually faster for DataFrame operations. Use itertools when working with pure Python iterables or when pandas does not have an equivalent function.
How do I restart an exhausted iterator?
You cannot restart a consumed iterator. Instead, use itertools.tee() to create independent copies before consuming, store the data in a list if it fits in memory, or recreate the iterator from the original source. For generators, call the generator function again to get a fresh iterator.
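A short sketch of both options, using a hypothetical readings() generator as the one-shot data source: tee() splits a single iterator into two independent ones, and calling the generator function again produces a fresh iterator.

```python
from itertools import tee

def readings():
    """Hypothetical one-shot data source."""
    yield from [3, 7, 5]

# Split one iterator into two independent iterators before consuming it
first_pass, second_pass = tee(readings())
total = sum(first_pass)
highest = max(second_pass)

# Calling the generator function again gives a brand-new iterator
fresh_total = sum(readings())

print(total, highest, fresh_total)
```

Keep in mind that tee() buffers items that one copy has consumed and the other has not, so if one copy runs far ahead, a plain list may use less memory.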
When should I use product() versus nested for loops?
product() is cleaner and more readable than nested loops when you need all combinations of multiple iterables. It also makes it easy to dynamically change the number of nested dimensions. Use nested for loops when you need complex logic between iterations, early exits, or when only some combinations should be processed.
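As a sketch of the dynamic case, the number of "nested loops" below is decided at runtime by unpacking a list of iterables, and the repeat argument covers the case where the same iterable is looped several times. The option lists are made up for illustration.

```python
from itertools import product

# Hypothetical option lists; the list could have any length at runtime
axes = [range(2), "ab", [True, False]]
combos = list(product(*axes))
print(len(combos), combos[0])

# repeat=3 is equivalent to product([0, 1], [0, 1], [0, 1])
bit_patterns = list(product([0, 1], repeat=3))
print(bit_patterns[0], bit_patterns[-1])
```

Writing the same thing with explicit for loops would require changing the code every time the number of dimensions changes.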
Is itertools faster than list comprehensions?
For simple operations, list comprehensions and itertools have similar speed. The main advantage of itertools is memory efficiency rather than raw speed. When processing millions of items, itertools avoids building large intermediate lists, which prevents memory issues and can actually be faster due to reduced memory allocation overhead. For small datasets, the difference is negligible.
Conclusion
The itertools module transforms how you handle iteration in Python. You learned how infinite iterators like count() and cycle() create endless streams, how chain() and groupby() organize data efficiently, how combinatoric tools like product() and combinations() generate arrangements, and how islice() safely limits output from any iterator.
The data pipeline example showed how these tools compose naturally to build efficient processing chains. Start by replacing list concatenation with chain() and nested loops with product() in your existing code — those two changes will immediately make your code more readable and memory-efficient. As you get comfortable, explore groupby() and accumulate() for more advanced data processing patterns.
Python’s functools module is a treasure chest of higher-order functions that transform how you write and compose functions. Whether you need to cache expensive computations, create partial function applications, or build powerful decorators, functools provides elegant solutions that make your code cleaner and faster.
If you have ever written the same wrapper logic around multiple functions, or wished you could “freeze” some arguments into a function call, functools is exactly what you need. It sits in the standard library, so there is nothing extra to install — just import and go.
In this tutorial, you will master the most practical functools tools with hands-on examples you can use in real projects today. We will cover caching, partial application, function composition, comparison helpers, and more.
Quick Answer
The functools module provides higher-order functions for working with callable objects. The most commonly used tools are @lru_cache for memoization, partial() for freezing function arguments, reduce() for cumulative operations, and @wraps for building proper decorators. Import with from functools import lru_cache, partial, reduce, wraps.
Quick Example
from functools import lru_cache
@lru_cache(maxsize=128)
def fibonacci(n):
    if n < 2:
        return n
    return fibonacci(n - 1) + fibonacci(n - 2)
print(fibonacci(50))
print(fibonacci.cache_info())
Without the cache, computing fibonacci(50) takes roughly 40 billion recursive calls, which can run for hours in pure Python. With @lru_cache, it returns almost instantly because each unique input is computed only once.
What Is the Functools Module?
The functools module is part of Python's standard library and provides functions that act on or return other functions. The name stands for "function tools," and every utility in the module helps you work with callables more effectively.
Higher-order functions are functions that take other functions as arguments or return functions as results. This is a core concept in functional programming, and functools brings these ideas into Python in a practical, Pythonic way. You do not need to adopt a fully functional style — you can sprinkle these tools into your existing object-oriented or procedural code wherever they help.
The module has been part of Python since version 2.5 and has gained powerful additions over the years. Python 3.8 added cached_property, and Python 3.9 added cache, a simpler alias for unbounded LRU caching.
@lru_cache: when the same call returns the same result, remember it.
Core Functools Tools
lru_cache — Automatic Memoization
The @lru_cache decorator caches function results based on the arguments passed. LRU stands for "Least Recently Used," meaning that when the cache reaches its maximum size, the entries that have gone unused the longest are evicted first.
from functools import lru_cache
import time
@lru_cache(maxsize=256)
def expensive_lookup(user_id):
    # Simulate a slow database query
    time.sleep(0.5)
    return {"id": user_id, "name": f"User_{user_id}", "active": True}
# First call takes 0.5 seconds
start = time.time()
result = expensive_lookup(42)
print(f"First call: {time.time() - start:.3f}s -> {result}")
# Second call is instant (cached)
start = time.time()
result = expensive_lookup(42)
print(f"Cached call: {time.time() - start:.6f}s -> {result}")
# Check cache statistics
print(expensive_lookup.cache_info())
# Clear the cache when needed
expensive_lookup.cache_clear()
Important: All arguments to a cached function must be hashable (strings, numbers, tuples). You cannot cache functions that accept lists or dictionaries as arguments. Convert them to tuples or frozensets first.
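A minimal sketch of the conversion, using a hypothetical total_price() function: passing a list fails with TypeError, while the same data as a tuple is cached normally.

```python
from functools import lru_cache

@lru_cache(maxsize=128)
def total_price(prices):
    """Sum a tuple of prices (hypothetical helper for illustration)."""
    return sum(prices)

cart = [5, 10, 25]  # a list is unhashable, so it cannot be a cache key

try:
    total_price(cart)
    list_was_rejected = False
except TypeError:
    list_was_rejected = True

# Converting to a tuple makes the argument hashable
cached_total = total_price(tuple(cart))
print(list_was_rejected, cached_total)
```

The conversion has a cost of its own, so for very large inputs it can be worth caching on a smaller key such as an ID instead.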
cache — Unbounded Memoization
Python 3.9 introduced @cache as a simpler alternative to @lru_cache(maxsize=None). It caches every unique call forever, which is perfect when you know the set of possible inputs is bounded.
from functools import cache
@cache
def factorial(n):
    if n <= 1:
        return 1
    return n * factorial(n - 1)
print(factorial(10))
print(factorial(20)) # Reuses cached results for 1-10
print(factorial.cache_info())
The partial() function creates a new callable with some arguments pre-filled. This is incredibly useful when you need to pass a function somewhere that expects fewer arguments than your function takes.
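Here is a minimal sketch of partial() in action. The helpers power() and format_log() are hypothetical names chosen for illustration, not functions from any library.

```python
from functools import partial

def power(base, exponent):
    """Hypothetical helper: raise base to exponent."""
    return base ** exponent

# Freeze the exponent to get specialized one-argument functions
square = partial(power, exponent=2)
cube = partial(power, exponent=3)
print(square(5))
print(cube(4))

def format_log(level, service, message):
    """Hypothetical helper: build a log line."""
    return f"[{level}] {service}: {message}"

# Freeze the first two positional arguments; only the message remains
auth_info = partial(format_log, "INFO", "AUTH")
auth_warning = partial(format_log, "WARNING", "AUTH")
db_error = partial(format_log, "ERROR", "DATABASE")
print(auth_info("User logged in"))
print(auth_warning("Failed login attempt"))
print(db_error("Connection timeout"))
```

Running this sketch prints: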
25
64
[INFO] AUTH: User logged in
[WARNING] AUTH: Failed login attempt
[ERROR] DATABASE: Connection timeout
reduce — Cumulative Operations
The reduce() function applies a two-argument function cumulatively to the items of a sequence, reducing it to a single value. It processes items left to right, carrying the result forward at each step.
from functools import reduce
# Sum of all numbers (same as built-in sum)
numbers = [1, 2, 3, 4, 5]
total = reduce(lambda a, b: a + b, numbers)
print(f"Sum: {total}")
# Find maximum value (same as built-in max)
largest = reduce(lambda a, b: a if a > b else b, numbers)
print(f"Max: {largest}")
# Flatten nested lists
nested = [[1, 2], [3, 4], [5, 6]]
flat = reduce(lambda a, b: a + b, nested)
print(f"Flattened: {flat}")
# Build a dictionary from pairs
pairs = [("name", "Alice"), ("age", 30), ("city", "Melbourne")]
result = reduce(lambda d, pair: {**d, pair[0]: pair[1]}, pairs, {})
print(f"Dict: {result}")
Tip: While reduce is powerful, Python's built-in functions like sum(), max(), min(), and any() are more readable for common cases. Use reduce when you need a custom accumulation pattern that does not have a built-in equivalent.
wraps — Build Proper Decorators
When you write a decorator, the wrapper function replaces the original function's metadata (name, docstring, signature). The @wraps decorator preserves this metadata, which is essential for debugging and documentation tools.
from functools import wraps
import time
def timing_decorator(func):
    @wraps(func)  # Preserves func's __name__, __doc__, etc.
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        print(f"{func.__name__} took {elapsed:.4f}s")
        return result
    return wrapper
@timing_decorator
def process_data(items):
    """Process a list of items and return the total."""
    return sum(items)
# The function metadata is preserved
print(f"Name: {process_data.__name__}")
print(f"Doc: {process_data.__doc__}")
print(f"Result: {process_data(range(1000000))}")
Output:
Name: process_data
Doc: Process a list of items and return the total.
process_data took 0.0234s
Result: 499999500000
cached_property — One-Time Computed Attributes
The @cached_property decorator turns a method into a property that is computed once and then cached as a normal attribute. This is perfect for expensive calculations that do not change after the object is created.
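The following is a sketch under stated assumptions: DataAnalyzer is a hypothetical class, and the sample standard deviation from the statistics module is an illustrative choice. Each property prints a message when it is actually computed, so you can see that repeated access does not recompute anything.

```python
from functools import cached_property
import statistics

class DataAnalyzer:
    """Hypothetical class for illustration."""

    def __init__(self, data):
        self.data = list(data)

    @cached_property
    def mean(self):
        print("Computing mean...")
        return statistics.mean(self.data)

    @cached_property
    def std_dev(self):
        print("Computing standard deviation...")
        return round(statistics.stdev(self.data), 3)

    @cached_property
    def summary(self):
        print("Building summary...")
        return {
            "count": len(self.data),
            "mean": self.mean,        # already cached, no recomputation
            "std_dev": self.std_dev,  # computed here on first access
            "min": min(self.data),
            "max": max(self.data),
        }

analyzer = DataAnalyzer(range(1, 10001))
print(analyzer.mean)   # computed on first access
print(analyzer.mean)   # served from the instance attribute
print(analyzer.summary)
```

Running it should print output along these lines: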
Computing mean...
5000.5
5000.5
Building summary...
Computing standard deviation...
{'count': 10000, 'mean': 5000.5, 'std_dev': 2886.896, 'min': 1, 'max': 10000}
total_ordering — Complete Comparison Methods
The @total_ordering class decorator lets you define just __eq__ and one ordering method (__lt__, __le__, __gt__, or __ge__), and it fills in the rest automatically.
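As a minimal sketch, the hypothetical Version class below defines only __eq__ and __lt__, and the decorator supplies <=, >, and >=.

```python
from functools import total_ordering

@total_ordering
class Version:
    """Hypothetical two-part version number for illustration."""

    def __init__(self, major, minor):
        self.major = major
        self.minor = minor

    def _key(self):
        return (self.major, self.minor)

    def __eq__(self, other):
        if not isinstance(other, Version):
            return NotImplemented
        return self._key() == other._key()

    def __lt__(self, other):
        if not isinstance(other, Version):
            return NotImplemented
        return self._key() < other._key()

# These operators were never written by hand
print(Version(1, 2) <= Version(1, 3))
print(Version(2, 0) > Version(1, 9))
print(Version(1, 0) >= Version(1, 0))

newest = max([Version(1, 4), Version(2, 1), Version(1, 9)])
print(newest.major, newest.minor)
```

The generated methods are somewhat slower than hand-written ones, so for performance-critical comparisons it can still be worth defining all of them explicitly.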
Real-Life Project: Building a Plugin System with Functools
Let us build a practical plugin system for a web application that uses several functools features together. This system registers handler functions, caches their results, and supports partial configuration.
from functools import wraps, partial, lru_cache, reduce
from collections import defaultdict
import time
import json
class PluginRegistry:
    """A plugin system using functools for caching and composition."""
    def __init__(self):
        self._plugins = defaultdict(list)
        self._middleware = []
    def register(self, event_type, priority=0):
        """Decorator to register a handler for an event type."""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                return func(*args, **kwargs)
            wrapper._priority = priority
            self._plugins[event_type].append(wrapper)
            self._plugins[event_type].sort(
                key=lambda f: f._priority, reverse=True
            )
            # A new handler makes cached handler tuples stale, so drop them
            self.get_handlers.cache_clear()
            return wrapper
        return decorator
    def add_middleware(self, middleware_func):
        """Add a middleware function that processes all events."""
        self._middleware.append(middleware_func)
    @lru_cache(maxsize=64)
    def get_handlers(self, event_type):
        """Get cached tuple of handlers for an event type."""
        return tuple(self._plugins.get(event_type, []))
    def emit(self, event_type, data):
        """Emit an event through middleware then to handlers."""
        # Apply middleware chain using reduce
        processed = reduce(
            lambda d, mw: mw(event_type, d),
            self._middleware,
            data
        )
        # Use the cached lookup instead of reading the dict directly
        handlers = self.get_handlers(event_type)
        results = []
        for handler in handlers:
            result = handler(processed)
            if result is not None:
                results.append(result)
        return results
# Create registry and register plugins
registry = PluginRegistry()
# Middleware: add timestamp to all events
def timestamp_middleware(event_type, data):
    return {**data, "timestamp": time.time()}
# Middleware: log all events
def logging_middleware(event_type, data):
    print(f" [LOG] Event '{event_type}' with keys: {list(data.keys())}")
    return data
registry.add_middleware(timestamp_middleware)
registry.add_middleware(logging_middleware)
@registry.register("user.login", priority=10)
def validate_login(data):
    """Check if the user credentials are valid."""
    if data.get("username") and data.get("password"):
        return {"status": "validated", "user": data["username"]}
    return {"status": "invalid"}
@registry.register("user.login", priority=5)
def record_login(data):
    """Record the login attempt for analytics."""
    return {"recorded": True, "user": data.get("username")}
@registry.register("data.transform", priority=10)
def normalize_data(data):
    """Normalize string fields to lowercase."""
    return {
        k: v.lower() if isinstance(v, str) else v
        for k, v in data.items()
    }
# Using partial to create pre-configured event emitters
emit_login = partial(registry.emit, "user.login")
emit_transform = partial(registry.emit, "data.transform")
# Emit events
print("Login event:")
results = emit_login({"username": "alice", "password": "secret123"})
for r in results:
    print(f" Result: {r}")
print("\nTransform event:")
results = emit_transform({"Name": "BOB", "City": "MELBOURNE", "age": 25})
for r in results:
    print(f" Result: {r}")
partial(): freeze arguments and pass the function. Currying, almost.
Common Pitfalls and Troubleshooting
Problem | Cause | Solution
TypeError: unhashable type with lru_cache | Passing a list or dict as argument to cached function | Convert to tuple or frozenset before passing
Memory growing with @cache | Unbounded cache stores every unique call | Use @lru_cache(maxsize=N) to limit cache size
cached_property not updating | Value computed once and stored as attribute | Delete the attribute with del obj.prop to force recompute
Decorated function loses metadata | Missing @wraps in decorator | Add @wraps(func) to every decorator wrapper
reduce gives unexpected result | Missing initial value argument | Pass initializer as third argument to reduce()
partial kwargs overridden | Caller passes same keyword argument | Document which args are frozen or use positional args
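The last two rows can be reproduced with a short sketch. The connect() function and its host name are hypothetical; the point is that reduce() on an empty sequence fails without an initializer, and that a keyword frozen by partial() can still be overridden by the caller.

```python
from functools import partial, reduce
import operator

# reduce() on an empty sequence with no initializer raises TypeError
try:
    reduce(operator.add, [])
    empty_failed = False
except TypeError:
    empty_failed = True

# With an initializer, the empty case returns the initializer itself
safe_total = reduce(operator.add, [], 0)
print(empty_failed, safe_total)

def connect(host, port=5432, timeout=10):
    """Hypothetical helper that only formats its arguments."""
    return f"{host}:{port} (timeout={timeout}s)"

quick_connect = partial(connect, timeout=2)
default_call = quick_connect("db.local")
overridden = quick_connect("db.local", timeout=30)  # caller wins over the frozen value
print(default_call)
print(overridden)
```

If an argument must never be overridden, a small wrapper function that ignores the caller's value is more robust than partial().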
Frequently Asked Questions
What is the difference between @cache and @lru_cache?
@cache is equivalent to @lru_cache(maxsize=None). It stores every unique call result forever without evicting old entries. Use @lru_cache(maxsize=N) when you want to limit memory usage by keeping only the N most recent unique results. For most applications, @lru_cache with a reasonable maxsize is the safer choice.
Can I use lru_cache on class methods?
Yes, but with a caveat. The self parameter becomes part of the cache key, so each instance gets its own cache entries, and the cache holds a strong reference to every instance it has seen, which can keep those objects alive longer than expected. For instance-level caching, consider @cached_property instead. For class-level caching, use a module-level function or a custom descriptor.
Is functools.reduce the same as a for loop?
Functionally, yes — reduce performs the same cumulative operation you could write with a for loop. However, reduce expresses the intent more declaratively. Use reduce when the accumulation pattern is clear and concise. If the logic is complex or needs multiple lines, a regular for loop is more readable and Pythonic.
How do I clear a cached_property value?
Delete the attribute using del instance.property_name. The next access will recompute and re-cache the value. This works because cached_property stores the result as a regular instance attribute that shadows the descriptor.
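A minimal sketch of that invalidation step, using a hypothetical Report class:

```python
from functools import cached_property

class Report:
    """Hypothetical class for illustration."""

    def __init__(self, rows):
        self.rows = rows

    @cached_property
    def total(self):
        return sum(self.rows)

report = Report([1, 2, 3])
first = report.total      # computed and stored on the instance
report.rows.append(4)
stale = report.total      # still the cached value

del report.total          # remove the cached instance attribute
refreshed = report.total  # recomputed from the current rows
print(first, stale, refreshed)
```

Calling del on a property that has not been accessed yet raises AttributeError, so guard the deletion if you cannot be sure the value was ever computed.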
When should I use partial instead of a lambda?
Use partial() when you want to freeze arguments of an existing function. It states the intent clearly, keeps the original callable and the frozen arguments inspectable through its func, args, and keywords attributes, and works better with pickling. Use a lambda when you need a quick inline expression or when the logic goes beyond simple argument freezing. In general, partial is preferred for configuration-style currying.
@wraps: keep the metadata when you wrap a function. Use it always.
Conclusion
The functools module gives you powerful tools that make Python functions more flexible and efficient. You learned how @lru_cache and @cache can dramatically speed up expensive or recursive functions, how partial() creates specialized versions of general functions, how reduce() handles cumulative operations, and how @wraps keeps your decorators well-behaved.
These tools work beautifully together, as the plugin system example showed. Start by adding @lru_cache to your most expensive functions and @wraps to your decorators — those two changes alone will improve most Python projects. From there, explore partial() and cached_property as your needs grow.
partial freezes some arguments; the resulting callable accepts the rest. Perfect for adapting a function's signature to a callback that expects fewer arguments.
@lru_cache turns any pure function into a memoized one. Calls with the same arguments return the cached result. maxsize=None for unbounded; finite values evict least-recently-used entries.
@cache: lru_cache With Unbounded Memory
from functools import cache
@cache
def fibonacci(n):
    if n < 2:
        return n
    return fibonacci(n-1) + fibonacci(n-2)
print(fibonacci(100)) # instant, even though recursive
@cache (Python 3.9+) is shorthand for @lru_cache(maxsize=None): same behavior, simpler API. Because it never evicts entries, it is also faster than an lru_cache with a size limit.
reduce(): Fold a Sequence
from functools import reduce
# Sum of a list (just use sum())
total = reduce(lambda acc, x: acc + x, [1, 2, 3, 4, 5]) # 15
# Product
import operator
product = reduce(operator.mul, [1, 2, 3, 4, 5]) # 120
# Max
maximum = reduce(lambda a, b: a if a > b else b, [5, 3, 8, 1]) # 8
# Build a dict from key-value pairs
pairs = [("a", 1), ("b", 2), ("c", 3)]
d = reduce(lambda acc, kv: {**acc, kv[0]: kv[1]}, pairs, {})
reduce is powerful but often overkill. For common operations (sum, max, min) Python has builtins. Reach for reduce only when the operation doesn't have a standard equivalent.
@wraps: Preserve Decorated Function Metadata
from functools import wraps
import time
def timed(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        t0 = time.perf_counter()
        result = func(*args, **kwargs)
        print(f"{func.__name__} took {time.perf_counter()-t0:.4f}s")
        return result
    return wrapper
@timed
def slow_function(x):
    """Does something slow."""
    time.sleep(0.1)
    return x
slow_function(5)
print(slow_function.__name__) # 'slow_function' (not 'wrapper')
print(slow_function.__doc__) # 'Does something slow.' (preserved)
Without @wraps, your decorated function's __name__ and __doc__ become 'wrapper' and None — breaks help(), Sphinx, inspect. Always use @wraps on decorator inner functions.
Python's answer to method overloading. The right implementation runs based on the FIRST argument's type. Useful for serializers, formatters, anything that's polymorphic.
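A minimal sketch of singledispatch, with a hypothetical describe() function: the undecorated body is the fallback, and each registered implementation is selected by the type of the first argument.

```python
from functools import singledispatch

@singledispatch
def describe(value):
    """Fallback used when no more specific implementation is registered."""
    return f"object: {value!r}"

@describe.register
def _(value: int):
    # Chosen when the first argument is an int (the type comes from the annotation)
    return f"integer: {value}"

@describe.register
def _(value: list):
    # Chosen when the first argument is a list
    return f"list of {len(value)} items"

print(describe(3))
print(describe([1, 2]))
print(describe("hi"))
```

Dispatch follows the class hierarchy, so a bool argument would use the int implementation because bool is a subclass of int.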
Common Pitfalls
lru_cache on mutable arguments. Arguments must be hashable. Passing a list, dict, or set raises TypeError (unhashable type). Convert to tuple/frozenset/string before calling.
Method caching memory leak. @lru_cache on a method captures self in the cache key, preventing instance garbage collection. Use cached_property for per-instance values that take no arguments.
Decorator without @wraps. Loses function metadata. Always use @wraps inside decorator definitions.
Mutating cached return. @cache returns a reference, not a copy. Modifying the result corrupts the cache. Make defensive copies if the caller might mutate.
singledispatch on positional args. Only dispatches on the FIRST argument. For multi-argument dispatch, use plum or multipledispatch.
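The mutation pitfall is the least obvious one in this list, so here is a small sketch of it. The default_settings() function is hypothetical; it requires Python 3.9+ for @cache.

```python
from copy import deepcopy
from functools import cache

@cache
def default_settings():
    """Hypothetical cached factory that returns a mutable dict."""
    return {"retries": 3}

# Mutating the returned object changes what the cache hands out next time
settings = default_settings()
settings["retries"] = 99
corrupted = default_settings()["retries"]

# Start over and copy before mutating; the cached object stays intact
default_settings.cache_clear()
safe = deepcopy(default_settings())
safe["retries"] = 99
intact = default_settings()["retries"]

print(corrupted, intact)
```

Returning immutable values such as tuples or frozen dataclasses from cached functions avoids the problem without any copying.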
FAQ
Q: lru_cache or cachetools? A: lru_cache for simple cases (one function, in-memory, no TTL). cachetools when you need TTLs, custom eviction policies, or thread safety beyond stdlib.
Q: Will lru_cache leak memory? A: With finite maxsize, no — evicts LRU entries. With maxsize=None (or @cache), can grow unbounded. Monitor with cache_info().
Q: Use partial vs lambda? A: partial when you want a picklable, named-keyword version (useful for multiprocessing). lambda when it's a one-liner used immediately.
Q: reduce vs explicit loop? A: Loop is usually clearer. reduce shines for chain operations where the loop body is short and the accumulator is the whole story.
Q: cached_property when? A: For per-instance memoization of expensive properties — first access computes, subsequent accesses return cached. Cleanly garbage-collects with the instance.
Wrapping Up
functools is a toolbox of higher-order primitives. partial for argument freezing; lru_cache/cache for memoization; wraps for decorator hygiene; singledispatch for type-based polymorphism; reduce for folds. Master these and your Python becomes more expressive without becoming clever-for-clever's-sake.
Every Python developer learns to use with open('file.txt') as f early on, but few explore the full power of context managers beyond file handling. If you have ever needed to manage database connections, acquire locks, temporarily change settings, or ensure cleanup code always runs, the contextlib module is your toolkit. It turns complex resource management into clean, readable code.
The contextlib module is part of Python’s standard library, so there is nothing to install. It provides decorators and utilities that let you create context managers without writing a full class with __enter__ and __exit__ methods. The most powerful tool is @contextmanager, which turns a simple generator function into a fully functional context manager.
In this tutorial, you will learn how to create custom context managers with @contextmanager, manage multiple resources with ExitStack, suppress specific exceptions cleanly, redirect output streams, and build reusable resource management patterns. By the end, you will write context managers as naturally as you write regular functions.
Custom Context Manager: Quick Example
Here is how to create a timing context manager that measures how long a block of code takes to run:
# quick_timer.py
from contextlib import contextmanager
import time

@contextmanager
def timer(label="Block"):
    start = time.perf_counter()
    yield
    elapsed = time.perf_counter() - start
    print(f"{label} took {elapsed:.4f} seconds")

# Use it
with timer("Data processing"):
    total = sum(range(1_000_000))
    print(f"Sum: {total}")
Output:
Sum: 499999500000
Data processing took 0.0312 seconds
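The @contextmanager decorator transforms the generator function into a context manager. Everything before yield runs on entry (like __enter__), and everything after yield runs on exit (like __exit__) when the block finishes normally. If the block raises, the exception is re-raised at the yield, so the code after it is skipped unless you wrap the yield in try/finally. The yield itself is where your with block executes.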
What is contextlib and Why Use It?
Context managers in Python follow the protocol defined by the __enter__ and __exit__ magic methods. Any object with these two methods can be used with the with statement. The contextlib module provides shortcuts that save you from writing boilerplate class definitions for simple use cases.
Without contextlib, creating a context manager requires a full class with two methods. With it, you can do the same thing in a few lines using a generator function. This matters because context managers are everywhere in professional Python code — they manage database transactions, HTTP sessions, temporary files, thread locks, and any resource that needs guaranteed cleanup.
Approach | Lines of Code | Best For
Class with __enter__/__exit__ | 10-20 lines | Complex state management, reusable libraries
@contextmanager decorator | 5-10 lines | Simple setup/teardown, one-off utilities
ExitStack | 3-5 lines | Dynamic number of resources, conditional cleanup
The rule of thumb: if your context manager is straightforward setup-yield-cleanup, use @contextmanager. If it needs to manage dynamic or conditional resources, reach for ExitStack.
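For comparison, here is a sketch of a timing context manager written as a class rather than a generator. The class name Timer and its elapsed attribute are illustrative choices, not part of contextlib.

```python
import time

class Timer:
    """Class-based timing context manager."""

    def __init__(self, label="Block"):
        self.label = label
        self.elapsed = None

    def __enter__(self):
        self.start = time.perf_counter()
        return self  # this is the value bound by "as"

    def __exit__(self, exc_type, exc_value, traceback):
        # Runs whether or not the block raised
        self.elapsed = time.perf_counter() - self.start
        print(f"{self.label} took {self.elapsed:.4f} seconds")
        return False  # do not suppress exceptions from the block

with Timer("Class-based") as t:
    total = sum(range(100_000))
```

The class version is longer, but it keeps the measurement available afterwards as t.elapsed, which is the kind of state a class makes easy to expose.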
The @contextmanager Decorator
The @contextmanager decorator is the workhorse of the module. It takes a generator that yields exactly once and turns it into a context manager. Here is a practical example that temporarily changes the working directory:
# change_dir.py
from contextlib import contextmanager
import os

@contextmanager
def change_directory(path):
    """Temporarily change the working directory."""
    original = os.getcwd()
    try:
        os.chdir(path)
        yield path
    finally:
        os.chdir(original)

# Usage
print(f"Before: {os.getcwd()}")
with change_directory("/tmp") as new_dir:
    print(f"Inside: {os.getcwd()}")
    print(f"Yielded: {new_dir}")
print(f"After: {os.getcwd()}")
The try/finally block inside the generator ensures cleanup happens even if an exception occurs within the with block. Whatever you pass to yield becomes the value bound by as in the with statement.
Handling Errors in Context Managers
When an exception occurs inside a with block, it gets thrown into the generator at the yield point. You can catch it, handle it, or let it propagate:
# error_handling.py
from contextlib import contextmanager

@contextmanager
def safe_operation(name):
    """Context manager that logs errors without suppressing them."""
    print(f"Starting: {name}")
    try:
        yield
    except Exception as e:
        print(f"Error in {name}: {type(e).__name__}: {e}")
        raise  # Re-raise to let caller handle it
    finally:
        print(f"Finished: {name}")

# Normal usage
with safe_operation("calculation"):
    result = 42 / 2
    print(f"Result: {result}")
print("---")

# With an error
try:
    with safe_operation("bad calculation"):
        result = 42 / 0
except ZeroDivisionError:
    print("Caught the error outside")
Output:
Starting: calculation
Result: 21.0
Finished: calculation
---
Starting: bad calculation
Error in bad calculation: ZeroDivisionError: division by zero
Finished: bad calculation
Caught the error outside
The finally block guarantees cleanup runs whether the operation succeeds or fails. If you want to suppress the exception (prevent it from propagating), do not re-raise it — but be careful, as silently swallowing exceptions makes debugging difficult.
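As a sketch of the suppressing variant described above: when the generator catches the exception and does not re-raise it, the with statement treats the error as handled. The helper name swallow and the log list are illustrative.

```python
from contextlib import contextmanager

@contextmanager
def swallow(exc_type, log):
    """Suppress exc_type raised in the with block and record it in log."""
    try:
        yield
    except exc_type as e:
        # Not re-raising tells the with statement the error was handled
        log.append(f"suppressed {type(e).__name__}")

errors = []
with swallow(ZeroDivisionError, errors):
    1 / 0
    errors.append("never reached")  # skipped: the block stops at the error

print(errors)
```

Execution continues after the with block, and the rest of the block body is skipped. Recording the error somewhere, as done here, keeps the suppression from being completely silent.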
Managing Multiple Resources with ExitStack
When you need to manage a variable number of resources — like opening multiple files determined at runtime — ExitStack is the right tool:
# exitstack_demo.py
from contextlib import ExitStack
import tempfile
import os

def process_multiple_files(filenames):
    """Open and process multiple files safely."""
    with ExitStack() as stack:
        # Open all files -- ExitStack closes them all on exit
        files = [
            stack.enter_context(open(f, 'w'))
            for f in filenames
        ]
        # Write to each file
        for i, f in enumerate(files):
            f.write(f"Content for file {i}\n")
            print(f"Wrote to {filenames[i]}")
        print(f"All {len(files)} files open simultaneously")
    # All files are closed here, even if an error occurred
    print("All files closed")

# Create temp files for demo
temp_dir = tempfile.mkdtemp()
filenames = [os.path.join(temp_dir, f"file_{i}.txt") for i in range(3)]
process_multiple_files(filenames)

# Verify files are written and closed
for f in filenames:
    # Use a with block here too so the read handle is closed promptly
    with open(f) as fh:
        print(f"{os.path.basename(f)}: {fh.read().strip()}")
Output:
Wrote to /tmp/tmpXXXXXX/file_0.txt
Wrote to /tmp/tmpXXXXXX/file_1.txt
Wrote to /tmp/tmpXXXXXX/file_2.txt
All 3 files open simultaneously
All files closed
file_0.txt: Content for file 0
file_1.txt: Content for file 1
file_2.txt: Content for file 2
The key advantage of ExitStack is that it handles cleanup for all registered resources, even if opening a later resource fails. Without it, you would need deeply nested with statements or manual cleanup logic.
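The cleanup-on-partial-failure behavior can be sketched without touching the filesystem. The resource() helper below is hypothetical; it only records when it is acquired and released, and the third resource is forced to fail.

```python
from contextlib import ExitStack, contextmanager

events = []

@contextmanager
def resource(name, fail=False):
    """Hypothetical resource that records acquire/release events."""
    if fail:
        raise OSError(f"cannot acquire {name}")
    events.append(f"acquire {name}")
    try:
        yield name
    finally:
        events.append(f"release {name}")

try:
    with ExitStack() as stack:
        stack.enter_context(resource("a"))
        stack.enter_context(resource("b"))
        stack.enter_context(resource("c", fail=True))  # raises OSError
except OSError as e:
    events.append(f"error: {e}")

print(events)
```

The resources that were already entered are released in reverse order (b, then a) before the OSError reaches the except clause.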
Suppressing Exceptions with suppress()
Sometimes you want to ignore specific exceptions cleanly. Instead of writing try/except: pass, use contextlib.suppress():
# suppress_demo.py
from contextlib import suppress
import os

# Without suppress (verbose)
try:
    os.remove("nonexistent_file.txt")
except FileNotFoundError:
    pass

# With suppress (clean)
with suppress(FileNotFoundError):
    os.remove("nonexistent_file.txt")

# Multiple exception types
with suppress(FileNotFoundError, PermissionError):
    os.remove("/protected/file.txt")

print("Cleanup complete -- no crashes")
Output:
Cleanup complete -- no crashes
Use suppress() when you genuinely do not care about the exception — like deleting a file that might not exist, or disconnecting a client that might already be disconnected. Do not use it to hide errors you should be handling.
Redirecting Output with redirect_stdout
The redirect_stdout and redirect_stderr context managers temporarily redirect output streams. This is useful for capturing output from third-party libraries or silencing noisy functions:
# redirect_demo.py
from contextlib import redirect_stdout, redirect_stderr
import io
import warnings

# Capture stdout to a string
buffer = io.StringIO()
with redirect_stdout(buffer):
    print("This goes to the buffer")
    print("So does this")
captured = buffer.getvalue()
print(f"Captured {len(captured)} characters:")
print(repr(captured))

# Silence stderr
with redirect_stderr(io.StringIO()):
    warnings.warn("This warning is silenced")
Output:
Captured 37 characters:
'This goes to the buffer\nSo does this\n'
This pattern is especially useful in testing, where you need to verify what a function prints without modifying the function itself.
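A minimal sketch of that testing pattern follows. Both greet() and capture_output() are hypothetical names introduced for illustration; the only contextlib call used is redirect_stdout.

```python
import io
from contextlib import redirect_stdout

def greet(name):
    """Hypothetical function under test that prints instead of returning."""
    print(f"Hello, {name}!")

def capture_output(func, *args, **kwargs):
    """Run func and return everything it wrote to stdout."""
    buffer = io.StringIO()
    with redirect_stdout(buffer):
        func(*args, **kwargs)
    return buffer.getvalue()

output = capture_output(greet, "Ada")
assert output == "Hello, Ada!\n"
```

Note that redirect_stdout swaps sys.stdout for the whole process, so this approach suits single-threaded tests better than concurrent code.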
Real-Life Example: Database Transaction Manager
Let us build a practical transaction manager that handles database connections, commits on success, and rolls back on failure — all with proper resource cleanup:
# transaction_manager.py
from contextlib import contextmanager, ExitStack
import sqlite3
import os
import tempfile

@contextmanager
def database_connection(db_path):
    """Manage a database connection lifecycle."""
    conn = sqlite3.connect(db_path)
    try:
        yield conn
    finally:
        conn.close()

@contextmanager
def transaction(conn):
    """Manage a database transaction with auto-commit/rollback."""
    cursor = conn.cursor()
    try:
        yield cursor
        conn.commit()
        print("Transaction committed")
    except Exception as e:
        conn.rollback()
        print(f"Transaction rolled back: {e}")
        raise

@contextmanager
def managed_database(db_path):
    """Complete database session with connection and transaction."""
    with ExitStack() as stack:
        conn = stack.enter_context(database_connection(db_path))
        cursor = stack.enter_context(transaction(conn))
        yield cursor

# Demo
db_path = os.path.join(tempfile.mkdtemp(), "demo.db")

# Successful transaction
with managed_database(db_path) as cursor:
    cursor.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, email TEXT)")
    cursor.execute("INSERT INTO users (name, email) VALUES (?, ?)", ("Alice", "alice@example.com"))
    cursor.execute("INSERT INTO users (name, email) VALUES (?, ?)", ("Bob", "bob@example.com"))

# Read back
with managed_database(db_path) as cursor:
    cursor.execute("SELECT * FROM users")
    for row in cursor.fetchall():
        print(f"  User: {row}")

# Failed transaction (rollback)
print("\nAttempting bad insert:")
try:
    with managed_database(db_path) as cursor:
        cursor.execute("INSERT INTO users (name, email) VALUES (?, ?)", ("Charlie", "charlie@example.com"))
        raise ValueError("Simulated error -- transaction should rollback")
except ValueError:
    pass

# Verify rollback worked
with managed_database(db_path) as cursor:
    cursor.execute("SELECT COUNT(*) FROM users")
    count = cursor.fetchone()[0]
print(f"Users after rollback: {count} (Charlie was NOT added)")
Output:
Transaction committed
User: (1, 'Alice', 'alice@example.com')
User: (2, 'Bob', 'bob@example.com')
Transaction committed
Attempting bad insert:
Transaction rolled back: Simulated error -- transaction should rollback
Transaction committed
Users after rollback: 2 (Charlie was NOT added)
This example composes three context managers: database_connection handles the connection lifecycle, transaction handles commit/rollback logic, and managed_database combines both using ExitStack. The composition pattern keeps each context manager focused on a single responsibility while providing a convenient combined interface.
Frequently Asked Questions
When should I use a class-based context manager instead of @contextmanager?
Use a class when you need to store state between __enter__ and __exit__, when the context manager will be reused as a library component, or when you need the __exit__ method’s exception arguments to make decisions about exception handling. Use @contextmanager for simple setup-teardown patterns where a generator function is more readable.
Does contextlib work with async code?
Yes. Python 3.7+ includes contextlib.asynccontextmanager for async generators, and AsyncExitStack for managing async resources. The patterns are identical — just use async with instead of with and async def instead of def.
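A small sketch of the async variant, using only the standard library. The fake_connection() resource is hypothetical, and the asyncio.sleep(0) calls merely stand in for real asynchronous I/O.

```python
import asyncio
from contextlib import asynccontextmanager

log = []

@asynccontextmanager
async def fake_connection(name):
    """Hypothetical async resource; the sleeps stand in for real I/O."""
    await asyncio.sleep(0)
    log.append(f"open {name}")
    try:
        yield name
    finally:
        await asyncio.sleep(0)
        log.append(f"close {name}")

async def main():
    async with fake_connection("db") as conn:
        log.append(f"using {conn}")

asyncio.run(main())
print(log)
```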
Can I nest context managers?
Yes, and there are multiple ways: nested with statements, comma-separated managers in a single with statement (with A() as a, B() as b:), or ExitStack for dynamic nesting. The comma syntax is preferred for a fixed number of managers; ExitStack is preferred when the number is determined at runtime.
Can I reuse a context manager instance?
It depends. @contextmanager-based managers are single-use: entering the same instance a second time fails with an exception (documented as RuntimeError: generator didn't yield). Class-based managers can be made reusable by resetting state in __enter__. If you need to use a generator-based manager again, call the decorated function again to get a fresh instance.
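The single-use behavior is easy to observe. In this sketch, session() is an illustrative name, and the except clause deliberately accepts AttributeError as well as RuntimeError, on the assumption that the exact exception type may differ between Python versions.

```python
from contextlib import contextmanager

@contextmanager
def session():
    yield "resource"

manager = session()
with manager:
    pass  # first use works

reused = None
try:
    with manager:  # same instance again: its generator is already exhausted
        pass
    reused = True
except (RuntimeError, AttributeError):
    reused = False

# Calling the decorated function again yields a fresh, usable manager
with session() as value:
    fresh_ok = (value == "resource")

print(reused, fresh_ok)
```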
What is contextlib.closing() for?
The closing() wrapper calls .close() on an object when the with block exits. Use it for objects that have a close() method but do not implement the context manager protocol natively — like urllib.request.urlopen() in older Python versions or custom connection objects.
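Here is a sketch of closing() with a stand-in object. LegacyConnection is a hypothetical class that has close() but no __enter__/__exit__ methods.

```python
from contextlib import closing

class LegacyConnection:
    """Hypothetical object with close() but no context manager protocol."""

    def __init__(self):
        self.is_open = True

    def query(self):
        return "result"

    def close(self):
        self.is_open = False

with closing(LegacyConnection()) as conn:
    result = conn.query()

# close() was called automatically when the block exited
print(result, conn.is_open)
```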
Conclusion
The contextlib module transforms resource management from boilerplate-heavy class definitions into clean, expressive patterns. We covered @contextmanager for generator-based managers, ExitStack for dynamic resource composition, suppress() for clean exception ignoring, and redirect_stdout for output capture. The transaction manager example showed how these tools compose into production-ready patterns.
Try extending the database transaction manager with connection pooling, nested savepoints, or retry logic. For the complete API reference, see the official contextlib documentation.
If you have ever built a chat application, a live dashboard, or a multiplayer game, you know that regular HTTP requests fall short when you need real-time, bidirectional communication. Polling the server every few seconds wastes bandwidth and adds latency. WebSockets solve this by keeping a persistent connection open between client and server, allowing both sides to send messages at any time.
Python’s websockets library makes building WebSocket servers and clients remarkably straightforward. It is built on top of asyncio, so it integrates naturally with Python’s async ecosystem. You only need to install one package — pip install websockets — and you are ready to go.
In this tutorial, you will learn how to create a WebSocket server, build a client that connects to it, implement broadcast messaging for chat-style applications, handle connection lifecycle events, and add basic authentication. By the end, you will have a working real-time chat server that you can extend for your own projects.
WebSocket Echo Server: Quick Example
Here is the simplest possible WebSocket server — it echoes back whatever the client sends:
# echo_server.py
import asyncio
import websockets

async def echo(websocket):
    async for message in websocket:
        print(f"Received: {message}")
        await websocket.send(f"Echo: {message}")

async def main():
    async with websockets.serve(echo, "localhost", 8765):
        print("Echo server running on ws://localhost:8765")
        await asyncio.Future()  # Run forever

asyncio.run(main())
Output (server):
Echo server running on ws://localhost:8765
Received: Hello WebSocket!
Received: How are you?
The websockets.serve() function starts a server that calls the echo handler for every new connection. The async for message in websocket pattern automatically handles the connection lifecycle — it reads messages until the client disconnects, then exits cleanly. We will build on this pattern throughout the tutorial.
What Are WebSockets and Why Use Them?
WebSockets are a communication protocol that provides full-duplex (two-way) communication channels over a single TCP connection. Unlike HTTP, where the client must initiate every request, WebSockets allow either side to send data at any time after the initial handshake.
The WebSocket protocol starts with an HTTP upgrade request. The client sends a regular HTTP request with an Upgrade: websocket header, and if the server agrees, the connection is upgraded from HTTP to WebSocket. From that point on, both sides communicate using lightweight WebSocket frames instead of full HTTP requests.
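One detail of the upgrade can be sketched with the standard library alone. Under RFC 6455, the client sends a random Sec-WebSocket-Key header and the server proves it understood the request by returning a Sec-WebSocket-Accept header derived from it. The function name accept_key is illustrative; the websockets library performs this step for you.

```python
import base64
import hashlib

# Fixed GUID defined by RFC 6455 for the opening handshake
WEBSOCKET_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"

def accept_key(sec_websocket_key: str) -> str:
    """Compute the Sec-WebSocket-Accept value for a client key."""
    digest = hashlib.sha1((sec_websocket_key + WEBSOCKET_GUID).encode("ascii")).digest()
    return base64.b64encode(digest).decode("ascii")

# Sample key from the RFC 6455 handshake example
print(accept_key("dGhlIHNhbXBsZSBub25jZQ=="))  # s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
```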
Feature | HTTP | WebSocket
Direction | Client initiates every exchange | Bidirectional
Connection | New connection per request | Persistent single connection
Overhead | Headers on every request | Minimal frame overhead (2-14 bytes)
Latency | Round-trip per request | Near real-time
Use case | REST APIs, page loads | Chat, live feeds, gaming
Use WebSockets when you need low-latency, real-time updates. Use HTTP when you need stateless request-response patterns (like REST APIs).
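The 2-14 byte frame overhead follows from the frame layout in RFC 6455: a 2-byte base header, an optional 2-byte or 8-byte extended length for larger payloads, and a 4-byte masking key on frames sent by clients. The helper below is an illustrative calculation, not part of any library.

```python
def frame_header_size(payload_length: int, masked: bool) -> int:
    """Return the WebSocket frame header size in bytes (per RFC 6455)."""
    size = 2  # FIN/opcode byte plus mask bit and 7-bit length
    if payload_length > 65535:
        size += 8  # 64-bit extended payload length
    elif payload_length > 125:
        size += 2  # 16-bit extended payload length
    if masked:
        size += 4  # masking key, required on client-to-server frames
    return size

print(frame_header_size(5, masked=False))      # small server frame
print(frame_header_size(1000, masked=False))   # medium server frame
print(frame_header_size(70000, masked=True))   # large client frame
```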
Setting Up the websockets Library
Install the library with pip:
# install.sh
pip install websockets
Output:
Successfully installed websockets-13.1
The websockets library requires Python 3.8 or later. It has no dependencies beyond the standard library’s asyncio module, keeping your dependency tree clean.
Building a WebSocket Client
To test our servers, we need a client. Here is a simple interactive client that sends messages and prints responses:
# client.py
import asyncio
import websockets

async def chat_client():
    uri = "ws://localhost:8765"
    loop = asyncio.get_running_loop()
    async with websockets.connect(uri) as websocket:
        print("Connected to server. Type messages (Ctrl+C to quit):")
        # Send a greeting
        await websocket.send("Hello from Python client!")
        response = await websocket.recv()
        print(f"Server says: {response}")
        # Interactive loop
        while True:
            # Read input in a worker thread so the event loop stays free
            # to answer keepalive pings while waiting for the user
            message = await loop.run_in_executor(None, input, "> ")
            await websocket.send(message)
            response = await websocket.recv()
            print(f"Server says: {response}")

asyncio.run(chat_client())
Output:
Connected to server. Type messages (Ctrl+C to quit):
Server says: Echo: Hello from Python client!
> Testing 123
Server says: Echo: Testing 123
The websockets.connect() context manager handles the connection lifecycle automatically. When you exit the async with block (or the program ends), the connection closes cleanly with a proper WebSocket close handshake.
Building a Broadcast Chat Server
A real chat server needs to broadcast messages from one client to all connected clients. This requires tracking active connections in a set:
# chat_server.py
import asyncio
import websockets
import json
from datetime import datetime

connected_clients = set()

async def broadcast(message, sender=None):
    """Send a message to all connected clients except the sender."""
    disconnected = set()
    # Iterate over a snapshot: other handlers may add or remove clients
    # while this coroutine is suspended in send()
    for client in list(connected_clients):
        if client != sender:
            try:
                await client.send(message)
            except websockets.ConnectionClosed:
                disconnected.add(client)
    # Clean up disconnected clients in place; "-=" would rebind the name
    # and make it local to this function (UnboundLocalError)
    connected_clients.difference_update(disconnected)

async def chat_handler(websocket):
    """Handle a single client connection."""
    # Register client
    connected_clients.add(websocket)
    client_id = f"User-{id(websocket) % 1000}"
    print(f"{client_id} connected. Total clients: {len(connected_clients)}")
    # Notify others
    join_msg = json.dumps({
        "type": "system",
        "message": f"{client_id} joined the chat",
        "timestamp": datetime.now().isoformat()
    })
    await broadcast(join_msg, sender=websocket)
    try:
        async for message in websocket:
            # Wrap message with metadata
            chat_msg = json.dumps({
                "type": "chat",
                "sender": client_id,
                "message": message,
                "timestamp": datetime.now().isoformat()
            })
            print(f"{client_id}: {message}")
            await broadcast(chat_msg)
    except websockets.ConnectionClosed:
        pass
    finally:
        # Unregister client
        connected_clients.discard(websocket)
        leave_msg = json.dumps({
            "type": "system",
            "message": f"{client_id} left the chat",
            "timestamp": datetime.now().isoformat()
        })
        await broadcast(leave_msg)
        print(f"{client_id} disconnected. Total clients: {len(connected_clients)}")

async def main():
    async with websockets.serve(chat_handler, "localhost", 8765):
        print("Chat server running on ws://localhost:8765")
        await asyncio.Future()

asyncio.run(main())
Output (server with two clients):
Chat server running on ws://localhost:8765
User-142 connected. Total clients: 1
User-857 connected. Total clients: 2
User-142: Hello everyone!
User-857: Hey there!
User-142 disconnected. Total clients: 1
The connected_clients set tracks all active connections. When a client sends a message, broadcast() forwards it to every connected client, including the sender; only the join notice passes sender= to skip the new client. The try/finally block ensures the client is removed from the set even if the connection drops unexpectedly.
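The fan-out and pruning logic can be exercised without a network. In this sketch, FakeClient is a hypothetical stand-in for a WebSocket connection, and ConnectionError stands in for the library's ConnectionClosed exception.

```python
import asyncio

class FakeClient:
    """Hypothetical stand-in for a WebSocket connection."""

    def __init__(self, name, alive=True):
        self.name = name
        self.alive = alive
        self.inbox = []

    async def send(self, message):
        if not self.alive:
            raise ConnectionError(f"{self.name} is gone")
        self.inbox.append(message)

async def broadcast(clients, message):
    """Send to a snapshot of clients and drop the ones whose send fails."""
    targets = list(clients)  # snapshot: the set may change while we await
    results = await asyncio.gather(
        *(client.send(message) for client in targets),
        return_exceptions=True,
    )
    for client, result in zip(targets, results):
        if isinstance(result, Exception):
            clients.discard(client)

alice = FakeClient("alice")
bob = FakeClient("bob")
ghost = FakeClient("ghost", alive=False)
clients = {alice, bob, ghost}
asyncio.run(broadcast(clients, "hello"))
print(sorted(c.name for c in clients))
```

Sending concurrently with gather() means one slow client does not delay the others, at the cost of collecting failures afterwards instead of handling them inline.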
Handling Connection Errors Gracefully
Real-world WebSocket connections drop unexpectedly due to network issues, client crashes, or timeouts. Robust error handling is essential:
# robust_server.py
import asyncio
import websockets
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("websocket-server")

async def robust_handler(websocket):
    """Handler with comprehensive error management."""
    remote = websocket.remote_address
    logger.info(f"New connection from {remote}")
    try:
        # Keepalive pings are configured in serve() below, not here
        async for message in websocket:
            if len(message) > 10000:
                await websocket.send("Error: Message too long (max 10000 chars)")
                continue
            await websocket.send(f"Processed: {message}")
    except websockets.ConnectionClosedError as e:
        logger.warning(f"Connection closed with error: {e.code} {e.reason}")
    except websockets.ConnectionClosedOK:
        logger.info(f"Connection closed normally from {remote}")
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
    finally:
        logger.info(f"Cleanup complete for {remote}")

async def main():
    async with websockets.serve(
        robust_handler,
        "localhost",
        8765,
        ping_interval=20,   # Send ping every 20 seconds
        ping_timeout=10,    # Wait 10 seconds for pong
        max_size=2**20,     # Max message size: 1MB
        close_timeout=5     # Wait 5 seconds for close handshake
    ):
        logger.info("Robust server running on ws://localhost:8765")
        await asyncio.Future()

asyncio.run(main())
Output:
INFO:websocket-server:Robust server running on ws://localhost:8765
INFO:websocket-server:New connection from ('127.0.0.1', 54321)
WARNING:websocket-server:Connection closed with error: 1006
INFO:websocket-server:Cleanup complete for ('127.0.0.1', 54321)
The ping_interval and ping_timeout parameters enable automatic keepalive detection. If a client stops responding to pings within the timeout, the server closes the connection and triggers cleanup. The max_size parameter prevents clients from sending oversized messages that could exhaust server memory.
Adding Basic Authentication
You can authenticate WebSocket connections using query parameters, headers, or an initial authentication message. Here is a token-based approach:
# auth_server.py
import asyncio
import websockets
import json
import secrets

# Valid tokens (in production, use a database)
VALID_TOKENS = {
    "demo-token-abc123": "alice",
    "demo-token-def456": "bob"
}

async def authenticate(websocket):
    """Authenticate the client using the first message."""
    try:
        auth_msg = await asyncio.wait_for(websocket.recv(), timeout=5.0)
        data = json.loads(auth_msg)
        token = data.get("token", "")
        for valid_token, username in VALID_TOKENS.items():
            if secrets.compare_digest(token, valid_token):
                await websocket.send(json.dumps({
                    "type": "auth",
                    "status": "ok",
                    "username": username
                }))
                return username
        await websocket.send(json.dumps({
            "type": "auth",
            "status": "error",
            "message": "Invalid token"
        }))
        return None
    except asyncio.TimeoutError:
        await websocket.send(json.dumps({
            "type": "auth",
            "status": "error",
            "message": "Authentication timeout"
        }))
        return None

async def secure_handler(websocket):
    """Handler that requires authentication."""
    username = await authenticate(websocket)
    if not username:
        await websocket.close(1008, "Authentication failed")
        return
    print(f"{username} authenticated successfully")
    async for message in websocket:
        response = f"[{username}] {message}"
        await websocket.send(response)

async def main():
    async with websockets.serve(secure_handler, "localhost", 8765):
        print("Secure server running on ws://localhost:8765")
        await asyncio.Future()

asyncio.run(main())
Output:
Secure server running on ws://localhost:8765
alice authenticated successfully
The server expects the first message to contain a JSON object with a token field. It uses secrets.compare_digest() for timing-safe comparison and gives the client 5 seconds to authenticate before timing out. If authentication fails, the connection is closed with WebSocket status code 1008 (Policy Violation).
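The token check can be isolated into a pure function that is easy to test and that tolerates malformed input. This is a sketch under stated assumptions: username_for() is a hypothetical helper, and the token table simply mirrors the demo values used in this section.

```python
import json
import secrets

VALID_TOKENS = {"demo-token-abc123": "alice", "demo-token-def456": "bob"}

def username_for(raw_message):
    """Return the username for a valid auth message, or None."""
    try:
        data = json.loads(raw_message)
    except (json.JSONDecodeError, TypeError):
        return None
    token = data.get("token") if isinstance(data, dict) else None
    if not isinstance(token, str):
        return None
    found = None
    for valid_token, username in VALID_TOKENS.items():
        # Check every token instead of returning early, so timing does not
        # reveal which entry matched; bytes avoid the ASCII-only str limit
        if secrets.compare_digest(token.encode(), valid_token.encode()):
            found = username
    return found

print(username_for('{"token": "demo-token-abc123"}'))
print(username_for("not json"))
```

Keeping the parsing and comparison out of the connection handler means a bad first message results in a clean rejection rather than an unhandled exception.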
Real-Life Example: Live Notification System
Let us build a notification system where a server pushes real-time alerts to connected clients based on their subscribed topics:
# notification_server.py
import asyncio
import websockets
import json
from datetime import datetime
from collections import defaultdict

class NotificationServer:
    """Real-time notification server with topic subscriptions."""

    def __init__(self):
        self.subscribers = defaultdict(set)  # topic -> set of websockets
        self.clients = {}  # websocket -> client info

    async def handle_subscribe(self, websocket, topics):
        """Subscribe a client to one or more topics."""
        for topic in topics:
            self.subscribers[topic].add(websocket)
        self.clients[websocket]["topics"] = topics
        await websocket.send(json.dumps({
            "type": "subscribed",
            "topics": topics
        }))

    async def notify(self, topic, message):
        """Send a notification to all subscribers of a topic."""
        if topic not in self.subscribers:
            return 0
        notification = json.dumps({
            "type": "notification",
            "topic": topic,
            "message": message,
            "timestamp": datetime.now().isoformat()
        })
        sent = 0
        disconnected = set()
        # Iterate over a snapshot: the subscriber set can change while
        # this coroutine is suspended in send()
        for client in list(self.subscribers[topic]):
            try:
                await client.send(notification)
                sent += 1
            except websockets.ConnectionClosed:
                disconnected.add(client)
        # Cleanup
        self.subscribers[topic] -= disconnected
        return sent

    async def handler(self, websocket):
        """Main connection handler."""
        self.clients[websocket] = {"topics": [], "connected": datetime.now()}
        try:
            async for raw_message in websocket:
                data = json.loads(raw_message)
                action = data.get("action")
                if action == "subscribe":
                    await self.handle_subscribe(
                        websocket, data.get("topics", [])
                    )
                elif action == "publish":
                    topic = data.get("topic")
                    message = data.get("message")
                    count = await self.notify(topic, message)
                    await websocket.send(json.dumps({
                        "type": "published",
                        "topic": topic,
                        "recipients": count
                    }))
        except websockets.ConnectionClosed:
            pass
        finally:
            # Remove from all subscriptions
            for topic_subs in self.subscribers.values():
                topic_subs.discard(websocket)
            self.clients.pop(websocket, None)

server = NotificationServer()

async def main():
    async with websockets.serve(server.handler, "localhost", 8765):
        print("Notification server running on ws://localhost:8765")

        # Simulate periodic system notifications
        async def system_alerts():
            while True:
                await asyncio.sleep(30)
                await server.notify("system", "Heartbeat check - all systems normal")

        await asyncio.gather(
            asyncio.Future(),  # Run forever
            system_alerts()
        )

asyncio.run(main())
Output:
Notification server running on ws://localhost:8765
This server supports topic-based subscriptions. Clients subscribe to topics like “alerts”, “updates”, or “system”, and only receive notifications for their subscribed topics. The notify() method handles broadcasting to subscribers and automatically cleans up disconnected clients. You could extend this with persistent message queues, delivery confirmation, or priority levels.
Frequently Asked Questions
How many concurrent connections can a Python WebSocket server handle?
A single Python process using asyncio can typically handle 10,000-50,000 concurrent WebSocket connections, depending on message frequency and server resources. The websockets library is efficient with memory, using roughly 10KB per connection. For higher scale, use multiple processes behind a load balancer like Nginx.
How do I add TLS/SSL encryption (wss://)?
Create an SSL context with ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER), load your certificate with ssl_context.load_cert_chain(certfile, keyfile), and pass the context to websockets.serve() as the ssl argument. In production, a common alternative is to let a reverse proxy like Nginx handle TLS termination.
How do I handle client reconnection?
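A plain async with websockets.connect() block does not reconnect after the connection drops. On the client side, wrap your connection in a retry loop with exponential backoff: because the context manager closes cleanly, you can reconnect in a while True loop with a try/except block catching ConnectionClosed and OSError. Recent versions of the library can also handle this for you when connect() is used as an async iterator (async for websocket in websockets.connect(uri)).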
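A retry loop with exponential backoff might look like the sketch below. Everything here is illustrative: retry_with_backoff() is a hypothetical helper, and flaky_connect() fakes a server that only accepts the third attempt, so the example runs without a network. In real code the callable would open the WebSocket connection.

```python
import asyncio
import random

async def retry_with_backoff(connect_once, retry_on, max_attempts=5,
                             base_delay=0.5, max_delay=30.0):
    """Call connect_once() until it succeeds, sleeping longer after each failure."""
    for attempt in range(max_attempts):
        try:
            return await connect_once()
        except retry_on:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            delay = min(max_delay, base_delay * 2 ** attempt)
            # Jitter spreads out reconnects when many clients drop at once
            await asyncio.sleep(delay * random.uniform(0.5, 1.0))

calls = []

async def flaky_connect():
    """Hypothetical connect function that fails twice, then succeeds."""
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError("server not ready")
    return "connected"

result = asyncio.run(
    retry_with_backoff(flaky_connect, ConnectionError, base_delay=0.01)
)
print(result, len(calls))
```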
Can I send binary data over WebSockets?
Yes. The websockets library automatically detects whether you send str (text frames) or bytes (binary frames). Use await websocket.send(b"\x00\x01\x02") for binary data. This is useful for sending images, audio streams, or protobuf-encoded messages.
Can I use WebSockets with Django or Flask?
Django supports WebSockets through Django Channels, which adds an ASGI layer. Flask does not natively support WebSockets, but you can use Flask-SocketIO or run a separate websockets server alongside Flask. For new projects needing WebSocket support, consider FastAPI, which has native WebSocket support built on Starlette.
Conclusion
You now have a solid foundation for building WebSocket applications in Python. We covered creating echo servers, building broadcast chat systems, handling connection errors gracefully, adding authentication, and implementing topic-based notification systems. The websockets library’s async-first design makes it easy to handle thousands of concurrent connections with clean, readable code.
Try extending the notification server with features like message persistence, delivery acknowledgments, or a web-based admin dashboard. For the complete API reference, see the official websockets documentation.
Bare websockets Library Server
The websockets library is the canonical pure-Python implementation: modern async, type-hinted, RFC-compliant. The minimal echo server fits in under ten lines:
# pip install websockets
import asyncio
from websockets.asyncio.server import serve

async def echo(ws):
    async for message in ws:
        await ws.send(f"Echo: {message}")

async def main():
    async with serve(echo, "0.0.0.0", 8765) as server:
        await server.serve_forever()

asyncio.run(main())
Connect from anywhere — browser JavaScript (new WebSocket("ws://localhost:8765")), wscat, or another Python client via websockets.connect. The handler runs once per connection; async for message reads incoming frames until the client disconnects.
Broadcasting to Multiple Clients
For chat rooms or live dashboards, track all connected clients in a set, broadcast each message to all of them:
import asyncio
from websockets.asyncio.server import serve
import json

clients = set()

async def handler(ws):
    clients.add(ws)
    try:
        async for raw in ws:
            payload = json.loads(raw)
            outbound = json.dumps({"user": payload["user"], "text": payload["text"]})
            # Send to everyone, ignore disconnected clients
            await asyncio.gather(
                *[c.send(outbound) for c in clients],
                return_exceptions=True,
            )
    finally:
        clients.remove(ws)

async def main():
    async with serve(handler, "0.0.0.0", 8765):
        await asyncio.Future()  # run forever

asyncio.run(main())
Auth, Subprotocols, and Path Routing
Production WebSocket servers need auth and routing. Use the process_request hook for HTTP-level checks before upgrading to WebSocket:
import asyncio
from http import HTTPStatus
from websockets.asyncio.server import serve

# is_valid() and get_user_id_from_token() are application-specific helpers
# that you supply; they are not part of the websockets library.

async def auth_handler(connection, request):
    # Reject before upgrade
    token = request.headers.get("Authorization", "").removeprefix("Bearer ")
    if not is_valid(token):
        return connection.respond(HTTPStatus.UNAUTHORIZED, "Bad token\n")
    # Attach user info for the handler
    connection.user_id = get_user_id_from_token(token)

async def handler(ws):
    user_id = ws.user_id
    async for msg in ws:
        # use user_id in messages
        pass

async def main():
    async with serve(handler, "0.0.0.0", 8765, process_request=auth_handler):
        await asyncio.Future()
WebSockets in FastAPI
If you already have a FastAPI app, FastAPI ships its own WebSocket support — no second framework needed:
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()
active = []

@app.websocket("/ws")
async def ws_endpoint(websocket: WebSocket):
    await websocket.accept()
    active.append(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            for client in active:
                await client.send_text(f"[broadcast] {data}")
    except WebSocketDisconnect:
        active.remove(websocket)
Heartbeats and Reconnection
WebSockets across the public internet drop silently — proxies and NAT timeouts kill idle connections after 30-120 seconds. Send pings to keep them alive:
async with serve(handler, "0.0.0.0", 8765, ping_interval=20, ping_timeout=10):
    ...
For clients, implement exponential-backoff reconnect on the JS side — bare browsers don’t auto-reconnect.
Common Pitfalls
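Blocking inside the handler. A synchronous time.sleep(5) blocks the entire event loop. Use await asyncio.sleep(5). Blocking calls belong in asyncio.to_thread; heavy CPU-bound work is better offloaded to a process pool, since threads still share the GIL.
Forgetting to remove disconnected clients. Without cleanup, broadcasting tries to send on a dead socket, and an unhandled ConnectionClosed exception ends the handler that was broadcasting, dropping that client's connection. Use try/finally with remove.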
Treating each message as JSON without try/except. A malformed message kills the handler. Wrap json.loads in a try/except and send an error message back.
Holding references in the broadcast list. Long-lived sets of WebSocket objects can leak memory if a handler exits without removing itself. Always remove in finally.
Skipping wss:// in production. Plain ws:// sends data unencrypted. Always terminate TLS at your reverse proxy and use wss://.
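The JSON pitfall above can be handled with a small helper. This is a sketch only: parse_message and the shape of the error payload are assumptions for illustration, not part of the websockets library or FastAPI.

```python
import json

def parse_message(raw):
    """Return (payload, error); exactly one of the two is None."""
    try:
        payload = json.loads(raw)
    except (ValueError, TypeError) as exc:
        # ValueError covers json.JSONDecodeError and bad byte encodings
        return None, {"type": "error", "detail": f"invalid JSON: {exc}"}
    if not isinstance(payload, dict):
        return None, {"type": "error", "detail": "expected a JSON object"}
    return payload, None

print(parse_message('{"type": "chat", "text": "hi"}'))
print(parse_message("not json"))
```

Inside a handler, you would send json.dumps(error) back to the client and continue the loop instead of letting the exception end the connection.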
FAQ
Q: websockets library or FastAPI WebSockets?
A: FastAPI if you already have a FastAPI app. The websockets library when you want a dedicated, low-overhead WebSocket-only server.
Q: How many concurrent connections can one server handle?
A: Thousands on a single Python process, tens of thousands with uvloop. Beyond that, scale horizontally with a load balancer that supports sticky sessions or use a pub/sub layer (Redis) for cross-instance broadcasts.
Q: How do I scale broadcasts across multiple server instances?
A: Use Redis pub/sub as the message bus. Each instance subscribes to a channel; broadcasts go through Redis; every connected client gets the message regardless of which instance they’re on.
Q: Polling vs WebSockets vs SSE?
A: WebSockets for bidirectional real-time (chat, multiplayer games). Server-Sent Events for one-way push (dashboards, notifications). Polling when you need a backup. Each has its place.
Q: Behind nginx — what’s the config?
A: Add proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; to your location block. Raise proxy_read_timeout well above its 60-second default (for example, to 3600s) so nginx does not close idle connections.
Wrapping Up
WebSockets are the right answer for any real-time, bidirectional, low-latency need: chat, multiplayer, live cursors, streaming logs, collaborative editing. The websockets library is the modern Python implementation; FastAPI bundles equivalent functionality if you’re already using it. Pings, auth at upgrade time, and TLS termination at a reverse proxy are the production essentials.
You have probably written code that generates random tokens for password resets, API keys, or session identifiers. If you used the random module for this, your tokens are predictable — an attacker who knows the seed can reproduce every token you generate. This is not a theoretical risk; it has been the root cause of real security breaches.
Python 3.6 introduced the secrets module specifically for generating cryptographically secure random values. It is part of the standard library, so there is nothing to install. It uses the operating system’s best source of randomness (/dev/urandom on Linux, CryptGenRandom on Windows) and produces tokens that are safe for security-sensitive applications.
In this tutorial, you will learn how to generate secure tokens in multiple formats (hex, URL-safe, bytes), create safe passwords, build one-time password reset links, and compare tokens securely. By the end, you will have a complete toolkit for handling secrets in any Python application.
Generating a Secure Token: Quick Example
Here is the fastest way to generate a cryptographically secure token in Python:
The secrets.token_urlsafe() function generates random bytes and encodes them as a URL-safe base64 string. The 32-byte input produces a 43-character token with enough entropy (256 bits) to resist brute-force attacks. We will explore all the token types and their use cases in the sections below.
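A minimal sketch of that call, using only the standard library (the variable name is arbitrary, and the token differs on every run):

```python
import secrets

# 32 random bytes from the OS, encoded as URL-safe base64 text
token = secrets.token_urlsafe(32)
print(token)
print(len(token))  # 43
```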
What is the Secrets Module and Why Use It?
The secrets module provides functions for generating cryptographically strong random numbers suitable for managing secrets such as account authentication, tokens, and similar. Think of it as the security-focused counterpart to random.
The key difference is the source of randomness. The random module uses a Mersenne Twister algorithm — fast and statistically uniform, but deterministic. If someone discovers the internal state (which requires observing only 624 consecutive outputs), they can predict all future values. The secrets module draws from the OS entropy pool, which is non-deterministic and cannot be reversed.
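The difference is easy to see in a short experiment. This sketch reseeds random with an arbitrary value (1234 is just an example) and shows that the sequence repeats, while secrets has no seed to reuse:

```python
import random
import secrets

# Same seed, same "random" numbers: the generator is deterministic
random.seed(1234)
first = [random.randrange(10**6) for _ in range(3)]
random.seed(1234)
second = [random.randrange(10**6) for _ in range(3)]
print(first == second)  # True

# secrets cannot be seeded, so two calls are independent
a = secrets.token_hex(16)
b = secrets.token_hex(16)
print(a == b)  # False (a collision has probability 2**-128)
```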
Feature | random | secrets
Algorithm | Mersenne Twister (PRNG) | OS entropy pool (CSPRNG)
Predictable? | Yes, if the seed or internal state is known | No
Speed | Very fast | Slightly slower
Use case | Simulations, games, testing | Passwords, tokens, API keys
Thread-safe? | Yes in CPython, but all threads share one generator state | Yes
The rule is simple: if the value protects something, use secrets. If it does not, random is fine.
secrets.token_urlsafe(): the right way to generate a token.
Generating Tokens in Different Formats
The secrets module offers three token functions, each producing a different encoding of the same underlying random bytes. The format you choose depends on where you plan to use the token.
Hex Tokens with token_hex()
Hex tokens produce a string of hexadecimal characters (0-9, a-f). They are commonly used for database identifiers, session IDs, and anywhere you need a clean alphanumeric string.
Each byte produces two hex characters, so token_hex(16) returns a 32-character string. For most applications, 32 bytes (256 bits) provides sufficient entropy.
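A short sketch of the byte-to-character relationship described above (the variable names are illustrative):

```python
import secrets

session_id = secrets.token_hex(16)   # 16 bytes -> 32 hex characters
api_secret = secrets.token_hex(32)   # 32 bytes -> 64 hex characters

print(session_id, len(session_id))
print(api_secret, len(api_secret))
```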
URL-Safe Tokens with token_urlsafe()
URL-safe tokens use base64 encoding with - and _ instead of + and /. This makes them safe to include in URLs, query parameters, and HTTP headers without encoding issues.
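As a sketch, the loop below prints tokens of several sizes and checks that only URL-safe characters appear. The reset link uses example.com as a placeholder domain:

```python
import secrets
import string

URL_SAFE = set(string.ascii_letters + string.digits + "-_")

lengths = {}
for nbytes in (16, 24, 32):
    token = secrets.token_urlsafe(nbytes)
    # Every character can go into a URL without percent-encoding
    assert set(token) <= URL_SAFE
    lengths[nbytes] = len(token)
    print(nbytes, len(token), token)

# Placeholder URL showing where such a token typically ends up
reset_link = f"https://example.com/reset?token={secrets.token_urlsafe(32)}"
print(reset_link)
```

Base64 encodes 3 bytes as 4 characters, so the text is roughly a third longer than the byte count: 16, 24, and 32 bytes give 22, 32, and 43 characters.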
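Raw Bytes with token_bytes()
The third function, token_bytes(), returns the random bytes themselves with no text encoding. Printing a 32-byte token, its length, and its hex form gives output like this:
Bytes: b'\xd4\x8a\x1f...(32 bytes of binary data)'
Length: 32 bytes
As hex: d48a1f3b7c9e2d...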
Raw bytes are not human-readable, but they are the right choice when the token feeds directly into a cryptographic function like hmac.new() or a symmetric encryption library.
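As an illustration, here is a sketch that uses a raw token as an HMAC key. The message content is a made-up example:

```python
import hashlib
import hmac
import secrets

key = secrets.token_bytes(32)           # raw 256-bit key, never printed
message = b"user_id=42&action=reset"    # hypothetical payload to sign

# Sign the message with HMAC-SHA256 using the raw key
signature = hmac.new(key, message, hashlib.sha256).hexdigest()
print(len(key), len(signature))  # 32 64

# Verification recomputes the signature and compares in constant time
expected = hmac.new(key, message, hashlib.sha256).hexdigest()
print(hmac.compare_digest(signature, expected))  # True
```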
Generating Secure Passwords
The secrets module includes secrets.choice(), which selects a random element from a sequence using cryptographically secure randomness. Combined with Python’s string module, you can build password generators that meet any complexity requirement.
# password_gen.py
import secrets
import string
def generate_password(length=16):
    """Generate a secure password with mixed character types."""
    if length < 4:
        raise ValueError("length must be at least 4 to fit every character type")
    alphabet = string.ascii_letters + string.digits + string.punctuation
    # Ensure at least one of each required type
    password = [
        secrets.choice(string.ascii_uppercase),
        secrets.choice(string.ascii_lowercase),
        secrets.choice(string.digits),
        secrets.choice(string.punctuation),
    ]
    # Fill remaining length with random choices
    password += [secrets.choice(alphabet) for _ in range(length - 4)]
    # Shuffle with the OS-backed generator so the required characters
    # do not always sit in the first four positions
    secrets.SystemRandom().shuffle(password)
    return ''.join(password)
# Generate 5 passwords
for i in range(5):
    pwd = generate_password(20)
    print(f"Password {i+1}: {pwd}")
The key detail is the shuffle step. Without it, the first four characters always follow the same type pattern (uppercase, lowercase, digit, punctuation). secrets.SystemRandom().shuffle() performs an unbiased shuffle backed by the same operating-system randomness as the rest of the module. Sorting by small random keys, such as secrets.randbelow(1000), is a tempting shortcut, but equal keys keep their original relative order, which leaves a slight bias.
random is for games. secrets is for security.
Comparing Tokens Securely
When verifying a token (checking if a submitted reset token matches the stored one), you must use constant-time comparison to prevent timing attacks. A timing attack measures how long the comparison takes — a regular == comparison returns False as soon as it finds the first mismatched character, leaking information about how many characters were correct.
# secure_compare.py
import secrets
stored_token = "abc123def456"
submitted_token = "abc123def456"
# WRONG: vulnerable to timing attacks
if stored_token == submitted_token:
    print("Match (but insecure comparison)")
# RIGHT: constant-time comparison
if secrets.compare_digest(stored_token, submitted_token):
    print("Match (secure comparison)")
# Also works with bytes
stored_bytes = b"secret_token_value"
submitted_bytes = b"secret_token_value"
if secrets.compare_digest(stored_bytes, submitted_bytes):
    print("Bytes match (secure comparison)")
Output:
Match (but insecure comparison)
Match (secure comparison)
Bytes match (secure comparison)
The secrets.compare_digest() function is designed to take the same amount of time wherever (or whether) two equal-length inputs differ, so the comparison leaks nothing about how many leading characters matched. It is the same function as hmac.compare_digest(); the secrets module simply re-exports it.
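One practical detail: compare_digest() accepts either two bytes-like objects or two ASCII-only strings, and raises TypeError for non-ASCII text. The helper below is a sketch of one way to handle arbitrary text; safe_equal is a hypothetical name, not part of the standard library.

```python
import secrets

def safe_equal(a, b):
    """Constant-time comparison that also accepts non-ASCII strings."""
    # Encode text to bytes first; bytes inputs pass through unchanged
    if isinstance(a, str):
        a = a.encode("utf-8")
    if isinstance(b, str):
        b = b.encode("utf-8")
    return secrets.compare_digest(a, b)

raised = False
try:
    secrets.compare_digest("tökén", "tökén")  # non-ASCII str is rejected
except TypeError:
    raised = True

print(raised)                          # True
print(safe_equal("tökén", "tökén"))    # True
print(safe_equal("abc", "abd"))        # False
```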
Practical Security Patterns
Building a Password Reset Flow
Here is how to generate a time-limited password reset token with proper security practices:
# reset_flow.py
import secrets
import hashlib
import time
class TokenManager:
    """Manages secure, time-limited tokens."""
    def __init__(self, expiry_seconds=3600):
        self.tokens = {}  # In production, use a database
        self.expiry = expiry_seconds
    def create_token(self, user_id):
        """Generate a reset token for a user."""
        raw_token = secrets.token_urlsafe(32)
        # Store only the hash -- never store raw tokens
        token_hash = hashlib.sha256(raw_token.encode()).hexdigest()
        self.tokens[token_hash] = {
            'user_id': user_id,
            'created': time.time()
        }
        return raw_token  # Send this to the user via email
    def verify_token(self, submitted_token):
        """Verify a submitted token and return user_id if valid."""
        token_hash = hashlib.sha256(submitted_token.encode()).hexdigest()
        # Compare against each stored hash in constant time
        # (iterate over a copy because expired entries are deleted)
        for stored_hash, data in list(self.tokens.items()):
            if secrets.compare_digest(stored_hash, token_hash):
                # Check expiry
                if time.time() - data['created'] > self.expiry:
                    del self.tokens[stored_hash]
                    return None  # Expired
                return data['user_id']
        return None  # Not found
# Usage
manager = TokenManager(expiry_seconds=3600)
token = manager.create_token("user_42")
print(f"Reset token: {token}")
print(f"Verified: {manager.verify_token(token)}")
print(f"Invalid token: {manager.verify_token('wrong-token')}")
The critical detail is that we store only the hash of the token, never the raw token itself. If the database is breached, attackers get useless hashes. The raw token only exists in the email sent to the user and in memory during verification.
Generating API Keys
API keys typically need a prefix for identification and a random body for security:
Public key: pk_Rk9PQkFSLVRoaXMtaXMtYQ
Secret key: sk_c2VjcmV0LWtleS12YWx1ZQ
Test key: tk_dGVzdC1rZXktdmFsdWUtaGVyZQ
The prefix makes it easy to identify key types in logs and configuration files without revealing the secret portion. This is the same pattern used by services like Stripe (sk_live_, pk_test_).
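A minimal sketch of such a generator follows. The helper names make_api_key and key_type, the 24-byte body size, and the prefixes are illustrative assumptions:

```python
import secrets

def make_api_key(prefix, nbytes=24):
    """Return '<prefix>_<random body>'; the prefix must not contain '_'."""
    return f"{prefix}_{secrets.token_urlsafe(nbytes)}"

def key_type(api_key):
    """Read the prefix back without touching the secret body."""
    # Split only on the first underscore: the body may contain '_' too
    return api_key.split("_", 1)[0]

public_key = make_api_key("pk")
secret_key = make_api_key("sk")
print(f"Public key: {public_key}")
print(f"Secret key: {secret_key}")
print(key_type(secret_key))  # sk
```

As with reset tokens, a real service would typically store only a hash of the secret key and show the raw value to the user once.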
Using random for passwords? You’re rolling dice for the wolf.
Real-Life Example: Secure Invitation System
Let us build a complete invitation system that generates secure invite codes, tracks their usage, and enforces expiration and single-use constraints.
# invitation_system.py
import secrets
import hashlib
import time
class InvitationSystem:
    """Secure invitation code manager with expiry and usage tracking."""
    def __init__(self):
        self.invitations = {}
    def create_invite(self, creator, role="member", max_uses=1,
                      expiry_hours=48):
        """Create a new invitation with constraints."""
        code = secrets.token_urlsafe(16)
        code_hash = hashlib.sha256(code.encode()).hexdigest()
        self.invitations[code_hash] = {
            'creator': creator,
            'role': role,
            'max_uses': max_uses,
            'current_uses': 0,
            'created': time.time(),
            'expiry': time.time() + (expiry_hours * 3600),
            'used_by': []
        }
        return code
    def redeem_invite(self, code, user_name):
        """Attempt to redeem an invitation code."""
        code_hash = hashlib.sha256(code.encode()).hexdigest()
        for stored_hash, invite in self.invitations.items():
            if not secrets.compare_digest(stored_hash, code_hash):
                continue
            if time.time() > invite['expiry']:
                return {"success": False, "error": "Invitation expired"}
            if invite['current_uses'] >= invite['max_uses']:
                return {"success": False, "error": "Invitation fully used"}
            invite['current_uses'] += 1
            invite['used_by'].append(user_name)
            return {
                "success": True,
                "role": invite['role'],
                "message": f"Welcome {user_name}! Role: {invite['role']}"
            }
        return {"success": False, "error": "Invalid invitation code"}
    def get_stats(self):
        """Get invitation usage statistics."""
        active = sum(1 for inv in self.invitations.values()
                     if time.time() < inv['expiry']
                     and inv['current_uses'] < inv['max_uses'])
        expired = sum(1 for inv in self.invitations.values()
                      if time.time() >= inv['expiry'])
        return {"active": active, "expired": expired,
                "total": len(self.invitations)}
# Demo
system = InvitationSystem()
# Create invitations
admin_code = system.create_invite("alice", role="admin", max_uses=1)
team_code = system.create_invite("bob", role="editor", max_uses=5,
                                 expiry_hours=72)
print(f"Admin invite: {admin_code}")
print(f"Team invite: {team_code}")
print()
# Redeem invitations
print(system.redeem_invite(admin_code, "charlie"))
print(system.redeem_invite(admin_code, "dave"))  # Should fail
print(system.redeem_invite(team_code, "eve"))
print(system.redeem_invite("invalid-code", "mallory"))
print()
print(f"Stats: {system.get_stats()}")
This system demonstrates several security best practices: hashed storage of codes, constant-time comparison, usage limits, and time-based expiration. In production, you would replace the in-memory dictionary with a database and add rate limiting to prevent brute-force code guessing.
Frequently Asked Questions
Can I just use random.SystemRandom() instead of secrets?
Yes, random.SystemRandom() also uses the OS entropy pool and is cryptographically secure. However, secrets is the recommended module since Python 3.6 because it provides a cleaner API specifically designed for security tasks. It also includes compare_digest() and token_urlsafe(), which SystemRandom does not.
How many bytes should my tokens be?
The Python documentation recommends at least 32 bytes (256 bits) for tokens used in security contexts. This provides sufficient entropy that brute-force guessing is infeasible even with massive computing resources. For lower-stakes use cases like email verification, 16 bytes (128 bits) is still very strong.
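The arithmetic behind these figures is simple enough to check. This sketch computes entropy in bits for tokens and, as an upper bound, for passwords drawn uniformly from an alphabet. The function names are illustrative:

```python
import math
import string

def token_bits(nbytes):
    """Each random byte contributes 8 bits of entropy."""
    return nbytes * 8

def password_bits(length, alphabet_size):
    """Upper bound: each character is drawn uniformly from the alphabet."""
    return length * math.log2(alphabet_size)

alphabet = string.ascii_letters + string.digits + string.punctuation
print(len(alphabet))                      # 94
print(token_bits(16), token_bits(32))     # 128 256
print(round(password_bits(20, len(alphabet))))  # 131
```

So a 20-character password over the full printable alphabet carries about as much entropy as a 16-byte token.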
What happens if I call token_hex() with no arguments?
If you omit the byte count, secrets uses a default that is “reasonable for most use cases” — currently 32 bytes. However, it is better to specify the length explicitly so your code is self-documenting and immune to future default changes.
Is secrets slower than random?
Yes, but the difference is negligible for token generation. Generating 10,000 tokens with secrets takes about 50 milliseconds compared to 15 milliseconds with random. Since you typically generate tokens one at a time (not in bulk), the performance difference is irrelevant.
Should I use uuid4() or secrets for unique identifiers?
uuid.uuid4() generates random UUIDs using os.urandom(), so it is cryptographically secure. Use UUIDs when you need a standardized format (like database primary keys). Use secrets when you need a security token with a specific format (URL-safe, hex) or when you need compare_digest() for safe verification.
Conclusion
The secrets module gives you everything you need to generate cryptographically secure tokens, passwords, and random values in Python. We covered token_hex() for hexadecimal strings, token_urlsafe() for URL-compatible tokens, token_bytes() for raw binary data, and compare_digest() for timing-attack-resistant comparisons. We also built practical systems for password resets, API key generation, and invitation codes.
Try extending the invitation system with features like rate limiting, audit logging, or integration with a real database using SQLite or PostgreSQL. The secrets module is small but foundational — once you understand it, you can build secure authentication flows for any Python application.
For years, the Python requests library has been the go-to solution for making HTTP requests. While requests is powerful and user-friendly, it has limitations that modern Python developers encounter daily. It doesn’t support async operations natively, lacks HTTP/2 support, and can feel sluggish when you’re handling dozens of concurrent requests. If you’ve found yourself wrestling with requests’ synchronous nature or spinning up ThreadPoolExecutor just to manage multiple requests, you’re not alone. Many developers hit a wall where they need something more capable.
Enter httpx — a modern HTTP client that drops in as a replacement for requests while adding powerful features like native async/await support, HTTP/2 capabilities, and streaming responses. The best part? If you already know requests, you’ll feel right at home with httpx. The API is remarkably similar, which means you can start using it immediately without relearning everything. We’ll cover installation, basic usage, async patterns, and real-world examples that show why httpx is becoming the preferred choice for new Python projects.
In this guide, we’ll walk through everything you need to know about httpx. We’ll start with a quick example to get you up and running, explore what makes httpx special compared to other HTTP libraries, and then dive into practical patterns you can use in your own projects. Whether you’re building a simple API client or managing complex async workflows, httpx has the tools you need. Let’s get started.
200
{'userId': 1, 'id': 1, 'title': 'sunt aut facere repellat…', 'body': '…'}
That’s it. If you’ve used requests before, you already know httpx. The synchronous API is nearly identical, but under the hood, httpx brings modern features to the table. Now let’s explore what makes httpx more powerful than its predecessor.
What is httpx and Why Use It?
httpx is a modern HTTP client library for Python that combines the simplicity of requests with advanced features like async support, HTTP/2, and more sophisticated timeout handling. Created by Tom Christie (the developer behind Starlette and Django REST framework), httpx is built on a foundation that understands modern Python development patterns. It's not just a requests replacement: it's a redesign based on everything we've learned about making HTTP libraries in the 2020s.
The key differences matter when you're building real applications. Unlike requests, httpx supports both synchronous and asynchronous code from the same library. You don't need to install separate packages or maintain multiple code paths. It supports HTTP/2 as an opt-in feature (install the http2 extra and pass http2=True), which means better performance for services that support it. It has built-in connection pooling, proper async context managers, and a cleaner API that feels more Pythonic.
Let’s compare httpx to other HTTP libraries in the Python ecosystem:
Feature | httpx | requests | aiohttp | urllib3
Synchronous API | Yes | Yes | No | Yes
Async/Await Support | Yes | No | Yes | No
HTTP/2 | Yes (opt-in) | No | No | No
Connection Pooling | Yes | Yes | Yes | Yes
Streaming Support | Yes | Yes | Yes | Yes
API Complexity | Low | Low | Medium | High
Drop-in requests Replacement | Mostly Yes | N/A | No | No
httpx shines when you need synchronous and asynchronous code in the same project. Unlike aiohttp, which requires asyncio from the start, httpx lets you start simple and scale to async when you need it. Unlike requests, httpx doesn't force you into threading patterns when you want to handle multiple requests concurrently. It's the bridge between requests' simplicity and aiohttp's power.
Installing httpx
Installation is straightforward. httpx is available on PyPI and installs cleanly with a small set of pure-Python dependencies. For basic functionality, you only need one command: pip install httpx
If you want HTTP/2 support, install the optional extra with pip install "httpx[http2]". The h2 library handles HTTP/2 protocol details, while httpcore provides the underlying transport layer.
That’s all you need. Unlike some HTTP libraries, httpx doesn’t require compiling C extensions or installing system dependencies. It’s pure Python with optional performance enhancements.
Making GET Requests
GET requests are the foundation of HTTP. They retrieve data without side effects, and httpx makes them effortless. The basic pattern is identical to requests, but httpx adds subtle improvements like better error handling and automatic timeout management.
# get_requests.py
import httpx
# Simple GET request
response = httpx.get("https://jsonplaceholder.typicode.com/users")
print(f"Status: {response.status_code}")
print(f"Content-Type: {response.headers['content-type']}")
print(f"First user: {response.json()[0]['name']}")
# GET with query parameters
params = {"userId": 1}
response = httpx.get("https://jsonplaceholder.typicode.com/posts", params=params)
print(f"Posts for user 1: {len(response.json())}")
# GET with custom headers
headers = {"User-Agent": "MyApp/1.0"}
response = httpx.get("https://httpbin.org/headers", headers=headers)
print(response.json())
Output:
Status: 200
Content-Type: application/json; charset=utf-8
First user: Leanne Graham
Posts for user 1: 10
{'headers': {'User-Agent': 'MyApp/1.0', …}}
Notice how httpx handles query parameters naturally through the `params` dictionary. You don’t manually construct query strings or worry about URL encoding — httpx handles that behind the scenes. Headers work the same way, accepting a dictionary that httpx merges with the default headers. This consistent API means you can focus on your application logic instead of HTTP bookkeeping.
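To make the encoding step concrete, here is a sketch using the standard library's urlencode rather than httpx itself. It illustrates the kind of work a params dictionary saves you; httpx's exact escaping may differ in details, such as %20 versus + for spaces.

```python
from urllib.parse import urlencode

# A value containing spaces and '&' would break a hand-built query string
params = {"userId": 1, "q": "hello world & more"}

query = urlencode(params)
print(query)  # userId=1&q=hello+world+%26+more

url = f"https://jsonplaceholder.typicode.com/posts?{query}"
print(url)
```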
Making POST Requests
POST requests send data to the server. httpx supports multiple ways to send data: form-encoded, JSON, raw bytes, or streaming. Let’s explore the most common patterns.
# post_requests.py
import httpx
# A Client reuses connections across requests; the with block closes it
with httpx.Client() as client:
    # POST with JSON data
    data = {
        "title": "New Post",
        "body": "This is a test post",
        "userId": 1
    }
    response = client.post(
        "https://jsonplaceholder.typicode.com/posts",
        json=data
    )
    print(f"Created post with ID: {response.json()['id']}")
    # POST with form data
    form_data = {"username": "john", "password": "secret123"}
    response = client.post(
        "https://httpbin.org/post",
        data=form_data
    )
    print(f"Form post status: {response.status_code}")
    # POST with custom timeout
    try:
        response = client.post(
            "https://httpbin.org/delay/10",
            timeout=5.0
        )
    except httpx.TimeoutException:
        print("Request timed out after 5 seconds")
Output:
Created post with ID: 101
Form post status: 200
Request timed out after 5 seconds
When making POST requests, use the `json` parameter for JSON data and the `data` parameter for form-encoded data. httpx automatically sets the correct Content-Type header for you. The `Client()` context manager maintains connection pooling across multiple requests, which is more efficient than using module-level functions for repeated requests. Timeouts are crucial for production code — they prevent your application from hanging if a server stops responding.
Using Async with httpx
This is where httpx truly shines. Async support is built in from the ground up, not bolted on as an afterthought. When you need to handle multiple concurrent requests, async/await patterns let you handle hundreds of concurrent connections with minimal memory overhead — something that would require threading or multiprocessing with requests.
# async_requests.py
import asyncio
import httpx
async def fetch_posts(client, user_id):
    """Fetch posts for a specific user"""
    response = await client.get(
        "https://jsonplaceholder.typicode.com/posts",
        params={"userId": user_id}
    )
    return response.json()
async def fetch_multiple_users():
    """Fetch posts for multiple users concurrently"""
    # One shared client lets all requests reuse the same connection pool
    async with httpx.AsyncClient() as client:
        tasks = [
            fetch_posts(client, user_id)
            for user_id in range(1, 6)
        ]
        results = await asyncio.gather(*tasks)
    for i, posts in enumerate(results, 1):
        print(f"User {i}: {len(posts)} posts")
# Run the async function
asyncio.run(fetch_multiple_users())
Output:
User 1: 10 posts
User 2: 10 posts
User 3: 10 posts
User 4: 10 posts
User 5: 10 posts
The `AsyncClient()` context manager handles resource cleanup automatically. Using `asyncio.gather()`, we fetch posts for five users concurrently in roughly the time it takes to fetch one. This pattern scales to thousands of concurrent requests without the overhead of creating threads. The key difference from synchronous code is minimal — just add `async` and `await` keywords.
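The claim that five concurrent requests take roughly as long as one can be checked without a network. In this sketch, fake_request is a stand-in that simply sleeps for 0.2 seconds instead of calling an API:

```python
import asyncio
import time

async def fake_request(delay):
    """Stand-in for an HTTP call: waits without blocking the event loop."""
    await asyncio.sleep(delay)
    return delay

async def main():
    start = time.perf_counter()
    # Five 0.2-second "requests" run concurrently, not one after another
    results = await asyncio.gather(*(fake_request(0.2) for _ in range(5)))
    elapsed = time.perf_counter() - start
    return results, elapsed

results, elapsed = asyncio.run(main())
print(f"{len(results)} calls finished in {elapsed:.2f}s")
```

Run sequentially, the same five calls would need about one second; with gather the total stays close to 0.2 seconds.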
# async_with_timeout.py
import asyncio
import httpx
async def fetch_with_timeout():
    """Fetch with explicit timeout configuration"""
    # 10 seconds for read, write, and pool; 5 seconds to connect
    timeout = httpx.Timeout(10.0, connect=5.0)
    async with httpx.AsyncClient(timeout=timeout) as client:
        try:
            response = await client.get(
                "https://jsonplaceholder.typicode.com/posts/1"
            )
            print(f"Success: {response.status_code}")
        except httpx.TimeoutException:
            print("Request timed out")
        except httpx.RequestError as e:
            print(f"Network error: {e}")
asyncio.run(fetch_with_timeout())
Output:
Success: 200
Timeout handling in async contexts is critical. The `Timeout` object lets you set different timeouts for connection, read, write, and pool operations. This fine-grained control prevents your async application from hanging on unresponsive servers.
HTTP/2 Support
HTTP/2 can be faster than HTTP/1.1 because it multiplexes multiple requests over a single connection and compresses headers. With httpx, HTTP/2 is opt-in: install the optional extra (pip install "httpx[http2]", which pulls in the h2 library) and create the client with http2=True. httpx then negotiates HTTP/2 with servers that support it and falls back to HTTP/1.1 otherwise.
# http2_support.py
import httpx
# Default: HTTP/2 is off, so requests use HTTP/1.1
response = httpx.get("https://httpbin.org/get")
print(f"HTTP Version (default): {response.http_version}")
# Opt in to HTTP/2 (requires the h2 package); used when the server negotiates it
with httpx.Client(http2=True) as client:
    response = client.get("https://httpbin.org/get")
    print(f"HTTP Version (with HTTP/2): {response.http_version}")
# Force HTTP/1.1 explicitly
with httpx.Client(http2=False) as client:
    response = client.get("https://httpbin.org/get")
    print(f"HTTP Version (forced 1.1): {response.http_version}")
Output:
HTTP Version (default): HTTP/1.1
HTTP Version (with HTTP/2): HTTP/2
HTTP Version (forced 1.1): HTTP/1.1
The performance difference is subtle for single requests but grows with concurrent requests over HTTP/2. Since HTTP/2 multiplexes requests on a single connection, you avoid the overhead of establishing multiple TCP connections. For APIs that support it, gains of 20-50% are plausible in real-world scenarios, though the actual figure depends on the workload, so measure before relying on it.
Timeouts and Error Handling
Production code needs robust error handling. httpx provides clear exceptions for different failure scenarios, making it easy to distinguish between network problems, timeouts, and server errors.
# error_handling.py
import httpx
def fetch_with_fallback(url, fallback_url):
    """Fetch from primary URL with fallback"""
    try:
        response = httpx.get(url, timeout=5.0)
        response.raise_for_status()  # Raise exception for bad status codes
        return response.json()
    except httpx.TimeoutException:
        print(f"Timeout on {url}, trying fallback")
        return httpx.get(fallback_url).json()
    except httpx.HTTPStatusError as e:
        print(f"HTTP error: {e.response.status_code}")
        raise
    except httpx.RequestError as e:
        print(f"Request error: {e}")
        raise
try:
    data = fetch_with_fallback(
        "https://jsonplaceholder.typicode.com/posts/1",
        "https://jsonplaceholder.typicode.com/posts/2"
    )
    print(f"Fetched post: {data['title']}")
except Exception as e:
    print(f"All attempts failed: {e}")
Output:
Fetched post: sunt aut facere repellat provident…
httpx organizes exceptions in a clear hierarchy. `HTTPError` is the common base class. Beneath it, `RequestError` covers failures while sending the request or receiving the response, including `TimeoutException` (connect, read, write, or pool timeouts). `HTTPStatusError` is a separate branch under `HTTPError` and means the server responded with an error status code (4xx or 5xx). Calling `raise_for_status()` raises `HTTPStatusError` for those status codes, similar to requests.
Different operations need different timeout values. Connection timeouts should be shorter (5-10 seconds), while read timeouts depend on the expected response size. For large downloads, you might need 30+ second read timeouts. httpx lets you configure each separately.
Streaming Responses
When dealing with large files or streaming APIs, loading the entire response into memory is inefficient. httpx supports streaming, letting you process responses chunk by chunk.
# streaming_responses.py
import asyncio
import httpx
# Stream a large response
with httpx.stream("GET", "https://httpbin.org/bytes/1024") as response:
    print(f"Status: {response.status_code}")
    print(f"Content-Length: {response.headers.get('content-length')}")
    # Process response in chunks
    for chunk in response.iter_bytes(chunk_size=256):
        print(f"Received {len(chunk)} bytes")
# Stream with iterator
with httpx.stream("GET", "https://httpbin.org/json") as response:
    for line in response.iter_lines():
        if line:
            print(f"Line: {line[:50]}...")
# Async streaming
async def async_stream():
    async with httpx.AsyncClient() as client:
        async with client.stream("GET", "https://httpbin.org/bytes/512") as response:
            async for chunk in response.aiter_bytes(chunk_size=128):
                print(f"Async received {len(chunk)} bytes")
asyncio.run(async_stream())
Output:
Status: 200
Content-Length: 1024
Received 256 bytes
Received 256 bytes
Received 256 bytes
Received 256 bytes
Line: {"slideshow": {"author": "Yours Truly", "title": …
Async received 128 bytes
Async received 128 bytes
Async received 128 bytes
Async received 128 bytes
Streaming is essential for production applications that download large files or handle streaming APIs. The `iter_bytes()` method gives you raw bytes, while `iter_lines()` automatically splits on newlines — useful for newline-delimited JSON APIs. Async streaming with `aiter_bytes()` and `aiter_lines()` works the same way in async contexts.
Real-Life Example: Async API Data Aggregator
Let’s build a practical example that combines async requests, error handling, and structured data processing. Imagine you’re aggregating data from multiple APIs and want to do it efficiently.
# api_aggregator.py
import asyncio
import httpx
from datetime import datetime
class APIAggregator:
    """Aggregates data from multiple APIs concurrently"""
    def __init__(self, max_concurrent=5):
        self.max_concurrent = max_concurrent  # stored for later use; not enforced below
        self.timeout = httpx.Timeout(10.0)
    async def fetch_post(self, client, post_id):
        """Fetch a single post"""
        try:
            response = await client.get(
                f"https://jsonplaceholder.typicode.com/posts/{post_id}",
                timeout=self.timeout
            )
            response.raise_for_status()
            return response.json()
        except httpx.HTTPError as e:
            # HTTPError covers both transport failures and bad status codes
            print(f"Error fetching post {post_id}: {e}")
            return None
    async def fetch_post_comments(self, client, post_id):
        """Fetch comments for a post"""
        try:
            response = await client.get(
                "https://jsonplaceholder.typicode.com/comments",
                params={"postId": post_id},
                timeout=self.timeout
            )
            response.raise_for_status()
            return response.json()
        except httpx.HTTPError as e:
            print(f"Error fetching comments: {e}")
            return []
    async def aggregate(self):
        """Aggregate data from multiple endpoints"""
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            # Fetch posts concurrently
            post_tasks = [
                self.fetch_post(client, i)
                for i in range(1, 6)
            ]
            posts = await asyncio.gather(*post_tasks)
            # Fetch comments concurrently
            comment_tasks = [
                self.fetch_post_comments(client, i)
                for i in range(1, 3)
            ]
            comments = await asyncio.gather(*comment_tasks)
            return {
                "timestamp": datetime.now().isoformat(),
                "posts_fetched": len([p for p in posts if p]),
                "total_comments": sum(len(c) for c in comments),
                # First post that was fetched successfully, if any
                "sample_post": next((p for p in posts if p), None)
            }
# Run the aggregator
async def main():
    aggregator = APIAggregator()
    results = await aggregator.aggregate()
    print(f"Aggregation completed at {results['timestamp']}")
    print(f"Posts fetched: {results['posts_fetched']}")
    print(f"Total comments: {results['total_comments']}")
    sample = results['sample_post']
    if sample:
        print(f"First post title: {sample['title']}")
asyncio.run(main())
Output:
Aggregation completed at 2026-04-09T14:23:45.123456
Posts fetched: 5
Total comments: 10
First post title: sunt aut facere repellat provident occaecati excepturi optio reprehenderit
This example demonstrates several httpx patterns: creating an async client, managing concurrent requests with proper error handling, using consistent timeouts across all requests, and returning structured data. The `APIAggregator` class is reusable and extensible — you could add caching, retry logic, or progress tracking. In production, you’d likely add logging and more sophisticated error recovery.
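One extension worth sketching is a real cap on concurrency, since gather alone starts every request at once. The helper below uses asyncio.Semaphore from the standard library. bounded_gather and fake_fetch are hypothetical names, and the sleep stands in for a real client.get() call:

```python
import asyncio

async def bounded_gather(coros, limit=5):
    """Run coroutines concurrently, but at most `limit` at a time."""
    semaphore = asyncio.Semaphore(limit)

    async def run(coro):
        # Each coroutine waits here until one of the `limit` slots is free
        async with semaphore:
            return await coro

    return await asyncio.gather(*(run(c) for c in coros))

stats = {"in_flight": 0, "peak": 0}

async def fake_fetch(i):
    """Stand-in for an HTTP request; records how many run at once."""
    stats["in_flight"] += 1
    stats["peak"] = max(stats["peak"], stats["in_flight"])
    await asyncio.sleep(0.01)
    stats["in_flight"] -= 1
    return i

async def main():
    return await bounded_gather([fake_fetch(i) for i in range(20)], limit=5)

results = asyncio.run(main())
print(results == list(range(20)), stats["peak"])
```

Results come back in submission order, and the recorded peak never exceeds the limit.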
Frequently Asked Questions
Q: Is httpx a drop-in replacement for requests?
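A: Mostly, but not entirely. The synchronous API is nearly identical, so much requests code ports with little more than an import change. The main differences: httpx does not follow redirects unless you pass `follow_redirects=True`, it applies a 5-second default timeout where requests has none, and a few parameters and exception names differ. Test thoroughly before migrating production code.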
Q: Do I need to use async?
A: No. httpx works great synchronously, and you only need async when handling many concurrent requests. Start with synchronous code and migrate to async if profiling shows it’s beneficial.
Q: What’s the performance difference between httpx and requests?
A: For single requests, performance is similar. For many concurrent requests, async httpx is usually much faster than sending them one after another, and it avoids per-thread overhead. HTTP/2, when enabled with `http2=True`, can help further on servers that support it.
Q: How do I handle cookies and sessions?
A: httpx clients maintain cookies automatically. Use a persistent client for multiple requests to the same host to keep cookies and connection pooling active across requests.
Q: Can I use httpx with Django or Flask?
A: Yes. In synchronous (WSGI) request handlers, such as a standard Flask view, use the synchronous API. Use async httpx in ASGI applications such as FastAPI or in Django’s async views.
Q: How do I set up authentication?
A: httpx supports multiple auth methods. For basic auth: `httpx.get(url, auth=("user", "pass"))`. For bearer tokens: `headers={"Authorization": "Bearer token"}`. For custom auth, create a subclass of `httpx.Auth`.
Conclusion
httpx represents the future of HTTP requests in Python. It combines the simplicity of requests with modern features like async/await, HTTP/2, and better timeout handling. Whether you’re building a simple API client or managing complex concurrent request workflows, httpx has the tools you need without unnecessary complexity.
Start by installing httpx and replacing requests in a non-critical project. You’ll quickly discover why developers are switching. For more details, the official documentation is available at www.python-httpx.org.
For years, the Python requests library has been the go-to solution for making HTTP requests. While requests is powerful and user-friendly, it has limitations that modern Python developers encounter daily. It doesn’t support async operations natively, lacks HTTP/2 support, and can feel sluggish when you’re handling dozens of concurrent requests. If you’ve found yourself wrestling with requests’ synchronous nature or spinning up ThreadPoolExecutor just to manage multiple requests, you’re not alone. Many developers hit a wall where they need something more capable.
Enter httpx — a modern HTTP client that drops in as a replacement for requests while adding powerful features like native async/await support, HTTP/2 capabilities, and streaming responses. The best part? If you already know requests, you’ll feel right at home with httpx. The API is remarkably similar, which means you can start using it immediately without relearning everything. We’ll cover installation, basic usage, async patterns, and real-world examples that show why httpx is becoming the preferred choice for new Python projects.
In this guide, we’ll walk through everything you need to know about httpx. We’ll start with a quick example to get you up and running, explore what makes httpx special compared to other HTTP libraries, and then dive into practical patterns you can use in your own projects. Whether you’re building a simple API client or managing complex async workflows, httpx has the tools you need. Let’s get started.
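Quick Example
A request such as `httpx.get("https://jsonplaceholder.typicode.com/posts/1")`, followed by printing `response.status_code` and `response.json()`, produces output along these lines:
200
{'userId': 1, 'id': 1, 'title': 'sunt aut facere repellat…', 'body': '…'}
That’s it. If you’ve used requests before, you already know httpx. The synchronous API is nearly identical, but under the hood, httpx brings modern features to the table. Now let’s explore what makes httpx more powerful than requests.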
What is httpx and Why Use It?
httpx is a modern HTTP client library for Python that combines the simplicity of requests with advanced features like async support, HTTP/2, and more sophisticated timeout handling. Created by Tom Christie (the developer behind Starlette and Django REST framework), httpx is built on a foundation that understands modern Python development patterns. It’s not just a requests replacement; it’s a redesign based on what the community has learned about building HTTP libraries.
The key differences matter when you’re building real applications. Unlike requests, httpx supports both synchronous and asynchronous code from the same library. You don’t need to install separate packages or maintain multiple code paths. HTTP/2 is available as an opt-in feature (install the `http2` extra and pass `http2=True`), which can mean better performance for services that support it. It has built-in connection pooling, proper async context managers, and a cleaner API that feels more Pythonic.
Let’s compare httpx to other HTTP libraries in the Python ecosystem:
| Feature | httpx | requests | aiohttp | urllib3 |
| --- | --- | --- | --- | --- |
| Synchronous API | Yes | Yes | No | Yes |
| Async/Await Support | Yes | No | Yes | No |
| HTTP/2 | Yes (opt-in) | No | No | No |
| Connection Pooling | Yes | Yes | Yes | Yes |
| Streaming Support | Yes | Yes | Yes | Yes |
| API Complexity | Low | Low | Medium | High |
| Drop-in requests Replacement | Mostly Yes | N/A | No | No |
httpx shines when you need synchronous and asynchronous code in the same project. Unlike aiohttp, which requires asyncio from the start, httpx lets you start simple and scale to async when you need it. Unlike requests, httpx doesn’t force you into threading patterns when you want to handle multiple requests concurrently. It’s the bridge between requests’ simplicity and aiohttp’s power.
Installing httpx
Installation is straightforward. httpx is available on PyPI and pulls in only a small set of dependencies. For basic functionality, you only need one command: `pip install httpx`.
If you want HTTP/2 support, install the optional extra with `pip install "httpx[http2]"`. This adds the h2 library, which handles HTTP/2 protocol details. httpcore, which provides the underlying transport layer, is already installed as a core dependency of httpx.
That’s all you need. Unlike some HTTP libraries, httpx doesn’t require compiling C extensions or installing system dependencies. It’s pure Python with optional performance enhancements.
Making GET Requests
GET requests are the foundation of HTTP. They retrieve data without side effects, and httpx makes them effortless. The basic pattern is identical to requests, but httpx adds subtle improvements such as a clearer exception hierarchy and a default 5-second timeout, so a stalled server cannot hang your program indefinitely.
# get_requests.py
import httpx
# Simple GET request
response = httpx.get("https://jsonplaceholder.typicode.com/users")
print(f"Status: {response.status_code}")
print(f"Content-Type: {response.headers['content-type']}")
print(f"First user: {response.json()[0]['name']}")
# GET with query parameters
params = {"userId": 1}
response = httpx.get("https://jsonplaceholder.typicode.com/posts", params=params)
print(f"Posts for user 1: {len(response.json())}")
# GET with custom headers
headers = {"User-Agent": "MyApp/1.0"}
response = httpx.get("https://httpbin.org/headers", headers=headers)
print(response.json())
Output:
Status: 200
Content-Type: application/json
First user: Leanne Graham
Posts for user 1: 10
{'headers': {'User-Agent': 'MyApp/1.0', …}}
Notice how httpx handles query parameters naturally through the `params` dictionary. You don’t manually construct query strings or worry about URL encoding — httpx handles that behind the scenes. Headers work the same way, accepting a dictionary that httpx merges with the default headers. This consistent API means you can focus on your application logic instead of HTTP bookkeeping.
Making POST Requests
POST requests send data to the server. httpx supports multiple ways to send data: form-encoded, JSON, raw bytes, or streaming. Let’s explore the most common patterns.
# post_requests.py
import httpx

# A shared client reuses connections across requests and closes them on exit
with httpx.Client() as client:
    # POST with JSON data
    data = {
        "title": "New Post",
        "body": "This is a test post",
        "userId": 1
    }
    response = client.post(
        "https://jsonplaceholder.typicode.com/posts",
        json=data
    )
    print(f"Created post with ID: {response.json()['id']}")

    # POST with form data
    form_data = {"username": "john", "password": "secret123"}
    response = client.post(
        "https://httpbin.org/post",
        data=form_data
    )
    print(f"Form post status: {response.status_code}")

    # POST with custom timeout
    try:
        response = client.post(
            "https://httpbin.org/delay/10",
            timeout=5.0
        )
    except httpx.TimeoutException:
        print("Request timed out after 5 seconds")
Output:
Created post with ID: 101
Form post status: 200
Request timed out after 5 seconds
When making POST requests, use the `json` parameter for JSON data and the `data` parameter for form-encoded data. httpx automatically sets the correct Content-Type header for you. A `Client` instance keeps a connection pool alive across requests, which is more efficient than calling module-level functions repeatedly. Use it as a context manager (`with httpx.Client() as client:`) so connections are closed even if an exception occurs. Timeouts are crucial for production code because they prevent your application from hanging if a server stops responding.
Using Async with httpx
This is where httpx truly shines. Async support is built in from the ground up, not bolted on as an afterthought. When you need to handle multiple concurrent requests, async/await patterns let you handle hundreds of concurrent connections with minimal memory overhead — something that would require threading or multiprocessing with requests.
# async_requests.py
import asyncio
import httpx

async def fetch_posts(client, user_id):
    """Fetch posts for a specific user"""
    response = await client.get(
        "https://jsonplaceholder.typicode.com/posts",
        params={"userId": user_id}
    )
    return response.json()

async def fetch_multiple_users():
    """Fetch posts for multiple users concurrently"""
    # Share one client so all requests reuse the same connection pool
    async with httpx.AsyncClient() as client:
        tasks = [
            fetch_posts(client, user_id)
            for user_id in range(1, 6)
        ]
        results = await asyncio.gather(*tasks)
    for i, posts in enumerate(results, 1):
        print(f"User {i}: {len(posts)} posts")

# Run the async function
asyncio.run(fetch_multiple_users())
Output:
User 1: 10 posts
User 2: 10 posts
User 3: 10 posts
User 4: 10 posts
User 5: 10 posts
The `AsyncClient()` context manager handles resource cleanup automatically. Using `asyncio.gather()`, we fetch posts for five users concurrently in roughly the time it takes to fetch one. This pattern scales to thousands of concurrent requests without the overhead of creating threads. The key difference from synchronous code is minimal — just add `async` and `await` keywords.
# async_with_timeout.py
import asyncio
import httpx

async def fetch_with_timeout():
    """Fetch with explicit timeout configuration"""
    timeout = httpx.Timeout(10.0)  # 10 second timeout for all operations
    async with httpx.AsyncClient(timeout=timeout) as client:
        try:
            response = await client.get(
                "https://jsonplaceholder.typicode.com/posts/1"
            )
            print(f"Success: {response.status_code}")
        except httpx.TimeoutException:
            print("Request timed out")
        except httpx.RequestError as e:
            print(f"Network error: {e}")

asyncio.run(fetch_with_timeout())
Output:
Success: 200
Timeout handling in async contexts is critical. The `Timeout` object lets you set different timeouts for connection, read, write, and pool operations. This fine-grained control prevents your async application from hanging on unresponsive servers.
HTTP/2 Support
HTTP/2 can be faster than HTTP/1.1 because it multiplexes multiple requests over a single connection and compresses headers. In httpx, HTTP/2 is opt-in: install the h2 dependency with `pip install "httpx[http2]"` and create your client with `http2=True`. httpx then uses HTTP/2 with servers that support it and falls back to HTTP/1.1 otherwise.
# http2_support.py
import httpx

# Module-level functions use HTTP/1.1
response = httpx.get("https://httpbin.org/get")
print(f"HTTP Version: {response.http_version}")

# http2=False is the default, shown here explicitly
with httpx.Client(http2=False) as client:
    response = client.get("https://httpbin.org/get")
    print(f"HTTP Version (forced 1.1): {response.http_version}")

# Opt in to HTTP/2 (requires the h2 package)
with httpx.Client(http2=True) as client:
    response = client.get("https://httpbin.org/get")
    # Reports HTTP/2 only if the server negotiates it
    print(f"HTTP Version (with HTTP/2): {response.http_version}")
Output:
HTTP Version: HTTP/1.1
HTTP Version (forced 1.1): HTTP/1.1
HTTP Version (with HTTP/2): HTTP/2
The performance difference is subtle for single requests but can become significant with many concurrent requests over HTTP/2. Since HTTP/2 multiplexes requests on a single connection, you avoid the overhead of establishing multiple TCP connections. The size of the gain depends on the server, the network, and your request pattern, so measure with your own workload before relying on it.
Timeouts and Error Handling
Production code needs robust error handling. httpx provides clear exceptions for different failure scenarios, making it easy to distinguish between network problems, timeouts, and server errors.
# error_handling.py
import httpx

def fetch_with_fallback(url, fallback_url):
    """Fetch from primary URL with fallback"""
    try:
        response = httpx.get(url, timeout=5.0)
        response.raise_for_status()  # Raise exception for bad status codes
        return response.json()
    except httpx.TimeoutException:
        print(f"Timeout on {url}, trying fallback")
        return httpx.get(fallback_url).json()
    except httpx.HTTPStatusError as e:
        print(f"HTTP error: {e.response.status_code}")
        raise
    except httpx.RequestError as e:
        print(f"Request error: {e}")
        raise

try:
    data = fetch_with_fallback(
        "https://jsonplaceholder.typicode.com/posts/1",
        "https://jsonplaceholder.typicode.com/posts/2"
    )
    print(f"Fetched post: {data['title']}")
except Exception as e:
    print(f"All attempts failed: {e}")
Output:
Fetched post: sunt aut facere repellat provident…
httpx organizes exceptions in a clear hierarchy. `HTTPError` is the base class for both `RequestError` and `HTTPStatusError`. `RequestError` covers failures while sending the request or receiving the response, and `TimeoutException` is the subclass that indicates a timeout (connection, read, write, or pool). `HTTPStatusError` means the server responded with an error status code (4xx or 5xx). It is raised by `raise_for_status()`, similar to requests.
Different operations need different timeout values. Connection timeouts should be shorter (5-10 seconds), while read timeouts depend on the expected response size. For large downloads, you might need 30+ second read timeouts. httpx lets you configure each separately.
Streaming Responses
When dealing with large files or streaming APIs, loading the entire response into memory is inefficient. httpx supports streaming, letting you process responses chunk by chunk.
# streaming_responses.py
import asyncio
import httpx

# Stream a large response
with httpx.stream("GET", "https://httpbin.org/bytes/1024") as response:
    print(f"Status: {response.status_code}")
    print(f"Content-Length: {response.headers.get('content-length')}")
    # Process response in chunks
    for chunk in response.iter_bytes(chunk_size=256):
        print(f"Received {len(chunk)} bytes")

# Stream with iterator
with httpx.stream("GET", "https://httpbin.org/json") as response:
    for line in response.iter_lines():
        if line:
            print(f"Line: {line[:50]}...")

# Async streaming
async def async_stream():
    async with httpx.AsyncClient() as client:
        async with client.stream("GET", "https://httpbin.org/bytes/512") as response:
            async for chunk in response.aiter_bytes(chunk_size=128):
                print(f"Async received {len(chunk)} bytes")

asyncio.run(async_stream())
Output:
Status: 200
Content-Length: 1024
Received 256 bytes
Received 256 bytes
Received 256 bytes
Received 256 bytes
Line: {"slideshow": {"author": "Yours Truly", "title": …
Async received 128 bytes
Async received 128 bytes
Async received 128 bytes
Async received 128 bytes
Streaming is essential for production applications that download large files or handle streaming APIs. The `iter_bytes()` method gives you raw bytes, while `iter_lines()` automatically splits on newlines — useful for newline-delimited JSON APIs. Async streaming with `aiter_bytes()` and `aiter_lines()` works the same way in async contexts.
Real-Life Example: Async API Data Aggregator
Let’s build a practical example that combines async requests, error handling, and structured data processing. Imagine you’re aggregating data from multiple APIs and want to do it efficiently.
# api_aggregator.py
import asyncio
import httpx
from datetime import datetime


class APIAggregator:
    """Aggregates data from multiple APIs concurrently"""

    def __init__(self, max_concurrent=5):
        self.max_concurrent = max_concurrent
        self.timeout = httpx.Timeout(10.0)

    async def fetch_post(self, client, post_id):
        """Fetch a single post"""
        try:
            response = await client.get(
                f"https://jsonplaceholder.typicode.com/posts/{post_id}",
                timeout=self.timeout
            )
            response.raise_for_status()
            return response.json()
        except httpx.HTTPError as e:
            # HTTPError covers transport failures and bad status codes
            print(f"Error fetching post {post_id}: {e}")
            return None

    async def fetch_post_comments(self, client, post_id):
        """Fetch comments for a post"""
        try:
            response = await client.get(
                "https://jsonplaceholder.typicode.com/comments",
                params={"postId": post_id},
                timeout=self.timeout
            )
            response.raise_for_status()
            return response.json()
        except httpx.HTTPError as e:
            print(f"Error fetching comments: {e}")
            return []

    async def aggregate(self):
        """Aggregate data from multiple endpoints"""
        # Cap simultaneous connections at max_concurrent
        limits = httpx.Limits(max_connections=self.max_concurrent)
        async with httpx.AsyncClient(timeout=self.timeout, limits=limits) as client:
            # Fetch posts concurrently
            post_tasks = [
                self.fetch_post(client, i)
                for i in range(1, 6)
            ]
            posts = await asyncio.gather(*post_tasks)
            # Fetch comments concurrently
            comment_tasks = [
                self.fetch_post_comments(client, i)
                for i in range(1, 3)
            ]
            comments = await asyncio.gather(*comment_tasks)
        return {
            "timestamp": datetime.now().isoformat(),
            "posts_fetched": len([p for p in posts if p]),
            "total_comments": sum(len(c) for c in comments if c),
            "sample_post": posts[0] if posts else None
        }

# Run the aggregator
async def main():
    aggregator = APIAggregator()
    results = await aggregator.aggregate()
    print(f"Aggregation completed at {results['timestamp']}")
    print(f"Posts fetched: {results['posts_fetched']}")
    print(f"Total comments: {results['total_comments']}")
    sample = results['sample_post']
    if sample:  # the first fetch may have failed and returned None
        print(f"First post title: {sample['title']}")

asyncio.run(main())
Output:
Aggregation completed at 2026-04-09T14:23:45.123456
Posts fetched: 5
Total comments: 10
First post title: sunt aut facere repellat provident occaecati excepturi optio reprehenderit
This example demonstrates several httpx patterns: creating an async client, managing concurrent requests with proper error handling, using consistent timeouts across all requests, and returning structured data. The `APIAggregator` class is reusable and extensible — you could add caching, retry logic, or progress tracking. In production, you’d likely add logging and more sophisticated error recovery.
Frequently Asked Questions
Q: Is httpx a drop-in replacement for requests?
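A: Mostly, but not entirely. The synchronous API is nearly identical, so much requests code ports with little more than an import change. The main differences: httpx does not follow redirects unless you pass `follow_redirects=True`, it applies a 5-second default timeout where requests has none, and a few parameters and exception names differ. Test thoroughly before migrating production code.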
Q: Do I need to use async?
A: No. httpx works great synchronously, and you only need async when handling many concurrent requests. Start with synchronous code and migrate to async if profiling shows it’s beneficial.
Q: What’s the performance difference between httpx and requests?
A: For single requests, performance is similar. For many concurrent requests, async httpx is usually much faster than sending them one after another, and it avoids per-thread overhead. HTTP/2, when enabled with `http2=True`, can help further on servers that support it.
Q: How do I handle cookies and sessions?
A: httpx clients maintain cookies automatically. Use a persistent client for multiple requests to the same host to keep cookies and connection pooling active across requests.
Q: Can I use httpx with Django or Flask?
A: Yes. In synchronous (WSGI) request handlers, such as a standard Flask view, use the synchronous API. Use async httpx in ASGI applications such as FastAPI or in Django’s async views.
Q: How do I set up authentication?
A: httpx supports multiple auth methods. For basic auth: `httpx.get(url, auth=("user", "pass"))`. For bearer tokens: `headers={"Authorization": "Bearer token"}`. For custom auth, create a subclass of `httpx.Auth`.
Conclusion
httpx represents the future of HTTP requests in Python. It combines the simplicity of requests with modern features like async/await, HTTP/2, and better timeout handling. Whether you’re building a simple API client or managing complex concurrent request workflows, httpx has the tools you need without unnecessary complexity.
Start by installing httpx and replacing requests in a non-critical project. You’ll quickly discover why developers are switching. For more details, the official documentation is available at www.python-httpx.org.
For years, the Python requests library has been the go-to solution for making HTTP requests. While requests is powerful and user-friendly, it has limitations that modern Python developers encounter daily. It doesn’t support async operations natively, lacks HTTP/2 support, and can feel sluggish when you’re handling dozens of concurrent requests. If you’ve found yourself wrestling with requests’ synchronous nature or spinning up ThreadPoolExecutor just to manage multiple requests, you’re not alone. Many developers hit a wall where they need something more capable.
Enter httpx — a modern HTTP client that drops in as a replacement for requests while adding powerful features like native async/await support, HTTP/2 capabilities, and streaming responses. The best part? If you already know requests, you’ll feel right at home with httpx. The API is remarkably similar, which means you can start using it immediately without relearning everything. We’ll cover installation, basic usage, async patterns, and real-world examples that show why httpx is becoming the preferred choice for new Python projects.
In this guide, we’ll walk through everything you need to know about httpx. We’ll start with a quick example to get you up and running, explore what makes httpx special compared to other HTTP libraries, and then dive into practical patterns you can use in your own projects. Whether you’re building a simple API client or managing complex async workflows, httpx has the tools you need. Let’s get started.
200
{‘userId’: 1, ‘id’: 1, ‘title’: ‘sunt aut facere repellat…’, ‘body’: ‘…’}
That’s it. If you’ve used requests before, you already know httpx. The synchronous API is nearly identical, but under the hood, httpx brings modern features to the table. Now let’s explore what makes httpx more powerful than its predecessor.
When the 429 wave hits, read the headers
What is httpx and Why Use It?
httpx is a modern HTTP client library for Python that combines the simplicity of requests with advanced features like async support, HTTP/2, and more sophisticated timeout handling. Created by Tom Christie (the same developer behind Starlette and FastAPI), httpx is built on a foundation that understands modern Python development patterns. It’s not just a requests replacement — it’s a redesign based on everything we’ve learned about making HTTP libraries in the 2020s.
The key differences matter when you’re building real applications. Unlike requests, httpx supports both synchronous and asynchronous code from the same library. You don’t need to install separate packages or maintain multiple code paths. It supports HTTP/2 by default, which means better performance for services that support it. It has built-in connection pooling, proper async context managers, and a cleaner API that feels more Pythonic.
Let’s compare httpx to other HTTP libraries in the Python ecosystem:
Feature
httpx
requests
aiohttp
urllib3
Synchronous API
Yes
Yes
No
Yes
Async/Await Support
Yes
No
Yes
No
HTTP/2
Yes
No
Yes
No
Connection Pooling
Yes
Yes
Yes
Yes
Streaming Support
Yes
Yes
Yes
Yes
API Complexity
Low
Low
Medium
High
Drop-in requests Replacement
Mostly Yes
N/A
No
No
httpx shines when you need synchronous and asynchronous code in the same project. Unlike aiohttp, which requires AsyncIO from the start, httpx lets you start simple and scale to async when you need it. Unlike requests, httpx doesn’t force you into threading patterns when you want to handle multiple requests concurrently. It’s the bridge between requests’ simplicity and aiohttp’s power.
Installing httpx
Installation is straightforward. httpx is available on PyPI and installs cleanly without forcing dependencies on you. For basic functionality, you only need one command.
If you want HTTP/2 support with better performance, you can install the optional dependencies. The h2 library handles HTTP/2 protocol details, while httpcore provides the underlying transport layer.
That’s all you need. Unlike some HTTP libraries, httpx doesn’t require compiling C extensions or installing system dependencies. It’s pure Python with optional performance enhancements.
Headers are the bread crumbs out of the rate limit forest
Making GET Requests
GET requests are the foundation of HTTP. They retrieve data without side effects, and httpx makes them effortless. The basic pattern is identical to requests, but httpx adds subtle improvements like better error handling and automatic timeout management.
# get_requests.py
import httpx
# Simple GET request
response = httpx.get("https://jsonplaceholder.typicode.com/users")
print(f"Status: {response.status_code}")
print(f"Content-Type: {response.headers['content-type']}")
print(f"First user: {response.json()[0]['name']}")
# GET with query parameters
params = {"userId": 1}
response = httpx.get("https://jsonplaceholder.typicode.com/posts", params=params)
print(f"Posts for user 1: {len(response.json())}")
# GET with custom headers
headers = {"User-Agent": "MyApp/1.0"}
response = httpx.get("https://httpbin.org/headers", headers=headers)
print(response.json())
Status: 200
Content-Type: application/json
First user: Leanne Graham
Posts for user 1: 10
{‘headers’: {‘User-Agent’: ‘MyApp/1.0’, …}}
Notice how httpx handles query parameters naturally through the `params` dictionary. You don’t manually construct query strings or worry about URL encoding — httpx handles that behind the scenes. Headers work the same way, accepting a dictionary that httpx merges with the default headers. This consistent API means you can focus on your application logic instead of HTTP bookkeeping.
Making POST Requests
POST requests send data to the server. httpx supports multiple ways to send data: form-encoded, JSON, raw bytes, or streaming. Let’s explore the most common patterns.
# post_requests.py
import httpx
# POST with JSON data
client = httpx.Client()
data = {
"title": "New Post",
"body": "This is a test post",
"userId": 1
}
response = client.post(
"https://jsonplaceholder.typicode.com/posts",
json=data
)
print(f"Created post with ID: {response.json()['id']}")
# POST with form data
form_data = {"username": "john", "password": "secret123"}
response = client.post(
"https://httpbin.org/post",
data=form_data
)
print(f"Form post status: {response.status_code}")
# POST with custom timeout
try:
response = client.post(
"https://httpbin.org/delay/10",
timeout=5.0
)
except httpx.TimeoutException:
print("Request timed out after 5 seconds")
client.close()
Created post with ID: 101
Form post status: 200
Request timed out after 5 seconds
When making POST requests, use the `json` parameter for JSON data and the `data` parameter for form-encoded data. httpx automatically sets the correct Content-Type header for you. The `Client()` context manager maintains connection pooling across multiple requests, which is more efficient than using module-level functions for repeated requests. Timeouts are crucial for production code — they prevent your application from hanging if a server stops responding.
Exponential backoff: be patient, then exponentially more patient
Using Async with httpx
This is where httpx truly shines. Async support is built in from the ground up, not bolted on as an afterthought. When you need to handle multiple concurrent requests, async/await patterns let you handle hundreds of concurrent connections with minimal memory overhead — something that would require threading or multiprocessing with requests.
# async_requests.py
import asyncio
import httpx
async def fetch_posts(user_id):
"""Fetch posts for a specific user"""
async with httpx.AsyncClient() as client:
response = await client.get(
f"https://jsonplaceholder.typicode.com/posts?userId={user_id}"
)
return response.json()
async def fetch_multiple_users():
"""Fetch posts for multiple users concurrently"""
tasks = [
fetch_posts(user_id)
for user_id in range(1, 6)
]
results = await asyncio.gather(*tasks)
for i, posts in enumerate(results, 1):
print(f"User {i}: {len(posts)} posts")
# Run the async function
asyncio.run(fetch_multiple_users())
User 1: 10 posts
User 2: 10 posts
User 3: 9 posts
User 4: 9 posts
User 5: 9 posts
The `AsyncClient()` context manager handles resource cleanup automatically. Using `asyncio.gather()`, we fetch posts for five users concurrently in roughly the time it takes to fetch one. This pattern scales to thousands of concurrent requests without the overhead of creating threads. The key difference from synchronous code is minimal — just add `async` and `await` keywords.
# async_with_timeout.py
import asyncio
import httpx
async def fetch_with_timeout():
"""Fetch with explicit timeout configuration"""
timeout = httpx.Timeout(10.0) # 10 second timeout for all operations
async with httpx.AsyncClient(timeout=timeout) as client:
try:
response = await client.get(
"https://jsonplaceholder.typicode.com/posts/1"
)
print(f"Success: {response.status_code}")
except httpx.TimeoutException:
print("Request timed out")
except httpx.RequestError as e:
print(f"Network error: {e}")
asyncio.run(fetch_with_timeout())
Success: 200
Timeout handling in async contexts is critical. The `Timeout` object lets you set different timeouts for connection, read, write, and pool operations. This fine-grained control prevents your async application from hanging on unresponsive servers.
HTTP/2 Support
HTTP/2 is faster than HTTP/1.1 because it multiplexes multiple requests over a single connection and compresses headers. With httpx, HTTP/2 support is automatic for servers that support it. You don’t need to change your code — just have the h2 library installed.
# http2_support.py
import httpx
# HTTP/2 is automatic when available
response = httpx.get("https://httpbin.org/get")
print(f"HTTP Version: {response.http_version}")
# Force HTTP/1.1 if needed
client = httpx.Client(http2=False)
response = client.get("https://httpbin.org/get")
print(f"HTTP Version (forced 1.1): {response.http_version}")
client.close()
# Create client with explicit HTTP/2 support
client = httpx.Client(http2=True)
response = client.get("https://httpbin.org/get")
print(f"HTTP Version (with HTTP/2): {response.http_version}")
client.close()
HTTP Version: HTTP/2
HTTP Version (forced 1.1): HTTP/1.1
HTTP Version (with HTTP/2): HTTP/2
The performance difference is subtle for single requests but becomes dramatic with concurrent requests over HTTP/2. Since HTTP/2 multiplexes requests on a single connection, you avoid the overhead of establishing multiple TCP connections. For APIs that support it, this can mean 20-50% faster performance in real-world scenarios.
Decorators beat manual retry loops every time
Timeouts and Error Handling
Production code needs robust error handling. httpx provides clear exceptions for different failure scenarios, making it easy to distinguish between network problems, timeouts, and server errors.
# error_handling.py
import httpx

def fetch_with_fallback(url, fallback_url):
    """Fetch from primary URL with fallback"""
    try:
        response = httpx.get(url, timeout=5.0)
        response.raise_for_status()  # Raise exception for bad status codes
        return response.json()
    except httpx.TimeoutException:
        print(f"Timeout on {url}, trying fallback")
        return httpx.get(fallback_url).json()
    except httpx.HTTPStatusError as e:
        print(f"HTTP error: {e.response.status_code}")
        raise
    except httpx.RequestError as e:
        print(f"Request error: {e}")
        raise

try:
    data = fetch_with_fallback(
        "https://jsonplaceholder.typicode.com/posts/1",
        "https://jsonplaceholder.typicode.com/posts/2"
    )
    print(f"Fetched post: {data['title']}")
except Exception as e:
    print(f"All attempts failed: {e}")
Fetched post: sunt aut facere repellat provident…
httpx organizes exceptions in a clear hierarchy. `HTTPError` is the base class for both `RequestError` and `HTTPStatusError`. `RequestError` covers failures while sending the request or receiving the response, including `TimeoutException` (connection, read, write, or pool timeouts). `HTTPStatusError` is raised by `raise_for_status()` when the server responds with an error status code (4xx or 5xx), similar to requests. Because `HTTPStatusError` is not a subclass of `RequestError`, catch it separately or catch `HTTPError` to handle both.
Different operations need different timeout values. Connection timeouts should be shorter (5-10 seconds), while read timeouts depend on the expected response size. For large downloads, you might need 30+ second read timeouts. httpx lets you configure each separately.
Streaming Responses
When dealing with large files or streaming APIs, loading the entire response into memory is inefficient. httpx supports streaming, letting you process responses chunk by chunk.
# streaming_responses.py
import httpx

# Stream a large response
with httpx.stream("GET", "https://httpbin.org/bytes/1024") as response:
    print(f"Status: {response.status_code}")
    print(f"Content-Length: {response.headers.get('content-length')}")
    # Process response in chunks
    for chunk in response.iter_bytes(chunk_size=256):
        print(f"Received {len(chunk)} bytes")

# Stream with iterator
with httpx.stream("GET", "https://httpbin.org/json") as response:
    for line in response.iter_lines():
        if line:
            print(f"Line: {line[:50]}...")

# Async streaming
import asyncio

async def async_stream():
    async with httpx.AsyncClient() as client:
        async with client.stream("GET", "https://httpbin.org/bytes/512") as response:
            async for chunk in response.aiter_bytes(chunk_size=128):
                print(f"Async received {len(chunk)} bytes")

asyncio.run(async_stream())
Status: 200
Content-Length: 1024
Received 256 bytes
Received 256 bytes
Received 256 bytes
Received 256 bytes
Line: {“slideshow”: {“author”: “Yours Truly”, “title”: …
Async received 128 bytes
Async received 128 bytes
Async received 128 bytes
Async received 128 bytes
Streaming is essential for production applications that download large files or handle streaming APIs. The `iter_bytes()` method gives you raw bytes, while `iter_lines()` automatically splits on newlines — useful for newline-delimited JSON APIs. Async streaming with `aiter_bytes()` and `aiter_lines()` works the same way in async contexts.
Real-Life Example: Async API Data Aggregator
Let’s build a practical example that combines async requests, error handling, and structured data processing. Imagine you’re aggregating data from multiple APIs and want to do it efficiently.
# api_aggregator.py
import asyncio
import httpx
from datetime import datetime

class APIAggregator:
    """Aggregates data from multiple APIs concurrently"""

    def __init__(self, max_concurrent=5):
        self.max_concurrent = max_concurrent
        self.timeout = httpx.Timeout(10.0)

    async def fetch_post(self, client, post_id):
        """Fetch a single post"""
        try:
            response = await client.get(
                f"https://jsonplaceholder.typicode.com/posts/{post_id}",
                timeout=self.timeout
            )
            response.raise_for_status()
            return response.json()
        except httpx.HTTPError as e:  # covers RequestError and HTTPStatusError
            print(f"Error fetching post {post_id}: {e}")
            return None

    async def fetch_post_comments(self, client, post_id):
        """Fetch comments for a post"""
        try:
            response = await client.get(
                "https://jsonplaceholder.typicode.com/comments",
                params={"postId": post_id},
                timeout=self.timeout
            )
            response.raise_for_status()
            return response.json()
        except httpx.HTTPError as e:
            print(f"Error fetching comments: {e}")
            return []

    async def aggregate(self):
        """Aggregate data from multiple endpoints"""
        # Cap the connection pool so at most max_concurrent connections are open
        limits = httpx.Limits(max_connections=self.max_concurrent)
        async with httpx.AsyncClient(timeout=self.timeout, limits=limits) as client:
            # Fetch posts concurrently
            post_tasks = [
                self.fetch_post(client, i)
                for i in range(1, 6)
            ]
            posts = await asyncio.gather(*post_tasks)
            # Fetch comments concurrently
            comment_tasks = [
                self.fetch_post_comments(client, i)
                for i in range(1, 3)
            ]
            comments = await asyncio.gather(*comment_tasks)
            return {
                "timestamp": datetime.now().isoformat(),
                "posts_fetched": len([p for p in posts if p]),
                "total_comments": sum(len(c) for c in comments if c),
                "sample_post": posts[0] if posts else None
            }

# Run the aggregator
async def main():
    aggregator = APIAggregator()
    results = await aggregator.aggregate()
    print(f"Aggregation completed at {results['timestamp']}")
    print(f"Posts fetched: {results['posts_fetched']}")
    print(f"Total comments: {results['total_comments']}")
    if results['sample_post']:  # the first fetch may have failed
        print(f"First post title: {results['sample_post']['title']}")

asyncio.run(main())
Aggregation completed at 2026-04-09T14:23:45.123456
Posts fetched: 5
Total comments: 10
First post title: sunt aut facere repellat provident occaecati excepturi optio reprehenderit
This example demonstrates several httpx patterns: creating an async client, managing concurrent requests with proper error handling, using consistent timeouts across all requests, and returning structured data. The `APIAggregator` class is reusable and extensible — you could add caching, retry logic, or progress tracking. In production, you’d likely add logging and more sophisticated error recovery.
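One possible extension is an explicit cap on how many requests are in flight at once. The sketch below uses only the standard library: `fake_fetch` is a hypothetical stand-in for an HTTP call (it just sleeps), and `asyncio.Semaphore` is one way to enforce the limit, not something the aggregator above already does.

```python
import asyncio


async def fake_fetch(item_id: int, semaphore: asyncio.Semaphore, state: dict) -> int:
    """Stand-in for an HTTP call; records how many calls run at once."""
    async with semaphore:  # waits here when the limit is reached
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)  # simulated network latency
        state["active"] -= 1
        return item_id * 2


async def run_all(max_concurrent: int = 5, count: int = 20):
    # Create the semaphore inside the running event loop.
    semaphore = asyncio.Semaphore(max_concurrent)
    state = {"active": 0, "peak": 0}
    results = await asyncio.gather(
        *(fake_fetch(i, semaphore, state) for i in range(count))
    )
    return results, state["peak"]


results, peak = asyncio.run(run_all())
print(f"{len(results)} results, peak concurrency {peak}")
```

In a real aggregator the body of the `async with semaphore:` block would be the `client.get(...)` call, which keeps the limit independent of the connection pool settings.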
Frequently Asked Questions
Q: Is httpx a drop-in replacement for requests?
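A: Mostly yes. The synchronous API is nearly identical, so much requests code works with httpx after changing the import. There are deliberate differences, though: httpx does not follow redirects unless you pass `follow_redirects=True`, it applies a default 5-second timeout where requests has none, and some parameter and exception names differ. Test thoroughly before migrating production code.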
Q: Do I need to use async?
A: No. httpx works great synchronously, and you only need async when handling many concurrent requests. Start with synchronous code and migrate to async if profiling shows it’s beneficial.
Q: What’s the performance difference between httpx and requests?
A: For single requests, performance is similar. For concurrent requests, httpx with async is dramatically faster because it avoids thread overhead. HTTP/2 support also improves performance for compatible servers.
Q: How do I handle cookies and sessions?
A: httpx clients maintain cookies automatically. Use a persistent client for multiple requests to the same host to keep cookies and connection pooling active across requests.
Q: Can I use httpx with Django or Flask?
A: Yes, but use the synchronous API in request handlers since WSGI is synchronous. Use async httpx in ASGI applications like FastAPI or Django Async Views.
Q: How do I set up authentication?
A: httpx supports multiple auth methods. For basic auth: `httpx.get(url, auth=("user", "pass"))`. For bearer tokens: `headers={"Authorization": "Bearer token"}`. For custom auth, create a subclass of `httpx.Auth`.
Conclusion
httpx represents the future of HTTP requests in Python. It combines the simplicity of requests with modern features like async/await, HTTP/2, and better timeout handling. Whether you’re building a simple API client or managing complex concurrent request workflows, httpx has the tools you need without unnecessary complexity.
Start by installing httpx and replacing requests in a non-critical project. You’ll quickly discover why developers are switching. For more details, comprehensive documentation is available at www.python-httpx.org.
For years, the Python requests library has been the go-to solution for making HTTP requests. While requests is powerful and user-friendly, it has limitations that modern Python developers encounter daily. It doesn’t support async operations natively, lacks HTTP/2 support, and can feel sluggish when you’re handling dozens of concurrent requests. If you’ve found yourself wrestling with requests’ synchronous nature or spinning up ThreadPoolExecutor just to manage multiple requests, you’re not alone. Many developers hit a wall where they need something more capable.
Enter httpx — a modern HTTP client that drops in as a replacement for requests while adding powerful features like native async/await support, HTTP/2 capabilities, and streaming responses. The best part? If you already know requests, you’ll feel right at home with httpx. The API is remarkably similar, which means you can start using it immediately without relearning everything. We’ll cover installation, basic usage, async patterns, and real-world examples that show why httpx is becoming the preferred choice for new Python projects.
In this guide, we’ll walk through everything you need to know about httpx. We’ll start with a quick example to get you up and running, explore what makes httpx special compared to other HTTP libraries, and then dive into practical patterns you can use in your own projects. Whether you’re building a simple API client or managing complex async workflows, httpx has the tools you need. Let’s get started.
200
{'userId': 1, 'id': 1, 'title': 'sunt aut facere repellat…', 'body': '…'}
That’s it. If you’ve used requests before, you already know httpx. The synchronous API is nearly identical, but under the hood, httpx brings modern features to the table. Now let’s explore what makes httpx more powerful than its predecessor.
httpx handles synchronous and asynchronous requests from a single library
What is httpx and Why Use It?
httpx is a modern HTTP client library for Python that combines the simplicity of requests with advanced features like async support, HTTP/2, and more sophisticated timeout handling. Created by Tom Christie (the developer behind Starlette and Django REST framework), httpx is built on a foundation that understands modern Python development patterns. It’s not just a requests replacement; it’s a redesign based on everything we’ve learned about making HTTP libraries in the 2020s.
The key differences matter when you’re building real applications. Unlike requests, httpx supports both synchronous and asynchronous code from the same library. You don’t need to install separate packages or maintain multiple code paths. It supports HTTP/2 as an opt-in feature, which can mean better performance for services that support it. It has built-in connection pooling, proper async context managers, and a cleaner API that feels more Pythonic.
Let’s compare httpx to other HTTP libraries in the Python ecosystem:
| Feature | httpx | requests | aiohttp | urllib3 |
| Synchronous API | Yes | Yes | No | Yes |
| Async/Await Support | Yes | No | Yes | No |
| HTTP/2 | Yes (opt-in) | No | No | No |
| Connection Pooling | Yes | Yes | Yes | Yes |
| Streaming Support | Yes | Yes | Yes | Yes |
| API Complexity | Low | Low | Medium | High |
| Drop-in requests Replacement | Mostly Yes | N/A | No | No |
httpx shines when you need synchronous and asynchronous code in the same project. Unlike aiohttp, which is async-only and requires asyncio from the start, httpx lets you start simple and scale to async when you need it. Unlike requests, httpx doesn’t force you into threading patterns when you want to handle multiple requests concurrently. It’s the bridge between requests’ simplicity and aiohttp’s power.
Installing httpx
Installation is straightforward. httpx is available on PyPI, and for basic functionality you only need one command:
pip install httpx
HTTP/2 support is an optional extra. Installing it pulls in the h2 library, which handles the HTTP/2 protocol details; httpcore, which provides the underlying transport layer, is already installed as a core dependency:
pip install "httpx[http2]"
That’s all you need. Unlike some HTTP libraries, httpx doesn’t require compiling C extensions or installing system dependencies. It’s pure Python with optional performance enhancements.
POST, GET, PUT, DELETE — all methods work seamlessly with httpx
Making GET Requests
GET requests are the foundation of HTTP. They retrieve data without side effects, and httpx makes them effortless. The basic pattern is identical to requests, with a few practical differences such as a default 5-second timeout where requests would wait indefinitely.
# get_requests.py
import httpx
# Simple GET request
response = httpx.get("https://jsonplaceholder.typicode.com/users")
print(f"Status: {response.status_code}")
print(f"Content-Type: {response.headers['content-type']}")
print(f"First user: {response.json()[0]['name']}")
# GET with query parameters
params = {"userId": 1}
response = httpx.get("https://jsonplaceholder.typicode.com/posts", params=params)
print(f"Posts for user 1: {len(response.json())}")
# GET with custom headers
headers = {"User-Agent": "MyApp/1.0"}
response = httpx.get("https://httpbin.org/headers", headers=headers)
print(response.json())
Status: 200
Content-Type: application/json
First user: Leanne Graham
Posts for user 1: 10
{'headers': {'User-Agent': 'MyApp/1.0', …}}
Notice how httpx handles query parameters naturally through the `params` dictionary. You don’t manually construct query strings or worry about URL encoding — httpx handles that behind the scenes. Headers work the same way, accepting a dictionary that httpx merges with the default headers. This consistent API means you can focus on your application logic instead of HTTP bookkeeping.
Making POST Requests
POST requests send data to the server. httpx supports multiple ways to send data: form-encoded, JSON, raw bytes, or streaming. Let’s explore the most common patterns.
# post_requests.py
import httpx

# POST with JSON data
client = httpx.Client()
data = {
    "title": "New Post",
    "body": "This is a test post",
    "userId": 1
}
response = client.post(
    "https://jsonplaceholder.typicode.com/posts",
    json=data
)
print(f"Created post with ID: {response.json()['id']}")

# POST with form data
form_data = {"username": "john", "password": "secret123"}
response = client.post(
    "https://httpbin.org/post",
    data=form_data
)
print(f"Form post status: {response.status_code}")

# POST with custom timeout
try:
    response = client.post(
        "https://httpbin.org/delay/10",
        timeout=5.0
    )
except httpx.TimeoutException:
    print("Request timed out after 5 seconds")

client.close()
Created post with ID: 101
Form post status: 200
Request timed out after 5 seconds
When making POST requests, use the `json` parameter for JSON data and the `data` parameter for form-encoded data. httpx automatically sets the correct Content-Type header for you. A `Client` instance reuses pooled connections across requests, which is more efficient than the module-level functions for repeated calls; close it when you are done, or use it as a context manager (`with httpx.Client() as client:`) so cleanup happens automatically. Timeouts are crucial for production code because they prevent your application from hanging if a server stops responding.
Connection pooling keeps your HTTP clients fast and memory-efficient
Using Async with httpx
This is where httpx truly shines. Async support is built in from the ground up, not bolted on as an afterthought. When you need to handle multiple concurrent requests, async/await patterns let you handle hundreds of concurrent connections with minimal memory overhead — something that would require threading or multiprocessing with requests.
# async_requests.py
import asyncio
import httpx

async def fetch_posts(user_id):
    """Fetch posts for a specific user"""
    async with httpx.AsyncClient() as client:
        response = await client.get(
            f"https://jsonplaceholder.typicode.com/posts?userId={user_id}"
        )
        return response.json()

async def fetch_multiple_users():
    """Fetch posts for multiple users concurrently"""
    tasks = [
        fetch_posts(user_id)
        for user_id in range(1, 6)
    ]
    results = await asyncio.gather(*tasks)
    for i, posts in enumerate(results, 1):
        print(f"User {i}: {len(posts)} posts")

# Run the async function
asyncio.run(fetch_multiple_users())
User 1: 10 posts
User 2: 10 posts
User 3: 9 posts
User 4: 9 posts
User 5: 9 posts
The `AsyncClient()` context manager handles resource cleanup automatically. Using `asyncio.gather()`, we fetch posts for five users concurrently in roughly the time it takes to fetch one. This pattern scales to many concurrent requests without the overhead of creating threads, and the change from synchronous code is small: add the `async` and `await` keywords. For larger workloads, share a single `AsyncClient` across tasks so they reuse pooled connections instead of opening a new client per request.
# async_with_timeout.py
import asyncio
import httpx

async def fetch_with_timeout():
    """Fetch with explicit timeout configuration"""
    timeout = httpx.Timeout(10.0)  # 10 second timeout for all operations
    async with httpx.AsyncClient(timeout=timeout) as client:
        try:
            response = await client.get(
                "https://jsonplaceholder.typicode.com/posts/1"
            )
            print(f"Success: {response.status_code}")
        except httpx.TimeoutException:
            print("Request timed out")
        except httpx.RequestError as e:
            print(f"Network error: {e}")

asyncio.run(fetch_with_timeout())
Success: 200
Why Password Security Matters in Your Applications
Every year, millions of passwords are exposed in data breaches. When companies store passwords in plain text or with weak encryption methods, attackers who gain access to the database can immediately use those credentials against users. A single security oversight can compromise not just one user, but your entire application’s integrity and your reputation. The good news? Protecting passwords isn’t complicated — bcrypt makes it remarkably straightforward to implement enterprise-grade password security in your Python applications today.
Bcrypt is a purpose-built password hashing library that automatically handles the complexity of secure password storage. Unlike generic hashing functions, bcrypt incorporates salt generation, adaptive work factors, and built-in protections against common attacks. Whether you’re building a small project or scaling to millions of users, bcrypt gives you the same security guarantees with just a few lines of code.
In this guide, we’ll explore how bcrypt works from first principles, then move through practical implementations including user registration, password verification, and real-world patterns you can use immediately. By the end, you’ll understand why bcrypt is the industry standard and how to integrate it into your projects with confidence.
Quick Example: Hash and Verify in 10 Lines
Before diving into the theory, let’s see bcrypt in action. This example demonstrates the complete workflow — hashing a password and then verifying a user’s input against that stored hash. It’s this simple and this secure.
That’s the essence of password hashing with bcrypt. The hashpw() function handles salt generation and hashing internally, while checkpw() verifies inputs without needing to decrypt anything. This simplicity masks sophisticated security mechanisms working behind the scenes.
What is Password Hashing and Why Use bcrypt?
Password hashing is a one-way cryptographic function that transforms a plaintext password into an irreversible string of characters. The database stores only the hash, not the original password. When a user logs in, the application hashes their input and compares it to the stored hash — if they match, the password is correct. If attackers compromise the database, they get hashes, not passwords.
Not all hashing functions are suitable for passwords. General-purpose algorithms like MD5, SHA1, and even SHA256 are too fast — an attacker with access to your password hashes can run billions of guesses per second using GPU clusters. Bcrypt solves this by intentionally being slow and computationally expensive, making brute-force attacks impractical. Here’s how different approaches compare:
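| Method | Speed | Salt | Adaptive | Security Level |
| Plaintext | Instant | None | No | Catastrophic |
| MD5 | Very fast (billions of guesses/sec on GPUs) | Optional (manual) | No | Broken |
| SHA256 | Very fast (billions of guesses/sec on GPUs) | Optional (manual) | No | Weak for passwords |
| Bcrypt | ~5 hashes/sec (configurable) | Automatic | Yes | Industry standard |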
Bcrypt’s adaptive cost factor is crucial: each increment doubles the amount of work per hash, so as hardware becomes faster you can raise the work factor to keep password cracking slow. A hash tuned to take about 0.2 seconds today can be kept at about 0.2 seconds on future hardware by raising the cost. This forward-looking design is why bcrypt remains relevant decades after its creation.
Installing bcrypt
Bcrypt installs with a single pip command:
pip install bcrypt
The package is a compiled extension for performance, and prebuilt wheels are published for the common operating systems, so most Python environments will install it without issues. If pip has to build from source, you will need a compiler toolchain (recent releases are implemented in Rust, older ones in C).
Once installed, bcrypt is immediately available for import. We recommend adding bcrypt to your project’s requirements.txt or pyproject.toml file so it’s installed automatically for anyone who clones your repository. The package is maintained actively and released regularly with security updates, so keep it current with pip install --upgrade bcrypt periodically.
Investigating hash functions like a crime scene — following the trail that only goes one way
Hashing a Password
The hashing process in bcrypt is deceptively simple from a user perspective, but implements several sophisticated techniques internally. When you call hashpw(), bcrypt generates a cryptographically random salt, applies the Blowfish cipher multiple times based on the cost factor, and returns a single string containing all the information needed to verify passwords later.
Here’s the standard pattern for creating password hashes when users register or change their password:
# hash_password_demo.py
import bcrypt

def hash_password(password: str) -> str:
    """
    Hash a plaintext password using bcrypt.

    Args:
        password: The user's plaintext password

    Returns:
        The bcrypt hash as a string (can be stored in database)
    """
    # Encode string to bytes (bcrypt requires bytes)
    password_bytes = password.encode('utf-8')
    # Generate salt and hash in one operation
    # Default cost factor is 12
    hashed = bcrypt.hashpw(password_bytes, bcrypt.gensalt())
    # Return as string for database storage
    return hashed.decode('utf-8')

# Example usage
user_password = "SecurePass123!"
stored_hash = hash_password(user_password)
print(f"Original: {user_password}")
print(f"Stored: {stored_hash}")
print(f"Length: {len(stored_hash)} characters")
Every hash is exactly 60 characters long, making it database-friendly to store in a VARCHAR(60) column. The hash contains embedded information: the algorithm identifier ($2b$), the cost factor ($12$), and the salt. You never need to store these separately — everything is self-contained in that single string.
A critical consideration: passwords must be encoded to bytes before hashing. Python 3 strings are Unicode by default, but bcrypt operates on bytes. Always use .encode('utf-8') or specify the encoding explicitly. Different encodings will produce different hashes, which is why consistency matters — the verification step must use the same encoding.
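To see why the encoding matters, here is a small standard-library sketch that needs no bcrypt at all. The sample password and the helper name to_password_bytes are illustrative assumptions; the Unicode normalization step is an optional extra that some applications add so that visually identical input always produces the same bytes.

```python
import unicodedata

password = "pässwörd"

# The same text becomes different byte sequences under different encodings,
# so hashing one and verifying with the other can never match
utf8_bytes = password.encode("utf-8")
latin1_bytes = password.encode("latin-1")
print(utf8_bytes == latin1_bytes)  # False

# The same visible characters can also be composed (NFC) or decomposed (NFD)
composed = unicodedata.normalize("NFC", password)
decomposed = unicodedata.normalize("NFD", password)
print(composed == decomposed)  # False


def to_password_bytes(raw: str) -> bytes:
    """Hypothetical helper: one normalization form, one encoding, everywhere."""
    return unicodedata.normalize("NFC", raw).encode("utf-8")


print(to_password_bytes(composed) == to_password_bytes(decomposed))  # True
```

Routing both the hashing and the verification path through a single helper like this is one way to guarantee the two steps always agree.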
Verifying a Password
When a user logs in, you don’t decrypt the stored hash or compare strings directly. Instead, you hash the incoming password and let bcrypt compare the hashes internally using timing-safe comparison. This prevents timing attacks where attackers measure response times to gain information about the hash.
Here’s how to verify passwords securely:
# verify_password_demo.py
import bcrypt

def verify_password(provided_password: str, stored_hash: str) -> bool:
    """
    Verify a plaintext password against a bcrypt hash.

    Args:
        provided_password: Password the user provided at login
        stored_hash: The bcrypt hash from the database
    Returns:
        True if the password matches, False otherwise
    """
    # Encode both to bytes
    provided_bytes = provided_password.encode('utf-8')
    stored_bytes = stored_hash.encode('utf-8')
    # checkpw handles timing-safe comparison internally
    return bcrypt.checkpw(provided_bytes, stored_bytes)

# Example: Simulating a login attempt
# In a real application the hash comes from the database; it is generated
# here so the demo is self-contained and Attempt 1 genuinely matches
stored_hash = bcrypt.hashpw("SecurePass123!".encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
# Correct password
attempt1 = "SecurePass123!"
print(f"Attempt 1 ('{attempt1}'): {verify_password(attempt1, stored_hash)}")
# Wrong password
attempt2 = "WrongPassword123"
print(f"Attempt 2 ('{attempt2}'): {verify_password(attempt2, stored_hash)}")
# Close but not exact
attempt3 = "SecurePass123"
print(f"Attempt 3 ('{attempt3}'): {verify_password(attempt3, stored_hash)}")
Notice that even a single character difference results in verification failure. Bcrypt’s comparison is exact — there’s no “close enough” or partial matches. This is intentional: password verification must be binary.
The checkpw() function uses constant-time comparison, meaning it takes the same amount of time regardless of where a mismatch occurs. A naive string comparison might return immediately upon finding the first different character, but bcrypt compares the entire hash every time. This prevents attackers from using timing measurements to guess parts of the hash.
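The difference between the two comparison styles can be sketched with the standard library. This is an illustration of the idea only, not bcrypt’s internal code: naive_equal is a hypothetical early-exit comparison, and hmac.compare_digest is Python’s built-in constant-time comparison.

```python
import hmac


def naive_equal(a: bytes, b: bytes) -> bool:
    """Early-exit comparison: returns at the first mismatching byte."""
    if len(a) != len(b):
        return False
    for x, y in zip(a, b):
        if x != y:
            return False  # running time depends on where the mismatch is
    return True


def constant_time_equal(a: bytes, b: bytes) -> bool:
    """Comparison whose duration does not depend on the mismatch position."""
    return hmac.compare_digest(a, b)


expected = b"digest-value-0001"
print(naive_equal(expected, b"digest-value-0002"))          # False
print(constant_time_equal(expected, b"digest-value-0002"))  # False
print(constant_time_equal(expected, b"digest-value-0001"))  # True
```

Both functions return the same answers; the point is that only the second avoids leaking, through timing, how many leading bytes were correct.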
Following the principle of least privilege — storing only what you absolutely need
Understanding Salt and Work Factor
Bcrypt’s security relies on two mechanisms working together: salt and cost factor. The salt is a random string added to each password before hashing, ensuring identical passwords produce different hashes. Without salt, an attacker could create a rainbow table — a precomputed mapping of common passwords to their hashes — and look up compromised passwords instantly.
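The effect of a salt is easy to see in a few lines. The sketch below uses SHA-256 from hashlib purely because it needs no extra packages; as discussed earlier, a fast hash like this is not suitable for storing passwords, and the function names and sample password are illustrative assumptions.

```python
import hashlib
import os


def unsalted_hash(password: str) -> str:
    return hashlib.sha256(password.encode("utf-8")).hexdigest()


def salted_hash(password: str, salt: bytes) -> str:
    # The salt is mixed into the input, so it changes the resulting digest
    return hashlib.sha256(salt + password.encode("utf-8")).hexdigest()


# Without a salt, two users with the same password get the same hash,
# which is exactly what a precomputed lookup table exploits
print(unsalted_hash("hunter2") == unsalted_hash("hunter2"))  # True

# With a random per-user salt, the same password hashes differently
alice_salt, bob_salt = os.urandom(16), os.urandom(16)
alice_hash = salted_hash("hunter2", alice_salt)
bob_hash = salted_hash("hunter2", bob_salt)
print(alice_hash == bob_hash)  # False (barring an astronomically unlikely collision)

# Verification still works because the salt is stored next to the hash
print(salted_hash("hunter2", alice_salt) == alice_hash)  # True
```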
The cost factor (work factor) sets the amount of work as a power of two: bcrypt performs 2^cost iterations of its key setup, so each increment doubles the computational cost. A higher cost factor makes hashing slower, which also makes brute-force attacks slower. Let’s explore these concepts:
# salt_cost_demo.py
import bcrypt
import time

password = "TestPassword123"
password_bytes = password.encode('utf-8')

# Demonstrate different cost factors
print("Cost Factor Performance Analysis:")
print("-" * 50)
for cost in [10, 12, 14]:
    start = time.time()
    hashed = bcrypt.hashpw(password_bytes, bcrypt.gensalt(rounds=cost))
    elapsed = time.time() - start
    print(f"Cost {cost}: {elapsed:.3f}s - Hash: {hashed.decode()[:30]}...")

print("\nDifferent salts, same password:")
print("-" * 50)
# Same password hashed multiple times produces different results
for i in range(3):
    hashed = bcrypt.hashpw(password_bytes, bcrypt.gensalt())
    print(f"Hash {i+1}: {hashed.decode()}")
Notice how cost factor 10 is fast but cost factor 14 takes over a second. In production, you need to balance security with user experience — a login taking 5 seconds would frustrate users. The default cost of 12 (0.2-0.3 seconds) is a proven sweet spot for most applications. As hardware improves over time, you can increase the cost factor in your code to maintain the same hashing duration.
Each hash includes its salt and cost factor as part of the output string. The format is: $2b$COST$SALT_AND_HASH. When you verify a password with checkpw(), bcrypt extracts this information from the stored hash automatically, ensuring verification uses the same parameters as the original hashing. You never need to manage salt or cost separately.
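To make the layout tangible, here is a minimal parser for that string format. It is a sketch for inspection only (checkpw() does this for you): BcryptParts and parse_bcrypt_hash are hypothetical names, and the sample value is a synthetic placeholder in the right shape rather than a real hash. It relies on the 53 characters after the cost being 22 salt characters followed by 31 digest characters.

```python
from typing import NamedTuple


class BcryptParts(NamedTuple):
    version: str
    cost: int
    salt: str
    digest: str


def parse_bcrypt_hash(stored_hash: str) -> BcryptParts:
    """Split a 60-character bcrypt string into its embedded fields."""
    # "$2b$12$<53 chars>" splits into ['', '2b', '12', '<53 chars>']
    _, version, cost, rest = stored_hash.split("$")
    if len(rest) != 53:
        raise ValueError("expected 22 salt + 31 digest characters")
    return BcryptParts(version, int(cost), rest[:22], rest[22:])


# Synthetic placeholder with the right shape (not a real hash)
sample = "$2b$12$" + "A" * 22 + "B" * 31
parts = parse_bcrypt_hash(sample)
print(len(sample), parts.version, parts.cost)  # 60 2b 12
print(len(parts.salt), len(parts.digest))      # 22 31
```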
Adjusting the Cost Factor
While the default cost of 12 works well, production applications often adjust this based on their specific needs. Services with high login volume might use cost 10 to keep latency low, while security-critical applications might use cost 13 or 14. The key is measuring and balancing performance against security.
Here’s how to implement configurable cost factors and test performance in your application:
# configurable_cost.py
import bcrypt
import time
from typing import Optional

class PasswordManager:
    """Manages password hashing with configurable cost factor."""

    def __init__(self, cost_factor: int = 12):
        """
        Initialize with a specific cost factor.

        Args:
            cost_factor: Bcrypt cost factor (10-12 recommended, max 31)
        """
        if not 4 <= cost_factor <= 31:
            raise ValueError("Cost factor must be between 4 and 31")
        self.cost_factor = cost_factor

    def hash_password(self, password: str) -> str:
        """Hash a password with the configured cost factor."""
        password_bytes = password.encode('utf-8')
        salt = bcrypt.gensalt(rounds=self.cost_factor)
        hashed = bcrypt.hashpw(password_bytes, salt)
        return hashed.decode('utf-8')

    def verify_password(self, password: str, stored_hash: str) -> bool:
        """Verify a password against a stored hash."""
        try:
            password_bytes = password.encode('utf-8')
            stored_bytes = stored_hash.encode('utf-8')
            return bcrypt.checkpw(password_bytes, stored_bytes)
        except ValueError:
            # Invalid hash format
            return False

    def benchmark(self, test_password: str = "TestPassword123") -> float:
        """Measure how long hashing takes with current cost factor."""
        start = time.time()
        self.hash_password(test_password)
        return time.time() - start

# Test different cost factors
print("Finding optimal cost factor:")
print("-" * 50)
for cost in [10, 11, 12, 13]:
    manager = PasswordManager(cost_factor=cost)
    elapsed = manager.benchmark()
    print(f"Cost {cost}: {elapsed:.3f} seconds")

# Use production setting
print("\nProduction PasswordManager:")
print("-" * 50)
pm = PasswordManager(cost_factor=12)
pwd = "MySecurePassword!"
hashed = pm.hash_password(pwd)
print(f"Hash: {hashed}")
print(f"Verify: {pm.verify_password(pwd, hashed)}")
This PasswordManager class wraps bcrypt operations and makes cost factor configurable without changing application logic everywhere. You can update the default cost factor in one place, and all password operations use the new value. For applications with mixed cost factors in the database (from previous versions), bcrypt’s automatic parameter extraction handles this — older hashes with cost 10 verify correctly even if your application now uses cost 12.
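When a database holds hashes with mixed cost factors, it helps to know which ones are below the current target. The helper below is a hypothetical sketch that reads the cost straight out of the stored string; the sample hashes are synthetic placeholders in bcrypt’s format, not real hashes.

```python
def needs_rehash(stored_hash: str, target_cost: int) -> bool:
    """Return True when the hash was created with a lower cost than target_cost."""
    parts = stored_hash.split("$")
    # A bcrypt string looks like $2b$12$<salt+digest>, so parts[2] is the cost
    if len(parts) != 4 or not parts[2].isdigit():
        raise ValueError("not a bcrypt-formatted hash")
    return int(parts[2]) < target_cost


# Synthetic placeholders with the right shape (not real hashes)
old_hash = "$2b$10$" + "A" * 53
new_hash = "$2b$12$" + "A" * 53

print(needs_rehash(old_hash, target_cost=12))  # True
print(needs_rehash(new_hash, target_cost=12))  # False
```

A natural place to call this is right after a successful checkpw(): the plaintext password is available at that moment, so it can be hashed again at the new cost and the stored value replaced.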
Measuring twice, hashing once — balancing security with real-world performance constraints
Real-Life Example: User Registration CLI
Let’s build a practical user registration system that demonstrates password hashing in context. This example includes input validation, database simulation, and proper error handling — everything you’d need to adapt for a real application:
# user_registration_system.py
import bcrypt
import json
import os
from typing import Optional, Dict, Any

class UserDatabase:
    """Simulates a user database with file-based storage."""

    def __init__(self, db_file: str = "users.json"):
        self.db_file = db_file
        self._load_database()

    def _load_database(self) -> None:
        """Load users from file or create new database."""
        if os.path.exists(self.db_file):
            with open(self.db_file, 'r') as f:
                self.users = json.load(f)
        else:
            self.users = {}

    def _save_database(self) -> None:
        """Persist users to file."""
        with open(self.db_file, 'w') as f:
            json.dump(self.users, f, indent=2)

    def user_exists(self, username: str) -> bool:
        """Check if username already exists."""
        return username in self.users

    def register_user(self, username: str, email: str,
                      password: str) -> Dict[str, Any]:
        """
        Register a new user with password hashing.

        Returns:
            Dict with status and message
        """
        # Validation
        if self.user_exists(username):
            return {"success": False, "message": "Username already taken"}
        if len(password) < 8:
            return {"success": False,
                    "message": "Password must be 8+ characters"}
        if not email or '@' not in email:
            return {"success": False, "message": "Invalid email address"}
        # Hash password with cost factor 12
        password_bytes = password.encode('utf-8')
        salt = bcrypt.gensalt(rounds=12)
        password_hash = bcrypt.hashpw(password_bytes, salt).decode('utf-8')
        # Store user (never store plaintext password)
        self.users[username] = {
            "email": email,
            "password_hash": password_hash,
            "created": "2026-04-09"
        }
        self._save_database()
        return {"success": True,
                "message": f"User {username} registered successfully"}

    def authenticate_user(self, username: str,
                          password: str) -> Dict[str, Any]:
        """
        Verify username and password.

        Returns:
            Dict with authentication result
        """
        if not self.user_exists(username):
            return {"success": False, "message": "Invalid credentials"}
        user = self.users[username]
        password_bytes = password.encode('utf-8')
        stored_hash = user["password_hash"].encode('utf-8')
        # Use bcrypt to verify
        if bcrypt.checkpw(password_bytes, stored_hash):
            return {"success": True,
                    "message": f"Welcome back, {username}!"}
        else:
            return {"success": False, "message": "Invalid credentials"}

# Interactive CLI registration demo
if __name__ == "__main__":
    db = UserDatabase("demo_users.json")
    print("User Registration System")
    print("=" * 50)
    # Register a new user
    result = db.register_user(
        username="alice_dev",
        email="alice@example.com",
        password="AliceSecure123!"
    )
    print(f"Register: {result['message']}")
    # Attempt duplicate registration
    result = db.register_user(
        username="alice_dev",
        email="alice2@example.com",
        password="DifferentPass123"
    )
    print(f"Duplicate attempt: {result['message']}")
    # Correct login
    result = db.authenticate_user("alice_dev", "AliceSecure123!")
    print(f"Login (correct): {result['message']}")
    # Incorrect login
    result = db.authenticate_user("alice_dev", "WrongPassword123")
    print(f"Login (wrong): {result['message']}")
    # Cleanup
    if os.path.exists("demo_users.json"):
        os.remove("demo_users.json")
Output:
User Registration System
==================================================
Register: User alice_dev registered successfully
Duplicate attempt: Username already taken
Login (correct): Welcome back, alice_dev!
Login (wrong): Invalid credentials
This registration system demonstrates critical security patterns: plaintext passwords are never stored; only their bcrypt hashes are persisted. The validation enforces a minimum password length before hashing. Authentication returns the same "Invalid credentials" message whether the username or the password is wrong, and it relies on bcrypt’s comparison, so timing attacks won’t reveal information about the hash. You can adapt this structure directly into web frameworks like Flask, Django, or FastAPI by replacing the file-based database with a proper database connection.
Catching attackers in the act — timing-safe comparisons leave no fingerprints
Frequently Asked Questions
Q: Can I decrypt a bcrypt hash to see the original password?
No. Bcrypt is a one-way function by design, so there is no decryption function. This is a feature, not a limitation: if a password is forgotten, the user must reset it, not retrieve it. The only way to recover a password from a bcrypt hash is to guess candidates and hash each one, which the cost factor makes slow. Weak or common passwords can still be found this way, which is why password strength matters even with bcrypt.
Q: How long does a bcrypt hash stay valid?
The hash itself never expires. A hash created in 2015 will still verify passwords correctly in 2026. However, you can re-hash passwords when users log in with a higher cost factor. For example, migrate users to cost 13 only when they authenticate, avoiding the need to rehash all passwords at once.
Q: Is cost factor 12 always the right choice?
Cost 12 is a solid default that takes about 0.2-0.3 seconds on modern hardware. However, measure your specific use case: if you have thousands of concurrent logins, cost 10 might be necessary; if you’re a high-security application with low login volume, cost 13 or 14 is justified. OWASP’s Password Storage Cheat Sheet recommends a bcrypt work factor of at least 10; a common rule of thumb is to pick the highest cost your target hardware can sustain while keeping each hash under about a second.
Q: What if my bcrypt hash contains non-ASCII characters?
Bcrypt hashes only contain ASCII characters in the format $2a$, $2b$, or $2y$ followed by base-64 encoded characters. If you’re seeing non-ASCII, something went wrong during encoding. Always store bcrypt hashes as UTF-8 strings, and encode/decode carefully at boundaries with your database and external systems.
Q: Can I use the same salt for multiple passwords?
No. Every call to bcrypt.gensalt() generates a fresh random salt, so as long as you call it for each password (as the examples here do), salts are never reused. Using the same salt for multiple passwords would allow attackers to detect if two users have the same password. Let bcrypt generate a fresh salt for each password.
Q: Should I add my own salt on top of bcrypt’s salt?
No, that’s unnecessary and unlikely to improve security. Bcrypt’s salt is cryptographically sound. Adding additional layers of salting adds complexity without meaningful security benefit, and it could introduce vulnerabilities. Trust bcrypt’s design and focus on protecting your source code and deployment infrastructure instead.
Conclusion
Password hashing with bcrypt is one of the few areas where security and simplicity align perfectly. In just a few lines of code, you get enterprise-grade protection against the most common password attacks. The key patterns — always use bcrypt.hashpw() for storage and bcrypt.checkpw() for verification — should become second nature in any application handling user accounts.
Start with a cost factor of 12 and monitor your application’s performance. As hardware improves in the coming years, you can increase the cost factor to maintain security without redesigning your system. Test your implementation with common passwords and verify that both hashing and verification complete in acceptable timeframes for your users.
For deeper technical details and the latest version of bcrypt, visit the official bcrypt repository on GitHub. The PyCA project maintains bcrypt as part of Python’s cryptographic toolkit, with regular security audits and updates.
Memory management is one of the most critical yet often overlooked aspects of Python programming. Unlike languages such as C or C++, Python abstracts away manual memory allocation and deallocation, but understanding how Python manages memory under the hood is essential for writing efficient, scalable applications. Whether you’re building web services, data processing pipelines, or long-running applications, inefficient memory management can lead to performance degradation, excessive resource consumption, and even application crashes.
Python employs several sophisticated mechanisms to manage memory, including reference counting as its primary garbage collection strategy, supplemented by a cyclic garbage collector to handle circular references. Additionally, Python provides memory profiling tools and optimization techniques that allow developers to monitor and improve memory usage. This comprehensive guide explores how Python allocates and manages memory, how garbage collection works, and practical strategies for optimizing memory consumption in your applications.
By the end of this tutorial, you’ll understand how Python’s memory management system operates at a fundamental level, be able to identify and fix memory leaks, profile your applications to find memory hotspots, and implement best practices for memory-efficient Python code. These skills are particularly valuable for developing production-grade applications where resource efficiency directly impacts cost, performance, and user experience.
Quick Example: Memory Tracking in Python
Tracking memory like a hawk because trusting Python to clean up after itself is bold
Before diving deep, let’s see a quick example of how to track memory usage:
import sys
import tracemalloc
# Start tracking memory
tracemalloc.start()
# Create objects
data_list = [i for i in range(1000)]
data_dict = {i: i**2 for i in range(1000)}
# Get memory snapshot
current, peak = tracemalloc.get_traced_memory()
print(f"Current memory usage: {current / 1024:.2f} KB")
print(f"Peak memory usage: {peak / 1024:.2f} KB")
tracemalloc.stop()
# Check object size
print(f"List size: {sys.getsizeof(data_list)} bytes")
print(f"Dict size: {sys.getsizeof(data_dict)} bytes")
Output (exact numbers vary by Python version and platform):
Current memory usage: 45.23 KB
Peak memory usage: 45.67 KB
List size: 9016 bytes
Dict size: 49264 bytes
How Python Allocates Memory
The private heap called — it wants you to stop allocating objects you never delete
Python memory allocation happens at multiple levels, from the operating system level down to individual object allocation. When Python starts, it reserves a block of memory from the operating system. This memory is divided into different pools and arenas for efficient allocation and deallocation.
Memory Pools and Arenas: Python uses a memory pool architecture where small objects (smaller than 512 bytes) are allocated from pre-allocated pools. These pools are organized into arenas, which are allocated from the system heap. This approach reduces fragmentation and improves allocation speed compared to direct system calls for every object.
Object Structure: Every Python object has a reference count and type information stored alongside the actual data. The PyObject structure in CPython includes:
# Conceptual representation of a Python object
class PyObject:
    def __init__(self, value):
        self.ob_refcnt = 1          # Reference count
        self.ob_type = type(value)  # Type information
        self.value = value          # Actual data
Memory Layout Example:
import sys
# Demonstrate object memory layout
my_list = [1, 2, 3]
my_dict = {"a": 1, "b": 2}
my_string = "Hello, World!"
print(f"List object size: {sys.getsizeof(my_list)} bytes")
print(f"Dict object size: {sys.getsizeof(my_dict)} bytes")
print(f"String object size: {sys.getsizeof(my_string)} bytes")
# The actual memory usage is larger because of internal structures
print(f"\nActual memory for list contents: {sys.getsizeof(my_list) + sum(sys.getsizeof(x) for x in my_list)} bytes")
Reference count hits zero and the object vanishes faster than your weekend plans
Python’s primary garbage collection mechanism is reference counting. Each object maintains a count of how many references point to it. When the reference count drops to zero, the memory is immediately freed. This mechanism is automatic and happens transparently, making it simple for developers but requiring careful attention to avoid circular references.
How Reference Counting Works:
import sys
# Create an object
my_list = [1, 2, 3]
# getrefcount() counts its own argument as a temporary extra reference
print(f"Initial ref count: {sys.getrefcount(my_list)}")  # 2: the variable + the call argument
# Create another reference
another_list = my_list
print(f"After assignment: {sys.getrefcount(my_list)}")  # 3: one more name
# Function call increments ref count temporarily
def check_refcount(obj):
    return sys.getrefcount(obj)
refcount = check_refcount(my_list)
print(f"Inside function: {refcount}")  # Higher due to function parameter
# Delete reference
del another_list
print(f"After deletion: {sys.getrefcount(my_list)}")  # Back to 2
# Drop the last reference
del my_list  # Memory is freed here
Output (typical for CPython; exact counts can differ between versions):
Initial ref count: 2
After assignment: 3
Inside function: 4
After deletion: 2
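The claim that memory is released the moment the count reaches zero can be observed with a weak reference, which watches an object without keeping it alive. This is a sketch that assumes CPython; Payload and probe are illustrative names.

```python
import weakref


class Payload:
    """Plain class used because built-in lists cannot be weakly referenced."""


obj = Payload()
probe = weakref.ref(obj)  # observes the object without adding a strong reference
alias = obj               # second strong reference

del obj
print(probe() is not None)  # True: 'alias' still keeps the object alive

del alias
print(probe() is None)      # True on CPython: freed as soon as the count hit zero
```

No call to the garbage collector was needed; reference counting alone reclaimed the object.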
Reference Counting Advantages and Limitations:
| Aspect | Advantages | Limitations |
|---|---|---|
| Memory Freeing | Immediate, deterministic | Overhead on every assignment |
| Circular References | Simple for non-circular data | Cannot handle cycles automatically |
| Pause Time | No stop-the-world pauses | Continuous overhead |
| Performance | Predictable for most cases | Reference count updates can be slow |
Python’s Garbage Collection Module
Circular references thinking they’re safe until the garbage collector shows up uninvited
While reference counting handles most memory management, Python includes a garbage collector to detect and clean up circular references—situations where objects reference each other and create a cycle that reference counting cannot break.
Understanding Circular References:
# Circular reference example
class Node:
    def __init__(self, value):
        self.value = value
        self.next = None

# Create circular reference
node1 = Node(1)
node2 = Node(2)
node1.next = node2
node2.next = node1  # Creates a cycle

# After deleting the variables, each node is still referenced by the other,
# so reference counting alone cannot free them
del node1
del node2  # Memory stays allocated until the cyclic garbage collector runs
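The following sketch shows the cyclic collector reclaiming such a cycle. It assumes CPython, turns automatic collection off briefly so the timing is deterministic, and uses a weak reference (probe, an illustrative name) to observe whether the object is still alive.

```python
import gc
import weakref


class Node:
    def __init__(self, value):
        self.value = value
        self.next = None


gc.disable()  # keep the automatic collector from running mid-demo

a, b = Node(1), Node(2)
a.next, b.next = b, a      # two-node cycle
probe = weakref.ref(a)     # observes 'a' without keeping it alive

del a, b
alive_before = probe() is not None  # True: the cycle keeps both counts above zero

found = gc.collect()                # run the cyclic collector explicitly
alive_after = probe() is not None   # False: the cycle has been reclaimed

gc.enable()
print(alive_before, alive_after, found >= 2)
```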
How the Garbage Collector Works:
import gc
# Check garbage collection status
print(f"Garbage collection enabled: {gc.isenabled()}")
# Get collection statistics (one dictionary per generation)
stats = gc.get_stats()
for generation, stat in enumerate(stats):
    print(f"Generation {generation}: {stat}")
# Manually trigger garbage collection
collected = gc.collect()
print(f"Objects collected: {collected}")
# Disable automatic garbage collection for performance-critical code
gc.disable()
# ... performance-critical code ...
gc.collect()  # Manual collection
gc.enable()
Generational Garbage Collection: Python’s garbage collector uses generational collection, based on the hypothesis that younger objects are more likely to be garbage than older objects. Objects are divided into three generations (0, 1, and 2), with more frequent collection of younger generations.
import gc
# Get garbage collection thresholds
thresholds = gc.get_threshold()
print(f"Collection thresholds: {thresholds}")
# Set custom thresholds
# gen0_threshold, gen1_threshold, gen2_threshold
gc.set_threshold(700, 10, 10)
# Get the current collection counts, one per generation
counts = gc.get_count()
for i, count in enumerate(counts):
    print(f"Generation {i} count: {count}")
# Force collection of specific generation
gc.collect(generation=0)
print("Generation 0 collection completed")
Memory leaks in Python happen when objects remain referenced long after they are useful. This is common with global caches, circular references in custom data structures, and forgotten event listeners. The tracemalloc module and objgraph library are your best tools for tracking these down.
A practical approach is to take memory snapshots at different points in your application and compare them. If certain object types keep growing between snapshots, you have found your leak. Combined with proper debugging techniques, you can isolate the exact line of code responsible.
import tracemalloc
import gc

tracemalloc.start()
# Take first snapshot
snapshot1 = tracemalloc.take_snapshot()
# Simulate work that might leak
leaked_objects = []
for i in range(10000):
    leaked_objects.append({'data': 'x' * 100, 'index': i})
# Take second snapshot and compare
snapshot2 = tracemalloc.take_snapshot()
top_stats = snapshot2.compare_to(snapshot1, 'lineno')
print("Top memory changes:")
for stat in top_stats[:5]:
    print(stat)
You should also be aware that __del__ finalizer methods could prevent garbage collection of circular references in Python versions before 3.4. In those versions, the cyclic garbage collector could not determine a safe order to destroy objects that reference each other, so it left such cycles uncollected in gc.garbage. Since Python 3.4 (PEP 442), cycles containing objects with __del__ are collected normally. The modern best practice is still to use weakref for breaking circular references and to avoid __del__ when possible.
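One way to apply the weakref advice is to make back-references weak, so that a parent owns its children but children do not keep the parent alive. The TreeNode class below is a hypothetical sketch of that pattern, and the final result assumes CPython’s immediate reference-count cleanup.

```python
import weakref


class TreeNode:
    def __init__(self, name):
        self.name = name
        self.children = []   # strong references: the parent owns its children
        self._parent = None  # weak reference back to the parent

    @property
    def parent(self):
        # Dereference the weak reference; yields None once the parent is gone
        return self._parent() if self._parent is not None else None

    def add_child(self, child):
        child._parent = weakref.ref(self)
        self.children.append(child)


root = TreeNode("root")
leaf = TreeNode("leaf")
root.add_child(leaf)
print(leaf.parent.name)  # root

del root                 # no cycle of strong references, so it is freed right away
print(leaf.parent)       # None on CPython
```

Because no strong cycle ever forms, the cyclic collector has nothing to do for this structure.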
Hunting memory leaks at 2 AM because past you thought gc.collect() was optional
Real-Life Example: Memory-Efficient Data Pipeline
Suppose you need to process a 2 GB CSV file, but your server only has 512 MB of RAM. Loading the entire file into a list would crash your application instantly. Instead, you can use generators and careful memory management to process it in chunks:
import gc
import tracemalloc

tracemalloc.start()

def process_csv_in_chunks(filepath, chunk_size=1000):
    """Process a large CSV file without loading it all into memory."""
    chunk = []
    processed = 0
    with open(filepath, 'r') as f:
        header = f.readline().strip().split(',')
        for line in f:
            row = dict(zip(header, line.strip().split(',')))
            chunk.append(row)
            if len(chunk) >= chunk_size:
                yield chunk
                chunk = []
                processed += chunk_size
                # Force garbage collection every 10k rows
                if processed % 10000 == 0:
                    gc.collect()
        if chunk:
            yield chunk

# Usage
for batch in process_csv_in_chunks('transactions.csv'):
    # Process each batch - only chunk_size rows in memory at a time
    totals = sum(float(row.get('amount', 0)) for row in batch)
    print(f"Batch total: {totals}")

# Check peak memory usage
current, peak = tracemalloc.get_traced_memory()
print(f"Current memory: {current / 1024 / 1024:.1f} MB")
print(f"Peak memory: {peak / 1024 / 1024:.1f} MB")
tracemalloc.stop()
This pattern keeps memory usage roughly bounded by the chunk size regardless of file size. The generator yields one chunk at a time, and the previous chunk is freed by reference counting once nothing refers to it. The periodic gc.collect() calls are a safeguard against accumulated reference cycles; for plain dictionaries of strings like these rows they are optional. Chunked processing of this kind is a common approach in data pipelines that handle millions of records. For even better performance, you can combine this with Python profiling and optimization techniques to identify exactly where your memory bottlenecks occur.
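Splitting each line on commas breaks when a field itself contains a quoted comma. A variation on the same chunking idea, sketched here with the standard csv module and itertools.islice, handles that case; iter_batches and the in-memory sample data are illustrative assumptions, and with a real file you would pass the open file object instead of the StringIO.

```python
import csv
import io
from itertools import islice


def iter_batches(lines, batch_size=1000):
    """Yield lists of row dicts from any iterable of CSV lines."""
    reader = csv.DictReader(lines)
    while True:
        # islice pulls at most batch_size rows without reading further ahead
        batch = list(islice(reader, batch_size))
        if not batch:
            return
        yield batch


# In-memory stand-in for a file; note the commas inside quoted fields
sample = io.StringIO(
    'id,description,amount\n'
    '1,"Coffee, large",4.50\n'
    '2,Bagel,3.25\n'
    '3,"Tea, green",2.75\n'
)

totals = [
    sum(float(row["amount"]) for row in batch)
    for batch in iter_batches(sample, batch_size=2)
]
print(totals)  # [7.75, 2.75]
```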
Frequently Asked Questions
Does Python have manual memory management?
No, Python handles memory allocation and deallocation automatically through its memory manager and garbage collector. However, you can influence the process using gc.collect() to trigger manual garbage collection, gc.disable() to turn off automatic collection, and weakref to create references that do not prevent garbage collection. You cannot directly allocate or free memory like in C or C++.
What causes memory leaks in Python?
The most common causes are: global variables or caches that grow unbounded, long-lived references such as forgotten event listeners, closures that capture large objects unintentionally, circular references between objects with __del__ finalizers (which the cyclic garbage collector could not clean up before Python 3.4), and C extension modules that do not properly release memory. Using tracemalloc to compare snapshots is the most reliable way to track down leaks.
How does Python’s garbage collector handle circular references?
Python’s cyclic garbage collector uses a generational approach with three generations (0, 1, and 2). New objects start in generation 0, and objects that survive collection are promoted to older generations. The collector detects reference cycles by temporarily removing all internal references between container objects and checking which objects become unreachable. This process runs automatically when the threshold for a generation is exceeded.
Should I call gc.collect() manually?
In most applications, you do not need to call gc.collect() manually — Python’s automatic collection works well for typical workloads. However, calling it manually is useful in specific scenarios: after deleting a large number of objects, during natural pauses in a long-running process, or when you need predictable memory usage in a memory-constrained environment. Avoid calling it in tight loops, as collection itself has a performance cost.
What is the difference between reference counting and garbage collection?
Reference counting is Python’s primary memory management mechanism — every object tracks how many references point to it, and the object is immediately freed when the count reaches zero. Garbage collection is the secondary mechanism that handles cases reference counting cannot: specifically, circular references where two or more objects reference each other and their counts never reach zero. Both work together to keep memory usage efficient. Understanding these mechanisms is also useful when working with Python 3.13’s free-threaded mode, which changes how reference counting works in concurrent code.
How can I monitor memory usage in a Python application?
Python provides several built-in and third-party tools: tracemalloc (built-in) traces memory allocations with source file and line information, sys.getsizeof() returns the size of individual objects, gc.get_objects() lists all tracked objects, and the third-party objgraph library visualizes object reference graphs. For production monitoring, psutil tracks overall process memory from outside the Python runtime. Combining tracemalloc snapshots with proper exception handling ensures you capture memory data even when errors occur.
Conclusion
Python’s memory management system is a carefully designed partnership between reference counting, the cyclic garbage collector, and the private heap allocator. Reference counting handles the majority of object cleanup instantly, while the generational garbage collector sweeps up circular references that reference counting misses. Together, they free you from manual memory management while still giving you tools like tracemalloc, gc, and weakref to monitor and control memory when performance demands it.
The key takeaways are: use generators and iterators for large datasets instead of loading everything into lists, watch out for circular references especially when defining __del__ methods, use tracemalloc to diagnose memory issues before they become production incidents, and understand that type hints combined with static analysis tools can help catch patterns that lead to memory issues early. Memory management might happen behind the scenes, but understanding how it works makes you a fundamentally better Python developer.
Debugging is an essential skill for every Python developer. Whether you’re tracking down a subtle logic error, identifying memory leaks, or understanding why your code behaves unexpectedly in production, having a solid debugging toolkit can save you hours of frustration. This comprehensive guide walks you through professional-grade debugging techniques, from simple print statements to advanced IDE features and logging strategies.
The journey from casual to professional debugging involves understanding the right tool for each situation. Some developers rely entirely on print statements, while others prefer strategic logging. Effective debugging requires a multi-faceted approach: knowing when to use print statements for quick checks, when to deploy the interactive debugger for deep inspection, and when logging is your best friend for production issues.
Throughout this article, we’ll explore practical examples using Python’s standard library tools, popular IDEs, and battle-tested patterns that professional development teams use daily.
Your debugger is worth a thousand print statements. Learn to use it.
Print Debugging vs. Professional Debuggers
The Case for Print Statements
Print debugging is often dismissed by purists, but it’s actually a legitimate technique for certain scenarios. When you need quick answers about variable values at specific points in your code, a strategically placed print statement can give you instant feedback.
Professional debuggers offer capabilities that print statements simply cannot provide. They allow you to pause execution, inspect the entire program state, step through code line by line, and modify variables on the fly.
Python’s Built-in pdb Debugger
Python includes the pdb (Python Debugger) module, a powerful interactive debugging tool. Use pdb.set_trace() to insert breakpoints. Once paused, inspect variables, step through code with ‘n’ (next), ‘s’ (step), ‘c’ (continue), or ‘p variable’ (print).
pdb turns debugging from guesswork into systematic exploration.
IDE Debugging: VS Code and PyCharm
Visual Studio Code
VS Code’s Python extension provides full-featured graphical debugging. Create a launch configuration in .vscode/launch.json and set breakpoints by clicking line margins. VS Code displays variables in the sidebar and allows inline inspection via hover.
PyCharm’s Advanced Features
PyCharm offers Data Inspector for viewing complex structures, Evaluate Expression for running code immediately, Conditional Breakpoints, Logpoint for non-stopping messages, and Remote Debugging support.
Get systematic with your debugging approach.
The Logging Module for Production
Python’s logging module is far superior to print statements for production code. Use appropriate log levels: DEBUG for diagnostics, INFO for confirmation, WARNING for unexpected events, ERROR for serious problems, CRITICAL for system failures.
Structured logging (JSON format) enables easier analysis in log aggregation systems, turning a million log lines into actionable insights.
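As a rough illustration of the idea, here is a minimal sketch that emits one JSON object per log line using only the standard library. The JsonFormatter class, the structured_demo logger name, and the user_id field are hypothetical names chosen for this example; production setups often use a dedicated library instead.

```python
import io
import json
import logging


class JsonFormatter(logging.Formatter):
    """Hypothetical formatter: renders each record as one JSON object."""

    def format(self, record):
        payload = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Fields passed via `extra=` become attributes on the record.
        if hasattr(record, "user_id"):
            payload["user_id"] = record.user_id
        if record.exc_info:
            payload["exception"] = self.formatException(record.exc_info)
        return json.dumps(payload)


stream = io.StringIO()  # stand-in for a file or log shipper
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter())

json_logger = logging.getLogger("structured_demo")
json_logger.setLevel(logging.INFO)
json_logger.addHandler(handler)
json_logger.propagate = False  # keep the demo output out of the root logger

json_logger.info("payment processed", extra={"user_id": "u-42"})
line = stream.getvalue().strip()
print(line)
```

Because every line is valid JSON, a log aggregator can filter on level or user_id directly instead of parsing free-form text.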
Structured logging turns chaos into searchable truth.
Debugging Patterns
Rubber Duck Debugging
Explain your code line-by-line to an inanimate object. This forces you to articulate assumptions and often reveals logical errors you missed.
Assertion-Based Debugging
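Assertions validate preconditions and catch violations early. Remember that Python strips assert statements entirely when run with the -O flag, so use them to check your own assumptions during development, never to validate user input or enforce security rules.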
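A small sketch of assertion-based debugging follows. The apply_discount function and its limits are made up for illustration; the point is that each assert documents an assumption and fails loudly when it is violated.

```python
def apply_discount(price, percent):
    """Return price reduced by percent; the asserts document assumptions."""
    assert price >= 0, f"price must be non-negative, got {price}"
    assert 0 <= percent <= 100, f"percent out of range: {percent}"
    return price * (1 - percent / 100)


print(apply_discount(200, 25))

try:
    apply_discount(200, 150)  # violates the second assumption
except AssertionError as exc:
    print(f"assertion caught: {exc}")
```

Running the same file with python -O skips both checks, which is why asserts should not be the only guard on untrusted input.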
Context Managers
Create reusable debugging utilities using context managers to log execution time without cluttering main code.
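One way this could look, as a sketch: a context manager that measures the wall-clock time of a block and logs it. The log_duration helper, the timing_demo logger name, and the optional results dictionary are assumptions made for this example.

```python
import logging
import time
from contextlib import contextmanager

timing_logger = logging.getLogger("timing_demo")


@contextmanager
def log_duration(label, results=None):
    """Hypothetical helper: log how long the with-block took."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Runs even if the block raises, so failed runs are timed too.
        elapsed = time.perf_counter() - start
        timing_logger.debug("%s took %.4f s", label, elapsed)
        if results is not None:
            results[label] = elapsed


durations = {}
with log_duration("sum_squares", durations):
    total = sum(n * n for n in range(100_000))

print(sorted(durations))
```

The timed code stays free of timing logic, and removing the measurement later means deleting one with line.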
Right patterns turn logs into insights.
Reading Tracebacks
Always read tracebacks from bottom to top. The innermost frame usually indicates the actual error. Tracebacks show error type, message, and execution chain with line numbers pointing to the problem.
Different Environments
Local Development
Use full debugging capabilities, verbose logging, and interactive debuggers for maximum visibility.
Production Safety
You usually can't pause a live server in an interactive debugger, so rely on comprehensive logging to files and external services. Avoid logging sensitive data.
FAQ
Q1: Debug multi-threaded code?
Use thread-safe logging with thread identifiers. Avoid print statements. Consider using threading.Lock() to control execution order during debugging.
Q2: Debug remote code?
Most IDEs support remote debugging. VS Code and PyCharm allow connecting to Python processes on other machines.
Q3: Find memory leaks?
Use tracemalloc module to track memory allocation and show top allocators in your code.
Q4: Debugging vs profiling?
Debugging finds why code is broken. Profiling measures performance. Use debugging when code fails; profiling when code is correct but slow.
Q5: Debug in Docker?
Run Python with unbuffered output (-u flag), map debugger ports for IDE debugging, or rely on logging. VS Code Remote Containers extension helps locally.
Q6: Leave debug code in production?
Never leave breakpoint() calls. Remove pdb.set_trace(). Logging is appropriate but use right levels and never log sensitive data.
Conclusion
Professional debugging separates junior from experienced engineers. Master the spectrum: print statements for iteration, pdb for exploration, logging for production visibility, and assertions for early issue catching. Develop intuition through practice on real projects. Debugging isn’t failure—it’s inevitable in development. Efficient debugging means more time building features.
Python ships with an interactive debugger you don’t have to install. Drop breakpoint() into any line and the script pauses there, dropping you into a REPL with full access to local variables:
# example.py
def transform(items):
    result = []
    for i in items:
        breakpoint()  # script stops here, pdb prompt appears
        result.append(i * 2)
    return result
transform([1, 2, 3])
The pdb prompt accepts: n (next line), s (step into function), c (continue to next breakpoint), p var (print value), l (list source around current line), w (show stack trace), q (quit). Just typing a variable name evaluates it.
For Python 3.7+, breakpoint() is preferred over the older import pdb; pdb.set_trace() — it respects the PYTHONBREAKPOINT env var, so you can swap debuggers without editing source. Set PYTHONBREAKPOINT=ipdb.set_trace to use ipdb (better tab completion, colors); set PYTHONBREAKPOINT=0 to disable all breakpoints in production.
Post-Mortem Debugging
When a script crashes, you can drop into pdb at the moment of failure to inspect what went wrong — without modifying the source:
# Run the script normally
python script.py
# (it crashes)
# Now run it under post-mortem pdb
python -m pdb script.py
# at the prompt: c (continue) runs until the exception
# then pdb automatically lands you at the crash point with the stack intact
# Or trigger post-mortem from inside code:
import pdb, traceback
try:
    risky_operation()
except Exception:
    traceback.print_exc()
    pdb.post_mortem()  # inspects the exception currently being handled
Post-mortem is the single most underused debugging technique. It beats adding print statements after the fact and re-running.
logging vs print()
For anything more complex than a script, replace print() with logging. The difference matters: logging has levels (DEBUG, INFO, WARNING, ERROR, CRITICAL), can be routed to files, syslog, or cloud services, includes timestamps and call sites, and can be tuned per module through configuration rather than code changes.
logger.exception() inside an except block is the killer feature — it records the message AND the full stack trace, ready to ship to your log aggregator.
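A minimal sketch of that behavior, assuming a hypothetical orders_demo logger and a parse_quantity helper invented for this example. The in-memory stream stands in for a log file so the result can be inspected.

```python
import io
import logging

log_stream = io.StringIO()  # stand-in for a file; real code might use FileHandler
handler = logging.StreamHandler(log_stream)
handler.setFormatter(
    logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
)

order_logger = logging.getLogger("orders_demo")  # hypothetical logger name
order_logger.setLevel(logging.DEBUG)
order_logger.addHandler(handler)
order_logger.propagate = False


def parse_quantity(raw):
    """Return raw as an int, or None after logging the failure."""
    try:
        return int(raw)
    except ValueError:
        # exception() logs at ERROR level and appends the active traceback.
        order_logger.exception("could not parse quantity %r", raw)
        return None


parse_quantity("3")
parse_quantity("three")
output = log_stream.getvalue()
print(output)
```

The captured output contains the ERROR line followed by the full ValueError traceback, with no traceback-handling code in the except block itself.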
IDE Debuggers: VS Code and PyCharm
Graphical debuggers beat pdb for complex sessions. VS Code’s Python extension and PyCharm both give you:
Click-to-set breakpoints in the gutter
Conditional breakpoints (“break only if x > 100”)
Step over / into / out buttons
Variable inspector showing all locals (and globals)
Watch expressions that auto-evaluate as you step
Call stack with click-to-jump-to-frame
For a one-off bug, pdb at the terminal is faster. For a meaty multi-hour debugging session, an IDE debugger pays for itself.
Tracing and Profiling: Beyond Debugging
Sometimes “where is this slow?” or “why is this called 1000 times?” matters more than “what’s the bug?”. For those questions, profiling tools are the right answer:
# Built-in cProfile — function call counts and timings
import cProfile
cProfile.run("expensive_function()", sort="cumulative")
# tracemalloc — memory allocation tracking
import tracemalloc
tracemalloc.start()
result = process_data()
snapshot = tracemalloc.take_snapshot()
for stat in snapshot.statistics("lineno")[:10]:
    print(stat)
# py-spy — sampling profiler that runs against a live process
# pip install py-spy
# Then: py-spy top --pid 12345
# Or: py-spy record -o profile.svg -- python myscript.py
For production performance debugging, py-spy is unmatched — it attaches to a running process without restarting it, so you can profile a real workload as it happens.
Common Pitfalls
Leaving breakpoints in production. A forgotten breakpoint() in a server will block the request thread until somebody types c at the (invisible) prompt. Set PYTHONBREAKPOINT=0 in production env to neutralize them.
print() debugging on large data. print(huge_list) prints thousands of lines, scrolling away the useful info. Use pprint with a depth limit, or print just len(huge_list).
Catching exceptions too broadly. try: ... except: pass hides bugs forever. Catch specific exceptions and re-raise unexpected ones.
Ignoring warnings. Python’s warning system flags deprecated APIs and likely bugs. Run with python -W error in dev to surface them.
Debugging multi-threaded code without thread names. Logs from 10 threads interleave into chaos unless you include the thread name in the format: %(threadName)s.
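To illustrate the last pitfall, here is a sketch that puts %(threadName)s in the log format. The worker names, the threads_demo logger, and the work function are assumptions for this example.

```python
import io
import logging
import threading

buffer = io.StringIO()  # stand-in for the console or a log file
handler = logging.StreamHandler(buffer)
handler.setFormatter(
    logging.Formatter("%(threadName)s %(levelname)s %(message)s")
)

worker_logger = logging.getLogger("threads_demo")
worker_logger.setLevel(logging.INFO)
worker_logger.addHandler(handler)
worker_logger.propagate = False


def work(item):
    worker_logger.info("processing %s", item)


threads = [
    threading.Thread(target=work, args=(i,), name=f"worker-{i}")
    for i in range(3)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

lines = buffer.getvalue().splitlines()
print("\n".join(lines))
```

The order of the lines can vary between runs, but each one carries its thread name, so interleaved output can still be attributed.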
FAQ
Q: pdb, ipdb, pudb, or PyCharm?
A: pdb when you can’t install anything. ipdb for tab completion and colors. pudb for a TUI experience. PyCharm/VS Code when you want a real GUI and have the tooling installed.
Q: How do I debug a script that runs in production?
A: Don’t drop into pdb. Use structured logging + log aggregation + py-spy attach. Production debugging is reading logs and traces, not stepping through code.
Q: How do I debug async code?
A: breakpoint() works inside async functions. The pdb prompt suspends the coroutine. For more complex async debugging, the IDE debuggers handle await stepping much better than pdb.
Q: How do I find which test is slow?
A: pytest --durations=10 shows the 10 slowest tests. To dig into one of them, select it with pytest -k "test_name" and run it under cProfile for a per-function breakdown.
Q: How do I debug memory leaks?
A: tracemalloc with snapshots before/after the suspect code, compare allocation sites. For production, install memray for a heap profile across a real workload.
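The before/after comparison might be sketched like this. The leaky_cache list is a deliberately wasteful stand-in for whatever code is under suspicion.

```python
import tracemalloc

tracemalloc.start()
before = tracemalloc.take_snapshot()

# Suspect code: allocates roughly 10 MB that stays referenced.
leaky_cache = [bytes(1000) for _ in range(10_000)]

after = tracemalloc.take_snapshot()
tracemalloc.stop()

# compare_to() returns differences sorted with the largest changes first.
top_stats = after.compare_to(before, "lineno")
for stat in top_stats[:3]:
    print(stat)
```

The first entry should point at the line that builds leaky_cache, which is the kind of signal to look for when hunting a real leak.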
Wrapping Up
The single biggest debugging improvement most Python developers can make: stop using print() and start using logging + breakpoint(). The second-biggest: learn post-mortem pdb — it turns every crash into a diagnostic session. The IDE debugger is the heavyweight option when those aren’t enough. And for “why is this slow?”, profilers (cProfile, py-spy) answer questions that no amount of stepping through code will reveal.
Every Python developer has written code that works perfectly in testing and then explodes in production. The difference between amateur and professional Python code often comes down to one thing: how you handle exceptions. Proper exception handling isn’t about wrapping everything in try/except blocks–it’s about anticipating failure modes, recovering gracefully, and giving yourself enough information to fix problems quickly.
This guide covers everything from basic try/except mechanics to advanced patterns used in production systems. You’ll learn when to catch exceptions, when to let them propagate, how to create custom exception hierarchies, and patterns that will save you hours of debugging. We’re not just covering syntax–we’re covering the thinking behind robust error handling.
Here’s what we’ll work through: the exception hierarchy in Python, try/except/else/finally blocks, catching specific vs broad exceptions, raising and re-raising exceptions, custom exception classes, context managers for cleanup, logging strategies, and real-world patterns for API error handling. By the end, your code will fail gracefully instead of catastrophically.
Quick Example: The Right Way vs The Wrong Way
Before diving deep, here’s the difference between amateur and professional exception handling:
# Bad: Catches everything, hides bugs
try:
    result = process_data(user_input)
except:
    print("Something went wrong")
# Good: Specific, informative, recoverable
try:
    result = process_data(user_input)
except ValidationError as e:
    logger.warning(f"Invalid input from user: {e}")
    return {"error": str(e), "field": e.field_name}, 400
except DatabaseError as e:
    logger.error(f"Database failure processing request: {e}", exc_info=True)
    return {"error": "Service temporarily unavailable"}, 503
except Exception as e:
    logger.critical(f"Unexpected error: {e}", exc_info=True)
    raise
The first example swallows every error silently. The second catches specific exceptions, logs useful context, returns appropriate HTTP status codes, and re-raises unexpected errors so they don’t go unnoticed. That’s the pattern we’re building toward.
Reading the actual error message is the most underrated debugging technique in programming.
Understanding Python’s Exception Hierarchy
Python’s exceptions form a class hierarchy rooted at BaseException. Understanding this hierarchy is critical because your except clauses catch the specified exception and all its subclasses.
This is why you should almost never write except BaseException or bare except:–they catch KeyboardInterrupt and SystemExit, preventing users from stopping your program with Ctrl+C. Always catch Exception at most.
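The subclass relationships can be checked directly, as in this short sketch. The lookup helper is a made-up function showing that catching a parent class also catches its children.

```python
# KeyboardInterrupt sits beside Exception, not under it.
print(issubclass(KeyboardInterrupt, Exception))
print(issubclass(KeyboardInterrupt, BaseException))
print(issubclass(FileNotFoundError, OSError))
print(issubclass(KeyError, LookupError))


def lookup(container, key):
    """Catching LookupError handles both KeyError and IndexError."""
    try:
        return container[key]
    except LookupError as exc:
        return f"handled {type(exc).__name__}"


print(lookup({}, "missing"))
print(lookup([], 3))
```

The same rule explains why except Exception leaves Ctrl+C working while a bare except does not.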
Exception | When It Occurs | Common Cause
ValueError | Right type, wrong value | int("abc"), invalid arguments
TypeError | Wrong type entirely | len(42), "hello" + 5
KeyError | Dict key missing | my_dict["nonexistent"]
IndexError | List index out of range | my_list[999]
AttributeError | Object lacks attribute | None.split()
FileNotFoundError | File doesn't exist | open("missing.txt")
ConnectionError | Network failure | Refused or reset socket connection
Mastering try/except/else/finally
Most developers know try/except. Fewer use else and finally correctly. Each block has a specific purpose, and using all four makes your intent crystal clear.
import json
def load_config(filepath):
    """Load and parse a JSON config file with proper error handling"""
    try:
        # Only put code that might raise the expected exception here
        with open(filepath, 'r') as f:
            raw_data = f.read()
    except FileNotFoundError:
        print(f"Config file not found: {filepath}")
        return get_default_config()
    except PermissionError:
        print(f"No permission to read: {filepath}")
        raise
    else:
        # Runs only if try block succeeded (no exception)
        # Put code here that depends on try's success
        # but shouldn't be protected by the except
        try:
            config = json.loads(raw_data)
        except json.JSONDecodeError as e:
            print(f"Invalid JSON in {filepath}: {e}")
            return get_default_config()
        return config
    finally:
        # Always runs, even if an exception was raised
        # Use for cleanup that must happen regardless
        print(f"Config loading attempt for {filepath} complete")
def get_default_config():
    return {"debug": False, "log_level": "INFO"}
# Usage
config = load_config("settings.json")
print(config)
Output (when settings.json does not exist):
Config file not found: settings.json
Config loading attempt for settings.json complete
{'debug': False, 'log_level': 'INFO'}
The else block is often overlooked but serves an important purpose: it separates "code that might fail" from "code that should run only on success." This prevents accidentally catching exceptions you didn't intend to handle.
Catching Specific Exceptions (And Why Order Matters)
Python evaluates except clauses top to bottom and executes the first matching one. Since exceptions form a hierarchy, order matters–put specific exceptions before general ones.
import requests
import json
def fetch_api_data(url):
    """Fetch data from an API with granular error handling"""
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        data = response.json()
        return data
    except requests.exceptions.Timeout:
        # Most specific: request timed out
        print(f"Request to {url} timed out after 10 seconds")
        return None
    except requests.exceptions.ConnectionError:
        # Network-level failure
        print(f"Could not connect to {url}")
        return None
    except requests.exceptions.HTTPError as e:
        # Server returned error status code
        status = e.response.status_code
        if status == 404:
            print(f"Resource not found: {url}")
        elif status == 429:
            print(f"Rate limited. Retry after: {e.response.headers.get('Retry-After', 'unknown')}")
        elif status >= 500:
            print(f"Server error ({status}) at {url}")
        else:
            # Any other 4xx status would otherwise fail silently
            print(f"HTTP error ({status}) at {url}")
        return None
    except json.JSONDecodeError:
        # Response wasn't valid JSON
        print(f"Invalid JSON response from {url}")
        return None
    except requests.exceptions.RequestException as e:
        # Catch-all for requests library (parent class)
        print(f"Unexpected request error: {e}")
        return None
# Test with various URLs
data = fetch_api_data("https://api.github.com/users/python")
print(f"Got data: {type(data)}")
Output:
Got data: <class 'dict'>
If you reversed the order and put RequestException first, it would catch Timeout, ConnectionError, and HTTPError before their specific handlers ever ran. Always go from most specific to most general.
Understanding the exception hierarchy saves you from catching errors you never meant to handle.
Raising and Re-Raising Exceptions
Sometimes you need to raise exceptions yourself, or catch one and re-raise it after logging. Python gives you three patterns for this.
import logging
logger = logging.getLogger(__name__)
def validate_age(age):
    """Validate age with descriptive error messages"""
    if not isinstance(age, (int, float)):
        raise TypeError(f"Age must be a number, got {type(age).__name__}")
    if age < 0:
        raise ValueError(f"Age cannot be negative: {age}")
    if age > 150:
        raise ValueError(f"Age seems unrealistic: {age}")
    return int(age)
def process_user_registration(data):
    """Process registration with re-raising pattern"""
    try:
        age = validate_age(data.get('age'))
        # ... more processing
        return {"status": "success", "age": age}
    except (TypeError, ValueError) as e:
        # Log and re-raise: preserves original traceback
        logger.error(f"Validation failed for user data: {e}")
        raise  # Re-raises the SAME exception with original traceback
    except Exception as e:
        # Wrap in a new exception: chain for context
        logger.critical(f"Unexpected error during registration: {e}")
        raise RuntimeError("Registration system failure") from e
# Exception chaining with 'from'
def connect_to_database(config):
    """Demonstrate exception chaining"""
    try:
        # Simulating a connection attempt
        if not config.get('host'):
            raise KeyError('host')
    except KeyError as e:
        # 'from e' chains the original exception
        # e.args[0] is the bare key; str(e) would add its own quotes
        raise ConnectionError(
            f"Cannot connect: missing config key '{e.args[0]}'"
        ) from e
# Test it
try:
    validate_age(-5)
except ValueError as e:
    print(f"Caught: {e}")
try:
    validate_age("twenty")
except TypeError as e:
    print(f"Caught: {e}")
try:
    connect_to_database({})
except ConnectionError as e:
    print(f"Caught: {e}")
    print(f"Caused by: {e.__cause__}")
Output:
Caught: Age cannot be negative: -5
Caught: Age must be a number, got str
Caught: Cannot connect: missing config key 'host'
Caused by: 'host'
The raise without arguments re-raises the current exception with its original traceback intact. The raise X from Y syntax creates an exception chain, so the traceback shows both the new error and what caused it.
Creating Custom Exception Hierarchies
For any non-trivial application, create custom exceptions. They make your code self-documenting and let callers handle specific error conditions without parsing error messages.
class AppError(Exception):
    """Base exception for our application"""
    def __init__(self, message, code=None, details=None):
        super().__init__(message)
        self.code = code
        self.details = details or {}
class ValidationError(AppError):
    """Input validation failed"""
    def __init__(self, message, field=None, **kwargs):
        super().__init__(message, code="VALIDATION_ERROR", **kwargs)
        self.field = field
class NotFoundError(AppError):
    """Requested resource doesn't exist"""
    def __init__(self, resource_type, resource_id):
        message = f"{resource_type} with id '{resource_id}' not found"
        super().__init__(message, code="NOT_FOUND")
        self.resource_type = resource_type
        self.resource_id = resource_id
class AuthenticationError(AppError):
    """User authentication failed"""
    def __init__(self, message="Authentication required"):
        super().__init__(message, code="AUTH_ERROR")
class RateLimitError(AppError):
    """Too many requests"""
    def __init__(self, retry_after=60):
        message = f"Rate limit exceeded. Retry after {retry_after} seconds"
        super().__init__(message, code="RATE_LIMIT")
        self.retry_after = retry_after
# Using custom exceptions
def get_user(user_id):
    users = {"1": "Alice", "2": "Bob"}
    if not isinstance(user_id, str):
        raise ValidationError("User ID must be a string", field="user_id")
    if user_id not in users:
        raise NotFoundError("User", user_id)
    return users[user_id]
def handle_request(user_id):
    """Handler showing how custom exceptions simplify error responses"""
    try:
        user = get_user(user_id)
        return {"status": "success", "user": user}
    except ValidationError as e:
        return {"error": e.code, "message": str(e), "field": e.field}
    except NotFoundError as e:
        return {"error": e.code, "message": str(e)}
    except AppError as e:
        return {"error": e.code, "message": str(e)}
# Test
print(handle_request("1"))
print(handle_request("99"))
print(handle_request(42))
Output:
{'status': 'success', 'user': 'Alice'}
{'error': 'NOT_FOUND', 'message': "User with id '99' not found"}
{'error': 'VALIDATION_ERROR', 'message': 'User ID must be a string', 'field': 'user_id'}
Custom exception hierarchies turn "something broke" into "this specific thing broke for this specific reason."
Context Managers for Guaranteed Cleanup
Context managers (the with statement) are Python’s best tool for ensuring cleanup happens even when exceptions occur. They’re cleaner than try/finally for resource management.
import os
import sqlite3
from contextlib import contextmanager
@contextmanager
def database_connection(db_path):
    """Context manager for database connections with automatic rollback"""
    conn = sqlite3.connect(db_path)
    try:
        yield conn
        conn.commit()  # Only commits if no exception occurred
    except Exception:
        conn.rollback()  # Rollback on any error
        raise  # Re-raise so caller knows something failed
    finally:
        conn.close()  # Always close the connection
@contextmanager
def temporary_file(filepath, mode='w'):
    """Write to a temp file, then atomically rename on success"""
    temp_path = filepath + '.tmp'
    f = open(temp_path, mode)
    try:
        yield f
        f.close()
        os.replace(temp_path, filepath)  # Atomic rename
    except Exception:
        f.close()
        if os.path.exists(temp_path):
            os.remove(temp_path)  # Clean up temp file on failure
        raise
# Usage
with database_connection(":memory:") as conn:
    cursor = conn.cursor()
    cursor.execute("CREATE TABLE users (name TEXT, age INTEGER)")
    cursor.execute("INSERT INTO users VALUES ('Alice', 30)")
    cursor.execute("SELECT * FROM users")
    print(cursor.fetchall())
# The connection is guaranteed to be closed, committed on success
# or rolled back on failure
Output:
[('Alice', 30)]
The @contextmanager decorator from contextlib lets you write context managers as generator functions. Everything before yield is your setup, and everything after is your cleanup. The try/except/finally inside ensures proper handling regardless of what happens.
Logging Exceptions Effectively
Print statements aren’t sufficient for production. Use Python’s logging module with structured information that helps you diagnose issues quickly.
import logging
import traceback
import sys
# Configure logging with useful format
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler(sys.stdout)]
)
logger = logging.getLogger("myapp")
def process_payment(user_id, amount):
    """Demonstrate different logging levels for exceptions"""
    try:
        if amount <= 0:
            raise ValueError(f"Invalid amount: {amount}")
        if amount > 10000:
            # Warning: not an error, but worth noting
            logger.warning(
                "Large payment detected",
                extra={"user_id": user_id, "amount": amount}
            )
        # Simulate processing
        if user_id == "blocked":
            raise PermissionError("User account is blocked")
        logger.info(f"Payment processed: user={user_id}, amount=${amount:.2f}")
        return True
    except ValueError as e:
        # Client error: log as warning, don't need full traceback
        logger.warning(f"Invalid payment request: {e}")
        return False
    except PermissionError as e:
        # Expected business logic error
        logger.error(f"Payment blocked: user={user_id}, reason={e}")
        return False
    except Exception as e:
        # Unexpected error: log as critical WITH traceback
        logger.critical(
            f"Payment system failure: user={user_id}, amount={amount}",
            exc_info=True  # This includes the full traceback
        )
        raise
# Test scenarios
process_payment("user123", 50.00)
process_payment("user456", -10)
process_payment("blocked", 100)
The key insight: use exc_info=True for unexpected exceptions where you need the full stack trace. For expected exceptions (validation errors, business logic), a simple message is sufficient. Don’t log full tracebacks for every caught exception–it creates noise that obscures real problems.
Good logging turns a mystery crash into a five-minute fix.
Real-World Example: Resilient API Client with Retry Logic
Here’s a production-grade pattern combining everything we’ve covered: custom exceptions, context managers, logging, and retry logic for an API client.
import time
import logging
import json
import urllib.request
import urllib.error
from functools import wraps
logger = logging.getLogger(__name__)
class APIError(Exception):
    """Base API exception"""
    def __init__(self, message, status_code=None, response_body=None):
        super().__init__(message)
        self.status_code = status_code
        self.response_body = response_body
class RetryableError(APIError):
    """Error that should trigger a retry"""
    pass
class FatalError(APIError):
    """Error that should NOT be retried"""
    pass
def retry_on_failure(max_retries=3, base_delay=1, backoff_factor=2):
    """Decorator that retries on RetryableError with exponential backoff"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except RetryableError as e:
                    last_exception = e
                    if attempt < max_retries:
                        delay = base_delay * (backoff_factor ** attempt)
                        logger.warning(
                            f"Attempt {attempt + 1}/{max_retries + 1} failed: {e}. "
                            f"Retrying in {delay}s..."
                        )
                        time.sleep(delay)
                    else:
                        logger.error(
                            f"All {max_retries + 1} attempts failed for {func.__name__}"
                        )
                except FatalError:
                    # Don't retry fatal errors
                    raise
            raise last_exception
        return wrapper
    return decorator
class APIClient:
    """Resilient API client with proper exception handling"""
    def __init__(self, base_url, api_key):
        self.base_url = base_url
        self.api_key = api_key
        self._session = None
    def _classify_error(self, status_code, response_body):
        """Classify HTTP errors as retryable or fatal"""
        if status_code in (429, 502, 503, 504):
            raise RetryableError(
                f"Server returned {status_code}",
                status_code=status_code,
                response_body=response_body
            )
        elif status_code in (400, 401, 403, 404, 422):
            raise FatalError(
                f"Client error {status_code}: {response_body}",
                status_code=status_code,
                response_body=response_body
            )
        else:
            raise APIError(
                f"Unexpected status {status_code}",
                status_code=status_code,
                response_body=response_body
            )
    @retry_on_failure(max_retries=3, base_delay=1)
    def get(self, endpoint):
        """GET request with automatic retry on transient failures"""
        url = f"{self.base_url}/{endpoint}"
        req = urllib.request.Request(url)
        req.add_header('Authorization', f'Bearer {self.api_key}')
        try:
            with urllib.request.urlopen(req, timeout=10) as response:
                data = json.loads(response.read().decode())
                logger.info(f"GET {endpoint}: success")
                return data
        except urllib.error.HTTPError as e:
            # HTTPError subclasses URLError, so it must be handled first
            body = e.read().decode() if e.fp else ""
            self._classify_error(e.code, body)
        except urllib.error.URLError as e:
            raise RetryableError(f"Connection failed: {e.reason}")
        except json.JSONDecodeError as e:
            raise FatalError(f"Invalid JSON response: {e}")
    @retry_on_failure(max_retries=2, base_delay=2)
    def post(self, endpoint, data):
        """POST request with retry logic"""
        url = f"{self.base_url}/{endpoint}"
        payload = json.dumps(data).encode('utf-8')
        req = urllib.request.Request(url, data=payload, method='POST')
        req.add_header('Authorization', f'Bearer {self.api_key}')
        req.add_header('Content-Type', 'application/json')
        try:
            with urllib.request.urlopen(req, timeout=30) as response:
                result = json.loads(response.read().decode())
                logger.info(f"POST {endpoint}: success")
                return result
        except urllib.error.HTTPError as e:
            body = e.read().decode() if e.fp else ""
            self._classify_error(e.code, body)
        except urllib.error.URLError as e:
            raise RetryableError(f"Connection failed: {e.reason}")
# Usage example
client = APIClient("https://api.example.com", "my-api-key")
try:
    users = client.get("users")
    print(f"Fetched {len(users)} users")
except RetryableError as e:
    print(f"Service unavailable after retries: {e}")
except FatalError as e:
    print(f"Request invalid: {e}")
except APIError as e:
    print(f"API error: {e}")
This pattern handles transient failures (network issues, rate limits, server errors) with automatic retry and exponential backoff. Fatal errors like 401 or 404 fail immediately since retrying won't help. The decorator separates retry logic from business logic, keeping your code clean.
Exponential backoff is the polite way of saying "I will keep trying, but I will wait longer each time."
Frequently Asked Questions
Should I use except Exception or except BaseException?
Almost always except Exception. The BaseException class includes SystemExit, KeyboardInterrupt, and GeneratorExit, which you almost never want to catch. Catching KeyboardInterrupt prevents Ctrl+C from working. Only use BaseException in top-level cleanup code where you truly need to catch everything.
When should I use EAFP vs LBYL style?
EAFP (Easier to Ask Forgiveness than Permission) means using try/except. LBYL (Look Before You Leap) means checking conditions first. Python idiomatically prefers EAFP. Use try/except when the check would be expensive or racy (like file existence checks). Use LBYL when the check is cheap and makes code clearer, like if key in dict.
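The two styles can be compared side by side in a small sketch. The config dictionary, the "port" key, and the 8080 default are hypothetical values for this example.

```python
DEFAULT_PORT = 8080  # assumed fallback for this example


def read_port_lbyl(config):
    """LBYL: check every condition before acting."""
    value = config.get("port")
    if isinstance(value, str) and value.isdigit():
        return int(value)
    return DEFAULT_PORT


def read_port_eafp(config):
    """EAFP: just try it and handle the failure cases."""
    try:
        return int(config["port"])
    except (KeyError, ValueError, TypeError):
        return DEFAULT_PORT


for sample in ({"port": "9000"}, {}, {"port": "abc"}):
    print(read_port_lbyl(sample), read_port_eafp(sample))
```

The two versions agree on these inputs but not on all of them: the EAFP version accepts anything int() accepts (such as "-1" or an actual integer), while the LBYL version only accepts digit strings, so the check and the action can drift apart.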
Is it bad to use bare except clauses?
Yes, except: without specifying an exception type catches everything including SystemExit and KeyboardInterrupt. It also makes debugging harder because you don't know what went wrong. Always specify at least except Exception, and prefer more specific exception types.
How do I handle exceptions in async code?
Async exception handling uses the same try/except syntax inside async functions. For gathering multiple coroutines, use asyncio.gather(*coroutines, return_exceptions=True) to collect exceptions as results instead of failing on the first one. For task groups in Python 3.11+, use asyncio.TaskGroup, which raises an ExceptionGroup containing all child exceptions.
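A minimal sketch of the gather pattern, using a made-up fetch coroutine that fails on request:

```python
import asyncio


async def fetch(name, fail=False):
    """Hypothetical coroutine standing in for a network call."""
    await asyncio.sleep(0)
    if fail:
        raise ValueError(f"{name} failed")
    return f"{name} ok"


async def main():
    # Exceptions are returned in place, in the same order as the inputs.
    return await asyncio.gather(
        fetch("a"),
        fetch("b", fail=True),
        fetch("c"),
        return_exceptions=True,
    )


results = asyncio.run(main())
for result in results:
    if isinstance(result, Exception):
        print(f"error: {result}")
    else:
        print(result)
```

The caller has to check each result with isinstance, because a failure no longer interrupts the others.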
What's the performance cost of try/except?
On CPython 3.11 and later, entering a try block costs virtually nothing when no exception occurs: exceptions are "zero-cost" on the happy path (earlier versions pay a small setup cost). Actually raising and catching an exception is considerably more expensive than a simple if/else check, often by an order of magnitude or more depending on the Python version and traceback depth. Don't use exceptions for routine control flow in hot loops (like indexing until IndexError). Use them for truly exceptional conditions.
How do I test exception handling code?
Use pytest.raises as a context manager to verify exceptions are raised correctly. For testing retry logic, use unittest.mock.patch to simulate failures. Test both the happy path and each exception path. Verify that the right exception type, message, and attributes are set.
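Here is a sketch of testing retry logic with unittest.mock from the standard library. TransientError and call_with_retry are hypothetical stand-ins for your own code; under pytest, the try/except in the second test would normally be written as with pytest.raises(TransientError):.

```python
from unittest import mock


class TransientError(Exception):
    """Hypothetical error type that is worth retrying."""


def call_with_retry(func, attempts=3):
    """Call func until it succeeds or attempts run out."""
    last_error = None
    for _ in range(attempts):
        try:
            return func()
        except TransientError as exc:
            last_error = exc
    raise last_error


def test_retry_recovers():
    # side_effect as a list: raise twice, then return "ok".
    flaky = mock.Mock(
        side_effect=[TransientError("down"), TransientError("down"), "ok"]
    )
    assert call_with_retry(flaky) == "ok"
    assert flaky.call_count == 3


def test_retry_gives_up():
    always_down = mock.Mock(side_effect=TransientError("down"))
    try:
        call_with_retry(always_down, attempts=2)
    except TransientError as exc:
        assert str(exc) == "down"
    else:
        raise AssertionError("expected TransientError")
    assert always_down.call_count == 2


test_retry_recovers()
test_retry_gives_up()
print("retry tests passed")
```

Checking call_count verifies the number of attempts, not just the final outcome.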
Wrapping Up
Exception handling separates production-ready code from scripts that work on your laptop. The patterns we covered--specific exception catching, custom hierarchies, context managers, logging strategies, and retry decorators--form the foundation of resilient Python applications.
The key principles to remember: catch specific exceptions rather than broad ones, use else and finally blocks intentionally, create custom exception classes for your domain, log with appropriate severity levels, and design your error recovery strategy before writing the happy path code.
Start applying these patterns incrementally. Pick one codebase and replace bare except clauses with specific ones. Add custom exceptions to your next project. Build a retry decorator for your API calls. Each improvement makes your code more debuggable and your production systems more reliable.
Catch the most specific exception first, then progressively broader ones. Bare except: catches EVERYTHING including KeyboardInterrupt — almost always wrong.
The Three Anti-Patterns
Bare except. except: with no class catches every exception, including system-level ones like KeyboardInterrupt and SystemExit. Use except Exception: as the fallback if you genuinely don't know which exception to expect.
Silently swallowing. except Exception: pass hides bugs forever. At minimum, log the exception. Better, re-raise after cleanup.
Catching too broadly. try: do_a(); do_b(); do_c() except Exception: can't tell you which call failed. Wrap each risky operation separately.
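One way to fix the third anti-pattern is to give each risky call its own narrow try block. The sketch below does that through a small helper; run_step, load, and parse are hypothetical names chosen for the example.

```python
import logging

logger = logging.getLogger(__name__)

def run_step(name, func):
    """Run one step, log which step failed, then re-raise."""
    try:
        return func()
    except (ValueError, KeyError) as exc:
        logger.error("step %r failed: %s", name, exc)
        raise

def load():
    return {"count": "3"}

def parse(record):
    return int(record["count"])

# Each step is wrapped on its own, so a failure names the step that caused it.
record = run_step("load", load)
count = run_step("parse", lambda: parse(record))
print(count)  # 3
```

Because the helper re-raises, nothing is swallowed: the caller still sees the original exception, and the log line records which step it came from.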
The else and finally Clauses
# handle_request is an illustrative wrapper so that return is valid
def handle_request(raw):
    try:
        data = parse_user_input(raw)
    except ValueError as e:
        logger.error("Invalid input: %s", e)
        return error_response(400)
    else:
        # Runs ONLY if the try block succeeded; errors raised here are not caught by the except above
        save_to_db(data)
        notify_user(data)
    finally:
        # Runs whether the try block succeeded, failed, or returned early
        close_connection()
The else block runs only on success — useful when you want code that's clearly NOT covered by the except. finally always runs — perfect for cleanup that must happen regardless.
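To see the order in which the clauses run, here is a small self-contained sketch. parse_port is a hypothetical function that records each clause it passes through.

```python
def parse_port(raw):
    events = []
    try:
        port = int(raw)
    except ValueError:
        events.append("except")
        port = None
    else:
        # Reached only when int(raw) did not raise.
        events.append("else")
    finally:
        # Reached on both paths.
        events.append("finally")
    return port, events

print(parse_port("8080"))  # (8080, ['else', 'finally'])
print(parse_port("http"))  # (None, ['except', 'finally'])
```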
Raising With Context
# Re-raise the same exception
try:
    risky()
except ValueError:
    log_diagnostics()
    raise  # re-raises the current exception

# Wrap with a higher-level exception
def fetch_user_data(url):
    try:
        response = requests.get(url)
        return response.json()
    except requests.RequestException as e:
        raise APIError("Could not fetch user data") from e

# Chain explicitly (preserves original traceback)
raise ValidationError("bad input") from original_exception
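The from keyword preserves the chain by storing the original exception on the new one as __cause__. The traceback then prints the original error followed by "The above exception was the direct cause of the following exception", which is invaluable for debugging.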
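The chain can also be inspected in code. This is a minimal sketch in which ConfigError and read_timeout are hypothetical names invented for illustration.

```python
class ConfigError(Exception):
    """Hypothetical domain exception used for illustration."""

def read_timeout(settings):
    try:
        return int(settings["timeout"])
    except KeyError as exc:
        # "from exc" stores the KeyError on the new exception as __cause__.
        raise ConfigError("timeout is not configured") from exc

try:
    read_timeout({})
except ConfigError as err:
    caught = err
    print(type(err.__cause__).__name__)  # KeyError
    print(err.__suppress_context__)      # True
```

Writing raise ConfigError(...) from None instead would hide the original KeyError from the traceback, which is occasionally useful when the low-level detail would only confuse the reader.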
Custom Exception Hierarchies
import time

class APIError(Exception):
    pass

class APIConnectionError(APIError):
    pass

class APIRateLimit(APIError):
    def __init__(self, retry_after):
        self.retry_after = retry_after
        super().__init__("Rate limited, retry in %s seconds" % retry_after)

# Callers can catch broadly OR narrowly
try:
    call_api()
except APIRateLimit as e:
    time.sleep(e.retry_after)
except APIError:
    raise

# Or in one catch with different handling
try:
    call_api()
except APIError as e:
    if isinstance(e, APIRateLimit):
        retry(after=e.retry_after)
    else:
        raise
Exception Groups (Python 3.11+)
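try:
    # Raises an ExceptionGroup only if parallel_tasks() groups its failures,
    # for example by running its tasks in an asyncio.TaskGroup
    asyncio.run(parallel_tasks())
except* ValueError as eg:
    for e in eg.exceptions:
        logger.warning("Validation failed: %s", e)
except* ConnectionError as eg:
    logger.error("%s connections failed", len(eg.exceptions))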
except* handles multiple errors raised in parallel — essential for async code where multiple tasks may fail simultaneously.
Common Pitfalls
except Exception as e: print(e). Loses the traceback. Use logger.exception(...) or traceback.print_exc().
Catching to silence linter warnings. A try/except that does nothing isn't a fix — it's hiding a bug.
Raising strings. raise "error" is a TypeError. Always raise an exception instance: raise ValueError("...").
Not closing resources on exception. Use with blocks (context managers) instead of try/finally for file handles, locks, DB sessions.
Catching system-exit exceptions. KeyboardInterrupt, SystemExit, GeneratorExit don't inherit from Exception. except Exception skips them — usually what you want.
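Two of the claims in this list are easy to verify in an interpreter. The following sketch checks the class hierarchy and shows what happens when a string is raised.

```python
# These three derive from BaseException but not from Exception,
# which is why "except Exception" lets them pass through.
hierarchy = {
    exc_type.__name__: (
        issubclass(exc_type, Exception),
        issubclass(exc_type, BaseException),
    )
    for exc_type in (KeyboardInterrupt, SystemExit, GeneratorExit)
}
print(hierarchy["KeyboardInterrupt"])  # (False, True)

# Raising a plain string fails at runtime with a TypeError.
try:
    raise "error"
except TypeError as exc:
    message = str(exc)
print(message)
```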
FAQ
Q: Look before you leap, or ask forgiveness? A: Python idiom is EAFP — try the operation, catch on failure. LBYL has race conditions (the file might be deleted between exists() and open()).
Q: When to raise vs return None? A: Raise when the caller cannot sensibly continue: bad arguments, missing files, network failures that cannot be handled locally. Return None for "expected absence" (key not in dict, optional config value).
Q: How do I see the full traceback in production? A: logger.exception("msg") inside the except block. Or set up a global handler with sys.excepthook.
Q: Should I catch and ignore in tests? A: Use pytest.raises(ExpectedError) to assert errors. Don't catch errors in tests — they're meant to fail loudly.
Q: Performance cost of try/except? A: Negligible if no exception fires. Raising is expensive — don't use exceptions for normal control flow.
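For the global-handler answer above, a minimal sketch of a sys.excepthook replacement might look like this. The logger name "app", the log_uncaught function, and the in-memory stream are assumptions for the example; a real application would log to a file or a log service.

```python
import io
import logging
import sys

stream = io.StringIO()  # stand-in for a real log destination
logger = logging.getLogger("app")
logger.addHandler(logging.StreamHandler(stream))

def log_uncaught(exc_type, exc_value, exc_tb):
    # Let Ctrl+C fall through to the default hook.
    if issubclass(exc_type, KeyboardInterrupt):
        sys.__excepthook__(exc_type, exc_value, exc_tb)
        return
    logger.critical("Uncaught exception", exc_info=(exc_type, exc_value, exc_tb))

sys.excepthook = log_uncaught

# Simulate what the interpreter does for an uncaught error.
try:
    1 / 0
except ZeroDivisionError:
    log_uncaught(*sys.exc_info())

print("Traceback" in stream.getvalue())  # True
```

The hook only fires for exceptions that reach the top of the main thread, so it complements logger.exception inside except blocks instead of replacing it.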
Wrapping Up
Good exception handling is mostly about NOT catching things you shouldn't. Catch specific exceptions, never bare except, always log or re-raise. Use context managers for cleanup, custom exceptions for domain errors, and raise X from Y to preserve diagnostic chains. These patterns prevent the silent failures that cause production headaches.