Last Updated: June 01, 2026

Advanced

Python’s async and await keywords are among the language’s more advanced features. They help your programs run faster by making sure the CPU spends as little time as possible waiting and as much time as possible working. If you have ever watched a capable chef, you’ll know what I mean. The chef is not just following a recipe step by step (i.e. working synchronously); the chef is boiling water for the pasta, measuring out the pasta, and chopping tomatoes for the sauce while the water comes to the boil (i.e. working asynchronously). The chef minimizes idle time by always working on some task. That’s the same idea behind async and await.

For this tutorial, we will focus on Python 3.7, as it has some of the more modern features of async and await. We will call out some of the differences for Python 3.4 – 3.6.

Pubs - Python How To Program

Written by Pubs

Python developer and educator with 15+ years building production systems across data engineering, web APIs, and AI tooling. Founder of Python How To Program — 270+ in-depth tutorials covering the modern Python stack.

What is async await in Python?

The async and await keywords let you define which parts of your program must run sequentially and which parts may take some time, during which other parts of the program can execute. A modern example is downloading a web page: it may take a few seconds, and while the download is happening you can execute other parts of your program.

How does async await work in Python?

Sometimes the best way to explain something is to show how you would achieve the same thing without the feature.

Continuing with the restaurant theme, suppose you are running a hamburger stall (you’re both the waiter and the chef). Collecting payment from a customer and serving the final hamburger are almost instant, but the most time-consuming task is cooking the beef patty, which takes 2 seconds (one could only wish!).

See the below diagram:

Figure 1: Sequentially serving customers at a hamburger stall

In the above diagram:

  • Step 1: you would first get the order and collect the money from Customer 1
  • Step 2: you would then put a beef patty on the cook top and then wait for 2 seconds for the beef patty to cook. At the same time, Customer 1 is also waiting for 2 seconds.
  • Step 3: when the beef patty is cooked, you can then plate this onto a hamburger bun
  • Step 4: pass the final hamburger to Customer 1
  • Step 5: You would then start to serve Customer 2 (who has already been waiting 2 seconds for you to serve Customer 1). You can then repeat steps 2-4

With the above approach, Customer 1 would have their burger in about 2 seconds, Customer 2 approx 4 seconds, and then Customer 3 approx 6 seconds.

The equivalent code would be as follows:

import time, timeit

customer_queue = [ "C1", "C2", "C3" ]

def get_next_customer():
    return customer_queue.pop(0)    #Get the first customer from list

def cook_hamburger(customer):
    start_customer_timer = timeit.default_timer()
    print( f"[{customer}]: Start cooking hamburger for customer")
    time.sleep(2)   # It takes 2 seconds to cook the hamburger
    end_customer_timer = timeit.default_timer()
    print( f"[{customer}]: Finish cooking hamburger for customer.  Total {end_customer_timer-start_customer_timer} seconds\n")

def run_shop():
    while customer_queue:
        curr_customer = get_next_customer()
        cook_hamburger(curr_customer)

def main():
    print('Hamburger Shop')
    start = timeit.default_timer()
    run_shop()
    stop = timeit.default_timer()
    print(f"** Total runtime: {stop-start} seconds ***")

if __name__ == '__main__':
    main()

The code above is fairly straightforward. The customers wait in the list customer_queue, which run_shop() loops over. For each customer (returned by get_next_customer()), we call cook_hamburger(), which cooks the hamburger for 2 seconds and blocks until it completes.

Running this code you would get the following output:

As expected, the total runtime for 3 customers is 6 seconds since each customer is served sequentially.

Cooking Hamburgers Asynchronously and coding the event loop manually

Instead of serving each customer and cooking their hamburger one at a time, you can do some of the tasks asynchronously: you start a task, but rather than sitting and waiting for it, you do something else. See the following diagram, where the chef/waiter is serving multiple customers and cooking at the same time. It’s not explicitly shown, but the chef/waiter constantly checks on the status of each task, and if a task doesn’t require their attention they move on to the next one. This process of always looking for something to do is the equivalent of the “event loop”. The event loop is a programming construct whose logic is to always look for a task to execute; when a task will take some time, it releases control to the next task in the loop.

Figure 2: Example of how the event loop works in a real life example – the chef/waiter is always busy!

In the above example, the following is happening:

  • Step 1: you would first get the order and collect the money from Customer 1
  • Step 2: you would then put a beef patty on the cook top and then let it cook, then immediately move on to the next customer while the patty is cooking.
  • Step 3: you would first get the order and collect the money from Customer 2. You would also check if the first beef patty has completed cooking yet.
  • Step 4: you would then put another beef patty on the cook top and then let it cook, then immediately move on to the next customer while the patty is cooking.
  • Step 5: When any of the beef patties are done, you would plate it
  • Step 6: Pass the plated hamburger to the respective customer. Note, in the above example we’ve assumed it to be Customer 1, but it could be any customer depending on which beef patty cooked fully first.
  • Step 7: Whenever another beef patty is done, you would plate it and serve it

This is the equivalent of the event loop. The chef/waiter is constantly checking whether they need to serve a customer or check on the hamburgers that are cooking. When a hamburger is placed on the stove and needs 2 seconds to cook, the chef/waiter moves on to the next task instead of waiting for the 2 seconds to pass. When the hamburger is done, it is served to the customer.

How can this be done programmatically? Glad you asked:

import timeit

customer_queue = [ "C1", "C2", "C3" ]
hamburger_queue = []

def get_next_customer():
    if customer_queue: return customer_queue.pop(0)    #Get the first customer from list
    return None 

def start_cooking_hamburger(customer):
    print( f"[{customer}]: Start cooking hamburger for customer")
    hamburger = { "customer":customer, "start_cooking_time": timeit.default_timer(), "cooked":False}
    hamburger_queue.append( hamburger )

def check_hamburger_status():
    curr_timer = timeit.default_timer()

    #Check each hamburger's status without blocking
    for index, hamburger in enumerate(hamburger_queue):
        elapsed_time = curr_timer-hamburger['start_cooking_time']
        if elapsed_time > 2: #2 seconds have passed, so the hamburger is cooked
            print( f"[{hamburger['customer']}]: Finish cooking hamburger for customer.  Total {elapsed_time} seconds\n")
            #Delete from list to mark as done. Deleting while iterating shifts the
            #remaining items, so the next hamburger is only checked on the next pass
            del hamburger_queue[index]

def run_shop():
    while customer_queue or hamburger_queue:        #Event loop
        curr_customer = get_next_customer()
        if curr_customer: start_cooking_hamburger(curr_customer)
        check_hamburger_status()

def main():
    print('Hamburger Shop')
    start = timeit.default_timer()
    run_shop()
    stop = timeit.default_timer()
    print(f"** Total runtime: {stop-start} seconds ***")

if __name__ == '__main__':
    main()

The output of the code is as follows:

Output running asynchronously – notice the runtime of 2 seconds compared to the 6 seconds in the synchronous method.

So there are a few things happening here:

  • There’s a new list called hamburger_queue[] which is keeping track of each hamburger that is being cooked
  • The event loop is the while customer_queue or hamburger_queue within the run_shop() function
  • We have a new function called start_cooking_hamburger(), which records that cooking has started. Why is this needed? Previously we would simply wait for a given task to finish. Now, since we are doing something else while we wait, we need to remember a few things (who the hamburger is for and when it started cooking) so we can come back to the task
  • We also have a new function called check_hamburger_status() which checks the status of each hamburger being cooked (i.e. item in hamburger_queue[]), and if it is cooked (i.e. 2 seconds have passed), then it is considered complete

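You may notice in the output that Customer 3 was in fact served before Customer 2. This is because the completion order is not guaranteed. One likely contributor here is that check_hamburger_status() deletes items from hamburger_queue while looping over it, which makes the loop skip the item after each deleted one until the next pass of the event loop.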
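For comparison, here is a minimal sketch of the same hamburger stall written with Python’s asyncio module, which supplies the event loop so it does not have to be coded by hand. The function names mirror the ones above. COOK_SECONDS is a hypothetical constant (shortened from the 2 seconds used above so the sketch runs quickly), and asyncio.run() assumes Python 3.7 or later.

```python
import asyncio
import timeit

COOK_SECONDS = 0.2  # hypothetical: shortened from the 2 seconds used above


async def cook_hamburger(customer):
    print(f"[{customer}]: Start cooking hamburger for customer")
    # await hands control back to the event loop while the patty "cooks"
    await asyncio.sleep(COOK_SECONDS)
    print(f"[{customer}]: Finish cooking hamburger for customer")
    return customer


async def run_shop(customers):
    # gather() schedules one coroutine per customer and waits for all of them;
    # the results come back in the order the customers were passed in
    return await asyncio.gather(*(cook_hamburger(c) for c in customers))


def main():
    start = timeit.default_timer()
    served = asyncio.run(run_shop(["C1", "C2", "C3"]))
    elapsed = timeit.default_timer() - start
    print(f"** Served {served} in {elapsed:.2f} seconds ***")
    return served, elapsed


if __name__ == "__main__":
    main()
```

The bookkeeping that hamburger_queue and check_hamburger_status() did by hand is what the await keyword and the asyncio event loop take care of here: each await marks a point where the chef/waiter may move on to another customer.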

How To Automate Tasks with Python: A Practical Guide

Last Updated: June 14, 2026

Beginner to Intermediate

You have a folder full of downloaded files named document(1).pdf, screenshot_2024.png, and report_final_v3_FINAL.xlsx. Every week you spend 20 minutes sorting them by hand. Or maybe you copy data from a website into a spreadsheet every morning, or you run the same three terminal commands every time you start work. These are the tasks Python was built to eliminate. With a few dozen lines of code you can turn a painful weekly chore into something that runs itself while you drink coffee.

Python ships with a rich standard library for automation (pathlib for file operations, subprocess for running system commands, smtplib for sending email), and the broader ecosystem adds libraries like schedule for periodic jobs and requests for web data collection. The only setup needed is a standard Python 3.9+ install (the examples use built-in generic type hints such as list[dict]) plus three third-party packages installed with pip: schedule for scheduling, and requests and beautifulsoup4 for web scraping. Everything else in this article is built in.

In this guide we will cover four practical automation categories: organizing files and folders with pathlib and shutil, collecting web data with requests and BeautifulSoup, scheduling jobs to run automatically with the schedule library, and running system commands with subprocess. Each section ends with working code you can adapt to your own situation. By the end you will have a toolkit of reusable automation patterns and a complete script that organizes a messy downloads folder automatically.

Automate Tasks with Python: Quick Example

Here is a self-contained script that moves the files in a folder into subfolders based on their extension. It is one of the most common automation tasks you will ever write:

# sort_downloads.py
from pathlib import Path
import shutil

FOLDER = Path.home() / "Downloads"
DESTINATIONS = {
    ".pdf":  "Documents",
    ".png":  "Images",
    ".jpg":  "Images",
    ".xlsx": "Spreadsheets",
    ".csv":  "Spreadsheets",
    ".zip":  "Archives",
}

for file in FOLDER.iterdir():
    if file.is_file() and file.suffix in DESTINATIONS:
        dest_folder = FOLDER / DESTINATIONS[file.suffix]
        dest_folder.mkdir(exist_ok=True)
        shutil.move(str(file), dest_folder / file.name)
        print(f"Moved: {file.name} -> {DESTINATIONS[file.suffix]}/")

Output (example):

Moved: invoice_march.pdf -> Documents/
Moved: screenshot_2024.png -> Images/
Moved: sales_report.xlsx -> Spreadsheets/

The script uses Path.home() to get the user’s home directory regardless of operating system, iterdir() to loop over every item in the folder, and shutil.move() to relocate the file. The mkdir(exist_ok=True) call creates the destination folder if it does not already exist and does nothing if it does, so the script does not crash either way. We will build a more complete version in the real-life example section, including duplicate detection and logging.

What Is Python Automation and When Should You Use It?

Automation means writing code that performs a repetitive task so you do not have to. The rule of thumb is: if you have done something manually more than three times, it is worth automating. Python is the go-to language for automation because it has concise syntax, a massive standard library, and third-party packages that cover almost every automation use case out of the box.

The table below maps common repetitive tasks to the Python tools that handle them:

Task Type                 Python Tool                When to Use
File/folder operations    pathlib, shutil            Renaming, moving, copying, deleting files
Reading/writing files     built-in open(), csv       Log parsing, report generation, data transformation
Web data collection       requests, BeautifulSoup    Pulling prices, headlines, tables from websites
Scheduled jobs            schedule, APScheduler      Running tasks daily, hourly, or on a cron-like schedule
System commands           subprocess                 Running CLI tools, shell scripts, git, ffmpeg
Email sending             smtplib, yagmail           Automated reports, alerts, notifications

A good automation script is idempotent — running it twice produces the same result as running it once. It handles edge cases (missing files, network errors, duplicate names) without crashing. And it logs what it did so you can review the results later. Keep these principles in mind as we work through each section.
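To make the idempotency principle concrete, here is a small sketch. The helper move_once() and the demo folder layout are hypothetical names made up for illustration; the demo runs inside a temporary directory so it does not touch real files.

```python
import shutil
import tempfile
from pathlib import Path


def move_once(src: Path, dest_dir: Path) -> bool:
    """Move src into dest_dir; return False (and do nothing) if src is already gone."""
    if not src.exists():
        return False  # a previous run already handled this file
    dest_dir.mkdir(parents=True, exist_ok=True)
    shutil.move(str(src), str(dest_dir / src.name))
    return True


def demo() -> list:
    with tempfile.TemporaryDirectory() as tmp:
        folder = Path(tmp)
        report = folder / "report.pdf"
        report.write_text("dummy content")

        first = move_once(report, folder / "Documents")
        second = move_once(report, folder / "Documents")  # safe to repeat

        names = sorted(p.name for p in (folder / "Documents").iterdir())
        return [first, second, names]


if __name__ == "__main__":
    print(demo())
```

The second call finds nothing left to do and returns False instead of raising, so the folder ends up in the same state whether the script runs once or twice.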

Automating File and Folder Operations

The pathlib module (Python 3.4+) provides an object-oriented interface for working with file paths that is far more readable than the older os.path approach. Combined with shutil for copy/move operations, these two modules cover the large majority of file automation tasks.

Finding and Filtering Files with pathlib

The glob() and rglob() methods on a Path object let you find files matching a pattern. With a simple pattern such as "*.py", glob() matches only the direct children of the folder; rglob() (recursive glob) also searches all subdirectories:

# find_files.py
from pathlib import Path

base = Path("/tmp/project")  # change to your actual folder

# Find all Python files in this folder only
py_files = list(base.glob("*.py"))
print("Python files (top-level):", [f.name for f in py_files])

# Find all log files anywhere in the tree
log_files = list(base.rglob("*.log"))
print("Log files (all depths):", [f.relative_to(base) for f in log_files])

# Find files larger than 1 MB
large_files = [f for f in base.rglob("*") if f.is_file() and f.stat().st_size > 1_000_000]
print("Files over 1MB:", [f.name for f in large_files])

Output (example):

Python files (top-level): ['app.py', 'utils.py', 'config.py']
Log files (all depths): [PosixPath('logs/app.log'), PosixPath('logs/error.log')]
Files over 1MB: ['dataset.csv', 'backup.tar.gz']

f.stat().st_size returns the file size in bytes. The expression f.relative_to(base) strips the base directory from the path so you see logs/app.log instead of the full absolute path. Both are useful for building reports of what your script found before it starts moving anything.
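As one way to turn those two calls into a pre-flight report, the sketch below totals file sizes per extension. The function name size_by_extension() and the sample files are assumptions for illustration, and the demo builds its own temporary folder rather than reading /tmp/project.

```python
import tempfile
from collections import defaultdict
from pathlib import Path


def size_by_extension(base: Path) -> dict:
    """Return total bytes per file extension under base (all depths)."""
    totals = defaultdict(int)
    for f in base.rglob("*"):
        if f.is_file():
            # files without an extension are grouped under "(none)"
            totals[f.suffix.lower() or "(none)"] += f.stat().st_size
    return dict(totals)


def demo() -> dict:
    with tempfile.TemporaryDirectory() as tmp:
        base = Path(tmp)
        (base / "logs").mkdir()
        (base / "app.py").write_bytes(b"x" * 10)
        (base / "logs" / "app.log").write_bytes(b"x" * 30)
        (base / "logs" / "error.log").write_bytes(b"x" * 5)
        return size_by_extension(base)


if __name__ == "__main__":
    print(demo())
```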

Renaming and Copying Files Safely

Before moving or renaming files in an automation script, always check whether the destination already exists. Blindly overwriting a file can cause data loss that is impossible to reverse:

# safe_copy.py
from pathlib import Path
import shutil

def safe_copy(src: Path, dest_dir: Path) -> Path:
    """Copy src into dest_dir, appending a counter if the name already exists."""
    dest_dir.mkdir(parents=True, exist_ok=True)
    dest = dest_dir / src.name

    if dest.exists():
        counter = 1
        while dest.exists():
            stem = src.stem
            dest = dest_dir / f"{stem}_{counter}{src.suffix}"
            counter += 1

    shutil.copy2(str(src), dest)   # copy2 preserves metadata (timestamps)
    return dest

# Demo
src_file = Path("/tmp/report.pdf")
src_file.write_text("dummy content")   # create test file
result = safe_copy(src_file, Path("/tmp/archive"))
print(f"Copied to: {result}")

result2 = safe_copy(src_file, Path("/tmp/archive"))  # simulate duplicate
print(f"Duplicate handled: {result2}")

Output:

Copied to: /tmp/archive/report.pdf
Duplicate handled: /tmp/archive/report_1.pdf

The shutil.copy2() function copies the file content and also preserves metadata such as the original modification time, which is important when you want archive copies to retain their original dates. The counter loop ensures you never silently overwrite an existing file, a critical safety net for any file automation script.

Automating Web Data Collection

Web scraping lets your scripts pull data from websites automatically. The standard approach uses requests to download HTML and BeautifulSoup to parse it. Install both with pip install requests beautifulsoup4.

Fetching and Parsing a Web Page

We will use quotes.toscrape.com, a site built specifically for scraping practice. It serves reliable HTML with a stable structure, so this code should continue to work without modification.

Here is the HTML structure of each quote on that page, so you can see exactly what the selectors are targeting:

<!-- HTML structure of each quote on quotes.toscrape.com -->
<div class="quote">
    <span class="text">"The world as we have created it..."</span>
    <span>
        by <small class="author">Albert Einstein</small>
    </span>
    <div class="tags">
        <a class="tag" href="/tag/change/page/1/">change</a>
    </div>
</div>

Now the scraping code:

# scrape_quotes.py
import requests
from bs4 import BeautifulSoup

def scrape_quotes(url: str) -> list[dict]:
    response = requests.get(url, timeout=10)
    response.raise_for_status()   # raises HTTPError for 4xx/5xx responses

    soup = BeautifulSoup(response.text, "html.parser")
    results = []

    for quote_div in soup.select("div.quote"):
        text_elem = quote_div.select_one("span.text")
        author_elem = quote_div.select_one("small.author")
        tag_elems = quote_div.select("a.tag")

        # Defensive: check before accessing .text
        text   = text_elem.text.strip()   if text_elem   else "Unknown"
        author = author_elem.text.strip() if author_elem else "Unknown"
        tags   = [t.text for t in tag_elems]

        results.append({"quote": text, "author": author, "tags": tags})

    return results

quotes = scrape_quotes("https://quotes.toscrape.com")
for q in quotes[:3]:
    print(f'{q["author"]}: {q["quote"][:60]}...')
    print(f'  Tags: {", ".join(q["tags"])}')
    print()

Output (example; quote text shortened here):

Albert Einstein: "The world as we have created it is a process of...
  Tags: change, deep-thoughts, thinking, world

J.K. Rowling: "It is our choices, Harry, that show what we truly a...
  Tags: abilities, choices

Albert Einstein: "There are only two ways to live your life. One is...
  Tags: inspirational, life, live, miracle, miracles

response.raise_for_status() is a one-line safety net — it raises a requests.HTTPError if the server returns a 4xx or 5xx status code instead of silently continuing with bad data. The defensive checks (text_elem.text if text_elem else "Unknown") protect against pages where an element is missing, which happens constantly on real-world sites.

Saving Scraped Data to CSV

Collecting data is only half the job — you need to store it somewhere useful. Writing to CSV with Python’s built-in csv module keeps the output format-agnostic and readable in any spreadsheet application:

# save_to_csv.py
import csv
import requests
from bs4 import BeautifulSoup

def scrape_quotes(url):
    resp = requests.get(url, timeout=10)
    resp.raise_for_status()
    soup = BeautifulSoup(resp.text, "html.parser")
    results = []
    for div in soup.select("div.quote"):
        text   = div.select_one("span.text")
        author = div.select_one("small.author")
        results.append({
            "quote":  text.text.strip()   if text   else "",
            "author": author.text.strip() if author else "",
        })
    return results

quotes = scrape_quotes("https://quotes.toscrape.com")

output_file = "quotes.csv"
with open(output_file, "w", newline="", encoding="utf-8") as f:
    writer = csv.DictWriter(f, fieldnames=["author", "quote"])
    writer.writeheader()
    writer.writerows(quotes)

print(f"Saved {len(quotes)} quotes to {output_file}")

Output:

Saved 10 quotes to quotes.csv

Always pass encoding="utf-8" when writing CSV files — without it, non-ASCII characters (curly quotes, accented letters, em-dashes) will cause encoding errors or garbled output on Windows. The newline="" argument is also required by Python’s csv module to prevent extra blank lines on Windows.
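A quick way to convince yourself that both arguments matter is a round trip: write rows containing non-ASCII characters and an embedded line break, read them back, and compare. This is a minimal sketch; the SAMPLE rows are made-up data and round_trip() is a hypothetical helper that writes to a temporary directory.

```python
import csv
import tempfile
from pathlib import Path

# Made-up rows with curly quotes, an accented letter and an embedded newline
SAMPLE = [
    {"author": "Zoë Example", "quote": "“First line\nsecond line”"},
    {"author": "Plain Author", "quote": "No special characters here"},
]


def round_trip(rows: list) -> list:
    """Write rows to a CSV file and read them straight back."""
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "quotes.csv"
        with open(path, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=["author", "quote"])
            writer.writeheader()
            writer.writerows(rows)
        # Use the same newline and encoding settings when reading
        with open(path, newline="", encoding="utf-8") as f:
            return list(csv.DictReader(f))


if __name__ == "__main__":
    print(round_trip(SAMPLE) == SAMPLE)
```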

Running Automated Jobs on a Schedule

Writing the automation code is only half the job. You also need it to run at the right time, without you having to remember to start it. The schedule library provides a clean Python API for defining when jobs should run — every minute, every day at 9am, every Monday, and so on. Install it with pip install schedule.

Basic Scheduling with the schedule Library

The schedule library works with a simple event loop: you register jobs with schedule.every(), then call schedule.run_pending() in a loop to check whether any jobs are due:

# basic_schedule.py
import schedule
import time
from datetime import datetime

def morning_report():
    print(f"[{datetime.now():%H:%M:%S}] Good morning! Running daily report...")
    # your actual report logic goes here

def hourly_check():
    print(f"[{datetime.now():%H:%M:%S}] Hourly check complete.")

# Schedule the jobs
schedule.every().day.at("09:00").do(morning_report)
schedule.every().hour.do(hourly_check)
schedule.every(30).minutes.do(lambda: print("30-min heartbeat"))

print("Scheduler running. Press Ctrl+C to stop.")
while True:
    schedule.run_pending()
    time.sleep(30)   # check every 30 seconds to save CPU

Output (example at 09:00:00):

Scheduler running. Press Ctrl+C to stop.
[09:00:00] Good morning! Running daily report...
[09:00:00] Hourly check complete.
[09:00:00] 30-min heartbeat
[09:30:00] 30-min heartbeat
[10:00:00] Hourly check complete.

The time.sleep(30) inside the loop is important — without a sleep, the loop burns 100% of one CPU core doing nothing. Sleeping 30 seconds means jobs can fire up to 30 seconds late (acceptable for most automation), while using negligible CPU. If you need second-level precision, use time.sleep(1) instead.
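To see why the polling interval bounds how late a job can fire, here is a stripped-down sketch of the register-then-poll pattern. TinyScheduler is a hypothetical class written for this illustration only; it is not how the schedule library is implemented internally. A fake clock stands in for real time so the demo finishes instantly.

```python
import time


class TinyScheduler:
    """Hypothetical, stripped-down scheduler used only to illustrate polling."""

    def __init__(self, clock=time.monotonic):
        self.clock = clock
        self.jobs = []  # each entry: [interval_seconds, next_run_time, function]

    def every(self, seconds, func):
        self.jobs.append([seconds, self.clock() + seconds, func])

    def run_pending(self):
        now = self.clock()
        for job in self.jobs:
            interval, next_run, func = job
            if now >= next_run:
                func()
                job[1] = now + interval  # schedule the next run


def demo(poll_every: int) -> list:
    fake_now = [0.0]  # fake clock so the demo needs no real waiting
    calls = []
    scheduler = TinyScheduler(clock=lambda: fake_now[0])
    scheduler.every(30, lambda: calls.append(fake_now[0]))
    for tick in range(0, 100, poll_every):  # one poll per "sleep"
        fake_now[0] = float(tick)
        scheduler.run_pending()
    return calls


if __name__ == "__main__":
    print(demo(10))  # polls land exactly on the due times
    print(demo(25))  # the job is due at 30 but is only noticed at the next poll
```

With a 10-second poll the 30-second job fires on time; with a 25-second poll it is only noticed at the 50-second mark, which is the same trade-off as choosing time.sleep(30) versus time.sleep(1) in the loop above.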

Handling Errors in Scheduled Jobs

When a job in a scheduled loop raises an unhandled exception, the whole process crashes and no more jobs run. Wrap your job functions with a try/except to log errors and keep the loop running:

# robust_schedule.py
import schedule
import time
import logging
from datetime import datetime

logging.basicConfig(
    filename="automation.log",
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
)

def run_safely(job_func):
    """Decorator that catches exceptions and logs them without crashing the loop."""
    def wrapper():
        try:
            job_func()
            logging.info(f"{job_func.__name__} completed successfully")
        except Exception as exc:
            logging.error(f"{job_func.__name__} failed: {exc}", exc_info=True)
    return wrapper

def daily_scrape():
    # Simulating a job that sometimes fails
    import random
    if random.random() < 0.3:
        raise ConnectionError("Network unavailable")
    print("Scraped data successfully.")

schedule.every().day.at("08:00").do(run_safely(daily_scrape))

while True:
    schedule.run_pending()
    time.sleep(60)

The run_safely decorator wraps any function so that exceptions are caught, logged to a file, and the scheduler continues. The exc_info=True argument tells Python's logging module to include the full traceback in the log file -- essential for debugging failures that happen at 3am while you are asleep.
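As a variation, the sketch below adds functools.wraps so the wrapped job keeps its original name, and uses logger.exception(), which logs at ERROR level and includes the traceback. The logger name and the two sample jobs are assumptions for illustration.

```python
import functools
import logging

logger = logging.getLogger("automation_sketch")  # hypothetical logger name


def run_safely(job_func):
    """Catch and log any exception so a failing job cannot stop the loop."""
    @functools.wraps(job_func)  # keeps job_func.__name__ on the wrapper
    def wrapper(*args, **kwargs):
        try:
            return job_func(*args, **kwargs)
        except Exception:
            # logger.exception logs at ERROR level and appends the traceback
            logger.exception("%s failed", job_func.__name__)
            return None
    return wrapper


@run_safely
def flaky_job():
    raise ConnectionError("Network unavailable")


@run_safely
def healthy_job():
    return "ok"


def demo() -> list:
    # The failing job returns None instead of raising, so healthy_job still runs
    return [flaky_job(), healthy_job(), flaky_job.__name__]


if __name__ == "__main__":
    print(demo())
```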

Running System Commands with subprocess

Sometimes the right tool for a job is a command-line program, not a Python library. subprocess.run() lets your Python script launch any system command and capture its output, making it easy to orchestrate CLI tools like git, ffmpeg, or database utilities.

Basic subprocess Usage

# run_commands.py
import subprocess

# Run a command and capture its output
result = subprocess.run(
    ["python3", "--version"],
    capture_output=True,
    text=True,       # decode bytes to str automatically
    check=False,     # don't raise on non-zero exit code
)

print("Return code:", result.returncode)
print("Stdout:", result.stdout.strip())
print("Stderr:", result.stderr.strip())

# Run a command that lists files (works on macOS/Linux)
ls_result = subprocess.run(
    ["ls", "-la", "/tmp"],
    capture_output=True,
    text=True,
)
print("\nFirst 3 lines of /tmp listing:")
for line in ls_result.stdout.strip().split("\n")[:3]:
    print(" ", line)

Output:

Return code: 0
Stdout: Python 3.11.4
Stderr:

First 3 lines of /tmp listing:
  total 0
  drwxrwxrwt  20 root  wheel  640 Jun 12 09:31 .
  drwxr-xr-x  20 root  wheel  640 May 18 11:02 ..

Always use a list for the command argument (["ls", "-la", "/tmp"]) rather than a string ("ls -la /tmp"). The list form avoids shell injection vulnerabilities -- if any part of the command comes from user input, a string passed to shell=True can execute arbitrary shell commands. The list form is always safer.
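The sketch below shows the list form in action with a deliberately hostile-looking value. It launches a child Python process through sys.executable (an assumption made so the example also runs on Windows, where ls is not available), and echo_argument() is a hypothetical helper name.

```python
import subprocess
import sys


def echo_argument(value: str) -> str:
    """Pass value as ONE argument to a child Python process and return what it received."""
    result = subprocess.run(
        [sys.executable, "-c", "import sys; print(sys.argv[1])", value],
        capture_output=True,
        text=True,
        check=True,  # raise CalledProcessError if the child exits with an error
    )
    return result.stdout.strip()


if __name__ == "__main__":
    hostile = "report.txt; echo INJECTED"
    # No shell is involved, so the semicolon is just part of the argument text
    print(echo_argument(hostile))
```

The child receives the whole string, semicolon included, as a single argument; nothing after the semicolon is executed as a separate command.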

Practical Example: Automating Git Operations

Here is a real-world use case -- a script that automatically stages and commits changes in a git repository (pushing is left out). This is useful for automating backup commits or syncing generated files:

# git_autocommit.py
import subprocess
from datetime import datetime
from pathlib import Path

def git_run(args: list[str], cwd: Path) -> subprocess.CompletedProcess:
    """Run a git command in the specified directory."""
    return subprocess.run(
        ["git"] + args,
        capture_output=True,
        text=True,
        cwd=cwd,
    )

def auto_commit(repo_path: Path, message: str = None) -> bool:
    """Stage all changes and commit if there is anything to commit."""
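    # Check for changes (and make sure this is actually a git repository)
    status = git_run(["status", "--porcelain"], repo_path)
    if status.returncode != 0:
        print(f"git status failed: {status.stderr.strip()}")
        return False
    if not status.stdout.strip():
        print("No changes to commit.")
        return False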

    if message is None:
        message = f"Auto-commit {datetime.now():%Y-%m-%d %H:%M}"

    # Stage all changes
    git_run(["add", "-A"], repo_path)

    # Commit
    commit = git_run(["commit", "-m", message], repo_path)
    if commit.returncode == 0:
        print(f"Committed: {message}")
        return True
    else:
        print(f"Commit failed: {commit.stderr.strip()}")
        return False

# Usage (change to your actual repo path)
repo = Path("/tmp/my-project")
repo.mkdir(exist_ok=True)
auto_commit(repo, "Automated daily backup")

Output (when changes exist):

Committed: Automated daily backup

The helper function git_run() takes the git subcommand as a list and prepends "git", keeping the calling code clean. The cwd=cwd argument tells subprocess where to run the command -- without it, git would operate on the current working directory of the Python process (wherever the script was launched from), which is often not the repository you intended.
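The effect of cwd is easy to check without git. In this sketch a child Python process simply reports its working directory; child_cwd() is a hypothetical helper, and a temporary directory stands in for the repository path.

```python
import os
import subprocess
import sys
import tempfile


def child_cwd(cwd=None) -> str:
    """Return the working directory a child process sees."""
    result = subprocess.run(
        [sys.executable, "-c", "import os; print(os.getcwd())"],
        capture_output=True,
        text=True,
        check=True,
        cwd=cwd,  # None means: inherit the parent's current working directory
    )
    return result.stdout.strip()


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        print("with cwd:   ", child_cwd(tmp))
    print("without cwd:", child_cwd())
```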

Real-Life Example: Automated Downloads Folder Organizer

We will now build a complete script that scans your Downloads folder and organizes files into subfolders by type. It handles duplicates, logs every action, and can be scheduled to run automatically.

# downloads_organizer.py
import shutil
import logging
from pathlib import Path

# --- Configuration ---
DOWNLOADS_DIR = Path.home() / "Downloads"
LOG_FILE = Path.home() / "downloads_organizer.log"

RULES = {
    "Documents": [".pdf", ".doc", ".docx", ".txt", ".rtf"],
    "Images":    [".jpg", ".jpeg", ".png", ".gif", ".svg", ".webp", ".heic"],
    "Videos":    [".mp4", ".mov", ".mkv", ".avi", ".m4v"],
    "Audio":     [".mp3", ".m4a", ".flac", ".wav", ".aac"],
    "Archives":  [".zip", ".tar", ".gz", ".rar", ".7z"],
    "Code":      [".py", ".js", ".html", ".css", ".json", ".sh", ".ipynb"],
    "Data":      [".csv", ".xlsx", ".xls", ".tsv", ".parquet"],
}

# Build reverse lookup: extension -> folder name
EXT_MAP = {ext: folder for folder, exts in RULES.items() for ext in exts}

# --- Logging setup ---
logging.basicConfig(
    filename=LOG_FILE,
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
)

def unique_dest(dest: Path) -> Path:
    """Append a counter to avoid overwriting existing files."""
    if not dest.exists():
        return dest
    counter = 1
    while True:
        candidate = dest.parent / f"{dest.stem}_{counter}{dest.suffix}"
        if not candidate.exists():
            return candidate
        counter += 1

def organize(dry_run: bool = False) -> dict:
    """Move files from Downloads into categorized subfolders."""
    stats = {"moved": 0, "skipped": 0, "unknown": 0}

    for item in DOWNLOADS_DIR.iterdir():
        if not item.is_file():
            continue

        folder_name = EXT_MAP.get(item.suffix.lower())
        if folder_name is None:
            logging.info(f"SKIP (unknown type): {item.name}")
            stats["unknown"] += 1
            continue

        dest_dir = DOWNLOADS_DIR / folder_name
        dest = unique_dest(dest_dir / item.name)

        if dry_run:
            print(f"[DRY RUN] Would move: {item.name} -> {folder_name}/")
        else:
            dest_dir.mkdir(exist_ok=True)
            shutil.move(str(item), dest)
            logging.info(f"MOVED: {item.name} -> {folder_name}/{dest.name}")
            stats["moved"] += 1

    return stats

if __name__ == "__main__":
    print(f"Organizing {DOWNLOADS_DIR} ...")
    result = organize(dry_run=False)
    summary = f"Done. Moved: {result['moved']}, Unknown: {result['unknown']}"
    print(summary)
    logging.info(summary)

Output (example):

Organizing /Users/alice/Downloads ...
Done. Moved: 12, Unknown: 3

The dry_run=True mode lets you preview what the script would do without actually moving anything -- run it first to confirm the output looks right. The unique_dest() function guarantees you never silently overwrite a file with the same name in the destination folder. You can schedule this script to run daily using the schedule library covered earlier, or on macOS/Linux you can add it to your crontab with crontab -e for OS-level scheduling without a Python process running continuously.

Frequently Asked Questions

What is the difference between shutil and pathlib for file operations?

pathlib is for path manipulation and simple operations: checking if a file exists, reading its metadata, renaming it within the same filesystem. shutil is for heavier operations: copying files (with or without metadata), moving files across filesystems, and deleting entire directory trees. In practice you often use both -- pathlib to build and check paths, shutil to actually move or copy the files. Use shutil.move() for moves (it handles cross-filesystem moves gracefully) and shutil.copy2() when you want to preserve file modification times.

Should I use the schedule library or cron for scheduled tasks?

It depends on your setup. The schedule library is pure Python and works identically on Windows, macOS, and Linux -- great for scripts you want to be portable or that run within an existing Python process. Cron (on Linux/macOS) and Task Scheduler (on Windows) are OS-level schedulers that are more reliable for long-running production tasks because they survive reboots automatically and do not require a Python process to stay running. For personal automation scripts on a development machine, schedule is simpler. For server deployments, lean on cron or a process manager like systemd.

When is it safe to use subprocess with shell=True?

Use shell=True only when the entire command is a string literal you control completely -- for example, a hardcoded one-liner like subprocess.run("ls -la /tmp | wc -l", shell=True). Never pass user input, command-line arguments, or any external data into a shell=True command string; doing so opens a shell injection vulnerability where an attacker can execute arbitrary commands. The list form (["ls", "-la", "/tmp"]) is safe with external data because each list element is passed directly to the OS without going through a shell interpreter.
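To make the difference concrete, here is a minimal sketch of the list form handling text that would be dangerous in a shell string. The helper name echo_argument is hypothetical; it launches a second Python interpreter (sys.executable) so the example does not depend on any particular shell or OS command.

```python
import subprocess
import sys


def echo_argument(untrusted: str) -> str:
    """Pass untrusted text to a child process as a single argv entry."""
    child_code = "import sys; print(sys.argv[1])"
    result = subprocess.run(
        [sys.executable, "-c", child_code, untrusted],  # list form, no shell involved
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.strip()


# Shell metacharacters arrive in the child as plain text instead of being interpreted
received = echo_argument("report.txt; echo injected")
print(received)
```

Because no shell parses the command, the semicolon is just another character in the argument.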

How do I avoid getting blocked when scraping websites?

The main causes of blocks are too-fast request rates, missing request headers, and large-volume scraping. Add a delay between requests using time.sleep(random.uniform(1, 3)) -- randomized delays look more human than a fixed interval. Always set a User-Agent header in your requests session to identify your scraper politely: session.headers.update({"User-Agent": "MyBot/1.0 (research project)"}). Always check the site's robots.txt file before scraping -- if the path you want to scrape is listed as disallowed, respect it. For sites that require JavaScript to load content, switch to a tool like playwright or selenium instead of requests.
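The robots.txt check and the randomized delay can be sketched with the standard library alone. In the example below the robots.txt content is a hypothetical sample inlined as a string so the code runs without a network call; in a real scraper you would point RobotFileParser at the site's actual robots.txt with set_url() and read(). The helper names load_rules and next_delay are invented for illustration.

```python
import random
from urllib.robotparser import RobotFileParser

USER_AGENT = "MyBot/1.0 (research project)"

# Hypothetical robots.txt content, inlined so the sketch runs offline
SAMPLE_ROBOTS_TXT = """\
User-agent: *
Disallow: /private/
"""


def load_rules(robots_txt: str) -> RobotFileParser:
    """Parse robots.txt text into a rule set we can query."""
    parser = RobotFileParser()
    parser.parse(robots_txt.splitlines())
    return parser


def next_delay(low: float = 1.0, high: float = 3.0) -> float:
    """Pick a randomized pause (in seconds) to sleep between requests."""
    return random.uniform(low, high)


rules = load_rules(SAMPLE_ROBOTS_TXT)
for url in ("https://example.com/articles/1", "https://example.com/private/data"):
    allowed = rules.can_fetch(USER_AGENT, url)
    print(url, "allowed" if allowed else "disallowed", f"(would wait {next_delay():.1f}s)")
```

In a scraping loop you would skip any URL where can_fetch() returns False and call time.sleep(next_delay()) before each request.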

Why should automation scripts log to a file instead of just printing?

When a script runs unattended -- on a schedule overnight or as a background process -- there is no terminal to see the output. File logging means you can review what happened after the fact, including any errors. Python's built-in logging module can record timestamps, log levels (INFO, WARNING, ERROR), and full tracebacks (via logging.exception()) once you configure a format. Set up a RotatingFileHandler for long-running scripts to cap the log file size so it does not grow indefinitely: from logging.handlers import RotatingFileHandler, then RotatingFileHandler("script.log", maxBytes=1_000_000, backupCount=3) rolls the file over at about 1 MB and keeps three older backups (script.log.1 to script.log.3), so at most roughly 4 MB stays on disk and older entries are discarded automatically.
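A minimal sketch of that setup follows. The logger name "automation", the helper build_logger, and the temporary log location are assumptions made for the example; adapt the path and format string to your own script.

```python
import logging
import tempfile
from logging.handlers import RotatingFileHandler
from pathlib import Path


def build_logger(log_path: Path, name: str = "automation") -> logging.Logger:
    """Return a logger that writes timestamped lines to a size-capped file."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    if not logger.handlers:  # avoid adding a second handler if called again
        handler = RotatingFileHandler(log_path, maxBytes=1_000_000, backupCount=3)
        handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
        logger.addHandler(handler)
    return logger


log_file = Path(tempfile.mkdtemp()) / "script.log"   # temporary location for the demo
log = build_logger(log_file)
log.info("Run started")
try:
    1 / 0
except ZeroDivisionError:
    log.exception("Step failed")   # logs at ERROR level and appends the traceback

for handler in log.handlers:
    handler.flush()
log_text = log_file.read_text()
print(log_text)
```

Calling log.exception() inside the except block is what captures the traceback; a plain log.error() would record only the message.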

How do I make an automation script safe to run multiple times?

Design for idempotency -- the script should produce the same result whether it runs once or ten times. For file organization scripts, this means checking if a file already exists in the destination before moving it (and using a counter suffix for duplicates, as shown in the real-life example). For database or API writes, check for existing records before inserting. For web scraping pipelines, track which pages or records have already been collected in a CSV or SQLite database, and skip them on subsequent runs. The general pattern is: check state first, act only if the desired state is not already present.
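The "check state first, act only if needed" pattern can be sketched with SQLite from the standard library. The table name processed and the helper process_once are hypothetical; the demo uses an in-memory database, whereas a real script would pass a file path so the state survives between runs.

```python
import sqlite3


def process_once(conn, item_id, handler) -> bool:
    """Run handler(item_id) only if item_id has not been recorded yet."""
    conn.execute("CREATE TABLE IF NOT EXISTS processed (item_id TEXT PRIMARY KEY)")
    seen = conn.execute(
        "SELECT 1 FROM processed WHERE item_id = ?", (item_id,)
    ).fetchone()
    if seen:
        return False                      # already handled on an earlier run
    handler(item_id)
    conn.execute("INSERT INTO processed (item_id) VALUES (?)", (item_id,))
    conn.commit()
    return True


conn = sqlite3.connect(":memory:")        # use a file path to keep state between runs
collected = []
first_run = [process_once(conn, page, collected.append) for page in ("page-1", "page-2")]
second_run = [process_once(conn, page, collected.append) for page in ("page-1", "page-2", "page-3")]
print(first_run, second_run, collected)
```

On the second pass only the new item is handled, which is exactly the behaviour you want from a script that may be triggered more than once.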

Conclusion

We have covered four practical automation categories in this guide: file and folder operations with pathlib and shutil, web data collection with requests and BeautifulSoup, scheduled jobs with the schedule library, and system command automation with subprocess. The real-life Downloads Organizer script ties these concepts together into a complete, production-ready tool with duplicate handling, configurable rules, dry-run mode, and file logging.

The best next step is to adapt the Downloads Organizer to your own situation -- add more file types, change the destination folders, or hook it into schedule to run automatically every morning. Once you have the pattern down, you will start seeing automation opportunities everywhere: renaming podcast downloads, archiving old project folders, pulling daily exchange rates from an API, or auto-committing generated reports to git.

For deeper reading, the official Python documentation covers pathlib, shutil, and subprocess in full detail. The schedule library docs have examples for every scheduling pattern you might need. And BeautifulSoup4's documentation is an excellent reference for parsing more complex HTML structures.

Async Await Code Example in Python

In the previous section we created an asynchronous version manually. Here’s the same outcome but written with the async await syntax. As you’ll notice it is very similar to the original synchronous version:

import asyncio
import timeit

customer_queue = [ "C1", "C2", "C3"  ]

def get_next_customer():
    return customer_queue.pop(0)    #Get the first customer from list

async def cook_hamburger(customer):
    start_customer_timer = timeit.default_timer()
    print( f"[{customer}]: Start cooking hamburger for customer")
    await asyncio.sleep(2)   # Sleep but release control to the event loop
    end_customer_timer = timeit.default_timer()
    print( f"[{customer}]: Finish cooking hamburger for customer.  Total {end_customer_timer-start_customer_timer} seconds\n")

async def run_shop():
    cooking_queue = []

    while customer_queue:
        curr_customer = get_next_customer()
        cooking_queue.append(  cook_hamburger(curr_customer)  )   # calling an async def function only creates a coroutine object; nothing runs yet

    # cooking_queue now holds one coroutine per customer
    await asyncio.gather( *cooking_queue )      # run them all concurrently on the event loop

def main():
    print('Hamburger Shop')
    start = timeit.default_timer()

    asyncio.run( run_shop() )           #Start the event loop

    stop = timeit.default_timer()
    print(f"** Total runtime: {stop-start} seconds ***")

if __name__ == '__main__':
    main()

Output as follows:

Let’s walk through the code:

  • First, async and await are keywords built into the Python language (since Python 3.5). The asyncio library supplies the event loop and helpers such as asyncio.run(), asyncio.gather() and asyncio.sleep(), hence the import asyncio
  • The async keyword precedes the def run_shop() and the def cook_hamburger(customer) functions. In addition, run_shop() is no longer called directly; instead it is passed to an asyncio.run( run_shop() ) function call. So here’s what is happening:
    • The asyncio.run() function starts the so-called event loop, runs the coroutine given to it until it completes, and then closes the loop. It must be given a coroutine, which is what calling an async def... function produces, hence why run_shop() has the async prefix
    • In the async def run_shop() function, the code iterates while there are customers in the queue to process, and calls cook_hamburger(curr_customer) for each customer. Calling an async def function directly does not run its body; instead it returns a coroutine object that can be scheduled later. That is what async tells Python: when called, return a coroutine instead of executing the code.
    • At the end of the function code in def run_shop() there’s a call to await asyncio.gather( *cooking_queue ). There are a few things going on here:
      • The await keyword indicates that run_shop() needs to wait for the work to complete, but Python can do something else in the meantime
      • The call to gather() schedules all the coroutines given to it to run concurrently as a group and then returns their results as a list, in the same order the coroutines were passed in (please note that the order in which they finish may differ)
      • The *cooking_queue simply unpacks the list into separate arguments. So for example if cooking_queue == [ a, b, c ] then gather( *cooking_queue ) would be the same as gather( a, b, c ).
    • When await asyncio.gather( *cooking_queue ) is called, the await keyword hands control back to the event loop, which then starts running whatever is pending. Here that means the cook_hamburger() coroutines that were added to the cooking_queue list.
    • Within cook_hamburger() there is also an await asyncio.sleep(2). This simply waits for 2 seconds, however, it does not force the whole program to wait for the 2 seconds to complete; instead the await keyword releases Python to do something else in the meantime. This is similar to step 3 in Figure 2 where the chef/waiter puts the hamburger on the grill, but then doesn’t wait for the 2 seconds and instead does something else (i.e. serve the next customer)
  • asyncio.run() is a function that was added in Python 3.7. In older versions of Python (3.4 to 3.6) you may see the following, which does much the same job as asyncio.run( run_shop() ) :
    • loop = asyncio.get_event_loop()
    • loop.run_until_complete(run_shop())
    • loop.close()
  • As you will notice, this is very similar to the synchronous code that covers Figure 1 above. This is the beauty of async/await

So remember, whenever there’s an await, the current coroutine pauses at that point until the awaited work completes, and the event loop is free to run something else in the meantime. That’s how the performance improvement occurs. In this example, the total runtime is about 2 seconds instead of the sequential 6 seconds!
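One detail worth seeing in isolation is how gather() orders its results: the list it returns follows the order in which the coroutines were passed in, not the order in which they finished. The sketch below uses shortened, made-up cooking times and a hypothetical cook() helper to show this.

```python
import asyncio

finish_order = []


async def cook(order: str, seconds: float) -> str:
    await asyncio.sleep(seconds)      # release control while "cooking"
    finish_order.append(order)        # record when this order actually finished
    return f"{order} ready"


async def run_shop():
    # The slow order is passed first, the fast order second
    return await asyncio.gather(cook("slow burger", 0.2), cook("fast burger", 0.05))


results = asyncio.run(run_shop())
print("finished in this order:", finish_order)
print("gather() returned:", results)
```

The fast order finishes first, yet its result still sits in second place in the returned list, which makes it safe to unpack gather() results positionally.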

Async Function Calling Another Async Function Code Example

Suppose you also want to call another async function once your first async function is completed. How do you go about this? Remember the rule: if you want to run something asynchronously, you have to use the await keyword, and the function you’re calling has to be defined with async def ...

To continue with the restaurant theme, suppose that after the hamburger is cooked you ask an assistant to put the hamburger into a takeaway bag which takes 1 second. This is also another task that you need not ‘block’ and wait for it to complete. Hence, this action can be put into a function which is defined as an async. Here’s what the code can look like:

import asyncio
import timeit

customer_queue = [ "C1", "C2", "C3" ]

def get_next_customer():
    return customer_queue.pop(0)    #Get the first customer from list

async def cook_hamburger(customer):
    start_customer_timer = timeit.default_timer()
    print( f"[{customer}]: Start cooking hamburger for customer")
    await asyncio.sleep(2)   # Sleep but release control to the event loop
    end_customer_timer = timeit.default_timer()
    print( f"[{customer}]: Finish cooking hamburger for customer.  Total {end_customer_timer-start_customer_timer} seconds")
    await put_hamburger_in_takeaway_bag( customer )   # packing starts as soon as this burger is cooked

async def put_hamburger_in_takeaway_bag( customer):
    start_customer_timer = timeit.default_timer()
    print( f"[{customer}]: Start packing hamburger")
    await asyncio.sleep(1)   # It takes 1 second to pack the hamburger
    end_customer_timer = timeit.default_timer()
    print( f"[{customer}]: Finish packing hamburger.  Total {end_customer_timer-start_customer_timer} seconds\n")

async def run_shop():
    cooking_queue = []

    while customer_queue:
        curr_customer = get_next_customer()
        cooking_queue.append( cook_hamburger(curr_customer) )   # collect one coroutine per customer
    await asyncio.gather( *cooking_queue )      # run them all concurrently

def main():
    print('Hamburger Shop')
    start = timeit.default_timer()
    asyncio.run( run_shop() )           #Start the event loop 
    stop = timeit.default_timer()
    print(f"** Total runtime: {stop-start} seconds ***")

if __name__ == '__main__':
    main()

The output would be:

See how once the hamburger is cooked (e.g. [C1]: Finish cooking hamburger for customer. Total 2.000924572115764 seconds), the [C1]: Start packing hamburger step follows immediately, and it too runs asynchronously.

Async Await Real World Example With Web Crawler in Python

One difficulty in learning async / await is that many examples rely on asyncio.sleep(), which helps to explain the concept but is not very helpful when you want to build something more useful. Let’s try a more complex example where you get some stock data from finance.yahoo.com and then, for that same stock, you also get the first 3 news articles from the last 24 hours from news.google.com.

Now one thing you will realise is that await only works with awaitable objects, such as the coroutine returned by calling a function defined with async def. So you cannot use await on just any function call. Why? Recall that await expects an object it can pause on and resume later. A regular function simply runs to completion and returns an ordinary value, so there is nothing for the event loop to schedule.
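A quick way to check whether something can be awaited is inspect.isawaitable() from the standard library. The sketch below contrasts a coroutine with an ordinary return value; the two fetch_price functions are hypothetical stand-ins, not part of the stock example.

```python
import asyncio
import inspect


async def fetch_price_async():
    await asyncio.sleep(0)       # stand-in for a non-blocking download
    return 42


def fetch_price_sync():
    return 42                    # ordinary function, ordinary return value


coro = fetch_price_async()       # creates a coroutine object; the body has not run yet
coro_is_awaitable = inspect.isawaitable(coro)
plain_is_awaitable = inspect.isawaitable(fetch_price_sync())

price = asyncio.run(coro)        # the body only runs once the event loop drives it
print(coro_is_awaitable, plain_is_awaitable, price)
```

The integer returned by the regular function is not awaitable, which is why putting await in front of such a call fails.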

Let’s see the synchronous version of the code:

import requests, timeit
from bs4 import BeautifulSoup
from pygooglenews import GoogleNews

stock_list = [ "TSLA", "AAPL"]

def get_stock_price_data(stock):
    print(f"-- getting stock data for {stock}")
    data = {"stock":stock, "price_open":0, "price_close":0 }
    stock_page = requests.get( 'https://finance.yahoo.com/quote/' + stock, headers={'Cache-Control': 'no-cache',  "Pragma": "no-cache"})

    soup = BeautifulSoup(stock_page.text, 'html.parser')
    #<fin-streamer active="" class="Fw(b) Fz(36px) Mb(-4px) D(ib)" data-field="regularMarketPrice" data-pricehint="2" data-symbol="TSLA" data-test="qsp-price" data-trend="none" value="759.63">759.63</fin-streamer>
    data['price_close'] = soup.find('fin-streamer', attrs={"data-symbol":stock, "data-field":"regularMarketPrice"} ).text

    #<td class="Ta(end) Fw(600) Lh(14px)" data-test="OPEN-value">723.25</td>
    data['price_open'] = soup.find( attrs={"data-test":"OPEN-value"}).text

    return data

def get_recent_news(stock):
    print(f"-- getting news data for {stock}")
    gn = GoogleNews()
    search = gn.search(f"stocks {stock}", when = '24h')
    news = search['entries'][0:3]
    return news

def print_stock_update(stock, data, news):
    print(f"Stock:{ stock }")
    price_change = 0
    if int(float(data['price_open'])) != 0: price_change = round( 100 * ( float( data['price_close'])/float(data['price_open'])-1), 2)
    print(f"Open Price:{data['price_open']} Close Price:{data['price_close']} Change:{price_change}% ")
    print("Latest News:")
    for news_item in news:        
        print( f"{news_item.published}:{news_item.source.title} - {news_item.title}" )
    print("\n")

def process_stocks():
    for stock in stock_list:
        data = get_stock_price_data( stock )
        news=[]
        news = get_recent_news( stock )
        print_stock_update(stock, data, news)

if __name__ == '__main__':
    start_timer = timeit.default_timer()
    process_stocks()
    end_timer = timeit.default_timer()

    print(f"** Total runtime: {end_timer-start_timer} seconds ***")

Output as follows:

So what’s happening here? You are looping through two stocks, TSLA and AAPL, and for each stock the following happens sequentially:

  • A call to data = get_stock_price_data( stock ) occurs in order to make a call to requests.get( 'https://finance.yahoo.com/quote/' + stock) to get the HTML page for the TSLA stock. Effectively, this page: https://finance.yahoo.com/quote/TSLA
  • Next we use BeautifulSoup() in order to find the HTML snippet that contains the stock price data for the opening price and the closing price:
  • After the call to yahoo is complete, then there’s a call to news = get_recent_news( stock ) which uses the module pygooglenews to get the latest google news. In fact we have used this function in our previous Twitter Bot article.
  • Once this is all done, that output is printed out with the call to print_stock_update(stock, data, news)

Clearly this could be done asynchronously: each stock is processed independently in the loop, and the call to get the stock data is independent of the call to get the news data. However, one thing that has to happen sequentially is print_stock_update(stock, data, news), which has to wait for both async calls to complete.

One way to try is to simply put await in front of the website download:

stock_page = await requests.get( 'https://finance.yahoo.com/quote/' + stock, headers={'Cache-Control': 'no-cache',  "Pragma": "no-cache"})

However, you will get the following error:

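The reason, as you may have guessed, is that requests.get() is not defined with the async def... construct. It returns an ordinary Response object, which is not awaitable, so Python raises a TypeError, and the download itself would still block while it runs.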

What you can do however is to use another ‘get’ web page library called httpx. Its AsyncClient.get() method is a coroutine and can be called much like requests.get(). That same line would be re-written as:

import httpx
#....

async def get_stock_price_data(stock):
    print(f"-- stock data:getting stock data for {stock}")
    data = {"stock":stock, "price_open":0, "price_close":0 }

    #*** instead of requests.get('https://finance.yahoo.com/quote/' + stock)) ****
    async with httpx.AsyncClient() as client:   # closes the client's connections when the block ends
        stock_page = await client.get( 'https://finance.yahoo.com/quote/' + stock)

    soup = BeautifulSoup(stock_page.text, 'html.parser')
    #<fin-streamer active="" class="Fw(b) Fz(36px) Mb(-4px) D(ib)" data-field="regularMarketPrice" data-pricehint="2" data-symbol="TSLA" data-test="qsp-price" data-trend="none" value="759.63">759.63</fin-streamer>
    data['price_close'] = soup.find('fin-streamer', attrs={"data-symbol":stock, "data-field":"regularMarketPrice"} ).text

    #<td class="Ta(end) Fw(600) Lh(14px)" data-test="OPEN-value">723.25</td>
    data['price_open'] = soup.find( attrs={"data-test":"OPEN-value"}).text
    print(f"-- stock data:done {stock}")
    return data

OK, that works well. But what about the GoogleNews() code? There is no async version of this function, so how can it be called asynchronously? For this, you can run it in a separate thread. A ‘thread’ is a way to run a piece of code within the same process, alongside the main execution path. It warrants a whole separate article, but for now you can think of it as finding a separate space to execute this independently of the current execution path. However, executing this in a separate thread takes a bit more work.

The code looks like the following:

### Original Version
def get_recent_news(stock):
    print(f"-- stock news:getting stock data for {stock}")
    gn = GoogleNews()
    search = gn.search(f"stocks {stock}", when='24h') #Slow, blocking call that we want to run asynchronously
    news = search['entries'][0:3]
    print(f"-- stock news:done {stock}")
    return news

### Asynchronous Version
async def get_recent_news(stock):
    print(f"-- stock news:getting stock data for {stock}")
    gn = GoogleNews()
    # run_in_executor() only forwards positional arguments, so wrap the call to pass when='24h'
    search = await asyncio.get_running_loop().run_in_executor( None, lambda: gn.search(f"stocks {stock}", when='24h') )
    news = search['entries'][0:3]
    print(f"-- stock news:done {stock}")
    return news

Here, we use the await keyword on gn.search(), which is now called through asyncio.get_running_loop().run_in_executor( .. ). We are asking the asyncio module for the running event loop (that piece of code that continuously checks for tasks to be done) and telling it to run the function in a separate thread; passing None as the first argument selects the default thread pool. run_in_executor() takes the function itself rather than the result of calling it, and it forwards only positional arguments, which is why the keyword argument when='24h' is wrapped in a small lambda (functools.partial works too). You will also notice that the whole function can now be defined as async def get_recent_news(stock)
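To see the thread hand-off without depending on Google News, here is a minimal sketch with a deliberately slow, blocking function. slow_search() is a hypothetical stand-in for any blocking library call, and the 0.2 second sleep is an arbitrary figure chosen to keep the demo quick. Because run_in_executor() forwards only positional arguments, the keyword argument is bound with functools.partial.

```python
import asyncio
import functools
import time


def slow_search(query, when=None):
    """Hypothetical blocking call, standing in for a library without async support."""
    time.sleep(0.2)
    return {"query": query, "when": when}


async def search_async(query):
    loop = asyncio.get_running_loop()
    call = functools.partial(slow_search, query, when="24h")   # bind the keyword argument
    return await loop.run_in_executor(None, call)              # None = default thread pool


async def main():
    start = time.perf_counter()
    results = await asyncio.gather(search_async("TSLA"), search_async("AAPL"))
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(main())
print(results)
print(f"elapsed: {elapsed:.2f} seconds")
```

Both blocking calls sleep in their own worker thread, so the two searches overlap and the total time stays close to that of a single call.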

How To Mix Asynchronous And Synchronous Code With Await Async in Python

Now the final problem to be solved is how to run the two functions get_stock_price_data( stock ) and get_recent_news(stock) asynchronously, wait for both to finish, and THEN run the print. The answer is to group these steps under one function. This is the trick to mixing asynchronous and synchronous code.

To run a group of tasks concurrently you use asyncio.gather(). If you want to execute a synchronous function once ALL the tasks given to asyncio.gather() are complete, put the await asyncio.gather() and the synchronous call together in their own async function, and then run one copy of that function per item with an outer asyncio.gather():

async def process_stock_batch(stock):
    (data, news) = await asyncio.gather( get_stock_price_data( stock ), get_recent_news(stock)  )
    print('-- print:request printing')
    print_stock_update(stock, data, news) 
    print('-- print:done')

async def process_stocks():
    run_stock_list = []
    for stock in stock_list:
        run_stock_list.append(   process_stock_batch(stock) )
    await asyncio.gather( *run_stock_list )

Before we apply this to the real world example, let’s look at a simpler one. Suppose we had the following code:

import asyncio, timeit

async def get_web_data_A(index):
    await asyncio.sleep(1)
    print(f"Get Web Data-A[{index}] - sleep 1 second")
        
async def get_web_data_B(index):
    await asyncio.sleep(1)
    print(f"Get Web Data-B[{index}] - sleep 1 second")

async def process(index, start_timer):
    await asyncio.gather( get_web_data_A(index), get_web_data_B(index) )
    print(f"Calculate [{index}] - Elapsed time:[{timeit.default_timer()-start_timer}]")

async def run_all():
    start_timer = timeit.default_timer()
    for index in range(0,2):
        await process(index, start_timer)

if __name__ == '__main__':
    asyncio.run( run_all() )

This has the following output:

What is encouraging with this code is that even though get_web_data_A() and get_web_data_B() both sleep for 1 second, they do so concurrently, so each call to process() takes just a little over 1 second rather than 2. This can be seen in the Calculate [0]... output. However, the problem is that the code still iterates over each index sequentially: index 0 is processed completely first, and only then is index 1 processed. What we want instead is to start all the slow get_web_data_A() and get_web_data_B() calls up front, and run the calculate step for each index as soon as its data is ready. This is where you need to first create the coroutines for ALL the iterations, and then call gather() on all of them. See the following code:

import asyncio, timeit

async def get_web_data_A(index):
    await asyncio.sleep(1)
    print(f"Get Web Data-A[{index}] - sleep 1 second")
        
async def get_web_data_B(index):
    await asyncio.sleep(1)
    print(f"Get Web Data-B[{index}] - sleep 1 second")

async def process(index, start_timer):
    await asyncio.gather( get_web_data_A(index), get_web_data_B(index) )
    print(f"Calculate [{index}] - Elapsed time:[{timeit.default_timer()-start_timer}]")

async def run_all_2():
    start_timer = timeit.default_timer()
    task_queue = []
    for index in range(0,2):
        task_queue.append( process(index, start_timer) )
    await asyncio.gather( *task_queue )

if __name__ == '__main__':
    asyncio.run( run_all_2() )

Here, in the function async def run_all_2(), we do not await process(..) inside the for loop, which would make each iteration wait for the previous one. Instead, we add all the process(..) coroutines to a list called task_queue, and then at the end of the for loop we call await asyncio.gather( *task_queue ) on all of them in one go. Hence, the output is as follows:

You’ll notice that ALL the get_web_data_A() and get_web_data_B() calls run concurrently, and the calculate step for each index runs once its data is available. Hence, the elapsed time for all the iterations is only about 1 second, compared to the previous 2 seconds.
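The difference between the two loops can also be measured side by side in one script. The sketch below uses a hypothetical fetch() coroutine with a shortened 0.1 second sleep; the function names one_at_a_time and all_together are made up for the comparison.

```python
import asyncio
import time


async def fetch(index):
    await asyncio.sleep(0.1)     # stand-in for a slow web request
    return index


async def one_at_a_time(n):
    start = time.perf_counter()
    results = [await fetch(i) for i in range(n)]     # each await finishes before the next starts
    return results, time.perf_counter() - start


async def all_together(n):
    start = time.perf_counter()
    results = await asyncio.gather(*(fetch(i) for i in range(n)))   # all sleeps overlap
    return results, time.perf_counter() - start


seq_results, seq_time = asyncio.run(one_at_a_time(3))
con_results, con_time = asyncio.run(all_together(3))
print(f"sequential: {seq_time:.2f}s  concurrent: {con_time:.2f}s")
```

Both versions return the same data; only the waiting is arranged differently, which is where the saving comes from.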

So what does this mean for our real world example for getting stock data from Yahoo and then calling Google News asynchronously, and then only printing the data once both are done? Well, the same principle applies. The code is as follows:

import asyncio, httpx, timeit
from bs4 import BeautifulSoup
from pygooglenews import GoogleNews

stock_list = [ "TSLA", "AAPL"]

async def get_stock_price_data(stock):
    print(f"-- stock data:getting stock data for {stock}")
    data = {"stock":stock, "price_open":0, "price_close":0 }

    async with httpx.AsyncClient() as client:   # closes the client's connections when the block ends
        stock_page = await client.get( 'https://finance.yahoo.com/quote/' + stock)

    soup = BeautifulSoup(stock_page.text, 'html.parser')
    #<fin-streamer active="" class="Fw(b) Fz(36px) Mb(-4px) D(ib)" data-field="regularMarketPrice" data-pricehint="2" data-symbol="TSLA" data-test="qsp-price" data-trend="none" value="759.63">759.63</fin-streamer>
    data['price_close'] = soup.find('fin-streamer', attrs={"data-symbol":stock, "data-field":"regularMarketPrice"} ).text

    #<td class="Ta(end) Fw(600) Lh(14px)" data-test="OPEN-value">723.25</td>
    data['price_open'] = soup.find( attrs={"data-test":"OPEN-value"}).text
    print(f"-- stock data:done {stock}")
    return data

async def get_recent_news(stock):
    print(f"-- stock news:getting stock data for {stock}")
    gn = GoogleNews()
    search = await asyncio.get_event_loop().run_in_executor( None, lambda: gn.search(f"stocks {stock}", when='24h') )   # lambda lets us pass when='24h' by keyword
    news = search['entries'][0:3]
    print(f"-- stock news:done {stock}")
    return news

def print_stock_update(stock, data, news):
    print('-- print:starting print')
    print(f"Stock:{ stock }")
    price_change = 0
    if int(float(data['price_open'])) != 0: price_change = round( 100 * ( float( data['price_close'])/float(data['price_open'])-1), 2)
    print(f"Open Price:{data['price_open']} Close Price:{data['price_close']} Change:{price_change}% ")
    print("Latest News:")
    for news_item in news:        
        print( f"{news_item.published}:{news_item.source.title} - {news_item.title}" )

    print("\n")

async def process_stock_batch(stock):
    (data, news) = await asyncio.gather( get_stock_price_data( stock ), get_recent_news(stock)  )
    print('-- print:request printing')
    print_stock_update(stock, data, news) 
    print('-- print:done')

async def process_stocks():
    run_stock_list = []
    for stock in stock_list:
        run_stock_list.append(   process_stock_batch(stock) )
    await asyncio.gather( *run_stock_list )

if __name__ == '__main__':
    start_timer = timeit.default_timer()
    asyncio.run( process_stocks() )
    end_timer = timeit.default_timer()

    print(f"** Total runtime: {end_timer-start_timer} seconds ***")

The key bit of code is in async def process_stocks(), which iterates over each of the stocks, creates one coroutine per stock, and then calls await asyncio.gather( *run_stock_list ) on all of them in one go. Inside process_stock_batch(stock) we have the asynchronous call (data, news) = await asyncio.gather( get_stock_price_data( stock ), get_recent_news(stock) ), followed by the synchronous call to print_stock_update(stock, data, news) once both pieces of web data have arrived.

Conclusion

async and await are an incredibly useful feature of Python. The concept takes a bit of getting used to, but once you’ve got the hang of it, it can noticeably improve the performance of your code by making use of the idle time spent waiting for a task to complete. Remember to be careful about sequencing: be mindful of whether a follow-up activity needs to wait for a task to complete, or whether the program can simply continue executing.

This is not easy to grasp as a beginner, but follow the example code above, and if you get stuck feel free to reach out through our email list below.

How To Automate Tasks with Python: A Practical Guide


Last Updated: June 14, 2026

Beginner to Intermediate

You have a folder full of downloaded files named document(1).pdf, screenshot_2024.png, and report_final_v3_FINAL.xlsx. Every week you spend 20 minutes sorting them by hand. Or maybe you copy data from a website into a spreadsheet every morning, or you run the same three terminal commands every time you start work. These are the tasks Python was built to eliminate. With a few dozen lines of code you can turn a painful weekly chore into something that runs itself while you drink coffee.

Python ships with a rich standard library for automation -- pathlib for file operations, subprocess for running system commands, smtplib for sending email -- and the broader ecosystem adds libraries like schedule for periodic jobs and requests for web data collection. No special setup is needed beyond a standard Python 3.8+ install. The third-party packages used in this article (schedule, requests, and beautifulsoup4) need to be installed with pip; everything else is built in.

In this guide we will cover four practical automation categories: organizing files and folders with pathlib and shutil, collecting web data with requests and BeautifulSoup, scheduling jobs to run automatically with the schedule library, and running system commands with subprocess. Each section ends with working code you can adapt to your own situation. By the end you will have a toolkit of reusable automation patterns and a complete script that organizes a messy downloads folder automatically.

Pubs - Python How To Program
Written by Pubs

Python developer and educator with 15+ years building production systems across data engineering, web APIs, and AI tooling. Founder of Python How To Program.

Automate Tasks with Python: Quick Example

Here is a self-contained script that moves the files in a folder into subfolders based on their extension -- one of the most common automation tasks you will ever write:

# sort_downloads.py
from pathlib import Path
import shutil

FOLDER = Path.home() / "Downloads"
DESTINATIONS = {
    ".pdf":  "Documents",
    ".png":  "Images",
    ".jpg":  "Images",
    ".xlsx": "Spreadsheets",
    ".csv":  "Spreadsheets",
    ".zip":  "Archives",
}

for file in FOLDER.iterdir():
    if file.is_file() and file.suffix in DESTINATIONS:
        dest_folder = FOLDER / DESTINATIONS[file.suffix]
        dest_folder.mkdir(exist_ok=True)
        shutil.move(str(file), dest_folder / file.name)
        print(f"Moved: {file.name} -> {DESTINATIONS[file.suffix]}/")

Output (example):

Moved: invoice_march.pdf -> Documents/
Moved: screenshot_2024.png -> Images/
Moved: sales_report.xlsx -> Spreadsheets/

The script uses Path.home() to get the user’s home directory regardless of operating system, iterdir() to loop over every item in the folder, and shutil.move() to relocate the file. The mkdir(exist_ok=True) call creates the destination folder if it does not already exist — no crash if it is there, no error if it is not. We will build a more complete version in the real-life example section, including duplicate detection and logging.

What Is Python Automation and When Should You Use It?

Automation means writing code that performs a repetitive task so you do not have to. The rule of thumb is: if you have done something manually more than three times, it is worth automating. Python is the go-to language for automation because it has concise syntax, a massive standard library, and third-party packages that cover almost every automation use case out of the box.

The table below maps common repetitive tasks to the Python tools that handle them:

Task Type | Python Tool | When to Use
File/folder operations | pathlib, shutil | Renaming, moving, copying, deleting files
Reading/writing files | built-in open(), csv | Log parsing, report generation, data transformation
Web data collection | requests, BeautifulSoup | Pulling prices, headlines, tables from websites
Scheduled jobs | schedule, APScheduler | Running tasks daily, hourly, or on a cron-like schedule
System commands | subprocess | Running CLI tools, shell scripts, git, ffmpeg
Email sending | smtplib, yagmail | Automated reports, alerts, notifications

A good automation script is idempotent — running it twice produces the same result as running it once. It handles edge cases (missing files, network errors, duplicate names) without crashing. And it logs what it did so you can review the results later. Keep these principles in mind as we work through each section.

[Figure: Python developer sorting files automatically with pathlib and shutil. Caption: pathlib.iterdir(), because sorting files by hand is how you waste a Tuesday.]

Automating File and Folder Operations

The pathlib module (Python 3.4+) provides an object-oriented interface for working with file paths that is far more readable than the older os.path approach. Combined with shutil for copy/move operations, these two modules cover the vast majority of everyday file automation tasks.

Finding and Filtering Files with pathlib

The glob() and rglob() methods on a Path object let you find files matching a pattern. glob("*.py") matches only entries directly inside the folder; rglob() (recursive glob) also searches all subdirectories:

# find_files.py
from pathlib import Path

base = Path("/tmp/project")  # change to your actual folder

# Find all Python files in this folder only
py_files = list(base.glob("*.py"))
print("Python files (top-level):", [f.name for f in py_files])

# Find all log files anywhere in the tree
log_files = list(base.rglob("*.log"))
print("Log files (all depths):", [f.relative_to(base) for f in log_files])

# Find files larger than 1 MB
large_files = [f for f in base.rglob("*") if f.is_file() and f.stat().st_size > 1_000_000]
print("Files over 1MB:", [f.name for f in large_files])

Output (example):

Python files (top-level): ['app.py', 'utils.py', 'config.py']
Log files (all depths): [PosixPath('logs/app.log'), PosixPath('logs/error.log')]
Files over 1MB: ['dataset.csv', 'backup.tar.gz']

f.stat().st_size returns the file size in bytes. The expression f.relative_to(base) strips the base directory from the path so you see logs/app.log instead of the full absolute path. Both are useful for building reports of what your script found before it starts moving anything.

Renaming and Copying Files Safely

Before moving or renaming files in an automation script, always check whether the destination already exists. Blindly overwriting a file can cause data loss that is impossible to reverse:

# safe_copy.py
from pathlib import Path
import shutil

def safe_copy(src: Path, dest_dir: Path) -> Path:
    """Copy src into dest_dir, appending a counter if the name already exists."""
    dest_dir.mkdir(parents=True, exist_ok=True)
    dest = dest_dir / src.name

    if dest.exists():
        counter = 1
        while dest.exists():
            stem = src.stem
            dest = dest_dir / f"{stem}_{counter}{src.suffix}"
            counter += 1

    shutil.copy2(str(src), dest)   # copy2 preserves metadata (timestamps)
    return dest

# Demo
src_file = Path("/tmp/report.pdf")
src_file.write_text("dummy content")   # create test file
result = safe_copy(src_file, Path("/tmp/archive"))
print(f"Copied to: {result}")

result2 = safe_copy(src_file, Path("/tmp/archive"))  # simulate duplicate
print(f"Duplicate handled: {result2}")

Output:

Copied to: /tmp/archive/report.pdf
Duplicate handled: /tmp/archive/report_1.pdf

The shutil.copy2() function copies the file content AND preserves the original modification time, which is important when you want archive copies to retain their original dates. The counter loop ensures you never silently overwrite an existing file — a critical safety net for any file automation script.

Developer inspecting duplicate files during automation
Duplicate detected. Counter incremented. Crisis averted.

Automating Web Data Collection

Web scraping lets your scripts pull data from websites automatically. The standard approach uses requests to download HTML and BeautifulSoup to parse it. Install both with pip install requests beautifulsoup4.

Fetching and Parsing a Web Page

We will use quotes.toscrape.com, a site built specifically for scraping practice. It serves reliable HTML with a stable structure, so this code should keep working for as long as the site keeps that layout.

Here is the HTML structure of each quote on that page, so you can see exactly what the selectors are targeting:

<!-- HTML structure of each quote on quotes.toscrape.com -->
<div class="quote">
    <span class="text">"The world as we have created it..."</span>
    <span>
        by <small class="author">Albert Einstein</small>
    </span>
    <div class="tags">
        <a class="tag" href="/tag/change/page/1/">change</a>
    </div>
</div>

Now the scraping code:

# scrape_quotes.py
import requests
from bs4 import BeautifulSoup

def scrape_quotes(url: str) -> list[dict]:
    response = requests.get(url, timeout=10)
    response.raise_for_status()   # raises HTTPError for 4xx/5xx responses

    soup = BeautifulSoup(response.text, "html.parser")
    results = []

    for quote_div in soup.select("div.quote"):
        text_elem = quote_div.select_one("span.text")
        author_elem = quote_div.select_one("small.author")
        tag_elems = quote_div.select("a.tag")

        # Defensive: check before accessing .text
        text   = text_elem.text.strip()   if text_elem   else "Unknown"
        author = author_elem.text.strip() if author_elem else "Unknown"
        tags   = [t.text for t in tag_elems]

        results.append({"quote": text, "author": author, "tags": tags})

    return results

quotes = scrape_quotes("https://quotes.toscrape.com")
for q in quotes[:3]:
    print(f'{q["author"]}: {q["quote"][:60]}...')
    print(f'  Tags: {", ".join(q["tags"])}')
    print()

Output (quotes shortened for display):

Albert Einstein: "The world as we have created it is a process of...
  Tags: change, deep-thoughts, thinking, world

J.K. Rowling: "It is our choices, Harry, that show what we truly a...
  Tags: abilities, choices

Albert Einstein: "There are only two ways to live your life. One is...
  Tags: inspirational, life, live, miracle, miracles

response.raise_for_status() is a one-line safety net — it raises a requests.HTTPError if the server returns a 4xx or 5xx status code instead of silently continuing with bad data. The defensive checks (text_elem.text if text_elem else "Unknown") protect against pages where an element is missing, which happens constantly on real-world sites.

Saving Scraped Data to CSV

Collecting data is only half the job — you need to store it somewhere useful. Writing to CSV with Python’s built-in csv module keeps the output format-agnostic and readable in any spreadsheet application:

# save_to_csv.py
import csv
import requests
from bs4 import BeautifulSoup

def scrape_quotes(url):
    resp = requests.get(url, timeout=10)
    resp.raise_for_status()
    soup = BeautifulSoup(resp.text, "html.parser")
    results = []
    for div in soup.select("div.quote"):
        text   = div.select_one("span.text")
        author = div.select_one("small.author")
        results.append({
            "quote":  text.text.strip()   if text   else "",
            "author": author.text.strip() if author else "",
        })
    return results

quotes = scrape_quotes("https://quotes.toscrape.com")

output_file = "quotes.csv"
with open(output_file, "w", newline="", encoding="utf-8") as f:
    writer = csv.DictWriter(f, fieldnames=["author", "quote"])
    writer.writeheader()
    writer.writerows(quotes)

print(f"Saved {len(quotes)} quotes to {output_file}")

Output:

Saved 10 quotes to quotes.csv

Always pass encoding="utf-8" when writing CSV files — without it, non-ASCII characters (curly quotes, accented letters, em-dashes) will cause encoding errors or garbled output on Windows. The newline="" argument is also required by Python’s csv module to prevent extra blank lines on Windows.

Python web scraping with requests and BeautifulSoup
BeautifulSoup.select() — structured extraction from unstructured chaos.

Running Automated Jobs on a Schedule

Writing the automation code is only half the job. You also need it to run at the right time, without you having to remember to start it. The schedule library provides a clean Python API for defining when jobs should run — every minute, every day at 9am, every Monday, and so on. Install it with pip install schedule.

Basic Scheduling with the schedule Library

The schedule library works with a simple event loop: you register jobs with schedule.every(), then call schedule.run_pending() in a loop to check whether any jobs are due:

# basic_schedule.py
import schedule
import time
from datetime import datetime

def morning_report():
    print(f"[{datetime.now():%H:%M:%S}] Good morning! Running daily report...")
    # your actual report logic goes here

def hourly_check():
    print(f"[{datetime.now():%H:%M:%S}] Hourly check complete.")

# Schedule the jobs
schedule.every().day.at("09:00").do(morning_report)
schedule.every().hour.do(hourly_check)
schedule.every(30).minutes.do(lambda: print(f"[{datetime.now():%H:%M:%S}] 30-min heartbeat"))

print("Scheduler running. Press Ctrl+C to stop.")
while True:
    schedule.run_pending()
    time.sleep(30)   # check every 30 seconds to save CPU

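Output (example, assuming the script was started at 08:00:00; interval jobs are timed from when they are registered, not from the top of the hour):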

Scheduler running. Press Ctrl+C to stop.
[09:00:00] Good morning! Running daily report...
[09:00:00] Hourly check complete.
[09:00:00] 30-min heartbeat
[09:30:00] 30-min heartbeat
[10:00:00] Hourly check complete.

The time.sleep(30) inside the loop is important — without a sleep, the loop burns 100% of one CPU core doing nothing. Sleeping 30 seconds means jobs can fire up to 30 seconds late (acceptable for most automation), while using negligible CPU. If you need second-level precision, use time.sleep(1) instead.

Handling Errors in Scheduled Jobs

When a job in a scheduled loop raises an unhandled exception, the whole process crashes and no more jobs run. Wrap your job functions with a try/except to log errors and keep the loop running:

# robust_schedule.py
import functools
import logging
import random
import time

import schedule

logging.basicConfig(
    filename="automation.log",
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
)

def run_safely(job_func):
    """Decorator that catches exceptions and logs them without crashing the loop."""
    @functools.wraps(job_func)   # keep the wrapped function's name and docstring
    def wrapper():
        try:
            job_func()
            logging.info(f"{job_func.__name__} completed successfully")
        except Exception as exc:
            # exc_info=True adds the full traceback to the log entry
            logging.error(f"{job_func.__name__} failed: {exc}", exc_info=True)
    return wrapper

def daily_scrape():
    # Simulating a job that fails roughly 30% of the time
    if random.random() < 0.3:
        raise ConnectionError("Network unavailable")
    print("Scraped data successfully.")

schedule.every().day.at("08:00").do(run_safely(daily_scrape))

while True:
    schedule.run_pending()
    time.sleep(60)

The run_safely decorator wraps any function so that exceptions are caught, logged to a file, and the scheduler continues. The exc_info=True argument tells Python's logging module to include the full traceback in the log file -- essential for debugging failures that happen at 3am while you are asleep.

Python schedule library running automated jobs on a timer
schedule.run_pending() -- the world's most patient while loop.

Running System Commands with subprocess

Sometimes the right tool for a job is a command-line program, not a Python library. subprocess.run() lets your Python script launch any system command and capture its output, making it easy to orchestrate CLI tools like git, ffmpeg, or database utilities.

Basic subprocess Usage

# run_commands.py
import subprocess

# Run a command and capture its output
result = subprocess.run(
    ["python3", "--version"],
    capture_output=True,
    text=True,       # decode bytes to str automatically
    check=False,     # don't raise on non-zero exit code
)

print("Return code:", result.returncode)
print("Stdout:", result.stdout.strip())
print("Stderr:", result.stderr.strip())

# Run a command that lists files (works on macOS/Linux)
ls_result = subprocess.run(
    ["ls", "-la", "/tmp"],
    capture_output=True,
    text=True,
)
print("\nFirst 3 lines of /tmp listing:")
for line in ls_result.stdout.strip().split("\n")[:3]:
    print(" ", line)

Output:

Return code: 0
Stdout: Python 3.11.4
Stderr:

First 3 lines of /tmp listing:
  total 0
  drwxrwxrwt  20 root  wheel  640 Jun 12 09:31 .
  drwxr-xr-x  20 root  wheel  640 May 18 11:02 ..

Always use a list for the command argument (["ls", "-la", "/tmp"]) rather than a string ("ls -la /tmp"). On macOS/Linux a string with arguments only works when combined with shell=True, and if any part of that string comes from user input, the shell can be tricked into executing arbitrary commands. The list form hands each argument to the program as-is, without a shell in between, so it avoids shell injection.
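
When a command does arrive as a single string (from a config file, for instance), the standard library's shlex module can split it into the list form. The sketch below is illustrative only: the command string and the hostile-looking filename are made up, and the child process is just the current Python interpreter echoing its argument back.

```python
# split_command.py
import shlex
import subprocess
import sys

# shlex.split() turns a shell-style string into the list form (POSIX quoting rules)
args = shlex.split('ls -la "/tmp/my folder"')
print(args)

# A hostile "filename" stays a single, harmless argument in list form
filename = "notes.txt; rm -rf ~"
result = subprocess.run(
    [sys.executable, "-c", "import sys; print(sys.argv[1])", filename],
    capture_output=True,
    text=True,
    check=True,
)
print(result.stdout.strip())   # the child received the text unchanged
```

Because no shell is involved, the semicolon in the filename is never interpreted as a command separator.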

Practical Example: Automating Git Operations

Here is a real-world use case -- a script that automatically stages and commits changes in a git repository. This is useful for automating backup commits or syncing generated files:

# git_autocommit.py
import subprocess
from datetime import datetime
from pathlib import Path
from typing import Optional

def git_run(args: list[str], cwd: Path) -> subprocess.CompletedProcess:
    """Run a git command in the specified directory."""
    return subprocess.run(
        ["git"] + args,
        capture_output=True,
        text=True,
        cwd=cwd,
    )

def auto_commit(repo_path: Path, message: Optional[str] = None) -> bool:
    """Stage all changes and commit if there is anything to commit."""
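    # Check for changes; a non-zero exit code usually means repo_path is not a git repository
    status = git_run(["status", "--porcelain"], repo_path)
    if status.returncode != 0:
        print(f"git status failed: {status.stderr.strip()}")
        return False
    if not status.stdout.strip():
        print("No changes to commit.")
        return False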

    if message is None:
        message = f"Auto-commit {datetime.now():%Y-%m-%d %H:%M}"

    # Stage all changes
    git_run(["add", "-A"], repo_path)

    # Commit
    commit = git_run(["commit", "-m", message], repo_path)
    if commit.returncode == 0:
        print(f"Committed: {message}")
        return True
    else:
        print(f"Commit failed: {commit.stderr.strip()}")
        return False

# Usage (change to your actual repo path)
repo = Path("/tmp/my-project")
repo.mkdir(exist_ok=True)
auto_commit(repo, "Automated daily backup")

Output (when changes exist):

Committed: Automated daily backup

The helper function git_run() takes the git subcommand as a list and prepends "git", keeping the calling code clean. The cwd=cwd argument tells subprocess where to run the command. Without it, git would operate on the current working directory of the Python process (wherever the script was launched from), which is rarely the repository you intended.
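
To see the effect of cwd in isolation, here is a small sketch that needs no git repository. The helper name child_cwd is hypothetical; it starts a child Python process that simply reports the directory it is running in.

```python
# cwd_demo.py
import subprocess
import sys
import tempfile
from pathlib import Path

def child_cwd(cwd=None) -> Path:
    """Start a child Python process and return the directory it runs in."""
    result = subprocess.run(
        [sys.executable, "-c", "import os; print(os.getcwd())"],
        capture_output=True,
        text=True,
        check=True,
        cwd=cwd,
    )
    return Path(result.stdout.strip()).resolve()

with tempfile.TemporaryDirectory() as tmp:
    inherited = child_cwd()        # no cwd: the child inherits the parent's working directory
    explicit = child_cwd(tmp)      # cwd given: the child runs inside the temporary folder
    in_tmp = explicit == Path(tmp).resolve()
    print("Without cwd:", inherited)
    print("With cwd:   ", explicit)
```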

Real-Life Example: Automated Downloads Folder Organizer

We will now build a complete script that scans your Downloads folder each time it runs and organizes files into subfolders by type. It handles duplicates, logs every action, and can be scheduled to run automatically.

Python automation script organizing downloads folder with logging
Automation log: 47 files sorted. 3 duplicates handled. 0 manual clicks.

# downloads_organizer.py
import shutil
import logging
from pathlib import Path
from datetime import datetime

# --- Configuration ---
DOWNLOADS_DIR = Path.home() / "Downloads"
LOG_FILE = Path.home() / "downloads_organizer.log"

RULES = {
    "Documents": [".pdf", ".doc", ".docx", ".txt", ".rtf"],
    "Images":    [".jpg", ".jpeg", ".png", ".gif", ".svg", ".webp", ".heic"],
    "Videos":    [".mp4", ".mov", ".mkv", ".avi", ".m4v"],
    "Audio":     [".mp3", ".m4a", ".flac", ".wav", ".aac"],
    "Archives":  [".zip", ".tar", ".gz", ".rar", ".7z"],
    "Code":      [".py", ".js", ".html", ".css", ".json", ".sh", ".ipynb"],
    "Data":      [".csv", ".xlsx", ".xls", ".tsv", ".parquet"],
}

# Build reverse lookup: extension -> folder name
EXT_MAP = {ext: folder for folder, exts in RULES.items() for ext in exts}

# --- Logging setup ---
logging.basicConfig(
    filename=LOG_FILE,
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
)

def unique_dest(dest: Path) -> Path:
    """Append a counter to avoid overwriting existing files."""
    if not dest.exists():
        return dest
    counter = 1
    while True:
        candidate = dest.parent / f"{dest.stem}_{counter}{dest.suffix}"
        if not candidate.exists():
            return candidate
        counter += 1

def organize(dry_run: bool = False) -> dict:
    """Move files from Downloads into categorized subfolders."""
    stats = {"moved": 0, "skipped": 0, "unknown": 0}

    for item in DOWNLOADS_DIR.iterdir():
        if not item.is_file():
            continue

        folder_name = EXT_MAP.get(item.suffix.lower())
        if folder_name is None:
            logging.info(f"SKIP (unknown type): {item.name}")
            stats["unknown"] += 1
            continue

        dest_dir = DOWNLOADS_DIR / folder_name
        dest = unique_dest(dest_dir / item.name)

        if dry_run:
            print(f"[DRY RUN] Would move: {item.name} -> {folder_name}/")
        else:
            dest_dir.mkdir(exist_ok=True)
            shutil.move(str(item), dest)
            logging.info(f"MOVED: {item.name} -> {folder_name}/{dest.name}")
            stats["moved"] += 1

    return stats

if __name__ == "__main__":
    print(f"Organizing {DOWNLOADS_DIR} ...")
    result = organize(dry_run=False)
    summary = f"Done. Moved: {result['moved']}, Unknown: {result['unknown']}"
    print(summary)
    logging.info(summary)

Output (example):

Organizing /Users/alice/Downloads ...
Done. Moved: 12, Unknown: 3

The dry_run=True mode lets you preview what the script would do without actually moving anything -- run it first to confirm the output looks right. The unique_dest() function guarantees you never silently overwrite a file with the same name in the destination folder. You can schedule this script to run daily using the schedule library covered earlier, or on macOS/Linux you can add it to your crontab with crontab -e for OS-level scheduling without a Python process running continuously.

Frequently Asked Questions

What is the difference between shutil and pathlib for file operations?

pathlib is for path manipulation and simple operations: checking if a file exists, reading its metadata, renaming it within the same filesystem. shutil is for heavier operations: copying files (with or without metadata), moving files across filesystems, and deleting entire directory trees. In practice you often use both -- pathlib to build and check paths, shutil to actually move or copy the files. Use shutil.move() for moves (it handles cross-filesystem moves gracefully) and shutil.copy2() when you want to preserve file modification times.
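
The division of labour can be sketched in a few lines. This example works entirely inside a temporary folder, and the file names are invented for illustration.

```python
# pathlib_vs_shutil.py
import shutil
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)

    # pathlib: build paths, create a file, rename it, inspect it
    draft = root / "draft.txt"
    draft.write_text("hello", encoding="utf-8")
    report = root / "report.txt"
    draft.rename(report)
    size = report.stat().st_size

    # shutil: copy with metadata, move into another folder, delete a whole tree
    archive = root / "archive"
    archive.mkdir()
    shutil.copy2(report, archive / "report_backup.txt")
    shutil.move(str(report), str(archive / "report.txt"))
    names = sorted(p.name for p in archive.iterdir())
    print(size, names)

    shutil.rmtree(archive)   # removes the folder and everything in it
    archive_gone = not archive.exists()
```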

Should I use the schedule library or cron for scheduled tasks?

It depends on your setup. The schedule library is pure Python and works identically on Windows, macOS, and Linux -- great for scripts you want to be portable or that run within an existing Python process. Cron (on Linux/macOS) and Task Scheduler (on Windows) are OS-level schedulers that are more reliable for long-running production tasks because they survive reboots automatically and do not require a Python process to stay running. For personal automation scripts on a development machine, schedule is simpler. For server deployments, lean on cron or a process manager like systemd.

When is it safe to use subprocess with shell=True?

Use shell=True only when the entire command is a string literal you control completely -- for example, a hardcoded one-liner like subprocess.run("ls -la /tmp | wc -l", shell=True). Never pass user input, command-line arguments, or any external data into a shell=True command string; doing so opens a shell injection vulnerability where an attacker can execute arbitrary commands. The list form (["ls", "-la", "/tmp"]) is safe with external data because each list element is passed directly to the OS without going through a shell interpreter.

How do I avoid getting blocked when scraping websites?

The main causes of blocks are too-fast request rates, missing request headers, and large-volume scraping. Add a delay between requests using time.sleep(random.uniform(1, 3)) -- randomized delays look more human than a fixed interval. Always set a User-Agent header in your requests session to identify your scraper politely: session.headers.update({"User-Agent": "MyBot/1.0 (research project)"}). Always check the site's robots.txt file before scraping -- if the path you want to scrape is listed as disallowed, respect it. For sites that require JavaScript to load content, switch to a tool like playwright or selenium instead of requests.
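
A rough sketch of the robots.txt check and the randomized delay, using only the standard library. The robots.txt content, the example.com URLs, and the helper names allowed and polite_pause are assumptions made for the demo; a real script would load the site's own file (for example with RobotFileParser.set_url() followed by read()) and make the actual request where the comment indicates.

```python
# polite_scraping.py
import random
import time
from urllib.robotparser import RobotFileParser

USER_AGENT = "MyBot/1.0 (research project)"

# Hypothetical robots.txt rules, parsed from a string so the demo needs no network
ROBOTS_TXT = """\
User-agent: *
Disallow: /private/
"""

robots = RobotFileParser()
robots.parse(ROBOTS_TXT.splitlines())

def allowed(url: str) -> bool:
    """Return True if the robots.txt rules permit USER_AGENT to fetch this URL."""
    return robots.can_fetch(USER_AGENT, url)

def polite_pause(low: float = 1.0, high: float = 3.0) -> float:
    """Sleep for a random interval between requests and return the delay used."""
    delay = random.uniform(low, high)
    time.sleep(delay)
    return delay

for url in ["https://example.com/page/1", "https://example.com/private/data"]:
    if not allowed(url):
        print("Skipping (disallowed):", url)
        continue
    print("OK to fetch:", url)   # the actual request would go here
    polite_pause(0.01, 0.02)     # tiny delay for the demo; use 1-3 seconds in practice
```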

Why should automation scripts log to a file instead of just printing?

When a script runs unattended -- on a schedule overnight or as a background process -- there is no terminal to see the output. File logging means you can review what happened after the fact, including any errors. Python's built-in logging module can record timestamps, log levels (INFO, WARNING, ERROR), and full tracebacks on exceptions. Set up a RotatingFileHandler for long-running scripts to cap the log file size so it does not grow indefinitely: from logging.handlers import RotatingFileHandler, then RotatingFileHandler("script.log", maxBytes=1_000_000, backupCount=3) keeps the active log file plus three backups (roughly 4 MB at most) and discards older entries automatically.
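
Here is a minimal sketch of that setup. The logger name, the build_logger helper, and the deliberately tiny size limit are choices made for the demo so that rotation happens within a few dozen messages.

```python
# rotating_log.py
import logging
import tempfile
from logging.handlers import RotatingFileHandler
from pathlib import Path

def build_logger(log_path: Path, max_bytes: int = 1_000_000, backups: int = 3) -> logging.Logger:
    """Return a logger that rotates log_path once it reaches max_bytes."""
    logger = logging.getLogger("automation_demo")   # hypothetical logger name
    logger.setLevel(logging.INFO)
    logger.handlers.clear()                         # avoid duplicate handlers on repeat calls
    handler = RotatingFileHandler(
        log_path, maxBytes=max_bytes, backupCount=backups, encoding="utf-8"
    )
    handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
    logger.addHandler(handler)
    return logger

with tempfile.TemporaryDirectory() as tmp:
    log_file = Path(tmp) / "script.log"
    log = build_logger(log_file, max_bytes=200, backups=3)   # tiny limit to force rotation
    for i in range(50):
        log.info("processed item %d", i)
    log_names = sorted(p.name for p in Path(tmp).iterdir())
    print(log_names)
    for handler in log.handlers:
        handler.close()   # release the file before the temp folder is deleted
```

After the loop the folder holds the active file and its numbered backups; anything older has been dropped.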

How do I make an automation script safe to run multiple times?

Design for idempotency -- the script should produce the same result whether it runs once or ten times. For file organization scripts, this means checking if a file already exists in the destination before moving it (and using a counter suffix for duplicates, as shown in the real-life example). For database or API writes, check for existing records before inserting. For web scraping pipelines, track which pages or records have already been collected in a CSV or SQLite database, and skip them on subsequent runs. The general pattern is: check state first, act only if the desired state is not already present.
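
The "check state first" pattern might look like the sketch below for a scraping pipeline. The table name, the helper names, and the example.com URLs are hypothetical, and an in-memory SQLite database stands in for a file on disk.

```python
# seen_tracker.py
import sqlite3

def open_tracker(db_path: str = ":memory:") -> sqlite3.Connection:
    """Open (or create) the database that remembers which URLs were handled."""
    conn = sqlite3.connect(db_path)
    conn.execute("CREATE TABLE IF NOT EXISTS seen (url TEXT PRIMARY KEY)")
    return conn

def process_once(conn: sqlite3.Connection, url: str, work) -> bool:
    """Run work(url) only if url is new; return True if the work was done."""
    already = conn.execute("SELECT 1 FROM seen WHERE url = ?", (url,)).fetchone()
    if already:
        return False
    work(url)   # record the URL only after the work succeeded
    conn.execute("INSERT INTO seen (url) VALUES (?)", (url,))
    conn.commit()
    return True

conn = open_tracker()
urls = ["https://example.com/a", "https://example.com/b"]
processed = []

first_run = [process_once(conn, u, processed.append) for u in urls]
second_run = [process_once(conn, u, processed.append) for u in urls]
print(first_run, second_run, len(processed))
```

Recording the URL after the work finishes means a failed item is retried on the next run rather than silently skipped.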

Conclusion

We have covered four practical automation categories in this guide: file and folder operations with pathlib and shutil, web data collection with requests and BeautifulSoup, scheduled jobs with the schedule library, and system command automation with subprocess. The real-life Downloads Organizer script ties these concepts together into a complete, production-ready tool with duplicate handling, configurable rules, dry-run mode, and file logging.

The best next step is to adapt the Downloads Organizer to your own situation -- add more file types, change the destination folders, or hook it into schedule to run automatically every morning. Once you have the pattern down, you will start seeing automation opportunities everywhere: renaming podcast downloads, archiving old project folders, pulling daily exchange rates from an API, or auto-committing generated reports to git.

For deeper reading, the official Python documentation covers pathlib, shutil, and subprocess in full detail. The schedule library docs have examples for every scheduling pattern you might need. And BeautifulSoup4's documentation is an excellent reference for parsing more complex HTML structures.

Further Reading: For more details, see the Python asyncio documentation.

Frequently Asked Questions

What is async/await in Python?

async def defines a coroutine function and await pauses execution until an asynchronous operation completes. This enables concurrent I/O operations without threading, using the asyncio event loop.

When should I use async/await instead of threading?

Use async/await for I/O-bound tasks like network requests and database queries with many concurrent connections. Use threading for blocking libraries that do not support async, and multiprocessing for CPU-bound work, since threads in CPython share the Global Interpreter Lock.

How do I run multiple async tasks concurrently?

Use await asyncio.gather(task1(), task2()) to run multiple coroutines concurrently and collect their results. Use asyncio.create_task() to schedule a coroutine without immediately waiting for it.
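
A short sketch of both calls, where the hypothetical fetch() coroutine stands in for a real I/O operation by sleeping:

```python
# gather_demo.py
import asyncio
import time

async def fetch(name: str, delay: float) -> str:
    """Stand-in for an I/O call such as a network request."""
    await asyncio.sleep(delay)
    return f"{name} done"

async def main() -> list:
    # gather() runs both coroutines concurrently and returns results in call order
    results = await asyncio.gather(fetch("a", 0.2), fetch("b", 0.2))

    # create_task() schedules the coroutine to run in the background
    task = asyncio.create_task(fetch("c", 0.1))
    # ... other work could happen here while the task runs ...
    results.append(await task)
    return results

start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start
print(results, f"{elapsed:.2f}s")
```

Run one after another, the three calls would sleep for 0.5 seconds in total; because the first two overlap, the whole script should finish in roughly 0.3 seconds.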

What does ‘coroutine was never awaited’ mean?

You called an async function without await. Async functions return coroutine objects that must be awaited. Add await before the call or use asyncio.run() from synchronous code.
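
The difference is easy to see in a few lines. In this sketch get_answer() is a made-up coroutine; the un-awaited call is closed explicitly so it is not left dangling.

```python
# never_awaited_demo.py
import asyncio
import inspect

async def get_answer() -> int:
    await asyncio.sleep(0)
    return 42

async def main() -> int:
    pending = get_answer()                 # no await: this is only a coroutine object
    assert inspect.iscoroutine(pending)
    pending.close()                        # discard it explicitly instead of leaving it un-awaited
    return await get_answer()              # with await: the coroutine runs and returns its value

answer = asyncio.run(main())
print(answer)
```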

Can I mix synchronous and asynchronous code?

Yes. Use asyncio.run() to call async from sync. Use loop.run_in_executor() to run blocking functions inside async code without blocking the event loop.
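
As a sketch of both directions: asyncio.run() starts the async code from ordinary synchronous code, and run_in_executor() pushes a blocking function onto a worker thread. The blocking_io() and ticker() functions are invented for the demo.

```python
# executor_demo.py
import asyncio
import time

def blocking_io(seconds: float) -> str:
    """Stand-in for a blocking call, e.g. a library without async support."""
    time.sleep(seconds)
    return "blocking done"

async def ticker(count: int) -> int:
    """Keeps running on the event loop while the blocking call is in progress."""
    ticks = 0
    for _ in range(count):
        await asyncio.sleep(0.05)
        ticks += 1
    return ticks

async def main():
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default thread pool executor
    blocking = loop.run_in_executor(None, blocking_io, 0.2)
    return await asyncio.gather(blocking, ticker(3))

result, ticks = asyncio.run(main())
print(result, ticks)
```

The ticker completes all of its iterations while blocking_io() is still sleeping in its thread, which shows that the event loop was never blocked.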