import json import asyncio import os import time import random from typing import Dict, List from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi.responses import HTMLResponse app = FastAPI() # --- CONFIGURATION --- DATA_FILE = "/data/leaderboard.json" if os.path.exists("/data") else "leaderboard.json" # A fixed daily seed ensures everyone sees the same level layout LEVEL_SEED = int(time.time() / (24 * 3600)) class ConnectionManager: def __init__(self): self.active_connections: Dict[WebSocket, dict] = {} self.leaderboard: List[dict] = self.load_leaderboard() def load_leaderboard(self): if os.path.exists(DATA_FILE): try: with open(DATA_FILE, "r") as f: return json.load(f) except: return [] return [] def save_leaderboard(self): # Sort by score desc, keep top 10 self.leaderboard.sort(key=lambda x: x['score'], reverse=True) self.leaderboard = self.leaderboard[:10] try: with open(DATA_FILE, "w") as f: json.dump(self.leaderboard, f) except Exception as e: print(f"Save Error: {e}") async def connect(self, websocket: WebSocket): await websocket.accept() # Assign a random skin/color to the player skin_id = random.randint(0, 3) self.active_connections[websocket] = { "u": "Loading...", "x": 0, "y": 0, "vx": 0, "vy": 0, "s": 0, # score "f": 1, # faceRight (1 or -1) "sk": skin_id, "d": False # dead } await websocket.send_json({ "t": "init", "seed": LEVEL_SEED, "lb": self.leaderboard, "skin": skin_id }) def disconnect(self, websocket: WebSocket): if websocket in self.active_connections: data = self.active_connections[websocket] self.update_leaderboard(data['u'], data['s']) del self.active_connections[websocket] def update_leaderboard(self, username, score): if score < 50 or username == "Guest": return # Check existing for entry in self.leaderboard: if entry['username'] == username: if score > entry['score']: entry['score'] = score self.save_leaderboard() return # Add new self.leaderboard.append({"username": username, "score": score}) self.save_leaderboard() async def handle_message(self, websocket: WebSocket, data: dict): if websocket not in self.active_connections: return p = self.active_connections[websocket] if data['t'] == 'u': # Update p['u'] = data.get('n', 'Guest')[:12] p['x'] = data.get('x', 0) p['y'] = data.get('y', 0) p['vx'] = data.get('vx', 0) p['vy'] = data.get('vy', 0) p['f'] = data.get('f', 1) p['s'] = data.get('s', 0) p['d'] = False elif data['t'] == 'd': # Death p['d'] = True self.update_leaderboard(p['u'], data.get('s', 0)) async def broadcast(self): if not self.active_connections: return # Compress data for bandwidth (1 char keys) players_packed = [] for ws, p in self.active_connections.items(): if not p['d']: # Only send living players players_packed.append({ "i": id(ws), # Unique temporary ID for interpolation "n": p['u'], "x": int(p['x']), "y": int(p['y']), "vx": round(p['vx'], 1), "vy": round(p['vy'], 1), "f": p['f'], "s": p['s'], "sk": p['sk'] }) msg = { "t": "w", "p": players_packed, "lb": self.leaderboard } # w = world update disconnected = [] for ws in self.active_connections: try: await ws.send_json(msg) except: disconnected.append(ws) for ws in disconnected: self.disconnect(ws) manager = ConnectionManager() @app.on_event("startup") async def start_loop(): asyncio.create_task(broadcast_task()) async def broadcast_task(): # 30 Server Ticks per second is enough for smooth interpolation # Sending 60fps packets over WebSocket often causes congestion/lag while True: start = time.time() await manager.broadcast() elapsed = time.time() - start await asyncio.sleep(max(0, 0.033 - elapsed)) @app.websocket("/ws") async def websocket_endpoint(websocket: WebSocket): await manager.connect(websocket) try: while True: data = await websocket.receive_json() await manager.handle_message(websocket, data) except WebSocketDisconnect: manager.disconnect(websocket) except: manager.disconnect(websocket) @app.get("/", response_class=HTMLResponse) async def get(): return """
Multiplayer Edition