"""FastAPI app that renders map pages and relays WebSocket messages by group."""

import json

import uvicorn
from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates

app = FastAPI()

# Configure CORS
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive -- confirm this is intended outside of local development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Serve static files
app.mount("/static", StaticFiles(directory="static"), name="static")

# Set up Jinja2 templates
templates = Jinja2Templates(directory="templates")

# The map data is loaded once at import time and shared by all requests.
with open("data.json", "r", encoding="utf8") as file:
    maps_data = json.load(file)


@app.get("/", response_class=HTMLResponse)
async def read_index(request: Request):
    """Render ``index.html`` with the full contents of ``data.json``."""
    return templates.TemplateResponse(
        "index.html", {"request": request, "maps": maps_data}
    )


@app.get("/map/{normalized_name}", response_class=HTMLResponse)
async def get_map_by_normalized_name(request: Request, normalized_name: str):
    """Render ``map.html`` for the entry stored under *normalized_name*.

    The template receives a one-element list holding the entry, or an empty
    list when the key is absent (or its value is falsy).
    """
    map_entry = maps_data.get(normalized_name)
    filtered_maps = [map_entry] if map_entry else []
    return templates.TemplateResponse(
        "map.html", {"request": request, "maps": filtered_maps}
    )


class ConnectionManager:
    """Track which group each WebSocket belongs to and relay messages.

    A socket is a member of at most one group at a time. ``groups`` maps a
    group name to its member sockets; ``connections`` is the reverse mapping
    from a socket to the name of the group it joined.
    """

    def __init__(self) -> None:
        self.groups: dict[str, set[WebSocket]] = {}
        self.connections: dict[WebSocket, str] = {}

    async def connect(self, websocket: WebSocket) -> None:
        """Accept the WebSocket handshake; the socket joins no group yet."""
        await websocket.accept()

    def disconnect(self, websocket: WebSocket) -> None:
        """Remove *websocket* from its group, if it has one.

        Calling this for a socket that is not registered is a no-op.
        """
        if websocket in self.connections:
            group_name = self.connections[websocket]
            self.remove_from_group(websocket, group_name)

    async def join_group(self, websocket: WebSocket, group_name: str) -> None:
        """Move *websocket* into *group_name*, leaving any previous group."""
        # Remove from previous group if exists
        if websocket in self.connections:
            old_group = self.connections[websocket]
            self.remove_from_group(websocket, old_group)

        # Add to new group, creating it on first use
        self.groups.setdefault(group_name, set()).add(websocket)
        self.connections[websocket] = group_name
        print(f"Client joined group '{group_name}'")

    def remove_from_group(self, websocket: WebSocket, group_name: str) -> None:
        """Drop *websocket* from *group_name* and forget its membership.

        A group that becomes empty is deleted.
        """
        members = self.groups.get(group_name)
        if members is not None and websocket in members:
            members.remove(websocket)
            if not members:
                del self.groups[group_name]
                print(f"Group '{group_name}' deleted (no members)")

        if websocket in self.connections:
            del self.connections[websocket]
            print(f"Client removed from group '{group_name}'")

    async def broadcast_to_group(self, message: str, group_name: str) -> None:
        """Send *message* to every member of *group_name*.

        Does nothing when the group does not exist. A member whose send
        fails is removed from the group and the remaining members still
        receive the message.
        """
        members = self.groups.get(group_name)
        if not members:
            return

        # Iterate over a snapshot: other handlers may join or leave the group
        # while this coroutine is suspended in ``send_text``, and mutating a
        # set during iteration raises RuntimeError.
        for connection in list(members):
            try:
                await connection.send_text(message)
            except (WebSocketDisconnect, RuntimeError):
                # NOTE(review): the exact exception raised for a closed peer
                # depends on the server stack -- confirm these two cover it.
                self.disconnect(connection)


manager = ConnectionManager()


async def _send_error(websocket: WebSocket, text: str) -> None:
    """Send a JSON ``{"type": "error", "message": text}`` frame."""
    await websocket.send_text(json.dumps({"type": "error", "message": text}))


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Relay text frames between clients that joined the same group.

    A JSON object with ``"type": "join"`` moves the sender into the group
    named by its ``"group"`` field. Every other frame is forwarded verbatim
    to the sender's current group (the sender included). JSON frames sent
    before joining get an error reply; non-JSON frames sent before joining
    are dropped silently.
    """
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()

            try:
                message = json.loads(data)
            except json.JSONDecodeError:
                # Handle non-JSON messages by broadcasting directly
                group_name = manager.connections.get(websocket)
                if group_name is not None:
                    await manager.broadcast_to_group(data, group_name)
                continue

            # Valid JSON is not necessarily an object (e.g. ``[1]`` or ``5``);
            # only objects can be join requests.
            if isinstance(message, dict) and message.get("type") == "join":
                group_name = message.get("group")
                if not group_name:
                    await _send_error(
                        websocket, "Missing group name in join request"
                    )
                elif isinstance(group_name, (list, dict)):
                    # Unhashable values cannot be used as a group key.
                    await _send_error(
                        websocket, "Invalid group name in join request"
                    )
                else:
                    await manager.join_group(websocket, group_name)
                continue

            # Broadcast all other messages to the group
            group_name = manager.connections.get(websocket)
            if group_name is None:
                await _send_error(
                    websocket, "Join a group before sending messages"
                )
            else:
                await manager.broadcast_to_group(data, group_name)
    except WebSocketDisconnect:
        pass
    finally:
        # Always unregister, even when the loop ends with an unexpected
        # error, so a dead socket is never left behind in a group.
        manager.disconnect(websocket)


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)