Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import asyncio | |
| import base64 | |
| import json | |
| import math | |
| import os | |
| import random | |
| import re | |
| import shutil | |
| import subprocess | |
| import sys | |
| import urllib.error | |
| import urllib.request | |
| from dataclasses import asdict | |
| from pathlib import Path | |
| from typing import Any | |
| import gradio as gr | |
| from fastapi import HTTPException, WebSocket, WebSocketDisconnect | |
| from fastapi.responses import FileResponse, HTMLResponse | |
| from fastapi.staticfiles import StaticFiles | |
| # On HF Spaces ZeroGPU, the runtime scans the app at startup for any | |
| # @spaces.GPU-decorated function and refuses to launch if none are found. | |
| # llm/zerogpu_backend.py is otherwise imported lazily (only on first chat), | |
| # so we eagerly import it here when running on a Space to register the | |
| # decorator before the startup scan runs. | |
| if os.getenv("SPACE_ID"): | |
| from llm import zerogpu_backend # noqa: F401 | |
| from config import load_settings | |
| from game.rules import checks_remaining_this_turn | |
| from game.session import ( | |
| TACTIC_LIMITS, | |
| add_block, | |
| check_junction, | |
| end_turn, | |
| issue_notice, | |
| new_game, | |
| place_tactic, | |
| persist, | |
| question_witness, | |
| remove_tactic, | |
| finalize_game, | |
| update_notes, | |
| ) | |
| from game.save_load import load_state | |
| from game.state import GameState, WitnessQuestion | |
| from game.story_engine import compact_story_memory, ensure_case_introduction, story_reveal | |
| from game.context_budget import ContextBudget, normalize_context_length | |
| from game.case_catalog import choose_case | |
| from game.witness_engine import deterministic_witness_answer, witness_by_id | |
| from grid_map.atlas import public_atlas_payload | |
| from grid_map.graph_loader import all_junction_ids, legal_moves_from | |
| from grid_map.map_loader import image_for_layer, load_map_metadata | |
| from grid_map.storage import read_json | |
| from llm.omni_client import OmniClient, OmniResponse, scan_minicpm_models | |
| from llm.audio import wav_to_float32_base64 | |
| from llm.devices import ( | |
| context_length_presets, | |
| detect_devices, | |
| gpu_layer_presets, | |
| quantization_catalog, | |
| resolve_device_env, | |
| ) | |
# Default texts. DEFAULT_NOTICE and DEFAULT_QUESTION are substituted by the
# route handlers and API functions when the client sends no notice/question.
# NOTE(review): DEFAULT_DESCRIPTION is not referenced in the visible part of
# this file — confirm it is still used.
DEFAULT_DESCRIPTION = "A nervous-looking person in a grey raincoat carrying a red folder."
DEFAULT_NOTICE = "Request high-confidence reports of a grey raincoat carrying a red folder at the selected junction."
DEFAULT_QUESTION = "What exactly did the person carry?"
# Junction id api_issue_notice falls back to when nothing is focused and the
# state has no last-seen junction.
DEFAULT_SELECTED_JUNCTION = 100
# NOTE(review): presumably a pixel radius for map hit-testing; not referenced
# in the visible part of this file.
MAP_CLICK_RADIUS = 64
# Filesystem layout, resolved relative to this module's location.
PROJECT_ROOT = Path(__file__).resolve().parent
WEB_DIR = PROJECT_ROOT / "ui" / "web"
STATIC_DIR = WEB_DIR / "static"
# In-memory game states keyed by game id; the API functions store the state
# back here after every mutation.
_SESSIONS: dict[str, GameState] = {}
# Child-process handles. NOTE(review): they are assigned outside the visible
# part of this file — confirm their lifecycle there.
_LLAMA_PROCESS: subprocess.Popen | None = None
_SETUP_PROCESS: subprocess.Popen | None = None
RUNTIME_ROOT = PROJECT_ROOT / "runtime"
# Environment-variable values that api_update_settings merges into its update
# set when the payload names one of these difficulty levels.
DIFFICULTY_PRESETS = {
    "easy": {"PHANTOM_GRID_MAX_TURNS": "16", "PHANTOM_GRID_CHECKS_PER_TURN": "3", "PHANTOM_GRID_MEMORY_CORRUPTION_PER_TURN": "0.04"},
    "normal": {"PHANTOM_GRID_MAX_TURNS": "12", "PHANTOM_GRID_CHECKS_PER_TURN": "2", "PHANTOM_GRID_MEMORY_CORRUPTION_PER_TURN": "0.08"},
    "hard": {"PHANTOM_GRID_MAX_TURNS": "10", "PHANTOM_GRID_CHECKS_PER_TURN": "1", "PHANTOM_GRID_MEMORY_CORRUPTION_PER_TURN": "0.12"},
}
def build_app() -> gr.Server:
    """Create the Gradio server, mount static assets, and register the game API.

    The nested ``*_route`` coroutines are thin adapters: each unpacks a JSON
    payload (or path/query arguments) and delegates to the module-level
    ``api_*`` function of the same purpose.

    NOTE(review): no route-registration decorators are visible on the nested
    handlers in this view of the file — confirm how they are attached to
    ``app`` in the original source.

    Returns:
        The configured ``gr.Server`` instance.
    """
    app = gr.Server()
    app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")

    # --- Static pages and assets -------------------------------------------
    async def homepage() -> str:
        # Serves the single-page UI shell as raw HTML text.
        return (WEB_DIR / "index.html").read_text(encoding="utf-8")

    async def map_asset(layer: str) -> FileResponse:
        # Unknown layer names (KeyError) and missing files both map to 404.
        try:
            path = Path(image_for_layer(layer))
        except KeyError as exc:
            raise HTTPException(status_code=404, detail=f"Unknown map layer: {layer}") from exc
        if not path.exists():
            raise HTTPException(status_code=404, detail=f"Missing map asset: {layer}")
        return FileResponse(path)

    async def suspect_asset() -> FileResponse:
        return FileResponse(STATIC_DIR / "assets" / "reference" / "suspect_portrait_placeholder.png", media_type="image/png")

    async def voice_asset(voice_id: str) -> FileResponse:
        # _voice_path returns None for ids it cannot resolve.
        path = _voice_path(voice_id)
        if path is None:
            raise HTTPException(status_code=404, detail="Unknown witness voice.")
        return FileResponse(path, media_type="audio/wav")

    # --- Game actions: payload adapters around the api_* functions ----------
    async def snapshot_route(game_id: str | None = None) -> dict[str, Any]:
        return game_snapshot(game_id)

    async def new_case_route(payload: dict[str, Any]) -> dict[str, Any]:
        # The HTTP path always requires a healthy model before opening a case.
        return new_case(payload.get("initial_description"), require_omni=True)

    async def select_junctions_route(payload: dict[str, Any]) -> dict[str, Any]:
        return select_junctions(
            payload.get("game_id"),
            payload.get("selected_junctions") or [],
            payload.get("focused_junction"),
        )

    async def issue_notice_route(payload: dict[str, Any]) -> dict[str, Any]:
        return api_issue_notice(
            payload.get("game_id"),
            payload.get("notice_text") or DEFAULT_NOTICE,
            payload.get("selected_junctions") or [],
            payload.get("focused_junction"),
        )

    async def add_block_route(payload: dict[str, Any]) -> dict[str, Any]:
        return api_add_block(
            payload.get("game_id"),
            payload.get("block_type") or "junction_block",
            payload.get("focused_junction"),
            payload.get("to_junction"),
            payload.get("mode"),
            payload.get("turns") or 1,
            payload.get("selected_junctions") or [],
        )

    async def place_tactic_route(payload: dict[str, Any]) -> dict[str, Any]:
        return api_place_tactic(
            payload.get("game_id"),
            payload.get("tactic_type"),
            payload.get("junction_id"),
            payload.get("selected_junctions") or [],
            payload.get("focused_junction"),
            layer=payload.get("layer"),
        )

    async def remove_tactic_route(payload: dict[str, Any]) -> dict[str, Any]:
        return api_remove_tactic(
            payload.get("game_id"),
            payload.get("tactic_id"),
            payload.get("selected_junctions") or [],
            payload.get("focused_junction"),
        )

    async def check_junctions_route(payload: dict[str, Any]) -> dict[str, Any]:
        return api_check_junctions(
            payload.get("game_id"),
            payload.get("selected_junctions") or [],
            payload.get("focused_junction"),
        )

    async def ask_witness_route(payload: dict[str, Any]) -> dict[str, Any]:
        return api_ask_witness(
            payload.get("game_id"),
            payload.get("witness_id"),
            payload.get("question") or DEFAULT_QUESTION,
            payload.get("selected_junctions") or [],
            payload.get("focused_junction"),
            use_model=True,
        )

    async def advance_turn_route(payload: dict[str, Any]) -> dict[str, Any]:
        return api_advance_turn(
            payload.get("game_id"),
            payload.get("selected_junctions") or [],
            payload.get("focused_junction"),
            use_model=True,
        )

    async def omni_status_route() -> dict[str, Any]:
        return api_omni_status()

    async def omni_models_route() -> dict[str, Any]:
        return api_omni_models()

    # --- Per-game resources --------------------------------------------------
    async def notes_route(game_id: str, payload: dict[str, Any]) -> dict[str, Any]:
        state = _state_for(game_id)
        update_notes(state, str(payload.get("notes") or ""))
        return {"ok": True, "notes": state.user_notes}

    async def stop_game_route(game_id: str, payload: dict[str, Any] | None = None) -> dict[str, Any]:
        # Finalizes the case; "stopped" is the reason when the client gives none.
        state = _state_for(game_id)
        reveal = finalize_game(state, str((payload or {}).get("reason") or "stopped"))
        snapshot = _snapshot(state, [], None, "Case finalized.")
        # The reveal is returned inline, so the snapshot flags the story as not
        # separately available.
        snapshot["story_available"] = False
        return {"ok": True, "story": reveal, "snapshot": snapshot}

    async def story_route(game_id: str) -> dict[str, Any]:
        # The story is withheld (403) until the state has a result or a
        # finalized reason.
        state = _state_for(game_id)
        if not state.result and not state.finalized_reason:
            raise HTTPException(status_code=403, detail="The private story is revealed only after the case ends.")
        return {"ok": True, "story": story_reveal(state)}

    async def witness_route(game_id: str, witness_id: str) -> dict[str, Any]:
        return api_witness_detail(game_id, witness_id)

    async def witness_message_route(game_id: str, witness_id: str, payload: dict[str, Any]) -> dict[str, Any]:
        # HTTPExceptions pass through untouched; anything else is logged with a
        # traceback and converted: RuntimeError -> 503, other exceptions -> 500.
        try:
            return api_witness_message(game_id, witness_id, str(payload.get("message") or ""))
        except HTTPException:
            raise
        except Exception as exc:
            import traceback
            tb = traceback.format_exc()
            print(f"[witness_message_route] FAILED: {tb}", flush=True)
            status = 503 if isinstance(exc, RuntimeError) else 500
            raise HTTPException(status_code=status, detail=f"{exc.__class__.__name__}: {exc}") from exc

    async def witness_socket(websocket: WebSocket, game_id: str, witness_id: str) -> None:
        await proxy_witness_socket(websocket, game_id, witness_id)

    # --- Settings and runtime management ------------------------------------
    async def settings_route() -> dict[str, Any]:
        return api_settings()

    async def update_settings_route(payload: dict[str, Any]) -> dict[str, Any]:
        return api_update_settings(payload)

    async def llama_action_route(action: str, payload: dict[str, Any] | None = None) -> dict[str, Any]:
        return api_llama_action(action, payload or {})

    async def setup_status_route() -> dict[str, Any]:
        return api_setup_status()

    async def setup_start_route(payload: dict[str, Any] | None = None) -> dict[str, Any]:
        return api_setup_start(payload or {})

    async def runtime_options_route() -> dict[str, Any]:
        return api_runtime_options()

    # Register the module-level functions as named server API endpoints.
    app.api(new_case, name="new_case")
    app.api(select_junctions, name="select_junctions")
    app.api(api_issue_notice, name="issue_notice")
    app.api(api_add_block, name="add_block")
    app.api(api_place_tactic, name="place_tactic")
    app.api(api_remove_tactic, name="remove_tactic")
    app.api(api_check_junctions, name="check_junctions")
    app.api(api_ask_witness, name="ask_witness")
    app.api(api_advance_turn, name="advance_turn")
    app.api(game_snapshot, name="game_snapshot")
    return app
