Spaces:
Runtime error
Runtime error
| """ | |
| Entry point — FastAPI + Socket.IO ASGI app. | |
| Run with: | |
| uvicorn main:app --reload --port 8000 | |
| Socket.IO events | |
| ──────────────── | |
| Client → Server | |
| create_room { name } | |
| join_room { room_id, name } | |
| quick_match { name } | |
| player_ready {} | |
| get_playing_count {} — server responds with playing_count | |
| observe { room_id? } — server responds with observe_room or error (a playing room is picked when room_id is omitted) | |
| voice_input { audio_b64, mime_type? } | |
| text_input { text } | |
| disconnect (automatic) | |
| Server → Client | |
| room_created { room_id, room } | |
| room_joined { room_id, room } | |
| room_update { room } — broadcast to room on any lobby change | |
| match_found { room_id, room } — to both players in quick match | |
| game_start { game_state, your_id } — when both ready | |
| game_update { game_state } — every tick (~250ms) | |
| voice_result { transcription, feedback_text, feedback_audio_b64, results } | |
| game_over { winner_id, winner_name } | |
| error { message } | |
| playing_count { count } — in response to get_playing_count | |
| observe_room { room_id } — in response to observe (spectator) | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import base64 | |
| import logging | |
| import os | |
| import re | |
| import subprocess | |
| import sys | |
| from pathlib import Path | |
| from typing import Any, Optional | |
| import socketio | |
| from fastapi import APIRouter, Body, Depends, FastAPI, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.responses import FileResponse | |
| from game.bot import BOT_PLAYER_ID, BOT_PLAYER_NAME, BotPlayer | |
| from game.commands import ActionType | |
| from game.engine import GameEngine | |
| from game.map import MAP_LANDMARKS | |
| from game.state import GameState, TUTORIAL_DUMMY_ID | |
| from lobby.manager import LobbyManager | |
| from lobby.safe_name import sanitize_player_name | |
| from voice import command_parser, stt | |
| import config as _config | |
| import stats as _stats | |
def require_debug():
    """FastAPI dependency that only lets a request through in debug mode.

    Raises:
        HTTPException: 403 when ``_config.DEBUG`` is falsy.
    """
    if _config.DEBUG:
        return
    raise HTTPException(status_code=403, detail="Not available in production")
# Delay passed to asyncio.sleep() by the bot-offer timers.
BOT_OFFER_DELAY = 10  # seconds before offering bot opponent
# Upper bound on len(engines); _start_game refuses to start beyond it.
MAX_SIMULTANEOUS_GAMES = 20
# Root logger at INFO; `log` is this module's logger.
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)
def _record_game_over(room, winner_id: str, winner_name: str) -> None:
    """Persist a stats row for a finished game (tutorial completion or win).

    Nothing is recorded when the tutorial dummy wins. When the bot wins, the
    row is stored under the bot's name with the human player as opponent.
    Failures of the stats backend are logged and swallowed.

    Args:
        room: Lobby room of the finished game (may be falsy).
        winner_id: Player id (sid) of the winner.
        winner_name: Display name of the winner.
    """
    # Never record tutorial dummy wins
    if winner_id == TUTORIAL_DUMMY_ID:
        return

    import time as _time

    duration = None
    engine = None
    if room:
        if room.started_at:
            duration = int(_time.time() - room.started_at)
        engine = engines.get(room.room_id)

    is_tutorial = bool(engine.state.is_tutorial) if engine else False
    event_type = "tutorial_complete" if is_tutorial else "game_won"

    opponent_name: str | None = None
    if winner_id == BOT_PLAYER_ID:
        # Bot win: store the match against the human so it shows up in match
        # history (the original comment notes stats.py keeps bot wins off the
        # leaderboard).
        recorded_name = BOT_PLAYER_NAME
        if engine:
            opponent_name = next(
                (
                    pstate.player_name
                    for pid, pstate in engine.state.players.items()
                    if pid != BOT_PLAYER_ID
                ),
                None,
            )
    else:
        recorded_name = winner_name
        if engine and not is_tutorial:
            # First player that is neither the winner nor the bot.
            opponent_name = next(
                (
                    pstate.player_name
                    for pid, pstate in engine.state.players.items()
                    if pid not in (winner_id, BOT_PLAYER_ID)
                ),
                None,
            )
            # No human opponent found: label the bot if it took part.
            if opponent_name is None and BOT_PLAYER_ID in engine.state.players:
                opponent_name = BOT_PLAYER_NAME

    try:
        _stats.record(recorded_name, event_type, duration, opponent_name)
        log.info("Stat recorded: %s %s vs %s %ss", recorded_name, event_type, opponent_name, duration)
    except Exception:
        log.exception("Failed to record stat")
| def _fill_template(template: str, data: dict[str, Any]) -> str: | |
| """Fill placeholders {key} in template with data; missing keys become empty string.""" | |
| placeholders = re.findall(r"\{(\w+)\}", template) | |
| safe = {k: str(data.get(k, "")) for k in placeholders} | |
| try: | |
| return template.format(**safe) | |
| except KeyError: | |
| return template | |
| def _compute_feedback_level(cmd_result) -> str: | |
| """Derive ok/warning/error from command results.""" | |
| if cmd_result.feedback_override: | |
| return "error" | |
| results = cmd_result.results | |
| if not results: | |
| return "error" | |
| successes = sum(1 for r in results if r.success) | |
| if successes == 0: | |
| return "error" | |
| if successes < len(results): | |
| return "warning" | |
| # All succeeded — check if any result carries an error key | |
| for r in results: | |
| if r.data and "error" in r.data: | |
| return "warning" | |
| return "ok" | |
| # --------------------------------------------------------------------------- | |
| # Socket.IO + FastAPI setup | |
| # --------------------------------------------------------------------------- | |
# Async Socket.IO server in ASGI mode; connections are accepted from any origin
# and both the Socket.IO and Engine.IO loggers are turned off.
sio = socketio.AsyncServer(
    async_mode="asgi",
    cors_allowed_origins="*",
    ping_timeout=60,
    ping_interval=25,
    logger=False,
    engineio_logger=False,
)
fastapi_app = FastAPI(title="ChatCraft API")
# CORS for the REST API is fully open as well (all origins, methods, headers).
fastapi_app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
# Paths for static assets (must serve before any catch-all mount)
# Both directories are resolved relative to this source file.
_STATIC_DIR = Path(__file__).parent / "static"
_SPRITES_DIR = Path(__file__).parent / "static" / "sprites"
_static_router = APIRouter()
def _serve_map_png():
    """Return ``static/MAP.png`` as a PNG file response.

    Raises:
        HTTPException: 404 when the file does not exist.
    """
    map_path = _STATIC_DIR / "MAP.png"
    if map_path.is_file():
        return FileResponse(map_path, media_type="image/png")
    raise HTTPException(404, "MAP.png not found")
def _serve_sprite(kind: str, filename: str):
    """Return one sprite image from ``static/sprites/<kind>/<filename>``.

    The response carries ``Cache-Control: no-store``.

    Args:
        kind: Sprite sub-directory name.
        filename: File name inside that sub-directory.

    Raises:
        HTTPException: 404 when the file does not exist, or when either
            segment is empty, a dot segment, or contains a path separator.
    """
    # Both segments come from the caller: refuse anything that could step out
    # of the sprites directory before touching the filesystem.
    for segment in (kind, filename):
        if segment in ("", ".", "..") or "/" in segment or "\\" in segment:
            raise HTTPException(404, "Sprite not found")
    p = _SPRITES_DIR / kind / filename
    if not p.is_file():
        raise HTTPException(404, "Sprite not found")
    return FileResponse(
        p,
        media_type="image/png",
        headers={"Cache-Control": "no-store"},
    )
# Register the static-asset router, then create the router for /api routes.
fastapi_app.include_router(_static_router)
_stats_router = APIRouter(prefix="/api")
def get_stats():
    """Return both leaderboards and the recent matches from the stats store."""
    tutorial_board = _stats.get_tutorial_leaderboard()
    wins_board = _stats.get_wins_leaderboard()
    matches = _stats.get_recent_matches()
    return {
        "tutorial_leaderboard": tutorial_board,
        "wins_leaderboard": wins_board,
        "recent_matches": matches,
    }
def get_stats_recent():
    """Return the 10 most recent stat events (used for landing-page toasts)."""
    recent_events = _stats.get_recent_events(limit=10)
    return {"events": recent_events}
# Register the /api router on the FastAPI application.
fastapi_app.include_router(_stats_router)
def get_config():
    """Expose the server's debug flag to the client."""
    debug_enabled = _config.DEBUG
    return {"debug": debug_enabled}
