You have async code everywhere — FastAPI endpoints, aiohttp clients, asyncio workers — but every time you touch the filesystem you reach for the standard open() built-in. That one blocking call quietly freezes your entire event loop, starving every other coroutine until the disk responds. On SSDs the pause is milliseconds; under load with dozens of concurrent requests it compounds into a real latency problem.

The fix is aiofiles, a small drop-in library that wraps Python’s file I/O in a thread pool so your event loop stays free while bytes move between disk and memory. It mirrors the familiar open() interface almost exactly, so adoption costs nearly nothing — you change open to aiofiles.open and add await in a few places.

This article covers everything you need to use aiofiles effectively: installing it, reading and writing text and binary files, appending to logs, iterating lines, handling exceptions, and combining it with real async workflows. By the end you will have a working async log-aggregator script that processes multiple files concurrently without blocking.

Reading a File Asynchronously: Quick Example

The fastest way to see aiofiles in action is to open a file and read its contents inside a coroutine. The pattern mirrors open() almost exactly — the only differences are async with and await.

# quick_read.py
import asyncio
import aiofiles

async def read_file(path):
    async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
        content = await f.read()
    return content

async def main():
    # Create a sample file first
    async with aiofiles.open('sample.txt', 'w', encoding='utf-8') as f:
        await f.write('Hello from aiofiles!\nLine two.\nLine three.')

    text = await read_file('sample.txt')
    print(text)

asyncio.run(main())

Output:

Hello from aiofiles!
Line two.
Line three.

aiofiles.open() returns an async context manager. Inside the async with block, every I/O method — read(), write(), readline() — is a coroutine that you await. The file is closed automatically when the block exits, just like regular with open(). Under the hood the actual read runs in a thread pool, releasing the event loop while waiting for the OS.

The sections below show how to extend this pattern to binary files, line-by-line iteration, append mode, and concurrent processing of multiple files.

What Is aiofiles and Why Use It?

Python’s built-in open() is synchronous. When your code calls f.read(), the calling thread blocks until the operating system delivers the data. In a regular script that is fine — there is only one task running. In an asyncio program, blocking the thread blocks the entire event loop, which means every other coroutine waiting for a network response, a timer, or another file has to wait too.
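
To make the stall visible, here is a minimal sketch that uses only the standard library. As an assumption for illustration, time.sleep() stands in for a slow disk read, and a hypothetical heartbeat() coroutine counts how often the event loop gets a turn. asyncio.to_thread() needs Python 3.9 or newer.

```python
# blocking_demo.py (illustrative sketch; all names are hypothetical)
import asyncio
import time

async def heartbeat(ticks):
    # Records a tick roughly every 10 ms, but only while the loop is free
    while True:
        await asyncio.sleep(0.01)
        ticks.append(time.perf_counter())

async def count_ticks(offload):
    ticks = []
    task = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # give the heartbeat a chance to start
    if offload:
        # Blocking call runs in a worker thread; the loop keeps ticking
        await asyncio.to_thread(time.sleep, 0.2)
    else:
        # Blocking call on the loop's own thread; nothing else can run
        time.sleep(0.2)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return len(ticks)

async def main():
    blocked = await count_ticks(offload=False)
    offloaded = await count_ticks(offload=True)
    print(f'Ticks while blocked: {blocked}')
    print(f'Ticks while offloaded: {offloaded}')
    return blocked, offloaded

if __name__ == '__main__':
    asyncio.run(main())
```

The exact tick counts depend on the machine, but the offloaded run should record clearly more ticks than the blocked one.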

aiofiles solves this by delegating each file operation to a thread pool through the event loop's run_in_executor() method. Your coroutine suspends at await, the event loop schedules other work, and when the worker thread finishes the I/O the coroutine resumes. The result is a non-blocking file API with a familiar interface.
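
The delegation idea can be sketched by hand with the standard library alone. This is not aiofiles' actual source, only a rough equivalent of the pattern; blocking_read() and read_in_thread() are hypothetical helper names.

```python
# executor_sketch.py (illustrative; not aiofiles' actual implementation)
import asyncio
import tempfile
from pathlib import Path

def blocking_read(path):
    # Ordinary synchronous read; it runs in a worker thread below
    with open(path, encoding='utf-8') as f:
        return f.read()

async def read_in_thread(path):
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default thread pool executor
    return await loop.run_in_executor(None, blocking_read, path)

async def main():
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / 'demo.txt'
        # Synchronous write used for setup only
        path.write_text('read via executor', encoding='utf-8')
        text = await read_in_thread(path)
    print(text)
    return text

if __name__ == '__main__':
    asyncio.run(main())
```

aiofiles saves you from writing one such wrapper per file method and adds async context-manager and iteration support on top.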

| Scenario | Use open() | Use aiofiles |
|---|---|---|
| Simple one-off script | Yes | Optional |
| FastAPI / aiohttp endpoint reading a config file | No (blocks loop) | Yes |
| Concurrent processing of 20 log files | No (sequential) | Yes (runs in parallel) |
| Writing a large CSV from a background task | No (freezes server) | Yes |
| One-time data migration script (no server) | Yes (simpler) | Not needed |

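Install aiofiles with a single command. It has no external dependencies. The minimum supported Python version depends on the release, so check the project's PyPI page if you are pinned to an older interpreter: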

# terminal
pip install aiofiles

Reading Text and Binary Files

Reading an Entire File at Once

For small-to-medium files (a few MB or less) the simplest approach is await f.read(), which returns the entire content as a string (text mode) or bytes (binary mode). The example below reads a UTF-8 text file and a binary PNG header.

# read_modes.py
import asyncio
import aiofiles

async def read_text(path):
    async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
        return await f.read()

async def read_binary(path):
    async with aiofiles.open(path, mode='rb') as f:
        header = await f.read(8)   # Read first 8 bytes
    return header

async def main():
    # Write a sample text file
    async with aiofiles.open('notes.txt', 'w', encoding='utf-8') as f:
        await f.write('asyncio is non-blocking\naiofiles keeps it that way\n')

    text = await read_text('notes.txt')
    print('Text:', repr(text))

    # Write a fake binary file
    async with aiofiles.open('data.bin', 'wb') as f:
        await f.write(b'\x89PNG\r\n\x1a\n')  # PNG magic bytes

    header = await read_binary('data.bin')
    print('Header bytes:', header)

asyncio.run(main())

Output:

Text: 'asyncio is non-blocking\naiofiles keeps it that way\n'
Header bytes: b'\x89PNG\r\n\x1a\n'

The mode parameter works exactly like the built-in open(): 'r' reads text, 'rb' reads binary, 'w' writes (truncating any existing file), 'a' appends, and so on. Passing a count to read(n) limits the read to n bytes in binary mode or n characters in text mode, which is useful for inspecting file headers without loading the whole file.

Reading Line by Line

Large log files should not be loaded into memory at once. aiofiles supports async iteration — you can loop over the file object just like a regular file, but with async for. Each iteration awaits the next line without blocking the loop.

# readline_loop.py
import asyncio
import aiofiles

async def count_errors(log_path):
    error_count = 0
    async with aiofiles.open(log_path, mode='r', encoding='utf-8') as f:
        async for line in f:
            if 'ERROR' in line:
                error_count += 1
    return error_count

async def main():
    # Create a sample log file
    log_lines = [
        '2026-05-07 INFO server started\n',
        '2026-05-07 ERROR connection refused: db host unreachable\n',
        '2026-05-07 INFO request handled in 12ms\n',
        '2026-05-07 ERROR timeout after 30s on /api/data\n',
        '2026-05-07 INFO cache hit ratio: 0.87\n',
    ]
    async with aiofiles.open('app.log', 'w', encoding='utf-8') as f:
        await f.writelines(log_lines)

    errors = await count_errors('app.log')
    print(f'Error lines found: {errors}')

asyncio.run(main())

Output:

Error lines found: 2

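async for line in f is the key pattern here. It reads one line at a time, yields control back to the event loop between iterations, and keeps only the current line in your code rather than the whole file, so it scales to very large files. writelines() accepts any iterable of strings and is awaitable just like write(); as with the built-in method, it does not add newline characters for you.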

Writing and Appending to Files

Writing a New File

Opening a file with mode='w' creates it if it does not exist, or truncates it if it does. The example below writes a JSON config file asynchronously — a common pattern in async web apps that need to persist settings.

# write_config.py
import asyncio
import aiofiles
import json

async def save_config(path, config):
    json_str = json.dumps(config, indent=2)
    async with aiofiles.open(path, mode='w', encoding='utf-8') as f:
        await f.write(json_str)
    print(f'Saved {len(json_str)} bytes to {path}')

async def main():
    config = {
        'host': 'localhost',
        'port': 8080,
        'debug': False,
        'allowed_origins': ['https://example.com', 'https://app.example.com']
    }
    await save_config('config.json', config)

    # Verify it was written
    async with aiofiles.open('config.json', 'r', encoding='utf-8') as f:
        loaded = json.loads(await f.read())
    print('Loaded back:', loaded['host'], loaded['port'])

asyncio.run(main())

Output:

Saved 144 bytes to config.json
Loaded back: localhost 8080

Appending to a Log File

Using mode='a' keeps existing content intact and adds new lines at the end — ideal for audit trails or rolling application logs. The file is created if it does not exist.

# append_log.py
import asyncio
import aiofiles
from datetime import datetime

async def log_event(path, level, message):
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    line = f'[{timestamp}] {level}: {message}\n'
    async with aiofiles.open(path, mode='a', encoding='utf-8') as f:
        await f.write(line)

async def main():
    log_path = 'events.log'
    await log_event(log_path, 'INFO',  'Application started')
    await log_event(log_path, 'WARN',  'Config missing -- using defaults')
    await log_event(log_path, 'ERROR', 'Database connection failed')

    async with aiofiles.open(log_path, 'r', encoding='utf-8') as f:
        print(await f.read())

asyncio.run(main())

Output:

[2026-05-07 09:00:01] INFO: Application started
[2026-05-07 09:00:01] WARN: Config missing -- using defaults
[2026-05-07 09:00:01] ERROR: Database connection failed

Each aiofiles.open() call in append mode opens, writes, and closes the file. Because the writes are sequential in this example, the order is guaranteed. If you need to append from multiple concurrent coroutines, consider using a single persistent file handle or an asyncio.Lock to prevent interleaved writes.

Processing Multiple Files Concurrently

The real payoff of aiofiles comes when you need to process many files at once. Using asyncio.gather() you can launch all the reads simultaneously — each runs in the thread pool and the event loop handles them concurrently. A synchronous version would process them one after another.

# concurrent_read.py
import asyncio
import aiofiles
import time

async def read_and_count_lines(path):
    async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
        content = await f.read()
    return path, len(content.splitlines())

async def main():
    # Create 5 sample files
    for i in range(5):
        async with aiofiles.open(f'log_{i}.txt', 'w', encoding='utf-8') as f:
            await f.write('\n'.join(f'Line {j}' for j in range((i + 1) * 100)))

    start = time.perf_counter()
    paths = [f'log_{i}.txt' for i in range(5)]
    results = await asyncio.gather(*[read_and_count_lines(p) for p in paths])
    elapsed = time.perf_counter() - start

    for path, lines in results:
        print(f'{path}: {lines} lines')
    print(f'Completed in {elapsed:.4f}s')

asyncio.run(main())

Output:

log_0.txt: 100 lines
log_1.txt: 200 lines
log_2.txt: 300 lines
log_3.txt: 400 lines
log_4.txt: 500 lines
Completed in 0.0021s

asyncio.gather() submits all five coroutines at once. Because aiofiles uses a thread pool, the OS can schedule overlapping reads. The speedup over a sequential loop grows as file count and file size increase — or when reading from a network-mounted filesystem where latency dominates.

Error Handling and File Safety

Async context managers clean up reliably — the async with block guarantees the file is closed even if an exception is raised. Always wrap file operations in try/except when the path might not exist or when the content might be corrupt.

# safe_read.py
import asyncio
import aiofiles

async def safe_read(path):
    try:
        async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
            return await f.read()
    except FileNotFoundError:
        print(f'File not found: {path}')
        return None
    except PermissionError:
        print(f'Permission denied: {path}')
        return None
    except UnicodeDecodeError:
        # File exists but is not valid UTF-8 -- try binary fallback
        async with aiofiles.open(path, mode='rb') as f:
            raw = await f.read()
        return raw.decode('latin-1', errors='replace')

async def main():
    result = await safe_read('missing_file.txt')
    print('Missing file result:', result)

    # Write a valid file then read it
    async with aiofiles.open('valid.txt', 'w', encoding='utf-8') as f:
        await f.write('valid content')
    result = await safe_read('valid.txt')
    print('Valid file result:', result)

asyncio.run(main())

Output:

File not found: missing_file.txt
Missing file result: None
Valid file result: valid content

The async with guarantee means you never need a manual close() call. If your code raises an exception after opening but before finishing, the context manager's __aexit__() method still runs and the OS file descriptor is released. This is the same guarantee as synchronous with open(), extended to the async world.

Real-Life Example: Async Log Aggregator

The script below reads several log files concurrently, extracts all ERROR lines, sorts them by timestamp, and writes a combined error report. This is a realistic maintenance utility for any async backend.

# log_aggregator.py
import asyncio
import aiofiles
from pathlib import Path

async def extract_errors(log_path):
    errors = []
    try:
        async with aiofiles.open(log_path, mode='r', encoding='utf-8') as f:
            async for line in f:
                stripped = line.strip()
                if stripped and 'ERROR' in stripped:
                    errors.append((str(log_path), stripped))
    except FileNotFoundError:
        print(f'Skipping missing file: {log_path}')
    return errors

async def write_report(report_path, errors):
    async with aiofiles.open(report_path, mode='w', encoding='utf-8') as f:
        await f.write(f'Error Report -- {len(errors)} errors found\n')
        await f.write('=' * 50 + '\n')
        for source, line in errors:
            await f.write(f'[{source}] {line}\n')

async def main():
    # Create sample log files
    sample_logs = {
        'service_a.log': [
            '2026-05-07 08:01:00 INFO request received\n',
            '2026-05-07 08:01:05 ERROR DB timeout on query /users\n',
            '2026-05-07 08:01:10 INFO cache miss, querying DB\n',
        ],
        'service_b.log': [
            '2026-05-07 08:02:00 INFO worker started\n',
            '2026-05-07 08:02:15 ERROR queue consumer crashed: ConnectionReset\n',
            '2026-05-07 08:02:20 INFO restarting consumer\n',
        ],
        'service_c.log': [
            '2026-05-07 08:03:00 INFO all systems nominal\n',
        ],
    }
    for name, lines in sample_logs.items():
        async with aiofiles.open(name, 'w', encoding='utf-8') as f:
            await f.writelines(lines)

    # Concurrently extract errors from all logs
    log_files = list(sample_logs.keys())
    results = await asyncio.gather(*[extract_errors(p) for p in log_files])

    all_errors = [item for sublist in results for item in sublist]
    all_errors.sort(key=lambda x: x[1][:19])  # Sort by timestamp prefix

    await write_report('error_report.txt', all_errors)
    print(f'Report written: {len(all_errors)} errors aggregated')

    async with aiofiles.open('error_report.txt', 'r', encoding='utf-8') as f:
        print(await f.read())

asyncio.run(main())

Output:

Report written: 2 errors aggregated
Error Report -- 2 errors found
==================================================
[service_a.log] 2026-05-07 08:01:05 ERROR DB timeout on query /users
[service_b.log] 2026-05-07 08:02:15 ERROR queue consumer crashed: ConnectionReset

The aggregator reads all log files concurrently using asyncio.gather(), flattens the results, sorts by timestamp, and writes the combined report asynchronously. In a real deployment you would collect the actual log paths with Path(log_dir).glob('*.log'), where log_dir is your log directory. You can extend this to send the report via email or post it to a Slack webhook using aiohttp, all without blocking the event loop.

Frequently Asked Questions

Is aiofiles always faster than regular open()?

Not necessarily for a single file read in an otherwise synchronous script. The overhead of the thread pool and event loop scheduling adds a tiny fixed cost. aiofiles shines when you have multiple concurrent I/O operations — that is where it delivers real speedups by allowing overlap. In a simple script with one file, use the standard open().

Can I use seek() and tell() with aiofiles?

Yes. Both await f.seek(offset) and await f.tell() are supported. They work the same as their synchronous counterparts, just wrapped as coroutines. This is useful for reading from specific byte positions in binary files or implementing resume-on-failure for large uploads.

Can I use a custom thread pool with aiofiles?

Yes. aiofiles.open() accepts loop and executor keyword arguments. You can pass a custom concurrent.futures.ThreadPoolExecutor to control the number of worker threads. When executor is omitted, the work goes to the event loop's default executor, which is usually sufficient but can be tuned for I/O-intensive workloads.

How do I use aiofiles with FastAPI for file uploads?

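Older releases of Starlette, the framework FastAPI builds on, relied on aiofiles for the FileResponse class; newer releases use anyio for the same job, so install aiofiles explicitly if you want to use it. For saving uploaded files, open the destination path with aiofiles.open(dest, 'wb') inside your async route handler and await f.write(chunk) in a loop. Avoid synchronous open() inside an async def route: it blocks the event loop and degrades performance under concurrent requests. Routes declared with plain def run in a thread pool, so blocking calls there do not stall the loop.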

Does aiofiles work with pathlib.Path objects?

Yes. You can pass a Path object directly to aiofiles.open() just like the built-in open(). For example: async with aiofiles.open(Path('data') / 'output.txt', 'w') as f works without any conversion. This makes it easy to build cross-platform async file utilities with pathlib.

Conclusion

You have covered the full aiofiles toolkit: opening files in text and binary modes, reading entire files or iterating line by line, writing and appending, processing multiple files concurrently with asyncio.gather(), and handling errors safely with try/except inside async context managers. Every pattern here is a direct drop-in for code that currently uses the blocking open() built-in.

The log aggregator shows a practical scenario where non-blocking I/O pays off: reading several files in parallel, combining results, and writing a report, all without stalling your event loop. Extend it to watch a directory with watchdog, ship the report over aiohttp, or plug it into a FastAPI background task. The foundation is solid.

For a deeper dive into the internals and advanced use cases, see the official aiofiles documentation on GitHub and the Python asyncio documentation.