File size: 4,833 Bytes
7ba87ed f194199 7ba87ed f194199 7ba87ed f194199 7ba87ed f194199 7ba87ed f194199 7ba87ed f194199 7ba87ed f194199 7ba87ed f194199 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
import json
import uvicorn
app = FastAPI()
# Configure CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Serve static files
app.mount("/static", StaticFiles(directory="static"), name="static")
# Set up Jinja2 templates
templates = Jinja2Templates(directory="templates")
with open("data.json", "r", encoding="utf8") as file:
maps_data = json.load(file)
@app.get("/", response_class=HTMLResponse)
async def read_index(request: Request):
return templates.TemplateResponse("index.html", {"request": request, "maps": maps_data})
@app.get("/map/{normalized_name}", response_class=HTMLResponse)
async def get_map_by_normalized_name(request: Request, normalized_name: str):
map_entry = maps_data.get(normalized_name)
if map_entry:
filtered_maps = [map_entry]
else:
filtered_maps = []
return templates.TemplateResponse("map.html", {"request": request, "maps": filtered_maps})
class ConnectionManager:
def __init__(self):
self.groups: dict[str, set[WebSocket]] = {}
self.connections: dict[WebSocket, str] = {}
async def connect(self, websocket: WebSocket):
await websocket.accept()
def disconnect(self, websocket: WebSocket):
if websocket in self.connections:
group_name = self.connections[websocket]
self.remove_from_group(websocket, group_name)
async def join_group(self, websocket: WebSocket, group_name: str):
# Remove from previous group if exists
if websocket in self.connections:
old_group = self.connections[websocket]
self.remove_from_group(websocket, old_group)
# Add to new group
if group_name not in self.groups:
self.groups[group_name] = set()
self.groups[group_name].add(websocket)
self.connections[websocket] = group_name
print(f"Client joined group '{group_name}'")
def remove_from_group(self, websocket: WebSocket, group_name: str):
if group_name in self.groups and websocket in self.groups[group_name]:
self.groups[group_name].remove(websocket)
if len(self.groups[group_name]) == 0:
del self.groups[group_name]
print(f"Group '{group_name}' deleted (no members)")
if websocket in self.connections:
del self.connections[websocket]
print(f"Client removed from group '{group_name}'")
async def broadcast_to_group(self, message: str, group_name: str):
if group_name in self.groups:
for connection in self.groups[group_name]:
await connection.send_text(message)
manager = ConnectionManager()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
try:
message = json.loads(data)
# Handle join messages
if message.get('type') == 'join':
group_name = message.get('group')
if group_name:
await manager.join_group(websocket, group_name)
else:
error = json.dumps({
"type": "error",
"message": "Missing group name in join request"
})
await websocket.send_text(error)
# Broadcast all other messages to the group
else:
if websocket in manager.connections:
group_name = manager.connections[websocket]
await manager.broadcast_to_group(data, group_name)
else:
error = json.dumps({
"type": "error",
"message": "Join a group before sending messages"
})
await websocket.send_text(error)
except json.JSONDecodeError:
# Handle non-JSON messages by broadcasting directly
if websocket in manager.connections:
group_name = manager.connections[websocket]
await manager.broadcast_to_group(data, group_name)
except WebSocketDisconnect:
manager.disconnect(websocket)
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000) |