How To Connect Python to Redis for Caching and Queues
Install the client with pip install redis, then connect with r = redis.Redis(host='localhost', port=6379, db=0). Use string operations like r.set() and r.get() for caching, list operations for queues, and pub/sub for real-time messaging. Keys can expire automatically when you set a TTL.
Understanding Redis and Its Use Cases
Redis is an open-source, in-memory data structure store that operates at extremely high speeds. Unlike traditional databases that store data on disk, Redis keeps everything in RAM, making it ideal for applications requiring sub-millisecond response times.
Redis is particularly useful for:
- Caching: Store frequently accessed data to reduce database load
- Sessions: Store user sessions for web applications
- Task Queues: Implement job queues with multiple workers
- Pub/Sub Messaging: Build real-time messaging systems
- Leaderboards: Track scores and rankings efficiently
- Rate Limiting: Implement API rate limiting
Installing Redis and redis-py
First, install the Redis server. On macOS using Homebrew:
brew install redis
brew services start redis
On Ubuntu/Debian:
sudo apt-get install redis-server
sudo systemctl start redis-server
Next, install the Python redis library:
pip install redis
Verify Redis is running:
redis-cli ping
# Output: PONG
Check Redis version and info:
redis-cli --version
redis-cli info server
Connecting to Redis from Python
Create a basic connection to Redis:
import redis
# Connect to local Redis instance
r = redis.Redis(
    host='localhost',
    port=6379,
    db=0,
    decode_responses=True  # Decode bytes to strings
)
# Test the connection
print(r.ping()) # Output: True
print(r.echo('Hello Redis!')) # Output: Hello Redis!
For production with password authentication and connection pooling:
import redis
from redis import ConnectionPool
# Create connection pool for better performance
pool = ConnectionPool(
    host='redis.example.com',
    port=6379,
    db=0,
    password='your_password',
    max_connections=50,
    decode_responses=True
)
# Create Redis client from pool
r = redis.Redis(connection_pool=pool)
# Or use URL connection string
r = redis.from_url(
    'redis://:password@redis.example.com:6379/0',
    decode_responses=True
)
String Operations for Caching
Strings are the simplest Redis data type, perfect for caching:
import redis
import json
from datetime import timedelta
r = redis.Redis(decode_responses=True)
# SET and GET operations
r.set('user:1:name', 'John Doe')
name = r.get('user:1:name')
print(name) # Output: John Doe
# SET with expiration
r.set('session:abc123', 'user_data', ex=3600) # Expires in 1 hour
# GET with fallback
user_data = r.get('user:2:name')
if not user_data:
    print("User not in cache, fetch from database")
    user_data = 'Jane Smith'
    r.set('user:2:name', user_data, ex=3600)
# Cache JSON data
user_dict = {'id': 3, 'name': 'Bob', 'email': 'bob@example.com'}
r.set('user:3', json.dumps(user_dict), ex=7200)
cached_user = json.loads(r.get('user:3'))
print(cached_user) # Output: {'id': 3, 'name': 'Bob', 'email': 'bob@example.com'}
# Multiple operations
r.mset({'key1': 'value1', 'key2': 'value2', 'key3': 'value3'})
values = r.mget(['key1', 'key2', 'key3'])
print(values) # Output: ['value1', 'value2', 'value3']
# Atomic increment/decrement
r.set('page_views', 100)
r.incr('page_views') # Increment by 1
r.incrby('page_views', 5) # Increment by 5
r.decr('page_views') # Decrement by 1
print(r.get('page_views')) # Output: 105
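The get-with-fallback pattern above gets repetitive across a codebase. As a sketch, assuming positional-only arguments and JSON-serializable return values, it can be wrapped in a decorator (redis_cache is an illustrative name, not a redis-py feature):

```python
import functools
import json

def redis_cache(r, ttl=300):
    """Cache-aside decorator: return the cached value when present,
    otherwise call the function and cache its JSON-serialized result."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args):
            # Positional args only, for a simple illustrative key scheme
            key = f"cache:{func.__name__}:" + ":".join(map(str, args))
            cached = r.get(key)
            if cached is not None:
                return json.loads(cached)  # cache hit
            result = func(*args)           # cache miss: compute and store
            r.set(key, json.dumps(result), ex=ttl)
            return result
        return wrapper
    return decorator
```

With a connected client r, decorating def get_user(user_id) with @redis_cache(r, ttl=600) caches each distinct call for ten minutes.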
Hash Operations for Complex Data
Hashes store multiple fields under a single key, ideal for object data:
import redis
r = redis.Redis(decode_responses=True)
# Store user object as hash
r.hset('user:100', mapping={
    'name': 'Alice Johnson',
    'email': 'alice@example.com',
    'age': '28',
    'city': 'New York'
})
# Get single field
email = r.hget('user:100', 'email')
print(email) # Output: alice@example.com
# Get all fields
user_data = r.hgetall('user:100')
print(user_data)
# Output: {'name': 'Alice Johnson', 'email': 'alice@example.com', 'age': '28', 'city': 'New York'}
# Get multiple fields
info = r.hmget('user:100', ['name', 'email'])
print(info) # Output: ['Alice Johnson', 'alice@example.com']
# Update single field
r.hset('user:100', 'age', '29')
# Increment hash field
r.hset('product:1', mapping={
    'name': 'Laptop',
    'price': '999.99',
    'stock': '50'
})
r.hincrbyfloat('product:1', 'price', 100.00) # Increase price
print(r.hget('product:1', 'price')) # Output: 1099.99
# Check if field exists
exists = r.hexists('user:100', 'city')
print(exists) # Output: True
# Get all field names
fields = r.hkeys('user:100')
print(fields) # Output: ['name', 'email', 'age', 'city']
# Get all values
values = r.hvals('user:100')
print(values) # Output: ['Alice Johnson', 'alice@example.com', '29', 'New York']
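Note that hash fields are always stored as strings, so numeric values come back as strings (the '29' above). One approach, sketched here with the illustrative helper names cache_object and load_object, is to stringify on write and re-apply types on read:

```python
def cache_object(r, key, obj, ttl=3600):
    """Store a flat dict as a Redis hash; every field becomes a string."""
    r.hset(key, mapping={field: str(value) for field, value in obj.items()})
    r.expire(key, ttl)

def load_object(r, key, casts=None):
    """Load a hash back into a dict, re-applying casts such as int or float."""
    data = r.hgetall(key)
    if not data:
        return None  # key missing or expired
    for field, cast in (casts or {}).items():
        if field in data:
            data[field] = cast(data[field])
    return data
```

For example, load_object(r, 'user:100', casts={'age': int}) returns age as an integer again rather than the string Redis stored.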
List Operations for Queues
Lists are perfect for implementing FIFO queues and job processing:
import redis
import json
r = redis.Redis(decode_responses=True)
# Push items to queue (FIFO)
r.rpush('tasks:queue', 'task1', 'task2', 'task3')
# Get queue length
queue_length = r.llen('tasks:queue')
print(f"Queue has {queue_length} tasks") # Output: Queue has 3 tasks
# Pop item from queue (blocking; timeout=0 waits indefinitely)
task = r.blpop('tasks:queue', timeout=0)
print(task) # Output: ('tasks:queue', 'task1')
# Process with worker pattern
def worker():
    while True:
        # Block until task available
        result = r.blpop('tasks:queue', timeout=1)
        if result:
            queue_name, task = result
            print(f"Processing: {task}")
            # Process task here
        else:
            print("No tasks, waiting...")
# Get all items without removing
all_tasks = r.lrange('tasks:queue', 0, -1)
print(all_tasks) # Output: ['task2', 'task3']
# Push email to processing queue
email_tasks = [
    json.dumps({'to': 'user1@example.com', 'subject': 'Welcome'}),
    json.dumps({'to': 'user2@example.com', 'subject': 'Newsletter'}),
    json.dumps({'to': 'user3@example.com', 'subject': 'Alert'})
]
for email_task in email_tasks:
    r.rpush('email:queue', email_task)
# Consumer processes emails
while r.llen('email:queue') > 0:
    email_json = r.lpop('email:queue')
    email = json.loads(email_json)
    print(f"Sending email to {email['to']}: {email['subject']}")
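One caveat with lpop/blpop queues: if a worker crashes after popping, the task is lost. Redis 6.2+ provides LMOVE (lmove in redis-py 4+), which atomically parks the task on a per-worker processing list until it is acknowledged. A sketch of that pattern (the function names and key names are illustrative):

```python
def reliable_pop(r, queue, processing):
    """Atomically move one task from the queue to a processing list.
    If the worker dies, the task survives on the processing list."""
    return r.lmove(queue, processing, src='LEFT', dest='RIGHT')

def acknowledge(r, processing, task):
    """Remove a finished task from the processing list."""
    r.lrem(processing, 1, task)
```

A separate janitor process can periodically requeue tasks that sit on a processing list too long, covering the crashed-worker case.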
Set Operations for Unique Data
Sets store unique values and are efficient for membership testing:
import redis
r = redis.Redis(decode_responses=True)
# Add items to set
r.sadd('online_users', 'user1', 'user2', 'user3', 'user4')
# Check membership
is_online = r.sismember('online_users', 'user1')
print(is_online) # Output: True
# Get all members
users = r.smembers('online_users')
print(users) # Output: {'user1', 'user2', 'user3', 'user4'}
# Get set cardinality (size)
online_count = r.scard('online_users')
print(f"Online users: {online_count}") # Output: Online users: 4
# Remove items
r.srem('online_users', 'user2')
# Set operations for user tags
r.sadd('interests:user1', 'python', 'javascript', 'databases')
r.sadd('interests:user2', 'python', 'django', 'web')
# Intersection: common interests
common = r.sinter('interests:user1', 'interests:user2')
print(f"Common interests: {common}") # Output: Common interests: {'python'}
# Union: all interests
all_interests = r.sunion('interests:user1', 'interests:user2')
print(all_interests)
# Difference: unique to user1
unique_to_user1 = r.sdiff('interests:user1', 'interests:user2')
print(unique_to_user1)
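Sets also make unique-visitor counting straightforward: key a set by date, add each visitor ID, and let the key expire after a retention period. A sketch, where the key layout and the two-day TTL are assumptions:

```python
import datetime

def track_visitor(r, visitor_id, ttl=86400 * 2):
    """Record a unique visitor in a per-day set and return today's count.
    SADD ignores duplicates, so repeat visits are counted once."""
    key = f"visitors:{datetime.date.today().isoformat()}"
    if r.sadd(key, visitor_id):
        r.expire(key, ttl)  # keep each daily set for two days
    return r.scard(key)
```

Because SCARD is O(1), reading the daily unique count stays cheap no matter how many visitors are recorded.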
Pub/Sub Messaging for Real-Time Communication
Publish/Subscribe pattern enables real-time messaging between applications:
import redis
import threading
import time
# Publisher
def publisher():
    r = redis.Redis(decode_responses=True)
    for i in range(5):
        message = f"Message {i+1}"
        r.publish('chat:room1', message)
        print(f"Published: {message}")
        time.sleep(1)
# Subscriber
def subscriber():
    r = redis.Redis(decode_responses=True)
    pubsub = r.pubsub()
    pubsub.subscribe('chat:room1')
    print("Listening for messages...")
    for message in pubsub.listen():
        if message['type'] == 'message':
            print(f"Received: {message['data']}")
# Run publisher and subscriber in threads
publisher_thread = threading.Thread(target=publisher)
subscriber_thread = threading.Thread(target=subscriber)
subscriber_thread.start()
time.sleep(1) # Let subscriber start first
publisher_thread.start()
publisher_thread.join()
subscriber_thread.join(timeout=10)
Pattern-based subscriptions:
import redis
r = redis.Redis(decode_responses=True)
pubsub = r.pubsub()
# Subscribe to pattern
pubsub.psubscribe('notifications:*')
# Messages will match notifications:user1, notifications:user2, etc.
for message in pubsub.listen():
    if message['type'] == 'pmessage':
        print(f"Pattern: {message['pattern']}")
        print(f"Channel: {message['channel']}")
        print(f"Message: {message['data']}")
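Note that listen() blocks its thread indefinitely. When the loop also needs to do other work, redis-py's pubsub.get_message(timeout=...) polls instead of blocking. A sketch of a drain loop that exits after a stretch of silence (the function name and limits are illustrative):

```python
def drain_messages(pubsub, handler, idle_limit=50, poll_interval=0.01):
    """Poll for messages until idle_limit consecutive polls return nothing."""
    idle = 0
    while idle < idle_limit:
        message = pubsub.get_message(timeout=poll_interval)
        if message is None:
            idle += 1  # nothing arrived this poll
            continue
        idle = 0
        # Skip subscribe/unsubscribe confirmations, handle real payloads
        if message['type'] in ('message', 'pmessage'):
            handler(message['data'])
```

Between polls the caller regains control, so the same thread can also service timers, health checks, or a shutdown flag.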
Expiration and Time-To-Live (TTL)
Redis automatically deletes data when TTL expires:
import redis
import time
r = redis.Redis(decode_responses=True)
# Set with expiration in seconds
r.set('temp_session', 'session_data', ex=60)
# Set with expiration at specific timestamp
import datetime
expire_at = datetime.datetime.now() + datetime.timedelta(hours=1)
r.expireat('temp_session', expire_at)
# Set expiration on existing key
r.set('api_token', 'token_123')
r.expire('api_token', 3600) # Expire in 1 hour
# Get remaining TTL
ttl = r.ttl('api_token')
print(f"TTL: {ttl} seconds") # Output: TTL: 3600 seconds (or slightly less)
# Get TTL in milliseconds
pttl = r.pttl('api_token')
print(f"PTTL: {pttl} ms")
# Make key persistent (remove expiration)
r.persist('api_token')
# Check if key has expiration
ttl = r.ttl('api_token')
print(ttl) # Output: -1 (no expiration)
# Fixed-window rate limiting
def rate_limit(user_id, max_requests=10, window=60):
    key = f"rate_limit:{user_id}"
    current = r.incr(key)
    if current == 1:
        r.expire(key, window)
    return current <= max_requests
# Test rate limiting
for i in range(15):
    allowed = rate_limit('user_123', max_requests=10, window=60)
    print(f"Request {i+1}: {'Allowed' if allowed else 'Blocked'}")
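The counter above is a fixed window, so a burst straddling a window boundary can briefly allow up to twice the limit. A true sliding window can be built on a sorted set of request timestamps, batched in a pipeline; a sketch, where the key layout and member format are assumptions:

```python
import time
import uuid

def sliding_window_allowed(r, user_id, max_requests=10, window=60):
    """Sliding-window limiter: a sorted set holds one member per request,
    scored by timestamp, so old requests age out continuously."""
    key = f"rate:sliding:{user_id}"
    now = time.time()
    pipe = r.pipeline()
    pipe.zremrangebyscore(key, 0, now - window)  # drop requests outside window
    pipe.zadd(key, {uuid.uuid4().hex: now})      # record this request uniquely
    pipe.zcard(key)                              # count requests in window
    pipe.expire(key, window)                     # clean up idle keys
    results = pipe.execute()
    return results[2] <= max_requests
```

This costs one sorted-set member per request rather than one counter per window, which is the usual trade-off for the smoother limit.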
Troubleshooting Common Redis Issues
| Issue | Cause | Solution |
|---|---|---|
| Connection refused error | Redis server not running | Start Redis: redis-server or brew services start redis |
| Slow performance | Memory full or eviction policy misconfigured | Check memory: redis-cli info memory. Adjust maxmemory policy |
| Data loss on restart | RDB persistence not enabled | Enable RDB in redis.conf or use BGSAVE |
| Memory usage increasing | Keys not expiring or memory leaks | Check TTL on keys, implement proper expiration policy |
| Network timeouts | Connection pool exhausted | Increase max_connections or reduce concurrent requests |
| High CPU usage | Complex operations or too many clients | Optimize operations, limit client connections |
Real-Life Example: Session Caching for Web Apps
Here's a complete session management system using Redis:
import redis
import json
import secrets
from datetime import datetime

class RedisSessionManager:
    def __init__(self, redis_host='localhost', redis_port=6379):
        self.r = redis.Redis(
            host=redis_host,
            port=redis_port,
            decode_responses=True
        )
        self.session_ttl = 3600  # 1 hour

    def create_session(self, user_id, user_data):
        """Create new session for user"""
        session_id = secrets.token_urlsafe(32)
        session_data = {
            'user_id': user_id,
            'user_data': json.dumps(user_data),
            'created_at': datetime.now().isoformat(),
            'ip_address': '192.168.1.1',
            'user_agent': 'Mozilla/5.0...'
        }
        # Store in Redis hash
        self.r.hset(f'session:{session_id}', mapping=session_data)
        self.r.expire(f'session:{session_id}', self.session_ttl)
        # Track user sessions
        self.r.sadd(f'user_sessions:{user_id}', session_id)
        return session_id

    def get_session(self, session_id):
        """Retrieve session data"""
        session = self.r.hgetall(f'session:{session_id}')
        if not session:
            return None
        # Refresh TTL on access
        self.r.expire(f'session:{session_id}', self.session_ttl)
        # Deserialize user data
        session['user_data'] = json.loads(session['user_data'])
        return session

    def update_session(self, session_id, key, value):
        """Update session field"""
        if self.r.hexists(f'session:{session_id}', 'user_id'):
            self.r.hset(f'session:{session_id}', key, value)
            self.r.expire(f'session:{session_id}', self.session_ttl)
            return True
        return False

    def destroy_session(self, session_id):
        """Delete session"""
        user_id = self.r.hget(f'session:{session_id}', 'user_id')
        self.r.delete(f'session:{session_id}')
        if user_id:
            self.r.srem(f'user_sessions:{user_id}', session_id)
        return True

    def get_user_sessions(self, user_id):
        """Get all sessions for user"""
        session_ids = self.r.smembers(f'user_sessions:{user_id}')
        sessions = []
        for sid in session_ids:
            session = self.r.hgetall(f'session:{sid}')
            if session:
                sessions.append({'id': sid, 'data': session})
        return sessions

    def invalidate_user_sessions(self, user_id):
        """Logout user from all devices"""
        session_ids = self.r.smembers(f'user_sessions:{user_id}')
        for sid in session_ids:
            self.r.delete(f'session:{sid}')
        self.r.delete(f'user_sessions:{user_id}')
        return True
# Usage
manager = RedisSessionManager()
# Create session
session_id = manager.create_session(
    user_id='user_123',
    user_data={'email': 'user@example.com', 'name': 'John'}
)
print(f"Session created: {session_id}")
# Retrieve session
session = manager.get_session(session_id)
print(f"Session data: {session}")
# Update session
manager.update_session(session_id, 'last_activity', datetime.now().isoformat())
# Get all user sessions
all_sessions = manager.get_user_sessions('user_123')
print(f"User has {len(all_sessions)} active sessions")
# Logout from all devices
manager.invalidate_user_sessions('user_123')
This example demonstrates:
- Session creation with unique tokens
- Automatic expiration using TTL
- Tracking multiple sessions per user
- Session refresh on access
- Single logout and multi-device logout
- JSON serialization for complex data
Redis Best Practices
Follow these guidelines for optimal Redis usage:
- Use connection pooling: Share a connection pool across your application
- Set appropriate TTLs: Prevent unbounded memory growth
- Monitor memory usage: Configure maxmemory and eviction policies
- Use pipelining: Batch multiple commands for better performance
- Implement error handling: Handle connection failures gracefully
- Use hashes for objects: More efficient than storing JSON strings
- Enable persistence: Use RDB or AOF for durability in production
- Encrypt sensitive data: Don't store passwords or tokens in plain text
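To illustrate the pipelining guideline above: a pipeline queues commands client-side and sends them in one round trip, which matters most for bulk writes. A sketch (warm_cache is an illustrative name, not a library function):

```python
def warm_cache(r, users, ttl=3600):
    """Cache many user names with a single network round trip.
    Each SET is queued locally and sent together on execute()."""
    pipe = r.pipeline()
    for user in users:
        pipe.set(f"user:{user['id']}:name", user['name'], ex=ttl)
    return pipe.execute()  # one reply per queued command
```

For 1,000 keys this replaces 1,000 round trips with one, typically turning tens of milliseconds of latency into a single network hop.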
FAQ
Q: Is Redis suitable for permanent data storage?
A: Redis is primarily for caching. Enable RDB or AOF persistence to save data to disk. For permanent data, use a traditional database alongside Redis.
Q: How much data can Redis store?
A: Redis capacity is limited by available RAM. Use Redis Cluster to distribute data across multiple nodes for larger datasets.
Q: Can Redis cluster across multiple machines?
A: Yes, Redis Cluster distributes data across nodes for scalability and high availability with automatic failover.
Q: What is the difference between Pub/Sub and task queues?
A: Pub/Sub broadcasts messages in real-time but loses undelivered messages. Queues store jobs persistently for durable processing. Choose based on your durability requirements.
Q: How do I secure Redis access?
A: Use password authentication, restrict network access via firewall, enable TLS encryption, use private networks, and implement ACLs. Never expose Redis to the internet.