def new_case(initial_description: str | None = None, require_omni: bool = False) -> dict[str, Any]:
    """Open a new case and return its first snapshot.

    Args:
        initial_description: Optional suspect description supplied by the
            player. When missing, empty, or whitespace-only, the description
            of the chosen case profile is used instead.
        require_omni: When true, the model service must report ready before
            the case is created, and the game is created with the model on.

    Returns:
        The snapshot payload for the freshly created game.

    Raises:
        HTTPException: 503 when ``require_omni`` is set and the model health
            check does not report ready.
    """
    if require_omni:
        health = OmniClient.from_settings().health()
        if not health.get("ready"):
            raise HTTPException(status_code=503, detail="MiniCPM-o must be healthy before a new case can start.")
    case_profile = choose_case()
    # Strip before falling back: a whitespace-only description is truthy, so
    # falling back first would leave the case with an empty description.
    description = (initial_description or "").strip() or case_profile["description"].strip()
    state = new_game(description, use_model=require_omni, case_profile=case_profile)
    _SESSIONS[state.game_id] = state
    return _snapshot(
        state,
        selected_junctions=[],
        focused_junction=None,
        event="Case opened. The starting point is hidden.",
        sound="lookout_raise",
    )
def select_junctions(
    game_id: str | None = None,
    selected_junctions: list[int] | None = None,
    focused_junction: int | None = None,
) -> dict[str, Any]:
    """Return a snapshot reflecting the player's current map selection.

    Invalid junction ids are filtered out. When the requested focus is not
    usable, the last valid selected junction (if any) becomes the focus.
    The game is optional here: the state lookup is made with
    ``required=False``.
    """
    state = _state_for(game_id, required=False)
    chosen = _valid_junctions(selected_junctions or [])
    focus = _valid_junction(focused_junction)
    if not focus:
        focus = chosen[-1] if chosen else None
    return _snapshot(
        state,
        selected_junctions=chosen,
        focused_junction=focus,
        event=_selection_event(chosen, focus),
        sound="map_select",
    )
def api_issue_notice(
    game_id: str | None,
    notice_text: str = DEFAULT_NOTICE,
    selected_junctions: list[int] | None = None,
    focused_junction: int | None = None,
) -> dict[str, Any]:
    """Issue a public notice anchored at the focused junction.

    When no junction is focused, the state's last-seen junction (or
    ``DEFAULT_SELECTED_JUNCTION``) becomes both the focus and the sole
    selection. The returned snapshot's event text reports how many witnesses
    surfaced and whether individual witness cards can be reviewed.
    """
    state = _state_for(game_id)
    selected, focused = _selection_context(selected_junctions, focused_junction)
    if focused is None:
        focused = state.last_seen_junction or DEFAULT_SELECTED_JUNCTION
        selected = [focused]

    anchored_text = _notice_with_selected_junction(notice_text or DEFAULT_NOTICE, focused)
    state, batch = issue_notice(state, anchored_text, anchor_junction=focused)
    _SESSIONS[state.game_id] = state

    headline = f"{batch.total_witnesses} witnesses surfaced."
    tail = (
        " Witness cards are available."
        if batch.individual_review_allowed
        else " The crowd is too dense for individual cards."
    )
    return _snapshot(state, selected, focused, headline + tail, sound="witness_popup")
def api_add_block(
    game_id: str | None,
    block_type: str,
    focused_junction: int | None = None,
    to_junction: int | None = None,
    mode: str | None = None,
    turns: int = 1,
    selected_junctions: list[int] | None = None,
) -> dict[str, Any]:
    """Place a blockade at the focused junction and return the new snapshot.

    ``block_type`` selects the blockade kind: ``"edge_block"`` needs a valid
    ``to_junction``, ``"mode_block"`` needs ``mode`` to be taxi, bus, or
    subway, and any other value places a plain junction block. Unmet
    preconditions yield an unchanged snapshot carrying a hint message.
    """
    state = _state_for(game_id)
    selected, focused = _selection_context(selected_junctions, focused_junction)

    def hint(text: str) -> dict[str, Any]:
        # Precondition failure: nothing is mutated, only the event text changes.
        return _snapshot(state, selected, focused, text, sound="map_select")

    if focused is None:
        return hint("Select a junction before placing a blockade.")

    if block_type == "edge_block":
        if _valid_junction(to_junction) is None:
            return hint("Pick a connected route first.")
        kind = "edge_block"
        target = {"from_junction": focused, "to_junction": int(to_junction), "mode": mode}
    elif block_type == "mode_block":
        if mode not in {"taxi", "bus", "subway"}:
            return hint("Choose taxi, bus, or subway first.")
        kind = "mode_block"
        target = {"junction_id": focused, "mode": mode}
    else:
        kind = "junction_block"
        target = {"junction_id": focused}

    state, message = add_block(state, kind, turns=_clean_turns(turns), **target)
    _SESSIONS[state.game_id] = state
    return _snapshot(state, selected, focused, message, sound="blockade_set")
def api_place_tactic(
    game_id: str | None,
    tactic_type: str | None,
    junction_id: int | None,
    selected_junctions: list[int] | None = None,
    focused_junction: int | None = None,
    layer: str | None = None,
) -> dict[str, Any]:
    """Place a tactic on a junction and return the resulting snapshot.

    The target is ``junction_id`` when valid, otherwise the focused junction.
    After placing a ``lookout_board`` (unless the result message contains
    "No lookout"), the snapshot also carries a ``notice_prompt`` entry
    prefilled with the last notice text or the initial description.
    """
    state = _state_for(game_id)
    selected, focused = _selection_context(selected_junctions, focused_junction)

    target = _valid_junction(junction_id) or focused
    junction = None if target is None else _junction_by_id(target)
    if junction is None:
        return _snapshot(state, selected, focused, "Drop the tactic on a valid junction.", sound="map_select")

    x_pos = int(junction["x"])
    y_pos = int(junction["y"])
    state, message = place_tactic(state, str(tactic_type or ""), target, x_pos, y_pos, layer=layer)
    _SESSIONS[state.game_id] = state

    result = _snapshot(state, [*selected, target], target, message, sound="blockade_set")
    lookout_placed = tactic_type == "lookout_board" and "No lookout" not in message
    if lookout_placed:
        result["notice_prompt"] = {
            "open": True,
            "junction_id": target,
            "prefill": state.last_notice_text or state.initial_description,
        }
    return result
def api_remove_tactic(
    game_id: str | None,
    tactic_id: str | None,
    selected_junctions: list[int] | None = None,
    focused_junction: int | None = None,
) -> dict[str, Any]:
    """Remove a placed tactic by id and return the updated snapshot.

    An empty or missing ``tactic_id`` leaves the game untouched and returns
    a snapshot asking the player to choose a tactic.
    """
    state = _state_for(game_id)
    selected, focused = _selection_context(selected_junctions, focused_junction)
    if tactic_id:
        state, event_text = remove_tactic(state, tactic_id)
        _SESSIONS[state.game_id] = state
    else:
        event_text = "Choose a placed tactic first."
    return _snapshot(state, selected, focused, event_text, sound="map_select")
def api_check_junctions(
    game_id: str | None,
    selected_junctions: list[int] | None = None,
    focused_junction: int | None = None,
) -> dict[str, Any]:
    """Search the selected junctions, limited by the checks left this turn.

    Targets are searched in the order given by ``_ordered_check_targets``;
    searching stops early once the state carries a result. Each outcome is
    reported as ``J<id>: <message>`` and the reports are joined by spaces.
    """
    state = _state_for(game_id)
    selected, focused = _selection_context(selected_junctions, focused_junction)

    targets = _ordered_check_targets(selected, focused)
    if not targets:
        return _snapshot(state, selected, focused, "Select at least one junction to search.", sound="map_select")

    budget_left = checks_remaining_this_turn(state.turn_number, state.junction_checks)
    if budget_left <= 0:
        return _snapshot(state, selected, focused, "No searches remain this turn.", sound="map_select")

    reports: list[str] = []
    for target_id in targets[:budget_left]:
        state, outcome = check_junction(state, target_id)
        reports.append(f"J{target_id}: {outcome}")
        if state.result:
            # The case is decided; do not spend further searches.
            break

    _SESSIONS[state.game_id] = state
    return _snapshot(state, selected, focused, " ".join(reports), sound="blockade_set")
def api_ask_witness(
    game_id: str | None,
    witness_id: str | None,
    question: str = DEFAULT_QUESTION,
    selected_junctions: list[int] | None = None,
    focused_junction: int | None = None,
    use_model: bool = False,
) -> dict[str, Any]:
    """Put a question to a witness and return a snapshot carrying the answer.

    Without a ``witness_id`` the game is left untouched and a hint snapshot
    is returned. With ``use_model`` set, the model service readiness is
    checked (via ``_require_omni_ready``) before the question is asked.
    """
    state = _state_for(game_id)
    selected, focused = _selection_context(selected_junctions, focused_junction)
    if not witness_id:
        return _snapshot(state, selected, focused, "Choose a witness card first.", sound="map_select")

    if use_model:
        _require_omni_ready()

    asked = question or DEFAULT_QUESTION
    state, reply = question_witness(state, witness_id, asked, use_model=use_model)
    _SESSIONS[state.game_id] = state
    return _snapshot(state, selected, focused, reply, sound="witness_popup")
def api_advance_turn(
    game_id: str | None,
    selected_junctions: list[int] | None = None,
    focused_junction: int | None = None,
    use_model: bool = False,
) -> dict[str, Any]:
    """End the current turn and return the snapshot for the next one.

    Before the turn ends, the state's effective context length is refreshed
    from settings and the story memory is compacted. The snapshot sound is
    ``witness_popup`` when the turn produced new witness batches, otherwise
    ``turn_advance``.
    """
    state = _state_for(game_id)
    selected, focused = _selection_context(selected_junctions, focused_junction)
    if use_model:
        _require_omni_ready()

    # NOTE(review): indentation was lost in the reviewed copy; the refresh and
    # compaction below are assumed to run regardless of use_model — confirm.
    state.effective_context_length = load_settings().llamacpp_context_length
    compact_story_memory(state)

    batches_before = len(state.witness_batches)
    state, message = end_turn(state, use_model=use_model)
    _SESSIONS[state.game_id] = state

    if len(state.witness_batches) > batches_before:
        sound = "witness_popup"
    else:
        sound = "turn_advance"
    return _snapshot(state, selected, focused, message, sound=sound)
def api_witness_detail(game_id: str, witness_id: str) -> dict[str, Any]:
    """Return the detail card for one surfaced witness and mark it viewed.

    Raises:
        HTTPException: 404 when the witness is not found for this game.
    """
    state = _state_for(game_id)
    witness = witness_by_id(state, witness_id)
    if witness is None:
        raise HTTPException(status_code=404, detail="Witness not found or not yet surfaced.")

    # NOTE(review): indentation was lost in the reviewed copy; persisting only
    # when the viewed list actually changes is assumed — confirm.
    already_viewed = witness_id in state.viewed_witness_ids
    if not already_viewed:
        state.viewed_witness_ids.append(witness_id)
        persist(state)

    card = {
        "id": witness.witness_id,
        "name": witness.name,
        "occupation": witness.occupation,
        "junction_id": witness.junction_id,
        "personality": witness.personality,
        "reliability": witness.reliability,
        "memory": witness.memory_strength,
        "summary": witness.current_summary,
        "voice_id": witness.voice_id,
        "voice_url": f"/assets/voices/{witness.voice_id}",
        "transcript": [asdict(entry) for entry in witness.question_history],
        "observed_turn": witness.turn_created,
    }
    return {"ok": True, "witness": card}
