File size: 5,601 Bytes
a152b95
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# WebSockets in FastAPI

FastAPI supports WebSocket connections through Starlette's WebSocket implementation, enabling full-duplex, bidirectional communication between clients and servers. WebSockets are ideal for real-time features such as chat applications, live dashboards, and streaming updates.

## Basic WebSocket Endpoint

```python
from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(ws: WebSocket):
    await ws.accept()
    while True:
        data = await ws.receive_text()
        await ws.send_text(f"Echo: {data}")
```

The `@app.websocket()` decorator registers a WebSocket route. The handler receives a `WebSocket` object, which must be explicitly accepted by calling `await ws.accept()` before any data can be sent or received. The `accept()` method sends the HTTP 101 Switching Protocols response to the client.

## Send and Receive Methods

The `WebSocket` object provides several methods for communication:

| Method              | Description                              |
|---------------------|------------------------------------------|
| `receive_text()`    | Receive a text (string) message          |
| `receive_bytes()`   | Receive a binary message                 |
| `receive_json()`    | Receive and parse a JSON message         |
| `send_text(data)`   | Send a text message                      |
| `send_bytes(data)`  | Send binary data                         |
| `send_json(data)`   | Serialize and send a JSON message        |
| `close(code=1000)`  | Close the connection with a status code  |

The default close code is `1000` (normal closure). Other common codes are `1001` (going away), `1008` (policy violation), and `1011` (unexpected condition). The maximum WebSocket message size defaults to 16 MB in Uvicorn, configurable via the `--ws-max-size` flag.

## Handling Disconnects

Clients can disconnect at any time. Handle this with `WebSocketDisconnect`:

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

class ConnectionManager:
    def __init__(self):
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)

manager = ConnectionManager()

@app.websocket("/ws/chat")
async def chat_endpoint(ws: WebSocket):
    await manager.connect(ws)
    try:
        while True:
            data = await ws.receive_text()
            await manager.broadcast(f"User says: {data}")
    except WebSocketDisconnect:
        manager.disconnect(ws)
        await manager.broadcast("A user has left the chat")
```

The `WebSocketDisconnect` exception is raised when `receive_text()`, `receive_bytes()`, or `receive_json()` detects that the client has closed the connection. The exception has a `code` attribute containing the close code sent by the client.

## WebSocket with Path Parameters and Dependencies

WebSocket endpoints support path parameters, query parameters, and dependency injection:

```python
from fastapi import FastAPI, WebSocket, Depends, Query, Path, Cookie, Header

app = FastAPI()

async def get_token(
    websocket: WebSocket,
    token: str | None = Query(default=None),
    x_token: str | None = Header(default=None),
):
    if token is None and x_token is None:
        await websocket.close(code=1008)
        return None
    return token or x_token

@app.websocket("/ws/{room_id}")
async def room_websocket(
    ws: WebSocket,
    room_id: int = Path(ge=1, le=1000),
    token: str | None = Depends(get_token),
):
    if token is None:
        return
    await ws.accept()
    await ws.send_text(f"Connected to room {room_id}")
    try:
        while True:
            data = await ws.receive_text()
            await ws.send_text(f"[Room {room_id}] {data}")
    except WebSocketDisconnect:
        pass
```

Dependencies for WebSocket endpoints work the same as for HTTP endpoints, including `Depends()`, `Path()`, `Query()`, `Header()`, and `Cookie()`. However, WebSocket endpoints do not support `Body()` parameters since WebSocket communication uses its own message protocol rather than HTTP request bodies.

## WebSocket with JSON Messages

For structured communication, use JSON messages with Pydantic validation:

```python
from pydantic import BaseModel, ValidationError

class ChatMessage(BaseModel):
    username: str
    content: str
    channel: str = "general"

@app.websocket("/ws/json")
async def json_websocket(ws: WebSocket):
    await ws.accept()
    try:
        while True:
            raw_data = await ws.receive_json()
            try:
                message = ChatMessage(**raw_data)
                await ws.send_json({
                    "status": "ok",
                    "echo": message.model_dump(),
                })
            except ValidationError as e:
                await ws.send_json({
                    "status": "error",
                    "errors": e.errors(),
                })
    except WebSocketDisconnect:
        pass
```

The `receive_json()` method parses the incoming text message as JSON. If the message is not valid JSON, it raises a `json.JSONDecodeError`. Pydantic validation is applied manually since FastAPI does not automatically validate WebSocket message payloads the way it validates HTTP request bodies.