"""WebSocket — real-time event stream for live clients and future game UI."""
from __future__ import annotations
import asyncio
import json
import logging
from typing import Optional
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
# Router on which the /ws/stream WebSocket endpoint in this module is registered.
ws_router = APIRouter()
class ConnectionManager:
    """Track accepted WebSocket connections and fan JSON messages out to them."""

    def __init__(self) -> None:
        # Currently tracked sockets, in the order they connected.
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket) -> None:
        """Accept the WebSocket handshake and start tracking *websocket*."""
        await websocket.accept()
        self.active_connections.append(websocket)
        logger.info(
            "WebSocket client connected. Total: %d",
            len(self.active_connections),
        )

    def disconnect(self, websocket: WebSocket) -> None:
        """Stop tracking *websocket*.

        Safe to call more than once for the same socket: a socket that is not
        tracked is ignored, and nothing is logged for it.
        """
        try:
            self.active_connections.remove(websocket)
        except ValueError:
            # Already removed (e.g. dropped by broadcast() before the endpoint's
            # own cleanup ran) — nothing to do and nothing worth logging.
            return
        logger.info(
            "WebSocket client disconnected. Total: %d",
            len(self.active_connections),
        )

    async def broadcast(self, message: dict) -> None:
        """Send *message* as JSON to every tracked client.

        Clients whose send raises are dropped from the tracked list; the
        failure is logged at debug level and never propagated to the caller.
        """
        failed: list[WebSocket] = []
        # Iterate over a snapshot: each send awaits, so other coroutines may
        # connect or disconnect meanwhile, and mutating a list while iterating
        # it makes the iterator skip entries.
        for connection in list(self.active_connections):
            try:
                await connection.send_json(message)
            except Exception:
                # Best-effort delivery: one broken client must not stop the
                # broadcast to the remaining clients.
                logger.debug(
                    "Dropping WebSocket client after failed send",
                    exc_info=True,
                )
                failed.append(connection)
        for connection in failed:
            self.disconnect(connection)

    async def send_personal(self, websocket: WebSocket, message: dict) -> None:
        """Send *message* as JSON to a single client; send errors propagate."""
        await websocket.send_json(message)
# Module-level ConnectionManager instance; returned by get_manager() and used
# by the /ws/stream handler in this module.
manager = ConnectionManager()
def get_manager() -> ConnectionManager:
    """Return the module-level ``manager`` instance of ConnectionManager."""
    return manager
# How long one loop iteration waits for a client message before checking
# whether the simulation tick has advanced.
_POLL_INTERVAL_SECONDS = 1.0


def _tick_message(sim) -> dict:
    """Build a "tick" message from the simulation's current state summary and clock."""
    state = sim.get_state_summary()
    return {
        "type": "tick",
        "tick": sim.clock.total_ticks,
        "time": sim.clock.datetime_str,
        "state": state,
    }


async def _handle_client_message(websocket: WebSocket, raw: str) -> None:
    """Reply "pong" to a JSON ``{"type": "ping"}`` message; ignore anything else."""
    try:
        msg = json.loads(raw)
    except json.JSONDecodeError:
        # Malformed input from a client is ignored rather than treated as fatal.
        return
    # Valid JSON need not be an object (e.g. `[]` or `1`); only dicts have .get().
    if isinstance(msg, dict) and msg.get("type") == "ping":
        await manager.send_personal(websocket, {"type": "pong"})


@ws_router.websocket("/ws/stream")
async def websocket_stream(websocket: WebSocket):
    """Real-time event stream.

    Messages sent by this handler:
    - type: "tick" — full state summary; sent once on connect and then
      whenever the simulation's tick counter has advanced
    - type: "pong" — reply to a client {"type": "ping"} message

    Other message types named for this stream ("event", "conversation",
    "action") are not produced here; presumably they reach clients through
    ConnectionManager.broadcast from elsewhere — confirm against callers.
    """
    await manager.connect(websocket)
    try:
        # Imported here rather than at module top — presumably to avoid a
        # circular import with the server module; confirm before moving it.
        from soci.api.server import get_simulation

        sim = get_simulation()

        # Send the full current state immediately so the client is in sync
        # before the next tick fires (avoids "Day 1 6:00" on fresh connects).
        initial = _tick_message(sim)
        await manager.send_personal(websocket, initial)
        # Remember the tick that was actually sent, not the clock value after
        # the awaited send: a tick that advanced meanwhile must still go out.
        last_tick = initial["tick"]

        while True:
            # Wait briefly for client messages (ping/pong or player input).
            # NOTE(review): on timeout wait_for cancels the pending receive —
            # confirm the server's receive() is safe to cancel and retry.
            try:
                data = await asyncio.wait_for(
                    websocket.receive_text(),
                    timeout=_POLL_INTERVAL_SECONDS,
                )
            except asyncio.TimeoutError:
                pass
            else:
                await _handle_client_message(websocket, data)

            # Send a state update if the tick advanced.
            if sim.clock.total_ticks > last_tick:
                update = _tick_message(sim)
                await manager.send_personal(websocket, update)
                last_tick = update["tick"]
    except WebSocketDisconnect:
        # Normal client departure, whether during the initial send or the loop.
        pass
    except Exception as e:
        # Boundary handler: log with traceback and fall through to cleanup.
        logger.exception("WebSocket error: %s", e)
    finally:
        manager.disconnect(websocket)
|