# Unit sound effects (generated by scripts/generate_unit_sounds.py)
_SOUNDS_DIR = Path(__file__).parent / "static" / "sounds"
_UNITS_SOUNDS_DIR = _SOUNDS_DIR / "units"
# Create the directories up front so the mount below has a directory to serve.
_SOUNDS_DIR.mkdir(parents=True, exist_ok=True)
_UNITS_SOUNDS_DIR.mkdir(parents=True, exist_ok=True)
fastapi_app.mount("/sounds", StaticFiles(directory=str(_SOUNDS_DIR)), name="sounds")
# Sprites (unit/building icons generated via Mistral image API)
_SPRITES_DIR.mkdir(parents=True, exist_ok=True)
# NOTE(review): the directory was just created above, so this check looks always true.
if _SPRITES_DIR.exists():
    fastapi_app.mount("/sprites", StaticFiles(directory=str(_SPRITES_DIR)), name="sprites")
# Map image, map.json, walkable polygon (fallback for other static files)
if _STATIC_DIR.exists():
    fastapi_app.mount("/static", StaticFiles(directory=str(_STATIC_DIR)), name="static")
# JSON files read and written by the map endpoints below.
_MAP_JSON = _STATIC_DIR / "map.json"
_WALKABLE_JSON = _STATIC_DIR / "walkable.json"
_COMPILED_MAP_JSON = _STATIC_DIR / "compiled_map.json"
_GAME_POSITIONS_JSON = _STATIC_DIR / "game_positions.json"
# Game grid size (must match game/map.py)
_MAP_GRID_W = 80
_MAP_GRID_H = 80
# Init stats DB (creates /data/stats.db or local fallback)
_stats.init_db()
log.info("Stats DB ready at %s", _stats.DB_PATH)
# Compile walkable → exterior + holes once at server startup
# Runs at import time, only when walkable.json exists. Any failure (including
# a failed import of the compiler) is logged as a warning and startup continues.
if _WALKABLE_JSON.exists():
    try:
        from game.map import MAP_WIDTH as _MAP_W, MAP_HEIGHT as _MAP_H
        from game.map_compiler import run_compiler
        run_compiler(_WALKABLE_JSON, _COMPILED_MAP_JSON, map_width=_MAP_W, map_height=_MAP_H)
        log.info("Map compiled: %s", _COMPILED_MAP_JSON)
    except Exception as e:
        log.warning("Map compiler skipped: %s", e)
def _load_game_positions() -> dict:
    """Load ``game_positions.json``; return ``{}`` when the file is absent."""
    if _GAME_POSITIONS_JSON.exists():
        import json

        with open(_GAME_POSITIONS_JSON, encoding="utf-8") as handle:
            return json.load(handle)
    return {}
async def get_map_data():
    """Return map data: name, locations, walkable polygons, start/expansion positions.

    Values come from ``map.json``, ``walkable.json`` and the game-positions
    file when they exist; defaults are used for whatever is missing.
    """
    import json

    data = {
        "map_name": "World Map Cleaned",
        "locations": [],
        "walkable_polygons": [],
        "walkable_polygon": [],
        "starting_positions": [],
        "expansion_positions": [],
    }

    if _MAP_JSON.exists():
        with open(_MAP_JSON, encoding="utf-8") as handle:
            map_file = json.load(handle)
        data["map_name"] = map_file.get("map_name", data["map_name"])
        data["locations"] = map_file.get("locations", [])

    if _WALKABLE_JSON.exists():
        with open(_WALKABLE_JSON, encoding="utf-8") as handle:
            walkable = json.load(handle)
        polygons = walkable.get("polygons", [])
        # Fall back to the single-polygon key when no "polygons" list is stored.
        if not polygons and walkable.get("polygon"):
            polygons = [walkable["polygon"]]
        data["walkable_polygons"] = polygons
        data["walkable_polygon"] = _merged_walkable_polygon(polygons)

    positions = _load_game_positions()
    for key in ("starting_positions", "expansion_positions"):
        data[key] = positions.get(key) or []
    return data
| def _validate_polygon(polygon: list, name: str) -> None: | |
| if not isinstance(polygon, list) or not polygon: | |
| raise HTTPException(400, f"{name} requis (liste de [x, y])") | |
| for i, pt in enumerate(polygon): | |
| if not isinstance(pt, (list, tuple)) or len(pt) != 2: | |
| raise HTTPException(400, f"{name} point {i} invalide") | |
| x, y = float(pt[0]), float(pt[1]) | |
| if not (0 <= x <= 100 and 0 <= y <= 100): | |
| raise HTTPException(400, f"{name} point {i} hors [0,100]: {x}, {y}") | |
| def _merged_walkable_polygon(polygons: list) -> list: | |
| """Fusionne plusieurs polygones en un seul (union) pour le moteur de jeu. Sans shapely, on renvoie le premier ou une concaténation simple (le jeu pourra tester 'point dans l\'un des polygones').""" | |
| if not polygons: | |
| return [] | |
| if len(polygons) == 1: | |
| return polygons[0] | |
| try: | |
| from shapely.geometry import Polygon | |
| from shapely.ops import unary_union | |
| shapes = [Polygon(p) for p in polygons if len(p) >= 3] | |
| if not shapes: | |
| return polygons[0] | |
| merged = unary_union(shapes) | |
| if merged.is_empty: | |
| return polygons[0] | |
| if merged.geom_type == "Polygon" and not merged.is_empty: | |
| ext = list(merged.exterior.coords) | |
| return [[round(x, 2), round(y, 2)] for x, y in ext[:-1]] | |
| if merged.geom_type == "MultiPolygon": | |
| largest = max(merged.geoms, key=lambda g: g.area) | |
| ext = list(largest.exterior.coords) | |
| return [[round(x, 2), round(y, 2)] for x, y in ext[:-1]] | |
| except ImportError: | |
| pass | |
| return polygons[0] | |
async def save_walkable_polygon(body: dict = Body(...)):
    """Save the walkable polygons (a list of polygons of ``[x, y]`` in 0-100).

    The list is validated and written to ``walkable.json`` under the
    ``"polygons"`` key.

    Raises:
        HTTPException: 400 when ``polygons`` is missing, empty or invalid.
    """
    polygons = body.get("polygons")
    if not (isinstance(polygons, list) and polygons):
        raise HTTPException(400, "polygons requis (liste de polygones)")
    for index, polygon in enumerate(polygons):
        _validate_polygon(polygon, f"polygons[{index}]")

    import json

    _STATIC_DIR.mkdir(parents=True, exist_ok=True)
    document = {"polygons": polygons}
    with open(_WALKABLE_JSON, "w", encoding="utf-8") as handle:
        json.dump(document, handle, indent=2)
    return {"status": "ok"}
def _admin_to_game_x(x: float) -> int:
    """Convert an admin x coordinate (0-100) to a grid column clamped to the map."""
    column = int(round(x * _MAP_GRID_W / 100.0))
    last_column = _MAP_GRID_W - 1
    return max(0, min(last_column, column))
def _admin_to_game_y(y: float) -> int:
    """Convert an admin y coordinate (0-100) to a grid row clamped to the map."""
    row = int(round(y * _MAP_GRID_H / 100.0))
    last_row = _MAP_GRID_H - 1
    return max(0, min(last_row, row))
