# ChatCraft / backend / main.py
# Last commit: "Store results" (b8912e3), by gabraken
"""
Entry point — FastAPI + Socket.IO ASGI app.
Run with:
uvicorn main:app --reload --port 8000
Socket.IO events
────────────────
Client → Server
    create_room          { name }
    join_room            { room_id, name }
    quick_match          { name }
    accept_bot           {}                — accept a pending bot_offer and start against the bot
    player_ready         {}
    get_playing_count    {}                — server responds with playing_count
    get_playing_rooms    {}                — server responds with playing_rooms
    observe              { room_id? }      — server responds with observe_room or error
    cancel_construction  { building_id }
    voice_input          { audio_b64, mime_type? }
    text_input           { text }
    disconnect           (automatic)
Server → Client
    room_created   { room_id, room }
    room_joined    { room_id, room }
    room_update    { room, message? }      — broadcast to room on any lobby change
    match_queued   { message }             — quick match: waiting for an opponent
    match_found    { room_id, room }       — to both players in quick match
    bot_offer      {}                      — sent after a waiting delay with no opponent
    game_start     { game_state, your_id } — when both ready
    game_update    { game_state }          — every tick (~250ms)
    voice_result   { transcription, feedback_text, feedback_audio_b64, results }
    game_over      { winner_id, winner_name, reason? }
    error          { message }
    playing_count  { count }               — in response to get_playing_count
    playing_rooms  { rooms }               — in response to get_playing_rooms
    observe_room   { room_id, game_state? } — in response to observe (spectator)
"""
from __future__ import annotations
import asyncio
import base64
import logging
import os
import re
import subprocess
import sys
from pathlib import Path
from typing import Any, Optional
import socketio
from fastapi import APIRouter, Body, Depends, FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from game.bot import BOT_PLAYER_ID, BOT_PLAYER_NAME, BotPlayer
from game.commands import ActionType
from game.engine import GameEngine
from game.map import MAP_LANDMARKS
from game.state import GameState, TUTORIAL_DUMMY_ID
from lobby.manager import LobbyManager
from lobby.safe_name import sanitize_player_name
from voice import command_parser, stt
import config as _config
import stats as _stats
def require_debug():
    """FastAPI dependency guarding debug-only routes.

    Raises:
        HTTPException: 403 when ``config.DEBUG`` is falsy (production).
    """
    if _config.DEBUG:
        return
    raise HTTPException(status_code=403, detail="Not available in production")
# Delay used by the bot-offer timers before emitting "bot_offer" to a waiting player.
BOT_OFFER_DELAY = 10 # seconds before offering bot opponent
# Upper bound on concurrently registered game engines (checked in _start_game).
MAX_SIMULTANEOUS_GAMES = 20
# Root logging configuration plus this module's logger.
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)
def _record_game_over(room, winner_id: str, winner_name: str) -> None:
    """Persist a stats row for a tutorial completion or a won match.

    Args:
        room: Lobby room the game was played in (may be ``None``).
        winner_id: Player id of the winner.
        winner_name: Display name of the winner.

    Nothing is recorded when the tutorial dummy "wins". When the bot wins, the
    row is stored under the bot's name with the human as opponent so the match
    still appears in match history. Failures while writing the stat are logged
    and swallowed (stats are best-effort).
    """
    # Never record tutorial dummy wins
    if winner_id == TUTORIAL_DUMMY_ID:
        return
    import time as _time

    duration = int(_time.time() - room.started_at) if room and room.started_at else None

    engine = None
    if room:
        engine = engines.get(room.room_id)
        if engine is None:
            # The disconnect handler removes the ``engines`` entry before
            # calling this function; the engine is still attached to the room
            # by _start_game, so fall back to it when present.
            engine = getattr(room, "engine", None)

    is_tutorial = bool(engine and engine.state.is_tutorial)
    event_type = "tutorial_complete" if is_tutorial else "game_won"
    players = engine.state.players if engine else {}

    if winner_id == BOT_PLAYER_ID:
        # When the bot wins, record the match with the human opponent so it
        # appears in match history — bot wins are excluded from the
        # leaderboard in stats.py
        recorded_name = BOT_PLAYER_NAME
        opponent_name = next(
            (pstate.player_name for pid, pstate in players.items() if pid != BOT_PLAYER_ID),
            None,
        )
    else:
        recorded_name = winner_name
        opponent_name = None
        # Resolve opponent name for PvP/bot matches
        if not is_tutorial and engine:
            opponent_name = next(
                (
                    pstate.player_name
                    for pid, pstate in players.items()
                    if pid != winner_id and pid != BOT_PLAYER_ID
                ),
                None,
            )
            # If opponent was the bot, label it
            if opponent_name is None and BOT_PLAYER_ID in players:
                opponent_name = BOT_PLAYER_NAME

    try:
        _stats.record(recorded_name, event_type, duration, opponent_name)
        log.info("Stat recorded: %s %s vs %s %ss", recorded_name, event_type, opponent_name, duration)
    except Exception:
        log.exception("Failed to record stat")
def _fill_template(template: str, data: dict[str, Any]) -> str:
    """Fill ``{key}`` placeholders in *template* with values from *data*.

    Missing keys become an empty string. If the template cannot be formatted
    at all (positional fields such as ``{}`` / ``{0}``, unbalanced braces,
    attribute or index lookups on a placeholder), the template is returned
    unchanged instead of raising.
    """
    placeholders = re.findall(r"\{(\w+)\}", template)
    safe = {k: str(data.get(k, "")) for k in placeholders}
    try:
        return template.format(**safe)
    except (LookupError, ValueError, AttributeError, TypeError):
        # LookupError covers KeyError (unknown field) and IndexError
        # (positional field); ValueError covers malformed braces.
        return template
def _compute_feedback_level(cmd_result) -> str:
    """Map a command result onto a feedback severity.

    Returns ``"error"`` when a feedback override is set, when there are no
    results, or when every result failed; ``"warning"`` when only some
    succeeded or when a successful result carries an ``"error"`` key in its
    data; ``"ok"`` otherwise.
    """
    if cmd_result.feedback_override:
        return "error"

    outcomes = cmd_result.results
    if not outcomes:
        return "error"

    failed = [item for item in outcomes if not item.success]
    if len(failed) == len(outcomes):
        return "error"
    if failed:
        return "warning"

    # Everything succeeded; downgrade if any payload still reports an error.
    has_error_key = any(item.data and "error" in item.data for item in outcomes)
    return "warning" if has_error_key else "ok"
# ---------------------------------------------------------------------------
# Socket.IO + FastAPI setup
# ---------------------------------------------------------------------------
# Async Socket.IO server in ASGI mode; it is wrapped around the FastAPI app
# further down to form the final ASGI application.
# NOTE(review): cors_allowed_origins="*" accepts any origin — confirm this is
# intended outside development.
sio = socketio.AsyncServer(
    async_mode="asgi",
    cors_allowed_origins="*",
    ping_timeout=60,
    ping_interval=25,
    logger=False,
    engineio_logger=False,
)
# HTTP API application; CORS is open to all origins, methods and headers.
fastapi_app = FastAPI(title="ChatCraft API")
fastapi_app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
# Paths for static assets (must serve before any catch-all mount)
_STATIC_DIR = Path(__file__).parent / "static"
_SPRITES_DIR = Path(__file__).parent / "static" / "sprites"
# Router holding the explicit static-file routes defined below.
_static_router = APIRouter()
@_static_router.get("/static/MAP.png", response_class=FileResponse)
def _serve_map_png():
    """Serve ``static/MAP.png`` as a PNG, or 404 when the file is absent."""
    map_image = _STATIC_DIR / "MAP.png"
    if map_image.is_file():
        return FileResponse(map_image, media_type="image/png")
    raise HTTPException(404, "MAP.png not found")
@_static_router.get("/sprites/{kind}/{filename:path}", response_class=FileResponse)
def _serve_sprite(kind: str, filename: str):
    """Serve a sprite PNG from ``static/sprites/<kind>/<filename>``.

    The response is sent with ``Cache-Control: no-store``.

    Raises:
        HTTPException: 404 when the file does not exist or when the requested
            path resolves outside the sprites directory.
    """
    base = _SPRITES_DIR.resolve()
    p = (base / kind / filename).resolve()
    # ``filename`` is a catch-all path parameter taken from the URL: refuse
    # anything (e.g. ".." segments) that escapes the sprites directory.
    try:
        p.relative_to(base)
    except ValueError:
        raise HTTPException(404, "Sprite not found") from None
    if not p.is_file():
        raise HTTPException(404, "Sprite not found")
    return FileResponse(
        p,
        media_type="image/png",
        headers={"Cache-Control": "no-store"},
    )