# Detectors for CJK ideographs in model output. The ranges are spelled with
# \u escapes rather than literal characters so the class end points cannot be
# lost to encoding or copy/paste damage (a truncated final range degrades to
# matching a literal "-").
#   U+3400-U+4DBF  CJK Unified Ideographs Extension A
#   U+4E00-U+9FFF  CJK Unified Ideographs
#   U+F900-U+FAFF  CJK Compatibility Ideographs
_CJK_RE = re.compile(r"[\u3400-\u4dbf\u4e00-\u9fff\uf900-\ufaff]")
# Broader variant used by _usable_witness_reply: one contiguous span from
# U+3400 through U+9FFF plus the compatibility block.
_CJK_REPLY_RE = re.compile(r"[\u3400-\u9fff\uf900-\ufaff]")
# Lower-case substrings that mark a model reply as off-topic or out of
# character; _usable_witness_reply rejects any reply containing one of them.
_UNRELATED_REPLY_MARKERS = (
    "what would you like", "can't help with that question", "cannot help with that question",
    "criminal matters", "speak in english", "english only", "language instructions",
    "as an ai", "i am an ai", "how can i assist", "sure, i can do that",
    "give me the details", "beautiful scenery", "scenic spots", "like in movies",
    "provide more details", "witness in an english-language", "facts given by the user",
    "won't invent", "will not invent", "let's begin", "got it?",
)
# Colour and time-of-day words. _usable_witness_reply rejects a reply that
# uses one of these when the word does not also occur in the grounding text.
_SPECIFICITY_WORDS = {
    "red", "blue", "green", "yellow", "black", "white", "brown", "purple", "orange",
    "morning", "afternoon", "evening", "midnight", "noon", "am", "pm",
}
# Words ignored when _usable_witness_reply looks for vocabulary shared by the
# reply and the grounding text.
_GROUNDING_STOPWORDS = {
    "about", "after", "again", "answer", "asks", "before", "carefully", "conversation",
    "detective", "details", "english", "facts", "final", "from", "gave", "gives", "know",
    "noticed", "only", "question", "reply", "sentence", "short", "speak", "stable", "that",
    "their", "there", "these", "they", "this", "what", "when", "where", "which", "with",
    "witness", "would", "your", "you", "personality", "ordinary", "current", "summary",
}
def _usable_witness_reply(text: str, grounding: str, question: str = "") -> bool:
    """Decide whether a model reply is acceptable as a witness answer.

    A reply is rejected when it is empty, longer than 500 characters,
    contains CJK ideographs, contains an off-topic marker phrase, claims a
    colour / time-of-day word absent from ``grounding``, or mentions a number
    absent from ``grounding``. Surviving replies are accepted when they admit
    not knowing, or when they share at least one non-stopword with
    ``grounding``.

    Args:
        text: The candidate reply; ``None`` or empty is treated as unusable.
        grounding: The facts the witness is allowed to draw on; ``None`` is
            treated as empty.
        question: The question that was asked. Accepted for call-site
            symmetry; it does not influence the verdict.

    Returns:
        True when the reply may be shown to the player.
    """
    clean = " ".join((text or "").split()).strip()
    lowered = clean.lower()
    knowledge_lower = (grounding or "").lower()
    if not clean or len(clean) > 500 or _CJK_REPLY_RE.search(clean):
        return False
    if any(marker in lowered for marker in _UNRELATED_REPLY_MARKERS):
        return False
    answer_words = set(re.findall(r"[a-z]+", lowered))
    knowledge_words = set(re.findall(r"[a-z]+", knowledge_lower))
    # "am"/"pm" are only a time-of-day claim when attached to a number
    # ("3 pm", "10:30am"). A bare "am" is the verb in "I am not sure", which
    # must reach the don't-know allowance below instead of being rejected.
    clock_suffixes = set(re.findall(r"\d\s*([ap]m)\b", lowered))
    claimed_specifics = {
        word
        for word in _SPECIFICITY_WORDS
        if word in answer_words and (word not in ("am", "pm") or word in clock_suffixes)
    }
    if claimed_specifics - knowledge_words:
        return False
    response_numbers = set(re.findall(r"\b\d+(?::\d+)?\b", lowered))
    knowledge_numbers = set(re.findall(r"\b\d+(?::\d+)?\b", knowledge_lower))
    if not response_numbers <= knowledge_numbers:
        return False
    if any(phrase in lowered for phrase in ("i don't know", "i do not know", "not sure", "cannot remember", "can't remember")):
        return True
    meaningful_answer = answer_words - _GROUNDING_STOPWORDS
    meaningful_knowledge = knowledge_words - _GROUNDING_STOPWORDS
    return bool(meaningful_answer & meaningful_knowledge)
def _witness_chat_with_english_retry(settings, system_prompt, user_prompt, voice_path, question, grounding):
    """Ask the model for a witness reply, retrying once with stricter wording.

    The first attempt uses the given system prompt at temperature 0.15. If
    the reply fails ``_usable_witness_reply``, one retry is made at
    temperature 0.0 with an added English-only instruction. When both
    replies are rejected an empty ``OmniResponse`` is returned.
    ``voice_path`` is accepted but not used here.
    """
    # MiniCPM occasionally slips into Chinese filler or off-topic text. One
    # tight retry catches the easy cases; more retries are too expensive on
    # the per-call ZeroGPU budget.
    client = OmniClient(settings)
    stricter_system = (
        system_prompt + " Reply in plain English only; no Chinese characters. "
        "Stick to facts the witness was given."
    )
    attempts = (
        ("first", system_prompt, 0.15),
        ("retry", stricter_system, 0.0),
    )
    for label, system_text, temperature in attempts:
        reply = client.chat(system_text, user_prompt, task="interview", temperature=temperature, tts=False)
        if _usable_witness_reply(reply.text, grounding, question):
            return reply
        print(f"[witness_chat] {label} reply rejected: {reply.text!r}", flush=True)
    return OmniResponse(text="")
def api_witness_message(game_id: str, witness_id: str, message: str) -> dict[str, Any]:
    """Answer a typed question to a witness and record the exchange.

    The message is whitespace-normalised and capped at 2000 characters. A
    message containing a bare greeting word skips the model; otherwise the
    model is asked (with one retry). When no usable model reply exists, the
    deterministic witness answer is used. The exchange is appended to the
    witness's question history and the state is persisted.

    Args:
        game_id: Id of the game the witness belongs to.
        witness_id: Id of the witness being questioned.
        message: The player's raw question text.

    Returns:
        A dict with ``ok``, the ``answer`` text, optional ``audio_data`` with
        its ``audio_sample_rate`` (24000 when unset), and a ``snapshot``
        focused on the witness's junction.

    Raises:
        HTTPException: 400 for an empty message; 404 when the witness is not
            found for this game.
    """
    clean = " ".join(message.split())[:2000]
    if not clean:
        raise HTTPException(status_code=400, detail="Enter a question for the witness.")
    _require_omni_ready()
    state = _state_for(game_id)
    witness = witness_by_id(state, witness_id)
    if witness is None:
        raise HTTPException(status_code=404, detail="Witness not found or not yet surfaced.")
    voice_path = _voice_path(witness.voice_id)
    settings = load_settings()
    budget = ContextBudget.for_context(settings.llamacpp_context_length)
    # MiniCPM-o-4.5 Q4_K_M reliably degrades to Chinese filler when given a JSON
    # blob as the user message — its Chinese assistant prior overwhelms a
    # prompt it can't parse. Plain English with the question on the last line
    # produces consistent on-topic English replies.
    stable_block = ", ".join(witness.stable_facts) if witness.stable_facts else "(none recorded)"
    grounding = f"{witness.current_summary} {stable_block}"
    # A budget of 0 turns must mean "no history": seq[-0:] would be seq[0:],
    # i.e. the entire history, which is the opposite of the intent.
    recent_turns = budget.recent_interview_turns
    recent_history = witness.question_history[-recent_turns:] if recent_turns > 0 else []
    # Earlier answers that would not pass the reply filter are kept out of the
    # prompt so they are not fed back to the model.
    history = [
        item for item in recent_history
        if _usable_witness_reply(item.answer, grounding, item.question)
    ]
    system_prompt = (
        "You are roleplaying a witness in an English-language detective game. "
        "Speak only English. Reply in one or two short sentences. Use only the "
        "facts the user gives you. Let the supplied personality control tone, "
        "confidence, and brevity. Never invent details. If you don't know, say "
        "you don't know."
    )
    history_block = (
        "\n".join(f"  Detective: {item.question}\n  You: {item.answer}" for item in history)
        if history else "  (no prior questions)"
    )
    personality_block = ", ".join(f"{k}: {v}" for k, v in witness.personality.items()) or "ordinary"
    user_prompt = (
        f"You are {witness.name}, a {witness.occupation} ({personality_block}).\n"
        f"What you saw / know: {witness.current_summary}\n"
        f"Stable facts: {stable_block}\n"
        f"Conversation so far:\n{history_block}\n"
        f"The detective now asks: {clean!r}\n"
        f"Reply in character, in English, in one or two short sentences."
    )
    # Greetings bypass the model; the empty response falls through to the
    # deterministic answer below.
    greeting = any(word in clean.lower().split() for word in ("hello", "hi", "hey"))
    response = OmniResponse(text="") if greeting else _witness_chat_with_english_retry(
        settings, system_prompt, user_prompt, voice_path, clean, grounding,
    )
    answer = response.text.strip() or deterministic_witness_answer(witness, clean)
    if settings.witness_chat_tts:
        # Speech is synthesised from the final answer text, so a deterministic
        # fallback answer is voiced as well.
        speech = OmniClient(settings).synthesize(
            answer,
            ref_audio_path=str(voice_path) if voice_path else None,
        )
        response.audio_data = speech.audio_data
        response.audio_sample_rate = speech.audio_sample_rate
    witness.question_history.append(WitnessQuestion(question=clean, answer=answer, turn_number=state.turn_number))
    if witness_id not in state.viewed_witness_ids:
        state.viewed_witness_ids.append(witness_id)
    persist(state)
    return {
        "ok": True,
        "answer": answer,
        "audio_data": response.audio_data,
        "audio_sample_rate": response.audio_sample_rate or 24000,
        "snapshot": _snapshot(state, [witness.junction_id], witness.junction_id),
    }
async def proxy_witness_socket(websocket: WebSocket, game_id: str, witness_id: str) -> None:
    """Relay a browser WebSocket to the omni gateway's half-duplex voice endpoint.

    Client messages are forwarded upstream; a ``prepare`` message is first
    rewritten with the witness persona, reference voice, and generation
    config. Upstream messages are forwarded to the client, and completed
    assistant turns are recorded in the witness's question history.

    The client socket is closed without being accepted when the witness is
    unknown (code 1008) or the omni service is not ready (code 1013).
    """
    state = _state_for(game_id)
    witness = witness_by_id(state, witness_id)
    if witness is None:
        await websocket.close(code=1008, reason="Witness not available")
        return
    if not OmniClient.from_settings().omni_health().get("ready"):
        await websocket.close(code=1013, reason="MiniCPM-o service unavailable")
        return
    await websocket.accept()
    settings = load_settings()
    # Translate the gateway's HTTP(S) base URL into the matching WS(S) scheme.
    gateway = settings.omni_gateway_url.rstrip("/")
    if gateway.startswith("https://"):
        gateway = "wss://" + gateway[8:]
    elif gateway.startswith("http://"):
        gateway = "ws://" + gateway[7:]
    # Session id: slashes replaced, and only the last 180 characters kept.
    session_id = f"{game_id}_{witness_id}".replace("/", "_")[-180:]
    target = f"{gateway}/ws/half_duplex/{session_id}"
    voice_path = _voice_path(witness.voice_id)
    voice_b64, voice_duration = wav_to_float32_base64(voice_path) if voice_path else ("", 0.0)
    # Text deltas of the assistant turn currently in progress.
    assistant_chunks: list[str] = []
    try:
        import websockets
        async with websockets.connect(target, max_size=32 * 1024 * 1024) as upstream:
            async def client_to_upstream() -> None:
                # Forward every client message; only "prepare" is rewritten.
                async for raw in websocket.iter_text():
                    data = json.loads(raw)
                    if data.get("type") == "prepare":
                        budget = ContextBudget.for_context(settings.llamacpp_context_length)
                        data["system_content"] = [
                            {"type": "text", "text": f"Clone this voice. You are {witness.name}, a {witness.occupation}. Speak only from this knowledge: {witness.current_summary}"},
                            {
                                "type": "audio",
                                "data": voice_b64,
                                "name": f"{witness.voice_id}.wav",
                                "duration": voice_duration,
                            },
                            {"type": "text", "text": "Stay in character. Reply in English only — do not translate or speak Chinese. Be concise, and never invent hidden facts."},
                        ]
                        data["lang"] = "en"
                        data["config"] = {
                            "vad": {
                                "threshold": 0.5,
                                "min_speech_duration_ms": 128,
                                "min_silence_duration_ms": 600,
                                "speech_pad_ms": 30,
                            },
                            "generation": {
                                # Capped at 96 tokens or the context budget's
                                # output allowance, whichever is smaller.
                                "max_new_tokens": min(96, budget.output_tokens),
                                "length_penalty": 1.1,
                                "temperature": 0.7,
                            },
                            "tts": {"enabled": True},
                            "session": {"timeout_s": 300},
                        }
                    await upstream.send(json.dumps(data))

            async def upstream_to_client() -> None:
                async for raw in upstream:
                    data = json.loads(raw)
                    if data.get("text_delta"):
                        assistant_chunks.append(str(data["text_delta"]))
                    if data.get("type") == "turn_done" and assistant_chunks:
                        # A turn finished: store the accumulated answer. The
                        # spoken question text is not available here, so a
                        # placeholder is recorded in its place.
                        answer = "".join(assistant_chunks).strip()
                        assistant_chunks.clear()
                        witness.question_history.append(WitnessQuestion(
                            question="[Spoken question]", answer=answer, turn_number=state.turn_number
                        ))
                        if witness_id not in state.viewed_witness_ids:
                            state.viewed_witness_ids.append(witness_id)
                        persist(state)
                    # NOTE(review): send_text assumes upstream frames are text;
                    # confirm the gateway never sends binary frames.
                    await websocket.send_text(raw)

            # Run both directions until either side finishes, then cancel the
            # other and surface any exception from the finished task.
            tasks = [asyncio.create_task(client_to_upstream()), asyncio.create_task(upstream_to_client())]
            done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
            for task in pending:
                task.cancel()
            for task in done:
                task.result()
    except (WebSocketDisconnect, OSError, ValueError, json.JSONDecodeError) as exc:
        # Best-effort error report; the client may already be gone.
        # NOTE(review): exception types outside this tuple propagate to the
        # caller after the finally block — confirm that is intended for
        # upstream connection-closed errors.
        try:
            await websocket.send_json({"type": "error", "error": str(exc)})
        except Exception:
            pass
    finally:
        # Best-effort close; errors are ignored because the socket may already
        # be closed.
        try:
            await websocket.close()
        except Exception:
            pass