def _generate_minerals_around(cx: int, cy: int, count: int = 7, radius: float = 3.5) -> list[dict]:
    """Pick up to *count* distinct mineral cells at random around ``(cx, cy)``.

    At most ``count * 2`` random offsets are drawn. Offsets outside the circle
    of the given radius are discarded, cells are clamped to the grid and
    duplicates skipped, so fewer than *count* cells may be returned.

    Returns:
        A list of ``{"x": int, "y": int}`` grid cells.
    """
    import random

    reach = int(radius)
    max_dist_sq = radius * radius
    patches = []
    taken = set()
    attempts_left = count * 2
    while attempts_left > 0 and len(patches) < count:
        attempts_left -= 1
        dx = random.randint(-reach, reach)
        dy = random.randint(-reach, reach)
        if dx * dx + dy * dy > max_dist_sq:
            continue
        cell = (
            max(0, min(_MAP_GRID_W - 1, cx + dx)),
            max(0, min(_MAP_GRID_H - 1, cy + dy)),
        )
        if cell in taken:
            continue
        taken.add(cell)
        patches.append({"x": cell[0], "y": cell[1]})
    return patches
def _generate_geysers_around(cx: int, cy: int, count: int = 1, radius: float = 4.0) -> list[dict]:
    """Pick up to *count* distinct geyser cells at random around ``(cx, cy)``.

    At most ``count * 3`` random offsets are drawn inside the square of
    half-width ``int(radius)``. Cells are clamped to the grid and duplicates
    skipped.

    Returns:
        A list of ``{"x": int, "y": int}`` grid cells.
    """
    import random

    reach = int(radius)
    geysers = []
    taken = set()
    attempts_left = count * 3
    while attempts_left > 0 and len(geysers) < count:
        attempts_left -= 1
        dx = random.randint(-reach, reach)
        dy = random.randint(-reach, reach)
        cell = (
            max(0, min(_MAP_GRID_W - 1, cx + dx)),
            max(0, min(_MAP_GRID_H - 1, cy + dy)),
        )
        if cell in taken:
            continue
        taken.add(cell)
        geysers.append({"x": cell[0], "y": cell[1]})
    return geysers
def _parse_percent_positions(items: list, label: str) -> list[tuple[float, float]]:
    """Validate a list of ``{x, y}`` dicts in [0, 100] and return ``(x, y)`` float pairs."""
    parsed = []
    for i, p in enumerate(items):
        if not isinstance(p, dict) or "x" not in p or "y" not in p:
            raise HTTPException(400, f"{label}[{i}] doit être {{x, y}}")
        try:
            x, y = float(p["x"]), float(p["y"])
        except (TypeError, ValueError):
            # Non-numeric coordinates are a client error (400), not a crash.
            raise HTTPException(400, f"{label}[{i}] doit être {{x, y}}") from None
        if not (0 <= x <= 100 and 0 <= y <= 100):
            raise HTTPException(400, f"{label}[{i}] hors [0,100]")
        parsed.append((x, y))
    return parsed


def _position_with_resources(x: float, y: float) -> dict:
    """Return a position entry with freshly generated minerals and geysers around it."""
    gx, gy = _admin_to_game_x(x), _admin_to_game_y(y)
    return {
        "x": x,
        "y": y,
        "minerals": _generate_minerals_around(gx, gy, count=7),
        "geysers": _generate_geysers_around(gx, gy, count=1),
    }


async def save_map_positions(body: dict = Body(...)):
    """Save starting positions (at least 2) and expansion positions.

    Positions stay in the 0-100 admin coordinate space. Each one gets its own
    randomly generated minerals and geysers (in grid coordinates) embedded
    next to it, and the result is written to the game-positions JSON file.

    Returns:
        ``{"status": "ok", "minerals_count": int, "geysers_count": int}``.

    Raises:
        HTTPException: 400 when ``starting_positions`` is missing or has fewer
            than 2 entries, or when any position is malformed, non-numeric or
            outside [0, 100].
    """
    import json

    starts = body.get("starting_positions")
    expansions = body.get("expansion_positions")
    if not isinstance(starts, list) or len(starts) < 2:
        raise HTTPException(400, "starting_positions requis (au moins 2 positions {x, y} en 0-100)")
    if not isinstance(expansions, list):
        expansions = []

    # Validate everything before generating any resources.
    start_points = _parse_percent_positions(starts, "starting_positions")
    expansion_points = _parse_percent_positions(expansions, "expansion_positions")

    starting_positions = [_position_with_resources(x, y) for x, y in start_points]
    expansion_positions = [_position_with_resources(x, y) for x, y in expansion_points]
    every_position = starting_positions + expansion_positions
    total_minerals = sum(len(entry["minerals"]) for entry in every_position)
    total_geysers = sum(len(entry["geysers"]) for entry in every_position)

    payload = {
        "starting_positions": starting_positions,
        "expansion_positions": expansion_positions,
    }
    _STATIC_DIR.mkdir(parents=True, exist_ok=True)
    with open(_GAME_POSITIONS_JSON, "w", encoding="utf-8") as f:
        json.dump(payload, f, indent=2)
    return {"status": "ok", "minerals_count": total_minerals, "geysers_count": total_geysers}
async def save_map_locations(body: dict = Body(...)):
    """Save the key zones (locations) into ``map.json``.

    Other keys already present in ``map.json`` are kept; only ``"locations"``
    is replaced.

    Returns:
        ``{"status": "ok", "count": <number of locations>}``.

    Raises:
        HTTPException: 400 when ``locations`` is not a list, an entry lacks
            ``name``/``x``/``y``, a coordinate is not numeric, or a coordinate
            lies outside [0, 100].
    """
    import json

    locations = body.get("locations")
    if not isinstance(locations, list):
        raise HTTPException(400, "locations doit être une liste de {name, x, y}")

    cleaned = []
    for i, loc in enumerate(locations):
        if not isinstance(loc, dict) or "name" not in loc or "x" not in loc or "y" not in loc:
            raise HTTPException(400, f"locations[{i}] doit avoir name, x, y")
        try:
            x, y = float(loc["x"]), float(loc["y"])
        except (TypeError, ValueError):
            # Non-numeric coordinates are a client error (400), not a crash.
            raise HTTPException(400, f"locations[{i}] doit avoir name, x, y") from None
        if not (0 <= x <= 100 and 0 <= y <= 100):
            raise HTTPException(400, f"locations[{i}] coordonnées hors [0,100]")
        cleaned.append({"name": loc["name"], "x": x, "y": y})

    _STATIC_DIR.mkdir(parents=True, exist_ok=True)
    existing: dict = {}
    if _MAP_JSON.exists():
        with open(_MAP_JSON, encoding="utf-8") as f:
            existing = json.load(f)
    existing["locations"] = cleaned
    with open(_MAP_JSON, "w", encoding="utf-8") as f:
        json.dump(existing, f, indent=2, ensure_ascii=False)
    return {"status": "ok", "count": len(locations)}
async def list_unit_sounds():
    """List every unit sound on disk as ``{"unit", "kind"}`` pairs.

    Each sub-directory of the unit sounds directory is a unit, and each
    ``.mp3`` file in it contributes one entry whose kind is the file stem.
    """
    if not _UNITS_SOUNDS_DIR.exists():
        return {"sounds": []}

    found: list[dict] = []
    for unit_dir in sorted(_UNITS_SOUNDS_DIR.iterdir()):
        if unit_dir.is_dir():
            found.extend(
                {"unit": unit_dir.name, "kind": clip.stem}
                for clip in unit_dir.iterdir()
                if clip.suffix.lower() == ".mp3"
            )
    return {"sounds": found}
async def delete_unit_sound(unit: str, kind: str):
    """Delete the ``<unit>/<kind>.mp3`` sound file.

    Raises:
        HTTPException: 400 when *unit* or *kind* contains anything other than
            alphanumerics and hyphens; 404 when the file does not exist.
    """
    for token in (unit, kind):
        if not token.replace("-", "").isalnum():
            raise HTTPException(400, "unit/kind invalides")

    target = _UNITS_SOUNDS_DIR / unit / f"{kind}.mp3"
    if target.exists():
        target.unlink()
        return None
    raise HTTPException(404, "Son introuvable")
| def _list_sprites(subdir: Path, suffix: str = ".png") -> list[dict]: | |
| if not subdir.exists(): | |
| return [] | |
| out = [] | |
| for f in sorted(subdir.iterdir()): | |
| if f.suffix.lower() == suffix: | |
| out.append({"id": f.stem, "name": f.stem}) | |
| return out | |
async def list_unit_sprites():
    """List the unit sprites present on disk."""
    units_dir = _SPRITES_DIR / "units"
    return {"sprites": _list_sprites(units_dir)}
