How To Connect Python to Redis for Caching and Queues
Install the client with pip install redis, then connect with r = redis.Redis(host='localhost', port=6379, db=0). Use string operations like r.set() and r.get() for caching, list operations for queues, and pub/sub for real-time messaging. Keys expire automatically once you give them a TTL.
Understanding Redis and Its Use Cases
Redis is an open-source, in-memory data structure store that operates at extremely high speeds. Unlike traditional databases that store data on disk, Redis keeps everything in RAM, making it ideal for applications requiring sub-millisecond response times.
Redis is particularly useful for:
- Caching: Store frequently accessed data to reduce database load
- Sessions: Store user sessions for web applications
- Task Queues: Implement job queues with multiple workers
- Pub/Sub Messaging: Build real-time messaging systems
- Leaderboards: Track scores and rankings efficiently
- Rate Limiting: Implement API rate limiting
Installing Redis and redis-py
First, install the Redis server. On macOS using Homebrew:
brew install redis
brew services start redis
On Ubuntu/Debian:
sudo apt-get install redis-server
sudo systemctl start redis-server
Next, install the Python redis library:
pip install redis
Verify Redis is running:
redis-cli ping
# Output: PONG
Check Redis version and info:
redis-cli --version
redis-cli info server
Connecting to Redis from Python
Create a basic connection to Redis:
import redis
# Connect to local Redis instance
r = redis.Redis(
host='localhost',
port=6379,
db=0,
decode_responses=True # Decode bytes to strings
)
# Test the connection
print(r.ping()) # Output: True
print(r.echo('Hello Redis!')) # Output: Hello Redis!
For production with password authentication and connection pooling:
import redis
from redis import ConnectionPool
# Create connection pool for better performance
pool = ConnectionPool(
host='redis.example.com',
port=6379,
db=0,
password='your_password',
max_connections=50,
decode_responses=True
)
# Create Redis client from pool
r = redis.Redis(connection_pool=pool)
# Or use URL connection string
r = redis.from_url(
'redis://:password@redis.example.com:6379/0',
decode_responses=True
)
String Operations for Caching
Strings are the simplest Redis data type, perfect for caching:
import redis
import json
from datetime import timedelta
r = redis.Redis(decode_responses=True)
# SET and GET operations
r.set('user:1:name', 'John Doe')
name = r.get('user:1:name')
print(name) # Output: John Doe
# SET with expiration
r.set('session:abc123', 'user_data', ex=3600) # Expires in 1 hour
# GET with fallback
user_data = r.get('user:2:name')
if not user_data:
    print("User not in cache, fetch from database")
    user_data = 'Jane Smith'
    r.set('user:2:name', user_data, ex=3600)
# Cache JSON data
user_dict = {'id': 3, 'name': 'Bob', 'email': 'bob@example.com'}
r.set('user:3', json.dumps(user_dict), ex=7200)
cached_user = json.loads(r.get('user:3'))
print(cached_user) # Output: {'id': 3, 'name': 'Bob', 'email': 'bob@example.com'}
# Multiple operations
r.mset({'key1': 'value1', 'key2': 'value2', 'key3': 'value3'})
values = r.mget(['key1', 'key2', 'key3'])
print(values) # Output: ['value1', 'value2', 'value3']
# Atomic increment/decrement
r.set('page_views', 100)
r.incr('page_views') # Increment by 1
r.incrby('page_views', 5) # Increment by 5
r.decr('page_views') # Decrement by 1
print(r.get('page_views')) # Output: 105
Hash Operations for Complex Data
Hashes store multiple fields under a single key, ideal for object data:
import redis
r = redis.Redis(decode_responses=True)
# Store user object as hash
r.hset('user:100', mapping={
'name': 'Alice Johnson',
'email': 'alice@example.com',
'age': '28',
'city': 'New York'
})
# Get single field
email = r.hget('user:100', 'email')
print(email) # Output: alice@example.com
# Get all fields
user_data = r.hgetall('user:100')
print(user_data)
# Output: {'name': 'Alice Johnson', 'email': 'alice@example.com', 'age': '28', 'city': 'New York'}
# Get multiple fields
info = r.hmget('user:100', ['name', 'email'])
print(info) # Output: ['Alice Johnson', 'alice@example.com']
# Update single field
r.hset('user:100', 'age', '29')
# Increment hash field
r.hset('product:1', mapping={
'name': 'Laptop',
'price': '999.99',
'stock': '50'
})
r.hincrbyfloat('product:1', 'price', 100.00) # Increase price
print(r.hget('product:1', 'price')) # Output: 1099.99
# Check if field exists
exists = r.hexists('user:100', 'city')
print(exists) # Output: True
# Get all field names
fields = r.hkeys('user:100')
print(fields) # Output: ['name', 'email', 'age', 'city']
# Get all values
values = r.hvals('user:100')
print(values) # Output: ['Alice Johnson', 'alice@example.com', '29', 'New York']
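Hash fields always come back as strings (note 'age': '28' above), so application code usually needs a conversion step in both directions. The following is a minimal sketch of that step, assuming a hypothetical User dataclass and helper names (to_hash, from_hash) that are not part of redis-py. It only handles field types whose constructor accepts a string, such as str, int, and float.

```python
from dataclasses import dataclass, asdict, fields


@dataclass
class User:
    # Hypothetical record type mirroring the user:100 hash above
    name: str
    email: str
    age: int
    city: str


def to_hash(obj):
    """Flatten a dataclass into the str -> str mapping that HSET stores."""
    return {key: str(value) for key, value in asdict(obj).items()}


def from_hash(cls, mapping):
    """Rebuild a dataclass from HGETALL output, casting each field to its annotated type."""
    return cls(**{f.name: f.type(mapping[f.name]) for f in fields(cls)})
```

With a client created as above, r.hset('user:100', mapping=to_hash(user)) stores the record and from_hash(User, r.hgetall('user:100')) reads it back with age as an int.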
List Operations for Queues
Lists are perfect for implementing FIFO queues and job processing:
import redis
import json
r = redis.Redis(decode_responses=True)
# Push items to queue (FIFO)
r.rpush('tasks:queue', 'task1', 'task2', 'task3')
# Get queue length
queue_length = r.llen('tasks:queue')
print(f"Queue has {queue_length} tasks") # Output: Queue has 3 tasks
# Pop item from queue (blocking; timeout=0 means wait indefinitely)
task = r.blpop('tasks:queue', timeout=0)
print(task) # Output: ('tasks:queue', 'task1')
# Process with worker pattern
def worker():
    while True:
        # Block until task available
        result = r.blpop('tasks:queue', timeout=1)
        if result:
            queue_name, task = result
            print(f"Processing: {task}")
            # Process task here
        else:
            print("No tasks, waiting...")
# Get all items without removing
all_tasks = r.lrange('tasks:queue', 0, -1)
print(all_tasks) # Output: ['task2', 'task3']
# Push email to processing queue
email_tasks = [
json.dumps({'to': 'user1@example.com', 'subject': 'Welcome'}),
json.dumps({'to': 'user2@example.com', 'subject': 'Newsletter'}),
json.dumps({'to': 'user3@example.com', 'subject': 'Alert'})
]
for email_task in email_tasks:
    r.rpush('email:queue', email_task)
# Consumer processes emails until the queue is empty
while True:
    email_json = r.lpop('email:queue')
    if email_json is None: # Queue drained (or another consumer took the last item)
        break
    email = json.loads(email_json)
    print(f"Sending email to {email['to']}: {email['subject']}")
Set Operations for Unique Data
Sets store unique values and are efficient for membership testing:
import redis
r = redis.Redis(decode_responses=True)
# Add items to set
r.sadd('online_users', 'user1', 'user2', 'user3', 'user4')
# Check membership
is_online = r.sismember('online_users', 'user1')
print(bool(is_online)) # Output: True
# Get all members
users = r.smembers('online_users')
print(users) # Output: {'user1', 'user2', 'user3', 'user4'}
# Get set cardinality (size)
online_count = r.scard('online_users')
print(f"Online users: {online_count}") # Output: Online users: 4
# Remove items
r.srem('online_users', 'user2')
# Set operations for user tags
r.sadd('interests:user1', 'python', 'javascript', 'databases')
r.sadd('interests:user2', 'python', 'django', 'web')
# Intersection: common interests
common = r.sinter('interests:user1', 'interests:user2')
print(f"Common interests: {common}") # Output: Common interests: {'python'}
# Union: all interests
all_interests = r.sunion('interests:user1', 'interests:user2')
print(all_interests)
# Difference: unique to user1
unique_to_user1 = r.sdiff('interests:user1', 'interests:user2')
print(unique_to_user1)
Pub/Sub Messaging for Real-Time Communication
Publish/Subscribe pattern enables real-time messaging between applications:
import redis
import threading
import time
# Publisher
def publisher():
    r = redis.Redis(decode_responses=True)
    for i in range(5):
        message = f"Message {i+1}"
        r.publish('chat:room1', message)
        print(f"Published: {message}")
        time.sleep(1)
# Subscriber
def subscriber():
    r = redis.Redis(decode_responses=True)
    pubsub = r.pubsub()
    pubsub.subscribe('chat:room1')
    print("Listening for messages...")
    for message in pubsub.listen():
        if message['type'] == 'message':
            print(f"Received: {message['data']}")
# Run publisher and subscriber in threads
publisher_thread = threading.Thread(target=publisher)
# listen() blocks forever, so run the subscriber as a daemon thread
# that does not keep the program alive after the publisher finishes
subscriber_thread = threading.Thread(target=subscriber, daemon=True)
subscriber_thread.start()
time.sleep(1) # Let subscriber start first
publisher_thread.start()
publisher_thread.join()
subscriber_thread.join(timeout=2) # Give the last message time to arrive
Pattern-based subscriptions:
import redis
r = redis.Redis(decode_responses=True)
pubsub = r.pubsub()
# Subscribe to pattern
pubsub.psubscribe('notifications:*')
# Messages will match notifications:user1, notifications:user2, etc.
for message in pubsub.listen():
    if message['type'] == 'pmessage':
        print(f"Pattern: {message['pattern']}")
        print(f"Channel: {message['channel']}")
        print(f"Message: {message['data']}")
Expiration and Time-To-Live (TTL)
Redis automatically deletes data when TTL expires:
import redis
import time
r = redis.Redis(decode_responses=True)
# Set with expiration in seconds
r.set('temp_session', 'session_data', ex=60)
# Set with expiration at specific timestamp
import datetime
expire_at = datetime.datetime.now() + datetime.timedelta(hours=1)
r.expireat('temp_session', expire_at)
# Set expiration on existing key
r.set('api_token', 'token_123')
r.expire('api_token', 3600) # Expire in 1 hour
# Get remaining TTL
ttl = r.ttl('api_token')
print(f"TTL: {ttl} seconds") # Output: TTL: 3600 seconds (or slightly less)
# Get TTL in milliseconds
pttl = r.pttl('api_token')
print(f"PTTL: {pttl} ms")
# Make key persistent (remove expiration)
r.persist('api_token')
# Check if key has expiration
ttl = r.ttl('api_token')
print(ttl) # Output: -1 (no expiration)
# Fixed-window rate limiting: count requests per key, reset when the key expires
def rate_limit(user_id, max_requests=10, window=60):
    key = f"rate_limit:{user_id}"
    current = r.incr(key)
    if current == 1:
        # First request in this window: start the countdown
        r.expire(key, window)
    return current <= max_requests
# Test rate limiting
for i in range(15):
    allowed = rate_limit('user_123', max_requests=10, window=60)
    print(f"Request {i+1}: {'Allowed' if allowed else 'Blocked'}")
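The INCR/EXPIRE counter above counts requests per fixed window, so a burst that straddles a window boundary can briefly exceed the limit. A true sliding window can be sketched with a sorted set that stores one timestamped entry per request. This is an illustrative sketch, not a hardened limiter: the function name, key prefix, and the now parameter (there to make the logic testable) are assumptions, and client is expected to be a redis.Redis instance.

```python
import time
import uuid


def sliding_window_allow(client, user_id, max_requests=10, window=60, now=None):
    """Return True if this request fits within the last `window` seconds."""
    now = time.time() if now is None else now
    key = f"rate_limit:sliding:{user_id}"  # illustrative key name
    member = f"{now}:{uuid.uuid4().hex}"   # unique member per request

    pipe = client.pipeline()                     # MULTI/EXEC by default in redis-py
    pipe.zremrangebyscore(key, 0, now - window)  # drop entries older than the window
    pipe.zadd(key, {member: now})                # record this request
    pipe.zcard(key)                              # count requests still in the window
    pipe.expire(key, window)                     # let idle keys disappear
    _, _, count, _ = pipe.execute()
    return count <= max_requests
```

Note that blocked requests are recorded too, so a client that keeps hammering the endpoint stays blocked until it slows down. Memory use grows with the number of requests per window, which is the price paid for the exact count.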
Troubleshooting Common Redis Issues
| Issue | Cause | Solution |
|---|---|---|
| Connection refused error | Redis server not running | Start Redis: redis-server or brew services start redis |
| Slow performance | Memory full or eviction policy misconfigured | Check memory: redis-cli info memory. Adjust maxmemory policy |
| Data loss on restart | Persistence (RDB or AOF) not enabled | Enable RDB snapshots or AOF in redis.conf; run BGSAVE to force a snapshot |
| Memory usage increasing | Keys not expiring or memory leaks | Check TTL on keys, implement proper expiration policy |
| Network timeouts | Connection pool exhausted | Increase max_connections or reduce concurrent requests |
| High CPU usage | Complex operations or too many clients | Optimize operations, limit client connections |
Real-Life Example: Session Caching for Web Apps
Here's a complete session management system using Redis:
import redis
import json
import secrets
from datetime import datetime, timedelta
class RedisSessionManager:
    def __init__(self, redis_host='localhost', redis_port=6379):
        self.r = redis.Redis(
            host=redis_host,
            port=redis_port,
            decode_responses=True
        )
        self.session_ttl = 3600 # 1 hour
    def create_session(self, user_id, user_data):
        """Create new session for user"""
        session_id = secrets.token_urlsafe(32)
        session_data = {
            'user_id': user_id,
            'user_data': json.dumps(user_data),
            'created_at': datetime.now().isoformat(),
            'ip_address': '192.168.1.1', # Placeholder: take this from the request
            'user_agent': 'Mozilla/5.0...' # Placeholder: take this from the request
        }
        # Store in Redis hash
        self.r.hset(f'session:{session_id}', mapping=session_data)
        self.r.expire(f'session:{session_id}', self.session_ttl)
        # Track user sessions
        self.r.sadd(f'user_sessions:{user_id}', session_id)
        return session_id
    def get_session(self, session_id):
        """Retrieve session data"""
        session = self.r.hgetall(f'session:{session_id}')
        if not session:
            return None
        # Refresh TTL on access
        self.r.expire(f'session:{session_id}', self.session_ttl)
        # Deserialize user data
        session['user_data'] = json.loads(session['user_data'])
        return session
    def update_session(self, session_id, key, value):
        """Update session field"""
        if self.r.hexists(f'session:{session_id}', 'user_id'):
            self.r.hset(f'session:{session_id}', key, value)
            self.r.expire(f'session:{session_id}', self.session_ttl)
            return True
        return False
    def destroy_session(self, session_id):
        """Delete session"""
        user_id = self.r.hget(f'session:{session_id}', 'user_id')
        self.r.delete(f'session:{session_id}')
        if user_id:
            self.r.srem(f'user_sessions:{user_id}', session_id)
        return True
    def get_user_sessions(self, user_id):
        """Get all sessions for user"""
        session_ids = self.r.smembers(f'user_sessions:{user_id}')
        sessions = []
        for sid in session_ids:
            session = self.r.hgetall(f'session:{sid}')
            if session: # Skip IDs whose session hash has already expired
                sessions.append({'id': sid, 'data': session})
        return sessions
    def invalidate_user_sessions(self, user_id):
        """Logout user from all devices"""
        session_ids = self.r.smembers(f'user_sessions:{user_id}')
        for sid in session_ids:
            self.r.delete(f'session:{sid}')
        self.r.delete(f'user_sessions:{user_id}')
        return True
# Usage
manager = RedisSessionManager()
# Create session
session_id = manager.create_session(
user_id='user_123',
user_data={'email': 'user@example.com', 'name': 'John'}
)
print(f"Session created: {session_id}")
# Retrieve session
session = manager.get_session(session_id)
print(f"Session data: {session}")
# Update session
manager.update_session(session_id, 'last_activity', datetime.now().isoformat())
# Get all user sessions
all_sessions = manager.get_user_sessions('user_123')
print(f"User has {len(all_sessions)} active sessions")
# Logout from all devices
manager.invalidate_user_sessions('user_123')
This example demonstrates:
- Session creation with unique tokens
- Automatic expiration using TTL
- Tracking multiple sessions per user
- Session refresh on access
- Single logout and multi-device logout
- JSON serialization for complex data
Redis Best Practices
Follow these guidelines for optimal Redis usage:
- Use connection pooling: Share a connection pool across your application
- Set appropriate TTLs: Prevent unbounded memory growth
- Monitor memory usage: Configure maxmemory and eviction policies
- Use pipelining: Batch multiple commands for better performance
- Implement error handling: Handle connection failures gracefully
- Use hashes for objects: More efficient than storing JSON strings
- Enable persistence: Use RDB or AOF for durability in production
- Encrypt sensitive data: Don't store passwords or tokens in plain text
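The "handle connection failures gracefully" guideline above can be sketched as a small retry wrapper with exponential backoff. The helper name and its parameters are illustrative assumptions rather than a redis-py API. With redis-py, retry_on would typically be (redis.exceptions.ConnectionError, redis.exceptions.TimeoutError).

```python
import time


def call_with_retry(operation, retry_on, attempts=3, base_delay=0.1, sleep=time.sleep):
    """Run operation() and retry on the given exception types with backoff.

    `sleep` is injectable so the delay logic can be exercised without waiting.
    """
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the original error
            sleep(base_delay * 2 ** (attempt - 1))  # 0.1s, 0.2s, 0.4s, ...
```

A call such as call_with_retry(lambda: r.get('user:1:name'), retry_on=(redis.exceptions.ConnectionError,)) would then survive a brief outage. For a cache, falling back to the database after the final failure is usually a better choice than failing the whole request.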
FAQ
Q: Is Redis suitable for permanent data storage?
A: Redis is primarily for caching. Enable RDB or AOF persistence to save data to disk. For permanent data, use a traditional database alongside Redis.
Q: How much data can Redis store?
A: Redis capacity is limited by available RAM. Use Redis Cluster to distribute data across multiple nodes for larger datasets.
Q: Can Redis cluster across multiple machines?
A: Yes, Redis Cluster distributes data across nodes for scalability and high availability with automatic failover.
Q: What is the difference between Pub/Sub and task queues?
A: Pub/Sub broadcasts messages in real time, but a message is lost if no subscriber is connected when it is published. A list-based queue keeps each job until a worker pops it, so jobs survive a slow or offline consumer. Choose based on your durability requirements.
Q: How do I secure Redis access?
A: Use password authentication, restrict network access via firewall, enable TLS encryption, use private networks, and implement ACLs. Never expose Redis to the internet.
Pipelining for Performance
Each redis-py command is a round-trip to the server. Doing 1,000 SETs takes 1,000 round-trips — at ~0.5ms each that's 500ms of pure latency. Pipelining batches commands into a single round-trip:
import redis
r = redis.Redis(host="localhost", port=6379, decode_responses=True)
# Slow way: 1000 round-trips
for i in range(1000):
    r.set(f"key:{i}", i)
# Fast way: 1 round-trip
pipe = r.pipeline()
for i in range(1000):
    pipe.set(f"key:{i}", i)
pipe.execute()
# Usually far faster; the gain grows with network latency
Use pipelines for bulk inserts, bulk reads, or any sequence of commands you want to send atomically. The transaction=True default wraps the pipeline in MULTI/EXEC so commands run as a unit; pass transaction=False when you just want batching without atomicity (faster).
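For bulk reads, a non-transactional pipeline is often enough. The sketch below fetches many hashes in chunks, one round-trip per chunk. The function name and the chunk_size default are illustrative assumptions, and client is expected to be a redis.Redis instance.

```python
def bulk_hgetall(client, keys, chunk_size=500):
    """Fetch many hashes with one round-trip per chunk of keys."""
    results = {}
    for start in range(0, len(keys), chunk_size):
        chunk = keys[start:start + chunk_size]
        # transaction=False: batch the commands without wrapping them in MULTI/EXEC
        pipe = client.pipeline(transaction=False)
        for key in chunk:
            pipe.hgetall(key)
        # execute() returns results in the order the commands were queued
        for key, value in zip(chunk, pipe.execute()):
            results[key] = value
    return results
```

Chunking keeps any single request from growing without bound. A missing key comes back as an empty dict, the same as a direct hgetall call.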
Pub/Sub for Real-Time Messaging
Redis pub/sub turns Redis into a lightweight message bus. Producers publish to channels; subscribers receive messages in real time:
import redis
import threading
import time
r = redis.Redis(decode_responses=True)
def subscriber():
    pubsub = r.pubsub()
    pubsub.subscribe("notifications", "alerts")
    for msg in pubsub.listen():
        if msg["type"] == "message":
            print(f"[{msg['channel']}] {msg['data']}")
threading.Thread(target=subscriber, daemon=True).start()
time.sleep(0.5) # Give the subscriber time to subscribe; earlier messages are dropped
# Publish from anywhere
r.publish("notifications", "User 42 logged in")
r.publish("alerts", "Disk space below 10%")
time.sleep(1) # Let the daemon thread print before the script exits
Pub/sub is fire-and-forget — subscribers must be connected when the message is published. For durable queues use Redis Streams, RabbitMQ, or Kafka.
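A Redis Streams consumer group is one way to get the durability that pub/sub lacks: entries stay in the stream, and an entry that was delivered but never acknowledged remains pending. The sketch below assumes client is a redis.Redis instance using the default RESP2 protocol with decode_responses=True. The helper names and defaults are illustrative.

```python
def ensure_group(client, stream, group):
    """Create the consumer group (and the stream) if they do not exist yet."""
    try:
        client.xgroup_create(stream, group, id="0", mkstream=True)
    except Exception as exc:  # redis-py reports an existing group as a BUSYGROUP error
        if "BUSYGROUP" not in str(exc):
            raise


def publish_event(client, stream, fields):
    """Append an entry; Redis assigns and returns the entry ID."""
    return client.xadd(stream, fields)


def consume_once(client, stream, group, consumer, handler, count=10, block_ms=1000):
    """Read up to `count` new entries, run `handler` on each, then acknowledge it."""
    response = client.xreadgroup(group, consumer, {stream: ">"}, count=count, block=block_ms)
    handled = 0
    for _stream_name, entries in response or []:
        for entry_id, fields in entries:
            handler(fields)
            client.xack(stream, group, entry_id)  # unacknowledged entries stay pending
            handled += 1
    return handled
```

A worker would call ensure_group once at startup and then loop over consume_once. If handler raises, the entry is not acknowledged and can be inspected or reclaimed later.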
Caching Patterns with Redis
The most common Redis use case is caching expensive computations or database queries. Two patterns dominate:
Cache-aside: Check cache first, fall back to source, populate cache:
import json
import redis
r = redis.Redis(decode_responses=True)
# `db` stands in for your database client; it is not defined in this snippet
def get_user(user_id: int) -> dict:
    key = f"user:{user_id}"
    cached = r.get(key)
    if cached:
        return json.loads(cached)
    user = db.query("SELECT * FROM users WHERE id = ?", user_id)
    r.setex(key, 300, json.dumps(user)) # 5-min TTL
    return user
Write-through: Update the cache whenever you update the source. This keeps cached reads fresh at the cost of slower writes.
def update_user(user_id: int, name: str):
    db.execute("UPDATE users SET name = ? WHERE id = ?", name, user_id)
    user = db.query("SELECT * FROM users WHERE id = ?", user_id)
    r.setex(f"user:{user_id}", 300, json.dumps(user))
Redis as a Queue: BLPOP / RPUSH
Lists make a serviceable queue. Producers RPUSH, consumers BLPOP (blocking pop with timeout):
import redis, json, time
r = redis.Redis(decode_responses=True)
# Producer
r.rpush("jobs", json.dumps({"task": "send_email", "to": "alice@x.com"}))
r.rpush("jobs", json.dumps({"task": "render_pdf", "doc_id": 99}))
# Consumer (in a worker process)
while True:
    item = r.blpop(["jobs"], timeout=10)
    if item is None:
        continue
    queue, payload = item
    job = json.loads(payload)
    print(f"Processing {job['task']}")
For more features (retries, dead-letter queues, monitoring), reach for RQ, Celery, or Dramatiq. They build on Redis but add the operational layer you'll want past 100 jobs/sec.
Common Pitfalls
- Forgetting decode_responses. Without decode_responses=True, redis-py returns bytes, not strings. Most code expects strings; pick a side at client creation, not per call.
- No TTL on cache keys. A cache without expiration is a memory leak. Always SETEX or SET ... EX rather than plain SET.
- Using KEYS in production. KEYS * blocks Redis on every key in the keyspace. Use SCAN for production traversal.
- Single connection, many threads. Without a connection pool, threads serialize on the connection. Use redis.ConnectionPool or redis.Redis(connection_pool=pool).
- Treating pub/sub as durable. Subscribers that disconnect lose messages. If you need durability, use Redis Streams (XADD / XREAD) or a real broker.
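The SCAN advice above can be sketched as a batched cleanup helper. redis-py wraps SCAN in scan_iter, which walks the keyspace incrementally instead of blocking the server the way KEYS does. The function name and batch size are illustrative assumptions, and client is expected to be a redis.Redis instance.

```python
def delete_matching(client, pattern, batch_size=500):
    """Delete keys matching `pattern` in batches; return how many were removed."""
    deleted = 0
    batch = []
    # count is only a hint to Redis about how much work to do per SCAN call
    for key in client.scan_iter(match=pattern, count=batch_size):
        batch.append(key)
        if len(batch) >= batch_size:
            deleted += client.delete(*batch)
            batch.clear()
    if batch:
        deleted += client.delete(*batch)  # flush the final partial batch
    return deleted
```

For example, delete_matching(r, 'session:*') removes every session key. SCAN can return a key more than once during a traversal, which is harmless here because deleting a missing key is a no-op.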
FAQ
Q: redis-py or aioredis?
A: redis-py 4.2+ has built-in async support: redis.asyncio.Redis. aioredis was the legacy library; it's been merged into redis-py. Use the unified package.
Q: How big can a Redis value be?
A: 512MB per value. Practically, keep individual values under 1MB — anything bigger is a sign you should use object storage (S3) and cache the metadata.
Q: How do I persist Redis across restarts?
A: Two options: RDB (periodic snapshots) and AOF (append-only log). The default configuration enables RDB snapshots only; AOF stays off until you set appendonly yes. For cache-only workloads, you can disable persistence entirely with save "".
Q: Redis vs Memcached?
A: Redis is richer (data structures, pub/sub, transactions, persistence). Memcached is simpler and slightly faster for the narrow get/set use case. Pick Redis for almost everything modern.
Q: Should I run Redis on my web server or separate it out?
A: Separate it once you have more than one web instance. Single-host setups can colocate Redis with the app server; multi-host setups need it on its own.
Wrapping Up
Redis is one of those infrastructure pieces that pays back tenfold once you have it running. Caching, queues, rate limiting, pub/sub, session storage, distributed locks — all in one tiny binary. Start with simple GET / SET against a managed Redis (Redis Cloud, ElastiCache, Upstash), graduate to pipelines and pub/sub when you need them. Don't over-engineer until the load actually demands it.