def api_settings() -> dict[str, Any]:
    """Return current settings, backend service statuses, and model scan results.

    The payload also carries a one-line description of each difficulty
    preset.
    """
    settings = load_settings()
    llama_status, omni_status = _service_statuses(settings)

    preset_descriptions = {
        "easy": "Longer case, more checks, slower memory decay.",
        "normal": "Balanced turn limit, checks, and witness memory decay.",
        "hard": "Shorter case, fewer checks, faster witness memory decay.",
    }
    payload: dict[str, Any] = {"ok": True}
    payload["settings"] = _settings_payload(settings)
    payload["llama"] = llama_status
    payload["omni"] = omni_status
    payload["model_scan"] = scan_minicpm_models(settings.minicpm_model_dir)
    payload["difficulty_presets"] = preset_descriptions
    return payload
def api_update_settings(payload: dict[str, Any]) -> dict[str, Any]:
    """Validate a settings payload, persist the accepted values, and return fresh settings.

    Accepted values are written through ``_write_env_updates`` and mirrored
    into ``os.environ`` before the refreshed ``api_settings()`` response is
    built.

    Args:
        payload: Mapping of setting names to new values. Unknown keys are
            ignored; ``None`` values are treated as "not provided".

    Returns:
        The same structure as ``api_settings()``.

    Raises:
        HTTPException: Status 400 when the context length, GPU layer count,
            provider, backend paths, external URL/model, or selected
            quantization is invalid.
    """
    updates: dict[str, str] = {}
    difficulty = str(payload.get("difficulty") or "").strip().lower()
    if difficulty in DIFFICULTY_PRESETS:
        updates.update(DIFFICULTY_PRESETS[difficulty])
        updates["PHANTOM_GRID_DIFFICULTY"] = difficulty
    # "llamacpp_gpu_layers" and "minicpm_gpu_device" are intentionally absent
    # from this pass-through map: each has a dedicated validator below, and
    # copying them here would persist a value the validator later rejects.
    field_map = {
        "llm_provider": "PHANTOM_GRID_LLM_PROVIDER",
        "llm_model": "PHANTOM_GRID_LLM_MODEL",
        "llamacpp_model_path": "PHANTOM_GRID_LLAMACPP_MODEL_PATH",
        "llamacpp_server_bin": "PHANTOM_GRID_LLAMACPP_SERVER_BIN",
        "llamacpp_base_url": "PHANTOM_GRID_LLAMACPP_BASE_URL",
        "max_turns": "PHANTOM_GRID_MAX_TURNS",
        "checks_per_turn": "PHANTOM_GRID_CHECKS_PER_TURN",
        "memory_corruption_per_turn": "PHANTOM_GRID_MEMORY_CORRUPTION_PER_TURN",
        "omni_gateway_url": "PHANTOM_GRID_OMNI_GATEWAY_URL",
        "omni_launcher_path": "PHANTOM_GRID_OMNI_LAUNCHER_PATH",
        "comni_checkout_path": "PHANTOM_GRID_COMNI_CHECKOUT_PATH",
        "llamacpp_omni_root": "PHANTOM_GRID_LLAMACPP_OMNI_ROOT",
        "minicpm_model_dir": "PHANTOM_GRID_MINICPM_MODEL_DIR",
        "minicpm_quantization": "PHANTOM_GRID_MINICPM_QUANTIZATION",
        "witness_voice_dir": "PHANTOM_GRID_WITNESS_VOICE_DIR",
    }
    for field, env_key in field_map.items():
        if field in payload and payload[field] is not None:
            value = str(payload[field]).strip()
            if value:
                updates[env_key] = value
    if "llamacpp_context_length" in payload:
        try:
            updates["PHANTOM_GRID_LLAMACPP_CONTEXT_LENGTH"] = str(normalize_context_length(payload["llamacpp_context_length"]))
        except ValueError as exc:
            raise HTTPException(status_code=400, detail=str(exc)) from exc
    # A None value means "not provided", matching the generic loop above,
    # instead of being stringified to "none" and rejected.
    gpu_layers_raw = payload.get("llamacpp_gpu_layers")
    if gpu_layers_raw is not None:
        gpu_layers = str(gpu_layers_raw).strip().lower()
        if gpu_layers != "auto":
            try:
                if int(gpu_layers) < 0:
                    raise ValueError
            except ValueError as exc:
                raise HTTPException(status_code=400, detail="GPU layers must be 'auto' or a non-negative integer.") from exc
        updates["PHANTOM_GRID_LLAMACPP_GPU_LAYERS"] = gpu_layers
    # Nothing has touched the environment yet, so one snapshot of the stored
    # settings is valid for every fallback lookup below.
    current = load_settings()
    provider = updates.get("PHANTOM_GRID_LLM_PROVIDER", current.llm_provider)
    if provider not in {"minicpm_omni", "llama_cpp_server", "external_llama_cpp_server", "zerogpu_transformers"}:
        raise HTTPException(status_code=400, detail="Choose a supported AI backend.")
    if provider == "llama_cpp_server":
        model_path = Path(updates.get("PHANTOM_GRID_LLAMACPP_MODEL_PATH") or str(current.llamacpp_model_path or "")).expanduser()
        if not model_path.is_file() or model_path.suffix.lower() != ".gguf":
            raise HTTPException(status_code=400, detail="Choose an existing .gguf model file for the standalone llama.cpp backend.")
        server_bin = Path(updates.get("PHANTOM_GRID_LLAMACPP_SERVER_BIN") or str(current.llamacpp_server_bin or "")).expanduser()
        if not server_bin.is_file():
            raise HTTPException(status_code=400, detail="Choose an existing llama-server executable.")
        # Persist the resolved paths and sync the model label to the chosen file
        # so any user-supplied GGUF (e.g. D:\Models\...\gemma-...Q8_0.gguf) runs.
        updates["PHANTOM_GRID_LLAMACPP_MODEL_PATH"] = str(model_path)
        updates["PHANTOM_GRID_LLAMACPP_SERVER_BIN"] = str(server_bin)
        updates["PHANTOM_GRID_LLM_MODEL"] = model_path.name
    elif provider == "external_llama_cpp_server":
        base_url = updates.get("PHANTOM_GRID_LLAMACPP_BASE_URL", current.llamacpp_base_url).rstrip("/")
        model = updates.get("PHANTOM_GRID_LLM_MODEL", current.llm_model).strip()
        if not base_url.startswith(("http://", "https://")):
            raise HTTPException(status_code=400, detail="External server URL must start with http:// or https://.")
        if not model:
            raise HTTPException(status_code=400, detail="Enter the model ID exposed by the external llama.cpp server.")
        updates["PHANTOM_GRID_LLAMACPP_BASE_URL"] = base_url
    selected = updates.get("PHANTOM_GRID_MINICPM_QUANTIZATION")
    if selected:
        model_dir = Path(updates.get("PHANTOM_GRID_MINICPM_MODEL_DIR") or str(current.minicpm_model_dir or ""))
        catalog_names = {item["id"] for item in quantization_catalog()}
        on_disk_names = {item["filename"] for item in scan_minicpm_models(model_dir).get("models", [])}
        # Allow catalog entries even when the file isn't on disk yet — this is the
        # first-run case where the user is choosing what the provisioner should
        # download. Otherwise require the file to already be present.
        if selected not in catalog_names and selected not in on_disk_names:
            raise HTTPException(status_code=400, detail="Selected quantization is not a compatible MiniCPM-o LLM GGUF file.")
    device_raw = payload.get("minicpm_gpu_device")
    if device_raw is not None:
        device_id = str(device_raw).strip()
        if device_id:
            # Accept stored ids that simply aren't present anymore (e.g. external
            # GPU unplugged) — we just warn via the picker, not the validator.
            # Anything else is silently dropped rather than persisted.
            if (
                device_id == "auto"
                or device_id.startswith(("cuda:", "rocm:"))
                or device_id in {item["id"] for item in detect_devices()}
            ):
                updates["PHANTOM_GRID_GPU_DEVICE"] = device_id
    if "witness_chat_tts" in payload:
        value = payload["witness_chat_tts"]
        truthy = value if isinstance(value, bool) else str(value).strip().lower() not in {"", "0", "false", "off", "no"}
        updates["PHANTOM_GRID_WITNESS_CHAT_TTS"] = "1" if truthy else "0"
    if updates:
        _write_env_updates(updates)
        os.environ.update(updates)
    return api_settings()
def api_llama_action(action: str, payload: dict[str, Any]) -> dict[str, Any]:
    """Apply optional settings, then run a lifecycle verb against the AI backend.

    Recognised verbs are ``status``, ``start``, ``restart`` and ``stop``
    (case-insensitive, surrounding whitespace ignored). Every response carries
    the llama status, the omni status and the settings payload.
    """
    if payload:
        api_update_settings(payload)
    settings = load_settings()
    verb = action.strip().lower()
    is_lifecycle_verb = verb in {"start", "restart", "stop"}

    def status_fields(view) -> dict[str, Any]:
        # Shared tail of every response: statuses first, then the settings payload.
        llama_status, omni_status = _service_statuses(view)
        return {"llama": llama_status, "omni": omni_status, "settings": _settings_payload(view)}

    # ZeroGPU: no subprocess to start/stop/restart. Report status; ignore lifecycle verbs.
    if settings.llm_provider == "zerogpu_transformers":
        fields = status_fields(settings)
        note = "ZeroGPU backend runs in-process." if is_lifecycle_verb else None
        return {"ok": True, "event": note, **fields}

    if is_lifecycle_verb and settings.llm_provider == "external_llama_cpp_server":
        fields = status_fields(settings)
        return {
            "ok": fields["llama"].get("ready", False),
            "event": "External llama.cpp is user-managed. Start, restart, and stop it outside Phantom Grid.",
            **fields,
        }

    if verb == "status":
        return {"ok": True, **status_fields(settings)}

    if verb == "stop":
        _stop_llama_process()
        return {"ok": True, "event": "MiniCPM-o service stopped.", **status_fields(load_settings())}

    if verb in {"restart", "start"}:
        if verb == "restart":
            _stop_llama_process()
        started = _start_llama_process(settings)
        # Settings are reloaded after the start attempt before reporting status.
        return {"ok": started["ok"], "event": started["event"], **status_fields(load_settings())}

    return {"ok": False, "event": f"Unknown llama action: {action}", **status_fields(settings)}
def api_omni_status() -> dict[str, Any]:
    """Return the omni status payload built from a fresh omni health probe."""
    current = load_settings()
    return _omni_status_payload(current, OmniClient(current).omni_health())
def _service_statuses(settings) -> tuple[dict[str, Any], dict[str, Any]]:
    """Return ``(llama_status, omni_status)`` using one shared ``OmniClient``."""
    probe = OmniClient(settings)
    llama_status = _llama_status(settings, probe.health())
    omni_status = _omni_status_payload(settings, probe.omni_health())
    return llama_status, omni_status
def _omni_status_payload(settings, health: dict[str, Any]) -> dict[str, Any]:
    """Combine an omni health result with model-scan and process information.

    ``managed_process`` is true only when the provider is ``minicpm_omni`` and
    the module-level ``_LLAMA_PROCESS`` exists and has not exited.
    """
    scan = scan_minicpm_models(settings.minicpm_model_dir)
    process = _LLAMA_PROCESS
    managed = False
    if settings.llm_provider == "minicpm_omni" and process:
        managed = process.poll() is None
    payload: dict[str, Any] = {"ok": True}
    payload["reachable"] = health.get("reachable", False)
    payload["ready"] = health.get("ready", False)
    payload["detail"] = health.get("detail")
    payload["managed_process"] = managed
    payload["pid"] = process.pid if managed else None
    payload["model_complete"] = scan.get("complete", False)
    payload["selected_model"] = settings.minicpm_quantization
    payload["context_length"] = settings.llamacpp_context_length
    payload["gpu_layers"] = settings.llamacpp_gpu_layers
    return payload
