
SQLite is the perfect database for small to medium applications — no server to run, no configuration, just a file. But Python’s built-in sqlite3 module is synchronous: every query blocks until the database responds. In an async application built with asyncio, aiohttp, or FastAPI, that blocking call halts the entire event loop for every database operation. aiosqlite wraps Python’s sqlite3 module in an async interface, letting your database queries run without freezing everything else.

Install aiosqlite with pip install aiosqlite. There is nothing else to configure — it uses the same SQLite file your synchronous code already uses. The API closely mirrors sqlite3, so if you have used that module before, the transition is mostly adding async def, await, and async with in the right places.

This tutorial covers opening async connections, executing queries, fetching results, using transactions, handling row factories for dict-like access, and building a complete async task tracker backed by SQLite. By the end, you will know how to integrate aiosqlite into any asyncio-based project without blocking the event loop.

Async SQLite in 15 Lines: Quick Example

Here is the minimum aiosqlite script — create a table, insert a row, and read it back, all asynchronously:

# quick_aiosqlite.py
import asyncio
import aiosqlite

async def main():
    async with aiosqlite.connect('quick_demo.db') as db:
        await db.execute('''
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                email TEXT UNIQUE NOT NULL
            )
        ''')
        await db.execute("INSERT INTO users (name, email) VALUES (?, ?)", ('Alice', 'alice@example.com'))
        await db.commit()

        async with db.execute("SELECT id, name, email FROM users") as cursor:
            async for row in cursor:
                print(f"ID: {row[0]}, Name: {row[1]}, Email: {row[2]}")

asyncio.run(main())

Output:

ID: 1, Name: Alice, Email: alice@example.com

Three patterns appear in every aiosqlite script: async with aiosqlite.connect() opens and auto-closes the connection, await db.execute() runs SQL without blocking, and await db.commit() persists changes. Queries that return rows use async with db.execute() to get a cursor you iterate with async for.

aiosqlite vs sqlite3 vs Other Async Databases

Library     Database                 I/O Model      Best For
---------   ----------------------   ------------   --------------------------------------
aiosqlite   SQLite (file)            asyncio        Async apps needing local/embedded DB
sqlite3     SQLite (file)            Synchronous    Scripts, Django, Flask
databases   SQLite/Postgres/MySQL    asyncio        SQLAlchemy Core queries, async
asyncpg     PostgreSQL only          asyncio        High-performance async Postgres

aiosqlite is the right choice when you want SQLite’s zero-configuration simplicity in an async application. For production systems that need concurrent writes, PostgreSQL with asyncpg (or MongoDB with motor) will outperform SQLite’s single-writer model. But for bots, CLI tools, personal apps, and microservices with low write concurrency, aiosqlite gives you async SQLite without any infrastructure setup.

Async CRUD Operations

Here is a complete CRUD implementation with a row factory for dict-style access and proper error handling:

# aiosqlite_crud.py
import asyncio
import aiosqlite

DB_PATH = 'products.db'

async def init_db(db):
    await db.execute('''
        CREATE TABLE IF NOT EXISTS products (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            price REAL NOT NULL,
            stock INTEGER DEFAULT 0
        )
    ''')
    await db.commit()

async def create_product(db, name, price, stock=0):
    cursor = await db.execute(
        "INSERT INTO products (name, price, stock) VALUES (?, ?, ?)",
        (name, price, stock)
    )
    await db.commit()
    return cursor.lastrowid

async def get_product(db, product_id):
    async with db.execute("SELECT * FROM products WHERE id = ?", (product_id,)) as cur:
        row = await cur.fetchone()
        if row is None:
            return None
        return dict(zip([col[0] for col in cur.description], row))

async def list_products(db, max_price=None):
    sql = "SELECT * FROM products"
    params = ()
    if max_price is not None:
        sql += " WHERE price <= ?"
        params = (max_price,)
    sql += " ORDER BY name"
    async with db.execute(sql, params) as cur:
        cols = [c[0] for c in cur.description]
        return [dict(zip(cols, row)) async for row in cur]

async def update_price(db, product_id, new_price):
    await db.execute("UPDATE products SET price = ? WHERE id = ?", (new_price, product_id))
    await db.commit()

async def delete_product(db, product_id):
    await db.execute("DELETE FROM products WHERE id = ?", (product_id,))
    await db.commit()

async def main():
    async with aiosqlite.connect(DB_PATH) as db:
        await init_db(db)

        id1 = await create_product(db, 'Widget', 9.99, 100)
        id2 = await create_product(db, 'Gadget', 24.99, 50)
        id3 = await create_product(db, 'Doohickey', 4.49, 200)
        print(f"Created products: {id1}, {id2}, {id3}")

        product = await get_product(db, id1)
        print(f"Fetched: {product}")

        cheap = await list_products(db, max_price=10.0)
        print(f"Products under $10: {[p['name'] for p in cheap]}")

        await update_price(db, id1, 7.99)
        updated = await get_product(db, id1)
        print(f"Updated Widget price: ${updated['price']}")

        await delete_product(db, id3)
        all_products = await list_products(db)
        print(f"Remaining products: {[p['name'] for p in all_products]}")

asyncio.run(main())

Output:

Created products: 1, 2, 3
Fetched: {'id': 1, 'name': 'Widget', 'price': 9.99, 'stock': 100}
Products under $10: ['Doohickey', 'Widget']
Updated Widget price: $7.99
Remaining products: ['Gadget', 'Widget']

Transactions and Batch Inserts

aiosqlite handles transactions explicitly. For batch inserts, wrapping everything in a single transaction is dramatically faster than committing after each row:

# aiosqlite_transactions.py
import asyncio
import aiosqlite
import time

async def insert_batch(db, records):
    """Insert a list of (name, price, stock) tuples in one transaction."""
    await db.executemany(
        "INSERT INTO products (name, price, stock) VALUES (?, ?, ?)",
        records
    )
    await db.commit()

async def main():
    async with aiosqlite.connect(':memory:') as db:  # in-memory for demo
        await db.execute('''
            CREATE TABLE products (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT, price REAL, stock INTEGER
            )
        ''')

        # Generate 10,000 sample products
        records = [(f'Product_{i}', round(i * 0.99, 2), i % 100) for i in range(1, 10001)]

        start = time.perf_counter()
        await insert_batch(db, records)
        elapsed = time.perf_counter() - start

        async with db.execute("SELECT COUNT(*) FROM products") as cur:
            count = (await cur.fetchone())[0]
        print(f"Inserted {count:,} rows in {elapsed:.3f}s")

        # Transaction with rollback on error
        try:
            async with db.execute("BEGIN"):
                await db.execute("UPDATE products SET price = price * 0.9 WHERE stock > 50")
                # Simulate an error condition
                raise ValueError("Simulated error -- rolling back")
                await db.commit()  # never reached in this demo
        except ValueError as e:
            await db.rollback()
            print(f"Transaction rolled back: {e}")

asyncio.run(main())

Output:

Inserted 10,000 rows in 0.041s
Transaction rolled back: Simulated error -- rolling back

executemany() with a single commit is the standard pattern for bulk inserts -- all rows land in one transaction, so SQLite performs a single journaled commit instead of thousands of them, and there is only one trip through aiosqlite's worker thread instead of one per row. The async with db.execute("BEGIN") pattern starts an explicit transaction block; call await db.rollback() in the except clause to undo all changes if anything fails mid-transaction.

Real-Life Example: Async Task Tracker

Here is a practical async task tracker built on aiosqlite, showing concurrent queries and the row factory pattern in a complete working application:

# async_task_tracker.py
import asyncio
import aiosqlite
from datetime import datetime

DB = 'tasks.db'

async def setup(db):
    await db.execute('''
        CREATE TABLE IF NOT EXISTS tasks (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            title TEXT NOT NULL,
            status TEXT DEFAULT 'pending',
            created_at TEXT NOT NULL,
            completed_at TEXT
        )
    ''')
    db.row_factory = aiosqlite.Row  # dict-like row access
    await db.commit()

async def add_task(db, title):
    cur = await db.execute(
        "INSERT INTO tasks (title, created_at) VALUES (?, ?)",
        (title, datetime.now().isoformat())
    )
    await db.commit()
    return cur.lastrowid

async def complete_task(db, task_id):
    await db.execute(
        "UPDATE tasks SET status='done', completed_at=? WHERE id=?",
        (datetime.now().isoformat(), task_id)
    )
    await db.commit()

async def get_summary(db):
    async with db.execute('''
        SELECT status, COUNT(*) as count FROM tasks GROUP BY status
    ''') as cur:
        return {row['status']: row['count'] async for row in cur}

async def list_pending(db):
    async with db.execute("SELECT id, title, created_at FROM tasks WHERE status='pending'") as cur:
        return [dict(row) async for row in cur]

async def main():
    async with aiosqlite.connect(DB) as db:
        await setup(db)

        # Add tasks concurrently
        ids = await asyncio.gather(
            add_task(db, 'Write unit tests'),
            add_task(db, 'Fix login bug'),
            add_task(db, 'Update README'),
            add_task(db, 'Deploy to staging'),
        )
        print(f"Added tasks: {ids}")

        # Complete two tasks
        await asyncio.gather(complete_task(db, ids[0]), complete_task(db, ids[2]))

        pending = await list_pending(db)
        summary = await get_summary(db)

        print(f"\nSummary: {summary}")
        print("Pending tasks:")
        for t in pending:
            print(f"  [{t['id']}] {t['title']}")

asyncio.run(main())

Output:

Added tasks: [1, 2, 3, 4]

Summary: {'pending': 2, 'done': 2}
Pending tasks:
  [2] Fix login bug
  [4] Deploy to staging

Frequently Asked Questions

How does aiosqlite avoid blocking the event loop?

aiosqlite runs each connection in its own dedicated background thread and feeds it operations through an internal queue. The await keyword yields control back to the event loop while that thread executes the blocking sqlite3 call. This means aiosqlite does not use true async I/O at the OS level -- it uses thread offloading. For most SQLite use cases this is perfectly adequate, but because each connection has a single worker thread, operations on one connection always execute one at a time.

How do I get dict-like row access instead of tuples?

Set db.row_factory = aiosqlite.Row after opening the connection. Then rows support both index access (row[0]) and key access (row['name']). Convert to a plain dict with dict(row) when you need JSON serialization. Alternatively, build the dict manually from cursor.description: dict(zip([c[0] for c in cursor.description], row)).

Can multiple coroutines share one aiosqlite connection?

Yes, but with caution. aiosqlite serializes all operations through its internal thread, so concurrent writes are safe from a thread-safety perspective. However, SQLite itself only allows one writer at a time -- concurrent write operations will serialize at the database level. For read-heavy workloads this is fine; for write-heavy concurrent workloads, consider PostgreSQL instead. Keep one connection per event loop rather than opening connections per coroutine.

Can I use an in-memory database with aiosqlite?

Yes: aiosqlite.connect(':memory:') creates an in-memory database that exists only for the lifetime of the connection. This is useful for tests -- each test function opens its own ':memory:' connection for a clean slate with no file cleanup needed. Note that an in-memory database cannot be shared between connections; each aiosqlite.connect(':memory:') call creates a completely separate database.

How do I handle schema migrations with aiosqlite?

For simple projects, run CREATE TABLE IF NOT EXISTS in your startup script; note that SQLite's ALTER TABLE ADD COLUMN has no IF NOT EXISTS clause, so check the existing columns with PRAGMA table_info (or catch the resulting OperationalError) before adding one. For production apps with complex schemas, use the alembic migration tool with the synchronous sqlite3 driver (alembic does not need to be async -- run migrations at startup before launching the async event loop). Store the current schema version in a schema_version table and apply pending migrations sequentially.
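Here is a minimal sketch of that version-table approach using the synchronous sqlite3 driver at startup, as suggested above (the MIGRATIONS list and table names are hypothetical):

```python
import sqlite3

# Hypothetical ordered migration list: (version, SQL) pairs.
MIGRATIONS = [
    (1, "CREATE TABLE tasks (id INTEGER PRIMARY KEY, title TEXT NOT NULL)"),
    (2, "ALTER TABLE tasks ADD COLUMN priority INTEGER DEFAULT 0"),
]

def migrate(path):
    """Apply pending migrations synchronously, before the event loop starts."""
    conn = sqlite3.connect(path)
    try:
        conn.execute("CREATE TABLE IF NOT EXISTS schema_version (version INTEGER)")
        row = conn.execute("SELECT MAX(version) FROM schema_version").fetchone()
        current = row[0] or 0  # MAX() on an empty table yields NULL
        for version, sql in MIGRATIONS:
            if version > current:
                conn.execute(sql)
                conn.execute("INSERT INTO schema_version (version) VALUES (?)", (version,))
        conn.commit()
        return conn.execute("SELECT MAX(version) FROM schema_version").fetchone()[0]
    finally:
        conn.close()

print("schema at version", migrate(":memory:"))
```

Once migrate() has run, the async application can open the same file with aiosqlite knowing the schema is current.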

Conclusion

aiosqlite brings SQLite into the async Python world with minimal friction. In this tutorial you used aiosqlite.connect() as an async context manager, performed CRUD with parameterized queries, ran batch inserts with executemany(), managed transactions with commit and rollback, and built a concurrent async task tracker with dict-like row access via aiosqlite.Row.

The task tracker is a solid base to extend -- add priority columns, due dates, a web interface via FastAPI, or push notifications when tasks are completed. All of these fit naturally into the async CRUD patterns you learned here without any synchronous blocking calls.

Official documentation: aiosqlite -- async bridge to sqlite3.