Intermediate

How To Create Custom Context Managers in Python

Context managers are one of Python’s most elegant features — they guarantee that setup and cleanup code runs correctly, even when exceptions occur. You’ve probably used them with with open('file.txt') as f: without thinking about the machinery underneath. But creating your own context managers unlocks powerful patterns: automatically releasing database connections, managing temporary files, handling transactions, and coordinating complex resource lifecycles. This tutorial digs deep into building custom context managers that transform error-prone manual cleanup into bulletproof automated patterns.

You might think building context managers is complex or requires understanding esoteric Python internals. In reality, there are three straightforward approaches: the class-based __enter__/__exit__ protocol for maximum control, the @contextmanager decorator for simple cases, and ExitStack for dynamic contexts. Each has a time and place. By the end of this article, you’ll have a mental model of when to use each, and you’ll be able to write context managers that your teammates will thank you for.

We’ll start with a quick example showing the difference between error-prone manual cleanup and context manager elegance, then explore the __enter__/__exit__ protocol in detail, master the @contextmanager decorator, tackle dynamic contexts with ExitStack, and wrap up with async context managers for concurrent code. Along the way, we’ll cover real-world patterns like database transactions, file handling, and resource pooling.

Quick Example

Here’s why context managers matter:

# cleanup_comparison.py

# WITHOUT context manager: cleanup might not run
def bad_database_access():
    connection = connect_to_db()
    try:
        result = connection.query('SELECT * FROM users')
        if not result:
            raise ValueError('No users found!')
        return result
    finally:
        connection.close()  # Easy to forget, even with try/finally

# WITH context manager: cleanup ALWAYS runs
def good_database_access():
    with connect_to_db() as connection:
        result = connection.query('SELECT * FROM users')
        if not result:
            raise ValueError('No users found!')
        return result
    # connection.close() runs automatically, even on exception

The second version is clearer and safer. The context manager guarantees cleanup, and exceptions naturally propagate while resources are released. No manual try/finally, no forgotten close() calls.

Python context managers tutorial - in-content-1
Context managers guarantee setup and cleanup run correctly, even when exceptions occur

What Are Context Managers?

A context manager is an object that implements two methods: __enter__ (called when entering the with block) and __exit__ (called when leaving, whether via normal completion or exception). This simple protocol guarantees that cleanup code runs.

Python’s with statement is syntactic sugar. When you write:

with some_object as x:
    do_something(x)

Python translates it to:

mgr = some_object
exit = type(mgr).__exit__
value = type(mgr).__enter__(mgr)
exc = True
try:
    try:
        x = value
        do_something(x)
    except:
        exc = False
        if not exit(mgr, *sys.exc_info()):
            raise
finally:
    exit(mgr, None, None, None)

The key insight: __exit__ always runs, even on exception. The return value of __exit__ determines whether the exception is suppressed (True) or re-raised (False).

Approach Simplicity Control Best For
Class with __enter__/__exit__ Medium Maximum (full access to exception info) Complex resources, error handling, state
@contextmanager decorator High Good (can catch exceptions) Simple setup/cleanup, one-off contexts
ExitStack Medium Good (dynamic number of contexts) Conditional contexts, variable cleanup order
Python context managers tutorial - in-content-2
Context managers separate concern: __enter__ acquires resources, __exit__ releases them

The __enter__/__exit__ Protocol

Basic Example: File-Like Object

Let’s build a simple custom file-like object that tracks access:

# tracked_file.py

