""" Entry point — FastAPI + Socket.IO ASGI app. Run with: uvicorn main:app --reload --port 8000 Socket.IO events ──────────────── Client → Server create_room { name } join_room { room_id, name } quick_match { name } player_ready {} get_playing_count {} — server responds with playing_count observe {} — server responds with observe_room or error voice_input { audio_b64, mime_type? } text_input { text } disconnect (automatic) Server → Client room_created { room_id, room } room_joined { room_id, room } room_update { room } — broadcast to room on any lobby change match_found { room_id, room } — to both players in quick match game_start { game_state } — when both ready game_update { game_state } — every tick (~250ms) voice_result { transcription, feedback_text, feedback_audio_b64, results } game_over { winner_id, winner_name } error { message } playing_count { count } — in response to get_playing_count observe_room { room_id } — in response to observe (spectator) """ from __future__ import annotations import asyncio import base64 import logging import os import re import subprocess import sys from pathlib import Path from typing import Any, Optional import socketio from fastapi import APIRouter, Body, Depends, FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles from fastapi.responses import FileResponse from game.bot import BOT_PLAYER_ID, BOT_PLAYER_NAME, BotPlayer from game.commands import ActionType from game.engine import GameEngine from game.map import MAP_LANDMARKS from game.state import GameState, TUTORIAL_DUMMY_ID from lobby.manager import LobbyManager from lobby.safe_name import sanitize_player_name from voice import command_parser, stt import config as _config import stats as _stats def require_debug(): """FastAPI dependency — raises 403 in production (DEBUG=False).""" if not _config.DEBUG: raise HTTPException(status_code=403, detail="Not available in production") BOT_OFFER_DELAY = 10 # seconds before offering bot opponent MAX_SIMULTANEOUS_GAMES = 20 logging.basicConfig(level=logging.INFO) log = logging.getLogger(__name__) def _record_game_over(room, winner_id: str, winner_name: str) -> None: """Persist stats for tutorial completion or PvP win.""" # Never record tutorial dummy wins if winner_id == TUTORIAL_DUMMY_ID: return import time as _time duration = int(_time.time() - room.started_at) if room and room.started_at else None engine = engines.get(room.room_id) if room else None is_tutorial = bool(engine and engine.state.is_tutorial) if engine else False event_type = "tutorial_complete" if is_tutorial else "game_won" # When the bot wins, record the match with the human opponent so it appears # in match history — bot wins are excluded from the leaderboard in stats.py if winner_id == BOT_PLAYER_ID: opponent_name: str | None = None if engine: for pid, pstate in engine.state.players.items(): if pid != BOT_PLAYER_ID: opponent_name = pstate.player_name break try: _stats.record(BOT_PLAYER_NAME, event_type, duration, opponent_name) log.info("Stat recorded: %s %s vs %s %ss", BOT_PLAYER_NAME, event_type, opponent_name, duration) except Exception: log.exception("Failed to record stat") return # Resolve opponent name for PvP/bot matches opponent_name = None if not is_tutorial and engine: for pid, pstate in engine.state.players.items(): if pid != winner_id and pid != BOT_PLAYER_ID: opponent_name = pstate.player_name break # If opponent was the bot, label it if opponent_name is None and any(pid == BOT_PLAYER_ID for pid in engine.state.players): opponent_name = BOT_PLAYER_NAME try: _stats.record(winner_name, event_type, duration, opponent_name) log.info("Stat recorded: %s %s vs %s %ss", winner_name, event_type, opponent_name, duration) except Exception: log.exception("Failed to record stat") def _fill_template(template: str, data: dict[str, Any]) -> str: """Fill placeholders {key} in template with data; missing keys become empty string.""" placeholders = re.findall(r"\{(\w+)\}", template) safe = {k: str(data.get(k, "")) for k in placeholders} try: return template.format(**safe) except KeyError: return template def _compute_feedback_level(cmd_result) -> str: """Derive ok/warning/error from command results.""" if cmd_result.feedback_override: return "error" results = cmd_result.results if not results: return "error" successes = sum(1 for r in results if r.success) if successes == 0: return "error" if successes < len(results): return "warning" # All succeeded — check if any result carries an error key for r in results: if r.data and "error" in r.data: return "warning" return "ok" # --------------------------------------------------------------------------- # Socket.IO + FastAPI setup # --------------------------------------------------------------------------- sio = socketio.AsyncServer( async_mode="asgi", cors_allowed_origins="*", ping_timeout=60, ping_interval=25, logger=False, engineio_logger=False, ) fastapi_app = FastAPI(title="ChatCraft API") fastapi_app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"], ) # Paths for static assets (must serve before any catch-all mount) _STATIC_DIR = Path(__file__).parent / "static" _SPRITES_DIR = Path(__file__).parent / "static" / "sprites" _static_router = APIRouter() @_static_router.get("/static/MAP.png", response_class=FileResponse) def _serve_map_png(): p = _STATIC_DIR / "MAP.png" if not p.is_file(): raise HTTPException(404, "MAP.png not found") return FileResponse(p, media_type="image/png") @_static_router.get("/sprites/{kind}/{filename:path}", response_class=FileResponse) def _serve_sprite(kind: str, filename: str): p = _SPRITES_DIR / kind / filename if not p.is_file(): raise HTTPException(404, "Sprite not found") return FileResponse( p, media_type="image/png", headers={"Cache-Control": "no-store"}, ) fastapi_app.include_router(_static_router) _stats_router = APIRouter(prefix="/api") @_stats_router.get("/stats") def get_stats(): return { "tutorial_leaderboard": _stats.get_tutorial_leaderboard(), "wins_leaderboard": _stats.get_wins_leaderboard(), "recent_matches": _stats.get_recent_matches(), } @_stats_router.get("/stats/recent") def get_stats_recent(): """Lightweight endpoint for landing-page toast notifications.""" return {"events": _stats.get_recent_events(limit=10)} fastapi_app.include_router(_stats_router) @fastapi_app.get("/api/config") def get_config(): return {"debug": _config.DEBUG} # Unit sound effects (generated by scripts/generate_unit_sounds.py) _SOUNDS_DIR = Path(__file__).parent / "static" / "sounds" _UNITS_SOUNDS_DIR = _SOUNDS_DIR / "units" _SOUNDS_DIR.mkdir(parents=True, exist_ok=True) _UNITS_SOUNDS_DIR.mkdir(parents=True, exist_ok=True) fastapi_app.mount("/sounds", StaticFiles(directory=str(_SOUNDS_DIR)), name="sounds") # Sprites (unit/building icons generated via Mistral image API) _SPRITES_DIR.mkdir(parents=True, exist_ok=True) if _SPRITES_DIR.exists(): fastapi_app.mount("/sprites", StaticFiles(directory=str(_SPRITES_DIR)), name="sprites") # Map image, map.json, walkable polygon (fallback for other static files) if _STATIC_DIR.exists(): fastapi_app.mount("/static", StaticFiles(directory=str(_STATIC_DIR)), name="static") _MAP_JSON = _STATIC_DIR / "map.json" _WALKABLE_JSON = _STATIC_DIR / "walkable.json" _COMPILED_MAP_JSON = _STATIC_DIR / "compiled_map.json" _GAME_POSITIONS_JSON = _STATIC_DIR / "game_positions.json" # Game grid size (must match game/map.py) _MAP_GRID_W = 80 _MAP_GRID_H = 80 # Init stats DB (creates /data/stats.db or local fallback) _stats.init_db() log.info("Stats DB ready at %s", _stats.DB_PATH) # Compile walkable → exterior + holes once at server startup if _WALKABLE_JSON.exists(): try: from game.map import MAP_WIDTH as _MAP_W, MAP_HEIGHT as _MAP_H from game.map_compiler import run_compiler run_compiler(_WALKABLE_JSON, _COMPILED_MAP_JSON, map_width=_MAP_W, map_height=_MAP_H) log.info("Map compiled: %s", _COMPILED_MAP_JSON) except Exception as e: log.warning("Map compiler skipped: %s", e) def _load_game_positions() -> dict: if not _GAME_POSITIONS_JSON.exists(): return {} import json with open(_GAME_POSITIONS_JSON, encoding="utf-8") as f: return json.load(f) @fastapi_app.get("/api/map") async def get_map_data(): """Données de la carte : nom, zones (locations), polygones marchables, positions de départ/expansion.""" import json data = {"map_name": "World Map Cleaned", "locations": [], "walkable_polygons": [], "walkable_polygon": [], "starting_positions": [], "expansion_positions": []} if _MAP_JSON.exists(): with open(_MAP_JSON, encoding="utf-8") as f: file_data = json.load(f) data["map_name"] = file_data.get("map_name", data["map_name"]) data["locations"] = file_data.get("locations", []) if _WALKABLE_JSON.exists(): with open(_WALKABLE_JSON, encoding="utf-8") as f: walk = json.load(f) polygons = walk.get("polygons", []) if not polygons and walk.get("polygon"): polygons = [walk["polygon"]] data["walkable_polygons"] = polygons data["walkable_polygon"] = _merged_walkable_polygon(polygons) pos = _load_game_positions() data["starting_positions"] = pos.get("starting_positions") or [] data["expansion_positions"] = pos.get("expansion_positions") or [] return data def _validate_polygon(polygon: list, name: str) -> None: if not isinstance(polygon, list) or not polygon: raise HTTPException(400, f"{name} requis (liste de [x, y])") for i, pt in enumerate(polygon): if not isinstance(pt, (list, tuple)) or len(pt) != 2: raise HTTPException(400, f"{name} point {i} invalide") x, y = float(pt[0]), float(pt[1]) if not (0 <= x <= 100 and 0 <= y <= 100): raise HTTPException(400, f"{name} point {i} hors [0,100]: {x}, {y}") def _merged_walkable_polygon(polygons: list) -> list: """Fusionne plusieurs polygones en un seul (union) pour le moteur de jeu. Sans shapely, on renvoie le premier ou une concaténation simple (le jeu pourra tester 'point dans l\'un des polygones').""" if not polygons: return [] if len(polygons) == 1: return polygons[0] try: from shapely.geometry import Polygon from shapely.ops import unary_union shapes = [Polygon(p) for p in polygons if len(p) >= 3] if not shapes: return polygons[0] merged = unary_union(shapes) if merged.is_empty: return polygons[0] if merged.geom_type == "Polygon" and not merged.is_empty: ext = list(merged.exterior.coords) return [[round(x, 2), round(y, 2)] for x, y in ext[:-1]] if merged.geom_type == "MultiPolygon": largest = max(merged.geoms, key=lambda g: g.area) ext = list(largest.exterior.coords) return [[round(x, 2), round(y, 2)] for x, y in ext[:-1]] except ImportError: pass return polygons[0] @fastapi_app.put("/api/map/walkable", dependencies=[Depends(require_debug)]) async def save_walkable_polygon(body: dict = Body(...)): """Enregistre les polygones marchables (liste de polygones [x, y] en 0-100). Le jeu reçoit la fusion via GET.""" polygons = body.get("polygons") if not isinstance(polygons, list) or not polygons: raise HTTPException(400, "polygons requis (liste de polygones)") for pi, poly in enumerate(polygons): _validate_polygon(poly, f"polygons[{pi}]") import json _STATIC_DIR.mkdir(parents=True, exist_ok=True) with open(_WALKABLE_JSON, "w", encoding="utf-8") as f: json.dump({"polygons": polygons}, f, indent=2) return {"status": "ok"} def _admin_to_game_x(x: float) -> int: return max(0, min(_MAP_GRID_W - 1, int(round(x * _MAP_GRID_W / 100.0)))) def _admin_to_game_y(y: float) -> int: return max(0, min(_MAP_GRID_H - 1, int(round(y * _MAP_GRID_H / 100.0)))) def _generate_minerals_around(cx: int, cy: int, count: int = 7, radius: float = 3.5) -> list[dict]: """Generate count mineral patches in game grid around (cx, cy).""" import random out = [] used = set() for _ in range(count * 2): if len(out) >= count: break dx = random.randint(-int(radius), int(radius)) dy = random.randint(-int(radius), int(radius)) if dx * dx + dy * dy > radius * radius: continue x = max(0, min(_MAP_GRID_W - 1, cx + dx)) y = max(0, min(_MAP_GRID_H - 1, cy + dy)) if (x, y) in used: continue used.add((x, y)) out.append({"x": x, "y": y}) return out def _generate_geysers_around(cx: int, cy: int, count: int = 1, radius: float = 4.0) -> list[dict]: """Generate count geyser(s) around (cx, cy).""" import random out = [] used = set() for _ in range(count * 3): if len(out) >= count: break dx = random.randint(-int(radius), int(radius)) dy = random.randint(-int(radius), int(radius)) x = max(0, min(_MAP_GRID_W - 1, cx + dx)) y = max(0, min(_MAP_GRID_H - 1, cy + dy)) if (x, y) in used: continue used.add((x, y)) out.append({"x": x, "y": y}) return out @fastapi_app.put("/api/map/positions", dependencies=[Depends(require_debug)]) async def save_map_positions(body: dict = Body(...)): """Enregistre positions de départ (2 ou 3, si 3 alors 2 seront tirées au sort par partie) et expansions.""" import json starts = body.get("starting_positions") expansions = body.get("expansion_positions") if not isinstance(starts, list) or len(starts) < 2: raise HTTPException(400, "starting_positions requis (au moins 2 positions {x, y} en 0-100)") if not isinstance(expansions, list): expansions = [] for i, p in enumerate(starts): if not isinstance(p, dict) or "x" not in p or "y" not in p: raise HTTPException(400, f"starting_positions[{i}] doit être {{x, y}}") x, y = float(p["x"]), float(p["y"]) if not (0 <= x <= 100 and 0 <= y <= 100): raise HTTPException(400, f"starting_positions[{i}] hors [0,100]") for i, p in enumerate(expansions): if not isinstance(p, dict) or "x" not in p or "y" not in p: raise HTTPException(400, f"expansion_positions[{i}] doit être {{x, y}}") x, y = float(p["x"]), float(p["y"]) if not (0 <= x <= 100 and 0 <= y <= 100): raise HTTPException(400, f"expansion_positions[{i}] hors [0,100]") # Keep positions in 0-100 for admin display; embed minerals/geysers per start so we load only the 2 chosen bases' resources starting_positions = [] total_minerals = 0 total_geysers = 0 for p in starts: x, y = float(p["x"]), float(p["y"]) gx, gy = _admin_to_game_x(x), _admin_to_game_y(y) minerals = _generate_minerals_around(gx, gy, count=7) geysers = _generate_geysers_around(gx, gy, count=1) total_minerals += len(minerals) total_geysers += len(geysers) starting_positions.append({"x": x, "y": y, "minerals": minerals, "geysers": geysers}) expansion_positions = [] for p in expansions: x, y = float(p["x"]), float(p["y"]) gx, gy = _admin_to_game_x(x), _admin_to_game_y(y) minerals = _generate_minerals_around(gx, gy, count=7) geysers = _generate_geysers_around(gx, gy, count=1) total_minerals += len(minerals) total_geysers += len(geysers) expansion_positions.append({"x": x, "y": y, "minerals": minerals, "geysers": geysers}) payload = { "starting_positions": starting_positions, "expansion_positions": expansion_positions, } _STATIC_DIR.mkdir(parents=True, exist_ok=True) with open(_GAME_POSITIONS_JSON, "w", encoding="utf-8") as f: json.dump(payload, f, indent=2) return {"status": "ok", "minerals_count": total_minerals, "geysers_count": total_geysers} @fastapi_app.put("/api/map/locations", dependencies=[Depends(require_debug)]) async def save_map_locations(body: dict = Body(...)): """Enregistre les zones clés (locations) dans map.json.""" import json locations = body.get("locations") if not isinstance(locations, list): raise HTTPException(400, "locations doit être une liste de {name, x, y}") for i, loc in enumerate(locations): if not isinstance(loc, dict) or "name" not in loc or "x" not in loc or "y" not in loc: raise HTTPException(400, f"locations[{i}] doit avoir name, x, y") x, y = float(loc["x"]), float(loc["y"]) if not (0 <= x <= 100 and 0 <= y <= 100): raise HTTPException(400, f"locations[{i}] coordonnées hors [0,100]") _STATIC_DIR.mkdir(parents=True, exist_ok=True) existing: dict = {} if _MAP_JSON.exists(): with open(_MAP_JSON, encoding="utf-8") as f: existing = json.load(f) existing["locations"] = [{"name": loc["name"], "x": float(loc["x"]), "y": float(loc["y"])} for loc in locations] with open(_MAP_JSON, "w", encoding="utf-8") as f: json.dump(existing, f, indent=2, ensure_ascii=False) return {"status": "ok", "count": len(locations)} @fastapi_app.get("/api/sounds/units") async def list_unit_sounds(): """Liste tous les sons d'unités présents (unit, kind). Pour l'admin : jouer / supprimer.""" if not _UNITS_SOUNDS_DIR.exists(): return {"sounds": []} sounds: list[dict] = [] for unit_dir in sorted(_UNITS_SOUNDS_DIR.iterdir()): if not unit_dir.is_dir(): continue unit = unit_dir.name for f in unit_dir.iterdir(): if f.suffix.lower() == ".mp3": kind = f.stem sounds.append({"unit": unit, "kind": kind}) return {"sounds": sounds} @fastapi_app.delete("/api/sounds/units/{unit}/{kind}", dependencies=[Depends(require_debug)]) async def delete_unit_sound(unit: str, kind: str): """Supprime un son. Relancer le script régénère uniquement les manquants.""" if not unit.replace("-", "").isalnum() or not kind.replace("-", "").isalnum(): raise HTTPException(400, "unit/kind invalides") path = _UNITS_SOUNDS_DIR / unit / f"{kind}.mp3" if not path.exists(): raise HTTPException(404, "Son introuvable") path.unlink() return None def _list_sprites(subdir: Path, suffix: str = ".png") -> list[dict]: if not subdir.exists(): return [] out = [] for f in sorted(subdir.iterdir()): if f.suffix.lower() == suffix: out.append({"id": f.stem, "name": f.stem}) return out @fastapi_app.get("/api/sprites/units") async def list_unit_sprites(): """Liste les sprites d'unités présents (générés par Mistral).""" return {"sprites": _list_sprites(_SPRITES_DIR / "units")} @fastapi_app.get("/api/sprites/buildings") async def list_building_sprites(): """Liste les sprites de bâtiments présents.""" return {"sprites": _list_sprites(_SPRITES_DIR / "buildings")} @fastapi_app.post("/api/sprites/generate", dependencies=[Depends(require_debug)]) async def generate_sprites(): """Lance la génération des sprites (unités + bâtiments) via l'API Mistral. Peut prendre plusieurs minutes.""" import subprocess def run(): return subprocess.run( [sys.executable, "-m", "scripts.generate_sprites", "--skip-existing"], cwd=str(Path(__file__).parent), capture_output=True, text=True, timeout=600, ) loop = asyncio.get_event_loop() try: result = await loop.run_in_executor(None, run) except subprocess.TimeoutExpired: raise HTTPException(504, "Génération trop longue (timeout 10 min)") except Exception as e: raise HTTPException(500, str(e)) if result.returncode != 0: raise HTTPException(500, result.stderr or result.stdout or "Script failed") return {"status": "ok", "message": "Sprites générés. Recharge la page admin."} def _run_sprite_script(extra_args: list[str]) -> subprocess.CompletedProcess: return subprocess.run( [sys.executable, "-m", "scripts.generate_sprites"] + extra_args, cwd=str(Path(__file__).parent), capture_output=True, text=True, timeout=120, ) @fastapi_app.post("/api/sprites/generate/units/{unit_id}", dependencies=[Depends(require_debug)]) async def generate_one_unit_sprite(unit_id: str): """Régénère une seule image d'unité.""" unit_id = unit_id.strip().lower() valid = [u.value for u in __import__("game.units", fromlist=["UnitType"]).UnitType] if unit_id not in valid: raise HTTPException(400, f"Unit invalide. Valides: {valid}") loop = asyncio.get_event_loop() result = await loop.run_in_executor(None, lambda: _run_sprite_script(["--unit", unit_id])) if result.returncode != 0: raise HTTPException(500, result.stderr or result.stdout or "Échec génération") return {"status": "ok", "id": unit_id} @fastapi_app.post("/api/sprites/generate/buildings/{building_id}", dependencies=[Depends(require_debug)]) async def generate_one_building_sprite(building_id: str): """Régénère une seule image de bâtiment.""" building_id = building_id.strip().lower().replace(" ", "_") valid = [b.value for b in __import__("game.buildings", fromlist=["BuildingType"]).BuildingType] if building_id not in valid: raise HTTPException(400, f"Building invalide. Valides: {valid}") loop = asyncio.get_event_loop() result = await loop.run_in_executor(None, lambda: _run_sprite_script(["--building", building_id])) if result.returncode != 0: raise HTTPException(500, result.stderr or result.stdout or "Échec génération") return {"status": "ok", "id": building_id} @fastapi_app.get("/api/sprites/resources") async def list_resource_sprites(): """Liste les sprites de ressources présents.""" return {"sprites": _list_sprites(_SPRITES_DIR / "resources")} @fastapi_app.post("/api/sprites/generate/resources/{resource_id}", dependencies=[Depends(require_debug)]) async def generate_one_resource_sprite(resource_id: str): """Régénère une seule icône de ressource (mineral, geyser).""" resource_id = resource_id.strip().lower() valid = ["mineral", "geyser"] if resource_id not in valid: raise HTTPException(400, f"Resource invalide. Valides: {valid}") loop = asyncio.get_event_loop() result = await loop.run_in_executor(None, lambda: _run_sprite_script(["--resource", resource_id])) if result.returncode != 0: raise HTTPException(500, result.stderr or result.stdout or "Échec génération") return {"status": "ok", "id": resource_id} _ICONS_DIR = _SPRITES_DIR / "icons" @fastapi_app.get("/api/icons") async def list_icons(): """Liste les icônes UI présentes.""" return {"icons": _list_sprites(_ICONS_DIR)} @fastapi_app.post("/api/sprites/rotate/{kind}/{sprite_id}", dependencies=[Depends(require_debug)]) async def rotate_sprite(kind: str, sprite_id: str): """Fait pivoter un sprite de 90° dans le sens antihoraire (rotate CCW = tourne l'arme vers le haut).""" valid_kinds = {"units": _SPRITES_DIR / "units", "buildings": _SPRITES_DIR / "buildings", "resources": _SPRITES_DIR / "resources"} if kind not in valid_kinds: raise HTTPException(400, f"Kind invalide. Valides: {list(valid_kinds.keys())}") sprite_dir = valid_kinds[kind] sprite_id = sprite_id.strip().lower() path = sprite_dir / f"{sprite_id}.png" if not path.is_file(): raise HTTPException(404, f"Sprite non trouvé: {path.name}") try: from PIL import Image img = Image.open(path) rotated = img.rotate(90, expand=True) rotated.save(path) except Exception as e: raise HTTPException(500, str(e)) return {"status": "ok", "id": sprite_id, "kind": kind} @fastapi_app.post("/api/icons/rotate/{icon_id}", dependencies=[Depends(require_debug)]) async def rotate_icon(icon_id: str): """Fait pivoter une icône UI de 90° dans le sens antihoraire.""" icon_id = icon_id.strip().lower() path = _ICONS_DIR / f"{icon_id}.png" if not path.is_file(): raise HTTPException(404, f"Icône non trouvée: {path.name}") try: from PIL import Image img = Image.open(path) rotated = img.rotate(90, expand=True) rotated.save(path) except Exception as e: raise HTTPException(500, str(e)) return {"status": "ok", "id": icon_id} @fastapi_app.post("/api/icons/generate/{icon_id}", dependencies=[Depends(require_debug)]) async def generate_one_icon(icon_id: str): """Génère une icône UI symbolique (mineral, gas, supply).""" icon_id = icon_id.strip().lower() valid = ["mineral", "gas", "supply"] if icon_id not in valid: raise HTTPException(400, f"Icône invalide. Valides: {valid}") loop = asyncio.get_event_loop() result = await loop.run_in_executor(None, lambda: _run_sprite_script(["--icon", icon_id])) if result.returncode != 0: raise HTTPException(500, result.stderr or result.stdout or "Échec génération") return {"status": "ok", "id": icon_id} # Serve SvelteKit static build if present (production / HF Spaces) _FRONTEND_BUILD = Path(__file__).parent.parent / "frontend" / "build" if _FRONTEND_BUILD.exists(): fastapi_app.mount( "/", StaticFiles(directory=str(_FRONTEND_BUILD), html=True), name="frontend", ) # ASGI app: Socket.IO wraps FastAPI app = socketio.ASGIApp(sio, fastapi_app) # --------------------------------------------------------------------------- # Global state # --------------------------------------------------------------------------- lobby = LobbyManager() engines: dict[str, GameEngine] = {} # room_id → GameEngine _bot_timers: dict[str, asyncio.Task] = {} # sid → pending bot-offer timer # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _player_name_in_room(sid: str, room_id: str) -> str: room = lobby.get_room(room_id) if not room: return sid player = room.get_player(sid) return player.name if player else sid async def _emit_error(sid: str, message: str) -> None: await sio.emit("error", {"message": message}, to=sid) def _build_resource_zones(engine: Any, player_id: str) -> list[str]: """Return sorted resource zone names (mineral_1…N, geyser_1…M) by proximity to player base.""" from game.map import ResourceType as _RT player = engine.state.players.get(player_id) cc = player.command_center() if player else None base_x = float(cc.x) + 2 if cc else 40.0 base_y = float(cc.y) + 2 if cc else 40.0 resources = engine.state.game_map.resources minerals = sorted( [r for r in resources if r.resource_type == _RT.MINERAL and not r.is_depleted], key=lambda r: (r.x - base_x) ** 2 + (r.y - base_y) ** 2, ) geysers = sorted( [r for r in resources if r.resource_type == _RT.GEYSER], key=lambda r: (r.x - base_x) ** 2 + (r.y - base_y) ** 2, ) zones = [f"mineral_{i + 1}" for i in range(len(minerals))] zones += [f"geyser_{i + 1}" for i in range(len(geysers))] return zones async def _start_game(room_id: str) -> None: """Create game state + engine, emit game_start to human players.""" room = lobby.get_room(room_id) if not room or len(room.players) != 2: return if len(engines) >= MAX_SIMULTANEOUS_GAMES: log.warning("Limite de %d parties simultanées atteinte, impossible de démarrer la room %s", MAX_SIMULTANEOUS_GAMES, room_id) for p in room.players: if p.sid != BOT_PLAYER_ID: await _emit_error(p.sid, "Le serveur est plein (limite de parties simultanées atteinte). Réessaie dans quelques instants.") return p1, p2 = room.players[0], room.players[1] game_state = GameState.create_new( room_id=room_id, player1_id=p1.sid, player1_name=p1.name, player2_id=p2.sid, player2_name=p2.name, ) engine = GameEngine(game_state, sio) engines[room_id] = engine room.engine = engine # Wire up bot if one of the players is the bot for p in (p1, p2): if p.sid == BOT_PLAYER_ID: engine.bot = BotPlayer(engine, BOT_PLAYER_ID) log.info("Bot wired for room %s", room_id) payload = game_state.model_dump(mode="json") # Send game_start only to human players for p in (p1, p2): if p.sid != BOT_PLAYER_ID: await sio.emit("game_start", {"game_state": payload, "your_id": p.sid}, to=p.sid) import time as _time room.started_at = _time.time() engine.start() log.info("Game started in room %s: %s vs %s | parties actives: %d/%d", room_id, p1.name, p2.name, len(engines), MAX_SIMULTANEOUS_GAMES) def _cancel_bot_timer(sid: str) -> None: task = _bot_timers.pop(sid, None) if task and not task.done(): task.cancel() async def _schedule_bot_offer(sid: str, room_id: str) -> None: """Wait BOT_OFFER_DELAY seconds then offer bot to the waiting player.""" await asyncio.sleep(BOT_OFFER_DELAY) _bot_timers.pop(sid, None) # Still in the same room and still waiting for opponent? room = lobby.get_room(room_id) if room and room.status == "waiting" and len(room.players) == 1: await sio.emit("bot_offer", {}, to=sid) log.info("Bot offer sent to %s (room %s)", sid, room_id) async def _schedule_bot_offer_queue(sid: str) -> None: """Wait BOT_OFFER_DELAY seconds then offer bot to a queued player.""" await asyncio.sleep(BOT_OFFER_DELAY) _bot_timers.pop(sid, None) # Still in queue? if lobby._sid_to_room.get(sid) == "__queue__": await sio.emit("bot_offer", {}, to=sid) log.info("Bot offer sent to queued player %s", sid) # --------------------------------------------------------------------------- # Connection lifecycle # --------------------------------------------------------------------------- @sio.event async def connect(sid: str, environ: dict, auth: Optional[dict] = None) -> None: log.info("Client connected: %s", sid) @sio.event async def disconnect(sid: str) -> None: log.info("Client disconnected: %s", sid) _cancel_bot_timer(sid) room = lobby.disconnect(sid) if room: if room.status == "finished" and room.room_id in engines: await engines[room.room_id].stop() del engines[room.room_id] log.info("Partie terminée (déconnexion) room %s | parties actives: %d/%d", room.room_id, len(engines), MAX_SIMULTANEOUS_GAMES) # Notify remaining human players for player in room.players: if player.sid == BOT_PLAYER_ID: continue await sio.emit( "room_update", {"room": room.to_dict(), "message": "Un joueur a quitté la partie."}, to=player.sid, ) if room.status == "finished": _record_game_over(room, player.sid, player.name) await sio.emit( "game_over", {"winner_id": player.sid, "winner_name": player.name, "reason": "opponent_disconnected"}, to=player.sid, ) # --------------------------------------------------------------------------- # Lobby events # --------------------------------------------------------------------------- @sio.event async def get_playing_count(sid: str, data: dict) -> None: count = lobby.get_playing_count() await sio.emit("playing_count", {"count": count}, to=sid) @sio.event async def get_playing_rooms(sid: str, data: dict) -> None: rooms = lobby.get_playing_rooms() await sio.emit("playing_rooms", {"rooms": rooms}, to=sid) @sio.event async def observe(sid: str, data: dict) -> None: room_id = data.get("room_id") or lobby.get_a_playing_room_id() if not room_id: await _emit_error(sid, "Aucune partie en cours à observer.") return room = lobby.get_room(room_id) if not room or room.status != "playing": await _emit_error(sid, "Cette partie n'est plus disponible.") return await sio.enter_room(sid, room_id) engine = engines.get(room_id) payload = {"room_id": room_id} if engine: payload["game_state"] = engine.state.model_dump(mode="json") await sio.emit("observe_room", payload, to=sid) @sio.event async def create_room(sid: str, data: dict) -> None: name = str(data.get("name", "Player")).strip() or "Player" name = await sanitize_player_name(name) _cancel_bot_timer(sid) await _cleanup_old_game(sid) room = lobby.create_room(sid, name) await sio.enter_room(sid, room.room_id) await sio.emit("room_created", {"room_id": room.room_id, "room": room.to_dict()}, to=sid) log.info("Room %s created by %s", room.room_id, name) # Start bot-offer countdown _bot_timers[sid] = asyncio.create_task(_schedule_bot_offer(sid, room.room_id)) @sio.event async def join_room(sid: str, data: dict) -> None: room_id = str(data.get("room_id", "")).upper().strip() name = str(data.get("name", "Player")).strip() or "Player" name = await sanitize_player_name(name) room, err = lobby.join_room(sid, room_id, name) if err: await _emit_error(sid, err) return # Cancel the host's bot-offer timer — a real opponent arrived host_sid = next((p.sid for p in room.players if p.sid != sid), None) if host_sid: _cancel_bot_timer(host_sid) await sio.enter_room(sid, room_id) await sio.emit("room_joined", {"room_id": room_id, "room": room.to_dict()}, to=sid) # Notify host await sio.emit("room_update", {"room": room.to_dict()}, room=room_id) @sio.event async def quick_match(sid: str, data: dict) -> None: name = str(data.get("name", "Player")).strip() or "Player" name = await sanitize_player_name(name) _cancel_bot_timer(sid) await _cleanup_old_game(sid) room, is_new = lobby.quick_match(sid, name) if not is_new: await sio.emit("match_queued", {"message": "En attente d'un adversaire…"}, to=sid) # Start bot-offer countdown for queued player _cancel_bot_timer(sid) _bot_timers[sid] = asyncio.create_task(_schedule_bot_offer_queue(sid)) return # Match found — cancel timers for both players for player in room.players: _cancel_bot_timer(player.sid) # Notify both players await sio.enter_room(room.players[0].sid, room.room_id) await sio.enter_room(room.players[1].sid, room.room_id) for player in room.players: await sio.emit( "match_found", {"room_id": room.room_id, "room": room.to_dict()}, to=player.sid, ) # Auto-ready both players in quick match and start the game for player in room.players: player.ready = True room.status = "playing" await _start_game(room.room_id) @sio.event async def accept_bot(sid: str, data: dict) -> None: """Player accepts the bot offer — add bot to their room and start the game.""" _cancel_bot_timer(sid) # Find or create room for this player room = lobby.get_room_for_sid(sid) if not room: # Player was in queue — create a room for them first # Find their name from lobby queue state queued = next((name for s, name in lobby._queue if s == sid), None) if queued is None: await _emit_error(sid, "Session expirée, relance une partie.") return # Remove from queue and create a private room lobby._queue = [(s, n) for s, n in lobby._queue if s != sid] lobby._sid_to_room.pop(sid, None) room = lobby.create_room(sid, queued) await sio.enter_room(sid, room.room_id) room_obj, err = lobby.add_bot(room.room_id) if err: await _emit_error(sid, err) return await sio.emit("room_update", {"room": room_obj.to_dict()}, to=sid) # Auto-ready human and start human = room_obj.get_player(sid) if human: human.ready = True room_obj.status = "playing" await _start_game(room_obj.room_id) @sio.event async def cancel_construction(sid: str, data: dict) -> None: """Cancel an in-progress building construction and refund 75% of resources.""" room = lobby.get_room_for_sid(sid) if not room: await _emit_error(sid, "Tu n'es dans aucune room.") return engine = engines.get(room.room_id) if not engine: await _emit_error(sid, "La partie n'a pas encore commencé.") return building_id: str = data.get("building_id", "") if not building_id: await _emit_error(sid, "ID de bâtiment manquant.") return result = engine.cancel_building_construction(sid, building_id) if not result.success: await _emit_error(sid, f"Impossible d'annuler: {result.data.get('error', '?')}") async def _cleanup_old_game(sid: str) -> None: """Leave socket room and stop engine for any game the player is currently in.""" old_room = lobby.get_room_for_sid(sid) if old_room: old_room_id = old_room.room_id await sio.leave_room(sid, old_room_id) if old_room_id in engines: await engines[old_room_id].stop() del engines[old_room_id] log.info("Partie précédente stoppée room %s | parties actives: %d/%d", old_room_id, len(engines), MAX_SIMULTANEOUS_GAMES) @sio.event async def start_tutorial(sid: str, data: dict) -> None: """Start a solo tutorial game for the player.""" name = str(data.get("name", "Player")).strip() or "Player" name = await sanitize_player_name(name) # Cancel any pending timers _cancel_bot_timer(sid) # Leave & stop any previous game/tutorial so stale game_update events # don't bleed into the new tutorial session. await _cleanup_old_game(sid) if len(engines) >= MAX_SIMULTANEOUS_GAMES: log.warning("Limite de %d parties simultanées atteinte, impossible de démarrer le tutoriel pour %s", MAX_SIMULTANEOUS_GAMES, sid) await _emit_error(sid, "Le serveur est plein (limite de parties simultanées atteinte). Réessaie dans quelques instants.") return # Create a private room room = lobby.create_room(sid, name) await sio.enter_room(sid, room.room_id) room.status = "playing" game_state = GameState.create_tutorial( room_id=room.room_id, player_id=sid, player_name=name, ) engine = GameEngine(game_state, sio) engines[room.room_id] = engine room.engine = engine payload = game_state.model_dump(mode="json") await sio.emit("game_start", {"game_state": payload, "your_id": sid}, to=sid) import time as _time room.started_at = _time.time() engine.start() log.info("Tutorial started for %s (%s) in room %s | parties actives: %d/%d", name, sid, room.room_id, len(engines), MAX_SIMULTANEOUS_GAMES) @sio.event async def tutorial_complete(sid: str, data: dict) -> None: """Frontend signals that all tutorial objectives are done. Record the stat.""" import time as _time room = lobby.get_room_for_sid(sid) if not room: return duration = int(_time.time() - room.started_at) if room.started_at else None engine = engines.get(room.room_id) player_name = "Player" if engine: pstate = engine.state.players.get(sid) if pstate: player_name = pstate.player_name try: _stats.record(player_name, "tutorial_complete", duration) log.info("Tutorial complete: %s in %ss", player_name, duration) except Exception: log.exception("Failed to record tutorial stat") @sio.event async def player_ready(sid: str, data: dict) -> None: room, all_ready = lobby.set_ready(sid) if not room: await _emit_error(sid, "Tu n'es dans aucune room.") return await sio.emit("room_update", {"room": room.to_dict()}, room=room.room_id) if all_ready: await _start_game(room.room_id) # --------------------------------------------------------------------------- # Voice pipeline # --------------------------------------------------------------------------- @sio.event async def voice_input(sid: str, data: dict) -> None: """ Receives base64-encoded audio from the client. Pipeline: STT → Mistral → game commands → TTS → response """ room = lobby.get_room_for_sid(sid) if not room: await _emit_error(sid, "Tu n'es dans aucune room.") return engine = engines.get(room.room_id) if not engine: await _emit_error(sid, "La partie n'a pas encore commencé.") return player = engine.state.players.get(sid) if not player: await _emit_error(sid, "Joueur introuvable dans la partie.") return audio_b64: str = data.get("audio_b64", "") mime_type: str = data.get("mime_type", "audio/webm") if not audio_b64: await _emit_error(sid, "Aucune donnée audio reçue.") return try: audio_bytes = base64.b64decode(audio_b64) except Exception: await _emit_error(sid, "Données audio invalides (base64 incorrect).") return # 1. Speech-to-text try: transcription = await stt.transcribe(audio_bytes, mime_type) except Exception as exc: log.exception("STT failed") await _emit_error(sid, f"Erreur de reconnaissance vocale: {exc}") return if not transcription: await sio.emit("voice_result", { "transcription": "", "feedback_text": "Je n'ai rien entendu. Appuie et parle!", "feedback_level": "warning", "results": [], }, to=sid) return # 2. Parse command with Mistral resource_zones = _build_resource_zones(engine, sid) try: parsed = await command_parser.parse(transcription, player, resource_zones=resource_zones, landmarks=MAP_LANDMARKS) except Exception as exc: log.exception("Command parsing failed") await _emit_error(sid, f"Erreur d'interprétation: {exc}") return # 3a. Intercept resign — ask for confirmation before executing if any(a.type == ActionType.RESIGN for a in parsed.actions): await sio.emit("resign_confirm_request", {"transcription": transcription}, to=sid) return # 3. Apply commands to game engine cmd_result = engine.apply_command(sid, parsed) if cmd_result.feedback_override: feedback_text = await command_parser.generate_feedback( cmd_result.feedback_override, parsed.language ) else: merged: dict[str, Any] = {} for r in cmd_result.results: merged.update(r.data or {}) if "error" in merged: feedback_text = await command_parser.generate_feedback( merged["error"], parsed.language ) else: feedback_text = _fill_template(parsed.feedback_template, merged) sound_events = [] for r in cmd_result.results: sound_events.extend(getattr(r, "sound_events", []) or []) await sio.emit("voice_result", { "transcription": transcription, "feedback_text": feedback_text, "feedback_level": _compute_feedback_level(cmd_result), "results": [r.model_dump() for r in cmd_result.results], "sound_events": sound_events, }, to=sid) # Check win condition after command if engine.state.winner: winner_state = engine.state.players.get(engine.state.winner) winner_name = winner_state.player_name if winner_state else engine.state.winner _record_game_over(room, engine.state.winner, winner_name) await sio.emit("game_over", { "winner_id": engine.state.winner, "winner_name": winner_name, }, room=room.room_id) await engines[room.room_id].stop() del engines[room.room_id] lobby.finish_room(room.room_id) log.info("Partie terminée (victoire voice) room %s | parties actives: %d/%d", room.room_id, len(engines), MAX_SIMULTANEOUS_GAMES) async def _process_text_command(sid: str, transcription: str) -> None: """Shared pipeline for text-based commands (skip STT).""" room = lobby.get_room_for_sid(sid) if not room: await _emit_error(sid, "Tu n'es dans aucune room.") return engine = engines.get(room.room_id) if not engine: await _emit_error(sid, "La partie n'a pas encore commencé.") return player = engine.state.players.get(sid) if not player: await _emit_error(sid, "Joueur introuvable dans la partie.") return # Parse command with Mistral resource_zones = _build_resource_zones(engine, sid) try: parsed = await command_parser.parse(transcription, player, resource_zones=resource_zones, landmarks=MAP_LANDMARKS) except Exception as exc: log.exception("Command parsing failed") await _emit_error(sid, f"Erreur d'interprétation: {exc}") return # Intercept resign — ask for confirmation before executing if any(a.type == ActionType.RESIGN for a in parsed.actions): await sio.emit("resign_confirm_request", {"transcription": transcription}, to=sid) return # Apply commands to game engine cmd_result = engine.apply_command(sid, parsed) if cmd_result.feedback_override: feedback_text = await command_parser.generate_feedback( cmd_result.feedback_override, parsed.language ) else: merged = {} for r in cmd_result.results: merged.update(r.data or {}) if "error" in merged: feedback_text = await command_parser.generate_feedback( merged["error"], parsed.language ) else: feedback_text = _fill_template(parsed.feedback_template, merged) sound_events = [] for r in cmd_result.results: sound_events.extend(getattr(r, "sound_events", []) or []) await sio.emit("voice_result", { "transcription": transcription, "feedback_text": feedback_text, "feedback_level": _compute_feedback_level(cmd_result), "results": [r.model_dump() for r in cmd_result.results], "sound_events": sound_events, }, to=sid) if engine.state.winner: winner_state = engine.state.players.get(engine.state.winner) winner_name = winner_state.player_name if winner_state else engine.state.winner _record_game_over(room, engine.state.winner, winner_name) await sio.emit("game_over", { "winner_id": engine.state.winner, "winner_name": winner_name, }, room=room.room_id) await engines[room.room_id].stop() del engines[room.room_id] lobby.finish_room(room.room_id) log.info("Partie terminée (victoire text) room %s | parties actives: %d/%d", room.room_id, len(engines), MAX_SIMULTANEOUS_GAMES) @sio.event async def text_input(sid: str, data: dict) -> None: """ Receives a plain-text command from the client. Pipeline: Mistral → game commands → response (no STT) """ text: str = str(data.get("text", "")).strip() if not text: await _emit_error(sid, "Aucun texte reçu.") return await _process_text_command(sid, text) @sio.event async def resign_confirm(sid: str, data: dict) -> None: """Player confirmed they want to resign.""" room = lobby.get_room_for_sid(sid) if not room: return engine = engines.get(room.room_id) if not engine: return player = engine.state.players.get(sid) if not player: return result = engine._cmd_resign(player) if not result.success: await _emit_error(sid, result.data.get("error", "Erreur lors de l'abandon.")) return await sio.emit("voice_result", { "transcription": "", "feedback_text": "Vous avez abandonné la partie.", "feedback_level": "ok", "results": [result.model_dump()], "sound_events": [], }, to=sid) if engine.state.winner: winner_state = engine.state.players.get(engine.state.winner) winner_name = winner_state.player_name if winner_state else engine.state.winner _record_game_over(room, engine.state.winner, winner_name) await sio.emit("game_over", { "winner_id": engine.state.winner, "winner_name": winner_name, }, room=room.room_id) await engines[room.room_id].stop() del engines[room.room_id] lobby.finish_room(room.room_id) log.info("Partie terminée (resign) room %s | parties actives: %d/%d", room.room_id, len(engines), MAX_SIMULTANEOUS_GAMES)