async def list_building_sprites():
    """List the building sprites present on disk."""
    buildings_dir = _SPRITES_DIR / "buildings"
    return {"sprites": _list_sprites(buildings_dir)}
async def generate_sprites():
    """Run the sprite generation script for all units and buildings.

    Launches ``python -m scripts.generate_sprites --skip-existing`` from this
    file's directory in the default executor, so the event loop is not blocked
    while the script runs (up to 10 minutes).

    Raises:
        HTTPException: 504 when the script exceeds the timeout; 500 when it
            cannot be run or exits with a non-zero status.
    """

    def run():
        return subprocess.run(
            [sys.executable, "-m", "scripts.generate_sprites", "--skip-existing"],
            cwd=str(Path(__file__).parent),
            capture_output=True,
            text=True,
            timeout=600,
        )

    # Inside a coroutine the running loop is the one to use.
    loop = asyncio.get_running_loop()
    try:
        result = await loop.run_in_executor(None, run)
    except subprocess.TimeoutExpired:
        raise HTTPException(504, "Génération trop longue (timeout 10 min)") from None
    except Exception as e:
        raise HTTPException(500, str(e)) from e
    if result.returncode != 0:
        raise HTTPException(500, result.stderr or result.stdout or "Script failed")
    return {"status": "ok", "message": "Sprites générés. Recharge la page admin."}
def _run_sprite_script(extra_args: list[str]) -> subprocess.CompletedProcess:
    """Run ``python -m scripts.generate_sprites`` with *extra_args* appended.

    The script runs from this file's directory with captured text output and
    a 120-second timeout.
    """
    command = [sys.executable, "-m", "scripts.generate_sprites"] + extra_args
    working_dir = str(Path(__file__).parent)
    return subprocess.run(
        command,
        cwd=working_dir,
        capture_output=True,
        text=True,
        timeout=120,
    )
async def generate_one_unit_sprite(unit_id: str):
    """Regenerate the sprite of a single unit.

    Args:
        unit_id: Unit identifier; stripped and lower-cased before validation
            against the ``UnitType`` values.

    Raises:
        HTTPException: 400 for an unknown unit; 504 when the script times out;
            500 when the script exits with a non-zero status.
    """
    from game.units import UnitType

    unit_id = unit_id.strip().lower()
    valid = [u.value for u in UnitType]
    if unit_id not in valid:
        raise HTTPException(400, f"Unit invalide. Valides: {valid}")
    loop = asyncio.get_running_loop()
    try:
        result = await loop.run_in_executor(None, lambda: _run_sprite_script(["--unit", unit_id]))
    except subprocess.TimeoutExpired:
        # _run_sprite_script enforces a 120 s timeout; report it as a gateway timeout.
        raise HTTPException(504, "Génération trop longue (timeout 2 min)") from None
    if result.returncode != 0:
        raise HTTPException(500, result.stderr or result.stdout or "Échec génération")
    return {"status": "ok", "id": unit_id}
async def generate_one_building_sprite(building_id: str):
    """Regenerate the sprite of a single building.

    Args:
        building_id: Building identifier; stripped, lower-cased and with
            spaces turned into underscores before validation against the
            ``BuildingType`` values.

    Raises:
        HTTPException: 400 for an unknown building; 504 when the script times
            out; 500 when the script exits with a non-zero status.
    """
    from game.buildings import BuildingType

    building_id = building_id.strip().lower().replace(" ", "_")
    valid = [b.value for b in BuildingType]
    if building_id not in valid:
        raise HTTPException(400, f"Building invalide. Valides: {valid}")
    loop = asyncio.get_running_loop()
    try:
        result = await loop.run_in_executor(None, lambda: _run_sprite_script(["--building", building_id]))
    except subprocess.TimeoutExpired:
        # _run_sprite_script enforces a 120 s timeout; report it as a gateway timeout.
        raise HTTPException(504, "Génération trop longue (timeout 2 min)") from None
    if result.returncode != 0:
        raise HTTPException(500, result.stderr or result.stdout or "Échec génération")
    return {"status": "ok", "id": building_id}
async def list_resource_sprites():
    """List the resource sprites present on disk."""
    resources_dir = _SPRITES_DIR / "resources"
    return {"sprites": _list_sprites(resources_dir)}
async def generate_one_resource_sprite(resource_id: str):
    """Regenerate a single resource sprite (``mineral`` or ``geyser``).

    Args:
        resource_id: Resource identifier; stripped and lower-cased first.

    Raises:
        HTTPException: 400 for an unknown resource; 504 when the script times
            out; 500 when the script exits with a non-zero status.
    """
    resource_id = resource_id.strip().lower()
    valid = ["mineral", "geyser"]
    if resource_id not in valid:
        raise HTTPException(400, f"Resource invalide. Valides: {valid}")
    loop = asyncio.get_running_loop()
    try:
        result = await loop.run_in_executor(None, lambda: _run_sprite_script(["--resource", resource_id]))
    except subprocess.TimeoutExpired:
        # _run_sprite_script enforces a 120 s timeout; report it as a gateway timeout.
        raise HTTPException(504, "Génération trop longue (timeout 2 min)") from None
    if result.returncode != 0:
        raise HTTPException(500, result.stderr or result.stdout or "Échec génération")
    return {"status": "ok", "id": resource_id}
# Directory of the UI icons, a sub-directory of the sprites directory.
_ICONS_DIR = _SPRITES_DIR / "icons"
async def list_icons():
    """List the UI icons present on disk."""
    available = _list_sprites(_ICONS_DIR)
    return {"icons": available}
async def rotate_sprite(kind: str, sprite_id: str):
    """Rotate a sprite by 90° counter-clockwise, overwriting the file in place.

    Args:
        kind: One of ``units``, ``buildings``, ``resources``.
        sprite_id: Sprite file stem; stripped and lower-cased before lookup.

    Raises:
        HTTPException: 400 for an unknown kind; 404 when the PNG does not
            exist; 500 when loading, rotating or saving the image fails.
    """
    valid_kinds = {"units": _SPRITES_DIR / "units", "buildings": _SPRITES_DIR / "buildings", "resources": _SPRITES_DIR / "resources"}
    if kind not in valid_kinds:
        raise HTTPException(400, f"Kind invalide. Valides: {list(valid_kinds.keys())}")
    sprite_dir = valid_kinds[kind]
    sprite_id = sprite_id.strip().lower()
    path = sprite_dir / f"{sprite_id}.png"
    if not path.is_file():
        raise HTTPException(404, f"Sprite non trouvé: {path.name}")
    try:
        from PIL import Image

        # Close the source file before overwriting it; `rotated` is a separate
        # in-memory image and stays valid after the context exits.
        with Image.open(path) as img:
            rotated = img.rotate(90, expand=True)
        rotated.save(path)
    except Exception as e:
        raise HTTPException(500, str(e)) from e
    return {"status": "ok", "id": sprite_id, "kind": kind}
async def rotate_icon(icon_id: str):
    """Rotate a UI icon by 90° counter-clockwise, overwriting the file in place.

    Args:
        icon_id: Icon file stem; stripped and lower-cased before lookup.

    Raises:
        HTTPException: 404 when the PNG does not exist; 500 when loading,
            rotating or saving the image fails.
    """
    icon_id = icon_id.strip().lower()
    path = _ICONS_DIR / f"{icon_id}.png"
    if not path.is_file():
        raise HTTPException(404, f"Icône non trouvée: {path.name}")
    try:
        from PIL import Image

        # Close the source file before overwriting it; `rotated` is a separate
        # in-memory image and stays valid after the context exits.
        with Image.open(path) as img:
            rotated = img.rotate(90, expand=True)
        rotated.save(path)
    except Exception as e:
        raise HTTPException(500, str(e)) from e
    return {"status": "ok", "id": icon_id}