# Register the explicit static routes defined above.
fastapi_app.include_router(_static_router)
# Router for the stats endpoints, served under the /api prefix.
_stats_router = APIRouter(prefix="/api")
@_stats_router.get("/stats")
def get_stats():
    """Return the tutorial leaderboard, wins leaderboard and recent matches."""
    tutorial_board = _stats.get_tutorial_leaderboard()
    wins_board = _stats.get_wins_leaderboard()
    matches = _stats.get_recent_matches()
    return {
        "tutorial_leaderboard": tutorial_board,
        "wins_leaderboard": wins_board,
        "recent_matches": matches,
    }
@_stats_router.get("/stats/recent")
def get_stats_recent():
    """Return the 10 most recent stat events (used for landing-page toasts)."""
    recent_events = _stats.get_recent_events(limit=10)
    return {"events": recent_events}
# Register the /api/stats endpoints declared on _stats_router.
fastapi_app.include_router(_stats_router)
@fastapi_app.get("/api/config")
def get_config():
    """Expose the server's debug flag to the client."""
    debug_enabled = _config.DEBUG
    return {"debug": debug_enabled}
# Unit sound effects (generated by scripts/generate_unit_sounds.py)
_SOUNDS_DIR = Path(__file__).parent / "static" / "sounds"
_UNITS_SOUNDS_DIR = _SOUNDS_DIR / "units"
# Create the directories up front so the StaticFiles mount below has a target.
_SOUNDS_DIR.mkdir(parents=True, exist_ok=True)
_UNITS_SOUNDS_DIR.mkdir(parents=True, exist_ok=True)
fastapi_app.mount("/sounds", StaticFiles(directory=str(_SOUNDS_DIR)), name="sounds")
# Sprites (unit/building icons generated via Mistral image API)
_SPRITES_DIR.mkdir(parents=True, exist_ok=True)
if _SPRITES_DIR.exists():
    fastapi_app.mount("/sprites", StaticFiles(directory=str(_SPRITES_DIR)), name="sprites")
# Map image, map.json, walkable polygon (fallback for other static files)
if _STATIC_DIR.exists():
    fastapi_app.mount("/static", StaticFiles(directory=str(_STATIC_DIR)), name="static")
# JSON files read and written by the /api/map endpoints.
_MAP_JSON = _STATIC_DIR / "map.json"
_WALKABLE_JSON = _STATIC_DIR / "walkable.json"
_COMPILED_MAP_JSON = _STATIC_DIR / "compiled_map.json"
_GAME_POSITIONS_JSON = _STATIC_DIR / "game_positions.json"
# Game grid size (must match game/map.py)
_MAP_GRID_W = 80
_MAP_GRID_H = 80
# Init stats DB (creates /data/stats.db or local fallback)
_stats.init_db()
log.info("Stats DB ready at %s", _stats.DB_PATH)
# Compile walkable → exterior + holes once at server startup
if _WALKABLE_JSON.exists():
    try:
        # Imported lazily so a failure here only skips compilation (logged
        # below) instead of preventing the server from starting.
        from game.map import MAP_WIDTH as _MAP_W, MAP_HEIGHT as _MAP_H
        from game.map_compiler import run_compiler
        run_compiler(_WALKABLE_JSON, _COMPILED_MAP_JSON, map_width=_MAP_W, map_height=_MAP_H)
        log.info("Map compiled: %s", _COMPILED_MAP_JSON)
    except Exception as e:
        # Best-effort: any compiler error is downgraded to a warning.
        log.warning("Map compiler skipped: %s", e)
def _load_game_positions() -> dict:
    """Load ``game_positions.json``; return an empty dict when it is missing."""
    import json

    if _GAME_POSITIONS_JSON.exists():
        with open(_GAME_POSITIONS_JSON, encoding="utf-8") as fh:
            return json.load(fh)
    return {}
@fastapi_app.get("/api/map")
async def get_map_data():
    """Return map data: name, named locations, walkable polygons, and start/expansion positions."""
    import json

    data = {
        "map_name": "World Map Cleaned",
        "locations": [],
        "walkable_polygons": [],
        "walkable_polygon": [],
        "starting_positions": [],
        "expansion_positions": [],
    }

    if _MAP_JSON.exists():
        with open(_MAP_JSON, encoding="utf-8") as fh:
            map_file = json.load(fh)
        data["map_name"] = map_file.get("map_name", data["map_name"])
        data["locations"] = map_file.get("locations", [])

    if _WALKABLE_JSON.exists():
        with open(_WALKABLE_JSON, encoding="utf-8") as fh:
            walkable = json.load(fh)
        polygons = walkable.get("polygons", [])
        # Fall back to the single-polygon "polygon" key when "polygons" is absent or empty.
        if not polygons and walkable.get("polygon"):
            polygons = [walkable["polygon"]]
        data["walkable_polygons"] = polygons
        data["walkable_polygon"] = _merged_walkable_polygon(polygons)

    positions = _load_game_positions()
    for key in ("starting_positions", "expansion_positions"):
        data[key] = positions.get(key) or []
    return data
def _validate_polygon(polygon: list, name: str) -> None:
    """Validate that *polygon* is a non-empty list of ``[x, y]`` points in 0-100.

    Args:
        polygon: Candidate polygon taken from the request body.
        name: Label used in error messages (e.g. ``"polygons[0]"``).

    Raises:
        HTTPException: 400 when the polygon is not a non-empty list, when a
            point is not a 2-element list/tuple of numbers, or when a
            coordinate lies outside [0, 100].
    """
    if not isinstance(polygon, list) or not polygon:
        raise HTTPException(400, f"{name} requis (liste de [x, y])")
    for i, pt in enumerate(polygon):
        if not isinstance(pt, (list, tuple)) or len(pt) != 2:
            raise HTTPException(400, f"{name} point {i} invalide")
        try:
            x, y = float(pt[0]), float(pt[1])
        except (TypeError, ValueError):
            # Non-numeric coordinates are a client error, not a server crash.
            raise HTTPException(400, f"{name} point {i} invalide") from None
        if not (0 <= x <= 100 and 0 <= y <= 100):
            raise HTTPException(400, f"{name} point {i} hors [0,100]: {x}, {y}")
def _merged_walkable_polygon(polygons: list) -> list:
    """Merge several polygons into one outline (union) for the game engine.

    Returns ``[]`` for no polygons and the polygon itself when there is only
    one. Otherwise, when shapely is importable, returns the exterior ring of
    the union (of its largest part if the union is a MultiPolygon), rounded to
    2 decimals and without the closing point. In every other case (shapely
    missing, no polygon with at least 3 points, empty or unexpected union) the
    first polygon is returned.
    """
    if not polygons:
        return []
    fallback = polygons[0]
    if len(polygons) == 1:
        return fallback
    try:
        from shapely.geometry import Polygon
        from shapely.ops import unary_union

        shapes = [Polygon(ring) for ring in polygons if len(ring) >= 3]
        if shapes:
            merged = unary_union(shapes)
            if not merged.is_empty:
                outline = None
                if merged.geom_type == "Polygon":
                    outline = merged
                elif merged.geom_type == "MultiPolygon":
                    outline = max(merged.geoms, key=lambda g: g.area)
                if outline is not None:
                    ring = list(outline.exterior.coords)
                    # Drop the last coordinate, which repeats the first one.
                    return [[round(x, 2), round(y, 2)] for x, y in ring[:-1]]
    except ImportError:
        pass
    return fallback
@fastapi_app.put("/api/map/walkable", dependencies=[Depends(require_debug)])
async def save_walkable_polygon(body: dict = Body(...)):
    """Save the walkable polygons (list of polygons of ``[x, y]`` points in 0-100).

    Each polygon is validated, then the list is written to ``walkable.json``.

    Raises:
        HTTPException: 400 when ``polygons`` is missing, empty, or invalid.
    """
    import json

    polygons = body.get("polygons")
    if not (isinstance(polygons, list) and polygons):
        raise HTTPException(400, "polygons requis (liste de polygones)")
    for index, polygon in enumerate(polygons):
        _validate_polygon(polygon, f"polygons[{index}]")

    _STATIC_DIR.mkdir(parents=True, exist_ok=True)
    with open(_WALKABLE_JSON, "w", encoding="utf-8") as fh:
        json.dump({"polygons": polygons}, fh, indent=2)
    return {"status": "ok"}
