""" Semantic guessing game - backend server. Loads pre-computed word embeddings and serves similarity scores. Usage: pip install fastapi uvicorn numpy python generate_embeddings.py # create mock embeddings first uvicorn server:app --reload --port 8000 """ import hashlib import base64 import json import math import random import re import secrets import time from pathlib import Path import numpy as np from fastapi import FastAPI, HTTPException, Query, WebSocket, WebSocketDisconnect from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles from pydantic import BaseModel # --------------------------------------------------------------------------- # Load embeddings # --------------------------------------------------------------------------- EMBEDDINGS_PATH = Path(__file__).parent / "embeddings.npz" if not EMBEDDINGS_PATH.exists(): raise FileNotFoundError( f"Embeddings not found at {EMBEDDINGS_PATH}. " f"Run: python generate_embeddings.py" ) print(f"Loading embeddings from {EMBEDDINGS_PATH}...") _data = np.load(EMBEDDINGS_PATH, allow_pickle=True) WORDS: list[str] = _data["words"].tolist() VECTORS: np.ndarray = _data["vectors"].astype(np.float32) # (vocab_size, dim) WORD_TO_IDX: dict[str, int] = {w: i for i, w in enumerate(WORDS)} print(f"Loaded {len(WORDS)} words, {VECTORS.shape[1]}-dim embeddings") # Load curated secret word pool SECRET_WORDS_PATH = Path(__file__).parent / "secret_words.txt" if SECRET_WORDS_PATH.exists(): _secret_raw = [w.strip().lower() for w in SECRET_WORDS_PATH.read_text(encoding="utf-8").splitlines() if w.strip()] # Only keep secret words that are actually in our embedding vocabulary SECRET_CANDIDATES = [w for w in _secret_raw if w in WORD_TO_IDX] print(f"Loaded {len(SECRET_CANDIDATES)} secret word candidates from {SECRET_WORDS_PATH}") else: # Fallback: filter from embedding vocabulary SECRET_CANDIDATES = [w for w in WORDS if 3 <= len(w) <= 16 and w.isalpha()] print(f"No secret_words.txt found, using {len(SECRET_CANDIDATES)} filtered candidates from vocabulary") # Load secret word metadata (POS, hypernyms, difficulty) SECRET_META_PATH = Path(__file__).parent / "secret_words_meta.json" SECRET_META: dict = {} if SECRET_META_PATH.exists(): SECRET_META = json.loads(SECRET_META_PATH.read_text(encoding="utf-8")) print(f"Loaded metadata for {len(SECRET_META)} secret words from {SECRET_META_PATH}") else: print(f"No secret_words_meta.json found, hint metadata unavailable") PHASE3_MIN_COVERAGE = {"vad": 0.25, "sensorimotor": 0.2, "glasgow": 0.15} if SECRET_META: PHASE3_COVERAGE = { k: (sum(1 for _e in SECRET_META.values() if _e.get(k)) / len(SECRET_META)) for k in PHASE3_MIN_COVERAGE } else: PHASE3_COVERAGE = {k: 0.0 for k in PHASE3_MIN_COVERAGE} PHASE3_HINT_ENABLED = { k: PHASE3_COVERAGE.get(k, 0.0) >= threshold for k, threshold in PHASE3_MIN_COVERAGE.items() } print( "Phase 3 hint coverage: " + ", ".join( f"{k}={PHASE3_COVERAGE[k]*100:.1f}% ({'enabled' if PHASE3_HINT_ENABLED[k] else 'disabled'})" for k in ["vad", "sensorimotor", "glasgow"] ) ) # Pre-group candidates by difficulty for filtered new-game CANDIDATES_BY_DIFFICULTY: dict[str, list[str]] = {} for _w in SECRET_CANDIDATES: _diff = SECRET_META.get(_w, {}).get("difficulty", "medium") CANDIDATES_BY_DIFFICULTY.setdefault(_diff, []).append(_w) CANDIDATE_SET_BY_DIFFICULTY: dict[str, set[str]] = { diff: set(words) for diff, words in CANDIDATES_BY_DIFFICULTY.items() } SECRET_WORD_SET = set(SECRET_CANDIDATES) SECRET_WORD_SEEDS_PATH = Path(__file__).parent / "secret_word_seeds.json" DEFAULT_CANONICAL_SEED_PREFIX = "" DEFAULT_CANONICAL_SEED_NAMESPACE = "semantick:secret-seed:v2" DEFAULT_CANONICAL_SEED_TOKEN_LEN = 6 def _seed_for_word_with_nonce(word: str, seed_prefix: str, nonce: int) -> str: payload = f"{DEFAULT_CANONICAL_SEED_NAMESPACE}|{word}|{nonce}".encode("utf-8") digest = hashlib.blake2b(payload, digest_size=16).digest() token = base64.b32encode(digest).decode("ascii").rstrip("=").lower() return f"{seed_prefix}{token[:DEFAULT_CANONICAL_SEED_TOKEN_LEN]}" def _build_default_secret_seed_map(words: list[str], seed_prefix: str) -> dict[str, str]: unique_words = sorted(set(words)) result: dict[str, str] = {} used: set[str] = set() for word in unique_words: nonce = 0 while True: seed = _seed_for_word_with_nonce(word, seed_prefix, nonce) if len(word) >= 4 and word in seed: nonce += 1 continue if seed not in used: used.add(seed) result[word] = seed break nonce += 1 return result def _load_secret_seed_mapping() -> tuple[str, dict[str, str]]: """ Load precomputed canonical seed mapping if available and valid. Falls back to deterministic opaque hash mapping. """ fallback = _build_default_secret_seed_map(SECRET_CANDIDATES, DEFAULT_CANONICAL_SEED_PREFIX) if not SECRET_WORD_SEEDS_PATH.exists(): print("No secret_word_seeds.json found, using default canonical mapping") return DEFAULT_CANONICAL_SEED_PREFIX, fallback try: payload = json.loads(SECRET_WORD_SEEDS_PATH.read_text(encoding="utf-8")) prefix = str(payload.get("seed_prefix") or DEFAULT_CANONICAL_SEED_PREFIX) raw_map = payload.get("all") or {} loaded: dict[str, str] = {} if isinstance(raw_map, dict): for raw_word, raw_seed in raw_map.items(): word = str(raw_word).strip().lower() seed = str(raw_seed).strip() if word in SECRET_WORD_SET and seed: loaded[word] = seed # Ensure complete coverage for current secret candidate set. complete = {word: loaded.get(word, fallback[word]) for word in sorted(SECRET_WORD_SET)} # Enforce 1:1 seed uniqueness; fallback if duplicates are present. if len(set(complete.values())) != len(complete): print("secret_word_seeds.json has duplicate seeds; using default canonical mapping") return DEFAULT_CANONICAL_SEED_PREFIX, fallback print(f"Loaded canonical secret seeds for {len(complete)} words from {SECRET_WORD_SEEDS_PATH}") return prefix, complete except Exception as exc: print(f"Failed to parse {SECRET_WORD_SEEDS_PATH}: {exc}; using default canonical mapping") return DEFAULT_CANONICAL_SEED_PREFIX, fallback CANONICAL_SEED_PREFIX, SECRET_WORD_TO_CANONICAL_SEED = _load_secret_seed_mapping() CANONICAL_SEED_TO_WORD = { seed: word for word, seed in SECRET_WORD_TO_CANONICAL_SEED.items() } # --------------------------------------------------------------------------- # Game state (in-memory, per-session) # --------------------------------------------------------------------------- GAMES: dict[str, dict] = {} MAX_GAMES = 1000 GAME_TTL_SECONDS = 86400 # 24 hours HINT_RANKS = [1000, 100, 10, 9, 8, 7, 6, 5, 4, 3, 2] def cleanup_games(): """Remove expired games and enforce max games limit.""" now = time.time() expired = [gid for gid, g in GAMES.items() if now - g["created_at"] > GAME_TTL_SECONDS] for gid in expired: del GAMES[gid] # If still over limit, remove oldest if len(GAMES) > MAX_GAMES: by_age = sorted(GAMES.items(), key=lambda x: x[1]["created_at"]) for gid, _ in by_age[:len(GAMES) - MAX_GAMES]: del GAMES[gid] def _ensure_guess_counters(game: dict) -> None: """Backfill guess counters for older in-memory game entries.""" if "guess_count" in game: return guess_count = 0 for g in game.get("guesses", []): if not g.get("isHint"): guess_count += 1 g.setdefault("guess_num", guess_count) game["guess_count"] = guess_count def _get_game_or_404(game_id: str, *, require_active: bool = False) -> dict: cleanup_games() game = GAMES.get(game_id) if not game: raise HTTPException(404, "Game not found. It may have expired — start a new game.") _ensure_guess_counters(game) if require_active and (game.get("solved") or game.get("gave_up")): raise HTTPException(400, "Game is over. Start a new game.") return game def rank_to_score(rank: int, vocab_size: int) -> float: """ Map rank to a 0-1 score using a curved logarithmic scale. Rank 1 = 1.0, rank N = 0.0. Power of 1.5 on the log ratio gives more room at the top and compresses the bottom: rank 10 ≈ 0.91, rank 100 ≈ 0.75, rank 1000 ≈ 0.54, rank 10000 ≈ 0.28 """ if rank <= 1: return 1.0 log_ratio = math.log(rank) / math.log(vocab_size) return round(max(0.0, 1 - log_ratio ** 1.5), 4) # --------------------------------------------------------------------------- # API # --------------------------------------------------------------------------- app = FastAPI(title="Semantick") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"], ) class NewGameResponse(BaseModel): game_id: str vocab_size: int seed: str class SeedLookupResponse(BaseModel): word: str found: bool seed: str | None = None message: str difficulty: str | None = None class GameStateResponse(BaseModel): game_id: str vocab_size: int seed: str guesses: list[dict] hints_used: int total_hints: int = 0 solved: bool gave_up: bool secret_word: str | None = None # only if solved or gave up pos_revealed: bool = False pos: str | None = None # only if pos_revealed category_hints: list[str] = [] difficulty: str | None = None definition: str | None = None definition_done: bool = False concreteness_label: str | None = None vad_label: str | None = None sensorimotor_label: str | None = None glasgow_label: str | None = None conceptnet_hints: list[dict] = [] has_definition: bool = False has_concreteness: bool = False has_vad: bool = False has_sensorimotor: bool = False has_glasgow: bool = False has_conceptnet: bool = False has_categories: bool = False class GuessRequest(BaseModel): word: str class GuessResponse(BaseModel): word: str score: float rank: int | None = None # rank among all vocab words (1 = closest) guess_number: int # which guess this was (1-indexed) total_guesses: int solved: bool class HintResponse(BaseModel): word: str rank: int score: float hints_used: int already_guessed: bool = False class PosHintResponse(BaseModel): pos: str total_hints: int class CategoryHintResponse(BaseModel): category: str category_hints_used: int total_hints: int has_more: bool = True class DefinitionHintResponse(BaseModel): definition: str total_hints: int done: bool class ConcretenessHintResponse(BaseModel): concreteness: str total_hints: int class LabelHintResponse(BaseModel): label: str total_hints: int class ConceptNetHintResponse(BaseModel): relation: str values: list[str] total_hints: int has_more: bool = True class GiveUpResponse(BaseModel): secret_word: str total_guesses: int def _build_progressive_definition(definition: str, secret_word: str, words_revealed: int) -> tuple[str, int]: """ Build a progressively revealed definition. Returns (display_string, total_content_words). Words are revealed from the end toward the start (short function words first tends to be less spoilery). The secret word itself is always redacted as "___". """ # Redact the secret word first redacted = re.sub(re.escape(secret_word), "___", definition, flags=re.IGNORECASE) # Tokenize preserving whitespace/punctuation tokens = re.findall(r"[A-Za-z']+|[^A-Za-z']+", redacted) # Identify content word indices (alphabetic tokens that aren't the redaction placeholder) content_indices = [i for i, t in enumerate(tokens) if re.match(r"[A-Za-z']", t) and t != "___"] total_content = len(content_indices) if words_revealed <= 0: return redacted, total_content # nothing revealed yet = not started # Determine which word indices to reveal (reveal from end to start) reveal_set = set(content_indices[-words_revealed:]) if words_revealed < total_content else set(content_indices) # Build output parts = [] for i, token in enumerate(tokens): if i in set(content_indices) - reveal_set: parts.append("_" * len(token)) else: parts.append(token) return "".join(parts), total_content def _concreteness_label(rating: float) -> str: if rating >= 4.5: return "very concrete" elif rating >= 3.5: return "somewhat concrete" elif rating >= 2.5: return "somewhat abstract" else: return "very abstract" def _normalize_score(value: float, *, low: float, high: float) -> str: if value <= low: return "low" if value >= high: return "high" return "mid" def _score_to_1_9(value: float) -> float: if value <= 1.0: return 1.0 + (8.0 * value) if value <= 9.0: return value if value <= 100.0: return 1.0 + (8.0 * (value / 100.0)) return 9.0 def _score_to_1_7(value: float) -> float: if value <= 1.0: return 1.0 + (6.0 * value) if value <= 7.0: return value if value <= 100.0: return 1.0 + (6.0 * (value / 100.0)) return 7.0 def _vad_label(vad: dict) -> str: valence = _score_to_1_9(float(vad.get("valence", 5.0))) arousal = _score_to_1_9(float(vad.get("arousal", 5.0))) dominance = _score_to_1_9(float(vad.get("dominance", 5.0))) valence_map = {"low": "unpleasant", "mid": "neutral tone", "high": "pleasant"} arousal_map = {"low": "calm", "mid": "moderate arousal", "high": "intense"} dominance_map = {"low": "submissive", "mid": "balanced control", "high": "in control"} return ", ".join( [ valence_map[_normalize_score(valence, low=4.0, high=6.0)], arousal_map[_normalize_score(arousal, low=4.0, high=6.0)], dominance_map[_normalize_score(dominance, low=4.0, high=6.0)], ] ) def _sensorimotor_label(sensorimotor: dict) -> str: modality = str(sensorimotor.get("dominant_modality") or "").replace("_", " ").strip() if not modality: return "unclear sensory grounding" strength = float(sensorimotor.get("strength") or 0.0) if strength <= 2.5: strength_label = "light" elif strength <= 4.0: strength_label = "moderate" else: strength_label = "strong" secondary = str(sensorimotor.get("secondary_modality") or "").replace("_", " ").strip() if secondary and secondary != modality: return f"{strength_label} {modality} / {secondary} grounding" return f"{strength_label} {modality} grounding" def _glasgow_label(glasgow: dict) -> str: parts: list[str] = [] familiarity = glasgow.get("familiarity") imageability = glasgow.get("imageability") valence = glasgow.get("valence") if familiarity is not None: fam = _score_to_1_7(float(familiarity)) fam_label = _normalize_score(fam, low=3.0, high=5.0) parts.append({"low": "less familiar", "mid": "moderately familiar", "high": "very familiar"}[fam_label]) if imageability is not None: img = _score_to_1_7(float(imageability)) img_label = _normalize_score(img, low=3.0, high=5.0) parts.append({"low": "low imageability", "mid": "medium imageability", "high": "high imageability"}[img_label]) if not parts and valence is not None: val = _score_to_1_7(float(valence)) val_label = _normalize_score(val, low=3.0, high=5.0) parts.append({"low": "negative tone", "mid": "neutral tone", "high": "positive tone"}[val_label]) return ", ".join(parts) if parts else "compact psycholinguistic profile available" def create_game(secret_word: str, seed: str) -> dict: """Create a new game state for a given secret word.""" secret_idx = WORD_TO_IDX[secret_word] # Pre-compute similarity scores (one matrix-vector multiply) sims = VECTORS @ VECTORS[secret_idx] # Pre-compute hint words using argpartition — O(n) instead of O(n log n) full sort hint_positions = [r - 1 for r in HINT_RANKS] # 0-indexed positions partitioned = np.argpartition(-sims, hint_positions) hint_words = [WORDS[int(partitioned[pos])] for pos in hint_positions] meta = SECRET_META.get(secret_word, {}) return { "secret_word": secret_word, "secret_idx": secret_idx, "sims": sims, "hint_words": hint_words, "seed": seed, "guesses": [], "guess_count": 0, "hints_used": 0, "total_hints": 0, "pos_revealed": False, "category_hints_used": 0, "definition_words_revealed": 0, "concreteness_revealed": False, "vad_revealed": False, "sensorimotor_revealed": False, "glasgow_revealed": False, "conceptnet_hints_used": 0, "meta": meta, "difficulty": meta.get("difficulty"), "solved": False, "winner": None, "gave_up": False, "gave_up_by": None, "created_at": time.time(), } def canonical_seed_for_word(word: str) -> str | None: return SECRET_WORD_TO_CANONICAL_SEED.get(word.strip().lower()) def _word_from_canonical_seed(seed: str) -> str | None: raw = seed.strip() if not raw: return None # Fast path from explicit map (supports future non-prefix formats too). mapped = CANONICAL_SEED_TO_WORD.get(raw) if mapped: return mapped return None def _canonical_seed_fallback(word: str) -> str: normalized = word.strip().lower() if not normalized: return "" nonce = 0 while True: seed = _seed_for_word_with_nonce(normalized, DEFAULT_CANONICAL_SEED_PREFIX, nonce) if len(normalized) >= 4 and normalized in seed: nonce += 1 continue owner = CANONICAL_SEED_TO_WORD.get(seed) if owner is None or owner == normalized: return seed nonce += 1 def seed_to_word_filtered(seed: str, difficulty: str | None = None) -> str: """Deterministically pick a secret word, optionally filtered by difficulty.""" if difficulty: if difficulty not in CANDIDATES_BY_DIFFICULTY: raise HTTPException(422, f"Unknown difficulty '{difficulty}'") pool = CANDIDATES_BY_DIFFICULTY[difficulty] pool_set = CANDIDATE_SET_BY_DIFFICULTY[difficulty] else: pool = SECRET_CANDIDATES pool_set = SECRET_WORD_SET if not pool: raise HTTPException(500, "Secret word pool is empty") canonical_word = _word_from_canonical_seed(seed) if canonical_word and canonical_word in pool_set: return canonical_word h = int(hashlib.sha256(seed.encode()).hexdigest(), 16) return pool[h % len(pool)] @app.get("/api/seed-for-word", response_model=SeedLookupResponse) def seed_for_word(word: str = Query(...), difficulty: str | None = Query(None)): normalized = word.strip().lower() if not normalized: raise HTTPException(400, "Empty word") if difficulty: if difficulty not in CANDIDATES_BY_DIFFICULTY: raise HTTPException(422, f"Unknown difficulty '{difficulty}'") pool_set = CANDIDATE_SET_BY_DIFFICULTY[difficulty] else: pool_set = SECRET_WORD_SET if normalized not in pool_set: if difficulty and normalized in SECRET_WORD_SET: actual_difficulty = SECRET_META.get(normalized, {}).get("difficulty") if actual_difficulty: msg = f"word is in secret words list but not in '{difficulty}' difficulty (it's '{actual_difficulty}')" else: msg = f"word is in secret words list but not in '{difficulty}' difficulty" else: msg = "word not in secret words list" return SeedLookupResponse( word=normalized, found=False, seed=None, message=msg, difficulty=difficulty, ) seed_value = canonical_seed_for_word(normalized) or _canonical_seed_fallback(normalized) return SeedLookupResponse( word=normalized, found=True, seed=seed_value, message="ok", difficulty=difficulty, ) @app.post("/api/new-game", response_model=NewGameResponse) def new_game(seed: str | None = Query(None), difficulty: str | None = Query(None)): """Start a new game. Optionally provide a seed and/or difficulty filter.""" cleanup_games() if seed: secret_word = seed_to_word_filtered(seed, difficulty) else: seed = "".join(random.choices("abcdefghijklmnopqrstuvwxyz", k=6)) secret_word = seed_to_word_filtered(seed, difficulty) game_id = secrets.token_urlsafe(12) GAMES[game_id] = create_game(secret_word, seed) return NewGameResponse(game_id=game_id, vocab_size=len(WORDS), seed=seed) @app.get("/api/game/{game_id}", response_model=GameStateResponse) def get_game_state(game_id: str): """Retrieve current game state (for resuming after refresh).""" game = _get_game_or_404(game_id) meta = game.get("meta", {}) hypernyms = meta.get("hypernyms", []) revealed_categories = hypernyms[:game.get("category_hints_used", 0)] # Build revealed conceptnet hints conceptnet_relations = meta.get("conceptnet", {}) cn_order = ["UsedFor", "HasProperty", "AtLocation", "IsA", "HasA"] cn_available = [r for r in cn_order if r in conceptnet_relations and conceptnet_relations[r]] cn_used = game.get("conceptnet_hints_used", 0) revealed_cn = [{"relation": cn_available[i], "values": conceptnet_relations[cn_available[i]]} for i in range(min(cn_used, len(cn_available)))] # Definition (progressive reveal) definition = None definition_done = False def_words_revealed = game.get("definition_words_revealed", 0) raw_defs = meta.get("definitions", []) if def_words_revealed > 0 and raw_defs: definition, total_content = _build_progressive_definition(raw_defs[0], game["secret_word"], def_words_revealed) definition_done = def_words_revealed >= total_content # Concreteness label concreteness_label = None if game.get("concreteness_revealed"): rating = meta.get("concreteness") if rating is not None: concreteness_label = _concreteness_label(rating) vad_label = _vad_label(meta.get("vad", {})) if game.get("vad_revealed") and meta.get("vad") else None sensorimotor_label = ( _sensorimotor_label(meta.get("sensorimotor", {})) if game.get("sensorimotor_revealed") and meta.get("sensorimotor") else None ) glasgow_label = _glasgow_label(meta.get("glasgow", {})) if game.get("glasgow_revealed") and meta.get("glasgow") else None # Availability flags conceptnet_relations = meta.get("conceptnet", {}) cn_order = ["UsedFor", "HasProperty", "AtLocation", "IsA", "HasA"] cn_available = [r for r in cn_order if r in conceptnet_relations and conceptnet_relations[r]] has_conceptnet = len(cn_available) > game.get("conceptnet_hints_used", 0) has_categories = len(hypernyms) > game.get("category_hints_used", 0) has_vad = PHASE3_HINT_ENABLED["vad"] and bool(meta.get("vad")) and not game.get("vad_revealed") has_sensorimotor = PHASE3_HINT_ENABLED["sensorimotor"] and bool(meta.get("sensorimotor")) and not game.get("sensorimotor_revealed") has_glasgow = PHASE3_HINT_ENABLED["glasgow"] and bool(meta.get("glasgow")) and not game.get("glasgow_revealed") return GameStateResponse( game_id=game_id, vocab_size=len(WORDS), seed=game["seed"], guesses=game["guesses"], hints_used=game["hints_used"], total_hints=game.get("total_hints", 0), solved=game["solved"], gave_up=game["gave_up"], secret_word=game["secret_word"] if game["solved"] or game["gave_up"] else None, pos_revealed=game.get("pos_revealed", False), pos=meta.get("pos") if game.get("pos_revealed") else None, category_hints=revealed_categories, difficulty=game.get("difficulty"), definition=definition, definition_done=definition_done, concreteness_label=concreteness_label, vad_label=vad_label, sensorimotor_label=sensorimotor_label, glasgow_label=glasgow_label, conceptnet_hints=revealed_cn, has_definition=bool(raw_defs), has_concreteness=meta.get("concreteness") is not None, has_vad=has_vad, has_sensorimotor=has_sensorimotor, has_glasgow=has_glasgow, has_conceptnet=has_conceptnet, has_categories=has_categories, ) @app.post("/api/game/{game_id}/guess", response_model=GuessResponse) def make_guess(game_id: str, req: GuessRequest): """Submit a guess and get the similarity score.""" game = _get_game_or_404(game_id, require_active=True) word = req.word.strip().lower() if not word: raise HTTPException(400, "Empty guess") # Check if word is in vocabulary guess_idx = WORD_TO_IDX.get(word) if guess_idx is None: raise HTTPException(422, "not in our dictionary — try a synonym") existing = next((g for g in game["guesses"] if not g.get("isHint") and g["word"] == word), None) if existing: return GuessResponse( word=word, score=existing["score"], rank=existing["rank"], guess_number=existing.get("guess_num", 0), total_guesses=game.get("guess_count", 0), solved=game.get("solved", False), ) # Compute rank on-the-fly from pre-computed similarity scores solved = word == game["secret_word"] sims = game["sims"] guess_sim = float(sims[guess_idx]) rank = int((sims > guess_sim).sum()) + 1 score = rank_to_score(rank, len(WORDS)) # Track guess game["guess_count"] = game.get("guess_count", 0) + 1 game["guesses"].append( { "word": word, "score": score, "rank": rank, "isHint": False, "guess_num": game["guess_count"], } ) if solved: game["solved"] = True game["winner"] = "solo" return GuessResponse( word=word, score=score, rank=rank, guess_number=game["guess_count"], total_guesses=game["guess_count"], solved=solved, ) @app.post("/api/game/{game_id}/hint", response_model=HintResponse) def get_hint(game_id: str): """Reveal a hint word at a predetermined rank.""" game = _get_game_or_404(game_id, require_active=True) hints_used = game["hints_used"] if hints_used >= len(HINT_RANKS): raise HTTPException(400, "No more hints available") rank = HINT_RANKS[hints_used] hint_word = game["hint_words"][hints_used] score = rank_to_score(rank, len(WORDS)) game["hints_used"] = hints_used + 1 game["total_hints"] = game.get("total_hints", 0) + 1 already_guessed = any(g["word"] == hint_word for g in game["guesses"]) if not already_guessed: game["guesses"].append({"word": hint_word, "score": score, "rank": rank, "isHint": True}) return HintResponse(word=hint_word, rank=rank, score=score, hints_used=game["hints_used"], already_guessed=already_guessed) @app.post("/api/game/{game_id}/hint/pos", response_model=PosHintResponse) def get_pos_hint(game_id: str): """Reveal the part of speech of the secret word.""" game = _get_game_or_404(game_id, require_active=True) if game.get("pos_revealed"): # Return it again idempotently return PosHintResponse(pos=game["meta"].get("pos", "unknown"), total_hints=game.get("total_hints", 0)) meta = game.get("meta", {}) pos = meta.get("pos", "unknown") game["pos_revealed"] = True game["total_hints"] = game.get("total_hints", 0) + 1 return PosHintResponse(pos=pos, total_hints=game["total_hints"]) @app.post("/api/game/{game_id}/hint/category", response_model=CategoryHintResponse) def get_category_hint(game_id: str): """Reveal the next category (hypernym) level for the secret word.""" game = _get_game_or_404(game_id, require_active=True) meta = game.get("meta", {}) hypernyms = meta.get("hypernyms", []) used = game.get("category_hints_used", 0) if used >= len(hypernyms): raise HTTPException(400, "No more category hints available") category = hypernyms[used] game["category_hints_used"] = used + 1 game["total_hints"] = game.get("total_hints", 0) + 1 return CategoryHintResponse(category=category, category_hints_used=game["category_hints_used"], total_hints=game["total_hints"], has_more=game["category_hints_used"] < len(hypernyms)) @app.post("/api/game/{game_id}/hint/definition", response_model=DefinitionHintResponse) def get_definition_hint(game_id: str): """Progressively reveal words in the definition. Each call reveals one more word.""" game = _get_game_or_404(game_id, require_active=True) meta = game.get("meta", {}) definitions = meta.get("definitions", []) if not definitions: raise HTTPException(400, "No definition available for this word") current = game.get("definition_words_revealed", 0) # Check if already fully revealed _, total_content = _build_progressive_definition(definitions[0], game["secret_word"], 0) if current >= total_content: raise HTTPException(400, "Definition fully revealed") game["definition_words_revealed"] = current + 1 game["total_hints"] = game.get("total_hints", 0) + 1 display, _ = _build_progressive_definition(definitions[0], game["secret_word"], current + 1) done = (current + 1) >= total_content return DefinitionHintResponse(definition=display, total_hints=game["total_hints"], done=done) @app.post("/api/game/{game_id}/hint/concreteness", response_model=ConcretenessHintResponse) def get_concreteness_hint(game_id: str): """Reveal how concrete or abstract the secret word is.""" game = _get_game_or_404(game_id, require_active=True) if game.get("concreteness_revealed"): raise HTTPException(400, "Concreteness already revealed") meta = game.get("meta", {}) rating = meta.get("concreteness") if rating is None: raise HTTPException(400, "No concreteness data available for this word") label = _concreteness_label(rating) game["concreteness_revealed"] = True game["total_hints"] = game.get("total_hints", 0) + 1 return ConcretenessHintResponse(concreteness=label, total_hints=game["total_hints"]) @app.post("/api/game/{game_id}/hint/vad", response_model=LabelHintResponse) def get_vad_hint(game_id: str): """Reveal affective profile (valence/arousal/dominance) when enabled.""" if not PHASE3_HINT_ENABLED["vad"]: raise HTTPException(400, "VAD hints disabled due low dataset coverage") game = _get_game_or_404(game_id, require_active=True) if game.get("vad_revealed"): raise HTTPException(400, "VAD hint already revealed") meta = game.get("meta", {}) vad = meta.get("vad") if not vad: raise HTTPException(400, "No VAD data available for this word") label = _vad_label(vad) game["vad_revealed"] = True game["total_hints"] = game.get("total_hints", 0) + 1 return LabelHintResponse(label=label, total_hints=game["total_hints"]) @app.post("/api/game/{game_id}/hint/sensorimotor", response_model=LabelHintResponse) def get_sensorimotor_hint(game_id: str): """Reveal dominant sensorimotor modality when enabled.""" if not PHASE3_HINT_ENABLED["sensorimotor"]: raise HTTPException(400, "Sensorimotor hints disabled due low dataset coverage") game = _get_game_or_404(game_id, require_active=True) if game.get("sensorimotor_revealed"): raise HTTPException(400, "Sensorimotor hint already revealed") meta = game.get("meta", {}) sensorimotor = meta.get("sensorimotor") if not sensorimotor: raise HTTPException(400, "No sensorimotor data available for this word") label = _sensorimotor_label(sensorimotor) game["sensorimotor_revealed"] = True game["total_hints"] = game.get("total_hints", 0) + 1 return LabelHintResponse(label=label, total_hints=game["total_hints"]) @app.post("/api/game/{game_id}/hint/glasgow", response_model=LabelHintResponse) def get_glasgow_hint(game_id: str): """Reveal compact psycholinguistic profile from Glasgow norms when enabled.""" if not PHASE3_HINT_ENABLED["glasgow"]: raise HTTPException(400, "Glasgow hints disabled due low dataset coverage") game = _get_game_or_404(game_id, require_active=True) if game.get("glasgow_revealed"): raise HTTPException(400, "Glasgow hint already revealed") meta = game.get("meta", {}) glasgow = meta.get("glasgow") if not glasgow: raise HTTPException(400, "No Glasgow data available for this word") label = _glasgow_label(glasgow) game["glasgow_revealed"] = True game["total_hints"] = game.get("total_hints", 0) + 1 return LabelHintResponse(label=label, total_hints=game["total_hints"]) @app.post("/api/game/{game_id}/hint/conceptnet", response_model=ConceptNetHintResponse) def get_conceptnet_hint(game_id: str): """Reveal the next ConceptNet relation for the secret word.""" game = _get_game_or_404(game_id, require_active=True) meta = game.get("meta", {}) conceptnet = meta.get("conceptnet", {}) cn_order = ["UsedFor", "HasProperty", "AtLocation", "IsA", "HasA"] available = [r for r in cn_order if r in conceptnet and conceptnet[r]] used = game.get("conceptnet_hints_used", 0) if used >= len(available): raise HTTPException(400, "No more semantic clues available") relation = available[used] values = conceptnet[relation] game["conceptnet_hints_used"] = used + 1 game["total_hints"] = game.get("total_hints", 0) + 1 return ConceptNetHintResponse(relation=relation, values=values, total_hints=game["total_hints"], has_more=game["conceptnet_hints_used"] < len(available)) @app.post("/api/game/{game_id}/give-up", response_model=GiveUpResponse) def give_up(game_id: str): """Reveal the secret word.""" game = _get_game_or_404(game_id) if not game.get("gave_up"): game["gave_up"] = True if not game.get("gave_up_by"): game["gave_up_by"] = "solo" return GiveUpResponse( secret_word=game["secret_word"], total_guesses=game.get("guess_count", 0), ) @app.get("/api/health") def health(): cleanup_games() cleanup_lobbies() return { "status": "ok", "vocab_size": len(WORDS), "active_games": len(GAMES), "phase3_hint_coverage": PHASE3_COVERAGE, "phase3_hint_enabled": PHASE3_HINT_ENABLED, } # --------------------------------------------------------------------------- # Multiplayer Lobby # --------------------------------------------------------------------------- LOBBIES: dict[str, dict] = {} MAX_LOBBIES = 200 LOBBY_TTL_SECONDS = 7200 # 2 hours def cleanup_lobbies(): now = time.time() expired = [lid for lid, lb in LOBBIES.items() if now - lb.get("updated_at", lb["created_at"]) > LOBBY_TTL_SECONDS] for lid in expired: del LOBBIES[lid] if len(LOBBIES) > MAX_LOBBIES: by_age = sorted(LOBBIES.items(), key=lambda x: x[1].get("updated_at", x[1]["created_at"])) for lid, _ in by_age[:len(LOBBIES) - MAX_LOBBIES]: del LOBBIES[lid] def generate_lobby_code() -> str: """Generate a 6-char uppercase alphanumeric lobby code.""" chars = "ABCDEFGHJKLMNPQRSTUVWXYZ23456789" # no I/O/0/1 for readability return "".join(random.choices(chars, k=6)) def _touch_lobby(lobby: dict) -> None: lobby["updated_at"] = time.time() def _lobby_member_items(lobby: dict) -> list[tuple[str, dict]]: return sorted(lobby["members"].items(), key=lambda kv: kv[1].get("joined_at", 0.0)) def _lobby_player_names(lobby: dict) -> list[str]: return [member["name"] for _, member in _lobby_member_items(lobby)] def _lobby_host_name(lobby: dict) -> str: host_id = lobby.get("host_player_id") if host_id and host_id in lobby["members"]: return lobby["members"][host_id]["name"] return "Host" def _remove_lobby_member(lobby: dict, player_id: str) -> str | None: member = lobby["members"].pop(player_id, None) lobby["connections"].pop(player_id, None) if not member: return None if lobby.get("host_player_id") == player_id: next_host = _lobby_member_items(lobby)[0][0] if lobby["members"] else None lobby["host_player_id"] = next_host _touch_lobby(lobby) return member["name"] async def broadcast(lobby: dict, message: dict, exclude: str | None = None): """Send a JSON message to all connected players in a lobby.""" dead: list[str] = [] for pid, ws in lobby["connections"].items(): if pid == exclude: continue try: await ws.send_json(message) except Exception: dead.append(pid) for pid in dead: lobby["connections"].pop(pid, None) member = lobby["members"].get(pid) if member: member["last_seen"] = time.time() class CreateLobbyRequest(BaseModel): host_name: str difficulty: str | None = None class CreateLobbyResponse(BaseModel): code: str host_name: str player_id: str class LobbyStateResponse(BaseModel): code: str host: str players: list[str] difficulty: str | None game_active: bool game_id: str | None @app.post("/api/lobby/create", response_model=CreateLobbyResponse) def create_lobby(req: CreateLobbyRequest): cleanup_lobbies() code = generate_lobby_code() while code in LOBBIES: code = generate_lobby_code() name = req.host_name.strip()[:20] or "Host" now = time.time() player_id = secrets.token_urlsafe(8) LOBBIES[code] = { "id": code, "host_player_id": player_id, "members": { player_id: { "name": name, "joined_at": now, "last_seen": now, } }, "connections": {}, "game_id": None, "difficulty": req.difficulty, "created_at": now, "updated_at": now, } return CreateLobbyResponse(code=code, host_name=name, player_id=player_id) @app.get("/api/lobby/{code}", response_model=LobbyStateResponse) def get_lobby(code: str): cleanup_lobbies() cleanup_games() code = code.upper() lobby = LOBBIES.get(code) if not lobby: raise HTTPException(404, "Lobby not found") if lobby.get("game_id") and lobby["game_id"] not in GAMES: lobby["game_id"] = None game = GAMES.get(lobby["game_id"]) if lobby.get("game_id") else None return LobbyStateResponse( code=code, host=_lobby_host_name(lobby), players=_lobby_player_names(lobby), difficulty=lobby["difficulty"], game_active=bool(game and not game.get("solved") and not game.get("gave_up")), game_id=lobby["game_id"], ) async def _send_lobby_game_snapshot(ws: WebSocket, lobby: dict, game_id: str, game: dict): _ensure_guess_counters(game) meta = game.get("meta", {}) hypernyms = meta.get("hypernyms", []) await ws.send_json({ "type": "game_started", "seed": game["seed"], "difficulty": lobby.get("difficulty"), "vocab_size": len(WORDS), "game_id": game_id, "has_definition": bool(meta.get("definitions")), "has_concreteness": meta.get("concreteness") is not None, "has_vad": PHASE3_HINT_ENABLED["vad"] and bool(meta.get("vad")), "has_sensorimotor": PHASE3_HINT_ENABLED["sensorimotor"] and bool(meta.get("sensorimotor")), "has_glasgow": PHASE3_HINT_ENABLED["glasgow"] and bool(meta.get("glasgow")), "has_conceptnet": bool([r for r in ["UsedFor", "HasProperty", "AtLocation", "IsA", "HasA"] if meta.get("conceptnet", {}).get(r)]), "has_categories": bool(meta.get("hypernyms")), }) for g in game["guesses"]: if g.get("isHint"): await ws.send_json({ "type": "hint_result", "player": g.get("player", "?"), "hint_type": "word", "word": g["word"], "score": g["score"], "rank": g["rank"], "hints_used": game["hints_used"], "total_hints": game.get("total_hints", 0), }) else: await ws.send_json({ "type": "guess_result", "player": g.get("player", "?"), "word": g["word"], "score": g["score"], "rank": g["rank"], "guess_num": g.get("guess_num", 0), "duplicate": False, }) if game.get("pos_revealed"): await ws.send_json({"type": "hint_pos", "player": "?", "pos": meta.get("pos", "unknown"), "total_hints": game.get("total_hints", 0)}) if game.get("concreteness_revealed") and meta.get("concreteness") is not None: await ws.send_json({"type": "hint_concreteness", "player": "?", "label": _concreteness_label(meta["concreteness"]), "total_hints": game.get("total_hints", 0)}) if game.get("vad_revealed") and meta.get("vad"): await ws.send_json({"type": "hint_vad", "player": "?", "label": _vad_label(meta["vad"]), "total_hints": game.get("total_hints", 0)}) if game.get("sensorimotor_revealed") and meta.get("sensorimotor"): await ws.send_json({"type": "hint_sensorimotor", "player": "?", "label": _sensorimotor_label(meta["sensorimotor"]), "total_hints": game.get("total_hints", 0)}) if game.get("glasgow_revealed") and meta.get("glasgow"): await ws.send_json({"type": "hint_glasgow", "player": "?", "label": _glasgow_label(meta["glasgow"]), "total_hints": game.get("total_hints", 0)}) cat_used = game.get("category_hints_used", 0) if cat_used > 0: await ws.send_json({ "type": "hint_category", "player": "?", "category": hypernyms[cat_used - 1], "categories": hypernyms[:cat_used], "has_categories": cat_used < len(hypernyms), "total_hints": game.get("total_hints", 0), }) def_revealed = game.get("definition_words_revealed", 0) raw_defs = meta.get("definitions", []) if def_revealed > 0 and raw_defs: display, total = _build_progressive_definition(raw_defs[0], game["secret_word"], def_revealed) await ws.send_json({"type": "hint_definition", "player": "?", "definition": display, "done": def_revealed >= total, "total_hints": game.get("total_hints", 0)}) cn_used = game.get("conceptnet_hints_used", 0) conceptnet = meta.get("conceptnet", {}) cn_order = ["UsedFor", "HasProperty", "AtLocation", "IsA", "HasA"] cn_available = [r for r in cn_order if r in conceptnet and conceptnet[r]] for i in range(min(cn_used, len(cn_available))): await ws.send_json({ "type": "hint_conceptnet", "player": "?", "relation": cn_available[i], "values": conceptnet[cn_available[i]], "has_more": i + 1 < len(cn_available), "total_hints": game.get("total_hints", 0), }) if game.get("solved"): await ws.send_json({ "type": "game_over", "winner": game.get("winner") or "?", "word": game["secret_word"], "guesses": game.get("guess_count", 0), }) if game.get("gave_up"): await ws.send_json({ "type": "gave_up", "player": game.get("gave_up_by") or "?", "word": game["secret_word"], }) async def _handle_lobby_ws(lobby: dict, player_id: str, ws: WebSocket): """Main WebSocket handler for a lobby participant.""" code = lobby["id"] while True: try: data = await ws.receive_json() except WebSocketDisconnect: break except Exception: break member = lobby["members"].get(player_id) if not member: break name = member["name"] is_host = player_id == lobby.get("host_player_id") _touch_lobby(lobby) msg_type = data.get("type") cleanup_games() game_id = lobby.get("game_id") game = GAMES.get(game_id) if game_id else None if game: _ensure_guess_counters(game) elif game_id and game is None: lobby["game_id"] = None if msg_type == "leave_lobby": removed_name = _remove_lobby_member(lobby, player_id) if removed_name and lobby["members"]: await broadcast(lobby, { "type": "player_left", "name": removed_name, "players": _lobby_player_names(lobby), "host": _lobby_host_name(lobby), }) if not lobby["members"]: LOBBIES.pop(code, None) await ws.close() break if msg_type == "start_game" or msg_type == "new_round": if not is_host: await ws.send_json({"type": "error", "message": "Only the host can start a game"}) continue cleanup_games() diff = data.get("difficulty") or lobby.get("difficulty") lobby["difficulty"] = diff seed = "".join(random.choices("abcdefghijklmnopqrstuvwxyz", k=6)) secret_word = seed_to_word_filtered(seed, diff) new_game_id = secrets.token_urlsafe(12) GAMES[new_game_id] = create_game(secret_word, seed) lobby["game_id"] = new_game_id _touch_lobby(lobby) meta = SECRET_META.get(secret_word, {}) cn_order = ["UsedFor", "HasProperty", "AtLocation", "IsA", "HasA"] cn_available = [r for r in cn_order if meta.get("conceptnet", {}).get(r)] hypernyms = meta.get("hypernyms", []) await broadcast(lobby, { "type": "game_started", "seed": seed, "difficulty": diff, "vocab_size": len(WORDS), "game_id": new_game_id, "has_definition": bool(meta.get("definitions")), "has_concreteness": meta.get("concreteness") is not None, "has_vad": PHASE3_HINT_ENABLED["vad"] and bool(meta.get("vad")), "has_sensorimotor": PHASE3_HINT_ENABLED["sensorimotor"] and bool(meta.get("sensorimotor")), "has_glasgow": PHASE3_HINT_ENABLED["glasgow"] and bool(meta.get("glasgow")), "has_conceptnet": bool(cn_available), "has_categories": bool(hypernyms), }) auto_hints = max(0, int(data.get("start_hints", 0))) new_game = GAMES[new_game_id] for _ in range(auto_hints): h_used = new_game["hints_used"] if h_used >= len(HINT_RANKS): break rank = HINT_RANKS[h_used] hint_word = new_game["hint_words"][h_used] score = rank_to_score(rank, len(WORDS)) new_game["hints_used"] = h_used + 1 new_game["total_hints"] = new_game.get("total_hints", 0) + 1 already_guessed = any(g["word"] == hint_word for g in new_game["guesses"]) if not already_guessed: new_game["guesses"].append({"word": hint_word, "score": score, "rank": rank, "isHint": True, "player": "(auto)"}) await broadcast(lobby, { "type": "hint_result", "player": "(auto)", "hint_type": "word", "word": hint_word, "score": score, "rank": rank, "hints_used": new_game["hints_used"], "total_hints": new_game.get("total_hints", 0), "already_guessed": already_guessed, }) elif msg_type == "guess": if not game or game["solved"] or game["gave_up"]: await ws.send_json({"type": "error", "message": "No active game"}) continue word = data.get("word", "").strip().lower() if not word: continue guess_idx = WORD_TO_IDX.get(word) if guess_idx is None: await ws.send_json({"type": "error", "message": f'"{word}" not in dictionary'}) continue if any(g["word"] == word for g in game["guesses"]): existing = next(g for g in game["guesses"] if g["word"] == word) await ws.send_json({ "type": "guess_result", "player": name, "word": word, "score": existing["score"], "rank": existing["rank"], "guess_num": existing.get("guess_num", 0), "duplicate": True, }) continue solved = word == game["secret_word"] sims = game["sims"] guess_sim = float(sims[guess_idx]) rank = int((sims > guess_sim).sum()) + 1 score = rank_to_score(rank, len(WORDS)) game["guess_count"] = game.get("guess_count", 0) + 1 game["guesses"].append({"word": word, "score": score, "rank": rank, "isHint": False, "player": name, "guess_num": game["guess_count"]}) if solved: game["solved"] = True game["winner"] = name result = { "type": "guess_result", "player": name, "word": word, "score": score, "rank": rank, "guess_num": game["guess_count"], "duplicate": False, } await broadcast(lobby, result) if solved: await broadcast(lobby, {"type": "game_over", "winner": name, "word": word, "guesses": game.get("guess_count", 0)}) elif msg_type == "hint": if not game or game["solved"] or game["gave_up"]: await ws.send_json({"type": "error", "message": "No active game"}) continue hints_used = game["hints_used"] if hints_used >= len(HINT_RANKS): await ws.send_json({"type": "error", "message": "No more hints"}) continue rank = HINT_RANKS[hints_used] hint_word = game["hint_words"][hints_used] score = rank_to_score(rank, len(WORDS)) game["hints_used"] = hints_used + 1 game["total_hints"] = game.get("total_hints", 0) + 1 already_guessed = any(g["word"] == hint_word for g in game["guesses"]) if not already_guessed: game["guesses"].append({"word": hint_word, "score": score, "rank": rank, "isHint": True, "player": name}) await broadcast(lobby, { "type": "hint_result", "player": name, "hint_type": "word", "word": hint_word, "score": score, "rank": rank, "hints_used": game["hints_used"], "total_hints": game.get("total_hints", 0), "already_guessed": already_guessed, }) elif msg_type == "hint_pos": if not game or game["solved"] or game["gave_up"]: continue meta = game.get("meta", {}) pos = meta.get("pos", "unknown") if not game.get("pos_revealed"): game["pos_revealed"] = True game["total_hints"] = game.get("total_hints", 0) + 1 await broadcast(lobby, {"type": "hint_pos", "player": name, "pos": pos, "total_hints": game.get("total_hints", 0)}) elif msg_type == "hint_category": if not game or game["solved"] or game["gave_up"]: continue meta = game.get("meta", {}) hypernyms = meta.get("hypernyms", []) used = game.get("category_hints_used", 0) if used >= len(hypernyms): await ws.send_json({"type": "error", "message": "No more category hints"}) continue category = hypernyms[used] game["category_hints_used"] = used + 1 game["total_hints"] = game.get("total_hints", 0) + 1 await broadcast(lobby, { "type": "hint_category", "player": name, "category": category, "categories": hypernyms[:game["category_hints_used"]], "has_categories": used + 1 < len(hypernyms), "total_hints": game.get("total_hints", 0), }) elif msg_type == "hint_definition": if not game or game["solved"] or game["gave_up"]: continue meta = game.get("meta", {}) definitions = meta.get("definitions", []) if not definitions: await ws.send_json({"type": "error", "message": "No definition available"}) continue current = game.get("definition_words_revealed", 0) _, total_content = _build_progressive_definition(definitions[0], game["secret_word"], 0) if current >= total_content: await ws.send_json({"type": "error", "message": "Definition fully revealed"}) continue game["definition_words_revealed"] = current + 1 game["total_hints"] = game.get("total_hints", 0) + 1 display, _ = _build_progressive_definition(definitions[0], game["secret_word"], current + 1) done = (current + 1) >= total_content await broadcast(lobby, { "type": "hint_definition", "player": name, "definition": display, "done": done, "total_hints": game.get("total_hints", 0), }) elif msg_type == "hint_concreteness": if not game or game["solved"] or game["gave_up"]: continue meta = game.get("meta", {}) rating = meta.get("concreteness") if rating is None: await ws.send_json({"type": "error", "message": "No concreteness data"}) continue if not game.get("concreteness_revealed"): game["concreteness_revealed"] = True game["total_hints"] = game.get("total_hints", 0) + 1 label = _concreteness_label(rating) await broadcast(lobby, {"type": "hint_concreteness", "player": name, "label": label, "total_hints": game.get("total_hints", 0)}) elif msg_type == "hint_vad": if not game or game["solved"] or game["gave_up"]: continue if not PHASE3_HINT_ENABLED["vad"]: await ws.send_json({"type": "error", "message": "VAD hints disabled"}) continue meta = game.get("meta", {}) vad = meta.get("vad") if not vad: await ws.send_json({"type": "error", "message": "No VAD data"}) continue if not game.get("vad_revealed"): game["vad_revealed"] = True game["total_hints"] = game.get("total_hints", 0) + 1 await broadcast(lobby, {"type": "hint_vad", "player": name, "label": _vad_label(vad), "total_hints": game.get("total_hints", 0)}) elif msg_type == "hint_sensorimotor": if not game or game["solved"] or game["gave_up"]: continue if not PHASE3_HINT_ENABLED["sensorimotor"]: await ws.send_json({"type": "error", "message": "Sensorimotor hints disabled"}) continue meta = game.get("meta", {}) sensorimotor = meta.get("sensorimotor") if not sensorimotor: await ws.send_json({"type": "error", "message": "No sensorimotor data"}) continue if not game.get("sensorimotor_revealed"): game["sensorimotor_revealed"] = True game["total_hints"] = game.get("total_hints", 0) + 1 await broadcast(lobby, {"type": "hint_sensorimotor", "player": name, "label": _sensorimotor_label(sensorimotor), "total_hints": game.get("total_hints", 0)}) elif msg_type == "hint_glasgow": if not game or game["solved"] or game["gave_up"]: continue if not PHASE3_HINT_ENABLED["glasgow"]: await ws.send_json({"type": "error", "message": "Glasgow hints disabled"}) continue meta = game.get("meta", {}) glasgow = meta.get("glasgow") if not glasgow: await ws.send_json({"type": "error", "message": "No Glasgow data"}) continue if not game.get("glasgow_revealed"): game["glasgow_revealed"] = True game["total_hints"] = game.get("total_hints", 0) + 1 await broadcast(lobby, {"type": "hint_glasgow", "player": name, "label": _glasgow_label(glasgow), "total_hints": game.get("total_hints", 0)}) elif msg_type == "hint_conceptnet": if not game or game["solved"] or game["gave_up"]: continue meta = game.get("meta", {}) conceptnet = meta.get("conceptnet", {}) cn_order = ["UsedFor", "HasProperty", "AtLocation", "IsA", "HasA"] available = [r for r in cn_order if r in conceptnet and conceptnet[r]] used = game.get("conceptnet_hints_used", 0) if used >= len(available): await ws.send_json({"type": "error", "message": "No more semantic clues"}) continue relation = available[used] values = conceptnet[relation] game["conceptnet_hints_used"] = used + 1 game["total_hints"] = game.get("total_hints", 0) + 1 await broadcast(lobby, { "type": "hint_conceptnet", "player": name, "relation": relation, "values": values, "has_more": game["conceptnet_hints_used"] < len(available), "total_hints": game.get("total_hints", 0), }) elif msg_type == "give_up": if not game or game["solved"] or game["gave_up"]: continue game["gave_up"] = True game["gave_up_by"] = name await broadcast(lobby, {"type": "gave_up", "player": name, "word": game["secret_word"]}) else: await ws.send_json({"type": "error", "message": f"Unknown message type: {msg_type}"}) @app.websocket("/api/lobby/{code}/ws") async def lobby_websocket(ws: WebSocket, code: str, name: str = Query(...), player_id: str | None = Query(None)): cleanup_lobbies() code = code.upper() lobby = LOBBIES.get(code) if not lobby: await ws.close(code=4004, reason="Lobby not found") return name = name.strip()[:20] or "Player" reconnecting = bool(player_id and player_id in lobby["members"]) if reconnecting and player_id: name = lobby["members"][player_id]["name"] lobby["members"][player_id]["last_seen"] = time.time() else: existing_names = {m["name"] for m in lobby["members"].values()} base_name = name counter = 2 while name in existing_names: name = f"{base_name}{counter}" counter += 1 player_id = secrets.token_urlsafe(8) lobby["members"][player_id] = { "name": name, "joined_at": time.time(), "last_seen": time.time(), } if lobby.get("host_player_id") is None: lobby["host_player_id"] = player_id await ws.accept() existing_ws = lobby["connections"].get(player_id) if existing_ws is not None and existing_ws is not ws: try: await existing_ws.close() except Exception: pass lobby["connections"][player_id] = ws _touch_lobby(lobby) await broadcast( lobby, {"type": "player_joined", "name": name, "players": _lobby_player_names(lobby), "host": _lobby_host_name(lobby)}, ) cleanup_games() game_id = lobby.get("game_id") game = GAMES.get(game_id) if game_id else None if game: await _send_lobby_game_snapshot(ws, lobby, game_id, game) await ws.send_json({ "type": "welcome", "name": name, "host": _lobby_host_name(lobby), "players": _lobby_player_names(lobby), "code": code, "player_id": player_id, "reconnected": reconnecting, }) try: await _handle_lobby_ws(lobby, player_id, ws) finally: if lobby["connections"].get(player_id) is ws: lobby["connections"].pop(player_id, None) member = lobby["members"].get(player_id) if member: member["last_seen"] = time.time() _touch_lobby(lobby) if not lobby["members"]: LOBBIES.pop(code, None) # Serve static frontend files (if present) _static = Path(__file__).parent / "static" if _static.exists(): app.mount("/", StaticFiles(directory=str(_static), html=True), name="static")