class TrackedFile:
    """Context manager that tracks file access"""

    def __init__(self, filename):
        self.filename = filename
        self.file = None
        self.access_count = 0

    def __enter__(self):
        """Called when entering 'with' block"""
        print(f'Opening {self.filename}')
        self.file = open(self.filename, 'r')
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Called when exiting 'with' block"""
        print(f'Closing {self.filename} (accessed {self.access_count} times)')
        if self.file:
            self.file.close()
        return False  # Don't suppress exceptions

    def read_line(self):
        """Read a line and track access"""
        self.access_count += 1
        return self.file.readline()


# Usage
if __name__ == '__main__':
    with TrackedFile('/etc/hostname') as f:
        line = f.read_line()
        print(f'Read: {line.strip()}')
        line2 = f.read_line()

Output:

Opening /etc/hostname
Read: localhost
Closing /etc/hostname (accessed 1 times)

When we enter the with block, __enter__ opens the file and returns self. When we exit (normally or via exception), __exit__ closes the file. The file is guaranteed to close, even if read_line() raises an exception.

Exception Handling in __exit__

The __exit__ method receives three parameters that describe any exception that occurred:

# exception_handling.py

class SafeConnection:
    """Context manager that handles exceptions gracefully"""

    def __init__(self, db_name):
        self.db_name = db_name
        self.connected = False

    def __enter__(self):
        print(f'Connecting to {self.db_name}')
        self.connected = True
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        exc_type: Exception class (None if no exception)
        exc_val: Exception instance
        exc_tb: Traceback object
        """
        if exc_type is not None:
            print(f'Exception occurred: {exc_type.__name__}: {exc_val}')
            print(f'Rolling back transaction')
            # Return False to re-raise the exception
            return False
        else:
            print(f'Committing transaction')
            return False

    def query(self, sql):
        if not self.connected:
            raise RuntimeError('Not connected!')
        return f'Query result for: {sql}'


# Normal completion
print('=== Normal completion ===')
with SafeConnection('mydb') as conn:
    result = conn.query('SELECT * FROM users')
    print(f'Got: {result}')

# With exception
print('\n=== With exception ===')
try:
    with SafeConnection('mydb') as conn:
        result = conn.query('SELECT * FROM users')
        raise ValueError('Data validation failed!')
except ValueError:
    print('Caught exception in outer scope')

Output:

=== Normal completion ===
Connecting to mydb
Got: Query result for: SELECT * FROM users
Committing transaction

=== With exception ===
Connecting to mydb
Exception occurred: ValueError: Data validation failed!
Rolling back transaction
Caught exception in outer scope

Notice that __exit__ runs even when an exception occurs inside the with block. This is perfect for cleanup like rolling back transactions or releasing locks. If __exit__ returns True, the exception is suppressed. If it returns False (or nothing), the exception propagates to the caller.

When to Suppress Exceptions

Sometimes you want __exit__ to suppress exceptions. Use this carefully:

# suppress_exception.py

class ErrorIgnorer:
    """Context manager that suppresses specific exceptions"""

    def __init__(self, *suppress_types):
        self.suppress_types = suppress_types

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is None:
            # No exception, return False
            return False

        if exc_type in self.suppress_types:
            print(f'Suppressed: {exc_type.__name__}')
            return True  # Suppress this exception

        print(f'Not suppressed: {exc_type.__name__}')
        return False  # Let other exceptions propagate


# Suppress ValueError but not RuntimeError
with ErrorIgnorer(ValueError):
    print('Inside context')
    raise ValueError('This will be suppressed')
print('Execution continues after ValueError')

try:
    with ErrorIgnorer(ValueError):
        raise RuntimeError('This will NOT be suppressed')
except RuntimeError as e:
    print(f'RuntimeError propagated: {e}')

Output:

Inside context
Suppressed: ValueError
Execution continues after ValueError
RuntimeError propagated: This will NOT be suppressed

Exception suppression is powerful but dangerous. Use it only when you’re absolutely certain the exception is safe to ignore. A common safe case is ignoring FileNotFoundError when deleting files.

Python context managers tutorial - in-content-3
__exit__ receives exception info and can suppress it (return True) or propagate it (return False)

The @contextmanager Decorator

Simple Decorator Pattern

For simple setup/cleanup, the @contextmanager decorator is cleaner than writing a class. It converts a generator function into a context manager:

# simple_decorator.py

from contextlib import contextmanager

@contextmanager
def timed_operation(name):
    """Context manager that times an operation"""
    import time
    start = time.time()
    print(f'Starting {name}')
    try:
        yield
    finally:
        elapsed = time.time() - start
        print(f'Finished {name} in {elapsed:.2f}s')


# Usage
with timed_operation('download'):
    import time
    time.sleep(0.5)
print('Done')

Output:

Starting download
Finished download in 0.50s
Done

The code before yield runs in __enter__. The code after yield runs in __exit__, in a finally block. This guarantees cleanup runs even on exception.

Yielding Values from the Context

Use yield to pass a value to the with block:

# yield_value.py

from contextlib import contextmanager

@contextmanager
def database_connection(db_name):
    """Context manager that manages database connection"""
    print(f'Connecting to {db_name}')
    # Simulate connection object
    class Connection:
        def query(self, sql):
            return f'Result of: {sql}'

    connection = Connection()
    try:
        yield connection
    finally:
        print(f'Disconnecting from {db_name}')