async def generate_one_icon(icon_id: str):
    """Generate a single UI icon (``mineral``, ``gas`` or ``supply``).

    Args:
        icon_id: Icon identifier; stripped and lower-cased first.

    Raises:
        HTTPException: 400 for an unknown icon; 504 when the script times out;
            500 when the script exits with a non-zero status.
    """
    icon_id = icon_id.strip().lower()
    valid = ["mineral", "gas", "supply"]
    if icon_id not in valid:
        raise HTTPException(400, f"Icône invalide. Valides: {valid}")
    loop = asyncio.get_running_loop()
    try:
        result = await loop.run_in_executor(None, lambda: _run_sprite_script(["--icon", icon_id]))
    except subprocess.TimeoutExpired:
        # _run_sprite_script enforces a 120 s timeout; report it as a gateway timeout.
        raise HTTPException(504, "Génération trop longue (timeout 2 min)") from None
    if result.returncode != 0:
        raise HTTPException(500, result.stderr or result.stdout or "Échec génération")
    return {"status": "ok", "id": icon_id}
# Serve SvelteKit static build if present (production / HF Spaces)
# The build is looked up at ../frontend/build relative to this file and, when
# found, mounted at "/" with html=True.
_FRONTEND_BUILD = Path(__file__).parent.parent / "frontend" / "build"
if _FRONTEND_BUILD.exists():
    fastapi_app.mount(
        "/",
        StaticFiles(directory=str(_FRONTEND_BUILD), html=True),
        name="frontend",
    )
# ASGI app: Socket.IO wraps FastAPI
# `app` is the object named in the module docstring's uvicorn command (main:app).
app = socketio.ASGIApp(sio, fastapi_app)
| # --------------------------------------------------------------------------- | |
| # Global state | |
| # --------------------------------------------------------------------------- | |
# Module-level state shared by all Socket.IO handlers in this file.
lobby = LobbyManager()
engines: dict[str, GameEngine] = {}  # room_id → GameEngine
_bot_timers: dict[str, asyncio.Task] = {}  # sid → pending bot-offer timer
| # --------------------------------------------------------------------------- | |
| # Helpers | |
| # --------------------------------------------------------------------------- | |
def _player_name_in_room(sid: str, room_id: str) -> str:
    """Return the player's name in the given room, or *sid* when room or player is unknown."""
    room = lobby.get_room(room_id)
    player = room.get_player(sid) if room else None
    if player:
        return player.name
    return sid
async def _emit_error(sid: str, message: str) -> None:
    """Send an ``error`` event carrying *message* to a single client."""
    payload = {"message": message}
    await sio.emit("error", payload, to=sid)
def _build_resource_zones(engine: Any, player_id: str) -> list[str]:
    """Return resource zone names ``mineral_1…N`` followed by ``geyser_1…M``.

    N is the number of non-depleted minerals on the map and M the number of
    geysers. Resources are ranked by squared distance to the player's base
    (command center position + 2, or (40, 40) when there is none).
    """
    from game.map import ResourceType as _RT

    player = engine.state.players.get(player_id)
    cc = player.command_center() if player else None
    if cc:
        base_x, base_y = float(cc.x) + 2, float(cc.y) + 2
    else:
        base_x, base_y = 40.0, 40.0

    def squared_distance(resource) -> float:
        return (resource.x - base_x) ** 2 + (resource.y - base_y) ** 2

    resources = engine.state.game_map.resources
    # NOTE(review): only the lengths of the ranked lists reach the output, so
    # the ordering itself does not change the returned names.
    minerals = sorted(
        (r for r in resources if r.resource_type == _RT.MINERAL and not r.is_depleted),
        key=squared_distance,
    )
    geysers = sorted(
        (r for r in resources if r.resource_type == _RT.GEYSER),
        key=squared_distance,
    )

    zones = [f"mineral_{rank}" for rank in range(1, len(minerals) + 1)]
    zones.extend(f"geyser_{rank}" for rank in range(1, len(geysers) + 1))
    return zones
async def _start_game(room_id: str) -> None:
    """Create the game state and engine for a room and start the match.

    Does nothing unless the room exists with exactly two players. When the
    simultaneous-games limit is reached, human players get an error event and
    no game is started. Otherwise the engine is registered, a bot is attached
    if one of the players is the bot, ``game_start`` is sent to each human
    player, and the engine is started.
    """
    room = lobby.get_room(room_id)
    if not room or len(room.players) != 2:
        return

    if len(engines) >= MAX_SIMULTANEOUS_GAMES:
        log.warning("Limite de %d parties simultanées atteinte, impossible de démarrer la room %s", MAX_SIMULTANEOUS_GAMES, room_id)
        for member in room.players:
            if member.sid == BOT_PLAYER_ID:
                continue
            await _emit_error(member.sid, "Le serveur est plein (limite de parties simultanées atteinte). Réessaie dans quelques instants.")
        return

    p1 = room.players[0]
    p2 = room.players[1]
    game_state = GameState.create_new(
        room_id=room_id,
        player1_id=p1.sid,
        player1_name=p1.name,
        player2_id=p2.sid,
        player2_name=p2.name,
    )
    engine = GameEngine(game_state, sio)
    engines[room_id] = engine
    room.engine = engine

    # Attach the bot driver when one of the two seats is the bot.
    for seat in (p1, p2):
        if seat.sid == BOT_PLAYER_ID:
            engine.bot = BotPlayer(engine, BOT_PLAYER_ID)
            log.info("Bot wired for room %s", room_id)

    payload = game_state.model_dump(mode="json")
    # Only human players receive game_start, each with their own id.
    for seat in (p1, p2):
        if seat.sid == BOT_PLAYER_ID:
            continue
        await sio.emit("game_start", {"game_state": payload, "your_id": seat.sid}, to=seat.sid)

    import time as _time

    room.started_at = _time.time()
    engine.start()
    log.info("Game started in room %s: %s vs %s | parties actives: %d/%d", room_id, p1.name, p2.name, len(engines), MAX_SIMULTANEOUS_GAMES)
def _cancel_bot_timer(sid: str) -> None:
    """Forget the bot-offer timer registered for *sid* and cancel it if still pending."""
    pending = _bot_timers.pop(sid, None)
    if not pending or pending.done():
        return
    pending.cancel()
async def _schedule_bot_offer(sid: str, room_id: str) -> None:
    """Sleep ``BOT_OFFER_DELAY`` seconds, then offer a bot if the player is still alone.

    The offer is sent only when the room still exists, is in ``waiting``
    status and holds exactly one player.
    """
    await asyncio.sleep(BOT_OFFER_DELAY)
    _bot_timers.pop(sid, None)

    room = lobby.get_room(room_id)
    still_alone = room and room.status == "waiting" and len(room.players) == 1
    if not still_alone:
        return
    await sio.emit("bot_offer", {}, to=sid)
    log.info("Bot offer sent to %s (room %s)", sid, room_id)
async def _schedule_bot_offer_queue(sid: str) -> None:
    """Sleep ``BOT_OFFER_DELAY`` seconds, then offer a bot if the player is still queued.

    A player counts as queued while the lobby maps their sid to ``"__queue__"``.
    """
    await asyncio.sleep(BOT_OFFER_DELAY)
    _bot_timers.pop(sid, None)

    if lobby._sid_to_room.get(sid) != "__queue__":
        return
    await sio.emit("bot_offer", {}, to=sid)
    log.info("Bot offer sent to queued player %s", sid)
| # --------------------------------------------------------------------------- | |
| # Connection lifecycle | |
| # --------------------------------------------------------------------------- | |
async def connect(sid: str, environ: dict, auth: Optional[dict] = None) -> None:
    """Log a new client connection; ``environ`` and ``auth`` are not used."""
    log.info("Client connected: %s", sid)
    return None
async def disconnect(sid: str) -> None:
    """Handle a client disconnect: clean up timers, the lobby and any running game.

    When the lobby reports the room as ``finished``, the engine is stopped,
    every remaining human player is recorded as winner and sent ``game_over``
    with reason ``opponent_disconnected``, and the engine is then removed from
    ``engines``. Remaining human players always get a ``room_update``.
    """
    log.info("Client disconnected: %s", sid)
    _cancel_bot_timer(sid)
    room = lobby.disconnect(sid)
    if not room:
        return

    # Stop the engine right away, but keep it registered in `engines` until
    # stats are written: _record_game_over looks the engine up there to find
    # the opponent name and the tutorial flag.
    engine = engines.get(room.room_id) if room.status == "finished" else None
    if engine is not None:
        await engine.stop()

    try:
        # Notify remaining human players
        for player in room.players:
            if player.sid == BOT_PLAYER_ID:
                continue
            await sio.emit(
                "room_update",
                {"room": room.to_dict(), "message": "Un joueur a quitté la partie."},
                to=player.sid,
            )
            if room.status == "finished":
                _record_game_over(room, player.sid, player.name)
                await sio.emit(
                    "game_over",
                    {"winner_id": player.sid, "winner_name": player.name,
                     "reason": "opponent_disconnected"},
                    to=player.sid,
                )
    finally:
        # Always release the game slot, even if a notification failed.
        if engine is not None:
            engines.pop(room.room_id, None)
            log.info("Partie terminée (déconnexion) room %s | parties actives: %d/%d", room.room_id, len(engines), MAX_SIMULTANEOUS_GAMES)
