Last Updated: June 01, 2026
Intermediate
SQLite is the perfect database for small to medium applications — no server to run, no configuration, just a file. But Python’s built-in sqlite3 module is synchronous: every query blocks until the database responds. In an async application built with asyncio, aiohttp, or FastAPI, that blocking call halts the entire event loop for every database operation. aiosqlite wraps Python’s sqlite3 module in an async interface, letting your database queries run without freezing everything else.
Install aiosqlite with pip install aiosqlite. There is nothing else to configure — it uses the same SQLite file your synchronous code already uses. The API closely mirrors sqlite3, so if you have used that module before, the transition is mostly adding async def, await, and async with in the right places.
This tutorial covers opening async connections, executing queries, fetching results, using transactions, handling row factories for dict-like access, and building a complete async task tracker backed by SQLite. By the end, you will know how to integrate aiosqlite into any asyncio-based project without blocking the event loop.
Python developer and educator with 15+ years building production systems across data engineering, web APIs, and AI tooling. Founder of Python How To Program — 270+ in-depth tutorials covering the modern Python stack.
Async SQLite in 15 Lines: Quick Example
Here is the minimum aiosqlite script — create a table, insert a row, and read it back, all asynchronously:
# quick_aiosqlite.py
import asyncio
import aiosqlite

async def main():
    async with aiosqlite.connect('quick_demo.db') as db:
        await db.execute('''
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                email TEXT UNIQUE NOT NULL
            )
        ''')
        await db.execute("INSERT INTO users (name, email) VALUES (?, ?)", ('Alice', 'alice@example.com'))
        await db.commit()
        async with db.execute("SELECT id, name, email FROM users") as cursor:
            async for row in cursor:
                print(f"ID: {row[0]}, Name: {row[1]}, Email: {row[2]}")

asyncio.run(main())
Output:
ID: 1, Name: Alice, Email: alice@example.com
Three patterns appear in every aiosqlite script: async with aiosqlite.connect() opens and auto-closes the connection, await db.execute() runs SQL without blocking, and await db.commit() persists changes. Queries that return rows use async with db.execute() to get a cursor you iterate with async for.
aiosqlite vs sqlite3 vs Other Async Databases
| Library | Database | I/O Model | Best For |
|---|---|---|---|
| aiosqlite | SQLite (file) | asyncio | Async apps needing local/embedded DB |
| sqlite3 | SQLite (file) | Synchronous | Scripts, Django, Flask |
| databases | SQLite/Postgres/MySQL | asyncio | SQLAlchemy Core queries, async |
| asyncpg | PostgreSQL only | asyncio | High-performance async Postgres |
aiosqlite is the right choice when you want SQLite’s zero-configuration simplicity in an async application. For production systems that need concurrent writes, PostgreSQL with asyncpg will outperform SQLite’s single-writer model. But for bots, CLI tools, personal apps, and microservices with low write concurrency, aiosqlite gives you async SQLite without any infrastructure setup.
Async CRUD Operations
Here is a complete CRUD implementation that uses parameterized queries and builds dict-style rows from cursor.description:
# aiosqlite_crud.py
import asyncio
import aiosqlite

DB_PATH = 'products.db'

async def init_db(db):
    await db.execute('''
        CREATE TABLE IF NOT EXISTS products (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            price REAL NOT NULL,
            stock INTEGER DEFAULT 0
        )
    ''')
    await db.commit()

async def create_product(db, name, price, stock=0):
    cursor = await db.execute(
        "INSERT INTO products (name, price, stock) VALUES (?, ?, ?)",
        (name, price, stock)
    )
    await db.commit()
    return cursor.lastrowid

async def get_product(db, product_id):
    async with db.execute("SELECT * FROM products WHERE id = ?", (product_id,)) as cur:
        row = await cur.fetchone()
        if row is None:
            return None
        return dict(zip([col[0] for col in cur.description], row))

async def list_products(db, max_price=None):
    sql = "SELECT * FROM products"
    params = ()
    if max_price is not None:
        sql += " WHERE price <= ?"
        params = (max_price,)
    sql += " ORDER BY name"
    async with db.execute(sql, params) as cur:
        cols = [c[0] for c in cur.description]
        return [dict(zip(cols, row)) async for row in cur]

async def update_price(db, product_id, new_price):
    await db.execute("UPDATE products SET price = ? WHERE id = ?", (new_price, product_id))
    await db.commit()

async def delete_product(db, product_id):
    await db.execute("DELETE FROM products WHERE id = ?", (product_id,))
    await db.commit()

async def main():
    async with aiosqlite.connect(DB_PATH) as db:
        await init_db(db)
        id1 = await create_product(db, 'Widget', 9.99, 100)
        id2 = await create_product(db, 'Gadget', 24.99, 50)
        id3 = await create_product(db, 'Doohickey', 4.49, 200)
        print(f"Created products: {id1}, {id2}, {id3}")
        product = await get_product(db, id1)
        print(f"Fetched: {product}")
        cheap = await list_products(db, max_price=10.0)
        print(f"Products under $10: {[p['name'] for p in cheap]}")
        await update_price(db, id1, 7.99)
        updated = await get_product(db, id1)
        print(f"Updated Widget price: ${updated['price']}")
        await delete_product(db, id3)
        all_products = await list_products(db)
        print(f"Remaining products: {[p['name'] for p in all_products]}")

asyncio.run(main())
Output:
Created products: 1, 2, 3
Fetched: {'id': 1, 'name': 'Widget', 'price': 9.99, 'stock': 100}
Products under $10: ['Doohickey', 'Widget']
Updated Widget price: $7.99
Remaining products: ['Gadget', 'Widget']

Transactions and Batch Inserts
As in sqlite3, aiosqlite opens a transaction implicitly before the first write and keeps it open until you call commit() or rollback(). For batch inserts, wrapping everything in a single transaction is dramatically faster than committing after each row:
# aiosqlite_transactions.py
import asyncio
import aiosqlite
import time

async def insert_batch(db, records):
    """Insert a list of (name, price, stock) tuples in one transaction."""
    await db.executemany(
        "INSERT INTO products (name, price, stock) VALUES (?, ?, ?)",
        records
    )
    await db.commit()

async def main():
    async with aiosqlite.connect(':memory:') as db:  # in-memory for demo
        await db.execute('''
            CREATE TABLE products (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT, price REAL, stock INTEGER
            )
        ''')
        # Generate 10,000 sample products
        records = [(f'Product_{i}', round(i * 0.99, 2), i % 100) for i in range(1, 10001)]
        start = time.perf_counter()
        await insert_batch(db, records)
        elapsed = time.perf_counter() - start
        async with db.execute("SELECT COUNT(*) FROM products") as cur:
            count = (await cur.fetchone())[0]
        print(f"Inserted {count:,} rows in {elapsed:.3f}s")

        # Transaction with rollback on error
        simulate_failure = True
        try:
            await db.execute("BEGIN")  # start an explicit transaction
            await db.execute("UPDATE products SET price = price * 0.9 WHERE stock > 50")
            if simulate_failure:
                # Simulate an error condition before the commit is reached
                raise ValueError("Simulated error -- rolling back")
            await db.commit()
        except ValueError as e:
            await db.rollback()
            print(f"Transaction rolled back: {e}")

asyncio.run(main())
Output:
Inserted 10,000 rows in 0.041s
Transaction rolled back: Simulated error -- rolling back
executemany() with a single commit is the standard pattern for bulk inserts -- all rows are written inside one transaction and committed once, instead of paying the cost of thousands of individual commits. Executing "BEGIN" starts an explicit transaction; call await db.rollback() in the except clause to undo all changes if anything fails mid-transaction.
Real-Life Example: Async Task Tracker
Here is a practical async task tracker built on aiosqlite, showing concurrent queries and the row factory pattern in a complete working application:
# async_task_tracker.py
import asyncio
import aiosqlite
from datetime import datetime

DB = 'tasks.db'

async def setup(db):
    await db.execute('''
        CREATE TABLE IF NOT EXISTS tasks (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            title TEXT NOT NULL,
            status TEXT DEFAULT 'pending',
            created_at TEXT NOT NULL,
            completed_at TEXT
        )
    ''')
    db.row_factory = aiosqlite.Row  # dict-like row access
    await db.commit()

async def add_task(db, title):
    cur = await db.execute(
        "INSERT INTO tasks (title, created_at) VALUES (?, ?)",
        (title, datetime.now().isoformat())
    )
    await db.commit()
    return cur.lastrowid

async def complete_task(db, task_id):
    await db.execute(
        "UPDATE tasks SET status='done', completed_at=? WHERE id=?",
        (datetime.now().isoformat(), task_id)
    )
    await db.commit()

async def get_summary(db):
    async with db.execute('''
        SELECT status, COUNT(*) as count FROM tasks GROUP BY status
    ''') as cur:
        return {row['status']: row['count'] async for row in cur}

async def list_pending(db):
    async with db.execute("SELECT id, title, created_at FROM tasks WHERE status='pending'") as cur:
        return [dict(row) async for row in cur]

async def main():
    async with aiosqlite.connect(DB) as db:
        await setup(db)
        # Add tasks concurrently
        ids = await asyncio.gather(
            add_task(db, 'Write unit tests'),
            add_task(db, 'Fix login bug'),
            add_task(db, 'Update README'),
            add_task(db, 'Deploy to staging'),
        )
        print(f"Added tasks: {ids}")
        # Complete two tasks
        await asyncio.gather(complete_task(db, ids[0]), complete_task(db, ids[2]))
        pending = await list_pending(db)
        summary = await get_summary(db)
        print(f"\nSummary: {summary}")
        print("Pending tasks:")
        for t in pending:
            print(f" [{t['id']}] {t['title']}")

asyncio.run(main())
Output:
Added tasks: [1, 2, 3, 4]

Summary: {'done': 2, 'pending': 2}
Pending tasks:
 [2] Fix login bug
 [4] Deploy to staging

Frequently Asked Questions
How does aiosqlite avoid blocking the event loop?
aiosqlite runs each connection's SQLite operations on a single dedicated background thread. Calls such as execute() are placed on a queue, the thread runs the blocking sqlite3 call, and the result is handed back to the event loop, so the await keyword yields control while the query runs. This means aiosqlite does not use true async I/O at the OS level -- it uses thread offloading. For most SQLite use cases this is perfectly adequate, but it does mean that operations on one connection run one at a time, in the order they were queued.
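The thread-offloading idea can be illustrated with the standard library alone. The class below is a toy sketch, not aiosqlite's actual code: TinyAsyncDB and its methods are invented for this example, and a single-worker executor stands in for the background thread.

```python
import asyncio
import sqlite3
from concurrent.futures import ThreadPoolExecutor


class TinyAsyncDB:
    """Toy illustration of thread offloading; NOT aiosqlite's implementation."""

    def __init__(self, path):
        self._path = path
        # One worker thread, so every sqlite3 call runs on the same thread
        self._worker = ThreadPoolExecutor(max_workers=1)
        self._conn = None

    async def _offload(self, fn, *args):
        loop = asyncio.get_running_loop()
        # The event loop stays free while the worker thread runs fn
        return await loop.run_in_executor(self._worker, fn, *args)

    async def open(self):
        self._conn = await self._offload(sqlite3.connect, self._path)

    async def query(self, sql, params=()):
        def work():
            return self._conn.execute(sql, params).fetchall()
        return await self._offload(work)

    async def close(self):
        await self._offload(self._conn.close)
        self._worker.shutdown()


async def demo():
    db = TinyAsyncDB(':memory:')
    await db.open()
    try:
        return await db.query("SELECT 1 + 1")
    finally:
        await db.close()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because every call goes through the same worker, queries on one connection never overlap, which is the same ordering property described in the answer above.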
How do I get dict-like row access instead of tuples?
Set db.row_factory = aiosqlite.Row after opening the connection. Then rows support both index access (row[0]) and key access (row['name']). Convert to a plain dict with dict(row) when you need JSON serialization. Alternatively, build the dict manually from cursor.description: dict(zip([c[0] for c in cursor.description], row)).
Can multiple coroutines share one aiosqlite connection?
Yes, but with caution. aiosqlite serializes all operations through its internal thread, so concurrent writes are safe from a thread-safety perspective. However, SQLite itself only allows one writer at a time -- concurrent write operations will serialize at the database level. For read-heavy workloads this is fine; for write-heavy concurrent workloads, consider PostgreSQL instead. Keep one connection per event loop rather than opening connections per coroutine.
Can I use an in-memory database with aiosqlite?
Yes: aiosqlite.connect(':memory:') creates an in-memory database that exists only for the lifetime of the connection. This is useful for tests -- each test function opens its own ':memory:' connection for a clean slate with no file cleanup needed. Note that an in-memory database cannot be shared between connections; each aiosqlite.connect(':memory:') call creates a completely separate database.
How do I handle schema migrations with aiosqlite?
For simple projects, use CREATE TABLE IF NOT EXISTS in your startup script. SQLite's ALTER TABLE ... ADD COLUMN has no IF NOT EXISTS clause, so check PRAGMA table_info(table_name) or a stored schema version before adding a column. For production apps with complex schemas, use the alembic migration tool with the synchronous sqlite3 driver (alembic does not need to be async -- run migrations at startup before launching the async event loop). Store the current schema version in a schema_version table and apply pending migrations sequentially.
Conclusion
aiosqlite brings SQLite into the async Python world with minimal friction. In this tutorial you used aiosqlite.connect() as an async context manager, performed CRUD with parameterized queries, ran batch inserts with executemany(), managed transactions with commit and rollback, and built a concurrent async task tracker with dict-like row access via aiosqlite.Row.
The task tracker is a solid base to extend -- add priority columns, due dates, a web interface via FastAPI, or push notifications when tasks are completed. All of these fit naturally into the async CRUD patterns you learned here without any synchronous blocking calls.
Official documentation: aiosqlite -- async bridge to sqlite3.
Why Async SQLite
sqlite3 in Python is synchronous — every query blocks the calling thread. In an async web framework (FastAPI, aiohttp), that blocks the event loop and stalls every other request. aiosqlite wraps sqlite3 with async APIs that yield to the loop during disk I/O:
# pip install aiosqlite
import asyncio
import aiosqlite

async def main():
    async with aiosqlite.connect("app.db") as db:
        await db.execute("CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, name TEXT)")
        await db.execute("INSERT INTO users (name) VALUES (?)", ("Alice",))
        await db.commit()
        async with db.execute("SELECT * FROM users") as cursor:
            async for row in cursor:
                print(row)

asyncio.run(main())
The API mirrors sqlite3 — same methods, same parameter style, just await-able. Migration from sync code is mechanical.
Connection Pooling Pattern
For an async web app, opening a new connection per request is wasteful. Use a connection pool — there's no official pool, but the pattern is straightforward with asyncio.Queue:
import asyncio
import aiosqlite
from contextlib import asynccontextmanager

class SQLitePool:
    def __init__(self, path: str, size: int = 5):
        self.path = path
        self.size = size
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=size)
        self._closed = False

    async def init(self):
        # Open every connection up front and enable WAL on each
        for _ in range(self.size):
            con = await aiosqlite.connect(self.path)
            await con.execute("PRAGMA journal_mode=WAL")
            await self._queue.put(con)

    @asynccontextmanager
    async def acquire(self):
        # Waits here until a connection is free
        con = await self._queue.get()
        try:
            yield con
        finally:
            await self._queue.put(con)

    async def close(self):
        self._closed = True
        while not self._queue.empty():
            con = await self._queue.get()
            await con.close()

# Usage: call `await pool.init()` once at startup, otherwise acquire() waits forever
pool = SQLitePool("app.db", size=5)

async def get_user(user_id: int):
    async with pool.acquire() as con:
        async with con.execute("SELECT * FROM users WHERE id = ?", (user_id,)) as cur:
            return await cur.fetchone()
Row Factories
Same row_factory trick as sync sqlite3 — set it for dict-like access:
async with aiosqlite.connect("app.db") as db:
    db.row_factory = aiosqlite.Row
    async with db.execute("SELECT id, name, email FROM users") as cur:
        rows = await cur.fetchall()
        for row in rows:
            print(row["name"], row["email"])
Transactions
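aiosqlite supports the same commit and rollback pattern as sqlite3, with each call awaited. One difference to keep in mind: async with aiosqlite.connect(...) closes the connection on exit but does not commit or roll back for you, so do both explicitly: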
async with aiosqlite.connect("app.db") as db:
    try:
        await db.execute("INSERT INTO accounts (id, balance) VALUES (?, ?)", (1, 1000))
        await db.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
        await db.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
        await db.commit()
    except Exception:
        await db.rollback()
        raise
FastAPI Integration
The classic pattern: pool initialized at startup, injected via Depends, released after the request:
# Assumes the SQLitePool class from the Connection Pooling Pattern section is defined or imported here
from fastapi import FastAPI, Depends
from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.pool = SQLitePool("app.db", size=10)
    await app.state.pool.init()
    yield
    await app.state.pool.close()

app = FastAPI(lifespan=lifespan)

async def get_db():
    async with app.state.pool.acquire() as con:
        yield con

@app.get("/users/{user_id}")
async def get_user(user_id: int, db=Depends(get_db)):
    async with db.execute("SELECT * FROM users WHERE id = ?", (user_id,)) as cur:
        row = await cur.fetchone()
        if row is None:
            return {"error": "not found"}
        # Pooled connections return plain tuples, so build the dict from column names
        cols = [c[0] for c in cur.description]
        return dict(zip(cols, row))
Common Pitfalls
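- One global connection for independent units of work. aiosqlite serializes operations on a connection, so sharing it is thread-safe, but every coroutine shares the same transaction: one coroutine's commit() or rollback() also applies to another's pending writes. Use a pool or a per-task connection when work must be isolated.
- Forgetting commit() in async. Same as sync sqlite3: writes don't persist without await db.commit().
- WAL mode confusion. WAL helps when reads and writes happen on different connections. With a single connection, you don't benefit much.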
- Blocking pragmas at connect time. Some PRAGMAs are slow (full integrity check). Set fast ones (journal_mode, foreign_keys) at connect; defer expensive ones to maintenance windows.
- aiosqlite vs SQLAlchemy async. SQLAlchemy 2.0 has async support — use it for complex queries with ORM benefits. aiosqlite is leaner for raw SQL.
FAQ
Q: aiosqlite or async SQLAlchemy?
A: aiosqlite for raw SQL with minimal overhead. SQLAlchemy async when you have models, relationships, and migrations.
Q: Is aiosqlite faster than sqlite3?
A: No. It wraps the same SQLite engine, and each query pays a small extra cost for the hand-off to a worker thread. The win is concurrency: while one query is on disk, the event loop runs other work.
Q: Does it work with WAL mode?
A: Yes — enable PRAGMA journal_mode=WAL at connect time. Multiple connections to the same DB can read while a writer is active.
Q: Can I use it with Django async?
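A: Django has its own async ORM interface. aiosqlite calls are already coroutines, so you can await them directly inside async views; from synchronous Django code, wrap them with asgiref's async_to_sync. You can also run SQLAlchemy alongside Django.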
Q: What's the performance ceiling?
A: Several thousand queries/sec on commodity hardware. For higher concurrent writes, move to PostgreSQL.
Wrapping Up
aiosqlite is the right SQLite library for async Python applications. Same API as sqlite3, same correctness guarantees, just non-blocking. Add a small connection pool, enable WAL mode, and you have a database layer that handles thousands of concurrent FastAPI requests without breaking a sweat.