If you have ever built a chat application, a live dashboard, or a multiplayer game, you know that regular HTTP requests fall short when you need real-time, bidirectional communication. Polling the server every few seconds wastes bandwidth and adds latency. WebSockets solve this by keeping a persistent connection open between client and server, allowing both sides to send messages at any time.
Python’s websockets library makes building WebSocket servers and clients remarkably straightforward. It is built on top of asyncio, so it integrates naturally with Python’s async ecosystem. You only need to install one package — pip install websockets — and you are ready to go.
In this tutorial, you will learn how to create a WebSocket server, build a client that connects to it, implement broadcast messaging for chat-style applications, handle connection lifecycle events, and add basic authentication. By the end, you will have a working real-time chat server that you can extend for your own projects.
WebSocket Echo Server: Quick Example
Here is the simplest possible WebSocket server — it echoes back whatever the client sends:
```python
# echo_server.py
import asyncio
import websockets


async def echo(websocket):
    async for message in websocket:
        print(f"Received: {message}")
        await websocket.send(f"Echo: {message}")


async def main():
    async with websockets.serve(echo, "localhost", 8765):
        print("Echo server running on ws://localhost:8765")
        await asyncio.Future()  # Run forever


asyncio.run(main())
```
Output (server):
```text
Echo server running on ws://localhost:8765
Received: Hello WebSocket!
Received: How are you?
```
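The websockets.serve() function starts a server that calls the echo handler for every new connection. The async for message in websocket pattern handles the connection lifecycle for you: it yields each incoming message and ends the loop when the client closes the connection normally. If the connection fails abnormally, the loop raises websockets.ConnectionClosedError instead, which the error-handling section later in this tutorial covers. We will build on this pattern throughout the tutorial.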
What Are WebSockets and Why Use Them?
WebSockets are a communication protocol that provides full-duplex (two-way) communication channels over a single TCP connection. Unlike HTTP, where the client must initiate every request, WebSockets allow either side to send data at any time after the initial handshake.
The WebSocket protocol starts with an HTTP upgrade request. The client sends a regular HTTP request with an Upgrade: websocket header, and if the server agrees, the connection is upgraded from HTTP to WebSocket. From that point on, both sides communicate using lightweight WebSocket frames instead of full HTTP requests.
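To make the upgrade step concrete, here is a minimal sketch of how a server derives the Sec-WebSocket-Accept response header from the client's Sec-WebSocket-Key, following RFC 6455. The websockets library performs this step for you; the function name compute_accept_key is illustrative and not part of the library's API.

```python
import base64
import hashlib

# Fixed GUID defined by RFC 6455 for the opening handshake
WEBSOCKET_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"


def compute_accept_key(sec_websocket_key: str) -> str:
    """Return the Sec-WebSocket-Accept value for a given Sec-WebSocket-Key."""
    combined = (sec_websocket_key + WEBSOCKET_GUID).encode("ascii")
    digest = hashlib.sha1(combined).digest()
    return base64.b64encode(digest).decode("ascii")


# Sample key taken from RFC 6455, section 1.3
print(compute_accept_key("dGhlIHNhbXBsZSBub25jZQ=="))  # s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
```

The client checks this value to confirm that the server understood the WebSocket upgrade rather than replaying a cached HTTP response.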
| Feature | HTTP | WebSocket |
|---|---|---|
| Direction | Client-initiated request/response | Bidirectional |
| Connection | Request/response cycles (connection may be reused with keep-alive) | Persistent single connection |
| Overhead | Headers on every request | Minimal frame overhead (2-14 bytes) |
| Latency | Round-trip per request | Near real-time |
| Use case | REST APIs, page loads | Chat, live feeds, gaming |
Use WebSockets when you need low-latency, real-time updates. Use HTTP when you need stateless request-response patterns (like REST APIs).
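The "2-14 bytes" figure in the table comes from the frame layout in RFC 6455. As a rough sketch (the helper name frame_header_size is made up for this illustration), the header size depends only on the payload length and on whether the frame is masked, which is required for client-to-server frames:

```python
def frame_header_size(payload_length: int, masked: bool) -> int:
    """Return the WebSocket frame header size in bytes (RFC 6455, section 5.2)."""
    size = 2  # FIN/opcode byte plus mask-bit/7-bit length byte
    if payload_length > 65535:
        size += 8  # 64-bit extended payload length
    elif payload_length > 125:
        size += 2  # 16-bit extended payload length
    if masked:
        size += 4  # masking key, present on client-to-server frames
    return size


print(frame_header_size(5, masked=False))       # 2
print(frame_header_size(1000, masked=True))     # 8
print(frame_header_size(100_000, masked=True))  # 14
```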
Setting Up the websockets Library
Install the library with pip:
```shell
# install.sh
pip install websockets
```
Output:
```text
Successfully installed websockets-13.1
```
The websockets 13.1 release shown above requires Python 3.8 or later; newer releases may require a more recent Python version. The library has no third-party dependencies, which keeps your dependency tree clean.
Building a WebSocket Client
To test our servers, we need a client. Here is a simple interactive client that sends messages and prints responses:
```python
# client.py
import asyncio
import websockets


async def chat_client():
    uri = "ws://localhost:8765"
    async with websockets.connect(uri) as websocket:
        print("Connected to server. Type messages (Ctrl+C to quit):")
        # Send a greeting
        await websocket.send("Hello from Python client!")
        response = await websocket.recv()
        print(f"Server says: {response}")
        # Interactive loop
        while True:
            message = input("> ")
            await websocket.send(message)
            response = await websocket.recv()
            print(f"Server says: {response}")


asyncio.run(chat_client())
```
Output:
```text
Connected to server. Type messages (Ctrl+C to quit):
Server says: Echo: Hello from Python client!
> Testing 123
Server says: Echo: Testing 123
```
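The websockets.connect() context manager handles the connection lifecycle automatically. When you exit the async with block, the connection closes cleanly with a proper WebSocket close handshake. Note that input() is a blocking call: while the client waits for you to type, the event loop cannot run, so keepalive pings go unanswered and an idle session may be dropped. That is acceptable for a quick test, but not for a real client.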
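The interactive loop in client.py calls input() directly, which blocks the event loop while waiting for the user. One possible workaround, sketched below, runs the blocking read in a worker thread. The read_line helper and the fake reader are illustrative names, and the fake reader stands in for input() so the sketch runs without a terminal.

```python
import asyncio
import time


async def read_line(prompt, reader=input):
    """Run a blocking line reader in a worker thread and await its result."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, reader, prompt)


def slow_fake_reader(prompt):
    # Stand-in for input(): blocks its own thread briefly, then returns a line
    time.sleep(0.3)
    return "Testing 123"


async def demo():
    ticks = []

    async def background():
        # Represents work the event loop must keep doing, such as answering pings
        for i in range(3):
            ticks.append(i)
            await asyncio.sleep(0.01)

    task = asyncio.create_task(background())
    line = await read_line("> ", reader=slow_fake_reader)
    await task
    return line, ticks


result = asyncio.run(demo())
print(result)  # ('Testing 123', [0, 1, 2])
```

In the client, this would mean replacing message = input("> ") with message = await read_line("> "). One caveat of this approach: a thread blocked in input() cannot be cancelled, so after Ctrl+C the program may wait for one more Enter before it exits.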
Building a Broadcast Chat Server
A real chat server needs to broadcast messages from one client to all connected clients. This requires tracking active connections in a set:
```python
# chat_server.py
import asyncio
import websockets
import json
from datetime import datetime

connected_clients = set()


async def broadcast(message, sender=None):
    """Send a message to all connected clients except the sender."""
    disconnected = set()
    # Iterate over a snapshot: other handlers may add or remove clients
    # while this coroutine is suspended in send()
    for client in list(connected_clients):
        if client != sender:
            try:
                await client.send(message)
            except websockets.ConnectionClosed:
                disconnected.add(client)
    # Clean up disconnected clients in place. Using "-=" here would make
    # connected_clients a local name and raise UnboundLocalError.
    connected_clients.difference_update(disconnected)


async def chat_handler(websocket):
    """Handle a single client connection."""
    # Register client
    connected_clients.add(websocket)
    client_id = f"User-{id(websocket) % 1000}"
    print(f"{client_id} connected. Total clients: {len(connected_clients)}")
    # Notify others
    join_msg = json.dumps({
        "type": "system",
        "message": f"{client_id} joined the chat",
        "timestamp": datetime.now().isoformat()
    })
    await broadcast(join_msg, sender=websocket)
    try:
        async for message in websocket:
            # Wrap message with metadata
            chat_msg = json.dumps({
                "type": "chat",
                "sender": client_id,
                "message": message,
                "timestamp": datetime.now().isoformat()
            })
            print(f"{client_id}: {message}")
            # Forward to everyone except the client who sent it
            await broadcast(chat_msg, sender=websocket)
    except websockets.ConnectionClosed:
        pass
    finally:
        # Unregister client
        connected_clients.discard(websocket)
        leave_msg = json.dumps({
            "type": "system",
            "message": f"{client_id} left the chat",
            "timestamp": datetime.now().isoformat()
        })
        await broadcast(leave_msg)
        print(f"{client_id} disconnected. Total clients: {len(connected_clients)}")


async def main():
    async with websockets.serve(chat_handler, "localhost", 8765):
        print("Chat server running on ws://localhost:8765")
        await asyncio.Future()  # Run forever


asyncio.run(main())
```
Output (server with two clients):
```text
Chat server running on ws://localhost:8765
User-142 connected. Total clients: 1
User-857 connected. Total clients: 2
User-142: Hello everyone!
User-857: Hey there!
User-142 disconnected. Total clients: 1
```
The connected_clients set tracks all active connections. When a client sends a message, broadcast() forwards it to every other connected client. The try/finally block ensures we clean up the client set even if the connection drops unexpectedly.
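The broadcast() function sends to clients one at a time, so a slow client delays everyone after it. A possible variation, sketched here under stated assumptions, sends concurrently with asyncio.gather() and returns the clients whose send failed. FakeClient is a hypothetical stand-in for real connections so the example runs without a network, and ConnectionError stands in for websockets.ConnectionClosed.

```python
import asyncio


class FakeClient:
    """Hypothetical stand-in for a WebSocket connection (illustration only)."""

    def __init__(self, name, alive=True):
        self.name = name
        self.alive = alive
        self.inbox = []

    async def send(self, message):
        if not self.alive:
            # Stand-in for websockets.ConnectionClosed
            raise ConnectionError(f"{self.name} is gone")
        self.inbox.append(message)


async def broadcast_concurrently(clients, message, sender=None):
    """Send to every client except the sender; return the set that failed."""
    targets = [c for c in clients if c is not sender]  # snapshot of recipients
    results = await asyncio.gather(
        *(c.send(message) for c in targets), return_exceptions=True
    )
    return {c for c, r in zip(targets, results) if isinstance(r, ConnectionError)}


async def demo():
    alice = FakeClient("alice")
    bob = FakeClient("bob")
    carol = FakeClient("carol", alive=False)
    clients = {alice, bob, carol}
    failed = await broadcast_concurrently(clients, "hello", sender=alice)
    clients -= failed  # drop dead connections after the broadcast
    return alice.inbox, bob.inbox, sorted(c.name for c in clients)


print(asyncio.run(demo()))  # ([], ['hello'], ['alice', 'bob'])
```

The websockets library also ships its own broadcast() helper; check the documentation for your installed version before writing a custom one.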
Handling Connection Errors Gracefully
Real-world WebSocket connections drop unexpectedly due to network issues, client crashes, or timeouts. Robust error handling is essential:
```python
# robust_server.py
import asyncio
import websockets
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("websocket-server")


async def robust_handler(websocket):
    """Handler with comprehensive error management."""
    remote = websocket.remote_address
    logger.info(f"New connection from {remote}")
    try:
        # Keepalive pings are configured in serve() below; this loop only
        # reads and validates incoming messages
        async for message in websocket:
            if len(message) > 10000:
                await websocket.send("Error: Message too long (max 10000 chars)")
                continue
            await websocket.send(f"Processed: {message}")
    except websockets.ConnectionClosedError as e:
        logger.warning(f"Connection closed with error: {e.code} {e.reason}")
    except websockets.ConnectionClosedOK:
        logger.info(f"Connection closed normally from {remote}")
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
    finally:
        logger.info(f"Cleanup complete for {remote}")


async def main():
    async with websockets.serve(
        robust_handler,
        "localhost",
        8765,
        ping_interval=20,  # Send ping every 20 seconds
        ping_timeout=10,   # Wait 10 seconds for pong
        max_size=2**20,    # Max message size: 1MB
        close_timeout=5    # Wait 5 seconds for close handshake
    ):
        logger.info("Robust server running on ws://localhost:8765")
        await asyncio.Future()  # Run forever


asyncio.run(main())
```
Output:
```text
INFO:websocket-server:Robust server running on ws://localhost:8765
INFO:websocket-server:New connection from ('127.0.0.1', 54321)
WARNING:websocket-server:Connection closed with error: 1006
INFO:websocket-server:Cleanup complete for ('127.0.0.1', 54321)
```
The ping_interval and ping_timeout parameters enable automatic keepalive detection. If a client stops responding to pings within the timeout, the server closes the connection and triggers cleanup. The max_size parameter prevents clients from sending oversized messages that could exhaust server memory.
Adding Basic Authentication
You can authenticate WebSocket connections using query parameters, headers, or an initial authentication message. Here is a token-based approach:
```python
# auth_server.py
import asyncio
import websockets
import json
import secrets

# Valid tokens (in production, use a database)
VALID_TOKENS = {
    "demo-token-abc123": "alice",
    "demo-token-def456": "bob"
}


async def authenticate(websocket):
    """Authenticate the client using the first message."""
    try:
        auth_msg = await asyncio.wait_for(websocket.recv(), timeout=5.0)
        data = json.loads(auth_msg)
        token = data.get("token", "")
        for valid_token, username in VALID_TOKENS.items():
            if secrets.compare_digest(token, valid_token):
                await websocket.send(json.dumps({
                    "type": "auth",
                    "status": "ok",
                    "username": username
                }))
                return username
        await websocket.send(json.dumps({
            "type": "auth",
            "status": "error",
            "message": "Invalid token"
        }))
        return None
    except asyncio.TimeoutError:
        await websocket.send(json.dumps({
            "type": "auth",
            "status": "error",
            "message": "Authentication timeout"
        }))
        return None
    except (ValueError, AttributeError, TypeError):
        # Malformed first message: invalid JSON, not a JSON object,
        # or a token that is not a plain ASCII string
        await websocket.send(json.dumps({
            "type": "auth",
            "status": "error",
            "message": "Malformed authentication message"
        }))
        return None


async def secure_handler(websocket):
    """Handler that requires authentication."""
    username = await authenticate(websocket)
    if not username:
        await websocket.close(1008, "Authentication failed")
        return
    print(f"{username} authenticated successfully")
    async for message in websocket:
        response = f"[{username}] {message}"
        await websocket.send(response)


async def main():
    async with websockets.serve(secure_handler, "localhost", 8765):
        print("Secure server running on ws://localhost:8765")
        await asyncio.Future()  # Run forever


asyncio.run(main())
```
Output:
```text
Secure server running on ws://localhost:8765
alice authenticated successfully
```
The server expects the first message to contain a JSON object with a token field. It uses secrets.compare_digest() for timing-safe comparison and gives the client 5 seconds to authenticate before timing out. If authentication fails, the connection is closed with WebSocket status code 1008 (Policy Violation).
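Token checking is easier to test when it is separated from the network code. The sketch below is one way to do that, assuming the same demo tokens as the server example; the function name username_for is hypothetical. It returns None for anything that is not a well-formed message carrying a known token.

```python
import json
import secrets

# Demo tokens copied from the server example; a real service would look these up
VALID_TOKENS = {"demo-token-abc123": "alice", "demo-token-def456": "bob"}


def username_for(raw_message, valid_tokens=VALID_TOKENS):
    """Return the username for a valid auth message, or None otherwise."""
    try:
        data = json.loads(raw_message)
    except ValueError:  # invalid JSON (JSONDecodeError is a ValueError)
        return None
    if not isinstance(data, dict):
        return None
    token = data.get("token")
    if not isinstance(token, str):
        return None
    for valid_token, username in valid_tokens.items():
        # Compare as bytes so a non-ASCII token does not raise TypeError
        if secrets.compare_digest(token.encode("utf-8"), valid_token.encode("utf-8")):
            return username
    return None


print(username_for('{"token": "demo-token-abc123"}'))  # alice
print(username_for('{"token": "wrong"}'))              # None
print(username_for("not json"))                        # None
```

A handler could call this function on the first received message and close the connection with code 1008 when it returns None.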
Real-Life Example: Live Notification System
Let us build a notification system where a server pushes real-time alerts to connected clients based on their subscribed topics:
```python
# notification_server.py
import asyncio
import websockets
import json
from datetime import datetime
from collections import defaultdict


class NotificationServer:
    """Real-time notification server with topic subscriptions."""

    def __init__(self):
        self.subscribers = defaultdict(set)  # topic -> set of websockets
        self.clients = {}  # websocket -> client info

    async def handle_subscribe(self, websocket, topics):
        """Subscribe a client to one or more topics."""
        for topic in topics:
            self.subscribers[topic].add(websocket)
        self.clients[websocket]["topics"] = topics
        await websocket.send(json.dumps({
            "type": "subscribed",
            "topics": topics
        }))

    async def notify(self, topic, message):
        """Send a notification to all subscribers of a topic."""
        if topic not in self.subscribers:
            return 0
        notification = json.dumps({
            "type": "notification",
            "topic": topic,
            "message": message,
            "timestamp": datetime.now().isoformat()
        })
        sent = 0
        disconnected = set()
        # Iterate over a snapshot: the subscriber set can change while
        # this coroutine is suspended in send()
        for client in list(self.subscribers[topic]):
            try:
                await client.send(notification)
                sent += 1
            except websockets.ConnectionClosed:
                disconnected.add(client)
        # Cleanup
        self.subscribers[topic] -= disconnected
        return sent

    async def handler(self, websocket):
        """Main connection handler."""
        self.clients[websocket] = {"topics": [], "connected": datetime.now()}
        try:
            async for raw_message in websocket:
                data = json.loads(raw_message)
                action = data.get("action")
                if action == "subscribe":
                    await self.handle_subscribe(
                        websocket, data.get("topics", [])
                    )
                elif action == "publish":
                    topic = data.get("topic")
                    message = data.get("message")
                    count = await self.notify(topic, message)
                    await websocket.send(json.dumps({
                        "type": "published",
                        "topic": topic,
                        "recipients": count
                    }))
        except websockets.ConnectionClosed:
            pass
        finally:
            # Remove from all subscriptions
            for topic_subs in self.subscribers.values():
                topic_subs.discard(websocket)
            self.clients.pop(websocket, None)


server = NotificationServer()


async def main():
    async with websockets.serve(server.handler, "localhost", 8765):
        print("Notification server running on ws://localhost:8765")

        # Simulate periodic system notifications
        async def system_alerts():
            while True:
                await asyncio.sleep(30)
                await server.notify("system", "Heartbeat check - all systems normal")

        await asyncio.gather(
            asyncio.Future(),  # Run forever
            system_alerts()
        )


asyncio.run(main())
```
Output:
```text
Notification server running on ws://localhost:8765
```
This server supports topic-based subscriptions. Clients subscribe to topics like “alerts”, “updates”, or “system”, and only receive notifications for their subscribed topics. The notify() method handles broadcasting to subscribers and automatically cleans up disconnected clients. You could extend this with persistent message queues, delivery confirmation, or priority levels.
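The server code does not show what a client actually sends. Based on the fields the handler reads (action, topics, topic, message), the two client messages would look roughly like this; the helper names are illustrative and not part of any library.

```python
import json


def subscribe_message(topics):
    """Build the JSON text a client sends to subscribe to topics."""
    return json.dumps({"action": "subscribe", "topics": list(topics)})


def publish_message(topic, message):
    """Build the JSON text a client sends to publish to one topic."""
    return json.dumps({"action": "publish", "topic": topic, "message": message})


print(subscribe_message(["alerts", "system"]))
# {"action": "subscribe", "topics": ["alerts", "system"]}
print(publish_message("alerts", "Disk usage above 90%"))
# {"action": "publish", "topic": "alerts", "message": "Disk usage above 90%"}
```

A client would pass either string to await websocket.send(...) after connecting, then read the "subscribed", "published", or "notification" replies with recv().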
Frequently Asked Questions
How many concurrent connections can a Python WebSocket server handle?
As a rough guide, a single Python process using asyncio can hold on the order of 10,000-50,000 mostly idle WebSocket connections. The practical limit depends on message frequency, per-connection memory (which varies with buffer and compression settings), and operating-system limits such as the maximum number of open file descriptors, so measure with your own workload. For higher scale, run multiple processes behind a load balancer like Nginx.
How do I add TLS/SSL encryption (wss://)?
Create a context with ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER), load your certificate with ssl_context.load_cert_chain(certfile, keyfile), and pass the context to websockets.serve() through its ssl argument. Clients then connect with a wss:// URI. In production, it is also common to terminate TLS at a reverse proxy like Nginx instead.
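A minimal sketch of the context setup using only the standard library follows. The function name is hypothetical, and the TLS 1.2 minimum is an added assumption rather than something the library requires.

```python
import ssl


def build_server_ssl_context(certfile: str, keyfile: str) -> ssl.SSLContext:
    """Create a server-side TLS context from a certificate and private key."""
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # Assumption for this sketch: refuse protocol versions older than TLS 1.2
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    # Raises OSError if either file is missing or unreadable
    context.load_cert_chain(certfile, keyfile)
    return context
```

With real certificate files in place (the paths here are placeholders), the server call would become websockets.serve(handler, "localhost", 8765, ssl=build_server_ssl_context("cert.pem", "key.pem")).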
How do I handle client reconnection?
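A plain async with websockets.connect() block does not reconnect on its own. On the client side, wrap the connection in a retry loop with exponential backoff: reconnect inside a while True loop, catching ConnectionClosed for dropped connections and OSError for failed connection attempts. Recent versions of the library also accept async for websocket in websockets.connect(uri), which reconnects with backoff after a failure; check the documentation for the version you have installed.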
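The retry loop with exponential backoff can be sketched independently of any WebSocket code. In the example below, connect_with_retry, backoff_delays, and the flaky connector are hypothetical names. The connector fails twice and then succeeds, and a fake sleep records the delays so the demo finishes instantly.

```python
import asyncio


def backoff_delays(base=1.0, factor=2.0, cap=30.0):
    """Yield retry delays that grow exponentially up to a cap."""
    delay = base
    while True:
        yield min(delay, cap)
        delay *= factor


async def connect_with_retry(connect, max_attempts=5, sleep=asyncio.sleep):
    """Await connect() until it succeeds or the attempts run out."""
    delays = backoff_delays()
    for attempt in range(1, max_attempts + 1):
        try:
            return await connect()
        except OSError:  # e.g. ConnectionRefusedError while the server is down
            if attempt == max_attempts:
                raise
            await sleep(next(delays))


async def demo():
    attempts = []
    waited = []

    async def flaky_connect():
        # Hypothetical connector: fails twice, then succeeds
        attempts.append(len(attempts) + 1)
        if len(attempts) < 3:
            raise ConnectionRefusedError("server not ready")
        return "connected"

    async def fake_sleep(seconds):
        waited.append(seconds)  # record the delay instead of sleeping

    result = await connect_with_retry(flaky_connect, sleep=fake_sleep)
    return result, attempts, waited


print(asyncio.run(demo()))  # ('connected', [1, 2, 3], [1.0, 2.0])
```

With the websockets library, connect could be something like lambda: websockets.connect(uri). You would likely also want to catch the library's handshake exceptions and add random jitter to the delays so that many clients do not reconnect at the same moment.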
Can I send binary data over WebSockets?
Yes. The websockets library automatically detects whether you send str (text frames) or bytes (binary frames). Use await websocket.send(b"\x00\x01\x02") for binary data. This is useful for sending images, audio streams, or protobuf-encoded messages.
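As an illustration of a compact binary payload, the sketch below packs a sensor reading into 10 bytes with the standard struct module. The wire format (sensor id, timestamp, reading) is a made-up example, not something the websockets library defines.

```python
import struct

# Hypothetical wire format, big-endian with no padding:
# sensor id (uint16), Unix timestamp (uint32), reading (float32) = 10 bytes
READING_FORMAT = ">HIf"


def encode_reading(sensor_id: int, timestamp: int, value: float) -> bytes:
    """Pack one reading into a bytes payload suitable for a binary frame."""
    return struct.pack(READING_FORMAT, sensor_id, timestamp, value)


def decode_reading(payload: bytes):
    """Unpack a payload produced by encode_reading()."""
    return struct.unpack(READING_FORMAT, payload)


payload = encode_reading(7, 1_700_000_000, 21.5)
print(len(payload))             # 10
print(decode_reading(payload))  # (7, 1700000000, 21.5)
```

Passing payload to await websocket.send(payload) sends it as a binary frame, and the receiving side gets a bytes object it can hand to decode_reading().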
Can I use WebSockets with Django or Flask?
Django supports WebSockets through Django Channels, which adds an ASGI layer. Flask does not natively support WebSockets, but you can use Flask-SocketIO or run a separate websockets server alongside Flask. For new projects needing WebSocket support, consider FastAPI, which has native WebSocket support built on Starlette.
Conclusion
You now have a solid foundation for building WebSocket applications in Python. We covered creating echo servers, building broadcast chat systems, handling connection errors gracefully, adding authentication, and implementing topic-based notification systems. The websockets library’s async-first design makes it easy to handle thousands of concurrent connections with clean, readable code.
Try extending the notification server with features like message persistence, delivery acknowledgments, or a web-based admin dashboard. For the complete API reference, see the official websockets documentation.