|
|
from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect |
|
|
from fastapi.middleware.cors import CORSMiddleware |
|
|
from fastapi.responses import HTMLResponse |
|
|
from fastapi.staticfiles import StaticFiles |
|
|
from fastapi.templating import Jinja2Templates |
|
|
import json |
|
|
import uvicorn |
|
|
|
|
|
# Application object; the middleware, mount and routes below attach to it.
app = FastAPI()

# Jinja2 templates are looked up in the "templates" directory.
templates = Jinja2Templates(directory="templates")

# NOTE(review): wildcard origins together with allow_credentials=True is a
# very permissive CORS policy — confirm it is intended before this service
# is exposed beyond a trusted network.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Files in the "static" directory are served under the /static URL prefix.
app.mount(
    "/static",
    StaticFiles(directory="static"),
    name="static",
)
|
|
|
|
|
# Load the map data once at import time; the HTTP route handlers read it
# from this module-level name.
with open("data.json", mode="r", encoding="utf8") as file:
    maps_data = json.loads(file.read())
|
|
|
|
|
|
|
|
@app.get("/", response_class=HTMLResponse)
async def read_index(request: Request):
    """Render ``index.html`` with the whole ``maps_data`` collection."""
    context = {"request": request, "maps": maps_data}
    return templates.TemplateResponse("index.html", context)
|
|
|
|
|
|
|
|
@app.get("/map/{normalized_name}", response_class=HTMLResponse)
async def get_map_by_normalized_name(request: Request, normalized_name: str):
    """Render ``map.html`` for the entry stored under ``normalized_name``.

    The template receives ``maps`` as a one-element list when the lookup
    yields a truthy entry, and as an empty list otherwise.
    """
    entry = maps_data.get(normalized_name)
    matches = [entry] if entry else []
    return templates.TemplateResponse(
        "map.html",
        {"request": request, "maps": matches},
    )
|
|
|
|
|
|
|
|
class ConnectionManager:
    """Track which WebSocket belongs to which named group and relay messages.

    A socket is a member of at most one group at a time. ``groups`` maps a
    group name to its member sockets and ``connections`` is the reverse
    index from socket to group name; ``join_group`` and
    ``remove_from_group`` keep the two in step.
    """

    def __init__(self):
        self.groups: dict[str, set[WebSocket]] = {}
        self.connections: dict[WebSocket, str] = {}

    async def connect(self, websocket: WebSocket):
        """Accept the WebSocket handshake; the socket is in no group yet."""
        await websocket.accept()

    def disconnect(self, websocket: WebSocket):
        """Forget ``websocket``; does nothing if it is not in any group."""
        if websocket in self.connections:
            self.remove_from_group(websocket, self.connections[websocket])

    async def join_group(self, websocket: WebSocket, group_name: str):
        """Move ``websocket`` into ``group_name``, creating the group on demand."""
        # A socket lives in one group only, so leave the current one first.
        if websocket in self.connections:
            self.remove_from_group(websocket, self.connections[websocket])

        self.groups.setdefault(group_name, set()).add(websocket)
        self.connections[websocket] = group_name
        print(f"Client joined group '{group_name}'")

    def remove_from_group(self, websocket: WebSocket, group_name: str):
        """Remove ``websocket`` from ``group_name`` and from the reverse index.

        A group that loses its last member is deleted.
        """
        members = self.groups.get(group_name)
        if members is not None and websocket in members:
            members.remove(websocket)
            if not members:
                del self.groups[group_name]
                print(f"Group '{group_name}' deleted (no members)")
        if websocket in self.connections:
            del self.connections[websocket]
            print(f"Client removed from group '{group_name}'")

    async def broadcast_to_group(self, message: str, group_name: str):
        """Send ``message`` to every current member of ``group_name``.

        Unknown group names are ignored. A member whose send fails is
        dropped from the manager and delivery continues with the rest.
        """
        # Iterate over a snapshot: every ``await`` below yields to other
        # handlers, which may join or leave this group in the meantime, and
        # mutating a set while iterating over it raises RuntimeError.
        for connection in tuple(self.groups.get(group_name, ())):
            try:
                await connection.send_text(message)
            except (WebSocketDisconnect, RuntimeError):
                # Per Starlette's WebSocket implementation these are the
                # errors raised when sending to a peer that has already
                # gone away. Its own handler has not cleaned up yet, so drop
                # it here; one dead socket must not abort delivery to the
                # others or take the sender's handler down.
                self.disconnect(connection)


# Single shared registry used by the WebSocket endpoint.
manager = ConnectionManager()
|
|
|
|
|
|
|
|
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Serve one WebSocket client for the lifetime of its connection.

    Handling of each received text frame:

    * A JSON object with ``"type": "join"`` and a non-empty string
      ``"group"`` moves the client into that group. A join request without
      a usable group name is answered with an error frame.
    * Any other valid JSON is relayed verbatim to every member of the
      sender's group (the sender included), or answered with an error
      frame if the sender has not joined a group.
    * Text that is not valid JSON is relayed verbatim to the sender's
      group, and silently dropped if the sender has none.

    The client is removed from the manager on every exit path.
    """
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()

            # Keep the try body to the parse alone, so that only malformed
            # input is ever treated as plain text.
            try:
                message = json.loads(data)
            except json.JSONDecodeError:
                if websocket in manager.connections:
                    await manager.broadcast_to_group(
                        data, manager.connections[websocket]
                    )
                continue

            # json.loads may return a list, string, number, ... — only a
            # dict can carry a "type" field, so check before calling .get().
            if isinstance(message, dict) and message.get("type") == "join":
                group_name = message.get("group")
                # Group names become dict keys in the manager, so anything
                # other than a non-empty string is rejected as missing.
                if isinstance(group_name, str) and group_name:
                    await manager.join_group(websocket, group_name)
                else:
                    await websocket.send_text(json.dumps({
                        "type": "error",
                        "message": "Missing group name in join request",
                    }))
            elif websocket in manager.connections:
                await manager.broadcast_to_group(
                    data, manager.connections[websocket]
                )
            else:
                await websocket.send_text(json.dumps({
                    "type": "error",
                    "message": "Join a group before sending messages",
                }))
    except WebSocketDisconnect:
        # Normal end of the session; cleanup happens in ``finally``.
        pass
    finally:
        # Runs for every exit path, so an unexpected error cannot leave a
        # stale socket registered in a group.
        manager.disconnect(websocket)
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: serve the app on all interfaces, port 8000.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8000,
    )