| # --------------------------------------------------------------------------- | |
| # Lobby events | |
| # --------------------------------------------------------------------------- | |
async def get_playing_count(sid: str, data: dict) -> None:
    """Reply to the caller with ``playing_count``; *data* is not used."""
    payload = {"count": lobby.get_playing_count()}
    await sio.emit("playing_count", payload, to=sid)
async def get_playing_rooms(sid: str, data: dict) -> None:
    """Reply to the caller with ``playing_rooms``; *data* is not used."""
    payload = {"rooms": lobby.get_playing_rooms()}
    await sio.emit("playing_rooms", payload, to=sid)
async def observe(sid: str, data: dict) -> None:
    """Let a client watch a game in progress as a spectator.

    Uses ``data["room_id"]`` when given, otherwise a playing room chosen by
    the lobby. On success the client joins the Socket.IO room and receives
    ``observe_room`` (with the current game state when an engine exists).
    Otherwise an ``error`` event is sent.
    """
    room_id = data.get("room_id") or lobby.get_a_playing_room_id()
    if not room_id:
        await _emit_error(sid, "Aucune partie en cours à observer.")
        return

    room = lobby.get_room(room_id)
    if not (room and room.status == "playing"):
        await _emit_error(sid, "Cette partie n'est plus disponible.")
        return

    await sio.enter_room(sid, room_id)
    payload = {"room_id": room_id}
    engine = engines.get(room_id)
    if engine:
        payload["game_state"] = engine.state.model_dump(mode="json")
    await sio.emit("observe_room", payload, to=sid)
async def create_room(sid: str, data: dict) -> None:
    """Create a new room hosted by the caller and start the bot-offer timer.

    Any pending bot-offer timer and any game the caller is still attached to
    are cleaned up first. The caller then enters the new room and receives
    ``room_created``.

    Args:
        sid: Session id of the host.
        data: Event payload; ``name`` is optional and falls back to "Player".
    """
    requested_name = str(data.get("name", "Player")).strip()
    name = await sanitize_player_name(requested_name or "Player")

    _cancel_bot_timer(sid)
    await _cleanup_old_game(sid)

    room = lobby.create_room(sid, name)
    await sio.enter_room(sid, room.room_id)
    await sio.emit(
        "room_created",
        {"room_id": room.room_id, "room": room.to_dict()},
        to=sid,
    )
    log.info("Room %s created by %s", room.room_id, name)

    # Start the bot-offer countdown for this host.
    offer_task = asyncio.create_task(_schedule_bot_offer(sid, room.room_id))
    _bot_timers[sid] = offer_task
async def join_room(sid: str, data: dict) -> None:
    """Add the caller to an existing room identified by its code.

    The room code is upper-cased and stripped before lookup. When the lobby
    rejects the join, its error message is forwarded to the caller and
    nothing else happens. On success the bot-offer timers of both the joiner
    and the other player are cancelled, the joiner enters the Socket.IO room
    and receives ``room_joined``, and ``room_update`` is broadcast to the room.

    Args:
        sid: Session id of the joining client.
        data: Event payload with ``room_id`` and an optional ``name``
            (falls back to "Player").
    """
    room_id = str(data.get("room_id", "")).upper().strip()
    name = str(data.get("name", "Player")).strip() or "Player"
    name = await sanitize_player_name(name)

    room, err = lobby.join_room(sid, room_id, name)
    if err:
        await _emit_error(sid, err)
        return

    # The joiner may still have a bot offer pending from a room or queue it
    # was waiting in. It has a real opponent now, so drop that offer as well.
    # This is done only after a successful join so a failed attempt keeps it.
    _cancel_bot_timer(sid)

    # Cancel the host's bot-offer timer — a real opponent arrived
    host_sid = next((p.sid for p in room.players if p.sid != sid), None)
    if host_sid:
        _cancel_bot_timer(host_sid)

    await sio.enter_room(sid, room_id)
    await sio.emit("room_joined", {"room_id": room_id, "room": room.to_dict()}, to=sid)
    # Broadcast the updated lobby to everyone in the room (host included)
    await sio.emit("room_update", {"room": room.to_dict()}, room=room_id)
async def quick_match(sid: str, data: dict) -> None:
    """Pair the caller with a waiting opponent, or queue the caller.

    When the lobby reports no match yet, the caller receives ``match_queued``
    and a queue bot-offer timer is started. When a match is made, both
    players' timers are cancelled, both enter the room and receive
    ``match_found``, everyone is marked ready and the game starts.

    Args:
        sid: Session id of the requesting client.
        data: Event payload; ``name`` is optional and falls back to "Player".
    """
    requested_name = str(data.get("name", "Player")).strip()
    name = await sanitize_player_name(requested_name or "Player")

    _cancel_bot_timer(sid)
    await _cleanup_old_game(sid)

    room, matched = lobby.quick_match(sid, name)

    if not matched:
        # Still waiting: tell the client and arm the bot offer for the queue.
        await sio.emit(
            "match_queued",
            {"message": "En attente d'un adversaire…"},
            to=sid,
        )
        _cancel_bot_timer(sid)
        _bot_timers[sid] = asyncio.create_task(_schedule_bot_offer_queue(sid))
        return

    # A real opponent was found, so nobody needs a bot offer any more.
    for member in room.players:
        _cancel_bot_timer(member.sid)

    # Put both players in the Socket.IO room before announcing the match.
    for slot in (0, 1):
        await sio.enter_room(room.players[slot].sid, room.room_id)

    for member in room.players:
        await sio.emit(
            "match_found",
            {"room_id": room.room_id, "room": room.to_dict()},
            to=member.sid,
        )

    # Quick match skips the lobby: mark everyone ready and start the game.
    for member in room.players:
        member.ready = True
    room.status = "playing"
    await _start_game(room.room_id)
async def accept_bot(sid: str, data: dict) -> None:
    """Accept the bot offer: put a bot in the caller's room and start playing.

    If the caller has no room yet, it is looked up in the quick-match queue,
    removed from it and given a freshly created room. A caller found in
    neither place gets an error. The bot is then added, the caller receives
    ``room_update``, is marked ready, and the game starts.

    Args:
        sid: Session id of the accepting client.
        data: Event payload; not used here.
    """
    _cancel_bot_timer(sid)

    room = lobby.get_room_for_sid(sid)
    if not room:
        # NOTE(review): this reads and rewrites private lobby state
        # (`_queue`, `_sid_to_room`); a public LobbyManager method would be
        # safer if one exists.
        queued_name = None
        for queued_sid, candidate in lobby._queue:
            if queued_sid == sid:
                queued_name = candidate
                break
        if queued_name is None:
            await _emit_error(sid, "Session expirée, relance une partie.")
            return

        # Take the player out of the queue and give them a private room.
        lobby._queue = [(s, n) for s, n in lobby._queue if s != sid]
        lobby._sid_to_room.pop(sid, None)
        room = lobby.create_room(sid, queued_name)
        await sio.enter_room(sid, room.room_id)

    bot_room, err = lobby.add_bot(room.room_id)
    if err:
        await _emit_error(sid, err)
        return

    await sio.emit("room_update", {"room": bot_room.to_dict()}, to=sid)

    # Mark the human ready and start the game.
    human_player = bot_room.get_player(sid)
    if human_player:
        human_player.ready = True
    bot_room.status = "playing"
    await _start_game(bot_room.room_id)