def api_omni_models() -> dict[str, Any]:
    """Return the MiniCPM model scan for the configured model directory, flagged ``ok``."""
    response: dict[str, Any] = {"ok": True}
    response.update(scan_minicpm_models(load_settings().minicpm_model_dir))
    return response
def api_setup_status() -> dict[str, Any]:
    """Report the state of the local AI runtime setup.

    For the ZeroGPU provider the answer comes from the in-process backend's
    health. Otherwise it is derived from the files found under the runtime
    root, the tracked provisioner process and the persisted setup status.

    Returns:
        A dict with ``ok``, ``state``, ``stage``, ``message``, ``progress``,
        ``files_ready``, ``service_ready``, ``installing`` and ``updated_at``
        (plus ``detail`` for ZeroGPU).
    """
    global _SETUP_PROCESS

    def progress_of(snapshot: dict[str, Any]) -> int:
        # The status file is written by another process; tolerate a missing,
        # null or non-numeric progress value instead of raising.
        try:
            return int(snapshot.get("progress", 0))
        except (TypeError, ValueError):
            return 0

    # ZeroGPU provider: the model is loaded in-process by llm/zerogpu_backend.py
    # at import. There is no local runtime to install, no subprocess to manage,
    # and no cmake/llama-server dependency. Report ready so the UI's auto-poll
    # doesn't fire /api/setup/start (which would spawn the cmake provisioner).
    settings = load_settings()
    if settings.llm_provider == "zerogpu_transformers":
        health = OmniClient(settings).health()
        ready = bool(health.get("ready"))
        detail = health.get("detail") or {}
        # ``detail`` is returned verbatim, but field lookups go through a dict
        # view so a non-dict detail (e.g. a plain string) cannot raise.
        detail_fields = detail if isinstance(detail, dict) else {}
        # Normalise falsy errors to None so ``ok`` and ``state`` always agree.
        load_error = detail_fields.get("load_error") or None
        if load_error:
            message = f"ZeroGPU model failed to load: {load_error}"
            state = "error"
        elif ready:
            message = "ZeroGPU model is ready."
            state = "ready"
        else:
            message = f"ZeroGPU model loading ({detail_fields.get('model_id', '?')})..."
            state = "installing"
        return {
            "ok": load_error is None,
            "state": state,
            "stage": "ready" if ready else "service",
            "message": message,
            "progress": 100 if ready else 50,
            "files_ready": True,
            "service_ready": ready,
            "installing": not ready and load_error is None,
            "detail": detail,
            "updated_at": None,
        }
    paths = _local_runtime_paths()
    scan = scan_minicpm_models(paths["models"])
    files_ready = (
        (paths["comni"] / "worker.py").exists()
        and (paths["comni"] / "gateway.py").exists()
        and _local_comni_python(paths["comni"]).exists()
        and _local_llama_server(paths["llama"]) is not None
        and scan.get("complete", False)
    )
    # Forget a provisioner handle once that process has exited.
    if _SETUP_PROCESS is not None and _SETUP_PROCESS.poll() is not None:
        _SETUP_PROCESS = None
    status = _read_setup_status()
    if not isinstance(status, dict):
        status = {}
    process_running = (
        (_SETUP_PROCESS is not None and _SETUP_PROCESS.poll() is None)
        or _setup_pid_running()
    )
    if files_ready:
        _configure_local_runtime(scan)
        # Settings are reloaded here because the call above may rewrite them.
        health = OmniClient(load_settings()).health()
        service_ready = bool(health.get("ready"))
        return {
            "ok": True,
            "state": "ready" if service_ready else "installed",
            "stage": "ready" if service_ready else "service",
            "message": "Local AI is ready." if service_ready else "Local AI is installed and ready to start.",
            "progress": 100,
            "files_ready": True,
            "service_ready": service_ready,
            "installing": False,
            "updated_at": status.get("updated_at"),
        }
    if status.get("state") == "running" and not process_running:
        # The status file still says "running" but no provisioner is alive.
        status = {
            "state": "error",
            "stage": "setup",
            "message": "The previous setup process stopped unexpectedly. Retry setup; completed downloads will be reused.",
            "progress": progress_of(status),
            "updated_at": status.get("updated_at"),
        }
    return {
        "ok": status.get("state") != "error",
        "state": status.get("state", "missing"),
        "stage": status.get("stage", "setup"),
        "message": status.get("message", "Preparing the local AI runtime..."),
        "progress": progress_of(status),
        "files_ready": False,
        "service_ready": False,
        "installing": process_running,
        "updated_at": status.get("updated_at"),
    }
def api_setup_start(payload: dict[str, Any] | None = None) -> dict[str, Any]:
    """Start the local AI runtime: launch the backend or spawn the provisioner.

    Args:
        payload: Optional settings to persist before anything is started.

    Returns:
        The setup status, extended with an ``event`` message when an action
        was taken. When the provisioner cannot be started the result has
        ``ok`` False and ``state`` ``"error"``.
    """
    global _SETUP_PROCESS
    # ZeroGPU provider: nothing to install or launch — the in-process backend
    # was loaded at import. The setup-start request is a no-op.
    if load_settings().llm_provider == "zerogpu_transformers":
        return {**api_setup_status(), "event": "ZeroGPU runtime is in-process; no setup needed.", "ok": True}
    # Persist any picker choices before kicking off setup so the provisioner
    # and the launcher both see the chosen model/GPU/context.
    if payload:
        api_update_settings(payload)
    current = api_setup_status()
    if current["files_ready"]:
        started = _start_llama_process(load_settings())
        return {**api_setup_status(), "event": started["event"], "ok": started["ok"]}
    if _SETUP_PROCESS is not None and _SETUP_PROCESS.poll() is None:
        return current
    RUNTIME_ROOT.mkdir(parents=True, exist_ok=True)
    (RUNTIME_ROOT / "setup_status.json").write_text(
        json.dumps({"state": "running", "stage": "setup", "message": "Starting local AI setup...", "progress": 1}) + "\n",
        encoding="utf-8",
    )
    provisioner = PROJECT_ROOT / "scripts" / "provision_local_runtime.py"
    settings = load_settings()
    catalog_ids = {item["id"] for item in quantization_catalog()}
    model_file = settings.minicpm_quantization if settings.minicpm_quantization in catalog_ids else "MiniCPM-o-4_5-Q4_K_M.gguf"
    try:
        # The child receives its own copy of the log descriptor, so the
        # parent's handle is closed as soon as Popen returns (or fails)
        # instead of being leaked on every setup start.
        with (RUNTIME_ROOT / "provisioner.log").open("a", encoding="utf-8") as log:
            _SETUP_PROCESS = subprocess.Popen(
                [
                    sys.executable,
                    str(provisioner),
                    "--runtime-root", str(RUNTIME_ROOT),
                    "--model-file", model_file,
                ],
                cwd=PROJECT_ROOT,
                stdout=log,
                stderr=subprocess.STDOUT,
                creationflags=subprocess.CREATE_NO_WINDOW if os.name == "nt" else 0,
            )
    except OSError as exc:
        return {**current, "ok": False, "state": "error", "message": f"Could not start setup: {exc}"}
    return {**api_setup_status(), "event": "Local AI setup started."}
def api_runtime_options() -> dict[str, Any]:
    """Return the picker options for the local runtime plus the current selection.

    The runtime root is created if missing so that free disk space can be
    measured on it.
    """
    settings = load_settings()
    RUNTIME_ROOT.mkdir(parents=True, exist_ok=True)
    free_bytes = shutil.disk_usage(RUNTIME_ROOT).free
    options: dict[str, Any] = {"ok": True}
    options["devices"] = detect_devices()
    options["quantizations"] = quantization_catalog()
    options["gpu_layer_presets"] = gpu_layer_presets()
    options["context_length_presets"] = context_length_presets()
    options["runtime_root"] = str(RUNTIME_ROOT)
    options["free_disk_gb"] = round(free_bytes / 1024**3, 1)
    # Unset values fall back to the defaults shown in the picker.
    options["current"] = {
        "minicpm_quantization": settings.minicpm_quantization or "MiniCPM-o-4_5-Q4_K_M.gguf",
        "minicpm_gpu_device": settings.minicpm_gpu_device or "auto",
        "llamacpp_gpu_layers": settings.llamacpp_gpu_layers or "auto",
        "llamacpp_context_length": settings.llamacpp_context_length,
    }
    return options
def _local_runtime_paths() -> dict[str, Path]:
    """Return the directories under ``RUNTIME_ROOT`` keyed by component name."""
    layout = (
        ("comni", ("MiniCPM-o-Demo",)),
        ("llama", ("llama.cpp-omni",)),
        ("models", ("models", "MiniCPM-o-4_5-gguf")),
    )
    return {component: RUNTIME_ROOT.joinpath(*parts) for component, parts in layout}
def _read_setup_status() -> dict[str, Any]:
    """Load ``setup_status.json`` from the runtime root.

    Returns:
        The parsed JSON object, or an empty dict when the file is missing,
        unreadable, not valid UTF-8/JSON, or does not contain a JSON object.
    """
    path = RUNTIME_ROOT / "setup_status.json"
    try:
        # Reading directly (no exists() pre-check) avoids a race with the
        # writer; a missing file surfaces as OSError. ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError.
        loaded = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        return {}
    # Callers use ``.get`` on the result, so anything but an object is unusable.
    return loaded if isinstance(loaded, dict) else {}
def _setup_pid_running() -> bool:
    """Return whether a setup worker appears to be running.

    On Windows a held byte lock on ``setup.worker.lock`` counts as running.
    Otherwise the pid stored in ``setup.pid`` is probed; a pid file that is
    unreadable, malformed or refers to a dead process is deleted.
    """
    lock_path = RUNTIME_ROOT / "setup.worker.lock"
    if lock_path.exists() and os.name == "nt":
        import msvcrt
        # The context manager closes the handle on every exit path.
        with lock_path.open("r+b") as handle:
            try:
                handle.seek(0)
                msvcrt.locking(handle.fileno(), msvcrt.LK_NBLCK, 1)
                msvcrt.locking(handle.fileno(), msvcrt.LK_UNLCK, 1)
            except OSError:
                # The non-blocking lock attempt failed: something else holds it.
                return True
    path = RUNTIME_ROOT / "setup.pid"
    if not path.exists():
        return False
    try:
        pid = int(path.read_text(encoding="ascii").strip())
        if pid <= 0:
            # os.kill treats 0 and negative pids as process groups, so such a
            # value would never describe a single worker process.
            raise ValueError(f"Invalid pid {pid}.")
        if os.name == "nt":
            import ctypes
            handle = ctypes.windll.kernel32.OpenProcess(0x1000, False, pid)
            if not handle:
                raise OSError(f"Process {pid} is not running.")
            ctypes.windll.kernel32.CloseHandle(handle)
        else:
            try:
                os.kill(pid, 0)
            except PermissionError:
                # The process exists but belongs to another user: still running.
                return True
        return True
    except (OSError, OverflowError, SystemError, ValueError):
        path.unlink(missing_ok=True)
        return False
| def _local_llama_server(root: Path) -> Path | None: | |
| candidates = ( | |
| root / "build" / "bin" / "Release" / "llama-omni-server.exe", | |
| root / "build" / "bin" / "llama-omni-server.exe", | |
| root / "build" / "bin" / "llama-omni-server", | |
| root / "build" / "bin" / "Release" / "llama-server.exe", | |
| root / "build" / "bin" / "llama-server.exe", | |
| root / "build" / "bin" / "llama-server", | |
| ) | |
| return next((path for path in candidates if path.exists()), None) | |
| def _local_comni_python(root: Path) -> Path: | |
| if os.name == "nt": | |
| return root / ".venv" / "base" / "Scripts" / "python.exe" | |
| return root / ".venv" / "base" / "bin" / "python" | |
def _configure_local_runtime(scan: dict[str, Any]) -> None:
    """Point the settings at the bundled runtime once its model files are found.

    Does nothing when the scan lists no models or when the current settings
    already reference the local runtime paths and the preferred model.
    """
    models = scan.get("models", [])
    if not models:
        return
    paths = _local_runtime_paths()
    current = load_settings()
    by_filename = {item["filename"]: item for item in models}
    # Honor the picker's choice if the file is on disk; otherwise fall back to
    # Q4_K_M (the default), then to whatever's available.
    preferred = by_filename.get(current.minicpm_quantization)
    if not preferred:
        preferred = next((item for item in models if "Q4_K_M" in item["filename"]), models[0])
    already_configured = (
        current.comni_checkout_path == paths["comni"]
        and current.llamacpp_omni_root == paths["llama"]
        and current.minicpm_model_dir == paths["models"]
        and current.minicpm_quantization == preferred["filename"]
    )
    if already_configured:
        return
    updates = {
        "PHANTOM_GRID_OMNI_LAUNCHER_PATH": "scripts/launch_minicpm_omni.py",
        "PHANTOM_GRID_COMNI_CHECKOUT_PATH": "runtime/MiniCPM-o-Demo",
        "PHANTOM_GRID_LLAMACPP_OMNI_ROOT": "runtime/llama.cpp-omni",
        "PHANTOM_GRID_MINICPM_MODEL_DIR": "runtime/models/MiniCPM-o-4_5-gguf",
        "PHANTOM_GRID_MINICPM_QUANTIZATION": preferred["filename"],
    }
    _write_env_updates(updates)
    os.environ.update(updates)
