Intermediate

If you have ever built a chat application, a live dashboard, or a multiplayer game, you know that regular HTTP requests fall short when you need real-time, bidirectional communication. Polling the server every few seconds wastes bandwidth and adds latency. WebSockets solve this by keeping a persistent connection open between client and server, allowing both sides to send messages at any time.

Python’s websockets library makes building WebSocket servers and clients remarkably straightforward. It is built on top of asyncio, so it integrates naturally with Python’s async ecosystem. You only need to install one package — pip install websockets — and you are ready to go.

In this tutorial, you will learn how to create a WebSocket server, build a client that connects to it, implement broadcast messaging for chat-style applications, handle connection lifecycle events, and add basic authentication. By the end, you will have a working real-time chat server that you can extend for your own projects.

WebSocket Echo Server: Quick Example

Here is the simplest possible WebSocket server — it echoes back whatever the client sends:

# echo_server.py
import asyncio
import websockets

async def echo(websocket):
    async for message in websocket:
        print(f"Received: {message}")
        await websocket.send(f"Echo: {message}")

async def main():
    async with websockets.serve(echo, "localhost", 8765):
        print("Echo server running on ws://localhost:8765")
        await asyncio.Future()  # Run forever

asyncio.run(main())

Output (server):

Echo server running on ws://localhost:8765
Received: Hello WebSocket!
Received: How are you?

The websockets.serve() function starts a server that calls the echo handler for every new connection. The async for message in websocket loop reads messages until the connection closes: it ends quietly after a normal close and raises websockets.ConnectionClosedError if the connection drops abnormally. We will build on this pattern throughout the tutorial.

What Are WebSockets and Why Use Them?

WebSocket is a communication protocol that provides a full-duplex (two-way) channel over a single TCP connection. Unlike HTTP, where the client must initiate every exchange, WebSocket lets either side send data at any time after the initial handshake.

The WebSocket protocol starts with an HTTP upgrade request. The client sends a regular HTTP request with an Upgrade: websocket header, and if the server agrees, the connection is upgraded from HTTP to WebSocket. From that point on, both sides communicate using lightweight WebSocket frames instead of full HTTP requests.
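To make the upgrade handshake more concrete, here is a minimal sketch of one detail the paragraph above does not cover: per RFC 6455, the client sends a random Sec-WebSocket-Key header and the server proves it understood the upgrade by replying with a derived Sec-WebSocket-Accept value. The function name compute_accept_key is an illustrative assumption; the websockets library performs this step for you.

```python
import base64
import hashlib

# Fixed GUID that RFC 6455 defines for the opening handshake.
WEBSOCKET_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"


def compute_accept_key(sec_websocket_key: str) -> str:
    """Derive the Sec-WebSocket-Accept value from a client's Sec-WebSocket-Key."""
    # Concatenate key and GUID, hash with SHA-1, then base64-encode the digest.
    digest = hashlib.sha1((sec_websocket_key + WEBSOCKET_GUID).encode("ascii")).digest()
    return base64.b64encode(digest).decode("ascii")


if __name__ == "__main__":
    # Sample key taken from the example handshake in RFC 6455.
    print(compute_accept_key("dGhlIHNhbXBsZSBub25jZQ=="))
```

You never need to call anything like this in application code; it only shows that the handshake is ordinary HTTP plus one small hash computation.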

| Feature | HTTP | WebSocket |
|---|---|---|
| Direction | Client-initiated request/response | Bidirectional |
| Connection | New or reused connection per request | Persistent single connection |
| Overhead | Headers on every request | Minimal frame overhead (2-14 bytes) |
| Latency | Round-trip per request | Near real-time |
| Use case | REST APIs, page loads | Chat, live feeds, gaming |

Use WebSockets when you need low-latency, real-time updates. Use HTTP when you need stateless request-response patterns (like REST APIs).
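The 2-14 byte frame overhead in the table follows from the frame layout in RFC 6455. The sketch below computes the header size for a given payload; frame_header_size is a hypothetical helper written for this explanation, not part of the websockets library.

```python
def frame_header_size(payload_length: int, masked: bool) -> int:
    """Return the WebSocket frame header size in bytes for one frame."""
    size = 2  # byte 1: FIN flag and opcode; byte 2: mask bit and 7-bit length
    if payload_length > 65535:
        size += 8  # 64-bit extended payload length
    elif payload_length > 125:
        size += 2  # 16-bit extended payload length
    if masked:
        size += 4  # masking key, required on client-to-server frames
    return size


if __name__ == "__main__":
    print(frame_header_size(5, masked=False))      # small server-to-client frame
    print(frame_header_size(70000, masked=True))   # large client-to-server frame
```

The smallest case is a short unmasked frame from the server (2 bytes) and the largest is a masked frame carrying more than 65,535 bytes (14 bytes).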

Setting Up the websockets Library

Install the library with pip:

# install.sh
pip install websockets

Output:

Successfully installed websockets-13.1

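Version 13.1, shown above, requires Python 3.8 or later; newer releases may require a newer Python, so check the release notes for the version you install. The library has no third-party dependencies and relies only on the standard library, including asyncio, which keeps your dependency tree clean.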
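If you want to confirm which version ended up in your environment, a small check with the standard library's importlib.metadata is one option. The helper name installed_version is an assumption made for this sketch.

```python
from importlib import metadata


def installed_version(package: str):
    """Return the installed version string of a package, or None if it is missing."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    print(installed_version("websockets") or "websockets is not installed")
```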

Building a WebSocket Client

To test our servers, we need a client. Here is a simple interactive client that sends messages and prints responses:

# client.py
import asyncio
import websockets

async def chat_client():
    uri = "ws://localhost:8765"
    async with websockets.connect(uri) as websocket:
        print("Connected to server. Type messages (Ctrl+C to quit):")
        
        # Send a greeting
        await websocket.send("Hello from Python client!")
        response = await websocket.recv()
        print(f"Server says: {response}")
        
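        # Interactive loop. input() blocks, so run it in a worker thread;
        # otherwise the event loop stalls and keepalive pings go unanswered.
        # After Ctrl+C you may need to press Enter to release the prompt.
        loop = asyncio.get_running_loop()
        while True:
            message = await loop.run_in_executor(None, input, "> ")
            await websocket.send(message)
            response = await websocket.recv()
            print(f"Server says: {response}")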

asyncio.run(chat_client())

Output:

Connected to server. Type messages (Ctrl+C to quit):
Server says: Echo: Hello from Python client!
> Testing 123
Server says: Echo: Testing 123

The websockets.connect() context manager handles the connection lifecycle automatically. When you exit the async with block (or the program ends), the connection closes cleanly with a proper WebSocket close handshake.

Building a Broadcast Chat Server

A real chat server needs to broadcast messages from one client to all connected clients. This requires tracking active connections in a set:

# chat_server.py
import asyncio
import websockets
import json
from datetime import datetime

connected_clients = set()

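async def broadcast(message, sender=None):
    """Send a message to all connected clients except the sender."""
    disconnected = set()
    # Iterate over a snapshot: the set can change while we await send().
    for client in list(connected_clients):
        if client != sender:
            try:
                await client.send(message)
            except websockets.ConnectionClosed:
                disconnected.add(client)
    # Clean up in place. Writing `connected_clients -= disconnected` here
    # would make the name local to this function and raise UnboundLocalError.
    connected_clients.difference_update(disconnected)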

async def chat_handler(websocket):
    """Handle a single client connection."""
    # Register client
    connected_clients.add(websocket)
    client_id = f"User-{id(websocket) % 1000}"
    print(f"{client_id} connected. Total clients: {len(connected_clients)}")
    
    # Notify others
    join_msg = json.dumps({
        "type": "system",
        "message": f"{client_id} joined the chat",
        "timestamp": datetime.now().isoformat()
    })
    await broadcast(join_msg, sender=websocket)
    
    try:
        async for message in websocket:
            # Wrap message with metadata
            chat_msg = json.dumps({
                "type": "chat",
                "sender": client_id,
                "message": message,
                "timestamp": datetime.now().isoformat()
            })
            print(f"{client_id}: {message}")
            await broadcast(chat_msg, sender=websocket)
    except websockets.ConnectionClosed:
        pass
    finally:
        # Unregister client
        connected_clients.discard(websocket)
        leave_msg = json.dumps({
            "type": "system",
            "message": f"{client_id} left the chat",
            "timestamp": datetime.now().isoformat()
        })
        await broadcast(leave_msg)
        print(f"{client_id} disconnected. Total clients: {len(connected_clients)}")

async def main():
    async with websockets.serve(chat_handler, "localhost", 8765):
        print("Chat server running on ws://localhost:8765")
        await asyncio.Future()

asyncio.run(main())

Output (server with two clients):

Chat server running on ws://localhost:8765
User-142 connected. Total clients: 1
User-857 connected. Total clients: 2
User-142: Hello everyone!
User-857: Hey there!
User-142 disconnected. Total clients: 1

The connected_clients set tracks all active connections. When a client sends a message, broadcast() forwards it to every other connected client. The try/finally block ensures we clean up the client set even if the connection drops unexpectedly.
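The broadcast loop above sends to one client at a time, so a slow client delays everyone after it. One possible variation, sketched here under stated assumptions, sends to all clients concurrently with asyncio.gather and collects the ones that failed. FakeClient and broadcast_concurrently are hypothetical names used only so the example runs without a network; a real handler would pass websocket connections instead.

```python
import asyncio


class FakeClient:
    """Hypothetical stand-in for a WebSocket connection (not part of websockets)."""

    def __init__(self, name, broken=False):
        self.name = name
        self.broken = broken
        self.inbox = []

    async def send(self, message):
        if self.broken:
            raise ConnectionError(f"{self.name} is gone")
        self.inbox.append(message)


async def broadcast_concurrently(clients, message, sender=None):
    """Send to every client except sender; return the clients whose send failed."""
    targets = [c for c in clients if c is not sender]  # snapshot before awaiting
    results = await asyncio.gather(
        *(c.send(message) for c in targets), return_exceptions=True
    )
    return {c for c, r in zip(targets, results) if isinstance(r, Exception)}


async def demo():
    alice, bob = FakeClient("alice"), FakeClient("bob")
    carol = FakeClient("carol", broken=True)
    clients = {alice, bob, carol}
    failed = await broadcast_concurrently(clients, "hi", sender=alice)
    clients -= failed  # drop clients that could not be reached
    return alice.inbox, bob.inbox, {c.name for c in failed}


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The sender receives nothing, the healthy client receives the message, and the broken client is reported back so the caller can remove it from the set.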

Handling Connection Errors Gracefully

Real-world WebSocket connections drop unexpectedly due to network issues, client crashes, or timeouts. Robust error handling is essential:

# robust_server.py
import asyncio
import websockets
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("websocket-server")

async def robust_handler(websocket):
    """Handler with comprehensive error management."""
    remote = websocket.remote_address
    logger.info(f"New connection from {remote}")
    
    try:
        # Keepalive pings are configured in serve() below; this loop only
        # validates and processes incoming messages
        async for message in websocket:
            if len(message) > 10000:
                await websocket.send("Error: Message too long (max 10000 chars)")
                continue
            
            await websocket.send(f"Processed: {message}")
            
    except websockets.ConnectionClosedError as e:
        logger.warning(f"Connection closed with error: {e.code} {e.reason}")
    except websockets.ConnectionClosedOK:
        logger.info(f"Connection closed normally from {remote}")
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
    finally:
        logger.info(f"Cleanup complete for {remote}")

async def main():
    async with websockets.serve(
        robust_handler,
        "localhost",
        8765,
        ping_interval=20,     # Send ping every 20 seconds
        ping_timeout=10,      # Wait 10 seconds for pong
        max_size=2**20,       # Max message size: 1MB
        close_timeout=5       # Wait 5 seconds for close handshake
    ):
        logger.info("Robust server running on ws://localhost:8765")
        await asyncio.Future()

asyncio.run(main())

Output:

INFO:websocket-server:Robust server running on ws://localhost:8765
INFO:websocket-server:New connection from ('127.0.0.1', 54321)
WARNING:websocket-server:Connection closed with error: 1006 
INFO:websocket-server:Cleanup complete for ('127.0.0.1', 54321)

The ping_interval and ping_timeout parameters enable automatic keepalive detection. If a client stops responding to pings within the timeout, the server closes the connection and triggers cleanup. The max_size parameter prevents clients from sending oversized messages that could exhaust server memory.
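The keepalive idea can be illustrated without the library: send a ping, then wait a bounded time for the matching pong. The sketch below models the pong as a plain asyncio future; pong_arrived and demo are hypothetical names, and the real library runs this logic internally when ping_interval and ping_timeout are set.

```python
import asyncio


async def pong_arrived(pong_waiter, ping_timeout):
    """Return True if the pong future resolves within ping_timeout seconds."""
    try:
        await asyncio.wait_for(pong_waiter, timeout=ping_timeout)
        return True
    except asyncio.TimeoutError:
        return False


async def demo():
    loop = asyncio.get_running_loop()
    # A responsive peer: the pong future resolves shortly after the ping.
    healthy = loop.create_future()
    loop.call_later(0.01, healthy.set_result, None)
    # A dead peer: the pong future never resolves.
    dead = loop.create_future()
    return await pong_arrived(healthy, 0.5), await pong_arrived(dead, 0.05)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

If you ever need a manual check in a handler, await websocket.ping() returns an awaitable of this kind that completes when the pong arrives.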

Adding Basic Authentication

You can authenticate WebSocket connections using query parameters, headers, or an initial authentication message. Here is a token-based approach:

# auth_server.py
import asyncio
import websockets
import json
import secrets

# Valid tokens (in production, use a database)
VALID_TOKENS = {
    "demo-token-abc123": "alice",
    "demo-token-def456": "bob"
}

async def authenticate(websocket):
    """Authenticate the client using the first message."""
    try:
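        auth_msg = await asyncio.wait_for(websocket.recv(), timeout=5.0)
        try:
            data = json.loads(auth_msg)
        except ValueError:
            data = None  # Not valid JSON; treated as a failed login below
        token = data.get("token", "") if isinstance(data, dict) else ""
        # compare_digest() only accepts ASCII str (or bytes), so reject the rest
        if not isinstance(token, str) or not token.isascii():
            token = ""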
        
        for valid_token, username in VALID_TOKENS.items():
            if secrets.compare_digest(token, valid_token):
                await websocket.send(json.dumps({
                    "type": "auth", 
                    "status": "ok",
                    "username": username
                }))
                return username
        
        await websocket.send(json.dumps({
            "type": "auth",
            "status": "error", 
            "message": "Invalid token"
        }))
        return None
        
    except asyncio.TimeoutError:
        await websocket.send(json.dumps({
            "type": "auth",
            "status": "error",
            "message": "Authentication timeout"
        }))
        return None

async def secure_handler(websocket):
    """Handler that requires authentication."""
    username = await authenticate(websocket)
    if not username:
        await websocket.close(1008, "Authentication failed")
        return
    
    print(f"{username} authenticated successfully")
    
    async for message in websocket:
        response = f"[{username}] {message}"
        await websocket.send(response)

async def main():
    async with websockets.serve(secure_handler, "localhost", 8765):
        print("Secure server running on ws://localhost:8765")
        await asyncio.Future()

asyncio.run(main())

Output:

Secure server running on ws://localhost:8765
alice authenticated successfully

The server expects the first message to contain a JSON object with a token field. It uses secrets.compare_digest() for timing-safe comparison and gives the client 5 seconds to authenticate before timing out. If authentication fails, the connection is closed with WebSocket status code 1008 (Policy Violation).
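For completeness, here is a sketch of what a matching client might look like, assuming auth_server.py from above is running locally. The helper names build_auth_message, auth_succeeded, and login_and_send are assumptions made for this example. The websockets import sits inside the coroutine only so the two pure helpers can be used and tested without the library installed.

```python
import json


def build_auth_message(token):
    """Build the first message the server expects: a JSON object with a token."""
    return json.dumps({"token": token})


def auth_succeeded(reply):
    """Return True if the server's reply is an auth message with status ok."""
    try:
        data = json.loads(reply)
    except ValueError:
        return False
    return (
        isinstance(data, dict)
        and data.get("type") == "auth"
        and data.get("status") == "ok"
    )


async def login_and_send(uri, token, text):
    """Authenticate, send one message, and return the reply (None if login fails)."""
    import websockets  # third-party; imported lazily on purpose (see lead-in)

    async with websockets.connect(uri) as websocket:
        await websocket.send(build_auth_message(token))
        if not auth_succeeded(await websocket.recv()):
            return None
        await websocket.send(text)
        return await websocket.recv()
```

Calling asyncio.run(login_and_send("ws://localhost:8765", "demo-token-abc123", "hello")) against the server above should return the reply the handler builds for alice, and None for an unknown token.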

Real-Life Example: Live Notification System

Let us build a notification system where a server pushes real-time alerts to connected clients based on their subscribed topics:

# notification_server.py
import asyncio
import websockets
import json
from datetime import datetime
from collections import defaultdict

class NotificationServer:
    """Real-time notification server with topic subscriptions."""
    
    def __init__(self):
        self.subscribers = defaultdict(set)  # topic -> set of websockets
        self.clients = {}  # websocket -> client info
    
    async def handle_subscribe(self, websocket, topics):
        """Subscribe a client to one or more topics."""
        for topic in topics:
            self.subscribers[topic].add(websocket)
        self.clients[websocket]["topics"] = topics
        await websocket.send(json.dumps({
            "type": "subscribed",
            "topics": topics
        }))
    
    async def notify(self, topic, message):
        """Send a notification to all subscribers of a topic."""
        if topic not in self.subscribers:
            return 0
        
        notification = json.dumps({
            "type": "notification",
            "topic": topic,
            "message": message,
            "timestamp": datetime.now().isoformat()
        })
        
        sent = 0
        disconnected = set()
        # Iterate over a snapshot: a client may unsubscribe while we await send()
        for client in list(self.subscribers[topic]):
            try:
                await client.send(notification)
                sent += 1
            except websockets.ConnectionClosed:
                disconnected.add(client)
        
        # Cleanup
        self.subscribers[topic] -= disconnected
        return sent
    
    async def handler(self, websocket):
        """Main connection handler."""
        self.clients[websocket] = {"topics": [], "connected": datetime.now()}
        
        try:
            async for raw_message in websocket:
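                try:
                    data = json.loads(raw_message)
                except ValueError:
                    continue  # Ignore messages that are not valid JSON
                if not isinstance(data, dict):
                    continue  # Ignore JSON that is not an object
                action = data.get("action")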
                
                if action == "subscribe":
                    await self.handle_subscribe(
                        websocket, data.get("topics", [])
                    )
                elif action == "publish":
                    topic = data.get("topic")
                    message = data.get("message")
                    count = await self.notify(topic, message)
                    await websocket.send(json.dumps({
                        "type": "published",
                        "topic": topic,
                        "recipients": count
                    }))
        except websockets.ConnectionClosed:
            pass
        finally:
            # Remove from all subscriptions
            for topic_subs in self.subscribers.values():
                topic_subs.discard(websocket)
            self.clients.pop(websocket, None)

server = NotificationServer()

async def main():
    async with websockets.serve(server.handler, "localhost", 8765):
        print("Notification server running on ws://localhost:8765")
        
        # Simulate periodic system notifications
        async def system_alerts():
            while True:
                await asyncio.sleep(30)
                await server.notify("system", "Heartbeat check - all systems normal")
        
        await asyncio.gather(
            asyncio.Future(),  # Run forever
            system_alerts()
        )

asyncio.run(main())

Output:

Notification server running on ws://localhost:8765

This server supports topic-based subscriptions. Clients subscribe to topics like “alerts”, “updates”, or “system”, and only receive notifications for their subscribed topics. The notify() method handles broadcasting to subscribers and automatically cleans up disconnected clients. You could extend this with persistent message queues, delivery confirmation, or priority levels.
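The wire protocol this server speaks is easiest to see from the client's side. The sketch below builds the two request types the handler understands and summarizes the three reply types it sends; subscribe_request, publish_request, and describe are hypothetical helper names, not part of the server above.

```python
import json


def subscribe_request(topics):
    """Build the message that subscribes the client to a list of topics."""
    return json.dumps({"action": "subscribe", "topics": list(topics)})


def publish_request(topic, message):
    """Build the message that publishes a notification to one topic."""
    return json.dumps({"action": "publish", "topic": topic, "message": message})


def describe(raw):
    """Turn a server message into a one-line summary for display."""
    data = json.loads(raw)
    kind = data.get("type")
    if kind == "subscribed":
        return "subscribed to " + ", ".join(data["topics"])
    if kind == "notification":
        return f"[{data['topic']}] {data['message']}"
    if kind == "published":
        return f"delivered to {data['recipients']} subscriber(s) of {data['topic']}"
    return f"unknown message type: {kind!r}"


if __name__ == "__main__":
    print(subscribe_request(["alerts", "system"]))
    print(publish_request("alerts", "Disk usage above 90%"))
```

A client would send these strings with websocket.send() and pass each incoming message to describe() before printing it.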

Frequently Asked Questions

How many concurrent connections can a Python WebSocket server handle?

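As a rough estimate, a single Python process using asyncio can handle 10,000-50,000 concurrent WebSocket connections, but the practical limit depends on message frequency, the work done per message, available memory, and operating system limits such as open file descriptors. The figure of roughly 10KB per connection is a ballpark that varies with buffer and compression settings, so measure with your own workload. For higher scale, run multiple processes behind a load balancer such as Nginx.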

How do I add TLS/SSL encryption (wss://)?

Create a context with ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER), load your certificate with ssl_context.load_cert_chain(certfile, keyfile), and pass the context to websockets.serve() as the ssl argument. Clients then connect with a wss:// URI. In production, a common alternative is to let a reverse proxy such as Nginx handle TLS termination.
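A minimal sketch of the context setup might look like this. The function name make_tls_context and the certificate paths are placeholders; the minimum TLS version is an extra hardening choice that the answer above does not mention.

```python
import ssl


def make_tls_context(certfile, keyfile):
    """Create a server-side TLS context from a certificate chain and private key."""
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # Assumption: refuse anything older than TLS 1.2.
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    # Raises an OSError subclass if either file is missing or unreadable.
    context.load_cert_chain(certfile, keyfile)
    return context
```

You would then start the server with websockets.serve(handler, "localhost", 8765, ssl=make_tls_context("cert.pem", "key.pem")), where both file names are placeholders for your own certificate and key.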

How do I handle client reconnection?

A plain async with websockets.connect(uri) block does not reconnect on its own. On the client side, you can wrap the connection in a while True loop with exponential backoff and a try/except block that catches ConnectionClosed and OSError. Recent versions of the library also let you use connect() as an asynchronous iterator, async for websocket in websockets.connect(uri), which reconnects automatically with backoff after errors.
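The retry loop with exponential backoff can be sketched independently of the library. Here session stands for any coroutine function that opens a connection and runs until it ends; backoff_delays, run_with_reconnect, and the default values are assumptions chosen for illustration.

```python
import asyncio


def backoff_delays(base=1.0, factor=2.0, cap=30.0):
    """Yield an endless series of delays that grows by factor up to cap."""
    delay = base
    while True:
        yield min(delay, cap)
        delay = min(delay * factor, cap)


async def run_with_reconnect(session, retry_on=(OSError,), max_attempts=5,
                             sleep=asyncio.sleep):
    """Await session(); on a retryable error, wait with backoff and try again."""
    delays = backoff_delays()
    for attempt in range(1, max_attempts + 1):
        try:
            return await session()
        except retry_on:
            if attempt == max_attempts:
                raise  # give up after the final attempt
            await sleep(next(delays))


async def demo():
    attempts = []
    slept = []

    async def flaky_session():
        # Fails twice, then succeeds, to simulate a server that is still starting.
        attempts.append(len(attempts) + 1)
        if len(attempts) < 3:
            raise ConnectionRefusedError("server not up yet")
        return "connected"

    async def fake_sleep(delay):
        slept.append(delay)  # record the delay instead of actually waiting

    result = await run_with_reconnect(flaky_session, sleep=fake_sleep)
    return result, slept


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With a real client, you would pass retry_on=(OSError, websockets.ConnectionClosed) so that dropped connections are retried as well as refused ones.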

Can I send binary data over WebSockets?

Yes. The websockets library automatically detects whether you send str (text frames) or bytes (binary frames). Use await websocket.send(b"\x00\x01\x02") for binary data. This is useful for sending images, audio streams, or protobuf-encoded messages.
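As a small illustration of a binary payload, the sketch below packs a sensor reading into a fixed 10-byte layout with the standard struct module. The wire format (sensor id, timestamp, value) and the helper names are invented for this example.

```python
import struct

# Hypothetical wire format, big-endian ("!" = network byte order, no padding):
# sensor id (uint16), Unix timestamp (uint32), value (float32).
READING = struct.Struct("!HIf")


def encode_reading(sensor_id, timestamp, value):
    """Pack one reading into bytes suitable for a binary WebSocket frame."""
    return READING.pack(sensor_id, timestamp, value)


def decode_reading(payload):
    """Unpack bytes produced by encode_reading into (sensor_id, timestamp, value)."""
    return READING.unpack(payload)


if __name__ == "__main__":
    payload = encode_reading(7, 1700000000, 21.5)
    print(len(payload), decode_reading(payload))
```

Because encode_reading returns bytes, passing its result to websocket.send() produces a binary frame, and the receiver gets bytes back from recv().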

Can I use WebSockets with Django or Flask?

Django supports WebSockets through Django Channels, which adds an ASGI layer. Flask does not natively support WebSockets, but you can use Flask-SocketIO or run a separate websockets server alongside Flask. For new projects needing WebSocket support, consider FastAPI, which has native WebSocket support built on Starlette.

Conclusion

You now have a solid foundation for building WebSocket applications in Python. We covered creating echo servers, building broadcast chat systems, handling connection errors gracefully, adding authentication, and implementing topic-based notification systems. The websockets library’s async-first design makes it easy to handle thousands of concurrent connections with clean, readable code.

Try extending the notification server with features like message persistence, delivery acknowledgments, or a web-based admin dashboard. For the complete API reference, see the official websockets documentation.