def _admin_to_game_x(x: float) -> int:
    """Convert an admin x coordinate (0-100) to a grid column clamped to the map width."""
    column = int(round(x * _MAP_GRID_W / 100.0))
    return min(max(column, 0), _MAP_GRID_W - 1)
def _admin_to_game_y(y: float) -> int:
    """Convert an admin y coordinate (0-100) to a grid row clamped to the map height."""
    row = int(round(y * _MAP_GRID_H / 100.0))
    return min(max(row, 0), _MAP_GRID_H - 1)
def _generate_minerals_around(cx: int, cy: int, count: int = 7, radius: float = 3.5) -> list[dict]:
    """Pick up to *count* distinct mineral cells within *radius* of ``(cx, cy)``.

    Candidate cells are every integer offset inside the circle, clamped to the
    map grid. Sampling from the full candidate set guarantees exactly *count*
    patches whenever at least that many distinct cells exist, instead of
    depending on a limited number of random retries.

    Returns:
        A list of ``{"x": int, "y": int}`` dicts in game-grid coordinates.
    """
    import random

    reach = int(radius)
    limit = radius * radius
    cells = {
        (
            max(0, min(_MAP_GRID_W - 1, cx + dx)),
            max(0, min(_MAP_GRID_H - 1, cy + dy)),
        )
        for dx in range(-reach, reach + 1)
        for dy in range(-reach, reach + 1)
        if dx * dx + dy * dy <= limit
    }
    # random.sample needs a sequence; sorting makes the population order stable.
    wanted = min(max(count, 0), len(cells))
    return [{"x": x, "y": y} for x, y in random.sample(sorted(cells), wanted)]
def _generate_geysers_around(cx: int, cy: int, count: int = 1, radius: float = 4.0) -> list[dict]:
    """Pick up to *count* distinct geyser cells in the square around ``(cx, cy)``.

    Candidate cells are every integer offset in ``[-int(radius), int(radius)]``
    on both axes, clamped to the map grid. Sampling from the full candidate
    set guarantees exactly *count* geysers whenever enough distinct cells
    exist, instead of depending on a limited number of random retries.

    Returns:
        A list of ``{"x": int, "y": int}`` dicts in game-grid coordinates.
    """
    import random

    reach = int(radius)
    cells = {
        (
            max(0, min(_MAP_GRID_W - 1, cx + dx)),
            max(0, min(_MAP_GRID_H - 1, cy + dy)),
        )
        for dx in range(-reach, reach + 1)
        for dy in range(-reach, reach + 1)
    }
    # random.sample needs a sequence; sorting makes the population order stable.
    wanted = min(max(count, 0), len(cells))
    return [{"x": x, "y": y} for x, y in random.sample(sorted(cells), wanted)]
def _parse_positions_0_100(items: list, label: str) -> list[tuple[float, float]]:
    """Validate ``{x, y}`` dicts in the 0-100 range and return them as float pairs."""
    coords = []
    for i, p in enumerate(items):
        if not isinstance(p, dict) or "x" not in p or "y" not in p:
            raise HTTPException(400, f"{label}[{i}] doit être {{x, y}}")
        try:
            x, y = float(p["x"]), float(p["y"])
        except (TypeError, ValueError):
            # Non-numeric coordinates are a client error, not a server crash.
            raise HTTPException(400, f"{label}[{i}] coordonnées invalides") from None
        if not (0 <= x <= 100 and 0 <= y <= 100):
            raise HTTPException(400, f"{label}[{i}] hors [0,100]")
        coords.append((x, y))
    return coords


def _attach_resources(coords: list[tuple[float, float]]) -> tuple[list[dict], int, int]:
    """Build position entries with generated minerals/geysers; also return both totals."""
    entries = []
    mineral_total = 0
    geyser_total = 0
    for x, y in coords:
        gx, gy = _admin_to_game_x(x), _admin_to_game_y(y)
        minerals = _generate_minerals_around(gx, gy, count=7)
        geysers = _generate_geysers_around(gx, gy, count=1)
        mineral_total += len(minerals)
        geyser_total += len(geysers)
        entries.append({"x": x, "y": y, "minerals": minerals, "geysers": geysers})
    return entries, mineral_total, geyser_total


@fastapi_app.put("/api/map/positions", dependencies=[Depends(require_debug)])
async def save_map_positions(body: dict = Body(...)):
    """Save starting positions (at least 2) and expansion positions.

    Positions are kept in the 0-100 admin range; each one is stored together
    with mineral and geyser cells generated around it in game-grid
    coordinates, so only the chosen bases' resources need to be loaded. The
    result is written to ``game_positions.json``.

    Raises:
        HTTPException: 400 when ``starting_positions`` has fewer than 2
            entries or when any position is malformed or out of range.
    """
    import json

    starts = body.get("starting_positions")
    expansions = body.get("expansion_positions")
    if not isinstance(starts, list) or len(starts) < 2:
        raise HTTPException(400, "starting_positions requis (au moins 2 positions {x, y} en 0-100)")
    if not isinstance(expansions, list):
        expansions = []

    # Validate everything before generating any resources.
    start_coords = _parse_positions_0_100(starts, "starting_positions")
    expansion_coords = _parse_positions_0_100(expansions, "expansion_positions")

    starting_positions, start_minerals, start_geysers = _attach_resources(start_coords)
    expansion_positions, exp_minerals, exp_geysers = _attach_resources(expansion_coords)

    payload = {
        "starting_positions": starting_positions,
        "expansion_positions": expansion_positions,
    }
    _STATIC_DIR.mkdir(parents=True, exist_ok=True)
    with open(_GAME_POSITIONS_JSON, "w", encoding="utf-8") as f:
        json.dump(payload, f, indent=2)
    return {
        "status": "ok",
        "minerals_count": start_minerals + exp_minerals,
        "geysers_count": start_geysers + exp_geysers,
    }
@fastapi_app.put("/api/map/locations", dependencies=[Depends(require_debug)])
async def save_map_locations(body: dict = Body(...)):
    """Save the named key locations into ``map.json``.

    Existing keys of ``map.json`` other than ``locations`` are preserved.

    Raises:
        HTTPException: 400 when ``locations`` is not a list, when an entry
            lacks ``name``/``x``/``y``, or when a coordinate is non-numeric
            or outside [0, 100].
    """
    import json

    locations = body.get("locations")
    if not isinstance(locations, list):
        raise HTTPException(400, "locations doit être une liste de {name, x, y}")

    cleaned = []
    for i, loc in enumerate(locations):
        if not isinstance(loc, dict) or "name" not in loc or "x" not in loc or "y" not in loc:
            raise HTTPException(400, f"locations[{i}] doit avoir name, x, y")
        try:
            x, y = float(loc["x"]), float(loc["y"])
        except (TypeError, ValueError):
            # Non-numeric coordinates are a client error, not a server crash.
            raise HTTPException(400, f"locations[{i}] coordonnées invalides") from None
        if not (0 <= x <= 100 and 0 <= y <= 100):
            raise HTTPException(400, f"locations[{i}] coordonnées hors [0,100]")
        cleaned.append({"name": loc["name"], "x": x, "y": y})

    _STATIC_DIR.mkdir(parents=True, exist_ok=True)
    existing: dict = {}
    if _MAP_JSON.exists():
        with open(_MAP_JSON, encoding="utf-8") as f:
            existing = json.load(f)
    existing["locations"] = cleaned
    with open(_MAP_JSON, "w", encoding="utf-8") as f:
        json.dump(existing, f, indent=2, ensure_ascii=False)
    return {"status": "ok", "count": len(locations)}
@fastapi_app.get("/api/sounds/units")
async def list_unit_sounds():
    """List every unit sound present on disk as ``{unit, kind}`` pairs.

    ``unit`` is the sub-directory name and ``kind`` the ``.mp3`` file stem.
    """
    if not _UNITS_SOUNDS_DIR.exists():
        return {"sounds": []}

    unit_dirs = (entry for entry in sorted(_UNITS_SOUNDS_DIR.iterdir()) if entry.is_dir())
    sounds: list[dict] = [
        {"unit": unit_dir.name, "kind": sound_file.stem}
        for unit_dir in unit_dirs
        for sound_file in unit_dir.iterdir()
        if sound_file.suffix.lower() == ".mp3"
    ]
    return {"sounds": sounds}