async def cancel_construction(sid: str, data: dict) -> None:
    """Ask the engine to cancel one of the caller's buildings in progress.

    Sends an error to the caller when it is in no room, when the room has no
    running engine, when ``building_id`` is missing, or when the engine
    reports failure. Nothing is emitted here on success.

    NOTE(review): the original docstring said 75% of resources are refunded.
    That happens (if at all) inside the engine and is not visible here.

    Args:
        sid: Session id of the requesting player.
        data: Event payload carrying ``building_id``.
    """
    room = lobby.get_room_for_sid(sid)
    if not room:
        await _emit_error(sid, "Tu n'es dans aucune room.")
        return

    engine = engines.get(room.room_id)
    if not engine:
        await _emit_error(sid, "La partie n'a pas encore commencé.")
        return

    building_id: str = data.get("building_id", "")
    if not building_id:
        await _emit_error(sid, "ID de bâtiment manquant.")
        return

    outcome = engine.cancel_building_construction(sid, building_id)
    if outcome.success:
        return
    reason = outcome.data.get('error', '?')
    await _emit_error(sid, f"Impossible d'annuler: {reason}")
async def _cleanup_old_game(sid: str) -> None:
    """Detach ``sid`` from its current room and stop that room's engine.

    Does nothing when the lobby knows no room for ``sid``. The engine is
    stopped and unregistered only if one is registered for the room.
    """
    previous = lobby.get_room_for_sid(sid)
    if not previous:
        return

    previous_id = previous.room_id
    await sio.leave_room(sid, previous_id)

    if previous_id not in engines:
        return
    await engines[previous_id].stop()
    del engines[previous_id]
    log.info(
        "Partie précédente stoppée room %s | parties actives: %d/%d",
        previous_id,
        len(engines),
        MAX_SIMULTANEOUS_GAMES,
    )
async def start_tutorial(sid: str, data: dict) -> None:
    """Launch a solo tutorial game for the caller.

    Pending timers and any previous game are cleaned up first. If the server
    already runs ``MAX_SIMULTANEOUS_GAMES`` engines, the caller gets an error.
    Otherwise a private room and a tutorial engine are created, the caller
    receives ``game_start`` and the engine is started.

    Args:
        sid: Session id of the player.
        data: Event payload; ``name`` is optional and falls back to "Player".
    """
    import time as _time

    requested_name = str(data.get("name", "Player")).strip()
    name = await sanitize_player_name(requested_name or "Player")

    # Drop any pending bot offer.
    _cancel_bot_timer(sid)
    # Leave and stop any previous game or tutorial so stale game_update
    # events do not bleed into the new tutorial session.
    await _cleanup_old_game(sid)

    if len(engines) >= MAX_SIMULTANEOUS_GAMES:
        log.warning(
            "Limite de %d parties simultanées atteinte, impossible de démarrer le tutoriel pour %s",
            MAX_SIMULTANEOUS_GAMES,
            sid,
        )
        await _emit_error(
            sid,
            "Le serveur est plein (limite de parties simultanées atteinte). Réessaie dans quelques instants.",
        )
        return

    # Private room holding only this player.
    room = lobby.create_room(sid, name)
    await sio.enter_room(sid, room.room_id)
    room.status = "playing"

    tutorial_state = GameState.create_tutorial(
        room_id=room.room_id,
        player_id=sid,
        player_name=name,
    )
    engine = GameEngine(tutorial_state, sio)
    engines[room.room_id] = engine
    room.engine = engine

    snapshot = tutorial_state.model_dump(mode="json")
    await sio.emit("game_start", {"game_state": snapshot, "your_id": sid}, to=sid)

    # The start timestamp is read later to compute the tutorial duration.
    room.started_at = _time.time()
    engine.start()
    log.info(
        "Tutorial started for %s (%s) in room %s | parties actives: %d/%d",
        name,
        sid,
        room.room_id,
        len(engines),
        MAX_SIMULTANEOUS_GAMES,
    )
async def tutorial_complete(sid: str, data: dict) -> None:
    """Record a ``tutorial_complete`` stat once the client reports completion.

    Silently returns when the caller is in no room. The duration is measured
    from ``room.started_at`` when that is set, otherwise it is ``None``. The
    player name comes from the engine state when available, else "Player".
    Failures while recording are logged and not propagated.

    Args:
        sid: Session id of the player.
        data: Event payload; not used here.
    """
    import time as _time

    room = lobby.get_room_for_sid(sid)
    if not room:
        return

    if room.started_at:
        duration = int(_time.time() - room.started_at)
    else:
        duration = None

    engine = engines.get(room.room_id)
    player_state = engine.state.players.get(sid) if engine else None
    player_name = player_state.player_name if player_state else "Player"

    try:
        _stats.record(player_name, "tutorial_complete", duration)
        log.info("Tutorial complete: %s in %ss", player_name, duration)
    except Exception:
        # Stats are best-effort: never let a recording failure reach the client.
        log.exception("Failed to record tutorial stat")
async def player_ready(sid: str, data: dict) -> None:
    """Mark the caller ready and start the game once everyone is ready.

    Sends an error when the lobby knows no room for the caller. Otherwise
    ``room_update`` is broadcast to the room.

    Args:
        sid: Session id of the player.
        data: Event payload; not used here.
    """
    room, everyone_ready = lobby.set_ready(sid)
    if not room:
        await _emit_error(sid, "Tu n'es dans aucune room.")
        return

    await sio.emit("room_update", {"room": room.to_dict()}, room=room.room_id)
    if not everyone_ready:
        return
    await _start_game(room.room_id)
| # --------------------------------------------------------------------------- | |
| # Voice pipeline | |
| # --------------------------------------------------------------------------- | |
async def voice_input(sid: str, data: dict) -> None:
    """Run a spoken command through the pipeline and report the outcome.

    Pipeline: base64 decode → speech-to-text → command parsing → game engine
    → ``voice_result`` reply. No text-to-speech step happens in this handler.

    Every failure (no room, no engine, unknown player, missing or invalid
    audio, STT error, parsing error) is reported to the caller through
    ``_emit_error``. An empty transcription produces a warning-level
    ``voice_result``. A parsed resign action is not executed; the client is
    asked to confirm it with ``resign_confirm_request`` instead. If the
    command produces a winner, ``game_over`` is broadcast to the room and the
    game is torn down.

    Args:
        sid: Session id of the speaking player.
        data: Event payload with ``audio_b64`` (required) and an optional
            ``mime_type`` (defaults to ``"audio/webm"``).
    """
    room = lobby.get_room_for_sid(sid)
    if not room:
        await _emit_error(sid, "Tu n'es dans aucune room.")
        return

    engine = engines.get(room.room_id)
    if not engine:
        await _emit_error(sid, "La partie n'a pas encore commencé.")
        return

    player = engine.state.players.get(sid)
    if not player:
        await _emit_error(sid, "Joueur introuvable dans la partie.")
        return

    audio_b64: str = data.get("audio_b64", "")
    mime_type: str = data.get("mime_type", "audio/webm")
    if not audio_b64:
        await _emit_error(sid, "Aucune donnée audio reçue.")
        return

    try:
        audio_bytes = base64.b64decode(audio_b64)
    except (ValueError, TypeError):
        # binascii.Error (bad padding or alphabet) is a ValueError subclass.
        # TypeError covers payloads that are not str or bytes.
        await _emit_error(sid, "Données audio invalides (base64 incorrect).")
        return

    # 1. Speech-to-text
    try:
        transcription = await stt.transcribe(audio_bytes, mime_type)
    except Exception as exc:
        log.exception("STT failed")
        await _emit_error(sid, f"Erreur de reconnaissance vocale: {exc}")
        return

    if not transcription:
        await sio.emit("voice_result", {
            "transcription": "",
            "feedback_text": "Je n'ai rien entendu. Appuie et parle!",
            "feedback_level": "warning",
            "results": [],
        }, to=sid)
        return

    # 2. Parse command with Mistral
    resource_zones = _build_resource_zones(engine, sid)
    try:
        parsed = await command_parser.parse(
            transcription,
            player,
            resource_zones=resource_zones,
            landmarks=MAP_LANDMARKS,
        )
    except Exception as exc:
        log.exception("Command parsing failed")
        await _emit_error(sid, f"Erreur d'interprétation: {exc}")
        return

    # 3a. Intercept resign — ask for confirmation before executing
    if any(a.type == ActionType.RESIGN for a in parsed.actions):
        await sio.emit("resign_confirm_request", {"transcription": transcription}, to=sid)
        return

    # 3. Apply commands to game engine
    cmd_result = engine.apply_command(sid, parsed)

    if cmd_result.feedback_override:
        feedback_text = await command_parser.generate_feedback(
            cmd_result.feedback_override, parsed.language
        )
    else:
        # Later results overwrite earlier keys when merging the result data.
        merged: dict[str, Any] = {}
        for r in cmd_result.results:
            merged.update(r.data or {})
        if "error" in merged:
            feedback_text = await command_parser.generate_feedback(
                merged["error"], parsed.language
            )
        else:
            feedback_text = _fill_template(parsed.feedback_template, merged)

    sound_events = []
    for r in cmd_result.results:
        sound_events.extend(getattr(r, "sound_events", []) or [])

    await sio.emit("voice_result", {
        "transcription": transcription,
        "feedback_text": feedback_text,
        "feedback_level": _compute_feedback_level(cmd_result),
        "results": [r.model_dump() for r in cmd_result.results],
        "sound_events": sound_events,
    }, to=sid)

    # Check win condition after command
    if engine.state.winner:
        winner_id = engine.state.winner
        winner_state = engine.state.players.get(winner_id)
        winner_name = winner_state.player_name if winner_state else winner_id
        _record_game_over(room, winner_id, winner_name)
        await sio.emit("game_over", {
            "winner_id": winner_id,
            "winner_name": winner_name,
        }, room=room.room_id)
        # This handler awaited STT, the parser and several emits since it
        # fetched the engine. Another handler (e.g. a disconnect) may have
        # stopped and unregistered it in the meantime, so only touch the
        # registry if it still points at the engine held here. A blind
        # `engines[room_id]` lookup would raise KeyError.
        if engines.get(room.room_id) is engine:
            await engine.stop()
            if engines.get(room.room_id) is engine:
                del engines[room.room_id]
        lobby.finish_room(room.room_id)
        log.info("Partie terminée (victoire voice) room %s | parties actives: %d/%d", room.room_id, len(engines), MAX_SIMULTANEOUS_GAMES)
