eft_group_map_websocket / server_old.py
Sebastiankay's picture
Update server_old.py
f194199 verified
raw
history blame
4.83 kB
from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
import json
import uvicorn
app = FastAPI()
# Configure CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Serve static files
app.mount("/static", StaticFiles(directory="static"), name="static")
# Set up Jinja2 templates
templates = Jinja2Templates(directory="templates")
with open("data.json", "r", encoding="utf8") as file:
maps_data = json.load(file)
@app.get("/", response_class=HTMLResponse)
async def read_index(request: Request):
return templates.TemplateResponse("index.html", {"request": request, "maps": maps_data})
@app.get("/map/{normalized_name}", response_class=HTMLResponse)
async def get_map_by_normalized_name(request: Request, normalized_name: str):
map_entry = maps_data.get(normalized_name)
if map_entry:
filtered_maps = [map_entry]
else:
filtered_maps = []
return templates.TemplateResponse("map.html", {"request": request, "maps": filtered_maps})
class ConnectionManager:
def __init__(self):
self.groups: dict[str, set[WebSocket]] = {}
self.connections: dict[WebSocket, str] = {}
async def connect(self, websocket: WebSocket):
await websocket.accept()
def disconnect(self, websocket: WebSocket):
if websocket in self.connections:
group_name = self.connections[websocket]
self.remove_from_group(websocket, group_name)
async def join_group(self, websocket: WebSocket, group_name: str):
# Remove from previous group if exists
if websocket in self.connections:
old_group = self.connections[websocket]
self.remove_from_group(websocket, old_group)
# Add to new group
if group_name not in self.groups:
self.groups[group_name] = set()
self.groups[group_name].add(websocket)
self.connections[websocket] = group_name
print(f"Client joined group '{group_name}'")
def remove_from_group(self, websocket: WebSocket, group_name: str):
if group_name in self.groups and websocket in self.groups[group_name]:
self.groups[group_name].remove(websocket)
if len(self.groups[group_name]) == 0:
del self.groups[group_name]
print(f"Group '{group_name}' deleted (no members)")
if websocket in self.connections:
del self.connections[websocket]
print(f"Client removed from group '{group_name}'")
async def broadcast_to_group(self, message: str, group_name: str):
if group_name in self.groups:
for connection in self.groups[group_name]:
await connection.send_text(message)
manager = ConnectionManager()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
try:
message = json.loads(data)
# Handle join messages
if message.get('type') == 'join':
group_name = message.get('group')
if group_name:
await manager.join_group(websocket, group_name)
else:
error = json.dumps({
"type": "error",
"message": "Missing group name in join request"
})
await websocket.send_text(error)
# Broadcast all other messages to the group
else:
if websocket in manager.connections:
group_name = manager.connections[websocket]
await manager.broadcast_to_group(data, group_name)
else:
error = json.dumps({
"type": "error",
"message": "Join a group before sending messages"
})
await websocket.send_text(error)
except json.JSONDecodeError:
# Handle non-JSON messages by broadcasting directly
if websocket in manager.connections:
group_name = manager.connections[websocket]
await manager.broadcast_to_group(data, group_name)
except WebSocketDisconnect:
manager.disconnect(websocket)
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)