Intermediate
You are building an async Python service with FastAPI or aiohttp and you need MongoDB as your database. The standard PyMongo driver is synchronous — every database call blocks the event loop, turning your async application into an accidentally single-threaded one. Motor is the official async MongoDB driver for Python: it wraps PyMongo with a non-blocking interface designed for asyncio, so your database queries and your API handlers can run concurrently without blocking each other.
Install motor with pip install motor. You will also need a running MongoDB instance — docker run -d -p 27017:27017 mongo:7 starts one locally with no authentication. For cloud MongoDB, use a free Atlas cluster at mongodb.net; the connection string works identically in motor. PyMongo is installed automatically as motor’s dependency.
This tutorial covers motor’s async CRUD operations, querying with filters and projections, aggregation pipelines, indexes, bulk writes, and change streams. You will finish by building an async product catalogue API with FastAPI and motor that handles concurrent reads without blocking.
Async Insert and Query: Quick Example
Here is the minimal motor workflow — connect, insert a document, and query it back, all with async/await:
# quick_motor.py
import asyncio
import motor.motor_asyncio
async def main():
client = motor.motor_asyncio.AsyncIOMotorClient('mongodb://localhost:27017')
db = client['mydb']
collection = db['users']
# Insert one document
result = await collection.insert_one({'name': 'Alice', 'age': 30, 'role': 'admin'})
print(f"Inserted ID: {result.inserted_id}")
# Find it back
user = await collection.find_one({'name': 'Alice'})
print(f"Found: {user['name']}, age {user['age']}, role {user['role']}")
client.close()
asyncio.run(main())
Output:
Inserted ID: 664f3a1b2c8d4e5f6a7b8c9d
Found: Alice, age 30, role admin
Motor’s API mirrors PyMongo almost exactly — the key difference is that every I/O operation returns a coroutine you must await. The client and collection objects are synchronous Python objects; only the database operations require await. This design means migrating from PyMongo to motor is usually straightforward: add async def and await to the functions that call the database.
Motor vs PyMongo vs Other Drivers
Choosing the right MongoDB driver depends on whether your application is async or sync:
| Driver | I/O Model | Best For | Install |
|---|---|---|---|
| motor | asyncio (non-blocking) | FastAPI, aiohttp, async services | pip install motor |
| pymongo | Synchronous (blocking) | Django, Flask, scripts, CLIs | pip install pymongo |
| beanie | asyncio + ODM (models) | When you want Pydantic-based models | pip install beanie |
| mongoengine | Synchronous + ODM | Django-like ORM on MongoDB | pip install mongoengine |
Use motor whenever your web framework is async. Using PyMongo in a FastAPI handler does not raise an error immediately, but it blocks the event loop during every database call, effectively serializing all requests through a single-threaded bottleneck. Motor keeps each query non-blocking, letting your async framework handle hundreds of concurrent connections efficiently.
CRUD Operations
Motor’s CRUD methods are direct async equivalents of PyMongo’s methods. Here is a complete reference implementation covering insert, find, update, and delete:
# motor_crud.py
import asyncio
import motor.motor_asyncio
async def crud_demo():
client = motor.motor_asyncio.AsyncIOMotorClient('mongodb://localhost:27017')
col = client['demo']['products']
# --- CREATE ---
# insert_one: single document
r = await col.insert_one({'name': 'Widget', 'price': 9.99, 'stock': 100})
widget_id = r.inserted_id
# insert_many: multiple documents at once
await col.insert_many([
{'name': 'Gadget', 'price': 24.99, 'stock': 50},
{'name': 'Doohickey', 'price': 4.49, 'stock': 200},
])
print("Inserted 3 products")
# --- READ ---
# find_one: returns first match or None
product = await col.find_one({'name': 'Widget'})
print(f"Widget price: ${product['price']}")
# find: returns an async cursor
async for p in col.find({'price': {'$lt': 10}}):
print(f" Under $10: {p['name']} @ ${p['price']}")
# find with projection (only return name and price)
async for p in col.find({}, {'_id': 0, 'name': 1, 'price': 1}):
print(f" {p['name']}: ${p['price']}")
# --- UPDATE ---
# update_one: modify first matching document
result = await col.update_one({'name': 'Widget'}, {'$set': {'price': 7.99}, '$inc': {'stock': -1}})
print(f"Modified: {result.modified_count} document(s)")
# update_many: modify all matching documents
result = await col.update_many({'stock': {'$gt': 100}}, {'$set': {'on_sale': True}})
print(f"Sale items updated: {result.modified_count}")
# --- DELETE ---
result = await col.delete_one({'name': 'Doohickey'})
print(f"Deleted: {result.deleted_count} document(s)")
# cleanup
await col.drop()
client.close()
asyncio.run(crud_demo())
Output:
Inserted 3 products
Widget price: $9.99
Under $10: Widget @ $9.99
Widget: $9.99
Gadget: $24.99
Doohickey: $4.49
Modified: 1 document(s)
Sale items updated: 1
Deleted: 1 document(s)