def game_snapshot(game_id: str | None = None) -> dict[str, Any]:
    """Return a snapshot for ``game_id`` with an empty selection and a "Ready." event."""
    return _snapshot(
        _state_for(game_id, required=False),
        selected_junctions=[],
        focused_junction=None,
        event="Ready.",
    )
def nearest_junction_for_point(x: int, y: int, max_distance: int = MAP_CLICK_RADIUS) -> int | None:
    """Return the id of the junction closest to ``(x, y)`` within ``max_distance``.

    Returns ``None`` when no junction lies within range. When several
    junctions are equally close, the one listed last wins.
    """
    target = (x, y)
    winner: int | None = None
    limit = float(max_distance)
    for record in _junction_records():
        gap = math.dist(target, (int(record["x"]), int(record["y"])))
        if gap <= limit:
            winner, limit = int(record["id"]), gap
    return winner
def junctions_for_drag_path(points: list[dict[str, int]], max_distance: int = MAP_CLICK_RADIUS) -> list[int]:
    """Collect the junctions a drag gesture passes within ``max_distance`` of.

    Args:
        points: Drag samples; entries whose ``x`` or ``y`` does not convert
            through ``_optional_int`` are skipped.
        max_distance: Maximum distance from a sample to a junction.

    Returns:
        Junction ids in first-touched order, without duplicates.
    """
    selected: list[int] = []
    # Set twin of ``selected`` so the duplicate check is O(1) instead of a list scan.
    seen: set[int] = set()
    # Fetched once, on the first usable point, rather than once per point.
    records = None
    for point in points:
        x = _optional_int(point.get("x"))
        y = _optional_int(point.get("y"))
        if x is None or y is None:
            continue
        if records is None:
            records = _junction_records()
        for junction in records:
            junction_id = int(junction["id"])
            if junction_id in seen:
                continue
            if math.dist((x, y), (int(junction["x"]), int(junction["y"]))) <= max_distance:
                selected.append(junction_id)
                seen.add(junction_id)
    return selected
def toggle_junction_selection(current: list[int], junction_id: int) -> list[int]:
    """Return the sanitised selection with ``junction_id`` added or removed.

    An id rejected by ``_valid_junction`` leaves the sanitised selection as is.
    """
    selection = _valid_junctions(current)
    target = _valid_junction(junction_id)
    if target is None:
        return selection
    if target not in selection:
        return [*selection, target]
    return [entry for entry in selection if entry != target]
def _snapshot(
    state: GameState | None,
    selected_junctions: list[int] | None = None,
    focused_junction: int | None = None,
    event: str = "",
    sound: str | None = None,
) -> dict[str, Any]:
    """Assemble the full client payload for ``state`` and the current selection.

    With no state, the game and case introduction are ``None``, notes are
    empty and the default notice text is used.
    """
    selected, focused = _selection_context(selected_junctions, focused_junction)
    payload: dict[str, Any] = {"ok": True, "event": event, "sound": sound}
    payload["game"] = _visible_game_state(state)
    payload["case_introduction"] = state.case_introduction if state else None
    payload["map"] = _map_payload()
    payload["selection"] = {
        "junctions": selected,
        "focused": focused,
        "legal_moves": _legal_moves_payload(focused, state),
    }
    payload["lookout"] = _lookout_payload(state)
    payload["witness_locations"] = _witness_locations(state)
    payload["witness_cards"] = _witness_cards(state)
    payload["previous_statements"] = _previous_statements(state)
    payload["active_blocks"] = _active_blocks_payload(state)
    payload["placed_tactics"] = _placed_tactics_payload(state)
    payload["tactic_counts"] = _tactic_counts_payload(state)
    payload["events"] = _public_events(state)
    payload["asset_prompts"] = _asset_prompts()
    payload["notes"] = state.user_notes if state else ""
    payload["last_notice_text"] = state.last_notice_text if state else DEFAULT_NOTICE
    # The story is available once the game has a result or a finalize reason.
    payload["story_available"] = bool(state and (state.result or state.finalized_reason))
    return payload
| def _visible_game_state(state: GameState | None) -> dict[str, Any] | None: | |
| if state is None: | |
| return None | |
| confirmed_sightings = [ | |
| sighting for sighting in state.case_introduction.get("last_seen", []) | |
| if sighting.get("confidence") == "confirmed" | |
| ] | |
| last_seen = confirmed_sightings[-1] if confirmed_sightings else None | |
| return { | |
| "game_id": state.game_id, | |
| "turn": state.turn_number, | |
| "max_turns": state.max_turns, | |
| "phase": state.phase, | |
| "result": state.result, | |
| "checks_remaining": checks_remaining_this_turn(state.turn_number, state.junction_checks), | |
| "notices": len(state.notices), | |
| "witness_batches": len(state.witness_batches), | |
| "initial_description": state.initial_description, | |
| "suspect_image": state.case_introduction.get("suspect_image", "/assets/suspect"), | |
| "last_seen": last_seen, | |
| "finalized_reason": state.finalized_reason, | |
| "effective_context_length": state.effective_context_length, | |
| } | |
def _map_payload() -> dict[str, Any]:
    """Return the map layers, junction records and public atlas for the client."""
    images = load_map_metadata().get("images", {})
    payload: dict[str, Any] = {"layers": [*images.keys()], "default_layer": "normal"}
    payload["junctions"] = _junction_records()
    payload["atlas"] = public_atlas_payload()
    return payload
| def _legal_moves_payload(focused_junction: int | None, state: GameState | None) -> list[dict[str, Any]]: | |
| if focused_junction is None: | |
| return [] | |
| blocks = [asdict(block) for block in state.active_blocks] if state else None | |
| return [ | |
| { | |
| "destination": move.destination, | |
| "mode": move.mode, | |
| "blocked": move.blocked, | |
| "label": f"J{focused_junction} to J{move.destination} by {move.mode}", | |
| } | |
| for move in legal_moves_from(focused_junction, blocks) | |
| ] | |
| def _lookout_payload(state: GameState | None) -> dict[str, Any]: | |
| if state is None: | |
| return {"raised": False, "witness_count": 0, "review_allowed": False, "notice": None} | |
| batch = next((item for item in reversed(state.witness_batches) if item.notice_id.startswith("notice_")), None) | |
| if batch is None: | |
| return {"raised": False, "witness_count": 0, "review_allowed": False, "notice": None} | |
| notice = next((item for item in state.notices if item.notice_id == batch.notice_id), None) | |
| return { | |
| "raised": True, | |
| "witness_count": batch.total_witnesses, | |
| "review_allowed": batch.individual_review_allowed, | |
| "notice": notice.text if notice else "", | |
| "parsed_location": notice.parsed_location if notice else "", | |
| } | |
| def _witness_locations(state: GameState | None) -> list[dict[str, Any]]: | |
| if state is None or not state.witness_batches: | |
| return [] | |
| distribution: dict[int, dict[str, Any]] = {} | |
| for batch in state.witness_batches: | |
| for witness in batch.witnesses: | |
| location = distribution.setdefault( | |
| witness.junction_id, | |
| { | |
| "junction_id": witness.junction_id, | |
| "count": 0, | |
| "reports": [], | |
| "inspectable": False, | |
| "sample_witness_id": witness.witness_id, | |
| "sample_style": witness.personality.get("style", "witness"), | |
| "sample_summary": witness.current_summary, | |
| "sample_relevance": witness.relevance_score, | |
| "viewed": False, | |
| }, | |
| ) | |
| location["count"] += 1 | |
| location["reports"].append( | |
| { | |
| "id": witness.witness_id, | |
| "viewed": witness.witness_id in state.viewed_witness_ids, | |
| "style": witness.personality.get("style", "witness"), | |
| "summary": witness.current_summary, | |
| "relevance": witness.relevance_score, | |
| "name": witness.name, | |
| "occupation": witness.occupation, | |
| "observed_turn": witness.turn_created, | |
| } | |
| ) | |
| location["inspectable"] = location["inspectable"] or batch.individual_review_allowed | |
| is_viewed = witness.witness_id in state.viewed_witness_ids | |
| location["viewed"] = location["viewed"] or is_viewed | |
| if witness.relevance_score > location["sample_relevance"]: | |
| location["sample_witness_id"] = witness.witness_id | |
| location["sample_style"] = witness.personality.get("style", "witness") | |
| location["sample_summary"] = witness.current_summary | |
| location["sample_relevance"] = witness.relevance_score | |
| return [ | |
| distribution[junction_id] | |
| for junction_id in sorted(distribution) | |
| ] | |
| def _witness_cards(state: GameState | None) -> list[dict[str, Any]]: | |
| if state is None: | |
| return [] | |
| cards: list[dict[str, Any]] = [] | |
| for batch in state.witness_batches: | |
| if not batch.individual_review_allowed: | |
| continue | |
| for witness in batch.witnesses: | |
| cards.append( | |
| { | |
| "id": witness.witness_id, | |
| "junction_id": witness.junction_id, | |
| "reliability": witness.reliability, | |
| "memory": witness.memory_strength, | |
| "relevance": witness.relevance_score, | |
| "style": witness.personality.get("style", "witness"), | |
| "name": witness.name, | |
| "occupation": witness.occupation, | |
| "voice_id": witness.voice_id, | |
| "summary": witness.current_summary, | |
| "questions": [asdict(question) for question in witness.question_history[-2:]], | |
| "viewed": witness.witness_id in state.viewed_witness_ids, | |
| "observed_turn": witness.turn_created, | |
| } | |
| ) | |
| return cards[-18:] | |
| def _previous_statements(state: GameState | None) -> list[dict[str, Any]]: | |
| if state is None: | |
| return [] | |
| statements: list[dict[str, Any]] = [] | |
| for batch in state.witness_batches: | |
| for witness in batch.witnesses: | |
| if witness.witness_id not in state.viewed_witness_ids or not witness.question_history: | |
| continue | |
| latest = witness.question_history[-1] | |
| statements.append( | |
| { | |
| "id": witness.witness_id, | |
| "turn": latest.turn_number, | |
| "junction_id": witness.junction_id, | |
| "time_label": _time_label(latest.turn_number), | |
| "summary": witness.current_summary, | |
| "question": latest.question, | |
| "answer": latest.answer, | |
| "viewed": True, | |
| "observed_turn": witness.turn_created, | |
| } | |
| ) | |
| return statements[-8:] | |
| def _active_blocks_payload(state: GameState | None) -> list[dict[str, Any]]: | |
| if state is None: | |
| return [] | |
| blocks: list[dict[str, Any]] = [] | |
| for block in state.active_blocks: | |
| if block.block_type == "edge_block": | |
| label = f"J{block.from_junction} to J{block.to_junction}" | |
| elif block.block_type == "mode_block": | |
| label = f"{block.mode} near J{block.junction_id or 'all'}" | |
| else: | |
| label = f"J{block.junction_id}" | |
| blocks.append({**asdict(block), "label": label}) | |
| return blocks | |
| def _placed_tactics_payload(state: GameState | None) -> list[dict[str, Any]]: | |
| if state is None: | |
| return [] | |
| return [asdict(tactic) for tactic in state.placed_tactics] | |
def _tactic_counts_payload(state: GameState | None) -> dict[str, Any]:
    """Summarise tactic usage against ``TACTIC_LIMITS``.

    Tactic types that are not in ``TACTIC_LIMITS`` are not counted, and the
    remaining count for a type never goes below zero.
    """
    placed = dict.fromkeys(TACTIC_LIMITS, 0)
    for tactic in (state.placed_tactics if state is not None else ()):
        kind = tactic.tactic_type
        if kind in placed:
            placed[kind] += 1
    remaining = {}
    for kind, limit in TACTIC_LIMITS.items():
        remaining[kind] = max(limit - placed.get(kind, 0), 0)
    summary: dict[str, Any] = {"limits": TACTIC_LIMITS}
    summary["placed"] = placed
    summary["remaining"] = remaining
    summary["total_limit"] = sum(TACTIC_LIMITS.values())
    summary["total_remaining"] = sum(remaining.values())
    return summary
