agentbench / data /tech_docs /fastapi_websockets.md
Nomearod's picture
feat: Day 4 — corpus, ingest script, first 10 golden questions
a152b95

WebSockets in FastAPI

FastAPI supports WebSocket connections through Starlette's WebSocket implementation, enabling full-duplex, bidirectional communication between clients and servers. WebSockets are ideal for real-time features such as chat applications, live dashboards, and streaming updates.

Basic WebSocket Endpoint

from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(ws: WebSocket):
    await ws.accept()
    while True:
        data = await ws.receive_text()
        await ws.send_text(f"Echo: {data}")

The @app.websocket() decorator registers a WebSocket route. The handler receives a WebSocket object, which must be explicitly accepted by calling await ws.accept() before any data can be sent or received. The accept() method sends the HTTP 101 Switching Protocols response to the client.

Send and Receive Methods

The WebSocket object provides several methods for communication:

Method Description
receive_text() Receive a text (string) message
receive_bytes() Receive a binary message
receive_json() Receive and parse a JSON message
send_text(data) Send a text message
send_bytes(data) Send binary data
send_json(data) Serialize and send a JSON message
close(code=1000) Close the connection with a status code

The default close code is 1000 (normal closure). Other common codes are 1001 (going away), 1008 (policy violation), and 1011 (unexpected condition). The maximum WebSocket message size defaults to 16 MB in Uvicorn, configurable via the --ws-max-size flag.

Handling Disconnects

Clients can disconnect at any time. Handle this with WebSocketDisconnect:

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

class ConnectionManager:
    def __init__(self):
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)

manager = ConnectionManager()

@app.websocket("/ws/chat")
async def chat_endpoint(ws: WebSocket):
    await manager.connect(ws)
    try:
        while True:
            data = await ws.receive_text()
            await manager.broadcast(f"User says: {data}")
    except WebSocketDisconnect:
        manager.disconnect(ws)
        await manager.broadcast("A user has left the chat")

The WebSocketDisconnect exception is raised when receive_text(), receive_bytes(), or receive_json() detects that the client has closed the connection. The exception has a code attribute containing the close code sent by the client.

WebSocket with Path Parameters and Dependencies

WebSocket endpoints support path parameters, query parameters, and dependency injection:

from fastapi import FastAPI, WebSocket, Depends, Query, Path, Cookie, Header

app = FastAPI()

async def get_token(
    websocket: WebSocket,
    token: str | None = Query(default=None),
    x_token: str | None = Header(default=None),
):
    if token is None and x_token is None:
        await websocket.close(code=1008)
        return None
    return token or x_token

@app.websocket("/ws/{room_id}")
async def room_websocket(
    ws: WebSocket,
    room_id: int = Path(ge=1, le=1000),
    token: str | None = Depends(get_token),
):
    if token is None:
        return
    await ws.accept()
    await ws.send_text(f"Connected to room {room_id}")
    try:
        while True:
            data = await ws.receive_text()
            await ws.send_text(f"[Room {room_id}] {data}")
    except WebSocketDisconnect:
        pass

Dependencies for WebSocket endpoints work the same as for HTTP endpoints, including Depends(), Path(), Query(), Header(), and Cookie(). However, WebSocket endpoints do not support Body() parameters since WebSocket communication uses its own message protocol rather than HTTP request bodies.

WebSocket with JSON Messages

For structured communication, use JSON messages with Pydantic validation:

from pydantic import BaseModel, ValidationError

class ChatMessage(BaseModel):
    username: str
    content: str
    channel: str = "general"

@app.websocket("/ws/json")
async def json_websocket(ws: WebSocket):
    await ws.accept()
    try:
        while True:
            raw_data = await ws.receive_json()
            try:
                message = ChatMessage(**raw_data)
                await ws.send_json({
                    "status": "ok",
                    "echo": message.model_dump(),
                })
            except ValidationError as e:
                await ws.send_json({
                    "status": "error",
                    "errors": e.errors(),
                })
    except WebSocketDisconnect:
        pass

The receive_json() method parses the incoming text message as JSON. If the message is not valid JSON, it raises a json.JSONDecodeError. Pydantic validation is applied manually since FastAPI does not automatically validate WebSocket message payloads the way it validates HTTP request bodies.