Intermediate
Every application eventually needs to run tasks on a schedule: send a daily email digest, clean up expired sessions at midnight, poll an API every five minutes, generate a weekly report every Monday at 8am. You could set up a cron job on the server, but then your schedule logic lives outside your Python code in a system-specific format. APScheduler solves this by bringing the scheduler directly into your Python application — no cron, no separate worker process, no external services to run.
APScheduler (Advanced Python Scheduler) is a lightweight, flexible scheduling library that runs inside your Python process. It supports three types of triggers — interval (run every N seconds), cron (run on a schedule like a crontab), and date (run once at a specific time) — and it can persist jobs across restarts using SQLite, PostgreSQL, Redis, and other backends. It works with any Python application: standalone scripts, Flask apps, FastAPI services, or Django projects.
In this article you will learn how to install and configure APScheduler, add jobs with each trigger type, handle errors gracefully, use job stores for persistence, and build a real-world task scheduler for a web application. By the end you will know how to replace your cron jobs with Python-native scheduling.
APScheduler Quick Example
Here is the minimal setup — a job that runs every 10 seconds:
# apscheduler_quick.py
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
import time
def tick():
print(f"Tick! The time is: {datetime.now().strftime('%H:%M:%S')}")
scheduler = BackgroundScheduler()
scheduler.add_job(tick, "interval", seconds=10)
scheduler.start()
print("Scheduler running. Press Ctrl+C to stop.")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
scheduler.shutdown()
print("Scheduler stopped.")
Output:
Scheduler running. Press Ctrl+C to stop.
Tick! The time is: 09:15:10
Tick! The time is: 09:15:20
Tick! The time is: 09:15:30
The key concepts: a BackgroundScheduler runs in a background thread so it does not block your main program. The add_job() call registers a function and its trigger. The scheduler.start() call launches the background thread. This pattern works identically in Flask, FastAPI, or any other framework — keep reading for the full details.
What Is APScheduler and When To Use It?
APScheduler is a job scheduling library that wraps Python’s threading and asyncio mechanisms with a high-level interface for managing scheduled tasks. It is the right tool when your tasks need to live in the same Python process as your application — for example, when they need to access application state, database sessions, or configuration that would be awkward to share with an external cron process.
| Approach | Best For | Drawbacks |
|---|---|---|
| APScheduler | In-process tasks, Python apps, simple deployments | Jobs stop if process dies |
| System cron | System-level tasks, shell scripts | Schedule lives outside code, no persistence |
| Celery Beat | Distributed, high-volume, multiple workers | Requires Redis/RabbitMQ, complex setup |
| RQ Scheduler | Redis-backed queues, simpler than Celery | Requires Redis |
Installing APScheduler
Install the base package with pip:
# terminal
pip install apscheduler
For SQLAlchemy-based job persistence (recommended for production):
# terminal
pip install apscheduler sqlalchemy
Verify the install:
# verify_apscheduler.py
import apscheduler
print(apscheduler.__version__)
Output:
3.10.4
The Three Trigger Types
Interval Trigger
Run a job every N seconds, minutes, hours, days, or weeks. Use this for polling, cleanup, and any task that should repeat on a fixed cycle:
# interval_trigger.py
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime
def cleanup_temp_files():
print(f"[{datetime.now():%H:%M:%S}] Cleaning temp files...")
# Your cleanup logic here
def poll_api():
print(f"[{datetime.now():%H:%M:%S}] Polling external API...")
# Your API call here
scheduler = BlockingScheduler()
# Run cleanup every 6 hours
scheduler.add_job(cleanup_temp_files, "interval", hours=6)
# Poll every 30 seconds
scheduler.add_job(poll_api, "interval", seconds=30, id="api_poll")
scheduler.start()
The BlockingScheduler blocks the main thread — use it for scripts where scheduling is the only purpose. Use BackgroundScheduler when you need to do other work in the main thread simultaneously.
Cron Trigger
Run a job on a cron-style schedule — specific times of day, days of week, or days of month. APScheduler’s cron trigger supports all standard cron fields:
# cron_trigger.py
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
import time
def daily_report():
print(f"Generating daily report at {datetime.now():%Y-%m-%d %H:%M:%S}")
def weekly_cleanup():
print(f"Running weekly cleanup at {datetime.now():%Y-%m-%d %H:%M:%S}")
def workday_digest():
print(f"Sending workday digest at {datetime.now():%Y-%m-%d %H:%M:%S}")
scheduler = BackgroundScheduler()
# Every day at 8:00 AM
scheduler.add_job(daily_report, "cron", hour=8, minute=0)
# Every Monday at midnight
scheduler.add_job(weekly_cleanup, "cron", day_of_week="mon", hour=0, minute=0)
# Weekdays (Mon-Fri) at 5:30 PM
scheduler.add_job(workday_digest, "cron", day_of_week="mon-fri", hour=17, minute=30)
scheduler.start()
try:
while True:
time.sleep(60)
except KeyboardInterrupt:
scheduler.shutdown()
The cron trigger accepts all the standard cron fields (minute, hour, day, month, day_of_week) and adds second, week, and year. You can also pass a standard five-field cron expression string using CronTrigger.from_crontab("0 8 * * *").
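If you already have crontab expressions, the classmethod form keeps them intact. Here is a minimal sketch (daily_report is an illustrative placeholder for your own job function):
# cron_expression.py
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
def daily_report():
    print("Generating daily report...")
scheduler = BackgroundScheduler()
# "0 8 * * *" -> minute 0, hour 8, every day
scheduler.add_job(daily_report, CronTrigger.from_crontab("0 8 * * *"),
                  id="daily_report")
scheduler.start()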
Date Trigger
Run a job once at a specific date and time — useful for delayed tasks, one-off notifications, or deferred processing:
# date_trigger.py
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime, timedelta
import time
def send_reminder(username, message):
print(f"Reminder for {username}: {message} at {datetime.now():%H:%M:%S}")
scheduler = BackgroundScheduler()
# Run once 30 seconds from now
run_at = datetime.now() + timedelta(seconds=30)
scheduler.add_job(send_reminder, "date", run_date=run_at,
args=["alice", "Team standup in 5 minutes"])
# Run at a specific datetime
scheduler.add_job(send_reminder, "date",
run_date="2026-05-03 14:00:00",
kwargs={"username": "bob", "message": "Quarterly report due"})
scheduler.start()
try:
time.sleep(60)
scheduler.shutdown()
except KeyboardInterrupt:
scheduler.shutdown()
Output:
Reminder for alice: Team standup in 5 minutes at 09:15:40
The args parameter passes positional arguments to the job function; kwargs passes keyword arguments. Both work with all three trigger types, making it easy to parameterize your jobs.
Error Handling and Listeners
By default, APScheduler logs exceptions but does not crash the scheduler. You can attach event listeners to take action when jobs succeed, fail, or are missed:
# apscheduler_events.py
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR, EVENT_JOB_MISSED
import logging
import time
logging.basicConfig(level=logging.INFO)
def flaky_job():
import random
if random.random() < 0.4:
raise ValueError("Simulated job failure")
print("Job succeeded!")
def job_listener(event):
if event.exception:
print(f"Job {event.job_id} FAILED: {event.exception}")
elif hasattr(event, 'retval'):
print(f"Job {event.job_id} succeeded")
def missed_listener(event):
print(f"Job {event.job_id} was MISSED (scheduler may have been busy)")
scheduler = BackgroundScheduler()
scheduler.add_listener(job_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
scheduler.add_listener(missed_listener, EVENT_JOB_MISSED)
scheduler.add_job(flaky_job, "interval", seconds=5, id="flaky")
scheduler.start()
try:
time.sleep(30)
scheduler.shutdown()
except KeyboardInterrupt:
scheduler.shutdown()
Output:
Job flaky succeeded
Job flaky FAILED: Simulated job failure
Job flaky succeeded
Job flaky succeeded
Job flaky FAILED: Simulated job failure
Event listeners are the right place to add alerting, dead-letter queuing, or retry logic. The EVENT_JOB_MISSED event fires when a job's scheduled run time passes without the job running (by more than its misfire_grace_time) -- for example because the scheduler was down or every executor thread was busy. Repeated misses are a sign your jobs take longer than their interval: increase the interval, enlarge the thread pool, or optimize the job.
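As an illustration, here is a minimal retry sketch built on an error listener. The retry_counts dictionary, MAX_RETRIES limit, and 30-second delay are illustrative choices, not APScheduler features -- a production version would need persistent retry state and smarter job IDs:
# retry_listener.py
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.events import EVENT_JOB_ERROR
from datetime import datetime, timedelta
scheduler = BackgroundScheduler()
retry_counts = {}  # job_id -> retries so far (in-memory, illustrative)
MAX_RETRIES = 3
def retry_listener(event):
    attempts = retry_counts.get(event.job_id, 0)
    if attempts >= MAX_RETRIES:
        print(f"Job {event.job_id} exhausted retries -- alert someone")
        return
    retry_counts[event.job_id] = attempts + 1
    original = scheduler.get_job(event.job_id)
    if original is not None:
        # Re-run the same callable once, 30 seconds from now
        scheduler.add_job(original.func, "date",
                          run_date=datetime.now() + timedelta(seconds=30),
                          args=original.args, kwargs=original.kwargs,
                          id=f"{event.job_id}_retry_{attempts + 1}")
scheduler.add_listener(retry_listener, EVENT_JOB_ERROR)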
Job Persistence with SQLAlchemy
By default, jobs are stored in memory and are lost when the process restarts. For production applications, use the SQLAlchemy job store to persist jobs in a database:
# apscheduler_persistent.py
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor
from datetime import datetime
import time
jobstores = {
"default": SQLAlchemyJobStore(url="sqlite:///jobs.sqlite")
}
executors = {
"default": ThreadPoolExecutor(10)
}
job_defaults = {
"coalesce": True, # merge missed runs into one
"max_instances": 1 # only one instance at a time
}
scheduler = BackgroundScheduler(
jobstores=jobstores,
executors=executors,
job_defaults=job_defaults
)
def hourly_sync():
print(f"[{datetime.now():%H:%M:%S}] Syncing database...")
# add_job with replace_existing=True avoids duplicate jobs on restart
scheduler.add_job(hourly_sync, "interval", hours=1, id="db_sync",
replace_existing=True)
scheduler.start()
print("Scheduler with SQLite persistence running...")
try:
time.sleep(5)
print(f"Jobs: {scheduler.get_jobs()}")
scheduler.shutdown()
except KeyboardInterrupt:
scheduler.shutdown()
Output:
Scheduler with SQLite persistence running...
Jobs: [<Job (id=db_sync name=hourly_sync)>]
The coalesce=True setting merges any missed runs into a single catch-up run -- useful when your process was down during several scheduled intervals. The max_instances=1 setting prevents overlapping runs if a job takes longer than its interval. Always use replace_existing=True when adding jobs that should survive process restarts.
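Here is a hedged sketch of how these settings combine for a nightly job (nightly_backup and the one-hour grace period are illustrative):
# misfire_settings.py
from apscheduler.schedulers.background import BackgroundScheduler
def nightly_backup():
    print("Running nightly backup...")
scheduler = BackgroundScheduler()
# If the 2:00 AM run is missed (process down, executor busy), still run it
# as long as it can start within an hour; otherwise skip that occurrence.
scheduler.add_job(nightly_backup, "cron", hour=2, id="backup",
                  coalesce=True,            # collapse several missed runs into one
                  misfire_grace_time=3600,  # seconds of lateness tolerated
                  replace_existing=True)
scheduler.start()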
Real-Life Example: Flask App with Background Jobs
Here is a complete Flask application that uses APScheduler for background tasks -- a common production pattern:
# flask_scheduler_app.py
from flask import Flask, jsonify
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from datetime import datetime
import atexit
app = Flask(__name__)
app.config["SECRET_KEY"] = "your-secret-key"
# Simple in-memory log for demo purposes
job_log = []
def cleanup_old_sessions():
ts = datetime.now().strftime("%H:%M:%S")
entry = f"[{ts}] Cleaned up expired sessions"
job_log.append(entry)
print(entry)
def send_digest_emails():
ts = datetime.now().strftime("%H:%M:%S")
entry = f"[{ts}] Sent digest emails to subscribers"
job_log.append(entry)
print(entry)
def health_check_external_apis():
ts = datetime.now().strftime("%H:%M:%S")
entry = f"[{ts}] Health check: all APIs responding"
job_log.append(entry)
print(entry)
@app.route("/")
def index():
return jsonify({"status": "running", "recent_jobs": job_log[-5:]})
@app.route("/jobs")
def list_jobs():
jobs = [{"id": j.id, "name": j.name, "next_run": str(j.next_run_time)}
for j in scheduler.get_jobs()]
return jsonify(jobs)
# Initialize scheduler
scheduler = BackgroundScheduler()
scheduler.add_job(cleanup_old_sessions, "interval", minutes=30, id="cleanup")
scheduler.add_job(send_digest_emails, "cron", hour=8, minute=0, id="digest")
scheduler.add_job(health_check_external_apis, "interval", minutes=5, id="health")
scheduler.start()
# Ensure scheduler shuts down with the app
atexit.register(lambda: scheduler.shutdown())
if __name__ == "__main__":
app.run(debug=False, port=5000)
Test it:
# In another terminal
curl http://localhost:5000/jobs
# Returns:
# [{"id":"cleanup","name":"cleanup_old_sessions","next_run":"2026-05-03 09:45:00+00:00"},
# {"id":"digest","name":"send_digest_emails","next_run":"2026-05-04 08:00:00+00:00"},
# {"id":"health","name":"health_check_external_apis","next_run":"2026-05-03 09:20:00+00:00"}]
The atexit.register() call ensures the scheduler shuts down cleanly when the Flask development server stops. In production (Gunicorn, uWSGI), handle shutdown via a signal handler or application lifecycle hook. The /jobs endpoint gives you a simple way to verify that scheduled tasks are registered and shows their next run times.
Frequently Asked Questions
Does APScheduler work with asyncio and FastAPI?
Yes -- use AsyncIOScheduler instead of BackgroundScheduler. In a FastAPI app, start the scheduler in a startup handler (for example one decorated with @app.on_event("startup")) and shut it down in the matching shutdown handler. Job functions can be regular functions or async coroutines, depending on your use case. APScheduler 4.x (currently in beta) is a full async rewrite -- check the docs for the latest stable version.
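A minimal sketch of that pattern (refresh_cache is an illustrative job and the five-minute interval is arbitrary):
# fastapi_scheduler.py
from fastapi import FastAPI
from apscheduler.schedulers.asyncio import AsyncIOScheduler
app = FastAPI()
scheduler = AsyncIOScheduler()
async def refresh_cache():
    print("Refreshing cache...")
@app.on_event("startup")
async def start_scheduler():
    scheduler.add_job(refresh_cache, "interval", minutes=5, id="cache_refresh")
    scheduler.start()
@app.on_event("shutdown")
async def stop_scheduler():
    scheduler.shutdown()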
What happens to jobs that are missed while my process is down?
With in-memory job stores, missed jobs are lost. With a persistent job store (SQLAlchemy, MongoDB), the jobs are remembered but the behavior on restart depends on coalesce and misfire_grace_time settings. coalesce=True runs missed jobs once; coalesce=False runs each missed instance. misfire_grace_time sets a deadline -- if a job is missed by more than this many seconds, it is skipped entirely.
How do I schedule jobs in a specific timezone?
Pass timezone to the trigger: scheduler.add_job(func, "cron", hour=8, timezone="Australia/Sydney"). You can also set a default timezone on the scheduler itself: BackgroundScheduler(timezone="UTC"). Always use timezone-aware scheduling in production -- naive datetime handling causes subtle bugs around daylight saving transitions.
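A short sketch combining both (morning_digest is an illustrative job):
# timezone_jobs.py
from apscheduler.schedulers.background import BackgroundScheduler
def morning_digest():
    print("Sending 8am Sydney digest...")
# Scheduler-wide default is UTC; individual jobs can override it.
scheduler = BackgroundScheduler(timezone="UTC")
scheduler.add_job(morning_digest, "cron", hour=8, minute=0,
                  timezone="Australia/Sydney", id="sydney_digest")
scheduler.start()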
Can I run APScheduler across multiple processes or servers?
Not directly with the background scheduler -- each process runs its own independent scheduler. For true distributed scheduling across multiple workers, use Celery Beat (backed by Redis or RabbitMQ), or combine APScheduler's SQLAlchemy job store on a shared PostgreSQL database with your own locking. Multiple processes can point at the same job store table, but APScheduler 3.x does not coordinate execution between schedulers, so without careful configuration every process will run every job.
How do I pause, resume, or remove a job at runtime?
Use the job's ID: scheduler.pause_job("my_job_id"), scheduler.resume_job("my_job_id"), and scheduler.remove_job("my_job_id"). You can also get a job object and call methods on it: job = scheduler.get_job("my_job_id") then job.pause(). Build a simple admin endpoint in your web app that calls these methods to control job scheduling at runtime without a restart.
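As one illustration, here is a hypothetical Flask admin sketch for runtime control (the routes and the standalone scheduler are placeholders -- in a real app you would reuse the scheduler created at startup and add authentication):
# admin_jobs.py
from flask import Flask, jsonify
from apscheduler.schedulers.background import BackgroundScheduler
app = Flask(__name__)
scheduler = BackgroundScheduler()
scheduler.start()
@app.route("/jobs/<job_id>/pause", methods=["POST"])
def pause_job(job_id):
    scheduler.pause_job(job_id)    # stops future runs, keeps the job
    return jsonify({"status": "paused", "job_id": job_id})
@app.route("/jobs/<job_id>/resume", methods=["POST"])
def resume_job(job_id):
    scheduler.resume_job(job_id)   # re-enables the schedule
    return jsonify({"status": "resumed", "job_id": job_id})
@app.route("/jobs/<job_id>", methods=["DELETE"])
def delete_job(job_id):
    scheduler.remove_job(job_id)   # deletes the job entirely
    return jsonify({"status": "removed", "job_id": job_id})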
Conclusion
APScheduler brings cron-style scheduling inside your Python application, where it belongs. The three trigger types -- interval for repeated tasks, cron for time-of-day scheduling, and date for one-off jobs -- cover virtually every scheduling need. Add event listeners for error alerting, use the SQLAlchemy job store for persistence across restarts, and integrate with Flask or FastAPI using the background scheduler and atexit.
The real-life Flask example demonstrates the complete pattern: initialize the scheduler at module level, register jobs with IDs so they can be managed at runtime, start it once, and register an atexit handler. For production, add a persistent job store and proper timezone configuration from the start -- these are the two settings that cause the most pain when retrofitted later.
For the full API reference and advanced features like executors and job stores, see the APScheduler documentation.