Aggregation Pipelines
MongoDB’s aggregation pipeline is motor’s killer feature for analytics — you can group, filter, sort, and reshape data entirely in the database, returning only the results your application needs:
# motor_aggregation.py
import asyncio
import motor.motor_asyncio
async def aggregation_demo():
client = motor.motor_asyncio.AsyncIOMotorClient('mongodb://localhost:27017')
col = client['store']['orders']
# Seed data
await col.drop()
await col.insert_many([
{'product': 'Widget', 'qty': 3, 'price': 9.99, 'region': 'North'},
{'product': 'Gadget', 'qty': 1, 'price': 24.99, 'region': 'South'},
{'product': 'Widget', 'qty': 2, 'price': 9.99, 'region': 'South'},
{'product': 'Gadget', 'qty': 4, 'price': 24.99, 'region': 'North'},
{'product': 'Widget', 'qty': 5, 'price': 9.99, 'region': 'North'},
])
# Pipeline: total revenue and units sold per product
pipeline = [
{'$group': {
'_id': '$product',
'total_revenue': {'$sum': {'$multiply': ['$qty', '$price']}},
'units_sold': {'$sum': '$qty'},
'order_count': {'$sum': 1},
}},
{'$sort': {'total_revenue': -1}},
{'$project': {
'product': '$_id',
'total_revenue': {'$round': ['$total_revenue', 2]},
'units_sold': 1,
'order_count': 1,
'_id': 0,
}},
]
print("Sales summary:")
async for doc in col.aggregate(pipeline):
print(f" {doc['product']}: ${doc['total_revenue']} ({doc['units_sold']} units, {doc['order_count']} orders)")
await col.drop()
client.close()
asyncio.run(aggregation_demo())
Output:
Sales summary:
Gadget: $124.95 (5 units, 2 orders)
Widget: $99.90 (10 units, 3 orders)
The aggregation cursor is async, so you iterate it with async for exactly like the regular find cursor. Pipelines run entirely inside MongoDB — no Python-side grouping or sorting needed. For large datasets this is dramatically faster than fetching all documents and processing them in Python.
Real-Life Example: Async Product Catalogue API
Let us build a FastAPI product catalogue with motor as the database backend. This shows how motor integrates with a real async web framework, including startup/shutdown lifecycle and concurrent query handling:
# catalogue_api.py
# Install: pip install fastapi uvicorn motor
# Run: uvicorn catalogue_api:app --reload
# Test: http://localhost:8000/docs
from fastapi import FastAPI, HTTPException
from contextlib import asynccontextmanager
import motor.motor_asyncio
from bson import ObjectId
MONGO_URL = "mongodb://localhost:27017"
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup: create client and ensure index
app.state.client = motor.motor_asyncio.AsyncIOMotorClient(MONGO_URL)
app.state.col = app.state.client["catalogue"]["products"]
await app.state.col.create_index("sku", unique=True)
yield
# Shutdown: close connection
app.state.client.close()
app = FastAPI(lifespan=lifespan)
@app.get("/products")
async def list_products(category: str = None, max_price: float = None):
query = {}
if category:
query["category"] = category
if max_price is not None:
query["price"] = {"$lte": max_price}
products = []
async for p in app.state.col.find(query, {"_id": 0}):
products.append(p)
return {"count": len(products), "products": products}
@app.post("/products", status_code=201)
async def create_product(product: dict):
if not all(k in product for k in ["sku", "name", "price"]):
raise HTTPException(400, "sku, name, and price are required")
result = await app.state.col.insert_one(product)
return {"id": str(result.inserted_id), "sku": product["sku"]}
@app.patch("/products/{sku}")
async def update_product(sku: str, updates: dict):
result = await app.state.col.update_one({"sku": sku}, {"$set": updates})
if result.matched_count == 0:
raise HTTPException(404, f"Product '{sku}' not found")
return {"updated": result.modified_count}
@app.delete("/products/{sku}")
async def delete_product(sku: str):
result = await app.state.col.delete_one({"sku": sku})
if result.deleted_count == 0:
raise HTTPException(404, f"Product '{sku}' not found")
return {"deleted": True}
The lifespan context manager is the modern FastAPI pattern for managing the motor client — it creates the connection on startup and closes it cleanly on shutdown. The unique index on sku prevents duplicate products at the database level. Because motor is async, all four endpoints can handle concurrent requests without one query blocking another.