# Usage
with database_connection('mydb') as conn:
    result = conn.query('SELECT * FROM users')
    print(result)

Output:

Connecting to mydb
Result of: SELECT * FROM users
Disconnecting from mydb

Exception Handling with @contextmanager

You can catch exceptions from the with block:

# exception_decorator.py

from contextlib import contextmanager

@contextmanager
def transaction(db_name):
    """Context manager that manages transactions"""
    print(f'Starting transaction on {db_name}')
    try:
        yield
    except Exception as e:
        print(f'Transaction failed: {e}')
        print('Rolling back...')
        raise
    else:
        print('Transaction committed')


# Normal case
print('=== Normal transaction ===')
with transaction('mydb'):
    print('Executing query')

# Exception case
print('\n=== Failed transaction ===')
try:
    with transaction('mydb'):
        print('Executing query')
        raise ValueError('Data invalid!')
except ValueError:
    print('Caught in outer scope')

Output:

=== Normal transaction ===
Starting transaction on mydb
Executing query
Transaction committed

=== Failed transaction ===
Starting transaction on mydb
Executing query
Transaction failed: Data invalid!
Rolling back...
Caught in outer scope

The try/except/else pattern lets you handle exceptions in the context manager and decide whether to re-raise them.

When to Use Decorator vs Class

Use @contextmanager for:

  • Simple, one-off contexts
  • When setup and cleanup are short
  • When you don’t need to store state between calls

Use a class for:

  • Complex state management (storing attributes)
  • When you need multiple methods besides __enter__/__exit__
  • When you need fine-grained control over exception handling
  • Reusable contexts that multiple codebases will import
Python context managers tutorial - in-content-4
@contextmanager simplifies code for straightforward setup/cleanup patterns

Dynamic Contexts with ExitStack

What is ExitStack?

ExitStack lets you manage a dynamic number of contexts. Instead of nesting with statements, you enter contexts conditionally or in a loop:

# exitstack_basics.py

from contextlib import ExitStack

# Without ExitStack: ugly nesting
# with open('file1.txt') as f1:
#     with open('file2.txt') as f2:
#         with open('file3.txt') as f3:
#             process(f1, f2, f3)

# With ExitStack: clean
files = ['file1.txt', 'file2.txt', 'file3.txt']

with ExitStack() as stack:
    file_objects = [stack.enter_context(open(f)) for f in files]
    # Process files
    for f in file_objects:
        print(f'Opened: {f.name}')
    # All files closed when exiting the 'with' block

Output:

Opened: file1.txt
Opened: file2.txt
Opened: file3.txt

Conditional Contexts

Use ExitStack to conditionally enter contexts:

# conditional_contexts.py

from contextlib import ExitStack

class Connection:
    def __init__(self, name):
        self.name = name

    def __enter__(self):
        print(f'Opening {self.name}')
        return self

    def __exit__(self, *args):
        print(f'Closing {self.name}')


def process_files(write_log=False, debug=False):
    """Open files conditionally"""
    with ExitStack() as stack:
        # Always open main file
        main_file = stack.enter_context(Connection('main.txt'))

        # Conditionally open log file
        if write_log:
            log_file = stack.enter_context(Connection('log.txt'))
        else:
            log_file = None

        # Conditionally open debug file
        if debug:
            debug_file = stack.enter_context(Connection('debug.txt'))
        else:
            debug_file = None

        print('Processing...')
        return log_file is not None and debug_file is not None


# Call with different conditions
print('=== With log and debug ===')
result = process_files(write_log=True, debug=True)
print(f'All files opened: {result}\n')

print('=== With only log ===')
result = process_files(write_log=True, debug=False)
print(f'All files opened: {result}')

Output:

=== With log and debug ===
Opening main.txt
Opening log.txt
Opening debug.txt
Processing...
All files opened: True

=== With only log ===
Opening main.txt
Opening log.txt
Processing...
All files opened: False

Callback Registration

ExitStack lets you register arbitrary cleanup functions:

# callback_registration.py

from contextlib import ExitStack

