
Your web app sends a welcome email when users register. That email call takes three seconds. For those three seconds, the user’s browser sits waiting for the server to respond — and your server thread is blocked, unable to handle anyone else. The fix is a task queue: hand the email job off to a background worker and return a 200 response immediately. The user gets instant feedback, your server is free, and the email arrives a moment later.

Celery is the most widely used task queue library in the Python ecosystem. It runs background jobs in separate worker processes, supports scheduled tasks (cron-style), and can retry failed jobs automatically. Redis is the most common broker for Celery: it acts as the message bus between your application and the worker processes. Install both with pip install celery redis. You will also need a running Redis server; docker run -d -p 6379:6379 redis:7-alpine spins one up in seconds.

This article covers setting up Celery with Redis, defining and calling tasks, checking task results, chaining tasks together, scheduling periodic jobs, and monitoring your workers. By the end you will have a working background processing system with retry logic and result tracking.

Celery with Redis: Quick Example

The smallest possible Celery setup defines one task and calls it asynchronously. Two files are all you need: tasks.py, which creates the Celery app (broker and result backend) and defines the task, and caller.py, which queues the task and reads its result.

# tasks.py
from celery import Celery

# Connect to Redis running on localhost:6379
app = Celery("demo", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0")

@app.task
def add(x: int, y: int) -> int:
    """Add two numbers in the background."""
    return x + y
# caller.py
from tasks import add

# Queue the task -- returns immediately
result = add.delay(4, 6)
print(f"Task ID: {result.id}")

# Wait for the result (up to 10 seconds)
value = result.get(timeout=10)
print(f"Result: {value}")

Start the worker in a terminal, then run caller.py in another:

# Terminal 1 -- start the worker
$ celery -A tasks worker --loglevel=info

# Terminal 2 -- call the task
$ python caller.py
Task ID: 3f8a2b1c-4d5e-6f7a-8b9c-0d1e2f3a4b5c
Result: 10

The @app.task decorator registers the function as a Celery task. Calling add.delay(4, 6) puts a message on the Redis broker queue and returns an AsyncResult object immediately — the calling process does not wait. The worker process picks up the message, runs the function, and stores the result back in Redis. result.get() fetches that stored value. The sections below show how to build on this for real application scenarios.

What Is Celery and When Should You Use It?

Celery is a distributed task queue. Your application (the producer) puts job descriptions onto a message broker (Redis or RabbitMQ). One or more worker processes read jobs from the broker, execute the underlying Python function, and optionally store the result in a result backend. The producer and workers run as separate processes, which means slow tasks cannot block your web server.
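The producer, broker, worker, and result-backend roles can be modelled in a few lines of standard-library Python. This is a toy illustration, not how Celery is implemented. The "broker" is an in-process queue.Queue, the "worker" is a thread, the "result backend" is a dict, and the names producer_delay and worker_loop are hypothetical. Nothing here survives a restart, which is exactly what a real broker such as Redis adds.

```python
import queue
import threading
import uuid

# Toy stand-ins: queue.Queue plays the broker, a dict plays the result backend.
broker: queue.Queue = queue.Queue()
result_backend: dict[str, object] = {}
registry = {"add": lambda x, y: x + y}  # task name -> function

def producer_delay(task_name: str, *args) -> str:
    """Producer side: enqueue a message describing the call and return its ID."""
    task_id = str(uuid.uuid4())
    broker.put((task_id, task_name, args))
    return task_id

def worker_loop() -> None:
    """Worker side: take messages, run the named function, store the result."""
    while True:
        message = broker.get()
        if message is None:  # sentinel value tells the worker to stop
            broker.task_done()
            return
        task_id, task_name, args = message
        result_backend[task_id] = registry[task_name](*args)
        broker.task_done()

worker = threading.Thread(target=worker_loop, daemon=True)
worker.start()

task_id = producer_delay("add", 4, 6)  # returns immediately with an ID
broker.join()                          # wait until the worker has processed everything
print(result_backend[task_id])

broker.put(None)  # shut the worker down
worker.join()
```

The message holds only a task name and arguments, not the function itself. This mirrors why Celery workers must import the same task code as the producer.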

Use case | Without Celery | With Celery
Send welcome email | Blocks the request for 3+ seconds | Returns instantly, email queued
Generate PDF report | Request times out for large documents | Background job, user polls for the result
Nightly database cleanup | Cron job, hard to monitor | Celery Beat schedule, tracked
Process uploaded image | Server memory spike per request | Worker pool handles bursts gracefully
Call a slow third-party API | Cascading timeouts | Isolated failure, retried automatically

Celery is not always the right tool. If you only need to run a function in the background inside the same process, and losing the job when that process restarts is acceptable, Python's built-in concurrent.futures.ThreadPoolExecutor or asyncio may be simpler. Celery adds operational complexity: you now have Redis and worker processes to monitor and keep running. Choose Celery when you need durable queuing (queued jobs survive a worker restart), retry logic, scheduling, or multiple worker machines.
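For comparison, here is a minimal in-process alternative using the standard library. send_welcome_email is a hypothetical stand-in for a slow email API. The job lives only in this process's memory, with no retries, no scheduling, and no persistence if the process dies.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def send_welcome_email(address: str) -> str:
    """Hypothetical slow call standing in for an email API."""
    time.sleep(0.1)
    return f"sent to {address}"

executor = ThreadPoolExecutor(max_workers=4)

# submit() returns a Future at once, so a request handler could respond now.
future = executor.submit(send_welcome_email, "user@example.org")

# Later, or from another part of the program, collect the result.
print(future.result(timeout=5))
executor.shutdown(wait=True)
```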

Configuring Celery with a Config Object

Hard-coding broker URLs inside tasks.py makes configuration difficult to change between environments. The production pattern is to create a dedicated configuration object and load it with app.config_from_object(), keeping all settings in one place.

# celery_config.py
broker_url = "redis://localhost:6379/0"
result_backend = "redis://localhost:6379/0"

# Serialise tasks as JSON (not pickle) for security
task_serializer = "json"
result_serializer = "json"
accept_content = ["json"]

# Expire results after 1 hour
result_expires = 3600

# Note: max_retries is a per-task option, set on the @task decorator in app.py;
# Celery does not read a task_max_retries setting

# Time limit per task (seconds)
task_soft_time_limit = 300
task_time_limit = 360
# app.py
from celery import Celery

celery_app = Celery("myproject")
celery_app.config_from_object("celery_config")

@celery_app.task(bind=True, max_retries=3)
def send_email(self, to_address: str, subject: str, body: str) -> dict:
    """Send an email, retrying up to 3 times on failure."""
    try:
        # Simulate calling an email API
        print(f"Sending email to {to_address}: {subject}")
        # In production: smtplib.sendmail() or requests.post(email_api_url, ...)
        return {"status": "sent", "to": to_address}
    except Exception as exc:
        # Retry with exponential backoff: 60s, 120s, 240s
        raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))

Output (worker log):

[2026-05-21 09:00:01,234: INFO/ForkPoolWorker-1] Task app.send_email[a1b2c3] received
Sending email to user@example.org: Welcome!
[2026-05-21 09:00:01,289: INFO/ForkPoolWorker-1] Task app.send_email[a1b2c3] succeeded: {'status': 'sent', 'to': 'user@example.org'}

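The bind=True argument passes the task instance to the function as self, which is required for self.retry(). The retry call re-queues the task with the delay given by countdown, and the retried run sees self.request.retries incremented by one. With max_retries=3 the body can run four times: the first attempt plus three retries. If the fourth attempt also fails, self.retry() re-raises the exception passed as exc instead of scheduling another attempt, and the task ends in the FAILURE state.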
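To check the schedule that the countdown expression in send_email produces, evaluate the same arithmetic in plain Python. The helper retry_countdown is hypothetical. Its argument mirrors self.request.retries, which is 0 during the first attempt.

```python
def retry_countdown(retries_so_far: int, base_seconds: int = 60) -> int:
    """Same formula as send_email: 60 * 2 ** self.request.retries."""
    return base_seconds * (2 ** retries_so_far)

max_retries = 3
delays = [retry_countdown(r) for r in range(max_retries)]
print(delays)            # [60, 120, 240]
print(sum(delays))       # 420 seconds of waiting before the final attempt
print(max_retries + 1)   # 4 attempts in total
```

The delays double each time, so a large max_retries grows the total wait quickly. Capping the countdown with min() is a common refinement.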

Checking Task Status and Results

When a task is queued from a web request, you typically return the task ID to the client so they can poll for the result. The AsyncResult class provides a clean interface for checking status without blocking the server.

# result_tracking.py
from celery import Celery
from celery.result import AsyncResult
import time

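app = Celery("tracking", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0")
# Report the STARTED state while a task runs; Celery does not track it by default
app.conf.task_track_started = True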

@app.task
def slow_report(rows: int) -> dict:
    """Simulate a slow report generation."""
    time.sleep(2)
    return {"rows_processed": rows, "status": "complete"}

def check_result(task_id: str) -> None:
    result = AsyncResult(task_id, app=app)

    states = {
        "PENDING":  "Task is waiting to be picked up",
        "STARTED":  "Task is currently running",
        "SUCCESS":  "Task completed",
        "FAILURE":  "Task failed",
        "RETRY":    "Task is being retried",
        "REVOKED":  "Task was cancelled",
    }
    print(f"State: {result.state} -- {states.get(result.state, 'Unknown')}")

    if result.state == "SUCCESS":
        print(f"Result: {result.result}")
    elif result.state == "FAILURE":
        print(f"Error: {result.result}")
        print(f"Traceback: {result.traceback}")

if __name__ == "__main__":
    task = slow_report.delay(10000)
    print(f"Queued task: {task.id}")

    for _ in range(5):
        check_result(task.id)
        time.sleep(0.5)

    final = task.get(timeout=10)
    print(f"Final: {final}")

Output:

Queued task: 7f8a9b0c-1d2e-3f4a-5b6c-7d8e9f0a1b2c
State: PENDING -- Task is waiting to be picked up
State: STARTED -- Task is currently running
State: STARTED -- Task is currently running
State: STARTED -- Task is currently running
State: SUCCESS -- Task completed
Final: {'rows_processed': 10000, 'status': 'complete'}

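In a web application, you would return {"task_id": task.id} from a POST endpoint and expose a GET endpoint like /tasks/{task_id}. That endpoint builds an AsyncResult from the ID, as check_result() does, and returns its current state instead of printing it. The client polls this endpoint until the state is SUCCESS or FAILURE. PENDING also covers task IDs the backend has never seen, so a mistyped ID looks like a task that is still waiting. This pattern keeps web responses fast while still giving users visibility into long-running operations.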

Scheduling Periodic Tasks with Celery Beat

Celery Beat is a scheduler that sends tasks to the worker queue on a defined interval. It runs as a separate process alongside your workers and replaces cron jobs for Python applications, with the advantage that schedules are defined in Python code and are visible in the same place as your task definitions.

# schedule_app.py
from celery import Celery
from celery.schedules import crontab

app = Celery("scheduler", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0")

app.conf.beat_schedule = {
    # Run every 30 seconds
    "health-check-every-30s": {
        "task": "schedule_app.health_check",
        "schedule": 30.0,
    },
    # Run every day at 08:00
    "daily-report-8am": {
        "task": "schedule_app.generate_daily_report",
        "schedule": crontab(hour=8, minute=0),
        "kwargs": {"recipients": ["admin@company.com"]},
    },
    # Run every Monday at midnight (day_of_week counts from Sunday = 0)
    "weekly-cleanup": {
        "task": "schedule_app.cleanup_old_records",
        "schedule": crontab(day_of_week=1, hour=0, minute=0),
    },
}
app.conf.timezone = "UTC"

@app.task
def health_check() -> str:
    print("Health check OK")
    return "OK"

@app.task
def generate_daily_report(recipients: list[str]) -> dict:
    print(f"Generating report for {recipients}")
    return {"status": "sent", "recipients": recipients}

@app.task
def cleanup_old_records() -> int:
    print("Cleaning up records older than 90 days...")
    return 0  # In production: DELETE FROM records WHERE created < NOW() - INTERVAL '90 days'

Start Beat and the worker in separate terminals:

# Terminal 1 -- worker
$ celery -A schedule_app worker --loglevel=info

# Terminal 2 -- beat scheduler
$ celery -A schedule_app beat --loglevel=info

# Beat output:
[2026-05-21 08:00:00,001: INFO/MainProcess] Scheduler: Sending due task daily-report-8am

The crontab() helper accepts the same fields as a Unix crontab entry (minute, hour, day_of_week, day_of_month, month_of_year). Any field you omit defaults to "*", meaning "every". day_of_week counts from Sunday = 0, so Monday is 1. Set app.conf.timezone explicitly even though Celery's default is UTC. crontab entries are evaluated in that timezone, so hour=8 above means 08:00 UTC, and spelling the timezone out in code prevents confusion when servers or developers are in different timezones or daylight saving shifts local clocks.

Real-Life Example: Image Processing Pipeline

The following example shows a realistic Celery pipeline that processes an uploaded image: it resizes it, generates a thumbnail, and sends a completion notification -- three tasks chained together so each step waits for the previous one to finish.

# pipeline.py
from celery import Celery, chain
import os
import time

app = Celery("pipeline", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0")

@app.task
def resize_image(filepath: str, width: int = 800) -> dict:
    """Resize an image to the given width (simulated)."""
    print(f"Resizing {filepath} to {width}px wide...")
    time.sleep(1)  # In production: use Pillow Image.resize()
    root, ext = os.path.splitext(filepath)  # only the final extension is split off
    resized_path = f"{root}_{width}{ext}"
    return {"original": filepath, "resized": resized_path, "width": width}

@app.task
def create_thumbnail(resize_result: dict, thumb_size: int = 150) -> dict:
    """Create a thumbnail from the resized image."""
    resized_path = resize_result["resized"]
    print(f"Creating {thumb_size}px thumbnail from {resized_path}...")
    time.sleep(0.5)  # In production: use Pillow Image.thumbnail()
    root, ext = os.path.splitext(resized_path)  # safe even if directory names contain dots
    thumb_path = f"{root}_thumb{ext}"
    return {**resize_result, "thumbnail": thumb_path, "thumb_size": thumb_size}

@app.task
def notify_complete(pipeline_result: dict, user_id: int) -> str:
    """Send a completion notification to the user."""
    msg = (
        f"User {user_id}: image processing complete. "
        f"Resized: {pipeline_result['resized']}, "
        f"Thumbnail: {pipeline_result['thumbnail']}"
    )
    print(f"Notifying: {msg}")
    return msg

def process_uploaded_image(filepath: str, user_id: int):
    """Chain the three tasks: resize -> thumbnail -> notify."""
    workflow = chain(
        resize_image.s(filepath, width=1200),
        create_thumbnail.s(thumb_size=150),
        notify_complete.s(user_id=user_id),
    )
    result = workflow.apply_async()
    print(f"Pipeline queued: {result.id}")
    return result.id

if __name__ == "__main__":
    task_id = process_uploaded_image("uploads/photo.jpg", user_id=42)
    final = app.AsyncResult(task_id).get(timeout=30)
    print(f"Pipeline complete: {final}")

Output (the caller prints the first and last lines; the lines in between appear in the worker's terminal):

Pipeline queued: 9a8b7c6d-5e4f-3a2b-1c0d-9e8f7a6b5c4d
Resizing uploads/photo.jpg to 1200px wide...
Creating 150px thumbnail from uploads/photo_1200.jpg...
Notifying: User 42: image processing complete. Resized: uploads/photo_1200.jpg, Thumbnail: uploads/photo_1200_thumb.jpg
Pipeline complete: User 42: image processing complete...

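The chain() primitive creates a workflow where each task's return value is automatically passed as the first argument to the next task. The .s() method creates a task "signature": a serialisable description of a task call that can be passed around and composed with other signatures. apply_async() on a chain returns the AsyncResult of the last task, which is why waiting on that ID yields the notification message. You can extend this pipeline in three ways: add error handling with link_error, process items in parallel with group(), or use chord(), which runs a group in parallel and then passes the list of all their results to a single callback task.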

Frequently Asked Questions

Should I use Redis or RabbitMQ as the Celery broker?

Redis is simpler to set up and is the right choice for most projects. It doubles as both the broker and the result backend, requires minimal configuration, and handles thousands of tasks per second. RabbitMQ is better suited for very high message volumes, complex routing (fanout, topic exchanges), or systems where message durability guarantees are critical. For a typical web application, start with Redis -- you can migrate to RabbitMQ later if you hit its limits.

How many worker processes should I run?

Celery defaults to one worker process per CPU core (using the prefork pool). For CPU-bound tasks (image processing, data crunching), this is usually optimal. For IO-bound tasks (API calls, database queries), you have two options. You can raise --concurrency above the core count. Or you can switch to a green-thread pool: after pip install gevent, celery -A myapp worker --pool=gevent --concurrency=100 runs up to 100 concurrent IO-bound tasks in a single worker process.
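The reason extra concurrency helps IO-bound work can be measured with the standard library. This sketch uses threads as a stand-in for gevent greenlets. It is not Celery code, and fake_api_call is a hypothetical task that mostly waits.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_api_call(i: int) -> int:
    """Stand-in for an IO-bound task: 0.1 s of waiting, almost no CPU."""
    time.sleep(0.1)
    return i

def run(n_calls: int, concurrency: int) -> float:
    """Run n_calls tasks with the given concurrency; return wall-clock seconds."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        list(pool.map(fake_api_call, range(n_calls)))
    return time.perf_counter() - start

serial = run(20, concurrency=1)     # roughly 20 x 0.1 s
parallel = run(20, concurrency=20)  # the waits overlap
print(f"1 worker: {serial:.2f}s, 20 workers: {parallel:.2f}s")
```

For CPU-bound work the waits do not overlap in this way, which is why the prefork default of one process per core remains the better choice there.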

What happens when a Celery task fails?

By default, a failed task (one that raises an unhandled exception) is marked as FAILURE and the exception is stored in the result backend. The task is not retried unless you call self.retry() inside the task body or set autoretry_for on the task decorator: @app.task(autoretry_for=(RequestException,), max_retries=3, default_retry_delay=60), where RequestException comes from requests.exceptions. max_retries defaults to 3. Setting it to None removes the limit, and a persistently failing task will then retry forever, so keep a finite limit.

How do I monitor Celery workers in production?

Flower is the standard monitoring tool for Celery: pip install flower, then celery -A myapp flower --port=5555. It provides a web dashboard showing active workers, queued tasks, task history, and success/failure rates. For production systems, integrate Celery with your existing observability stack by using Celery's signal hooks (task_prerun, task_postrun, task_failure) to emit metrics to Prometheus or log structured events to your log aggregator.

How do I integrate Celery with Django?

Create a celery.py file in your Django project package (the directory that contains settings.py). In it, set os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings'), create the app, and configure it with app.config_from_object('django.conf:settings', namespace='CELERY'). Then call app.autodiscover_tasks() so Celery finds the tasks.py module in each installed app. Add CELERY_BROKER_URL and CELERY_RESULT_BACKEND to your Django settings. In the same package's __init__.py, import the Celery app so it is loaded when Django starts and @shared_task functions bind to it. The official "First steps with Django" guide at docs.celeryq.dev covers the exact file structure.

Conclusion

Celery with Redis transforms slow, blocking operations into fast, asynchronous background jobs. You have seen how to define tasks with @app.task, call them with .delay(), track results with AsyncResult, configure automatic retries, schedule periodic jobs with Celery Beat, and chain multiple tasks into a pipeline with chain(). These primitives cover the vast majority of real-world background processing needs.

The image processing pipeline in the real-life example is a natural starting point to extend: add a group() to process multiple uploaded images in parallel, use a chord() to aggregate results when all parallel tasks complete, or connect to a real email API in the notification step. Each extension follows the same .s() signature pattern you have already seen.

For the complete Celery API reference including canvas primitives, beat schedulers, and monitoring integrations, see the official Celery documentation.