Frequently Asked Questions
How do I handle MongoDB’s ObjectId in API responses?
MongoDB’s _id field is a bson.ObjectId object, not a plain string. JSON serializers do not know how to handle it and will raise a TypeError. The cleanest fix is to either exclude _id from your queries using a projection ({"_id": 0}), or convert it when building your response: doc["id"] = str(doc.pop("_id")). If you use Pydantic models with beanie or a custom model, configure the JSON encoder to handle ObjectId automatically.
When should I create indexes?
Create indexes for any field you filter or sort on frequently. Without an index, MongoDB scans every document in the collection for each query. Use await collection.create_index("field_name") for single-field indexes, or create_index([("a", 1), ("b", -1)]) for compound indexes. Call index creation during application startup (in the lifespan handler), not on every request. Check your index usage with collection.find(query).explain("executionStats").
How does motor handle connection pooling?
Motor uses PyMongo’s connection pool internally. The default pool size is 100 connections. One AsyncIOMotorClient instance is meant to be shared across your entire application — do not create a new client per request. The client is thread-safe and coroutine-safe. Set the pool size with AsyncIOMotorClient(url, maxPoolSize=50) to reduce memory usage on low-traffic services.
Does motor support multi-document transactions?
Yes, starting with MongoDB 4.0 (replica sets) and 4.2 (sharded clusters). Use async with await client.start_session() as session: async with session.start_transaction(): ... to wrap multiple operations in a transaction. All motor CRUD methods accept an optional session parameter. Transactions have a performance cost — only use them when you genuinely need atomicity across multiple collections or documents.
How do I store files in MongoDB with motor?
Use motor.motor_asyncio.AsyncIOMotorGridFSBucket for files larger than 16MB (MongoDB’s document size limit). For smaller files (images, PDFs under a few MB), storing them as base64-encoded strings in a document field is simpler. GridFS splits large files into 255KB chunks and stores them across two collections, making upload and download resumable. In most modern setups, object storage (S3, GCS) is a better fit for files than GridFS.
Conclusion
Motor is the right MongoDB driver whenever your Python application uses asyncio. In this tutorial you performed async CRUD with insert_one, find, update_one, and delete_one, ran aggregation pipelines for analytics, created indexes for performance, and built a FastAPI product catalogue API that handles concurrent requests without blocking.
The catalogue API is ready to extend — add authentication middleware, implement pagination with skip() and limit(), add full-text search with a MongoDB text index, or add a change stream listener that notifies clients over WebSocket when products are updated. All of these patterns use the same async CRUD primitives you learned here.
Official documentation: motor — Async Python driver for MongoDB.