How To Work with MongoDB in Python Using PyMongo

Quick Answer: MongoDB is a document-based NoSQL database that stores JSON-like data. Install PyMongo with pip install pymongo, then connect with client = MongoClient('mongodb://localhost:27017/'). Create databases and collections, perform CRUD operations with insert_one(), find(), update_one(), and delete_one(). Use aggregation pipelines for complex queries and GridFS for storing large files.

Understanding MongoDB and Document-Based Storage

MongoDB is a NoSQL database that stores data as flexible JSON-like documents instead of rigid table rows. This document-oriented approach allows you to store nested data structures without complex joins, making it ideal for applications with evolving schemas.

Key advantages of MongoDB:

  • Schema flexibility: Documents can have different structures
  • Nested data: Store complex hierarchical data naturally
  • Rich queries: Query and filter on any field
  • Horizontal scaling: Built-in sharding for distributing data
  • Indexing: Powerful indexing for fast queries
  • Aggregation: Complex data transformations in the database
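As a concrete illustration of nested storage (the field names here are hypothetical), a single document can hold what a relational design would normalize into separate orders, customers, and line-item tables:

```python
# One document embeds related data that SQL would split across joined tables.
order = {
    'order_id': 1001,
    'customer': {'name': 'Ada', 'email': 'ada@example.com'},
    'items': [
        {'sku': 'BOOK-1', 'qty': 2, 'price': 15.0},
        {'sku': 'PEN-3', 'qty': 10, 'price': 1.5},
    ],
}

# Nested fields are plain Python access on the application side...
total = sum(item['qty'] * item['price'] for item in order['items'])
# ...and dot notation in queries, e.g. {'customer.name': 'Ada'}
print(total)  # 45.0
```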

Installing MongoDB and PyMongo

First, install MongoDB server. On macOS with Homebrew:

brew install mongodb-community
brew services start mongodb-community

On Ubuntu/Debian, add MongoDB's official package repository first (the distribution's own mongodb package is outdated or absent on recent releases), then install and start the server:

sudo apt-get install -y mongodb-org
sudo systemctl start mongod

Install the PyMongo Python driver:

pip install pymongo

Verify MongoDB is running:

mongosh --eval "db.adminCommand('ping')"
# Output: { ok: 1 }

Connecting to MongoDB

Create a basic connection to MongoDB:

from pymongo import MongoClient

# Connect to local MongoDB
client = MongoClient('mongodb://localhost:27017/')

# Get database
db = client['blog_database']

# Get collection
posts = db['posts']

# Test connection
print(client.server_info())

For production with authentication and connection pooling:

from pymongo import MongoClient
from pymongo.errors import ConnectionFailure

# Connect with credentials
client = MongoClient(
    'mongodb://username:password@mongodb.example.com:27017/',
    maxPoolSize=50,
    minPoolSize=10,
    serverSelectionTimeoutMS=5000,
    connectTimeoutMS=5000
)

# Verify connection
try:
    client.admin.command('ping')
    print("Connected to MongoDB successfully")
except ConnectionFailure:
    print("Failed to connect to MongoDB")

Alternative connection methods:

from pymongo import MongoClient

# Connection string
uri = 'mongodb://user:pass@host1:27017,host2:27017,host3:27017/database?replicaSet=rs0'
client = MongoClient(uri)

# Access database and collection
db = client.get_database('mydb')
collection = db.get_collection('mycollection')

Creating and Inserting Documents

Insert documents into MongoDB collections:

from pymongo import MongoClient
import datetime

client = MongoClient('mongodb://localhost:27017/')
db = client['blog_database']
posts = db['posts']

# Insert single document
post = {
    'title': 'Getting Started with MongoDB',
    'author': 'John Doe',
    'content': 'MongoDB is a flexible NoSQL database...',
    'tags': ['mongodb', 'nosql', 'database'],
    'created_at': datetime.datetime.utcnow(),
    'views': 0,
    'published': True
}

result = posts.insert_one(post)
print(f"Inserted document ID: {result.inserted_id}")

# Insert multiple documents
documents = [
    {
        'title': 'Python Best Practices',
        'author': 'Jane Smith',
        'tags': ['python', 'best-practices'],
        'views': 150
    },
    {
        'title': 'Web Development with Flask',
        'author': 'Bob Johnson',
        'tags': ['python', 'flask', 'web'],
        'views': 200
    }
]

result = posts.insert_many(documents)
print(f"Inserted {len(result.inserted_ids)} documents")

# Insert with custom ID
post_custom = {
    '_id': 'post_001',
    'title': 'Custom ID Example',
    'author': 'Alice'
}
posts.insert_one(post_custom)

Reading Documents with Find Operations

Query documents from MongoDB:

from pymongo import MongoClient
from bson.objectid import ObjectId
import datetime

client = MongoClient('mongodb://localhost:27017/')
db = client['blog_database']
posts = db['posts']

# Find single document
post = posts.find_one({'author': 'John Doe'})
print(post)

# Find by ID
post_id = ObjectId('507f1f77bcf86cd799439011')
post = posts.find_one({'_id': post_id})

# Find all documents
all_posts = posts.find()
for post in all_posts:
    print(post['title'])

# Find with filters (matching a scalar against an array field
# returns documents whose array contains that value)
python_posts = posts.find({'tags': 'python'})

# Find with comparison operators
popular_posts = posts.find({'views': {'$gt': 100}})
recent_posts = posts.find({'created_at': {'$gte': datetime.datetime(2024, 1, 1)}})

# Multiple conditions
filtered = posts.find({
    'author': 'John Doe',
    'published': True
})

# Match any of several values with $in
query = {
    'author': {'$in': ['John Doe', 'Jane Smith']}
}
posts_by_authors = posts.find(query)

# General boolean OR across different conditions with $or
either = posts.find({'$or': [{'author': 'Jane Smith'}, {'views': {'$gt': 100}}]})

# Find with projection (select specific fields)
titles_only = posts.find(
    {'published': True},
    {'title': 1, 'author': 1, '_id': 0}  # Include title and author, exclude ID
)

# Find with sorting
sorted_posts = posts.find().sort('views', -1).limit(5)  # Top 5 by views
recent = posts.find().sort('created_at', -1).limit(10)  # Latest 10

# Find with skip and limit (pagination)
page_size = 10
page_number = 2
skip = (page_number - 1) * page_size
posts_page = posts.find().skip(skip).limit(page_size)
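Since the skip/limit arithmetic above recurs in every paginated endpoint, it can be factored into a small helper (a sketch; the function name is our own):

```python
def page_window(page_number, page_size):
    """Translate a 1-based page number into (skip, limit) for find()."""
    if page_number < 1 or page_size < 1:
        raise ValueError('page_number and page_size must be >= 1')
    return (page_number - 1) * page_size, page_size

skip, limit = page_window(2, 10)
# posts.find().skip(skip).limit(limit) would return documents 11-20
print(skip, limit)  # 10 10
```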

Updating Documents

Modify existing documents in MongoDB:

from pymongo import MongoClient
from bson.objectid import ObjectId
import datetime

client = MongoClient('mongodb://localhost:27017/')
db = client['blog_database']
posts = db['posts']

# Update single document
result = posts.update_one(
    {'_id': ObjectId('507f1f77bcf86cd799439011')},
    {'$set': {'views': 500}}
)
print(f"Matched: {result.matched_count}, Modified: {result.modified_count}")

# Update with multiple fields
posts.update_one(
    {'author': 'John Doe'},
    {
        '$set': {
            'title': 'Updated Title',
            'views': 999,
            'updated_at': datetime.datetime.utcnow()
        }
    }
)

# Increment views
posts.update_one(
    {'_id': ObjectId('507f1f77bcf86cd799439011')},
    {'$inc': {'views': 1}}
)

# Push item to array
posts.update_one(
    {'_id': ObjectId('507f1f77bcf86cd799439011')},
    {'$push': {'tags': 'new-tag'}}
)

# Update multiple documents
result = posts.update_many(
    {'author': 'John Doe'},
    {'$set': {'verified': True}}
)
print(f"Modified {result.modified_count} documents")

# Replace entire document
new_post = {
    'title': 'Completely New Post',
    'author': 'Anonymous',
    'content': 'New content...'
}
posts.replace_one(
    {'_id': ObjectId('507f1f77bcf86cd799439011')},
    new_post
)

# Upsert: update or insert if not found
posts.update_one(
    {'title': 'MongoDB Guide'},
    {'$set': {'author': 'Expert', 'views': 1000}},
    upsert=True  # Insert if not found
)

Deleting Documents

Remove documents from MongoDB:

from pymongo import MongoClient
from bson.objectid import ObjectId

client = MongoClient('mongodb://localhost:27017/')
db = client['blog_database']
posts = db['posts']

# Delete single document
result = posts.delete_one({'author': 'Anonymous'})
print(f"Deleted {result.deleted_count} document")

# Delete multiple documents
result = posts.delete_many({'views': {'$lt': 10}})
print(f"Deleted {result.deleted_count} low-view posts")

# Delete all documents
posts.delete_many({})

# Delete by ID
posts.delete_one({'_id': ObjectId('507f1f77bcf86cd799439011')})
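When an application needs an undo path or an audit trail, a common alternative to delete_one is a soft delete: mark the document instead of removing it. A sketch (the deleted/deleted_at fields are our convention, not a MongoDB feature):

```python
import datetime

def soft_delete_spec():
    """Update spec that flags a document as deleted without removing it."""
    return {'$set': {'deleted': True,
                     'deleted_at': datetime.datetime.utcnow()}}

def not_deleted(query):
    """Wrap a filter so reads skip soft-deleted documents."""
    return {**query, 'deleted': {'$ne': True}}

# posts.update_one({'author': 'Anonymous'}, soft_delete_spec())  # hide it
# posts.find(not_deleted({'author': 'John Doe'}))  # normal reads skip it
print(not_deleted({'author': 'John Doe'}))
```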

Indexing for Performance

Create indexes to speed up queries:

from pymongo import MongoClient, ASCENDING, DESCENDING

client = MongoClient('mongodb://localhost:27017/')
db = client['blog_database']
posts = db['posts']

# Create single field index
posts.create_index('author')
posts.create_index([('views', DESCENDING)])

# Create compound index
posts.create_index([
    ('author', ASCENDING),
    ('created_at', DESCENDING)
])

# Create unique index
posts.create_index('slug', unique=True)

# Create text search index
posts.create_index([('title', 'text'), ('content', 'text')])

# Text search using the index
results = posts.find({'$text': {'$search': 'mongodb'}})

# List all indexes
indexes = posts.list_indexes()
for index in indexes:
    print(index['key'])

# Drop index
posts.drop_index('author_1')
posts.drop_index([('author', 1), ('created_at', -1)])

# Get index statistics
stats = db.command('collStats', 'posts')
print(f"Index size: {stats['totalIndexSize']}")
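Key order in a compound index matters: a common MongoDB rule of thumb is Equality fields first, then Sort fields, then Range fields (ESR). The key spec is just a list of (field, direction) pairs, so the compound index above lines up with queries that filter on author and sort by created_at:

```python
# PyMongo's ASCENDING/DESCENDING constants are just 1 and -1, so plain
# ints work too. For find({'author': ...}).sort('created_at', -1):
#   equality on author  -> first key
#   sort on created_at  -> second key
compound_key = [('author', 1), ('created_at', -1)]

# posts.create_index(compound_key) would build the index; the same list
# also identifies it for drop_index(compound_key).
print(compound_key)
```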

Aggregation Pipeline for Complex Queries

Perform complex data transformations using aggregation:

from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017/')
db = client['blog_database']
posts = db['posts']

# Basic aggregation: Group by author and count posts
pipeline = [
    {'$group': {'_id': '$author', 'count': {'$sum': 1}}}
]
result = posts.aggregate(pipeline)
for doc in result:
    print(f"{doc['_id']}: {doc['count']} posts")

# Match and group
pipeline = [
    {'$match': {'published': True}},
    {'$group': {'_id': '$author', 'total_views': {'$sum': '$views'}}}
]

# Stage 1: Filter published posts
# Stage 2: Group by author
# Stage 3: Sort by views descending
# Stage 4: Limit to top 5
pipeline = [
    {'$match': {'published': True}},
    {'$group': {
        '_id': '$author',
        'total_views': {'$sum': '$views'},
        'post_count': {'$sum': 1}
    }},
    {'$sort': {'total_views': -1}},
    {'$limit': 5}
]
top_authors = posts.aggregate(pipeline)

# Project selected fields
pipeline = [
    {'$match': {'views': {'$gte': 100}}},
    {'$project': {
        'title': 1,
        'author': 1,
        'views': 1,
        '_id': 0
    }}
]

# Unwind array field
pipeline = [
    {'$unwind': '$tags'},
    {'$group': {'_id': '$tags', 'count': {'$sum': 1}}},
    {'$sort': {'count': -1}}
]
tag_stats = posts.aggregate(pipeline)

# Lookup (join with another collection)
users_collection = db['users']
pipeline = [
    {'$lookup': {
        'from': 'users',
        'localField': 'author',
        'foreignField': 'name',
        'as': 'author_info'
    }},
    {'$unwind': '$author_info'},
    {'$project': {
        'title': 1,
        'author': 1,
        'author_email': '$author_info.email'
    }}
]

# Faceted aggregation (multiple aggregations in one)
pipeline = [
    {'$facet': {
        'by_author': [
            {'$group': {'_id': '$author', 'count': {'$sum': 1}}}
        ],
        'by_tag': [
            {'$unwind': '$tags'},
            {'$group': {'_id': '$tags', 'count': {'$sum': 1}}}
        ],
        'stats': [
            {'$group': {
                '_id': None,
                'total_posts': {'$sum': 1},
                'avg_views': {'$avg': '$views'}
            }}
        ]
    }}
]
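Because a pipeline is just a Python list of stage dicts, it can also be assembled programmatically. The sketch below builds a posts-per-day pipeline using the $dateToString operator (the function name and exact stage contents are illustrative):

```python
import datetime

def posts_per_day(since):
    """Pipeline counting published posts per calendar day since a date."""
    return [
        {'$match': {'published': True, 'created_at': {'$gte': since}}},
        {'$group': {
            # Bucket by the day portion of the timestamp
            '_id': {'$dateToString': {'format': '%Y-%m-%d',
                                      'date': '$created_at'}},
            'count': {'$sum': 1},
        }},
        {'$sort': {'_id': 1}},
    ]

pipeline = posts_per_day(datetime.datetime(2024, 1, 1))
# posts.aggregate(pipeline) would yield one document per day
print([next(iter(stage)) for stage in pipeline])
```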

GridFS for Large File Storage

Store files that exceed MongoDB's 16MB document limit using GridFS:

from pymongo import MongoClient
from gridfs import GridFS

client = MongoClient('mongodb://localhost:27017/')
db = client['blog_database']
fs = GridFS(db)

# Store file
with open('document.pdf', 'rb') as f:
    file_id = fs.put(f, filename='document.pdf', content_type='application/pdf')

print(f"File stored with ID: {file_id}")

# Retrieve file
with open('downloaded_document.pdf', 'wb') as f:
    f.write(fs.get(file_id).read())

# Find stored files by filename
for grid_out in fs.find({'filename': 'document.pdf'}):
    print(f"File: {grid_out.filename}, Size: {grid_out.length}")

# Delete file
fs.delete(file_id)

# Store with metadata
with open('image.jpg', 'rb') as f:
    file_id = fs.put(
        f,
        filename='profile.jpg',
        content_type='image/jpeg',
        user_id='user_123',
        uploaded_by='John Doe'
    )

# Retrieve with metadata
grid_out = fs.get(file_id)
print(f"Uploaded by: {grid_out.uploaded_by}")
print(f"User ID: {grid_out.user_id}")

Troubleshooting Common MongoDB Issues

  • Connection refused: the MongoDB server is not running. Start it with brew services start mongodb-community or sudo systemctl start mongod.
  • Slow queries: missing indexes on frequently queried fields. Create them with collection.create_index('field_name') and inspect query plans with explain().
  • Duplicate key error: a unique index constraint was violated. Ensure the values are unique or drop the unique index.
  • Out of memory errors: an aggregation pipeline is processing too much data. Add a $match stage early, cap results with $limit, and pass allowDiskUse=True to aggregate().
  • Document too large: the document exceeds the 16MB size limit. Use GridFS for large payloads or split the data across multiple documents.
  • Authentication failed: wrong credentials or database. Verify the username, password, and database name in the connection string.

Real-Life Example: Blog Content Management System

Here’s a complete blog CMS using MongoDB and PyMongo:

from pymongo import MongoClient, ASCENDING, DESCENDING
from bson.objectid import ObjectId
from datetime import datetime, timedelta
import json

class BlogCMS:
    def __init__(self):
        self.client = MongoClient('mongodb://localhost:27017/')
        self.db = self.client['blog_cms']
        self.posts = self.db['posts']
        self.comments = self.db['comments']
        self.users = self.db['users']
        self._create_indexes()

    def _create_indexes(self):
        """Create necessary indexes"""
        self.posts.create_index('slug', unique=True)
        self.posts.create_index([('author', ASCENDING), ('created_at', DESCENDING)])
        self.posts.create_index([('title', 'text'), ('content', 'text')])
        self.comments.create_index('post_id')

    def create_post(self, title, content, author, tags, excerpt=''):
        """Create new blog post"""
        slug = title.lower().replace(' ', '-')
        post = {
            'title': title,
            'content': content,
            'excerpt': excerpt,
            'author': author,
            'tags': tags,
            'slug': slug,
            'created_at': datetime.utcnow(),
            'updated_at': datetime.utcnow(),
            'published': False,
            'views': 0,
            'comments_count': 0
        }
        result = self.posts.insert_one(post)
        return result.inserted_id

    def publish_post(self, post_id):
        """Publish a draft post"""
        self.posts.update_one(
            {'_id': ObjectId(post_id)},
            {'$set': {
                'published': True,
                'published_at': datetime.utcnow()
            }}
        )

    def get_published_posts(self, page=1, per_page=10):
        """Get published posts with pagination"""
        skip = (page - 1) * per_page
        posts = self.posts.find(
            {'published': True},
            sort=[('created_at', -1)]
        ).skip(skip).limit(per_page)
        return list(posts)

    def search_posts(self, query):
        """Full-text search in posts"""
        results = self.posts.find(
            {'$text': {'$search': query}},
            {'score': {'$meta': 'textScore'}}
        ).sort([('score', {'$meta': 'textScore'})])
        return list(results)

    def get_post_by_slug(self, slug):
        """Get post by slug and increment views"""
        self.posts.update_one(
            {'slug': slug},
            {'$inc': {'views': 1}}
        )
        return self.posts.find_one({'slug': slug})

    def add_comment(self, post_id, author, content):
        """Add comment to post"""
        comment = {
            'post_id': ObjectId(post_id),
            'author': author,
            'content': content,
            'created_at': datetime.utcnow(),
            'approved': False
        }
        result = self.comments.insert_one(comment)

        # Update comment count
        self.posts.update_one(
            {'_id': ObjectId(post_id)},
            {'$inc': {'comments_count': 1}}
        )
        return result.inserted_id

    def get_post_comments(self, post_id, approved_only=True):
        """Get comments for post"""
        query = {'post_id': ObjectId(post_id)}
        if approved_only:
            query['approved'] = True

        return list(self.comments.find(query).sort('created_at', -1))

    def get_trending_posts(self, days=7):
        """Get trending posts from last N days"""
        since = datetime.utcnow() - timedelta(days=days)
        return list(self.posts.find(
            {'created_at': {'$gte': since}, 'published': True}
        ).sort('views', -1).limit(10))

    def get_author_stats(self, author):
        """Get statistics for an author"""
        pipeline = [
            {'$match': {'author': author, 'published': True}},
            {'$group': {
                '_id': '$author',
                'total_posts': {'$sum': 1},
                'total_views': {'$sum': '$views'},
                'avg_views': {'$avg': '$views'}
            }}
        ]
        return list(self.posts.aggregate(pipeline))

    def delete_post(self, post_id):
        """Delete post and its comments"""
        # Delete comments
        self.comments.delete_many({'post_id': ObjectId(post_id)})
        # Delete post
        self.posts.delete_one({'_id': ObjectId(post_id)})

# Usage
cms = BlogCMS()

# Create post
post_id = cms.create_post(
    title='MongoDB Best Practices',
    content='MongoDB is a flexible...',
    excerpt='Learn MongoDB best practices',
    author='John Doe',
    tags=['mongodb', 'database', 'tutorial']
)

# Publish post
cms.publish_post(post_id)

# Get published posts
posts = cms.get_published_posts(page=1, per_page=10)

# Search
results = cms.search_posts('mongodb python')

# Get post by slug
post = cms.get_post_by_slug('mongodb-best-practices')

# Add comment
cms.add_comment(post_id, 'Jane Smith', 'Great article!')

# Get comments
comments = cms.get_post_comments(post_id)

# Get author stats
stats = cms.get_author_stats('John Doe')
print(stats)

This CMS demonstrates:

  • CRUD operations on multiple collections
  • Unique constraints with indexes
  • Full-text search capability
  • Aggregation for statistics
  • Pagination for large result sets
  • Relationship management between collections
  • Automatic counter updates
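One loose end worth noting: create_post builds slugs with a bare lower()/replace(), which leaves punctuation in place and collides on titles like 'C++ Tips!'. A slightly more robust slugifier (a sketch, not part of PyMongo) could replace it:

```python
import re

def slugify(title):
    """Lowercase, replace every punctuation/whitespace run with a hyphen,
    and trim stray hyphens from the ends."""
    slug = re.sub(r'[^a-z0-9]+', '-', title.lower())
    return slug.strip('-')

print(slugify('MongoDB Best Practices'))   # mongodb-best-practices
print(slugify('  C++ Tips & Tricks!  '))   # c-tips-tricks
```

Because slug carries a unique index, two titles that slugify identically would still raise a DuplicateKeyError on insert; appending a counter or an ObjectId suffix is one common way to disambiguate.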

MongoDB Best Practices

Follow these guidelines for optimal MongoDB usage:

  • Design documents carefully: Plan your data structure before implementation
  • Use appropriate indexes: Index frequently queried fields
  • Avoid excessive nesting: Keep document depth reasonable
  • Use ObjectId for relationships: Reference documents with IDs
  • Implement validation: Use schema validation in MongoDB 3.6+
  • Monitor query performance: Use explain() to analyze queries
  • Configure backup: Enable oplog and regular snapshots
  • Use connection pooling: Reuse connections across requests

FAQ

Q: Should I use MongoDB or a relational database?

A: Use MongoDB for flexible schemas and hierarchical data. Use relational databases for structured data with complex relationships. Many applications use both.

Q: Does MongoDB support transactions?

A: Yes, MongoDB 4.0+ supports ACID transactions. Single-document operations are atomic by default. Multi-document transactions are available on replica sets, and on sharded clusters since MongoDB 4.2.
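In PyMongo, a multi-document transaction runs inside a session. The sketch below (collection and field names are hypothetical, and it assumes a replica-set deployment) moves credits between two user documents atomically:

```python
def transfer_ops(from_user, to_user, amount):
    """Build the two (filter, update) pairs a credit transfer needs.
    The debit filter requires a sufficient balance."""
    return [
        ({'name': from_user, 'credits': {'$gte': amount}},
         {'$inc': {'credits': -amount}}),
        ({'name': to_user}, {'$inc': {'credits': amount}}),
    ]

def transfer(client, from_user, to_user, amount):
    users = client['blog_cms']['users']
    with client.start_session() as session:
        with session.start_transaction():
            for filt, update in transfer_ops(from_user, to_user, amount):
                users.update_one(filt, update, session=session)
    # Leaving the with-block commits; an exception aborts the transaction.

# Usage (requires a replica set):
# client = MongoClient('mongodb://localhost:27017/?replicaSet=rs0')
# transfer(client, 'John Doe', 'Jane Smith', 10)
```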

Q: How do I backup MongoDB?

A: Use mongodump to export data and mongorestore to restore. Enable oplog for continuous backups, or use MongoDB Atlas automated backups.

Q: Can MongoDB handle joins like SQL databases?

A: MongoDB uses the $lookup aggregation stage for joins, or you can denormalize data by embedding related documents.

Q: What is the 16MB document size limit?

A: MongoDB documents cannot exceed 16MB. Use GridFS for larger data or split into multiple documents with references.