| def _public_events(state: GameState | None) -> list[dict[str, Any]]: | |
| if state is None: | |
| return [] | |
| return [ | |
| entry | |
| for entry in state.game_log[-12:] | |
| if entry.get("kind") != "culprit_move_private" | |
| ][-6:] | |
| def _asset_prompts() -> dict[str, str]: | |
| return { | |
| "case_table_background": "top-down view of a moody London detective desk, paper map, pins, string, chalk dust, warm lamp light, stylized game UI background, no text", | |
| "suspect_placeholder": "anonymous noir suspect silhouette in a grey raincoat holding a red folder, graphic novel style, transparent background, no text", | |
| "witness_card_set": "four small portrait cards of London street witnesses, varied ages and moods, 1930s detective board style, consistent illustration style, no text", | |
| "lookout_board_texture": "green-black chalkboard with faint chalk smudges and taped paper edges, game UI texture, no readable text", | |
| "map_select": "short tactile wooden token tap on a board, warm room tone, 0.3 seconds", | |
| "blockade_set": "metal stamp clack with soft paper thud, detective office, 0.5 seconds", | |
| "lookout_raise": "chalk scrape and corkboard paper rustle, subtle, 0.8 seconds", | |
| "witness_popup": "quick paper card flick with faint bell, playful noir, 0.4 seconds", | |
| "turn_advance": "old clock tick plus distant city ambience swell, 1 second", | |
| } | |
def _settings_payload(settings) -> dict[str, Any]:
    """Flatten ``settings`` into the JSON-friendly dict sent to the client.

    Optional paths are rendered as strings (empty when unset), and some are
    paired with an ``*_exists`` flag. The difficulty is read from the
    ``PHANTOM_GRID_DIFFICULTY`` environment variable, falling back to the
    value derived from the settings.
    """

    def as_text(path) -> str:
        return str(path or "")

    def is_present(path) -> bool:
        return bool(path and path.exists())

    model_path = settings.llamacpp_model_path
    server_bin = settings.llamacpp_server_bin
    launcher = settings.omni_launcher_path
    derived_difficulty = _difficulty_from_settings(settings)
    payload: dict[str, Any] = {
        "llm_provider": settings.llm_provider,
        "llm_model": settings.llm_model,
        "llamacpp_model_path": as_text(model_path),
        "llamacpp_model_exists": is_present(model_path),
        "llamacpp_server_bin": as_text(server_bin),
        "llamacpp_server_bin_exists": is_present(server_bin),
        "llamacpp_base_url": settings.llamacpp_base_url,
        "difficulty": os.getenv("PHANTOM_GRID_DIFFICULTY", derived_difficulty),
        "max_turns": settings.max_turns,
        "checks_per_turn": settings.checks_per_turn,
        "memory_corruption_per_turn": settings.memory_corruption_per_turn,
        "omni_gateway_url": settings.omni_gateway_url,
        "omni_launcher_path": as_text(launcher),
        "omni_launcher_exists": is_present(launcher),
        "comni_checkout_path": as_text(settings.comni_checkout_path),
        "llamacpp_omni_root": as_text(settings.llamacpp_omni_root),
        "minicpm_model_dir": as_text(settings.minicpm_model_dir),
        "minicpm_quantization": settings.minicpm_quantization,
        "llamacpp_context_length": settings.llamacpp_context_length,
        "llamacpp_gpu_layers": settings.llamacpp_gpu_layers,
        "minicpm_gpu_device": settings.minicpm_gpu_device,
        "witness_chat_tts": settings.witness_chat_tts,
    }
    # The voice directory is stringified as-is, without the empty-string fallback.
    payload["witness_voice_dir"] = str(settings.witness_voice_dir)
    return payload
| def _difficulty_from_settings(settings) -> str: | |
| if settings.max_turns >= 16 or settings.checks_per_turn >= 3: | |
| return "easy" | |
| if settings.max_turns <= 10 or settings.checks_per_turn <= 1: | |
| return "hard" | |
| return "normal" | |
def _llama_status(settings, health: dict[str, Any] | None = None) -> dict[str, Any]:
    """Summarize the backend's process ownership and health for the UI.

    ``health`` may be supplied by the caller; when it is missing or empty it is
    fetched through ``OmniClient(settings).health()``. A tracked process that
    has already exited is forgotten before the status is computed.
    """
    global _LLAMA_PROCESS
    process = _LLAMA_PROCESS
    if process is not None and process.poll() is not None:
        # The child has exited, so drop the stale handle.
        _LLAMA_PROCESS = process = None
    if not health:
        health = OmniClient(settings).health()
    # An external llama.cpp server is never reported as managed by this app.
    is_managed = process is not None and settings.llm_provider != "external_llama_cpp_server"
    status: dict[str, Any] = {
        "managed_process": is_managed,
        "pid": process.pid if is_managed else None,
        "reachable": health.get("reachable", False),
        "ready": health.get("ready", False),
        "detail": health.get("detail"),
    }
    return status
def _spawn_backend_process(args: list[str], cwd: str, env: dict[str, str]) -> subprocess.Popen:
    """Launch *args* as a background child process with its output discarded.

    Exceptions from ``subprocess.Popen`` propagate; the caller turns them into
    a user-facing event message.
    """
    return subprocess.Popen(
        args,
        cwd=cwd,
        env=env,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        # CREATE_NO_WINDOW only exists on Windows, hence the guarded lookup.
        creationflags=subprocess.CREATE_NO_WINDOW if os.name == "nt" else 0,
    )


def _omni_launcher_command(launcher: Path) -> list[str]:
    """Return the argv that runs *launcher*, chosen by its file extension."""
    suffix = launcher.suffix.lower()
    if suffix == ".ps1":
        return ["powershell", "-ExecutionPolicy", "Bypass", "-File", str(launcher)]
    if suffix in {".bat", ".cmd"}:
        return ["cmd", "/c", str(launcher)]
    if suffix == ".py":
        return [sys.executable, str(launcher)]
    return [str(launcher)]


def _start_llama_process(settings) -> dict[str, Any]:
    """Start the AI backend selected in *settings* as a managed subprocess.

    Returns a dict with ``ok`` (bool) and ``event`` (human-readable message).
    Nothing is spawned for the in-process ZeroGPU provider, for the
    user-managed external llama.cpp provider, or when a previously started
    process is still running. ``llama_cpp_server`` starts the llama-server
    binary directly; every other provider goes through the MiniCPM-o launcher.

    Values placed in argv and in the child environment are coerced to ``str``
    so an unset or numeric GPU-layer setting cannot make ``Popen`` raise an
    uncaught ``TypeError``.
    """
    global _LLAMA_PROCESS
    provider = settings.llm_provider
    # The ZeroGPU backend is in-process — there is no subprocess to spawn.
    if provider == "zerogpu_transformers":
        return {"ok": True, "event": "ZeroGPU backend runs in-process; no llama subprocess is needed."}
    if _LLAMA_PROCESS is not None and _LLAMA_PROCESS.poll() is None:
        return {"ok": True, "event": f"The selected AI backend is already managed as PID {_LLAMA_PROCESS.pid}."}
    if provider == "external_llama_cpp_server":
        return {"ok": False, "event": "External llama.cpp is user-managed and cannot be started by Phantom Grid."}

    # Unset means "auto"; an explicit 0 must survive as "0", so test for
    # None/"" rather than falsiness.
    raw_layers = settings.llamacpp_gpu_layers
    gpu_layers = "auto" if raw_layers is None or raw_layers == "" else str(raw_layers)

    if provider == "llama_cpp_server":
        server_bin = settings.llamacpp_server_bin
        model_path = settings.llamacpp_model_path
        if not server_bin or not server_bin.is_file():
            return {"ok": False, "event": "Set a valid llama-server executable before starting."}
        if not model_path or not model_path.is_file():
            return {"ok": False, "event": "Set a valid GGUF model path before starting."}
        args = [
            str(server_bin), "-m", str(model_path),
            "--host", "127.0.0.1", "--port", str(_port_from_base_url(settings.llamacpp_base_url)),
            "-c", str(settings.llamacpp_context_length),
            # "auto" is passed to llama-server as 999 layers.
            "-ngl", "999" if gpu_layers == "auto" else gpu_layers,
        ]
        env = os.environ.copy()
        env.update(resolve_device_env(settings.minicpm_gpu_device or "auto", settings.llamacpp_gpu_layers or "auto"))
        try:
            _LLAMA_PROCESS = _spawn_backend_process(args, str(model_path.parent), env)
        except (OSError, ValueError) as exc:
            # Popen raises OSError for launch failures and ValueError for
            # invalid arguments; both are reported instead of crashing.
            return {"ok": False, "event": f"Could not start llama.cpp: {exc}"}
        return {"ok": True, "event": f"llama.cpp started {model_path.name} as PID {_LLAMA_PROCESS.pid}."}

    # Any other provider is started through the MiniCPM-o stack launcher.
    if not settings.omni_launcher_path or not settings.omni_launcher_path.exists():
        return {"ok": False, "event": "Set a valid Comni launcher path before starting."}
    if not settings.comni_checkout_path or not settings.comni_checkout_path.exists():
        return {"ok": False, "event": "Set a valid OpenBMB Comni checkout directory before starting."}
    if not settings.llamacpp_omni_root or not settings.llamacpp_omni_root.exists():
        return {"ok": False, "event": "Set a valid llama.cpp-omni root directory before starting."}
    scan = scan_minicpm_models(settings.minicpm_model_dir)
    valid_names = {item["filename"] for item in scan.get("models", [])}
    if settings.minicpm_quantization not in valid_names:
        return {"ok": False, "event": "Select a detected MiniCPM-o quantization before starting."}
    if not scan.get("complete"):
        return {"ok": False, "event": "The MiniCPM-o model directory is missing required audio/TTS companion GGUF modules."}

    launcher = settings.omni_launcher_path
    args = _omni_launcher_command(launcher)
    env = os.environ.copy()
    env.update({
        "MINICPM_MODEL_DIR": str(settings.minicpm_model_dir or ""),
        "MINICPM_LLM_MODEL": settings.minicpm_quantization,
        "MINICPM_CTX_SIZE": str(settings.llamacpp_context_length),
        "MINICPM_N_GPU_LAYERS": gpu_layers,
        "MINICPM_GPU_DEVICE": settings.minicpm_gpu_device or "auto",
        "MINICPM_LLAMACPP_ROOT": str(settings.llamacpp_omni_root or ""),
        "MINICPM_GATEWAY_URL": str(settings.omni_gateway_url or ""),
        "MINICPM_COMNI_ROOT": str(settings.comni_checkout_path or ""),
        "MINICPM_COMNI_PYTHON": str(_local_comni_python(settings.comni_checkout_path)) if settings.comni_checkout_path else "",
    })
    env.update(resolve_device_env(settings.minicpm_gpu_device or "auto", settings.llamacpp_gpu_layers or "auto"))
    try:
        _LLAMA_PROCESS = _spawn_backend_process(args, str(launcher.parent), env)
    except (OSError, ValueError) as exc:
        return {"ok": False, "event": f"Could not start MiniCPM-o: {exc}"}
    return {"ok": True, "event": f"MiniCPM-o stack launcher started as PID {_LLAMA_PROCESS.pid}."}
def _stop_llama_process() -> None:
    """Stop the managed backend process, if any, and forget its handle.

    On Windows the whole process tree is force-killed with ``taskkill``;
    elsewhere the process is asked to terminate. If it has not exited within
    five seconds it is killed and then waited on once more, so the child is
    reaped before the handle is dropped.
    """
    global _LLAMA_PROCESS
    process = _LLAMA_PROCESS
    if process is None:
        return
    if process.poll() is None:
        if os.name == "nt":
            # /T includes child processes, /F forces termination.
            subprocess.run(
                ["taskkill", "/PID", str(process.pid), "/T", "/F"],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                check=False,
            )
        else:
            process.terminate()
        try:
            process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            process.kill()
            try:
                # Collect the exit status after the kill; the wait is bounded
                # so shutdown can never hang here.
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                pass
    _LLAMA_PROCESS = None
def _port_from_base_url(base_url: str) -> int:
    """Return the explicit port in *base_url*, or 8080 when there is none.

    8080 is also returned when the URL cannot be parsed or its port is invalid
    (``ValueError``), and when the parsed port is 0.
    """
    from urllib.parse import urlparse

    default_port = 8080
    try:
        explicit_port = urlparse(base_url).port
    except ValueError:
        return default_port
    return explicit_port if explicit_port else default_port
