eft_group_map_websocket / server_old.py
Sebastiankay's picture
Update server_old.py
f194199 verified
from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
import json
import uvicorn
app = FastAPI()
# Configure CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Serve static files
app.mount("/static", StaticFiles(directory="static"), name="static")
# Set up Jinja2 templates
templates = Jinja2Templates(directory="templates")
with open("data.json", "r", encoding="utf8") as file:
maps_data = json.load(file)
@app.get("/", response_class=HTMLResponse)
async def read_index(request: Request):
return templates.TemplateResponse("index.html", {"request": request, "maps": maps_data})
@app.get("/map/{normalized_name}", response_class=HTMLResponse)
async def get_map_by_normalized_name(request: Request, normalized_name: str):
map_entry = maps_data.get(normalized_name)
if map_entry:
filtered_maps = [map_entry]
else:
filtered_maps = []
return templates.TemplateResponse("map.html", {"request": request, "maps": filtered_maps})
class ConnectionManager:
def __init__(self):
self.groups: dict[str, set[WebSocket]] = {}
self.connections: dict[WebSocket, str] = {}
async def connect(self, websocket: WebSocket):
await websocket.accept()
def disconnect(self, websocket: WebSocket):
if websocket in self.connections:
group_name = self.connections[websocket]
self.remove_from_group(websocket, group_name)
async def join_group(self, websocket: WebSocket, group_name: str):
# Remove from previous group if exists
if websocket in self.connections:
old_group = self.connections[websocket]
self.remove_from_group(websocket, old_group)
# Add to new group
if group_name not in self.groups:
self.groups[group_name] = set()
self.groups[group_name].add(websocket)
self.connections[websocket] = group_name
print(f"Client joined group '{group_name}'")
def remove_from_group(self, websocket: WebSocket, group_name: str):
if group_name in self.groups and websocket in self.groups[group_name]:
self.groups[group_name].remove(websocket)
if len(self.groups[group_name]) == 0:
del self.groups[group_name]
print(f"Group '{group_name}' deleted (no members)")
if websocket in self.connections:
del self.connections[websocket]
print(f"Client removed from group '{group_name}'")
async def broadcast_to_group(self, message: str, group_name: str):
if group_name in self.groups:
for connection in self.groups[group_name]:
await connection.send_text(message)
manager = ConnectionManager()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
try:
message = json.loads(data)
# Handle join messages
if message.get('type') == 'join':
group_name = message.get('group')
if group_name:
await manager.join_group(websocket, group_name)
else:
error = json.dumps({
"type": "error",
"message": "Missing group name in join request"
})
await websocket.send_text(error)
# Broadcast all other messages to the group
else:
if websocket in manager.connections:
group_name = manager.connections[websocket]
await manager.broadcast_to_group(data, group_name)
else:
error = json.dumps({
"type": "error",
"message": "Join a group before sending messages"
})
await websocket.send_text(error)
except json.JSONDecodeError:
# Handle non-JSON messages by broadcasting directly
if websocket in manager.connections:
group_name = manager.connections[websocket]
await manager.broadcast_to_group(data, group_name)
except WebSocketDisconnect:
manager.disconnect(websocket)
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)