How To Work with MongoDB in Python Using PyMongo
Install the driver with pip install pymongo, then connect with client = MongoClient('mongodb://localhost:27017/'). Databases and collections are created on first use; perform CRUD operations with insert_one(), find(), update_one(), and delete_one(); use aggregation pipelines for complex queries and GridFS for storing large files.
Understanding MongoDB and Document-Based Storage
MongoDB is a NoSQL database that stores data as flexible JSON-like documents instead of rigid table rows. This document-oriented approach allows you to store nested data structures without complex joins, making it ideal for applications with evolving schemas.
Key advantages of MongoDB:
- Schema flexibility: Documents can have different structures
- Nested data: Store complex hierarchical data naturally
- Rich queries: Query and filter on any field
- Horizontal scaling: Built-in sharding for distributing data
- Indexing: Powerful indexing for fast queries
- Aggregation: Complex data transformations in the database
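On the PyMongo side, a document is just a Python dict. The sketch below (field names are illustrative, not required by MongoDB) shows how nested sub-documents and arrays capture a one-to-many relationship without a join:

```python
# A sample document illustrating nested, schema-flexible storage.
order = {
    'order_id': 1001,
    'customer': {                         # nested sub-document
        'name': 'Ada Lovelace',
        'email': 'ada@example.com',
    },
    'items': [                            # array of sub-documents
        {'sku': 'BOOK-1', 'qty': 2, 'price': 15.00},
        {'sku': 'PEN-3', 'qty': 1, 'price': 2.50},
    ],
}
# Items can differ in structure; no migration is needed to add fields.
total = sum(item['qty'] * item['price'] for item in order['items'])
print(total)  # 32.5
```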
Installing MongoDB and PyMongo
First, install the MongoDB server. On macOS with Homebrew:
brew install mongodb-community
brew services start mongodb-community
On Ubuntu/Debian, add MongoDB's official package repository first (the distribution's own mongodb package is outdated or removed on recent releases), then:
sudo apt-get install -y mongodb-org
sudo systemctl start mongod
Install the PyMongo Python driver:
pip install pymongo
Verify MongoDB is running:
mongosh --eval "db.adminCommand('ping')"
# Output: { ok: 1 }
Connecting to MongoDB
Create a basic connection to MongoDB:
from pymongo import MongoClient
# Connect to local MongoDB
client = MongoClient('mongodb://localhost:27017/')
# Get database
db = client['blog_database']
# Get collection
posts = db['posts']
# Test connection
print(client.server_info())
For production with authentication and connection pooling:
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure
# Connect with credentials
client = MongoClient(
'mongodb://username:password@mongodb.example.com:27017/',
maxPoolSize=50,
minPoolSize=10,
serverSelectionTimeoutMS=5000,
connectTimeoutMS=5000
)
# Verify connection
try:
    client.admin.command('ping')
    print("Connected to MongoDB successfully")
except ConnectionFailure:
    print("Failed to connect to MongoDB")
Alternative connection methods:
from pymongo import MongoClient
# Connection string
uri = 'mongodb://user:pass@host1:27017,host2:27017,host3:27017/database?replicaSet=rs0'
client = MongoClient(uri)
# Access database and collection
db = client.get_database('mydb')
collection = db.get_collection('mycollection')
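Credentials containing special characters must be percent-escaped before they go into a connection string; PyMongo's documentation recommends urllib.parse.quote_plus for this. A small helper (the function name is my own) might look like:

```python
from urllib.parse import quote_plus

def build_uri(user, password, host, port=27017):
    # Percent-escape credentials so characters like '@' or '/' in a
    # password cannot corrupt the connection string.
    return 'mongodb://%s:%s@%s:%d/' % (
        quote_plus(user), quote_plus(password), host, port
    )

print(build_uri('admin', 'p@ss/word', 'db.example.com'))
# mongodb://admin:p%40ss%2Fword@db.example.com:27017/
```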
Creating and Inserting Documents
Insert documents into MongoDB collections:
from pymongo import MongoClient
import datetime
client = MongoClient('mongodb://localhost:27017/')
db = client['blog_database']
posts = db['posts']
# Insert single document
post = {
'title': 'Getting Started with MongoDB',
'author': 'John Doe',
'content': 'MongoDB is a flexible NoSQL database...',
'tags': ['mongodb', 'nosql', 'database'],
'created_at': datetime.datetime.utcnow(),
'views': 0,
'published': True
}
result = posts.insert_one(post)
print(f"Inserted document ID: {result.inserted_id}")
# Insert multiple documents
documents = [
{
'title': 'Python Best Practices',
'author': 'Jane Smith',
'tags': ['python', 'best-practices'],
'views': 150
},
{
'title': 'Web Development with Flask',
'author': 'Bob Johnson',
'tags': ['python', 'flask', 'web'],
'views': 200
}
]
result = posts.insert_many(documents)
print(f"Inserted {len(result.inserted_ids)} documents")
# Insert with custom ID
post_custom = {
'_id': 'post_001',
'title': 'Custom ID Example',
'author': 'Alice'
}
posts.insert_one(post_custom)
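One way to keep inserts consistent is to fill in defaults before calling insert_one(). The hypothetical stamp() helper below adds created_at and views only when the caller hasn't already set them:

```python
import datetime

def stamp(doc):
    """Return a copy of doc with created_at and views defaults filled in,
    leaving any values the caller already set untouched."""
    out = dict(doc)
    out.setdefault('created_at', datetime.datetime.now(datetime.timezone.utc))
    out.setdefault('views', 0)
    return out
```

Usage would be posts.insert_one(stamp({'title': 'Hello'})).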
Reading Documents with Find Operations
Query documents from MongoDB:
from pymongo import MongoClient
from bson.objectid import ObjectId
import datetime
client = MongoClient('mongodb://localhost:27017/')
db = client['blog_database']
posts = db['posts']
# Find single document
post = posts.find_one({'author': 'John Doe'})
print(post)
# Find by ID
post_id = ObjectId('507f1f77bcf86cd799439011')
post = posts.find_one({'_id': post_id})
# Find all documents
all_posts = posts.find()
for post in all_posts:
    print(post['title'])
# Find with filters: matching against an array field checks its elements
python_posts = posts.find({'tags': 'python'})  # documents whose tags contain 'python'
python_posts = posts.find({'tags': {'$in': ['python', 'flask']}})  # any of several tags
# Find with comparison operators
popular_posts = posts.find({'views': {'$gt': 100}})
recent_posts = posts.find({'created_at': {'$gte': datetime.datetime(2024, 1, 1)}})
# Multiple conditions
filtered = posts.find({
'author': 'John Doe',
'published': True
})
# Match any of several values with $in
query = {'author': {'$in': ['John Doe', 'Jane Smith']}}
posts_by_authors = posts.find(query)
# Combine alternative conditions with $or
either = posts.find({'$or': [{'published': True}, {'views': {'$gt': 100}}]})
# Find with projection (select specific fields)
titles_only = posts.find(
{'published': True},
{'title': 1, 'author': 1, '_id': 0} # Include title and author, exclude ID
)
# Find with sorting
sorted_posts = posts.find().sort('views', -1).limit(5) # Top 5 by views
recent = posts.find().sort('created_at', -1).limit(10) # Latest 10
# Find with skip and limit (pagination)
page_size = 10
page_number = 2
skip = (page_number - 1) * page_size
posts_page = posts.find().skip(skip).limit(page_size)
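The skip arithmetic above is easy to get off by one; wrapping it in a tiny helper keeps pagination logic in one place. page_args() is a sketch of my own, not a PyMongo API:

```python
def page_args(page, page_size):
    """Translate a 1-based page number into skip/limit keyword arguments."""
    if page < 1:
        raise ValueError('page numbers start at 1')
    return {'skip': (page - 1) * page_size, 'limit': page_size}
```

Since find() accepts skip and limit as keyword arguments, it can be called as posts.find({'published': True}, **page_args(2, 10)).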
Updating Documents
Modify existing documents in MongoDB:
from pymongo import MongoClient
from bson.objectid import ObjectId
import datetime
client = MongoClient('mongodb://localhost:27017/')
db = client['blog_database']
posts = db['posts']
# Update single document
result = posts.update_one(
{'_id': ObjectId('507f1f77bcf86cd799439011')},
{'$set': {'views': 500}}
)
print(f"Matched: {result.matched_count}, Modified: {result.modified_count}")
# Update with multiple fields
posts.update_one(
{'author': 'John Doe'},
{
'$set': {
'title': 'Updated Title',
'views': 999,
'updated_at': datetime.datetime.utcnow()
}
}
)
# Increment views
posts.update_one(
{'_id': ObjectId('507f1f77bcf86cd799439011')},
{'$inc': {'views': 1}}
)
# Push item to array
posts.update_one(
{'_id': ObjectId('507f1f77bcf86cd799439011')},
{'$push': {'tags': 'new-tag'}}
)
# Update multiple documents
result = posts.update_many(
{'author': 'John Doe'},
{'$set': {'verified': True}}
)
print(f"Modified {result.modified_count} documents")
# Replace entire document
new_post = {
'title': 'Completely New Post',
'author': 'Anonymous',
'content': 'New content...'
}
posts.replace_one(
{'_id': ObjectId('507f1f77bcf86cd799439011')},
new_post
)
# Upsert: update or insert if not found
posts.update_one(
{'title': 'MongoDB Guide'},
{'$set': {'author': 'Expert', 'views': 1000}},
upsert=True # Insert if not found
)
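Because $set and $inc can be combined in one atomic update document, a helper that builds such documents avoids repeating the boilerplate. touch_update() below is a hypothetical sketch:

```python
import datetime

def touch_update(fields, bump_views=False):
    """Build an update document that $sets the given fields plus updated_at,
    and optionally $incs the view counter in the same atomic operation."""
    now = datetime.datetime.now(datetime.timezone.utc)
    update = {'$set': dict(fields, updated_at=now)}
    if bump_views:
        update['$inc'] = {'views': 1}
    return update
```

It would be used as posts.update_one({'slug': 'my-post'}, touch_update({'title': 'New Title'}, bump_views=True)).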
Deleting Documents
Remove documents from MongoDB:
from pymongo import MongoClient
from bson.objectid import ObjectId
client = MongoClient('mongodb://localhost:27017/')
db = client['blog_database']
posts = db['posts']
# Delete single document
result = posts.delete_one({'author': 'Anonymous'})
print(f"Deleted {result.deleted_count} document")
# Delete multiple documents
result = posts.delete_many({'views': {'$lt': 10}})
print(f"Deleted {result.deleted_count} low-view posts")
# Delete all documents
posts.delete_many({})
# Delete by ID
posts.delete_one({'_id': ObjectId('507f1f77bcf86cd799439011')})
Indexing for Performance
Create indexes to speed up queries:
from pymongo import MongoClient, ASCENDING, DESCENDING
client = MongoClient('mongodb://localhost:27017/')
db = client['blog_database']
posts = db['posts']
# Create single field index
posts.create_index('author')
posts.create_index([('views', DESCENDING)])
# Create compound index
posts.create_index([
('author', ASCENDING),
('created_at', DESCENDING)
])
# Create unique index
posts.create_index('slug', unique=True)
# Create text search index
posts.create_index([('title', 'text'), ('content', 'text')])
# Text search using the text index
results = posts.find({'$text': {'$search': 'mongodb'}})
# List all indexes
indexes = posts.list_indexes()
for index in indexes:
    print(index['key'])
# Drop index
posts.drop_index('author_1')
posts.drop_index([('author', 1), ('created_at', -1)])
# Get index statistics
stats = db.command('collStats', 'posts')
print(f"Index size: {stats['totalIndexSize']}")
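To confirm a query actually uses one of these indexes, inspect the plan returned by cursor.explain(). The helper below is my own simplification (winning plans on sharded clusters nest differently); it walks the plan tree looking for an IXSCAN stage:

```python
def index_is_used(explain_doc):
    """Given the dict returned by cursor.explain(), report whether the
    winning plan contains an index scan (IXSCAN) stage."""
    plan = explain_doc['queryPlanner']['winningPlan']
    # The winning plan is a tree; follow inputStage links downward.
    while plan:
        if plan.get('stage') == 'IXSCAN':
            return True
        plan = plan.get('inputStage')
    return False
```

Example: index_is_used(posts.find({'author': 'John Doe'}).explain()) should return True once the author index exists.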
Aggregation Pipeline for Complex Queries
Perform complex data transformations using aggregation:
from pymongo import MongoClient
client = MongoClient('mongodb://localhost:27017/')
db = client['blog_database']
posts = db['posts']
# Basic aggregation: Group by author and count posts
pipeline = [
    {'$group': {'_id': '$author', 'count': {'$sum': 1}}}
]
result = posts.aggregate(pipeline)
for doc in result:
    print(f"{doc['_id']}: {doc['count']} posts")
# Match and group
pipeline = [
    {'$match': {'published': True}},
    {'$group': {'_id': '$author', 'total_views': {'$sum': '$views'}}}
]
# Stage 1: Filter published posts
# Stage 2: Group by author
# Stage 3: Sort by views descending
# Stage 4: Limit to top 5
pipeline = [
    {'$match': {'published': True}},
    {'$group': {
        '_id': '$author',
        'total_views': {'$sum': '$views'},
        'post_count': {'$sum': 1}
    }},
    {'$sort': {'total_views': -1}},
    {'$limit': 5}
]
top_authors = posts.aggregate(pipeline)
# Project selected fields
pipeline = [
    {'$match': {'views': {'$gte': 100}}},
    {'$project': {
        'title': 1,
        'author': 1,
        'views': 1,
        '_id': 0
    }}
]
# Unwind array field
pipeline = [
    {'$unwind': '$tags'},
    {'$group': {'_id': '$tags', 'count': {'$sum': 1}}},
    {'$sort': {'count': -1}}
]
tag_stats = posts.aggregate(pipeline)
# Lookup (join with another collection)
users_collection = db['users']
pipeline = [
    {'$lookup': {
        'from': 'users',
        'localField': 'author',
        'foreignField': 'name',
        'as': 'author_info'
    }},
    {'$unwind': '$author_info'},
    {'$project': {
        'title': 1,
        'author': 1,
        'author_email': '$author_info.email'
    }}
]
# Faceted aggregation (multiple aggregations in one)
pipeline = [
    {'$facet': {
        'by_author': [
            {'$group': {'_id': '$author', 'count': {'$sum': 1}}}
        ],
        'by_tag': [
            {'$unwind': '$tags'},
            {'$group': {'_id': '$tags', 'count': {'$sum': 1}}}
        ],
        'stats': [
            {'$group': {
                '_id': None,
                'total_posts': {'$sum': 1},
                'avg_views': {'$avg': '$views'}
            }}
        ]
    }}
]
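Because pipelines are plain Python lists of dicts, they can be assembled and unit tested without a running server. A sketch (the function name is mine) that parameterizes the tag-count pipeline shown above:

```python
def top_tags_pipeline(limit=10, published_only=True):
    """Build the $unwind/$group/$sort pipeline for tag counts as plain
    dicts, optionally prefiltering to published posts."""
    pipeline = [{'$match': {'published': True}}] if published_only else []
    pipeline += [
        {'$unwind': '$tags'},
        {'$group': {'_id': '$tags', 'count': {'$sum': 1}}},
        {'$sort': {'count': -1}},
        {'$limit': limit},
    ]
    return pipeline
```

It would then run as posts.aggregate(top_tags_pipeline(limit=5)).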
GridFS for Large File Storage
Store files larger than 16MB in MongoDB using GridFS:
from pymongo import MongoClient
from gridfs import GridFS
client = MongoClient('mongodb://localhost:27017/')
db = client['blog_database']
fs = GridFS(db)
# Store file
with open('document.pdf', 'rb') as f:
    file_id = fs.put(f, filename='document.pdf', content_type='application/pdf')
print(f"File stored with ID: {file_id}")
# Retrieve file
with open('downloaded_document.pdf', 'wb') as f:
    f.write(fs.get(file_id).read())
# List all files
for grid_out in fs.find({'filename': 'document.pdf'}):
    print(f"File: {grid_out.filename}, Size: {grid_out.length}")
# Delete file
fs.delete(file_id)
# Store with metadata
with open('image.jpg', 'rb') as f:
    file_id = fs.put(
        f,
        filename='profile.jpg',
        content_type='image/jpeg',
        user_id='user_123',
        uploaded_by='John Doe'
    )
# Retrieve with metadata
grid_out = fs.get(file_id)
print(f"Uploaded by: {grid_out.uploaded_by}")
print(f"User ID: {grid_out.user_id}")
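Under the hood, GridFS splits each file into chunk documents of 255 KiB by default. A back-of-the-envelope helper (my own, not a GridFS API) to estimate how many chunk documents a file will produce:

```python
import math

DEFAULT_CHUNK_SIZE = 255 * 1024  # GridFS default chunk size: 255 KiB

def chunk_count(file_size, chunk_size=DEFAULT_CHUNK_SIZE):
    """Number of chunk documents GridFS creates for a file of the given
    size in bytes (at least one for any non-empty file)."""
    return max(1, math.ceil(file_size / chunk_size))
```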
Troubleshooting Common MongoDB Issues
| Issue | Cause | Solution |
|---|---|---|
| Connection refused | MongoDB server not running | Start MongoDB: brew services start mongodb-community or systemctl start mongod |
| Slow queries | Missing indexes on frequently queried fields | Create indexes: collection.create_index('field_name'). Check query plans with explain() |
| Duplicate key error | Unique index constraint violation | Ensure unique values or remove unique index constraint |
| Out of memory errors | Aggregation pipeline processing too much data | Add $match stage early, limit results with $limit, use allowDiskUse=True |
| Document too large | Document exceeds 16MB size limit | Use GridFS for large documents or split data across documents |
| Authentication failed | Wrong credentials or database | Verify username, password, and database name in connection string |
Real-Life Example: Blog Content Management System
Here’s a complete blog CMS using MongoDB and PyMongo:
from pymongo import MongoClient, ASCENDING, DESCENDING
from bson.objectid import ObjectId
from datetime import datetime, timedelta

class BlogCMS:
    def __init__(self):
        self.client = MongoClient('mongodb://localhost:27017/')
        self.db = self.client['blog_cms']
        self.posts = self.db['posts']
        self.comments = self.db['comments']
        self.users = self.db['users']
        self._create_indexes()

    def _create_indexes(self):
        """Create necessary indexes"""
        self.posts.create_index('slug', unique=True)
        self.posts.create_index([('author', ASCENDING), ('created_at', DESCENDING)])
        self.posts.create_index([('title', 'text'), ('content', 'text')])
        self.comments.create_index('post_id')

    def create_post(self, title, content, author, tags, excerpt=''):
        """Create new blog post"""
        # Simplistic slug; a production system should also strip punctuation
        slug = title.lower().replace(' ', '-')
        post = {
            'title': title,
            'content': content,
            'excerpt': excerpt,
            'author': author,
            'tags': tags,
            'slug': slug,
            'created_at': datetime.utcnow(),
            'updated_at': datetime.utcnow(),
            'published': False,
            'views': 0,
            'comments_count': 0
        }
        result = self.posts.insert_one(post)
        return result.inserted_id

    def publish_post(self, post_id):
        """Publish a draft post"""
        self.posts.update_one(
            {'_id': ObjectId(post_id)},
            {'$set': {
                'published': True,
                'published_at': datetime.utcnow()
            }}
        )

    def get_published_posts(self, page=1, per_page=10):
        """Get published posts with pagination"""
        skip = (page - 1) * per_page
        posts = self.posts.find(
            {'published': True},
            sort=[('created_at', -1)]
        ).skip(skip).limit(per_page)
        return list(posts)

    def search_posts(self, query):
        """Full-text search in posts"""
        results = self.posts.find(
            {'$text': {'$search': query}},
            {'score': {'$meta': 'textScore'}}
        ).sort([('score', {'$meta': 'textScore'})])
        return list(results)

    def get_post_by_slug(self, slug):
        """Get post by slug and increment views"""
        self.posts.update_one(
            {'slug': slug},
            {'$inc': {'views': 1}}
        )
        return self.posts.find_one({'slug': slug})

    def add_comment(self, post_id, author, content):
        """Add comment to post"""
        comment = {
            'post_id': ObjectId(post_id),
            'author': author,
            'content': content,
            'created_at': datetime.utcnow(),
            'approved': False
        }
        result = self.comments.insert_one(comment)
        # Keep the denormalized comment counter in sync
        self.posts.update_one(
            {'_id': ObjectId(post_id)},
            {'$inc': {'comments_count': 1}}
        )
        return result.inserted_id

    def get_post_comments(self, post_id, approved_only=True):
        """Get comments for post"""
        query = {'post_id': ObjectId(post_id)}
        if approved_only:
            query['approved'] = True
        return list(self.comments.find(query).sort('created_at', -1))

    def get_trending_posts(self, days=7):
        """Get trending posts from last N days"""
        since = datetime.utcnow() - timedelta(days=days)
        return list(self.posts.find(
            {'created_at': {'$gte': since}, 'published': True}
        ).sort('views', -1).limit(10))

    def get_author_stats(self, author):
        """Get statistics for an author"""
        pipeline = [
            {'$match': {'author': author, 'published': True}},
            {'$group': {
                '_id': '$author',
                'total_posts': {'$sum': 1},
                'total_views': {'$sum': '$views'},
                'avg_views': {'$avg': '$views'}
            }}
        ]
        return list(self.posts.aggregate(pipeline))

    def delete_post(self, post_id):
        """Delete post and its comments"""
        # Delete comments first, then the post itself
        self.comments.delete_many({'post_id': ObjectId(post_id)})
        self.posts.delete_one({'_id': ObjectId(post_id)})
# Usage
cms = BlogCMS()
# Create post
post_id = cms.create_post(
title='MongoDB Best Practices',
content='MongoDB is a flexible...',
excerpt='Learn MongoDB best practices',
author='John Doe',
tags=['mongodb', 'database', 'tutorial']
)
# Publish post
cms.publish_post(post_id)
# Get published posts
posts = cms.get_published_posts(page=1, per_page=10)
# Search
results = cms.search_posts('mongodb python')
# Get post by slug
post = cms.get_post_by_slug('mongodb-best-practices')
# Add comment
cms.add_comment(post_id, 'Jane Smith', 'Great article!')
# Get comments
comments = cms.get_post_comments(post_id)
# Get author stats
stats = cms.get_author_stats('John Doe')
print(stats)
This CMS demonstrates:
- CRUD operations on multiple collections
- Unique constraints with indexes
- Full-text search capability
- Aggregation for statistics
- Pagination for large result sets
- Relationship management between collections
- Automatic counter updates
MongoDB Best Practices
Follow these guidelines for optimal MongoDB usage:
- Design documents carefully: Plan your data structure before implementation
- Use appropriate indexes: Index frequently queried fields
- Avoid excessive nesting: Keep document depth reasonable
- Use ObjectId for relationships: Reference documents with IDs
- Implement validation: Use schema validation in MongoDB 3.6+
- Monitor query performance: Use explain() to analyze queries
- Configure backup: Enable oplog and regular snapshots
- Use connection pooling: Reuse connections across requests
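The schema-validation point can be made concrete with a $jsonSchema validator, expressed as a plain dict. Assuming a posts collection shaped like the one in this article, it could be applied with db.create_collection('posts', validator=post_validator) for a new collection, or db.command('collMod', 'posts', validator=post_validator) for an existing one:

```python
# A sketch of a $jsonSchema validator for the posts collection;
# required fields and types here are assumptions based on this article.
post_validator = {
    '$jsonSchema': {
        'bsonType': 'object',
        'required': ['title', 'author', 'created_at'],
        'properties': {
            'title': {'bsonType': 'string'},
            'author': {'bsonType': 'string'},
            'views': {'bsonType': 'int', 'minimum': 0},
        },
    }
}
```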
FAQ
Q: Should I use MongoDB or a relational database?
A: Use MongoDB for flexible schemas and hierarchical data. Use relational databases for structured data with complex relationships. Many applications use both.
Q: Does MongoDB support transactions?
A: Yes. Single-document writes are atomic by default. Multi-document ACID transactions are supported on replica sets since MongoDB 4.0 and on sharded clusters since MongoDB 4.2.
Q: How do I backup MongoDB?
A: Use mongodump to export data and mongorestore to restore. Enable oplog for continuous backups, or use MongoDB Atlas automated backups.
Q: Can MongoDB handle joins like SQL databases?
A: MongoDB uses the $lookup aggregation stage for joins, or you can denormalize data by embedding related documents.
Q: What is the 16MB document size limit?
A: MongoDB documents cannot exceed 16MB. Use GridFS for larger data or split into multiple documents with references.