@fastapi_app.delete("/api/sounds/units/{unit}/{kind}", dependencies=[Depends(require_debug)])
async def delete_unit_sound(unit: str, kind: str):
    """Delete one unit sound file (``<unit>/<kind>.mp3``).

    Raises:
        HTTPException: 400 when ``unit`` or ``kind`` contains anything other
            than alphanumerics and hyphens; 404 when the file does not exist.
    """
    def _is_slug(value: str) -> bool:
        return value.replace("-", "").isalnum()

    if not (_is_slug(unit) and _is_slug(kind)):
        raise HTTPException(400, "unit/kind invalides")

    target = _UNITS_SOUNDS_DIR / unit / f"{kind}.mp3"
    if not target.exists():
        raise HTTPException(404, "Son introuvable")
    target.unlink()
    return None
def _list_sprites(subdir: Path, suffix: str = ".png") -> list[dict]:
    """Return ``{"id", "name"}`` entries (both the file stem) for files in *subdir*.

    Only files whose lower-cased suffix equals *suffix* are listed, in sorted
    path order. A missing directory yields an empty list.
    """
    if not subdir.exists():
        return []
    return [
        {"id": entry.stem, "name": entry.stem}
        for entry in sorted(subdir.iterdir())
        if entry.suffix.lower() == suffix
    ]
@fastapi_app.get("/api/sprites/units")
async def list_unit_sprites():
    """List the unit sprites present under ``sprites/units``."""
    units_dir = _SPRITES_DIR / "units"
    return {"sprites": _list_sprites(units_dir)}
@fastapi_app.get("/api/sprites/buildings")
async def list_building_sprites():
    """List the building sprites present under ``sprites/buildings``."""
    buildings_dir = _SPRITES_DIR / "buildings"
    return {"sprites": _list_sprites(buildings_dir)}
@fastapi_app.post("/api/sprites/generate", dependencies=[Depends(require_debug)])
async def generate_sprites():
    """Run the sprite generation script (``--skip-existing``) in a worker thread.

    The script is executed as ``python -m scripts.generate_sprites`` from this
    file's directory with a 10-minute timeout.

    Raises:
        HTTPException: 504 on timeout; 500 when the script cannot be run or
            exits with a non-zero status (detail carries stderr/stdout).
    """
    def run():
        return subprocess.run(
            [sys.executable, "-m", "scripts.generate_sprites", "--skip-existing"],
            cwd=str(Path(__file__).parent),
            capture_output=True,
            text=True,
            timeout=600,
        )

    # Inside a coroutine the running loop is the one to use.
    loop = asyncio.get_running_loop()
    try:
        result = await loop.run_in_executor(None, run)
    except subprocess.TimeoutExpired as exc:
        raise HTTPException(504, "Génération trop longue (timeout 10 min)") from exc
    except Exception as exc:
        raise HTTPException(500, str(exc)) from exc
    if result.returncode != 0:
        raise HTTPException(500, result.stderr or result.stdout or "Script failed")
    return {"status": "ok", "message": "Sprites générés. Recharge la page admin."}
def _run_sprite_script(extra_args: list[str]) -> subprocess.CompletedProcess:
    """Run ``python -m scripts.generate_sprites`` with *extra_args* appended.

    The script runs from this file's directory, with captured text output and
    a 120-second timeout.
    """
    command = [sys.executable, "-m", "scripts.generate_sprites"]
    command = command + extra_args
    backend_dir = str(Path(__file__).parent)
    return subprocess.run(
        command,
        cwd=backend_dir,
        capture_output=True,
        text=True,
        timeout=120,
    )
@fastapi_app.post("/api/sprites/generate/units/{unit_id}", dependencies=[Depends(require_debug)])
async def generate_one_unit_sprite(unit_id: str):
    """Regenerate the sprite of a single unit.

    Raises:
        HTTPException: 400 when ``unit_id`` is not a ``UnitType`` value;
            504 when the script times out; 500 when it exits non-zero.
    """
    from game.units import UnitType

    unit_id = unit_id.strip().lower()
    valid = [u.value for u in UnitType]
    if unit_id not in valid:
        raise HTTPException(400, f"Unit invalide. Valides: {valid}")
    loop = asyncio.get_running_loop()
    try:
        result = await loop.run_in_executor(None, lambda: _run_sprite_script(["--unit", unit_id]))
    except subprocess.TimeoutExpired as exc:
        # Same mapping as the bulk generation endpoint: timeout -> 504.
        raise HTTPException(504, "Génération trop longue (timeout 2 min)") from exc
    if result.returncode != 0:
        raise HTTPException(500, result.stderr or result.stdout or "Échec génération")
    return {"status": "ok", "id": unit_id}
@fastapi_app.post("/api/sprites/generate/buildings/{building_id}", dependencies=[Depends(require_debug)])
async def generate_one_building_sprite(building_id: str):
    """Regenerate the sprite of a single building.

    Spaces in ``building_id`` are converted to underscores before validation.

    Raises:
        HTTPException: 400 when ``building_id`` is not a ``BuildingType``
            value; 504 when the script times out; 500 when it exits non-zero.
    """
    from game.buildings import BuildingType

    building_id = building_id.strip().lower().replace(" ", "_")
    valid = [b.value for b in BuildingType]
    if building_id not in valid:
        raise HTTPException(400, f"Building invalide. Valides: {valid}")
    loop = asyncio.get_running_loop()
    try:
        result = await loop.run_in_executor(None, lambda: _run_sprite_script(["--building", building_id]))
    except subprocess.TimeoutExpired as exc:
        # Same mapping as the bulk generation endpoint: timeout -> 504.
        raise HTTPException(504, "Génération trop longue (timeout 2 min)") from exc
    if result.returncode != 0:
        raise HTTPException(500, result.stderr or result.stdout or "Échec génération")
    return {"status": "ok", "id": building_id}
@fastapi_app.get("/api/sprites/resources")
async def list_resource_sprites():
    """List the resource sprites present under ``sprites/resources``."""
    resources_dir = _SPRITES_DIR / "resources"
    return {"sprites": _list_sprites(resources_dir)}
@fastapi_app.post("/api/sprites/generate/resources/{resource_id}", dependencies=[Depends(require_debug)])
async def generate_one_resource_sprite(resource_id: str):
    """Regenerate a single resource icon (``mineral`` or ``geyser``).

    Raises:
        HTTPException: 400 for an unknown ``resource_id``; 504 when the
            script times out; 500 when it exits non-zero.
    """
    resource_id = resource_id.strip().lower()
    valid = ["mineral", "geyser"]
    if resource_id not in valid:
        raise HTTPException(400, f"Resource invalide. Valides: {valid}")
    loop = asyncio.get_running_loop()
    try:
        result = await loop.run_in_executor(None, lambda: _run_sprite_script(["--resource", resource_id]))
    except subprocess.TimeoutExpired as exc:
        # Same mapping as the bulk generation endpoint: timeout -> 504.
        raise HTTPException(504, "Génération trop longue (timeout 2 min)") from exc
    if result.returncode != 0:
        raise HTTPException(500, result.stderr or result.stdout or "Échec génération")
    return {"status": "ok", "id": resource_id}
# Directory holding the UI icons, stored alongside the other sprites.
_ICONS_DIR = _SPRITES_DIR / "icons"
@fastapi_app.get("/api/icons")
async def list_icons():
    """List the UI icons present in the icons directory."""
    icons = _list_sprites(_ICONS_DIR)
    return {"icons": icons}
@fastapi_app.post("/api/sprites/rotate/{kind}/{sprite_id}", dependencies=[Depends(require_debug)])
async def rotate_sprite(kind: str, sprite_id: str):
    """Rotate a sprite by 90° counter-clockwise and overwrite it in place.

    Args:
        kind: One of ``units``, ``buildings`` or ``resources``.
        sprite_id: File stem of the sprite (lower-cased before lookup).

    Raises:
        HTTPException: 400 for an unknown ``kind``; 404 when the PNG does not
            exist; 500 when the image cannot be rotated or saved.
    """
    valid_kinds = {
        "units": _SPRITES_DIR / "units",
        "buildings": _SPRITES_DIR / "buildings",
        "resources": _SPRITES_DIR / "resources",
    }
    if kind not in valid_kinds:
        raise HTTPException(400, f"Kind invalide. Valides: {list(valid_kinds.keys())}")
    sprite_dir = valid_kinds[kind]
    sprite_id = sprite_id.strip().lower()
    path = sprite_dir / f"{sprite_id}.png"
    if not path.is_file():
        raise HTTPException(404, f"Sprite non trouvé: {path.name}")
    try:
        from PIL import Image

        # Close the source file before overwriting it; ``rotate`` returns a
        # new, fully loaded image.
        with Image.open(path) as img:
            rotated = img.rotate(90, expand=True)
        rotated.save(path)
    except Exception as exc:
        raise HTTPException(500, str(exc)) from exc
    return {"status": "ok", "id": sprite_id, "kind": kind}