def process_with_cleanup():
    with ExitStack() as stack:
        # Open a resource
        f = stack.enter_context(open('/etc/hostname'))

        # Register cleanup functions
        stack.callback(print, 'Cleanup step 1')
        stack.callback(print, 'Cleanup step 2')
        stack.callback(print, 'Cleanup step 3')

        print('Doing work...')
        content = f.read()
        print(f'Read: {content.strip()}')

    # ExitStack calls callbacks in LIFO (last-in-first-out) order
    # and closes the file


process_with_cleanup()

Output:

Doing work...
Read: localhost
Cleanup step 3
Cleanup step 2
Cleanup step 1

Callbacks run in reverse order (LIFO), which is usually what you want: if cleanup step 3 depends on step 2, they run in the right order.

ExitStack Real-World Pattern

A practical example: manage multiple database connections with conditional rollback:

# exitstack_database.py

from contextlib import ExitStack

class FakeConnection:
    def __init__(self, name):
        self.name = name
        self.committed = False

    def __enter__(self):
        print(f'Opening {self.name}')
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type:
            print(f'Rollback {self.name}')
        else:
            print(f'Commit {self.name}')
        self.committed = (exc_type is None)
        return False


def multi_db_transaction(db_names, fail_on=None):
    """Open multiple databases, rollback all if any fail"""
    with ExitStack() as stack:
        connections = {}
        for db_name in db_names:
            conn = stack.enter_context(FakeConnection(db_name))
            connections[db_name] = conn

        for db_name, conn in connections.items():
            if db_name == fail_on:
                raise ValueError(f'Intentional failure on {db_name}')
            print(f'Execute query on {db_name}')

        return connections


# Normal case: all succeed
print('=== All succeed ===')
multi_db_transaction(['db1', 'db2', 'db3'])

# Failure case: second database fails
print('\n=== Second database fails ===')
try:
    multi_db_transaction(['db1', 'db2', 'db3'], fail_on='db2')
except ValueError as e:
    print(f'Error: {e}')

Output:

=== All succeed ===
Opening db1
Opening db2
Opening db3
Execute query on db1
Execute query on db2
Execute query on db3
Commit db1
Commit db2
Commit db3

=== Second database fails ===
Opening db1
Opening db2
Opening db3
Execute query on db1
Error: Intentional failure on db2
Rollback db3
Rollback db2
Rollback db1

Notice that when db2 fails, all databases are rolled back in reverse order. This is the power of ExitStack: you don’t know how many resources to manage until runtime.

Python context managers tutorial - in-content-5
ExitStack manages dynamic numbers of contexts and cleanup callbacks in LIFO order

Async Context Managers

Async __aenter__ and __aexit__

For async code, define __aenter__ and __aexit__ instead of __enter__ and __exit__:

# async_context.py

import asyncio

class AsyncConnection:
    """Async context manager for database connection"""

    def __init__(self, db_name):
        self.db_name = db_name

    async def __aenter__(self):
        print(f'Async connecting to {self.db_name}')
        await asyncio.sleep(0.1)  # Simulate async I/O
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print(f'Async disconnecting from {self.db_name}')
        await asyncio.sleep(0.1)  # Simulate async cleanup
        return False

    async def query(self, sql):
        print(f'Executing: {sql}')
        await asyncio.sleep(0.1)
        return 'Query result'


async def main():
    # Use 'async with' for async context managers
    async with AsyncConnection('mydb') as conn:
        result = await conn.query('SELECT * FROM users')
        print(f'Got: {result}')


asyncio.run(main())

Output:

Async connecting to mydb
Executing: SELECT * FROM users
Got: Query result
Async disconnecting from mydb

Async @contextmanager

For simple async setup/cleanup, use @asynccontextmanager:

# async_decorator.py

import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def async_transaction(db_name):
    """Async context manager for transactions"""
    print(f'Starting async transaction on {db_name}')
    await asyncio.sleep(0.1)
    try:
        yield
    finally:
        print(f'Committing async transaction on {db_name}')
        await asyncio.sleep(0.1)


async def main():
    async with async_transaction('mydb'):
        print('Doing async work...')
        await asyncio.sleep(0.2)


asyncio.run(main())

Output:

Starting async transaction on mydb
Doing async work...
Committing async transaction on mydb

AsyncExitStack for Dynamic Async Contexts

Use AsyncExitStack to manage variable numbers of async contexts:

# async_exitstack.py

import asyncio
from contextlib import AsyncExitStack

