How To Use Celery for Background Tasks in Python
Install Celery with `pip install celery redis`, define tasks with the `@app.task` decorator, and call them with `.delay()` or `.apply_async()` for non-blocking execution. Use Redis or RabbitMQ as the message broker, and monitor tasks with the Flower web interface.
Understanding Celery and Background Tasks
Celery is a powerful asynchronous task queue system for Python that enables you to execute long-running operations without blocking your main application. Instead of making users wait for resource-intensive tasks to complete, you can defer them to background workers that process jobs independently.
Background tasks are essential for modern web applications. Without them, uploading a large file, sending emails, or processing videos would freeze your user interface. Celery solves this by allowing tasks to run in separate processes and even on different machines.
Installing and Setting Up Celery with Redis
First, install Celery along with Redis, which serves as the message broker:
```shell
pip install celery redis
```
Next, install and run Redis server. On macOS with Homebrew:
```shell
brew install redis
redis-server
```
On Ubuntu/Debian:
```shell
sudo apt-get install redis-server
sudo systemctl start redis-server
```
Verify Redis is running by connecting with the Redis CLI:
```shell
redis-cli ping
# Output: PONG
```
Creating Your First Celery Application
Create a file called celery_app.py to define your Celery application and tasks:
```python
from celery import Celery

# Create the Celery instance
app = Celery('myapp')

# Configure Celery to use Redis as broker and result backend
app.conf.update(
    broker_url='redis://localhost:6379/0',
    result_backend='redis://localhost:6379/0',
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
)

# Define a simple task
@app.task
def add(x, y):
    return x + y

# Define a task that takes longer
@app.task
def send_email(email, subject, body):
    import time
    time.sleep(2)  # Simulate email sending delay
    print(f"Email sent to {email}: {subject}")
    return {"status": "sent", "email": email}
```
Calling Tasks Asynchronously
Now create a main application file that uses these tasks:
```python
from celery_app import add, send_email

# Method 1: .delay() for simple calls
result = add.delay(4, 6)
print(f"Task ID: {result.id}")
print(f"Task state: {result.state}")

# Method 2: .apply_async() for advanced options
result = send_email.apply_async(
    args=('user@example.com', 'Hello', 'Welcome!'),
    countdown=10  # Execute after 10 seconds
)

# Method 3: Get the task result (blocking)
print(f"Result: {result.get()}")  # Waits until the task completes

# Method 4: Non-blocking result check
if result.ready():
    print(f"Task completed: {result.result}")
else:
    print("Task still processing...")
```
The .delay() method is shorthand for .apply_async() with default options. For finer control, use .apply_async() with parameters such as:
- countdown: delay execution by N seconds
- expires: the task expires if not executed within N seconds
- priority: task priority (support and ordering depend on the broker)
- retry: retry publishing the message if the broker connection fails
Starting Celery Workers
Celery workers are separate processes that consume and execute tasks from the queue. Start a worker with:
```shell
celery -A celery_app worker --loglevel=info
```
You should see startup output like this (banner abridged):

```
 -------------- celery@MacBook-Pro.local v5.3.0 (emerald-rush)
--- ***** -----
-- ******* ----
...

[queues]
  .> celery           exchange=celery(direct) key=celery

[tasks]
  . celery_app.add
  . celery_app.send_email

[2024-01-15 10:30:45,123: WARNING/MainProcess] celery@MacBook-Pro.local ready.
```
For production, increase the number of worker processes with the --concurrency option (the default is the number of CPU cores):