@fastapi_app.post("/api/icons/rotate/{icon_id}", dependencies=[Depends(require_debug)])
async def rotate_icon(icon_id: str):
    """Rotate a UI icon by 90° counter-clockwise and overwrite it in place.

    Raises:
        HTTPException: 404 when the PNG does not exist; 500 when the image
            cannot be rotated or saved.
    """
    icon_id = icon_id.strip().lower()
    path = _ICONS_DIR / f"{icon_id}.png"
    if not path.is_file():
        raise HTTPException(404, f"Icône non trouvée: {path.name}")
    try:
        from PIL import Image

        # Close the source file before overwriting it; ``rotate`` returns a
        # new, fully loaded image.
        with Image.open(path) as img:
            rotated = img.rotate(90, expand=True)
        rotated.save(path)
    except Exception as exc:
        raise HTTPException(500, str(exc)) from exc
    return {"status": "ok", "id": icon_id}
@fastapi_app.post("/api/icons/generate/{icon_id}", dependencies=[Depends(require_debug)])
async def generate_one_icon(icon_id: str):
    """Generate a single UI icon (``mineral``, ``gas`` or ``supply``).

    Raises:
        HTTPException: 400 for an unknown ``icon_id``; 504 when the script
            times out; 500 when it exits non-zero.
    """
    icon_id = icon_id.strip().lower()
    valid = ["mineral", "gas", "supply"]
    if icon_id not in valid:
        raise HTTPException(400, f"Icône invalide. Valides: {valid}")
    loop = asyncio.get_running_loop()
    try:
        result = await loop.run_in_executor(None, lambda: _run_sprite_script(["--icon", icon_id]))
    except subprocess.TimeoutExpired as exc:
        # Same mapping as the bulk generation endpoint: timeout -> 504.
        raise HTTPException(504, "Génération trop longue (timeout 2 min)") from exc
    if result.returncode != 0:
        raise HTTPException(500, result.stderr or result.stdout or "Échec génération")
    return {"status": "ok", "id": icon_id}
# Serve SvelteKit static build if present (production / HF Spaces)
_FRONTEND_BUILD = Path(__file__).parent.parent / "frontend" / "build"
if _FRONTEND_BUILD.exists():
    # Mounted at "/" after every API route above has been registered.
    fastapi_app.mount(
        "/",
        StaticFiles(directory=str(_FRONTEND_BUILD), html=True),
        name="frontend",
    )
# ASGI app: Socket.IO wraps FastAPI
app = socketio.ASGIApp(sio, fastapi_app)
# ---------------------------------------------------------------------------
# Global state
# ---------------------------------------------------------------------------
# In-process state shared by all Socket.IO handlers below.
lobby = LobbyManager()
engines: dict[str, GameEngine] = {} # room_id → GameEngine
_bot_timers: dict[str, asyncio.Task] = {} # sid → pending bot-offer timer
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _player_name_in_room(sid: str, room_id: str) -> str:
    """Return the player's name in the given room, or *sid* when room or player is unknown."""
    room = lobby.get_room(room_id)
    player = room.get_player(sid) if room else None
    if player:
        return player.name
    return sid
async def _emit_error(sid: str, message: str) -> None:
    """Send an ``error`` event carrying *message* to a single client."""
    payload = {"message": message}
    await sio.emit("error", payload, to=sid)
def _build_resource_zones(engine: Any, player_id: str) -> list[str]:
    """Return resource zone names: ``mineral_1…N`` followed by ``geyser_1…M``.

    N counts the non-depleted mineral resources and M all geyser resources on
    the map. Resources are sorted by squared distance to the player's base
    (command center position + 2, or (40, 40) when there is none).
    """
    # NOTE(review): only the list lengths reach the returned names, so the
    # proximity ordering does not affect the output of this function.
    from game.map import ResourceType as _RT

    owner = engine.state.players.get(player_id)
    command_center = owner.command_center() if owner else None
    if command_center:
        base_x = float(command_center.x) + 2
        base_y = float(command_center.y) + 2
    else:
        base_x = 40.0
        base_y = 40.0

    def _dist_sq(res) -> float:
        return (res.x - base_x) ** 2 + (res.y - base_y) ** 2

    all_resources = engine.state.game_map.resources
    minerals = sorted(
        (res for res in all_resources if res.resource_type == _RT.MINERAL and not res.is_depleted),
        key=_dist_sq,
    )
    geysers = sorted(
        (res for res in all_resources if res.resource_type == _RT.GEYSER),
        key=_dist_sq,
    )

    names = [f"mineral_{n}" for n in range(1, len(minerals) + 1)]
    names.extend(f"geyser_{n}" for n in range(1, len(geysers) + 1))
    return names
async def _start_game(room_id: str) -> None:
    """Create the game state and engine for a room and emit ``game_start`` to its human players.

    Does nothing unless the room exists with exactly two players. When the
    simultaneous-games limit is reached, human players receive an error and
    no game is created.
    """
    room = lobby.get_room(room_id)
    if not room or len(room.players) != 2:
        return

    if len(engines) >= MAX_SIMULTANEOUS_GAMES:
        log.warning("Limite de %d parties simultanées atteinte, impossible de démarrer la room %s", MAX_SIMULTANEOUS_GAMES, room_id)
        for member in room.players:
            if member.sid == BOT_PLAYER_ID:
                continue
            await _emit_error(member.sid, "Le serveur est plein (limite de parties simultanées atteinte). Réessaie dans quelques instants.")
        return

    first, second = room.players[0], room.players[1]
    participants = (first, second)
    game_state = GameState.create_new(
        room_id=room_id,
        player1_id=first.sid,
        player1_name=first.name,
        player2_id=second.sid,
        player2_name=second.name,
    )
    engine = GameEngine(game_state, sio)
    engines[room_id] = engine
    room.engine = engine

    # Attach the bot controller when one of the two seats is the bot.
    for member in participants:
        if member.sid == BOT_PLAYER_ID:
            engine.bot = BotPlayer(engine, BOT_PLAYER_ID)
            log.info("Bot wired for room %s", room_id)

    payload = game_state.model_dump(mode="json")
    # Only humans receive game_start.
    for member in participants:
        if member.sid == BOT_PLAYER_ID:
            continue
        await sio.emit("game_start", {"game_state": payload, "your_id": member.sid}, to=member.sid)

    import time as _time

    room.started_at = _time.time()
    engine.start()
    log.info("Game started in room %s: %s vs %s | parties actives: %d/%d", room_id, first.name, second.name, len(engines), MAX_SIMULTANEOUS_GAMES)
def _cancel_bot_timer(sid: str) -> None:
    """Forget the pending bot-offer timer for *sid* and cancel it if still running."""
    pending = _bot_timers.pop(sid, None)
    if not pending or pending.done():
        return
    pending.cancel()
async def _schedule_bot_offer(sid: str, room_id: str) -> None:
    """Sleep ``BOT_OFFER_DELAY`` seconds, then emit ``bot_offer`` if the player is still alone.

    The offer is sent only when the room still exists, is in ``"waiting"``
    status and holds exactly one player.
    """
    await asyncio.sleep(BOT_OFFER_DELAY)
    _bot_timers.pop(sid, None)

    room = lobby.get_room(room_id)
    if not room:
        return
    if room.status != "waiting" or len(room.players) != 1:
        return
    await sio.emit("bot_offer", {}, to=sid)
    log.info("Bot offer sent to %s (room %s)", sid, room_id)
async def _schedule_bot_offer_queue(sid: str) -> None:
    """Sleep ``BOT_OFFER_DELAY`` seconds, then emit ``bot_offer`` if *sid* is still queued."""
    await asyncio.sleep(BOT_OFFER_DELAY)
    _bot_timers.pop(sid, None)

    # The lobby maps queued sids to the "__queue__" marker.
    if lobby._sid_to_room.get(sid) != "__queue__":
        return
    await sio.emit("bot_offer", {}, to=sid)
    log.info("Bot offer sent to queued player %s", sid)
