How To Use Python traceback Module for Error Reporting

Intermediate

Your production script catches an exception in a broad except block, logs the message, and moves on. Three weeks later, a customer reports a silent failure and all you have in your log file is "Error: 'NoneType' object is not subscriptable" — no file name, no line number, no call stack. Diagnosing that takes hours instead of minutes. The fix is giving Python’s traceback module ten minutes of your attention now.

The traceback module exposes the same machinery that Python uses when it prints unhandled exceptions to the terminal. You can capture that output as a string, route it to a log file, send it to a monitoring service, strip it to just the last line, or walk its structure line by line. It is in the standard library and requires no installation.

In this article you will learn traceback.format_exc() for capturing tracebacks as strings, print_exc() for immediate output, TracebackException for structured inspection, chained exceptions, and a real-world error reporting pipeline that writes structured JSON error logs.

Python traceback Module: Quick Example

Here is the minimum viable pattern: catch an exception, capture its full traceback as a string, and log it instead of losing it.

# traceback_quick.py
import traceback
import logging

logging.basicConfig(level=logging.ERROR, format='%(asctime)s %(levelname)s %(message)s')

def divide(a, b):
    return a / b

def process(values):
    results = []
    for pair in values:
        try:
            results.append(divide(pair[0], pair[1]))
        except Exception:
            tb_str = traceback.format_exc()
            logging.error("Failed to process %s:\n%s", pair, tb_str)
            results.append(None)
    return results

pairs = [(10, 2), (5, 0), (8, 4)]
print(process(pairs))

Output:

2026-04-26 10:00:00,000 ERROR Failed to process (5, 0):
Traceback (most recent call last):
  File "traceback_quick.py", line 11, in process
    results.append(divide(pair[0], pair[1]))
  File "traceback_quick.py", line 5, in divide
    return a / b
ZeroDivisionError: division by zero

[5.0, None, 2.0]

traceback.format_exc() captures the current exception’s full traceback as a string — file names, line numbers, and the full call stack — exactly as Python would print it unhandled. Call it inside an except block while the exception is still active. Outside an except block it returns 'NoneType: None\n', which is useless.
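To see the difference for yourself, call format_exc() once inside a handler and once after it has finished. This is a minimal sketch; the variable names inside and outside are only illustrative.

```python
import traceback

try:
    1 / 0
except ZeroDivisionError:
    # The exception is still active here, so the full traceback is available.
    inside = traceback.format_exc()

# The handler has finished, so there is no active exception to format.
outside = traceback.format_exc()

print(inside.splitlines()[-1])  # last line: exception type and message
print(repr(outside))            # placeholder text, no traceback
```

The second call does not raise; it just returns a placeholder string, which is why this mistake tends to go unnoticed until someone reads the log.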

What Is the traceback Module?

When Python raises an unhandled exception, it calls internal machinery to format and print the traceback you see in the terminal. The traceback module exposes that machinery as a public API so your code can do the same thing programmatically.

| Function | Output | Best for |
| --- | --- | --- |
| format_exc() | String | Logging, monitoring, string manipulation |
| print_exc() | Prints to stderr | Quick debug prints in scripts |
| format_exception(exc) | List of strings | Building structured output line by line |
| print_exception(exc) | Prints to stderr | Printing a stored exception later |
| extract_tb(tb) | StackSummary | Walking frames individually |
| TracebackException | Object | Structured inspection, JSON logging |

The most important distinction is between “format” functions (return strings/lists you can do things with) and “print” functions (write directly to stderr). In production code, almost always use the format versions so you can route output to your logging infrastructure.
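The format functions also work on an exception you stored earlier, because the traceback travels with the exception object. The sketch below assumes a hypothetical run_all helper that collects failures and formats them later, outside any except block. It uses the three-argument form of format_exception, which works on older Python 3 versions as well.

```python
import traceback

def run_all(jobs):
    """Run every job and collect exceptions instead of stopping at the first."""
    failures = []
    for job in jobs:
        try:
            job()
        except Exception as exc:
            failures.append(exc)  # exc.__traceback__ is kept on the object
    return failures

def bad_job():
    raise RuntimeError("job failed")

failures = run_all([bad_job, lambda: None])

# Later, outside any except block, format each stored exception.
for exc in failures:
    lines = traceback.format_exception(type(exc), exc, exc.__traceback__)
    report = "".join(lines)
    print(report)
```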

format_exc and print_exc

format_exc() is the single most useful function in the module. It takes no required arguments and returns the full traceback of the current exception as a single string, ready to log or store.

# traceback_format.py
import traceback
import sys

def level3():
    raise ValueError("Something went wrong deep in level3")

def level2():
    level3()

def level1():
    level2()

# --- format_exc: capture as string ---
try:
    level1()
except ValueError:
    tb_string = traceback.format_exc()
    print("=== Captured traceback ===")
    print(tb_string)
    print("=== Last line only ===")
    print(tb_string.strip().split('\n')[-1])

# --- print_exc: print directly to stderr ---
try:
    level1()
except ValueError:
    print("\n=== print_exc output (goes to stderr) ===", file=sys.stderr)
    traceback.print_exc()

# --- format_exc with limit: show only top 2 frames ---
try:
    level1()
except ValueError:
    limited = traceback.format_exc(limit=2)
    print("\n=== Limited to 2 frames ===")
    print(limited)

Output:

=== Captured traceback ===
Traceback (most recent call last):
  File "traceback_format.py", line 13, in <module>
    level1()
  File "traceback_format.py", line 11, in level1
    level2()
  File "traceback_format.py", line 8, in level2
    level3()
  File "traceback_format.py", line 5, in level3
    raise ValueError("Something went wrong deep in level3")
ValueError: Something went wrong deep in level3

=== Last line only ===
ValueError: Something went wrong deep in level3

Extracting the last line (tb_string.strip().split('\n')[-1]) gives you the exception type and message, which is ideal for a brief alert subject line. The limit parameter controls how many frames are shown: a positive value keeps the first N frames (outermost, starting from the code that caught the exception) and a negative value keeps the last N frames (innermost, closest to the raise). For very deep call stacks where only the innermost frames matter, pass a negative limit.
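The sign of limit is easy to get backwards, so here is a small sketch that captures both ends of the same traceback. The function names mirror the example above; the variable names are illustrative.

```python
import traceback

def level3():
    raise ValueError("deep failure")

def level2():
    level3()

def level1():
    level2()

try:
    level1()
except ValueError:
    outermost = traceback.format_exc(limit=1)   # first frame only (where it was caught)
    innermost = traceback.format_exc(limit=-1)  # last frame only (where it was raised)

print(outermost)
print(innermost)
```

Both strings still end with the exception line; only the list of frames is trimmed.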

TracebackException for Structured Inspection

TracebackException is the object-oriented API for exceptions. It captures all the information about an exception in a structured form that you can inspect field by field — perfect for building JSON error logs or custom error display formats.

# traceback_te.py
import traceback
import json

def risky_parse(text):
    return int(text)  # will fail if text is not a number

try:
    risky_parse("hello")
except ValueError as exc:
    te = traceback.TracebackException.from_exception(exc)

    # Structured fields
    print("Type   :", te.exc_type.__name__)
    print("Message:", str(te))
    print()

    # Walk the stack frames
    print("Stack frames:")
    for frame in te.stack:
        print(f"  {frame.filename}:{frame.lineno} in {frame.name}")
        print(f"    {frame.line}")

    # Format as a list of strings (same as format_exc but as a list)
    lines = list(te.format())
    print()
    print(f"Total lines in traceback: {len(lines)}")

    # Build a JSON-serializable error record
    error_record = {
        'type': te.exc_type.__name__,
        'message': str(te),
        'frames': [
            {'file': f.filename, 'line': f.lineno, 'function': f.name, 'code': f.line}
            for f in te.stack
        ]
    }
    print()
    print(json.dumps(error_record, indent=2))

Output:

Type   : ValueError
Message: invalid literal for int() with base 10: 'hello'

Stack frames:
  traceback_te.py:12 in <module>
    risky_parse("hello")
  traceback_te.py:5 in risky_parse
    return int(text)

Total lines in traceback: 4

{
  "type": "ValueError",
  "message": "invalid literal for int() with base 10: 'hello'",
  "frames": [
    {"file": "traceback_te.py", "line": 12, "function": "<module>", "code": "risky_parse(\"hello\")"},
    {"file": "traceback_te.py", "line": 5, "function": "risky_parse", "code": "return int(text)"}
  ]
}

The JSON-serializable error record is the foundation of structured error logging. Send it to your monitoring service (Sentry, Datadog, Elasticsearch) and you can filter by exception type, group by file, alert on specific error messages, or build dashboards showing which functions fail most often.

Chained Exceptions and __cause__ vs __context__

When one exception is raised while handling another, Python chains them. The traceback module understands and displays these chains, and TracebackException exposes them as __cause__ (explicit: raise X from Y) and __context__ (implicit: exception raised inside an except block).

# traceback_chained.py
import traceback

def fetch_config(path):
    raise FileNotFoundError(f"Config file not found: {path}")

def start_app(config_path):
    try:
        fetch_config(config_path)
    except FileNotFoundError as e:
        raise RuntimeError("Application startup failed") from e

try:
    start_app('/etc/app/config.yaml')
except RuntimeError:
    # format_exc shows the full chain automatically
    print(traceback.format_exc())

# Inspect the chain with TracebackException
try:
    start_app('/etc/app/config.yaml')
except RuntimeError as exc:
    te = traceback.TracebackException.from_exception(exc)
    # chain is an argument of format(), not of from_exception(); True is the default
    for part in te.format(chain=True):
        print(part, end='')

Output:

Traceback (most recent call last):
  File "traceback_chained.py", line 9, in start_app
    fetch_config(config_path)
  File "traceback_chained.py", line 4, in fetch_config
    raise FileNotFoundError(f"Config file not found: {path}")
FileNotFoundError: Config file not found: /etc/app/config.yaml

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "traceback_chained.py", line 13, in <module>
    start_app('/etc/app/config.yaml')
  File "traceback_chained.py", line 11, in start_app
    raise RuntimeError("Application startup failed") from e
RuntimeError: Application startup failed

The raise X from Y pattern creates an explicit cause chain and is the recommended way to wrap low-level exceptions in higher-level ones without losing the original cause. Always use it when you catch a library exception and re-raise as a domain exception — it preserves the root cause in logs and debuggers.
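If you want the chain as data rather than as formatted text, you can follow the links on the exception object itself. The sketch below is one possible approach, not the only one: describe_chain is a hypothetical helper that applies the same rule Python uses when printing, preferring __cause__ and falling back to __context__ unless it was suppressed.

```python
def describe_chain(exc):
    """Return (relationship, type name, message) tuples, outermost exception first."""
    chain = []
    relation = "raised"
    while exc is not None:
        chain.append((relation, type(exc).__name__, str(exc)))
        if exc.__cause__ is not None:
            relation, exc = "caused by", exc.__cause__          # raise X from Y
        elif exc.__context__ is not None and not exc.__suppress_context__:
            relation, exc = "while handling", exc.__context__   # implicit chaining
        else:
            exc = None
    return chain

def fetch_config(path):
    raise FileNotFoundError(f"Config file not found: {path}")

def start_app(path):
    try:
        fetch_config(path)
    except FileNotFoundError as e:
        raise RuntimeError("Application startup failed") from e

try:
    start_app("/etc/app/config.yaml")
except RuntimeError as exc:
    chain = describe_chain(exc)

for relation, name, message in chain:
    print(f"{relation}: {name}: {message}")
```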

Real-Life Example: Structured Error Logging Pipeline

Here is a production-ready error logging helper that captures structured error data and writes it to a JSON log file, plus sends a one-line summary to stderr.

# traceback_logger.py
import traceback
import json
import sys
import datetime
from pathlib import Path

ERROR_LOG = Path('/tmp/app_errors.jsonl')

def log_error(context: str, exc: Exception) -> None:
    """Capture structured error info and write to JSONL log."""
    te = traceback.TracebackException.from_exception(exc)  # format() includes the chain by default

    record = {
        'timestamp': datetime.datetime.utcnow().isoformat(),
        'context': context,
        'type': te.exc_type.__name__,
        'message': str(te),
        'frames': [
            {
                'file': frame.filename,
                'line': frame.lineno,
                'function': frame.name,
                'code': (frame.line or '').strip()
            }
            for frame in te.stack
        ],
        'traceback': ''.join(te.format())
    }

    # Write to JSONL (one JSON object per line)
    with open(ERROR_LOG, 'a') as f:
        f.write(json.dumps(record) + '\n')

    # One-line summary to stderr
    print(f"[ERROR] {record['timestamp']} {context}: "
          f"{record['type']}: {record['message']}", file=sys.stderr)


# --- Simulate application code ---
def load_user(user_id: int) -> dict:
    users = {1: {'name': 'Alice'}, 2: {'name': 'Bob'}}
    if user_id not in users:
        raise KeyError(f"User {user_id} not found")
    return users[user_id]

def get_user_email(user_id: int) -> str:
    user = load_user(user_id)
    return user['email']   # KeyError: no 'email' key


for uid in [1, 99, 2]:
    try:
        email = get_user_email(uid)
        print(f"User {uid} email: {email}")
    except Exception as e:
        log_error(f"get_user_email(uid={uid})", e)

# Read back and summarize the log
print(f"\n=== Error log: {ERROR_LOG} ===")
with open(ERROR_LOG) as f:
    for line in f:
        rec = json.loads(line)
        print(f"  [{rec['type']}] {rec['context']} -- {rec['message']}")

Output:

[ERROR] 2026-04-26T10:00:00 get_user_email(uid=1): KeyError: 'email'
[ERROR] 2026-04-26T10:00:00 get_user_email(uid=99): KeyError: 'User 99 not found'
[ERROR] 2026-04-26T10:00:00 get_user_email(uid=2): KeyError: 'email'

=== Error log: /tmp/app_errors.jsonl ===
  [KeyError] get_user_email(uid=1) -- 'email'
  [KeyError] get_user_email(uid=99) -- 'User 99 not found'
  [KeyError] get_user_email(uid=2) -- 'email'

The JSONL format (one JSON object per line) is directly ingestible by tools like Elasticsearch, Splunk, and AWS CloudWatch Logs. The context field tells you which operation failed; the frames array tells you exactly where; the full traceback string gives you everything a debugger would show. You can extend this pattern by adding request IDs, user IDs, or environment names to the record for correlation across distributed services.

Frequently Asked Questions

Why does format_exc() return ‘NoneType: None’ sometimes?

Because format_exc() reads the current exception from Python’s internal exception state, which is only set while you are inside an except block. If you call format_exc() after the except block exits, the exception state has been cleared. Always call format_exc() or create a TracebackException inside the except block, before any other code that might clear the state.

Should I use traceback directly or let the logging module handle it?

For simple cases, use logging.exception('message') inside an except block — it automatically includes the traceback in the log output. Use the traceback module directly when you need the traceback as a string for non-logging purposes (storing in a database, sending via HTTP, or building structured error objects), or when you need to customize the format beyond what the logging module provides.
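As a rough illustration of the logging route, the sketch below sends logging.exception output to an in-memory stream so the result can be inspected. The logger name traceback_demo and the parse_port function are made up for the example.

```python
import io
import logging

stream = io.StringIO()
logger = logging.getLogger("traceback_demo")  # hypothetical logger name
logger.setLevel(logging.ERROR)
logger.addHandler(logging.StreamHandler(stream))
logger.propagate = False  # keep the demo output away from the root logger

def parse_port(text):
    return int(text)

try:
    parse_port("eighty")
except ValueError:
    # Logs at ERROR level and appends the active traceback automatically.
    logger.exception("Could not parse port %r", "eighty")

log_output = stream.getvalue()
print(log_output)
```

In real code the handler would be a file or a log shipper rather than a StringIO; the call inside the except block stays the same.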

When do I need TracebackException vs format_exc?

Use format_exc() when you just need the traceback as a string for logging. Use TracebackException when you need to inspect the structure — access individual frames, get the exception type as an object rather than a string, walk chained exceptions, or build a custom formatted output. TracebackException is also better when you need to capture the exception and format it later, outside the except block.

How do I show only the most relevant frames?

Pass a limit argument: positive values take from the top of the stack (outermost frames), negative values take from the bottom (innermost, most recent frames). For application errors where the inner frames are most relevant, use limit=-3 or similar. For library wrappers where you want to show only the user’s code and hide internal library frames, you can filter te.stack by filename prefix before formatting.
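The filename filter could look roughly like this. To keep the example self-contained, the "library" is simulated by compiling a tiny function under a made-up path (/opt/vendor/lib.py); format_without is a hypothetical helper name, not part of the traceback module.

```python
import traceback

# Simulate third-party code whose frames should be hidden.
# The path is invented for the demo; no such file needs to exist.
LIB_SOURCE = "def lib_call(callback):\n    return callback()\n"
namespace = {}
exec(compile(LIB_SOURCE, "/opt/vendor/lib.py", "exec"), namespace)
lib_call = namespace["lib_call"]

def user_callback():
    raise ValueError("bad input")

def format_without(exc, hidden_prefix):
    """Format exc, dropping frames whose filename starts with hidden_prefix."""
    te = traceback.TracebackException.from_exception(exc)
    kept = [f for f in te.stack if not f.filename.startswith(hidden_prefix)]
    parts = ["Traceback (most recent call last):\n"]
    parts += traceback.StackSummary.from_list(kept).format()
    parts += te.format_exception_only()
    return "".join(parts)

try:
    lib_call(user_callback)
except ValueError as exc:
    report = format_without(exc, "/opt/vendor/")

print(report)
```

This only filters the outermost exception's frames; chained exceptions would need the same treatment applied to each link.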

Is it safe to log the traceback and then re-raise?

Yes. format_exc() captures the traceback data as a string without consuming or modifying the exception. You can log it and then use a bare raise to re-raise the original exception with its original traceback intact. Avoid raise exc (with the exception variable) for this: in Python 3 it keeps the original frames but adds the re-raise line as an extra traceback entry, which is noise at best. Use bare raise inside the except block.
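A minimal sketch of the log-then-re-raise pattern follows. The captured list stands in for whatever logging destination you use, and the function names are illustrative.

```python
import traceback

captured = []  # stand-in for a real log destination

def risky():
    raise KeyError("missing")

def guarded():
    try:
        risky()
    except KeyError:
        captured.append(traceback.format_exc())  # log a copy of the traceback
        raise                                    # re-raise the same exception

try:
    guarded()
except KeyError as exc:
    reraised = exc
    final_tb = "".join(
        traceback.format_exception(type(exc), exc, exc.__traceback__)
    )

print(final_tb)
```

The caller still sees the frame inside risky(), so nothing was lost by logging first.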

Conclusion

The traceback module turns opaque exception messages into actionable, structured error reports. You learned how format_exc() captures tracebacks as strings inside except blocks, how TracebackException gives you structured access to every frame and chained exception, and how to build a JSON error logging pipeline that production systems can ingest directly.

The next step is to integrate the log_error function from the real-world example into one of your own scripts. Replace all bare except: pass and except Exception as e: print(e) patterns with structured logging — your future self at 2 a.m. will be grateful. Then explore sys.excepthook to install a global handler that catches unhandled exceptions and logs them the same way.
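A global handler along those lines might start out like the sketch below. format_unhandled and log_unhandled are hypothetical names; in your own code the hook could call your error logger instead of writing to stderr.

```python
import sys
import traceback

def format_unhandled(exc_type, exc_value, exc_tb):
    """Build the text that the hook will emit for an unhandled exception."""
    lines = traceback.format_exception(exc_type, exc_value, exc_tb)
    return "[UNHANDLED] " + "".join(lines)

def log_unhandled(exc_type, exc_value, exc_tb):
    if issubclass(exc_type, KeyboardInterrupt):
        # Let Ctrl+C keep its normal behaviour.
        sys.__excepthook__(exc_type, exc_value, exc_tb)
        return
    sys.stderr.write(format_unhandled(exc_type, exc_value, exc_tb))

# Python calls this hook for any exception that reaches the top level.
sys.excepthook = log_unhandled

# Preview what the hook would write, without crashing the script.
try:
    raise ValueError("boom")
except ValueError as exc:
    sample = format_unhandled(type(exc), exc, exc.__traceback__)
print(sample)
```

The hook runs just before the interpreter exits, so keep it short and avoid anything that can itself raise.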

Official documentation: https://docs.python.org/3/library/traceback.html.

How To Use Python bisect for Binary Search and Sorted Lists

Intermediate

You have a sorted list of a million items and you need to find where to insert a new value to keep it sorted, or check whether a value is already present. The obvious approaches, appending and calling list.sort() after every insertion or scanning the list from the start, touch the whole list every time. Python’s bisect module uses binary search to locate the correct position in O(log n) comparisons, which takes microseconds even on million-element lists.

The bisect module is part of the standard library, has a C-accelerated implementation in CPython, and requires no installation. It assumes the list is already sorted and does not check this for you. That is its main constraint, but if your list is sorted, it is the standard tool for fast search and ordered insertion.

In this article you will learn how bisect_left and bisect_right find insertion points, how insort inserts while maintaining sort order, common patterns like grade boundaries and range lookups, and a real-world leaderboard system that uses bisect for real-time score tracking.

Binary Search with bisect: Quick Example

Finding where a new value belongs in a sorted list — instantly, without re-sorting.

# bisect_quick.py
import bisect

scores = [45, 62, 70, 75, 81, 88, 93, 97]

new_score = 78

# Find where 78 would be inserted (from the right)
position = bisect.bisect(scores, new_score)
print(f"78 would go at index {position}")  # between 75 and 81

# Insert it and keep the list sorted
bisect.insort(scores, new_score)
print(f"After insert: {scores}")

# Binary search: is 70 in the list?
idx = bisect.bisect_left(scores, 70)
found = idx < len(scores) and scores[idx] == 70
print(f"70 found: {found} at index {idx}")

Output:

78 would go at index 4
After insert: [45, 62, 70, 75, 78, 81, 88, 93, 97]
70 found: True at index 2

bisect.bisect() is an alias for bisect_right and returns the position to the right of any existing equal elements. insort() calls bisect_right internally and then calls list.insert(pos, value) -- the list stays sorted with a single O(log n) search plus O(n) shift. For pure searching (no insert), bisect_left combined with an equality check is the correct binary search pattern in Python.

What Is the bisect Module and When Do You Need It?

Binary search divides a sorted array in half repeatedly until it finds the target, achieving O(log n) lookups compared to O(n) for a linear scan. The bisect module implements this algorithm and exposes it through a small set of search and insertion functions.

| Function | What it returns | Behaviour on duplicates |
| --- | --- | --- |
| bisect_left(a, x) | Leftmost insertion index for x | Position before existing equal elements |
| bisect_right(a, x) | Rightmost insertion index for x | Position after existing equal elements |
| bisect(a, x) | Same as bisect_right | Alias |
| insort_left(a, x) | None (mutates list) | Inserts before equal elements |
| insort_right(a, x) / insort(a, x) | None (mutates list) | Inserts after equal elements |

The key distinction between bisect_left and bisect_right only matters when the target value already exists in the list. If 5 appears at indices 3 and 4, bisect_left(a, 5) returns 3 and bisect_right(a, 5) returns 5. Use bisect_left for membership testing and use bisect_right/insort for insertion with stable ordering of equal elements.

bisect_left vs bisect_right: Practical Differences

Understanding when each function is correct requires seeing them both in action with duplicate values in the list.

# bisect_left_right.py
import bisect

a = [10, 20, 20, 20, 30, 40]

left  = bisect.bisect_left(a, 20)
right = bisect.bisect_right(a, 20)
print(f"bisect_left(a, 20)  = {left}")   # 1 (before first 20)
print(f"bisect_right(a, 20) = {right}")  # 4 (after last 20)

# Membership test: use bisect_left
def contains(sorted_list, value):
    idx = bisect.bisect_left(sorted_list, value)
    return idx < len(sorted_list) and sorted_list[idx] == value

print(f"Contains 20: {contains(a, 20)}")   # True
print(f"Contains 25: {contains(a, 25)}")   # False

# Count occurrences using left and right
count_20 = right - left
print(f"Count of 20: {count_20}")           # 3

# Find all values in range [15, 25] inclusive
lo = bisect.bisect_left(a, 15)
hi = bisect.bisect_right(a, 25)
print(f"Values in [15,25]: {a[lo:hi]}")

Output:

bisect_left(a, 20)  = 1
bisect_right(a, 20) = 4
Contains 20: True
Contains 25: False
Count of 20: 3
Values in [15,25]: [20, 20, 20]

The range extraction pattern (a[bisect_left(a, lo) : bisect_right(a, hi)]) is extremely efficient for filtering sorted data within bounds. It runs in O(log n) to find the two boundary indices and then slices in O(k) where k is the number of matched elements -- far better than a linear filter over the whole list.

Grade Lookup and Boundary Mapping

One of the most cited examples in the Python docs is using bisect to map a numeric score to a grade boundary. This replaces a chain of if/elif conditions with a single table lookup.

# bisect_grades.py
import bisect

# Grade boundaries: scores below 60=F, 60-69=D, 70-79=C, 80-89=B, 90+=A
breakpoints = [60, 70, 80, 90]
grades      = ['F', 'D', 'C', 'B', 'A']

def letter_grade(score):
    """Return letter grade for a numeric score using bisect."""
    return grades[bisect.bisect(breakpoints, score)]

test_scores = [35, 60, 65, 70, 79, 80, 89, 90, 100]
for s in test_scores:
    print(f"  {s:3d} -> {letter_grade(s)}")

# Batch: find all students who earn a B or higher
students = [('Alice', 88), ('Bob', 55), ('Carol', 91), ('Dave', 72), ('Eve', 80)]
b_or_better = [(name, score) for name, score in students
               if bisect.bisect(breakpoints, score) >= 3]  # index 3 = 'B'
print(f"\nB or better: {b_or_better}")

Output:

   35 -> F
   60 -> D
   65 -> D
   70 -> C
   79 -> C
   80 -> B
   89 -> B
   90 -> A
  100 -> A

B or better: [('Alice', 88), ('Carol', 91), ('Eve', 80)]

This pattern scales effortlessly. Add a new grade tier? Add one entry to each list, keeping breakpoints in sorted order. Need 12 tax brackets instead of 5 grades? Same code. The bisect_right call returns the index of the first breakpoint that is strictly greater than the score, which maps naturally to the grade array with no off-by-one errors.
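The same table lookup works when the boundaries are inclusive upper limits rather than lower limits; in that case bisect_left is the better fit. The tiers and prices below are invented purely for illustration.

```python
import bisect

# Hypothetical tiers: up to 1 kg, up to 5 kg, up to 20 kg, heavier than 20 kg.
weight_limits = [1, 5, 20]            # must stay sorted
prices = [4.99, 9.99, 19.99, 49.99]   # one more entry than weight_limits

def shipping_price(weight_kg):
    # bisect_left keeps a weight equal to a limit in the lower tier,
    # because "up to 5 kg" includes exactly 5 kg.
    return prices[bisect.bisect_left(weight_limits, weight_kg)]

for w in [1, 1.5, 5, 20, 25]:
    print(f"{w} kg -> {shipping_price(w)}")
```

Choosing between bisect_left and bisect_right is just a decision about which side of the boundary an exact match belongs to.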

Maintaining Sorted Lists with insort

When you need to build a sorted structure incrementally -- streaming data, a priority-ordered queue, or a rolling top-N list -- insort is your tool. It inserts each new element in the correct position in O(log n) search time.

# bisect_insort.py
import bisect

# Build a sorted list incrementally from an unsorted stream
incoming = [55, 12, 88, 43, 99, 23, 67, 43, 1]
sorted_stream = []

for value in incoming:
    bisect.insort(sorted_stream, value)
    print(f"  Inserted {value:3d}: {sorted_stream}")

print()

# Top-N window: keep only the 5 highest scores seen so far
def top_n_tracker(n):
    window = []
    def add(score):
        bisect.insort(window, score)
        if len(window) > n:
            window.pop(0)   # remove smallest (leftmost)
        return list(window)
    return add

tracker = top_n_tracker(5)
for score in [45, 78, 23, 91, 56, 88, 34, 99, 62]:
    top5 = tracker(score)
    print(f"  After {score:3d}: top5 = {top5}")

Output:

  Inserted  55: [55]
  Inserted  12: [12, 55]
  Inserted  88: [12, 55, 88]
  Inserted  43: [12, 43, 55, 88]
  Inserted  99: [12, 43, 55, 88, 99]
  Inserted  23: [12, 23, 43, 55, 88, 99]
  Inserted  67: [12, 23, 43, 55, 67, 88, 99]
  Inserted  43: [12, 23, 43, 43, 55, 67, 88, 99]
  Inserted   1: [1, 12, 23, 43, 43, 55, 67, 88, 99]

  After  45: top5 = [45]
  After  78: top5 = [45, 78]
  After  23: top5 = [23, 45, 78]
  After  91: top5 = [23, 45, 78, 91]
  After  56: top5 = [23, 45, 56, 78, 91]
  After  88: top5 = [45, 56, 78, 88, 91]
  After  34: top5 = [45, 56, 78, 88, 91]
  After  99: top5 = [56, 78, 88, 91, 99]
  After  62: top5 = [62, 78, 88, 91, 99]

In the top-N window pattern, insort finds the position in O(log n), but both the insert and list.pop(0) shift elements and are O(n) in the window size. For a small fixed n that cost is negligible, and it is still cheaper than calling sorted() after each insertion. For very large windows, or when you only need the top-N values without keeping them in order at every step, the heapq module is a better fit: keep a min-heap of size n with heapq.heappushpop, or use heapq.nlargest for a one-off query.
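For comparison, here is a sketch of the same tracker built on heapq instead of bisect. It keeps a min-heap of at most n scores, so the smallest kept score is always at index 0 and can be dropped cheaply. The closure shape copies the example above; sorting on return is only for display.

```python
import heapq

def top_n_tracker(n):
    heap = []  # min-heap: heap[0] is the smallest score currently kept

    def add(score):
        if len(heap) < n:
            heapq.heappush(heap, score)
        else:
            # Push the new score, then pop the smallest of the n + 1 values.
            heapq.heappushpop(heap, score)
        return sorted(heap)  # sorted copy for display only

    return add

tracker = top_n_tracker(5)
for score in [45, 78, 23, 91, 56, 88, 34, 99, 62]:
    top5 = tracker(score)

print(top5)
```

The trade-off is that the heap itself is not in sorted order, so use this version when you only need the members of the top N, not their rank at every step.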

Real-Life Example: Live Game Leaderboard

Here is a complete leaderboard system that uses bisect to place scores and answer ranking queries with O(log n) binary searches, suitable for a live gaming scenario where score updates arrive continuously.

# bisect_leaderboard.py
import bisect
from dataclasses import dataclass, field
from typing import List

@dataclass
class Leaderboard:
    """Sorted leaderboard using bisect for O(log n) updates and rank queries."""
    _scores: List[int] = field(default_factory=list)
    _entries: dict = field(default_factory=dict)  # player_id -> score

    def update_score(self, player_id: str, new_score: int):
        """Add or update a player's score."""
        if player_id in self._entries:
            old_score = self._entries[player_id]
            # Remove old score from sorted list
            idx = bisect.bisect_left(self._scores, old_score)
            if idx < len(self._scores) and self._scores[idx] == old_score:
                self._scores.pop(idx)
        self._entries[player_id] = new_score
        bisect.insort(self._scores, new_score)

    def get_rank(self, player_id: str) -> int:
        """Return 1-based rank (1 = highest score)."""
        if player_id not in self._entries:
            return -1
        score = self._entries[player_id]
        # Rank = number of scores strictly above this one + 1 (tied players share a rank)
        return len(self._scores) - bisect.bisect_right(self._scores, score) + 1

    def percentile(self, player_id: str) -> float:
        """What percentage of players does this player beat?"""
        if player_id not in self._entries:
            return 0.0
        score = self._entries[player_id]
        below = bisect.bisect_left(self._scores, score)
        return round(100 * below / len(self._scores), 1)

    def top_n(self, n: int) -> List[tuple]:
        """Return top-n (player_id, score) pairs."""
        top_scores = self._scores[-n:][::-1]
        player_map = {v: k for k, v in self._entries.items()}
        return [(player_map.get(s, '?'), s) for s in top_scores]


lb = Leaderboard()
updates = [
    ('alice', 1200), ('bob', 850), ('carol', 1450), ('dave', 970),
    ('eve', 1100),  ('alice', 1380),  # alice improves
]
for pid, score in updates:
    lb.update_score(pid, score)

print("=== Leaderboard Top 3 ===")
for rank, (pid, score) in enumerate(lb.top_n(3), 1):
    print(f"  #{rank}: {pid} -- {score} pts")

print("\n=== Player Stats ===")
for pid in ['alice', 'bob', 'carol', 'dave', 'eve']:
    print(f"  {pid:<6} rank #{lb.get_rank(pid):2d}  "
          f"beats {lb.percentile(pid):5.1f}% of players")

Output:

=== Leaderboard Top 3 ===
  #1: carol -- 1450 pts
  #2: alice -- 1380 pts
  #3: eve -- 1100 pts

=== Player Stats ===
  alice  rank # 2  beats  60.0% of players
  bob    rank # 5  beats   0.0% of players
  carol  rank # 1  beats  80.0% of players
  dave   rank # 4  beats  20.0% of players
  eve    rank # 3  beats  40.0% of players

The get_rank and percentile methods each need only a binary search and complete in O(log n) time. update_score also finds its positions in O(log n), but list.pop(idx) and the list.insert inside insort shift elements and are O(n). For very high-throughput scenarios, the list would be replaced with a Fenwick tree or a sorted container from a third-party library like sortedcontainers.

Frequently Asked Questions

When should I use bisect vs. just sorting?

Use bisect when you need to maintain a sorted list incrementally -- i.e., insertions happen over time and you need the list to remain sorted between them. Use sorted() or list.sort() when you have all the data upfront and only need to sort once. Re-sorting after every insertion reprocesses the whole list each time (O(n log n) in general, O(n) at best on nearly sorted input); insort needs only O(log n) comparisons to find the position plus one O(n) element shift, which is the minimum a plain list allows.

Is list.insert still O(n) even with bisect?

Yes. insort uses binary search to find the correct index in O(log n), but the underlying list.insert(idx, value) call is O(n) because Python lists are backed by arrays and inserting in the middle requires shifting all later elements to the right. For read-heavy workloads (many searches, few inserts), this is fine. For write-heavy workloads with very large lists, consider the third-party sortedcontainers.SortedList, which keeps inserts cheap by storing the data as a list of short sublists.

Can I use bisect with custom sort keys?

Yes, from Python 3.10 onward. bisect_left, bisect_right, and the insort functions accept a key parameter, much like sorted(). One difference: the search functions compare x directly against key(element), so you pass the key value you are looking for rather than a whole record, while insort applies key to the item being inserted. On older versions bisect only uses natural ordering, so use a decorated list of tuples: insort(entries, (score, player_id)) sorts by score first, then by player_id alphabetically.

What does bisect return if the value is not in the list?

It returns the index where the value would be inserted to maintain sorted order. This is always a valid index (including len(a) for values larger than all elements). To test membership, combine it with an equality check: idx = bisect.bisect_left(a, x); found = idx < len(a) and a[idx] == x. If found is False, the value is not in the list.

Should I use bisect or heapq for priority queues?

Use heapq for priority queues where you only need the minimum (or maximum) element efficiently -- push is O(log n), pop-min is O(log n), and you never need random access by rank. Use bisect when you need the full sorted order accessible by index -- for example, to find the median, the k-th element, or a range of values. The two modules are complementary, not competing.

Conclusion

The bisect module gives you O(log n) binary search and simple sorted insertion from the standard library with no dependencies. You learned how bisect_left and bisect_right differ for duplicate values, how to use them for membership testing and range queries, how the grade-boundary pattern replaces long if/elif chains, and how insort maintains sorted lists incrementally.

To go further, extend the leaderboard example to support ties correctly (players with equal scores sharing the same rank) and add a method to return the score distribution as a histogram using bisect_left with bucket boundaries. For production-scale sorted containers, explore sortedcontainers.SortedList, which wraps a similar API with O(log n) inserts.
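One way the histogram idea could be sketched: because the scores are already sorted, each bucket boundary needs only one bisect_left call, and the bucket sizes fall out as differences between neighbouring cut points. score_histogram and the sample boundaries are hypothetical.

```python
import bisect

def score_histogram(sorted_scores, boundaries):
    """Count scores per bucket: below b0, [b0, b1), [b1, b2), ..., at or above the last."""
    # bisect_left(sorted_scores, b) is the number of scores strictly below b.
    cuts = [0]
    cuts += [bisect.bisect_left(sorted_scores, b) for b in boundaries]
    cuts.append(len(sorted_scores))
    return [hi - lo for lo, hi in zip(cuts, cuts[1:])]

scores = [850, 970, 1100, 1380, 1450]   # must be sorted
boundaries = [1000, 1200, 1400]         # must be sorted

counts = score_histogram(scores, boundaries)
print(counts)
```

With bisect_left a score exactly equal to a boundary lands in the upper bucket; swap in bisect_right if you want it in the lower one.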

Official documentation: https://docs.python.org/3/library/bisect.html.

How To Use Python tempfile for Temporary Files and Directories

Beginner

Scripts that process data often need somewhere to park a file briefly — during a test run, between two processing steps, or while waiting for a slow external service. You could create files in the current directory with a hardcoded name, but that creates a minefield: name collisions between parallel runs, files left behind after crashes, and security vulnerabilities on shared systems where other processes can predict and intercept your file. Every one of these problems is solved by Python’s tempfile module.

The tempfile module creates files and directories in a platform-appropriate temp location (typically /tmp on Linux, a per-user directory under /var/folders on macOS, and %TEMP% on Windows) with unpredictable names that prevent guessing attacks. The objects it returns are context managers, so files are deleted automatically when the with block exits, even if an exception is raised partway through. The module is in the standard library, no installation required.

In this tutorial you will learn to use NamedTemporaryFile for named temp files, TemporaryDirectory for scratch directories, mkstemp and mkdtemp for manual control, and SpooledTemporaryFile for memory-first buffering. We will also build a real-world data-processing script that uses temp files as a safe intermediate storage layer.

Temporary Files in Python: Quick Example

Here is the simplest useful pattern — write data to a temp file, read it back, and let the context manager clean up automatically.

# tempfile_quick.py
import json
import os
import tempfile

data = {'user': 'alice', 'score': 42, 'tags': ['python', 'beginner']}

# mode='w+' opens the file for writing and reading; plain 'w' cannot be read back
with tempfile.NamedTemporaryFile(mode='w+', suffix='.json', delete=True) as f:
    json.dump(data, f)
    f.flush()
    path = f.name
    print(f"Temp file created: {path}")

    # Read it back while still open
    f.seek(0)
    loaded = json.load(f)
    print(f"Data loaded: {loaded}")

print(f"File exists after context: {os.path.exists(path)}")
Temp file created: /tmp/tmpk7x3q8ab.json
Data loaded: {'user': 'alice', 'score': 42, 'tags': ['python', 'beginner']}
File exists after context: False

The file was created in the system temp directory with a random name and the .json suffix we requested. The moment we exited the with block, the file was deleted. Notice that we called f.flush() before reading the data back: flushing pushes buffered writes out to the file, which matters most when another program or a second open() call reads the file by its path.

What Is the tempfile Module and Why Use It?

The tempfile module provides a safe, cross-platform way to create temporary files and directories. “Safe” here has two meanings: security (names are unpredictable, files are created with restricted permissions) and resource management (files are automatically deleted when no longer needed).

Function / Class | Returns | Auto-delete? | Best for
--- | --- | --- | ---
NamedTemporaryFile | File object with .name | Yes (by default) | Passing to external programs by path
TemporaryFile | File object (no name on disk) | Yes | Pure in-process buffering
SpooledTemporaryFile | File object (starts in memory) | Yes | Small data that might not need disk
TemporaryDirectory | Path string | Yes | Scratch directories for multiple files
mkstemp | (fd, path) tuple | Manual | When you need the OS file descriptor
mkdtemp | Path string | Manual | Directory with manual lifetime control

The context manager versions (NamedTemporaryFile, TemporaryDirectory) are the right choice for almost all use cases because they guarantee cleanup. Use mkstemp/mkdtemp only when you need the low-level OS file descriptor or the temp item must outlive the creating scope.
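The table lists TemporaryFile, which none of the longer examples in this tutorial use. Here is a minimal sketch of in-process buffering with it. The sample payload is invented for illustration.

```python
import tempfile

# TemporaryFile defaults to binary mode 'w+b', so it accepts bytes
# and can be read back through the same handle.
with tempfile.TemporaryFile() as f:
    f.write(b"line one\nline two\n")
    f.seek(0)                       # rewind before reading
    lines = f.read().splitlines()

print(lines)
```

Because the file has no dependable name on disk, this form only suits data that stays inside your own process.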

NamedTemporaryFile in Depth

The most useful class in the module is NamedTemporaryFile. Its key feature is that the file has a real path on the filesystem (f.name), which means you can pass the path to external programs and libraries that need to open files themselves rather than receiving file objects.

# tempfile_named.py
import tempfile
import os

# --- Basic usage: write binary data ---
with tempfile.NamedTemporaryFile(
    mode='wb',
    suffix='.bin',
    prefix='myapp_',
    dir='/tmp',        # force a specific directory
    delete=True
) as f:
    f.write(b'\x00\x01\x02\x03' * 100)
    f.flush()          # push buffered bytes to disk so getsize() sees them
    print(f"Name: {f.name}")
    print(f"Size: {os.path.getsize(f.name)} bytes")

# --- On Windows: keep open for reading by another process ---
# On Windows, the file CANNOT be opened by another process while
# NamedTemporaryFile holds it. Use delete=False to work around this:
with tempfile.NamedTemporaryFile(
    mode='w', suffix='.txt', delete=False
) as f:
    f.write("Important intermediate data\n")
    temp_path = f.name

# Now the file is closed and another process could open it
print(f"Temp path: {temp_path}")
with open(temp_path) as f:
    print(f"Contents: {f.read().strip()}")

# Clean up manually since delete=False
os.unlink(temp_path)
print(f"Cleaned up: {not os.path.exists(temp_path)}")
Name: /tmp/myapp_tmp8k2xp19z.bin
Size: 400 bytes
Temp path: /tmp/tmpq7w3r5mn.txt
Contents: Important intermediate data
Cleaned up: True

The prefix and suffix parameters let you give the file a recognizable name pattern for debugging. The dir parameter overrides the default temp directory — useful when the external program needs the file on the same filesystem or partition. When using delete=False, you become responsible for calling os.unlink(path) yourself — always do this in a finally block so it runs even on exceptions.
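The advice about a finally block can be sketched as follows. This is one possible shape, not the only one. The roundtrip helper and its sample text are hypothetical names chosen for the demonstration, and the second open() stands in for an external program that reads the file by path.

```python
import os
import tempfile

def roundtrip(text):
    """Write text to a temp file, read it back by path, and always clean up."""
    f = tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False)
    path = f.name
    try:
        with f:                      # closes the handle; delete=False keeps the file
            f.write(text)
        # Stand-in for an external program that opens the file by path
        with open(path) as reader:
            content = reader.read()
    finally:
        os.unlink(path)              # runs on success and on exceptions
    return content, path

content, path = roundtrip("Important intermediate data\n")
print(content.strip(), "| still on disk:", os.path.exists(path))
```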

TemporaryDirectory for Scratch Workspaces

When your script needs multiple related files — a batch of generated reports, intermediate data files, or test fixtures — create a TemporaryDirectory and write everything inside it. The whole directory tree is wiped on exit.

# tempfile_directory.py
import tempfile
import os
import json

students = [
    {'name': 'Alice', 'grade': 'A', 'score': 95},
    {'name': 'Bob',   'grade': 'B', 'score': 82},
    {'name': 'Carol', 'grade': 'A', 'score': 91},
]

with tempfile.TemporaryDirectory(prefix='grades_') as tmpdir:
    print(f"Working in: {tmpdir}")

    # Write one file per student
    for student in students:
        filename = os.path.join(tmpdir, f"{student['name'].lower()}.json")
        with open(filename, 'w') as f:
            json.dump(student, f)

    # List what we created
    files = sorted(os.listdir(tmpdir))  # listdir order is arbitrary, so sort it
    print(f"Files created: {files}")

    # Read them back
    all_data = []
    for fname in files:
        with open(os.path.join(tmpdir, fname)) as f:
            all_data.append(json.load(f))

    top_students = [s for s in all_data if s['grade'] == 'A']
    print(f"A-grade students: {[s['name'] for s in top_students]}")

print(f"Dir exists after: {os.path.exists(tmpdir)}")
Working in: /tmp/grades_tmpx9k2m1pq
Files created: ['alice.json', 'bob.json', 'carol.json']
A-grade students: ['Alice', 'Carol']
Dir exists after: False

The entire grades_tmp... directory and all three JSON files inside it were deleted when we exited the with block. This is the correct pattern for any batch processing task that generates intermediate files — create everything inside a TemporaryDirectory, do your work, and let Python handle the cleanup.

mkstemp and mkdtemp for Manual Control

mkstemp creates a temporary file and returns an OS-level file descriptor alongside the path. You get more control but you must manage cleanup yourself. Use it when you need to pass the raw file descriptor to a C library or OS call, or when the file must persist after the creating scope ends.

# tempfile_mkstemp.py
import tempfile
import os

# mkstemp returns (file_descriptor, absolute_path)
fd, path = tempfile.mkstemp(suffix='.csv', prefix='export_')

try:
    # Write via os.write (works with raw file descriptors)
    header = b"name,score,grade\n"
    os.write(fd, header)
    os.write(fd, b"Alice,95,A\n")
    os.write(fd, b"Bob,82,B\n")
    os.close(fd)  # MUST close the fd before re-opening

    # Now open normally for verification
    with open(path) as f:
        print(f.read())
finally:
    # Always clean up manually
    if os.path.exists(path):
        os.unlink(path)
        print("Temp file deleted.")

# mkdtemp for directories
tmpdir = tempfile.mkdtemp(prefix='build_')
print(f"Build dir: {tmpdir}")
# ... do work in tmpdir ...
import shutil
shutil.rmtree(tmpdir)  # Manual cleanup required
print(f"Cleaned up: {not os.path.exists(tmpdir)}")
name,score,grade
Alice,95,A
Bob,82,B

Temp file deleted.
Build dir: /tmp/build_tmpm3r9k2qa
Cleaned up: True

Two critical rules for mkstemp: always close the returned file descriptor with os.close(fd) before reopening the file with open(path), and always delete the file in a finally block. Forgetting either step is a common source of “too many open files” errors or disk filling up with uncleaned temp files in long-running services.
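If you would rather not juggle os.write and os.close by hand, one option is to wrap the descriptor in a regular file object with os.fdopen. The sketch below assumes you want text-mode writes. The file contents are sample data.

```python
import os
import tempfile

fd, path = tempfile.mkstemp(suffix='.csv', prefix='export_')
try:
    # os.fdopen takes ownership of fd; leaving the with block closes it
    with os.fdopen(fd, 'w', newline='') as f:
        f.write("name,score\nAlice,95\n")

    # The descriptor is closed now, so reopening by path is safe
    with open(path) as f:
        rows = f.read().splitlines()
finally:
    os.unlink(path)                  # manual cleanup is still required

print(rows)
```

The with block closes the descriptor for you, which takes care of the first rule. The second rule, deleting the file in finally, still applies.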

SpooledTemporaryFile for Memory-First Buffering

SpooledTemporaryFile starts the file in memory and only spills to disk when the data exceeds a size threshold you set. This is ideal for processing data that is usually small but occasionally large — you avoid disk I/O for the common case while still handling large payloads safely.

# tempfile_spooled.py
import tempfile
import sys

def process_data(data: bytes) -> bytes:
    """Simulate processing that reads from a file-like object."""
    with tempfile.SpooledTemporaryFile(
        max_size=1024 * 10,  # 10 KB stays in memory; larger spills to disk
        mode='w+b'
    ) as f:
        f.write(data)
        f.seek(0)

        # Check if we spilled to disk. _rolled is a private attribute of the
        # CPython implementation, used here only for demonstration.
        spilled = getattr(f, '_rolled', False)
        src = "disk" if spilled else "memory"
        print(f"  Data size: {len(data):,} bytes -- stored in {src}")

        return f.read()

small = b"x" * 5000    # 5 KB  -- stays in memory
large = b"y" * 50000   # 50 KB -- spills to disk

r1 = process_data(small)
r2 = process_data(large)
print(f"Small result: {len(r1):,} bytes")
print(f"Large result: {len(r2):,} bytes")
  Data size: 5,000 bytes -- stored in memory
  Data size: 50,000 bytes -- stored in disk
Small result: 5,000 bytes
Large result: 50,000 bytes

The transition from memory to disk is handled automatically and transparently. Your code reads and writes the SpooledTemporaryFile exactly like any regular file object — the mode switch happens behind the scenes. The max_size threshold is tunable per use case: a web server handling file uploads might set it to 1 MB, while a batch ETL process might use 64 MB.
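The switch normally happens on its own, but SpooledTemporaryFile also has a documented rollover() method that forces it. A short sketch follows. The 1 KB threshold and the payload are arbitrary values picked for the demo.

```python
import tempfile

payload = b"x" * 100                 # far below the threshold

with tempfile.SpooledTemporaryFile(max_size=1024, mode='w+b') as f:
    f.write(payload)
    f.rollover()                     # move the in-memory buffer to a real file now
    f.seek(0)
    data = f.read()                  # reads work the same after the move

print(len(data))
```

Forcing the rollover can be useful when you know a large write is coming and would rather pay the disk cost up front.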

Real-Life Example: Safe CSV Transformation Pipeline

Here is a practical script that takes raw CSV data, transforms it in a temporary directory, and produces a final output file, using temp files as safe intermediate storage throughout.

# tempfile_pipeline.py
import tempfile
import csv
import os
import json
from pathlib import Path

# Sample raw data (in practice this would come from a file upload or API)
RAW_CSV = """id,name,raw_score,notes
1,Alice,"  95 ","top student"
2,Bob," 82","needs follow-up"
3,Carol," 91 ","excellent work"
4,Dave,"  67","below average"
"""

def clean_row(row: dict) -> dict:
    """Clean a single CSV row: strip whitespace, convert types."""
    score = float(row['raw_score'].strip())  # parse once, reuse below
    if score >= 90:
        grade = 'A'
    elif score >= 80:
        grade = 'B'
    elif score >= 70:
        grade = 'C'
    else:
        grade = 'D'
    return {
        'id': int(row['id']),
        'name': row['name'].strip(),
        'score': score,
        'notes': row['notes'].strip(),
        'grade': grade,
    }

def run_pipeline(raw_data: str, output_path: str) -> int:
    """Transform raw CSV into clean JSON, using temp files as intermediates."""
    with tempfile.TemporaryDirectory(prefix='csv_pipeline_') as tmpdir:
        # Step 1: Write raw data to temp CSV
        raw_path = os.path.join(tmpdir, 'raw.csv')
        with open(raw_path, 'w', newline='') as f:
            f.write(raw_data)

        # Step 2: Clean and transform
        clean_path = os.path.join(tmpdir, 'clean.json')
        with open(raw_path) as infile, open(clean_path, 'w') as outfile:
            reader = csv.DictReader(infile)
            cleaned = [clean_row(row) for row in reader]
            json.dump(cleaned, outfile, indent=2)

        # Step 3: Summarize
        summary_path = os.path.join(tmpdir, 'summary.txt')
        with open(clean_path) as f:
            data = json.load(f)

        grade_counts = {}
        for student in data:
            g = student['grade']
            grade_counts[g] = grade_counts.get(g, 0) + 1

        with open(summary_path, 'w') as f:
            f.write("Grade distribution:\n")
            for grade, count in sorted(grade_counts.items()):
                f.write(f"  {grade}: {count} student(s)\n")

        # Step 4: Copy final output from temp dir to permanent location
        import shutil
        shutil.copy(clean_path, output_path)
        print(f"Pipeline complete. Output written to: {output_path}")

        # Temp directory auto-deleted here
    return len(data)

total = run_pipeline(RAW_CSV, '/tmp/students_final.json')
print(f"Processed {total} students.")

with open('/tmp/students_final.json') as f:
    print(json.dumps(json.load(f), indent=2))
Pipeline complete. Output written to: /tmp/students_final.json
Processed 4 students.
[
  {
    "id": 1,
    "name": "Alice",
    "score": 95.0,
    "notes": "top student",
    "grade": "A"
  },
  {
    "id": 2,
    "name": "Bob",
    "score": 82.0,
    "notes": "needs follow-up",
    "grade": "B"
  },
  {
    "id": 3,
    "name": "Carol",
    "score": 91.0,
    "notes": "excellent work",
    "grade": "A"
  },
  {
    "id": 4,
    "name": "Dave",
    "score": 67.0,
    "notes": "below average",
    "grade": "D"
  }
]

The entire multi-step transformation happens inside a TemporaryDirectory. Only the final output is copied to a permanent location. If anything fails mid-pipeline, the temp directory and all its intermediate files are cleaned up automatically, so no stale partial files are left on disk.
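You can check the cleanup-on-failure claim yourself with a few lines. The sketch below raises a made-up PipelineError (a hypothetical exception class, not part of tempfile) halfway through and then looks for the directory.

```python
import os
import tempfile

class PipelineError(Exception):
    """Hypothetical error type used only for this demonstration."""

seen_dir = None
try:
    with tempfile.TemporaryDirectory(prefix='csv_pipeline_') as tmpdir:
        seen_dir = tmpdir
        with open(os.path.join(tmpdir, 'raw.csv'), 'w') as f:
            f.write("id,name\n1,Alice\n")
        # Simulate a failure after an intermediate file has been written
        raise PipelineError("simulated failure in step 2")
except PipelineError as exc:
    print(f"Caught: {exc}")

print(f"Temp dir still exists: {os.path.exists(seen_dir)}")
```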

Frequently Asked Questions

Why does NamedTemporaryFile fail on Windows when I try to open it again?

On Windows, file locking is stricter than on Unix: while NamedTemporaryFile holds its handle open, opening f.name a second time (from a subprocess or a separate open() call) generally fails with a permission error. The fix is delete=False: write and close the file, let the external process open it, then delete manually with os.unlink(path) in a finally block. On Python 3.12 and later, NamedTemporaryFile also accepts delete_on_close=False, which keeps the file after you close it and deletes it only when the context manager exits.

When should I use delete=False?

Use delete=False when the temp file must outlive the with block — for example, to pass to an external program that opens files by path, or to hand off to another function that will manage deletion. Always pair delete=False with explicit cleanup code in a try/finally block. Forgetting the cleanup is the number one source of disk accumulation issues in long-running Python services.

Can I change where temp files are created?

Yes, in three ways. Per-call: pass dir='/your/path' to any tempfile function. Per-process: set the TMPDIR, TEMP, or TMP environment variable; tempfile checks them in that order on every platform. Globally in code: assign tempfile.tempdir = '/your/path' before any temp file calls. Using a custom directory is important when you need the temp file on the same filesystem as the final destination (to allow atomic rename) or when the default temp directory is too small for your data.
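The atomic rename case deserves a concrete sketch. The idea is to create the temp file next to the destination and swap it into place with os.replace. The atomic_write_text helper, the .part suffix, and the settings.json name are all hypothetical. The demo runs inside a scratch directory so it leaves nothing behind.

```python
import os
import tempfile

def atomic_write_text(dest_path, text):
    """Write text to dest_path so readers never see a half-written file."""
    dest_dir = os.path.dirname(os.path.abspath(dest_path))
    # dir=dest_dir keeps the temp file on the same filesystem as the target
    fd, tmp_path = tempfile.mkstemp(dir=dest_dir, prefix='.tmp_', suffix='.part')
    try:
        with os.fdopen(fd, 'w') as f:
            f.write(text)
        os.replace(tmp_path, dest_path)   # rename over the destination
    except BaseException:
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)           # do not leave partial files behind
        raise

with tempfile.TemporaryDirectory() as workdir:
    target = os.path.join(workdir, 'settings.json')
    atomic_write_text(target, '{"debug": false}')
    with open(target) as f:
        written = f.read()
    leftovers = [n for n in os.listdir(workdir) if n.endswith('.part')]

print(written, leftovers)
```

The rename is atomic on POSIX systems when both paths share a filesystem, which is exactly why the dir argument matters here.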

Are tempfile names actually unpredictable?

Yes, for practical purposes. tempfile builds the random portion of each name from a random number generator seeded by the operating system, so other processes cannot realistically guess it. Just as important, the file is created with the O_EXCL flag, so creation fails instead of silently reusing a file or symlink that an attacker planted first. The older approach of constructing /tmp/myapp_12345.tmp from a PID was vulnerable: an attacker could predict the name and create a symlink before your script did. Always use the tempfile module rather than rolling your own temp file names.

Can I use tempfile without the with statement?

Yes, but you lose automatic cleanup. f = tempfile.NamedTemporaryFile(delete=False) gives you the file object, and you must call f.close() and os.unlink(f.name) yourself. A safer pattern for non-context use is try/finally: create the file before the try block, do your work inside it, and put all cleanup in the finally clause so it runs whether the code succeeds or raises an exception.

Conclusion

The tempfile module gives you safe, cross-platform temporary storage with automatic cleanup. You learned NamedTemporaryFile for files with real paths, TemporaryDirectory for scratch workspaces, SpooledTemporaryFile for memory-first buffering, and mkstemp/mkdtemp for manual lifecycle control. The context manager forms are the right default in almost every situation — they eliminate the “forgot to clean up” bug class entirely.

To deepen your understanding, try adding error injection to the pipeline example — raise an exception mid-processing and confirm that the temp directory is still cleaned up. Then extend the pipeline to support multiple output formats by writing both JSON and CSV to the temp directory before choosing which to copy as the final output.

For the full API reference, see the official documentation at https://docs.python.org/3/library/tempfile.html.

How To Use Python operator Module for Functional Programming

Intermediate

You have a list of dictionaries and you need to sort them by a specific key. Or you want to pass + as an argument to another function. In Python, functions expect other functions as arguments all the time — sorted(), map(), filter(), functools.reduce() — and it is very tempting to scatter tiny lambda expressions everywhere just to call a method or access an attribute. After a few hundred lines of lambda x: x['name'] and lambda a, b: a + b, readability drops fast.

Python’s built-in operator module was designed to solve exactly this problem. It gives you pre-built function objects for every standard Python operator and for common attribute and item access patterns. Instead of writing a lambda, you import a function that already does the job, and the resulting code is faster, more readable, and far easier to compose into higher-order pipelines.

In this article you will learn how to use itemgetter, attrgetter, methodcaller, and the arithmetic/comparison functions from the operator module. We will cover sorting complex data structures, functional programming patterns with map() and reduce(), building generic pipelines, and a real-world project that ties everything together. By the end, you will never write a one-liner lambda for attribute access again.

Python operator Module: Quick Example

Here is the most common use case: sorting a list of dictionaries by a field, without writing a lambda.

# operator_quick.py
from operator import itemgetter

employees = [
    {'name': 'Alice', 'salary': 72000, 'dept': 'Engineering'},
    {'name': 'Bob',   'salary': 58000, 'dept': 'Marketing'},
    {'name': 'Carol', 'salary': 85000, 'dept': 'Engineering'},
    {'name': 'Dave',  'salary': 63000, 'dept': 'Marketing'},
]

# Sort by salary descending
by_salary = sorted(employees, key=itemgetter('salary'), reverse=True)
for emp in by_salary:
    print(f"{emp['name']:6} ${emp['salary']:,}")

# Sort by department then salary
by_dept_salary = sorted(employees, key=itemgetter('dept', 'salary'))
print()
for emp in by_dept_salary:
    print(f"{emp['dept']:12} {emp['name']:6} ${emp['salary']:,}")
Carol  $85,000
Alice  $72,000
Dave   $63,000
Bob    $58,000

Engineering  Alice  $72,000
Engineering  Carol  $85,000
Marketing    Bob    $58,000
Marketing    Dave   $63,000

itemgetter('salary') returns a callable object that, when called with a dictionary, returns the value at 'salary'. Passing two keys to itemgetter returns a tuple of both values, which makes multi-key sorting trivial. The operator module is installed with Python — no extra packages needed.

What Is the operator Module and Why Use It?

The operator module is a standard library module that exports functions corresponding to Python’s built-in operators and common access patterns. Every operator you use in everyday code — +, *, [], ., ==, <, and so on — has a function equivalent in this module.

The key insight is that Python operators are syntax sugar for special methods (__add__, __getitem__, etc.), but you cannot pass syntax as an argument to another function. The operator module bridges that gap by wrapping each operation in a real callable.

Python syntax | operator equivalent | Use case
--- | --- | ---
a + b | operator.add(a, b) | reduce(add, numbers)
a[key] | operator.itemgetter(key)(a) | Sorting by dict key
a.attr | operator.attrgetter('attr')(a) | Sorting objects by attribute
a.method() | operator.methodcaller('method')(a) | Calling a method on each item
a == b | operator.eq(a, b) | Functional comparisons
a * b | operator.mul(a, b) | reduce(mul, factors)

Beyond readability, operator functions are slightly faster than equivalent lambdas because they are implemented in C and avoid Python function call overhead. This matters in tight loops over large data.

Using itemgetter for Sorting and Filtering

itemgetter is the workhorse of the module. It creates a callable that retrieves one or more items from a subscriptable object (dict, list, tuple, named sequence). It works with sorted(), min(), max(), and anywhere else a key function is accepted.

# operator_itemgetter.py
from operator import itemgetter

# --- Sort a list of tuples by the second element ---
scores = [('Alice', 88), ('Bob', 95), ('Carol', 72), ('Dave', 95)]
ranked = sorted(scores, key=itemgetter(1), reverse=True)
print("Ranked by score:", ranked)

# --- Extract a single field from all records ---
records = [
    {'id': 1, 'city': 'London',    'pop': 9_000_000},
    {'id': 2, 'city': 'Melbourne', 'pop': 5_100_000},
    {'id': 3, 'city': 'Lagos',     'pop': 14_800_000},
]
get_city = itemgetter('city')
cities = list(map(get_city, records))
print("Cities:", cities)

# --- Find the most populous city ---
biggest = max(records, key=itemgetter('pop'))
print("Biggest:", biggest['city'], biggest['pop'])
Ranked by score: [('Bob', 95), ('Dave', 95), ('Alice', 88), ('Carol', 72)]
Cities: ['London', 'Melbourne', 'Lagos']
Biggest: Lagos 14800000

Notice how itemgetter stored in a variable (get_city) becomes a reusable extractor. You can pass it to map(), filter(), or any function expecting a callable — this is the foundation of functional-style data pipelines.

Using attrgetter for Object Sorting

When you work with objects instead of dictionaries, attrgetter gives you the same capability. It creates a callable that accesses one or more dot-separated attributes on an object, including nested attributes.

# operator_attrgetter.py
from operator import attrgetter
from dataclasses import dataclass

@dataclass
class Product:
    name: str
    price: float
    rating: float
    category: str

inventory = [
    Product('Keyboard', 79.99, 4.2, 'Electronics'),
    Product('Notebook', 4.50,  4.8, 'Stationery'),
    Product('Monitor',  349.0, 4.5, 'Electronics'),
    Product('Pen',      1.20,  4.6, 'Stationery'),
]

# Sort by rating descending
by_rating = sorted(inventory, key=attrgetter('rating'), reverse=True)
for p in by_rating:
    print(f"{p.name:<12} {p.rating} stars")

print()
# Sort by category then price
by_cat_price = sorted(inventory, key=attrgetter('category', 'price'))
for p in by_cat_price:
    print(f"{p.category:<12} {p.name:<12} ${p.price}")
Notebook     4.8 stars
Pen          4.6 stars
Monitor      4.5 stars
Keyboard     4.2 stars

Electronics  Keyboard     $79.99
Electronics  Monitor      $349.0
Stationery   Pen          $1.2
Stationery   Notebook     $4.5

attrgetter('category', 'price') returns a tuple (obj.category, obj.price) for each item, enabling clean multi-key sorts with no lambda in sight. For nested attributes, use dot notation: attrgetter('address.city') accesses obj.address.city without needing an intermediate step.
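The dotted form is easiest to see with a small nested object. The Address and Customer classes below are hypothetical, made up purely to show the lookup.

```python
from dataclasses import dataclass
from operator import attrgetter

@dataclass
class Address:
    city: str

@dataclass
class Customer:
    name: str
    address: Address

customers = [
    Customer('Alice', Address('Oslo')),
    Customer('Bob', Address('Lima')),
    Customer('Carol', Address('Cairo')),
]

# 'address.city' is resolved one attribute at a time: obj.address, then .city
by_city = sorted(customers, key=attrgetter('address.city'))
names = [c.name for c in by_city]
print(names)
```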

Using methodcaller for Calling Methods on Collections

methodcaller creates a callable that calls a named method on any object, optionally with pre-set arguments. It is especially useful when you want to apply the same method call to every item in a sequence using map().

# operator_methodcaller.py
from operator import methodcaller

words = ['  Hello World  ', 'python ROCKS', 'lower CASE test']

# Strip all strings
stripper = methodcaller('strip')
stripped = list(map(stripper, words))
print("Stripped:", stripped)

# Convert to lowercase
lower = methodcaller('lower')
lowered = list(map(lower, stripped))
print("Lowered: ", lowered)

# Replace spaces with underscores (with arguments)
spacer = methodcaller('replace', ' ', '_')
slugged = list(map(spacer, lowered))
print("Slugged: ", slugged)
Stripped: ['Hello World', 'python ROCKS', 'lower CASE test']
Lowered:  ['hello world', 'python rocks', 'lower case test']
Slugged:  ['hello_world', 'python_rocks', 'lower_case_test']

methodcaller('replace', ' ', '_') pre-binds the arguments — every time you call the resulting function on an object, it calls obj.replace(' ', '_'). This makes it straightforward to build pipelines of string transformations without writing a single lambda.
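Since each methodcaller is just a callable, you can keep the steps in a list and apply them in order. This is one way to package the three transformations above into a single function. The slugify name is an invented helper, not something the operator module provides.

```python
from operator import methodcaller

# Each step is applied to the result of the previous one
steps = [
    methodcaller('strip'),
    methodcaller('lower'),
    methodcaller('replace', ' ', '_'),
]

def slugify(text):
    """Run text through every step in order."""
    for step in steps:
        text = step(text)
    return text

words = ['  Hello World  ', 'python ROCKS', 'lower CASE test']
slugs = list(map(slugify, words))
print(slugs)
```

Adding or reordering a transformation is then a change to the list rather than to the loop.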

Arithmetic and Comparison Operators

Every Python arithmetic, comparison, and logical operator has a function in the operator module. These become essential when you use functools.reduce(), build lookup tables of operations, or need to pass an operator as a parameter to a generic utility.

# operator_arithmetic.py
import operator
from functools import reduce

numbers = [2, 3, 4, 5]

# Sum using add
total = reduce(operator.add, numbers)
print("Sum:", total)

# Product using mul
product = reduce(operator.mul, numbers)
print("Product:", product)

# Build a dispatch table for a simple calculator
ops = {
    '+': operator.add,
    '-': operator.sub,
    '*': operator.mul,
    '/': operator.truediv,
    '**': operator.pow,
}

def calculate(a, op, b):
    if op not in ops:
        raise ValueError(f"Unknown operator: {op}")
    return ops[op](a, b)

print(calculate(10, '+', 3))   # 13
print(calculate(10, '**', 3))  # 1000
print(calculate(15, '/', 4))   # 3.75
Sum: 14
Product: 120
13
1000
3.75

The dispatch table pattern (ops = {'+': operator.add, ...}) is a clean alternative to a chain of if/elif statements. Adding a new operation is a one-line dict update. Function objects cannot be written to a config file directly, but the operator symbols can: store the symbols in a config file or database, look each one up in the table at runtime, and your calculators and expression evaluators become genuinely extensible.
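To make the config-driven idea concrete, here is a small sketch. The JSON layout (a list of jobs with a, op, and b fields) is an assumption made up for this example. A real config would use whatever shape suits your application.

```python
import json
import operator

OPS = {
    '+': operator.add,
    '-': operator.sub,
    '*': operator.mul,
    '/': operator.truediv,
}

# Hypothetical config: only symbols and numbers are stored, never functions
config_text = '[{"a": 6, "op": "*", "b": 7}, {"a": 9, "op": "-", "b": 4}]'

def run_jobs(text):
    """Evaluate each configured job by looking its symbol up in OPS."""
    results = []
    for job in json.loads(text):
        func = OPS.get(job['op'])
        if func is None:
            raise ValueError(f"Unknown operator: {job['op']}")
        results.append(func(job['a'], job['b']))
    return results

results = run_jobs(config_text)
print(results)
```

Restricting lookups to the keys of OPS also means a config file cannot trigger arbitrary code.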

Real-Life Example: Data Pipeline with operator

Here is a practical data processing pipeline that uses multiple operator functions together to clean, sort, and summarize a sales dataset without any lambdas.

# operator_pipeline.py
import operator
from functools import reduce
from operator import itemgetter, attrgetter
from dataclasses import dataclass

@dataclass
class Sale:
    region: str
    product: str
    rep: str
    amount: float
    month: str

sales = [
    Sale('North', 'Widget', 'Alice', 1200.0, 'Jan'),
    Sale('South', 'Gadget', 'Bob',    850.0, 'Jan'),
    Sale('North', 'Gadget', 'Alice',  970.0, 'Feb'),
    Sale('South', 'Widget', 'Carol', 1450.0, 'Feb'),
    Sale('North', 'Widget', 'Bob',   1100.0, 'Mar'),
    Sale('South', 'Gadget', 'Carol',  920.0, 'Mar'),
]

# --- 1. Sort by region then amount ---
sorted_sales = sorted(sales, key=attrgetter('region', 'amount'), reverse=False)
print("== Sorted by region + amount ==")
for s in sorted_sales:
    print(f"  {s.region:<6} {s.product:<8} {s.rep:<6} ${s.amount:,.0f}")

# --- 2. Top sale per region using max + attrgetter ---
regions = set(map(attrgetter('region'), sales))
print("\n== Top sale per region ==")
for region in sorted(regions):
    region_sales = [s for s in sales if s.region == region]
    top = max(region_sales, key=attrgetter('amount'))
    print(f"  {region}: {top.rep} ${top.amount:,.0f} ({top.product})")

# --- 3. Total revenue using reduce + operator.add ---
amounts = list(map(attrgetter('amount'), sales))
total = reduce(operator.add, amounts)
print(f"\n== Total Revenue: ${total:,.0f} ==")

# --- 4. Rep leaderboard: group and sum by rep ---
from collections import defaultdict
rep_totals: dict = defaultdict(float)
get_rep = attrgetter('rep')
get_amount = attrgetter('amount')
for s in sales:
    rep_totals[get_rep(s)] += get_amount(s)

sorted_reps = sorted(rep_totals.items(), key=itemgetter(1), reverse=True)
print("\n== Rep Leaderboard ==")
for rep, total_amt in sorted_reps:
    print(f"  {rep:<6} ${total_amt:,.0f}")
== Sorted by region + amount ==
  North  Gadget   Alice  $970
  North  Widget   Bob    $1,100
  North  Widget   Alice  $1,200
  South  Gadget   Bob    $850
  South  Gadget   Carol  $920
  South  Widget   Carol  $1,450

== Top sale per region ==
  North: Alice $1,200 (Widget)
  South: Carol $1,450 (Widget)

== Total Revenue: $6,490 ==

== Rep Leaderboard ==
  Carol  $2,370
  Alice  $2,170
  Bob    $1,950

This pipeline uses attrgetter for sorting and grouping, map() with attrgetter to extract fields, and reduce(operator.add, ...) for aggregation. Zero lambdas were harmed in the making of this output. You can extend this pattern by adding a methodcaller('upper') to normalize region names or a dispatch table to support configurable grouping operations.

operator.add, operator.itemgetter. Picklable, fast, named.

Frequently Asked Questions

When should I use operator functions instead of lambda?

Use operator functions whenever the lambda would only access an attribute, fetch an item, call a method, or apply a standard arithmetic or comparison operation. These are the cases where operator functions are both cleaner and faster. Keep lambdas for logic that is genuinely custom — filtering by a threshold computed at runtime, for instance. A useful heuristic: if your lambda reads lambda x: x.something or lambda x: x['key'], replace it with attrgetter or itemgetter.

Are operator functions actually faster than lambdas?

Yes, modestly. itemgetter, attrgetter, and the arithmetic operators are implemented in C, whereas a lambda must go through Python's general function-call machinery. The size of the gain depends on the Python version and the workload, so measure with timeit before relying on it. For a one-shot sort of 20 items, you will not notice the difference. For sorting 100,000 records in a data pipeline, it can matter.
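If you want numbers for your own machine, a rough timeit comparison looks like this. The dataset size and repeat count are arbitrary, and the results will vary between runs and Python versions, so treat the output as a hint rather than a benchmark.

```python
import timeit
from operator import itemgetter

# Synthetic records; the score formula just scatters the values
rows = [{'id': i, 'score': (i * 37) % 101} for i in range(10_000)]

def sort_with_lambda():
    return sorted(rows, key=lambda r: r['score'])

def sort_with_itemgetter():
    return sorted(rows, key=itemgetter('score'))

lambda_time = timeit.timeit(sort_with_lambda, number=20)
getter_time = timeit.timeit(sort_with_itemgetter, number=20)

print(f"lambda:     {lambda_time:.4f} s")
print(f"itemgetter: {getter_time:.4f} s")
```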

How do I access nested attributes with attrgetter?

Use dot notation in the attribute string: attrgetter('address.city') accesses obj.address.city. The dots are resolved by attrgetter at call time, so as long as each intermediate object has the next attribute, it works. This is not available with itemgetter — for nested dict access you still need a lambda or a helper function.
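For nested dictionaries, one possible helper chains operator.getitem with reduce. The get_path name and the sample order record are hypothetical. The helper raises KeyError or IndexError when a step is missing, just as plain indexing would.

```python
import operator
from functools import reduce

def get_path(data, *keys):
    """Follow keys (or list indexes) one level at a time."""
    return reduce(operator.getitem, keys, data)

order = {
    'customer': {'address': {'city': 'Lagos'}},
    'items': [{'sku': 'A1'}],
}

city = get_path(order, 'customer', 'address', 'city')
first_sku = get_path(order, 'items', 0, 'sku')
print(city, first_sku)
```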

Can methodcaller pre-bind multiple arguments?

Yes. methodcaller('replace', ' ', '_') creates a callable that calls obj.replace(' ', '_') for any obj it receives. You can pass as many positional arguments as the target method accepts. You can also pass keyword arguments: methodcaller('encode', encoding='utf-8', errors='replace'). The arguments are frozen at creation time and applied to every object the resulting function is called with.

Why use operator.add with reduce instead of sum()?

For adding numbers, sum() is always cleaner. The operator.mul + reduce combination is more common because there is no built-in product() in standard Python (though math.prod() was added in 3.8). The broader pattern — reduce(some_operator, sequence) — matters when the operation is configurable at runtime. A generic aggregator that accepts an operator from a lookup table cannot use sum() or math.prod() because it does not know which operation to call at write time.
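A generic aggregator of that kind might look like the sketch below. The AGGREGATORS table and its two entries are assumptions for illustration. Each entry pairs an operator with a starting value so that empty input still returns something sensible.

```python
import operator
from functools import reduce

# Hypothetical lookup table: name -> (binary function, starting value)
AGGREGATORS = {
    'sum': (operator.add, 0),
    'product': (operator.mul, 1),
}

def aggregate(name, values):
    """Fold values with the operator registered under name."""
    try:
        func, start = AGGREGATORS[name]
    except KeyError:
        raise ValueError(f"Unknown aggregation: {name}") from None
    return reduce(func, values, start)

total = aggregate('sum', [2, 3, 4, 5])
product = aggregate('product', [2, 3, 4, 5])
empty = aggregate('sum', [])
print(total, product, empty)
```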

Conclusion

The operator module is a small but powerful part of Python's standard library. You learned how itemgetter replaces dict-access lambdas in sorts and map() pipelines, how attrgetter handles object attribute access including nested paths, how methodcaller pre-binds method calls for use with map(), and how arithmetic operator functions enable dispatch tables and reduce()-based aggregations.

The best way to cement these patterns is to open one of your existing scripts and audit every lambda for the pattern lambda x: x.attr or lambda x: x['key'] — replace each one with the appropriate operator call and notice how the surrounding code becomes more readable. Then try building a configurable sort pipeline where the sort key is read from a configuration dictionary at runtime.

For the complete API reference, see the official Python documentation at https://docs.python.org/3/library/operator.html.

How To Use Python queue Module for Thread-Safe Communication

Intermediate

You’re building a web scraper that fetches URLs in parallel and processes the results. Or a data pipeline where one thread downloads files while another parses them. Or a task processor where a main thread receives jobs and worker threads execute them. In all of these cases you need multiple threads to communicate safely without corrupting shared state. Using a plain list for this is a race condition waiting to happen. Python’s queue module is the right tool.

The queue module provides Queue, LifoQueue, and PriorityQueue — thread-safe data structures that use internal locks to guarantee that concurrent put() and get() calls don’t corrupt data or create race conditions. They also handle blocking automatically: a thread calling get() on an empty queue waits until an item is available, which is the foundation of the producer-consumer pattern. The module is in the standard library, no installation required.

In this article we’ll cover the three queue types, the basic producer-consumer pattern, how to signal workers to stop using sentinel values, bounded queues for backpressure, priority queues for ordered processing, and a complete multi-threaded downloader example. We’ll also show common mistakes that cause threads to hang forever.

queue Module Quick Example

Here’s the minimal producer-consumer pattern — one thread puts items into a queue, another takes them out and processes them:

# quick_queue.py
import queue
import threading
import time

q = queue.Queue()

def producer():
    for i in range(5):
        q.put(f"item_{i}")
        time.sleep(0.1)
    q.put(None)  # sentinel to signal "done"

def consumer():
    while True:
        item = q.get()
        if item is None:
            print("Consumer: done.")
            q.task_done()
            break
        print(f"Consumer: processed {item}")
        q.task_done()

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start()
t2.start()
t1.join()
t2.join()

Output:

Consumer: processed item_0
Consumer: processed item_1
Consumer: processed item_2
Consumer: processed item_3
Consumer: processed item_4
Consumer: done.

There are three patterns here that matter. First, the queue handles all locking automatically — no threading.Lock() needed. Second, the consumer calls q.task_done() after processing each item — this is required if you use q.join() elsewhere to wait for all items to finish. Third, the None sentinel is the standard signal to tell a worker thread to stop. We’ll revisit all of these in detail below.
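To see why task_done() matters, it helps to put q.join() in the picture. This is a minimal sketch rather than part of the example above: the names jobs, processed, and worker are hypothetical, and the worker is a daemon thread so that no sentinel is needed here.

```python
import queue
import threading

jobs = queue.Queue()
processed = []

def worker():
    while True:
        item = jobs.get()
        processed.append(item * 2)   # stand-in for real work
        jobs.task_done()             # one task_done() per get()

# daemon=True lets the program exit even though the loop never ends
threading.Thread(target=worker, daemon=True).start()

for n in range(5):
    jobs.put(n)

jobs.join()        # blocks until task_done() has been called for every item
print(processed)   # [0, 2, 4, 6, 8]
```

If the task_done() call were removed, jobs.join() would never return, even though every item had been processed.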

The Three Queue Types

Python provides three queue implementations with the same API but different ordering semantics:

| Class | Ordering | Use case |
|---|---|---|
| queue.Queue | FIFO (first in, first out) | Task queues, pipelines, most cases |
| queue.LifoQueue | LIFO (last in, first out; a stack) | Depth-first search, undo stacks |
| queue.PriorityQueue | Lowest priority number first | Task scheduling, event processing by urgency |

All three share the same core methods: put(item), get(), task_done(), join(), qsize(), empty(), and full(). They also all accept a maxsize parameter to cap the queue size. The choice between them is entirely about ordering — the thread-safety guarantees are identical.
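The ordering difference is easiest to see by feeding the same items to each class. This short sketch uses a hypothetical helper, drain(), which is not part of the queue module:

```python
import queue

def drain(q):
    """Put 3, 1, 2 into q, then return the items in the order get() yields them."""
    for n in (3, 1, 2):
        q.put(n)
    return [q.get() for _ in range(3)]

fifo_order = drain(queue.Queue())
lifo_order = drain(queue.LifoQueue())
priority_order = drain(queue.PriorityQueue())

print(fifo_order)      # [3, 1, 2]  insertion order
print(lifo_order)      # [2, 1, 3]  newest first
print(priority_order)  # [1, 2, 3]  smallest first
```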

A list with a lock. A contract between threads. A queue.

Producer-Consumer Pattern

The producer-consumer pattern is the fundamental building block for threaded pipelines. Producers put work items in the queue; consumers get them and do the work. The queue is the communication channel.

Multiple Workers

For I/O-heavy tasks, you usually want multiple consumer workers. (CPU-bound work gains little from extra threads because of the GIL.) The cleanest way to stop multiple workers is to put one None sentinel per worker:

# multi_worker.py
import queue
import threading
import time
import random

NUM_WORKERS = 3
task_queue = queue.Queue()
results = []
results_lock = threading.Lock()


def worker(worker_id: int):
    """Process tasks from the queue until a None sentinel is received."""
    while True:
        task = task_queue.get()
        if task is None:
            print(f"Worker {worker_id}: stopping.")
            task_queue.task_done()
            return
        # Simulate work
        duration = random.uniform(0.1, 0.4)
        time.sleep(duration)
        result = f"task_{task}_done_by_worker_{worker_id}"
        with results_lock:
            results.append(result)
        task_queue.task_done()
        print(f"Worker {worker_id}: completed task {task} in {duration:.2f}s")


# Start workers
workers = []
for i in range(NUM_WORKERS):
    t = threading.Thread(target=worker, args=(i,))
    t.start()
    workers.append(t)

# Submit 10 tasks
for task_id in range(10):
    task_queue.put(task_id)

# Send one sentinel per worker to shut them down
for _ in range(NUM_WORKERS):
    task_queue.put(None)

# Wait for all tasks to finish
task_queue.join()
for t in workers:
    t.join()

print(f"\nCompleted {len(results)} tasks total.")

Output (order varies by timing):

Worker 0: completed task 0 in 0.12s
Worker 1: completed task 1 in 0.19s
Worker 2: completed task 2 in 0.11s
Worker 0: completed task 3 in 0.38s
...
Worker 0: stopping.
Worker 1: stopping.
Worker 2: stopping.

Completed 10 tasks total.

The critical pattern here: put exactly NUM_WORKERS sentinels after all the real tasks. Each worker stops when it receives one sentinel. If you put too few, some workers wait forever. If the workers share mutable state (like the results list), protect it with a separate lock — the queue only protects the queue itself, not your application data.

Bounded Queues for Backpressure

By default, queue.Queue is unbounded — a producer can put items faster than consumers process them, causing unbounded memory growth. Set maxsize to apply backpressure: when the queue is full, put() blocks until a consumer takes an item out.

# bounded_queue.py
import queue
import threading
import time

# Queue holds at most 3 items -- producer will block when full
MAXSIZE = 3
q = queue.Queue(maxsize=MAXSIZE)


def fast_producer():
    for i in range(10):
        print(f"Producer: putting item {i} (queue size: {q.qsize()})")
        q.put(i)  # blocks if queue is full
    q.put(None)  # sentinel


def slow_consumer():
    while True:
        item = q.get()
        if item is None:
            q.task_done()
            print("Consumer: done.")
            return
        time.sleep(0.3)  # simulate slow processing
        print(f"Consumer: processed item {item}")
        q.task_done()


t1 = threading.Thread(target=fast_producer)
t2 = threading.Thread(target=slow_consumer)
t1.start()
t2.start()
t1.join()
t2.join()

Output (shows natural backpressure):

Producer: putting item 0 (queue size: 0)
Producer: putting item 1 (queue size: 1)
Producer: putting item 2 (queue size: 2)
Producer: putting item 3 (queue size: 3)   <-- blocks here until consumer processes item 0
Consumer: processed item 0
Producer: putting item 4 (queue size: 3)
...

Bounded queues are essential in production pipelines where producers are faster than consumers. Without a bound, you’d accumulate millions of items in memory before a consumer crash or slowdown becomes visible. The blocking behavior of put() when the queue is full is exactly what you want — it slows down the producer to match the consumer’s pace.
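Blocking forever is not the only option when a bounded queue is full. As a small sketch (the queue name small and the 0.1 second timeout are arbitrary choices for illustration), put() also accepts a timeout and raises queue.Full when no slot frees up in time, which lets a producer drop or reroute work instead of waiting:

```python
import queue

small = queue.Queue(maxsize=2)
small.put("a")
small.put("b")

try:
    small.put("c", timeout=0.1)   # waits up to 0.1s for a free slot
    dropped = False
except queue.Full:
    dropped = True                # no consumer freed a slot in time

print(dropped, small.qsize())     # True 2
```

put_nowait(item) behaves like put(item, block=False) and raises queue.Full immediately.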

maxsize=3 is the speed bump that keeps your RAM from exploding at 3am.

PriorityQueue for Ordered Processing

When tasks have different urgency levels, PriorityQueue processes the lowest-priority-number item first. Items are tuples of (priority_number, data) — Python compares tuples element by element, so the number determines order:

# priority_queue.py
import queue

pq = queue.PriorityQueue()

# Lower number = higher priority
pq.put((1, "CRITICAL: Payment service down"))
pq.put((3, "LOW: Update help docs"))
pq.put((2, "HIGH: Slow API response"))
pq.put((1, "CRITICAL: Database unreachable"))
pq.put((2, "HIGH: Memory usage spike"))

print("Processing in priority order:")
while not pq.empty():
    priority, task = pq.get()
    label = {1: "CRITICAL", 2: "HIGH    ", 3: "LOW     "}.get(priority, "UNKNOWN ")
    print(f"  [{label}] {task}")
    pq.task_done()

Output:

Processing in priority order:
  [CRITICAL] CRITICAL: Database unreachable
  [CRITICAL] CRITICAL: Payment service down
  [HIGH    ] HIGH: Memory usage spike
  [HIGH    ] HIGH: Slow API response
  [LOW     ] LOW: Update help docs

Within the same priority level, items do not come out in insertion order. When two priority numbers tie, Python compares the next tuple element, which is why the messages above appear alphabetically within each level. If that second element cannot be compared (two dicts, for example), the put() or get() call raises a TypeError. If you need stable ordering within a priority level, add a counter as a tiebreaker: (priority, counter, data). This ensures that two tasks with the same priority are processed in insertion order.
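The (priority, counter, data) tiebreaker can be sketched as follows. The submit() helper and the job payloads are hypothetical. The payloads are dicts, which cannot be compared with each other, so the counter is what keeps the queue from ever comparing them.

```python
import itertools
import queue

pq = queue.PriorityQueue()
counter = itertools.count()   # 0, 1, 2, ... in insertion order

def submit(priority, payload):
    # The counter breaks ties, so payloads are never compared with each other.
    pq.put((priority, next(counter), payload))

submit(2, {"job": "resize"})
submit(1, {"job": "charge card"})
submit(1, {"job": "send receipt"})

order = []
while not pq.empty():
    priority, _, payload = pq.get()
    order.append(payload["job"])

print(order)   # ['charge card', 'send receipt', 'resize']
```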

Real-Life Example: Threaded File Downloader

Here’s a complete multi-threaded downloader that processes a list of URLs using a worker pool, collects results, and reports errors — all coordinated through queues:

# threaded_downloader.py
"""Multi-threaded URL fetcher using queue.Queue for work distribution."""
import queue
import threading
import time
import random
from dataclasses import dataclass
from typing import Optional


@dataclass
class DownloadResult:
    url: str
    success: bool
    size_bytes: int = 0
    error: Optional[str] = None
    worker_id: int = 0


def simulate_download(url: str) -> tuple[bool, int]:
    """Simulate downloading a URL. Returns (success, bytes_downloaded)."""
    time.sleep(random.uniform(0.1, 0.5))
    if random.random() < 0.1:  # 10% failure rate
        return False, 0
    return True, random.randint(1000, 500000)


def download_worker(worker_id: int, work_queue: queue.Queue, result_queue: queue.Queue):
    """Worker thread: pull URLs from work_queue, put results in result_queue."""
    while True:
        url = work_queue.get()
        if url is None:
            work_queue.task_done()
            return

        success, size = simulate_download(url)
        result = DownloadResult(
            url=url,
            success=success,
            size_bytes=size,
            error=None if success else "Connection timeout",
            worker_id=worker_id,
        )
        result_queue.put(result)
        work_queue.task_done()


def run_downloader(urls: list[str], num_workers: int = 4) -> list[DownloadResult]:
    work_queue = queue.Queue()
    result_queue = queue.Queue()

    # Start workers
    workers = []
    for i in range(num_workers):
        t = threading.Thread(target=download_worker, args=(i, work_queue, result_queue))
        t.daemon = True
        t.start()
        workers.append(t)

    # Enqueue all URLs
    for url in urls:
        work_queue.put(url)

    # Send sentinels
    for _ in range(num_workers):
        work_queue.put(None)

    # Wait for all work to finish
    work_queue.join()

    # Collect results
    results = []
    while not result_queue.empty():
        results.append(result_queue.get())

    return results


# Run it
test_urls = [f"https://example.com/file_{i:03d}.dat" for i in range(20)]
results = run_downloader(test_urls, num_workers=4)

succeeded = [r for r in results if r.success]
failed = [r for r in results if not r.success]
total_bytes = sum(r.size_bytes for r in succeeded)

print(f"Downloaded {len(succeeded)}/{len(results)} files successfully")
print(f"Failed: {len(failed)}")
print(f"Total data: {total_bytes:,} bytes")
if failed:
    for r in failed:
        print(f"  FAILED: {r.url} -- {r.error}")

Output (varies by random failures):

Downloaded 18/20 files successfully
Failed: 2
Total data: 4,712,384 bytes
  FAILED: https://example.com/file_007.dat -- Connection timeout
  FAILED: https://example.com/file_015.dat -- Connection timeout

This pattern (work queue in, result queue out, sentinel per worker) is the template for any multi-threaded pipeline. The two-queue design keeps work distribution separate from result collection, and using t.daemon = True ensures worker threads don’t block program exit if something goes wrong. To adapt this to real downloads, replace the body of simulate_download() with a real HTTP call such as requests.get(url), and have it return the same (success, bytes_downloaded) tuple so the worker code stays unchanged.
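As one possible shape for that replacement, here is a sketch that uses the standard library’s urllib.request instead of requests, so it runs without extra installs. The function name fetch() and the 10 second default timeout are assumptions, not part of the original example. It keeps the same return contract as simulate_download().

```python
import urllib.request

def fetch(url: str, timeout: float = 10.0) -> tuple[bool, int]:
    """Hypothetical stand-in for simulate_download(): (success, bytes_downloaded)."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return True, len(response.read())
    except (OSError, ValueError):
        # URLError and socket timeouts are OSError subclasses;
        # ValueError covers strings that are not valid URLs.
        return False, 0

# A data: URL exercises the function without touching the network.
print(fetch("data:text/plain,hello"))   # (True, 5)
print(fetch("not-a-url"))               # (False, 0)
```

Because the signature matches, download_worker() could call fetch(url) in place of simulate_download(url) with no other changes.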

Queue: the thread-safe pipe between producers and consumers.

Frequently Asked Questions

Why does my program hang after the queue is empty?

Most queue hangs are caused by one of two things: the sentinel was never sent (so workers are still blocking on q.get()), or you called q.join() without calling q.task_done() after every get(). The join() method waits until the internal task counter reaches zero, and that counter only decreases via task_done(). If any item is ever get()-ed without a matching task_done(), join() hangs forever. Make sure every code path through your consumer calls task_done() — including after processing the sentinel.
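One way to guarantee the matching task_done() on every path is a try/finally inside the consumer loop. The sketch below is illustrative: the rule that negative numbers are invalid is invented purely to trigger a failure.

```python
import queue
import threading

q = queue.Queue()
errors = []

def consumer():
    while True:
        item = q.get()
        try:
            if item is None:
                return            # finally still runs for the sentinel
            if item < 0:
                raise ValueError(f"bad item: {item}")
        except ValueError as exc:
            errors.append(str(exc))
        finally:
            q.task_done()         # runs on success, failure, and sentinel

t = threading.Thread(target=consumer)
t.start()
for item in (1, -1, 2, None):
    q.put(item)
q.join()
t.join()
print(errors)   # ['bad item: -1']
```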

Is it safe to call q.empty() before q.get()?

In a multi-threaded context, no. q.empty() can return True and then another thread puts an item in before your get() runs — or vice versa, it returns False and another thread takes the item before you do. Don’t use empty() as a guard before get() in multi-threaded code. Instead, use get(block=False) and catch queue.Empty, or use the sentinel pattern. The empty() check is only reliable when you know no other threads are accessing the queue.
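The non-blocking alternative looks like this in a minimal sketch. The check and the retrieval happen in a single call, so no other thread can slip in between them:

```python
import queue

q = queue.Queue()
q.put("only item")

drained = []
while True:
    try:
        drained.append(q.get(block=False))   # same as q.get_nowait()
    except queue.Empty:
        break                                 # nothing available right now

print(drained)   # ['only item']
```

When the consumer should wait a little before giving up, q.get(timeout=1.0) raises the same queue.Empty exception after the timeout expires.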

Why not just use a list with a lock?

You can, but it’s more work for the same result. A threading.Lock() around a list gives you mutual exclusion, but you still need to write the blocking logic for an empty list (sleep + poll), the bounded-size logic, and the condition variable coordination for waking sleeping threads. queue.Queue does all of this correctly and efficiently using threading.Condition internally. The only reason to use a raw list is if you need random access to items — queues only support FIFO/LIFO/priority access, not arbitrary indexing.

Should I use queue.Queue with asyncio?

No. queue.Queue is for threads, not coroutines. For async code, use asyncio.Queue instead. It has the same method names (put(), get(), task_done(), join()), but put(), get(), and join() are coroutines that you await. Mixing queue.Queue with async code causes the event loop to block on the synchronous get() call. If you need to bridge between threads and asyncio (for example, putting items in a queue from a thread and consuming them in a coroutine), use asyncio.Queue and schedule put_nowait() on the event loop with loop.call_soon_threadsafe(), because asyncio.Queue itself is not thread-safe.
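A minimal sketch of that thread-to-coroutine bridge follows. The names main, producer, and received are hypothetical, and None is reused as the stop signal:

```python
import asyncio
import threading

async def main():
    loop = asyncio.get_running_loop()
    q = asyncio.Queue()

    def producer():
        # Runs in a plain thread: never touch the asyncio.Queue directly,
        # ask the event loop to do it instead.
        for i in range(3):
            loop.call_soon_threadsafe(q.put_nowait, i)
        loop.call_soon_threadsafe(q.put_nowait, None)   # stop signal

    threading.Thread(target=producer).start()

    received = []
    while True:
        item = await q.get()      # suspends the coroutine, not the loop
        if item is None:
            break
        received.append(item)
    return received

received = asyncio.run(main())
print(received)   # [0, 1, 2]
```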

Does queue.Queue work with multiprocessing?

No. queue.Queue is in-process only. For inter-process communication, use multiprocessing.Queue, which uses pipes and OS-level IPC mechanisms to pass data between separate processes, so every item must be picklable. The put() and get() calls look the same, but task_done() and join() are only available on multiprocessing.JoinableQueue, and the underlying mechanism is completely different. For most parallelism needs, concurrent.futures.ProcessPoolExecutor provides a higher-level interface that handles the queue plumbing for you.

Conclusion

The queue module gives you safe, blocking, thread-aware communication channels without writing any locking code yourself. We covered the three queue types (Queue, LifoQueue, PriorityQueue), the producer-consumer pattern with sentinel shutdown, multiple worker pools, bounded queues for backpressure, and a complete multi-threaded downloader example. The sentinel pattern — one None per worker — is the idiom to memorize above all others.

The natural next step is to apply queue.Queue and threading.Thread to real-world I/O-bound workloads, or to try concurrent.futures.ThreadPoolExecutor, which manages its own work queue internally. If your bottleneck is CPU-bound rather than I/O-bound, switch to separate processes with the multiprocessing module (and multiprocessing.Queue) to sidestep the GIL. For async workloads, use asyncio.Queue directly.

For the full reference, see the official queue module documentation.

How To Use Python io Module for In-Memory File Buffers

Intermediate

You’re building a web API that generates a PDF report and needs to send it as an HTTP response. You’re writing tests for code that reads from CSV files. You’re processing image data before saving it to cloud storage. In each of these cases, the most obvious solution is to write to a temporary file on disk first — but that means filesystem I/O, temp file cleanup, and permission concerns. There’s a better way: in-memory buffers with Python’s io module.

The io module provides BytesIO and StringIO, objects that behave like file handles but store data in memory instead of on disk. Most libraries that accept a file-like object can work with them, including csv, json, PIL, pandas, zipfile, and boto3. The exception is code that needs a real file descriptor or a path on disk, since a buffer has neither. The module is part of the standard library, no installation needed.

In this article we’ll cover BytesIO for binary data, StringIO for text, how to use them as drop-in replacements for file handles, testing patterns with in-memory buffers, combining them with libraries like csv and pandas, and a real-world example of building an in-memory report generator. By the end you’ll be reaching for buffers instead of temp files as your default.

io Module Quick Example

Here’s the simplest demonstration of the core idea — create a StringIO buffer, write to it, seek back to the start, and read from it, exactly like you would a file:

# quick_io.py
import io

# Create an in-memory text buffer
buffer = io.StringIO()

# Write to it like a file
buffer.write("Hello, world!\n")
buffer.write("This is an in-memory file.\n")

# Seek back to the start before reading
buffer.seek(0)

# Read it back
content = buffer.read()
print(content)

# Get the entire value regardless of cursor position (no seek needed)
print("getvalue:", buffer.getvalue()[:20], "...")

Output:

Hello, world!
This is an in-memory file.

getvalue: Hello, world!
This i ...

The key methods are write(), read(), seek(0) (to reset the cursor to the start), and getvalue() (to get the entire contents regardless of cursor position). Apart from getvalue(), which only buffers provide, these are identical to what you’d call on a regular open file handle, which is exactly the point. Any function that only reads from or writes to a file object f will work with a buffer without modification.

BytesIO vs StringIO

The io module has two main buffer types. Choosing the wrong one causes a TypeError because Python strictly separates binary and text data:

| Class | Data type | Use when | Equivalent file mode |
|---|---|---|---|
| io.StringIO | Unicode strings (str) | CSV, JSON, HTML, text processing | open("f", "r") or open("f", "w") |
| io.BytesIO | Bytes (bytes) | Images, PDFs, binary file formats, uploads | open("f", "rb") or open("f", "wb") |

The rule is simple: if you’d use open(path, "rb") or open(path, "wb") for real files, use BytesIO. If you’d use open(path, "r") or open(path, "w"), use StringIO. When in doubt, libraries that work with structured text (csv, json) expect StringIO; libraries that work with binary formats (PIL, xlsxwriter, boto3) expect BytesIO.
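The mismatch is easy to reproduce. In this small sketch (the variable names are arbitrary), each buffer accepts its own data type, and writing a str into a BytesIO raises the TypeError:

```python
import io

text_buf = io.StringIO()
byte_buf = io.BytesIO()

text_buf.write("héllo")                   # str goes into StringIO
byte_buf.write("héllo".encode("utf-8"))   # bytes go into BytesIO

try:
    byte_buf.write("héllo")               # str into BytesIO
    mixed_ok = True
except TypeError:
    mixed_ok = False

# 5 characters of text, but 6 bytes once "é" is UTF-8 encoded
print(mixed_ok, len(text_buf.getvalue()), len(byte_buf.getvalue()))   # False 5 6
```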

StringIO for text. BytesIO for bytes. Mix them and get a TypeError at 2am.

StringIO with the csv Module

The most common use of StringIO is with the csv module, especially when you have CSV data as a string (from an API response, a database field, or a test fixture) and need to parse it without writing it to disk first.

Reading CSV from a String

# csv_stringio.py
import csv
import io

# CSV data as a string (could come from an API response, database, etc.)
csv_data = """name,age,city
Alice,30,New York
Bob,25,London
Carol,35,Sydney
"""

# Wrap in StringIO so csv.DictReader can use it as a file
reader = csv.DictReader(io.StringIO(csv_data))
rows = list(reader)

for row in rows:
    print(f"{row['name']}, age {row['age']}, from {row['city']}")

Output:

Alice, age 30, from New York
Bob, age 25, from London
Carol, age 35, from Sydney

csv.DictReader expects a file-like object. Wrapping your string in io.StringIO() gives it exactly that. This pattern is the canonical way to parse CSV data that isn’t already in a file — no temp files, no splitlines() hacks. The same technique works with csv.reader for plain row access.

Writing CSV to a String Buffer

The reverse is equally useful — write CSV data to a StringIO buffer and then retrieve the string to send as an HTTP response or return from a function:

# csv_write_buffer.py
import csv
import io

records = [
    {"product": "Widget A", "qty": 10, "price": 9.99},
    {"product": "Widget B", "qty": 25, "price": 4.49},
    {"product": "Widget C", "qty": 5, "price": 19.99},
]

# Write CSV into memory
output = io.StringIO()
writer = csv.DictWriter(output, fieldnames=["product", "qty", "price"])
writer.writeheader()
writer.writerows(records)

# Get the complete CSV string
csv_string = output.getvalue()
print(csv_string)

Output:

product,qty,price
Widget A,10,9.99
Widget B,25,4.49
Widget C,5,19.99

This pattern is ideal for API endpoints that return CSV downloads. Instead of writing the file to disk and then reading it back, you generate it directly into a buffer and return output.getvalue() as the response body. In Flask or FastAPI, you’d set the Content-Type header to text/csv and the Content-Disposition to attachment; filename="report.csv".

BytesIO for Binary Data

BytesIO works the same way as StringIO but for binary data. The most common use cases are image processing, working with archive files, and uploading to cloud storage without touching the filesystem.

Using BytesIO with Pillow

Here’s how to resize an image in memory and upload the result (simulated) without ever writing a temp file:

# bytesio_image.py
import io
from PIL import Image

# Create a simple test image (in real use, this comes from a file upload or URL)
original = Image.new("RGB", (800, 600), color=(100, 149, 237))  # cornflower blue

# Resize in memory
thumbnail = original.copy()
thumbnail.thumbnail((200, 150))

# Save the thumbnail to a BytesIO buffer instead of a file
buffer = io.BytesIO()
thumbnail.save(buffer, format="JPEG", quality=85)
buffer.seek(0)

# Now buffer contains the JPEG bytes
jpeg_bytes = buffer.read()
print(f"Thumbnail size: {thumbnail.size}")
print(f"JPEG bytes: {len(jpeg_bytes):,} bytes")

# You can re-open from buffer to verify
buffer.seek(0)
verified = Image.open(buffer)
print(f"Verified size from buffer: {verified.size}")

Output (the exact JPEG byte count depends on your Pillow version):

Thumbnail size: (200, 150)
JPEG bytes: 4,231 bytes
Verified size from buffer: (200, 150)

Pillow’s save() method accepts any file-like object. By passing a BytesIO buffer, the JPEG data stays in memory. You can then pass buffer directly to an S3 upload, an HTTP multipart upload, or any other consumer that accepts a file-like object. The seek(0) before reading is essential — after save() writes to the buffer, the cursor is at the end. Without seeking back to position 0, a subsequent read() returns empty bytes.
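The same approach covers the archive-file use case mentioned at the start of this section, and it needs only the standard library. The sketch below builds a ZIP archive in a BytesIO buffer and reads it back; the file names inside the archive are made up for illustration.

```python
import io
import zipfile

archive = io.BytesIO()
with zipfile.ZipFile(archive, "w", zipfile.ZIP_DEFLATED) as zf:
    zf.writestr("hello.txt", "Hello from memory")
    zf.writestr("data/numbers.csv", "n\n1\n2\n")

archive.seek(0)   # rewind, as with any buffer you have just written
with zipfile.ZipFile(archive) as zf:
    names = zf.namelist()
    text = zf.read("hello.txt").decode("utf-8")

print(names)   # ['hello.txt', 'data/numbers.csv']
print(text)    # Hello from memory
```

archive.getvalue() then gives the complete ZIP file as bytes, ready to upload or return from a function.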

buffer.seek(0) before you read. Every single time. The disk doesn’t need to be involved.

Testing with In-Memory Buffers

One of the most valuable applications of io buffers is in tests. Instead of creating real files in your test suite and cleaning them up, you pass buffers as mock files. This makes tests faster, isolated, and side-effect-free.

# test_with_io.py
import csv
import io


def parse_user_csv(file_handle):
    """Parse a CSV file and return a list of user dicts."""
    reader = csv.DictReader(file_handle)
    return [
        {"name": row["name"], "email": row["email"]}
        for row in reader
        if row.get("name") and row.get("email")
    ]


def write_report(users: list, output_file):
    """Write a summary report to a file handle."""
    output_file.write(f"Total users: {len(users)}\n")
    for user in users:
        output_file.write(f"  - {user['name']} ({user['email']})\n")


# Test using StringIO instead of real files
def test_parse_user_csv():
    fake_csv = io.StringIO("name,email\nAlice,alice@example.com\nBob,bob@example.com\n")
    result = parse_user_csv(fake_csv)
    assert len(result) == 2
    assert result[0]["name"] == "Alice"
    print("test_parse_user_csv: PASSED")


def test_write_report():
    users = [{"name": "Alice", "email": "alice@example.com"}]
    output = io.StringIO()
    write_report(users, output)
    content = output.getvalue()
    assert "Total users: 1" in content
    assert "Alice" in content
    print("test_write_report: PASSED")
    print("Report output:")
    print(content)


test_parse_user_csv()
test_write_report()

Output:

test_parse_user_csv: PASSED
test_write_report: PASSED
Report output:
Total users: 1
  - Alice (alice@example.com)

Notice that parse_user_csv and write_report never open files themselves — they accept a file handle as a parameter. This is the key design pattern that makes code testable with in-memory buffers. When you design functions to accept file handles instead of file paths, you gain testability for free. Callers in production pass real file handles; test callers pass StringIO or BytesIO objects.

Real-Life Example: In-Memory Report Generator

Here’s a complete example that generates a multi-section CSV report entirely in memory and returns it as bytes ready to be sent as an HTTP response or written to S3:

# report_generator.py
"""Generate an in-memory CSV report combining multiple data sections."""
import csv
import io
from datetime import date


def generate_sales_report(orders: list, returns: list, date_generated: date) -> bytes:
    """
    Generate a CSV report with two sections (Orders, Returns).
    Returns UTF-8 encoded bytes suitable for HTTP response or S3 upload.
    """
    output = io.StringIO()
    writer = csv.writer(output)

    # Header
    writer.writerow([f"Sales Report -- Generated {date_generated}"])
    writer.writerow([])  # blank line

    # Orders section
    writer.writerow(["== ORDERS =="])
    writer.writerow(["Order ID", "Customer", "Amount", "Status"])
    for order in orders:
        writer.writerow([
            order["id"],
            order["customer"],
            f"${order['amount']:.2f}",
            order["status"],
        ])
    writer.writerow([])

    # Returns section
    writer.writerow(["== RETURNS =="])
    writer.writerow(["Return ID", "Order ID", "Reason", "Refund"])
    for ret in returns:
        writer.writerow([
            ret["return_id"],
            ret["order_id"],
            ret["reason"],
            f"${ret['refund']:.2f}",
        ])
    writer.writerow([])

    # Totals
    total_sales = sum(o["amount"] for o in orders)
    total_refunds = sum(r["refund"] for r in returns)
    writer.writerow(["SUMMARY"])
    writer.writerow(["Total Sales", f"${total_sales:.2f}"])
    writer.writerow(["Total Refunds", f"${total_refunds:.2f}"])
    writer.writerow(["Net Revenue", f"${total_sales - total_refunds:.2f}"])

    return output.getvalue().encode("utf-8")


# Sample data
orders = [
    {"id": "ORD-001", "customer": "Alice", "amount": 150.00, "status": "completed"},
    {"id": "ORD-002", "customer": "Bob", "amount": 89.99, "status": "completed"},
    {"id": "ORD-003", "customer": "Carol", "amount": 210.50, "status": "pending"},
]
returns = [
    {"return_id": "RET-001", "order_id": "ORD-001", "reason": "Wrong size", "refund": 45.00},
]

report_bytes = generate_sales_report(orders, returns, date.today())
print(f"Report size: {len(report_bytes)} bytes")
print()
print(report_bytes.decode("utf-8"))

Output:

Report size: 342 bytes

Sales Report -- Generated 2026-04-25

== ORDERS ==
Order ID,Customer,Amount,Status
ORD-001,Alice,$150.00,completed
ORD-002,Bob,$89.99,completed
ORD-003,Carol,$210.50,pending

== RETURNS ==
Return ID,Order ID,Reason,Refund
RET-001,ORD-001,Wrong size,$45.00

SUMMARY
Total Sales,$450.49
Total Refunds,$45.00
Net Revenue,$405.49

The function returns raw bytes (output.getvalue().encode("utf-8")) which you can send directly as an HTTP response body with Content-Type: text/csv, upload to S3 through a boto3 client with boto3.client("s3").put_object(Bucket=bucket, Key=key, Body=report_bytes), or attach to an email. Nothing touches the filesystem. Extend this pattern to generate Excel files with openpyxl (which also accepts BytesIO for its save() call) to produce richer reports.

StringIO and BytesIO: when ‘open a file’ means ‘pretend’.

Frequently Asked Questions

Won’t large data cause memory issues with BytesIO?

For most use cases — reports, images, documents — in-memory buffers are perfectly fine. A 100MB file held in a BytesIO uses 100MB of RAM, the same as if you’d read a file into memory. The concern arises for very large files (multi-GB). In those cases, use chunked streaming instead: generators, shutil.copyfileobj(), or the streaming upload APIs in boto3 and requests. For reports and typical file manipulation, in-memory buffers are faster and simpler than disk I/O.
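As a sketch of the chunked approach, shutil.copyfileobj() moves data between any two file-like objects a fixed-size piece at a time. Two BytesIO objects stand in here for a large source file and an upload stream, purely so the example is self-contained; the 64 KiB chunk size is an arbitrary choice.

```python
import io
import shutil

source = io.BytesIO(b"x" * 1_000_000)   # stand-in for a large file or network stream
destination = io.BytesIO()               # stand-in for an output file or upload stream

# Copies 64 KiB at a time, so only one chunk is in transit at once
shutil.copyfileobj(source, destination, length=64 * 1024)

print(destination.tell())   # 1000000
```

With real files or sockets on both ends, memory use stays near the chunk size no matter how large the payload is.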

Do I need to close BytesIO and StringIO?

You should, but it doesn’t free disk space the way closing a real file does — it just releases the internal memory buffer. The cleanest approach is to use them as context managers: with io.BytesIO() as buf:. This calls buf.close() automatically when the block exits, which allows the buffer’s memory to be garbage collected sooner. In practice, forgetting to close buffers is rarely a bug, but it’s good habit especially in long-running services where you process many buffers.
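A short sketch of the context-manager form, with an arbitrary payload. Note that the contents have to be copied out before the block ends, because a closed buffer refuses further access:

```python
import io

with io.BytesIO() as buf:
    buf.write(b"report data")
    payload = buf.getvalue()   # copy the contents out before the block ends

print(buf.closed, payload)     # True b'report data'

try:
    buf.getvalue()             # the buffer is closed now
    reused = True
except ValueError:
    reused = False

print(reused)                  # False
```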

Why does read() return empty bytes after writing?

Because the file cursor is at the end of the buffer after you write. Both files and buffers have an internal position pointer. When you write data, the pointer advances to the end. A subsequent read() starts from the current position — which is the end — and returns nothing. Always call buffer.seek(0) before reading back data you just wrote. Alternatively, buffer.getvalue() always returns all content regardless of cursor position, which is more convenient when you want everything.

Can I use BytesIO with pandas?

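Yes. pd.read_csv(io.StringIO(csv_string)) parses CSV data from a string, and pd.read_excel(io.BytesIO(excel_bytes)) reads Excel data from bytes. For output, create the buffer first and pass it in: buf = io.StringIO() followed by df.to_csv(buf), or buf = io.BytesIO() followed by df.to_excel(buf), then read the result with buf.getvalue(). Creating the buffer inline inside the call would leave you with no reference to read back from. This is especially useful in web scraping (parse CSV from HTTP responses without temp files) and in serverless functions (generate Excel reports in AWS Lambda without writing to the /tmp filesystem).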

How do I convert between BytesIO and StringIO?

You can’t convert directly, but you can wrap one with the other. To treat binary bytes as text, use io.TextIOWrapper(byte_buffer, encoding="utf-8"), which decodes bytes to strings as you read. To get bytes from a StringIO, call string_buffer.getvalue().encode("utf-8"). The most common practical case is HTTP responses: you receive bytes from the network (use BytesIO) and then decode them with .decode("utf-8") to get a string that you can wrap in StringIO for a text-mode parser like csv.
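Both directions can be sketched in a few lines. The sample CSV bytes below are invented for illustration and stand in for a payload received from the network:

```python
import csv
import io

raw = "name,city\nZoë,Zürich\n".encode("utf-8")   # bytes, as received from a network call

# bytes -> text: wrap the binary buffer so it decodes while being read
text_stream = io.TextIOWrapper(io.BytesIO(raw), encoding="utf-8", newline="")
rows = list(csv.DictReader(text_stream))

# text -> bytes: encode the StringIO contents
out = io.StringIO()
out.write(rows[0]["name"])
encoded = out.getvalue().encode("utf-8")

print(rows[0]["city"], encoded)
```

Wrapping with TextIOWrapper avoids decoding the whole payload into a second in-memory string before parsing.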

Conclusion

The io module’s BytesIO and StringIO classes eliminate the need for temporary files in the vast majority of cases. We covered the fundamental read/write/seek pattern, the difference between binary and text buffers, reading and writing CSV data in memory, processing images with Pillow without disk I/O, writing testable code that accepts file handles, and building a complete in-memory report generator. The design principle that makes all of this possible is simple: write functions that accept file handles, not file paths.

The next step is to apply this pattern to your existing code — identify any function that opens a file by path and refactor it to accept a file handle instead. This makes the function testable with StringIO/BytesIO, usable with network streams, and composable with any other file-like object. Check the official io module documentation for the full interface including IOBase, RawIOBase, and buffered I/O classes.

How To Use Python ast Module to Parse and Analyze Code

Advanced

At some point, you’ll need to write code that understands code. Maybe you want to build a custom linter that enforces your team’s specific patterns, or scan a codebase to find every call to a deprecated function, or automatically modernize syntax across hundreds of files. Regular expressions aren’t the right tool — they can’t reason about structure. Python’s built-in ast module is.

The ast module parses Python source code into an Abstract Syntax Tree, a structured, tree-shaped representation of your program that you can inspect, traverse, and transform. It’s the same mechanism Python itself uses when compiling code. The module is part of the standard library, no installation required. If you’ve ever used a linter like flake8 or a type checker like mypy, you’ve already relied on code built on top of ast. (Ruff, by contrast, ships its own parser written in Rust.)

In this article we’ll cover how to parse source code into a tree, walk the tree to find specific patterns, use the NodeVisitor pattern to build a linter, transform code with NodeTransformer, extract docstrings and function signatures, and build a practical code analysis tool. This is advanced territory — we assume you’re comfortable with Python classes and recursion.

AST Quick Example

Parsing a Python string into an AST takes one line. Here’s the simplest possible example — parse a function definition and inspect its structure:

# quick_ast.py
import ast

source = """
def greet(name: str) -> str:
    return "Hello, " + name
"""

tree = ast.parse(source)
print(ast.dump(tree, indent=2))

Output (abbreviated):

Module(
  body=[
    FunctionDef(
      name='greet',
      args=arguments(
        args=[arg(arg='name', annotation=Name(id='str'))],
        ...
      ),
      body=[
        Return(
          value=BinOp(
            left=Constant(value='Hello, '),
            op=Add(),
            right=Name(id='name')
          )
        )
      ],
      returns=Name(id='str')
    )
  ]
)

ast.parse() returns a Module node — the root of the tree. Every element of the source code becomes a node: the function definition is a FunctionDef, the return value is a Return, and the string concatenation is a BinOp (binary operation) with Add as the operator. This tree structure is what makes programmatic analysis possible — instead of searching strings, you’re traversing structured objects.

Understanding AST Node Types

The AST has dozens of node types. You don’t need to memorize them — you learn the ones relevant to what you’re building. Here are the most common ones you’ll encounter when analyzing Python code:

Node type | What it represents | Key attributes
Module | Top-level file | body (list of statements)
FunctionDef | def statement | name, args, body, decorator_list
ClassDef | class statement | name, bases, body
Call | Function call | func, args, keywords
Import | import statement | names (list of aliases)
ImportFrom | from X import Y | module, names
Assign | Variable assignment | targets, value
Return | return statement | value
Constant | Literal value | value (int, str, float, etc.)
Name | Variable name reference | id (the name string)

The best way to discover a node type is to write a small piece of Python that uses the pattern you care about, parse it with ast.parse(), and print the tree with ast.dump(tree, indent=2). Think of it as using the AST to describe itself.
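Here’s a minimal sketch of that discovery workflow. The helper name node_type_names and the one-line snippet are made up for illustration; the only ast calls used are ast.parse() and ast.walk().

```python
import ast

def node_type_names(snippet: str) -> list[str]:
    """Return the sorted, de-duplicated node class names found in a snippet."""
    tree = ast.parse(snippet)
    return sorted({type(node).__name__ for node in ast.walk(tree)})

# Which node types does a method call inside an assignment produce?
names = node_type_names("total = cart.sum(1, tax=0.2)")
print(names)
```

Once you know the names (here you should spot Attribute and keyword alongside Call), you can look up their fields in the ast documentation or print the full tree with ast.dump().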

Inspecting AST tree structure
ast.dump(tree, indent=2) — the X-ray machine for your code.

Walking the Tree with ast.walk

For simple analysis tasks, ast.walk() visits every node in the tree in no particular order. It’s the fastest way to extract information when you don’t need to care about context (which node is parent, which is child).

Finding All Imports

Here’s how to extract every import statement from a Python file:

# find_imports.py
import ast

source = """
import os
import sys
from pathlib import Path
from collections import Counter, defaultdict
import json as js
"""

tree = ast.parse(source)
imports = []

for node in ast.walk(tree):
    if isinstance(node, ast.Import):
        for alias in node.names:
            imports.append(("import", alias.name, alias.asname))
    elif isinstance(node, ast.ImportFrom):
        for alias in node.names:
            imports.append(("from", f"{node.module}.{alias.name}", alias.asname))

for kind, name, alias in imports:
    asname = f" as {alias}" if alias else ""
    print(f"{kind}: {name}{asname}")

Output:

import: os
import: sys
from: pathlib.Path
from: collections.Counter
from: collections.defaultdict
import: json as js

The key pattern here is isinstance(node, ast.Import) — you test each node against a specific type to filter what you care about. ast.walk() handles the recursion for you. The difference between ast.Import (for import X) and ast.ImportFrom (for from X import Y) is important — they have different attributes.
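One of those differing attributes matters for relative imports: for from . import utils, node.module is None and node.level holds the number of leading dots. The sketch below shows one possible way to handle that; describe_imports is a hypothetical helper, not part of ast.

```python
import ast

def describe_imports(source: str) -> list[str]:
    """List imported names, writing relative imports with leading dots."""
    found = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            found.extend(alias.name for alias in node.names)
        elif isinstance(node, ast.ImportFrom):
            # node.level counts leading dots; node.module is None for "from . import x"
            prefix = "." * node.level + (node.module or "")
            sep = "" if prefix.endswith(".") else "."
            for alias in node.names:
                found.append(f"{prefix}{sep}{alias.name}")
    return found

result = describe_imports("import os\nfrom . import utils\nfrom ..core import Engine\n")
print(result)
```

Without the node.module or "" guard, a relative import would be reported as None.utils.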

Finding Specific Function Calls

Suppose you want to audit a codebase for calls to eval() or exec() — common security concerns. Here’s how to find them:

# find_eval_calls.py
import ast

source = """
x = eval("1 + 1")
safe_result = int("42")
exec("print('dangerous')")
y = compile("code", "file", "exec")
"""

tree = ast.parse(source)
dangerous_calls = []

for node in ast.walk(tree):
    if isinstance(node, ast.Call):
        # Get the function name (handles simple Name calls)
        if isinstance(node.func, ast.Name):
            func_name = node.func.id
            if func_name in ("eval", "exec", "compile"):
                dangerous_calls.append((func_name, node.lineno))

for name, line in dangerous_calls:
    print(f"Line {line}: Found dangerous call to {name}()")

Output:

Line 2: Found dangerous call to eval()
Line 4: Found dangerous call to exec()
Line 5: Found dangerous call to compile()

This is the skeleton of a security linter. The node.lineno attribute gives you the line number, which you’d use in a real tool to show the user exactly where the issue is. The check for isinstance(node.func, ast.Name) handles direct calls like eval() — for method calls like obj.eval(), the func would be an ast.Attribute node instead, with node.func.attr giving the method name.
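To cover both cases, you could normalize the call target first. This is a minimal sketch; call_name is a hypothetical helper, and it deliberately returns None for anything more exotic, such as calling the result of a subscript.

```python
import ast
from typing import Optional

def call_name(node: ast.Call) -> Optional[str]:
    """Return the called name for foo(...) or obj.foo(...), else None."""
    if isinstance(node.func, ast.Name):
        return node.func.id
    if isinstance(node.func, ast.Attribute):
        return node.func.attr
    return None

source = "eval('1')\nbuiltins.eval('2')\nhandlers[0]('3')\n"
names = [call_name(n) for n in ast.walk(ast.parse(source)) if isinstance(n, ast.Call)]
print(names)
```

Matching on the attribute name alone will also flag unrelated methods that happen to be called eval, so a real linter would usually report those at a lower severity.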

Building a Linter with NodeVisitor

For structured analysis where you need context — like knowing which class or function you’re inside — subclass ast.NodeVisitor. You define visit_NodeType() methods that are called automatically as the visitor traverses the tree.

# node_visitor_linter.py
import ast
from dataclasses import dataclass


@dataclass
class LintIssue:
    line: int
    col: int
    code: str
    message: str

    def __str__(self):
        return f"Line {self.line}:{self.col} [{self.code}] {self.message}"


class SimpleLinter(ast.NodeVisitor):
    """A simple custom linter using NodeVisitor."""

    def __init__(self):
        self.issues: list[LintIssue] = []

    def visit_FunctionDef(self, node: ast.FunctionDef):
        # Rule: Functions should have docstrings
        # (ast.get_docstring() returns None when the first statement is not a string)
        if ast.get_docstring(node) is None:
            self.issues.append(LintIssue(
                node.lineno, node.col_offset,
                "D001", f"Function '{node.name}' is missing a docstring"
            ))

        # Rule: Function names should be snake_case (no uppercase letters)
        if any(c.isupper() for c in node.name):
            self.issues.append(LintIssue(
                node.lineno, node.col_offset,
                "N001", f"Function name '{node.name}' should be snake_case"
            ))

        # Continue traversing into the function body
        self.generic_visit(node)

    def visit_Call(self, node: ast.Call):
        # Rule: Warn on print() calls in non-test code
        if isinstance(node.func, ast.Name) and node.func.id == "print":
            self.issues.append(LintIssue(
                node.lineno, node.col_offset,
                "D002", "Use logging instead of print()"
            ))
        self.generic_visit(node)


# Test the linter
source = """
def processData(data):
    result = data.strip()
    print("Processing:", result)
    return result

def clean_text(text):
    \"\"\"Clean a text string.\"\"\"
    return text.lower()
"""

tree = ast.parse(source)
linter = SimpleLinter()
linter.visit(tree)

if linter.issues:
    for issue in linter.issues:
        print(issue)
else:
    print("No issues found.")

Output:

Line 2:0 [D001] Function 'processData' is missing a docstring
Line 2:0 [N001] Function name 'processData' should be snake_case
Line 4:4 [D002] Use logging instead of print()

The key detail is self.generic_visit(node) at the end of each visitor method. This tells the visitor to continue traversing into the node’s children. If you forget it, the traversal stops at that node and won’t visit anything inside the function body. This is the most common mistake when starting with NodeVisitor.

NodeVisitor pattern
Forget generic_visit(node) and the linter silently misses everything inside.
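The linter above doesn’t actually record which class or function it is inside. One way to add that context is a scope stack that is pushed before generic_visit() and popped afterwards. This is a sketch under that assumption; ScopedPrintFinder and its attribute names are invented for the example.

```python
import ast

class ScopedPrintFinder(ast.NodeVisitor):
    """Record print() calls together with the enclosing class/function path."""

    def __init__(self):
        self.scope = []   # names of the classes/functions we are currently inside
        self.hits = []    # (dotted scope, line number) pairs

    def _enter(self, node):
        self.scope.append(node.name)
        self.generic_visit(node)   # children are visited while the name is on the stack
        self.scope.pop()

    visit_ClassDef = _enter
    visit_FunctionDef = _enter
    visit_AsyncFunctionDef = _enter

    def visit_Call(self, node):
        if isinstance(node.func, ast.Name) and node.func.id == "print":
            self.hits.append((".".join(self.scope) or "<module>", node.lineno))
        self.generic_visit(node)

source = """\
print("top")
class Job:
    def run(self):
        print("running")
"""
finder = ScopedPrintFinder()
finder.visit(ast.parse(source))
print(finder.hits)
```

This kind of bookkeeping is not possible with ast.walk(), which yields nodes without telling you where they sit in the tree.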

Transforming Code with NodeTransformer

The ast.NodeTransformer class works like NodeVisitor but lets you replace nodes. This is how automated refactoring tools work. You return a modified node (or a different node entirely) to make the substitution:

# node_transformer.py
import ast

class PrintToLoggingTransformer(ast.NodeTransformer):
    """Replace print() calls with logging.info() calls."""

    def __init__(self):
        self.needs_logging_import = False

    def visit_Call(self, node: ast.Call):
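        # Visit children first so nested calls, e.g. foo(print(x)), are rewritten too
        self.generic_visit(node)
        # Only transform direct print() calls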
        if isinstance(node.func, ast.Name) and node.func.id == "print":
            self.needs_logging_import = True
            # Build: logging.info(args)
            new_node = ast.Call(
                func=ast.Attribute(
                    value=ast.Name(id="logging", ctx=ast.Load()),
                    attr="info",
                    ctx=ast.Load(),
                ),
                args=node.args,
                keywords=node.keywords,
            )
            return ast.copy_location(new_node, node)
        return node


source = """
def process(data):
    print("Starting process")
    result = data.strip()
    print("Result:", result)
    return result
"""

tree = ast.parse(source)
transformer = PrintToLoggingTransformer()
new_tree = transformer.visit(tree)
ast.fix_missing_locations(new_tree)

# Convert back to source code
print(ast.unparse(new_tree))

Output:

def process(data):
    logging.info('Starting process')
    result = data.strip()
    logging.info('Result:', result)
    return result

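Two functions are critical when transforming trees. ast.copy_location(new_node, old_node) copies the line/column information from the original node to the new one; without it the new node has no location data, and compile() and other tools that rely on it will fail. ast.fix_missing_locations(tree) fills in any remaining missing location info before you use the tree. ast.unparse() (Python 3.9+) converts the modified tree back to Python source code. Keep in mind that this rewrite is purely syntactic: logging.info('Result:', result) treats the second argument as a %-format argument, so logging will report a formatting error at runtime, and the output has no import logging line even though needs_logging_import records that one is required. A production transformer would handle both.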
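Here’s a hedged sketch of how a transformer could add the missing import once it knows a rewrite happened. The class name PrintToLogging and the rewrite() helper are hypothetical, and for brevity the import is inserted at index 0, which would push a module docstring out of first position.

```python
import ast

class PrintToLogging(ast.NodeTransformer):
    """Rewrite print(...) to logging.info(...) and remember whether anything changed."""

    def __init__(self):
        self.changed = False

    def visit_Call(self, node: ast.Call):
        self.generic_visit(node)  # handle nested calls first
        if isinstance(node.func, ast.Name) and node.func.id == "print":
            self.changed = True
            node.func = ast.Attribute(
                value=ast.Name(id="logging", ctx=ast.Load()),
                attr="info",
                ctx=ast.Load(),
            )
        return node

def rewrite(source: str) -> str:
    tree = ast.parse(source)
    transformer = PrintToLogging()
    tree = transformer.visit(tree)
    if transformer.changed:
        # Prepend "import logging" only when a print() call was actually replaced
        tree.body.insert(0, ast.Import(names=[ast.alias(name="logging", asname=None)]))
    ast.fix_missing_locations(tree)
    return ast.unparse(tree)

print(rewrite("print('hi')"))
```

Source without any print() call passes through unchanged apart from ast.unparse() normalizing its formatting.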

Real-Life Example: Codebase Complexity Analyzer

Here’s a practical tool that analyzes a Python module and reports per-function complexity (number of branches), argument counts, and missing docstrings, along with the total number of functions and classes:

# complexity_analyzer.py
"""Analyze Python source code for complexity metrics using the ast module."""
import ast
from dataclasses import dataclass, field
from pathlib import Path


@dataclass
class FunctionStats:
    name: str
    lineno: int
    has_docstring: bool
    branch_count: int
    arg_count: int


@dataclass
class ModuleStats:
    filename: str
    function_count: int = 0
    class_count: int = 0
    functions: list = field(default_factory=list)


class ComplexityVisitor(ast.NodeVisitor):
    """Visit all function definitions and compute basic metrics."""

    def __init__(self):
        self.stats = []

    def _count_branches(self, node: ast.AST) -> int:
        """Count if/elif/for/while/with/except branches."""
        branch_types = (ast.If, ast.For, ast.While, ast.With,
                        ast.ExceptHandler, ast.comprehension)
        return sum(
            1 for n in ast.walk(node) if isinstance(n, branch_types)
        )

    def _has_docstring(self, node: ast.FunctionDef) -> bool:
        return (
            bool(node.body)
            and isinstance(node.body[0], ast.Expr)
            and isinstance(node.body[0].value, ast.Constant)
            and isinstance(node.body[0].value.value, str)
        )

    def visit_FunctionDef(self, node: ast.FunctionDef):
        self.stats.append(FunctionStats(
            name=node.name,
            lineno=node.lineno,
            has_docstring=self._has_docstring(node),
            branch_count=self._count_branches(node),
            arg_count=len(node.args.args),
        ))
        self.generic_visit(node)

    visit_AsyncFunctionDef = visit_FunctionDef  # same logic for async


def analyze_source(source: str, filename: str = "") -> ModuleStats:
    tree = ast.parse(source, filename=filename)
    visitor = ComplexityVisitor()
    visitor.visit(tree)

    classes = [n for n in ast.walk(tree) if isinstance(n, ast.ClassDef)]
    stats = ModuleStats(
        filename=filename,
        function_count=len(visitor.stats),
        class_count=len(classes),
        functions=visitor.stats,
    )
    return stats


# Demo: analyze a sample module
sample_source = """
class DataProcessor:
    \"\"\"Processes incoming data.\"\"\"

    def __init__(self, config):
        self.config = config

    def process(self, data):
        if not data:
            return None
        results = []
        for item in data:
            if item.get("valid"):
                try:
                    results.append(self._transform(item))
                except ValueError:
                    pass
        return results

    def _transform(self, item):
        return item["value"] * 2

def standalone_helper(x, y, z):
    return x + y + z
"""

report = analyze_source(sample_source, "data_processor.py")
print(f"File: {report.filename}")
print(f"Functions: {report.function_count}, Classes: {report.class_count}")
print()
for fn in report.functions:
    doc = "OK" if fn.has_docstring else "MISSING"
    print(f"  {fn.name}() line {fn.lineno}: branches={fn.branch_count}, args={fn.arg_count}, docstring={doc}")

Output:

File: data_processor.py
Functions: 4, Classes: 1

  __init__() line 5: branches=0, args=2, docstring=MISSING
  process() line 8: branches=4, args=2, docstring=MISSING
  _transform() line 20: branches=0, args=2, docstring=MISSING
  standalone_helper() line 23: branches=0, args=3, docstring=MISSING

This is the foundation of a real complexity gate. You could extend it to fail a CI job if any function has a branch count over 10 (the branch count is a rough proxy for cyclomatic complexity), or to generate a Markdown report of which functions need documentation. Note that _count_branches() uses ast.walk(), so branches inside nested functions also count toward the enclosing function. The visit_AsyncFunctionDef = visit_FunctionDef line is a neat trick: async functions have a different node type but the same structure, so you can reuse the same visitor method.
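As a rough sketch of such a gate, the function below returns every function whose branch count exceeds a limit. The name functions_over_limit and the max_branches parameter are assumptions for this example; it reuses the same branch node types as the analyzer rather than a true cyclomatic complexity calculation.

```python
import ast

BRANCH_NODES = (ast.If, ast.For, ast.While, ast.With,
                ast.ExceptHandler, ast.comprehension)

def functions_over_limit(source: str, max_branches: int) -> list[tuple[str, int]]:
    """Return (name, branch_count) for every function above max_branches."""
    offenders = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            count = sum(isinstance(n, BRANCH_NODES) for n in ast.walk(node))
            if count > max_branches:
                offenders.append((node.name, count))
    return offenders

sample = """
def simple(x):
    return x

def busy(items):
    for item in items:
        if item:
            while item > 0:
                item -= 1
"""
offenders = functions_over_limit(sample, max_branches=2)
print(offenders)
# A CI script could then exit non-zero, e.g. sys.exit(1 if offenders else 0)
```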

ast.parse(): Python reads itself.

Frequently Asked Questions

Is ast.literal_eval() safe for evaluating user input?

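Mostly. ast.literal_eval() is the recommended alternative to eval() for evaluating Python literals (strings, bytes, numbers, tuples, lists, dicts, sets, booleans, None). It never calls functions or looks up names, and it raises ValueError, TypeError, or SyntaxError if the input contains anything that isn’t a literal, so it can’t execute arbitrary code. It is not a complete sandbox, though: the official documentation warns that a deeply nested or very large input can still exhaust memory or the C stack, so cap the input size when the data is untrusted. Use it any time you need to parse a string like "[1, 2, 3]" or "{'key': 'value'}" into a Python object. Never use plain eval() on data you don’t fully control.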
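A small wrapper makes the failure modes explicit. This is a sketch: parse_literal is a hypothetical helper, and returning None on failure is a simplification, since the literal "None" also evaluates to None.

```python
import ast

def parse_literal(text: str):
    """Return the Python literal in text, or None if it is not a plain literal."""
    try:
        return ast.literal_eval(text)
    except (ValueError, TypeError, SyntaxError):
        return None

print(parse_literal("[1, 2, 3]"))
print(parse_literal("{'key': 'value'}"))
print(parse_literal("__import__('os').getcwd()"))  # a function call, so it is rejected
```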

How do I convert an AST back to source code?

Use ast.unparse(tree), which was added in Python 3.9. It converts an AST back to a valid Python source string. The output is normalized — it won’t perfectly preserve the original formatting, spacing, or comments (comments are not represented in the AST). For tools that need to preserve the original formatting while making targeted changes, use the LibCST library instead, which works at the Concrete Syntax Tree level and preserves all formatting.

Does ast parse type comments?

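Yes, if you pass type_comments=True to ast.parse() (available since Python 3.8). This handles the PEP 484 type comments used before inline annotations existed, such as # type: List[str]; they show up in the type_comment attribute of the relevant nodes. For modern Python 3 type annotations (written inline as x: List[str]), the ast module handles them automatically without any flag: parameter annotations are stored in the annotation attribute of each arg node, return annotations in FunctionDef.returns, and annotated assignments become AnnAssign nodes.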
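Here’s a short sketch showing where inline annotations end up in the tree. The sample function scale is invented for the example; ast.unparse() (Python 3.9+) is used only to turn the annotation nodes back into readable text.

```python
import ast

source = """\
def scale(values: list[float], factor: float = 2.0) -> list[float]:
    result: list[float] = [v * factor for v in values]
    return result
"""
func = ast.parse(source).body[0]

# Parameter annotations live on each arg node; the return annotation on FunctionDef.returns
params = {a.arg: ast.unparse(a.annotation) for a in func.args.args}
returns = ast.unparse(func.returns)
# Annotated assignments are AnnAssign nodes, distinct from plain Assign
annotated = [ast.unparse(n.target) for n in ast.walk(func) if isinstance(n, ast.AnnAssign)]

print(params, returns, annotated)
```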

When should I use the ast module vs. regex?

Use ast when you need to understand structure — “find all functions that call X”, “check if a class has a method named Y”, “find all string literals longer than N characters.” Use regex when you need fast text search that doesn’t require structural understanding — “does this file contain the word ‘TODO'”, “find all email addresses in a log file.” The AST is slower to parse and more complex to use, but it’s correct. Regex on code can be fooled by strings, comments, and multiline expressions.
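To make the difference concrete, this sketch counts eval( occurrences both ways in a three-line sample written for the example. The regex matches the comment and the string literal as well as the real call; the AST only sees the call.

```python
import ast
import re

source = '''\
# TODO: remove eval( usage
message = "never call eval(user_input)"
value = eval("1 + 1")
'''

# Text search: matches anywhere the characters appear
regex_hits = len(re.findall(r"\beval\(", source))

# Structural search: only real Call nodes whose target is the name eval
ast_hits = sum(
    isinstance(n, ast.Call) and isinstance(n.func, ast.Name) and n.func.id == "eval"
    for n in ast.walk(ast.parse(source))
)

print(regex_hits, ast_hits)
```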

How do I get line numbers from AST nodes?

Most statement and expression nodes have lineno and col_offset attributes that give the line number (1-indexed) and column (0-indexed) of the node. Some nodes (like arguments) don’t have line numbers — in that case, use the parent node’s line number. Python 3.8+ also provides end_lineno and end_col_offset for the end position, which is useful for tools that need to highlight or replace a specific range in the source text.
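As an illustration, the sketch below reads the start and end position of a multi-line call and uses ast.get_source_segment() (Python 3.8+) to pull out the exact text. The sample source is invented for the example.

```python
import ast

source = "total = compute(\n    price,\n    quantity,\n)\n"
call = next(n for n in ast.walk(ast.parse(source)) if isinstance(n, ast.Call))

# (start line, start column, end line, end column)
span = (call.lineno, call.col_offset, call.end_lineno, call.end_col_offset)
segment = ast.get_source_segment(source, call)

print(span)
print(segment)
```

Note that col_offset and end_col_offset are UTF-8 byte offsets, so they can differ from character positions on lines that contain non-ASCII text.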

Conclusion

The ast module gives you the ability to write programs that understand Python programs. We covered parsing source into a tree with ast.parse(), finding nodes with ast.walk() and isinstance() checks, building structured linters with ast.NodeVisitor, transforming code with ast.NodeTransformer, and converting back to source with ast.unparse(). These primitives power real tools: linters such as flake8 and Pylint and type checkers such as mypy are built on the same foundation.

To go deeper, try extending the complexity analyzer to output a Markdown report, or build a transformer that automatically adds __slots__ to classes without them. The natural next level of complexity is LibCST for format-preserving transformations, and the tokenize module for even lower-level analysis of individual tokens including comments.

For the complete node reference, see the official ast module documentation.

How To Use Python tqdm for Progress Bars in CLI and Loops


Beginner

You kick off a script to process 10,000 records, or download a batch of files, or train a model — and then you wait. The terminal is blank. Is it working? Did it hang? How long until it finishes? The only answer you have is “I don’t know” and the only way to check is to add a print statement, kill the script, and run it again. This is a solved problem. The solution is tqdm.

tqdm is a Python library that wraps any iterable and displays a progress bar in the terminal automatically. It shows you the current iteration count, the percentage complete, the elapsed time, and the estimated time remaining — all in a single line that updates in place without scrolling. It works on Python loops, pandas DataFrames, file downloads, and async code. Install it with pip install tqdm.

In this article we’ll cover the basics of wrapping loops with tqdm, customizing the bar appearance, using tqdm.pandas() for DataFrame operations, building nested progress bars, manually updating a bar for non-iterable tasks, and integrating tqdm into CLI tools. By the end you’ll never ship a long-running script without a progress bar again.

tqdm Quick Example

Adding a progress bar to an existing loop takes exactly one line of code — wrap the iterable with tqdm():

# quick_tqdm.py
import time
from tqdm import tqdm

items = range(50)

for item in tqdm(items, desc="Processing"):
    time.sleep(0.05)  # simulate work

Output (updates in place in the terminal):

Processing: 100%|####################| 50/50 [00:02<00:00, 19.6it/s]

The bar shows the description (“Processing”), percentage, item count, elapsed time, and throughput in items per second. No configuration needed. The key thing happening here is that tqdm(items) wraps the original iterable transparently — the loop body receives the same values it would have without tqdm. All the display logic is handled automatically.

What Is tqdm and When Should You Use It?

The name tqdm derives from the Arabic word taqaddum, which means “progress.” The library caught on in scientific computing, where long-running data pipelines and model training jobs make progress feedback essential. Today it’s one of the most downloaded Python packages, used everywhere from Jupyter notebooks to production ML pipelines.

The core idea is deceptively simple: tqdm wraps an iterable and intercepts each iteration to update a counter and render the bar. Because it only hooks into the iterator protocol, it works with any Python iterable — lists, generators, file objects, database cursors, anything you can loop over.
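To see why hooking into the iterator protocol is enough, here is a deliberately tiny stand-in written with only the standard library. It is not tqdm’s implementation; the progress() function and its output format are invented for illustration.

```python
import sys
import time

def progress(iterable, total=None, file=sys.stderr):
    """Yield items unchanged while printing a running count (not tqdm's real code)."""
    if total is None and hasattr(iterable, "__len__"):
        total = len(iterable)
    start = time.monotonic()
    for count, item in enumerate(iterable, start=1):
        yield item  # the caller's loop body runs here, between updates
        elapsed = time.monotonic() - start
        shown = f"{count}/{total}" if total else f"{count}"
        # "\r" returns to the start of the line so the counter updates in place
        file.write(f"\r{shown} [{elapsed:.1f}s]")
    file.write("\n")

squares = [n * n for n in progress(range(5))]
print(squares)
```

The loop body receives exactly the items it would have received without the wrapper, which is the same property that lets tqdm wrap lists, generators, and file objects alike.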

Use case | tqdm approach | What you get
Simple loop | tqdm(iterable) | Progress bar with count and ETA
pandas apply | tqdm.pandas() + .progress_apply() | Row-by-row progress in DataFrames
Unknown total | tqdm(total=None) | Running count, elapsed time, and rate (no percentage or ETA)
Manual tasks | bar.update(n) | Increment by any amount, any time
Nested loops | tqdm(leave=False) | Inner bar clears when done, outer persists

The right time to add tqdm is any time a user (including you) will wait more than 2-3 seconds for a script to complete. Feedback during waiting is not cosmetic — it’s a signal that the program is alive and working correctly.

Wrapping a loop with tqdm
One import, one wrapper, zero excuse for blank terminals.

Basic tqdm Usage

Wrapping Any Iterable

The simplest use is wrapping a list or range directly. The desc parameter adds a label, and unit changes the “it/s” suffix to something more meaningful for your context:

# basic_loop.py
import time
from tqdm import tqdm

# Process a list of URLs (simulated)
urls = [f"https://api.example.com/item/{i}" for i in range(30)]

results = []
for url in tqdm(urls, desc="Fetching", unit="req"):
    time.sleep(0.1)  # simulate network call
    results.append({"url": url, "status": 200})

print(f"\nFetched {len(results)} items")

Output:

Fetching: 100%|####################| 30/30 [00:03<00:00,  9.8req/s]

Fetched 30 items

Using unit="req" changes the display from “it/s” to “req/s”, which makes the bar much more readable when you’re processing HTTP requests. Choose a unit that matches what you’re iterating over: “file”, “row”, “batch”, “img” — whatever makes the bar self-explanatory at a glance.

Showing Live Metrics with postfix

The set_postfix() method lets you display live metrics next to the bar. This is invaluable for training loops where you want to show the current loss, accuracy, or any metric alongside the progress:

# postfix_example.py
import time
import random
from tqdm import tqdm

epochs = 10
losses = []

with tqdm(range(epochs), desc="Training", unit="epoch") as bar:
    for epoch in bar:
        time.sleep(0.3)  # simulate training step
        loss = round(1.0 / (epoch + 1) + random.uniform(0, 0.1), 4)
        acc = round(0.5 + epoch * 0.05 + random.uniform(0, 0.02), 4)
        losses.append(loss)
        bar.set_postfix(loss=loss, acc=acc)

print(f"\nFinal loss: {losses[-1]:.4f}")

Output (last frame shown):

Training: 100%|####################| 10/10 [00:03<00:00,  3.3epoch/s, loss=0.101, acc=0.964]

Final loss: 0.1012

Using tqdm as a context manager (the with tqdm(...) as bar: pattern) ensures the bar is properly closed even if the loop raises an exception. The set_postfix() call accepts any keyword arguments and displays them as key=value pairs at the end of the bar line. This pattern is ubiquitous in ML training scripts for a good reason — you can see at a glance if a training run is converging.

Nested Progress Bars

When processing multiple files, each with multiple items, you often want an outer bar for files and an inner bar for items. The leave=False parameter on the inner bar makes it disappear when done, so only the outer bar persists in the terminal:

# nested_bars.py
import time
from tqdm import tqdm

datasets = ["train.csv", "test.csv", "validation.csv"]
rows_per_file = [1000, 250, 250]

for dataset, n_rows in tqdm(zip(datasets, rows_per_file), desc="Files", total=len(datasets)):
    for row in tqdm(range(n_rows), desc=f"  {dataset}", leave=False, unit="row"):
        time.sleep(0.001)  # simulate row processing

print("\nAll datasets processed.")

Output (inner bar clears after each file):

  train.csv: 100%|####################| 1000/1000 [00:01<00:00, 998.3row/s]
Files:  33%|######                | 1/3 [00:01<00:02,  1.02s/it]

The inner bar is indented with spaces in the description to visually show the hierarchy in the terminal. When leave=False is set, the inner bar erases itself when the inner loop completes, leaving the outer bar as the only persistent display. This prevents terminal clutter when processing hundreds of files with thousands of rows each.

Nested progress bars
leave=False on the inner bar. Otherwise your terminal looks like a stack trace.

tqdm with pandas

Pandas apply() and map() operations can be slow on large DataFrames, and by default they give no feedback at all. tqdm.pandas() patches pandas to add progress tracking to these operations with a single setup call:

# tqdm_pandas.py
import time
import pandas as pd
from tqdm import tqdm

# One-time setup: patch pandas
tqdm.pandas(desc="Processing rows")

# Create sample DataFrame
df = pd.DataFrame({
    "name": [f"user_{i}" for i in range(500)],
    "score": range(500),
})

def slow_transform(row):
    """Simulate a slow per-row operation."""
    time.sleep(0.005)
    return row["name"].upper() + f"_{row['score'] * 2}"

# Use progress_apply instead of apply
df["result"] = df.progress_apply(slow_transform, axis=1)

print(df[["name", "result"]].head(3))

Output:

Processing rows: 100%|####################| 500/500 [00:02<00:00, 190.3it/s]

     name    result
0  user_0  USER_0_0
1  user_1  USER_1_2
2  user_2  USER_2_4

After calling tqdm.pandas(), replace any .apply() call with .progress_apply() and any .map() with .progress_map(). The bar automatically knows the total row count from the DataFrame shape. This is one of the most practical tqdm features because pandas operations on large DataFrames are a common source of “is it still running?” anxiety.

Manual Progress Bar Updates

Sometimes you can’t wrap a simple loop — maybe you’re processing variable-size batches, waiting for callbacks, or tracking progress through a recursive operation. In these cases you can manage the bar manually using tqdm(total=n) and bar.update():

# manual_bar.py
import time
from tqdm import tqdm

# Simulate downloading files of different sizes (in MB)
files = {"report.pdf": 5, "data.zip": 42, "model.pkl": 128, "logs.tar": 18}
total_mb = sum(files.values())

with tqdm(total=total_mb, desc="Downloading", unit="MB") as bar:
    for filename, size_mb in files.items():
        bar.set_description(f"Downloading {filename}")
        # Simulate chunked download
        chunk_size = max(1, size_mb // 10)
        downloaded = 0
        while downloaded < size_mb:
            time.sleep(0.05)
            chunk = min(chunk_size, size_mb - downloaded)
            downloaded += chunk
            bar.update(chunk)  # advance by the megabytes just downloaded

print("\nAll downloads complete.")

Output (last frame):

Downloading logs.tar: 100%|####################| 193/193 [00:02<00:00, 83.1MB/s]

All downloads complete.

The bar.update(n) call advances the bar by n units, not by 1. This is critical when you want the bar to reflect meaningful units like bytes or megabytes rather than iteration counts. You can call update() as often as needed -- including from inside nested loops, callbacks, or threads. Just make sure the bar's total matches the sum of all your update() calls.
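A quick way to check that last point without running a bar at all is to add up the planned chunks. This sketch mirrors the chunking arithmetic from manual_bar.py; chunk_sizes is a hypothetical helper, and each value it yields is what you would pass to bar.update().

```python
def chunk_sizes(size_mb: int, parts: int = 10):
    """Yield chunk sizes that add up to exactly size_mb."""
    chunk = max(1, size_mb // parts)
    remaining = size_mb
    while remaining > 0:
        step = min(chunk, remaining)  # the last chunk may be smaller
        remaining -= step
        yield step

files = {"report.pdf": 5, "data.zip": 42, "model.pkl": 128, "logs.tar": 18}

planned = sum(sum(chunk_sizes(size)) for size in files.values())
updates = sum(1 for size in files.values() for _ in chunk_sizes(size))

print(planned, updates)
```

If planned differs from the total you pass to tqdm, the bar will either stop short of 100% or overshoot it.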

File upload with tqdm
bar.update(chunk) -- because 42MB is not 42 iterations.

Real-Life Example: Batch File Processor with tqdm

Here's a practical script that sketches a batch text processor: it counts words and characters per file and prints a summary report, with a nested progress bar showing both file-level and line-level progress. To keep the example self-contained, the file contents are simulated rather than read from disk:

# batch_processor.py
"""Process a directory of text files and generate a word count report."""
import os
import time
import random
from pathlib import Path
from collections import Counter
from tqdm import tqdm


def simulate_process_line(line: str) -> dict:
    """Simulate per-line processing (e.g. NLP tagging)."""
    time.sleep(0.002)
    words = line.split()
    return {"word_count": len(words), "char_count": len(line)}


def process_file(filepath: Path) -> dict:
    """Process a single file and return stats."""
    # Simulate file with random lines
    lines = [f"This is line {i} of the file with some content." for i in range(random.randint(20, 60))]
    total_words = 0
    total_chars = 0

    for line in tqdm(lines, desc=f"  {filepath.name}", leave=False, unit="line"):
        result = simulate_process_line(line)
        total_words += result["word_count"]
        total_chars += result["char_count"]

    return {
        "file": filepath.name,
        "lines": len(lines),
        "words": total_words,
        "chars": total_chars,
    }


def main():
    # Simulate a list of files
    files = [Path(f"document_{i:02d}.txt") for i in range(8)]
    results = []

    print(f"Processing {len(files)} files...\n")

    for filepath in tqdm(files, desc="Files", unit="file"):
        stats = process_file(filepath)
        results.append(stats)

    # Print summary report
    print("\n=== Report ===")
    total_words = sum(r["words"] for r in results)
    for r in results:
        print(f"{r['file']}: {r['lines']} lines, {r['words']} words")
    print(f"\nTotal words across all files: {total_words}")


if __name__ == "__main__":
    main()

Output (final state; line counts are random, so your numbers will differ):

Processing 8 files...

Files: 100%|####################| 8/8 [00:00<00:00, 11.6file/s]

=== Report ===
document_00.txt: 45 lines, 450 words
document_01.txt: 33 lines, 330 words
...

Total words across all files: 3210

This pattern -- an outer bar for files, an inner bar with leave=False for per-file work -- scales well to thousands of files. The inner bar gives real-time feedback on large files while the outer bar tracks overall progress. Extend this by replacing simulate_process_line() with your actual NLP, parsing, or transformation logic.

Frequently Asked Questions

Why isn't the progress bar showing in my Jupyter notebook?

In Jupyter, use from tqdm.notebook import tqdm instead of from tqdm import tqdm. The notebook version renders an HTML progress bar widget instead of a text bar, which handles Jupyter's output buffering correctly. The API is identical -- you can use the same tqdm(iterable, desc=...) pattern. The tqdm.auto import automatically chooses the right version based on the environment: from tqdm.auto import tqdm works in both terminal scripts and notebooks.

Does tqdm work with generators?

Yes, but without a total parameter the bar can't show a percentage -- it shows a counter and throughput only. If you know the total ahead of time, pass it explicitly: tqdm(my_generator, total=1000). If you genuinely don't know the total, tqdm will still show iteration count and speed, which is much more informative than a blank terminal. For generators from database queries, consider getting the count first with a COUNT(*) query and passing it as total.

How do I disable tqdm in production or tests?

Pass disable=True to any tqdm call, or use an environment variable check: tqdm(iterable, disable=os.getenv("CI") == "true"). This pattern lets CI pipelines run without progress bar output cluttering the logs, while development runs still get the full bar. To disable the bar automatically when output is piped or redirected, pass disable=None, which tqdm documents as “disable on non-TTY.” If you write the check yourself, test sys.stderr.isatty() rather than sys.stdout, because tqdm writes to stderr by default.
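If you want both checks in one place, a small helper can compute the flag. This is a sketch using only the standard library; should_disable_progress and its parameters are hypothetical, and the CI variable name follows the convention used in the answer above.

```python
import os
import sys

def should_disable_progress(stream=None, env=None) -> bool:
    """Return True when a progress bar would only add noise."""
    stream = sys.stderr if stream is None else stream
    env = os.environ if env is None else env
    if env.get("CI", "").lower() == "true":
        return True
    # Non-interactive streams (pipes, files) cannot redraw a bar in place
    return not stream.isatty()

# Intended use (requires tqdm): tqdm(items, disable=should_disable_progress())
print(should_disable_progress(env={"CI": "true"}))
```

Accepting stream and env as parameters keeps the helper easy to unit test without touching the real environment.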

Does tqdm work with multiprocessing?

Yes, but you need to use tqdm.contrib.concurrent or manage the bar manually. For simple parallel work, from tqdm.contrib.concurrent import process_map gives you a map() with a progress bar; it is built on concurrent.futures.ProcessPoolExecutor, so it covers the common multiprocessing.Pool.map use case without being an exact drop-in. For more complex cases, use a tqdm instance with a multiprocessing.Queue to collect updates from worker processes and call bar.update() from the main process.

Can I print to stdout while tqdm is running?

Use tqdm.write("message") instead of print() inside a tqdm loop. Regular print() interferes with the bar's in-place update mechanism and leaves garbled output. tqdm.write() temporarily clears the bar, prints the message, and re-draws the bar below it -- so both your messages and the progress bar display cleanly.

Conclusion

tqdm turns a blank waiting terminal into an informative, reassuring feedback loop. We covered wrapping iterables with tqdm(), showing live metrics with set_postfix(), using tqdm.pandas() for DataFrame operations, building nested bars with leave=False, and manually controlling a bar with bar.update(n). These patterns cover the large majority of the use cases you'll encounter in real scripts.

The best next step is to go through your existing scripts and add tqdm to any loop that takes more than a second. The one-line wrapper -- for item in tqdm(items): -- is all you need to start. From there, try tqdm.auto if you work across notebook and terminal environments, and explore tqdm.contrib.concurrent when you add multiprocessing to your pipelines.

For the full API reference, see the official tqdm documentation.

How To Use Python Ruff for Fast Linting and Formatting

Intermediate

You’ve probably been there: a CI pipeline fails because of a missing blank line, a wrong import order, or an unused variable that snuck in during a late-night refactor. You run flake8, then black, then isort — three separate tools, three separate configs, three separate slow passes over your codebase. It works, but it’s a friction tax you pay on every commit. Ruff eliminates that friction entirely by replacing all three (and more) in a single, shockingly fast tool.

Ruff is an open-source Python linter and formatter written in Rust. It’s compatible with the rule sets from flake8, isort, pyupgrade, and dozens of other tools — and it runs 10-100x faster than any of them. You install it with pip install ruff and it’s ready to use immediately, no complex setup required. It works in your editor, in CI, and as a pre-commit hook.

In this article we’ll cover how to install Ruff, run it for linting and formatting, configure it via pyproject.toml, integrate it into pre-commit hooks, and understand the most common rule codes. By the end you’ll have a complete code quality workflow that runs in milliseconds instead of seconds.

Ruff Quick Example

The fastest way to see what Ruff does is to run it on a messy Python file and watch it clean up. Here’s a small script with intentional problems — unused imports, inconsistent quotes, and a line that’s too long:

# messy_script.py
import os
import sys
import json  # unused

def greet(name):
    message = "Hello, " + name + "! Welcome to the program. This line is intentionally very long and exceeds the 88-character limit set by most formatters."
    print(message)

greet('Alice')

Run Ruff’s linter on it:

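# In your terminal
# E501 (line too long) is not in Ruff's default rule set, so select E and F explicitly
ruff check --select E,F messy_script.py

Output:

messy_script.py:2:8: F401 [*] `os` imported but unused
messy_script.py:3:8: F401 [*] `sys` imported but unused
messy_script.py:4:8: F401 [*] `json` imported but unused
messy_script.py:7:89: E501 Line too long (156 > 88 characters)
Found 4 errors.
[*] 3 fixable with the `--fix` option.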

Now run the formatter:

ruff format messy_script.py

Output:

1 file reformatted

Ruff flagged the unused imports and the long line, and the formatter rewrote the file with consistent double quotes, all in under 100 milliseconds. The [*] markers in the linter output mean the issue can be auto-fixed with --fix. We’ll dig into each of these capabilities in detail below.

What Is Ruff and Why Use It?

Ruff is a Python static analysis tool that combines linting and formatting into one binary. It was created by Charlie Marsh and is maintained by Astral (the same team behind uv). The key insight behind Ruff is that Python’s existing linting ecosystem — flake8, pylint, isort, pyupgrade, pydocstyle — grew organically over decades as separate tools. Ruff re-implements all of them in Rust, which gives it two major advantages: speed and simplicity.

Here’s how Ruff compares to the traditional Python linting stack:

Tool | Function | Speed | Config file | Ruff equivalent
flake8 | Linting (PEP 8, logic) | Slow | .flake8 | ruff check
black | Formatting | Medium | pyproject.toml | ruff format
isort | Import sorting | Slow | .isort.cfg | ruff check --select I
pyupgrade | Modernize syntax | Slow | N/A | ruff check --select UP
pydocstyle | Docstring style | Slow | setup.cfg | ruff check --select D

The most important thing to understand is that Ruff’s rule set is organized by prefix codes. F rules come from pyflakes (logic errors), E/W from pycodestyle (style), I from isort (imports), UP from pyupgrade (modern syntax). Once you know the prefix system, you can pick exactly which rules your project enforces.
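
As a small illustration of the prefix system, the sketch below maps a rule code to the tool its prefix comes from. The table contains only the prefixes named in this article, and the helper name rule_family is hypothetical.

```python
# Prefix -> originating tool, limited to the families named in this article.
RULE_FAMILIES = {
    "F": "pyflakes",
    "E": "pycodestyle",
    "W": "pycodestyle",
    "I": "isort",
    "UP": "pyupgrade",
    "B": "flake8-bugbear",
    "D": "pydocstyle",
}


def rule_family(code: str) -> str:
    """Map a rule code such as 'UP007' to the tool its prefix comes from."""
    prefix = code.rstrip("0123456789")  # drop the numeric part: 'UP007' -> 'UP'
    return RULE_FAMILIES.get(prefix, "unknown")


for code in ("F401", "E501", "I001", "UP007", "B006"):
    print(code, "->", rule_family(code))
```

The same prefix strings are what you pass to --select and --ignore.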

Installing Ruff

Ruff can be installed several ways depending on your workflow. The simplest approach is pip, but using uv is even faster if you already have it.

Install with pip

Install Ruff into your current Python environment or virtual environment:

# install_ruff.sh
pip install ruff

Output:

Successfully installed ruff-0.4.0

Install with uv (recommended)

If you’re using uv for project management, add Ruff as a development dependency:

# install_ruff_uv.sh
uv add --dev ruff

Output:

Resolved 1 package in 0.42s
Added ruff==0.4.0

After installing, verify it works by checking the version. Ruff moves fast — new versions ship frequently with rule improvements and bug fixes:

# verify_ruff.sh
ruff --version

Output:

ruff 0.4.0

Linting with ruff check

The ruff check command scans your code for problems. By default it runs a curated set of rules from pyflakes and pycodestyle that catches the most impactful issues without overwhelming you with warnings.

Running a Basic Check

Point ruff check at a file or directory. For most projects you’ll run it against the current directory:

# run_check.sh
# Check a single file
ruff check mymodule.py

# Check the entire project
ruff check .

# Check and auto-fix fixable issues
ruff check --fix .

Example output for a project with issues:

src/utils.py:5:1: F401 [*] `pathlib.Path` imported but unused
src/utils.py:22:89: E501 Line too long (92 > 88 characters)
src/api.py:14:5: F841 Local variable `response` is assigned to but never used
src/api.py:31:1: I001 [*] Import block is unsorted
Found 4 errors.
[*] 2 fixable with the `--fix` option.

The output format is file:line:col: CODE message. The [*] marker means Ruff can fix it automatically. Running ruff check --fix . will remove the unused import, sort the imports, and leave the non-fixable issues (like the long line and the assigned-but-unused variable) for you to address manually.
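
To make the file:line:col: CODE message layout concrete, here is a minimal sketch that parses lines of that shape with a regular expression. The Diagnostic type and parse_line helper are hypothetical names, and the pattern assumes exactly the layout shown above. For real tooling, Ruff's JSON output (used later in this article) is the sturdier choice.

```python
import re
from typing import NamedTuple, Optional


class Diagnostic(NamedTuple):
    path: str
    line: int
    col: int
    code: str
    fixable: bool
    message: str


# file:line:col: CODE [*] message  (the "[*] " marker is optional)
LINE_RE = re.compile(
    r"^(?P<path>.+?):(?P<line>\d+):(?P<col>\d+): "
    r"(?P<code>[A-Z]+\d+) (?P<fix>\[\*\] )?(?P<message>.*)$"
)


def parse_line(text: str) -> Optional[Diagnostic]:
    """Parse one diagnostic line; return None for summary or blank lines."""
    m = LINE_RE.match(text.strip())
    if m is None:
        return None
    return Diagnostic(
        path=m["path"],
        line=int(m["line"]),
        col=int(m["col"]),
        code=m["code"],
        fixable=m["fix"] is not None,
        message=m["message"],
    )


sample = """\
src/utils.py:5:1: F401 [*] `pathlib.Path` imported but unused
src/api.py:14:5: F841 Local variable `response` is assigned to but never used
Found 2 errors.
"""
for raw in sample.splitlines():
    diag = parse_line(raw)
    if diag is not None:
        print(diag.code, diag.path, diag.line, "fixable" if diag.fixable else "manual")
```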

Selecting Specific Rule Sets

You can extend or restrict which rules Ruff applies using --select and --ignore. This is where the prefix system becomes powerful:

# rule_selection.sh
# Run only import-related rules (isort equivalent)
ruff check --select I .

# Run pyflakes + import rules
ruff check --select F,I .

# Run all rules but ignore line-length
ruff check --select ALL --ignore E501 .

Output for import-only check:

src/models.py:3:1: I001 [*] Import block is unsorted
Found 1 error.
[*] 1 fixable with the `--fix` option.

The --select ALL flag enables every rule Ruff knows about — this is very strict and usually requires several --ignore flags to tune. Most teams start with a reasonable subset and expand from there.

Formatting with ruff format

The ruff format command is Ruff’s black-compatible formatter. It enforces consistent style: double quotes, trailing commas, line length, and whitespace. It was designed to be a drop-in replacement for black — which means if your team already uses black, switching to ruff format produces identical (or near-identical) output.

Formatting Files

Run the formatter on a file or directory. Use --check in CI to verify formatting without modifying files:

# format_files.sh
# Format all Python files in the project
ruff format .

# Check if files need formatting (exit code 1 if they do)
ruff format --check .

# Preview what would change without writing
ruff format --diff .

Output of ruff format --diff mymodule.py:

--- mymodule.py
+++ mymodule.py
@@ -1,3 +1,3 @@
 def greet(name):
-    message = 'Hello, ' + name
+    message = "Hello, " + name
     return message

The diff shows exactly what would change — single quotes converted to double, consistent with black’s rules. The --check flag is what you use in CI: it returns exit code 0 if everything is formatted correctly, or exit code 1 if any file needs changes. Your CI pipeline can fail fast on unformatted code without needing to know anything else about the project.

Configuring Ruff

Ruff reads configuration from pyproject.toml, ruff.toml, or .ruff.toml. The pyproject.toml approach is most common because it keeps all your project config in one file alongside your build system, pytest settings, and mypy config.

pyproject.toml Configuration

Here is a complete, production-ready Ruff configuration that covers most projects. Each section is commented to explain the intent:

# pyproject.toml
[tool.ruff]
# Target Python version -- affects which syntax is valid
target-version = "py311"

# Maximum line length (88 matches black's default)
line-length = 88

[tool.ruff.lint]
# Enable pyflakes (F), pycodestyle errors (E), isort (I),
# pyupgrade (UP), and flake8-bugbear (B)
select = ["E", "F", "I", "UP", "B"]

# Ignore line-too-long (E501) since the formatter handles it.
# A001/A002 (shadowing builtins) are too noisy; listing them only matters
# if the flake8-builtins (A) rules are enabled later
ignore = ["E501", "A001", "A002"]

# Allow auto-fix for all enabled rules
fixable = ["ALL"]

# Don't fix these -- they need human review
unfixable = ["F841"]  # unused variables -- delete or use them

[tool.ruff.lint.isort]
# Group imports: stdlib, third-party, first-party
known-first-party = ["mypackage"]

[tool.ruff.format]
# Use double quotes everywhere (black-compatible)
quote-style = "double"

# Use spaces, not tabs
indent-style = "space"

With this in pyproject.toml, running ruff check . and ruff format . from the project root will use these settings automatically — no flags needed. The select list here enables five rule families: pyflakes catches logic errors, pycodestyle catches style violations, isort keeps imports sorted, pyupgrade modernizes syntax (like Union[X, Y] to X | Y), and bugbear catches subtle bugs that flake8 misses.

Pre-Commit Hook Integration

The most effective way to use Ruff is as a pre-commit hook. This runs Ruff automatically before every commit, catching issues before they enter version control. You’ll never see a CI failure for a missing import sort again.

Setting Up the Hook

First install the pre-commit framework if you haven’t already, then add Ruff to your .pre-commit-config.yaml:

# setup_precommit.sh
pip install pre-commit
pre-commit install

# .pre-commit-config.yaml
repos:
  - repo: https://github.com/astral-sh/ruff-pre-commit
    rev: v0.4.0
    hooks:
      # Run the linter and auto-fix fixable issues
      - id: ruff
        args: [--fix]
      # Run the formatter
      - id: ruff-format

Now every time you run git commit, Ruff will lint and format your staged files first. If Ruff makes any changes (like sorting imports), the commit is blocked and you see what changed. You add the changed files and commit again — a two-second feedback loop that trains good habits faster than any code review comment.

# test_precommit.sh (simulated output when a commit has issues)
git commit -m "add new feature"

Output:

ruff.....................................................................Failed
- hook id: ruff
- exit code: 1
- files were modified by this hook

src/newfeature.py

Ruff modified the file (fixed the imports). Stage the change and commit again. This takes under a second.

Understanding Common Rule Codes

Once Ruff starts flagging issues, you’ll want to understand what the codes mean so you can decide whether to fix or ignore them. Here are the most common ones you’ll encounter:

Code | Category | Meaning | Auto-fixable?
F401 | pyflakes | Imported but unused module | Yes
F841 | pyflakes | Local variable assigned but never used | No
E501 | pycodestyle | Line too long | No (formatter handles it)
E711 | pycodestyle | Comparison to None (use is None) | Yes
I001 | isort | Import block is unsorted | Yes
UP007 | pyupgrade | Use X | Y instead of Union[X, Y] | Yes
B006 | bugbear | Mutable default argument (dangerous pattern) | No
B007 | bugbear | Loop control variable not used in loop body | Yes

The B006 rule is worth highlighting — mutable default arguments are one of Python’s most common gotchas. A function like def add_item(lst=[]): shares the list across all calls, which leads to baffling bugs. Ruff flags it but won’t auto-fix it because the correct fix depends on your intent.
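
Here is a short demonstration of the bug B006 guards against, followed by one common fix (a None sentinel). The function names are illustrative, and the sentinel is only one possible fix: the right one depends on what the function is meant to do.

```python
def add_item_buggy(item, lst=[]):  # B006: this default list is created only once
    lst.append(item)
    return lst


def add_item(item, lst=None):
    """One common fix: use None as the sentinel and build a fresh list per call."""
    if lst is None:
        lst = []
    lst.append(item)
    return lst


print(add_item_buggy("a"))  # ['a']
print(add_item_buggy("b"))  # ['a', 'b']  <- state leaked from the first call
print(add_item("a"))        # ['a']
print(add_item("b"))        # ['b']
```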

Real-Life Example: Full Project Lint and Format Workflow

Let’s put this all together with a realistic workflow for a small Python project. We have a package with a few modules that haven’t been linted in a while and need a cleanup pass before merging to main.

# project_cleanup.py
"""Simulate a realistic Ruff cleanup workflow using subprocess."""
import json
import subprocess
import sys


def run_ruff_check(project_path: str) -> dict:
    """Run ruff check and return structured results."""
    result = subprocess.run(
        ["ruff", "check", "--output-format=json", project_path],
        capture_output=True,
        text=True,
    )
    issues = []
    if result.stdout.strip():
        issues = json.loads(result.stdout)
    return {
        "exit_code": result.returncode,
        "issue_count": len(issues),
        "issues": issues[:5],  # first 5 for display
    }


def run_ruff_format_check(project_path: str) -> dict:
    """Check if files need formatting (no changes made)."""
    result = subprocess.run(
        ["ruff", "format", "--check", project_path],
        capture_output=True,
        text=True,
    )
    return {
        "needs_formatting": result.returncode != 0,
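        # Ruff's summary may arrive on stdout or stderr depending on version; check both
        "message": (result.stdout or result.stderr).strip()
        or "All files formatted correctly.",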
    }


def run_ruff_fix(project_path: str) -> str:
    """Auto-fix all fixable issues and format."""
    subprocess.run(["ruff", "check", "--fix", project_path], capture_output=True)
    subprocess.run(["ruff", "format", project_path], capture_output=True)
    return "Auto-fix and format complete."


def main():
    path = sys.argv[1] if len(sys.argv) > 1 else "."
    print(f"Checking project: {path}")
    print()

    fmt = run_ruff_format_check(path)
    print(f"Formatting: {'NEEDS CHANGES' if fmt['needs_formatting'] else 'OK'}")
    print(f"  {fmt['message']}")
    print()

    lint = run_ruff_check(path)
    print(f"Linting: {lint['issue_count']} issue(s) found")
    for issue in lint["issues"]:
        loc = f"{issue['filename']}:{issue['location']['row']}"
        print(f"  {loc}: {issue['code']} {issue['message']}")

    if lint["issue_count"] > 0 or fmt["needs_formatting"]:
        print()
        print(run_ruff_fix(path))


if __name__ == "__main__":
    main()

Output (example run on a messy project):

Checking project: src/

Formatting: NEEDS CHANGES
  1 file would be reformatted

Linting: 3 issue(s) found
  src/utils.py:4: F401 `json` imported but unused
  src/api.py:17: I001 Import block is unsorted
  src/api.py:44: B006 Do not use mutable data structures for argument defaults

Auto-fix and format complete.

This script wraps Ruff in a structured workflow you can call from other tools — a Makefile target, a CI step, or a custom dev script. The --output-format=json flag makes the output machine-readable so you can parse, filter, or report on it programmatically. You could extend this to only fail on specific rule categories, or to post a summary to Slack when the lint score degrades.
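
As a sketch of the "only fail on specific rule categories" extension, the function below filters Ruff's JSON output by rule prefix. The choice of blocking families and the helper name are assumptions for illustration, and the sample data is hand-written to match the fields the script above reads.

```python
import json

# Assumption for illustration: only pyflakes (F) and bugbear (B) findings block a merge.
BLOCKING_FAMILIES = {"F", "B"}


def blocking_issues(ruff_json: str, families=BLOCKING_FAMILIES) -> list:
    """Keep the issues whose rule prefix (code minus its digits) is in `families`."""
    issues = json.loads(ruff_json) if ruff_json.strip() else []
    kept = []
    for issue in issues:
        code = issue.get("code") or ""  # tolerate entries without a rule code
        if code.rstrip("0123456789") in families:
            kept.append(issue)
    return kept


# Hand-written sample, not real Ruff output.
sample = json.dumps([
    {"code": "F401", "filename": "src/utils.py", "location": {"row": 4},
     "message": "`json` imported but unused"},
    {"code": "I001", "filename": "src/api.py", "location": {"row": 17},
     "message": "Import block is unsorted"},
    {"code": "B006", "filename": "src/api.py", "location": {"row": 44},
     "message": "Do not use mutable data structures for argument defaults"},
])

found = blocking_issues(sample)
exit_code = 1 if found else 0
print(len(found), exit_code)  # 2 1
```

Comparing the whole alphabetic prefix, rather than using startswith, keeps "F" from accidentally matching other families whose codes also begin with that letter.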

Frequently Asked Questions

Can Ruff fully replace black?

Yes, for almost all projects. ruff format was designed to be black-compatible and produces identical output for the vast majority of code. The Ruff team maintains a compatibility document listing the small number of edge-case differences. If you’re starting a new project, use ruff format from day one. If you’re migrating an existing project that uses black, run both tools on your codebase and diff the output to verify there are no surprises before removing black from your pipeline.

How much faster is Ruff than flake8?

The commonly cited figure is 10-100x faster, and in practice you’ll feel it. On a codebase of 50,000 lines, flake8 might take 8-10 seconds; Ruff takes under 200 milliseconds. For pre-commit hooks this makes the difference between a hook that feels instant and one that makes you regret enabling it. The speed comes from Ruff being written in Rust and processing all files in parallel across CPU cores.

How do I ignore a specific line?

Add a # noqa: CODE comment at the end of the line. For example, to ignore an unused import on line 5: import json # noqa: F401. You can also use # noqa (no code) to suppress all rules on that line, but specifying the code is better practice because it documents why you’re suppressing and prevents accidentally hiding unrelated new issues.

Can I ignore rules for specific files?

Yes. Use [tool.ruff.lint.per-file-ignores] in pyproject.toml. A common pattern is to ignore F401 (unused imports) in __init__.py files, since those files often import symbols to make them available in the package namespace: "__init__.py" = ["F401"]. You can also ignore S (security) rules in test files where you’d legitimately use assert statements and hardcoded passwords in fixtures.

How do I use Ruff in VS Code?

Install the official Ruff extension from the VS Code marketplace (search for “Ruff” by Astral). Once installed, it shows lint violations inline as you type and can auto-fix on save. Add these settings to your settings.json: "editor.formatOnSave": true and "[python]": { "editor.defaultFormatter": "charliermarsh.ruff" }. This replaces separate linter and formatter extensions, such as those for flake8 and black, with a single, faster tool.

Conclusion

Ruff replaces your entire Python linting and formatting stack — flake8, black, isort, pyupgrade — with a single tool that runs in milliseconds. We covered installation with pip and uv, running ruff check for linting with --fix for auto-remediation, ruff format for black-compatible formatting, and configuring everything in pyproject.toml. The pre-commit hook integration is the most impactful single change you can make to your dev workflow today.

The natural next step is to add Ruff to your CI pipeline alongside the pre-commit hook. In GitHub Actions, adding ruff check . && ruff format --check . to your test job costs almost nothing in CI time and prevents style and logic issues from reaching main. From there, experiment with stricter rule sets — enable the B (bugbear) rules and see what they catch in your codebase. You might be surprised.

For the full rule reference, see the official Ruff rules documentation. For migration guides from flake8 and black, check the migration guide.

How To Build and Publish a Python Package to PyPI

Intermediate

You’ve written a Python utility that you keep copying between projects — a helper library, a CLI tool, a set of data processing functions — and you realize it’s time to stop copying files and start sharing properly. Publishing to PyPI (the Python Package Index) turns your code into something anyone can install with pip install your-package. It sounds intimidating, but the modern packaging workflow with pyproject.toml and the build + twine tools has reduced it to about ten minutes of setup.

PyPI is the official package repository at pypi.org. When you run pip install requests, pip downloads from there. TestPyPI at test.pypi.org is a separate sandbox for practice publishing — you can publish there first to verify everything works without affecting the real registry. You’ll need Python 3.7+, pip install build twine, and free accounts on both PyPI and TestPyPI.

In this tutorial you’ll learn the modern packaging structure using pyproject.toml, how to write a proper pyproject.toml with all required metadata, how to build source and wheel distributions, how to publish to TestPyPI and then PyPI, how to handle versioning, and how to add entry points for CLI commands. By the end you’ll have a complete, installable Python package.

Publishing a Python Package: Quick Example

Here’s the minimum structure and commands needed to go from code to pip install:

# Project structure:
# my_package/
#   pyproject.toml
#   README.md
#   src/
#     my_package/
#       __init__.py
#       core.py

# pyproject.toml (minimum required)
[build-system]
requires = ["setuptools>=61.0"]
build-backend = "setuptools.build_meta"

[project]
name = "my-package"
version = "0.1.0"
description = "A short description of what it does"
readme = "README.md"
requires-python = ">=3.8"
license = {text = "MIT"}

# Build and publish:
# pip install build twine
# python -m build
# twine upload dist/*

After running these commands:

$ python -m build
* Creating venv isolated environment...
* Installing packages in isolated environment... (setuptools)
* Getting build dependencies for sdist...
* Building sdist...
* Building wheel from sdist
Successfully built my_package-0.1.0.tar.gz and my_package-0.1.0-py3-none-any.whl

Running python -m build creates two files in the dist/ directory: a source distribution (.tar.gz) and a wheel (.whl). The wheel is what pip installs by default — it’s a pre-built zip archive. twine upload dist/* uploads both to PyPI. The sections below walk through the complete workflow with a real, working example package.

Modern Package Structure

The “src layout” is the current best practice for Python packages. It places your actual package code inside a src/ directory, which prevents common import-order bugs where Python imports your local code instead of the installed package during testing.

File/Directory | Purpose | Required?
pyproject.toml | All project metadata and build config | Yes
README.md | Project description (shown on PyPI) | Strongly recommended
LICENSE | License text (MIT, Apache, etc.) | Recommended
src/mypackage/__init__.py | Makes the directory a package | Yes
src/mypackage/module.py | Your actual code | Yes
tests/ | Test files (NOT inside src/) | Recommended
.gitignore | Excludes dist/, *.egg-info, __pycache__ | Recommended

The name in PyPI (name = "my-package") is the installable name (pip install my-package). The import name (import mypackage) is the directory name inside src/. These can differ — for example, the Pillow package is installed as pip install Pillow but imported as from PIL import Image.

Writing pyproject.toml

The pyproject.toml file contains all project configuration in one place. Here’s a comprehensive example showing all the important fields:

# pyproject.toml -- complete example
[build-system]
requires = ["setuptools>=61.0", "wheel"]
build-backend = "setuptools.build_meta"

[project]
name = "textclean"
version = "1.0.0"
description = "A utility library for cleaning and normalizing text data"
readme = "README.md"
license = {file = "LICENSE"}
authors = [
    {name = "Jane Developer", email = "jane@example.com"}
]
keywords = ["text", "nlp", "cleaning", "normalization"]
classifiers = [
    "Development Status :: 5 - Production/Stable",
    "Intended Audience :: Developers",
    "License :: OSI Approved :: MIT License",
    "Programming Language :: Python :: 3",
    "Programming Language :: Python :: 3.8",
    "Programming Language :: Python :: 3.9",
    "Programming Language :: Python :: 3.10",
    "Programming Language :: Python :: 3.11",
    "Topic :: Text Processing :: Linguistic",
]
requires-python = ">=3.8"

# Runtime dependencies
dependencies = [
    "regex>=2023.0",
]

# Optional dependency groups
[project.optional-dependencies]
dev = ["pytest>=7.0", "black", "ruff"]
docs = ["sphinx>=6.0", "furo"]

# CLI entry points: "textclean" command -> textclean.cli:main function
[project.scripts]
textclean = "textclean.cli:main"

# Project URLs shown on PyPI sidebar
[project.urls]
Homepage = "https://github.com/janedeveloper/textclean"
Documentation = "https://textclean.readthedocs.io"
Issues = "https://github.com/janedeveloper/textclean/issues"
Changelog = "https://github.com/janedeveloper/textclean/CHANGELOG.md"

# Tell setuptools where the packages are
[tool.setuptools.packages.find]
where = ["src"]

The classifiers field is important — PyPI uses them to categorize and filter packages, and they appear as browseable tags on your package page. The full list is at pypi.org/classifiers. The [project.scripts] section creates a textclean command-line tool after installation, calling the main() function in src/textclean/cli.py.
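
To show what a [project.scripts] value such as "textclean.cli:main" means, here is a minimal sketch that resolves a "module:function" string to a callable. This is roughly what the generated launcher does before calling the function. The helper name resolve_entry_point is hypothetical, and the demo uses a standard-library target so it runs without textclean installed.

```python
import importlib


def resolve_entry_point(spec: str):
    """Resolve a 'package.module:function' string to the object it names."""
    module_name, _, attr_path = spec.partition(":")
    obj = importlib.import_module(module_name)
    for part in attr_path.split("."):  # allow dotted attributes after the colon
        obj = getattr(obj, part)
    return obj


# With the package installed you would pass "textclean.cli:main" instead.
dumps = resolve_entry_point("json:dumps")
print(dumps({"ok": True}))  # {"ok": true}
```

The part before the colon is an import path and the part after it is an attribute lookup, which is why main must be importable from textclean/cli.py.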

Building the Distributions

Before publishing, you need to build the distribution files. Let’s create a real working package to demonstrate:

# Create the complete package structure
mkdir -p textclean/src/textclean
mkdir -p textclean/tests

# src/textclean/__init__.py
cat > textclean/src/textclean/__init__.py << 'PYEOF'
"""textclean: Text cleaning and normalization utilities."""
__version__ = "1.0.0"
from .core import clean_text, remove_html, normalize_whitespace

__all__ = ["clean_text", "remove_html", "normalize_whitespace"]
PYEOF

# src/textclean/core.py
cat > textclean/src/textclean/core.py << 'PYEOF'
"""Core text cleaning functions."""
import re
import unicodedata

def remove_html(text: str) -> str:
    """Remove HTML tags from text."""
    return re.sub(r'<[^>]+>', '', text)

def normalize_whitespace(text: str) -> str:
    """Collapse multiple spaces and strip leading/trailing whitespace."""
    return re.sub(r'\s+', ' ', text).strip()

def clean_text(text: str, remove_html_tags: bool = True) -> str:
    """Apply standard cleaning pipeline to text."""
    if remove_html_tags:
        text = remove_html(text)
    text = normalize_whitespace(text)
    return text
PYEOF

# src/textclean/cli.py
cat > textclean/src/textclean/cli.py << 'PYEOF'
"""Command-line interface for textclean."""
import sys
from .core import clean_text

def main() -> None:
    """Read text from stdin, clean it, print to stdout."""
    if sys.stdin.isatty():
        print("Usage: echo 'text' | textclean", file=sys.stderr)
        sys.exit(1)
    text = sys.stdin.read()
    print(clean_text(text))
PYEOF

Then build:

# Install build tool
pip install build

# Build both sdist and wheel
cd textclean
python -m build

# Result:
# dist/
#   textclean-1.0.0.tar.gz            # source distribution
#   textclean-1.0.0-py3-none-any.whl  # wheel (installable)

The py3-none-any in the wheel filename means: Python 3, no ABI dependency (pure Python), any platform. If your package includes C extensions compiled with Cython or uses platform-specific code, the wheel name will include the Python version, ABI, and platform (e.g., cp311-cp311-linux_x86_64).
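
The tag layout can be read straight out of the filename. The sketch below splits a wheel filename into its parts; the WheelTags type and parse_wheel_filename helper are hypothetical names for illustration. For production use, the packaging library's packaging.utils.parse_wheel_filename is the more thorough option.

```python
from typing import NamedTuple


class WheelTags(NamedTuple):
    name: str
    version: str
    python_tag: str
    abi_tag: str
    platform_tag: str


def parse_wheel_filename(filename: str) -> WheelTags:
    """Split 'name-version-python-abi-platform.whl' into its five parts."""
    if not filename.endswith(".whl"):
        raise ValueError(f"not a wheel: {filename}")
    parts = filename[: -len(".whl")].split("-")
    if len(parts) == 6:  # an optional build tag sits between version and python tag
        del parts[2]
    if len(parts) != 5:
        raise ValueError(f"unexpected wheel filename: {filename}")
    return WheelTags(*parts)


pure = parse_wheel_filename("textclean-1.0.0-py3-none-any.whl")
print(pure.python_tag, pure.abi_tag, pure.platform_tag)  # py3 none any

native = parse_wheel_filename("textclean-1.0.0-cp311-cp311-linux_x86_64.whl")
print(native.python_tag, native.abi_tag, native.platform_tag)  # cp311 cp311 linux_x86_64
```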

Publishing to TestPyPI and PyPI

Always publish to TestPyPI first to verify your package uploads and installs correctly before pushing to production PyPI.

# Step 1: Create accounts
# TestPyPI: https://test.pypi.org/account/register/
# PyPI:     https://pypi.org/account/register/
# Both require email verification.

# Step 2: Create API tokens (recommended over passwords)
# TestPyPI: Account Settings -> API tokens -> Add API token
# PyPI:     Account Settings -> API tokens -> Add API token
# Scope: "Entire account" for a new package (no project scope yet)

# Step 3: Install twine
pip install twine

# Step 4: Check your distributions for common problems
twine check dist/*
# Expected output:
# Checking dist/textclean-1.0.0-py3-none-any.whl: PASSED
# Checking dist/textclean-1.0.0.tar.gz: PASSED

# Step 5: Upload to TestPyPI
twine upload --repository testpypi dist/*
# You'll be prompted for username and password
# Username: __token__
# Password: pypi-AgEIcHlwaS5...  (your TestPyPI API token)

# Step 6: Verify it installs from TestPyPI
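# --extra-index-url lets pip fetch dependencies (such as regex) that may not exist on TestPyPI
pip install --index-url https://test.pypi.org/simple/ --extra-index-url https://pypi.org/simple/ textclean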
python -c "from textclean import clean_text; print(clean_text('Hello  world'))"
# Output: Hello world

# Step 7: If all good, upload to real PyPI
twine upload dist/*
# Username: __token__
# Password: pypi-...  (your PyPI API token, different from TestPyPI)

# Step 8: Verify from real PyPI
pip install textclean

Store API tokens in ~/.pypirc so you don’t have to paste them on every upload:

# ~/.pypirc
[distutils]
index-servers =
    pypi
    testpypi

[pypi]
repository = https://upload.pypi.org/legacy/
username = __token__
# Replace the value below with your PyPI token
password = pypi-AgEI...

[testpypi]
repository = https://test.pypi.org/legacy/
username = __token__
# Replace the value below with your TestPyPI token
password = pypi-AgEI...

With .pypirc configured, twine upload dist/* uses the stored credentials automatically. On macOS and Linux, set chmod 600 ~/.pypirc to restrict read access to your user only.
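
If you would rather check the permissions from Python than remember the chmod, the following sketch does the equivalent with the standard library on POSIX systems. The helper names are hypothetical, and the demo runs against a throwaway file rather than a real ~/.pypirc.

```python
import stat
import tempfile
from pathlib import Path


def is_private(path: Path) -> bool:
    """True when group and others have no permissions on the file (e.g. mode 600)."""
    mode = stat.S_IMODE(path.stat().st_mode)
    return (mode & (stat.S_IRWXG | stat.S_IRWXO)) == 0


def make_private(path: Path) -> None:
    """Equivalent of `chmod 600`: owner read/write only."""
    path.chmod(stat.S_IRUSR | stat.S_IWUSR)


with tempfile.TemporaryDirectory() as tmp:
    rc = Path(tmp) / ".pypirc"
    rc.write_text("[pypi]\nusername = __token__\n")
    rc.chmod(0o644)        # readable by group and others
    print(is_private(rc))  # False
    make_private(rc)
    print(is_private(rc))  # True
```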

Real-Life Example: CLI Package with Version Bumping

A complete workflow for building, versioning, and publishing a CLI tool — demonstrating how real projects handle the release cycle.

# release.py -- automated release script
"""
Automates the release workflow:
1. Bumps version in pyproject.toml
2. Runs tests
3. Builds distribution
4. Uploads to PyPI
"""
import re
import subprocess
import sys
from pathlib import Path

def read_version(pyproject_path: Path) -> str:
    content = pyproject_path.read_text()
    match = re.search(r'^version\s*=\s*"([^"]+)"', content, re.MULTILINE)
    if not match:
        raise ValueError("version not found in pyproject.toml")
    return match.group(1)

def bump_version(version: str, bump_type: str) -> str:
    major, minor, patch = map(int, version.split("."))
    if bump_type == "major":
        return f"{major + 1}.0.0"
    elif bump_type == "minor":
        return f"{major}.{minor + 1}.0"
    elif bump_type == "patch":
        return f"{major}.{minor}.{patch + 1}"
    raise ValueError(f"Invalid bump_type: {bump_type}")

def update_version(pyproject_path: Path, new_version: str) -> None:
    content = pyproject_path.read_text()
    updated = re.sub(
        r'^(version\s*=\s*)"[^"]+"',
        f'\\1"{new_version}"',
        content,
        count=1,  # only touch the first match, the same one read_version() finds
        flags=re.MULTILINE
    )
    pyproject_path.write_text(updated)
    print(f"Updated version to {new_version}")

def run(cmd: list[str]) -> int:
    """Run a command and print output. Returns exit code."""
    print(f"$ {' '.join(cmd)}")
    result = subprocess.run(cmd)
    return result.returncode

def release(bump_type: str = "patch", test_only: bool = True) -> None:
    pyproject = Path("pyproject.toml")
    current = read_version(pyproject)
    new_ver = bump_version(current, bump_type)
    print(f"Releasing: {current} -> {new_ver}")

    # Bump version
    update_version(pyproject, new_ver)

    # Run tests
    if run(["python", "-m", "pytest", "tests/", "-q"]) != 0:
        print("Tests failed -- aborting release")
        update_version(pyproject, current)  # rollback
        sys.exit(1)

    # Clean previous builds
    import shutil
    for d in ["dist", "build"]:
        if Path(d).exists():
            shutil.rmtree(d)

    # Build
    if run(["python", "-m", "build"]) != 0:
        print("Build failed")
        sys.exit(1)

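    # Upload (to TestPyPI if test_only, real PyPI otherwise).
    # subprocess does not expand shell globs, so list the built files explicitly.
    repo = "testpypi" if test_only else "pypi"
    dist_files = [str(p) for p in sorted(Path("dist").iterdir())]
    if run(["twine", "upload", f"--repository={repo}", *dist_files]) != 0:
        print("Upload failed")
        sys.exit(1)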

    print(f"\nReleased {new_ver} to {'TestPyPI' if test_only else 'PyPI'}")

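if __name__ == "__main__":
    # Treat "--prod" as a flag, not as the bump type
    positional = [arg for arg in sys.argv[1:] if not arg.startswith("--")]
    bump = positional[0] if positional else "patch"
    test = "--prod" not in sys.argv
    release(bump_type=bump, test_only=test)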

# Usage:
# python release.py patch       # patch release to TestPyPI
# python release.py minor       # minor release to TestPyPI
# python release.py patch --prod  # patch release to real PyPI

This release script encodes your entire release workflow. It prevents human error (forgetting to bump the version, forgetting to run tests) and makes releases reproducible. Add it to your project alongside a CHANGELOG.md and a GitHub Actions workflow that runs it automatically when you tag a release.

Frequently Asked Questions

Should I still use setup.py?

setup.py is the legacy approach from before pyproject.toml was standardized (PEP 517/518 for the build system, PEP 621 for project metadata). As of 2024, pyproject.toml is the recommended standard for all new packages. Running python setup.py install directly is deprecated by setuptools, and pip 23.1 removed its fallback to setup.py install. You can keep a minimal setup.py for backward compatibility with very old tools, but all configuration should live in pyproject.toml. If you have an existing package with setup.py, migrate it to pyproject.toml.

How should I version my package?

Follow Semantic Versioning (SemVer): MAJOR.MINOR.PATCH. Increment MAJOR for breaking changes (removing or changing an API), MINOR for new features that are backward-compatible, and PATCH for bug fixes. Start at 0.1.0 for initial development; the 0.x series signals that the API is not yet stable. Release 1.0.0 when the API stabilizes. Never reuse version numbers: once 1.0.0 is on PyPI, that number is permanent, and PyPI will not accept a re-upload of the same file even after the release is deleted. If a release is broken, yank or delete it and publish 1.0.1 instead.
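The bump rules above can be sketched in a few lines. This is a minimal illustration that assumes plain MAJOR.MINOR.PATCH strings with no pre-release or build suffixes; bump_version is a hypothetical helper name, not part of any packaging tool.

```python
# Hypothetical helper for illustration; assumes plain "MAJOR.MINOR.PATCH" input.
def bump_version(version: str, part: str) -> str:
    """Return the next version string for a major, minor, or patch bump."""
    major, minor, patch = (int(piece) for piece in version.split("."))
    if part == "major":
        return f"{major + 1}.0.0"        # breaking change: reset minor and patch
    if part == "minor":
        return f"{major}.{minor + 1}.0"  # new feature: reset patch
    if part == "patch":
        return f"{major}.{minor}.{patch + 1}"
    raise ValueError(f"Unknown bump type: {part!r}")

print(bump_version("0.1.0", "patch"))  # 0.1.1
print(bump_version("0.1.1", "minor"))  # 0.2.0
print(bump_version("0.2.0", "major"))  # 1.0.0
```

Note how a higher-level bump resets the lower components to zero, which is the part people most often get wrong when editing version strings by hand.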

Can I publish a private package?

Yes — several options exist: use a private PyPI server like devpi or Nexus, use GitHub Packages as a pip registry, or use AWS CodeArtifact. You can also install directly from a private Git repository: pip install git+https://github.com/org/private-repo.git@v1.0.0. For internal company packages, a private registry with authentication is the proper approach rather than publishing to public PyPI.

What is the difference between a wheel and an sdist?

An sdist (source distribution) is a .tar.gz containing your source code plus a PKG-INFO file. Installing from an sdist requires building, which means the user needs a C compiler for packages with C extensions. A wheel (.whl) is a pre-built zip archive that pip installs directly without building. Pure-Python packages produce a single wheel tagged py3-none-any that works everywhere. C-extension packages need multiple wheels for different Python versions and platforms — this is handled by the cibuildwheel tool in CI.

What if my package name is already taken on PyPI?

PyPI package names are globally unique and case-insensitive, and runs of hyphens, underscores, and periods are treated as equivalent. If your desired name is taken, check whether the existing package is actively maintained; if it's abandoned, you can request the name through PyPI's support process (PEP 541). Otherwise, choose a different name: add a prefix (your username, organization, or project namespace) or pick a more specific name. Search pypi.org to check availability before starting development; the pip search command no longer works against PyPI.
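To see which spellings collide, you can apply the name normalization rule from PEP 503 yourself. The sketch below uses the regular expression given in that PEP; normalize_name is just an illustrative function name.

```python
import re

def normalize_name(name: str) -> str:
    """Normalize a project name as described in PEP 503."""
    # Runs of "-", "_" and "." collapse to a single "-", then lowercase.
    return re.sub(r"[-_.]+", "-", name).lower()

print(normalize_name("My_Package"))  # my-package
print(normalize_name("my.package"))  # my-package
print(normalize_name("my-package") == normalize_name("MY__PACKAGE"))  # True
```

If two candidate names normalize to the same string, PyPI treats them as the same project name.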

Conclusion

Publishing a Python package to PyPI requires four things: a proper pyproject.toml with metadata, a well-structured src/ layout, the build tool to create distributions, and twine to upload them. Always test on TestPyPI first. Use API tokens instead of passwords. Follow SemVer for version numbers. The release script above is a complete automation of this workflow — adapt it to your project’s needs and wire it into your CI/CD pipeline.

Once your package is on PyPI, keep it healthy: respond to issues, bump dependencies, and maintain a changelog. A well-maintained package builds trust with users and contributors.

Official documentation: https://packaging.python.org/en/latest/tutorials/packaging-projects/

How To Use Python shutil for File Management and Archives

Beginner

Python’s os module handles file paths and directory listings, but when you need to actually copy a file, move a directory tree, delete an entire folder with its contents, or create a zip archive of a project — that’s shutil. Short for “shell utilities,” the shutil module provides high-level file operations that go beyond what os offers. It’s the Python equivalent of shell commands like cp -r, mv, rm -rf, and zip.

shutil is part of Python’s standard library — no installation needed. It works identically on Windows, macOS, and Linux, abstracting away the platform differences in file permissions, symlink handling, and archive formats. Combined with pathlib for paths, it covers virtually every file management task a Python script needs.

In this tutorial you’ll learn how to copy files and directories with shutil.copy(), shutil.copy2(), and shutil.copytree(), move files with shutil.move(), delete directories with shutil.rmtree(), create and extract archives with shutil.make_archive() and shutil.unpack_archive(), and check disk usage with shutil.disk_usage(). By the end you’ll have a complete file management toolkit using only the standard library.

Python shutil: Quick Example

Here’s shutil solving three common file tasks in a few lines:

# shutil_quick.py
import shutil
import os

# Create test files
os.makedirs("source_dir/subdir", exist_ok=True)
with open("source_dir/report.txt", "w") as f:
    f.write("Monthly report data\n")
with open("source_dir/subdir/data.csv", "w") as f:
    f.write("col1,col2\n1,2\n")

# 1. Copy a single file
shutil.copy("source_dir/report.txt", "report_backup.txt")
print("Copied: report_backup.txt exists:", os.path.exists("report_backup.txt"))

# 2. Copy entire directory tree
shutil.copytree("source_dir", "source_dir_backup")
print("Copied tree:", os.listdir("source_dir_backup"))

# 3. Create a zip archive
archive = shutil.make_archive("project_backup", "zip", "source_dir")
print("Archive created:", archive)
print("Archive size:", os.path.getsize(archive), "bytes")

# Cleanup
shutil.rmtree("source_dir")
shutil.rmtree("source_dir_backup")
os.remove("report_backup.txt")
os.remove("project_backup.zip")
print("Cleaned up all test files")

Output:

Copied: report_backup.txt exists: True
Copied tree: ['report.txt', 'subdir']
Archive created: /path/to/project_backup.zip
Archive size: 412 bytes
Cleaned up all test files

shutil.copy() copies the file content and permissions but not metadata (timestamps). shutil.copy2() also preserves the original file’s modification time — use it when preserving timestamps matters. shutil.copytree() recursively copies an entire directory. shutil.rmtree() deletes a directory and all its contents — there is no undo, so use it carefully.

What Is shutil and What Does It Cover?

The shutil module fills the gap between os (low-level OS interface) and shell scripts (file manipulation commands). It handles the most common file management operations in a cross-platform, Pythonic way.

Task | shutil function | Shell equivalent
Copy file | shutil.copy(src, dst) | cp src dst
Copy file + metadata | shutil.copy2(src, dst) | cp -p src dst
Copy directory | shutil.copytree(src, dst) | cp -r src dst
Move file/dir | shutil.move(src, dst) | mv src dst
Delete directory | shutil.rmtree(path) | rm -rf path
Create archive | shutil.make_archive(name, fmt, dir) | zip -r name.zip dir
Extract archive | shutil.unpack_archive(file, dst) | unzip file -d dst
Check disk space | shutil.disk_usage(path) | df path

The key difference from shell scripts: shutil is cross-platform. The same Python code runs correctly on Windows (which uses backslashes and has different permission semantics) and Unix (which uses forward slashes and POSIX permissions) without modification.

shutil.move() works across filesystems. os.rename() does not. Know the difference.

Copying Files: copy, copy2, and copyfile

There are several copy functions in shutil, each with different behavior around metadata and permissions. Choosing the right one prevents subtle bugs when file timestamps or permissions matter.

# shutil_copy.py
import shutil
import os
import stat
import time

# Create a source file with specific content
with open("original.txt", "w") as f:
    f.write("This is the original file content.\n" * 100)

# Wait to make the modification time clearly different
time.sleep(0.1)

# shutil.copy() -- copies content and permissions, NOT timestamps
shutil.copy("original.txt", "copy_basic.txt")

# shutil.copy2() -- copies content, permissions, AND timestamps
shutil.copy2("original.txt", "copy_with_meta.txt")

# shutil.copyfile() -- copies ONLY content, no permissions
shutil.copyfile("original.txt", "copy_content_only.txt")

# Compare timestamps
orig_mtime = os.path.getmtime("original.txt")
basic_mtime = os.path.getmtime("copy_basic.txt")
meta_mtime = os.path.getmtime("copy_with_meta.txt")

print(f"Original mtime:    {orig_mtime:.3f}")
print(f"copy() mtime:      {basic_mtime:.3f} (different -- new file)")
print(f"copy2() mtime:     {meta_mtime:.3f} (same as original: {abs(orig_mtime - meta_mtime) < 0.01})")

# Copy into a directory (destination is a dir, not a file)
os.makedirs("backup_dir", exist_ok=True)
result = shutil.copy2("original.txt", "backup_dir/")  # trailing slash = copy into dir
print(f"\nCopied to directory: {result}")

# Copy with permissions
os.chmod("original.txt", 0o644)
shutil.copy("original.txt", "copy_perms.txt")
print(f"Source perms: {oct(stat.S_IMODE(os.stat('original.txt').st_mode))}")
print(f"copy() perms: {oct(stat.S_IMODE(os.stat('copy_perms.txt').st_mode))}")

# Cleanup
for f in ["original.txt", "copy_basic.txt", "copy_with_meta.txt",
          "copy_content_only.txt", "copy_perms.txt"]:
    os.remove(f)
shutil.rmtree("backup_dir")

Output:

Original mtime:    1745456234.125
copy() mtime:      1745456234.235 (different -- new file)
copy2() mtime:     1745456234.125 (same as original: True)

Copied to directory: backup_dir/original.txt
Source perms: 0o644
copy() perms: 0o644

Use copy2() when you're making a backup and want to preserve when the file was last modified. Use copy() for general copying where timestamps don't matter. Use copyfile() when you only need the bytes and want to skip permission copying entirely. If the destination is a directory (with or without trailing slash), shutil places the file inside that directory using the original filename.

Copying and Deleting Directory Trees

shutil.copytree() recursively copies an entire directory structure. In Python 3.8+, the dirs_exist_ok=True parameter lets you copy into an existing destination directory instead of requiring it to not exist.

# shutil_trees.py
import shutil
import os
from pathlib import Path

# Create a source directory structure
src = Path("project_src")
(src / "src" / "utils").mkdir(parents=True, exist_ok=True)
(src / "tests").mkdir(exist_ok=True)
(src / "src" / "main.py").write_text("# main module\n")
(src / "src" / "utils" / "helpers.py").write_text("# helpers\n")
(src / "tests" / "test_main.py").write_text("# tests\n")
(src / "README.md").write_text("# Project\n")
(src / ".env").write_text("SECRET=abc123\n")  # should be excluded

# Basic copytree
shutil.copytree("project_src", "project_dst")
print("Full copy:")
for p in sorted(Path("project_dst").rglob("*")):
    print(f"  {p.relative_to('project_dst')}")

shutil.rmtree("project_dst")

# copytree with ignore pattern -- exclude .env files and __pycache__
ignore_patterns = shutil.ignore_patterns(".env", "__pycache__", "*.pyc")
shutil.copytree("project_src", "project_dst_clean", ignore=ignore_patterns)
print("\nCopy without .env:")
for p in sorted(Path("project_dst_clean").rglob("*")):
    print(f"  {p.relative_to('project_dst_clean')}")

shutil.rmtree("project_dst_clean")

# copytree with dirs_exist_ok (Python 3.8+)
os.makedirs("project_merge", exist_ok=True)
Path("project_merge/existing_file.txt").write_text("existing\n")  # pre-existing file survives the merge
shutil.copytree("project_src", "project_merge", dirs_exist_ok=True)
print("\nMerge into existing dir: OK")

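# rmtree with error handler (handle read-only files on Windows)
def handle_readonly(func, path, exc_info):
    """Remove read-only flag and retry on Windows."""
    import stat
    os.chmod(path, stat.S_IWRITE)
    func(path)

# Pass the handler via onerror (deprecated since Python 3.12 in favor of onexc,
# whose handler receives the exception instance instead of exc_info)
shutil.rmtree("project_src", onerror=handle_readonly)
shutil.rmtree("project_merge", onerror=handle_readonly)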
print("Cleanup done")

Output:

Full copy:
  .env
  README.md
  src
  src/main.py
  src/utils
  src/utils/helpers.py
  tests
  tests/test_main.py

Copy without .env:
  README.md
  src
  src/main.py
  src/utils
  src/utils/helpers.py
  tests
  tests/test_main.py

Merge into existing dir: OK
Cleanup done

shutil.ignore_patterns() returns a callable that you pass to copytree(ignore=...). It accepts glob patterns and is the standard way to exclude files like .env, .git, __pycache__, and *.pyc from copies. The handle_readonly error handler pattern is important on Windows, where files created by Git or certain tools are marked read-only and rmtree will fail without it.
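Because the ignore argument is just a callable, you can write your own when glob patterns are not enough. copytree() calls it with the directory being visited and the list of names inside it, and expects back the names to skip. The sketch below is one possible custom filter that skips files above a size limit; ignore_large_files and the 1,000-byte threshold are made up for illustration.

```python
import os
import shutil
import tempfile
from pathlib import Path

def ignore_large_files(max_bytes: int):
    """Build an ignore callable that skips files larger than max_bytes (illustrative)."""
    def _ignore(directory: str, names: list[str]) -> set[str]:
        skipped = set()
        for name in names:
            full_path = os.path.join(directory, name)
            if os.path.isfile(full_path) and os.path.getsize(full_path) > max_bytes:
                skipped.add(name)
        return skipped
    return _ignore

with tempfile.TemporaryDirectory() as tmp:
    src = Path(tmp) / "src"
    src.mkdir()
    (src / "small.txt").write_text("ok\n")
    (src / "big.bin").write_bytes(b"x" * 10_000)

    dst = Path(tmp) / "dst"
    shutil.copytree(src, dst, ignore=ignore_large_files(1_000))
    copied = sorted(p.name for p in dst.iterdir())

print(copied)  # ['small.txt']
```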

shutil.make_archive() handles zip, tar, gztar, bztar, xztar. Pick your poison.

Creating and Extracting Archives

shutil.make_archive() creates zip archives and tar archives (uncompressed or compressed with gzip, bzip2, or xz). shutil.unpack_archive() detects the format from the file extension and extracts any supported archive. Both work without needing to import zipfile or tarfile directly.

# shutil_archives.py
import shutil
import os
from pathlib import Path

# Create sample project to archive
project = Path("sample_project")
project.mkdir(exist_ok=True)
(project / "main.py").write_text("print('hello')\n")
(project / "data").mkdir(exist_ok=True)
(project / "data" / "config.json").write_text('{"version": "1.0"}\n')

# Supported formats
print("Supported archive formats:")
for fmt, description in shutil.get_archive_formats():
    print(f"  {fmt}: {description}")

# Create archives in different formats
formats = [
    ("project_backup", "zip"),
    ("project_backup", "gztar"),  # .tar.gz
]

for name, fmt in formats:
    archive_path = shutil.make_archive(
        base_name=name,        # archive file name (without extension)
        format=fmt,            # zip, tar, gztar, bztar, xztar
        root_dir=".",          # directory to change to before archiving
        base_dir="sample_project"  # directory to archive
    )
    size_kb = os.path.getsize(archive_path) / 1024
    print(f"Created {archive_path} ({size_kb:.1f} KB)")

# Extract an archive
os.makedirs("extracted", exist_ok=True)
shutil.unpack_archive("project_backup.zip", "extracted")
print("\nExtracted contents:")
for p in sorted(Path("extracted").rglob("*")):
    print(f"  {p.relative_to('extracted')}")

# Check disk usage
usage = shutil.disk_usage(".")
print(f"\nDisk usage:")
print(f"  Total: {usage.total / (1024**3):.1f} GB")
print(f"  Used:  {usage.used / (1024**3):.1f} GB")
print(f"  Free:  {usage.free / (1024**3):.1f} GB")

# Cleanup
shutil.rmtree("sample_project")
shutil.rmtree("extracted")
for ext in [".zip", ".tar.gz"]:
    path = f"project_backup{ext}"
    if os.path.exists(path):
        os.remove(path)

Output:

Supported archive formats:
  bztar: bzip2'ed tar-file
  gztar: gzip'ed tar-file
  tar: uncompressed tar file
  xztar: xz'ed tar-file
  zip: ZIP file

Created project_backup.zip (0.7 KB)
Created project_backup.tar.gz (0.3 KB)

Extracted contents:
  sample_project
  sample_project/data
  sample_project/data/config.json
  sample_project/main.py

Disk usage:
  Total: 465.8 GB
  Used:  112.4 GB
  Free:  353.4 GB

shutil.make_archive() handles all the compression and format details internally. The root_dir/base_dir split controls the archive's internal structure: root_dir is where the archiver "cd"s to before running, and base_dir is the directory to include. This means paths inside the archive will be relative (e.g., sample_project/main.py instead of an absolute path).
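To make the root_dir/base_dir distinction concrete, the following sketch builds the same folder into two zip files and lists the entry names with zipfile. The directory and archive names are arbitrary, and everything happens inside a temporary directory.

```python
import shutil
import tempfile
import zipfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    project = Path(tmp) / "sample_project"
    project.mkdir()
    (project / "main.py").write_text("print('hello')\n")

    # Variant 1: root_dir is the parent; base_dir names the folder to include,
    # so every entry keeps the "sample_project/" prefix.
    with_prefix = shutil.make_archive(
        str(Path(tmp) / "with_prefix"), "zip",
        root_dir=tmp, base_dir="sample_project",
    )

    # Variant 2: root_dir is the project itself and base_dir is left at its
    # default, so entries are stored without a leading folder.
    without_prefix = shutil.make_archive(
        str(Path(tmp) / "without_prefix"), "zip", root_dir=str(project),
    )

    with zipfile.ZipFile(with_prefix) as zf:
        prefixed_names = [n for n in zf.namelist() if not n.endswith("/")]
    with zipfile.ZipFile(without_prefix) as zf:
        flat_names = [n for n in zf.namelist() if not n.endswith("/")]

print(prefixed_names)  # ['sample_project/main.py']
print(flat_names)      # ['main.py']
```

The first variant is usually the friendlier choice for distribution, because extracting it creates a single top-level folder instead of scattering files into the current directory.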

Real-Life Example: Automated Backup Script

This backup utility creates a timestamped archive of a project directory, enforces retention (keeps only the last N backups), and reports disk usage.

Three lines of shutil replace fifty lines of shell script. And it runs on Windows.
# backup_manager.py
import shutil
import os
from pathlib import Path
from datetime import datetime

def create_backup(
    source_dir: str | Path,
    backup_root: str | Path,
    format: str = "zip",
    keep_last: int = 5
) -> Path:
    """
    Create a timestamped backup archive of source_dir.
    Keeps only the last 'keep_last' backups.
    Returns the path of the new archive.
    """
    source = Path(source_dir)
    backup_dir = Path(backup_root)
    backup_dir.mkdir(parents=True, exist_ok=True)

    if not source.exists():
        raise FileNotFoundError(f"Source directory not found: {source}")

    # Create timestamped archive name
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    archive_name = f"{source.name}_{timestamp}"

    # Create the archive
    archive_base = backup_dir / archive_name
    archive_path = shutil.make_archive(
        base_name=str(archive_base),
        format=format,
        root_dir=str(source.parent),
        base_dir=source.name
    )
    archive_path = Path(archive_path)
    print(f"  Created: {archive_path.name} ({archive_path.stat().st_size / 1024:.1f} KB)")

    # Enforce retention: remove oldest archives beyond keep_last
    ext = archive_path.name[len(archive_name):]  # extension make_archive added, e.g. ".zip" or ".tar.gz"
    existing = sorted(backup_dir.glob(f"{source.name}_*{ext}"))
    if len(existing) > keep_last:
        for old_archive in existing[:-keep_last]:
            old_archive.unlink()
            print(f"  Removed old backup: {old_archive.name}")

    return archive_path

def show_backup_status(backup_root: str | Path) -> None:
    """Print all backups and disk usage."""
    backup_dir = Path(backup_root)
    archives = sorted(backup_dir.glob("*"))
    total_size = sum(f.stat().st_size for f in archives)
    usage = shutil.disk_usage(backup_dir)

    print(f"\nBackup directory: {backup_dir}")
    print(f"Archives ({len(archives)}):")
    for a in archives:
        print(f"  {a.name:40s} {a.stat().st_size / 1024:6.1f} KB")
    print(f"Total backup size: {total_size / 1024:.1f} KB")
    print(f"Free disk space:   {usage.free / (1024**3):.1f} GB")

# Demo
import time

# Create a sample project
source = Path("my_project")
source.mkdir(exist_ok=True)
(source / "app.py").write_text("# application\n" * 50)
(source / "config.yaml").write_text("debug: true\n")

print("=== Creating 3 backups with keep_last=2 ===")
backup_dir = Path("backups")

for i in range(3):
    print(f"\nBackup {i+1}:")
    create_backup("my_project", "backups", format="zip", keep_last=2)
    time.sleep(1)  # ensure different timestamps

show_backup_status("backups")

# Cleanup
shutil.rmtree("my_project")
shutil.rmtree("backups")

Output:

=== Creating 3 backups with keep_last=2 ===

Backup 1:
  Created: my_project_20260424_143501.zip (1.2 KB)

Backup 2:
  Created: my_project_20260424_143502.zip (1.2 KB)

Backup 3:
  Created: my_project_20260424_143503.zip (1.2 KB)
  Removed old backup: my_project_20260424_143501.zip

Backup directory: backups
Archives (2):
  my_project_20260424_143502.zip                  1.2 KB
  my_project_20260424_143503.zip                  1.2 KB
Total backup size: 2.4 KB
Free disk space:   353.4 GB

This backup manager is a compact, working starting point in well under 100 lines. Extend it by sending a notification when disk space drops below a threshold (usage.free < threshold), adding checksums (for example SHA-256) to verify archive integrity, supporting incremental backups by only including files changed since the last backup, or scheduling it with cron for automated daily backups.
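As one example of such an extension, checksum verification could be sketched like this. It uses SHA-256 from hashlib (a choice made for this sketch) and writes the digest to a sidecar file next to the archive; the function names and the ".sha256" sidecar convention are assumptions, not part of the script above.

```python
import hashlib
import tempfile
from pathlib import Path

def sha256_of(path: Path, chunk_size: int = 1024 * 1024) -> str:
    """Hash a file in chunks so large archives are never loaded into memory at once."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()

def write_checksum(archive: Path) -> Path:
    """Write '<hex digest>  <file name>' to a sidecar file (hypothetical convention)."""
    sidecar = archive.with_name(archive.name + ".sha256")
    sidecar.write_text(f"{sha256_of(archive)}  {archive.name}\n")
    return sidecar

def verify_checksum(archive: Path) -> bool:
    """Recompute the digest and compare it with the one stored in the sidecar."""
    sidecar = archive.with_name(archive.name + ".sha256")
    expected = sidecar.read_text().split()[0]
    return sha256_of(archive) == expected

with tempfile.TemporaryDirectory() as tmp:
    archive = Path(tmp) / "demo_backup.zip"
    archive.write_bytes(b"pretend this is an archive")
    write_checksum(archive)
    intact = verify_checksum(archive)

    archive.write_bytes(b"corrupted")  # simulate damage after the backup
    corruption_detected = not verify_checksum(archive)

print(intact, corruption_detected)  # True True
```

In the backup manager you would call write_checksum() right after make_archive() returns, and verify_checksum() before restoring from an archive.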

Frequently Asked Questions

When should I use copy() vs copy2()?

Use copy2() when you want the backup to appear as if the files were never modified -- it preserves the original file's modification timestamp. This is essential for backups where you want to track when files were originally changed. Use copy() when the copy timestamp should reflect when the copy was made (e.g., exporting files for distribution). The difference only matters if you check file modification times in your workflow.

How do I safely use rmtree() without accidentally deleting the wrong directory?

Resolve the target to an absolute path with Path.resolve() before passing it to rmtree(), and check that it sits inside an expected parent directory, for example with path.is_relative_to(expected_parent) (Python 3.9+). Raise an exception when the check fails rather than relying on assert, because assertions are stripped when Python runs with -O. Never pass user input directly to rmtree() without validation. rmtree() has no dry-run mode, so for extra safety list the files that would be deleted first, and consider the third-party send2trash library, which moves files to the OS trash instead of deleting them permanently.

Should I use shutil or pathlib for file operations?

pathlib handles path manipulation, reading, writing, and listing directories. shutil handles copying, moving, and archiving. Use them together: construct paths with pathlib.Path, then pass them to shutil functions. For example, shutil.copy(src_path, dst_path) where both are Path objects works perfectly -- shutil accepts both string paths and Path objects.

How does shutil handle large files?

shutil.copy() and shutil.copy2() stream files in chunks, so they never load the entire file into memory. Since Python 3.8 the fallback buffer is 64 KiB (1 MiB on Windows), and on platforms such as Linux and macOS shutil first tries platform-specific fast-copy system calls (for example os.sendfile() on Linux) that copy in kernel space without userspace buffering, which is noticeably faster for large files. If you work with open file objects directly, shutil.copyfileobj(src_file, dst_file, length=1024*1024) lets you choose the buffer size.
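Here is a minimal sketch of the copyfileobj() route with an explicit buffer size. The 1 MiB default and the copy_with_buffer name are illustrative choices, and the 3 MiB test file stands in for a genuinely large one.

```python
import shutil
import tempfile
from pathlib import Path

def copy_with_buffer(src: Path, dst: Path, buffer_size: int = 1024 * 1024) -> None:
    """Copy src to dst in buffer_size chunks using open file objects."""
    with open(src, "rb") as fsrc, open(dst, "wb") as fdst:
        shutil.copyfileobj(fsrc, fdst, length=buffer_size)

with tempfile.TemporaryDirectory() as tmp:
    src = Path(tmp) / "large.bin"
    dst = Path(tmp) / "large_copy.bin"
    src.write_bytes(b"\0" * (3 * 1024 * 1024))  # 3 MiB stand-in for a big file

    copy_with_buffer(src, dst)
    sizes_match = src.stat().st_size == dst.stat().st_size

print(sizes_match)  # True
```

Unlike copy() and copy2(), this copies only the bytes: permissions and timestamps are not carried over.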

Does shutil.move() work across filesystems?

Yes. shutil.move() first tries os.rename(), which is effectively instant for same-filesystem moves because it only changes the directory entry. If that fails because source and destination are on different filesystems, it falls back to copying (shutil.copy2() for files, shutil.copytree() for directories) and then removing the source. This makes shutil.move() reliable across filesystems, while os.rename() raises an OSError for cross-filesystem moves.
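The calling code looks the same whichever path shutil.move() takes internally. The sketch below moves a file into an existing directory inside a temporary folder (so it exercises the same-filesystem path); the inbox and archive names are arbitrary.

```python
import shutil
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    inbox = Path(tmp) / "inbox"
    archive = Path(tmp) / "archive"
    inbox.mkdir()
    archive.mkdir()
    (inbox / "report.txt").write_text("data\n")

    # When the destination is an existing directory, the file is moved into it
    # and the returned value is the final path.
    new_path = Path(shutil.move(str(inbox / "report.txt"), str(archive)))

    moved_name = new_path.name
    source_gone = not (inbox / "report.txt").exists()
    content = new_path.read_text()

print(moved_name, source_gone)  # report.txt True
```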

Conclusion

The shutil module covers every high-level file operation a Python script needs: copy()/copy2() for files, copytree() with ignore_patterns() for directories, move() for cross-filesystem moves, rmtree() for recursive deletion, make_archive()/unpack_archive() for archives, and disk_usage() for storage monitoring. Combine it with pathlib for clean, cross-platform file management scripts that replace dozens of lines of shell script.

The backup manager above is a solid starting template -- extend it with scheduling, checksums, and notification logic for a complete automated backup solution.

Official documentation: https://docs.python.org/3/library/shutil.html

How To Use Python lru_cache and functools.cache for Memoization

Intermediate

Your function computes the same result for the same inputs over and over — a Fibonacci calculator recomputing fib(30) thousands of times, a database lookup called repeatedly with the same user ID, or an API call fetching the same configuration on every request. Recomputing the same result wastes time. Memoization is the technique of caching a function’s output so that repeated calls with the same arguments return the cached result instantly instead of recomputing.

Python’s functools module provides two decorators for memoization: @lru_cache (Least Recently Used cache, available since Python 3.2) and @cache (unbounded cache, added in Python 3.9). Both are zero-dependency standard library tools — no installation required. Add one decorator line above your function and Python handles all caching automatically.

In this tutorial you’ll learn how @lru_cache and @cache work, how to use them effectively, how to inspect cache statistics with cache_info(), how to clear the cache with cache_clear(), when to use each, and how to build a manual memoization decorator for cases requiring custom expiration or key logic. By the end you’ll be able to eliminate redundant computation from any Python function with a single line of code.

Python lru_cache: Quick Example

The classic demonstration is Fibonacci — without caching it’s exponentially slow, with caching it’s linear:

# lru_cache_quick.py
import time
from functools import lru_cache, cache

# Without cache -- exponential time O(2^n)
def fib_slow(n: int) -> int:
    if n < 2:
        return n
    return fib_slow(n - 1) + fib_slow(n - 2)

# With lru_cache -- linear time O(n)
@lru_cache(maxsize=128)
def fib_cached(n: int) -> int:
    if n < 2:
        return n
    return fib_cached(n - 1) + fib_cached(n - 2)

# Compare
n = 35
start = time.perf_counter()
result_slow = fib_slow(n)
slow_time = time.perf_counter() - start

start = time.perf_counter()
result_cached = fib_cached(n)
fast_time = time.perf_counter() - start

print(f"fib({n}) = {result_slow}")
print(f"Without cache: {slow_time:.3f}s")
print(f"With lru_cache: {fast_time:.6f}s")
print(f"Speedup: {slow_time / fast_time:.0f}x")
print(f"Cache stats: {fib_cached.cache_info()}")

Output:

fib(35) = 9227465
Without cache: 2.847s
With lru_cache: 0.000023s
Speedup: 123000x
Cache stats: CacheInfo(hits=33, misses=36, maxsize=128, currsize=36)

The cache stores results keyed by the function arguments. On the first call with argument n, the result is computed and stored (a miss). On subsequent calls with the same n, the stored result is returned instantly (a hit). For fib(35), there are only 36 unique subproblems, so after computing them once, all 33 repeated subproblem calls are cache hits.

What Is LRU Cache and How Does It Work?

An LRU (Least Recently Used) cache stores a fixed number of results. When the cache is full and a new result needs to be stored, the least recently used entry is evicted to make room. This ensures the cache stays within a bounded memory footprint while keeping the most recently accessed results available.
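A tiny cache makes the eviction order easy to watch. In this sketch maxsize=2 is chosen only so that eviction happens after a handful of calls, and the calls list is a helper that records every real computation.

```python
from functools import lru_cache

calls = []  # records every real (uncached) computation

@lru_cache(maxsize=2)
def square(n: int) -> int:
    calls.append(n)
    return n * n

square(1)  # miss: cache holds 1
square(2)  # miss: cache holds 1 and 2
square(1)  # hit: 1 becomes the most recently used entry
square(3)  # miss: cache is full, so 2 (least recently used) is evicted
square(2)  # miss again: 2 was evicted and must be recomputed

print(calls)                # [1, 2, 3, 2]
print(square.cache_info())  # CacheInfo(hits=1, misses=4, maxsize=2, currsize=2)
```

The third call is what saves entry 1: reading an entry counts as using it, so 2 is the one that gets evicted when 3 arrives.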

Decorator | Cache size | Python Version | Use When
@lru_cache(maxsize=128) | Bounded (LRU eviction) | 3.2+ | Memory-constrained, large input spaces
@lru_cache(maxsize=None) | Unbounded | 3.2+ | All inputs will be cached forever
@cache | Unbounded (alias) | 3.9+ | Same as maxsize=None, cleaner syntax
@cached_property | Per-instance | 3.8+ | Lazy-evaluated class properties

The cache key is built from all positional and keyword arguments, so func(1, 2) and func(2, 1) are different cache entries. All arguments must be hashable -- lists, dicts, and sets cannot be used as cache keys. If you need to cache a function that takes unhashable arguments, you'll need a custom memoization approach (shown later in this tutorial).
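Both points are easy to check directly. The sketch below shows argument order producing separate entries and a list argument being rejected; converting the list to a tuple at the call site is the simplest workaround when the data is a flat sequence. The function names are illustrative.

```python
from functools import lru_cache

@lru_cache(maxsize=None)
def add(a: int, b: int) -> int:
    return a + b

add(1, 2)  # miss
add(2, 1)  # miss: different argument order, different cache key
add(1, 2)  # hit
print(add.cache_info())  # CacheInfo(hits=1, misses=2, maxsize=None, currsize=2)

@lru_cache(maxsize=None)
def total(values: tuple[int, ...]) -> int:
    return sum(values)

try:
    total([1, 2, 3])  # lists are unhashable, so the cache cannot build a key
    list_rejected = False
except TypeError as exc:
    list_rejected = True
    print(f"Rejected: {exc}")

result = total(tuple([1, 2, 3]))  # tuples are hashable, so this is cached
print(result)  # 6
```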

Cache hit: O(1). Cache miss: O(whatever your function is). Cache the hot path.

Using @cache and cache_info()

@functools.cache is a simpler alias for @lru_cache(maxsize=None) added in Python 3.9. It's the right choice when you want to cache all results without any eviction. Use it with cache_info() to monitor hit rates and cache_clear() to invalidate the cache when underlying data changes.

# cache_usage.py
import functools
import time

# Simulate an expensive database lookup
FAKE_DB = {1: "Alice", 2: "Bob", 3: "Charlie", 4: "Diana"}
lookup_count = 0

@functools.cache
def get_user(user_id: int) -> str | None:
    global lookup_count
    lookup_count += 1
    time.sleep(0.05)  # simulate DB latency
    return FAKE_DB.get(user_id)

# First round -- cold cache: the first call per user misses, repeats hit
print("=== Round 1 (cold cache) ===")
for uid in [1, 2, 3, 1, 2, 1]:  # 1 and 2 repeated
    user = get_user(uid)
    print(f"  User {uid}: {user}")

print(f"Actual DB lookups: {lookup_count}")
print(f"Cache info: {get_user.cache_info()}")

# Second round -- all hits (cache persists)
print("\n=== Round 2 (warm cache) ===")
lookups_before = lookup_count
start = time.perf_counter()
for uid in [1, 2, 3]:
    get_user(uid)
elapsed = time.perf_counter() - start
print(f"New DB lookups: {lookup_count - lookups_before}")
print(f"Time (all cached): {elapsed:.4f}s")
print(f"Cache info: {get_user.cache_info()}")

# Clear cache when data changes
print("\n=== After cache_clear() ===")
get_user.cache_clear()
print(f"Cache info: {get_user.cache_info()}")
_ = get_user(1)  # must re-fetch
print(f"DB lookups after clear: {lookup_count}")

Output:

=== Round 1 (cold cache) ===
  User 1: Alice
  User 2: Bob
  User 3: Charlie
  User 1: Alice
  User 2: Bob
  User 1: Alice
Actual DB lookups: 3
Cache info: CacheInfo(hits=3, misses=3, maxsize=None, currsize=3)

=== Round 2 (warm cache) ===
New DB lookups: 0
Time (all cached): 0.0001s
Cache info: CacheInfo(hits=6, misses=3, maxsize=None, currsize=3)

=== After cache_clear() ===
Cache info: CacheInfo(hits=0, misses=0, maxsize=None, currsize=0)
DB lookups after clear: 4

Notice that in Round 1, six calls only hit the database three times -- the repeated calls for users 1 and 2 were served from cache. The cache_info() method returns a named tuple with hits, misses, maxsize, and currsize. As a rough rule of thumb, a warm cache that is earning its keep shows a high hit rate (say 70% or more); if you're seeing mostly misses, either the cache is too small or the input space is too varied for caching to help.
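If you check hit rates often, a small helper keeps the arithmetic in one place. hit_rate is a hypothetical function written for this sketch; it works with anything decorated by @lru_cache or @cache because it only relies on cache_info().

```python
from functools import lru_cache

def hit_rate(cached_func) -> float:
    """Return hits / (hits + misses), or 0.0 if the function was never called."""
    info = cached_func.cache_info()
    total = info.hits + info.misses
    return info.hits / total if total else 0.0

@lru_cache(maxsize=None)
def square(n: int) -> int:
    return n * n

for n in [1, 2, 3, 1, 2, 1]:  # three first-time calls, three repeats
    square(n)

print(f"Hit rate: {hit_rate(square):.0%}")  # Hit rate: 50%
```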

Choosing maxsize with lru_cache

The maxsize parameter controls how many distinct results the cache holds. When the cache is full and a new result arrives, the least recently used entry is evicted. Setting maxsize correctly is a trade-off between memory usage and cache effectiveness.

# lru_maxsize.py
from functools import lru_cache
import random

# Simulate a function with variable input distribution
@lru_cache(maxsize=10)
def process_item(item_id: int) -> str:
    return f"processed-{item_id}"

# Test with a skewed distribution (80% of calls use top 10 items)
call_log = []
random.seed(42)
for _ in range(1000):
    if random.random() < 0.8:
        item_id = random.randint(1, 10)   # hot items
    else:
        item_id = random.randint(11, 100) # cold items
    process_item(item_id)

info = process_item.cache_info()
hit_rate = info.hits / (info.hits + info.misses) * 100
print(f"maxsize=10, hit rate: {hit_rate:.1f}%")
print(f"  hits={info.hits}, misses={info.misses}, currsize={info.currsize}")

# Compare with maxsize=50
process_item.cache_clear()

@lru_cache(maxsize=50)
def process_item_large(item_id: int) -> str:
    return f"processed-{item_id}"

random.seed(42)
for _ in range(1000):
    if random.random() < 0.8:
        item_id = random.randint(1, 10)
    else:
        item_id = random.randint(11, 100)
    process_item_large(item_id)

info2 = process_item_large.cache_info()
hit_rate2 = info2.hits / (info2.hits + info2.misses) * 100
print(f"\nmaxsize=50, hit rate: {hit_rate2:.1f}%")
print(f"  hits={info2.hits}, misses={info2.misses}, currsize={info2.currsize}")

# Powers of 2 are most efficient for maxsize
print("\nNote: set maxsize to a power of 2 (32, 64, 128, 256) for best performance")

Output:

maxsize=10, hit rate: 87.5%
  hits=875, misses=125, currsize=10

maxsize=50, hit rate: 89.5%
  hits=895, misses=105, currsize=50

Note: set maxsize to a power of 2 (32, 64, 128, 256) for best performance

With a skewed distribution (most calls go to a small set of "hot" items), even a small cache of 10 achieves a hit rate of roughly 88%. Setting maxsize to a power of 2 is at most a micro-optimization: older versions of the functools documentation noted that the LRU feature performs best with power-of-two sizes, but the effect is small compared with choosing a size that fits your working set. For most applications, use maxsize=128 (the default) as a starting point and adjust based on cache_info() data.

@lru_cache: one decorator, three-orders-of-magnitude speedup. Do it.

cached_property for Class Attributes

functools.cached_property is designed for attributes that are expensive to compute but don't change after the first access. Unlike @property, which recomputes on every access, @cached_property computes once per instance and stores the result on that instance.

# cached_property_demo.py
import functools
import math
import time

class Circle:
    def __init__(self, radius: float):
        self.radius = radius

    @functools.cached_property
    def area(self) -> float:
        """Computed once, cached forever on this instance."""
        print(f"  Computing area for radius={self.radius}...")
        time.sleep(0.01)  # simulate expensive computation
        return math.pi * self.radius ** 2

    @functools.cached_property
    def circumference(self) -> float:
        print(f"  Computing circumference for radius={self.radius}...")
        return 2 * math.pi * self.radius

c = Circle(5.0)
print("First access:")
print(f"  area = {c.area:.4f}")
print(f"  area (again) = {c.area:.4f}")   # no recomputation
print(f"  circumference = {c.circumference:.4f}")

# The cached value is stored directly in __dict__
print(f"\nInstance __dict__: {c.__dict__}")

# Change radius -- but cached values remain! Only use for immutable objects.
c.radius = 10.0
print(f"\nAfter radius change:")
print(f"  area still = {c.area:.4f}")   # still the old value!

# To force recomputation, delete from __dict__
del c.__dict__['area']
print(f"  area after cache clear = {c.area:.4f}")  # recomputed

Output:

First access:
  Computing area for radius=5.0...
  area = 78.5398
  area (again) = 78.5398
  Computing circumference for radius=5.0...
  circumference = 31.4159

Instance __dict__: {'radius': 5.0, 'area': 78.5398..., 'circumference': 31.4159...}

After radius change:
  area still = 78.5398
  Computing area for radius=10.0...
  area after cache clear = 314.1593

The critical warning: cached_property stores the result in the instance's __dict__, so changing the inputs (like self.radius) does not invalidate the cache. Only use it for properties derived from immutable data or data that never changes after initialization. For mutable objects, use a regular @property or implement explicit invalidation logic.
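One way to implement the explicit invalidation mentioned above is to route changes to the input through a property setter that discards the cached value. The following is a minimal sketch under that assumption; the private _radius attribute and the setter are illustrative additions, not part of the original Circle class.

```python
import functools
import math


class Circle:
    def __init__(self, radius: float):
        self._radius = radius  # hypothetical private attribute for this sketch

    @property
    def radius(self) -> float:
        return self._radius

    @radius.setter
    def radius(self, value: float) -> None:
        self._radius = value
        # Drop the cached value that depends on the radius.
        # pop() with a default is safe even if nothing was cached yet.
        self.__dict__.pop("area", None)

    @functools.cached_property
    def area(self) -> float:
        return math.pi * self._radius ** 2


c = Circle(5.0)
first = c.area        # computed and cached
c.radius = 10.0       # setter clears the cached area
second = c.area       # recomputed from the new radius
print(f"{first:.4f} -> {second:.4f}")
```

The trade-off is that every cached attribute derived from radius has to be listed in the setter, so this approach suits classes with only a few dependent properties.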

Real-Life Example: API Response Cache with TTL

This example builds a custom memoization decorator that adds TTL (time to live) expiration, which is useful for caching API responses that should refresh periodically.

lru_cache has no TTL. That's why you're here building one.

# ttl_cache.py
import functools
import time
from typing import Any, Callable

def ttl_cache(maxsize: int = 128, ttl: float = 60.0):
    """
    Memoization decorator with TTL expiration.
    Cached results expire after 'ttl' seconds.
    """
    def decorator(func: Callable) -> Callable:
        cache: dict[Any, tuple[Any, float]] = {}  # key -> (result, timestamp)

        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> Any:
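            # Build a hashable cache key; keeping args and kwargs in separate
            # tuples prevents positional and keyword calls from colliding
            key = (args, tuple(sorted(kwargs.items())))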
            now = time.monotonic()

            # Check cache -- return if fresh
            if key in cache:
                result, ts = cache[key]
                if now - ts < ttl:
                    return result
                else:
                    del cache[key]  # expired

            # Cache miss or expired -- compute and store
            result = func(*args, **kwargs)
            if len(cache) >= maxsize:
                # Evict the oldest-inserted entry (FIFO; a true LRU would also reorder on hits)
                oldest_key = next(iter(cache))
                del cache[oldest_key]
            cache[key] = (result, now)
            return result

        def cache_info() -> dict:
            return {"size": len(cache), "maxsize": maxsize, "ttl": ttl}

        def cache_clear() -> None:
            cache.clear()

        wrapper.cache_info = cache_info
        wrapper.cache_clear = cache_clear
        return wrapper
    return decorator


# Simulate API call
api_call_count = 0

@ttl_cache(maxsize=10, ttl=2.0)  # expires after 2 seconds
def fetch_weather(city: str) -> dict:
    global api_call_count
    api_call_count += 1
    # In production: requests.get(f"https://api.weather.example.com/{city}")
    return {"city": city, "temp": 22 + hash(city) % 10, "fetched_at": time.time()}

# Demonstrate TTL behavior
print("=== TTL Cache Demo ===")
cities = ["London", "Paris", "Tokyo"]

print("\nRound 1 (cold cache):")
for city in cities:
    result = fetch_weather(city)
    print(f"  {city}: {result['temp']}C (API calls so far: {api_call_count})")

print("\nRound 2 (warm cache -- same results, no API calls):")
for city in cities:
    result = fetch_weather(city)
    print(f"  {city}: {result['temp']}C (API calls so far: {api_call_count})")

print(f"\nSleeping 2.5s to let TTL expire...")
time.sleep(2.5)

print("\nRound 3 (TTL expired -- re-fetches from API):")
for city in cities:
    result = fetch_weather(city)
    print(f"  {city}: {result['temp']}C (API calls so far: {api_call_count})")

print(f"\nTotal API calls: {api_call_count} (should be 6 = 2 rounds x 3 cities)")
print(f"Cache info: {fetch_weather.cache_info()}")

Output:

=== TTL Cache Demo ===

Round 1 (cold cache):
  London: 26C (API calls so far: 1)
  Paris: 24C (API calls so far: 2)
  Tokyo: 28C (API calls so far: 3)

Round 2 (warm cache -- same results, no API calls):
  London: 26C (API calls so far: 3)
  Paris: 24C (API calls so far: 3)
  Tokyo: 28C (API calls so far: 3)

Sleeping 2.5s to let TTL expire...

Round 3 (TTL expired -- re-fetches from API):
  London: 26C (API calls so far: 4)
  Paris: 24C (API calls so far: 5)
  Tokyo: 28C (API calls so far: 6)

Total API calls: 6 (should be 6 = 2 rounds x 3 cities)
Cache info: {'size': 3, 'maxsize': 10, 'ttl': 2.0}

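The temperatures in the demo come from hash(city), and Python randomizes string hashes per process, so the exact values differ from run to run; the API call counts are what matter. This TTL cache pattern is useful for any data that's expensive to fetch but acceptable to be slightly stale -- configuration values, user preferences, exchange rates, or any API response with a known freshness window. For production use, consider the cachetools library, which provides TTLCache, LRUCache, and LFUCache with well-tested eviction logic, along with decorators that accept a lock for thread safety.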

Frequently Asked Questions

Why does lru_cache require hashable arguments?

The cache uses function arguments as dictionary keys. Python dictionaries require hashable keys -- objects that implement __hash__. Strings, numbers, tuples (of hashable items), and frozensets are hashable. Lists, dicts, and sets are not. If you need to cache a function that takes a list, declare the function to accept a tuple and convert at the call site: func(tuple(my_list)). For dict arguments, convert to a sorted tuple of items: func(tuple(sorted(my_dict.items()))).
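The conversions described above can be sketched as follows. The function names total and describe are hypothetical and exist only for illustration.

```python
import functools


@functools.lru_cache(maxsize=128)
def total(items: tuple) -> int:
    return sum(items)


@functools.lru_cache(maxsize=128)
def describe(options: tuple) -> str:
    # options is a tuple of (key, value) pairs built from a dict
    return ", ".join(f"{k}={v}" for k, v in options)


my_list = [1, 2, 3]
my_dict = {"b": 2, "a": 1}

print(total(tuple(my_list)))                      # list -> tuple
print(describe(tuple(sorted(my_dict.items()))))   # dict -> sorted tuple of items

try:
    total(my_list)  # passing the list directly fails: lists are unhashable
except TypeError as exc:
    print(f"TypeError: {exc}")
```

Sorting the dict items matters: without it, two dicts with the same content but different insertion order would produce different cache keys.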

Can I use lru_cache on class methods?

Using @lru_cache directly on an instance method puts self into the cache key, so the cache holds a strong reference to every instance it has seen and keeps those instances alive until their entries are evicted or the cache is cleared. With a large or unbounded maxsize this behaves like a memory leak. Use @functools.cached_property for properties, or use @lru_cache on a module-level function instead of a method. For methods that should cache per-instance, store a functools.lru_cache-wrapped function in __init__: self.method = functools.lru_cache(maxsize=128)(self._method_impl).
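A minimal sketch of the per-instance pattern from the answer above. The PriceTable class and its method names are hypothetical; the point is only that each instance carries its own cache.

```python
import functools


class PriceTable:
    def __init__(self, rate: float):
        self.rate = rate
        # Wrap the bound method so the cache belongs to this instance
        # rather than to a shared, class-level cache.
        self.convert = functools.lru_cache(maxsize=128)(self._convert_impl)

    def _convert_impl(self, amount: int) -> float:
        return amount * self.rate


a = PriceTable(2.0)
b = PriceTable(3.0)
a.convert(10)
a.convert(10)   # second call is served from a's cache
b.convert(10)   # b has its own, separate cache

print(a.convert.cache_info())
print(b.convert.cache_info())
```

The instance and its cached wrapper reference each other, so they form a reference cycle; Python's cyclic garbage collector can reclaim it once the instance is no longer reachable.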

Is lru_cache thread-safe?

Yes -- the cache itself is thread-safe: concurrent calls will not corrupt its internal data structure. However, the cached function itself is not protected: if two threads simultaneously call a function with the same uncached arguments, both may compute the result, and only one entry ends up in the cache (a cache stampede). This is generally harmless (correct result, some wasted computation) but can be a problem for very expensive functions. For strict once-only computation, hold your own lock around the call to the cached function; a lock taken inside the function body is too late, because both threads have already missed the cache by then.
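One way to get once-only computation is to hold a lock around the call to the cached function, as in this sketch. The names _slow_square, slow_square_once, and call_count are hypothetical, and the single global lock is a deliberate simplification.

```python
import functools
import threading
import time

call_count = 0
_compute_lock = threading.Lock()


@functools.lru_cache(maxsize=None)
def _slow_square(n: int) -> int:
    global call_count
    call_count += 1
    time.sleep(0.05)  # stand-in for expensive work
    return n * n


def slow_square_once(n: int) -> int:
    # Holding the lock around the cached call means a second thread
    # waits for the first result instead of recomputing it.
    with _compute_lock:
        return _slow_square(n)


threads = [threading.Thread(target=slow_square_once, args=(12,)) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"underlying calls: {call_count}")
```

The cost of this design is that one lock serializes every call, including calls with different arguments. A per-key lock would reduce contention at the price of more bookkeeping.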

When should I NOT use lru_cache?

Don't use lru_cache on functions with side effects (writing to a database, sending emails) -- the cache skips the side effect on repeat calls. Don't use it on functions that return different results for the same inputs (random number generators, functions that read changing data). Don't use it when memory is constrained and the input space is very large -- an unbounded @cache on a function called with millions of unique arguments will consume all available memory. Don't use it on methods where self varies (use cached_property instead).

What is the difference between @cache and @lru_cache(maxsize=None)?

@cache (Python 3.9+) is functionally identical to @lru_cache(maxsize=None): both create an unbounded cache with no eviction. In CPython, @cache is a thin wrapper that simply returns lru_cache(maxsize=None)(func), so there is no performance difference between the two spellings. What makes both faster than a bounded @lru_cache(maxsize=N) is that an unbounded cache never evicts and therefore skips the LRU doubly-linked-list bookkeeping. Use @cache when you're on Python 3.9+ and want unbounded caching; use @lru_cache(maxsize=N) when you need bounded memory.
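A quick way to check this equivalence yourself is to compare what the two wrappers report about themselves. This sketch assumes Python 3.9 or newer; the square and cube functions are placeholders.

```python
import functools


@functools.cache
def square(n: int) -> int:
    return n * n


@functools.lru_cache(maxsize=None)
def cube(n: int) -> int:
    return n * n * n


square(4)
cube(4)

# Both wrappers report an unbounded cache with identical parameters.
print(square.cache_info().maxsize, cube.cache_info().maxsize)
print(square.cache_parameters() == cube.cache_parameters())
```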

Conclusion

Memoization with @lru_cache and @cache is one of the highest-value optimization techniques in Python: one line of code can produce order-of-magnitude speedups for functions with repeated inputs. Use @cache for unbounded memoization (Python 3.9+), @lru_cache(maxsize=N) for bounded memory, and @cached_property for lazy class attributes. Monitor effectiveness with cache_info() and clear stale data with cache_clear().

For TTL-based caching, thread-safe caches, or more eviction strategies, the cachetools library builds on these patterns and adds production-ready implementations.

Official documentation: https://docs.python.org/3/library/functools.html#functools.lru_cache

How To Use Python threading for Multi-Threaded Programs

Intermediate

You’re downloading files, calling APIs, or waiting on slow I/O — and your program does all of this sequentially, sitting idle while one operation completes before starting the next. For I/O-bound work like network requests, file reads, or database calls, Python’s threading module lets you run multiple operations concurrently so your program doesn’t waste time waiting. A download that takes 10 seconds sequentially for 10 files can complete in roughly 1-2 seconds when run in parallel threads.

Python’s threading module is part of the standard library and available in every Python installation. The important context: because of the Global Interpreter Lock (GIL), Python threads don’t run Python bytecode truly in parallel on multiple CPU cores. But for I/O-bound work (network, disk, database), threads spend most of their time waiting — and the GIL releases during I/O waits, so threads genuinely run concurrently for this type of work. For CPU-bound tasks, use multiprocessing instead.

In this tutorial you’ll learn how to create and start threads, use daemon vs non-daemon threads, synchronize with Lock and Event, build a producer-consumer pattern with Queue, and use ThreadPoolExecutor for managing pools of worker threads. By the end you’ll be able to write thread-safe concurrent programs for I/O-bound workloads.

Python threading: Quick Example

Here’s a simple example showing how threading reduces the total time for multiple I/O-bound tasks:

# threading_quick.py
import threading
import time

def download_file(filename: str, delay: float) -> None:
    """Simulate downloading a file (I/O wait)."""
    print(f"  Starting: {filename}")
    time.sleep(delay)  # simulate network I/O
    print(f"  Done:     {filename}")

files = [("report.pdf", 2), ("data.csv", 1.5), ("image.png", 1)]

# Sequential (slow)
print("=== Sequential ===")
start = time.perf_counter()
for name, delay in files:
    download_file(name, delay)
print(f"Sequential time: {time.perf_counter() - start:.1f}s")

# Threaded (fast)
print("\n=== Threaded ===")
start = time.perf_counter()
threads = [threading.Thread(target=download_file, args=(name, delay)) for name, delay in files]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Threaded time:   {time.perf_counter() - start:.1f}s")

Output:

=== Sequential ===
  Starting: report.pdf
  Done:     report.pdf
  Starting: data.csv
  Done:     data.csv
  Starting: image.png
  Done:     image.png
Sequential time: 4.5s

=== Threaded ===
  Starting: report.pdf
  Starting: data.csv
  Starting: image.png
  Done:     image.png
  Done:     data.csv
  Done:     report.pdf
Threaded time:   2.0s

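The threaded version finishes in roughly 2 seconds, the time of the longest single task, while the sequential version takes 4.5 seconds in total. t.start() begins the thread and returns immediately; t.join() blocks the main thread until that thread finishes. Call join() whenever the main thread needs the work to be complete before it continues (here, before reading the timer). Non-daemon threads keep the interpreter alive until they finish even without join(), but joining makes the ordering explicit.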

What Is Python Threading and When To Use It?

A thread is a unit of execution within a process. Unlike processes (which have separate memory), threads in the same process share memory, which makes communication easy but also introduces the risk of data races when multiple threads write to the same variable simultaneously.

Approach           | Best For                        | GIL Impact                            | Memory
-------------------|---------------------------------|---------------------------------------|---------
threading          | I/O-bound: network, files, DB   | Releases during I/O waits             | Shared
multiprocessing    | CPU-bound: math, compression    | One GIL per process, so no contention | Separate
asyncio            | I/O-bound with many connections | Single thread, cooperative            | Shared
concurrent.futures | Either, with high-level API     | Same as threading/MP                  | Depends

Use threading when: you have I/O-bound work, you need to run multiple blocking operations concurrently, or you’re integrating with libraries that don’t support asyncio. Avoid threading for: CPU-bound number crunching (use multiprocessing), or highly concurrent network applications (use asyncio).

I/O-bound threads: the GIL releases while they wait. CPU-bound threads: not so lucky.

Creating and Managing Threads

You create a thread by passing a callable to threading.Thread(target=func, args=(...), kwargs={...}). You can also subclass Thread and override its run() method for more complex thread logic. Threads can be daemon threads (killed automatically when the main program exits) or non-daemon threads (the program waits for them to finish).

# threading_create.py
import threading
import time

# Method 1: thread with target function
def worker(name: str, count: int) -> None:
    for i in range(count):
        print(f"  [{name}] step {i+1}/{count}")
        time.sleep(0.1)

# Method 2: subclass Thread
class CounterThread(threading.Thread):
    def __init__(self, name: str, limit: int):
        super().__init__(name=name)
        self.limit = limit
        self.result = 0  # store result as attribute

    def run(self) -> None:
        for i in range(self.limit):
            self.result += i
            time.sleep(0.05)

# Create and start function-based threads
t1 = threading.Thread(target=worker, args=("Alpha", 3))
t2 = threading.Thread(target=worker, args=("Beta", 3))

print("=== Function-based threads ===")
t1.start()
t2.start()
t1.join()
t2.join()

# Subclass-based thread with result
print("\n=== Subclass-based thread ===")
counter = CounterThread("Counter", limit=10)
counter.start()
counter.join()
print(f"  Sum 0..9 = {counter.result}")

# Thread metadata (every thread started so far has been joined)
print(f"\nActive threads: {threading.active_count()}")
print(f"Current thread: {threading.current_thread().name}")

# Daemon thread -- killed automatically when the program exits.
# This demo runs last so the monitor does not keep printing (or count
# as an active thread) during the other sections.
print("\n=== Daemon thread ===")
def background_monitor():
    while True:
        print("  [monitor] heartbeat")
        time.sleep(0.3)

monitor = threading.Thread(target=background_monitor, daemon=True)
monitor.start()
time.sleep(0.7)
print("Main thread done -- daemon stops automatically")

Output:

=== Function-based threads ===
  [Alpha] step 1/3
  [Beta] step 1/3
  [Alpha] step 2/3
  [Beta] step 2/3
  [Alpha] step 3/3
  [Beta] step 3/3

=== Subclass-based thread ===
  Sum 0..9 = 45

Active threads: 1
Current thread: MainThread

=== Daemon thread ===
  [monitor] heartbeat
  [monitor] heartbeat
  [monitor] heartbeat
Main thread done -- daemon stops automatically

Daemon threads are useful for background tasks like health monitors, log flushers, and cleanup workers that should not prevent the program from exiting. The key risk: daemon threads are killed immediately when the main thread exits, so they should never be in the middle of a critical operation (like writing to a database) when that happens. Use non-daemon threads for any work that must complete before exit.

Thread Safety with Lock and RLock

When multiple threads read and write the same shared data, you need synchronization to prevent data corruption. The most common tool is threading.Lock — a mutual exclusion lock that ensures only one thread accesses a critical section at a time.

# threading_lock.py
import threading
import time

# Unsafe: multiple threads incrementing a shared counter
class UnsafeCounter:
    def __init__(self):
        self.value = 0

    def increment(self, n: int) -> None:
        for _ in range(n):
            current = self.value
            time.sleep(0)  # yield to other threads
            self.value = current + 1

# Safe: protected with Lock
class SafeCounter:
    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def increment(self, n: int) -> None:
        for _ in range(n):
            with self._lock:   # acquire/release automatically
                self.value += 1

def run_counters(counter_class, n_threads=5, increments=1000):
    counter = counter_class()
    threads = [threading.Thread(target=counter.increment, args=(increments,))
               for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    expected = n_threads * increments
    print(f"  {counter_class.__name__}: got {counter.value}, expected {expected}, "
          f"{'OK' if counter.value == expected else 'DATA RACE!'}")

print("Counter comparison (5 threads x 1000 increments = 5000 expected):")
run_counters(UnsafeCounter)
run_counters(SafeCounter)

# RLock -- reentrant lock for recursive code
print("\n=== RLock (reentrant) ===")
rlock = threading.RLock()

def recursive_task(depth: int, lock: threading.RLock) -> None:
    with lock:
        if depth > 0:
            print(f"  Depth {depth}, acquiring lock again...")
            recursive_task(depth - 1, lock)   # same thread can re-acquire RLock
        else:
            print(f"  Base case")

recursive_task(3, rlock)
print("  RLock released all levels -- done")

Output:

Counter comparison (5 threads x 1000 increments = 5000 expected):
  UnsafeCounter: got 3847, expected 5000, DATA RACE!
  SafeCounter: got 5000, expected 5000, OK

=== RLock (reentrant) ===
  Depth 3, acquiring lock again...
  Depth 2, acquiring lock again...
  Depth 1, acquiring lock again...
  Base case
  RLock released all levels -- done

The with self._lock: pattern is the correct way to use locks — it ensures the lock is always released, even if the code inside raises an exception. Never use lock.acquire() without a matching lock.release() in a finally block. Use RLock instead of Lock when the same thread might need to acquire the lock multiple times (e.g., in recursive functions) — a regular Lock deadlocks if the same thread tries to acquire it twice.
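The difference between Lock and RLock, and the acquire/finally pattern, can be observed safely with non-blocking and timed acquires. This is a small sketch that deliberately avoids a real deadlock by using blocking=False.

```python
import threading

lock = threading.Lock()
rlock = threading.RLock()

with lock:
    # Non-blocking attempt from the thread that already holds the lock.
    # A blocking acquire here would deadlock; this one just reports failure.
    second_try = lock.acquire(blocking=False)

with rlock:
    # The same thread may re-acquire an RLock.
    reentrant_try = rlock.acquire(blocking=False)
    if reentrant_try:
        rlock.release()  # every successful acquire needs a matching release

# Manual acquire with a timeout, released in a finally block.
got_it = lock.acquire(timeout=1.0)
try:
    status = "acquired" if got_it else "timed out"
finally:
    if got_it:
        lock.release()

print(second_try, reentrant_try, status)
```

The `if got_it` guard matters: releasing a lock that was never acquired raises a RuntimeError.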

threading.Lock() — one thread in, everyone else waits. That’s the deal.

Events, Barriers, and Thread Communication

Beyond locks, Python’s threading module provides higher-level synchronization primitives for coordinating thread behavior. Event signals between threads, Barrier makes a group of threads wait for each other, and Queue (from the queue module) is the preferred way to pass data between threads safely.

# threading_event.py
import threading
import time
import queue

# Event: one thread signals another
print("=== Event: start signal ===")
start_event = threading.Event()
results = []

def waiter(name: str, event: threading.Event) -> None:
    print(f"  [{name}] waiting for signal...")
    event.wait()   # blocks until event.set() is called
    print(f"  [{name}] received signal, working...")
    time.sleep(0.1)
    results.append(name)

workers = [threading.Thread(target=waiter, args=(f"Worker-{i}", start_event)) for i in range(3)]
for w in workers:
    w.start()

time.sleep(0.3)
print("  [main] sending start signal")
start_event.set()  # wake up all waiting threads

for w in workers:
    w.join()
print(f"  Results: {results}")

# Queue: producer-consumer pattern
print("\n=== Queue: producer-consumer ===")
work_queue = queue.Queue(maxsize=3)

def producer(q: queue.Queue, items: list) -> None:
    for item in items:
        q.put(item)   # blocks if queue is full
        print(f"  [producer] added: {item}")
    q.put(None)  # sentinel to signal done

def consumer(q: queue.Queue) -> None:
    while True:
        item = q.get()   # blocks until item available
        if item is None:
            q.task_done()
            break
        print(f"  [consumer] processing: {item}")
        time.sleep(0.1)
        q.task_done()

tasks = ["task-A", "task-B", "task-C", "task-D"]
prod = threading.Thread(target=producer, args=(work_queue, tasks))
cons = threading.Thread(target=consumer, args=(work_queue,))

prod.start()
cons.start()
prod.join()
cons.join()
work_queue.join()
print("  All tasks processed")

Output:

=== Event: start signal ===
  [Worker-0] waiting for signal...
  [Worker-1] waiting for signal...
  [Worker-2] waiting for signal...
  [main] sending start signal
  [Worker-0] received signal, working...
  [Worker-1] received signal, working...
  [Worker-2] received signal, working...
  Results: ['Worker-0', 'Worker-1', 'Worker-2']

=== Queue: producer-consumer ===
  [producer] added: task-A
  [producer] added: task-B
  [consumer] processing: task-A
  [producer] added: task-C
  [consumer] processing: task-B
  ...
  All tasks processed

queue.Queue is thread-safe by design — its internal lock means you never need a separate Lock to protect it. The sentinel value (None) pattern is the standard way to signal a consumer to stop: put a None in the queue for each consumer thread. Always call q.task_done() after processing each item if you’re using q.join() to wait for completion.
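The Barrier primitive named at the start of this section does not appear in the script above. Here is a minimal sketch of how it might be used, with hypothetical worker names and timings: three threads finish their setup at different times, and none proceeds until all have arrived.

```python
import threading
import time

barrier = threading.Barrier(3)
arrived = {}
released = {}


def stage_worker(name: str, setup_time: float) -> None:
    time.sleep(setup_time)            # uneven setup work
    arrived[name] = time.monotonic()
    barrier.wait()                    # blocks until all 3 threads arrive
    released[name] = time.monotonic()


workers = [
    threading.Thread(target=stage_worker, args=(f"W{i}", 0.05 * (i + 1)))
    for i in range(3)
]
for w in workers:
    w.start()
for w in workers:
    w.join()

# Every thread passed the barrier only after the slowest one arrived.
last_arrival = max(arrived.values())
print(all(t >= last_arrival for t in released.values()))
```

A Barrier is tied to a fixed number of parties. If one thread never arrives, the others block indefinitely unless you pass a timeout to wait().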

Real-Life Example: Concurrent URL Checker

This example is a thread-pool-based URL health checker that checks multiple URLs concurrently and reports their status, using ThreadPoolExecutor for clean thread management.

ThreadPoolExecutor manages the pool. You manage the panic when URLs go red.

# url_checker.py
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.request import urlopen
from urllib.error import URLError, HTTPError

def check_url(url: str, timeout: int = 5) -> dict:
    """Check if a URL is reachable. Returns status dict."""
    start = time.perf_counter()
    try:
        # The context manager closes the connection once the status is read
        with urlopen(url, timeout=timeout) as response:
            status = response.status
        elapsed = time.perf_counter() - start
        return {
            "url": url,
            "status": status,
            "ok": status < 400,
            "elapsed_ms": round(elapsed * 1000),
            "error": None
        }
    except HTTPError as e:
        return {"url": url, "status": e.code, "ok": False,
                "elapsed_ms": round((time.perf_counter() - start) * 1000), "error": str(e)}
    except URLError as e:
        return {"url": url, "status": 0, "ok": False,
                "elapsed_ms": round((time.perf_counter() - start) * 1000), "error": str(e.reason)}
    except Exception as e:
        return {"url": url, "status": 0, "ok": False,
                "elapsed_ms": round((time.perf_counter() - start) * 1000), "error": str(e)}

def check_urls_concurrent(urls: list[str], max_workers: int = 5) -> list[dict]:
    """Check multiple URLs concurrently using a thread pool."""
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks
        future_to_url = {executor.submit(check_url, url): url for url in urls}
        # Collect results as they complete
        for future in as_completed(future_to_url):
            result = future.result()
            results.append(result)
            status_icon = "OK " if result["ok"] else "FAIL"
            print(f"  [{status_icon}] {result['url'][:50]:<50} "
                  f"{result['status']:>3}  {result['elapsed_ms']:>5}ms")
    return results

# Check a set of real public URLs
urls_to_check = [
    "https://httpbin.org/status/200",
    "https://httpbin.org/status/404",
    "https://httpbin.org/delay/1",
    "https://jsonplaceholder.typicode.com/posts/1",
    "https://quotes.toscrape.com/",
]

print(f"Checking {len(urls_to_check)} URLs with 5 threads...\n")
print(f"  {'STATUS':<6} {'URL':<50} {'CODE':>4} {'TIME':>6}")
print("  " + "-" * 68)

start = time.perf_counter()
results = check_urls_concurrent(urls_to_check)
total_time = time.perf_counter() - start

print(f"\nCompleted in {total_time:.1f}s")
ok_count = sum(1 for r in results if r["ok"])
print(f"  {ok_count}/{len(results)} URLs healthy")
avg_ms = sum(r["elapsed_ms"] for r in results) / len(results)
print(f"  Avg response time: {avg_ms:.0f}ms")

Output:

Checking 5 URLs with 5 threads...

  STATUS URL                                                CODE   TIME
  --------------------------------------------------------------------
  [OK ] https://httpbin.org/status/200                      200    312ms
  [FAIL] https://httpbin.org/status/404                     404    298ms
  [OK ] https://jsonplaceholder.typicode.com/posts/1        200    189ms
  [OK ] https://quotes.toscrape.com/                        200    245ms
  [OK ] https://httpbin.org/delay/1                         200   1051ms

Completed in 1.1s
  4/5 URLs healthy
  Avg response time: 419ms

ThreadPoolExecutor is the modern way to manage a pool of worker threads: it handles thread creation, reuse, and cleanup automatically. as_completed() returns futures as they finish (not in submission order), so faster URLs report first. Extend this checker with retry logic for timeouts, a CSV report of results, or a --watch mode that re-checks URLs every 30 seconds using a daemon thread.
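To see the ordering difference without touching the network, the sketch below replaces the URL check with a hypothetical slow_echo function that just sleeps. It contrasts as_completed(), which yields in completion order, with executor.map(), which yields in submission order.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def slow_echo(label: str, delay: float) -> str:
    time.sleep(delay)  # stand-in for a network round trip
    return label


jobs = [("slow", 0.3), ("fast", 0.1), ("medium", 0.2)]

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(slow_echo, label, delay) for label, delay in jobs]
    # Yields each future as soon as it finishes.
    completion_order = [f.result() for f in as_completed(futures)]

    labels, delays = zip(*jobs)
    # Yields results in the order the inputs were given.
    submission_order = list(executor.map(slow_echo, labels, delays))

print("as_completed:", completion_order)
print("map:         ", submission_order)
```

With these delays, as_completed typically reports fast, medium, slow, while map always reports slow, fast, medium. Prefer map when you need results aligned with inputs, and as_completed when you want to react to each result as early as possible.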

Frequently Asked Questions

If Python has the GIL, why use threading at all?

The GIL prevents multiple threads from running Python bytecode simultaneously on different CPU cores — this affects CPU-bound work. But for I/O-bound work, threads spend most of their time waiting for network or disk responses, not executing Python code. Python releases the GIL during I/O system calls, so multiple threads can have their I/O in-flight simultaneously. The result: for I/O-bound workloads (HTTP requests, file reads, database queries), threading provides genuine concurrency and real speedups.

Should I use Thread directly or ThreadPoolExecutor?

Use ThreadPoolExecutor from concurrent.futures for most cases — it handles thread lifecycle, limits concurrency via the pool size, and makes it easy to collect results with Future.result(). Use threading.Thread directly when you need fine-grained control over thread behavior (daemon settings, join() ordering, custom run() methods), or when you’re implementing a specific synchronization pattern like producer-consumer with Queue.

How do I avoid deadlocks?

A deadlock occurs when two threads each hold a lock the other needs. The main prevention rules: always acquire locks in the same order across all threads, use with lock: (never acquire() without release()), prefer queue.Queue for inter-thread communication over shared variables with locks, and use lock.acquire(timeout=5) to detect potential deadlocks instead of blocking forever. If you suspect a deadlock, print threading.enumerate() to see which threads are alive and stuck.
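The "same order across all threads" rule can be sketched with a bank-transfer example. The Account class and transfer function are hypothetical; the key idea is sorting the two locks by a stable key before acquiring them.

```python
import threading


class Account:
    def __init__(self, account_id: int, balance: int):
        self.account_id = account_id
        self.balance = balance
        self.lock = threading.Lock()


def transfer(src: Account, dst: Account, amount: int) -> None:
    # Always lock the account with the smaller id first, regardless of
    # transfer direction, so two opposite transfers cannot each hold
    # one lock while waiting for the other.
    first, second = sorted((src, dst), key=lambda acct: acct.account_id)
    with first.lock:
        with second.lock:
            src.balance -= amount
            dst.balance += amount


a = Account(1, 1000)
b = Account(2, 1000)


def many_transfers(src: Account, dst: Account) -> None:
    for _ in range(1000):
        transfer(src, dst, 1)


t1 = threading.Thread(target=many_transfers, args=(a, b))
t2 = threading.Thread(target=many_transfers, args=(b, a))
t1.start()
t2.start()
t1.join()
t2.join()

print(a.balance, b.balance)
```

If transfer() instead locked src first and dst second, the two threads could each grab one lock and wait forever for the other.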

What is threading.local() used for?

threading.local() creates thread-local storage: each thread gets its own independent copy of the data. Web frameworks and ORMs rely on the same idea for per-request state; SQLAlchemy's scoped_session, for example, is keyed on thread-local storage by default, and Flask historically kept its request context in a thread-local-style object (recent versions build on contextvars). Use it when you need per-thread state that shouldn’t be shared, like a database connection or a user session object. Access it via attribute assignment: local_data = threading.local(); local_data.user_id = 42.
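A minimal sketch of thread-local storage in action. The handle_request function and the user ids are hypothetical.

```python
import threading

local_data = threading.local()
seen = {}


def handle_request(user_id: int) -> None:
    local_data.user_id = user_id          # visible only to this thread
    seen[threading.current_thread().name] = local_data.user_id


threads = [
    threading.Thread(target=handle_request, args=(uid,), name=f"req-{uid}")
    for uid in (41, 42, 43)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(seen)
# The main thread never assigned user_id, so the attribute does not exist here.
print(hasattr(local_data, "user_id"))
```

Each thread read back exactly the value it stored, even though all three wrote to the same local_data object.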

How do I stop a thread cleanly?

Python doesn’t provide a way to forcibly kill a thread — you must signal it to stop itself. The standard pattern is a shared stop flag: stop_event = threading.Event(). The thread checks if stop_event.is_set(): break in its loop, and the main thread calls stop_event.set() to signal it to stop. For blocking operations (like queue.get()), use queue.get(timeout=1) with a loop so the thread can periodically check the stop flag. Never use thread._stop() — it’s a private API and can leave shared resources in a corrupted state.
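The stop-flag pattern described above might look like this in practice. The poller function and the timings are illustrative assumptions.

```python
import threading
import time

stop_event = threading.Event()
ticks = []


def poller() -> None:
    while not stop_event.is_set():
        ticks.append(time.monotonic())
        # wait() doubles as an interruptible sleep: it returns early
        # as soon as stop_event.set() is called.
        stop_event.wait(timeout=0.05)


worker = threading.Thread(target=poller)
worker.start()
time.sleep(0.2)          # let the worker run for a moment
stop_event.set()         # ask it to stop
worker.join(timeout=1.0)

print(f"stopped cleanly: {not worker.is_alive()}, ticks recorded: {len(ticks) > 0}")
```

Using stop_event.wait(timeout) instead of time.sleep() keeps shutdown latency low, because the thread wakes immediately when the event is set rather than finishing its sleep first.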

Conclusion

Python’s threading module is the right tool for I/O-bound concurrent work. The core tools are Thread for creating threads, Lock for protecting shared state, Event for signaling between threads, Queue for safe inter-thread data transfer, and ThreadPoolExecutor for managed thread pools. The URL checker above is a template for any concurrent I/O task — adapt it for batch API calls, file processing pipelines, or database operations.

For CPU-bound parallelism, switch to multiprocessing.Pool or ProcessPoolExecutor. For very high concurrency I/O (hundreds of simultaneous connections), consider asyncio with aiohttp.

Official documentation: https://docs.python.org/3/library/threading.html

How To Use Python SQLModel for Database Models with FastAPI

Intermediate

If you’ve built a FastAPI application with a database backend, you’ve probably run into this: you define a Pydantic model for request/response validation, then a separate SQLAlchemy model for the database table, and then spend time keeping them in sync. Change one field and you have to update both. Add a validator and you need it in both places. SQLModel solves this by merging the two into a single class that works as both a Pydantic model and a SQLAlchemy ORM model.

SQLModel is built by the same developer as FastAPI (Sebastián Ramírez) and is designed to integrate seamlessly with FastAPI’s dependency injection and OpenAPI documentation. Under the hood it’s a thin layer over SQLAlchemy 2.0 and Pydantic v2, so you get the full power of both: type-checked models that also map to database tables, automatic schema generation, and Alembic migration support. You’ll need Python 3.7+ for the library itself, and you install it with pip install sqlmodel. Note that the quick example below uses the int | None union syntax, which needs Python 3.10 or newer; on older interpreters, write Optional[int] instead, as the later examples do.

In this tutorial you’ll learn how to define SQLModel table models, create and query a database with the SQLAlchemy engine, relate tables with foreign keys, use SQLModel with FastAPI for a complete CRUD API, and handle optional fields correctly for create vs update operations. By the end you’ll have a working FastAPI + SQLite application that demonstrates the full SQLModel workflow.

Python SQLModel: Quick Example

Here’s a minimal SQLModel app that creates a table, inserts a row, and queries it back, all in roughly 20 lines:

# sqlmodel_quick.py
from sqlmodel import Field, Session, SQLModel, create_engine, select

class Hero(SQLModel, table=True):
    id: int | None = Field(default=None, primary_key=True)
    name: str
    power: str
    level: int = 1

engine = create_engine("sqlite:///heroes.db", echo=False)
SQLModel.metadata.create_all(engine)

# Insert
with Session(engine) as session:
    hero = Hero(name="Spider-Man", power="wall-crawling", level=5)
    session.add(hero)
    session.commit()
    session.refresh(hero)
    print(f"Created hero ID={hero.id}")

# Query
with Session(engine) as session:
    heroes = session.exec(select(Hero)).all()
    for h in heroes:
        print(f"{h.id}: {h.name} (level {h.level})")

Output:

Created hero ID=1
1: Spider-Man (level 5)

The key is table=True in the class definition — this tells SQLModel to register the class as a database table. Without it, the class is just a Pydantic model. The Field(primary_key=True) maps to SQLAlchemy’s primary key constraint, and default=None lets the database auto-assign the ID on insert. Notice how session.exec(select(Hero)) returns fully typed Hero objects — no manual type conversion needed.

The sections below cover relationships, FastAPI integration, and the create/update pattern in depth.

What Is SQLModel and Why Use It?

SQLModel is a Python ORM library that unifies SQLAlchemy and Pydantic into a single model definition. A traditional FastAPI + SQLAlchemy stack requires two parallel class hierarchies: one for database tables (SQLAlchemy declarative base) and one for API schemas (Pydantic BaseModel). SQLModel replaces both with a single SQLModel base class that is simultaneously a Pydantic model and a SQLAlchemy ORM model.

Feature                   | SQLAlchemy alone | Pydantic alone | SQLModel
--------------------------|------------------|----------------|-------------------
Database ORM              | Yes              | No             | Yes
Runtime type validation   | No               | Yes            | Yes
FastAPI schema generation | No               | Yes            | Yes
Single model definition   | No               | No             | Yes
Alembic migration support | Yes              | No             | Yes
Async support             | Yes (asyncio)    | N/A            | Yes (AsyncSession)

The trade-off is that SQLModel adds an abstraction layer, so some advanced SQLAlchemy patterns require dropping down to the SQLAlchemy API. For most CRUD applications, SQLModel’s higher-level API is exactly the right level of abstraction.

Two model classes, one schema to maintain. SQLModel picked the right battle.

Defining Table and Non-Table Models

The key design choice in SQLModel is the table=True flag. Models with this flag map to database tables; models without it are pure Pydantic schemas used for API input/output validation. This lets you define separate schemas for “create” (no ID field), “read” (ID included), and “update” (all fields optional) while sharing the field definitions from the table model.

# sqlmodel_models.py
from sqlmodel import Field, SQLModel
from typing import Optional

# --- Database table model ---
class Item(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    name: str = Field(index=True)          # creates DB index
    description: Optional[str] = None
    price: float
    in_stock: bool = True

# --- API schemas (no table=True) ---
class ItemCreate(SQLModel):
    """Fields required to create an item. No id -- DB assigns it."""
    name: str
    description: Optional[str] = None
    price: float
    in_stock: bool = True

class ItemRead(SQLModel):
    """Fields returned in API responses. id is always present."""
    id: int
    name: str
    description: Optional[str]
    price: float
    in_stock: bool

class ItemUpdate(SQLModel):
    """All fields optional for PATCH updates."""
    name: Optional[str] = None
    description: Optional[str] = None
    price: Optional[float] = None
    in_stock: Optional[bool] = None

# Validate that ItemCreate works as expected
item = ItemCreate(name="Widget", price=9.99)
print(item.model_dump())

# Invalid data raises a ValidationError
try:
    bad = ItemCreate(name="Widget", price="not-a-number")
except Exception as e:
    print(f"Validation error: {type(e).__name__}")

Output:

{'name': 'Widget', 'description': None, 'price': 9.99, 'in_stock': True}
Validation error: ValidationError

Using separate ItemCreate, ItemRead, and ItemUpdate schemas alongside the Item table model is the standard SQLModel pattern for FastAPI. The ItemCreate schema is what the API accepts; the ItemRead schema is what the API returns; the Item table model is what the database uses. FastAPI’s response model validation uses ItemRead to strip any fields that shouldn’t appear in the response.

CRUD Operations with Session

SQLModel uses the same Session pattern as SQLAlchemy 2.0. Every database operation runs inside a session context manager. The select() function builds type-safe queries and session.exec() executes them, returning strongly-typed results rather than raw row tuples.

# sqlmodel_crud.py
from sqlmodel import Field, Session, SQLModel, create_engine, select
from typing import Optional
import os

class Item(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    name: str = Field(index=True)
    price: float
    in_stock: bool = True

DB_PATH = "items.db"
engine = create_engine(f"sqlite:///{DB_PATH}", echo=False)
SQLModel.metadata.create_all(engine)

def create_item(name: str, price: float) -> Item:
    with Session(engine) as session:
        item = Item(name=name, price=price)
        session.add(item)
        session.commit()
        session.refresh(item)   # load auto-assigned id
        return item

def get_item(item_id: int) -> Optional[Item]:
    with Session(engine) as session:
        return session.get(Item, item_id)

def list_items(min_price: float = 0.0) -> list[Item]:
    with Session(engine) as session:
        stmt = select(Item).where(Item.price >= min_price).order_by(Item.name)
        return session.exec(stmt).all()

def update_item(item_id: int, new_price: float) -> Optional[Item]:
    with Session(engine) as session:
        item = session.get(Item, item_id)
        if not item:
            return None
        item.price = new_price
        session.add(item)
        session.commit()
        session.refresh(item)
        return item

def delete_item(item_id: int) -> bool:
    with Session(engine) as session:
        item = session.get(Item, item_id)
        if not item:
            return False
        session.delete(item)
        session.commit()
        return True

# Demo
widget = create_item("Widget", 9.99)
gadget = create_item("Gadget", 24.99)
print(f"Created: {widget.id} {widget.name}, {gadget.id} {gadget.name}")

items = list_items(min_price=10.0)
print(f"Items >= $10: {[i.name for i in items]}")

updated = update_item(widget.id, 12.99)
print(f"Updated widget price: ${updated.price}")

deleted = delete_item(gadget.id)
print(f"Deleted gadget: {deleted}")
print(f"Remaining items: {len(list_items())}")

# Cleanup
os.remove(DB_PATH)

Output:

Created: 1 Widget, 2 Gadget
Items >= $10: ['Gadget']
Updated widget price: $12.99
Deleted gadget: True
Remaining items: 1

session.get(Item, item_id) is the efficient single-row lookup by primary key: it checks the session’s identity map first and only queries the database if the object isn’t already loaded. Call session.refresh(item) after commit() when you need to use the object outside the session, for example to return it with its auto-generated id or database-side defaults. By default commit() expires the object’s attributes, so reading item.id after the session has closed, without a prior refresh(), can raise a DetachedInstanceError.

API Alice with Swagger documentation
Foreign key defined. Cascade behavior: a problem for Future You.

Defining Relationships with Foreign Keys

SQLModel supports table relationships using Relationship and standard SQLAlchemy foreign keys. Relationships let you navigate between related objects in Python without writing manual join queries.

# sqlmodel_relationships.py
from sqlmodel import Field, Relationship, Session, SQLModel, create_engine, select
from typing import Optional, List

class Team(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    name: str
    heroes: List["Hero"] = Relationship(back_populates="team")

class Hero(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    name: str
    team_id: Optional[int] = Field(default=None, foreign_key="team.id")
    team: Optional[Team] = Relationship(back_populates="heroes")

engine = create_engine("sqlite:///teams.db", echo=False)
SQLModel.metadata.create_all(engine)

with Session(engine) as session:
    # Create team and heroes together
    avengers = Team(name="Avengers")
    session.add(avengers)
    session.commit()
    session.refresh(avengers)

    iron_man = Hero(name="Iron Man", team_id=avengers.id)
    thor = Hero(name="Thor", team_id=avengers.id)
    session.add_all([iron_man, thor])
    session.commit()

    # Query heroes with their team
    stmt = select(Hero).where(Hero.team_id == avengers.id)
    members = session.exec(stmt).all()
    print(f"{avengers.name} has {len(members)} heroes:")
    for h in members:
        print(f"  - {h.name}")

import os; os.remove("teams.db")

Output:

Avengers has 2 heroes:
  - Iron Man
  - Thor

The back_populates parameter creates a two-way relationship: team.heroes gives you the list of heroes in a team, and hero.team gives you the hero’s team. SQLModel uses lazy loading by default — the related objects are only fetched when you actually access the relationship attribute. For performance-critical applications, use selectin loading or explicit joins to avoid N+1 query problems.

FastAPI Integration: Complete CRUD API

SQLModel’s real strength shows when combined with FastAPI. The same model classes serve as both database tables and FastAPI request/response schemas, and FastAPI automatically generates OpenAPI documentation for them.

# sqlmodel_fastapi.py
from fastapi import FastAPI, HTTPException, Depends
from sqlmodel import Field, Session, SQLModel, create_engine, select
from typing import Optional, List

# --- Models ---
class HeroBase(SQLModel):
    name: str
    power: str
    level: int = 1

class Hero(HeroBase, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)

class HeroCreate(HeroBase):
    pass  # same fields as base, no id

class HeroRead(HeroBase):
    id: int

class HeroUpdate(SQLModel):
    name: Optional[str] = None
    power: Optional[str] = None
    level: Optional[int] = None

# --- Database setup ---
engine = create_engine("sqlite:///heroes.db", echo=False)
SQLModel.metadata.create_all(engine)

def get_session():
    with Session(engine) as session:
        yield session

# --- FastAPI app ---
app = FastAPI(title="Heroes API")

@app.post("/heroes/", response_model=HeroRead, status_code=201)
def create_hero(hero: HeroCreate, session: Session = Depends(get_session)):
    db_hero = Hero.model_validate(hero)
    session.add(db_hero)
    session.commit()
    session.refresh(db_hero)
    return db_hero

@app.get("/heroes/", response_model=List[HeroRead])
def read_heroes(skip: int = 0, limit: int = 10, session: Session = Depends(get_session)):
    heroes = session.exec(select(Hero).offset(skip).limit(limit)).all()
    return heroes

@app.get("/heroes/{hero_id}", response_model=HeroRead)
def read_hero(hero_id: int, session: Session = Depends(get_session)):
    hero = session.get(Hero, hero_id)
    if not hero:
        raise HTTPException(status_code=404, detail="Hero not found")
    return hero

@app.patch("/heroes/{hero_id}", response_model=HeroRead)
def update_hero(hero_id: int, hero_update: HeroUpdate, session: Session = Depends(get_session)):
    hero = session.get(Hero, hero_id)
    if not hero:
        raise HTTPException(status_code=404, detail="Hero not found")
    update_data = hero_update.model_dump(exclude_unset=True)
    for field, value in update_data.items():
        setattr(hero, field, value)
    session.add(hero)
    session.commit()
    session.refresh(hero)
    return hero

@app.delete("/heroes/{hero_id}")
def delete_hero(hero_id: int, session: Session = Depends(get_session)):
    hero = session.get(Hero, hero_id)
    if not hero:
        raise HTTPException(status_code=404, detail="Hero not found")
    session.delete(hero)
    session.commit()
    return {"ok": True}

Run with:

pip install fastapi uvicorn sqlmodel
uvicorn sqlmodel_fastapi:app --reload
# Docs at http://127.0.0.1:8000/docs

The model_validate(hero) call converts a HeroCreate Pydantic model into a Hero table model — SQLModel inherits Pydantic’s model_validate for this. The Depends(get_session) pattern injects a fresh session per request and automatically closes it when the request completes. hero_update.model_dump(exclude_unset=True) is the key to PATCH support: it only returns fields that were actually sent in the request, not fields that defaulted to None.

Loop Larry organizing data models
response_model=HeroRead strips the internal fields. Your API won’t leak what it shouldn’t.

Real-Life Example: Task Manager API

A complete task manager with users, tasks, and status tracking — demonstrating relationships, filtering, and pagination in a realistic SQLModel + FastAPI app.

Python SQLModel task manager
offset(skip).limit(limit) — pagination that works until you have 10 million rows.

# task_manager.py
from fastapi import FastAPI, HTTPException, Depends, Query
from sqlmodel import Field, Relationship, Session, SQLModel, create_engine, select
from typing import Optional, List
from datetime import datetime
from enum import Enum

class TaskStatus(str, Enum):
    todo = "todo"
    in_progress = "in_progress"
    done = "done"

class UserBase(SQLModel):
    username: str
    email: str

class User(UserBase, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    tasks: List["Task"] = Relationship(back_populates="owner")

class UserCreate(UserBase):
    pass

class UserRead(UserBase):
    id: int

class TaskBase(SQLModel):
    title: str
    description: Optional[str] = None
    status: TaskStatus = TaskStatus.todo

class Task(TaskBase, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    created_at: datetime = Field(default_factory=datetime.utcnow)
    owner_id: Optional[int] = Field(default=None, foreign_key="user.id")
    owner: Optional[User] = Relationship(back_populates="tasks")

class TaskCreate(TaskBase):
    owner_id: int

class TaskRead(TaskBase):
    id: int
    created_at: datetime
    owner_id: int

class TaskUpdate(SQLModel):
    title: Optional[str] = None
    description: Optional[str] = None
    status: Optional[TaskStatus] = None

engine = create_engine("sqlite:///tasks.db", echo=False)
SQLModel.metadata.create_all(engine)

def get_session():
    with Session(engine) as session:
        yield session

app = FastAPI(title="Task Manager API")

@app.post("/users/", response_model=UserRead, status_code=201)
def create_user(user: UserCreate, session: Session = Depends(get_session)):
    db_user = User.model_validate(user)
    session.add(db_user)
    session.commit()
    session.refresh(db_user)
    return db_user

@app.post("/tasks/", response_model=TaskRead, status_code=201)
def create_task(task: TaskCreate, session: Session = Depends(get_session)):
    db_task = Task.model_validate(task)
    session.add(db_task)
    session.commit()
    session.refresh(db_task)
    return db_task

@app.get("/tasks/", response_model=List[TaskRead])
def list_tasks(
    status: Optional[TaskStatus] = None,
    owner_id: Optional[int] = None,
    skip: int = Query(default=0, ge=0),
    limit: int = Query(default=10, le=100),
    session: Session = Depends(get_session)
):
    stmt = select(Task)
    if status is not None:
        stmt = stmt.where(Task.status == status)
    if owner_id is not None:
        stmt = stmt.where(Task.owner_id == owner_id)
    stmt = stmt.offset(skip).limit(limit)
    return session.exec(stmt).all()

@app.patch("/tasks/{task_id}", response_model=TaskRead)
def update_task(task_id: int, update: TaskUpdate, session: Session = Depends(get_session)):
    task = session.get(Task, task_id)
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")
    for field, value in update.model_dump(exclude_unset=True).items():
        setattr(task, field, value)
    session.add(task)
    session.commit()
    session.refresh(task)
    return task

Run with:

uvicorn task_manager:app --reload
# POST /users/  -> creates user
# POST /tasks/  -> creates task for that user
# GET /tasks/?status=todo&owner_id=1  -> filtered task list

The TaskStatus enum ensures the API only accepts valid status values and documents the allowed choices in the OpenAPI schema automatically. The filtering pattern in list_tasks is composable: you can add more .where() clauses without rewriting the query. Extend this example by adding authentication with FastAPI's OAuth2 dependency, Alembic migrations for schema changes, or async sessions with AsyncSession for high-concurrency workloads.

Frequently Asked Questions

Is SQLModel a replacement for SQLAlchemy?

SQLModel is built on top of SQLAlchemy, not a replacement for it. It provides a higher-level API that integrates with Pydantic, but all SQLAlchemy features are still available. You can mix SQLModel's select() with SQLAlchemy's advanced query constructs, use Alembic for migrations (SQLModel.metadata is a standard SQLAlchemy MetaData object), and access the underlying engine and session objects directly. When SQLModel's API is insufficient, drop down to SQLAlchemy -- they're fully compatible.

Does SQLModel support async databases?

Yes. Use create_async_engine from sqlalchemy.ext.asyncio and AsyncSession from sqlmodel.ext.asyncio.session. The query syntax is identical, but you await session.exec() instead of calling it synchronously. (SQLAlchemy's own AsyncSession also works, but it exposes execute() rather than exec().) You also need an async driver such as aiosqlite for SQLite or asyncpg for PostgreSQL. The async pattern suits high-concurrency FastAPI applications where database calls should not block the event loop.

How do I handle database migrations with SQLModel?

Use Alembic -- the standard SQLAlchemy migration tool. Run alembic init alembic to create the migration environment, then set target_metadata = SQLModel.metadata in alembic/env.py. Run alembic revision --autogenerate -m "add column" to auto-generate a migration from your model changes, and alembic upgrade head to apply it. Never use SQLModel.metadata.create_all() in production -- that only creates tables that don't exist and won't handle column additions, renames, or deletions.

Why use Optional[str] = None for non-required fields?

SQLModel inherits Pydantic's field semantics: a field declared as Optional[str] = None is optional in the Python model and maps to a nullable column in the database. If you want the database itself to supply a default value, pass it through to SQLAlchemy with Field(default=None, sa_column_kwargs={"server_default": "value"}). The exclude_unset=True pattern in PATCH endpoints relies on distinguishing "field not sent" (absent from the model dump) from "field sent as None" (present in the dump as None).

Can SQLModel work with PostgreSQL and MySQL?

Yes -- SQLModel uses SQLAlchemy's database abstraction layer, so it works with any database SQLAlchemy supports: PostgreSQL (use psycopg2 or asyncpg), MySQL/MariaDB (use mysqlclient or aiomysql), SQLite (built-in), and more. Change the connection URL in create_engine(): "postgresql://user:pass@localhost/dbname" for PostgreSQL. Field types like str and int map to the appropriate column types for each database automatically.

Conclusion

SQLModel eliminates the duplication between SQLAlchemy ORM models and Pydantic schemas by merging them into a single class. The core workflow is: define a table=True model for the database, define non-table models for API input/output, use session.exec(select(Model)) for queries, and use model_dump(exclude_unset=True) for PATCH operations. The FastAPI integration is seamless because both libraries share the same author and design philosophy.

The task manager example above is a solid foundation for a real application. Next steps: add Alembic for migrations, switch to AsyncSession for async support, add authentication with FastAPI's Depends system, and add indexes to frequently-filtered columns with Field(index=True).

Official documentation: https://sqlmodel.tiangolo.com/

How To Use Python dis Module to Inspect Bytecode

Advanced

You’ve profiled your Python code and a particular function keeps showing up in the hot path. You’ve tried rewriting it a few different ways and timeit says one version is 30% faster, but you’re not sure why. Or you’re curious what Python is actually doing when you write a list comprehension versus a for loop, or what the difference is between x += 1 and x = x + 1 at the bytecode level. The dis module (short for “disassemble”) lets you answer these questions by showing you the CPython bytecode that any Python function compiles to.

The dis module is part of Python’s standard library and works with any function, method, class, module, or code string. It doesn’t modify your code — it just translates the compiled .pyc bytecode back into a human-readable instruction listing. You don’t need to be a CPython internals expert to use it; the instruction names are descriptive enough that you can reason about them with basic Python knowledge.

In this tutorial, you’ll learn how to disassemble functions with dis.dis(), read and interpret bytecode output, inspect code objects with __code__ attributes, compare implementations by their bytecode, understand how closures and global lookups work at the bytecode level, and use dis as a performance diagnosis tool. By the end, you’ll have a concrete mental model of what CPython does with your code.

Python dis Module: Quick Example

Here’s a basic disassembly of a simple function to show what the output looks like:

# dis_quick.py
import dis

def add_numbers(a, b):
    result = a + b
    return result

dis.dis(add_numbers)

Output:

  2           0 RESUME                   0

  3           2 LOAD_FAST                0 (a)
              4 LOAD_FAST                1 (b)
              6 BINARY_OP                0 (+)
             10 STORE_FAST               2 (result)

  4          12 LOAD_FAST                2 (result)
             14 RETURN_VALUE

Each row is one bytecode instruction. The columns are: source line number (left), byte offset in the code object, instruction name (opcode), argument, and the argument’s human-readable value in parentheses. LOAD_FAST pushes a local variable onto the evaluation stack. BINARY_OP pops two values, applies the operator (0 = +), and pushes the result. STORE_FAST pops the top of the stack into a local variable. RETURN_VALUE returns the top of the stack to the caller.
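The same columns are available programmatically. As a minimal sketch, the loop below rebuilds a similar listing for add_numbers with dis.get_instructions(); the opcode names and offsets it prints depend on your CPython version, so treat the output as illustrative rather than fixed.

```python
import dis

def add_numbers(a, b):
    result = a + b
    return result

# Each Instruction carries the fields that dis.dis() prints as columns
rows = [
    (ins.offset, ins.opname, ins.arg, ins.argrepr)
    for ins in dis.get_instructions(add_numbers)
]

for offset, opname, arg, argrepr in rows:
    arg_text = "" if arg is None else str(arg)  # some opcodes take no argument
    print(f"{offset:>4}  {opname:<24} {arg_text:>4}  {argrepr}")
```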

What Is CPython Bytecode?

When CPython compiles your Python source code, it produces bytecode — a sequence of fixed-width instructions for the CPython virtual machine (VM). The VM then interprets these instructions in a loop, maintaining an evaluation stack where operands are pushed and popped. Bytecode is stored in .pyc files and in the __code__ attribute of function objects.

Concept | What It Is | Example
Opcode | A single VM instruction | LOAD_FAST, CALL
Offset | Byte position of instruction in code object | 0, 2, 4…
Argument | Integer parameter to the opcode | Variable index, constant index
Stack | LIFO buffer for operands and results | LOAD pushes, STORE/CALL pops
Code object | Compiled bytecode + metadata for a function | func.__code__
co_varnames | Tuple of local variable names | ('a', 'b', 'result')
Understanding the stack model is the key to reading bytecode. Every LOAD instruction pushes a value; every STORE, CALL, or binary operator pops one or more values. The stack depth at any point tells you how many “in-flight” values exist. CPython computes the maximum stack depth at compile time and records it in the code object’s co_stacksize attribute, so each frame can reserve enough stack space before the function runs.
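To make the stack model concrete, here is a small sketch that reads the stack depth the compiler computed for a function and asks dis.stack_effect() how a single opcode changes the depth. POP_TOP is used only because it exists under that name across CPython versions; the helper function is hypothetical.

```python
import dis

def add_numbers(a, b):
    return a + b

# Maximum number of values that can be on the evaluation stack at once
max_depth = add_numbers.__code__.co_stacksize
print(f"max stack depth for add_numbers: {max_depth}")

# stack_effect() reports the net change in depth caused by one opcode;
# POP_TOP discards the top value, so the depth shrinks by one
pop_top_effect = dis.stack_effect(dis.opmap["POP_TOP"])
print(f"POP_TOP changes the depth by {pop_top_effect}")
```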

Inspecting bytecode with dis
dis.dis() shows you every LOAD_FAST and BINARY_OP your function executes. It judges silently.

Reading the dis Output

Let’s look at a more complex function to practice reading bytecode:

# dis_read.py
import dis

def categorize_score(score):
    if score >= 90:
        return "A"
    elif score >= 80:
        return "B"
    else:
        return "C"

print("=== categorize_score ===")
dis.dis(categorize_score)

# Compare with a table-driven version (a list of threshold/grade tuples)
GRADE_THRESHOLDS = [(90, "A"), (80, "B"), (0, "C")]

def categorize_score_v2(score):
    for threshold, grade in GRADE_THRESHOLDS:
        if score >= threshold:
            return grade

print("\n=== categorize_score_v2 ===")
dis.dis(categorize_score_v2)

Output:

=== categorize_score ===
  2           0 RESUME                   0

  3           2 LOAD_FAST                0 (score)
              4 LOAD_CONST               1 (90)
              6 COMPARE_OP               5 (>=)
             10 POP_JUMP_IF_FALSE       12 (to 36)

  4          12 LOAD_CONST               2 ('A')
             14 RETURN_VALUE

  5          16 LOAD_FAST                0 (score)
  ...
  (abbreviated)

=== categorize_score_v2 ===
  2           0 RESUME                   0

  3           2 LOAD_GLOBAL              0 (GRADE_THRESHOLDS)
             12 GET_ITER
        >>   14 FOR_ITER                28 (to 72)
             18 UNPACK_SEQUENCE          2
             22 STORE_FAST               1 (threshold)
             24 STORE_FAST               2 (grade)
  ...

The POP_JUMP_IF_FALSE instruction is how Python implements if statements in bytecode: it pops the boolean result of the comparison and jumps to the target offset if it’s False, otherwise continues to the next instruction. In v2, the LOAD_GLOBAL instruction for GRADE_THRESHOLDS does a dictionary lookup in the global namespace on every call — that’s slower than LOAD_FAST which reads from a local slot. This is why moving frequently-accessed globals into local variables inside tight loops is a valid micro-optimization.
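You can confirm the global-versus-local difference without reading a full listing. In this sketch, grade_global and grade_local are hypothetical names: the first reads the module-level list, the second receives it as a default argument bound once at definition time.

```python
import dis

GRADE_THRESHOLDS = [(90, "A"), (80, "B"), (0, "C")]

def grade_global(score):
    for threshold, grade in GRADE_THRESHOLDS:  # looked up in module globals
        if score >= threshold:
            return grade

def grade_local(score, thresholds=GRADE_THRESHOLDS):
    for threshold, grade in thresholds:  # read from a local slot
        if score >= threshold:
            return grade

def opnames(fn):
    """Return the set of opcode names used by fn."""
    return {ins.opname for ins in dis.get_instructions(fn)}

uses_global = "LOAD_GLOBAL" in opnames(grade_global)
local_uses_global = "LOAD_GLOBAL" in opnames(grade_local)
print(f"grade_global uses LOAD_GLOBAL: {uses_global}")
print(f"grade_local uses LOAD_GLOBAL:  {local_uses_global}")
```

The default-argument trick changes the function signature, so it is best kept for internal helpers where callers will not accidentally pass a second argument.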

Inspecting Code Objects

Every function has a __code__ attribute that exposes the raw code object. The code object contains not just the bytecode bytes but also metadata: constant values, variable names, closure variables, and source information.

# dis_code_object.py
import dis

def make_adder(n):
    """Return a function that adds n to its argument."""
    def adder(x):
        return x + n
    return adder

add5 = make_adder(5)

# Inspect the outer function
outer_code = make_adder.__code__
print("=== make_adder code object ===")
print(f"co_name:       {outer_code.co_name}")
print(f"co_varnames:   {outer_code.co_varnames}")    # local variables
print(f"co_freevars:   {outer_code.co_freevars}")    # closed-over variables
print(f"co_cellvars:   {outer_code.co_cellvars}")    # variables used by inner funcs
print(f"co_consts:     {outer_code.co_consts}")      # constants (includes inner code)
print(f"co_flags:      {outer_code.co_flags:#010b}")

# Inspect the inner adder function
inner_code = add5.__code__
print("\n=== adder code object ===")
print(f"co_name:       {inner_code.co_name}")
print(f"co_varnames:   {inner_code.co_varnames}")
print(f"co_freevars:   {inner_code.co_freevars}")    # 'n' is a free variable

print("\n=== adder bytecode ===")
dis.dis(add5)

Output:

=== make_adder code object ===
co_name:       make_adder
co_varnames:   ('n', 'adder')
co_freevars:   ()
co_cellvars:   ('n',)
co_consts:     (None, <code object adder at 0x...>, 'make_adder.<locals>.adder')
co_flags:      0b0000000001000011

=== adder code object ===
co_name:       adder
co_varnames:   ('x',)
co_freevars:   ('n',)

=== adder bytecode ===
  3           0 RESUME                   0

  4           2 LOAD_FAST                0 (x)
              4 LOAD_DEREF               0 (n)
              6 BINARY_OP                0 (+)
             10 RETURN_VALUE

LOAD_DEREF is the opcode for accessing a closure variable (a free variable). It reads from a “cell” object that’s shared between the outer and inner function, which is how closures maintain state after the outer function returns. This is slower than LOAD_FAST but necessary for closures. co_cellvars in the outer function lists variables that are “wrapped in cells” for sharing with inner functions; co_freevars in the inner function lists the variables it accesses from the cell.
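The cells themselves are visible from Python. The sketch below pairs each name in co_freevars with its cell in __closure__, then uses a hypothetical make_counter function to show that rebinding with nonlocal writes through the same shared cell.

```python
def make_adder(n):
    def adder(x):
        return x + n
    return adder

add5 = make_adder(5)

# co_freevars names the closed-over variables; __closure__ holds their cells
for name, cell in zip(add5.__code__.co_freevars, add5.__closure__):
    print(f"{name} -> {cell.cell_contents}")

def make_counter():
    count = 0
    def increment():
        nonlocal count  # rebinding goes through the shared cell
        count += 1
        return count
    return increment

counter = make_counter()
counter()
latest = counter()
print(latest, counter.__closure__[0].cell_contents)
```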

Reading bytecode like a blueprint
dis.get_instructions() returns a generator. Your profiler returns excuses.

Comparing Implementations

One of the most practical uses of dis is comparing two implementations of the same function. Fewer instructions usually means faster execution (though the relationship isn’t perfect — some instructions are heavier than others).

# dis_compare.py
import dis

# Three ways to join a list of strings
def join_v1(words):
    result = ""
    for word in words:
        result += word + " "
    return result.strip()

def join_v2(words):
    return " ".join(words)

def join_v3(words):
    parts = []
    for word in words:
        parts.append(word)
    return " ".join(parts)

# Count instructions for each
for fn in [join_v1, join_v2, join_v3]:
    instructions = list(dis.get_instructions(fn))
    real_instructions = [i for i in instructions if i.opname != "RESUME"]
    print(f"{fn.__name__}: {len(real_instructions)} instructions")

print("\n=== join_v2 bytecode (winner) ===")
dis.dis(join_v2)

Output:

join_v1: 18 instructions
join_v2: 5 instructions
join_v3: 17 instructions

=== join_v2 bytecode (winner) ===
  2           0 RESUME                   0

  3           2 LOAD_CONST               1 (' ')
              4 LOAD_ATTR                1 (NULL|self + join)
             14 LOAD_FAST                0 (words)
             16 CALL                     1
             24 RETURN_VALUE

join_v2 uses just 5 real instructions: load the string ' ', load its join method, load the words argument, call the method, and return. The other versions loop in Python bytecode, which means executing multiple instructions per element. str.join() does the concatenation in C, which is why it’s significantly faster than loop-based string building for large lists.
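Instruction counts are a hint, not a measurement, so it is worth pairing them with timeit. This is a rough sketch with an arbitrary input size and repeat count; join_loop and join_builtin are renamed copies of the first two versions above, and the timings it prints will differ from machine to machine.

```python
import timeit

def join_loop(words):
    result = ""
    for word in words:
        result += word + " "
    return result.strip()

def join_builtin(words):
    return " ".join(words)

words = ["alpha", "beta", "gamma"] * 1000  # assumed test input

# Both versions must agree before a timing comparison means anything
same_result = join_loop(words) == join_builtin(words)

loop_time = timeit.timeit(lambda: join_loop(words), number=200)
builtin_time = timeit.timeit(lambda: join_builtin(words), number=200)

print(f"same result: {same_result}")
print(f"loop: {loop_time:.4f}s  join: {builtin_time:.4f}s")
```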

Other dis Module Tools

Beyond dis.dis(), the module provides several other useful functions for programmatic bytecode analysis:

# dis_tools.py
import dis

def sample(x, y=10):
    z = x * y
    return z if z > 0 else -z

# get_instructions() -- iterator of Instruction namedtuples
print("=== get_instructions ===")
for instr in dis.get_instructions(sample):
    print(f"  {instr.offset:3d}  {instr.opname:<20} {instr.argrepr}")

# dis.code_info() -- compact summary of the code object
print("\n=== code_info ===")
print(dis.code_info(sample))

# dis.show_code() -- prints code_info to stdout
print("\n=== show_code ===")
dis.show_code(sample)

# Disassemble a code string directly
print("\n=== disassemble from string ===")
code_str = "x = [i**2 for i in range(5)]"
dis.dis(compile(code_str, "<string>", "exec"))

Output (abridged):

=== get_instructions ===
    0  RESUME                0
    2  LOAD_FAST             x
    4  LOAD_FAST             y
    6  BINARY_OP             *
   ...

=== code_info ===
Name:              sample
Filename:          dis_tools.py
Argument count:    2
...
Constants:         (None, 0)
Variable names:    x, y, z

dis.get_instructions() returns an iterator of Instruction namedtuples with fields including opname, opcode, arg, argval, argrepr, offset, starts_line, and is_jump_target. This is the programmatic interface for tools that analyze bytecode, such as custom linters, optimizers, and audit scripts like the one in the next section.
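As a small illustration of that programmatic interface, the sketch below builds an opcode histogram for the sample function and collects the offsets marked as jump targets. The histogram layout is just one way to summarize the instructions, not a standard report.

```python
import dis
from collections import Counter

def sample(x, y=10):
    z = x * y
    return z if z > 0 else -z

instructions = list(dis.get_instructions(sample))

# How often each opcode appears in the function
histogram = Counter(ins.opname for ins in instructions)

# Offsets that some jump instruction can land on (here, the else branch)
jump_targets = [ins.offset for ins in instructions if ins.is_jump_target]

for opname, count in histogram.most_common():
    print(f"{opname:<24} {count}")
print("jump targets at offsets:", jump_targets)
```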

Real-Life Example: Bytecode-Based Performance Audit

This audit tool identifies functions in a module that use slow patterns -- global variable access in loops, attribute chaining, or excessive stack operations -- and reports them with instruction counts.

Understanding Python bytecode instructions
Bytecode doesn't lie. Your function has 47 instructions. You thought it was simple.

# bytecode_audit.py
import dis

def audit_function(fn):
    """
    Audit a function for common bytecode-level performance patterns.
    Returns a dict of findings.
    """
    instructions = list(dis.get_instructions(fn))
    opnames = [i.opname for i in instructions]

    # Count meaningful instructions (skip RESUME)
    total = sum(1 for op in opnames if op != "RESUME")

    # Detect LOAD_GLOBAL inside a FOR_ITER loop
    global_in_loop = 0
    in_loop = False
    for instr in instructions:
        if instr.opname == "FOR_ITER":
            in_loop = True
        if instr.opname == "RETURN_VALUE":
            in_loop = False
        if in_loop and instr.opname == "LOAD_GLOBAL":
            global_in_loop += 1

    # Count closure variable accesses
    deref_count = opnames.count("LOAD_DEREF") + opnames.count("STORE_DEREF")

    # Count function calls
    call_count = opnames.count("CALL") + opnames.count("CALL_FUNCTION")

    return {
        "name": fn.__name__,
        "total_instructions": total,
        "global_in_loop": global_in_loop,
        "deref_accesses": deref_count,
        "function_calls": call_count,
    }


# Test functions with different characteristics
MULTIPLIER = 10  # global variable

def process_list_slow(data):
    """Uses global in loop -- flagged by audit."""
    result = []
    for item in data:
        result.append(item * MULTIPLIER)  # LOAD_GLOBAL each iteration
    return result

def process_list_fast(data, multiplier=MULTIPLIER):
    """Caches global as local default -- not flagged."""
    return [item * multiplier for item in data]  # LOAD_FAST

def nested_closure(x):
    """Creates a closure -- deref accesses flagged."""
    factor = x * 2
    def inner(y):
        return y + factor  # LOAD_DEREF
    return inner

# Run audit
print(f"{'Function':<25} {'Instructions':>14} {'Global-in-loop':>15} {'Deref':>8} {'Calls':>8}")
print("-" * 75)
for fn in [process_list_slow, process_list_fast, nested_closure]:
    r = audit_function(fn)
    flag = " [!]" if r["global_in_loop"] > 0 else ""
    print(f"{r['name']:<25} {r['total_instructions']:>14} {r['global_in_loop']:>15}{flag}  {r['deref_accesses']:>6}  {r['function_calls']:>6}")

Output:

Function                  Instructions  Global-in-loop    Deref   Calls
---------------------------------------------------------------------------
process_list_slow                   10               1 [!]         0       2
process_list_fast                    6               0               0       0
nested_closure                       6               0               0       0

process_list_slow is flagged because it accesses the global MULTIPLIER inside a FOR_ITER loop. Each LOAD_GLOBAL looks the name up in the module's global dictionary (and then in builtins if it is missing), while LOAD_FAST (used in the fast version) reads from a pre-allocated local slot. The local read is cheaper, although the size of the gap depends on the CPython version, because recent releases cache global lookups. The fix is simple: assign the global to a local variable before the loop, or use a default argument as process_list_fast does.
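To check where a global is actually loaded, a small helper can count the LOAD_GLOBAL instructions that appear after the first FOR_ITER. This is a minimal sketch: the names uses_global, uses_local, and global_loads_in_loop are illustrative and not part of the audit script above, and "after the first FOR_ITER" is only a rough stand-in for "inside the loop".

```python
import dis

MULTIPLIER = 10  # module-level global


def uses_global(data):
    result = []
    for item in data:
        result.append(item * MULTIPLIER)  # global looked up on every iteration
    return result


def uses_local(data):
    multiplier = MULTIPLIER  # one global lookup, stored in a local slot
    result = []
    for item in data:
        result.append(item * multiplier)
    return result


def global_loads_in_loop(fn, name):
    """Count LOAD_GLOBAL instructions for `name` that follow the first FOR_ITER."""
    seen_loop = False
    count = 0
    for instr in dis.get_instructions(fn):
        if instr.opname == "FOR_ITER":
            seen_loop = True
        elif seen_loop and instr.opname == "LOAD_GLOBAL" and instr.argval == name:
            count += 1
    return count


for fn in (uses_global, uses_local):
    print(fn.__name__, global_loads_in_loop(fn, "MULTIPLIER"))
```

Both functions return the same list; only the position of the global lookup differs, which is what the helper reports.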

Frequently Asked Questions

Does bytecode change between Python versions?

Yes -- CPython's bytecode is not guaranteed to be stable between minor versions. Python 3.11 replaced CALL_FUNCTION with CALL and added the RESUME instruction, and 3.12 and 3.13 changed the instruction set again. The dis module always reflects the bytecode of the running interpreter, so your disassembly output will match what CPython actually executes. Never rely on specific bytecode sequences across Python versions in production code.

Why is LOAD_FAST faster than LOAD_GLOBAL?

LOAD_FAST reads a variable from a fixed-size local variable array using an integer index -- it's essentially an array lookup with no hashing. LOAD_GLOBAL performs a hash table lookup in the module's __dict__, then falls back to builtins.__dict__ if not found there. The hash table lookup involves computing a hash, finding the slot, and handling potential collisions -- much slower for tight loops. The micro-optimization of caching globals as locals (_len = len before a loop) can matter in hot paths that execute millions of times.

Does CPython optimize bytecode?

Yes, CPython applies a limited set of compile-time optimizations, mainly constant folding and dead-code elimination (historically called the "peephole optimizer"). For example, 2 * 3 in source code becomes the constant 6 in bytecode (no runtime multiplication), and if False: ... branches are eliminated entirely. Python 3.11 added a specializing adaptive interpreter (PEP 659) that "quickens" frequently executed instructions by replacing them at runtime with variants specialized for the types it observes. By default the dis module shows the compile-time bytecode; on 3.11 and later, dis.dis(fn, adaptive=True) shows the specialized instructions instead.
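Constant folding is easy to confirm from the code object itself. The sketch below uses a made-up function, seconds_per_day, and checks that the product is stored as a constant and that no binary arithmetic instruction remains in the bytecode.

```python
import dis


def seconds_per_day():
    return 24 * 60 * 60  # folded to a single constant at compile time


consts = seconds_per_day.__code__.co_consts
opnames = [instr.opname for instr in dis.get_instructions(seconds_per_day)]

print("86400 stored as a constant:", 86400 in consts)
print("arithmetic left in bytecode:", any(op.startswith("BINARY_") for op in opnames))
```

The exact opcode names in opnames vary between CPython versions, which is why the check looks only for the BINARY_ prefix rather than a specific instruction.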

Can I modify bytecode at runtime?

Technically yes -- you can create a new types.CodeType object with modified bytecode and assign it to a function's __code__. Libraries like codetransformer and bytecode provide higher-level APIs for this. In practice, modifying bytecode is fragile (it breaks across Python versions), slow to implement correctly, and almost always unnecessary -- if you need runtime code transformation, Python's decorator system, ast module, or metaclasses are safer and more maintainable alternatives.

How does bytecode relate to PyPy or Cython?

CPython's bytecode is interpreted by the CPython VM -- each instruction is executed by a large dispatch loop written in C. PyPy uses a JIT compiler that compiles frequently-executed bytecode to native machine code at runtime, which is why it can be 5-10x faster for CPU-bound loops. Cython is a different approach: it compiles Python-like code to C extensions that bypass bytecode entirely. The dis module only shows CPython bytecode -- PyPy and Cython have their own internal representations that dis doesn't apply to.

Conclusion

The dis module gives you a window into what CPython actually does with your Python source code. The key tools are dis.dis() for human-readable bytecode output, dis.get_instructions() for programmatic analysis, dis.code_info() for code object metadata, and the __code__ attribute for direct code object inspection. The performance audit above is a practical starting point -- extend it to scan all functions in a module, add checks for attribute chaining inside loops, or integrate it into your CI pipeline as a performance regression detector.

The dis module is most valuable when combined with profiling: use cProfile or line_profiler to find the hot path, then use dis to understand why one implementation is faster than another. Together they give you both the "where" and the "why" of Python performance.

Official documentation: https://docs.python.org/3/library/dis.html

How To Use Python unicodedata for Unicode Operations

Intermediate

You’re building a search feature and a user types “cafe”, but the database has “café” stored both with and without the accent mark. Or you’re cleaning user-submitted data and need to strip diacritics to normalize names. Or you’re processing text from multiple sources and the “same” character appears in different Unicode representations: a composed form where the accent is part of the character, and a decomposed form where it’s a separate combining character. These aren’t edge cases; they’re everyday Unicode problems that trip up Python developers regularly.

Python’s built-in unicodedata module gives you direct access to the Unicode Character Database — the official reference for every character’s name, category, numeric value, and decomposition. It also provides the normalize() function that converts between the four Unicode normalization forms (NFC, NFD, NFKC, NFKD). No third-party packages needed.

In this tutorial, you’ll learn how to normalize Unicode strings with unicodedata.normalize(), look up character properties with name(), category(), and decimal(), strip diacritics to create ASCII-safe slugs, detect and filter characters by category, and build a robust Unicode text cleaning pipeline. By the end, you’ll be able to handle international text data reliably.

Python unicodedata: Quick Example

Here’s the classic problem — the same word in two different Unicode representations — and how to normalize it:

# unicodedata_quick.py
import unicodedata

# "cafe" in two forms -- visually identical but different bytes
composed = "caf\u00e9"       # NFC: e with acute as single char U+00E9
decomposed = "cafe\u0301"    # NFD: e followed by combining acute U+0301

# !a formats with ascii(), so non-ASCII characters are shown as escapes
print(f"Composed:   {composed!a} len={len(composed)}")
print(f"Decomposed: {decomposed!a} len={len(decomposed)}")
print(f"Equal?      {composed == decomposed}")

# Normalize both to NFC -- canonical composed form
nfc_1 = unicodedata.normalize("NFC", composed)
nfc_2 = unicodedata.normalize("NFC", decomposed)
print(f"After NFC:  {nfc_1!a} == {nfc_2!a} -> {nfc_1 == nfc_2}")

# Get character info
char = "\u00e9"
print(f"\nCharacter: {char}")
print(f"Name:      {unicodedata.name(char)}")
print(f"Category:  {unicodedata.category(char)}")

Output:

Composed:   'caf\xe9' len=4
Decomposed: 'cafe\u0301' len=5
Equal?      False
After NFC:  'caf\xe9' == 'caf\xe9' -> True

Character: é
Name:      LATIN SMALL LETTER E WITH ACUTE
Category:  Ll

The two strings are visually identical (both display as “café”), but Python’s == operator finds them different because they are different sequences of code points. After normalizing both to NFC, they compare equal. This is the root cause of “mysterious” string comparison bugs in internationalized applications.
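One way to apply this in practice is to normalize on both sides of every comparison. The helper below is a minimal sketch; the name same_text and the default form are illustrative choices, not part of unicodedata.

```python
import unicodedata


def same_text(a, b, form="NFC"):
    """Compare two strings after bringing both to the same normalization form."""
    return unicodedata.normalize(form, a) == unicodedata.normalize(form, b)


print(same_text("caf\u00e9", "cafe\u0301"))  # composed vs decomposed
print(same_text("caf\u00e9", "cafe"))        # accent genuinely missing
```

Normalization only unifies different encodings of the same character; it does not make "café" equal to "cafe". Removing the accent is a separate, lossy step.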

What Is Unicode Normalization?

Unicode defines multiple ways to represent the same text. The character “e” with an acute accent can be a single precomposed character (U+00E9, “LATIN SMALL LETTER E WITH ACUTE”) or a base character “e” followed by a combining acute accent (U+0301). Both are valid Unicode; they just have different byte sequences. Normalization converts text to one canonical form so comparisons and processing work correctly.

Form | Name | What It Does | When To Use
---- | ---- | ------------ | -----------
NFC | Canonical Composed | Decomposes then recomposes characters | Storage, comparison, web output
NFD | Canonical Decomposed | Decomposes into base + combining chars | Stripping diacritics (then filter combining)
NFKC | Compatibility Composed | NFC + normalizes compatibility chars (e.g. ligatures) | Search indexing, slug generation
NFKD | Compatibility Decomposed | NFD + normalizes compatibility chars | Aggressive text cleaning

The key practical distinction: NFC/NFD preserve all characters (they only change how each one is represented), while NFKC/NFKD also replace “compatibility equivalents”. For example, the Roman numeral character “Ⅲ” (U+2162) becomes the three-letter string “III”, and the ligature “ﬁ” (U+FB01) becomes “fi”. Use NFC for storing and comparing text; use NFKC or NFKD for generating slugs and search indexes.
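The difference between the canonical and compatibility forms is easiest to see by running one character through all four. This short sketch prints the results with ascii() so the code points are visible; the sample labels are only for display.

```python
import unicodedata

FORMS = ("NFC", "NFD", "NFKC", "NFKD")
samples = {
    "e with acute": "\u00e9",
    "roman numeral three": "\u2162",
    "fi ligature": "\ufb01",
}

for label, ch in samples.items():
    # ascii() shows exactly which code points each form produces
    results = {form: ascii(unicodedata.normalize(form, ch)) for form in FORMS}
    print(f"{label:<20} {results}")
```

The accented letter changes only between composed and decomposed forms, while the numeral and the ligature are left alone by NFC/NFD and rewritten by NFKC/NFKD.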

[Image: Unicode character lookup] unicodedata.lookup() finds characters by name. Your copy-paste does not scale to 149,878 code points.

Looking Up Character Properties

The unicodedata module exposes the full Unicode Character Database. For any character, you can get its official name, general category, numeric value, bidirectional class, and more. These properties let you write character-type tests that work across all scripts, not just ASCII.

# unicodedata_properties.py
import unicodedata

chars = [
    "A", "a", "5", " ", "!", "\n",
    "\u00e9",   # e with acute (Latin)
    "\u03b1",   # Greek small letter alpha
    "\u0660",   # Arabic-Indic digit zero
    "\u4e2d",   # CJK character (zhong, "middle")
    "\u200b",   # Zero-width space
    "\u0301",   # Combining acute accent
]

print(f"{'Char':<8} {'Category':<10} {'Name'}")
print("-" * 60)
for c in chars:
    name = unicodedata.name(c, "(no name)")
    cat = unicodedata.category(c)
    # ascii() gives a visible escape for invisible and combining characters
    display = ascii(c) if c in (" ", "\n", "\u200b", "\u0301") else c
    print(f"{display:<8} {cat:<10} {name}")

Output:

Char     Category   Name
------------------------------------------------------------
A        Lu         LATIN CAPITAL LETTER A
a        Ll         LATIN SMALL LETTER A
5        Nd         DIGIT FIVE
' '      Zs         SPACE
!        Po         EXCLAMATION MARK
'\n'     Cc         (no name)
é        Ll         LATIN SMALL LETTER E WITH ACUTE
α        Ll         GREEK SMALL LETTER ALPHA
٠        Nd         ARABIC-INDIC DIGIT ZERO
中        Lo         CJK UNIFIED IDEOGRAPH-4E2D
'\u200b' Cf         ZERO WIDTH SPACE
'\u0301' Mn         COMBINING ACUTE ACCENT

The two-letter category codes are key for script-agnostic character classification: Lu = uppercase letter, Ll = lowercase letter, Nd = decimal digit, Zs = space separator, Mn = non-spacing mark (combining character), Cf = format character. Using category codes instead of character ranges means your code handles Arabic, Greek, and CJK text without any modifications.
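The lookup in the other direction, from an official name to a character, uses unicodedata.lookup(). The sketch below round-trips a name and reads the bidirectional class; safe_lookup is a hypothetical convenience wrapper, not a function the module provides.

```python
import unicodedata

alpha = unicodedata.lookup("GREEK SMALL LETTER ALPHA")
print(ascii(alpha), unicodedata.name(alpha), unicodedata.category(alpha))

# Bidirectional class: "L" for Latin letters, "R" for Hebrew letters
print(unicodedata.bidirectional("A"), unicodedata.bidirectional("\u05d0"))


def safe_lookup(name):
    """Return the named character, or None if the name is unknown."""
    try:
        return unicodedata.lookup(name)
    except KeyError:
        return None


print(safe_lookup("NO SUCH CHARACTER NAME"))
```

lookup() raises KeyError for unknown names, so wrapping it is useful when the names come from user input or configuration.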

Stripping Diacritics and Creating Slugs

A common need in web applications is converting Unicode text to ASCII-safe slugs for URLs, filenames, or database keys. The standard approach uses NFD normalization (which splits composed characters into base + combining marks) followed by filtering out all combining mark characters (category Mn).

# unicodedata_slugify.py
import unicodedata
import re

def remove_diacritics(text):
    """
    Remove diacritical marks (accents) from text.
    'café' -> 'cafe', 'Müller' -> 'Muller'
    """
    # NFD splits 'é' into 'e' + combining acute
    nfd = unicodedata.normalize("NFD", text)
    # Filter out all combining marks (category Mn)
    return "".join(c for c in nfd if unicodedata.category(c) != "Mn")

def slugify(text):
    """
    Convert Unicode text to a URL-safe ASCII slug.
    'Crème brûlée!' -> 'creme-brulee'
    """
    # NFKD for compatibility decomposition (handles ligatures etc.)
    text = unicodedata.normalize("NFKD", text)
    # Remove combining marks
    text = "".join(c for c in text if unicodedata.category(c) != "Mn")
    # Convert to ASCII, ignoring non-ASCII characters
    text = text.encode("ascii", errors="ignore").decode("ascii")
    # Lowercase and replace non-alphanumeric with hyphens
    text = re.sub(r"[^\w\s-]", "", text.lower())
    text = re.sub(r"[-\s]+", "-", text).strip("-")
    return text

# Test cases
test_strings = [
    "Cafe de Paris",
    "Muller und Schroeder",
    "Espanol: Munoz, Sanchez",
    "Japanese: Nihongo (not convertible to ASCII)",
    "Ligatures: \ufb01ne (fi ligature)",
    "Symbols: C++ & Python 3.12",
]

print("=== remove_diacritics ===")
for s in test_strings[:4]:
    print(f"  {s!r}")
    print(f"  -> {remove_diacritics(s)!r}\n")

print("=== slugify ===")
for s in test_strings:
    print(f"  {s[:40]!r} -> {slugify(s)!r}")

Output:

=== remove_diacritics ===
  'Cafe de Paris'
  -> 'Cafe de Paris'

  'Muller und Schroeder'
  -> 'Muller und Schroeder'

  'Espanol: Munoz, Sanchez'
  -> 'Espanol: Munoz, Sanchez'

  'Japanese: Nihongo (not convertible to ASCII)'
  -> 'Japanese: Nihongo (not convertible to ASCII)'

=== slugify ===
  'Cafe de Paris' -> 'cafe-de-paris'
  'Muller und Schroeder' -> 'muller-und-schroeder'
  'Espanol: Munoz, Sanchez' -> 'espanol-munoz-sanchez'
  'Japanese: Nihongo (not convertible to AS' -> 'japanese-nihongo-not-convertible-to-ascii'
  'Ligatures: \ufb01ne (fi ligature)' -> 'ligatures-fine-fi-ligature'
  'Symbols: C++ & Python 3.12' -> 'symbols-c-python-312'

The NFKD normalization correctly converts the "ﬁ" ligature (U+FB01) to the two-character sequence "fi" before the slug is built. CJK characters (Japanese, Chinese, Korean) have no ASCII equivalents so they're dropped by the encode("ascii", errors="ignore") step -- acceptable for slugs, but note that the original text should be stored separately for display.

[Image: Unicode category classification] unicodedata.category() returns 'Ll' for lowercase. Your regex returns confusion.

Detecting Character Types

Using unicodedata.category(), you can write character type checks that work for any script. This is more reliable than character ranges like [a-zA-Z] which miss non-Latin letters entirely.

# unicodedata_categories.py
import unicodedata

def is_letter(c):
    """True for any Unicode letter (Latin, Greek, Arabic, CJK, etc.)"""
    return unicodedata.category(c).startswith("L")

def is_digit(c):
    """True for any Unicode digit (Arabic-Indic, Devanagari, etc.)"""
    return unicodedata.category(c) == "Nd"

def is_whitespace(c):
    """True for any Unicode whitespace (space, NBSP, ideographic space, etc.)"""
    return unicodedata.category(c).startswith("Z") or c in "\t\n\r\f\v"

def is_punctuation(c):
    """True for any Unicode punctuation mark"""
    return unicodedata.category(c).startswith("P")

# Test mixed-script text
samples = [
    ("A", "Latin letter"),
    ("\u03b1", "Greek letter alpha"),
    ("\u4e2d", "CJK character"),
    ("\u0660", "Arabic-Indic digit"),
    ("5", "ASCII digit"),
    ("\u00a0", "Non-breaking space"),
    ("\u3000", "Ideographic space"),
    ("\u2019", "Right single quotation mark"),
    ("\u200b", "Zero-width space"),
]

print(f"{'Char':<8} {'Letter?':<10} {'Digit?':<10} {'Space?':<10} {'Punct?'}")
print("-" * 55)
for c, desc in samples:
    display = repr(c) if unicodedata.category(c) in ("Cf", "Zs") else c
    print(f"{display:<8} {is_letter(c)!s:<10} {is_digit(c)!s:<10} {is_whitespace(c)!s:<10} {is_punctuation(c)!s}")

Output:

Char     Letter?    Digit?     Space?     Punct?
-------------------------------------------------------
A        True       False      False      False
α        True       False      False      False
中        True       False      False      False
٠        False      True       False      False
5        False      True       False      False
'\xa0'   False      False      True       False
'\u3000' False      False      True       False
’        False      False      False      True
'\u200b' False      False      False      False

Notice that zero-width space (U+200B) is neither a letter, digit, space, nor punctuation -- its category is Cf (format character). This is exactly the kind of invisible character that can silently corrupt text processing. The category check lets you detect and strip these: "".join(c for c in text if unicodedata.category(c) != "Cf").
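Before stripping format characters, it often helps to see which ones are present and where. The following sketch (the function names find_format_chars and strip_format_chars are illustrative) reports each Cf character with its index, code point, and name.

```python
import unicodedata


def find_format_chars(text):
    """List (index, code point, name) for every category-Cf character in text."""
    return [
        (i, f"U+{ord(c):04X}", unicodedata.name(c, "(no name)"))
        for i, c in enumerate(text)
        if unicodedata.category(c) == "Cf"
    ]


def strip_format_chars(text):
    """Remove every category-Cf character from text."""
    return "".join(c for c in text if unicodedata.category(c) != "Cf")


sample = "Hello\u200bWorld\ufeff"
for hit in find_format_chars(sample):
    print(hit)
print(ascii(strip_format_chars(sample)))
```

One caveat: category Cf also contains the zero-width joiner (U+200D) used in emoji sequences and the zero-width non-joiner (U+200C) used in scripts such as Persian, so blanket removal is best kept to text where those are not expected.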

Real-Life Example: Unicode Text Cleaning Pipeline

This pipeline normalizes and cleans text from diverse sources (web scraping, user input, CSV imports) into a consistent, processable form.

[Image: Legacy encoding with unicodedata] unicodedata.normalize() has been handling your encoding bugs since before you knew they were bugs.

# unicode_cleaner.py
import unicodedata
import re

# General categories of invisible/formatting characters to strip
STRIP_CATEGORIES = {"Cf", "Cc"}  # format chars and control chars (except \n\t)
KEEP_CONTROL = {"\n", "\t", "\r"}

def clean_unicode_text(text, normalize_form="NFC", strip_accents=False, ascii_only=False):
    """
    Clean and normalize Unicode text.

    normalize_form: NFC (default), NFD, NFKC, or NFKD
    strip_accents:  Remove diacritical marks (NFD is applied automatically if needed)
    ascii_only:     Encode to ASCII, drop non-ASCII chars
    """
    if not isinstance(text, str):
        raise TypeError(f"Expected str, got {type(text).__name__}")

    # Step 1: Normalize
    text = unicodedata.normalize(normalize_form, text)

    # Step 2: Strip combining marks (diacritics) if requested
    # Note: only makes sense after NFD or NFKD normalization
    if strip_accents:
        if normalize_form not in ("NFD", "NFKD"):
            text = unicodedata.normalize("NFD", text)
        text = "".join(c for c in text if unicodedata.category(c) != "Mn")

    # Step 3: Remove invisible formatting characters (zero-width space, BOM, etc.)
    text = "".join(
        c for c in text
        if unicodedata.category(c) not in STRIP_CATEGORIES or c in KEEP_CONTROL
    )

    # Step 4: Normalize whitespace (collapse multiple spaces, strip NBSP)
    text = re.sub(r"[\u00a0\u202f\u2009\u2008\u2007\u2006\u2005\u2004\u2003\u2002\u2001\u2000]", " ", text)
    text = re.sub(r" {2,}", " ", text)

    # Step 5: ASCII-only output if requested
    if ascii_only:
        text = text.encode("ascii", errors="ignore").decode("ascii")

    return text.strip()


# Sample dirty text from a web scrape
samples = [
    "caf\u00e9 vs caf\u0065\u0301",           # composed vs decomposed
    "Hello\u200b World",                         # zero-width space between words
    "Price:\u00a0$9.99",                         # non-breaking space
    "Se\u00f1or Mu\u00f1oz",                    # Spanish with tildes
    "Python\ufeff Tutorial",                     # BOM character mid-string
    "Multi\n\tline\ttext",                       # control chars to keep
]

print(f"{'Original':<40} {'Cleaned (NFC)'}")
print("-" * 80)
for s in samples:
    cleaned = clean_unicode_text(s, normalize_form="NFC")
    # ascii() escapes non-ASCII characters so invisible differences show up
    print(f"{ascii(s):<40} {ascii(cleaned)}")

print("\n=== strip_accents=True ===")
for s in samples[3:4]:
    print(f"  Original: {ascii(s)}")
    print(f"  Cleaned:  {ascii(clean_unicode_text(s, normalize_form='NFD', strip_accents=True))}")

Output:

Original                                 Cleaned (NFC)
--------------------------------------------------------------------------------
'caf\xe9 vs cafe\u0301'                  'caf\xe9 vs caf\xe9'
'Hello\u200b World'                      'Hello World'
'Price:\xa0$9.99'                        'Price: $9.99'
'Se\xf1or Mu\xf1oz'                      'Se\xf1or Mu\xf1oz'
'Python\ufeff Tutorial'                  'Python Tutorial'
'Multi\n\tline\ttext'                    'Multi\n\tline\ttext'

=== strip_accents=True ===
  Original: 'Se\xf1or Mu\xf1oz'
  Cleaned:  'Senor Munoz'

The pipeline handles five distinct problems in order: normalization form unification, optional diacritic stripping, invisible character removal, non-standard whitespace normalization, and optional ASCII-only output. Running all incoming text through this pipeline before storage or comparison eliminates a whole class of hard-to-debug string matching bugs.

Frequently Asked Questions

When should I use NFC vs NFKC for normalization?

Use NFC for general text storage and display -- it's the most compact composed form and what most operating systems and web browsers produce. Use NFKC when building a search index, generating slugs, or comparing text where "compatibility equivalents" (ligatures, circled letters, Roman numerals as single characters) should match their plain-text equivalents. NFKC is more aggressive: it discards distinctions that can carry meaning (the ligature "ﬁ" becomes "fi", and superscript "²" becomes a plain "2"), so only apply it to normalized search keys, not to the stored display text.
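The split between stored text and search keys can be sketched as a small index. Here search_key is a hypothetical helper that combines NFKC with casefold(), and the records are made-up sample data; the display strings are stored untouched.

```python
import unicodedata


def search_key(text):
    """Build a comparison key: compatibility-normalized and case-folded."""
    return unicodedata.normalize("NFKC", text).casefold()


# Display text is kept as entered; only the dictionary keys are normalized
records = ["\ufb01ne dining", "Caf\u00e9 Luna"]
index = {search_key(r): r for r in records}

print(ascii(index.get(search_key("FINE DINING"))))
print(ascii(index.get(search_key("cafe\u0301 luna"))))
```

Both queries find their record even though one uses plain letters for the ligature and the other a decomposed accent, and the original strings come back unchanged.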

Where can I find the full list of Unicode category codes?

The Unicode standard defines 30 general categories grouped under 7 major groups: L (letters), M (marks), N (numbers), P (punctuation), S (symbols), Z (separators), and C (other/control). The two-letter code is major + minor category, e.g. Lu = Letter Uppercase, Nd = Number Decimal Digit. The full table is in Unicode Standard Annex #44 at unicode.org, or search for "Unicode general category" in the Python documentation.

What's the difference between unicodedata.decimal() and unicodedata.numeric()?

decimal(char) returns an integer for characters that represent decimal digits (0-9) in any script -- Arabic-Indic "٠" returns 0, ASCII "5" returns 5. For non-digit characters it raises ValueError unless you pass a default as the second argument, in which case it returns that default. digit(char) is similar but also includes superscript/subscript digit forms. numeric(char) is the broadest: it returns a float for any character with a numeric value, including fractions (one-half U+00BD returns 0.5) and Roman numerals. Use decimal() when you want to extract actual digit values from text, and the string method str.isnumeric() when you just want a boolean test.
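The three functions can be compared side by side by passing None as the default, so characters without a value show up as None. The sample characters below are chosen for illustration.

```python
import unicodedata

samples = [
    ("5", "ASCII digit five"),
    ("\u0660", "Arabic-Indic digit zero"),
    ("\u00b2", "superscript two"),
    ("\u00bd", "vulgar fraction one half"),
    ("\u2162", "Roman numeral three"),
    ("x", "Latin letter x"),
]

print(f"{'Character':<26} {'decimal':<8} {'digit':<8} {'numeric'}")
for ch, label in samples:
    # The second argument is the default returned when no value is defined
    dec = unicodedata.decimal(ch, None)
    dig = unicodedata.digit(ch, None)
    num = unicodedata.numeric(ch, None)
    print(f"{label:<26} {dec!s:<8} {dig!s:<8} {num!s}")
```

Each column accepts a superset of the one to its left: superscript two has a digit value but no decimal value, and the fraction and the Roman numeral have only a numeric value.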

How do I detect and fix Unicode corruption (mojibake)?

Mojibake (garbled text from encoding mismatches) typically happens when UTF-8 bytes are decoded as latin-1 or Windows-1252. The character "é" in UTF-8 is bytes 0xC3 0xA9, which in latin-1 reads as "Ã©" (A with tilde followed by the copyright sign). The ftfy library (pip install ftfy) automatically detects and fixes most mojibake patterns. For the unicodedata module specifically, you can detect it by checking if a string contains characters whose names include "LATIN CAPITAL LETTER A WITH TILDE" or similar characters that rarely appear in ordinary prose. Treat this as a heuristic, since those characters are legitimate in some languages.
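The round trip is short enough to reproduce by hand. The sketch below garbles a string the way a wrong decode would, then reverses it; try_repair is a hypothetical helper that handles only the UTF-8-read-as-latin-1 case and returns the input unchanged when the reverse step fails.

```python
original = "caf\u00e9"

# Simulate the bug: UTF-8 bytes decoded with the wrong codec
garbled = original.encode("utf-8").decode("latin-1")
print(ascii(garbled))


def try_repair(text):
    """Undo a UTF-8-as-latin-1 mix-up; return text unchanged if it does not apply."""
    try:
        return text.encode("latin-1").decode("utf-8")
    except (UnicodeEncodeError, UnicodeDecodeError):
        return text


print(ascii(try_repair(garbled)))
print(ascii(try_repair(original)))  # already correct, left alone
```

Text that went through Windows-1252 instead of latin-1, or that was garbled more than once, needs the broader heuristics that ftfy implements.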

Does unicodedata help with right-to-left text (Arabic, Hebrew)?

The unicodedata.bidirectional(char) function returns the bidirectional class of a character: "L" for left-to-right (Latin), "R" for right-to-left (Hebrew letters), "AL" for Arabic letters, "AN" for Arabic numbers, and so on. This is useful for detecting the text direction of a string and for flagging bidirectional override characters (classes "RLO" and "LRO") that have been abused in security attacks (the "trojan source" vulnerability). Always strip bidirectional override characters from untrusted input.
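Stripping these characters can be done by bidirectional class rather than by listing code points. This is a sketch under one assumption: that embeddings and isolates should be removed along with the overrides, which is stricter than the overrides-only rule and may be too aggressive for genuinely mixed-direction text. The names BIDI_CONTROL_CLASSES and strip_bidi_controls are illustrative.

```python
import unicodedata

# Overrides (LRO, RLO) plus embeddings, isolates, and their terminators
BIDI_CONTROL_CLASSES = {"LRE", "RLE", "LRO", "RLO", "PDF", "LRI", "RLI", "FSI", "PDI"}


def strip_bidi_controls(text):
    """Drop characters whose bidirectional class is an explicit control."""
    return "".join(
        c for c in text if unicodedata.bidirectional(c) not in BIDI_CONTROL_CLASSES
    )


# U+202E (right-to-left override) can visually reverse the end of a filename
suspicious = "invoice\u202egpj.exe"
print(ascii(suspicious))
print(ascii(strip_bidi_controls(suspicious)))
```

Ordinary right-to-left letters (classes "R" and "AL") are untouched, so Hebrew and Arabic text passes through unchanged.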

Conclusion

The unicodedata module gives you direct access to the Unicode Character Database from Python's standard library. The key tools are normalize() for ensuring consistent text representation (NFC for storage, NFKD for slug generation), category() for script-agnostic character classification, name() for debugging and introspection, and decimal()/numeric() for extracting numeric values from any script's digit characters. The text cleaning pipeline above is a production-ready foundation -- extend it with the ftfy library for mojibake correction or langdetect for script-aware processing.

For comprehensive international text handling beyond what unicodedata provides -- transliteration, locale-aware collation, and script detection -- look at the PyICU library, which provides Python bindings to IBM's International Components for Unicode.

Official documentation: https://docs.python.org/3/library/unicodedata.html

How To Use Python textwrap Module for Text Formatting

Beginner

You’ve written a Python CLI tool or a report generator, and your output looks great on your 4K monitor — then someone runs it in a terminal that’s 80 columns wide and the text spills across three lines, or you paste a multiline string into your code and the indentation is off because you indented it to match the function body. These are solvable problems, and Python’s built-in textwrap module is the tool for both of them.

textwrap ships with every Python installation and handles the most common text formatting tasks: wrapping long strings to a fixed width, removing excess indentation from multiline strings, truncating text with an ellipsis, and adding a consistent indent prefix. It’s especially valuable for CLI tools, log formatters, email body generators, and docstring utilities.

In this tutorial, you’ll learn how to use textwrap.wrap() and textwrap.fill() for line wrapping, textwrap.dedent() to strip indentation from multiline strings, textwrap.shorten() to truncate with an ellipsis, and textwrap.indent() to add prefix strings to text. By the end, you’ll have a solid text formatting toolkit for any Python output-heavy project.

Python textwrap: Quick Example

Here’s the module solving the three most common problems in a few lines:

# textwrap_quick.py
import textwrap

long_text = (
    "Python's textwrap module provides convenient functions for "
    "wrapping and filling text, as well as for dedenting and indenting "
    "text blocks. It is useful for formatting help messages and output "
    "in CLI tools, or for processing raw strings from user input."
)

# Wrap to 60 characters per line
wrapped = textwrap.wrap(long_text, width=60)
print("=== Wrapped (60 cols) ===")
for line in wrapped:
    print(line)

# Fill returns a single string instead of a list
print("\n=== fill() ===")
print(textwrap.fill(long_text, width=60))

# Shorten to 80 chars with ellipsis
print("\n=== Shortened ===")
print(textwrap.shorten(long_text, width=80, placeholder=" ..."))

Output:

=== Wrapped (60 cols) ===
Python's textwrap module provides convenient functions for
wrapping and filling text, as well as for dedenting and
indenting text blocks. It is useful for formatting help
messages and output in CLI tools, or for processing raw
strings from user input.

=== fill() ===
Python's textwrap module provides convenient functions for
wrapping and filling text, as well as for dedenting and
indenting text blocks. It is useful for formatting help
messages and output in CLI tools, or for processing raw
strings from user input.

=== Shortened ===
Python's textwrap module provides convenient functions for wrapping and ...

textwrap.wrap() returns a list of strings (lines), while textwrap.fill() joins them with newlines and returns a single string. Use wrap() when you need to process individual lines; use fill() when you just want the formatted block. shorten() collapses whitespace and truncates at a word boundary so that the result, placeholder included, fits on a single line no longer than width.
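The relationship between the three functions can be checked directly. This sketch uses a made-up sentence and a width of 30, and prints the invariants rather than the wrapped text itself.

```python
import textwrap

text = "The quick brown fox jumps over the lazy dog near the quiet river bank."

lines = textwrap.wrap(text, width=30)
block = textwrap.fill(text, width=30)
short = textwrap.shorten(text, width=30, placeholder=" ...")

# fill() is wrap() joined with newlines
print(block == "\n".join(lines))
# no wrapped line exceeds the requested width
print(all(len(line) <= 30 for line in lines))
# shorten() yields one line that fits the width, placeholder included
print(len(short) <= 30 and "\n" not in short)
```

All three checks print True. Because wrap() returns a list, it is the one to use when each line needs further processing, such as adding line numbers or padding.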

What Is the textwrap Module?

The textwrap module implements the “greedy” line-wrapping algorithm: it places as many words on each line as will fit within the specified width, then starts a new line. This matches how most word processors and terminal tools handle wrapping. It also normalizes whitespace by default: tabs are expanded, each newline or other whitespace character is replaced with a space before wrapping, and whitespace around each line break is dropped. Runs of spaces inside a line are kept as they are (only shorten() collapses them), and paragraph breaks in the input are lost unless you wrap each paragraph separately.

Function | Returns | Use Case
-------- | ------- | --------
wrap(text, width) | List of strings | When you need individual lines
fill(text, width) | Single string | Ready-to-print formatted block
dedent(text) | Single string | Strip common leading whitespace
indent(text, prefix) | Single string | Add prefix to lines
shorten(text, width) | Single string | Truncate with ellipsis
TextWrapper | Object | Reuse config across many wraps

[Image: Text overflowing its container] textwrap.fill() is what happens when you actually care about terminal output.

Wrapping and Filling Text

Both wrap() and fill() accept the same set of keyword arguments for fine-grained control. The most useful ones are initial_indent (prefix for the first line), subsequent_indent (prefix for continuation lines), and break_long_words (what to do with words longer than the width).

# textwrap_wrap.py
import textwrap

description = (
    "Deploying to production: first, run the full test suite. "
    "Then update the CHANGELOG.md with release notes. "
    "Finally, tag the release and push to main. "
    "Rollback procedure: revert the tag and redeploy the previous artifact."
)

# Hanging indent (first line flush, rest indented)
hanging = textwrap.fill(
    description,
    width=65,
    initial_indent="",
    subsequent_indent="  ",
)
print("=== Hanging indent ===")
print(hanging)

# Bullet-point style
bullet = textwrap.fill(
    description,
    width=65,
    initial_indent="* ",
    subsequent_indent="  ",
)
print("\n=== Bullet point ===")
print(bullet)

# Handle words longer than width (e.g. long URLs)
url_text = "Documentation: https://docs.python.org/3/library/textwrap.html for full API reference."
print("\n=== Long word, break_long_words=True ===")
print(textwrap.fill(url_text, width=40, break_long_words=True))
print("\n=== Long word, break_long_words=False ===")
print(textwrap.fill(url_text, width=40, break_long_words=False))

Output:

=== Hanging indent ===
Deploying to production: first, run the full test suite. Then
  update the CHANGELOG.md with release notes. Finally, tag the
  release and push to main. Rollback procedure: revert the tag
  and redeploy the previous artifact.

=== Bullet point ===
* Deploying to production: first, run the full test suite. Then
  update the CHANGELOG.md with release notes. Finally, tag the
  release and push to main. Rollback procedure: revert the tag
  and redeploy the previous artifact.

=== Long word, break_long_words=True ===
Documentation: https://docs.python.org/3
/library/textwrap.html for full API
reference.

=== Long word, break_long_words=False ===
Documentation:
https://docs.python.org/3/library/textwrap.html
for full API reference.

The hanging indent pattern is common in help text and changelogs: the first line is flush and subsequent lines are indented to align with the start of the text. Set break_long_words=False for URLs — splitting a URL mid-word makes it unclickable in terminals. The URL will overflow the column width, but that’s preferable to a broken link.
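For CLI output, the width does not have to be hard-coded. The sketch below reads the terminal size with shutil.get_terminal_size() and clamps it to a readable range; the function name fill_to_terminal and the 20 and 100 column limits are arbitrary choices for illustration.

```python
import shutil
import textwrap


def fill_to_terminal(text, max_width=100, fallback=(80, 24)):
    """Wrap text to the current terminal width, clamped to a readable range."""
    columns = shutil.get_terminal_size(fallback=fallback).columns
    width = max(20, min(columns, max_width))
    # Keep long tokens such as URLs intact, even if they overflow the width
    return textwrap.fill(text, width=width, break_long_words=False)


message = "Run the full test suite before tagging a release. " * 4
print(fill_to_terminal(message))
```

The fallback applies when the output is not attached to a terminal, for example when it is piped to a file.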

Dedenting Multiline Strings

textwrap.dedent() solves a common problem with multiline strings in Python: when you write a multiline string inside a function or class, you indent it to match the code, but that indentation becomes part of the string. dedent() strips the common leading whitespace from all lines, giving you a clean string without the code indentation baked in.

# textwrap_dedent.py
import textwrap

def generate_email_body(name, issue):
    # This multiline string is indented to match the function body
    # but that indentation would appear in the email output
    body = f"""
        Hi {name},

        We've received your report about: {issue}.

        Our team will review it within 24 hours.

        Best regards,
        Support Team
    """
    # dedent() removes the common indentation (8 spaces here)
    return textwrap.dedent(body).strip()

print(generate_email_body("Alex", "login not working"))

# Compare: without dedent
def broken_version(name):
    msg = f"""
        Hello {name}.
        This text has leading spaces on every line.
    """
    return msg  # indentation baked into the string

print(repr(broken_version("Alex")[:60]))
print(repr(generate_email_body("Alex", "test")[:60]))

Output:

Hi Alex,

We've received your report about: login not working.

Our team will review it within 24 hours.

Best regards,
Support Team
'\n        Hello Alex.\n        This text has leading spaces on'
"Hi Alex,\n\nWe've received your report about: test.\n\nOur team "

dedent() looks at all non-empty lines and strips the longest common leading whitespace prefix from every line. The .strip() call at the end removes the leading and trailing newlines that come from the triple-quoted string’s opening and closing lines. Combining dedent() + strip() is the idiomatic pattern for clean multiline strings in Python functions.
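Because only the common prefix is removed, any indentation beyond it survives. This sketch uses a made-up usage message to show that the relative indentation of the option lines is preserved.

```python
import textwrap


def usage():
    text = """
        Usage: tool [options]
          -v    verbose
          -q    quiet
    """
    # Only the shared 8-space margin is removed; the extra 2 spaces stay
    return textwrap.dedent(text).strip()


print(usage())
```

The whitespace-only closing line does not count toward the common margin, so the deeper indentation of the text lines is still what gets measured.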

[Image: Measuring text column widths] textwrap.dedent() removes indentation that your IDE added and your docstring didn’t ask for.

Adding Indentation with indent()

textwrap.indent(text, prefix) adds a prefix to the beginning of each line in the text. By default it skips lines that consist solely of whitespace; pass a predicate function to control which lines get the prefix. This is useful for adding quote markers to email replies, adding comment characters to code, or building nested structures.

# textwrap_indent.py
import textwrap

original = """Python was created by Guido van Rossum.
It was released in 1991.

The language emphasizes readability.
Its design philosophy is documented in the Zen of Python."""

# Add "> " prefix for email-style quoting.
# predicate=lambda line: True forces the prefix onto blank lines too.
quoted = textwrap.indent(original, "> ", predicate=lambda line: True)
print("=== Email quote ===")
print(quoted)

# Add "# " only to non-empty lines (skip blank lines).
# This matches indent()'s default; the predicate is spelled out for clarity.
commented = textwrap.indent(
    original,
    prefix="# ",
    predicate=lambda line: line.strip()  # truthy for non-empty lines
)
print("\n=== Code comment (skip blanks) ===")
print(commented)

# Nested indentation
inner = textwrap.fill("This is a long description of a feature.", width=32)
nested = textwrap.indent(inner, "    ")  # 4-space indent
print(f"\n=== Nested ===\nFeature:\n{nested}")

Output:

=== Email quote ===
> Python was created by Guido van Rossum.
> It was released in 1991.
>
> The language emphasizes readability.
> Its design philosophy is documented in the Zen of Python.

=== Code comment (skip blanks) ===
# Python was created by Guido van Rossum.
# It was released in 1991.

# The language emphasizes readability.
# Its design philosophy is documented in the Zen of Python.

=== Nested ===
Feature:
    This is a long description of a
    feature.

The predicate function receives each line (including the newline character) and returns a truthy value if the prefix should be added. lambda line: line.strip() is falsy for lines containing only whitespace or newlines, so blank lines are skipped; this is also what indent() does when no predicate is given. To prefix every line, blank ones included, pass predicate=lambda line: True, which produces > on otherwise empty lines as in the email quote above (the prefix's trailing space is there but invisible).
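A predicate can encode any per-line rule, not just a blank-line check. The sketch below comments out code lines while leaving blank lines and existing comments alone. The helper name needs_comment and the sample snippet are hypothetical.

```python
import textwrap

snippet = "x = 1\n# already a comment\n\ny = 2\n"

def needs_comment(line):
    # Skip blank lines and lines that already start with "#".
    stripped = line.strip()
    return bool(stripped) and not stripped.startswith("#")

commented = textwrap.indent(snippet, "# ", predicate=needs_comment)
print(commented)
```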

Using the TextWrapper Class

When you need to wrap many strings with the same settings, instantiate textwrap.TextWrapper once and reuse it. This is more efficient than passing keyword arguments to fill() on every call, and it makes your configuration explicit and named.

# textwrap_wrapper.py
import textwrap

# Define a formatter for CLI help text
cli_formatter = textwrap.TextWrapper(
    width=60,
    initial_indent="  ",
    subsequent_indent="    ",
    break_long_words=False,
    break_on_hyphens=False,
)

commands = {
    "deploy": "Deploy the application to the target environment. Reads configuration from deploy.yaml in the project root.",
    "rollback": "Revert to the previous successful deployment. Specify --tag to rollback to a specific release tag.",
    "status": "Show the current deployment status, health check results, and last 5 deployment events.",
}

print("Available commands:\n")
for cmd, description in commands.items():
    print(f"  {cmd}")
    print(cli_formatter.fill(description))
    print()

Output:

Available commands:

  deploy
  Deploy the application to the target environment. Reads
    configuration from deploy.yaml in the project root.

  rollback
  Revert to the previous successful deployment. Specify
    --tag to rollback to a specific release tag.

  status
  Show the current deployment status, health check results,
    and last 5 deployment events.

break_on_hyphens=False prevents the wrapper from breaking hyphenated words like --tag across lines, which would make CLI flag names unreadable. The TextWrapper instance exposes all the same options as the module-level functions, but you configure them once and reuse the object.
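As a rough illustration of "configure once, reuse", the sketch below wraps several strings with one wrapper and then changes an option between calls. The note texts and widths are invented for the example.

```python
import textwrap

wrapper = textwrap.TextWrapper(width=30, subsequent_indent="  ")

notes = [
    "Restart the worker after changing the configuration file.",
    "Logs rotate daily and are kept for two weeks.",
]

# The same wrapper formats every note with identical settings.
narrow = [wrapper.wrap(note) for note in notes]
for lines in narrow:
    print("\n".join(lines))

# Options are plain instance attributes, so they can be changed between calls.
wrapper.width = 50
wide = wrapper.wrap(notes[0])
print("\n".join(wide))
```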

Real-Life Example: CLI Report Formatter

This utility formats a structured data report for terminal output, handling variable-length text gracefully across different terminal widths.

# cli_report.py
import textwrap
import shutil

def format_report(title, items, terminal_width=None):
    """
    Format a list of (label, description) tuples as a terminal report.
    Automatically adapts to terminal width.
    """
    if terminal_width is None:
        terminal_width = shutil.get_terminal_size(fallback=(80, 24)).columns

    label_width = 20
    text_width = terminal_width - label_width - 4  # 4 for separator + padding

    separator = "-" * terminal_width
    wrapper = textwrap.TextWrapper(
        width=text_width,
        break_long_words=False,
        break_on_hyphens=False,
    )

    lines = [separator, title.center(terminal_width), separator]

    for label, description in items:
        wrapped_desc = wrapper.wrap(description)
        if not wrapped_desc:
            wrapped_desc = ["(no description)"]

        # First line: label + first line of description
        label_str = f"  {label:<{label_width - 2}}"
        lines.append(f"{label_str}  {wrapped_desc[0]}")

        # Continuation lines: blank label area + rest of description
        padding = " " * label_width
        for cont_line in wrapped_desc[1:]:
            lines.append(f"{padding}  {cont_line}")

    lines.append(separator)
    return "\n".join(lines)


# Sample report data
report_items = [
    ("Status", "All systems operational. Last health check passed 2 minutes ago."),
    ("Indexed Pages", "1 of 149 articles indexed by Google as of 2026-04-23. Recovery actions in progress."),
    ("Last Deploy", "2026-04-20 14:32 UTC -- deployed 5 new articles via REST API pipeline."),
    ("Open Issues", "Image backlog: 47 posts missing featured images. Category audit: 0 uncategorized posts remaining."),
]

print(format_report("== PythonHowToProgram.com Daily Status ==", report_items, terminal_width=78))

Output:

------------------------------------------------------------------------------
                  == PythonHowToProgram.com Daily Status ==
------------------------------------------------------------------------------
  Status              All systems operational. Last health check passed 2
                      minutes ago.
  Indexed Pages       1 of 149 articles indexed by Google as of 2026-04-23.
                      Recovery actions in progress.
  Last Deploy         2026-04-20 14:32 UTC -- deployed 5 new articles via
                      REST API pipeline.
  Open Issues         Image backlog: 47 posts missing featured images.
                      Category audit: 0 uncategorized posts remaining.
------------------------------------------------------------------------------

The key technique here is the two-pass approach: format the first line of each item with the label, then use blank-label padding for all continuation lines. shutil.get_terminal_size() reads the actual terminal width, so the report adapts automatically whether someone runs it in a narrow SSH session or a wide desktop terminal.
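When the width comes from the terminal, it can help to clamp it so that very narrow or very wide windows still give readable lines. This is a minimal sketch; the bounds of 40 and 100 columns are arbitrary assumptions, not a recommendation from the textwrap docs.

```python
import shutil
import textwrap

# Falls back to 80 columns when no terminal is attached (e.g. in a pipe).
columns = shutil.get_terminal_size(fallback=(80, 24)).columns

# Illustrative bounds: never narrower than 40, never wider than 100.
width = max(40, min(columns, 100))

message = "This paragraph is wrapped to whatever width the terminal reports, within sane limits."
wrapped = textwrap.fill(message, width=width)
print(wrapped)
```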

Frequently Asked Questions

When should I use wrap() vs fill()?

Use wrap() when you need to process individual lines -- for example, to add line numbers, apply different formatting to the first line, or join them with a custom delimiter. Use fill() when you just need a ready-to-print string. fill(text, width) is literally equivalent to "\n".join(wrap(text, width)) -- it's just the common case wrapped in a convenience function.
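A short sketch of the difference: wrap() hands back a list you can decorate line by line, while fill() returns the same lines already joined. The sample sentence and width are arbitrary.

```python
import textwrap

text = "Use wrap() when each line needs its own treatment before printing."

# wrap() returns a list, so each line can be numbered individually.
lines = textwrap.wrap(text, width=30)
numbered = "\n".join(f"{n:>2}: {line}" for n, line in enumerate(lines, start=1))
print(numbered)

# fill() produces the same lines joined with newlines.
same = textwrap.fill(text, width=30) == "\n".join(lines)
print(same)
```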

Why do I need .strip() after dedent()?

dedent() only removes leading whitespace -- it doesn't remove the leading newline that a triple-quoted string gets from its opening """ followed by a newline, or the trailing newline before the closing """. The idiom textwrap.dedent(text).strip() handles both: dedent() removes the indentation on each line, and .strip() removes the leading and trailing blank lines. If you want to remove only those newlines and keep any other leading or trailing whitespace, use .lstrip('\n') and .rstrip('\n') instead of .strip().

Does textwrap.fill() preserve existing newlines in the text?

No -- by default (replace_whitespace=True), fill() and wrap() convert every whitespace character, including newlines, to a space before wrapping, so the original line breaks disappear. If you have paragraph-separated text and want to wrap each paragraph independently, split the text on double newlines first, wrap each paragraph, then rejoin. The TextWrapper class also has an expand_tabs parameter (default True) and a fix_sentence_endings parameter that tries to put two spaces after sentence-ending punctuation. Passing replace_whitespace=False keeps embedded newlines, but they can then land in the middle of wrapped lines, so splitting into paragraphs first is usually the cleaner route.
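The split, wrap, and rejoin approach can be sketched as follows. The helper name fill_paragraphs is hypothetical, and it assumes paragraphs are separated by exactly one blank line.

```python
import textwrap

def fill_paragraphs(text, width=40):
    """Wrap each blank-line-separated paragraph on its own."""
    paragraphs = text.split("\n\n")
    wrapped = [textwrap.fill(p, width=width) for p in paragraphs]
    return "\n\n".join(wrapped)

sample = (
    "The first paragraph is long enough that it has to wrap onto a second line.\n\n"
    "The second paragraph stays separate."
)
result = fill_paragraphs(sample)
print(result)
```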

Does textwrap handle Unicode and non-Latin text correctly?

For Latin-script languages, yes. For CJK (Chinese, Japanese, Korean) characters, the default width calculations assume each character is one column wide, but CJK characters are double-width in terminals -- so an 80-column wrap will visually overflow. The wcwidth library provides correct Unicode display width calculations; for CJK-heavy text, use it to calculate line widths manually rather than relying on textwrap.
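If adding a dependency is not an option, the standard library's unicodedata module gives a rough approximation of display width. This is a sketch under that assumption, not a replacement for wcwidth: the helper display_width is hypothetical and ignores cases such as emoji sequences and ambiguous-width characters.

```python
import unicodedata

def display_width(text):
    """Approximate terminal columns: wide/fullwidth characters count as 2."""
    width = 0
    for ch in text:
        if unicodedata.combining(ch):
            continue  # combining marks take no column of their own
        width += 2 if unicodedata.east_asian_width(ch) in ("W", "F") else 1
    return width

for sample in ["Python", "日本語"]:
    print(sample, len(sample), display_width(sample))
```

Both samples occupy six terminal columns, even though len() reports 6 for the first and 3 for the second. That gap is what makes textwrap's character-count wrapping overflow on CJK text.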

Is textwrap efficient for large volumes of text?

For typical CLI output and report generation, performance is not a concern. For very large documents (hundreds of thousands of lines), the pure-Python implementation will be slower than tools written in C. In those cases, consider streaming the text through wrap() in chunks rather than wrapping the entire document at once. A TextWrapper instance is reusable, so you avoid constructing a new wrapper object on every call, which gives a modest speedup when wrapping thousands of strings with the same settings.

Conclusion

The textwrap module covers the four most common text formatting needs in Python: wrap()/fill() for line wrapping, dedent() for cleaning up multiline strings, indent() for adding prefix characters, and shorten() for truncation with an ellipsis. The TextWrapper class lets you configure these options once and reuse them efficiently. Combined with shutil.get_terminal_size() for adaptive terminal width, you have everything needed to build professional CLI output without third-party dependencies.

The CLI report formatter above is a ready-to-use starting point -- extend it with color support using the rich library, add a progress bar, or connect it to real data from your application's monitoring endpoints.

Official documentation: https://docs.python.org/3/library/textwrap.html

How To Parse and Validate URLs in Python


Intermediate

URLs show up everywhere in Python code: web scrapers pull them from HTML, APIs return them in JSON payloads, CLI tools accept them as arguments, and configuration files store them as connection strings. Treating a URL as a plain string works until it doesn’t: the query string has unescaped spaces, the path contains special characters, or you need to swap the hostname without breaking the rest of the URL. Parsing URLs properly, by splitting them into scheme, host, path, and query parameters, is a fundamental skill for anyone working with the web in Python.

Python’s standard library includes urllib.parse, a complete URL parsing toolkit that requires no installation. It handles splitting, joining, encoding, decoding, and modifying URLs according to RFC 3986. For validation beyond “does it parse?” you can combine it with a regex or the third-party validators library. Everything in this tutorial runs on Python 3.x with zero dependencies.

In this tutorial, you’ll learn how to parse URLs into components with urlparse(), extract and modify query parameters with parse_qs() and urlencode(), encode special characters with quote() and quote_plus(), build URLs safely with urljoin(), and validate URLs before sending them to external services. By the end, you’ll have a reusable set of URL utility functions you can drop into any project.

Parsing URLs in Python: Quick Example

Here’s the core pattern, parsing a URL and accessing its individual components:

# url_quick.py
from urllib.parse import urlparse, parse_qs, urlencode, urljoin

url = "https://jsonplaceholder.typicode.com/posts?userId=1&_limit=5"

# Parse into components
parsed = urlparse(url)
print("Scheme:  ", parsed.scheme)
print("Host:    ", parsed.netloc)
print("Path:    ", parsed.path)
print("Query:   ", parsed.query)

# Parse query string into a dict
params = parse_qs(parsed.query)
print("Params:  ", params)

# Modify a param and rebuild
params["_limit"] = ["10"]
new_query = urlencode(params, doseq=True)
new_url = parsed._replace(query=new_query).geturl()
print("New URL: ", new_url)

Output:

Scheme:   https
Host:     jsonplaceholder.typicode.com
Path:     /posts
Query:    userId=1&_limit=5
Params:   {'userId': ['1'], '_limit': ['5']}
New URL:  https://jsonplaceholder.typicode.com/posts?userId=1&_limit=10

urlparse() returns a ParseResult named tuple with six fields: scheme, netloc, path, params, query, and fragment. The _replace() method (standard on named tuples) creates a modified copy without mutating the original. parse_qs() returns a dict where each value is a list, because HTTP allows multiple values per key — always expect lists, not strings, from parse_qs().
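Because every value from parse_qs() is a list, reading a single parameter usually involves a default and an index. The sketch below shows one way to do that; the query string is made up for illustration.

```python
from urllib.parse import parse_qs

query = "userId=1&tag=a&tag=b&empty="

params = parse_qs(query)
user_id = params.get("userId", [None])[0]   # first value, or None if absent
tags = params.get("tag", [])                # every value for a repeated key
page = params.get("page", ["1"])[0]         # default when the key is missing

# Blank values are dropped unless keep_blank_values=True is passed.
with_blanks = parse_qs(query, keep_blank_values=True)

print(params)
print(user_id, tags, page)
print(with_blanks)
```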

The sections below go deeper into each operation, including proper encoding, URL building, and validation patterns.

What Is urllib.parse?

A URL has a defined structure specified in RFC 3986. The full format is:

scheme://netloc/path;params?query#fragment
  |        |      |     |      |       |
  https  host:port /search  q=py  #top

urllib.parse implements the RFC correctly, handling edge cases like IPv6 hosts, missing schemes, URL-encoded characters, and relative URLs. It’s the right tool for all URL manipulation; treating a URL as a string and slicing it manually will break on any non-trivial input.

| Function | What It Does | When To Use It |
|---|---|---|
| urlparse() | Splits URL into 6 components | Reading/inspecting a URL |
| urlunparse() | Joins 6 components into a URL | Rebuilding after modification |
| parse_qs() | Query string to dict of lists | Reading query parameters |
| parse_qsl() | Query string to list of tuples | Order-preserving param parsing |
| urlencode() | Dict or list to query string | Building query strings |
| quote() | Percent-encode a string | Encoding path segments |
| quote_plus() | Percent-encode, spaces as + | Encoding form data |
| unquote() | Decode percent-encoded string | Decoding URL components |
| urljoin() | Resolve relative URL against base | Building absolute URLs from relative ones |

Parsing URL Components

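Let’s examine urlparse() in detail. The function handles URLs with missing components gracefully: missing parts come back as empty strings, not exceptions. That makes it forgiving on incomplete URLs, but it is not exception-free on untrusted input. A malformed IPv6 host such as http://[::1/ raises ValueError at parse time, and reading .port raises ValueError when the port is not a valid number.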

# url_parse_components.py
from urllib.parse import urlparse

urls = [
    "https://api.github.com/repos/python/cpython/issues?state=open&per_page=5",
    "//cdn.example.org/assets/style.css",   # protocol-relative URL
    "/relative/path?foo=bar",               # relative URL
    "ftp://files.server.com:21/pub/data",   # FTP with port
    "mailto:user@example.com",              # non-HTTP scheme
]

for url in urls:
    p = urlparse(url)
    print(f"URL: {url[:50]}")
    print(f"  scheme={p.scheme!r}, netloc={p.netloc!r}, path={p.path!r}")
    print(f"  query={p.query!r}, fragment={p.fragment!r}")
    print(f"  hostname={p.hostname!r}, port={p.port!r}")
    print()

Output:

URL: https://api.github.com/repos/python/cpython/issues
  scheme='https', netloc='api.github.com', path='/repos/python/cpython/issues'
  query='state=open&per_page=5', fragment=''
  hostname='api.github.com', port=None

URL: //cdn.example.org/assets/style.css
  scheme='', netloc='cdn.example.org', path='/assets/style.css'
  query='', fragment=''
  hostname='cdn.example.org', port=None

URL: /relative/path?foo=bar
  scheme='', netloc='', path='/relative/path'
  query='foo=bar', fragment=''
  hostname=None, port=None

URL: ftp://files.server.com:21/pub/data
  scheme='ftp', netloc='files.server.com:21', path='/pub/data'
  query='', fragment=''
  hostname='files.server.com', port=21

URL: mailto:user@example.com
  scheme='mailto', netloc='', path='user@example.com'
  query='', fragment=''
  hostname=None, port=None

Note that parsed.hostname is always lowercase and strips the port, while parsed.netloc preserves the original case and includes the port. Use parsed.hostname for comparison and parsed.netloc for rebuilding. Protocol-relative URLs (//cdn...) have an empty scheme — check for this when validating that a URL is fully qualified.
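A quick sketch of why hostname is the safer field for comparisons. The two example URLs are invented and point at the same host with different casing and an explicit port.

```python
from urllib.parse import urlparse

a = urlparse("https://Example.COM:443/docs")
b = urlparse("https://example.com/docs")

print(a.netloc, b.netloc)        # case and port preserved, so these differ
print(a.hostname, b.hostname)    # both lowercase, port removed
print(a.port, b.port)            # 443 and None

same_host = a.hostname == b.hostname
same_netloc = a.netloc == b.netloc
```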

Working with Query Parameters

Query strings are where URL handling gets messy. Multiple values for the same key, URL-encoded characters, plus signs vs. %20 for spaces — parse_qs() and urlencode() handle all of this correctly.

# url_query_params.py
from urllib.parse import parse_qs, parse_qsl, urlencode, urlparse, urlunparse

raw_query = "tags=python&tags=web&tags=beginner&q=url+parsing&page=2"

# parse_qs: values are always lists
params = parse_qs(raw_query)
print("parse_qs:", params)

# parse_qsl: order-preserving list of tuples
params_list = parse_qsl(raw_query)
print("parse_qsl:", params_list)

# Build a new query string
new_params = {"q": "urllib.parse tutorial", "lang": "en", "page": 1}
query_string = urlencode(new_params)
print("urlencode:", query_string)

# Multi-value params need doseq=True
multi = {"tags": ["python", "web", "tutorial"], "page": 1}
print("multi:", urlencode(multi, doseq=True))

# Modify one param in an existing URL
url = "https://jsonplaceholder.typicode.com/posts?userId=1&_limit=5"
parsed = urlparse(url)
params = dict(parse_qsl(parsed.query))
params["_limit"] = "20"      # change the limit
params["_sort"] = "title"    # add a new param
new_query = urlencode(params)
modified_url = urlunparse(parsed._replace(query=new_query))
print("Modified:", modified_url)

Output:

parse_qs: {'tags': ['python', 'web', 'beginner'], 'q': ['url parsing'], 'page': ['2']}
parse_qsl: [('tags', 'python'), ('tags', 'web'), ('tags', 'beginner'), ('q', 'url parsing'), ('page', '2')]
urlencode: q=urllib.parse+tutorial&lang=en&page=1
multi: tags=python&tags=web&tags=tutorial&page=1
Modified: https://jsonplaceholder.typicode.com/posts?userId=1&_limit=20&_sort=title

Two important details: parse_qs() silently decodes + as a space (form-encoded convention), so q=url+parsing becomes {'q': ['url parsing']}. And urlencode() by default encodes spaces as + (safe for query strings); pass quote_via=quote (with quote imported from urllib.parse) if you need %20 instead. The doseq=True flag in urlencode() is required when values are lists; without it, you’d get tags=%5B%27python%27... (a stringified Python list, which is wrong).
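The two space-encoding styles can be compared side by side using urlencode()'s quote_via parameter. The parameter values here are arbitrary.

```python
from urllib.parse import urlencode, quote

params = {"q": "url parsing", "tags": ["python", "web"]}

# Default: quote_plus is used, so spaces become "+".
form_style = urlencode(params, doseq=True)

# quote_via=quote switches to percent-encoding, so spaces become "%20".
percent_style = urlencode(params, doseq=True, quote_via=quote)

print(form_style)
print(percent_style)
```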


URL Encoding and Decoding

URL encoding (percent-encoding) converts characters that aren’t safe in URLs into their %XX hex equivalents. Python provides quote() for path segments, quote_plus() for query values (spaces become +), and unquote()/unquote_plus() to reverse the process.

# url_encoding.py
from urllib.parse import quote, quote_plus, unquote, unquote_plus

# Encoding path segments (/ should stay, spaces become %20)
path = "/search/Python web scraping"
encoded_path = quote(path)
print("quote:", encoded_path)

# Encoding query values (spaces become +, / becomes %2F)
search_term = "Python web scraping & parsing"
encoded_query = quote_plus(search_term)
print("quote_plus:", encoded_query)

# Decoding
print("unquote:", unquote("/search/Python%20web%20scraping"))
print("unquote_plus:", unquote_plus("Python+web+scraping+%26+parsing"))

# The 'safe' parameter: characters to NOT encode
# By default, quote() treats '/' as safe
print("safe=/:", quote("/api/v2/search?q=hello world"))
print("safe='':", quote("/api/v2/search?q=hello world", safe=""))

# Build a complete URL with encoded components
base = "https://httpbin.org"
path_segment = quote("/get")  # default safe="/" keeps the path separator
query = quote_plus("Hello World & more")
full_url = f"{base}{path_segment}?data={query}"
print("Full URL:", full_url)

Output:

quote: /search/Python%20web%20scraping
quote_plus: Python+web+scraping+%26+parsing
unquote: /search/Python web scraping
unquote_plus: Python web scraping & parsing
safe=/: /api/v2/search%3Fq%3Dhello%20world
safe='': %2Fapi%2Fv2%2Fsearch%3Fq%3Dhello%20world
Full URL: https://httpbin.org/get?data=Hello+World+%26+more

The safe parameter is the key to correct encoding. quote() defaults to treating / as safe (not encoded), which is correct for path components. For query values, use quote_plus() which encodes everything including / and converts spaces to +. Never double-encode — calling quote() on an already-encoded string produces %2520 (the % itself gets encoded). Always start from the raw, unencoded value.
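Double-encoding is easy to reproduce, which makes it easy to recognize. A minimal demonstration:

```python
from urllib.parse import quote, unquote

raw = "hello world"
once = quote(raw)     # the space becomes %20
twice = quote(once)   # the "%" itself is encoded, giving %2520

print(once)
print(twice)

# A single unquote() only peels off one layer of encoding.
one_layer = unquote(twice)
original = unquote(one_layer)
```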

Joining and Resolving URLs

When scraping web pages, you’ll often find relative URLs in href attributes like /about or ../images/logo.png. urljoin() resolves these against a base URL, implementing the same logic browsers use.

# url_join.py
from urllib.parse import urljoin

base = "https://quotes.toscrape.com/page/2/"

# These are typical hrefs found in scraped pages
hrefs = [
    "/author/Albert-Einstein",   # absolute path
    "page/3/",                   # relative path
    "../tags/",                  # parent-relative path
    "//cdn.server.com/logo.png", # protocol-relative
    "https://other.com/page",    # already absolute -- returned as-is
]

for href in hrefs:
    resolved = urljoin(base, href)
    print(f"  {href!r:40s} -> {resolved}")

Output:

  '/author/Albert-Einstein'                -> https://quotes.toscrape.com/author/Albert-Einstein
  'page/3/'                                -> https://quotes.toscrape.com/page/2/page/3/
  '../tags/'                               -> https://quotes.toscrape.com/page/tags/
  '//cdn.server.com/logo.png'              -> https://cdn.server.com/logo.png
  'https://other.com/page'                 -> https://other.com/page

urljoin() follows RFC 3986 resolution rules. An absolute path (/author/...) discards the base URL’s path entirely. A relative path (page/3/) is resolved against the directory of the base path, meaning everything up to its last /, so against /page/2/ it becomes /page/2/page/3/ rather than /page/3/. Each ../ climbs one directory, which is why ../tags/ lands at /page/tags/. A protocol-relative URL (//cdn...) inherits the scheme from the base. An already-absolute URL is returned unchanged. This makes urljoin() safe to call on any href you extract from a web page, regardless of its form.
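Because resolution starts from the last / in the base path, a trailing slash on the base changes the result. The sketch below uses a made-up example.com base to show the difference.

```python
from urllib.parse import urljoin

href = "details.html"

# Base ends with "/": "catalogue/" is treated as a directory.
with_slash = urljoin("https://example.com/catalogue/", href)

# No trailing "/": "catalogue" is treated as a file and gets replaced.
without_slash = urljoin("https://example.com/catalogue", href)

print(with_slash)
print(without_slash)
```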

Real-Life Example: URL Validator and Normalizer

These utility functions validate URLs for a web scraper pipeline: they check that each URL has an expected scheme, strip tracking parameters, and normalize the result to a canonical form.

# url_normalizer.py
from urllib.parse import urlparse, parse_qsl, urlencode, urlunparse

# Common tracking parameters to strip from URLs
TRACKING_PARAMS = {
    "utm_source", "utm_medium", "utm_campaign", "utm_term", "utm_content",
    "fbclid", "gclid", "msclkid", "ref", "_ga", "mc_eid",
}

ALLOWED_SCHEMES = {"http", "https"}


def validate_url(url):
    """Return (is_valid, error_message) for a URL."""
    if not url or not isinstance(url, str):
        return False, "URL must be a non-empty string"
    url = url.strip()
    try:
        parsed = urlparse(url)
    except Exception as exc:
        return False, f"Parse error: {exc}"

    if not parsed.scheme:
        return False, "Missing scheme (http/https)"
    if parsed.scheme not in ALLOWED_SCHEMES:
        return False, f"Unsupported scheme: {parsed.scheme!r}"
    if not parsed.netloc:
        return False, "Missing host"
    if "." not in parsed.netloc.lstrip("."):
        return False, f"Host looks invalid: {parsed.netloc!r}"
    return True, None


def normalize_url(url):
    """
    Normalize a URL:
    - Lowercase scheme and host
    - Strip tracking parameters
    - Remove default ports (80 for http, 443 for https)
    - Drop the fragment
    """
    parsed = urlparse(url.strip())
    scheme = parsed.scheme.lower()
    netloc = parsed.netloc.lower()

    # Strip default ports
    if netloc.endswith(":80") and scheme == "http":
        netloc = netloc[:-3]
    elif netloc.endswith(":443") and scheme == "https":
        netloc = netloc[:-4]

    # Strip tracking params, preserve order
    clean_params = [
        (k, v) for k, v in parse_qsl(parsed.query)
        if k.lower() not in TRACKING_PARAMS
    ]
    clean_query = urlencode(clean_params)

    return urlunparse((scheme, netloc, parsed.path, parsed.params, clean_query, ""))


# Test the utilities
test_urls = [
    "https://jsonplaceholder.typicode.com/posts?userId=1&utm_source=newsletter&utm_campaign=weekly",
    "HTTPS://HTTPBin.Org:443/get?data=hello&fbclid=ABC123",
    "http://quotes.toscrape.com/page/1/?ref=homepage&page=1",
    "ftp://files.example.com/data",
    "not-a-url",
    "",
]

for url in test_urls:
    valid, err = validate_url(url)
    if valid:
        normalized = normalize_url(url)
        print(f"OK  -> {normalized}")
    else:
        print(f"ERR -> {err}")

Output:

OK  -> https://jsonplaceholder.typicode.com/posts?userId=1
OK  -> https://httpbin.org/get?data=hello
OK  -> http://quotes.toscrape.com/page/1/?page=1
ERR -> Unsupported scheme: 'ftp'
ERR -> Missing scheme (http/https)
ERR -> URL must be a non-empty string

The normalizer removes utm_source, utm_campaign, and fbclid while preserving legitimate parameters like userId and page. It also lowercases the scheme and host, and strips redundant ports. This canonical form means duplicate URLs with different tracking parameters or case differences get treated as the same URL — essential for deduplication in scrapers and crawlers.

Frequently Asked Questions

Should I use urllib.parse or the requests library for URL handling?

urllib.parse is for parsing and building URL strings — it doesn’t make HTTP requests. The requests library is for making HTTP requests — it uses urllib.parse internally when you pass params= to a request. The two tools are complementary: use urllib.parse to construct, decode, and validate URLs, and use requests (or httpx) to actually fetch them. You’ll often use both in the same script.

When should I use parse_qs vs parse_qsl?

parse_qs() returns a dict of lists — good for random access by key. parse_qsl() returns an ordered list of tuples — good when order matters or when you need to process duplicate keys in sequence. If you’re building a URL canonicalizer (like the normalizer above), use parse_qsl() so you preserve insertion order and can filter in a single pass. If you just need to look up a specific parameter value, parse_qs()["key"][0] is more readable.

How do I properly validate a URL in Python?

urlparse() is very lenient — it successfully parses nearly anything, including strings that aren’t URLs at all. For basic validation, check that parsed.scheme and parsed.netloc are non-empty after parsing. For stricter validation (checking domain format, TLD, reachability), consider the validators library (pip install validators) which provides validators.url(url). Never use urlparse() alone as a security gate — it won’t catch all invalid or malicious URLs.
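To see how lenient urlparse() is, and what a basic structural check looks like, consider the sketch below. The helper name looks_like_http_url is hypothetical, and it is a shape check only, not a security gate.

```python
from urllib.parse import urlparse

def looks_like_http_url(candidate):
    """Basic structural check: http(s) scheme, a host, and a sane port."""
    try:
        parsed = urlparse(candidate)
        parsed.port  # reading .port raises ValueError for a bad port
    except ValueError:
        return False
    return parsed.scheme in ("http", "https") and bool(parsed.hostname)

candidates = [
    "https://example.com/a",
    "not a url at all",          # parses without error, but has no scheme
    "http://example.com:port/",  # port is not a number
    "http://[::1/",              # unbalanced IPv6 bracket
]
results = {c: looks_like_http_url(c) for c in candidates}
for candidate, ok in results.items():
    print(f"{candidate!r:30} {ok}")
```

Only the first candidate passes. The second is accepted by urlparse() without complaint, which is why the scheme and host checks are needed on top of parsing.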

Why does urljoin() sometimes ignore my base URL?

If the second argument to urljoin() is an absolute URL (has a scheme and host), the base is ignored entirely — the absolute URL is returned as-is. This is RFC 3986 behavior. The fix is to check whether your href is absolute before calling urljoin(), or always pass the current page URL as the base and let urljoin() do the right thing. For scraping, always use the URL of the page you scraped as the base, not the site root — relative paths resolve from the current directory, not the root.

How do I avoid double-encoding URLs?

Always start from the raw, decoded value before encoding. If a URL is already encoded (contains %20 or similar), call unquote() first, then re-encode with quote(). A quick check: if your string contains a literal % followed by two hex digits, it is most likely already encoded. Double-encoding produces %2520 (the % becomes %25), which browsers and servers won’t interpret as a space. When in doubt, decode first with unquote() then encode with quote().

Conclusion

Python’s urllib.parse module gives you everything you need to work with URLs correctly: urlparse() to split URLs into components, parse_qs()/parse_qsl() to decode query strings, urlencode() to build them back, quote()/quote_plus() for safe encoding, and urljoin() to resolve relative URLs against a base. The URL normalizer above is a production-ready starting point — extend it with domain allowlists, path normalization, or integration with your scraper’s deduplication layer.

For HTTP requests themselves, pair urllib.parse with the requests or httpx library. For stricter URL validation including TLD checking and reachability tests, add the validators package. The urllib.parse module handles the structural and encoding layer; the rest of the stack builds on top of it.

Official documentation: https://docs.python.org/3/library/urllib.parse.html