Intermediate
How To Create Custom Context Managers in Python
Context managers are one of Python’s most elegant features — they guarantee that setup and cleanup code runs correctly, even when exceptions occur. You’ve probably used them with with open('file.txt') as f: without thinking about the machinery underneath. But creating your own context managers unlocks powerful patterns: automatically releasing database connections, managing temporary files, handling transactions, and coordinating complex resource lifecycles. This tutorial digs deep into building custom context managers that transform error-prone manual cleanup into bulletproof automated patterns.
You might think building context managers is complex or requires understanding esoteric Python internals. In reality, there are three straightforward approaches: the class-based __enter__/__exit__ protocol for maximum control, the @contextmanager decorator for simple cases, and ExitStack for dynamic contexts. Each has a time and place. By the end of this article, you’ll have a mental model of when to use each, and you’ll be able to write context managers that your teammates will thank you for.
We’ll start with a quick example showing the difference between error-prone manual cleanup and context manager elegance, then explore the __enter__/__exit__ protocol in detail, master the @contextmanager decorator, tackle dynamic contexts with ExitStack, and wrap up with async context managers for concurrent code. Along the way, we’ll cover real-world patterns like database transactions, file handling, and resource pooling.
Quick Example
Here’s why context managers matter:
# cleanup_comparison.py
# NOTE: connect_to_db() is a placeholder for your database driver's connect function.

# WITHOUT a context manager: cleanup relies on a hand-written try/finally
def bad_database_access():
    connection = connect_to_db()
    try:
        result = connection.query('SELECT * FROM users')
        if not result:
            raise ValueError('No users found!')
        return result
    finally:
        connection.close()  # Omit the try/finally and an exception leaks the connection

# WITH a context manager: cleanup ALWAYS runs
def good_database_access():
    with connect_to_db() as connection:
        result = connection.query('SELECT * FROM users')
        if not result:
            raise ValueError('No users found!')
        return result
    # connection.close() runs automatically, even on exception
The second version is clearer and safer. The context manager guarantees cleanup, and exceptions naturally propagate while resources are released. No manual try/finally, no forgotten close() calls.
What Are Context Managers?
A context manager is an object that implements two methods: __enter__ (called when entering the with block) and __exit__ (called when leaving, whether via normal completion or exception). This simple protocol guarantees that cleanup code runs.
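Python’s with statement is syntactic sugar. When you write:
with some_object as x:
    do_something(x)
Python treats it as roughly equivalent to:
mgr = some_object
exit = type(mgr).__exit__
value = type(mgr).__enter__(mgr)
exc = True
try:
    try:
        x = value
        do_something(x)
    except:
        # The body raised: pass the exception details to __exit__
        exc = False
        if not exit(mgr, *sys.exc_info()):
            raise
        # A truthy return value from __exit__ swallows the exception
finally:
    # No exception reached the except clause: call __exit__ with no exception info
    if exc:
        exit(mgr, None, None, None)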
The key insight: once __enter__ has succeeded, __exit__ always runs, even on exception. The return value of __exit__ determines whether an exception is suppressed (a truthy value) or re-raised (a falsy value, including None).
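To make the call order concrete, here is a minimal sketch. Recorder is a hypothetical class invented for this illustration; it only logs which protocol methods ran and returns a configurable value from __exit__.

```python
class Recorder:
    """Hypothetical context manager that records protocol calls in a list."""

    def __init__(self, suppress):
        self.suppress = suppress
        self.events = []

    def __enter__(self):
        self.events.append('enter')
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # exc_type is None on a clean exit, or the exception class otherwise.
        label = 'exit' if exc_type is None else f'exit:{exc_type.__name__}'
        self.events.append(label)
        # A truthy return value asks Python to swallow the exception.
        return self.suppress


quiet = Recorder(suppress=True)
with quiet:
    raise KeyError('swallowed')

loud = Recorder(suppress=False)
try:
    with loud:
        raise KeyError('propagates')
except KeyError:
    loud.events.append('caught outside')

print(quiet.events)  # ['enter', 'exit:KeyError']
print(loud.events)   # ['enter', 'exit:KeyError', 'caught outside']
```

In both cases __exit__ sees the KeyError; only the return value decides whether the caller ever hears about it.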
| Approach | Simplicity | Control | Best For |
|---|---|---|---|
| Class with __enter__/__exit__ | Medium | Maximum (full access to exception info) | Complex resources, error handling, state |
| @contextmanager decorator | High | Good (can catch exceptions) | Simple setup/cleanup, one-off contexts |
| ExitStack | Medium | Good (dynamic number of contexts) | Conditional contexts, variable cleanup order |
The __enter__/__exit__ Protocol
Basic Example: File-Like Object
Let’s build a simple custom file-like object that tracks access:
# tracked_file.py
class TrackedFile:
    """Context manager that tracks file access"""

    def __init__(self, filename):
        self.filename = filename
        self.file = None
        self.access_count = 0

    def __enter__(self):
        """Called when entering 'with' block"""
        print(f'Opening {self.filename}')
        self.file = open(self.filename, 'r')
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Called when exiting 'with' block"""
        print(f'Closing {self.filename} (accessed {self.access_count} times)')
        if self.file:
            self.file.close()
        return False  # Don't suppress exceptions

    def read_line(self):
        """Read a line and track access"""
        self.access_count += 1
        return self.file.readline()

# Usage
if __name__ == '__main__':
    with TrackedFile('/etc/hostname') as f:
        line = f.read_line()
        print(f'Read: {line.strip()}')
        line2 = f.read_line()
Output:
Opening /etc/hostname
Read: localhost
Closing /etc/hostname (accessed 2 times)
When we enter the with block, __enter__ opens the file and returns self. When we exit (normally or via exception), __exit__ closes the file. The file is guaranteed to close, even if read_line() raises an exception.
Exception Handling in __exit__
The __exit__ method receives three parameters that describe any exception that occurred:
# exception_handling.py
class SafeConnection:
    """Context manager that handles exceptions gracefully"""

    def __init__(self, db_name):
        self.db_name = db_name
        self.connected = False

    def __enter__(self):
        print(f'Connecting to {self.db_name}')
        self.connected = True
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        exc_type: Exception class (None if no exception)
        exc_val: Exception instance
        exc_tb: Traceback object
        """
        if exc_type is not None:
            print(f'Exception occurred: {exc_type.__name__}: {exc_val}')
            print('Rolling back transaction')
            # Return False to re-raise the exception
            return False
        else:
            print('Committing transaction')
            return False

    def query(self, sql):
        if not self.connected:
            raise RuntimeError('Not connected!')
        return f'Query result for: {sql}'

# Normal completion
print('=== Normal completion ===')
with SafeConnection('mydb') as conn:
    result = conn.query('SELECT * FROM users')
    print(f'Got: {result}')

# With exception
print('\n=== With exception ===')
try:
    with SafeConnection('mydb') as conn:
        result = conn.query('SELECT * FROM users')
        raise ValueError('Data validation failed!')
except ValueError:
    print('Caught exception in outer scope')
Output:
=== Normal completion ===
Connecting to mydb
Got: Query result for: SELECT * FROM users
Committing transaction
=== With exception ===
Connecting to mydb
Exception occurred: ValueError: Data validation failed!
Rolling back transaction
Caught exception in outer scope
Notice that __exit__ runs even when an exception occurs inside the with block. This is perfect for cleanup like rolling back transactions or releasing locks. If __exit__ returns True, the exception is suppressed. If it returns False (or nothing), the exception propagates to the caller.
When to Suppress Exceptions
Sometimes you want __exit__ to suppress exceptions. Use this carefully:
# suppress_exception.py
class ErrorIgnorer:
    """Context manager that suppresses specific exceptions"""

    def __init__(self, *suppress_types):
        self.suppress_types = suppress_types

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is None:
            # No exception, return False
            return False
        # issubclass also matches subclasses of the listed types,
        # the same way an except clause would
        if issubclass(exc_type, self.suppress_types):
            print(f'Suppressed: {exc_type.__name__}')
            return True  # Suppress this exception
        print(f'Not suppressed: {exc_type.__name__}')
        return False  # Let other exceptions propagate

# Suppress ValueError but not RuntimeError
with ErrorIgnorer(ValueError):
    print('Inside context')
    raise ValueError('This will be suppressed')
print('Execution continues after ValueError')

try:
    with ErrorIgnorer(ValueError):
        raise RuntimeError('This will NOT be suppressed')
except RuntimeError as e:
    print(f'RuntimeError propagated: {e}')
Output:
Inside context
Suppressed: ValueError
Execution continues after ValueError
RuntimeError propagated: This will NOT be suppressed
Exception suppression is powerful but dangerous. Use it only when you’re absolutely certain the exception is safe to ignore. A common safe case is ignoring FileNotFoundError when deleting files.
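For that file-deletion case you may not need a custom class at all: the standard library ships contextlib.suppress, which does the same job. The sketch below assumes a helper named remove_if_present (made up for this example) and uses a temporary file so it can run anywhere.

```python
import os
import tempfile
from contextlib import suppress


def remove_if_present(path):
    """Delete path, treating 'already gone' as a non-error."""
    with suppress(FileNotFoundError):
        os.remove(path)
        return True
    # Reached only when FileNotFoundError was raised and suppressed.
    return False


fd, path = tempfile.mkstemp()
os.close(fd)

first = remove_if_present(path)    # the file exists, so it is deleted
second = remove_if_present(path)   # the file is gone; the error is swallowed
print(first, second)  # True False
```

Only FileNotFoundError is swallowed here. A PermissionError, for instance, would still propagate, which is usually what you want.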
The @contextmanager Decorator
Simple Decorator Pattern
For simple setup/cleanup, the @contextmanager decorator is cleaner than writing a class. It converts a generator function into a context manager:
# simple_decorator.py
import time
from contextlib import contextmanager

@contextmanager
def timed_operation(name):
    """Context manager that times an operation"""
    start = time.perf_counter()  # Monotonic clock, suited to measuring intervals
    print(f'Starting {name}')
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        print(f'Finished {name} in {elapsed:.2f}s')

# Usage
with timed_operation('download'):
    time.sleep(0.5)
print('Done')  # Runs after the with block, so it prints last
Output:
Starting download
Finished download in 0.50s
Done
The code before yield runs in __enter__. The code after yield runs in __exit__, in a finally block. This guarantees cleanup runs even on exception.
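The try/finally around yield is what provides that guarantee, not the decorator itself. As a rough illustration, the two hypothetical managers below (fragile and robust, names invented here) differ only in that wrapper, and only one of them cleans up when the body raises.

```python
from contextlib import contextmanager

log = []


@contextmanager
def fragile(name):
    log.append(f'setup {name}')
    yield
    log.append(f'cleanup {name}')  # skipped if the with body raises


@contextmanager
def robust(name):
    log.append(f'setup {name}')
    try:
        yield
    finally:
        log.append(f'cleanup {name}')  # runs on success and on error


for manager in (fragile, robust):
    try:
        with manager(manager.__name__):
            raise RuntimeError('boom')
    except RuntimeError:
        pass

print(log)  # ['setup fragile', 'setup robust', 'cleanup robust']
```

The exception is re-raised inside the generator at the yield point, so any code after a bare yield simply never executes.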
Yielding Values from the Context
Use yield to pass a value to the with block:
# yield_value.py
from contextlib import contextmanager

@contextmanager
def database_connection(db_name):
    """Context manager that manages database connection"""
    print(f'Connecting to {db_name}')

    # Simulate connection object
    class Connection:
        def query(self, sql):
            return f'Result of: {sql}'

    connection = Connection()
    try:
        yield connection
    finally:
        print(f'Disconnecting from {db_name}')

# Usage
with database_connection('mydb') as conn:
    result = conn.query('SELECT * FROM users')
    print(result)
Output:
Connecting to mydb
Result of: SELECT * FROM users
Disconnecting from mydb
Exception Handling with @contextmanager
You can catch exceptions from the with block:
# exception_decorator.py
from contextlib import contextmanager

@contextmanager
def transaction(db_name):
    """Context manager that manages transactions"""
    print(f'Starting transaction on {db_name}')
    try:
        yield
    except Exception as e:
        print(f'Transaction failed: {e}')
        print('Rolling back...')
        raise
    else:
        print('Transaction committed')

# Normal case
print('=== Normal transaction ===')
with transaction('mydb'):
    print('Executing query')

# Exception case
print('\n=== Failed transaction ===')
try:
    with transaction('mydb'):
        print('Executing query')
        raise ValueError('Data invalid!')
except ValueError:
    print('Caught in outer scope')
Output:
=== Normal transaction ===
Starting transaction on mydb
Executing query
Transaction committed
=== Failed transaction ===
Starting transaction on mydb
Executing query
Transaction failed: Data invalid!
Rolling back...
Caught in outer scope
The try/except/else pattern lets you handle exceptions in the context manager and decide whether to re-raise them.
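The opposite decision, swallowing the exception, just means leaving out the raise. Here is a small sketch under that assumption; record_missing_keys is a hypothetical helper that suppresses KeyError and lets everything else through.

```python
from contextlib import contextmanager


@contextmanager
def record_missing_keys(errors):
    """Hypothetical helper: swallow KeyError, note the key, pass others on."""
    try:
        yield
    except KeyError as exc:
        # No re-raise here, so the with statement ends as if nothing happened.
        errors.append(exc.args[0])


errors = []
settings = {'host': 'localhost'}

with record_missing_keys(errors):
    port = settings['port']  # raises KeyError('port'), which is suppressed

try:
    with record_missing_keys(errors):
        int('not a number')  # ValueError is not caught by the generator
except ValueError:
    errors.append('ValueError propagated')

print(errors)  # ['port', 'ValueError propagated']
```

Keep the except clause as narrow as you can: catching Exception without re-raising would hide bugs in the with body.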
When to Use Decorator vs Class
Use @contextmanager for:
- Simple, one-off contexts
- When setup and cleanup are short
- When you don’t need to store state between calls
Use a class for:
- Complex state management (storing attributes)
- When you need multiple methods besides __enter__/__exit__
- When you need fine-grained control over exception handling
- Reusable contexts that multiple codebases will import
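To compare the two styles side by side, here is the same behaviour written both ways: temporarily overriding one key in a dict. Override and override are names made up for this sketch; the class keeps its state in attributes you can inspect, while the generator keeps it in local variables.

```python
from contextlib import contextmanager

_MISSING = object()  # sentinel meaning "the key was not present before"


class Override:
    """Class-based: state lives in attributes and stays inspectable."""

    def __init__(self, mapping, key, value):
        self.mapping, self.key, self.value = mapping, key, value
        self.previous = _MISSING

    def __enter__(self):
        self.previous = self.mapping.get(self.key, _MISSING)
        self.mapping[self.key] = self.value
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.previous is _MISSING:
            self.mapping.pop(self.key, None)
        else:
            self.mapping[self.key] = self.previous
        return False


@contextmanager
def override(mapping, key, value):
    """Generator-based: same behaviour, state lives in local variables."""
    previous = mapping.get(key, _MISSING)
    mapping[key] = value
    try:
        yield
    finally:
        if previous is _MISSING:
            mapping.pop(key, None)
        else:
            mapping[key] = previous


config = {'debug': False}
with Override(config, 'debug', True) as o:
    inside_class = (config['debug'], o.previous)
with override(config, 'verbose', True):
    inside_func = dict(config)

print(inside_class)  # (True, False)
print(inside_func)   # {'debug': False, 'verbose': True}
print(config)        # {'debug': False}
```

The generator version is roughly half the length, but only the class lets callers read o.previous or add extra methods later.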
Dynamic Contexts with ExitStack
What is ExitStack?
ExitStack lets you manage a dynamic number of contexts. Instead of nesting with statements, you enter contexts conditionally or in a loop:
# exitstack_basics.py
from contextlib import ExitStack

# Without ExitStack: ugly nesting
# with open('file1.txt') as f1:
#     with open('file2.txt') as f2:
#         with open('file3.txt') as f3:
#             process(f1, f2, f3)

# With ExitStack: clean
# (assumes the three files exist in the current directory)
files = ['file1.txt', 'file2.txt', 'file3.txt']
with ExitStack() as stack:
    file_objects = [stack.enter_context(open(f)) for f in files]
    # Process files
    for f in file_objects:
        print(f'Opened: {f.name}')
# All files closed when exiting the 'with' block
Output:
Opened: file1.txt
Opened: file2.txt
Opened: file3.txt
Conditional Contexts
Use ExitStack to conditionally enter contexts:
# conditional_contexts.py
from contextlib import ExitStack

class Connection:
    def __init__(self, name):
        self.name = name

    def __enter__(self):
        print(f'Opening {self.name}')
        return self

    def __exit__(self, *args):
        print(f'Closing {self.name}')

def process_files(write_log=False, debug=False):
    """Open files conditionally"""
    with ExitStack() as stack:
        # Always open main file
        main_file = stack.enter_context(Connection('main.txt'))
        # Conditionally open log file
        if write_log:
            log_file = stack.enter_context(Connection('log.txt'))
        else:
            log_file = None
        # Conditionally open debug file
        if debug:
            debug_file = stack.enter_context(Connection('debug.txt'))
        else:
            debug_file = None
        print('Processing...')
        return log_file is not None and debug_file is not None

# Call with different conditions
print('=== With log and debug ===')
result = process_files(write_log=True, debug=True)
print(f'All files opened: {result}\n')

print('=== With only log ===')
result = process_files(write_log=True, debug=False)
print(f'All files opened: {result}')
Output:
=== With log and debug ===
Opening main.txt
Opening log.txt
Opening debug.txt
Processing...
Closing debug.txt
Closing log.txt
Closing main.txt
All files opened: True
=== With only log ===
Opening main.txt
Opening log.txt
Processing...
Closing log.txt
Closing main.txt
All files opened: False
Callback Registration
ExitStack lets you register arbitrary cleanup functions:
# callback_registration.py
from contextlib import ExitStack

def process_with_cleanup():
    with ExitStack() as stack:
        # Open a resource
        f = stack.enter_context(open('/etc/hostname'))
        # Register cleanup functions
        stack.callback(print, 'Cleanup step 1')
        stack.callback(print, 'Cleanup step 2')
        stack.callback(print, 'Cleanup step 3')
        print('Doing work...')
        content = f.read()
        print(f'Read: {content.strip()}')
    # ExitStack calls callbacks in LIFO (last-in-first-out) order
    # and closes the file

process_with_cleanup()
Output:
Doing work...
Read: localhost
Cleanup step 3
Cleanup step 2
Cleanup step 1
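Callbacks run in reverse order of registration (LIFO), which is usually what you want: a resource acquired later may depend on one acquired earlier, so it gets released first. Here the file, which was entered before any callback was registered, is closed last.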
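Callbacks also run when the with body fails, which is easy to verify. In this sketch the resource names (pool, connection, cursor) and the release function are placeholders; nothing real is acquired.

```python
from contextlib import ExitStack

order = []


def release(name):
    order.append(f'release {name}')


try:
    with ExitStack() as stack:
        for name in ('pool', 'connection', 'cursor'):
            order.append(f'acquire {name}')
            # Positional arguments after the function are passed to it at exit.
            stack.callback(release, name)
        raise RuntimeError('work failed')
except RuntimeError:
    order.append('error handled')

for step in order:
    print(step)
```

The cursor is released before the connection, and the connection before the pool, all before the except clause in the caller gets control.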
ExitStack Real-World Pattern
A practical example: manage multiple database connections with conditional rollback:
# exitstack_database.py
from contextlib import ExitStack

class FakeConnection:
    def __init__(self, name):
        self.name = name
        self.committed = False

    def __enter__(self):
        print(f'Opening {self.name}')
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type:
            print(f'Rollback {self.name}')
        else:
            print(f'Commit {self.name}')
        self.committed = (exc_type is None)
        return False

def multi_db_transaction(db_names, fail_on=None):
    """Open multiple databases, rollback all if any fail"""
    with ExitStack() as stack:
        connections = {}
        for db_name in db_names:
            conn = stack.enter_context(FakeConnection(db_name))
            connections[db_name] = conn
        for db_name, conn in connections.items():
            if db_name == fail_on:
                raise ValueError(f'Intentional failure on {db_name}')
            print(f'Execute query on {db_name}')
        return connections

# Normal case: all succeed
print('=== All succeed ===')
multi_db_transaction(['db1', 'db2', 'db3'])

# Failure case: second database fails
print('\n=== Second database fails ===')
try:
    multi_db_transaction(['db1', 'db2', 'db3'], fail_on='db2')
except ValueError as e:
    print(f'Error: {e}')
Output:
=== All succeed ===
Opening db1
Opening db2
Opening db3
Execute query on db1
Execute query on db2
Execute query on db3
Commit db3
Commit db2
Commit db1
=== Second database fails ===
Opening db1
Opening db2
Opening db3
Execute query on db1
Rollback db3
Rollback db2
Rollback db1
Error: Intentional failure on db2
Notice that when db2 fails, all databases are rolled back in reverse order. This is the power of ExitStack: you don’t know how many resources to manage until runtime.
Async Context Managers
Async __aenter__ and __aexit__
For async code, define __aenter__ and __aexit__ instead of __enter__ and __exit__:
# async_context.py
import asyncio

class AsyncConnection:
    """Async context manager for database connection"""

    def __init__(self, db_name):
        self.db_name = db_name

    async def __aenter__(self):
        print(f'Async connecting to {self.db_name}')
        await asyncio.sleep(0.1)  # Simulate async I/O
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print(f'Async disconnecting from {self.db_name}')
        await asyncio.sleep(0.1)  # Simulate async cleanup
        return False

    async def query(self, sql):
        print(f'Executing: {sql}')
        await asyncio.sleep(0.1)
        return 'Query result'

async def main():
    # Use 'async with' for async context managers
    async with AsyncConnection('mydb') as conn:
        result = await conn.query('SELECT * FROM users')
        print(f'Got: {result}')

asyncio.run(main())
Output:
Async connecting to mydb
Executing: SELECT * FROM users
Got: Query result
Async disconnecting from mydb
Async @contextmanager
For simple async setup/cleanup, use @asynccontextmanager:
# async_decorator.py
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def async_transaction(db_name):
    """Async context manager for transactions"""
    print(f'Starting async transaction on {db_name}')
    await asyncio.sleep(0.1)
    try:
        yield
    finally:
        # finally runs on success and on failure alike; a real transaction
        # would roll back in an except clause instead of committing here
        print(f'Committing async transaction on {db_name}')
        await asyncio.sleep(0.1)

async def main():
    async with async_transaction('mydb'):
        print('Doing async work...')
        await asyncio.sleep(0.2)

asyncio.run(main())
Output:
Starting async transaction on mydb
Doing async work...
Committing async transaction on mydb
AsyncExitStack for Dynamic Async Contexts
Use AsyncExitStack to manage variable numbers of async contexts:
# async_exitstack.py
import asyncio
from contextlib import AsyncExitStack

class AsyncResource:
    def __init__(self, name):
        self.name = name

    async def __aenter__(self):
        print(f'Acquiring {self.name}')
        await asyncio.sleep(0.05)
        return self

    async def __aexit__(self, *args):
        print(f'Releasing {self.name}')
        await asyncio.sleep(0.05)

async def main():
    resources = ['res1', 'res2', 'res3']
    # Manage multiple async contexts
    async with AsyncExitStack() as stack:
        acquired = [
            await stack.enter_async_context(AsyncResource(r))
            for r in resources
        ]
        print('Using resources...')
        await asyncio.sleep(0.1)
        print(f'Acquired {len(acquired)} resources')
    # All resources released automatically
    print('Done')

asyncio.run(main())
Output:
Acquiring res1
Acquiring res2
Acquiring res3
Using resources...
Acquired 3 resources
Releasing res3
Releasing res2
Releasing res1
Done
Real-World Project: Resource Manager
Let’s build a production-ready resource manager that combines everything: class-based context managers, ExitStack, error handling, and async support.
# resource_manager.py
import time
import asyncio
from contextlib import contextmanager, ExitStack, AsyncExitStack
from typing import List

class DatabaseConnection:
    """Simulated database connection"""

    def __init__(self, db_name: str):
        self.db_name = db_name
        self.is_connected = False

    def __enter__(self):
        print(f'[DB] Connecting to {self.db_name}')
        time.sleep(0.1)
        self.is_connected = True
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        print(f'[DB] Disconnecting from {self.db_name}')
        self.is_connected = False
        return False

    def execute(self, query: str):
        if not self.is_connected:
            raise RuntimeError('Not connected!')
        return f'Result of: {query}'

@contextmanager
def backup_context(backup_path: str):
    """Simple context for backup operations"""
    print(f'[BACKUP] Starting backup to {backup_path}')
    try:
        yield f'Backup handle for {backup_path}'
    finally:
        print(f'[BACKUP] Completed backup to {backup_path}')

class TransactionManager:
    """Complex context manager for multi-step transactions"""

    def __init__(self, databases: List[str]):
        self.databases = databases
        self.connections = {}
        self.failed = False

    def __enter__(self):
        print('[TX] Starting transaction')
        try:
            for db in self.databases:
                conn = DatabaseConnection(db)
                conn.__enter__()
                # Track a connection only once it has been entered successfully
                self.connections[db] = conn
        except Exception:
            self.failed = True
            # Undo the connections that were already opened
            for conn in self.connections.values():
                try:
                    conn.__exit__(None, None, None)
                except Exception:
                    pass
            raise
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type:
            print(f'[TX] Rolling back (error: {exc_type.__name__})')
        else:
            print('[TX] Committing')
        for db, conn in self.connections.items():
            try:
                conn.__exit__(exc_type, exc_val, exc_tb)
            except Exception as e:
                print(f'[TX] Error closing {db}: {e}')
        return False

    def execute(self, db: str, query: str):
        if db not in self.connections:
            raise ValueError(f'Unknown database: {db}')
        return self.connections[db].execute(query)
class AsyncDatabaseConnection:
    """Async database connection"""

    def __init__(self, db_name: str):
        self.db_name = db_name

    async def __aenter__(self):
        print(f'[ASYNC DB] Connecting to {self.db_name}')
        await asyncio.sleep(0.1)
        return self

    async def __aexit__(self, *args):
        print(f'[ASYNC DB] Disconnecting from {self.db_name}')
        await asyncio.sleep(0.05)

    async def query(self, sql: str):
        await asyncio.sleep(0.05)
        return f'Async result: {sql}'
# Example: Multi-database transaction with backup
print('=== Multi-Database Transaction ===')
try:
    with TransactionManager(['db1', 'db2']) as tx:
        with ExitStack() as stack:
            backup = stack.enter_context(backup_context('/backup/latest'))
            print(f'Backup: {backup}')
            result1 = tx.execute('db1', 'INSERT INTO users VALUES (...)')
            result2 = tx.execute('db2', 'INSERT INTO logs VALUES (...)')
            print(f'Results: {result1}, {result2}')
except Exception as e:
    print(f'Transaction failed: {e}')

print('\n=== Multi-Database with Failure ===')
try:
    with TransactionManager(['db1', 'db2']) as tx:
        result = tx.execute('db1', 'SELECT * FROM users')
        print(f'DB1 result: {result}')
        raise ValueError('Simulated error!')
except ValueError as e:
    print(f'Caught: {e}')

# Example: Async operations
async def async_example():
    print('\n=== Async Context Managers ===')
    async with AsyncExitStack() as stack:
        db1 = await stack.enter_async_context(AsyncDatabaseConnection('async_db1'))
        db2 = await stack.enter_async_context(AsyncDatabaseConnection('async_db2'))
        result1 = await db1.query('SELECT * FROM users')
        result2 = await db2.query('SELECT * FROM posts')
        print(f'Results: {result1}, {result2}')

asyncio.run(async_example())
Output:
=== Multi-Database Transaction ===
[TX] Starting transaction
[DB] Connecting to db1
[DB] Connecting to db2
[BACKUP] Starting backup to /backup/latest
Backup: Backup handle for /backup/latest
Results: Result of: INSERT INTO users VALUES (...), Result of: INSERT INTO logs VALUES (...)
[BACKUP] Completed backup to /backup/latest
[TX] Committing
[DB] Disconnecting from db1
[DB] Disconnecting from db2
=== Multi-Database with Failure ===
[TX] Starting transaction
[DB] Connecting to db1
[DB] Connecting to db2
DB1 result: Result of: SELECT * FROM users
[TX] Rolling back (error: ValueError)
[DB] Disconnecting from db1
[DB] Disconnecting from db2
Caught: Simulated error!
=== Async Context Managers ===
[ASYNC DB] Connecting to async_db1
[ASYNC DB] Connecting to async_db2
Results: Async result: SELECT * FROM users, Async result: SELECT * FROM posts
[ASYNC DB] Disconnecting from async_db2
[ASYNC DB] Disconnecting from async_db1
This project demonstrates:
- Class-based context managers with full control
- Simple function-based contexts with @contextmanager
- ExitStack for combining contexts
- Multi-database transactions with automatic rollback
- Async contexts for concurrent operations
- Proper cleanup ordering (LIFO)
Frequently Asked Questions
Do I always need nested ‘with’ statements?
No, use ExitStack or multiple context managers on one line: with open('file1') as f1, open('file2') as f2: is valid syntax and cleaner than nesting. Use ExitStack when the number of contexts is dynamic or determined at runtime.
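As a quick sketch of the single-line form, the snippet below copies one file to another inside a temporary directory. The file names are arbitrary and exist only for the demonstration.

```python
import os
import tempfile

with tempfile.TemporaryDirectory() as tmp:
    src_path = os.path.join(tmp, 'source.txt')
    dst_path = os.path.join(tmp, 'copy.txt')
    with open(src_path, 'w') as f:
        f.write('alpha\nbeta\n')

    # Two managers in one with statement; on exit dst closes first, then src.
    with open(src_path) as src, open(dst_path, 'w') as dst:
        for line in src:
            dst.write(line.upper())

    with open(dst_path) as f:
        copied = f.read()
    both_closed = src.closed and dst.closed

print(repr(copied), both_closed)  # 'ALPHA\nBETA\n' True
```

The managers are entered left to right and exited right to left, exactly as if the with statements had been nested.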
When should I suppress exceptions in __exit__?
Rarely. Only suppress exceptions when you’re absolutely certain it’s safe. Common cases: ignoring FileNotFoundError in a cleanup handler, or suppressing expected exceptions in a retry context manager. If unsure, return False and let the exception propagate.
What’s the difference between @contextmanager and a class?
@contextmanager is simpler for one-off contexts but can’t store complex state easily. Classes are more explicit and easier to debug. If you need attributes or multiple methods, use a class. If it’s a simple setup/cleanup, @contextmanager is cleaner.
Why do ExitStack callbacks run in reverse order?
LIFO (last-in, first-out) order ensures dependencies are satisfied. If callback B depends on callback A being active, register A first, then B. When exiting, B runs first (and can still use A), then A. This matches the pattern of nested contexts.
Can I mix async and sync context managers?
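Yes, within limits. An async context manager (one that defines __aenter__/__aexit__) must be entered with async with, which is only valid inside an async function; a sync context manager uses with and __enter__/__exit__. A plain with statement works fine inside async code, and AsyncExitStack accepts both kinds through enter_context() and enter_async_context(). What you can’t do is await inside a sync manager’s __enter__ or __exit__, so keep blocking work out of them or move it off the event loop with asyncio.to_thread().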
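For what it’s worth, a plain with statement is legal inside a coroutine, and AsyncExitStack offers both enter_context (for sync managers) and enter_async_context (for async ones). The sketch below shows the two kinds cooperating; sync_resource and async_resource are stand-ins invented for the example.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager, contextmanager

events = []


@contextmanager
def sync_resource(name):
    events.append(f'open {name}')
    try:
        yield name
    finally:
        events.append(f'close {name}')


@asynccontextmanager
async def async_resource(name):
    await asyncio.sleep(0)  # stands in for real async setup
    events.append(f'open {name}')
    try:
        yield name
    finally:
        await asyncio.sleep(0)  # stands in for real async teardown
        events.append(f'close {name}')


async def main():
    # A sync manager entered with a plain with statement inside a coroutine.
    with sync_resource('lockfile'):
        async with AsyncExitStack() as stack:
            # One stack holding both a sync and an async manager.
            stack.enter_context(sync_resource('cache'))
            await stack.enter_async_context(async_resource('socket'))
            events.append('work')


asyncio.run(main())
for event in events:
    print(event)
```

Everything unwinds in LIFO order: socket, then cache, then lockfile. The one thing to watch is that a sync manager’s setup and cleanup run on the event loop thread, so they should not block for long.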
How do I catch exceptions in a @contextmanager generator?
Wrap the yield in try/except: try: yield ... except ValueError: handle_error(). The except block runs if the code in the with body raises that exception. You can suppress (no re-raise) or let it propagate (re-raise or use bare raise).
Conclusion
Context managers are one of Python’s most powerful and elegant features. The class-based __enter__/__exit__ protocol gives you maximum control, the @contextmanager decorator simplifies common patterns, and ExitStack/AsyncExitStack handle dynamic and concurrent resource management. By building custom context managers, you transform error-prone manual cleanup into bulletproof automated patterns that your future self (and your teammates) will appreciate.
For deeper understanding, see the official contextlib documentation and with statement reference.