```shell
celery -A celery_app worker --loglevel=info --concurrency=4
```
Task Chains, Groups, and Chords
Celery provides powerful constructs for composing complex workflows:
```python
from celery import chain, group, chord
from celery_app import add

# Chain: execute tasks sequentially; each result is passed as the
# first argument of the next signature
workflow = chain(add.s(2, 2), add.s(4), add.s(8))
result = workflow.apply_async()
print(result.get())  # Output: 16 (((2 + 2) + 4) + 8)

# Group: execute tasks in parallel
job = group(add.s(2, 2), add.s(4, 4), add.s(8, 8))
result = job.apply_async()
print(result.get())  # Output: [4, 8, 16]

# Chord: run parallel tasks, then pass the list of results to a
# callback. The callback must accept a list (add.s(0) would fail),
# so assume a summing task defined alongside add in celery_app.py:
#     @app.task
#     def tsum(numbers):
#         return sum(numbers)
from celery_app import tsum

header = group(add.s(2, 2), add.s(4, 4), add.s(8, 8))
result = chord(header)(tsum.s())
print(result.get())  # Output: 28 (4 + 8 + 16)
```
These constructs allow you to:
- Chain: Pass results between tasks sequentially
- Group: Execute multiple tasks in parallel
- Chord: Run parallel tasks, then process combined results
Error Handling and Retries
Celery provides built-in retry mechanisms for fault tolerance:
```python
from celery import Celery
import requests

app = Celery('myapp')
app.conf.broker_url = 'redis://localhost:6379/0'

@app.task(bind=True, max_retries=3)
def fetch_data(self, url):
    try:
        response = requests.get(url, timeout=5)
        response.raise_for_status()
        return response.json()
    except requests.ConnectionError as exc:
        # Retry with exponential backoff: 1s, 2s, 4s, ...
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)
    except requests.Timeout as exc:
        # self.retry raises MaxRetriesExceededError on its own once
        # max_retries is exhausted, so no manual check is needed
        raise self.retry(exc=exc, countdown=60)

# Call with automatic retries
result = fetch_data.delay('https://api.example.com/data')
```
Key retry features:
- max_retries: maximum number of retry attempts
- countdown: seconds to wait before retrying
- autoretry_for: tuple of exception types to retry automatically
- bind=True: gives the task access to self for retry logic
Periodic Tasks with Celery Beat
Schedule recurring tasks using Celery Beat:
```python
from celery import Celery
from celery.schedules import crontab
import datetime

app = Celery('myapp')
app.conf.broker_url = 'redis://localhost:6379/0'

# Configure periodic tasks. The 'task' values must match the
# registered task names (module.function), here celery_app.py.
app.conf.beat_schedule = {
    'send-report-every-hour': {
        'task': 'celery_app.send_report',
        'schedule': 3600.0,  # Every 3600 seconds (1 hour)
    },
    'cleanup-database-daily': {
        'task': 'celery_app.cleanup_old_data',
        'schedule': crontab(hour=2, minute=0),  # Daily at 2 AM
    },
    'sync-data-every-30-minutes': {
        'task': 'celery_app.sync_external_api',
        'schedule': 1800.0,  # Every 30 minutes
    },
}

@app.task
def send_report():
    print(f"Report generated at {datetime.datetime.now()}")
    return "Report sent"

@app.task
def cleanup_old_data():
    print("Cleaning up old database records")
    # Delete old records here
    return "Cleanup complete"

@app.task
def sync_external_api():
    print("Syncing with external API")
    return "Sync complete"
```
Start the Celery Beat scheduler in its own process:

```shell
celery -A celery_app beat --loglevel=info
```

For development, you can run the worker and beat together in one process:

```shell
celery -A celery_app worker --beat --loglevel=info
```
Monitoring Tasks with Flower
Flower is a real-time web-based monitoring tool for Celery. Install and run it:
```shell
pip install flower
celery -A celery_app flower
```
Access Flower at http://localhost:5555 to see:
- Active tasks and their progress
- Worker status and capacity
- Task execution history and statistics
- Real-time task graphs
- Task failure details
You can also programmatically inspect tasks:
```python
from celery import Celery

app = Celery('myapp')
app.conf.broker_url = 'redis://localhost:6379/0'

# Get an inspection object via the app's control interface
i = app.control.inspect()

# Get active tasks
active = i.active()
print(f"Active tasks: {active}")

# Get registered tasks
registered = i.registered()
print(f"Registered tasks: {registered}")

# Get worker stats
stats = i.stats()
print(f"Worker stats: {stats}")
```
Troubleshooting Common Celery Issues
| Issue | Cause | Solution |
|---|---|---|
| Worker not processing tasks | Worker not running or Redis connection failed | Verify Redis is running: `redis-cli ping`. Start the worker: `celery -A celery_app worker --loglevel=info` |
| Task remains in PENDING state | Task never executed or worker crashed | Check worker logs, verify the broker connection, ensure the task is properly registered |
| Redis connection error | Redis not running or wrong connection string | Check Redis is running, verify `broker_url` in the Celery config |
| Task results not stored | Result backend not configured | Set `result_backend` in the Celery config to Redis or another backend |
| High memory usage | Too many tasks queued or workers consuming memory | Limit prefetch with `--prefetch-multiplier=1`, adjust concurrency |
| Tasks timing out | Task execution exceeds the time limit | Increase `task_time_limit` or optimize the task code |
Real-Life Example: Email Sending Queue
Here’s a fuller example of an email sending queue system (the SMTP credentials are placeholders):