class AsyncResource:
    def __init__(self, name):
        self.name = name

    async def __aenter__(self):
        print(f'Acquiring {self.name}')
        await asyncio.sleep(0.05)
        return self

    async def __aexit__(self, *args):
        print(f'Releasing {self.name}')
        await asyncio.sleep(0.05)


async def main():
    resources = ['res1', 'res2', 'res3']

    # Manage multiple async contexts
    async with AsyncExitStack() as stack:
        acquired = [
            await stack.enter_async_context(AsyncResource(r))
            for r in resources
        ]

        print('Using resources...')
        await asyncio.sleep(0.1)
        print(f'Acquired {len(acquired)} resources')

    # All resources released automatically
    print('Done')


asyncio.run(main())

Output:

Acquiring res1
Acquiring res2
Acquiring res3
Using resources...
Acquired 3 resources
Releasing res3
Releasing res2
Releasing res1
Done
Async context managers with __aenter__/__aexit__ enable clea
Async context managers with __aenter__/__aexit__ enable clean concurrent resource handling

Real-World Project: Resource Manager

Let’s build a production-ready resource manager that combines everything: class-based context managers, ExitStack, error handling, and async support.

# resource_manager.py

import time
import asyncio
from contextlib import contextmanager, asynccontextmanager, ExitStack, AsyncExitStack
from typing import Optional, List


class DatabaseConnection:
    """Simulated database connection"""

    def __init__(self, db_name: str):
        self.db_name = db_name
        self.is_connected = False

    def __enter__(self):
        print(f'[DB] Connecting to {self.db_name}')
        time.sleep(0.1)
        self.is_connected = True
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        print(f'[DB] Disconnecting from {self.db_name}')
        self.is_connected = False
        return False

    def execute(self, query: str):
        if not self.is_connected:
            raise RuntimeError('Not connected!')
        return f'Result of: {query}'


@contextmanager
def backup_context(backup_path: str):
    """Simple context for backup operations"""
    print(f'[BACKUP] Starting backup to {backup_path}')
    try:
        yield f'Backup handle for {backup_path}'
    finally:
        print(f'[BACKUP] Completed backup to {backup_path}')


class TransactionManager:
    """Complex context manager for multi-step transactions"""

    def __init__(self, databases: List[str]):
        self.databases = databases
        self.connections = {}
        self.failed = False

    def __enter__(self):
        print('[TX] Starting transaction')
        try:
            for db in self.databases:
                self.connections[db] = DatabaseConnection(db)
                self.connections[db].__enter__()
        except Exception as e:
            self.failed = True
            for db, conn in self.connections.items():
                try:
                    conn.__exit__(None, None, None)
                except:
                    pass
            raise
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type:
            print(f'[TX] Rolling back (error: {exc_type.__name__})')
        else:
            print('[TX] Committing')

        for db, conn in self.connections.items():
            try:
                conn.__exit__(exc_type, exc_val, exc_tb)
            except Exception as e:
                print(f'[TX] Error closing {db}: {e}')

        return False

    def execute(self, db: str, query: str):
        if db not in self.connections:
            raise ValueError(f'Unknown database: {db}')
        return self.connections[db].execute(query)


async class AsyncDatabaseConnection:
    """Async database connection"""

    def __init__(self, db_name: str):
        self.db_name = db_name

    async def __aenter__(self):
        print(f'[ASYNC DB] Connecting to {self.db_name}')
        await asyncio.sleep(0.1)
        return self

    async def __aexit__(self, *args):
        print(f'[ASYNC DB] Disconnecting from {self.db_name}')
        await asyncio.sleep(0.05)

    async def query(self, sql: str):
        await asyncio.sleep(0.05)
        return f'Async result: {sql}'


# Example: Multi-database transaction with backup
print('=== Multi-Database Transaction ===')
try:
    with TransactionManager(['db1', 'db2']) as tx:
        with ExitStack() as stack:
            backup = stack.enter_context(backup_context('/backup/latest'))
            print(f'Backup: {backup}')

            result1 = tx.execute('db1', 'INSERT INTO users VALUES (...)')
            result2 = tx.execute('db2', 'INSERT INTO logs VALUES (...)')
            print(f'Results: {result1}, {result2}')
except Exception as e:
    print(f'Transaction failed: {e}')

print('\n=== Multi-Database with Failure ===')
try:
    with TransactionManager(['db1', 'db2']) as tx:
        result = tx.execute('db1', 'SELECT * FROM users')
        print(f'DB1 result: {result}')
        raise ValueError('Simulated error!')