# ---------------------------------------------------------------------------
# Connection lifecycle
# ---------------------------------------------------------------------------
@sio.event
async def connect(sid: str, environ: dict, auth: Optional[dict] = None) -> None:
    """Socket.IO connect handler: only logs the new session id."""
    log.info("Client connected: %s", sid)
@sio.event
async def disconnect(sid: str) -> None:
    """Socket.IO disconnect handler.

    Cancels the client's bot-offer timer and removes it from the lobby. If
    that finishes a room, the room's engine is stopped, remaining human
    players are notified (``room_update`` then ``game_over`` with reason
    ``opponent_disconnected``) and the win is recorded.
    """
    log.info("Client disconnected: %s", sid)
    _cancel_bot_timer(sid)
    room = lobby.disconnect(sid)
    if not room:
        return

    engine_stopped = False
    if room.status == "finished" and room.room_id in engines:
        await engines[room.room_id].stop()
        engine_stopped = True

    try:
        # Notify remaining human players
        for player in room.players:
            if player.sid == BOT_PLAYER_ID:
                continue
            await sio.emit(
                "room_update",
                {"room": room.to_dict(), "message": "Un joueur a quitté la partie."},
                to=player.sid,
            )
            if room.status == "finished":
                # The engine is still registered here so that
                # _record_game_over can read the game state.
                _record_game_over(room, player.sid, player.name)
                await sio.emit(
                    "game_over",
                    {"winner_id": player.sid, "winner_name": player.name,
                     "reason": "opponent_disconnected"},
                    to=player.sid,
                )
    finally:
        # Unregister the stopped engine only after stats have been recorded;
        # pop() tolerates another handler having removed it meanwhile.
        if engine_stopped:
            engines.pop(room.room_id, None)
            log.info("Partie terminée (déconnexion) room %s | parties actives: %d/%d", room.room_id, len(engines), MAX_SIMULTANEOUS_GAMES)
# ---------------------------------------------------------------------------
# Lobby events
# ---------------------------------------------------------------------------
@sio.event
async def get_playing_count(sid: str, data: dict) -> None:
    """Reply to the caller with ``playing_count`` taken from the lobby."""
    payload = {"count": lobby.get_playing_count()}
    await sio.emit("playing_count", payload, to=sid)
@sio.event
async def get_playing_rooms(sid: str, data: dict) -> None:
    """Reply to the caller with ``playing_rooms`` taken from the lobby."""
    payload = {"rooms": lobby.get_playing_rooms()}
    await sio.emit("playing_rooms", payload, to=sid)
@sio.event
async def observe(sid: str, data: dict) -> None:
    """Add the caller as a spectator of a playing room.

    Uses ``data["room_id"]`` when given, otherwise a playing room chosen by
    the lobby. Replies with ``observe_room`` (including the current game
    state when an engine exists) or with an error.
    """
    room_id = data.get("room_id") or lobby.get_a_playing_room_id()
    if not room_id:
        await _emit_error(sid, "Aucune partie en cours à observer.")
        return

    room = lobby.get_room(room_id)
    if not (room and room.status == "playing"):
        await _emit_error(sid, "Cette partie n'est plus disponible.")
        return

    await sio.enter_room(sid, room_id)
    engine = engines.get(room_id)
    payload = {"room_id": room_id}
    if engine:
        payload["game_state"] = engine.state.model_dump(mode="json")
    await sio.emit("observe_room", payload, to=sid)
@sio.event
async def create_room(sid: str, data: dict) -> None:
    """Create a room hosted by the caller and start the bot-offer countdown.

    Any previous timer and game for this client are cleaned up first.
    """
    requested = str(data.get("name", "Player")).strip() or "Player"
    name = await sanitize_player_name(requested)

    _cancel_bot_timer(sid)
    await _cleanup_old_game(sid)

    room = lobby.create_room(sid, name)
    await sio.enter_room(sid, room.room_id)
    created_payload = {"room_id": room.room_id, "room": room.to_dict()}
    await sio.emit("room_created", created_payload, to=sid)
    log.info("Room %s created by %s", room.room_id, name)

    # Offer a bot if nobody joins in time.
    timer = asyncio.create_task(_schedule_bot_offer(sid, room.room_id))
    _bot_timers[sid] = timer
@sio.event
async def join_room(sid: str, data: dict) -> None:
    """Join an existing room by id (upper-cased) and notify the room.

    Emits an error when the lobby refuses the join. On success the other
    player's bot-offer timer is cancelled, the caller receives
    ``room_joined`` and the room receives ``room_update``.
    """
    room_id = str(data.get("room_id", "")).upper().strip()
    requested = str(data.get("name", "Player")).strip() or "Player"
    name = await sanitize_player_name(requested)

    room, err = lobby.join_room(sid, room_id, name)
    if err:
        await _emit_error(sid, err)
        return

    # A real opponent arrived: the host no longer needs a bot offer.
    host_sid = None
    for member in room.players:
        if member.sid != sid:
            host_sid = member.sid
            break
    if host_sid:
        _cancel_bot_timer(host_sid)

    await sio.enter_room(sid, room_id)
    await sio.emit("room_joined", {"room_id": room_id, "room": room.to_dict()}, to=sid)
    # Broadcast the new lobby state to the room (reaches the host).
    await sio.emit("room_update", {"room": room.to_dict()}, room=room_id)
@sio.event
async def quick_match(sid: str, data: dict) -> None:
    """Quick-match entry point.

    When the lobby reports no match yet, the caller gets ``match_queued`` and
    a bot-offer countdown starts. Otherwise both players enter the room,
    receive ``match_found``, are marked ready, and the game starts.
    """
    requested = str(data.get("name", "Player")).strip() or "Player"
    name = await sanitize_player_name(requested)

    _cancel_bot_timer(sid)
    await _cleanup_old_game(sid)

    room, matched = lobby.quick_match(sid, name)
    if not matched:
        await sio.emit("match_queued", {"message": "En attente d'un adversaire…"}, to=sid)
        # Restart the bot-offer countdown for the queued player.
        _cancel_bot_timer(sid)
        _bot_timers[sid] = asyncio.create_task(_schedule_bot_offer_queue(sid))
        return

    # A match exists: nobody in it needs a bot offer any more.
    for member in room.players:
        _cancel_bot_timer(member.sid)

    # Both seats join the Socket.IO room before being notified.
    for seat in (0, 1):
        await sio.enter_room(room.players[seat].sid, room.room_id)
    for member in room.players:
        found_payload = {"room_id": room.room_id, "room": room.to_dict()}
        await sio.emit("match_found", found_payload, to=member.sid)

    # Quick match skips the ready step: mark everyone ready and start.
    for member in room.players:
        member.ready = True
    room.status = "playing"
    await _start_game(room.room_id)
@sio.event
async def accept_bot(sid: str, data: dict) -> None:
    """Handle ``accept_bot``: pair the caller with the bot and start the game.

    If the caller has no room yet, their name is looked up in the lobby's
    quick-match queue; they are removed from the queue and a fresh room is
    created for them.  The bot is then added to the room, the caller is marked
    ready, and the game is started.

    Args:
        sid: Socket.IO session id of the player accepting the offer.
        data: Event payload (not read).
    """
    _cancel_bot_timer(sid)

    room = lobby.get_room_for_sid(sid)
    if not room:
        # The player was waiting in the quick-match queue: recover their name.
        queued_name = None
        for queued_sid, candidate in lobby._queue:
            if queued_sid == sid:
                queued_name = candidate
                break
        if queued_name is None:
            await _emit_error(sid, "Session expirée, relance une partie.")
            return

        # Drop the queue entry and give the player a private room.
        lobby._queue = [
            (other_sid, other_name)
            for other_sid, other_name in lobby._queue
            if other_sid != sid
        ]
        lobby._sid_to_room.pop(sid, None)
        room = lobby.create_room(sid, queued_name)
        await sio.enter_room(sid, room.room_id)

    bot_room, err = lobby.add_bot(room.room_id)
    if err:
        await _emit_error(sid, err)
        return

    await sio.emit("room_update", {"room": bot_room.to_dict()}, to=sid)

    # Auto-ready the human player, then launch the game.
    human_player = bot_room.get_player(sid)
    if human_player:
        human_player.ready = True
    bot_room.status = "playing"
    await _start_game(bot_room.room_id)
