"""Map viewer web application.

Serves the index page, a static per-map page and a JSON API backed by
``data.json``, plus a WebSocket endpoint that relays messages between all
clients that joined the same named group.
"""

import json

import uvicorn
from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates

app = FastAPI()

# Configure CORS
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive -- confirm this is intended outside of local development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Serve static files
app.mount("/static", StaticFiles(directory="static"), name="static")

# Set up Jinja2 templates
templates = Jinja2Templates(directory="templates")

# Both files are read once at import time and kept in memory.
with open("data.json", "r", encoding="utf8") as file:
    maps_data = json.load(file)

with open("static/map.html", "r", encoding="utf-8") as f:
    map_html_template = f.read()


@app.get("/api/map/{normalized_name}")
async def get_map_data(normalized_name: str):
    """Return the ``data.json`` entry stored under *normalized_name*.

    A missing (or empty) entry yields ``{"error": "Map not found"}``; no
    explicit status code is set for that reply.
    """
    map_entry = maps_data.get(normalized_name)
    if map_entry:
        return map_entry
    return {"error": "Map not found"}


@app.get("/", response_class=HTMLResponse)
async def read_index(request: Request):
    """Render ``index.html`` with the full map data available as ``maps``."""
    return templates.TemplateResponse(
        "index.html", {"request": request, "maps": maps_data}
    )


@app.get("/map/{normalized_name}", response_class=HTMLResponse)
async def get_map_page(normalized_name: str):
    """Return the static map page, or a 404 response for an unknown map."""
    # Optional: check whether the map exists
    if normalized_name not in maps_data:
        return HTMLResponse(content="\n\nMap not found\n\n", status_code=404)
    # Simply return the static HTML page
    return HTMLResponse(content=map_html_template)


# Disabled alternative: render the map page through the Jinja2 template.
# @app.get("/map/{normalized_name}", response_class=HTMLResponse)
# async def get_map_by_normalized_name(request: Request, normalized_name: str):
#     map_entry = maps_data.get(normalized_name)
#     if map_entry:
#         filtered_maps = [map_entry]
#     else:
#         filtered_maps = []
#     return templates.TemplateResponse("map.html", {"request": request, "maps": filtered_maps})


class ConnectionManager:
    """Track which WebSocket belongs to which group and relay messages.

    A socket is a member of at most one group at a time; joining a new group
    removes it from the previous one. Empty groups are deleted.
    """

    def __init__(self):
        # group name -> sockets currently in that group
        self.groups: dict[str, set[WebSocket]] = {}
        # socket -> name of the group it currently belongs to
        self.connections: dict[WebSocket, str] = {}

    async def connect(self, websocket: WebSocket):
        """Accept the WebSocket handshake (the socket is in no group yet)."""
        await websocket.accept()

    def disconnect(self, websocket: WebSocket):
        """Forget *websocket*; a no-op if it never joined a group."""
        if websocket in self.connections:
            group_name = self.connections[websocket]
            self.remove_from_group(websocket, group_name)

    async def join_group(self, websocket: WebSocket, group_name: str):
        """Move *websocket* into *group_name*, creating the group if needed."""
        # Remove from previous group if exists
        if websocket in self.connections:
            old_group = self.connections[websocket]
            self.remove_from_group(websocket, old_group)

        # Add to new group
        self.groups.setdefault(group_name, set()).add(websocket)
        self.connections[websocket] = group_name
        print(f"Client joined group '{group_name}'")

    def remove_from_group(self, websocket: WebSocket, group_name: str):
        """Remove *websocket* from *group_name* and drop the group if empty."""
        members = self.groups.get(group_name)
        if members is not None and websocket in members:
            members.remove(websocket)
            if not members:
                del self.groups[group_name]
                print(f"Group '{group_name}' deleted (no members)")

        if websocket in self.connections:
            del self.connections[websocket]
            print(f"Client removed from group '{group_name}'")

    async def broadcast_to_group(self, message: str, group_name: str):
        """Send *message* to every member of *group_name*, sender included.

        Unknown groups are ignored. A member whose send fails is dropped from
        the manager and the remaining members still receive the message.
        """
        # Iterate over a snapshot: each await yields to the event loop, where
        # other handlers may add/remove members; mutating the live set during
        # iteration would raise "Set changed size during iteration".
        recipients = tuple(self.groups.get(group_name, ()))
        for connection in recipients:
            try:
                await connection.send_text(message)
            except (WebSocketDisconnect, RuntimeError):
                # The peer is gone or its socket is already closed; do not let
                # one stale member abort delivery to everybody else.
                self.disconnect(connection)


manager = ConnectionManager()


async def _send_error(websocket: WebSocket, text: str):
    """Send a JSON ``{"type": "error", "message": text}`` frame to the client."""
    await websocket.send_text(json.dumps({"type": "error", "message": text}))


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Relay text frames between clients that joined the same group.

    Protocol, as implemented below:

    * ``{"type": "join", "group": <name>}`` moves the client into that group.
    * Any other JSON payload is forwarded verbatim to the client's group, or
      answered with an error frame if the client has not joined one.
    * Non-JSON text is forwarded verbatim if the client is in a group and
      silently ignored otherwise.
    """
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()

            try:
                message = json.loads(data)
            except json.JSONDecodeError:
                # Handle non-JSON messages by broadcasting directly
                if websocket in manager.connections:
                    await manager.broadcast_to_group(
                        data, manager.connections[websocket]
                    )
                continue

            # Valid JSON is not necessarily an object (e.g. 123 or [1, 2]);
            # only objects can carry a "type" field.
            if isinstance(message, dict) and message.get("type") == "join":
                # Handle join messages
                group_name = message.get("group")
                if not group_name:
                    await _send_error(
                        websocket, "Missing group name in join request"
                    )
                elif isinstance(group_name, (list, dict)):
                    # Unhashable values cannot be used as a group key.
                    await _send_error(
                        websocket, "Invalid group name in join request"
                    )
                else:
                    await manager.join_group(websocket, group_name)
                continue

            # Broadcast all other messages to the group
            if websocket in manager.connections:
                await manager.broadcast_to_group(
                    data, manager.connections[websocket]
                )
            else:
                await _send_error(
                    websocket, "Join a group before sending messages"
                )
    except WebSocketDisconnect:
        pass
    finally:
        # Always unregister the socket, even when the loop ends with an
        # unexpected exception, so no dead socket lingers in a group.
        manager.disconnect(websocket)


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)