def _require_omni_ready() -> None:
    """Raise HTTP 503 unless the configured backend reports itself ready."""
    backend_health = OmniClient.from_settings().health()
    if backend_health.get("ready"):
        return
    raise HTTPException(
        status_code=503,
        detail="The selected AI backend is unavailable. Start or retry it in Settings.",
    )
def _voice_path(voice_id: str) -> Path | None:
    """Map a ``voice_<digits>`` id to its ``.wav`` file in the voice directory.

    Returns ``None`` when the id is malformed, when the resolved file is not a
    direct child of the configured voice directory, or when it does not exist.
    """
    prefix = "voice_"
    well_formed = voice_id.startswith(prefix) and voice_id[len(prefix):].isdigit()
    if not well_formed:
        return None
    root = load_settings().witness_voice_dir.resolve()
    candidate = (root / f"{voice_id}.wav").resolve()
    # Compare the parent after resolving so the file must sit directly in root.
    inside_root = candidate.parent == root
    return candidate if inside_root and candidate.exists() else None
def _write_env_updates(updates: dict[str, str]) -> None:
    """Merge *updates* into the project's ``.env`` file and rewrite it.

    Existing ``KEY=value`` lines are read first (surrounding quotes are
    stripped from values); blank lines, comments and lines without ``=`` are
    not carried over. Keys keep the position of their first appearance, new
    keys are appended in the order given, and each key is written exactly
    once, even when the existing file listed it several times (the last value
    read wins unless *updates* overrides it).
    """
    env_path = PROJECT_ROOT / ".env"
    # dict preserves insertion order, so it tracks both value and position and
    # naturally collapses duplicate keys.
    merged: dict[str, str] = {}
    if env_path.exists():
        for raw_line in env_path.read_text(encoding="utf-8").splitlines():
            stripped = raw_line.strip()
            if not stripped or stripped.startswith("#") or "=" not in raw_line:
                continue
            key, _, value = raw_line.partition("=")
            merged[key.strip()] = value.strip().strip('"').strip("'")
    merged.update(updates)
    lines = [f"{key}={value}" for key, value in merged.items()]
    env_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
def _junction_by_id(junction_id: int) -> dict[str, Any] | None:
    """Return the first junction record whose id equals *junction_id*, else None."""
    for record in _junction_records():
        if int(record["id"]) == junction_id:
            return record
    return None
def _time_label(turn_number: int) -> str:
    """Return the time-of-day label for a 1-based turn, cycling every five turns."""
    day_parts = ("morning", "midday", "afternoon", "evening", "night")
    slot = (turn_number - 1) % len(day_parts)
    return day_parts[slot]
def _state_for(game_id: str | None, required: bool = True) -> GameState | None:
    """Look up the game state for *game_id*, loading it from disk on a cache miss.

    Raises ``HTTPException`` 400 when no id is given and 404 when the case
    cannot be found, but only if ``required`` is true; otherwise ``None`` is
    returned. A found state is persisted when ``ensure_case_introduction``
    reports that it changed something.
    """
    if not game_id:
        if not required:
            return None
        raise HTTPException(status_code=400, detail="Start a case first.")

    state = _SESSIONS.get(game_id)
    if state is None:
        # Not cached in memory: try the saved copy and remember it on success.
        try:
            state = load_state(game_id)
            _SESSIONS[game_id] = state
        except (FileNotFoundError, KeyError, TypeError, ValueError):
            state = None

    if state is None:
        if required:
            raise HTTPException(status_code=404, detail="Case not found. Start a new case.")
        return None

    if ensure_case_introduction(state):
        persist(state)
    return state
def _selection_context(
    selected_junctions: list[int] | None,
    focused_junction: int | None,
) -> tuple[list[int], int | None]:
    """Normalize a selection into ``(selected, focused)`` with consistent contents.

    Both inputs are validated first. Without a valid focus, the last selected
    junction becomes the focus; a valid focus that is not yet selected is
    appended to the selection.
    """
    selected = _valid_junctions(selected_junctions or [])
    focused = _valid_junction(focused_junction)
    if focused is None:
        # Fall back to the most recently selected junction, if there is one.
        focused = selected[-1] if selected else None
    elif focused not in selected:
        selected = [*selected, focused]
    return selected, focused
def _ordered_check_targets(selected_junctions: list[int], focused_junction: int | None) -> list[int]:
    """Return the junctions to check: the focus first, then the rest, deduplicated."""
    lead = () if focused_junction is None else (focused_junction,)
    # dict.fromkeys keeps first-seen order while dropping repeats.
    return list(dict.fromkeys((*lead, *selected_junctions)))
def _valid_junctions(junctions: list[int]) -> list[int]:
    """Return the known junction ids from *junctions*, in order and without repeats.

    Each entry is parsed with ``_optional_int``; anything that does not parse
    or is not among ``all_junction_ids()`` is dropped.
    """
    known_ids = set(all_junction_ids())
    accepted: dict[int, None] = {}
    for raw in junctions:
        parsed = _optional_int(raw)
        if parsed in known_ids:
            # setdefault keeps the first position of a repeated id.
            accepted.setdefault(parsed, None)
    return list(accepted)
def _valid_junction(junction_id: int | None) -> int | None:
    """Return *junction_id* as an int when it names a known junction, else None."""
    candidate = _optional_int(junction_id)
    return candidate if candidate in set(all_junction_ids()) else None
def _selection_event(selected_junctions: list[int], focused_junction: int | None) -> str:
    """Describe the current selection as a one-line event message."""
    if focused_junction is not None:
        count = len(selected_junctions)
        return f"J{focused_junction} focused. {count} selected."
    return "No junction selected."
def _notice_with_selected_junction(notice_text: str, selected_junction: int | None) -> str:
    """Replace every "selected junction" placeholder in *notice_text*.

    The placeholder becomes "Junction <id>" when a junction is given and
    "the search area" otherwise.
    """
    if selected_junction is None:
        replacement = "the search area"
    else:
        replacement = f"Junction {selected_junction}"
    return notice_text.replace("selected junction", replacement)
def _clean_turns(turns: int | str | None) -> int:
    """Parse a turn count and clamp it to the range 1..3 (1 when unparseable)."""
    requested = _optional_int(turns)
    if requested is None:
        return 1
    lowest, highest = 1, 3
    return max(lowest, min(requested, highest))
def _junction_records() -> list[dict[str, Any]]:
    """Return the junction registry entries, each enriched with nearby places.

    Every record is a copy of the registry entry plus a ``nearest_landmarks``
    list holding ``id``/``name``/``category`` of each atlas district or
    landmark that references the junction through ``junction_ids``,
    ``nearby_junction_ids`` or a single ``junction_id``.
    """
    settings = load_settings()
    data = read_json(settings.junction_registry_path)
    atlas = public_atlas_payload()
    places = [*atlas.get("districts", []), *atlas.get("landmarks", [])]

    # Build each place's summary and junction-id set once, instead of
    # rebuilding the set for every (junction, place) pair.
    indexed_places: list[tuple[dict[str, Any], set[Any]]] = []
    for place in places:
        linked_ids = {
            *place.get("junction_ids", []),
            *place.get("nearby_junction_ids", []),
            *([place["junction_id"]] if place.get("junction_id") is not None else []),
        }
        summary = {
            "id": place.get("id"),
            "name": place.get("name"),
            "category": place.get("category"),
        }
        indexed_places.append((summary, linked_ids))

    records: list[dict[str, Any]] = []
    for junction in data.get("junctions", []):
        junction_id = int(junction["id"])
        enriched = dict(junction)
        # Copy the summary so records never share a mutable dict.
        enriched["nearest_landmarks"] = [
            dict(summary)
            for summary, linked_ids in indexed_places
            if junction_id in linked_ids
        ]
        records.append(enriched)
    return records
def _optional_int(value: Any) -> int | None:
    """Convert *value* to ``int``, returning ``None`` when that is not possible.

    ``None`` passes through unchanged. Values ``int()`` rejects yield ``None``:
    wrong types, non-numeric strings, NaN, and infinite floats. The last case
    raises ``OverflowError`` rather than ``ValueError``, so it is caught too.
    """
    if value is None:
        return None
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return None
def _case_state_text(state: GameState) -> str:
    """Render a multi-line, human-readable summary of the case's progress."""
    # The current turn still counts as remaining; never report a negative value.
    remaining = max(state.max_turns - state.turn_number + 1, 0)
    checks_used = len(
        [check for check in state.junction_checks if check.turn_number == state.turn_number]
    )
    summary_lines = (
        f"Game: {state.game_id}",
        f"Turn: {state.turn_number} / {state.max_turns}",
        f"Turns remaining: {remaining}",
        f"Phase: {state.phase}",
        f"Result: {state.result or 'in progress'}",
        f"Initial description: {state.initial_description}",
        f"Checks used this turn: {checks_used}",
        f"Notices issued: {len(state.notices)}",
        f"Witness batches: {len(state.witness_batches)}",
    )
    return "\n".join(summary_lines)
def _witness_batches_text(state: GameState) -> str:
    """Summarize the four most recent witness batches as plain text.

    Each batch lists its witness count, the text and parsed location of the
    notice it belongs to (when that notice is found), and whether individual
    review is available.
    """
    if not state.witness_batches:
        return "No witness batches yet."
    lines: list[str] = []
    for batch in state.witness_batches[-4:]:
        # First notice with a matching id, or None when there is no match.
        notice = next(filter(lambda item: item.notice_id == batch.notice_id, state.notices), None)
        lines.append(f"{batch.batch_id}: {batch.total_witnesses} witnesses")
        if notice:
            lines.extend((f"Notice: {notice.text}", f"Parsed location: {notice.parsed_location}"))
        review_status = "available" if batch.individual_review_allowed else "unavailable"
        lines.append("Individual review: " + review_status)
    return "\n".join(lines).strip()
def _active_blocks_text(state: GameState) -> str:
    """List every active block on its own line, or say that there are none."""
    if not state.active_blocks:
        return "No active blocks."
    rows: list[str] = []
    for block in state.active_blocks:
        rows.append(
            f"{block.block_id}: {block.block_type}, mode={block.mode or 'any'}, junction={block.junction_id}, edge={block.from_junction}->{block.to_junction}, turns={block.turns_remaining}"
        )
    return "\n".join(rows)
def _game_log_text(state: GameState) -> str:
    """Format the twelve most recent game-log entries, one per line."""
    recent_entries = state.game_log[-12:]
    rendered = [
        f"T{entry['turn_number']} {entry['kind']}: {entry['message']}"
        for entry in recent_entries
    ]
    return "\n".join(rendered)
# Hugging Face Spaces' Gradio SDK runner imports this module and looks for a
# top-level `demo` (gr.Blocks / gr.Server) to launch. Locally we still gate the
# manual launch behind __main__ so `python app.py` works as before.
demo = build_app()

# The HF Gradio SDK runs `gradio app.py` in hot-reload mode, which calls
# `demo.launch()` with hardcoded server_name=127.0.0.1 and ignores the
# GRADIO_SERVER_NAME env var. On HF Spaces the reverse proxy expects the app
# on 0.0.0.0:7860, so loopback is unreachable. Wrap launch() to force the
# correct bind regardless of what the CLI passes in.
if os.getenv("SPACE_ID"):
    _project_root = str(PROJECT_ROOT)
    # Keep a reference to the real launch method before it is replaced below.
    _orig_launch = demo.launch

    def _hf_forced_launch(**kwargs):
        """Call the original ``demo.launch`` with the Space bind settings forced.

        ``server_name`` and ``server_port`` always overwrite whatever the caller
        passed; ``allowed_paths`` is only filled in when the caller omitted it.
        NOTE(review): keyword arguments only; a positional call raises TypeError.
        """
        kwargs["server_name"] = "0.0.0.0"
        # PORT is used when set and non-empty, otherwise 7860.
        kwargs["server_port"] = int(os.getenv("PORT") or "7860")
        kwargs.setdefault("allowed_paths", [_project_root])
        return _orig_launch(**kwargs)

    demo.launch = _hf_forced_launch
if __name__ == "__main__":
    # Bind address/port are env-overridable so the same entrypoint works locally
    # (default loopback) and inside a container / Hugging Face Space, where the
    # app must listen on 0.0.0.0 and the platform supplies the port.
    launch_options = {
        "server_name": os.getenv("PHANTOM_GRID_HOST", "127.0.0.1"),
        # PORT takes precedence over PHANTOM_GRID_PORT; 7860 is the fallback.
        "server_port": int(os.getenv("PORT") or os.getenv("PHANTOM_GRID_PORT") or "7860"),
        "allowed_paths": [str(PROJECT_ROOT)],
    }
    demo.launch(**launch_options)