Beginner

You have a script that processes CSV files dropped into a folder, or a service that should reload its configuration when the config file changes, or a development tool that reruns tests whenever you save a file. All of these need the same thing: a reliable way to watch for file system changes and react immediately. The naive Python approach, polling a folder in a loop with time.sleep(), works but reacts slowly and wastes CPU. The watchfiles library does this properly. It uses OS-native file system events (inotify on Linux, FSEvents on macOS, ReadDirectoryChangesW on Windows) to detect changes as they happen, without polling the file system.

watchfiles is a modern Python file watching library whose core is written in Rust (on top of the notify crate). This makes it faster than Python-based alternatives like watchdog. It supports both synchronous and async usage, integrates cleanly with asyncio, and has a simple API that gets you from installation to a working watcher in about ten lines of code. You need Python 3.8+ and pip install watchfiles. Prebuilt wheels mean no additional system packages are required on any major platform.

This article covers the complete watchfiles API: basic synchronous watching, async watching with awatch, filtering which files to monitor, handling specific change types (added, modified, deleted), watching multiple directories, and building a real-time CSV processing pipeline. By the end you will have a production-ready file watching pattern you can adapt for any live processing use case.

File Watching: Quick Example

Install watchfiles and try this minimal watcher. In one terminal, run the script. In another, create or modify files in the current directory to see events:

# quick_watch.py
from watchfiles import watch

print("Watching current directory for changes. Press Ctrl+C to stop.")

for changes in watch("."):
    for change_type, path in changes:
        print(f"{change_type.name}: {path}")

Output (when files change):

Watching current directory for changes. Press Ctrl+C to stop.
added: /home/user/project/new_file.txt
modified: /home/user/project/config.json
deleted: /home/user/project/temp.log

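watch() returns a generator that yields a set of (Change, path) tuples each time files change. Change is an enum with three members: Change.added, Change.modified, and Change.deleted. Each iteration of the for loop blocks until at least one change is detected. It then yields every change collected in that batch. watchfiles keeps grouping events until no new ones arrive for step milliseconds (50 by default), up to a maximum of debounce milliseconds (1600 by default). Related changes therefore usually arrive together. A rename, for example, typically shows up in one batch as a deleted event for the old path and an added event for the new path.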
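A batch is just a set of tuples, so it is easy to regroup by change type before acting on it. The helper below is a hypothetical sketch: group_by_change is our own name, not part of watchfiles. It relies only on the .name attribute of each Change member.

```python
# group_batch.py
from collections import defaultdict
from typing import Any, Dict, Iterable, List, Tuple


def group_by_change(changes: Iterable[Tuple[Any, str]]) -> Dict[str, List[str]]:
    """Hypothetical helper: map each change type's name to the paths it affected.

    Intended for a watchfiles batch (a set of (Change, path) tuples); only the
    enum member's .name attribute is used, e.g. "added" or "deleted".
    """
    grouped: Dict[str, List[str]] = defaultdict(list)
    for change_type, path in changes:
        grouped[change_type.name].append(path)
    # Sort each list for stable output: the batch itself is an unordered set
    return {name: sorted(paths) for name, paths in grouped.items()}

# Usage inside a watch loop:
#     for changes in watch("."):
#         print(group_by_change(changes))
```

Change types with no events in the batch are simply absent from the result. Your code can therefore check for, say, "deleted" without handling empty lists.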


What Is watchfiles and Why Use It?

Before watchfiles, the standard Python file watching library was watchdog. Both use OS-native events, but watchfiles has a simpler API and better async support. The Rust implementation also makes it noticeably faster for high-frequency change detection.

| Feature | Manual polling | watchdog | watchfiles |
|---|---|---|---|
| Change detection speed | Up to the sleep interval | Near-instant | Near-instant |
| CPU usage (idle) | Constant | Near-zero | Near-zero |
| Async support | Manual | Complex (no native asyncio API) | First-class (awatch) |
| API complexity | Simple, but inefficient | Observer/Handler classes | Simple generator |
| Performance | Slow | Good | Very fast (Rust) |

Uvicorn uses watchfiles for its --reload development mode whenever the package is installed (it is included in uvicorn[standard]). FastAPI's development server runs on Uvicorn. So if you have used uvicorn app:app --reload, you have probably already relied on watchfiles under the hood.

Installation

# Terminal
pip install watchfiles

# Verify
python3 -c "import watchfiles; print(watchfiles.__version__)"

Output (your version number may differ):

0.24.0

No additional system packages are required. On Linux, watchfiles uses inotify (built into the kernel). On macOS it uses FSEvents. On Windows it uses ReadDirectoryChangesW. The Rust extension handles all platform differences internally.

Filtering by Change Type and Path

You often only care about specific change types or specific file extensions. To filter, pass a callable as the watch_filter argument and compare against the Change enum:

# change_filtering.py
from watchfiles import watch, Change

# Only react to modified files, ignore additions and deletions
print("Watching for file modifications only...")
for changes in watch(".", watch_filter=lambda change, path: change == Change.modified):
    for change_type, path in changes:
        print(f"Modified: {path}")

Output (when a file is modified):

Watching for file modifications only...
Modified: /home/user/project/data.csv

To filter by file extension instead, pass a named function:

# extension_filter.py
from watchfiles import watch, Change

# Only watch Python files
def python_only(change: Change, path: str) -> bool:
    return path.endswith(".py")

print("Watching for Python file changes...")
for changes in watch(".", watch_filter=python_only):
    for change_type, path in changes:
        print(f"{change_type.name}: {path}")

Output (when a .py file changes):

Watching for Python file changes...
modified: /home/user/project/main.py
added: /home/user/project/utils.py

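The watch_filter parameter accepts any callable that takes a Change value and a path string and returns a boolean. When the filter returns False for an event, that event is silently discarded. If every event in a batch is filtered out, watchfiles does not yield that batch at all, so your loop only runs when at least one event passes. Note that a custom filter replaces the default one. watch() normally uses watchfiles.DefaultFilter, which ignores noise such as .git and __pycache__ directories and editor swap files. Passing your own callable (or None) turns that behavior off.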


Watching Multiple Directories

watch() accepts multiple paths as positional arguments. All paths are monitored in a single watcher instance, and changes from any of them arrive in the same iterator:

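# multi_directory_watch.py
import os
from pathlib import Path

from watchfiles import watch

# Create test directories
os.makedirs("configs", exist_ok=True)
os.makedirs("data/incoming", exist_ok=True)

# Resolve once so reported paths can be compared reliably
CONFIG_DIR = Path("configs").resolve()
INCOMING_DIR = Path("data/incoming").resolve()

print("Watching configs/ and data/incoming/ for changes...")
for changes in watch(CONFIG_DIR, INCOMING_DIR):
    for change_type, path in changes:
        changed = Path(path).resolve()
        # Route by parent directory rather than substring matching, so a file
        # named data/incoming/configs_backup.csv is not treated as a config
        if CONFIG_DIR in changed.parents:
            print(f"Config changed: {change_type.name} -- {path}")
            print("  --> Triggering config reload")
        elif INCOMING_DIR in changed.parents:
            print(f"New data: {change_type.name} -- {path}")
            print("  --> Triggering data processing")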

Output (when files change):

Watching configs/ and data/incoming/ for changes...
Config changed: modified -- /home/user/project/configs/app.yaml
  --> Triggering config reload
New data: added -- /home/user/project/data/incoming/report_2026-05-19.csv
  --> Triggering data processing

Async Watching with awatch

For async applications, such as FastAPI handlers, async scripts, or any asyncio-based code, use awatch() instead of watch(). It accepts essentially the same arguments, but it is an async generator that you consume with async for:

# async_watch.py
import asyncio
from watchfiles import Change, awatch

async def process_change(path: str) -> None:
    """Simulate async processing of a changed file."""
    await asyncio.sleep(0.1)  # replace with real async I/O
    print(f"  Processed: {path}")

async def main():
    print("Async watcher running...")
    async for changes in awatch("."):
        print(f"Batch of {len(changes)} change(s):")
        tasks = []
        for change_type, path in changes:
            print(f"  {change_type.name}: {path}")
            # Compare against the enum rather than its name string
            if change_type in (Change.added, Change.modified):
                tasks.append(process_change(path))
                tasks.append(process_change(path))
        if tasks:
            await asyncio.gather(*tasks)

asyncio.run(main())

Output (when files change):

Async watcher running...
Batch of 2 change(s):
  added: /home/user/project/log_001.txt
  modified: /home/user/project/log_002.txt
  Processed: /home/user/project/log_001.txt
  Processed: /home/user/project/log_002.txt

Using asyncio.gather() to process the changes in a batch concurrently is the right pattern here. If three files arrive in the same batch, their awaits overlap instead of running one after another. This only helps when the work actually awaits non-blocking I/O: gather() does not run synchronous or CPU-bound code in parallel. The awatch() generator itself does not block the event loop while waiting for file system events, because it hands the blocking wait inside the Rust watcher to a worker thread.
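When the per-file work is ordinary blocking code (open(), csv, shutil), one option is to hand each call to the default thread pool with loop.run_in_executor() and gather the resulting futures. This is a minimal sketch, and run_blocking_batch and count_lines are hypothetical names. It uses run_in_executor rather than asyncio.to_thread so that it also runs on Python 3.8.

```python
# offload_blocking.py
import asyncio
from typing import Callable, Iterable, List, TypeVar

T = TypeVar("T")


async def run_blocking_batch(paths: Iterable[str], func: Callable[[str], T]) -> List[T]:
    """Run a synchronous per-file function for every path in a batch.

    Each call is submitted to the event loop's default thread pool, so
    blocking I/O overlaps and the event loop stays responsive.
    Results are returned in the same order as `paths`.
    """
    loop = asyncio.get_running_loop()
    futures = [loop.run_in_executor(None, func, path) for path in paths]
    return list(await asyncio.gather(*futures))


def count_lines(path: str) -> int:
    """Hypothetical blocking work: count the lines in a text file."""
    with open(path, encoding="utf-8") as f:
        return sum(1 for _ in f)

# Inside the awatch loop:
#     results = await run_blocking_batch(paths, count_lines)
```

Threads suit I/O-bound work. For CPU-heavy processing, the usual alternative is to pass a concurrent.futures.ProcessPoolExecutor instead of None, provided func is a picklable, module-level function.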


Stopping and Timeout Control

awatch() supports clean shutdown through its stop_event argument. Its rust_timeout and yield_on_timeout arguments let you stop after a period with no changes:

# controlled_watcher.py
import asyncio
from watchfiles import awatch

async def watch_until_idle():
    """Stop automatically once 30 seconds pass without any changes."""
    print("Watching until there are no changes for 30 seconds...")
    # rust_timeout is in milliseconds; with yield_on_timeout=True an empty
    # set is yielded whenever a wait ends without any changes
    async for changes in awatch(".", rust_timeout=30_000, yield_on_timeout=True):
        if not changes:
            break
        for change_type, path in changes:
            print(f"{change_type.name}: {path}")
    print("Watch period ended.")

async def watch_with_stop_event():
    """Watch until explicitly stopped via an event."""
    stop = asyncio.Event()

    # Schedule a stop after 10 seconds (simulating shutdown signal)
    async def stopper():
        await asyncio.sleep(10)
        print("\nSignaling stop...")
        stop.set()

    # Keep a reference so the task is not garbage-collected early
    stopper_task = asyncio.create_task(stopper())

    async for changes in awatch(".", stop_event=stop):
        for change_type, path in changes:
            print(f"{change_type.name}: {path}")

    await stopper_task  # already finished; awaiting it surfaces any error
    print("Stopped cleanly.")

asyncio.run(watch_with_stop_event())

Output:

modified: /home/user/project/test.txt

Signaling stop...
Stopped cleanly.

The stop_event pattern is the correct way to handle graceful shutdown in a service. When your application receives a SIGTERM signal (from systemd, Docker, or Kubernetes), set the stop event. The watcher notices it on its next check and stops yielding batches. The async for loop then ends, so your cleanup code can run. awatch() has no timeout argument that ends the loop on its own. rust_timeout (in milliseconds) only limits how long each internal wait lasts. Combined with yield_on_timeout=True, an empty set is yielded when a wait passes without changes, and watch_until_idle() treats that as the signal to stop. This is useful for integration tests or bounded watch sessions.

Real-Life Example: Live CSV Processing Pipeline

# csv_pipeline.py
"""
Real-time CSV processing pipeline using watchfiles.
Watches an incoming/ folder and processes new CSV files immediately.
"""
from __future__ import annotations  # allows list[dict] annotations on Python 3.8

import asyncio
import csv
import json
import shutil
from datetime import datetime, timezone
from pathlib import Path

from watchfiles import Change, awatch

# Directory structure
INCOMING = Path("incoming")
PROCESSED = Path("processed")
FAILED = Path("failed")
SUMMARY_FILE = Path("pipeline_summary.json")

for d in [INCOMING, PROCESSED, FAILED]:
    d.mkdir(exist_ok=True)

def process_csv(path: str) -> dict:
    """Read a CSV file, compute basic stats, return summary.

    This is ordinary blocking code; main() runs it in a worker thread so
    several files from one batch are processed concurrently.
    """
    filepath = Path(path)
    try:
        with open(filepath, newline="", encoding="utf-8") as f:
            rows = list(csv.DictReader(f))

        if not rows:
            raise ValueError("Empty CSV file")

        summary = {
            "file": filepath.name,
            "processed_at": datetime.now(timezone.utc).isoformat(),
            "row_count": len(rows),
            "columns": list(rows[0].keys()),
            "status": "ok",
        }

        # Move to processed folder so the file is not picked up again
        shutil.move(str(filepath), str(PROCESSED / filepath.name))
        print(f"  [OK] {filepath.name} -- {len(rows)} rows, cols: {summary['columns']}")
        return summary

    except Exception as e:
        # Move to failed folder for investigation (if the file still exists)
        if filepath.exists():
            shutil.move(str(filepath), str(FAILED / filepath.name))
        print(f"  [FAIL] {filepath.name} -- {e}")
        return {"file": filepath.name, "status": "failed", "error": str(e)}

def update_summary(results: list[dict]) -> None:
    existing = []
    if SUMMARY_FILE.exists():
        with open(SUMMARY_FILE) as f:
            existing = json.load(f)
    existing.extend(results)
    with open(SUMMARY_FILE, "w") as f:
        json.dump(existing, f, indent=2)

async def main():
    print(f"Pipeline watching {INCOMING}/ for new CSV files...")
    print(f"Results -> {PROCESSED}/, failures -> {FAILED}/\n")

    loop = asyncio.get_running_loop()
    async for changes in awatch(str(INCOMING)):
        new_csvs = [
            path for change, path in changes
            if change == Change.added and path.endswith(".csv")
        ]
        if not new_csvs:
            continue

        print(f"Detected {len(new_csvs)} new CSV file(s):")
        # Run each blocking process_csv() call in the default thread pool
        tasks = [loop.run_in_executor(None, process_csv, path) for path in new_csvs]
        results = await asyncio.gather(*tasks)
        update_summary(list(results))
        print("Batch complete. Summary updated.\n")

if __name__ == "__main__":
    asyncio.run(main())

Test it:

# In terminal 1: run the pipeline
python3 csv_pipeline.py

# In terminal 2: drop test CSV files into incoming/
echo "name,age,city
Alice,32,Melbourne
Bob,28,Sydney" > incoming/users_001.csv

echo "product,price,qty
Widget,9.99,100
Gadget,24.99,50" > incoming/products_001.csv

# Terminal 1 output:
Pipeline watching incoming/ for new CSV files...
Results -> processed/, failures -> failed/

Detected 2 new CSV file(s):
  [OK] users_001.csv -- 2 rows, cols: ['name', 'age', 'city']
  [OK] products_001.csv -- 2 rows, cols: ['product', 'price', 'qty']
Batch complete. Summary updated.

This pipeline handles the main production concerns:

- concurrent processing of multiple files in the same batch
- error isolation, so a bad CSV does not block good ones
- moving each file out of incoming/ after processing, so it is not picked up a second time
- a running summary log

Note that it only reacts to files that arrive while it is running. Anything already sitting in incoming/ at startup is left alone. You can extend it by adding a database write step in process_csv(). You can also replace the CSV processing with any other operation, such as API calls, image processing, or document parsing, without changing the watcher infrastructure.
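One gap remains: partially written files. watchfiles yields a batch once events stop arriving for a short step interval, which is usually enough for small files. A large file copied slowly, however, can produce an added event before it is complete. A common mitigation is for the producer to write under a temporary name and rename the file into place once it is finished. The sketch below assumes the producer is Python code you control, and atomic_drop is a hypothetical helper. It also assumes, as we understand watchfiles to behave, that the rename is reported as an added event for the final .csv name. Verify this on your platform.

```python
# atomic_drop.py
import os
import tempfile
from pathlib import Path


def atomic_drop(content: str, dest_dir: Path, filename: str) -> Path:
    """Write `content` so dest_dir/filename appears complete in one step.

    Data goes to a temporary *.tmp file in the same directory (keeping the
    rename on one filesystem), then os.replace() renames it to the final
    name. The pipeline only acts on names ending in .csv, so the temporary
    file is ignored.
    """
    dest_dir.mkdir(parents=True, exist_ok=True)
    # Note: mkstemp creates the file readable by its owner only
    fd, tmp_name = tempfile.mkstemp(dir=dest_dir, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8", newline="") as f:
            f.write(content)
        final_path = dest_dir / filename
        os.replace(tmp_name, final_path)
    except BaseException:
        # Remove the partial temporary file if anything went wrong
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise
    return final_path
```

For example, atomic_drop("name,age\nAlice,32\n", Path("incoming"), "users_002.csv") makes the file appear in incoming/ only once it is fully written.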

Frequently Asked Questions

How is watchfiles different from watchdog?

watchdog uses an Observer/EventHandler class pattern where you subclass FileSystemEventHandler and attach it to an Observer. watchfiles uses a simpler generator pattern and has first-class async support. The Rust core of watchfiles also makes it faster and more memory-efficient for high-frequency change detection. Both work reliably for typical use cases; watchfiles is the better choice for new projects, especially those using async Python.

Does watchfiles watch subdirectories automatically?

Yes. By default, watch() and awatch() watch recursively, so all files and subdirectories within the watched path are monitored. To narrow the scope, you have three options:

- pass recursive=False to watch only the top-level directory
- watch a more specific subdirectory
- use watch_filter to react only to paths matching your pattern

Does watchfiles follow symbolic links?

This depends on the OS. On Linux with inotify, symbolic links are not followed, so you need to watch the link target directly. On macOS with FSEvents, symbolic links within the watched directory tree are followed. For cross-platform reliability, watch the real path rather than a symlink by passing os.path.realpath(path) to watch().

What happens with large directories (thousands of files)?

OS-native file watching scales much better than polling because the kernel only notifies your process when something actually changes. On Linux, though, recursive inotify watching needs one watch per directory in the tree. The number of watches per user is capped by /proc/sys/fs/inotify/max_user_watches: the cap is 8192 on many older kernels, while recent kernels scale the default with available memory. If you hit this limit, raise it with echo 524288 | sudo tee /proc/sys/fs/inotify/max_user_watches. To keep the setting across reboots, add fs.inotify.max_user_watches=524288 to /etc/sysctl.conf.
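To check whether a large tree is likely to hit that limit before you start watching it, you can compare a rough directory count with the current setting. This is a small Linux-only sketch using just the standard library. read_watch_limit and count_directories are hypothetical helpers. The comparison is only an estimate, because the limit is shared with every other inotify user on the account, such as editors, IDEs, and other watchers.

```python
# inotify_headroom.py
import os
from pathlib import Path
from typing import Optional

LIMIT_FILE = Path("/proc/sys/fs/inotify/max_user_watches")


def read_watch_limit() -> Optional[int]:
    """Return the per-user inotify watch limit, or None if it is unavailable."""
    try:
        return int(LIMIT_FILE.read_text().strip())
    except (OSError, ValueError):
        return None  # not Linux, or the file could not be parsed


def count_directories(root: str) -> int:
    """Count root plus every subdirectory below it (roughly one watch each)."""
    return 1 + sum(len(dirnames) for _, dirnames, _ in os.walk(root))
```

For example, print(count_directories("."), read_watch_limit()) before starting a watcher on a large repository shows how much headroom is left.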

How do I debounce rapid changes to the same file?

watchfiles already groups rapid changes into batches, and you can tune this with the debounce and step arguments to watch() and awatch(). Because each batch is a set, repeated identical events for the same file appear only once. For additional debouncing across batches, track the last-processed time per file in a dict and skip files that were processed within the last N seconds: if (now - last_processed.get(path, 0)) < debounce_seconds: continue. This is especially useful when monitoring log files that are written to many times per second.
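The per-file check described above can be packaged as a small class. This is a sketch rather than part of watchfiles, and Debouncer and should_process are hypothetical names. It uses time.monotonic(), which is unaffected by system clock changes. It also checks explicitly for unseen paths instead of defaulting to 0.

```python
# debounce.py
import time
from typing import Callable, Dict


class Debouncer:
    """Skip paths that were processed less than `interval` seconds ago."""

    def __init__(self, interval: float, clock: Callable[[], float] = time.monotonic):
        self.interval = interval
        self.clock = clock  # injectable so the logic can be tested deterministically
        self.last_processed: Dict[str, float] = {}

    def should_process(self, path: str) -> bool:
        now = self.clock()
        last = self.last_processed.get(path)
        if last is not None and now - last < self.interval:
            return False  # processed too recently; timestamp is not refreshed
        self.last_processed[path] = now
        return True

# Inside a watch loop:
#     if not debouncer.should_process(path):
#         continue
```

Skipped events do not reset the timer, so a file that changes continuously is still processed at most once per interval. The flip side is that a file's last change can be skipped if no later event follows it. If you need the final state, schedule a follow-up check after the interval.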

Conclusion

The watchfiles library gives you instant, CPU-efficient file system monitoring in about ten lines of Python. We covered synchronous watching with watch(), async watching with awatch(), filtering by change type and file extension, watching multiple directories, clean shutdown with stop events, and built a complete CSV processing pipeline that handles batches concurrently.

The next step is to integrate the pipeline with your existing data infrastructure — swap the CSV parsing for database writes, add a message queue to distribute work across multiple workers, or wrap the watcher in a systemd service for reliable production deployment. The file watching layer stays exactly as it is.

Full documentation and advanced usage examples are at watchfiles.helpmanual.io.