@sio.event
async def cancel_construction(sid: str, data: dict) -> None:
    """Cancel an in-progress building construction on behalf of the caller.

    The cancellation itself (and the refund, documented by the original
    author as 75% of resources) is delegated to
    ``engine.cancel_building_construction``.  This handler emits nothing on
    success; on failure an ``error`` event is sent to the caller.

    Args:
        sid: Socket.IO session id of the requesting player.
        data: Event payload; ``building_id`` identifies the building.
    """
    room = lobby.get_room_for_sid(sid)
    if not room:
        await _emit_error(sid, "Tu n'es dans aucune room.")
        return

    engine = engines.get(room.room_id)
    if not engine:
        await _emit_error(sid, "La partie n'a pas encore commencé.")
        return

    building_id: str = data.get("building_id", "")
    if not building_id:
        await _emit_error(sid, "ID de bâtiment manquant.")
        return

    result = engine.cancel_building_construction(sid, building_id)
    if not result.success:
        # ``result.data`` may be None (other handlers in this file guard with
        # ``r.data or {}``), so never call ``.get`` on it directly.
        reason = (result.data or {}).get("error", "?")
        await _emit_error(sid, f"Impossible d'annuler: {reason}")
async def _cleanup_old_game(sid: str) -> None:
    """Leave the Socket.IO room and stop the engine of the player's current game.

    Does nothing when the lobby knows no room for ``sid``.

    Args:
        sid: Socket.IO session id of the player starting something new.
    """
    old_room = lobby.get_room_for_sid(sid)
    if not old_room:
        return

    old_room_id = old_room.room_id
    await sio.leave_room(sid, old_room_id)

    # Unregister the engine *before* awaiting its shutdown.  The previous
    # check / ``await stop()`` / ``del`` sequence could raise KeyError if
    # another handler removed the same entry while ``stop()`` was suspended.
    old_engine = engines.pop(old_room_id, None)
    if old_engine is None:
        return

    await old_engine.stop()
    log.info(
        "Partie précédente stoppée room %s | parties actives: %d/%d",
        old_room_id,
        len(engines),
        MAX_SIMULTANEOUS_GAMES,
    )
@sio.event
async def start_tutorial(sid: str, data: dict) -> None:
    """Handle ``start_tutorial``: launch a solo tutorial game for the caller.

    Pending bot timers are cancelled and any previous game is cleaned up so
    stale ``game_update`` events do not leak into the new session.  If the
    simultaneous-games limit is reached, an error is emitted and nothing is
    created.  Otherwise a room, a tutorial ``GameState`` and a ``GameEngine``
    are created, ``game_start`` is sent to the caller and the engine started.

    Args:
        sid: Socket.IO session id of the player.
        data: Event payload; reads ``name``.
    """
    import time as _time

    requested_name = str(data.get("name", "Player")).strip()
    name = await sanitize_player_name(requested_name or "Player")

    # Drop anything left over from a previous session.
    _cancel_bot_timer(sid)
    await _cleanup_old_game(sid)

    if len(engines) >= MAX_SIMULTANEOUS_GAMES:
        log.warning("Limite de %d parties simultanées atteinte, impossible de démarrer le tutoriel pour %s", MAX_SIMULTANEOUS_GAMES, sid)
        await _emit_error(sid, "Le serveur est plein (limite de parties simultanées atteinte). Réessaie dans quelques instants.")
        return

    # Private room dedicated to this tutorial.
    room = lobby.create_room(sid, name)
    await sio.enter_room(sid, room.room_id)
    room.status = "playing"

    tutorial_state = GameState.create_tutorial(
        room_id=room.room_id,
        player_id=sid,
        player_name=name,
    )
    tutorial_engine = GameEngine(tutorial_state, sio)
    engines[room.room_id] = tutorial_engine
    room.engine = tutorial_engine

    start_payload = {
        "game_state": tutorial_state.model_dump(mode="json"),
        "your_id": sid,
    }
    await sio.emit("game_start", start_payload, to=sid)

    # Timestamp read back by ``tutorial_complete`` to compute the duration.
    room.started_at = _time.time()
    tutorial_engine.start()
    log.info("Tutorial started for %s (%s) in room %s | parties actives: %d/%d", name, sid, room.room_id, len(engines), MAX_SIMULTANEOUS_GAMES)
@sio.event
async def tutorial_complete(sid: str, data: dict) -> None:
    """Record a ``tutorial_complete`` stat when the frontend reports success.

    Silently returns when the caller has no room.  The duration is derived
    from ``room.started_at`` (``None`` when unset) and the player name comes
    from the engine state, falling back to ``"Player"``.  Failures while
    recording are logged and never propagated.

    Args:
        sid: Socket.IO session id of the player.
        data: Event payload (not read).
    """
    import time as _time

    room = lobby.get_room_for_sid(sid)
    if not room:
        return

    if room.started_at:
        duration = int(_time.time() - room.started_at)
    else:
        duration = None

    # Prefer the in-game name; keep the default if the engine or player is gone.
    player_name = "Player"
    engine = engines.get(room.room_id)
    player_state = engine.state.players.get(sid) if engine else None
    if player_state:
        player_name = player_state.player_name

    try:
        _stats.record(player_name, "tutorial_complete", duration)
        log.info("Tutorial complete: %s in %ss", player_name, duration)
    except Exception:
        # Stats are best-effort: never let a recording failure reach the client.
        log.exception("Failed to record tutorial stat")
@sio.event
async def player_ready(sid: str, data: dict) -> None:
    """Handle ``player_ready``: flag the caller as ready in their room.

    The updated room is broadcast with ``room_update``; when the lobby
    reports that everyone is ready, the game is started.

    Args:
        sid: Socket.IO session id of the player.
        data: Event payload (not read).
    """
    room, everyone_ready = lobby.set_ready(sid)
    if room:
        await sio.emit("room_update", {"room": room.to_dict()}, room=room.room_id)
        if everyone_ready:
            await _start_game(room.room_id)
        return

    await _emit_error(sid, "Tu n'es dans aucune room.")
# ---------------------------------------------------------------------------
# Voice pipeline
# ---------------------------------------------------------------------------
@sio.event
async def voice_input(sid: str, data: dict) -> None:
    """Handle ``voice_input``: run a spoken command through the game pipeline.

    Pipeline: base64 decode → speech-to-text → command parsing → game engine
    → ``voice_result`` sent back to the caller.  A parsed resign action is
    never executed here; the caller receives ``resign_confirm_request``
    instead.  If the command produced a winner, the game is finalized
    (``game_over`` broadcast, engine stopped, room finished).

    Args:
        sid: Socket.IO session id of the speaking player.
        data: Event payload; reads ``audio_b64`` (required) and ``mime_type``
            (optional, defaults to ``"audio/webm"``).
    """
    room = lobby.get_room_for_sid(sid)
    if not room:
        await _emit_error(sid, "Tu n'es dans aucune room.")
        return

    engine = engines.get(room.room_id)
    if not engine:
        await _emit_error(sid, "La partie n'a pas encore commencé.")
        return

    player = engine.state.players.get(sid)
    if not player:
        await _emit_error(sid, "Joueur introuvable dans la partie.")
        return

    audio_b64: str = data.get("audio_b64", "")
    # ``mime_type`` is optional; also fall back when the client sends an
    # explicit null / empty value rather than omitting the key.
    mime_type: str = data.get("mime_type") or "audio/webm"
    if not audio_b64:
        await _emit_error(sid, "Aucune donnée audio reçue.")
        return

    try:
        audio_bytes = base64.b64decode(audio_b64)
    except (ValueError, TypeError):
        # binascii.Error subclasses ValueError; TypeError covers non-string
        # payloads.
        await _emit_error(sid, "Données audio invalides (base64 incorrect).")
        return

    # 1. Speech-to-text
    try:
        transcription = await stt.transcribe(audio_bytes, mime_type)
    except Exception as exc:
        log.exception("STT failed")
        await _emit_error(sid, f"Erreur de reconnaissance vocale: {exc}")
        return

    if not transcription:
        await sio.emit("voice_result", {
            "transcription": "",
            "feedback_text": "Je n'ai rien entendu. Appuie et parle!",
            "feedback_level": "warning",
            "results": [],
        }, to=sid)
        return

    # 2. Parse the transcription into game actions
    resource_zones = _build_resource_zones(engine, sid)
    try:
        parsed = await command_parser.parse(
            transcription,
            player,
            resource_zones=resource_zones,
            landmarks=MAP_LANDMARKS,
        )
    except Exception as exc:
        log.exception("Command parsing failed")
        await _emit_error(sid, f"Erreur d'interprétation: {exc}")
        return

    # 3a. Intercept resign — ask for confirmation before executing
    if any(a.type == ActionType.RESIGN for a in parsed.actions):
        await sio.emit("resign_confirm_request", {"transcription": transcription}, to=sid)
        return

    # 3. Apply commands to game engine
    cmd_result = engine.apply_command(sid, parsed)

    if cmd_result.feedback_override:
        feedback_text = await command_parser.generate_feedback(
            cmd_result.feedback_override, parsed.language
        )
    else:
        # Merge every action's data; later results win on key collisions.
        merged: dict[str, Any] = {}
        for r in cmd_result.results:
            merged.update(r.data or {})
        if "error" in merged:
            feedback_text = await command_parser.generate_feedback(
                merged["error"], parsed.language
            )
        else:
            feedback_text = _fill_template(parsed.feedback_template, merged)

    sound_events = []
    for r in cmd_result.results:
        sound_events.extend(getattr(r, "sound_events", []) or [])

    await sio.emit("voice_result", {
        "transcription": transcription,
        "feedback_text": feedback_text,
        "feedback_level": _compute_feedback_level(cmd_result),
        "results": [r.model_dump() for r in cmd_result.results],
        "sound_events": sound_events,
    }, to=sid)

    # Check win condition after command.  Several awaits have run since the
    # engine was looked up, so another handler may already have finalized (or
    # removed) this game.  Only the handler that still finds this very engine
    # registered runs the game-over sequence; this avoids the KeyError and the
    # duplicate ``game_over`` / stat record the old index-then-delete code
    # could produce.
    winner_id = engine.state.winner
    if winner_id and engines.get(room.room_id) is engine:
        # Unregister synchronously (no await since the check) to claim it.
        del engines[room.room_id]
        winner_state = engine.state.players.get(winner_id)
        winner_name = winner_state.player_name if winner_state else winner_id
        _record_game_over(room, winner_id, winner_name)
        await sio.emit("game_over", {
            "winner_id": winner_id,
            "winner_name": winner_name,
        }, room=room.room_id)
        await engine.stop()
        lobby.finish_room(room.room_id)
        log.info("Partie terminée (victoire voice) room %s | parties actives: %d/%d", room.room_id, len(engines), MAX_SIMULTANEOUS_GAMES)