```python
from celery import Celery, group
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

app = Celery('email_app')
app.conf.update(
    broker_url='redis://localhost:6379/0',
    result_backend='redis://localhost:6379/0',
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
)

@app.task(bind=True, max_retries=5)
def send_email_task(self, recipient, subject, body, html=None):
    """Send an email with automatic retries."""
    try:
        sender = 'noreply@example.com'
        password = 'your_app_password'  # Load from an env var in production

        msg = MIMEMultipart('alternative')
        msg['Subject'] = subject
        msg['From'] = sender
        msg['To'] = recipient

        # Attach text and HTML versions
        msg.attach(MIMEText(body, 'plain'))
        if html:
            msg.attach(MIMEText(html, 'html'))

        # Send the email
        with smtplib.SMTP_SSL('smtp.gmail.com', 465) as server:
            server.login(sender, password)
            server.sendmail(sender, recipient, msg.as_string())

        return {
            'status': 'success',
            'recipient': recipient,
            'subject': subject
        }
    except smtplib.SMTPException as exc:
        # Retry with exponential backoff, capped at 10 minutes
        raise self.retry(
            exc=exc,
            countdown=min(600, 2 ** self.request.retries)
        )

@app.task
def send_bulk_emails(recipients, subject, body):
    """Send emails to multiple recipients in parallel."""
    job = group(
        send_email_task.s(recipient, subject, body)
        for recipient in recipients
    )
    result = job.apply_async()
    return result.id

# Usage example
if __name__ == '__main__':
    # Send a single email
    result = send_email_task.delay(
        'user@example.com',
        'Welcome!',
        'Thanks for signing up!'
    )
    print(f"Task ID: {result.id}")

    # Send bulk emails
    recipients = [
        'user1@example.com',
        'user2@example.com',
        'user3@example.com',
    ]
    bulk_result = send_bulk_emails.delay(
        recipients,
        'Newsletter',
        'Check out our latest updates!'
    )
    print(f"Bulk send task ID: {bulk_result.id}")
```
This example demonstrates:
- Email sending with automatic retries and exponential backoff
- Both single and bulk email sending
- Error handling for SMTP failures
- Parallel task execution for multiple recipients
- Task result tracking and retrieval
Best Practices for Celery in Production
When deploying Celery to production, follow these guidelines:
- Use a robust broker: RabbitMQ for mission-critical tasks, Redis for lighter workloads
- Set a result TTL: configure result_expires to clean up old results automatically
- Monitor workers: use Flower or external monitoring to track worker health
- Use task routing: route different tasks to different workers for better resource allocation
- Implement task timeouts: set task_soft_time_limit and task_time_limit to prevent hanging tasks
- Pin the message protocol: use task_protocol to handle backward compatibility during upgrades
- Log comprehensively: configure proper logging for debugging production issues
- Test retries: verify retry logic works correctly before deploying
FAQ
Q: What is the difference between Celery and threading in Python?
A: Celery is a distributed task queue that runs tasks in separate worker processes and can scale across multiple machines, while threading runs tasks inside the same process, where Python’s GIL prevents CPU-bound threads from running in parallel. Celery is the better fit for long-running, CPU-intensive, or distributed work; threading remains fine for lightweight I/O-bound concurrency.
Q: Can Celery run tasks on different machines?
A: Yes, Celery is designed for distributed computing. You can run workers on different servers, and they will all consume tasks from the same message broker for true scalability.
Q: What message brokers does Celery support?
A: Celery supports RabbitMQ, Redis, AWS SQS, and others. RabbitMQ is recommended for production, while Redis is suitable for development and lighter workloads.
Q: How do I handle task failures in Celery?
A: Use the @app.task decorator with max_retries and implement retry logic using self.retry(). You can also use error callbacks with apply_async() to execute functions when tasks fail.
Q: Is Celery suitable for real-time processing?
A: Celery is better for background tasks with some latency tolerance. For true real-time processing, consider message streams like Kafka alongside Celery.