How To Work with MongoDB in Python Using PyMongo

Quick Answer: MongoDB is a document-based NoSQL database that stores JSON-like data. Install PyMongo with pip install pymongo, then connect with client = MongoClient('mongodb://localhost:27017/'). Create databases and collections, perform CRUD operations with insert_one(), find(), update_one(), and delete_one(). Use aggregation pipelines for complex queries and GridFS for storing large files.

Understanding MongoDB and Document-Based Storage

MongoDB is a NoSQL database that stores data as flexible JSON-like documents instead of rigid table rows. This document-oriented approach allows you to store nested data structures without complex joins, making it ideal for applications with evolving schemas.

Key advantages of MongoDB:

  • Schema flexibility: Documents can have different structures
  • Nested data: Store complex hierarchical data naturally
  • Rich queries: Query and filter on any field
  • Horizontal scaling: Built-in sharding for distributing data
  • Indexing: Powerful indexing for fast queries
  • Aggregation: Complex data transformations in the database
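
To make the first two points concrete, here is a minimal sketch of two documents that could sit in the same collection. The field names and values are illustrative assumptions, not a required schema. The documents are plain Python dictionaries, which is also how PyMongo represents them.

```python
import json

# Two documents destined for the same (hypothetical) "posts" collection.
# They do not share the same set of fields, which MongoDB allows.
post_with_comments = {
    "title": "Getting Started with MongoDB",
    "author": {"name": "John Doe", "email": "john@example.com"},  # nested document
    "tags": ["mongodb", "nosql"],                                  # array field
    "comments": [
        {"author": "Jane Smith", "text": "Great article!"},
    ],
}
short_note = {
    "title": "Quick note",
    "body": "No author, tags, or comments here.",
}

# Nested values are reached with ordinary dictionary and list access.
first_commenter = post_with_comments["comments"][0]["author"]
print(first_commenter)

# Both documents are JSON-like, so they serialize without extra work.
print(json.dumps(short_note))
```

In a relational design the comments would usually live in a separate table and be joined at query time; here they travel with the post.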

Installing MongoDB and PyMongo

First, install the MongoDB server. On macOS with Homebrew:

brew tap mongodb/brew
brew install mongodb-community
brew services start mongodb-community

On Ubuntu/Debian, after adding MongoDB's official apt repository:

sudo apt-get update
sudo apt-get install -y mongodb-org
sudo systemctl start mongod

Install the PyMongo Python driver:

pip install pymongo

Verify MongoDB is running:

mongosh --eval "db.adminCommand('ping')"
# Output: { ok: 1 }

Connecting to MongoDB

Create a basic connection to MongoDB:

from pymongo import MongoClient

# Connect to local MongoDB
client = MongoClient('mongodb://localhost:27017/')

# Get database
db = client['blog_database']

# Get collection
posts = db['posts']

# Test connection
print(client.server_info())

For production with authentication and connection pooling:

from pymongo import MongoClient
from pymongo.errors import ConnectionFailure

# Connect with credentials
client = MongoClient(
    'mongodb://username:password@mongodb.example.com:27017/',
    maxPoolSize=50,
    minPoolSize=10,
    serverSelectionTimeoutMS=5000,
    connectTimeoutMS=5000
)

# Verify connection
try:
    client.admin.command('ping')
    print("Connected to MongoDB successfully")
except ConnectionFailure:
    print("Failed to connect to MongoDB")

Alternative connection methods:

from pymongo import MongoClient

# Connection string
uri = 'mongodb://user:pass@host1:27017,host2:27017,host3:27017/database?replicaSet=rs0'
client = MongoClient(uri)

# Access database and collection
db = client.get_database('mydb')
collection = db.get_collection('mycollection')
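
Connection strings like the ones above break if the username or password contains characters such as @, : or /. A minimal sketch of one way to handle this is to percent-escape the credentials with the standard library before building the URI. The helper name build_mongo_uri and the sample credentials are hypothetical.

```python
from urllib.parse import quote_plus


def build_mongo_uri(username, password, hosts, database="", replica_set=None):
    """Build a mongodb:// URI with percent-escaped credentials.

    `hosts` is a list of "host:port" strings. All names here are
    illustrative; adapt them to your own deployment.
    """
    credentials = f"{quote_plus(username)}:{quote_plus(password)}"
    uri = f"mongodb://{credentials}@{','.join(hosts)}/{database}"
    if replica_set:
        uri += f"?replicaSet={quote_plus(replica_set)}"
    return uri


uri = build_mongo_uri(
    "app_user", "p@ss:word/1", ["host1:27017", "host2:27017"],
    database="mydb", replica_set="rs0",
)
print(uri)
# The result can then be passed to MongoClient(uri).
```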

Creating and Inserting Documents

Insert documents into MongoDB collections:

from pymongo import MongoClient
import datetime

client = MongoClient('mongodb://localhost:27017/')
db = client['blog_database']
posts = db['posts']

# Insert single document
post = {
    'title': 'Getting Started with MongoDB',
    'author': 'John Doe',
    'content': 'MongoDB is a flexible NoSQL database...',
    'tags': ['mongodb', 'nosql', 'database'],
    'created_at': datetime.datetime.utcnow(),
    'views': 0,
    'published': True
}

result = posts.insert_one(post)
print(f"Inserted document ID: {result.inserted_id}")

# Insert multiple documents
documents = [
    {
        'title': 'Python Best Practices',
        'author': 'Jane Smith',
        'tags': ['python', 'best-practices'],
        'views': 150
    },
    {
        'title': 'Web Development with Flask',
        'author': 'Bob Johnson',
        'tags': ['python', 'flask', 'web'],
        'views': 200
    }
]

result = posts.insert_many(documents)
print(f"Inserted {len(result.inserted_ids)} documents")

# Insert with custom ID
post_custom = {
    '_id': 'post_001',
    'title': 'Custom ID Example',
    'author': 'Alice'
}
posts.insert_one(post_custom)
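
When the documents come from a large file or a generator, it can help to send them in fixed-size batches so that memory use stays bounded and progress can be reported. The sketch below is one possible approach; the chunked helper and the batch size of 1000 are assumptions for illustration, and the insert_many call is left as a comment so the example runs without a server.

```python
from itertools import islice


def chunked(iterable, size):
    """Yield successive lists of at most `size` items from `iterable`."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(iterable)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


# Stand-in data: 2,500 small documents generated lazily.
documents = ({"title": f"Post {i}", "views": 0} for i in range(2500))

batch_sizes = [len(batch) for batch in chunked(documents, 1000)]
print(batch_sizes)

# With a real collection, each batch would be sent like this:
# for batch in chunked(documents, 1000):
#     posts.insert_many(batch, ordered=False)
```

Passing ordered=False lets the server continue with the remaining documents in a batch if one of them fails, for example on a duplicate key.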

Reading Documents with Find Operations

Query documents from MongoDB:

import datetime

from pymongo import MongoClient
from bson.objectid import ObjectId

client = MongoClient('mongodb://localhost:27017/')
db = client['blog_database']
posts = db['posts']

# Find single document
post = posts.find_one({'author': 'John Doe'})
print(post)

# Find by ID
post_id = ObjectId('507f1f77bcf86cd799439011')
post = posts.find_one({'_id': post_id})

# Find all documents
all_posts = posts.find()
for post in all_posts:
    print(post['title'])

# Find with filters
python_posts = posts.find({'tags': {'$in': ['python']}})
python_posts = posts.find({'tags': 'python'})  # Simpler syntax

# Find with comparison operators
popular_posts = posts.find({'views': {'$gt': 100}})
recent_posts = posts.find({'created_at': {'$gte': datetime.datetime(2024, 1, 1)}})

# Multiple conditions
filtered = posts.find({
    'author': 'John Doe',
    'published': True
})

# Match any of several values with $in (an OR on a single field)
query = {
    'author': {'$in': ['John Doe', 'Jane Smith']}
}
posts_by_authors = posts.find(query)

# Find with projection (select specific fields)
titles_only = posts.find(
    {'published': True},
    {'title': 1, 'author': 1, '_id': 0}  # Include title and author, exclude ID
)

# Find with sorting
sorted_posts = posts.find().sort('views', -1).limit(5)  # Top 5 by views
recent = posts.find().sort('created_at', -1).limit(10)  # Latest 10

# Find with skip and limit (pagination)
page_size = 10
page_number = 2
skip = (page_number - 1) * page_size
posts_page = posts.find().skip(skip).limit(page_size)
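
In application code the filter is often assembled from optional search criteria. The following is a minimal sketch of that idea, reusing the field names from the examples above; the function name build_post_filter and its parameters are hypothetical. It only builds the dictionary, so it can be unit-tested without a database.

```python
import datetime


def build_post_filter(author=None, tag=None, min_views=None, since=None,
                      published_only=True):
    """Return a MongoDB filter dict built from optional search criteria."""
    query = {}
    if published_only:
        query["published"] = True
    if author is not None:
        query["author"] = author
    if tag is not None:
        query["tags"] = tag                    # matches any element of the array
    if min_views is not None:
        query["views"] = {"$gte": min_views}
    if since is not None:
        query["created_at"] = {"$gte": since}
    return query


query = build_post_filter(
    tag="python",
    min_views=100,
    since=datetime.datetime(2024, 1, 1),
)
print(query)
# With a real collection: posts.find(query).sort("views", -1).limit(5)
```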

Updating Documents

Modify existing documents in MongoDB:

import datetime

from pymongo import MongoClient
from bson.objectid import ObjectId

client = MongoClient('mongodb://localhost:27017/')
db = client['blog_database']
posts = db['posts']

# Update single document
result = posts.update_one(
    {'_id': ObjectId('507f1f77bcf86cd799439011')},
    {'$set': {'views': 500}}
)
print(f"Matched: {result.matched_count}, Modified: {result.modified_count}")

# Update with multiple fields
posts.update_one(
    {'author': 'John Doe'},
    {
        '$set': {
            'title': 'Updated Title',
            'views': 999,
            'updated_at': datetime.datetime.utcnow()
        }
    }
)

# Increment views
posts.update_one(
    {'_id': ObjectId('507f1f77bcf86cd799439011')},
    {'$inc': {'views': 1}}
)

# Push item to array
posts.update_one(
    {'_id': ObjectId('507f1f77bcf86cd799439011')},
    {'$push': {'tags': 'new-tag'}}
)

# Update multiple documents
result = posts.update_many(
    {'author': 'John Doe'},
    {'$set': {'verified': True}}
)
print(f"Modified {result.modified_count} documents")

# Replace entire document
new_post = {
    'title': 'Completely New Post',
    'author': 'Anonymous',
    'content': 'New content...'
}
posts.replace_one(
    {'_id': ObjectId('507f1f77bcf86cd799439011')},
    new_post
)

# Upsert: update or insert if not found
posts.update_one(
    {'title': 'MongoDB Guide'},
    {'$set': {'author': 'Expert', 'views': 1000}},
    upsert=True  # Insert if not found
)
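
An upsert often needs different behavior on insert than on update. The sketch below shows one way to express that with the $setOnInsert and $currentDate update operators alongside $inc. The helper name build_view_upsert and the slug field are assumptions for illustration; the function only builds the filter and update documents.

```python
def build_view_upsert(slug, title):
    """Return (filter, update) for counting a view, creating the post if needed.

    "$setOnInsert" only applies when the upsert inserts a new document, so
    an existing title is left untouched while "views" is always incremented.
    """
    filter_doc = {"slug": slug}
    update_doc = {
        "$inc": {"views": 1},
        "$setOnInsert": {"title": title, "published": False},
        "$currentDate": {"updated_at": True},   # server-side timestamp
    }
    return filter_doc, update_doc


filter_doc, update_doc = build_view_upsert("mongodb-guide", "MongoDB Guide")
print(filter_doc, update_doc)
# With a real collection:
# posts.update_one(filter_doc, update_doc, upsert=True)
```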

Deleting Documents

Remove documents from MongoDB:

from pymongo import MongoClient
from bson.objectid import ObjectId

client = MongoClient('mongodb://localhost:27017/')
db = client['blog_database']
posts = db['posts']

# Delete single document
result = posts.delete_one({'author': 'Anonymous'})
print(f"Deleted {result.deleted_count} document")

# Delete multiple documents
result = posts.delete_many({'views': {'$lt': 10}})
print(f"Deleted {result.deleted_count} low-view posts")

# Delete all documents
posts.delete_many({})

# Delete by ID
posts.delete_one({'_id': ObjectId('507f1f77bcf86cd799439011')})
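
Because delete_many({}) removes every document, a filter that is built dynamically and happens to come out empty can wipe a collection. A small guard, sketched below under the assumption that an empty filter is never intended, makes that failure loud. The helper name require_nonempty_filter is hypothetical.

```python
def require_nonempty_filter(filter_doc):
    """Return `filter_doc` unchanged, or raise if it would match everything."""
    if not isinstance(filter_doc, dict):
        raise TypeError("filter must be a dict")
    if not filter_doc:
        raise ValueError("refusing to run delete_many with an empty filter")
    return filter_doc


safe = require_nonempty_filter({"views": {"$lt": 10}})
print(safe)

try:
    require_nonempty_filter({})
except ValueError as exc:
    print(f"Blocked: {exc}")

# With a real collection:
# posts.delete_many(require_nonempty_filter(user_supplied_filter))
```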

Indexing for Performance

Create indexes to speed up queries:

from pymongo import MongoClient, ASCENDING, DESCENDING

client = MongoClient('mongodb://localhost:27017/')
db = client['blog_database']
posts = db['posts']

# Create single field index
posts.create_index('author')
posts.create_index([('views', DESCENDING)])

# Create compound index
posts.create_index([
    ('author', ASCENDING),
    ('created_at', DESCENDING)
])

# Create unique index
posts.create_index('slug', unique=True)

# Create text search index
posts.create_index([('title', 'text'), ('content', 'text')])

# Text search using index
results = posts.find({'$text': {'$search': 'mongodb'}})

# List all indexes
indexes = posts.list_indexes()
for index in indexes:
    print(index['key'])

# Drop index
posts.drop_index('author_1')
posts.drop_index([('author', 1), ('created_at', -1)])

# Get index statistics
stats = db.command('collStats', 'posts')
print(f"Index size: {stats['totalIndexSize']}")
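
The name 'author_1' passed to drop_index() above is not arbitrary: when no name is given, an index is named by joining each field with its direction or type. The sketch below reproduces that convention in plain Python so the names are easy to predict. The function default_index_name is a hypothetical helper, not part of PyMongo's public API.

```python
def default_index_name(keys):
    """Reproduce the default name given to an index with these keys.

    `keys` is a list of (field, direction_or_type) pairs, in the same form
    accepted by create_index().
    """
    return "_".join(f"{field}_{kind}" for field, kind in keys)


print(default_index_name([("author", 1)]))
print(default_index_name([("author", 1), ("created_at", -1)]))
print(default_index_name([("title", "text"), ("content", "text")]))
```

If you prefer not to depend on the convention, create_index() also accepts an explicit name argument.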

Aggregation Pipeline for Complex Queries

Perform complex data transformations using aggregation:

from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017/')
db = client['blog_database']
posts = db['posts']

# Basic aggregation: Group by author and count posts
pipeline = [
    {'$group': {'_id': '$author', 'count': {'$sum': 1}}}
]
result = posts.aggregate(pipeline)
for doc in result:
    print(f"{doc['_id']}: {doc['count']} posts")

# Match and group
pipeline = [
    {'$match': {'published': True}},
    {'$group': {'_id': '$author', 'total_views': {'$sum': '$views'}}}
]

# Stage 1: Filter published posts
# Stage 2: Group by author
# Stage 3: Sort by views descending
# Stage 4: Limit to top 5
pipeline = [
    {'$match': {'published': True}},
    {'$group': {
        '_id': '$author',
        'total_views': {'$sum': '$views'},
        'post_count': {'$sum': 1}
    }},
    {'$sort': {'total_views': -1}},
    {'$limit': 5}
]
top_authors = posts.aggregate(pipeline)

# Project selected fields
pipeline = [
    {'$match': {'views': {'$gte': 100}}},
    {'$project': {
        'title': 1,
        'author': 1,
        'views': 1,
        '_id': 0
    }}
]

# Unwind array field
pipeline = [
    {'$unwind': '$tags'},
    {'$group': {'_id': '$tags', 'count': {'$sum': 1}}},
    {'$sort': {'count': -1}}
]
tag_stats = posts.aggregate(pipeline)

# Lookup (join with another collection)
users_collection = db['users']
pipeline = [
    {'$lookup': {
        'from': 'users',
        'localField': 'author',
        'foreignField': 'name',
        'as': 'author_info'
    }},
    {'$unwind': '$author_info'},
    {'$project': {
        'title': 1,
        'author': 1,
        'author_email': '$author_info.email'
    }}
]

# Faceted aggregation (multiple aggregations in one)
pipeline = [
    {'$facet': {
        'by_author': [
            {'$group': {'_id': '$author', 'count': {'$sum': 1}}}
        ],
        'by_tag': [
            {'$unwind': '$tags'},
            {'$group': {'_id': '$tags', 'count': {'$sum': 1}}}
        ],
        'stats': [
            {'$group': {
                '_id': None,
                'total_posts': {'$sum': 1},
                'avg_views': {'$avg': '$views'}
            }}
        ]
    }}
]
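
To see what the unwind-and-group pipeline computes, it can help to walk through the same steps in plain Python on a few stand-in documents. This is only an illustration of the semantics, not how MongoDB executes the pipeline, and the sample data is made up.

```python
from collections import Counter

# Stand-in documents; in practice these would come from the collection.
sample_posts = [
    {"title": "A", "tags": ["python", "flask"]},
    {"title": "B", "tags": ["python", "mongodb"]},
    {"title": "C", "tags": ["mongodb"]},
    {"title": "D"},                     # no "tags" field: $unwind drops it
]

# $unwind: emit one copy of the document per array element.
unwound = [
    {**post, "tags": tag}
    for post in sample_posts
    for tag in post.get("tags", [])
]

# $group with {"$sum": 1}: count the unwound rows per tag.
counts = Counter(row["tags"] for row in unwound)

# $sort on count, descending (ties broken by tag name for stable output).
tag_stats = sorted(
    ({"_id": tag, "count": n} for tag, n in counts.items()),
    key=lambda row: (-row["count"], row["_id"]),
)
print(tag_stats)
```

The real pipeline does this work inside the database, so none of the intermediate rows are transferred to the client.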

GridFS for Large File Storage

Store files larger than 16MB in MongoDB using GridFS:

from pymongo import MongoClient
from gridfs import GridFS

client = MongoClient('mongodb://localhost:27017/')
db = client['blog_database']
fs = GridFS(db)

# Store file
with open('document.pdf', 'rb') as f:
    file_id = fs.put(f, filename='document.pdf', content_type='application/pdf')

print(f"File stored with ID: {file_id}")

# Retrieve file
with open('downloaded_document.pdf', 'wb') as f:
    f.write(fs.get(file_id).read())

# List files matching a filename
for grid_out in fs.find({'filename': 'document.pdf'}):
    print(f"File: {grid_out.filename}, Size: {grid_out.length}")

# Delete file
fs.delete(file_id)

# Store with metadata
with open('image.jpg', 'rb') as f:
    file_id = fs.put(
        f,
        filename='profile.jpg',
        content_type='image/jpeg',
        user_id='user_123',
        uploaded_by='John Doe'
    )

# Retrieve with metadata
grid_out = fs.get(file_id)
print(f"Uploaded by: {grid_out.uploaded_by}")
print(f"User ID: {grid_out.user_id}")

Troubleshooting Common MongoDB Issues

Issue | Cause | Solution
Connection refused | MongoDB server not running | Start MongoDB: brew services start mongodb-community or systemctl start mongod
Slow queries | Missing indexes on frequently queried fields | Create indexes: collection.create_index('field_name'). Check query plans with explain()
Duplicate key error | Unique index constraint violation | Ensure unique values or remove unique index constraint
Out of memory errors | Aggregation pipeline processing too much data | Add $match stage early, limit results with $limit, use allowDiskUse=True
Document too large | Document exceeds 16MB size limit | Use GridFS for large documents or split data across documents
Authentication failed | Wrong credentials or database | Verify username, password, and database name in connection string

Real-Life Example: Blog Content Management System

Here’s a complete blog CMS using MongoDB and PyMongo:

from pymongo import MongoClient, ASCENDING, DESCENDING
from bson.objectid import ObjectId
from datetime import datetime, timedelta
import json

class BlogCMS:
    def __init__(self):
        self.client = MongoClient('mongodb://localhost:27017/')
        self.db = self.client['blog_cms']
        self.posts = self.db['posts']
        self.comments = self.db['comments']
        self.users = self.db['users']
        self._create_indexes()

    def _create_indexes(self):
        """Create necessary indexes"""
        self.posts.create_index('slug', unique=True)
        self.posts.create_index([('author', ASCENDING), ('created_at', DESCENDING)])
        self.posts.create_index([('title', 'text'), ('content', 'text')])
        self.comments.create_index('post_id')

    def create_post(self, title, content, author, tags, excerpt=''):
        """Create new blog post"""
        slug = title.lower().replace(' ', '-')
        post = {
            'title': title,
            'content': content,
            'excerpt': excerpt,
            'author': author,
            'tags': tags,
            'slug': slug,
            'created_at': datetime.utcnow(),
            'updated_at': datetime.utcnow(),
            'published': False,
            'views': 0,
            'comments_count': 0
        }
        result = self.posts.insert_one(post)
        return result.inserted_id

    def publish_post(self, post_id):
        """Publish a draft post"""
        self.posts.update_one(
            {'_id': ObjectId(post_id)},
            {'$set': {
                'published': True,
                'published_at': datetime.utcnow()
            }}
        )

    def get_published_posts(self, page=1, per_page=10):
        """Get published posts with pagination"""
        skip = (page - 1) * per_page
        posts = self.posts.find(
            {'published': True},
            sort=[('created_at', -1)]
        ).skip(skip).limit(per_page)
        return list(posts)

    def search_posts(self, query):
        """Full-text search in posts"""
        results = self.posts.find(
            {'$text': {'$search': query}},
            {'score': {'$meta': 'textScore'}}
        ).sort([('score', {'$meta': 'textScore'})])
        return list(results)

    def get_post_by_slug(self, slug):
        """Get post by slug and increment views"""
        self.posts.update_one(
            {'slug': slug},
            {'$inc': {'views': 1}}
        )
        return self.posts.find_one({'slug': slug})

    def add_comment(self, post_id, author, content):
        """Add comment to post"""
        comment = {
            'post_id': ObjectId(post_id),
            'author': author,
            'content': content,
            'created_at': datetime.utcnow(),
            'approved': False
        }
        result = self.comments.insert_one(comment)

        # Update comment count
        self.posts.update_one(
            {'_id': ObjectId(post_id)},
            {'$inc': {'comments_count': 1}}
        )
        return result.inserted_id

    def get_post_comments(self, post_id, approved_only=True):
        """Get comments for post"""
        query = {'post_id': ObjectId(post_id)}
        if approved_only:
            query['approved'] = True

        return list(self.comments.find(query).sort('created_at', -1))

    def get_trending_posts(self, days=7):
        """Get trending posts from last N days"""
        since = datetime.utcnow() - timedelta(days=days)
        return list(self.posts.find(
            {'created_at': {'$gte': since}, 'published': True}
        ).sort('views', -1).limit(10))

    def get_author_stats(self, author):
        """Get statistics for an author"""
        pipeline = [
            {'$match': {'author': author, 'published': True}},
            {'$group': {
                '_id': author,
                'total_posts': {'$sum': 1},
                'total_views': {'$sum': '$views'},
                'avg_views': {'$avg': '$views'}
            }}
        ]
        return list(self.posts.aggregate(pipeline))

    def delete_post(self, post_id):
        """Delete post and its comments"""
        # Delete comments
        self.comments.delete_many({'post_id': ObjectId(post_id)})
        # Delete post
        self.posts.delete_one({'_id': ObjectId(post_id)})

# Usage
cms = BlogCMS()

# Create post
post_id = cms.create_post(
    title='MongoDB Best Practices',
    content='MongoDB is a flexible...',
    excerpt='Learn MongoDB best practices',
    author='John Doe',
    tags=['mongodb', 'database', 'tutorial']
)

# Publish post
cms.publish_post(post_id)

# Get published posts
posts = cms.get_published_posts(page=1, per_page=10)

# Search
results = cms.search_posts('mongodb python')

# Get post by slug
post = cms.get_post_by_slug('mongodb-best-practices')

# Add comment
cms.add_comment(post_id, 'Jane Smith', 'Great article!')

# Get comments
comments = cms.get_post_comments(post_id)

# Get author stats
stats = cms.get_author_stats('John Doe')
print(stats)

This CMS demonstrates:

  • CRUD operations on multiple collections
  • Unique constraints with indexes
  • Full-text search capability
  • Aggregation for statistics
  • Pagination for large result sets
  • Relationship management between collections
  • Automatic counter updates
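
One detail worth tightening in a CMS like this is slug generation: replacing spaces alone leaves punctuation in the slug, and because slug carries a unique index, odd titles can produce awkward or colliding values. A minimal sketch of a stricter version follows, assuming ASCII-only slugs are acceptable (non-ASCII letters are dropped). The function name slugify is hypothetical.

```python
import re


def slugify(title):
    """Lowercase `title` and join its alphanumeric runs with hyphens."""
    slug = re.sub(r"[^a-z0-9]+", "-", title.lower())
    return slug.strip("-")


print(slugify("MongoDB Best Practices"))
print(slugify("What's New in 7.0?"))
print(slugify("  Spaces   everywhere  "))
```

Two different titles can still map to the same slug, so the insert should be prepared for a duplicate key error from the unique index.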

MongoDB Best Practices

Follow these guidelines for optimal MongoDB usage:

  • Design documents carefully: Plan your data structure before implementation
  • Use appropriate indexes: Index frequently queried fields
  • Avoid excessive nesting: Keep document depth reasonable
  • Use ObjectId for relationships: Reference documents with IDs
  • Implement validation: Use schema validation in MongoDB 3.6+
  • Monitor query performance: Use explain() to analyze queries
  • Configure backup: Enable oplog and regular snapshots
  • Use connection pooling: Reuse connections across requests

FAQ

Q: Should I use MongoDB or a relational database?

A: Use MongoDB for flexible schemas and hierarchical data. Use relational databases for structured data with complex relationships. Many applications use both.

Q: Does MongoDB support transactions?

A: Yes. Single-document operations are always atomic. Multi-document ACID transactions are supported on replica sets since MongoDB 4.0 and on sharded clusters since MongoDB 4.2.

Q: How do I backup MongoDB?

A: Use mongodump to export data and mongorestore to restore. Enable oplog for continuous backups, or use MongoDB Atlas automated backups.

Q: Can MongoDB handle joins like SQL databases?

A: MongoDB uses the $lookup aggregation stage for joins, or you can denormalize data by embedding related documents.

Q: What is the 16MB document size limit?

A: MongoDB documents cannot exceed 16MB. Use GridFS for larger data or split into multiple documents with references.

Aggregation Pipeline

MongoDB’s aggregation framework is its answer to SQL GROUP BY + JOIN + analytics. You build a pipeline of stages — each transforms the document stream. The pymongo API maps directly to MongoDB’s pipeline syntax:

from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")
db = client.shop
orders = db.orders

# Total revenue per customer in the last 30 days
from datetime import datetime, timedelta
since = datetime.utcnow() - timedelta(days=30)

pipeline = [
    {"$match": {"created_at": {"$gte": since}, "status": "paid"}},
    {"$group": {
        "_id": "$customer_id",
        "total": {"$sum": "$amount"},
        "order_count": {"$sum": 1},
    }},
    {"$sort": {"total": -1}},
    {"$limit": 10},
]

for row in orders.aggregate(pipeline):
    print(row["_id"], row["total"], row["order_count"])

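The pipeline runs entirely server-side, so only the final aggregated rows come over the wire. For analytics over millions of documents, this is the right tool. To verify that your $match stage hits an index, inspect the query plan. The cursor returned by aggregate() has no explain() method in PyMongo, so one option is to run the aggregate command with explain=True through db.command().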
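
Since a pipeline is just a list of dictionaries, it can be built by a function and checked in a unit test before it ever reaches the database. The sketch below parameterizes the pipeline above; the function name top_customers_pipeline and its arguments are hypothetical, and a fixed "now" is injected so the result is deterministic.

```python
from datetime import datetime, timedelta, timezone


def top_customers_pipeline(days=30, limit=10, now=None):
    """Build the revenue-per-customer pipeline for the last `days` days."""
    now = now or datetime.now(timezone.utc)
    since = now - timedelta(days=days)
    return [
        {"$match": {"created_at": {"$gte": since}, "status": "paid"}},
        {"$group": {
            "_id": "$customer_id",
            "total": {"$sum": "$amount"},
            "order_count": {"$sum": 1},
        }},
        {"$sort": {"total": -1}},
        {"$limit": limit},
    ]


fixed_now = datetime(2024, 6, 30, tzinfo=timezone.utc)
pipeline = top_customers_pipeline(days=7, limit=5, now=fixed_now)
print([next(iter(stage)) for stage in pipeline])
# With a real collection: orders.aggregate(pipeline)
```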

Indexing for Performance

MongoDB queries without indexes scan every document — fine at 1,000 docs, fatal at 1 million. Create indexes on every field you filter, sort, or group by:

orders.create_index("customer_id")
orders.create_index([("status", 1), ("created_at", -1)])  # compound index
orders.create_index("order_number", unique=True)
orders.create_index([("description", "text")])  # full-text search

# Inspect what queries are doing
explain = orders.find({"customer_id": "abc"}).explain()
print(explain["executionStats"]["totalDocsExamined"])

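A query that scans every document reports totalDocsExamined equal to the collection size. With a suitable index, totalDocsExamined should drop to roughly the number of documents returned (nReturned), and totalKeysExamined shows how many index entries were scanned instead.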
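
The same check can be automated by looking for a COLLSCAN stage in the winning plan. The sketch below walks a plan shaped like the queryPlanner.winningPlan section of explain() output. The two sample dictionaries are hand-written and heavily trimmed, not real server output, and the exact layout varies by server version and query engine, so treat the key names as assumptions to verify against your own explain() results.

```python
def plan_stages(plan):
    """Yield every stage name in a winningPlan-style nested dict."""
    if not isinstance(plan, dict):
        return
    if "stage" in plan:
        yield plan["stage"]
    if "inputStage" in plan:
        yield from plan_stages(plan["inputStage"])
    for child in plan.get("inputStages", []):
        yield from plan_stages(child)


def uses_collection_scan(explain_output):
    """Return True if the winning plan contains a COLLSCAN stage."""
    winning = explain_output.get("queryPlanner", {}).get("winningPlan", {})
    return "COLLSCAN" in plan_stages(winning)


# Hand-written, heavily trimmed dicts shaped like explain() output.
indexed = {"queryPlanner": {"winningPlan": {
    "stage": "FETCH",
    "inputStage": {"stage": "IXSCAN", "indexName": "customer_id_1"},
}}}
unindexed = {"queryPlanner": {"winningPlan": {"stage": "COLLSCAN"}}}

print(uses_collection_scan(indexed), uses_collection_scan(unindexed))
```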

Async MongoDB with Motor

For async applications (FastAPI, asyncio web crawlers), use Motor — same API as pymongo but coroutine-based:

# pip install motor

from motor.motor_asyncio import AsyncIOMotorClient
import asyncio

async def main():
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    db = client.shop
    await db.orders.insert_one({"customer": "alice", "amount": 99})
    docs = await db.orders.find({"customer": "alice"}).to_list(length=100)
    print(docs)

asyncio.run(main())

Common Pitfalls

  • Forgetting to close clients. MongoClient holds a connection pool. Create one at app startup, reuse it, close on shutdown — never per-request.
  • Treating ObjectId as a string. _id is an ObjectId, not a string. JSON-serialize with json.dumps(doc, default=str) or use bson’s json_util.
  • Letting documents grow unbounded. Embedded arrays that grow forever (audit logs, comments) blow past the 16MB document limit. Move them into their own collection.
  • Skipping schema validation. MongoDB is schema-less — which means YOU enforce the schema. Use $jsonSchema at the collection level or validate in Python with pydantic before insert.
  • Heavy reads on the primary. Configure read preference to secondary for analytics queries; spare the primary for writes.
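
The ObjectId pitfall usually shows up as a TypeError from json.dumps. A minimal sketch of the default= approach mentioned above follows. So that it runs without PyMongo installed, a uuid.UUID stands in for the ObjectId; that substitution is an assumption of the example, and the function name json_default is hypothetical.

```python
import json
import uuid
from datetime import datetime, timezone


def json_default(value):
    """Fallback for json.dumps: ISO 8601 for datetimes, str() for the rest."""
    if isinstance(value, datetime):
        return value.isoformat()
    return str(value)


doc = {
    # uuid.UUID stands in for bson.ObjectId so the sketch runs without PyMongo;
    # str() on a real ObjectId gives its 24-character hex form.
    "_id": uuid.UUID("12345678-1234-5678-1234-567812345678"),
    "title": "Custom ID Example",
    "created_at": datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc),
}

encoded = json.dumps(doc, default=json_default)
print(encoded)
```

This conversion is one-way: the JSON no longer records that _id was an ObjectId. When the data must round-trip back into MongoDB, bson's json_util is the safer choice.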

FAQ

Q: When should I use MongoDB instead of Postgres?
A: When your data is genuinely document-shaped — nested, variable per record, evolving schema. For relational data with joins, Postgres wins on both performance and developer experience.

Q: How do I handle transactions?
A: MongoDB 4.0+ supports multi-document transactions via client.start_session() + session.with_transaction(). But the philosophy is to model your data so transactions are rarely needed.

Q: pymongo or Motor?
A: pymongo for sync code (Django, Flask, scripts). Motor for async (FastAPI, asyncio). Don’t mix — pick one per service.

Q: How do I migrate schema in a schema-less database?
A: Two strategies. (1) Lazy migration: write code that handles both old and new shapes, update docs as they’re read. (2) Batch migration: a one-off script that walks the collection and rewrites each doc. Lazy scales better.

Q: Should I use MongoDB Atlas or self-host?
A: Atlas for almost everyone. Self-hosting MongoDB correctly (replica sets, backups, monitoring, security) is full-time work for a DBA. Atlas’s free tier is generous and the paid tiers are competitive.
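
The lazy migration strategy from the schema question above can be sketched as a pure function that upgrades a document to the newest shape when it is read. The schema_version field, the version numbers, and the change from a string author to a nested author document are all hypothetical choices made for this example.

```python
CURRENT_SCHEMA_VERSION = 2


def upgrade_post(doc):
    """Return `doc` in the newest shape, leaving the input untouched.

    Hypothetical change: version 1 stored the author as a plain string,
    version 2 stores {"name": ...} so more author fields can be added.
    """
    upgraded = dict(doc)
    if upgraded.get("schema_version", 1) < 2:
        upgraded["author"] = {"name": upgraded.get("author", "")}
        upgraded["schema_version"] = 2
    return upgraded


old_doc = {"_id": "post_001", "title": "Custom ID Example", "author": "Alice"}
new_doc = upgrade_post(old_doc)
print(new_doc)

# On read, with a real collection, the upgraded shape could be written back:
# if new_doc != old_doc:
#     posts.replace_one({"_id": old_doc["_id"]}, new_doc)
```

Running the function on an already upgraded document returns it unchanged, which makes it safe to call on every read.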

Wrapping Up

MongoDB shines when documents are the natural shape of your data, when you need horizontal scaling, or when you want a quick start with flexible schema. The pymongo driver maps cleanly onto MongoDB’s idioms — once you know find, update_one, aggregate, and indexing, you’ve covered 80% of daily work. For async services, switch to Motor with no API relearning. The remaining 20% — replica sets, sharding, time-series collections — wait until you actually need them.