async def _process_text_command(sid: str, transcription: str) -> None:
    """Shared pipeline for text-based commands (no speech-to-text step).

    Pipeline: command parsing → game engine → ``voice_result`` reply.

    A missing room, engine or player and any parsing error are reported to
    the caller through ``_emit_error``. A parsed resign action is not
    executed; the client is asked to confirm it with
    ``resign_confirm_request`` instead. If the command produces a winner,
    ``game_over`` is broadcast to the room and the game is torn down.

    Args:
        sid: Session id of the player issuing the command.
        transcription: The command text to interpret.
    """
    room = lobby.get_room_for_sid(sid)
    if not room:
        await _emit_error(sid, "Tu n'es dans aucune room.")
        return

    engine = engines.get(room.room_id)
    if not engine:
        await _emit_error(sid, "La partie n'a pas encore commencé.")
        return

    player = engine.state.players.get(sid)
    if not player:
        await _emit_error(sid, "Joueur introuvable dans la partie.")
        return

    # Parse command with Mistral
    resource_zones = _build_resource_zones(engine, sid)
    try:
        parsed = await command_parser.parse(
            transcription,
            player,
            resource_zones=resource_zones,
            landmarks=MAP_LANDMARKS,
        )
    except Exception as exc:
        log.exception("Command parsing failed")
        await _emit_error(sid, f"Erreur d'interprétation: {exc}")
        return

    # Intercept resign — ask for confirmation before executing
    if any(a.type == ActionType.RESIGN for a in parsed.actions):
        await sio.emit("resign_confirm_request", {"transcription": transcription}, to=sid)
        return

    # Apply commands to game engine
    cmd_result = engine.apply_command(sid, parsed)

    if cmd_result.feedback_override:
        feedback_text = await command_parser.generate_feedback(
            cmd_result.feedback_override, parsed.language
        )
    else:
        # Later results overwrite earlier keys when merging the result data.
        merged = {}
        for r in cmd_result.results:
            merged.update(r.data or {})
        if "error" in merged:
            feedback_text = await command_parser.generate_feedback(
                merged["error"], parsed.language
            )
        else:
            feedback_text = _fill_template(parsed.feedback_template, merged)

    sound_events = []
    for r in cmd_result.results:
        sound_events.extend(getattr(r, "sound_events", []) or [])

    await sio.emit("voice_result", {
        "transcription": transcription,
        "feedback_text": feedback_text,
        "feedback_level": _compute_feedback_level(cmd_result),
        "results": [r.model_dump() for r in cmd_result.results],
        "sound_events": sound_events,
    }, to=sid)

    if engine.state.winner:
        winner_id = engine.state.winner
        winner_state = engine.state.players.get(winner_id)
        winner_name = winner_state.player_name if winner_state else winner_id
        _record_game_over(room, winner_id, winner_name)
        await sio.emit("game_over", {
            "winner_id": winner_id,
            "winner_name": winner_name,
        }, room=room.room_id)
        # This coroutine awaited the parser and several emits since it fetched
        # the engine. Another handler (e.g. a disconnect) may have stopped and
        # unregistered it in the meantime, so only touch the registry if it
        # still points at the engine held here. A blind `engines[room_id]`
        # lookup would raise KeyError.
        if engines.get(room.room_id) is engine:
            await engine.stop()
            if engines.get(room.room_id) is engine:
                del engines[room.room_id]
        lobby.finish_room(room.room_id)
        log.info("Partie terminée (victoire text) room %s | parties actives: %d/%d", room.room_id, len(engines), MAX_SIMULTANEOUS_GAMES)
async def text_input(sid: str, data: dict) -> None:
    """Handle a typed command by feeding it to the shared text pipeline.

    The ``text`` field is converted to ``str`` and stripped. An empty result
    is rejected with an error; anything else goes to
    ``_process_text_command``.

    Args:
        sid: Session id of the player.
        data: Event payload carrying ``text``.
    """
    command_text: str = str(data.get("text", "")).strip()
    if command_text:
        await _process_text_command(sid, command_text)
        return
    await _emit_error(sid, "Aucun texte reçu.")
async def resign_confirm(sid: str, data: dict) -> None:
    """Execute a resignation the player has confirmed.

    Silently returns when the caller has no room, the room has no running
    engine, or the caller is not a player in that engine's state. When the
    engine rejects the resignation, its error is forwarded to the caller.
    Otherwise the caller receives a ``voice_result`` acknowledging it. If the
    engine now has a winner, ``game_over`` is broadcast to the room and the
    game is torn down.

    Args:
        sid: Session id of the resigning player.
        data: Event payload; not used here.
    """
    room = lobby.get_room_for_sid(sid)
    if not room:
        return
    engine = engines.get(room.room_id)
    if not engine:
        return
    player = engine.state.players.get(sid)
    if not player:
        return

    # NOTE(review): this calls a private engine method directly, bypassing
    # apply_command. Confirm that is intended.
    result = engine._cmd_resign(player)
    if not result.success:
        await _emit_error(sid, result.data.get("error", "Erreur lors de l'abandon."))
        return

    await sio.emit("voice_result", {
        "transcription": "",
        "feedback_text": "Vous avez abandonné la partie.",
        "feedback_level": "ok",
        "results": [result.model_dump()],
        "sound_events": [],
    }, to=sid)

    if engine.state.winner:
        winner_id = engine.state.winner
        winner_state = engine.state.players.get(winner_id)
        winner_name = winner_state.player_name if winner_state else winner_id
        _record_game_over(room, winner_id, winner_name)
        await sio.emit("game_over", {
            "winner_id": winner_id,
            "winner_name": winner_name,
        }, room=room.room_id)
        # Two emits were awaited since the engine was fetched. Another handler
        # may have stopped and unregistered it in the meantime, so only touch
        # the registry if it still points at the engine held here. A blind
        # `engines[room_id]` lookup would raise KeyError.
        if engines.get(room.room_id) is engine:
            await engine.stop()
            if engines.get(room.room_id) is engine:
                del engines[room.room_id]
        lobby.finish_room(room.room_id)
        log.info("Partie terminée (resign) room %s | parties actives: %d/%d", room.room_id, len(engines), MAX_SIMULTANEOUS_GAMES)