
Every application eventually needs to run tasks on a schedule: send a nightly report, clean up expired cache entries every hour, poll an API every 30 seconds, or run a database backup at 2 AM every Sunday. Python’s built-in sched module is low-level (you manage the event queue and its run loop yourself), and spinning up a cron job for every small task in a Python app adds operational overhead. APScheduler fills this gap.

APScheduler (Advanced Python Scheduler) is a mature, battle-tested scheduling library that runs inside your Python process. It supports three trigger types — interval, cron, and date — and three execution modes: blocking, background thread, and async. It works in any Python application without needing an external daemon or message broker.

This article covers installing APScheduler, running your first interval job, using cron expressions for time-based scheduling, running one-off delayed jobs with the date trigger, managing jobs at runtime (pause, resume, remove), persisting jobs across restarts with job stores, and combining everything into a production-ready task scheduler. By the end you will have a complete scheduling toolkit that fits inside any Python application.

APScheduler: Quick Example

Here is a working APScheduler script that runs a job every 5 seconds in the background while your main program continues:

# quick_scheduler.py
from apscheduler.schedulers.background import BackgroundScheduler
import time

def my_job():
    print("Job running!")

scheduler = BackgroundScheduler()
scheduler.add_job(my_job, "interval", seconds=5)
scheduler.start()

print("Scheduler started. Press Ctrl+C to stop.")
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    scheduler.shutdown()
    print("Scheduler stopped.")
Scheduler started. Press Ctrl+C to stop.
Job running!
Job running!
Job running!
^CScheduler stopped.

BackgroundScheduler runs its scheduling loop in a daemon thread and, by default, executes jobs on a thread pool, so your main thread stays free. The add_job() method takes the function to call, the trigger type ("interval"), and trigger-specific keyword arguments. The start() and shutdown() calls control the scheduler lifecycle.

scheduler.add_job() — because hardcoded sleep() loops are a cry for help.

What Is APScheduler and When Should You Use It?

APScheduler is an in-process task scheduler for Python applications. “In-process” means the scheduler lives inside your Python program — no Redis, no Celery worker, no cron daemon required. This makes it ideal for lightweight automation tasks that belong inside your application.

Tool          | Best For                 | Requires           | In-Process
APScheduler   | App-embedded scheduling  | Nothing external   | Yes
Celery Beat   | Distributed task queues  | Redis or RabbitMQ  | No
cron (OS)     | System-level scripts     | Unix/Linux OS      | No
schedule lib  | Simple interval jobs     | Nothing external   | Yes
rq-scheduler  | Queue-backed scheduling  | Redis              | No

Use APScheduler when you want scheduling built into your Python application without external dependencies. It is the right choice for a Flask/FastAPI web app that needs background jobs, a data pipeline script that runs tasks on varying schedules, or any service that needs more power than the simple schedule library but does not need the full Celery stack.

Installation

Install APScheduler with pip:

pip install apscheduler
Successfully installed APScheduler-3.10.4

APScheduler 3.x is the stable release as of 2026. APScheduler 4.x is in development and has a different API. This article uses 3.x. Verify your version with python -c "import apscheduler; print(apscheduler.__version__)".

Choosing a Scheduler Type

APScheduler provides different scheduler classes depending on how you want to run jobs:

# scheduler_types.py
# BackgroundScheduler -- runs in a daemon thread, good when the app has other work to do
from apscheduler.schedulers.background import BackgroundScheduler
# BlockingScheduler -- takes over the main thread, good for standalone scripts
from apscheduler.schedulers.blocking import BlockingScheduler
# AsyncIOScheduler -- for asyncio apps (uncomment when needed)
# from apscheduler.schedulers.asyncio import AsyncIOScheduler

# For most apps: BackgroundScheduler
scheduler = BackgroundScheduler()

# For a standalone scheduler script with nothing else to do:
# scheduler = BlockingScheduler()
# scheduler.start()  # This blocks until scheduler.shutdown() is called

print("BackgroundScheduler and BlockingScheduler imported OK")
BackgroundScheduler and BlockingScheduler imported OK

BackgroundScheduler is the most flexible choice: it runs in a daemon thread so your main thread stays free to do other work (serve HTTP requests, wait for user input, etc.). BlockingScheduler makes sense for a script whose only job is running scheduled tasks. AsyncIOScheduler is for FastAPI or any asyncio-based application where you need to await the scheduled functions.

The Three Trigger Types

APScheduler has three built-in trigger types. Understanding when to use each one is the key to using APScheduler effectively.

Interval Trigger — Run Every N Units of Time

The interval trigger runs a job repeatedly with a fixed time gap between executions. Use it for polling, heartbeats, and periodic cleanup tasks.

# interval_trigger.py
from apscheduler.schedulers.background import BackgroundScheduler
import time
from datetime import datetime

def cleanup_expired_sessions():
    print(f"[{datetime.now():%H:%M:%S}] Cleaning up expired sessions...")

def poll_api():
    print(f"[{datetime.now():%H:%M:%S}] Polling external API...")

scheduler = BackgroundScheduler()

# Run every 10 seconds
scheduler.add_job(cleanup_expired_sessions, "interval", seconds=10)

# Run every 2 minutes 30 seconds -- time units add together
scheduler.add_job(poll_api, "interval", minutes=2, seconds=30)

# interval with start_date -- delay first run
scheduler.add_job(
    poll_api,
    "interval",
    hours=1,
    start_date="2026-05-01 08:00:00"
)

scheduler.start()
print("Scheduler running...")
time.sleep(25)
scheduler.shutdown()
print("Done.")
Scheduler running...
[10:00:10] Cleaning up expired sessions...
[10:00:20] Cleaning up expired sessions...
Done.

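Interval trigger arguments include weeks, days, hours, minutes, and seconds. You can combine them: hours=1, minutes=30 runs the job every 90 minutes. By default the first run happens one full interval after the job is added, not immediately. The optional start_date sets the datetime from which intervals are counted, which lets you delay or align the first run.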

Every 10 seconds, forever. Your server’s new roommate.

Cron Trigger — Run on a Schedule

The cron trigger uses cron-style expressions to run jobs at specific times, days of the week, or calendar positions. It is the right tool for “run at 3 AM every Sunday” or “run on the 1st of every month”.

# cron_trigger.py
from apscheduler.schedulers.background import BackgroundScheduler
import time
from datetime import datetime

def generate_weekly_report():
    print(f"[{datetime.now():%Y-%m-%d %H:%M}] Generating weekly sales report...")

def send_daily_digest():
    print(f"[{datetime.now():%Y-%m-%d %H:%M}] Sending daily digest email...")

def monthly_db_backup():
    print(f"[{datetime.now():%Y-%m-%d %H:%M}] Running monthly database backup...")

scheduler = BackgroundScheduler()

# Every Monday at 8:00 AM
scheduler.add_job(generate_weekly_report, "cron",
                  day_of_week="mon", hour=8, minute=0)

# Every day at 6:30 PM
scheduler.add_job(send_daily_digest, "cron",
                  hour=18, minute=30)

# First day of every month at 2:00 AM
scheduler.add_job(monthly_db_backup, "cron",
                  day=1, hour=2, minute=0)

# Every weekday (Mon-Fri) at noon
scheduler.add_job(
    send_daily_digest, "cron",
    day_of_week="mon-fri", hour=12, minute=0
)

scheduler.start()
print("Cron scheduler running. Ctrl+C to stop.")
try:
    while True:
        time.sleep(60)
except KeyboardInterrupt:
    scheduler.shutdown()
Cron scheduler running. Ctrl+C to stop.
[2026-05-01 02:00] Running monthly database backup...
[2026-05-01 12:00] Sending daily digest email...
[2026-05-01 18:30] Sending daily digest email...

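Cron trigger fields include year, month, day, week, day_of_week, hour, minute, and second. Each field accepts integers, ranges ("1-5"), lists ("1,15"), steps ("*/2" for every 2 units), and names ("mon-fri"). Omitted fields do not all default to the same thing: fields more significant than the ones you set default to * (every value), while less significant fields default to their minimum value (week and day_of_week always default to *). So hour=12 alone means “every day at 12:00:00”, and the minute=0 in the examples above is there for readability rather than necessity. The reverse case is the one to watch: minute=30 alone fires at half past every hour, because hour is more significant and defaults to *.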

Date Trigger — Run Once at a Specific Time

The date trigger schedules a job to run exactly once at a specified datetime. Use it for deferred execution, one-off reminders, or delayed notifications.

# date_trigger.py
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime, timedelta
import time

def send_welcome_email(user_email):
    print(f"Sending welcome email to {user_email}")

def process_payment(order_id, amount):
    print(f"Processing payment for order {order_id}: ${amount}")

scheduler = BackgroundScheduler()

# Run in 5 seconds (simulate a delayed notification)
run_at = datetime.now() + timedelta(seconds=5)
scheduler.add_job(send_welcome_email, "date",
                  run_date=run_at,
                  args=["alice@example.com"])

# Run in 10 seconds with keyword arguments
scheduler.add_job(process_payment, "date",
                  run_date=datetime.now() + timedelta(seconds=10),
                  kwargs={"order_id": "ORD-001", "amount": 49.99})

scheduler.start()
print("Jobs scheduled. Waiting...")
time.sleep(15)
scheduler.shutdown()
print("Done.")
Jobs scheduled. Waiting...
Sending welcome email to alice@example.com
Processing payment for order ORD-001: $49.99
Done.

The args parameter passes positional arguments to the job function; kwargs passes keyword arguments. This lets you schedule parameterized jobs dynamically — for example, scheduling a welcome email when a user registers, with their email address baked into the scheduled call.

Managing Jobs at Runtime

APScheduler gives you full control over jobs after they are scheduled. You can pause, resume, modify, and remove jobs while the scheduler is running.

# job_management.py
from apscheduler.schedulers.background import BackgroundScheduler
import time

def counter_job():
    print("Tick!")

scheduler = BackgroundScheduler()

# Add job and capture its ID
job = scheduler.add_job(counter_job, "interval", seconds=3, id="my_counter")
print(f"Added job: {job.id}")

scheduler.start()
time.sleep(7)

# Pause the job
scheduler.pause_job("my_counter")
print("Job paused.")
time.sleep(5)  # Nothing fires during pause

# Resume the job
scheduler.resume_job("my_counter")
print("Job resumed.")
time.sleep(7)

# Modify the interval to every 1 second
scheduler.reschedule_job("my_counter", trigger="interval", seconds=1)
print("Job rescheduled to every 1 second.")
time.sleep(5)

# List all jobs
for j in scheduler.get_jobs():
    print(f"Job: {j.id} | Next run: {j.next_run_time}")

# Remove the job
scheduler.remove_job("my_counter")
print("Job removed.")

scheduler.shutdown()
Added job: my_counter
Tick!
Tick!
Job paused.
Job resumed.
Tick!
Tick!
Job rescheduled to every 1 second.
Tick!
Tick!
Tick!
Job: my_counter | Next run: 2026-04-30 10:00:15+00:00
Job removed.

Always use explicit id strings when adding jobs you plan to manage later. Without an ID, APScheduler generates a UUID, making it hard to reference the job later. The get_jobs() method returns a list of all active Job objects with properties like id, next_run_time, and func.

pause_job(), resume_job(), remove_job() — the runtime controls cron never gave you.

Real-Life Example: Automated Site Health Monitor

Here is a complete monitoring script that checks a website’s health on multiple schedules — a fast heartbeat check every minute, a detailed check every 15 minutes, and a daily summary report.

# site_monitor.py
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
import time
import urllib.request
import urllib.error

SITE_URL = "https://httpbin.org/status/200"   # Use httpbin for testing
stats = {"checks": 0, "failures": 0, "last_status": None}

def heartbeat_check():
    """Quick ping every 60 seconds."""
    # Count every attempt (not only successes) so the uptime math stays correct
    stats["checks"] += 1
    try:
        with urllib.request.urlopen(SITE_URL, timeout=5) as resp:
            status = resp.getcode()
        stats["last_status"] = status
        print(f"[{datetime.now():%H:%M:%S}] Heartbeat OK ({status})")
    except urllib.error.URLError as e:
        stats["failures"] += 1
        print(f"[{datetime.now():%H:%M:%S}] Heartbeat FAILED: {e.reason}")

def detailed_check():
    """Measure response time every 15 minutes."""
    start = time.monotonic()  # the module-level time import is all we need
    try:
        with urllib.request.urlopen(SITE_URL, timeout=10):
            elapsed = time.monotonic() - start
        print(f"[{datetime.now():%H:%M:%S}] Detailed check: {elapsed:.3f}s response time")
    except urllib.error.URLError as e:
        print(f"[{datetime.now():%H:%M:%S}] Detailed check FAILED: {e.reason}")

def daily_summary():
    """Log daily stats at midnight."""
    uptime = 0.0
    if stats["checks"] > 0:
        uptime = (stats["checks"] - stats["failures"]) / stats["checks"] * 100
    print(f"\n=== DAILY SUMMARY [{datetime.now():%Y-%m-%d}] ===")
    print(f"  Total checks  : {stats['checks']}")
    print(f"  Failures      : {stats['failures']}")
    print(f"  Uptime        : {uptime:.2f}%")
    print(f"  Last status   : {stats['last_status']}\n")
    # Reset stats for the next day
    stats.update({"checks": 0, "failures": 0})

scheduler = BackgroundScheduler()
scheduler.add_job(heartbeat_check, "interval", seconds=60,  id="heartbeat")
scheduler.add_job(detailed_check,  "interval", minutes=15,  id="detailed")
scheduler.add_job(daily_summary,   "cron",     hour=0, minute=0, id="daily")

scheduler.start()
print(f"Site monitor started for {SITE_URL}")
print("Running checks... (Ctrl+C to stop)")

try:
    # Run heartbeat immediately on start
    heartbeat_check()
    detailed_check()
    while True:
        time.sleep(30)
except KeyboardInterrupt:
    daily_summary()
    scheduler.shutdown()
    print("Monitor stopped.")
Site monitor started for https://httpbin.org/status/200
Running checks... (Ctrl+C to stop)
[10:00:00] Heartbeat OK (200)
[10:00:00] Detailed check: 0.312s response time
[10:01:00] Heartbeat OK (200)
...
^C
=== DAILY SUMMARY [2026-04-30] ===
  Total checks  : 3
  Failures      : 0
  Uptime        : 100.00%
  Last status   : 200
Monitor stopped.

This script uses httpbin.org as a public test endpoint: /status/200 responds with HTTP 200 whenever the service is reachable, so you can verify the monitoring logic without setting up your own server. Replace SITE_URL with your actual site and extend daily_summary() to write to a log file or send an email via smtplib.
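
One way to take the log-file route is sketched below with the standard logging module. The function names, the logger name, and the one-line summary format are all hypothetical, and the uptime formula assumes that stats["checks"] counts every attempt, successful or not.

```python
# summary_log.py
import logging


def format_summary(stats):
    """Render the stats dict as a single log-friendly line."""
    checks, failures = stats["checks"], stats["failures"]
    uptime = (checks - failures) / checks * 100 if checks else 0.0
    return (
        f"checks={checks} failures={failures} "
        f"uptime={uptime:.2f}% last_status={stats['last_status']}"
    )


def build_summary_logger(path):
    """Return a logger that appends timestamped lines to `path`."""
    logger = logging.getLogger("site_monitor.summary")
    logger.setLevel(logging.INFO)
    if not logger.handlers:  # avoid duplicate handlers on repeated calls
        handler = logging.FileHandler(path, encoding="utf-8")
        handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
        logger.addHandler(handler)
    return logger


print(format_summary({"checks": 4, "failures": 1, "last_status": 200}))
```

Inside daily_summary(), a call such as build_summary_logger("monitor.log").info(format_summary(stats)) placed before the reset would append one line per day to the file.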

Uptime: 100%. Stress: 0%. Cron job: replaced.

Frequently Asked Questions

What happens if a job misfires (the scheduler was down when it should have run)?

By default, APScheduler has a misfire_grace_time of 1 second. If a job is more than 1 second late, it is skipped and logged as a misfire. You can increase this: scheduler.add_job(my_job, "cron", hour=8, misfire_grace_time=3600) gives the job a 1-hour window to run after its scheduled time. Set misfire_grace_time=None to always run the job, no matter how late.

How do I prevent a job from running again if the previous run is still going?

Use max_instances=1 in add_job(): scheduler.add_job(my_job, "interval", seconds=30, max_instances=1). This prevents a second instance of the job from starting if the previous one has not finished; the overlapping run is skipped and a warning is logged. The default is already 1 regardless of trigger type, but setting it explicitly makes your intent clear. For CPU-bound jobs, also consider setting executor to a process pool executor.

How do I make jobs survive application restarts?

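Use a job store. APScheduler 3.x ships job stores for SQLAlchemy (which covers SQLite, PostgreSQL, and other SQL databases), MongoDB, and Redis. For SQLite, install SQLAlchemy, import the store with from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore, and then create the scheduler: scheduler = BackgroundScheduler(jobstores={'default': SQLAlchemyJobStore(url='sqlite:///jobs.db')}). Jobs stored in the database survive restarts. Give persisted jobs an explicit id and pass replace_existing=True to add_job(), otherwise every restart adds another copy of the same job. Note that the job function must still exist and be importable when the scheduler starts: the job store saves the schedule, not the code.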

How do I use APScheduler with FastAPI or asyncio?

Use AsyncIOScheduler instead of BackgroundScheduler. Start it in FastAPI’s lifespan function: scheduler.start() at startup and scheduler.shutdown() at shutdown. If your scheduled function is a coroutine (async def), APScheduler will await it automatically when using AsyncIOScheduler.

How do I schedule jobs in a specific timezone?

Pass the timezone argument to the scheduler or to the job: BackgroundScheduler(timezone="America/New_York") sets the default timezone. For a specific job: scheduler.add_job(my_job, "cron", hour=8, timezone="Europe/London"). Time zone names can be given as plain strings. In the 3.10.x releases APScheduler declares pytz as a dependency, so pip installs it automatically and no separate install is needed.

Conclusion

APScheduler gives you three powerful scheduling primitives — interval for repeating tasks, cron for calendar-based schedules, and date for one-off deferred execution — all running inside your Python process without any external infrastructure. You have seen how to choose between BackgroundScheduler and BlockingScheduler, manage jobs at runtime with pause/resume/remove, handle misfires and overlapping runs, and build a complete site monitoring application. The official APScheduler documentation at apscheduler.readthedocs.io covers advanced topics including distributed job stores and custom executors.