except ValueError as e:
    print(f'Caught: {e}')


# Example: Async operations
async def async_example():
    print('\n=== Async Context Managers ===')
    async with AsyncExitStack() as stack:
        db1 = await stack.enter_async_context(AsyncDatabaseConnection('async_db1'))
        db2 = await stack.enter_async_context(AsyncDatabaseConnection('async_db2'))

        result1 = await db1.query('SELECT * FROM users')
        result2 = await db2.query('SELECT * FROM posts')
        print(f'Results: {result1}, {result2}')


asyncio.run(async_example())

Output:

=== Multi-Database Transaction ===
[TX] Starting transaction
[DB] Connecting to db1
[DB] Connecting to db2
[BACKUP] Starting backup to /backup/latest
Backup: Backup handle for /backup/latest
DB1 result: Result of: INSERT INTO users VALUES (...)
DB2 result: Result of: INSERT INTO logs VALUES (...)
[BACKUP] Completed backup to /backup/latest
[TX] Committing
[DB] Disconnecting from db1
[DB] Disconnecting from db2

=== Multi-Database with Failure ===
[TX] Starting transaction
[DB] Connecting to db1
[DB] Connecting to db2
DB1 result: Result of: SELECT * FROM users
[TX] Rolling back (error: ValueError)
[DB] Disconnecting from db1
[DB] Disconnecting from db2
Caught: Simulated error!

=== Async Context Managers ===
[ASYNC DB] Connecting to async_db1
[ASYNC DB] Connecting to async_db2
Results: Async result: SELECT * FROM users, Async result: SELECT * FROM posts
[ASYNC DB] Disconnecting from async_db2
[ASYNC DB] Disconnecting from async_db1

This project demonstrates:

  • Class-based context managers with full control
  • Simple function-based contexts with @contextmanager
  • ExitStack for combining contexts
  • Multi-database transactions with automatic rollback
  • Async contexts for concurrent operations
  • Proper cleanup ordering (LIFO)
Combining context managers, ExitStack, and async patterns ha
Combining context managers, ExitStack, and async patterns handles complex real-world scenarios

Frequently Asked Questions

Do I always need nested ‘with’ statements?

No, use ExitStack or multiple context managers on one line: with open('file1') as f1, open('file2') as f2: is valid syntax and cleaner than nesting. Use ExitStack when the number of contexts is dynamic or determined at runtime.

When should I suppress exceptions in __exit__?

Rarely. Only suppress exceptions when you’re absolutely certain it’s safe. Common cases: ignoring FileNotFoundError in a cleanup handler, or suppressing expected exceptions in a retry context manager. If unsure, return False and let the exception propagate.

What’s the difference between @contextmanager and a class?

@contextmanager is simpler for one-off contexts but can’t store complex state easily. Classes are more explicit and easier to debug. If you need attributes or multiple methods, use a class. If it’s a simple setup/cleanup, @contextmanager is cleaner.

Why do ExitStack callbacks run in reverse order?

LIFO (last-in, first-out) order ensures dependencies are satisfied. If callback B depends on callback A being active, register A first, then B. When exiting, B runs first (and can still use A), then A. This matches the pattern of nested contexts.

Can I mix async and sync context managers?

No, async code requires async with and __aenter__/__aexit__. Sync contexts use with and __enter__/__exit__. You can use AsyncExitStack to manage multiple async contexts, but you can’t use sync contexts inside an async context. However, you can call sync code from async using asyncio.to_thread().

How do I catch exceptions in a @contextmanager generator?

Wrap the yield in try/except: try: yield ... except ValueError: handle_error(). The except block runs if the code in the with body raises that exception. You can suppress (no re-raise) or let it propagate (re-raise or use bare raise).

Conclusion

Context managers are one of Python’s most powerful and elegant features. The class-based __enter__/__exit__ protocol gives you maximum control, the @contextmanager decorator simplifies common patterns, and ExitStack/AsyncExitStack handle dynamic and concurrent resource management. By building custom context managers, you transform error-prone manual cleanup into bulletproof automated patterns that your future self (and your teammates) will appreciate.

For deeper understanding, see the official contextlib documentation and with statement reference.

  • Understanding Python Decorators
  • Exception Handling and Best Practices in Python
  • Async/Await and Asyncio Explained
  • Working with Files and Directories in Python
  • Database Transactions and ORM Patterns in Python