Intermediate

You are building an async Python service with FastAPI or aiohttp and you need MongoDB as your database. The standard PyMongo driver is synchronous: every database call blocks the event loop, so one slow query stalls every other request the process is serving. Motor is the official async MongoDB driver for Python. It wraps PyMongo with a non-blocking interface designed for asyncio, so your database queries and your API handlers can run concurrently without blocking each other.

Install motor with pip install motor. You will also need a running MongoDB instance: docker run -d -p 27017:27017 mongo:7 starts one locally with no authentication. For cloud MongoDB, you can use a free Atlas cluster (mongodb.com/atlas); the connection string is passed to motor in the same way. PyMongo is installed automatically as motor’s dependency.

This tutorial covers motor’s async CRUD operations, querying with filters and projections, aggregation pipelines, and indexes, with shorter notes on connection pooling, transactions, and file storage in the FAQ. You will finish by building an async product catalogue API with FastAPI and motor that handles concurrent reads without blocking.

Async Insert and Query: Quick Example

Here is the minimal motor workflow — connect, insert a document, and query it back, all with async/await:

# quick_motor.py
import asyncio
import motor.motor_asyncio

async def main():
    client = motor.motor_asyncio.AsyncIOMotorClient('mongodb://localhost:27017')
    db = client['mydb']
    collection = db['users']

    # Insert one document
    result = await collection.insert_one({'name': 'Alice', 'age': 30, 'role': 'admin'})
    print(f"Inserted ID: {result.inserted_id}")

    # Find it back
    user = await collection.find_one({'name': 'Alice'})
    print(f"Found: {user['name']}, age {user['age']}, role {user['role']}")

    client.close()

asyncio.run(main())

Output:

Inserted ID: 664f3a1b2c8d4e5f6a7b8c9d
Found: Alice, age 30, role admin

Motor’s API mirrors PyMongo almost exactly. The key difference is that methods which perform I/O, such as insert_one and find_one, return awaitables you must await, while find() and aggregate() return cursors you iterate with async for. Creating the client and looking up a database or collection are ordinary synchronous calls that do no I/O. This design means migrating from PyMongo to motor is usually straightforward: add async def and await to the functions that call the database.

Motor vs PyMongo vs Other Drivers

Choosing the right MongoDB driver depends on whether your application is async or sync:

Driver       | I/O Model               | Best For                            | Install
motor        | asyncio (non-blocking)  | FastAPI, aiohttp, async services    | pip install motor
pymongo      | Synchronous (blocking)  | Django, Flask, scripts, CLIs        | pip install pymongo
beanie       | asyncio + ODM (models)  | When you want Pydantic-based models | pip install beanie
mongoengine  | Synchronous + ODM       | Django-like ORM on MongoDB          | pip install mongoengine

Use motor whenever your web framework is async. Calling PyMongo inside an async def FastAPI handler does not raise an error, but it blocks the event loop for the duration of every database call, so requests are effectively processed one at a time. Motor keeps each query non-blocking, letting your async framework handle hundreds of concurrent connections efficiently.
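To make the blocking effect concrete, here is a small sketch that uses only the standard library. It is not driver code: time.sleep stands in for a blocking PyMongo call, asyncio.sleep stands in for an awaited motor call, and the function names and the 100 ms delay are assumptions chosen for illustration.

```python
import asyncio
import time

QUERY_SECONDS = 0.1  # assumed cost of one database round trip


async def blocking_handler():
    # Stand-in for a synchronous driver call: the event loop is stuck here.
    time.sleep(QUERY_SECONDS)


async def non_blocking_handler():
    # Stand-in for an awaited async driver call: other tasks can run meanwhile.
    await asyncio.sleep(QUERY_SECONDS)


async def time_concurrent(handler, requests=3):
    """Run `requests` handlers concurrently and return the elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(handler() for _ in range(requests)))
    return time.perf_counter() - start


async def compare():
    blocked = await time_concurrent(blocking_handler)
    overlapped = await time_concurrent(non_blocking_handler)
    return blocked, overlapped


blocked, overlapped = asyncio.run(compare())
print(f"blocking: {blocked:.2f}s, non-blocking: {overlapped:.2f}s")
```

With three simulated requests, the blocking version takes roughly three times the single-query delay because the sleeps run back to back, while the non-blocking version finishes in about one delay because the waits overlap.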

CRUD Operations

Motor’s CRUD methods are direct async equivalents of PyMongo’s methods. Here is a complete reference implementation covering insert, find, update, and delete:

# motor_crud.py
import asyncio
import motor.motor_asyncio

async def crud_demo():
    client = motor.motor_asyncio.AsyncIOMotorClient('mongodb://localhost:27017')
    col = client['demo']['products']

    # --- CREATE ---
    # insert_one: single document
    r = await col.insert_one({'name': 'Widget', 'price': 9.99, 'stock': 100})
    widget_id = r.inserted_id

    # insert_many: multiple documents at once
    await col.insert_many([
        {'name': 'Gadget', 'price': 24.99, 'stock': 50},
        {'name': 'Doohickey', 'price': 4.49, 'stock': 200},
    ])
    print("Inserted 3 products")

    # --- READ ---
    # find_one: returns first match or None
    product = await col.find_one({'name': 'Widget'})
    print(f"Widget price: ${product['price']}")

    # find: returns an async cursor
    async for p in col.find({'price': {'$lt': 10}}):
        print(f"  Under $10: {p['name']} @ ${p['price']}")

    # find with projection (only return name and price)
    async for p in col.find({}, {'_id': 0, 'name': 1, 'price': 1}):
        print(f"  {p['name']}: ${p['price']}")

    # --- UPDATE ---
    # update_one: modify first matching document
    result = await col.update_one({'name': 'Widget'}, {'$set': {'price': 7.99}, '$inc': {'stock': -1}})
    print(f"Modified: {result.modified_count} document(s)")

    # update_many: modify all matching documents
    result = await col.update_many({'stock': {'$gt': 100}}, {'$set': {'on_sale': True}})
    print(f"Sale items updated: {result.modified_count}")

    # --- DELETE ---
    result = await col.delete_one({'name': 'Doohickey'})
    print(f"Deleted: {result.deleted_count} document(s)")

    # cleanup
    await col.drop()
    client.close()

asyncio.run(crud_demo())

Output:

Inserted 3 products
Widget price: $9.99
  Under $10: Widget @ $9.99
  Under $10: Doohickey @ $4.49
  Widget: $9.99
  Gadget: $24.99
  Doohickey: $4.49
Modified: 1 document(s)
Sale items updated: 1
Deleted: 1 document(s)

Aggregation Pipelines

MongoDB’s aggregation pipeline is well suited to analytics: you can group, filter, sort, and reshape data entirely in the database and return only the results your application needs. Motor exposes it through collection.aggregate():

# motor_aggregation.py
import asyncio
import motor.motor_asyncio

async def aggregation_demo():
    client = motor.motor_asyncio.AsyncIOMotorClient('mongodb://localhost:27017')
    col = client['store']['orders']

    # Seed data
    await col.drop()
    await col.insert_many([
        {'product': 'Widget', 'qty': 3, 'price': 9.99, 'region': 'North'},
        {'product': 'Gadget', 'qty': 1, 'price': 24.99, 'region': 'South'},
        {'product': 'Widget', 'qty': 2, 'price': 9.99, 'region': 'South'},
        {'product': 'Gadget', 'qty': 4, 'price': 24.99, 'region': 'North'},
        {'product': 'Widget', 'qty': 5, 'price': 9.99, 'region': 'North'},
    ])

    # Pipeline: total revenue and units sold per product
    pipeline = [
        {'$group': {
            '_id': '$product',
            'total_revenue': {'$sum': {'$multiply': ['$qty', '$price']}},
            'units_sold': {'$sum': '$qty'},
            'order_count': {'$sum': 1},
        }},
        {'$sort': {'total_revenue': -1}},
        {'$project': {
            'product': '$_id',
            'total_revenue': {'$round': ['$total_revenue', 2]},
            'units_sold': 1,
            'order_count': 1,
            '_id': 0,
        }},
    ]

    print("Sales summary:")
    async for doc in col.aggregate(pipeline):
        # Format revenue with two decimals so 99.9 prints as 99.90
        print(f"  {doc['product']}: ${doc['total_revenue']:.2f} ({doc['units_sold']} units, {doc['order_count']} orders)")

    await col.drop()
    client.close()

asyncio.run(aggregation_demo())

Output:

Sales summary:
  Gadget: $124.95 (5 units, 2 orders)
  Widget: $99.90 (10 units, 3 orders)

The aggregation cursor is async, so you iterate it with async for exactly like the regular find cursor. Pipelines run entirely inside MongoDB — no Python-side grouping or sorting needed. For large datasets this is dramatically faster than fetching all documents and processing them in Python.
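Because a pipeline is just a list of dictionaries, you can assemble it in Python before handing it to the database. The sketch below adds an optional $match stage in front of the grouping stage from the example above. The function name build_sales_pipeline and its region parameter are hypothetical, introduced here only for illustration.

```python
def build_sales_pipeline(region=None):
    """Return a per-product sales pipeline, optionally limited to one region."""
    pipeline = []
    if region is not None:
        # Filter first so the later stages process fewer documents.
        pipeline.append({'$match': {'region': region}})
    pipeline += [
        {'$group': {
            '_id': '$product',
            'total_revenue': {'$sum': {'$multiply': ['$qty', '$price']}},
            'units_sold': {'$sum': '$qty'},
        }},
        {'$sort': {'total_revenue': -1}},
    ]
    return pipeline


# Show the stage names of each variant.
all_regions = build_sales_pipeline()
north_only = build_sales_pipeline('North')
print([next(iter(stage)) for stage in all_regions])
print([next(iter(stage)) for stage in north_only])
```

You would pass the result to the collection in the usual way, for example async for doc in col.aggregate(build_sales_pipeline('North')).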

Real-Life Example: Async Product Catalogue API

Let us build a FastAPI product catalogue with motor as the database backend. This shows how motor integrates with a real async web framework, including startup/shutdown lifecycle and concurrent query handling:

# catalogue_api.py
# Install: pip install fastapi uvicorn motor
# Run: uvicorn catalogue_api:app --reload
# Test: http://localhost:8000/docs

from fastapi import FastAPI, HTTPException
from contextlib import asynccontextmanager
import motor.motor_asyncio
from bson import ObjectId

MONGO_URL = "mongodb://localhost:27017"

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: create client and ensure index
    app.state.client = motor.motor_asyncio.AsyncIOMotorClient(MONGO_URL)
    app.state.col = app.state.client["catalogue"]["products"]
    await app.state.col.create_index("sku", unique=True)
    yield
    # Shutdown: close connection
    app.state.client.close()

app = FastAPI(lifespan=lifespan)

@app.get("/products")
async def list_products(category: str = None, max_price: float = None):
    query = {}
    if category:
        query["category"] = category
    if max_price is not None:
        query["price"] = {"$lte": max_price}
    products = []
    async for p in app.state.col.find(query, {"_id": 0}):
        products.append(p)
    return {"count": len(products), "products": products}

@app.post("/products", status_code=201)
async def create_product(product: dict):
    if not all(k in product for k in ["sku", "name", "price"]):
        raise HTTPException(400, "sku, name, and price are required")
    result = await app.state.col.insert_one(product)
    return {"id": str(result.inserted_id), "sku": product["sku"]}

@app.patch("/products/{sku}")
async def update_product(sku: str, updates: dict):
    result = await app.state.col.update_one({"sku": sku}, {"$set": updates})
    if result.matched_count == 0:
        raise HTTPException(404, f"Product '{sku}' not found")
    return {"updated": result.modified_count}

@app.delete("/products/{sku}")
async def delete_product(sku: str):
    result = await app.state.col.delete_one({"sku": sku})
    if result.deleted_count == 0:
        raise HTTPException(404, f"Product '{sku}' not found")
    return {"deleted": True}

The lifespan context manager is the modern FastAPI pattern for managing the motor client: it creates the client on startup and closes it cleanly on shutdown. The unique index on sku prevents duplicate products at the database level. Inserting an existing sku raises pymongo.errors.DuplicateKeyError, which this example does not catch, so the client receives a 500 response. Because motor is async, all four endpoints can handle concurrent requests without one query blocking another.
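One thing the PATCH endpoint above leaves open is that it passes the request body straight into $set. MongoDB rejects any attempt to change _id, and changing sku through the body would leave the document out of step with the URL. A minimal sketch of a guard follows; sanitize_updates and PROTECTED_FIELDS are hypothetical names, and the choice of protected fields is an assumption about this particular API.

```python
PROTECTED_FIELDS = {'_id', 'sku'}  # assumed to be read-only for this API


def sanitize_updates(updates):
    """Return only the fields a client is allowed to change.

    Drops protected fields and keys that start with '$', which look like
    update operators rather than field names. Raises ValueError if nothing
    is left to update.
    """
    cleaned = {
        key: value
        for key, value in updates.items()
        if key not in PROTECTED_FIELDS and not key.startswith('$')
    }
    if not cleaned:
        raise ValueError('no updatable fields supplied')
    return cleaned


safe = sanitize_updates({'price': 7.99, 'sku': 'HACKED', '_id': 'x'})
print(safe)
```

In the handler you could call sanitize_updates(updates) before update_one and turn the ValueError into an HTTPException with status 400.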

Frequently Asked Questions

How do I handle MongoDB’s ObjectId in API responses?

MongoDB’s _id field is a bson.ObjectId object, not a plain string. JSON serializers do not know how to handle it and will raise a TypeError. The cleanest fix is to either exclude _id from your queries using a projection ({"_id": 0}), or convert it when building your response: doc["id"] = str(doc.pop("_id")). If you use Pydantic models with beanie or a custom model, configure the JSON encoder to handle ObjectId automatically.
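The conversion described above can be wrapped in a small helper. This is a sketch, not part of motor: serialize_doc is a hypothetical name, and FakeObjectId is a stand-in used only so the example runs without the bson package. It mimics the one behaviour relied on here, namely that str() of an ObjectId gives its 24-character hex form.

```python
import json


def serialize_doc(doc):
    """Return a JSON-friendly copy of a MongoDB document.

    The `_id` value is replaced by a string `id` field.
    """
    if doc is None:
        return None
    out = dict(doc)  # shallow copy so the caller's document is not mutated
    if '_id' in out:
        out['id'] = str(out.pop('_id'))
    return out


class FakeObjectId:
    """Stand-in for bson.ObjectId, for demonstration only."""

    def __init__(self, hex_value):
        self._hex = hex_value

    def __str__(self):
        return self._hex


raw = {'_id': FakeObjectId('664f3a1b2c8d4e5f6a7b8c9d'), 'name': 'Alice', 'age': 30}
clean = serialize_doc(raw)
print(json.dumps(clean))
```

Copying the document first keeps the original intact, which matters if the same dictionary is reused elsewhere in the request.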

When should I create indexes?

Create indexes for any field you filter or sort on frequently. Without a usable index, MongoDB scans every document in the collection for each query. Use await collection.create_index("field_name") for single-field indexes, or await collection.create_index([("a", 1), ("b", -1)]) for compound indexes. Call index creation during application startup (in the lifespan handler), not on every request. Check your index usage with await collection.find(query).explain() and look for an IXSCAN stage rather than COLLSCAN in the winning plan; note that motor’s explain() takes no verbosity argument and must be awaited.
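As a sketch of the startup pattern, the helper below groups the index calls in one coroutine that a lifespan handler could await. The name ensure_indexes and the choice of a category and price compound index are assumptions based on the catalogue queries in this tutorial. RecordingCollection is a stand-in that only records calls, so the example runs without a MongoDB server; with motor you would pass a real collection instead.

```python
import asyncio


async def ensure_indexes(collection):
    """Create the indexes the catalogue queries rely on.

    create_index is safe to call on every startup: asking for an index that
    already exists with the same definition does nothing.
    """
    # Unique single-field index
    await collection.create_index('sku', unique=True)
    # Compound index: equality on category, then range filter on price
    await collection.create_index([('category', 1), ('price', 1)])


class RecordingCollection:
    """Stand-in that records create_index calls (demonstration only)."""

    def __init__(self):
        self.calls = []

    async def create_index(self, keys, **kwargs):
        self.calls.append((keys, kwargs))


col = RecordingCollection()
asyncio.run(ensure_indexes(col))
for keys, options in col.calls:
    print(keys, options)
```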

How does motor handle connection pooling?

Motor uses PyMongo’s connection pool internally. The default pool size is 100 connections. One AsyncIOMotorClient instance is meant to be shared across your entire application — do not create a new client per request. The client is thread-safe and coroutine-safe. Set the pool size with AsyncIOMotorClient(url, maxPoolSize=50) to reduce memory usage on low-traffic services.

Does motor support multi-document transactions?

Yes, starting with MongoDB 4.0 (replica sets) and 4.2 (sharded clusters). Use async with await client.start_session() as session: async with session.start_transaction(): ... to wrap multiple operations in a transaction. All motor CRUD methods accept an optional session parameter. Transactions have a performance cost — only use them when you genuinely need atomicity across multiple collections or documents.
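Spread over several lines, the session and transaction pattern looks like the sketch below. The function move_stock and the sku values are hypothetical. The Fake classes are stand-ins so the example runs without a replica set; they mimic only the shape of the calls used here (an awaitable start_session, an async context manager session, and update_one accepting session). The single-node Docker container from the setup section is not a replica set, so real transactions will not work against it as started.

```python
import asyncio


async def move_stock(client, collection, from_sku, to_sku, qty):
    """Move `qty` units of stock between two products in one transaction."""
    async with await client.start_session() as session:
        # Commits when the block exits normally, aborts if it raises.
        async with session.start_transaction():
            await collection.update_one(
                {'sku': from_sku}, {'$inc': {'stock': -qty}}, session=session)
            await collection.update_one(
                {'sku': to_sku}, {'$inc': {'stock': qty}}, session=session)


# --- Stand-ins for demonstration only ---
class _AsyncContext:
    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        return False  # do not swallow exceptions


class FakeSession(_AsyncContext):
    def start_transaction(self):
        return _AsyncContext()


class FakeClient:
    async def start_session(self):
        return FakeSession()


class FakeCollection:
    def __init__(self):
        self.updates = []

    async def update_one(self, flt, update, session=None):
        # Record the call and whether a session was passed along.
        self.updates.append((flt, update, session is not None))


products = FakeCollection()
asyncio.run(move_stock(FakeClient(), products, 'W-1', 'G-1', 5))
for entry in products.updates:
    print(entry)
```

The detail that is easy to miss is passing session=session to every operation; a call without it runs outside the transaction.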

How do I store files in MongoDB with motor?

Use motor.motor_asyncio.AsyncIOMotorGridFSBucket for files larger than 16MB (MongoDB’s document size limit). For smaller files (images, PDFs under a few MB), storing them as base64-encoded strings in a document field is simpler. GridFS splits large files into 255KB chunks and stores them across two collections, making upload and download resumable. In most modern setups, object storage (S3, GCS) is a better fit for files than GridFS.
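To get a feel for the chunking arithmetic, this sketch computes how many chunk documents a file would occupy at the 255 KB default chunk size mentioned above. The function name is hypothetical, and the calculation covers only the data chunks, not the one metadata document GridFS also stores per file.

```python
import math

GRIDFS_DEFAULT_CHUNK_BYTES = 255 * 1024  # 261,120 bytes


def gridfs_chunk_count(file_size_bytes, chunk_bytes=GRIDFS_DEFAULT_CHUNK_BYTES):
    """Return the number of data chunks needed for a file of the given size."""
    return math.ceil(file_size_bytes / chunk_bytes)


sixteen_mb = 16 * 1024 * 1024
hundred_mb = 100 * 1024 * 1024
print(gridfs_chunk_count(sixteen_mb))  # 65
print(gridfs_chunk_count(hundred_mb))  # 402
```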

Conclusion

Motor is the right MongoDB driver whenever your Python application uses asyncio. In this tutorial you performed async CRUD with insert_one, find, update_one, and delete_one, ran aggregation pipelines for analytics, created indexes for performance, and built a FastAPI product catalogue API that handles concurrent requests without blocking.

The catalogue API is ready to extend — add authentication middleware, implement pagination with skip() and limit(), add full-text search with a MongoDB text index, or add a change stream listener that notifies clients over WebSocket when products are updated. All of these patterns use the same async CRUD primitives you learned here.
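As a starting point for the pagination idea, here is a minimal sketch built on the cursor’s sort(), skip(), and limit() methods. The helper name fetch_page, the one-based page parameter, and sorting by sku are assumptions. FakeCollection and FakeCursor are in-memory stand-ins so the example runs without MongoDB; they ignore the query and projection and imitate only the chaining and async iteration used here.

```python
import asyncio


async def fetch_page(collection, query, page=1, page_size=20):
    """Return one page of documents, sorted by sku for a stable order."""
    skip = (page - 1) * page_size
    cursor = (collection.find(query, {'_id': 0})
              .sort('sku', 1)
              .skip(skip)
              .limit(page_size))
    return [doc async for doc in cursor]


# --- Stand-ins for demonstration only ---
class FakeCursor:
    def __init__(self, docs):
        self._docs = list(docs)

    def sort(self, key, direction):
        self._docs.sort(key=lambda d: d[key], reverse=direction < 0)
        return self

    def skip(self, n):
        self._docs = self._docs[n:]
        return self

    def limit(self, n):
        self._docs = self._docs[:n]
        return self

    def __aiter__(self):
        return self._generate()

    async def _generate(self):
        for doc in self._docs:
            yield doc


class FakeCollection:
    def __init__(self, docs):
        self._docs = docs

    def find(self, query, projection=None):
        return FakeCursor(self._docs)  # query and projection are ignored


catalogue = FakeCollection([{'sku': s} for s in ['E', 'B', 'A', 'D', 'C']])
page_two = asyncio.run(fetch_page(catalogue, {}, page=2, page_size=2))
print(page_two)
```

Sorting before skipping matters: without a fixed order, the same document can appear on two pages or on none.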

Official documentation: motor — Async Python driver for MongoDB.