Spaces:
Sleeping
Sleeping
| """ | |
| Flask Backend for Love Live Card Game Web UI | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import os | |
| import random | |
| import sys | |
| import threading | |
| import uuid | |
| from datetime import datetime, timedelta | |
| from typing import Any, Dict | |
| import numpy as np | |
| from flask import Flask, jsonify, request, send_from_directory | |
| from flask.json.provider import DefaultJSONProvider | |
| # Ensure project root is in sys.path for absolute imports | |
| if getattr(sys, "frozen", False): | |
| PROJECT_ROOT = sys._MEIPASS # type: ignore | |
| CURRENT_DIR = os.path.join(PROJECT_ROOT, "backend") | |
| else: | |
| CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) | |
| PROJECT_ROOT = os.path.abspath(os.path.join(CURRENT_DIR, "..")) | |
| if PROJECT_ROOT not in sys.path: | |
| sys.path.insert(0, PROJECT_ROOT) | |
| # Rust Engine | |
| import engine_rust | |
| try: | |
| from ai.headless_runner import RandomAgent, create_easy_cards | |
| from ai.headless_runner import SmartHeuristicAgent as SmartAgent | |
| AI_AVAILABLE = True | |
| except ImportError: | |
| print("Warning: AI modules not found. AI features will be disabled.") | |
| AI_AVAILABLE = False | |
| class RandomAgent: | |
| pass | |
| class SmartAgent: | |
| pass | |
| def create_easy_cards(): | |
| return None, None | |
| from engine.game.data_loader import CardDataLoader | |
| from engine.game.deck_utils import UnifiedDeckParser | |
| from engine.game.desc_utils import get_action_desc | |
| from engine.game.enums import Phase | |
| from engine.game.game_state import GameState | |
| from engine.game.replay_manager import inflate_history, optimize_history | |
| from engine.game.serializer import serialize_state | |
| from engine.game.state_utils import create_uid | |
| try: | |
| from rust_serializer import RustCompatGameState, RustGameStateSerializer | |
| except ImportError: | |
| from backend.rust_serializer import RustCompatGameState, RustGameStateSerializer | |
| INSTANCE_SHIFT = 20 | |
| BASE_ID_MASK = 0xFFFFF | |
| # --- MODULE DIRECTORIES --- | |
| ENGINE_DIR = os.path.join(PROJECT_ROOT, "engine") | |
| AI_DIR = os.path.join(PROJECT_ROOT, "ai") | |
| TOOLS_DIR = os.path.join(PROJECT_ROOT, "tools") | |
| DATA_DIR = os.path.join(PROJECT_ROOT, "data") | |
| # Tools imports (optional) | |
| try: | |
| from tools.deck_extractor import extract_deck_data | |
| except ImportError: | |
| print("Warning: Could not import deck_extractor from tools.") | |
| def extract_deck_data(content, db): | |
| return [], [], {}, ["Importer not found"] | |
| # Static folder is now in frontend/web_ui | |
| FRONTEND_DIR = os.path.join(PROJECT_ROOT, "frontend") | |
| WEB_UI_DIR = os.path.join(FRONTEND_DIR, "web_ui") | |
| IMG_DIR = os.path.join(FRONTEND_DIR, "img") # Images seem to be in frontend/img | |
| # Note: frontend/web_ui has its own js/css folders which index.html likely uses | |
| app = Flask(__name__, static_folder=WEB_UI_DIR) | |
| class NumpyJSONProvider(DefaultJSONProvider): | |
| def default(self, obj): | |
| if isinstance(obj, np.integer): | |
| return int(obj) | |
| elif isinstance(obj, np.floating): | |
| return float(obj) | |
| elif isinstance(obj, np.bool_): | |
| return bool(obj) | |
| elif isinstance(obj, np.ndarray): | |
| return obj.tolist() | |
| return super().default(obj) | |
| app.json = NumpyJSONProvider(app) | |
| def serve_img(filename): | |
| # Sanitize and normalize the filename | |
| filename = filename.replace("\\", "/").lstrip("/") | |
| # Check if this is a card image request | |
| if filename.startswith("cards/") or filename.startswith("cards_webp/"): | |
| # Remove old nested 'cards/' prefix if it's there | |
| pure_filename = os.path.basename(filename) | |
| webp_path = os.path.join(IMG_DIR, "cards_webp", pure_filename) | |
| # Priority 1: Flat WebP folder | |
| if os.path.exists(webp_path) and os.path.isfile(webp_path): | |
| return send_from_directory(os.path.join(IMG_DIR, "cards_webp"), pure_filename) | |
| # Priority 2: Try falling back to original nested PNGs for backward compatibility/backup | |
| # (This is mostly for non-compiled access or manual links) | |
| pass | |
| # Define possible search directories relative to PROJECT_ROOT | |
| search_dirs = [ | |
| os.path.join(IMG_DIR, "cards_webp"), # Flattened WebP first | |
| IMG_DIR, # frontend/img | |
| os.path.join(IMG_DIR, "texticon"), # frontend/img/texticon | |
| os.path.join(WEB_UI_DIR, "img"), # frontend/web_ui/img | |
| FRONTEND_DIR, # Allow direct frontend access if needed | |
| ] | |
| for base_dir in search_dirs: | |
| full_path = os.path.join(base_dir, filename) | |
| if os.path.exists(full_path) and os.path.isfile(full_path): | |
| return send_from_directory(base_dir, filename) | |
| # Fallback for .webp requesting .png or vice-versa | |
| if filename.endswith(".webp"): | |
| png_fallback = filename[:-5] + ".png" | |
| full_png_path = os.path.join(base_dir, png_fallback) | |
| if os.path.exists(full_png_path) and os.path.isfile(full_png_path): | |
| return send_from_directory(base_dir, png_fallback) | |
| # Extra fallback for common icons if they are misplaced | |
| if filename == "icon_blade.png" or "icon_blade" in filename: | |
| # Try to find it anywhere in frontend/img | |
| for root, dirs, files in os.walk(IMG_DIR): | |
| if "icon_blade.png" in files: | |
| return send_from_directory(root, "icon_blade.png") | |
| print(f"DEBUG_IMG_404: Could not find {filename} in {search_dirs}") | |
| return "Image not found", 404 | |
| def serve_icon_root(): | |
| return serve_img("icon_blade.png") | |
| # AI agents are now instantiated per-room in create_room_internal | |
| # Global game state | |
| # Room Registry | |
| ROOMS: dict[str, dict[str, Any]] = {} | |
| game_lock = threading.Lock() | |
| # Room cleanup configuration | |
| ROOM_INACTIVE_TIMEOUT_MINUTES = 10 # Remove rooms inactive for 10 minutes | |
| ROOM_CLEANUP_INTERVAL = 60 # Run cleanup every 60 seconds (loops) | |
| # Rust Card DB (Global Singleton for performance) | |
| RUST_DB = None | |
| RUST_DB_VANILLA = None | |
| try: | |
| compiled_data_path = os.path.join(DATA_DIR, "cards_compiled.json") | |
| with open(compiled_data_path, "r", encoding="utf-8") as f: | |
| RUST_DB = engine_rust.PyCardDatabase(f.read()) | |
| except Exception as e: | |
| print(f"Warning: Failed to load RUST_DB from {compiled_data_path}: {e}") | |
| try: | |
| vanilla_data_path = os.path.join(DATA_DIR, "cards_vanilla.json") | |
| with open(vanilla_data_path, "r", encoding="utf-8") as f: | |
| RUST_DB_VANILLA = engine_rust.PyCardDatabase(f.read()) | |
| RUST_DB_VANILLA.set_is_vanilla(True) # Disable abilities in vanilla mode | |
| except Exception as e: | |
| print(f"Warning: Failed to load RUST_DB_VANILLA from {vanilla_data_path}: {e}") | |
| # Python DBs (for metadata/serialization) | |
| member_db: dict[int, Any] = {} | |
| live_db: dict[int, Any] = {} | |
| energy_db: dict[int, Any] = {} | |
| # Vanilla card databases (separate from compiled for card set selection) | |
| vanilla_member_db: dict[int, Any] = {} | |
| vanilla_live_db: dict[int, Any] = {} | |
| vanilla_energy_db: dict[int, Any] = {} | |
| rust_serializer = None # Initialized after data load | |
| # --- LEGACY GLOBALS (Single-Player / CLI Tests Only) --- | |
| # Do NOT use these in multi-room logic! | |
| game_history: list[dict] = [] | |
| action_log: list[int] = [] | |
| current_seed: int = 0 | |
| custom_deck_p0: list[str] | None = None | |
| custom_deck_p1: list[str] | None = None | |
| custom_energy_deck_p0: list[str] | None = None | |
| custom_energy_deck_p1: list[str] | None = None | |
| def load_game_data(): | |
| """Load card data into global databases (both compiled and vanilla).""" | |
| global member_db, live_db, energy_db, vanilla_member_db, vanilla_live_db, vanilla_energy_db, rust_serializer | |
| try: | |
| # Load compiled data (default/main) | |
| cards_path = os.path.join(DATA_DIR, "cards.json") | |
| print(f"Loading compiled card data from: {cards_path}") | |
| loader = CardDataLoader(cards_path) | |
| m, l, e = loader.load() | |
| member_db.update(m) | |
| live_db.update(l) | |
| energy_db.update(e) | |
| # Load vanilla data separately | |
| print(f"Loading vanilla card data from: data/cards_vanilla.json") | |
| try: | |
| vanilla_path = os.path.join(DATA_DIR, "cards_vanilla.json") | |
| vanilla_loader = CardDataLoader(vanilla_path) | |
| vm, vl, ve = vanilla_loader.load() | |
| vanilla_member_db.update(vm) | |
| vanilla_live_db.update(vl) | |
| vanilla_energy_db.update(ve) | |
| print(f"Vanilla data loaded: {len(vanilla_member_db)} Members, {len(vanilla_live_db)} Lives, {len(vanilla_energy_db)} Energy") | |
| except Exception as ex: | |
| print(f"Warning: Failed to load vanilla card data: {ex}") | |
| # Initialize rust_serializer with compiled data | |
| rust_serializer = RustGameStateSerializer(member_db, live_db, energy_db) | |
| # Build mappings | |
| build_card_no_mapping() | |
| print(f"Compiled data loaded: {len(member_db)} Members, {len(live_db)} Lives, {len(energy_db)} Energy") | |
| print(f"DEBUG PATHS: PROJECT_ROOT={PROJECT_ROOT}") | |
| print(f"DEBUG PATHS: FRONTEND_DIR={FRONTEND_DIR}") | |
| print(f"DEBUG PATHS: WEB_UI_DIR={WEB_UI_DIR}") | |
| print(f"DEBUG PATHS: IMG_DIR={IMG_DIR}") | |
| except Exception as ex: | |
| print(f"CRITICAL ERROR loading card data: {ex}") | |
| import sys | |
| sys.exit(1) | |
| def get_local_ip(): | |
| """Get the local IP address of this machine.""" | |
| import socket | |
| try: | |
| # Create a temporary socket to determine the local IP | |
| s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
| # Doesn't even have to be reachable | |
| s.connect(("8.8.8.8", 1)) | |
| local_ip = s.getsockname()[0] | |
| s.close() | |
| return local_ip | |
| except OSError: | |
| return "127.0.0.1" | |
| def _coerce_pending_choice_params(raw_params: Any) -> Dict[str, Any]: | |
| """Normalize pending-choice params from either Rust JSON strings or Python dicts.""" | |
| if isinstance(raw_params, dict): | |
| return raw_params | |
| if not isinstance(raw_params, str): | |
| return {} | |
| try: | |
| parsed = json.loads(raw_params) | |
| except json.JSONDecodeError: | |
| return {} | |
| return parsed if isinstance(parsed, dict) else {} | |
| # Load data immediately on import | |
| def get_room_id() -> str: | |
| """Extract room_id from request header or query param.""" | |
| # Priority: Header > Query Param > Default "SINGLE_PLAYER" | |
| rid = request.headers.get("X-Room-Id") or request.args.get("room_id") | |
| if not rid: | |
| # Debug why no ID found | |
| # print(f"DEBUG: No X-Room-Id or room_id param. Headers: {request.headers}", file=sys.stderr) | |
| rid = "SINGLE_PLAYER" | |
| return rid | |
| def get_player_idx(room: dict = None): | |
| """ | |
| Extract player perspective. | |
| Priority: X-Session-Token (via room) > viewer query param > X-Player-Idx header. | |
| """ | |
| # 1. Try Session Token Lookup if room provided | |
| session_token = request.headers.get("X-Session-Token") | |
| if room and session_token: | |
| sessions = room.get("sessions", {}) | |
| if session_token in sessions: | |
| pid = sessions[session_token] | |
| if pid != -1: | |
| return pid | |
| # 2. Try query param 'viewer' (fallback/legacy) | |
| viewer = request.args.get("viewer") | |
| if viewer is not None: | |
| try: | |
| return int(viewer) | |
| except (ValueError, TypeError): | |
| pass | |
| # 3. Fallback to header | |
| try: | |
| return int(request.headers.get("X-Player-Idx", 0)) | |
| except (ValueError, TypeError): | |
| return 0 | |
| def get_lang(): | |
| """Extract language preference from X-Lang header or lang query param.""" | |
| lang = request.headers.get("X-Lang") or request.args.get("lang") or "jp" | |
| if lang.lower() in ("en", "english"): | |
| return "en" | |
| return "jp" | |
| def get_room(room_id: str) -> dict[str, Any] | None: | |
| """Get room data safely.""" | |
| with game_lock: | |
| room = ROOMS.get(room_id) | |
| if room: | |
| room["last_active"] = datetime.now() | |
| return room | |
| PLANNER_ROOT_PHASES = (Phase.MAIN, Phase.LIVE_SET) | |
| PLANNER_TRACKED_PHASES = (Phase.MAIN, Phase.LIVE_SET, Phase.RESPONSE) | |
| def is_planner_root_phase(phase: Any) -> bool: | |
| try: | |
| return int(phase) in {int(p) for p in PLANNER_ROOT_PHASES} | |
| except (TypeError, ValueError): | |
| return False | |
| def is_planner_tracked_phase(phase: Any) -> bool: | |
| try: | |
| return int(phase) in {int(p) for p in PLANNER_TRACKED_PHASES} | |
| except (TypeError, ValueError): | |
| return False | |
| def get_planner_store(room: dict[str, Any]) -> dict[str, Any]: | |
| planner_store = room.setdefault("planner_lab", {}) | |
| planner_store.setdefault("sessions", {}) | |
| planner_store.setdefault("last_results", {}) | |
| return planner_store | |
| def get_planner_session_key(gs, player_idx): | |
| """Generate a planner session key from game state.""" | |
| try: | |
| return f"{int(gs.turn)}:{player_idx}" | |
| except AttributeError: | |
| return f"unknown:{player_idx}" | |
| def clone_rust_state(gs): | |
| """Clone a Rust game state (requires Rust engine).""" | |
| if RUST_DB is None: | |
| raise RuntimeError("Rust engine not initialized") | |
| try: | |
| cloned = engine_rust.PyGameState(RUST_DB) | |
| cloned.apply_state_json(gs.to_json()) | |
| return cloned | |
| except (AttributeError, Exception) as e: | |
| print(f"Error cloning rust state: {e}") | |
| raise | |
| def get_score_breakdown_dict(gs, player_idx, rust_db=None): | |
| """Get score breakdown from Rust game state.""" | |
| if rust_db is None: | |
| rust_db = RUST_DB | |
| if rust_db is None or gs is None: | |
| return {"board": 0.0, "live": 0.0, "success": 0.0, "win": 0.0, "hand": 0.0, "cycling": 0.0, "total": 0.0} | |
| try: | |
| board, live, success, win, hand, cycling, total = gs.get_score_breakdown(rust_db, player_idx) | |
| except (AttributeError, TypeError, Exception) as e: | |
| print(f"Error getting score breakdown: {e}") | |
| return {"board": 0.0, "live": 0.0, "success": 0.0, "win": 0.0, "hand": 0.0, "cycling": 0.0, "total": 0.0} | |
| return { | |
| "board": float(board), | |
| "live": float(live), | |
| "success": float(success), | |
| "win": float(win), | |
| "hand": float(hand), | |
| "cycling": float(cycling), | |
| "total": float(total), | |
| } | |
| def describe_action_for_state(gs, action_id, lang="jp"): | |
| """Describe an action from game state.""" | |
| try: | |
| compat_gs = RustCompatGameState(gs, member_db, live_db, energy_db) | |
| return get_action_desc(action_id, compat_gs, lang=lang, text=getattr(gs, 'pending_choice_text', '')) | |
| except Exception as e: | |
| print(f"Error describing action: {e}") | |
| return f"Action {action_id}" | |
| def apply_sequence_with_descriptions( | |
| root_state_json, | |
| action_ids, | |
| player_idx, | |
| lang="jp", | |
| rust_db=None, | |
| ): | |
| """Apply a sequence of actions with descriptions (Rust engine only).""" | |
| if rust_db is None: | |
| rust_db = RUST_DB | |
| if rust_db is None: | |
| return {"error": "Rust engine not initialized", "sequence": []} | |
| try: | |
| sim_state = engine_rust.PyGameState(rust_db) | |
| sim_state.apply_state_json(root_state_json) | |
| except (AttributeError, TypeError, Exception) as e: | |
| print(f"Error creating Rust game state: {e}") | |
| return {"error": f"Failed to create game state: {e}", "sequence": []} | |
| entries = [] | |
| invalid_step = None | |
| for seq_index, action_id in enumerate(action_ids, start=1): | |
| try: | |
| legal_ids = set(sim_state.get_legal_action_ids()) | |
| desc = describe_action_for_state(sim_state, action_id, lang) | |
| except Exception as e: | |
| print(f"Error processing action: {e}") | |
| break | |
| entry = { | |
| "index": seq_index, | |
| "action_id": int(action_id), | |
| "desc": desc, | |
| "legal": action_id in legal_ids, | |
| } | |
| entries.append(entry) | |
| if action_id not in legal_ids: | |
| invalid_step = seq_index | |
| entry["error"] = "Illegal from recreated state" | |
| break | |
| try: | |
| sim_state.step(action_id) | |
| except Exception as e: | |
| print(f"Error stepping game state: {e}") | |
| invalid_step = seq_index | |
| break | |
| return { | |
| "action_ids": [int(action_id) for action_id in action_ids], | |
| "sequence": entries, | |
| "invalid_step": invalid_step, | |
| "is_valid": invalid_step is None, | |
| "breakdown": get_score_breakdown_dict(sim_state, player_idx, rust_db), | |
| "phase_after": int(sim_state.phase), | |
| "turn_after": int(sim_state.turn), | |
| "current_player_after": int(sim_state.current_player), | |
| "is_terminal": bool(sim_state.is_terminal()), | |
| } | |
| def build_optimal_sequence_payload(root_state_json, player_idx, lang="jp", rust_db=None): | |
| """Build optimal action sequence from game state (Rust engine only).""" | |
| if rust_db is None: | |
| rust_db = RUST_DB | |
| if rust_db is None: | |
| return {"error": "Rust engine not initialized", "sequence": [], "action_ids": []} | |
| try: | |
| root_state = engine_rust.PyGameState(rust_db) | |
| root_state.apply_state_json(root_state_json) | |
| except (AttributeError, TypeError, Exception) as e: | |
| print(f"Error in build_optimal_sequence_payload: {e}") | |
| return {"error": str(e), "sequence": [], "action_ids": []} | |
| liveset_nodes = 0 | |
| if int(root_state.phase) == int(Phase.LIVE_SET): | |
| optimal_ids, liveset_nodes, _ = root_state.find_best_liveset_selection(rust_db) | |
| planner_breakdown = {"board": 0.0, "live": 0.0, "total": 0.0} | |
| else: | |
| _, optimal_ids, planner_nodes, planner_breakdown_raw = root_state.plan_full_turn(rust_db) | |
| optimal_ids = list(optimal_ids) | |
| planner_breakdown = { | |
| "board": float(planner_breakdown_raw[0]), | |
| "live": float(planner_breakdown_raw[1]), | |
| "total": float(planner_breakdown_raw[0] + planner_breakdown_raw[1]), | |
| } | |
| liveset_nodes = int(planner_nodes) | |
| full_ids = list(optimal_ids) | |
| main_state = engine_rust.PyGameState(rust_db) | |
| main_state.apply_state_json(root_state_json) | |
| for action_id in optimal_ids: | |
| main_state.step(action_id) | |
| appended_liveset = [] | |
| if int(main_state.phase) == int(Phase.LIVE_SET): | |
| liveset_ids, extra_nodes, _ = main_state.find_best_liveset_selection(rust_db) | |
| appended_liveset = list(liveset_ids) | |
| full_ids.extend(appended_liveset) | |
| liveset_nodes += int(extra_nodes) | |
| applied = apply_sequence_with_descriptions(root_state_json, full_ids, player_idx, lang, rust_db) | |
| applied["nodes"] = int(liveset_nodes) | |
| applied["planner_breakdown"] = planner_breakdown | |
| applied["main_action_ids"] = [int(action_id) for action_id in optimal_ids] | |
| applied["liveset_action_ids"] = [int(action_id) for action_id in appended_liveset] | |
| return applied | |
| def build_planner_analysis_from_session(session, lang="jp", rust_db=None): | |
| """Build planner analysis from a recorded session (Rust engine only).""" | |
| if rust_db is None: | |
| rust_db = RUST_DB | |
| if rust_db is None: | |
| return {"error": "Rust engine not initialized", "optimal": {}, "your_sequence": {}} | |
| player_idx = int(session["player_idx"]) | |
| optimal = build_optimal_sequence_payload(session["root_state_json"], player_idx, lang, rust_db) | |
| your_sequence = apply_sequence_with_descriptions( | |
| session["root_state_json"], | |
| session.get("actions", []), | |
| player_idx, | |
| lang, | |
| rust_db, | |
| ) | |
| matched_prefix = 0 | |
| for own_action, optimal_action in zip(your_sequence["action_ids"], optimal["action_ids"]): | |
| if own_action != optimal_action: | |
| break | |
| matched_prefix += 1 | |
| your_sequence["matched_prefix"] = matched_prefix | |
| your_sequence["optimal_length"] = len(optimal["action_ids"]) | |
| your_sequence["status"] = "completed" if session.get("completed") else "in_progress" | |
| your_sequence["root_turn"] = int(session["root_turn"]) | |
| your_sequence["root_phase"] = int(session["root_phase"]) | |
| return { | |
| "session_key": session["session_key"], | |
| "player_id": player_idx, | |
| "root_turn": int(session["root_turn"]), | |
| "root_phase": int(session["root_phase"]), | |
| "optimal": optimal, | |
| "your_sequence": your_sequence, | |
| } | |
| def ensure_planner_session(room, gs, player_idx): | |
| """Ensure a planner session exists for tracking player actions (Rust engine only).""" | |
| if not is_planner_root_phase(gs.phase): | |
| return None | |
| planner_store = get_planner_store(room) | |
| session_key = get_planner_session_key(gs, player_idx) | |
| session_id = str(player_idx) | |
| existing = planner_store["sessions"].get(session_id) | |
| if existing and existing.get("session_key") == session_key: | |
| return existing | |
| session = { | |
| "session_key": session_key, | |
| "player_idx": int(player_idx), | |
| "root_turn": int(gs.turn), | |
| "root_phase": int(gs.phase), | |
| "root_state_json": gs.to_json(), | |
| "actions": [], | |
| "completed": False, | |
| } | |
| planner_store["sessions"][session_id] = session | |
| return session | |
| def append_planner_action(room: dict[str, Any], player_idx: int, action_id: int) -> None: | |
| planner_store = get_planner_store(room) | |
| session = planner_store["sessions"].get(str(player_idx)) | |
| if not session: | |
| return | |
| session.setdefault("actions", []).append(int(action_id)) | |
| def finalize_planner_session(room: dict[str, Any], player_idx: int, lang: str) -> None: | |
| planner_store = get_planner_store(room) | |
| session = planner_store["sessions"].pop(str(player_idx), None) | |
| if not session: | |
| return | |
| session["completed"] = True | |
| result = build_planner_analysis_from_session(session, lang) | |
| result["your_sequence"]["status"] = "completed" | |
| planner_store["last_results"][str(player_idx)] = result | |
| def maybe_finalize_planner_session( | |
| room: dict[str, Any], | |
| gs: engine_rust.PyGameState, | |
| player_idx: int, | |
| lang: str, | |
| ) -> None: | |
| planner_store = get_planner_store(room) | |
| session = planner_store["sessions"].get(str(player_idx)) | |
| if not session: | |
| return | |
| if ( | |
| int(gs.turn) == int(session["root_turn"]) | |
| and int(gs.current_player) == int(player_idx) | |
| and is_planner_tracked_phase(gs.phase) | |
| and not gs.is_terminal() | |
| ): | |
| return | |
| def build_planner_payload(room, gs, player_idx, lang="jp"): | |
| """Build planner payload for displaying planning analysis to player (Rust engine only).""" | |
| planner_store = get_planner_store(room) | |
| active = False | |
| session = planner_store["sessions"].get(str(player_idx)) | |
| rust_db = get_rust_db_for_card_set(room.get("card_set", "compiled")) | |
| if not session and int(gs.current_player) == int(player_idx) and is_planner_root_phase(gs.phase) and not gs.is_terminal(): | |
| session = ensure_planner_session(room, gs, player_idx) | |
| if session: | |
| active = True | |
| analysis = build_planner_analysis_from_session(session, lang, rust_db) | |
| analysis["active"] = True | |
| analysis["available"] = True | |
| return analysis | |
| last_result = planner_store["last_results"].get(str(player_idx)) | |
| if last_result: | |
| payload = dict(last_result) | |
| payload["active"] = False | |
| payload["available"] = True | |
| payload["message"] = "Showing your last completed scored sequence." | |
| return payload | |
| return { | |
| "available": bool(int(gs.current_player) == int(player_idx) and is_planner_root_phase(gs.phase) and not gs.is_terminal()), | |
| "active": False, | |
| "message": "Turn planner becomes available on your Main or Live Set turn.", | |
| "player_id": int(player_idx), | |
| "root_turn": int(gs.turn), | |
| "root_phase": int(gs.phase), | |
| "optimal": None, | |
| "your_sequence": None, | |
| } | |
| # Reverse mapping: card_no string -> internal integer ID | |
| card_no_to_id: dict[str, int] = {} | |
| vanilla_card_no_to_id: dict[str, int] = {} | |
| def build_card_no_mapping(): | |
| """Build reverse lookup from card_no string to internal ID for both compiled and vanilla. | |
| Ensures consistency with the Rust engine's internal ID assignments. | |
| """ | |
| global card_no_to_id, vanilla_card_no_to_id | |
| card_no_to_id = {} | |
| vanilla_card_no_to_id = {} | |
| # Build mapping for compiled data | |
| try: | |
| compiled_path = os.path.join(DATA_DIR, "cards_compiled.json") | |
| if not os.path.exists(compiled_path): | |
| print(f"Warning: {compiled_path} not found. Compiled mapping will be empty.") | |
| else: | |
| with open(compiled_path, "r", encoding="utf-8") as f: | |
| data = json.load(f) | |
| # Build mapping from dbs | |
| count = 0 | |
| for db_name in ["member_db", "live_db", "energy_db"]: | |
| db = data.get(db_name, {}) | |
| for internal_id, card_data in db.items(): | |
| card_no = card_data.get("card_no") | |
| if card_no: | |
| # Convert string key to integer ID | |
| # USE NORMALIZED KEY | |
| norm_key = UnifiedDeckParser.normalize_code(card_no) | |
| card_no_to_id[norm_key] = int(internal_id) | |
| count += 1 | |
| print(f"Built card_no_to_id mapping from compiled data: {count} entries") | |
| except Exception as e: | |
| print(f"Error building mapping from compiled data: {e}") | |
| # Build mapping for vanilla data | |
| try: | |
| vanilla_path = os.path.join(DATA_DIR, "cards_vanilla.json") | |
| if not os.path.exists(vanilla_path): | |
| print(f"Warning: {vanilla_path} not found. Vanilla mapping will be empty.") | |
| else: | |
| with open(vanilla_path, "r", encoding="utf-8") as f: | |
| data = json.load(f) | |
| # Build mapping from dbs | |
| count = 0 | |
| for db_name in ["member_db", "live_db", "energy_db"]: | |
| db = data.get(db_name, {}) | |
| for internal_id, card_data in db.items(): | |
| card_no = card_data.get("card_no") | |
| if card_no: | |
| # Convert string key to integer ID | |
| norm_key = UnifiedDeckParser.normalize_code(card_no) | |
| vanilla_card_no_to_id[norm_key] = int(internal_id) | |
| count += 1 | |
| print(f"Built vanilla_card_no_to_id mapping from vanilla data: {count} entries") | |
| except Exception as e: | |
| print(f"Error building mapping from vanilla data: {e}") | |
| # Load data immediately on import | |
| load_game_data() | |
| # Initialize mapping on startup | |
| build_card_no_mapping() | |
| def get_card_no_mapping(card_set: str = "compiled") -> dict[str, int]: | |
| """Get the appropriate card_no_to_id mapping based on card set.""" | |
| if card_set == "vanilla": | |
| return vanilla_card_no_to_id | |
| else: | |
| return card_no_to_id | |
| def convert_deck_strings_to_ids(deck_strings, card_set: str = "compiled"): | |
| """Convert list of card_no strings to internal IDs (Unique Instance IDs). | |
| Uses the appropriate mapping based on card_set ("vanilla" or "compiled"). | |
| """ | |
| mapping = get_card_no_mapping(card_set) | |
| ids = [] | |
| counts = {} | |
| for card_no in deck_strings: | |
| norm_code = UnifiedDeckParser.normalize_code(card_no) | |
| if norm_code in mapping: | |
| base_id = mapping[norm_code] | |
| count = counts.get(base_id, 0) | |
| uid = create_uid(base_id, count) | |
| counts[base_id] = count + 1 | |
| ids.append(uid) | |
| else: | |
| print(f"Warning: Unknown card_no '{card_no}' (norm: '{norm_code}') in {card_set} set, skipping.") | |
| return ids | |
| def save_replay(gs: GameState | None = None): | |
| """Save the provided game state's history to a file.""" | |
| if gs is None or not gs.rule_log: | |
| return | |
| try: | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| os.makedirs("replays", exist_ok=True) | |
| filename = f"replays/replay_{timestamp}.json" | |
| filename_opt = f"replays/replay_{timestamp}_opt.json" | |
| # Use historical states from rule_log or history if we maintain one | |
| # For now, we assume GS has what we need or we pass history | |
| history = [] # In this engine, standard replays are often built from logs or incremental states | |
| # 1. Save Standard Replay (Compatible) | |
| data = { | |
| "game_id": 0, | |
| "timestamp": timestamp, | |
| "winner": gs.winner if gs else -1, | |
| "states": history, | |
| } | |
| with open(filename, "w", encoding="utf-8") as f: | |
| json.dump(data, f, ensure_ascii=False) | |
| print(f"Replay saved to {filename}") | |
| # 2. Save Optimized Replay (Dict Encoded) | |
| try: | |
| print("Optimizing replay...") | |
| # Gather Level 3 Context | |
| deck_info = None | |
| if gs: | |
| deck_info = { | |
| "p0_deck": list(getattr(gs.players[0], "initial_deck_indices", [])), | |
| "p1_deck": list(getattr(gs.players[1], "initial_deck_indices", [])), | |
| } | |
| opt_data = optimize_history( | |
| history, | |
| member_db, | |
| live_db, | |
| energy_db, | |
| exclude_db_cards=True, | |
| # seed=current_seed, | |
| # action_log=action_log, | |
| deck_info=deck_info, | |
| ) | |
| final_opt = { | |
| "game_id": 0, | |
| "timestamp": timestamp, | |
| "winner": gs.winner if gs else -1, | |
| } | |
| # Merge optimization data | |
| if "level" in opt_data and opt_data["level"] == 3: | |
| final_opt.update(opt_data) # seed, decks, action_log | |
| print("Level 3 Optimization Active (Action Log)") | |
| else: | |
| final_opt["states"] = opt_data["states"] | |
| with open(filename_opt, "w", encoding="utf-8") as f: | |
| json.dump(final_opt, f, ensure_ascii=False) | |
| # Calculate savings | |
| size_std = os.path.getsize(filename) | |
| size_opt = os.path.getsize(filename_opt) | |
| savings = (1 - size_opt / size_std) * 100 | |
| print(f"Optimized replay saved to {filename_opt}") | |
| print(f"Compression: {size_std / 1024:.1f}KB -> {size_opt / 1024:.1f}KB ({savings:.1f}% savings)") | |
| except Exception as e: | |
| print(f"Failed to save optimized replay: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| except Exception as e: | |
| print(f"Failed to save replay: {e}") | |
| # Global state variables removed to prevent room bleeding | |
| def init_game(deck_type="normal"): | |
| # This function is legacy and mainly for single-player tests. | |
| # It now uses legacy global variables for backward compatibility. | |
| global member_db, live_db, energy_db, game_history, action_log, current_seed | |
| # Ensure true randomness for each game | |
| import time | |
| real_seed = int(time.time() * 1000) % (2**31) | |
| current_seed = real_seed | |
| game_history = [] | |
| action_log = [] | |
| random.seed(real_seed) | |
| # DATA PATH: data/cards.json | |
| cards_path = os.path.join(DATA_DIR, "cards.json") | |
| loader = CardDataLoader(cards_path) | |
| member_db, live_db, energy_db = loader.load() | |
| # CRITICAL: Populate GameState static DBs so validations work | |
| # Use initialize_class_db to ensure proper wrapping with MaskedDB | |
| GameState.initialize_class_db(member_db, live_db) | |
| GameState.energy_db = energy_db | |
| # Initialize JIT arrays for performance | |
| GameState._init_jit_arrays() | |
| # Build reverse mapping for custom deck support | |
| build_card_no_mapping() | |
| # Pre-calculate Start Deck card IDs | |
| # Load raw JSON to check product field for filtering | |
| cards_path = os.path.join(DATA_DIR, "cards.json") | |
| with open(cards_path, "r", encoding="utf-8") as f: | |
| json.load(f) | |
| for _cid, _m in member_db.items(): | |
| # Find raw key by matching name/cost/type? Or better, DataLoader should store product. | |
| # Since DataLoader doesn't verify product yet, we'll try to guess or just use ALL valid cards | |
| # that are from Start Deck (usually ID < 100 for this mock loader or by string ID). | |
| # Actually, let's just use ALL loaded members/lives for 'normal' and specific ones for 'starter'. | |
| # For 'start_deck', we can filter by card string ID prefix 'PL!-sd1' or 'LL-E'. | |
| # But 'member_db' keys are integers 0..N. We need a way to link back. | |
| # The loader assigns IDs sequentially. | |
| # Let's just build a random valid deck from ALL cards for now, | |
| # unless 'easy' mode. | |
| pass | |
| # If deck_type is 'easy', we use the simple mock cards for logic testing. | |
| # If deck_type is 'normal' or 'starter', we use REAL cards. | |
| if deck_type == "easy": | |
| easy_m, easy_l = create_easy_cards() | |
| member_db[easy_m.card_id] = easy_m | |
| live_db[easy_l.card_id] = easy_l | |
| game_state = GameState() | |
| # Setup players | |
| for pidx, p in enumerate(game_state.players): | |
| # Check for custom deck first | |
| custom_deck = custom_deck_p0 if pidx == 0 else custom_deck_p1 | |
| if custom_deck: | |
| # Use custom deck | |
| p.main_deck = convert_deck_strings_to_ids(custom_deck) | |
| print(f"Player {pidx}: Using custom deck ({len(p.main_deck)} cards)") | |
| elif deck_type == "easy": | |
| # Use Easy Cards (888/999) but mapped to real images | |
| p.main_deck = [888] * 48 + [999] * 12 | |
| else: | |
| # NORMAL / STARTER MODE: Build a valid deck | |
| # Rule: Max 4 copies of same card number. | |
| # Total: 48 Members + 12 Lives (Total 60 in main deck per game_state spec) | |
| p.main_deck = [] | |
| # 1. Select Members (48) | |
| available_members = list(member_db.keys()) | |
| if available_members: | |
| # Shuffle availability to vary decks | |
| random.shuffle(available_members) | |
| member_bucket = [] | |
| for mid in available_members: | |
| # Add 4 copies of each until we have enough | |
| # Use create_uid for unique instance IDs | |
| for i in range(4): | |
| uid = create_uid(mid, i) | |
| member_bucket.append(uid) | |
| if len(member_bucket) >= 150: # Optimization: Don't build massive list | |
| break | |
| # Pick 48 from the bucket | |
| if len(member_bucket) < 48: | |
| # Fallback if DB too small | |
| while len(member_bucket) < 48: | |
| member_bucket.extend(available_members) | |
| # Ensure we don't accidentally pick >4 if we just slice | |
| # Actually, simply taking the first 48 from our constructed bucket (which has 4 of each distinct card) | |
| # guarantees validity if we shuffle the CARDS/TYPES, not the final list. | |
| # Steps: | |
| # 1. Shuffle types. | |
| # 2. Add 4 of Type A, 4 of Type B... | |
| # 3. Take first 48 cards. | |
| p.main_deck.extend(member_bucket[:48]) | |
| # 2. Select Lives (12) | |
| available_lives = list(live_db.keys()) | |
| if available_lives: | |
| random.shuffle(available_lives) | |
| live_bucket = [] | |
| for lid in available_lives: | |
| # live_bucket.extend([lid] * 4) | |
| for i in range(4): | |
| uid = create_uid(lid, i) | |
| live_bucket.append(uid) | |
| if len(live_bucket) >= 50: | |
| break | |
| if len(live_bucket) < 12: | |
| while len(live_bucket) < 12: | |
| live_bucket.extend(available_lives) | |
| p.main_deck.extend(live_bucket[:12]) | |
| random.shuffle(p.main_deck) | |
| # Energy Deck (12 cards) | |
| # Use actual Energy Card ID if available (2000+) | |
| if energy_db: | |
| eid = list(energy_db.keys())[0] # Take first energy card type found | |
| p.energy_deck = [eid] * 12 | |
| else: | |
| p.energy_deck = [40000] * 12 # Fallback | |
| # Custom Energy Deck Override | |
| custom_energy = custom_energy_deck_p0 if pidx == 0 else custom_energy_deck_p1 | |
| if custom_energy: | |
| p.energy_deck = convert_deck_strings_to_ids(custom_energy) | |
| print(f"Player {pidx}: Using custom energy deck ({len(p.energy_deck)} cards)") | |
| # Explicit shuffle before drawing | |
| if not custom_deck: | |
| random.shuffle(p.main_deck) | |
| if game_state.players.index(p) == 0: | |
| print(f"DEBUG: P0 Deck Shuffled. Top 5: {p.main_deck[-5:]}") | |
| # Initial draw (6 cards - standard Mulligan start) | |
| for _ in range(6): | |
| if p.main_deck: | |
| p.hand.append(p.main_deck.pop()) | |
| p.hand_added_turn.append(game_state.turn_number) | |
| # Initial energy: 3 cards (Rule 6.2.1.7) | |
| for _ in range(3): | |
| if p.energy_deck: | |
| p.energy_zone.append(p.energy_deck.pop(0)) | |
| # Randomly determine first player | |
| game_state.first_player = random.randint(0, 1) | |
| # For Mulligan Phase (P1/Index 0), Current Player MUST be 0 | |
| # The 'first_player' variable determines who acts first in ACTIVE phase (Round 1) | |
| game_state.current_player = 0 | |
| # Start in MULLIGAN phase | |
| game_state.phase = Phase.MULLIGAN_P1 | |
| def get_rust_db_for_card_set(card_set: str = "compiled"): | |
| """Get the appropriate Rust database based on card_set selection.""" | |
| if card_set == "vanilla": | |
| if RUST_DB_VANILLA is None: | |
| raise Exception("RUST_DB_VANILLA not initialized") | |
| return RUST_DB_VANILLA | |
| else: | |
| if RUST_DB is None: | |
| raise Exception("RUST_DB not initialized") | |
| return RUST_DB | |
| def create_room_internal( | |
| room_id: str, | |
| mode: str = "pve", | |
| deck_type: str = "normal", | |
| public: bool = False, | |
| custom_decks: dict = None, | |
| card_set: str = "compiled", | |
| ) -> dict[str, Any]: | |
| """Helper to initialize a room using the RUST engine or Python fallback.""" | |
| print( | |
| f"DEBUG: Creating Room {room_id} (Mode: {mode}, Deck: {deck_type}, Public: {public}, CustomDecks: {bool(custom_decks)}, CardSet: {card_set})" | |
| ) | |
| # Try to use Rust engine, fall back to Python if unavailable | |
| gs = None | |
| engine_type = "python" # Default to Python | |
| try: | |
| rust_db = get_rust_db_for_card_set(card_set) | |
| if rust_db is not None: | |
| gs = engine_rust.PyGameState(rust_db) | |
| engine_type = "rust" | |
| print(f"Using RUST engine for room {room_id}") | |
| except (AttributeError, TypeError, Exception) as e: | |
| print(f"Rust engine unavailable ({e}), falling back to Python engine") | |
| gs = None | |
| # Fall back to Python engine if Rust failed | |
| if gs is None: | |
| print(f"Using PYTHON engine for room {room_id}") | |
| gs = GameState() | |
| engine_type = "python" | |
| # Get the correct card databases based on card_set | |
| current_member_db = vanilla_member_db if card_set == "vanilla" else member_db | |
| current_live_db = vanilla_live_db if card_set == "vanilla" else live_db | |
| current_energy_db = vanilla_energy_db if card_set == "vanilla" else energy_db | |
| # helper for deck generation | |
| # helper for deck generation | |
| def get_random_decks(): | |
| m_ids = list(current_member_db.keys()) | |
| l_ids = list(current_live_db.keys()) | |
| random.shuffle(m_ids) | |
| random.shuffle(l_ids) | |
| # 1. Select 48 Members (12 types * 4 copies) | |
| member_deck = [] | |
| # Use available members, cycling if needed | |
| needed_types = 12 | |
| selected_m_types = [] | |
| if len(m_ids) < needed_types: | |
| # Fallback if DB small | |
| selected_m_types = m_ids * (needed_types // len(m_ids) + 1) | |
| else: | |
| selected_m_types = m_ids | |
| for mid in selected_m_types[:needed_types]: | |
| member_deck.extend([mid] * 4) | |
| # 2. Select 12 Lives (3 types * 4 copies) | |
| live_deck = [] | |
| needed_lives = 3 | |
| selected_l_types = [] | |
| if len(l_ids) < needed_lives: | |
| selected_l_types = l_ids * (needed_lives // len(l_ids) + 1) | |
| else: | |
| selected_l_types = l_ids | |
| for lid in selected_l_types[:needed_lives]: | |
| live_deck.extend([lid] * 4) | |
| # Shuffle main deck parts? | |
| # Actually initialize_game expects specific vectors. | |
| # Logic.rs: d0.extend(p0_lives); self.players[0].deck = SmallVec::from_vec(d0); | |
| # So we should pass them separately. | |
| # But wait, logic.rs `initialize_game_with_seed`: | |
| # let mut d0 = p0_deck; d0.extend(p0_lives); | |
| # So p0_deck should be ONLY members (48), and p0_lives should be ONLY lives (12). | |
| # Energy | |
| e_deck = [list(current_energy_db.keys())[0]] * 12 if current_energy_db else [40000] * 12 | |
| # Return strict separated vectors | |
| return member_deck, e_deck, live_deck | |
| # Defaults | |
| p0_m, p0_e, p0_l = get_random_decks() | |
| p1_m, p1_e, p1_l = get_random_decks() | |
| # Override with custom decks if provided | |
| final_custom_decks = {0: {"main": [], "energy": []}, 1: {"main": [], "energy": []}} | |
| if custom_decks: | |
| final_custom_decks.update(custom_decks) | |
| for pid in [0, 1]: | |
| cdeck = custom_decks.get(str(pid)) or custom_decks.get(pid) | |
| if cdeck and cdeck.get("main"): | |
| # Convert strings to IDs using the appropriate card set mapping | |
| all_main_ids = convert_deck_strings_to_ids(cdeck["main"], card_set=card_set) | |
| # Partition into Members and Lives | |
| members = [] | |
| lives = [] | |
| for uid in all_main_ids: | |
| base_id = uid & BASE_ID_MASK | |
| if base_id in current_member_db: | |
| members.append(uid) | |
| elif base_id in current_live_db: | |
| lives.append(uid) | |
| # Truncate to standard limits (48 members, 12 lives) | |
| # This ensures the engine receives exactly what it expects | |
| if len(members) > 48: | |
| members = members[:48] | |
| if len(lives) > 12: | |
| lives = lives[:12] | |
| if pid == 0: | |
| p0_m = members | |
| p0_l = lives | |
| else: | |
| p1_m = members | |
| p1_l = lives | |
| # Energy (Strictly 12) | |
| if cdeck.get("energy"): | |
| e_ids = convert_deck_strings_to_ids(cdeck["energy"], card_set=card_set) | |
| # Filter to only valid energy cards | |
| e_ids = [e_id for e_id in e_ids if (e_id & BASE_ID_MASK) in current_energy_db] | |
| # Pad with default energy if needed | |
| while len(e_ids) < 12 and current_energy_db: | |
| e_ids.append(list(current_energy_db.keys())[0]) | |
| if len(e_ids) > 12: | |
| e_ids = e_ids[:12] | |
| if pid == 0: | |
| p0_e = e_ids | |
| else: | |
| p1_e = e_ids | |
| # Validate energy decks: ensure they only contain valid energy cards | |
| def validate_energy_deck(p_idx, e_deck, db_name="energy"): | |
| """Filter energy deck to only valid energy card IDs""" | |
| if not e_deck: | |
| return [] | |
| valid_ids = [] | |
| for card_id in e_deck: | |
| base_id = card_id & BASE_ID_MASK | |
| if base_id in current_energy_db: | |
| valid_ids.append(card_id) | |
| else: | |
| print(f"WARNING: Card ID {card_id} (base {base_id}) in Player {p_idx}'s {db_name} deck is NOT an energy card - filtering out") | |
| return valid_ids | |
| # Validate and filter energy decks | |
| p0_e = validate_energy_deck(0, p0_e) | |
| p1_e = validate_energy_deck(1, p1_e) | |
| # Pad with default energy cards if needed | |
| if current_energy_db: | |
| default_energy = list(current_energy_db.keys())[0] | |
| while len(p0_e) < 12: | |
| p0_e.append(default_energy) | |
| while len(p1_e) < 12: | |
| p1_e.append(default_energy) | |
| # Warning: We are not extracting initial lives from main deck for p0_l/p1_l if custom. | |
| # The engine probably draws them? | |
| # If `p0_l` is required, we should pick random 3 from lives in deck or DB? | |
| # For now, let's keep random lives for the Live Zone if not specified, or just reuse random ones. | |
| # Initialize game based on engine type | |
| try: | |
| if engine_type == "rust": | |
| # Rust engine initialization | |
| gs.initialize_game(p0_m, p1_m, p0_e, p1_e, p0_l, p1_l) | |
| else: | |
| # Python engine initialization - for now provide sensible defaults | |
| print(f"Python engine doesn't require explicit initialize_game call") | |
| # GameState should auto-init on creation | |
| except Exception as init_error: | |
| print(f"Warning: initialization error: {init_error}") | |
| # Set up AI agent based on engine and game mode | |
| room_ai_agent = None | |
| if engine_type == "python" and mode == "pve": | |
| # For Python engine in PvE mode, we need an AI agent | |
| try: | |
| if AI_AVAILABLE: | |
| room_ai_agent = SmartAgent() | |
| except Exception as ai_error: | |
| print(f"Warning: Could not instantiate AI agent: {ai_error}") | |
| # Fall back to None if AI creation fails - the game will still work | |
| return { | |
| "state": gs, | |
| "mode": mode, | |
| "card_set": card_set, | |
| "public": public, | |
| "created_at": datetime.now(), | |
| "last_active": datetime.now(), | |
| "ai_agent": room_ai_agent, # MCTS is built-in for Rust; AI agent for Python in PvE | |
| "custom_decks": final_custom_decks, | |
| "sessions": {}, | |
| "usernames": {}, # PID -> username | |
| "engine": engine_type, | |
| "planner_lab": {"sessions": {}, "last_results": {}}, | |
| # History tracking for undo/redo | |
| "history_stack": [gs], # Store raw state for on-demand localization | |
| "history_index": 0, | |
| } | |
| def join_room_logic(room_id: str, username: str = None) -> dict[str, Any]: | |
| """ | |
| Logic to add a user session to a room. | |
| Returns {"session_id": str, "player_id": int} | |
| """ | |
| if room_id not in ROOMS: | |
| return {"error": "Room not found"} | |
| room = ROOMS[room_id] | |
| sessions = room["sessions"] | |
| usernames = room.get("usernames", {}) | |
| new_pid = -1 | |
| session_id = str(uuid.uuid4()) | |
| # 1. Recovery Logic: Check if username already exists in this room | |
| if username: | |
| username = username.strip() | |
| for pid, existing_user in usernames.items(): | |
| if existing_user == username: | |
| print(f"DEBUG: Recovering session for '{username}' as Player {pid}") | |
| sessions[session_id] = pid | |
| return {"session_id": session_id, "player_id": pid, "recovered": True} | |
| # 2. Assignment Logic: Check current players | |
| taken_pids = set(sessions.values()) | |
| if 0 not in taken_pids: | |
| new_pid = 0 | |
| elif 1 not in taken_pids: | |
| new_pid = 1 | |
| else: | |
| # Both full. Spectator? | |
| new_pid = -1 | |
| # 3. Store identity | |
| sessions[session_id] = new_pid | |
| if username and new_pid != -1: | |
| usernames[new_pid] = username | |
| room["usernames"] = usernames | |
| return {"session_id": session_id, "player_id": new_pid} | |
| # --- ROOM MANAGEMENT API --- | |
| def create_new_room(): | |
| print("DEBUG: Entered create_new_room endpoint", file=sys.stderr) | |
| try: | |
| data = request.json or {} | |
| except Exception as e: | |
| print(f"DEBUG: Failed to parse JSON: {e}", file=sys.stderr) | |
| data = {} | |
| # Extract parameters | |
| room_id = get_room_id() | |
| mode = data.get("mode", "pve") | |
| is_public = data.get("public", False) | |
| card_set = data.get("card_set", "compiled") | |
| custom_decks = None | |
| print(f"DEBUG: Generated room_id {room_id}, acquiring lock...", file=sys.stderr) | |
| # Handle frontend parameters if 'decks' is not present | |
| if not custom_decks: | |
| p0_main = data.get("p0_deck") | |
| p0_energy = data.get("p0_energy", []) | |
| p1_main = data.get("p1_deck") | |
| p1_energy = data.get("p1_energy", []) | |
| if p0_main or p1_main: | |
| custom_decks = {} | |
| if p0_main: | |
| custom_decks["0"] = {"main": p0_main, "energy": p0_energy} | |
| if p1_main: | |
| custom_decks["1"] = {"main": p1_main, "energy": p1_energy} | |
| res = {} | |
| res = {} | |
| with game_lock: | |
| print("DEBUG: Lock acquired. Creating room internal...", file=sys.stderr) | |
| ROOMS[room_id] = create_room_internal(room_id, mode, public=is_public, custom_decks=custom_decks, card_set=card_set) | |
| print("DEBUG: Room created internally. Joining creator...", file=sys.stderr) | |
| # Auto-join creator with username if provided | |
| username = data.get("username") | |
| join_res = join_room_logic(room_id, username=username) | |
| print("DEBUG: Returning response.", file=sys.stderr) | |
| return jsonify({"success": True, "room_id": room_id, "mode": mode, "card_set": card_set, "session": join_res}) | |
| def list_public_rooms(): | |
| """Return a list of public rooms.""" | |
| public_rooms = [] | |
| with game_lock: | |
| for rid, room in ROOMS.items(): | |
| if room.get("public", False): | |
| # Calculate player count | |
| sessions = room.get("sessions", {}) | |
| player_count = len(set(sessions.values())) # Approximate, might need better logic if spectators exist | |
| # Or just count occupied slots (0 and 1) | |
| occupied_slots = 0 | |
| taken_pids = set(sessions.values()) | |
| if 0 in taken_pids: | |
| occupied_slots += 1 | |
| if 1 in taken_pids: | |
| occupied_slots += 1 | |
| # Basic Info | |
| gs = room.get("state") | |
| # Handle both Rust (turn) and Python (turn_number) attributes | |
| turn = getattr(gs, "turn_number", getattr(gs, "turn", 0)) | |
| phase = str(gs.phase) if gs else "?" | |
| public_rooms.append( | |
| { | |
| "room_id": rid, | |
| "mode": room.get("mode", "pve"), | |
| "players": occupied_slots, | |
| "turn": turn, | |
| "phase": phase, | |
| "created_at": room.get("created_at", datetime.now()).isoformat(), | |
| } | |
| ) | |
| # Sort by creation time desc | |
| public_rooms.sort(key=lambda x: x["created_at"], reverse=True) | |
| return jsonify({"success": True, "rooms": public_rooms}) | |
| def join_room(): | |
| print("DEBUG: Entered join_room", file=sys.stderr) | |
| data = request.json or {} | |
| room_id = data.get("room_id", "").upper().strip() | |
| print(f"DEBUG: Entered join_room for ID: '{room_id}'", file=sys.stderr) | |
| with game_lock: | |
| if room_id in ROOMS: | |
| mode = ROOMS[room_id]["mode"] | |
| card_set = ROOMS[room_id].get("card_set", "compiled") | |
| print(f"DEBUG: Found room {room_id}, mode={mode}, card_set={card_set}", file=sys.stderr) | |
| # Assign a session/seat to the joining player | |
| username = data.get("username") | |
| join_res = join_room_logic(room_id, username=username) | |
| if "error" in join_res: | |
| return jsonify({"success": False, "error": join_res["error"]}), 400 | |
| # CRITICAL: Use the "session" key to match frontend expectations | |
| return jsonify( | |
| { | |
| "success": True, | |
| "room_id": room_id, | |
| "mode": mode, | |
| "card_set": card_set, | |
| "session": join_res, | |
| "recovered": join_res.get("recovered", False), | |
| } | |
| ) | |
| return jsonify({"success": False, "error": "Room not found"}), 404 | |
| def leave_room(): | |
| """Allow a player to leave a room.""" | |
| room_id = get_room_id() | |
| session_token = request.headers.get("X-Session-Token") | |
| if not room_id or room_id not in ROOMS: | |
| return jsonify({"success": False, "error": "Room not found"}), 404 | |
| with game_lock: | |
| room = ROOMS.get(room_id) | |
| if not room: | |
| return jsonify({"success": False, "error": "Room not found"}), 404 | |
| sessions = room.get("sessions", {}) | |
| # Remove the session | |
| if session_token and session_token in sessions: | |
| del sessions[session_token] | |
| print(f"DEBUG: Session {session_token} left room {room_id}", file=sys.stderr) | |
| # Update last_active | |
| room["last_active"] = datetime.now() | |
| # Check if room should be immediately deleted: | |
| # If there are no sessions left, delete the room immediately. | |
| if not sessions: | |
| print(f"DEBUG: No sessions left. Immediately deleting room {room_id}", file=sys.stderr) | |
| del ROOMS[room_id] | |
| return jsonify({"success": True}) | |
| def index(): | |
| return send_from_directory(WEB_UI_DIR, "index.html") | |
| def game_board(): | |
| return send_from_directory(WEB_UI_DIR, "game_board.html") # Assuming it exists there | |
| def serve_js(filename): | |
| return send_from_directory(os.path.join(WEB_UI_DIR, "js"), filename) | |
| def serve_css(filename): | |
| return send_from_directory(os.path.join(WEB_UI_DIR, "css"), filename) | |
| def serve_icon(): | |
| # If icon is in root or img, adjust. Assuming img for now or checking existence. | |
| # Fallback to IMG_DIR or WEB_UI_DIR | |
| return send_from_directory(IMG_DIR, "icon_blade.png") | |
| def serve_deck_builder(): | |
| return send_from_directory(WEB_UI_DIR, "deck_builder.html") | |
| def serve_data(filename): | |
| return send_from_directory(DATA_DIR, filename) | |
| import threading | |
| import time | |
| # Threading setup | |
| game_lock = threading.RLock() # Re-entrant lock to prevent self-deadlock | |
| game_thread = None | |
| _cleanup_counter = 0 # Counter for cleanup interval | |
| def cleanup_inactive_rooms(): | |
| """ | |
| Remove rooms that have been inactive for too long or have no active sessions. | |
| Called periodically from background_game_loop. | |
| """ | |
| global _cleanup_counter | |
| _cleanup_counter += 1 | |
| # Only run cleanup every ROOM_CLEANUP_INTERVAL seconds (based on 0.1s sleep = 600 loops) | |
| if _cleanup_counter < ROOM_CLEANUP_INTERVAL: | |
| return | |
| _cleanup_counter = 0 | |
| now = datetime.now() | |
| timeout = timedelta(minutes=ROOM_INACTIVE_TIMEOUT_MINUTES) | |
| rooms_to_remove = [] | |
| for rid, room in ROOMS.items(): | |
| # Skip SINGLE_PLAYER room - it's special | |
| if rid == "SINGLE_PLAYER": | |
| continue | |
| last_active = room.get("last_active") | |
| sessions = room.get("sessions", {}) | |
| # Check if room has any active sessions | |
| active_sessions = [sid for sid, pid in sessions.items() if pid != -1] | |
| # Remove if: | |
| # 1. No active sessions (all players left) | |
| # 2. Inactive for too long | |
| should_remove = False | |
| reason = "" | |
| if not active_sessions: | |
| should_remove = True | |
| reason = "no active sessions" | |
| elif last_active and (now - last_active) > timeout: | |
| should_remove = True | |
| reason = f"inactive for {(now - last_active).total_seconds() / 60:.1f} minutes" | |
| if should_remove: | |
| rooms_to_remove.append((rid, reason)) | |
| # Remove the rooms | |
| for rid, reason in rooms_to_remove: | |
| print(f"DEBUG: Cleaning up room {rid}: {reason}", file=sys.stderr) | |
| del ROOMS[rid] | |
| if rooms_to_remove: | |
| print(f"DEBUG: Cleaned up {len(rooms_to_remove)} inactive rooms", file=sys.stderr) | |
| def background_game_loop(): | |
| """ | |
| Runs the game logic (AI and auto-phases) for ALL active rooms. | |
| """ | |
| print("Background Game Loop Started (Multi-Room)", file=sys.stderr) | |
| while True: | |
| try: | |
| # print("DEBUG: Background Loop acquiring lock...", file=sys.stderr) | |
| # Run room cleanup periodically | |
| cleanup_inactive_rooms() | |
| with game_lock: | |
| # Iterate over a copy of keys to avoid modification issues if needed | |
| active_room_ids = list(ROOMS.keys()) | |
| for rid in active_room_ids: | |
| # print(f"DEBUG: Processing room {rid}...", file=sys.stderr) | |
| room = ROOMS.get(rid) | |
| if not room: | |
| continue | |
| # Skip rooms with no active sessions (except SINGLE_PLAYER) | |
| if rid != "SINGLE_PLAYER": | |
| sessions = room.get("sessions", {}) | |
| active_sessions = [sid for sid, pid in sessions.items() if pid != -1] | |
| if not active_sessions: | |
| continue | |
| gs = room["state"] | |
| game_mode = room["mode"] | |
| ai_agent = room["ai_agent"] | |
| if not gs.is_terminal(): | |
| # 1. Auto-Advance Phases | |
| if gs.phase in ( | |
| Phase.ACTIVE, | |
| Phase.ENERGY, | |
| Phase.DRAW, | |
| Phase.PERFORMANCE_P1, | |
| Phase.PERFORMANCE_P2, | |
| ): | |
| # Safe attribute access for Rust engine compatibility | |
| p_choices = getattr(gs, "pending_choices", []) | |
| p_effects = getattr(gs, "pending_effects", []) | |
| if not (p_choices or p_effects): | |
| res = gs.step(0) | |
| if res is not None: | |
| room["state"] = res | |
| gs = res | |
| elif gs.current_player == 1 and game_mode == "pve": | |
| is_continue_choice = False | |
| if gs.pending_choices and gs.pending_choices[0][0].startswith("CONTINUE"): | |
| is_continue_choice = True | |
| if gs.phase == Phase.LIVE_RESULT and is_continue_choice: | |
| # Wait for Human | |
| pass | |
| else: | |
| if gs.phase in (Phase.MULLIGAN_P1, Phase.MULLIGAN_P2): | |
| aid = 0 | |
| res = gs.step(aid) | |
| if res is not None: | |
| room["state"] = res | |
| else: | |
| if room.get("engine") == "rust": | |
| # Use TurnSequencer planner for vanilla mode, greedy for others | |
| if room.get("card_set") == "vanilla": | |
| gs.step_opponent_turnseq() | |
| else: | |
| gs.step_opponent_greedy() | |
| else: | |
| if ai_agent is not None: | |
| aid = ai_agent.choose_action(gs, 1) | |
| else: | |
| aid = 0 | |
| res = gs.step(aid) | |
| if res is not None: | |
| room["state"] = res | |
| time.sleep(0.1) | |
| except Exception as e: | |
| print(f"Error in game loop: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| time.sleep(1.0) | |
| def get_state(): | |
| try: | |
| room_id = get_room_id() | |
| session_token = request.headers.get("X-Session-Token") | |
| with game_lock: | |
| # Development convenience: Auto-create room if missing IF it's "SINGLE_PLAYER" | |
| if room_id == "SINGLE_PLAYER" and room_id not in ROOMS: | |
| try: | |
| ROOMS[room_id] = create_room_internal(room_id) | |
| except Exception as e: | |
| print(f"Error creating room: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| return jsonify({"success": False, "error": f"Failed to create room: {e}"}), 500 | |
| room = get_room(room_id) | |
| if not room: | |
| return jsonify({"success": False, "error": "Room not found or expired"}), 404 | |
| # Check if we're in history navigation mode (rewind/redo) | |
| history_stack = room.get("history_stack", []) | |
| history_index = room.get("history_index", 0) | |
| if history_stack and history_index < len(history_stack): | |
| # Return state from history | |
| try: | |
| gs_history = history_stack[history_index] | |
| mode = room["mode"] | |
| lang = get_lang() | |
| # Serialize on-demand with correct language | |
| if room.get("engine") == "rust": | |
| try: | |
| s_state = rust_serializer.serialize_state(gs_history, viewer_idx=0, mode=mode, is_pvp=False, lang=lang) | |
| except Exception as e: | |
| print(f"Error serializing Rust state: {e}") | |
| return jsonify({"success": False, "error": f"Serialization error: {e}"}), 500 | |
| else: | |
| s_state = serialize_state(gs_history, viewer_idx=0, is_pvp=False, mode=mode, lang=lang) | |
| cdecks = room.get("custom_decks", {}) | |
| meta = { | |
| "p0_deck_set": bool(cdecks.get(0, {}).get("main") or cdecks.get("0", {}).get("main")), | |
| "p1_deck_set": bool(cdecks.get(1, {}).get("main") or cdecks.get("1", {}).get("main")), | |
| "mode": mode, | |
| "history_mode": True, | |
| "history_index": history_index, | |
| "history_length": len(history_stack), | |
| } | |
| return jsonify({"success": True, "state": s_state, "meta": meta}) | |
| except Exception as history_error: | |
| print(f"Error processing history state: {history_error}") | |
| import traceback | |
| traceback.print_exc() | |
| # Non-history case: current game state | |
| try: | |
| gs = room["state"] | |
| mode = room["mode"] | |
| viewer_idx = get_player_idx(room) | |
| lang = get_lang() | |
| if room.get("engine") == "rust": | |
| try: | |
| s_state = rust_serializer.serialize_state(gs, viewer_idx=viewer_idx, mode=mode, is_pvp=False, lang=lang) | |
| except Exception as e: | |
| print(f"Error serializing Rust state: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| return jsonify({"success": False, "error": f"Serialization error: {e}"}), 500 | |
| else: | |
| s_state = serialize_state( | |
| gs, | |
| viewer_idx=viewer_idx, | |
| is_pvp=False, | |
| mode=mode, | |
| lang=lang, | |
| ) | |
| # Meta info about decks | |
| cdecks = room.get("custom_decks", {}) | |
| meta = { | |
| "p0_deck_set": bool(cdecks.get(0, {}).get("main") or cdecks.get("0", {}).get("main")), | |
| "p1_deck_set": bool(cdecks.get(1, {}).get("main") or cdecks.get("1", {}).get("main")), | |
| "mode": mode, | |
| } | |
| return jsonify({"success": True, "state": s_state, "meta": meta}) | |
| except Exception as state_error: | |
| print(f"Error processing current state: {state_error}") | |
| import traceback | |
| traceback.print_exc() | |
| return jsonify({"success": False, "error": f"Failed to serialize state: {state_error}"}), 500 | |
| except Exception as e: | |
| print(f"Unexpected error in get_state: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| return jsonify({"success": False, "error": f"Internal error: {e}"}), 500 | |
| def set_deck(): | |
| """Accept a custom deck for a player in a specific room.""" | |
| data = request.json | |
| player_id = data.get("player", 0) | |
| deck_ids = data.get("deck", []) # List of card_no strings | |
| energy_ids = data.get("energy_deck", []) | |
| room_id = get_room_id() | |
| with game_lock: | |
| room = get_room(room_id) | |
| # For setting deck, we might want to allow it even if room doesn't exist yet? | |
| # But conceptually, you create a room, then set deck, then reset/start. | |
| if not room: | |
| # Auto-create for dev workflow | |
| ROOMS[room_id] = create_room_internal(room_id) | |
| room = ROOMS[room_id] | |
| room["custom_decks"][player_id] = {"main": deck_ids, "energy": energy_ids} | |
| return jsonify( | |
| { | |
| "status": "ok", | |
| "player": player_id, | |
| "deck_size": len(deck_ids), | |
| "message": f"Deck set for Player {player_id + 1} in Room {room_id}. Reset game to apply.", | |
| } | |
| ) | |
| def get_room_assets(): | |
| """Return a unique list of image paths for all cards used in the current room.""" | |
| room_id = get_room_id() | |
| room = get_room(room_id) | |
| if not room: | |
| return jsonify({"error": "Room not found"}), 404 | |
| assets = set() | |
| def add_card_assets(cid): | |
| if cid is None or cid < 0: | |
| return | |
| base_id = int(cid) & BASE_ID_MASK | |
| bid_str = str(base_id) | |
| if bid_str in member_db: | |
| assets.add(member_db[bid_str].img_path) | |
| elif bid_str in live_db: | |
| assets.add(live_db[bid_str].img_path) | |
| elif bid_str in energy_db: | |
| assets.add(energy_db[bid_str].img_path) | |
| # 1. From Custom Decks (Pending) | |
| custom_decks = room.get("custom_decks", {}) | |
| for _pid, deck_info in custom_decks.items(): | |
| # 'main' and 'energy' contain card_no strings | |
| for card_no in deck_info.get("main", []): | |
| if card_no in card_no_to_id: | |
| add_card_assets(card_no_to_id[card_no]) | |
| for card_no in deck_info.get("energy", []): | |
| if card_no in card_no_to_id: | |
| add_card_assets(card_no_to_id[card_no]) | |
| # 2. From Current Game State (Active) | |
| if room.get("state"): | |
| gs = room["state"] | |
| for p_idx in [0, 1]: | |
| try: | |
| p = gs.get_player(p_idx) | |
| # Check all zones | |
| zones = [p.hand, p.stage, p.live_zone, p.energy_zone, p.discard, p.success_lives] | |
| for zone in zones: | |
| for cid in zone: | |
| add_card_assets(cid) | |
| except Exception as e: | |
| print(f"DEBUG: Error extracting assets from player {p_idx}: {e}") | |
| # Convert to list and filter out None | |
| final_assets = sorted([a for a in assets if a]) | |
| return jsonify({"success": True, "assets": final_assets}) | |
| def upload_deck(): | |
| """Accept a raw deck file content (decktest.txt style) and load it.""" | |
| data = request.json | |
| content = data.get("content", "") | |
| player_id = data.get("player", 0) | |
| room_id = get_room_id() | |
| # Parse content | |
| try: | |
| if content.strip().startswith("{") or content.strip().startswith("["): | |
| # JSON format | |
| deck_data = json.loads(content) | |
| # Support both simple list and object | |
| if isinstance(deck_data, list): | |
| main_deck = deck_data | |
| energy_deck = [] # JSON list implies only main deck usually? | |
| elif "main" in deck_data: | |
| main_deck = deck_data["main"] | |
| energy_deck = deck_data.get("energy", []) | |
| else: | |
| return jsonify( | |
| {"success": False, "error": "Invalid JSON deck format. Expected list or object with 'main' key."} | |
| ) | |
| else: | |
| # HTML/Text format | |
| card_db = {} | |
| try: | |
| cards_path = os.path.join(DATA_DIR, "cards.json") | |
| with open(cards_path, "r", encoding="utf-8") as f: | |
| card_db = json.load(f) | |
| except Exception as e: | |
| return jsonify({"success": False, "error": f"Failed to load card DB for validation: {e}"}) | |
| main_deck, energy_deck, _, errors = extract_deck_data(content, card_db) # Pass DB for validation | |
| if errors: | |
| return jsonify({"success": False, "error": "Validation Errors:\n" + "\n".join(errors)}) | |
| except json.JSONDecodeError: | |
| return jsonify({"success": False, "error": "Invalid JSON format."}) | |
| except Exception as e: | |
| print(f"Deck parsing error: {e}") | |
| return jsonify({"success": False, "error": str(e)}) | |
| if not main_deck and not energy_deck: | |
| return jsonify({"success": False, "error": "No cards found in file."}) | |
| with game_lock: | |
| room = get_room(room_id) | |
| if not room: | |
| ROOMS[room_id] = create_room_internal(room_id) | |
| room = ROOMS[room_id] | |
| room["custom_decks"][player_id] = {"main": main_deck, "energy": energy_deck} | |
| # Auto-apply? | |
| # Re-init room with "custom" logic? | |
| # For now, let's just create a new room state using these decks immediately for convenience | |
| # But we need to respect the loop. | |
| # Actually existing logic calls init_game(deck_type="custom"). | |
| # We'll just trigger a reset logic manually | |
| # This duplicates logic in reset() but scoped to this room + custom deck applied. | |
| # For simplicity, we just store it. User must click "Reset" or we call reset internal? | |
| # The frontend usually expects upload to just work. | |
| pass | |
| # Trigger Reset via API logic simulation or just return success and let caller Reset? | |
| # Existing behavior: calls init_game("custom"). | |
| # So we should probably do the same: reset the room's state using these custom decks. | |
| # We can reuse the create_room_internal logic if we modify it to accept custom decks directly? | |
| # Or just rely on the room["custom_decks"] being set. | |
| # Let's call reset internal logic here? | |
| # Better: Update endpoints first, then we can verify flow. | |
| # For now, we assume user clicks Reset or we simulate it. | |
| # Actually, let's just return success. The frontend typically reloads or resets. | |
| return jsonify( | |
| { | |
| "success": True, | |
| "main_count": len(main_deck), | |
| "energy_count": len(energy_deck), | |
| "room_id": room_id, | |
| "message": f"Deck Loaded! ({len(main_deck)} Main, {len(energy_deck)} Energy). Please Reset.", | |
| } | |
| ) | |
| def get_test_deck_api(): | |
| """Read deck files from ai/decks/ directory and return card list.""" | |
| from engine.game.deck_utils import extract_deck_data | |
| deck_name = request.args.get("deck", "") # Optional deck name parameter | |
| # Path to ai/decks directory | |
| # Use PROJECT_ROOT for reliability | |
| ai_decks_dir = os.path.join(PROJECT_ROOT, "ai", "decks") | |
| if not os.path.exists(ai_decks_dir): | |
| # Fallback: try CWD relative | |
| ai_decks_dir = os.path.abspath(os.path.join("ai", "decks")) | |
| if not os.path.exists(ai_decks_dir): | |
| return jsonify({"success": False, "error": "ai/decks directory not found"}) | |
| # List available decks (excluding verify script) | |
| available_decks = [] | |
| for f in os.listdir(ai_decks_dir): | |
| if f.endswith(".txt") and not f.startswith("verify"): | |
| available_decks.append(f.replace(".txt", "")) | |
| # If no deck specified, return list of available decks | |
| if not deck_name: | |
| # Default to aqours_cup for "Load Test Deck" button compatibility | |
| deck_name = "aqours_cup" | |
| message = "Defaulting to 'aqours_cup'. Specify ?deck=NAME to load a specific deck." | |
| else: | |
| message = f"Loaded '{deck_name}'" | |
| # Find matching deck file | |
| deck_file = os.path.join(ai_decks_dir, f"{deck_name}.txt") | |
| if not os.path.exists(deck_file): | |
| return jsonify({"success": False, "error": f"Deck '{deck_name}' not found", "available_decks": available_decks}) | |
| try: | |
| with open(deck_file, "r", encoding="utf-8") as f: | |
| content = f.read() | |
| # Load card DB for parsing | |
| card_db_path = os.path.join(CURRENT_DIR, "..", "data", "cards.json") | |
| card_db = {} | |
| if os.path.exists(card_db_path): | |
| with open(card_db_path, "r", encoding="utf-8") as f_db: | |
| card_db = json.load(f_db) | |
| # Use the unified parser | |
| main_deck, energy_deck, type_counts, errors = extract_deck_data(content, card_db) | |
| return jsonify( | |
| { | |
| "success": True, | |
| "deck_name": deck_name, | |
| "content": main_deck, # For compatibility with older frontend | |
| "main_deck": main_deck, | |
| "energy_deck": energy_deck, | |
| "available_decks": available_decks, | |
| "message": f"{message} ({len(main_deck)} Main, {len(energy_deck)} Energy)", | |
| "errors": errors, | |
| } | |
| ) | |
| except Exception as e: | |
| return jsonify({"success": False, "error": str(e)}) | |
| def validate_cards(): | |
| """Validate card IDs against the database and provide type breakdown.""" | |
| data = request.json | |
| card_ids = data.get("card_ids", []) | |
| card_counts = data.get("card_counts", {}) # Optional: {card_id: quantity} | |
| # Ensure mapping is built | |
| if not card_no_to_id: | |
| print("DEBUG: validation - mapping empty, rebuilding...", flush=True) | |
| build_card_no_mapping() | |
| print(f"DEBUG: validation - map size: {len(card_no_to_id)}", flush=True) | |
| test_key = "PL!SP-bp1-004-R" | |
| if test_key in card_no_to_id: | |
| print(f"DEBUG: validation - found {test_key}: {card_no_to_id[test_key]}", flush=True) | |
| else: | |
| print(f"DEBUG: validation - {test_key} NOT FOUND in map!", flush=True) | |
| known = [] | |
| unknown = [] | |
| card_info = {} # card_id -> {type, name, internal_id} | |
| # Type counters | |
| member_count = 0 | |
| live_count = 0 | |
| energy_count = 0 | |
| for card_id in card_ids: | |
| # print(f"DEBUG: Checking {card_id}", flush=True) | |
| qty = card_counts.get(card_id, 1) | |
| norm_id = UnifiedDeckParser.normalize_code(card_id) | |
| if norm_id in card_no_to_id: | |
| internal_id = card_no_to_id[norm_id] | |
| known.append(card_id) | |
| # Determine type and get name | |
| if internal_id in member_db: | |
| card_info[card_id] = {"type": "Member", "name": member_db[internal_id].name} | |
| member_count += qty | |
| elif internal_id in live_db: | |
| card_info[card_id] = {"type": "Live", "name": live_db[internal_id].name} | |
| live_count += qty | |
| elif internal_id in energy_db: | |
| card_info[card_id] = {"type": "Energy", "name": energy_db[internal_id].name} | |
| energy_count += qty | |
| else: | |
| unknown.append(card_id) | |
| debug_info = { | |
| "map_size": len(card_no_to_id), | |
| "test_key_exists": "PL!SP-bp1-004-R" in card_no_to_id, | |
| "test_key_val": card_no_to_id.get("PL!SP-bp1-004-R", "N/A"), | |
| "first_5_keys": list(card_no_to_id.keys())[:5], | |
| } | |
| return jsonify( | |
| { | |
| "known": known, | |
| "unknown": unknown, | |
| "known_count": len(known), | |
| "unknown_count": len(unknown), | |
| "card_info": card_info, | |
| "breakdown": {"member": member_count, "live": live_count, "energy": energy_count}, | |
| "_debug": debug_info, | |
| } | |
| ) | |
| def clear_performance(): | |
| room_id = get_room_id() | |
| with game_lock: | |
| room = get_room(room_id) | |
| if room: | |
| gs = room["state"] | |
| # Clear the results dictionary | |
| gs.performance_results.clear() | |
| return jsonify({"status": "ok"}) | |
| def record_game_state_to_history(room): | |
| """Save the current game state to the history stack, truncating any redo states.""" | |
| if not room: | |
| return | |
| try: | |
| gs = room["state"] | |
| # In Rust engine, gs is typically a new object from gs.step(). | |
| # We store the object itself to allow on-demand localization later. | |
| state_to_store = gs | |
| # Initialize history if not present (for backward compatibility) | |
| if "history_stack" not in room: | |
| room["history_stack"] = [state_to_store] | |
| room["history_index"] = 0 | |
| return | |
| # Truncate redo states (if we're not at the end of history) | |
| history = room["history_stack"] | |
| idx = room["history_index"] | |
| if idx < len(history) - 1: | |
| room["history_stack"] = history[:idx + 1] | |
| # Add new state | |
| room["history_stack"].append(state_to_store) | |
| room["history_index"] = len(room["history_stack"]) - 1 | |
| # Limit history size to prevent memory bloat (keep last 100 states) | |
| if len(room["history_stack"]) > 100: | |
| # Trim from the beginning | |
| excess = len(room["history_stack"]) - 100 | |
| room["history_stack"] = room["history_stack"][excess:] | |
| room["history_index"] = max(0, room["history_index"] - excess) | |
| except Exception as e: | |
| print(f"Error recording game state to history: {e}") | |
| def do_action(): | |
| room_id = get_room_id() | |
| session_token = request.headers.get("X-Session-Token") | |
| with game_lock: | |
| start_time = time.time() | |
| try: | |
| room = get_room(room_id) | |
| if not room: | |
| return jsonify({"success": False, "error": "Room not found"}), 404 | |
| # Exit history mode if actively in it (history_index < end of history) | |
| history_stack = room.get("history_stack", []) | |
| history_index = room.get("history_index", 0) | |
| if history_stack and history_index < len(history_stack) - 1: | |
| # We're rewound/redone, truncate future history and continue from here | |
| room["history_stack"] = history_stack[:history_index + 1] | |
| room["history_index"] = history_index | |
| gs = room["state"] | |
| game_mode = room["mode"] | |
| ai_agent = room["ai_agent"] | |
| sessions = room.get("sessions", {}) | |
| # Session Validation (Enforce Turn) | |
| if session_token and session_token in sessions: | |
| pid = sessions[session_token] | |
| if pid != -1: | |
| # Check Pending Choice Turn | |
| p_choices = getattr(gs, "pending_choices", []) | |
| if p_choices: | |
| # Handle both Rust (str, str) and Python (str, dict) formats | |
| params = _coerce_pending_choice_params(p_choices[0][1]) | |
| choice_pid = params.get("player_id", gs.current_player) | |
| if choice_pid != pid: | |
| return jsonify( | |
| {"success": False, "error": f"Not your turn to choose (Waiting for P{choice_pid})"} | |
| ), 403 | |
| # Check Main Turn | |
| elif gs.current_player != pid: | |
| return jsonify( | |
| {"success": False, "error": f"Not your turn (Waiting for P{gs.current_player})"} | |
| ), 403 | |
| data = request.json | |
| action_id = data.get("action_id", 0) | |
| force = data.get("force", False) | |
| requester_idx = get_player_idx(room) | |
| if room.get("engine") == "rust" and requester_idx == gs.current_player and is_planner_root_phase(gs.phase): | |
| ensure_planner_session(room, gs, requester_idx) | |
| legal_mask = gs.get_legal_actions() | |
| # Validate Action | |
| if not (0 <= action_id < len(legal_mask)): | |
| return jsonify({"success": False, "error": "Invalid action ID"}), 400 | |
| # Enforce Perspective/Active Player consistency in PvP | |
| if game_mode == "pvp": | |
| if requester_idx != gs.current_player: | |
| # Special check for RPS phase: both players can act independently | |
| # but if current_player is fixed on one, we may need to allow the other. | |
| # In Rust engine, RPS actions usually don't care about current_player if they are valid. | |
| # But if the backend blocks it, we have a lock. | |
| # Logic: If phase is RPS or TurnChoice, allow both or ensure transition is smooth | |
| is_special = (gs.phase in (Phase.RPS, Phase.TurnChoice)) or ( | |
| hasattr(gs.phase, "name") and gs.phase.name in ("RPS", "TurnChoice") | |
| ) | |
| if not is_special: | |
| return jsonify( | |
| {"success": False, "error": f"Not your turn! It's P{gs.current_player + 1}'s turn."} | |
| ), 403 | |
| elif game_mode == "pve": | |
| # In PvE, normally block manual actions when it's the AI player's turn (P1). | |
| # However, special setup phases like RPS and TurnChoice allow both players | |
| # to submit choices independently — permit those here. | |
| is_special = (gs.phase in (Phase.RPS, Phase.TurnChoice)) or ( | |
| hasattr(gs.phase, "name") and gs.phase.name in ("RPS", "TurnChoice") | |
| ) | |
| if gs.current_player == 1 and not is_special: | |
| return jsonify({"success": False, "error": "AI is playing, please wait."}), 403 | |
| is_legal = legal_mask[action_id] | |
| if force or is_legal: | |
| print(f"[DEBUG] Executing action {action_id} in phase {gs.phase} for player {gs.current_player}") | |
| # Step 1: Execute User Action | |
| res = gs.step(action_id) | |
| if room.get("engine") == "rust": | |
| append_planner_action(room, requester_idx, action_id) | |
| if res is not None: | |
| room["state"] = res | |
| gs = res | |
| # Step 2: Auto-Advance & AI Handling | |
| max_safety = 50 | |
| while not gs.is_terminal() and max_safety > 0: | |
| max_safety -= 1 | |
| print(f"[DEBUG] Loop: phase={gs.phase} current_player={gs.current_player}") | |
| is_special_phase = (gs.phase in (Phase.RPS, Phase.TurnChoice)) or ( | |
| hasattr(gs.phase, "name") and gs.phase.name in ("RPS", "TurnChoice") | |
| ) | |
| # A. Automatic Phases (SETUP, 1=Active, 2=Energy, 3=Draw, 6=Perf1, 7=Perf2, 8=LiveResult) | |
| if gs.phase == Phase.SETUP or gs.phase in (1, 2, 3, 6, 7, 8): | |
| print(f"[DEBUG] Auto-advancing phase {gs.phase}") | |
| res = gs.step(0) | |
| if res is not None: | |
| room["state"] = res | |
| gs = res | |
| continue | |
| # Special setup phases in PvE can bounce between human and AI. | |
| # If the human still has a legal setup choice after a tie/reset, | |
| # stop here so the client gets another shot instead of the AI | |
| # immediately chaining through the next RPS/turn-order action. | |
| if game_mode == "pve" and is_special_phase: | |
| try: | |
| human_legal_mask = gs.get_legal_actions(player_idx=0) | |
| ai_legal_mask = gs.get_legal_actions(player_idx=1) | |
| human_can_act = bool(np.any(human_legal_mask)) | |
| ai_can_act = bool(np.any(ai_legal_mask)) | |
| print( | |
| f"[DEBUG] Special PvE phase readiness: " | |
| f"human_can_act={human_can_act} ai_can_act={ai_can_act}" | |
| ) | |
| if human_can_act: | |
| print(f"[DEBUG] Breaking loop for human input in special phase {gs.phase}") | |
| break | |
| if ai_can_act: | |
| print(f"[DEBUG] AI handling special phase {gs.phase}") | |
| if room.get("engine") == "rust": | |
| if room.get("card_set") == "vanilla": | |
| gs.step_opponent_turnseq() | |
| else: | |
| gs.step_opponent_mcts(10) | |
| else: | |
| if ai_agent is not None: | |
| aid = ai_agent.choose_action(gs, 1) | |
| else: | |
| aid = 0 | |
| res = gs.step(aid) | |
| if res is not None: | |
| room["state"] = res | |
| gs = res | |
| continue | |
| except Exception as special_phase_error: | |
| print(f"[DEBUG] Special PvE phase handling fallback: {special_phase_error}") | |
| # B. AI Turn (P1) - ONLY if PVE | |
| if gs.current_player == 1 and game_mode == "pve": | |
| print(f"[DEBUG] AI taking turn in phase {gs.phase}") | |
| if room.get("engine") == "rust": | |
| # Use TurnSequencer planner for vanilla mode (optimized for lower turn counts) | |
| if room.get("card_set") == "vanilla": | |
| gs.step_opponent_turnseq() | |
| else: | |
| gs.step_opponent_mcts(10) | |
| else: | |
| # Python AI | |
| if ai_agent is not None: | |
| aid = ai_agent.choose_action(gs, 1) | |
| else: | |
| aid = 0 | |
| res = gs.step(aid) | |
| if res is not None: | |
| room["state"] = res | |
| gs = res | |
| continue | |
| print(f"[DEBUG] Breaking loop at phase {gs.phase}") | |
| break | |
| viewer_idx = get_player_idx() | |
| lang = get_lang() | |
| duration = time.time() - start_time | |
| print(f"[PERF] /api/action took {duration:.3f}s (Action: {action_id})") | |
| if room.get("engine") == "rust": | |
| maybe_finalize_planner_session(room, gs, requester_idx, lang) | |
| # Record state to history stack after successful action | |
| record_game_state_to_history(room) | |
| # Serialize state appropriately based on engine type | |
| if room.get("engine") == "rust": | |
| serialized_state = rust_serializer.serialize_state(gs, viewer_idx=viewer_idx, mode=game_mode, lang=lang) | |
| else: | |
| serialized_state = serialize_state(gs, viewer_idx=viewer_idx, is_pvp=False, mode=game_mode, lang=lang) | |
| return jsonify( | |
| { | |
| "success": True, | |
| "state": serialized_state, | |
| } | |
| ) | |
| else: | |
| return jsonify({"success": False, "error": f"Illegal action {action_id}"}), 400 | |
| except Exception as e: | |
| import traceback | |
| traceback.print_exc() | |
| # Auto-report issue on crash | |
| try: | |
| report_dir = os.path.join(CURRENT_DIR, "reports") | |
| os.makedirs(report_dir, exist_ok=True) | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| crash_file = os.path.join(report_dir, f"crash_{timestamp}.json") | |
| try: | |
| lang = get_lang() | |
| if room is not None and room.get("engine") == "rust": | |
| serialized_state = rust_serializer.serialize_state( | |
| gs, viewer_idx=get_player_idx(), mode=game_mode, lang=lang | |
| ) | |
| else: | |
| serialized_state = serialize_state( | |
| gs, viewer_idx=get_player_idx(), is_pvp=(game_mode == "pvp"), mode=game_mode, lang=lang | |
| ) | |
| with open(crash_file, "w", encoding="utf-8") as f: | |
| # Use app.json.dumps to handle Numpy types | |
| f.write( | |
| app.json.dumps( | |
| { | |
| "error": str(e), | |
| "trace": traceback.format_exc(), | |
| "state": serialized_state, | |
| } | |
| ) | |
| ) | |
| except Exception as inner_e: | |
| # Fallback if serialization fails | |
| with open(crash_file, "w", encoding="utf-8") as f: | |
| f.write( | |
| app.json.dumps( | |
| {"error": str(e), "trace": traceback.format_exc(), "serialization_error": str(inner_e)} | |
| ) | |
| ) | |
| except OSError: | |
| pass | |
| return jsonify({"success": False, "error": str(e), "trace": traceback.format_exc()}), 500 | |
| def index_route(): | |
| return send_from_directory(app.static_folder, "index.html") | |
| def static_proxy(path): | |
| return send_from_directory(app.static_folder, path) | |
| def god_mode(): | |
| room_id = get_room_id() | |
| code = request.json.get("code", "") | |
| with game_lock: | |
| room = get_room(room_id) | |
| if not room: | |
| return jsonify({"success": False, "error": "Room not found"}) | |
| gs = room["state"] | |
| try: | |
| p = gs.active_player | |
| exec(code, {"state": gs, "p": p, "np": np}) | |
| return jsonify( | |
| {"success": True, "state": serialize_state(gs, is_pvp=(room["mode"] == "pvp"), lang=get_lang())} | |
| ) | |
| except Exception as e: | |
| return jsonify({"success": False, "error": str(e)}) | |
| def reset(): | |
| room_id = get_room_id() | |
| with game_lock: | |
| data = request.json or {} | |
| deck_type = data.get("deck_type", "normal") | |
| # Allow changing mode on reset | |
| new_mode = data.get("mode") # Optional | |
| # Check if room exists to preserve existing params if not specified | |
| old_room = ROOMS.get(room_id) | |
| mode = new_mode if new_mode else (old_room["mode"] if old_room else "pve") | |
| ROOMS[room_id] = create_room_internal(room_id, mode, deck_type) | |
| room = ROOMS[room_id] | |
| # Check for custom decks and apply them if they exist for this room | |
| if old_room and "custom_decks" in old_room: | |
| room["custom_decks"] = old_room["custom_decks"] | |
| # Preserve sessions | |
| if old_room and "sessions" in old_room: | |
| room["sessions"] = old_room["sessions"] | |
| if deck_type == "custom": | |
| # Apply custom decks to the fresh state | |
| gs = room["state"] | |
| for pid in [0, 1]: | |
| cdeck = room["custom_decks"].get(pid) | |
| if cdeck and cdeck["main"]: | |
| gs.players[pid].main_deck = convert_deck_strings_to_ids(cdeck["main"]) | |
| random.shuffle(gs.players[pid].main_deck) | |
| # Re-draw hand? | |
| gs.players[pid].hand = [] | |
| gs.players[pid].hand_added_turn = [] | |
| for _ in range(6): | |
| if gs.players[pid].main_deck: | |
| gs.players[pid].hand.append(gs.players[pid].main_deck.pop()) | |
| gs.players[pid].hand_added_turn.append(0) | |
| if cdeck and cdeck["energy"]: | |
| # Re-fill energy | |
| gs.players[pid].energy_deck = convert_deck_strings_to_ids(cdeck["energy"]) | |
| gs.players[pid].energy_zone = [] | |
| for _ in range(3): | |
| if gs.players[pid].energy_deck: | |
| gs.players[pid].energy_zone.append(gs.players[pid].energy_deck.pop(0)) | |
| gs = room["state"] | |
| game_mode = room["mode"] | |
| # Auto-advance (AI goes first or Init steps) | |
| max_safety = 100 | |
| while not gs.is_terminal() and max_safety > 0: | |
| max_safety -= 1 | |
| # Automatic phases | |
| if gs.phase in (-2, 1, 2, 3, 6, 7, 8): | |
| gs.step(0) | |
| continue | |
| # AI Turn (P1) | |
| if gs.current_player == 1 and game_mode == "pve": | |
| gs.step_opponent_mcts(10) | |
| continue | |
| break # P0 turn or user input needed | |
| return jsonify({"success": True, "state": rust_serializer.serialize_state(gs, mode=game_mode, lang=get_lang())}) | |
| def ai_suggest(): | |
| room_id = get_room_id() | |
| data = request.json or {} | |
| sims = data.get("sims", 10) | |
| with game_lock: | |
| room = get_room(room_id) | |
| if not room: | |
| return jsonify({"error": "Room not found"}), 404 | |
| gs = room["state"] | |
| # Only run if not terminal | |
| if gs.is_terminal(): | |
| return jsonify({"suggestions": []}) | |
| stats = gs.get_mcts_suggestions(sims) | |
| # Shim for get_action_desc | |
| class RustShim: | |
| def __init__(self, gs): | |
| self.phase = gs.phase | |
| self.current_player = gs.current_player | |
| self.active_player = gs.get_player(gs.current_player) | |
| self.member_db = member_db | |
| self.live_db = live_db | |
| self.pending_choices = [] # TODO: expose from rust if needed | |
| shim = RustShim(gs) | |
| # Enrich stats with descriptions | |
| enriched = [] | |
| lang = get_lang() | |
| for action, value, visits in stats: | |
| desc = get_action_desc(action, shim, lang=lang) | |
| enriched.append({"action_id": action, "value": float(value), "visits": int(visits), "desc": desc}) | |
| return jsonify({"success": True, "suggestions": enriched}) | |
| def get_turn_planner(): | |
| room_id = get_room_id() | |
| with game_lock: | |
| room = ROOMS.get(room_id) | |
| if not room: | |
| return jsonify({"success": False, "error": "Room not found"}), 404 | |
| if room.get("engine") != "rust": | |
| return jsonify({"success": False, "error": "Turn planner is only available with the Rust engine."}), 400 | |
| gs = room["state"] | |
| player_idx = get_player_idx(room) | |
| planner = build_planner_payload(room, gs, player_idx, get_lang()) | |
| return jsonify({"success": True, "planner": planner}) | |
| def score_turn_planner_sequence(): | |
| room_id = get_room_id() | |
| with game_lock: | |
| room = ROOMS.get(room_id) | |
| if not room: | |
| return jsonify({"success": False, "error": "Room not found"}), 404 | |
| if room.get("engine") != "rust": | |
| return jsonify({"success": False, "error": "Turn planner is only available with the Rust engine."}), 400 | |
| gs = room["state"] | |
| player_idx = get_player_idx(room) | |
| planner_store = get_planner_store(room) | |
| session = planner_store["sessions"].get(str(player_idx)) | |
| if not session: | |
| return jsonify({"success": False, "error": "No tracked player sequence is available to score."}), 400 | |
| planner = build_planner_analysis_from_session(session, get_lang()) | |
| planner["active"] = True | |
| planner["available"] = True | |
| return jsonify({"success": True, "planner": planner}) | |
| def list_replays(): | |
| replays = [] | |
| # 1. Root replays | |
| try: | |
| if os.path.exists("replays"): | |
| for f in os.listdir("replays"): | |
| if f.endswith(".json") and os.path.isfile(os.path.join("replays", f)): | |
| replays.append({"filename": f, "folder": ""}) | |
| # 2. Tournament subfolder | |
| tourney_dir = os.path.join("replays", "tournament") | |
| if os.path.exists(tourney_dir): | |
| for f in os.listdir(tourney_dir): | |
| if f.endswith(".json"): | |
| # We need to handle pathing. The frontend might expect just filename. | |
| # But get_replay takes "filename". | |
| # We should probably update get_replay to handle subpaths or encode it. | |
| # For now let's just use the relative path as the filename | |
| replays.append({"filename": f"tournament/{f}", "folder": "tournament"}) | |
| except Exception as e: | |
| print(f"Error listing replays: {e}") | |
| return jsonify({"success": False, "error": str(e)}) | |
| # Sort by filename desc (usually timestamp) | |
| replays.sort(key=lambda x: x["filename"], reverse=True) | |
| return jsonify({"success": True, "replays": replays}) | |
| def get_replay(filename): | |
| """Serve replay JSON files""" | |
| replay_path = f"replays/{filename}" | |
| if os.path.exists(replay_path): | |
| with open(replay_path, "r", encoding="utf-8") as f: | |
| data = json.load(f) | |
| # Auto-inflate if it's an optimized replay | |
| if "registry" in data and "states" in data: | |
| print(f"Inflating optimized replay: {filename}") | |
| inflated_states = inflate_history(data, member_db, live_db, energy_db) | |
| # Reconstruct standard format | |
| data["states"] = inflated_states | |
| # Remove registry to avoid confusing frontend if it doesn't expect it | |
| del data["registry"] | |
| return jsonify(data) | |
| return jsonify({"error": "Replay not found"}), 404 | |
| def advance(): | |
| room_id = get_room_id() | |
| with game_lock: | |
| room = get_room(room_id) | |
| if not room: | |
| return jsonify({"success": False, "error": "Room not found"}), 404 | |
| gs = room["state"] | |
| ai_agent = room["ai_agent"] | |
| # Run auto-advance loop | |
| max_safety = 50 | |
| while not gs.is_terminal() and max_safety > 0: | |
| max_safety -= 1 | |
| # Advance if in an automatic phase (AND no choices pending) | |
| if not gs.pending_choices and gs.phase in ( | |
| Phase.ACTIVE, | |
| Phase.ENERGY, | |
| Phase.DRAW, | |
| Phase.PERFORMANCE_P1, | |
| Phase.PERFORMANCE_P2, | |
| ): | |
| gs = gs.step(0) | |
| room["state"] = gs | |
| continue | |
| # Determine who should act (Check pending choices first) | |
| next_actor = gs.current_player | |
| if gs.pending_choices: | |
| # Handle both Rust (str, str) and Python (str, dict) formats | |
| params = _coerce_pending_choice_params(gs.pending_choices[0][1]) | |
| next_actor = params.get("player_id", gs.current_player) | |
| # If it's the AI's turn (P1) or the AI has a pending choice, let it act immediately | |
| if next_actor == 1 and not gs.is_terminal(): | |
| if ai_agent is not None: | |
| aid = ai_agent.choose_action(gs, 1) | |
| else: | |
| aid = 0 | |
| gs = gs.step(aid) | |
| room["state"] = gs | |
| continue | |
| break | |
| return jsonify( | |
| { | |
| "success": True, | |
| "state": serialize_state(gs, is_pvp=(room["mode"] == "pvp"), mode=room["mode"], lang=get_lang()), | |
| } | |
| ) | |
| def get_full_log(): | |
| """Return the complete rule log without truncation.""" | |
| room_id = get_room_id() | |
| with game_lock: | |
| room = get_room(room_id) | |
| if not room: | |
| return jsonify({"log": [], "total_entries": 0}) | |
| gs = room["state"] | |
| return jsonify({"log": gs.rule_log, "total_entries": len(gs.rule_log)}) | |
| def set_ai(): | |
| room_id = get_room_id() | |
| data = request.json | |
| mode = data.get("ai_mode", "smart") | |
| with game_lock: | |
| room = get_room(room_id) | |
| if not room: | |
| return jsonify({"success": False, "error": "Room not found"}) | |
| if mode == "random": | |
| room["ai_agent"] = RandomAgent() | |
| elif mode == "smart": | |
| room["ai_agent"] = SmartAgent() | |
| else: | |
| return jsonify({"success": False, "error": f"Unknown AI mode: {mode}"}) | |
| return jsonify({"success": True, "mode": mode}) | |
| def report_issue(): | |
| """Save the current game state and user explanation to a report file.""" | |
| try: | |
| room_id = get_room_id() | |
| room = get_room(room_id) | |
| gs = room["state"] if room else None | |
| data = request.json | |
| explanation = data.get("explanation", "") | |
| # We can take the current state from the request or just use our global game_state | |
| # Providing it in the request is safer if the user is looking at a specific frame (e.g. in replay mode) | |
| # But for now, let's use the provided state if it exists, otherwise capture the current one. | |
| if room and room.get("engine") == "rust": | |
| serialized = rust_serializer.serialize_state(gs, viewer_idx=0, mode=room.get("mode", "pve")) | |
| else: | |
| serialized = serialize_state(gs, is_pvp=(room["mode"] == "pvp" if room else False)) | |
| state_to_save = data.get("state") or serialized | |
| history = data.get("history", []) | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| os.makedirs("reports", exist_ok=True) | |
| filename = f"reports/report_{timestamp}.json" | |
| with open(filename, "w", encoding="utf-8") as f: | |
| json.dump( | |
| { | |
| "timestamp": timestamp, | |
| "explanation": explanation, | |
| "state": state_to_save, | |
| "history": history, | |
| "performance_history": state_to_save.get("performance_history", []), | |
| "performance_results": state_to_save.get("performance_results", {}), | |
| "action_desc": get_action_desc(state_to_save.get("last_action", 0), gs, lang=get_lang()) | |
| if gs and "last_action" in state_to_save | |
| else "N/A", | |
| }, | |
| f, | |
| indent=2, | |
| ensure_ascii=False, | |
| ) | |
| return jsonify({"success": True, "filename": filename}) | |
| except Exception as e: | |
| import traceback | |
| traceback.print_exc() | |
| return jsonify({"success": False, "error": str(e)}), 500 | |
| def debug_rewind(): | |
| """Undo the last action by moving back in history.""" | |
| room_id = get_room_id() | |
| with game_lock: | |
| room = get_room(room_id) | |
| if not room: | |
| return jsonify({"success": False, "error": "Room not found"}), 404 | |
| if "history_stack" not in room or not room["history_stack"]: | |
| return jsonify({"success": False, "error": "No history to rewind"}), 400 | |
| # Move back one step if possible | |
| idx = room.get("history_index", 0) | |
| if idx > 0: | |
| room["history_index"] = idx - 1 | |
| # Restore the previous state from history | |
| serialized_state = room["history_stack"][room["history_index"]] | |
| # We need to reconstruct the GameState from the serialized state | |
| # For now, we'll just return success and let the frontend fetch the new state | |
| return jsonify({"success": True}) | |
| else: | |
| return jsonify({"success": False, "error": "Already at start of history"}), 400 | |
| def debug_redo(): | |
| """Redo the last undone action by moving forward in history.""" | |
| room_id = get_room_id() | |
| with game_lock: | |
| room = get_room(room_id) | |
| if not room: | |
| return jsonify({"success": False, "error": "Room not found"}), 404 | |
| if "history_stack" not in room or not room["history_stack"]: | |
| return jsonify({"success": False, "error": "No history to redo"}), 400 | |
| # Move forward one step if possible | |
| idx = room.get("history_index", 0) | |
| max_idx = len(room["history_stack"]) - 1 | |
| if idx < max_idx: | |
| room["history_index"] = idx + 1 | |
| # Restore the next state from history | |
| serialized_state = room["history_stack"][room["history_index"]] | |
| # We need to reconstruct the GameState from the serialized state | |
| # For now, we'll just return success and let the frontend fetch the new state | |
| return jsonify({"success": True}) | |
| else: | |
| return jsonify({"success": False, "error": "Already at end of history"}), 400 | |
| def export_game(): | |
| """Export the current game state with history as minimal JSON.""" | |
| room_id = get_room_id() | |
| with game_lock: | |
| room = get_room(room_id) | |
| if not room: | |
| return jsonify({"success": False, "error": "Room not found"}), 404 | |
| # Get current state | |
| gs = room["state"] | |
| game_mode = room["mode"] | |
| history_stack = room.get("history_stack", []) | |
| history_index = room.get("history_index", 0) | |
| # Serialize current state | |
| serialized = rust_serializer.serialize_state(gs, viewer_idx=0, mode=game_mode, lang=get_lang()) | |
| export_data = { | |
| "export_timestamp": datetime.now().isoformat(), | |
| "game_mode": game_mode, | |
| "current_state": serialized, | |
| "history": history_stack, | |
| "history_index": history_index, | |
| "custom_decks": room.get("custom_decks", {}), | |
| } | |
| return jsonify(export_data) | |
| def import_game(): | |
| """Import a previously exported game state with full history.""" | |
| room_id = get_room_id() | |
| data = request.json | |
| if not data or "current_state" not in data: | |
| return jsonify({"success": False, "error": "Invalid import data"}), 400 | |
| with game_lock: | |
| room = get_room(room_id) | |
| if not room: | |
| return jsonify({"success": False, "error": "Room not found"}), 404 | |
| try: | |
| # Restore history | |
| room["history_stack"] = data.get("history", [data.get("current_state")]) | |
| room["history_index"] = data.get("history_index", 0) | |
| # The frontend should fetch the state after import to reconstruct GameState | |
| # For now, we'll store the import data and let the next fetchState reconstruct it | |
| room["pending_import_state"] = data.get("current_state") | |
| return jsonify({"success": True, "message": "Game imported successfully"}) | |
| except Exception as e: | |
| import traceback | |
| traceback.print_exc() | |
| return jsonify({"success": False, "error": str(e)}), 500 | |
| def generate_random_deck_list(member_db, live_db) -> list[str]: | |
| "Generate a valid random deck list (card_no strings)." | |
| deck = [] | |
| # 1. Select Members (48) | |
| available_members = [c.card_no for c in member_db.values()] | |
| if available_members: | |
| member_bucket = [] | |
| for m_no in available_members: | |
| member_bucket.extend([m_no] * 4) | |
| random.shuffle(member_bucket) | |
| while len(member_bucket) < 48: | |
| member_bucket.extend(available_members) | |
| deck.extend(member_bucket[:48]) | |
| # 2. Select Lives (12) | |
| available_lives = [c.card_no for c in live_db.values()] | |
| if available_lives: | |
| live_bucket = [] | |
| for l_no in available_lives: | |
| live_bucket.extend([l_no] * 4) | |
| random.shuffle(live_bucket) | |
| while len(live_bucket) < 12: | |
| live_bucket.extend(available_lives) | |
| deck.extend(live_bucket[:12]) | |
| return deck | |
| def get_random_deck_api(): | |
| global member_db, live_db | |
| deck_list = generate_random_deck_list(member_db, live_db) | |
| return jsonify( | |
| {"success": True, "content": deck_list, "message": f"Generated Random Deck ({len(deck_list)} cards)"} | |
| ) | |
| def get_presets(): | |
| """Return list of preset decks from tests/presets.json.""" | |
| try: | |
| preset_path = os.path.join(CURRENT_DIR, "..", "tests", "presets.json") | |
| if os.path.exists(preset_path): | |
| with open(preset_path, "r", encoding="utf-8") as f: | |
| data = json.load(f) | |
| return jsonify({"success": True, "presets": data}) | |
| return jsonify({"success": False, "error": "presets.json not found", "presets": []}) | |
| except Exception as e: | |
| return jsonify({"success": False, "error": str(e)}) | |
| if __name__ == "__main__": | |
| # PyInstaller Bundle Check | |
| if getattr(sys, "frozen", False): | |
| # If frozen, we might need to adjust static folder or templates folder depending on how flask finds them. | |
| # However, we added paths with --add-data, so they should be in sys._MEIPASS. | |
| # Flask's root_path defaults to __main__ directory, which in onefile mode is temporary. | |
| # We need to explicitly point static_folder to the MEIPASS location. | |
| bundle_dir = getattr(sys, "_MEIPASS", ".") # type: ignore | |
| app.static_folder = os.path.join(bundle_dir, "web_ui") | |
| # app.template_folder = os.path.join(bundle_dir, 'templates') # if we used templates | |
| # Also need to make sure data loader finds 'data/cards.json' | |
| # CardDataLoader expects relative path. We might need to chdir or patch it. | |
| # Easiest is to chdir to the bundle dir so relative paths work? | |
| # BUT 'replays' need to be written to writable cwd, not temp dir. | |
| # So we should NOT chdir globally. | |
| # Instead, we should update filenames to be absolute paths based on bundle_dir if read-only. | |
| # Monkey patch the loader path just for this instance if needed, | |
| # but CardDataLoader takes a path arg. | |
| # We need to ensure 'init_game' calls it with the correct absolute path. | |
| pass | |
| # Patched init_game for Frozen state to find data | |
| original_init_game = init_game | |
| def frozen_init_game(deck_type="normal"): | |
| if getattr(sys, "frozen", False): | |
| bundle_dir = getattr(sys, "_MEIPASS", ".") # type: ignore | |
| os.path.join(bundle_dir, "data", "cards.json") | |
| # We need to temporarily force the loader to use this path | |
| # But init_game hardcodes "data/cards.json" in correct logic? | |
| # actually checking init_game source: | |
| # loader = CardDataLoader("data/cards.json") | |
| # We need to change that line or intercept. | |
| # Use os.chdir to temp dir for READS? No, we need writes to real dir. | |
| # Best way: Just ensure data/cards.json exists in CWD? No, user won't have it. | |
| # HACK: We can't easily change the hardcoded string inside init_game without rewriting it. | |
| # However, we can patch CardDataLoader class to fix the path! | |
| # Assuming CardDataLoader is imported from engine.game.data_loader | |
| from engine.game.data_loader import CardDataLoader | |
| ops_init = CardDataLoader.__init__ | |
| def new_init(self, filepath): | |
| if not os.path.exists(filepath) and getattr(sys, "frozen", False): | |
| # Try bundle path | |
| bundle_path = os.path.join(sys._MEIPASS, filepath) # type: ignore | |
| if os.path.exists(bundle_path): | |
| filepath = bundle_path | |
| ops_init(self, filepath) | |
| CardDataLoader.__init__ = new_init # type: ignore[method-assign] | |
| original_init_game(deck_type) | |
| init_game = frozen_init_game | |
| # Run Server | |
| # use_reloader=False is crucial for PyInstaller to implicit avoid spawning subprocesses incorrectly | |
| port = int(os.environ.get("PORT", 8000)) | |
| # Auto-open browser | |
| import webbrowser | |
| from threading import Timer | |
| def open_browser(): | |
| webbrowser.open_new(f"http://localhost:{port}/") | |
| if not getattr(sys, "frozen", False) or os.environ.get("OPEN_BROWSER", "true").lower() == "true": | |
| Timer(1.5, open_browser).start() | |
| # Start Background Game Loop | |
| if game_thread is None: | |
| game_thread = threading.Thread(target=background_game_loop, daemon=True) | |
| game_thread.start() | |
| if __name__ == "__main__": | |
| load_game_data() # Ensure data is loaded | |
| port = int(os.environ.get("PORT", 7860)) | |
| local_ip = get_local_ip() | |
| print("\n" + "=" * 50) | |
| print("Love Live Card Game Server Running!") | |
| print(f"Local: http://localhost:{port}") | |
| print(f"Network: http://{local_ip}:{port}") | |
| print("=" * 50 + "\n") | |
| # In production/container, usually don't want debug mode | |
| debug_mode = os.environ.get("FLASK_DEBUG", "True").lower() == "true" | |
| app.run(host="0.0.0.0", port=port, debug=debug_mode) | |