async def _process_text_command(sid: str, transcription: str) -> None:
    """Run an already-textual command through the game pipeline (no STT).

    Pipeline: command parsing → game engine → ``voice_result`` sent back to
    the caller.  A parsed resign action is never executed here; the caller
    receives ``resign_confirm_request`` instead.  If the command produced a
    winner, the game is finalized (``game_over`` broadcast, engine stopped,
    room finished).

    Args:
        sid: Socket.IO session id of the player issuing the command.
        transcription: The command text to interpret.
    """
    room = lobby.get_room_for_sid(sid)
    if not room:
        await _emit_error(sid, "Tu n'es dans aucune room.")
        return

    engine = engines.get(room.room_id)
    if not engine:
        await _emit_error(sid, "La partie n'a pas encore commencé.")
        return

    player = engine.state.players.get(sid)
    if not player:
        await _emit_error(sid, "Joueur introuvable dans la partie.")
        return

    # Parse the text into game actions
    resource_zones = _build_resource_zones(engine, sid)
    try:
        parsed = await command_parser.parse(
            transcription,
            player,
            resource_zones=resource_zones,
            landmarks=MAP_LANDMARKS,
        )
    except Exception as exc:
        log.exception("Command parsing failed")
        await _emit_error(sid, f"Erreur d'interprétation: {exc}")
        return

    # Intercept resign — ask for confirmation before executing
    if any(a.type == ActionType.RESIGN for a in parsed.actions):
        await sio.emit("resign_confirm_request", {"transcription": transcription}, to=sid)
        return

    # Apply commands to game engine
    cmd_result = engine.apply_command(sid, parsed)

    if cmd_result.feedback_override:
        feedback_text = await command_parser.generate_feedback(
            cmd_result.feedback_override, parsed.language
        )
    else:
        # Merge every action's data; later results win on key collisions.
        merged = {}
        for r in cmd_result.results:
            merged.update(r.data or {})
        if "error" in merged:
            feedback_text = await command_parser.generate_feedback(
                merged["error"], parsed.language
            )
        else:
            feedback_text = _fill_template(parsed.feedback_template, merged)

    sound_events = []
    for r in cmd_result.results:
        sound_events.extend(getattr(r, "sound_events", []) or [])

    await sio.emit("voice_result", {
        "transcription": transcription,
        "feedback_text": feedback_text,
        "feedback_level": _compute_feedback_level(cmd_result),
        "results": [r.model_dump() for r in cmd_result.results],
        "sound_events": sound_events,
    }, to=sid)

    # Win check.  Several awaits have run since the engine was looked up, so
    # another handler may already have finalized (or removed) this game.  Only
    # the handler that still finds this very engine registered runs the
    # game-over sequence; this avoids the KeyError and the duplicate
    # ``game_over`` / stat record the old index-then-delete code could produce.
    winner_id = engine.state.winner
    if winner_id and engines.get(room.room_id) is engine:
        # Unregister synchronously (no await since the check) to claim it.
        del engines[room.room_id]
        winner_state = engine.state.players.get(winner_id)
        winner_name = winner_state.player_name if winner_state else winner_id
        _record_game_over(room, winner_id, winner_name)
        await sio.emit("game_over", {
            "winner_id": winner_id,
            "winner_name": winner_name,
        }, room=room.room_id)
        await engine.stop()
        lobby.finish_room(room.room_id)
        log.info("Partie terminée (victoire text) room %s | parties actives: %d/%d", room.room_id, len(engines), MAX_SIMULTANEOUS_GAMES)
@sio.event
async def text_input(sid: str, data: dict) -> None:
    """Handle ``text_input``: interpret a plain-text command from the client.

    The text is stringified and stripped; an empty result yields an error,
    anything else is forwarded to ``_process_text_command`` (no STT step).

    Args:
        sid: Socket.IO session id of the player.
        data: Event payload; reads ``text``.
    """
    text: str = str(data.get("text", "")).strip()
    if text:
        await _process_text_command(sid, text)
    else:
        await _emit_error(sid, "Aucun texte reçu.")
@sio.event
async def resign_confirm(sid: str, data: dict) -> None:
    """Handle ``resign_confirm``: the player confirmed they want to resign.

    Silently returns when the caller has no room, no running engine, or is
    not a player of that game.  Otherwise the engine's resign command is
    executed, the caller receives a ``voice_result`` acknowledgement and, if
    a winner results, the game is finalized (``game_over`` broadcast, engine
    stopped, room finished).

    Args:
        sid: Socket.IO session id of the resigning player.
        data: Event payload (not read).
    """
    room = lobby.get_room_for_sid(sid)
    if not room:
        return

    engine = engines.get(room.room_id)
    if not engine:
        return

    player = engine.state.players.get(sid)
    if not player:
        return

    result = engine._cmd_resign(player)
    if not result.success:
        # ``result.data`` may be None (other handlers in this file guard with
        # ``r.data or {}``), so never call ``.get`` on it directly.
        await _emit_error(sid, (result.data or {}).get("error", "Erreur lors de l'abandon."))
        return

    await sio.emit("voice_result", {
        "transcription": "",
        "feedback_text": "Vous avez abandonné la partie.",
        "feedback_level": "ok",
        "results": [result.model_dump()],
        "sound_events": [],
    }, to=sid)

    # Win check.  The emit above yielded control, so another handler may
    # already have finalized (or removed) this game.  Only the handler that
    # still finds this very engine registered runs the game-over sequence;
    # this avoids the KeyError and the duplicate ``game_over`` / stat record
    # the old index-then-delete code could produce.
    winner_id = engine.state.winner
    if winner_id and engines.get(room.room_id) is engine:
        # Unregister synchronously (no await since the check) to claim it.
        del engines[room.room_id]
        winner_state = engine.state.players.get(winner_id)
        winner_name = winner_state.player_name if winner_state else winner_id
        _record_game_over(room, winner_id, winner_name)
        await sio.emit("game_over", {
            "winner_id": winner_id,
            "winner_name": winner_name,
        }, room=room.room_id)
        await engine.stop()
        lobby.finish_room(room.room_id)
        log.info("Partie terminée (resign) room %s | parties actives: %d/%d", room.room_id, len(engines), MAX_SIMULTANEOUS_GAMES)