
PostgreSQL is one of the most powerful open-source relational databases, and Python developers interact with it constantly — whether building web APIs, running data pipelines, or managing application state. If you have ever needed to store structured data beyond what SQLite can handle, PostgreSQL is usually the next step.

The good news is that psycopg (version 3, the modern successor to the venerable psycopg2) makes connecting Python to PostgreSQL straightforward and safe. It supports parameterized queries out of the box, handles connection pooling, and works beautifully with async code. You can install it with a single pip install psycopg[binary] command and be running queries in minutes.

In this article, we will cover everything you need to connect Python to PostgreSQL. We will start with a quick example showing a basic connection and query, then explain the psycopg library and why it is the recommended adapter. From there, we will walk through CRUD operations (Create, Read, Update, Delete), parameterized queries for security, connection pooling for performance, error handling patterns, and finish with a complete real-life project that builds a task manager backed by PostgreSQL.

Connecting Python to PostgreSQL: Quick Example

Here is a minimal working example that connects to a PostgreSQL database, creates a table, inserts a row, and reads it back. This gives you the core pattern you will use in every PostgreSQL project.

# quick_example.py
import psycopg

# Connect to PostgreSQL (adjust these for your setup)
conn_string = "host=localhost dbname=testdb user=postgres password=postgres"

with psycopg.connect(conn_string) as conn:
    with conn.cursor() as cur:
        # Create a simple table
        cur.execute("""
            CREATE TABLE IF NOT EXISTS greetings (
                id SERIAL PRIMARY KEY,
                message TEXT NOT NULL
            )
        """)
        # Insert a row
        cur.execute("INSERT INTO greetings (message) VALUES (%s)", ("Hello from Python!",))
        conn.commit()

        # Read it back
        cur.execute("SELECT id, message FROM greetings ORDER BY id DESC LIMIT 1")
        row = cur.fetchone()
        print(f"ID: {row[0]}, Message: {row[1]}")

Output:

ID: 1, Message: Hello from Python!

The key things to notice: we use psycopg.connect() with a connection string, wrap everything in with blocks for automatic cleanup, and use %s placeholders for parameterized queries (never string formatting). The conn.commit() call makes the insert permanent. Want to go deeper? Below we cover connection options, all four CRUD operations, pooling, and a complete project.


What Is psycopg and Why Use It?

psycopg is the most popular PostgreSQL adapter for Python. Version 3 (just called psycopg) is a complete rewrite of the classic psycopg2 that powered Django, Flask, and countless Python applications for over a decade. The new version brings a cleaner API, native async support, and better type handling while keeping the reliability developers trusted.

Here is how psycopg compares to other options for connecting Python to PostgreSQL:

Feature                  | psycopg (v3)       | psycopg2            | asyncpg
Python 3.7+ support      | Yes                | Yes                 | Yes
Async support            | Built-in           | No (needs wrappers) | Async only
Connection pooling       | Built-in           | Separate package    | Built-in
Parameterized queries    | %s and named       | %s and named        | $1, $2 style
COPY support             | Excellent          | Good                | Good
Active development       | Yes (recommended)  | Maintenance only    | Yes
Django/Flask compatible  | Yes                | Yes                 | Limited

For most Python developers, psycopg (v3) is the right choice. It handles both sync and async workflows, has excellent documentation, and is officially recommended by the PostgreSQL community. The rest of this article uses psycopg v3 exclusively.

Installing psycopg

The easiest way to install psycopg is with the binary package, which bundles the C library so you do not need PostgreSQL development headers installed:

# install_psycopg.sh
pip install "psycopg[binary]"

Output:

Successfully installed psycopg-3.1.18 psycopg-binary-3.1.18

If you prefer to build the optimized C extension locally (common in production Docker images), make sure libpq-dev and a C compiler are available, then run pip install "psycopg[c]". For development and tutorials, the binary option is the fastest path.

Connecting to PostgreSQL

psycopg offers several ways to specify your connection. The most common patterns are a connection string (DSN) and keyword arguments. Both produce identical results — choose whichever reads better in your codebase.

# connection_methods.py
import psycopg

# Method 1: Connection string (DSN)
conn1 = psycopg.connect("host=localhost dbname=myapp user=appuser password=secret")

# Method 2: Keyword arguments
conn2 = psycopg.connect(
    host="localhost",
    dbname="myapp",
    user="appuser",
    password="secret",
    port=5432
)

# Method 3: PostgreSQL URI format
conn3 = psycopg.connect("postgresql://appuser:secret@localhost:5432/myapp")

# Always use context managers for automatic cleanup
with psycopg.connect("host=localhost dbname=myapp user=appuser password=secret") as conn:
    print(f"Connected to: {conn.info.dbname}")
    print(f"Server version: {conn.info.server_version}")

conn1.close()
conn2.close()
conn3.close()

Output:

Connected to: myapp
Server version: 160001

The context manager pattern (with psycopg.connect(...) as conn) is strongly recommended. It automatically commits the transaction on success, rolls back on exception, and closes the connection when the block exits. This prevents connection leaks and orphaned transactions — two of the most common PostgreSQL headaches in production.
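The commit-on-success, rollback-on-failure behavior can be modeled in a few lines of plain Python. The class below is a toy illustration of the semantics only, not psycopg's actual implementation:

```python
# Toy model of psycopg's connection context-manager semantics
# (illustration only -- not how psycopg is actually implemented).
class ToyConnection:
    def __init__(self):
        self.state = "open"
        self.closed = False

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        if exc_type is None:
            self.state = "committed"    # block finished cleanly: commit
        else:
            self.state = "rolled back"  # an exception escaped: roll back
        self.closed = True              # the connection is closed either way
        return False                    # never swallow the exception
```

Whether the body succeeds or raises, the connection always ends up closed; only the commit-versus-rollback decision differs.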


CRUD Operations with psycopg

CREATE: Inserting Data

Inserting data uses cursor.execute() with parameterized queries. Always use %s placeholders — never f-strings or string concatenation. Parameterized queries prevent SQL injection and handle type conversion automatically.

# insert_data.py
import psycopg

with psycopg.connect("host=localhost dbname=testdb user=postgres password=postgres") as conn:
    with conn.cursor() as cur:
        cur.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id SERIAL PRIMARY KEY,
                name TEXT NOT NULL,
                email TEXT UNIQUE NOT NULL,
                age INTEGER
            )
        """)

        # Insert a single row with parameterized query
        cur.execute(
            "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)",
            ("Alice Chen", "alice@example.com", 28)
        )

        # Insert multiple rows efficiently with executemany
        new_users = [
            ("Bob Park", "bob@example.com", 34),
            ("Carol Smith", "carol@example.com", 22),
            ("Dave Wilson", "dave@example.com", 45),
        ]
        cur.executemany(
            "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)",
            new_users
        )

        conn.commit()
        print(f"Inserted {1 + len(new_users)} users successfully")

Output:

Inserted 4 users successfully

The executemany() method is cleaner than looping with individual execute() calls, and psycopg optimizes it internally. For truly large batches (thousands of rows), look into cursor.copy() which uses PostgreSQL’s COPY protocol and is dramatically faster.
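For illustration, here is a hedged sketch of that COPY approach wrapping cursor.copy(). It assumes the users table from above and a live connection, and bulk_insert_users is an illustrative helper name, not a psycopg API:

```python
# Hedged sketch: bulk-loading with PostgreSQL's COPY protocol via psycopg.
# Assumes the users(name, email, age) table from this article and a live
# connection; bulk_insert_users is an illustrative helper, not a psycopg API.
def bulk_insert_users(conn, rows):
    """Stream rows into users via COPY (much faster than executemany for big batches)."""
    with conn.cursor() as cur:
        with cur.copy("COPY users (name, email, age) FROM STDIN") as copy:
            for row in rows:
                copy.write_row(row)  # psycopg converts each tuple to COPY format
    conn.commit()
```

Unlike executemany(), which still issues one INSERT per row on older servers, COPY streams the whole batch over a single protocol message exchange.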

READ: Querying Data

Reading data involves executing a SELECT query and fetching results. psycopg gives you several fetch options depending on how much data you expect.

# read_data.py
import psycopg

with psycopg.connect("host=localhost dbname=testdb user=postgres password=postgres") as conn:
    with conn.cursor() as cur:
        # Fetch all rows
        cur.execute("SELECT id, name, email, age FROM users ORDER BY name")
        all_users = cur.fetchall()
        print("All users:")
        for user in all_users:
            print(f"  {user[0]}: {user[1]} ({user[2]}), age {user[3]}")

        # Fetch one row
        cur.execute("SELECT name, age FROM users WHERE email = %s", ("alice@example.com",))
        alice = cur.fetchone()
        print(f"\nFound: {alice[0]}, age {alice[1]}")

        # Use a dedicated cursor with a row factory for named columns
        with conn.cursor(row_factory=psycopg.rows.dict_row) as dict_cur:
            dict_cur.execute("SELECT name, email, age FROM users WHERE age > %s", (25,))
            older_users = dict_cur.fetchall()
            print("\nUsers over 25:")
            for u in older_users:
                print(f"  {u['name']}: {u['email']}, age {u['age']}")

Output:

All users:
  1: Alice Chen (alice@example.com), age 28
  2: Bob Park (bob@example.com), age 34
  3: Carol Smith (carol@example.com), age 22
  4: Dave Wilson (dave@example.com), age 45

Found: Alice Chen, age 28

Users over 25:
  Alice Chen: alice@example.com, age 28
  Bob Park: bob@example.com, age 34
  Dave Wilson: dave@example.com, age 45

The dict_row row factory is a game-changer for readability. Instead of accessing columns by index (row[0], row[1]), you use names (row['name'], row['email']). This makes your code self-documenting and resilient to column order changes.

UPDATE: Modifying Data

Updates follow the same parameterized pattern. The rowcount attribute tells you how many rows were affected.

# update_data.py
import psycopg

with psycopg.connect("host=localhost dbname=testdb user=postgres password=postgres") as conn:
    with conn.cursor() as cur:
        # Update a single user
        cur.execute(
            "UPDATE users SET age = %s WHERE email = %s",
            (29, "alice@example.com")
        )
        print(f"Updated {cur.rowcount} row(s)")

        # Update multiple rows with a condition
        cur.execute(
            "UPDATE users SET age = age + 1 WHERE age < %s",
            (30,)
        )
        print(f"Birthday bump: {cur.rowcount} user(s) aged up")

        conn.commit()

Output:

Updated 1 row(s)
Birthday bump: 2 user(s) aged up

Always check cur.rowcount after updates and deletes. If it is 0 when you expected changes, your WHERE clause is probably wrong -- and catching that early saves hours of debugging.
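One way to act on that advice is to fail fast when rowcount comes back 0. The helper below is a sketch; update_user_age and the choice of LookupError are illustrative, not an established pattern:

```python
# Sketch: turn a surprising rowcount of 0 into an immediate, loud failure.
# The helper name and exception type are illustrative choices.
def update_user_age(cur, email, age):
    cur.execute("UPDATE users SET age = %s WHERE email = %s", (age, email))
    if cur.rowcount == 0:
        raise LookupError(f"no user with email {email!r}")
```

A wrong WHERE clause then surfaces as a stack trace at the call site instead of a silent no-op.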

DELETE: Removing Data

Deletes work the same way. Be cautious with DELETE statements -- a missing WHERE clause deletes everything in the table.

# delete_data.py
import psycopg

with psycopg.connect("host=localhost dbname=testdb user=postgres password=postgres") as conn:
    with conn.cursor() as cur:
        # Delete a specific user
        cur.execute(
            "DELETE FROM users WHERE email = %s",
            ("dave@example.com",)
        )
        print(f"Deleted {cur.rowcount} user(s)")

        # Verify the deletion
        cur.execute("SELECT COUNT(*) FROM users")
        count = cur.fetchone()[0]
        print(f"Remaining users: {count}")

        conn.commit()

Output:

Deleted 1 user(s)
Remaining users: 3

Error Handling

Database operations fail in predictable ways -- duplicate keys, connection drops, malformed queries. psycopg raises specific exception types for each, so you can handle them precisely.

# error_handling.py
import psycopg
from psycopg import errors

conn_string = "host=localhost dbname=testdb user=postgres password=postgres"

try:
    with psycopg.connect(conn_string) as conn:
        with conn.cursor() as cur:
            # This will fail if email already exists (UNIQUE constraint)
            cur.execute(
                "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)",
                ("Alice Chen", "alice@example.com", 28)
            )
            conn.commit()
except errors.UniqueViolation as e:
    print(f"Duplicate entry: {e.diag.message_detail}")
except errors.OperationalError as e:
    print(f"Connection problem: {e}")
except errors.ProgrammingError as e:
    print(f"SQL error: {e}")
except Exception as e:
    print(f"Unexpected error: {type(e).__name__}: {e}")

Output:

Duplicate entry: Key (email)=(alice@example.com) already exists.

The psycopg.errors module maps every PostgreSQL error code to a Python exception class. UniqueViolation, ForeignKeyViolation, CheckViolation -- they are all there. This lets you show users a friendly "email already taken" message instead of a raw database error.
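Building on that, you can map SQLSTATE codes to user-facing messages; psycopg exceptions expose the code via their sqlstate attribute. The dictionary and wording below are illustrative, though the codes themselves are standard PostgreSQL error codes:

```python
# Illustrative mapping from SQLSTATE codes to user-facing messages.
# The codes are standard PostgreSQL error codes; the wording is made up.
FRIENDLY_ERRORS = {
    "23505": "That value is already taken.",   # unique_violation
    "23503": "A related record is missing.",   # foreign_key_violation
    "23514": "That value is not allowed.",     # check_violation
}

def friendly_message(exc):
    """psycopg exceptions carry their SQLSTATE code in the .sqlstate attribute."""
    code = getattr(exc, "sqlstate", None)
    return FRIENDLY_ERRORS.get(code, "An unexpected database error occurred.")
```

This keeps presentation concerns out of your except blocks: catch broadly, then translate once.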

Connection Pooling

Creating a new database connection for every request is slow (each connection involves a TCP handshake, authentication, and memory allocation on the server). Connection pooling solves this by maintaining a set of open connections that get reused across requests.

# connection_pool.py
from psycopg_pool import ConnectionPool

# Create a pool with min 2, max 10 connections
pool = ConnectionPool(
    "host=localhost dbname=testdb user=postgres password=postgres",
    min_size=2,
    max_size=10
)

# Use connections from the pool
with pool.connection() as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT COUNT(*) FROM users")
        count = cur.fetchone()[0]
        print(f"User count: {count}")

# The connection is returned to the pool, not closed
with pool.connection() as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT name FROM users LIMIT 1")
        name = cur.fetchone()[0]
        print(f"First user: {name}")

# Get pool stats
stats = pool.get_stats()
print(f"Pool size: {stats['pool_size']}, available: {stats['pool_available']}")

pool.close()

Output:

User count: 3
First user: Alice Chen
Pool size: 2, available: 2

In a web application (Flask, FastAPI, Django), you would create the pool once at startup and share it across all request handlers. This dramatically reduces latency since connections are reused instead of created fresh for every HTTP request. The max_size parameter prevents your application from overwhelming the database with too many simultaneous connections.
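To make the reuse idea concrete, here is a toy pool built on queue.Queue. It is a teaching sketch only -- psycopg_pool's real implementation adds timeouts, health checks, and dynamic resizing:

```python
# Toy pool showing the reuse idea (NOT psycopg_pool's implementation):
# open connections up front, hand them out, and take them back instead
# of closing them.
import queue
from contextlib import contextmanager

class ToyPool:
    def __init__(self, connect, size):
        self._conns = queue.Queue()
        for _ in range(size):
            self._conns.put(connect())   # open all connections up front

    @contextmanager
    def connection(self):
        conn = self._conns.get()         # borrow an open connection
        try:
            yield conn
        finally:
            self._conns.put(conn)        # return it to the pool, never close
```

Two sequential `with pool.connection()` blocks hand back the very same connection object, which is exactly the latency win pooling buys you.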


Working with Transactions

By default, psycopg wraps every operation in a transaction. The context manager commits on success and rolls back on failure. But sometimes you need more control -- for example, when multiple operations must succeed or fail together.

# transactions.py
import psycopg

conn_string = "host=localhost dbname=testdb user=postgres password=postgres"

with psycopg.connect(conn_string) as conn:
    # Explicit transaction control
    try:
        with conn.transaction():
            with conn.cursor() as cur:
                # Both operations must succeed
                cur.execute(
                    "UPDATE users SET age = age - 1 WHERE name = %s",
                    ("Alice Chen",)
                )
                cur.execute(
                    "UPDATE users SET age = age + 1 WHERE name = %s",
                    ("Bob Park",)
                )
                print("Both updates committed together")
    except Exception as e:
        print(f"Transaction rolled back: {e}")

    # Nested savepoints
    with conn.transaction():
        with conn.cursor() as cur:
            cur.execute("INSERT INTO users (name, email, age) VALUES (%s, %s, %s)",
                        ("Eve Brown", "eve@example.com", 31))
            try:
                with conn.transaction():
                    # Same email again: violates the UNIQUE constraint
                    cur.execute("INSERT INTO users (name, email, age) VALUES (%s, %s, %s)",
                                ("Eve Brown", "eve@example.com", 31))
                    # This inner transaction can fail without killing the outer one
            except Exception:
                print("Inner savepoint rolled back, outer transaction continues")
        print("Eve inserted successfully")

Output:

Both updates committed together
Inner savepoint rolled back, outer transaction continues
Eve inserted successfully

The conn.transaction() context manager creates a savepoint when nested. This is incredibly useful for "try this, but if it fails, keep going" patterns -- common in data import pipelines where you want to skip bad rows without losing the entire batch.
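That skip-bad-rows pipeline might look like the sketch below: one outer transaction for the batch, one savepoint per row. The import_rows helper and the users table are assumptions for illustration:

```python
# Sketch of a savepoint-per-row import loop. import_rows and the users
# table are illustrative assumptions, not an established psycopg recipe.
def import_rows(conn, rows):
    """Insert rows one by one, skipping failures without losing the batch."""
    imported = 0
    with conn.transaction():                  # outer transaction for the batch
        with conn.cursor() as cur:
            for row in rows:
                try:
                    with conn.transaction():  # nested: a savepoint per row
                        cur.execute(
                            "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)",
                            row,
                        )
                    imported += 1
                except Exception:
                    pass                      # bad row rolled back to its savepoint
    return imported
```

A failed row only rolls back to its own savepoint; everything inserted before it still commits when the outer transaction exits.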

Real-Life Example: Building a Task Manager CLI

Let us tie everything together with a complete task manager that stores tasks in PostgreSQL. This project uses connection pooling, parameterized queries, error handling, and all four CRUD operations.

# task_manager.py
import psycopg
from psycopg_pool import ConnectionPool
from datetime import datetime

DB_URL = "host=localhost dbname=testdb user=postgres password=postgres"

def setup_database(pool):
    """Create the tasks table if it does not exist."""
    with pool.connection() as conn:
        with conn.cursor() as cur:
            cur.execute("""
                CREATE TABLE IF NOT EXISTS tasks (
                    id SERIAL PRIMARY KEY,
                    title TEXT NOT NULL,
                    description TEXT DEFAULT '',
                    status TEXT DEFAULT 'pending',
                    created_at TIMESTAMP DEFAULT NOW(),
                    completed_at TIMESTAMP
                )
            """)
            conn.commit()

def add_task(pool, title, description=""):
    """Add a new task and return its ID."""
    with pool.connection() as conn:
        with conn.cursor() as cur:
            cur.execute(
                "INSERT INTO tasks (title, description) VALUES (%s, %s) RETURNING id",
                (title, description)
            )
            task_id = cur.fetchone()[0]
            conn.commit()
            return task_id

def list_tasks(pool, status_filter=None):
    """List tasks, optionally filtered by status."""
    with pool.connection() as conn:
        with conn.cursor(row_factory=psycopg.rows.dict_row) as cur:
            if status_filter:
                cur.execute(
                    "SELECT id, title, status, created_at FROM tasks WHERE status = %s ORDER BY created_at",
                    (status_filter,)
                )
            else:
                cur.execute("SELECT id, title, status, created_at FROM tasks ORDER BY created_at")
            return cur.fetchall()

def complete_task(pool, task_id):
    """Mark a task as completed."""
    with pool.connection() as conn:
        with conn.cursor() as cur:
            cur.execute(
                "UPDATE tasks SET status = %s, completed_at = %s WHERE id = %s",
                ("completed", datetime.now(), task_id)
            )
            conn.commit()
            return cur.rowcount > 0

def delete_task(pool, task_id):
    """Delete a task by ID."""
    with pool.connection() as conn:
        with conn.cursor() as cur:
            cur.execute("DELETE FROM tasks WHERE id = %s", (task_id,))
            conn.commit()
            return cur.rowcount > 0

# Demo usage
pool = ConnectionPool(DB_URL, min_size=2, max_size=5)
setup_database(pool)

# Add some tasks
id1 = add_task(pool, "Learn psycopg", "Complete the PostgreSQL tutorial")
id2 = add_task(pool, "Build REST API", "Create FastAPI endpoints for tasks")
id3 = add_task(pool, "Write tests", "Add pytest coverage for database layer")
print(f"Created tasks: {id1}, {id2}, {id3}")

# List all tasks
print("\nAll tasks:")
for task in list_tasks(pool):
    print(f"  [{task['status']}] #{task['id']}: {task['title']}")

# Complete a task
complete_task(pool, id1)
print(f"\nCompleted task #{id1}")

# List pending tasks only
print("\nPending tasks:")
for task in list_tasks(pool, "pending"):
    print(f"  #{task['id']}: {task['title']}")

# Delete a task
delete_task(pool, id3)
print(f"\nDeleted task #{id3}")

# Final count
print(f"\nTotal tasks remaining: {len(list_tasks(pool))}")

pool.close()

Output:

Created tasks: 1, 2, 3

All tasks:
  [pending] #1: Learn psycopg
  [pending] #2: Build REST API
  [pending] #3: Write tests

Completed task #1

Pending tasks:
  #2: Build REST API
  #3: Write tests

Deleted task #3

Total tasks remaining: 2

This task manager demonstrates every concept from the article: connecting with a pool, parameterized queries for safety, dict_row for readable results, RETURNING clauses for getting generated IDs, and proper transaction handling. You could extend this into a full web application by wrapping these functions in FastAPI or Flask endpoints.

Frequently Asked Questions

Should I use psycopg2 or psycopg (v3)?

For new projects, always use psycopg v3 (installed as pip install psycopg). It has better async support, built-in connection pooling, a cleaner API, and is actively developed. psycopg2 is in maintenance mode -- it still works, but new features and improvements only land in v3. The migration is straightforward since the core concepts (parameterized queries, cursors, context managers) are the same.

How do I prevent SQL injection with psycopg?

Always use parameterized queries with %s placeholders: cur.execute("SELECT * FROM users WHERE id = %s", (user_id,)). Never use f-strings, string concatenation, or format() to build SQL. psycopg handles escaping and type conversion automatically, making injection impossible as long as you use placeholders consistently.
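To see why placeholders matter, consider what string formatting does with hostile input:

```python
# What happens when you build SQL with an f-string and hostile input
user_input = "x' OR '1'='1"
unsafe = f"SELECT * FROM users WHERE name = '{user_input}'"
print(unsafe)  # SELECT * FROM users WHERE name = 'x' OR '1'='1'
# The WHERE clause is now always true; with a %s placeholder, the same
# input would be treated as a literal (and merely odd-looking) name.
```

With execute("... WHERE name = %s", (user_input,)), psycopg sends the value separately from the SQL, so no input can rewrite the query.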

Can I use psycopg with async/await?

Yes. psycopg v3 has a built-in async module: from psycopg import AsyncConnection. Use await AsyncConnection.connect() and await cursor.execute(). It works with asyncio, FastAPI, and any other async framework. The async connection pool is AsyncConnectionPool from psycopg_pool.
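A minimal async sketch of that API follows. It needs a running PostgreSQL to actually execute, and the psycopg import is deferred into the function so the snippet parses even where psycopg is not installed:

```python
# Hedged sketch of psycopg's async API; requires a running PostgreSQL
# server to actually execute.
import asyncio

async def fetch_server_version(dsn):
    import psycopg  # deferred so the sketch parses without psycopg installed
    async with await psycopg.AsyncConnection.connect(dsn) as aconn:
        async with aconn.cursor() as cur:
            await cur.execute("SELECT version()")
            return await cur.fetchone()

# asyncio.run(fetch_server_version("postgresql://postgres:postgres@localhost/testdb"))
```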

How many connections should my pool have?

A good starting point is min_size=2, max_size=10 for small applications. A widely cited formula from the PostgreSQL wiki suggests connections = (core_count * 2) + effective_spindle_count. In practice, most web applications work well with 10-20 connections in the pool. Monitor your PostgreSQL pg_stat_activity view to see actual connection usage and tune from there.
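Plugging sample numbers into that formula (8 cores and 2 spindles are made-up values for the example):

```python
# Illustrative pool-sizing arithmetic; 8 cores / 2 spindles are example values.
core_count = 8
effective_spindle_count = 2
max_connections = (core_count * 2) + effective_spindle_count
print(max_connections)  # 18
```

Treat the result as a ceiling to tune downward from, not a target to fill.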

How do I store database credentials securely?

Never hardcode credentials in your source code. Use environment variables (os.environ['DATABASE_URL']), a .env file loaded with python-dotenv, or a secrets manager (AWS Secrets Manager, HashiCorp Vault). PostgreSQL also supports a ~/.pgpass file for local development. For connection strings, the standard DATABASE_URL environment variable works with most frameworks and deployment platforms.
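A common minimal version of that pattern reads the DSN from the environment with a local-development fallback; the fallback value below is illustrative:

```python
# Load the DSN from the environment, falling back to a local development
# value (the fallback string here is illustrative).
import os

dsn = os.environ.get(
    "DATABASE_URL",
    "postgresql://postgres:postgres@localhost:5432/testdb",
)
# Pass the DSN straight to psycopg.connect(dsn) or ConnectionPool(dsn).
```

In production the DATABASE_URL variable comes from your platform or secrets manager, and the fallback never fires.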

Conclusion

You now have a solid foundation for connecting Python to PostgreSQL with psycopg. We covered the essential workflow: installing psycopg[binary], establishing connections with context managers, running all four CRUD operations with parameterized queries, handling database errors gracefully, and using connection pooling for production performance. The task manager project ties all these concepts into a practical, extensible application.

From here, try extending the task manager with features like priority levels, due dates, or full-text search using PostgreSQL's tsvector type. Psycopg handles all of these naturally since it passes your SQL through to PostgreSQL without limiting which features you can use.

For the complete API reference and advanced topics like COPY operations, async usage, and custom type adapters, check the official psycopg documentation at www.psycopg.org/psycopg3/docs/.