You have async code everywhere — FastAPI endpoints, aiohttp clients, asyncio workers — but every time you touch the filesystem you reach for the standard open() built-in. That one blocking call quietly freezes your entire event loop, starving every other coroutine until the disk responds. On SSDs the pause is milliseconds; under load with dozens of concurrent requests it compounds into a real latency problem.
The fix is aiofiles, a small drop-in library that wraps Python’s file I/O in a thread pool so your event loop stays free while bytes move between disk and memory. It mirrors the familiar open() interface almost exactly, so adoption costs nearly nothing — you change open to aiofiles.open and add await in a few places.
This article covers everything you need to use aiofiles effectively: installing it, reading and writing text and binary files, appending to logs, iterating lines, handling exceptions, and combining it with real async workflows. By the end you will have a working async log-aggregator script that processes multiple files concurrently without blocking.
Reading a File Asynchronously: Quick Example
The fastest way to see aiofiles in action is to open a file and read its contents inside a coroutine. The pattern mirrors open() almost exactly — the only differences are async with and await.
```python
# quick_read.py
import asyncio
import aiofiles

async def read_file(path):
    async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
        content = await f.read()
    return content

async def main():
    # Create a sample file first
    async with aiofiles.open('sample.txt', 'w', encoding='utf-8') as f:
        await f.write('Hello from aiofiles!\nLine two.\nLine three.')

    text = await read_file('sample.txt')
    print(text)

asyncio.run(main())
```
Output:

```
Hello from aiofiles!
Line two.
Line three.
```
aiofiles.open() returns an async context manager. Inside the async with block, every I/O method — read(), write(), readline() — is a coroutine that you await. The file is closed automatically when the block exits, just like regular with open(). Under the hood the actual read runs in a thread pool, releasing the event loop while waiting for the OS.
The sections below show how to extend this pattern to binary files, line-by-line iteration, append mode, and concurrent processing of multiple files.
What Is aiofiles and Why Use It?
Python’s built-in open() is synchronous. When your code calls f.read(), the calling thread blocks until the operating system delivers the data. In a regular script that is fine — there is only one task running. In an asyncio program, blocking the thread blocks the entire event loop, which means every other coroutine waiting for a network response, a timer, or another file has to wait too.
aiofiles solves this by delegating each file operation to a thread pool; under the hood it submits the blocking call to the event loop's run_in_executor(). Your coroutine suspends at the await, the event loop schedules other work, and when the worker thread finishes the I/O the coroutine resumes. The result is a non-blocking file API with a familiar interface.
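To make the mechanism concrete, here is a minimal sketch of the same idea built by hand with run_in_executor(). This is conceptual only, not aiofiles' actual implementation; the function names are invented for illustration.

```python
import asyncio

def _blocking_read(path):
    # Ordinary synchronous I/O -- this runs in a worker thread
    with open(path, 'r', encoding='utf-8') as f:
        return f.read()

async def read_via_executor(path):
    loop = asyncio.get_running_loop()
    # None selects the loop's default ThreadPoolExecutor; the coroutine
    # suspends here while the thread does the blocking read
    return await loop.run_in_executor(None, _blocking_read, path)

async def main():
    with open('demo.txt', 'w', encoding='utf-8') as f:
        f.write('hello')
    print(await read_via_executor('demo.txt'))  # prints: hello

asyncio.run(main())
```

aiofiles wraps this pattern for every file method, which is why each call costs a thread-pool hop but never blocks the loop.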
| Scenario | Use open() | Use aiofiles |
|---|---|---|
| Simple one-off script | Yes | Optional |
| FastAPI / aiohttp endpoint reading a config file | No — blocks loop | Yes |
| Concurrent processing of 20 log files | No — sequential | Yes — runs in parallel |
| Writing a large CSV from a background task | No — freezes server | Yes |
| One-time data migration script (no server) | Yes — simpler | Not needed |
Install aiofiles with a single command. It has no external dependencies; current releases require Python 3.8 or newer:

```shell
pip install aiofiles
```
Reading Text and Binary Files
Reading an Entire File at Once
For small-to-medium files (a few MB or less) the simplest approach is await f.read(), which returns the entire content as a string (text mode) or bytes (binary mode). The example below reads a UTF-8 text file and a binary PNG header.
```python
# read_modes.py
import asyncio
import aiofiles

async def read_text(path):
    async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
        return await f.read()

async def read_binary(path):
    async with aiofiles.open(path, mode='rb') as f:
        header = await f.read(8)  # Read first 8 bytes
    return header

async def main():
    # Write a sample text file
    async with aiofiles.open('notes.txt', 'w', encoding='utf-8') as f:
        await f.write('asyncio is non-blocking\naiofiles keeps it that way\n')
    text = await read_text('notes.txt')
    print('Text:', repr(text))

    # Write a fake binary file
    async with aiofiles.open('data.bin', 'wb') as f:
        await f.write(b'\x89PNG\r\n\x1a\n')  # PNG magic bytes
    header = await read_binary('data.bin')
    print('Header bytes:', header)

asyncio.run(main())
```
Output:

```
Text: 'asyncio is non-blocking\naiofiles keeps it that way\n'
Header bytes: b'\x89PNG\r\n\x1a\n'
```
The mode parameter works exactly like the built-in open(): 'r' for text, 'rb' for binary, 'w' for write, 'a' for append, and so on. Passing a byte count to read(n) limits the read to n bytes — useful for inspecting file headers without loading the whole file.
Reading Line by Line
Large log files should not be loaded into memory at once. aiofiles supports async iteration — you can loop over the file object just like a regular file, but with async for. Each iteration awaits the next line without blocking the loop.
```python
# readline_loop.py
import asyncio
import aiofiles

async def count_errors(log_path):
    error_count = 0
    async with aiofiles.open(log_path, mode='r', encoding='utf-8') as f:
        async for line in f:
            if 'ERROR' in line:
                error_count += 1
    return error_count

async def main():
    # Create a sample log file
    log_lines = [
        '2026-05-07 INFO server started\n',
        '2026-05-07 ERROR connection refused: db host unreachable\n',
        '2026-05-07 INFO request handled in 12ms\n',
        '2026-05-07 ERROR timeout after 30s on /api/data\n',
        '2026-05-07 INFO cache hit ratio: 0.87\n',
    ]
    async with aiofiles.open('app.log', 'w', encoding='utf-8') as f:
        await f.writelines(log_lines)

    errors = await count_errors('app.log')
    print(f'Error lines found: {errors}')

asyncio.run(main())
```
Output:

```
Error lines found: 2
```
async for line in f is the key pattern here. It reads one line at a time, yields control back to the event loop between iterations, and is memory-safe for files of any size. writelines() also accepts any iterable of strings and is awaitable just like write().
Writing and Appending to Files
Writing a New File
Opening a file with mode='w' creates it if it does not exist, or truncates it if it does. The example below writes a JSON config file asynchronously — a common pattern in async web apps that need to persist settings.
```python
# write_config.py
import asyncio
import json
import aiofiles

async def save_config(path, config):
    json_str = json.dumps(config, indent=2)
    async with aiofiles.open(path, mode='w', encoding='utf-8') as f:
        await f.write(json_str)
    print(f'Saved {len(json_str)} bytes to {path}')

async def main():
    config = {
        'host': 'localhost',
        'port': 8080,
        'debug': False,
        'allowed_origins': ['https://example.com', 'https://app.example.com']
    }
    await save_config('config.json', config)

    # Verify it was written
    async with aiofiles.open('config.json', 'r', encoding='utf-8') as f:
        loaded = json.loads(await f.read())
    print('Loaded back:', loaded['host'], loaded['port'])

asyncio.run(main())
```
Output:

```
Saved 144 bytes to config.json
Loaded back: localhost 8080
```
Appending to a Log File
Using mode='a' keeps existing content intact and adds new lines at the end — ideal for audit trails or rolling application logs. The file is created if it does not exist.
```python
# append_log.py
import asyncio
import aiofiles
from datetime import datetime

async def log_event(path, level, message):
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    line = f'[{timestamp}] {level}: {message}\n'
    async with aiofiles.open(path, mode='a', encoding='utf-8') as f:
        await f.write(line)

async def main():
    log_path = 'events.log'
    await log_event(log_path, 'INFO', 'Application started')
    await log_event(log_path, 'WARN', 'Config missing -- using defaults')
    await log_event(log_path, 'ERROR', 'Database connection failed')

    async with aiofiles.open(log_path, 'r', encoding='utf-8') as f:
        print(await f.read())

asyncio.run(main())
```
Output:

```
[2026-05-07 09:00:01] INFO: Application started
[2026-05-07 09:00:01] WARN: Config missing -- using defaults
[2026-05-07 09:00:01] ERROR: Database connection failed
```
Each aiofiles.open() call in append mode opens, writes, and closes the file. Because the writes are sequential in this example, the order is guaranteed. If you need to append from multiple concurrent coroutines, consider using a single persistent file handle or an asyncio.Lock to prevent interleaved writes.
Processing Multiple Files Concurrently
The real payoff of aiofiles comes when you need to process many files at once. Using asyncio.gather() you can launch all the reads simultaneously — each runs in the thread pool and the event loop handles them concurrently. A synchronous version would process them one after another.
```python
# concurrent_read.py
import asyncio
import time
import aiofiles

async def read_and_count_lines(path):
    async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
        content = await f.read()
    return path, len(content.splitlines())

async def main():
    # Create 5 sample files
    for i in range(5):
        async with aiofiles.open(f'log_{i}.txt', 'w', encoding='utf-8') as f:
            await f.write('\n'.join(f'Line {j}' for j in range((i + 1) * 100)))

    start = time.perf_counter()
    paths = [f'log_{i}.txt' for i in range(5)]
    results = await asyncio.gather(*[read_and_count_lines(p) for p in paths])
    elapsed = time.perf_counter() - start

    for path, lines in results:
        print(f'{path}: {lines} lines')
    print(f'Completed in {elapsed:.4f}s')

asyncio.run(main())
```
Output:

```
log_0.txt: 100 lines
log_1.txt: 200 lines
log_2.txt: 300 lines
log_3.txt: 400 lines
log_4.txt: 500 lines
Completed in 0.0021s
```
asyncio.gather() submits all five coroutines at once. Because aiofiles uses a thread pool, the OS can schedule overlapping reads. The speedup over a sequential loop grows as file count and file size increase — or when reading from a network-mounted filesystem where latency dominates.
Error Handling and File Safety
Async context managers clean up reliably — the async with block guarantees the file is closed even if an exception is raised. Always wrap file operations in try/except when the path might not exist or when the content might be corrupt.
```python
# safe_read.py
import asyncio
import aiofiles

async def safe_read(path):
    try:
        async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
            return await f.read()
    except FileNotFoundError:
        print(f'File not found: {path}')
        return None
    except PermissionError:
        print(f'Permission denied: {path}')
        return None
    except UnicodeDecodeError:
        # File exists but is not valid UTF-8 -- try binary fallback
        async with aiofiles.open(path, mode='rb') as f:
            raw = await f.read()
        return raw.decode('latin-1', errors='replace')

async def main():
    result = await safe_read('missing_file.txt')
    print('Missing file result:', result)

    # Write a valid file then read it
    async with aiofiles.open('valid.txt', 'w', encoding='utf-8') as f:
        await f.write('valid content')
    result = await safe_read('valid.txt')
    print('Valid file result:', result)

asyncio.run(main())
```
Output:

```
File not found: missing_file.txt
Missing file result: None
Valid file result: valid content
```
The async with guarantee means you never need a manual close() call. If your code raises an exception after opening but before finishing, Python’s __aexit__ method is still called and the OS file descriptor is released. This is the same guarantee as synchronous with open(), extended to the async world.
Real-Life Example: Async Log Aggregator
The script below reads several log files concurrently, extracts all ERROR lines, sorts them by timestamp, and writes a combined error report. This is a realistic maintenance utility for any async backend.
```python
# log_aggregator.py
import asyncio
import aiofiles

async def extract_errors(log_path):
    errors = []
    try:
        async with aiofiles.open(log_path, mode='r', encoding='utf-8') as f:
            async for line in f:
                stripped = line.strip()
                if stripped and 'ERROR' in stripped:
                    errors.append((str(log_path), stripped))
    except FileNotFoundError:
        print(f'Skipping missing file: {log_path}')
    return errors

async def write_report(report_path, errors):
    async with aiofiles.open(report_path, mode='w', encoding='utf-8') as f:
        await f.write(f'Error Report -- {len(errors)} errors found\n')
        await f.write('=' * 50 + '\n')
        for source, line in errors:
            await f.write(f'[{source}] {line}\n')

async def main():
    # Create sample log files
    sample_logs = {
        'service_a.log': [
            '2026-05-07 08:01:00 INFO request received\n',
            '2026-05-07 08:01:05 ERROR DB timeout on query /users\n',
            '2026-05-07 08:01:10 INFO cache miss, querying DB\n',
        ],
        'service_b.log': [
            '2026-05-07 08:02:00 INFO worker started\n',
            '2026-05-07 08:02:15 ERROR queue consumer crashed: ConnectionReset\n',
            '2026-05-07 08:02:20 INFO restarting consumer\n',
        ],
        'service_c.log': [
            '2026-05-07 08:03:00 INFO all systems nominal\n',
        ],
    }
    for name, lines in sample_logs.items():
        async with aiofiles.open(name, 'w', encoding='utf-8') as f:
            await f.writelines(lines)

    # Concurrently extract errors from all logs
    log_files = list(sample_logs.keys())
    results = await asyncio.gather(*[extract_errors(p) for p in log_files])
    all_errors = [item for sublist in results for item in sublist]
    all_errors.sort(key=lambda x: x[1][:19])  # Sort by timestamp prefix

    await write_report('error_report.txt', all_errors)
    print(f'Report written: {len(all_errors)} errors aggregated')

    async with aiofiles.open('error_report.txt', 'r', encoding='utf-8') as f:
        print(await f.read())

asyncio.run(main())
```
Output:

```
Report written: 2 errors aggregated
Error Report -- 2 errors found
==================================================
[service_a.log] 2026-05-07 08:01:05 ERROR DB timeout on query /users
[service_b.log] 2026-05-07 08:02:15 ERROR queue consumer crashed: ConnectionReset
```
The aggregator reads all log files concurrently using asyncio.gather(), flattens the results, sorts by timestamp, and writes the combined report asynchronously. In a real deployment you would pass actual log file paths using pathlib.Path.glob('*.log'). You can extend this to send the report via email or post it to a Slack webhook using aiohttp — all without blocking the event loop.
Frequently Asked Questions
Is aiofiles always faster than regular open()?
Not necessarily for a single file read in an otherwise synchronous script. The overhead of the thread pool and event loop scheduling adds a tiny fixed cost. aiofiles shines when you have multiple concurrent I/O operations — that is where it delivers real speedups by allowing overlap. In a simple script with one file, use the standard open().
Can I use seek() and tell() with aiofiles?
Yes. Both await f.seek(offset) and await f.tell() are supported. They work the same as their synchronous counterparts, just wrapped as coroutines. This is useful for reading from specific byte positions in binary files or implementing resume-on-failure for large uploads.
Can I use a custom thread pool with aiofiles?
Yes. aiofiles.open() accepts loop and executor keyword arguments. You can pass a custom concurrent.futures.ThreadPoolExecutor to control the number of worker threads. By default aiofiles uses the event loop's default executor, which is usually sufficient but can be tuned for I/O-intensive workloads.
How do I use aiofiles with FastAPI for file uploads?
Starlette, the framework FastAPI builds on, has historically used aiofiles for its FileResponse class (newer releases use anyio instead). For saving uploaded files, open the destination path with aiofiles.open(dest, 'wb') inside your async route handler and await f.write(chunk) in a loop. Never use synchronous open() inside an async FastAPI route: it blocks the event loop and degrades performance under concurrent requests.
Does aiofiles work with pathlib.Path objects?
Yes. You can pass a Path object directly to aiofiles.open() just like the built-in open(). For example: async with aiofiles.open(Path('data') / 'output.txt', 'w') as f works without any conversion. This makes it easy to build cross-platform async file utilities with pathlib.
Conclusion
You have covered the full aiofiles toolkit: opening files in text and binary modes, reading entire files or iterating line by line, writing and appending, processing multiple files concurrently with asyncio.gather(), and handling errors safely with try/except inside async context managers. Every pattern here is a direct drop-in for code that currently uses the blocking open() built-in.
The log aggregator shows a practical scenario where non-blocking I/O pays off: reading several files in parallel, combining results, and writing a report — all without stalling your event loop. Extend it to watch a directory with watchdog or asyncio streams, ship the report over aiohttp, or plug it into a FastAPI background task. The foundation is solid.
For a deeper dive into the internals and advanced use cases, see the official aiofiles documentation on GitHub and the Python asyncio documentation.