LovecaSim / backend /server.py
trioskosmos's picture
Upload folder using huggingface_hub
b30643d verified
"""
Flask Backend for Love Live Card Game Web UI
"""
import json
import os
import random
import sys
import threading
import uuid
from datetime import datetime
from typing import Any
import numpy as np
from flask import Flask, jsonify, request, send_from_directory
from flask.json.provider import DefaultJSONProvider
# Ensure project root is in sys.path for absolute imports
if getattr(sys, "frozen", False):
PROJECT_ROOT = sys._MEIPASS # type: ignore
CURRENT_DIR = os.path.join(PROJECT_ROOT, "backend")
else:
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
PROJECT_ROOT = os.path.abspath(os.path.join(CURRENT_DIR, ".."))
if PROJECT_ROOT not in sys.path:
sys.path.insert(0, PROJECT_ROOT)
# Rust Engine
import engine_rust
try:
from ai.headless_runner import RandomAgent, create_easy_cards
from ai.headless_runner import SmartHeuristicAgent as SmartAgent
AI_AVAILABLE = True
except ImportError:
print("Warning: AI modules not found. AI features will be disabled.")
AI_AVAILABLE = False
class RandomAgent: pass
class SmartAgent: pass
def create_easy_cards(): return None, None
from engine.game.data_loader import CardDataLoader
from engine.game.desc_utils import get_action_desc
from engine.game.enums import Phase
from engine.game.game_state import GameState
from engine.game.replay_manager import inflate_history, optimize_history
from engine.game.serializer import serialize_state
from engine.game.state_utils import create_uid
try:
from rust_serializer import RustGameStateSerializer
except ImportError:
from backend.rust_serializer import RustGameStateSerializer
INSTANCE_SHIFT = 20
BASE_ID_MASK = 0xFFFFF
# --- MODULE DIRECTORIES ---
ENGINE_DIR = os.path.join(PROJECT_ROOT, "engine")
AI_DIR = os.path.join(PROJECT_ROOT, "ai")
TOOLS_DIR = os.path.join(PROJECT_ROOT, "tools")
DATA_DIR = os.path.join(PROJECT_ROOT, "data")
# Tools imports (optional)
try:
from tools.deck_extractor import extract_deck_data
except ImportError:
print("Warning: Could not import deck_extractor from tools.")
def extract_deck_data(content, db):
return [], [], {}, ["Importer not found"]
# Static folder is now in frontend/web_ui
FRONTEND_DIR = os.path.join(PROJECT_ROOT, "frontend")
WEB_UI_DIR = os.path.join(FRONTEND_DIR, "web_ui")
IMG_DIR = os.path.join(FRONTEND_DIR, "img") # Images seem to be in frontend/img
# Note: frontend/web_ui has its own js/css folders which index.html likely uses
app = Flask(__name__, static_folder=WEB_UI_DIR)
class NumpyJSONProvider(DefaultJSONProvider):
def default(self, obj):
if isinstance(obj, np.integer):
return int(obj)
elif isinstance(obj, np.floating):
return float(obj)
elif isinstance(obj, np.bool_):
return bool(obj)
elif isinstance(obj, np.ndarray):
return obj.tolist()
return super().default(obj)
app.json = NumpyJSONProvider(app)
@app.route("/img/<path:filename>")
def serve_img(filename):
# Sanitize and normalize the filename
filename = filename.replace("\\", "/").lstrip("/")
# Check if this is a card image request
if filename.startswith("cards/") or filename.startswith("cards_webp/"):
# Remove old nested 'cards/' prefix if it's there
pure_filename = os.path.basename(filename)
webp_path = os.path.join(IMG_DIR, "cards_webp", pure_filename)
# Priority 1: Flat WebP folder
if os.path.exists(webp_path) and os.path.isfile(webp_path):
return send_from_directory(os.path.join(IMG_DIR, "cards_webp"), pure_filename)
# Priority 2: Try falling back to original nested PNGs for backward compatibility/backup
# (This is mostly for non-compiled access or manual links)
pass
# Define possible search directories relative to PROJECT_ROOT
search_dirs = [
os.path.join(IMG_DIR, "cards_webp"), # Flattened WebP first
IMG_DIR, # frontend/img
os.path.join(IMG_DIR, "texticon"), # frontend/img/texticon
os.path.join(WEB_UI_DIR, "img"), # frontend/web_ui/img
FRONTEND_DIR # Allow direct frontend access if needed
]
for base_dir in search_dirs:
full_path = os.path.join(base_dir, filename)
if os.path.exists(full_path) and os.path.isfile(full_path):
return send_from_directory(base_dir, filename)
# Fallback for .webp requesting .png or vice-versa
if filename.endswith(".webp"):
png_fallback = filename[:-5] + ".png"
full_png_path = os.path.join(base_dir, png_fallback)
if os.path.exists(full_png_path) and os.path.isfile(full_png_path):
return send_from_directory(base_dir, png_fallback)
# Extra fallback for common icons if they are misplaced
if filename == "icon_blade.png" or "icon_blade" in filename:
# Try to find it anywhere in frontend/img
for root, dirs, files in os.walk(IMG_DIR):
if "icon_blade.png" in files:
return send_from_directory(root, "icon_blade.png")
print(f"DEBUG_IMG_404: Could not find {filename} in {search_dirs}")
return "Image not found", 404
@app.route("/icon_blade.png")
def serve_icon_root():
return serve_img("icon_blade.png")
# ai_agent = SmartHeuristicAgent()
ai_agent = SmartAgent() # Use original heuristic AI
# Global game state
# Room Registry
ROOMS: dict[str, dict[str, Any]] = {}
game_lock = threading.Lock()
# Rust Card DB (Global Singleton for performance)
RUST_DB = None
try:
compiled_data_path = os.path.join(DATA_DIR, "cards_compiled.json")
with open(compiled_data_path, "r", encoding="utf-8") as f:
RUST_DB = engine_rust.PyCardDatabase(f.read())
except Exception as e:
print(f"Warning: Failed to load RUST_DB from {compiled_data_path}: {e}")
# Python DBs (for metadata/serialization)
member_db: dict[int, Any] = {}
live_db: dict[int, Any] = {}
energy_db: dict[int, Any] = {}
rust_serializer = None # Initialized after data load
game_history: list[dict] = [] # Global replay history (might need per-room later)
# Legacy custom deck globals (used by init_game)
custom_deck_p0: list[str] | None = None
custom_deck_p1: list[str] | None = None
custom_energy_deck_p0: list[str] | None = None
custom_energy_deck_p1: list[str] | None = None
def load_game_data():
"""Load card data into global databases."""
global member_db, live_db, energy_db, rust_serializer
try:
cards_path = os.path.join(DATA_DIR, "cards.json")
print(f"Loading card data from: {cards_path}")
loader = CardDataLoader(cards_path)
m, l, e = loader.load()
member_db.update(m)
live_db.update(l)
energy_db.update(e)
# Initialize rust_serializer
rust_serializer = RustGameStateSerializer(member_db, live_db, energy_db)
# Build mapping
build_card_no_mapping()
print(f"Data loaded: {len(member_db)} Members, {len(live_db)} Lives, {len(energy_db)} Energy")
print(f"DEBUG PATHS: PROJECT_ROOT={PROJECT_ROOT}")
print(f"DEBUG PATHS: FRONTEND_DIR={FRONTEND_DIR}")
print(f"DEBUG PATHS: WEB_UI_DIR={WEB_UI_DIR}")
print(f"DEBUG PATHS: IMG_DIR={IMG_DIR}")
except Exception as ex:
print(f"CRITICAL ERROR loading card data: {ex}")
import sys
sys.exit(1)
# Load data immediately on import
def get_room_id() -> str:
"""Extract room_id from request header or query param."""
# Priority: Header > Query Param > Default "SINGLE_PLAYER"
rid = request.headers.get("X-Room-Id") or request.args.get("room_id")
if not rid:
# Debug why no ID found
# print(f"DEBUG: No X-Room-Id or room_id param. Headers: {request.headers}", file=sys.stderr)
rid = "SINGLE_PLAYER"
return rid
def get_player_idx():
"""Extract player perspective from X-Player-Idx header or viewer query param."""
# Try query param 'viewer' first (commonly used by frontend)
viewer = request.args.get("viewer")
if viewer is not None:
try:
return int(viewer)
except (ValueError, TypeError):
pass
# Fallback to header
try:
return int(request.headers.get("X-Player-Idx", 0))
except (ValueError, TypeError):
return 0
def get_room(room_id: str) -> dict[str, Any] | None:
"""Get room data safely."""
with game_lock:
room = ROOMS.get(room_id)
if room:
room["last_active"] = datetime.now()
return room
# Reverse mapping: card_no string -> internal integer ID
card_no_to_id: dict[str, int] = {}
def build_card_no_mapping():
"""Build reverse lookup from card_no string to internal ID using compiled data.
Ensures consistency with the Rust engine's internal ID assignments.
"""
global card_no_to_id
card_no_to_id = {}
try:
compiled_path = os.path.join(DATA_DIR, "cards_compiled.json")
if not os.path.exists(compiled_path):
print(f"Warning: {compiled_path} not found. Mapping will be empty.")
return
with open(compiled_path, "r", encoding="utf-8") as f:
data = json.load(f)
# Build mapping from dbs
count = 0
for db_name in ["member_db", "live_db", "energy_db"]:
db = data.get(db_name, {})
for internal_id, card_data in db.items():
card_no = card_data.get("card_no")
if card_no:
# Convert string key to integer ID
card_no_to_id[card_no] = int(internal_id)
count += 1
print(f"Built card_no_to_id mapping from compiled data: {count} entries")
except Exception as e:
print(f"Error building mapping from compiled data: {e}")
# Load data immediately on import
load_game_data()
# Initialize mapping on startup
build_card_no_mapping()
def convert_deck_strings_to_ids(deck_strings):
"""Convert list of card_no strings to internal IDs (Unique Instance IDs)."""
ids = []
counts = {}
for card_no in deck_strings:
if card_no in card_no_to_id:
base_id = card_no_to_id[card_no]
count = counts.get(base_id, 0)
uid = create_uid(base_id, count)
counts[base_id] = count + 1
ids.append(uid)
else:
print(f"Warning: Unknown card_no '{card_no}', skipping.")
return ids
def save_replay(gs: GameState | None = None):
"""Save the provided game state's history to a file."""
if gs is None or not gs.rule_log:
return
try:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
os.makedirs("replays", exist_ok=True)
filename = f"replays/replay_{timestamp}.json"
filename_opt = f"replays/replay_{timestamp}_opt.json"
# Use historical states from rule_log or history if we maintain one
# For now, we assume GS has what we need or we pass history
history = [] # In this engine, standard replays are often built from logs or incremental states
# 1. Save Standard Replay (Compatible)
data = {
"game_id": 0,
"timestamp": timestamp,
"winner": gs.winner if gs else -1,
"states": history,
}
with open(filename, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False)
print(f"Replay saved to {filename}")
# 2. Save Optimized Replay (Dict Encoded)
try:
print("Optimizing replay...")
# Gather Level 3 Context
deck_info = None
if gs:
deck_info = {
"p0_deck": list(getattr(gs.players[0], "initial_deck_indices", [])),
"p1_deck": list(getattr(gs.players[1], "initial_deck_indices", [])),
}
opt_data = optimize_history(
history,
member_db,
live_db,
energy_db,
exclude_db_cards=True,
# seed=current_seed,
# action_log=action_log,
deck_info=deck_info,
)
final_opt = {
"game_id": 0,
"timestamp": timestamp,
"winner": gs.winner if gs else -1,
}
# Merge optimization data
if "level" in opt_data and opt_data["level"] == 3:
final_opt.update(opt_data) # seed, decks, action_log
print("Level 3 Optimization Active (Action Log)")
else:
final_opt["states"] = opt_data["states"]
with open(filename_opt, "w", encoding="utf-8") as f:
json.dump(final_opt, f, ensure_ascii=False)
# Calculate savings
size_std = os.path.getsize(filename)
size_opt = os.path.getsize(filename_opt)
savings = (1 - size_opt / size_std) * 100
print(f"Optimized replay saved to {filename_opt}")
print(f"Compression: {size_std / 1024:.1f}KB -> {size_opt / 1024:.1f}KB ({savings:.1f}% savings)")
except Exception as e:
print(f"Failed to save optimized replay: {e}")
import traceback
traceback.print_exc()
except Exception as e:
print(f"Failed to save replay: {e}")
game_history = [] # For replay recording
action_log = [] # For action-based replay
current_seed = 0 # For deterministic replay
def init_game(deck_type="normal"):
global game_state, member_db, live_db, energy_db, game_history, current_seed, action_log
# Ensure true randomness for each game
import time
real_seed = int(time.time() * 1000) % (2**31)
current_seed = real_seed
random.seed(real_seed)
# Store action history separately for Level 3 Replay
global action_log
action_log = []
# DATA PATH: data/cards.json
cards_path = os.path.join(DATA_DIR, "cards.json")
loader = CardDataLoader(cards_path)
member_db, live_db, energy_db = loader.load()
# CRITICAL: Populate GameState static DBs so validations work
# Use initialize_class_db to ensure proper wrapping with MaskedDB
GameState.initialize_class_db(member_db, live_db)
GameState.energy_db = energy_db
# Initialize JIT arrays for performance
GameState._init_jit_arrays()
# Build reverse mapping for custom deck support
build_card_no_mapping()
# Pre-calculate Start Deck card IDs
# Load raw JSON to check product field for filtering
cards_path = os.path.join(DATA_DIR, "cards.json")
with open(cards_path, "r", encoding="utf-8") as f:
json.load(f)
for _cid, _m in member_db.items():
# Find raw key by matching name/cost/type? Or better, DataLoader should store product.
# Since DataLoader doesn't verify product yet, we'll try to guess or just use ALL valid cards
# that are from Start Deck (usually ID < 100 for this mock loader or by string ID).
# Actually, let's just use ALL loaded members/lives for 'normal' and specific ones for 'starter'.
# For 'start_deck', we can filter by card string ID prefix 'PL!-sd1' or 'LL-E'.
# But 'member_db' keys are integers 0..N. We need a way to link back.
# The loader assigns IDs sequentially.
# Let's just build a random valid deck from ALL cards for now,
# unless 'easy' mode.
pass
# If deck_type is 'easy', we use the simple mock cards for logic testing.
# If deck_type is 'normal' or 'starter', we use REAL cards.
if deck_type == "easy":
easy_m, easy_l = create_easy_cards()
member_db[easy_m.card_id] = easy_m
live_db[easy_l.card_id] = easy_l
game_state = GameState()
# Setup players
for pidx, p in enumerate(game_state.players):
# Check for custom deck first
custom_deck = custom_deck_p0 if pidx == 0 else custom_deck_p1
if custom_deck:
# Use custom deck
p.main_deck = convert_deck_strings_to_ids(custom_deck)
random.shuffle(p.main_deck) # Shuffle custom deck for variety
print(f"Player {pidx}: Using custom deck ({len(p.main_deck)} cards, shuffled)")
elif deck_type == "easy":
# Use Easy Cards (888/999) but mapped to real images
p.main_deck = [888] * 48 + [999] * 12
else:
# NORMAL / STARTER MODE: Build a valid deck
# Rule: Max 4 copies of same card number.
# Total: 48 Members + 12 Lives (Total 60 in main deck per game_state spec)
p.main_deck = []
# 1. Select Members (48)
available_members = list(member_db.keys())
if available_members:
# Shuffle availability to vary decks
random.shuffle(available_members)
member_bucket = []
for mid in available_members:
# Add 4 copies of each until we have enough
# Use create_uid for unique instance IDs
for i in range(4):
uid = create_uid(mid, i)
member_bucket.append(uid)
if len(member_bucket) >= 150: # Optimization: Don't build massive list
break
# Pick 48 from the bucket
if len(member_bucket) < 48:
# Fallback if DB too small
while len(member_bucket) < 48:
member_bucket.extend(available_members)
# Ensure we don't accidentally pick >4 if we just slice
# Actually, simply taking the first 48 from our constructed bucket (which has 4 of each distinct card)
# guarantees validity if we shuffle the CARDS/TYPES, not the final list.
# Steps:
# 1. Shuffle types.
# 2. Add 4 of Type A, 4 of Type B...
# 3. Take first 48 cards.
p.main_deck.extend(member_bucket[:48])
# 2. Select Lives (12)
available_lives = list(live_db.keys())
if available_lives:
random.shuffle(available_lives)
live_bucket = []
for lid in available_lives:
# live_bucket.extend([lid] * 4)
for i in range(4):
uid = create_uid(lid, i)
live_bucket.append(uid)
if len(live_bucket) >= 50:
break
if len(live_bucket) < 12:
while len(live_bucket) < 12:
live_bucket.extend(available_lives)
p.main_deck.extend(live_bucket[:12])
random.shuffle(p.main_deck)
# Energy Deck (12 cards)
# Use actual Energy Card ID if available (2000+)
if energy_db:
eid = list(energy_db.keys())[0] # Take first energy card type found
p.energy_deck = [eid] * 12
else:
p.energy_deck = [40000] * 12 # Fallback
# Custom Energy Deck Override
custom_energy = custom_energy_deck_p0 if pidx == 0 else custom_energy_deck_p1
if custom_energy:
p.energy_deck = convert_deck_strings_to_ids(custom_energy)
print(f"Player {pidx}: Using custom energy deck ({len(p.energy_deck)} cards)")
# Explicit shuffle before drawing
random.shuffle(p.main_deck)
if game_state.players.index(p) == 0:
print(f"DEBUG: P0 Deck Shuffled. Top 5: {p.main_deck[-5:]}")
# Initial draw (6 cards - standard Mulligan start)
for _ in range(6):
if p.main_deck:
p.hand.append(p.main_deck.pop())
p.hand_added_turn.append(game_state.turn_number)
# Initial energy: 3 cards (Rule 6.2.1.7)
for _ in range(3):
if p.energy_deck:
p.energy_zone.append(p.energy_deck.pop(0))
# Randomly determine first player
game_state.first_player = random.randint(0, 1)
# For Mulligan Phase (P1/Index 0), Current Player MUST be 0
# The 'first_player' variable determines who acts first in ACTIVE phase (Round 1)
game_state.current_player = 0
# Start in MULLIGAN phase
game_state.phase = Phase.MULLIGAN_P1
def create_room_internal(
room_id: str,
mode: str = "pve",
deck_type: str = "normal",
public: bool = False,
custom_decks: dict = None,
) -> dict[str, Any]:
"""Helper to initialize a room using the RUST engine."""
print(
f"DEBUG: Creating Rust Room {room_id} (Mode: {mode}, Deck: {deck_type}, Public: {public}, CustomDecks: {bool(custom_decks)})"
)
if RUST_DB is None:
raise Exception("RUST_DB not initialized")
gs = engine_rust.PyGameState(RUST_DB)
# helper for deck generation
def get_random_decks():
m_ids = list(member_db.keys())
l_ids = list(live_db.keys())
random.shuffle(m_ids)
random.shuffle(l_ids)
main_ids = []
for mid in m_ids[:15]:
main_ids.extend([mid] * 4)
for lid in l_ids[:4]:
main_ids.extend([lid] * 4)
random.shuffle(main_ids)
e_deck = [list(energy_db.keys())[0]] * 12 if energy_db else [40000] * 12
l_deck = l_ids[
:3
] # Actually the Rust engine treats lives as a separate param in some versions or part of deck?
# Checked engine_rust/src/py_bindings.rs: initialize_game needs p0_deck, p1_deck, p0_energy, p1_energy, p0_lives, p1_lives
return main_ids[:60], e_deck, l_ids[:3]
# Defaults
p0_m, p0_e, p0_l = get_random_decks()
p1_m, p1_e, p1_l = get_random_decks()
# Override with custom decks if provided
final_custom_decks = {0: {"main": [], "energy": []}, 1: {"main": [], "energy": []}}
if custom_decks:
final_custom_decks.update(custom_decks)
for pid in [0, 1]:
cdeck = custom_decks.get(str(pid)) or custom_decks.get(pid)
if cdeck and cdeck.get("main"):
# Convert strings to IDs
main_ids = convert_deck_strings_to_ids(cdeck["main"])
random.shuffle(main_ids)
# Extract Live cards for the initial Live Zone (3 cards)
# Note: cid is a UID, so we must mask it to compare with live_db keys
live_ids = [cid for cid in main_ids if (cid & BASE_ID_MASK) in live_db]
if len(main_ids) > 0:
if pid == 0:
p0_m = main_ids
if len(live_ids) >= 3:
p0_l = live_ids[:3] # Pick first 3 as starting lives
elif len(live_ids) > 0:
p0_l = live_ids # Use whatever lives are available
else:
p1_m = main_ids
if len(live_ids) >= 3:
p1_l = live_ids[:3]
elif len(live_ids) > 0:
p1_l = live_ids
# Energy
if cdeck.get("energy"):
e_ids = convert_deck_strings_to_ids(cdeck["energy"])
if pid == 0:
p0_e = e_ids
else:
p1_e = e_ids
# Warning: We are not extracting initial lives from main deck for p0_l/p1_l if custom.
# The engine probably draws them?
# If `p0_l` is required, we should pick random 3 from lives in deck or DB?
# For now, let's keep random lives for the Live Zone if not specified, or just reuse random ones.
gs.initialize_game(p0_m, p1_m, p0_e, p1_e, p0_l, p1_l)
return {
"state": gs,
"mode": mode,
"public": public,
"created_at": datetime.now(),
"last_active": datetime.now(),
"ai_agent": None, # MCTS is built-in
"custom_decks": final_custom_decks,
"sessions": {},
"engine": "rust",
}
def join_room_logic(room_id: str) -> dict[str, Any]:
"""
Logic to add a user session to a room.
Returns {"session_id": str, "player_id": int}
"""
if room_id not in ROOMS:
return {"error": "Room not found"}
room = ROOMS[room_id]
sessions = room["sessions"]
# Simple assignment logic:
# If 0 is free, take 0.
# If 1 is free, take 1.
# Else, maybe return spectator? For now, just return -1 or error.
# Check current players
taken_pids = set(sessions.values())
new_pid = -1
if 0 not in taken_pids:
new_pid = 0
elif 1 not in taken_pids:
new_pid = 1
else:
# Both full. Spectator?
new_pid = -1
# For spectator, maybe we still give a session but with pid -1?
session_id = str(uuid.uuid4())
sessions[session_id] = new_pid
return {"session_id": session_id, "player_id": new_pid}
# --- ROOM MANAGEMENT API ---
@app.route("/api/rooms/create", methods=["POST"])
def create_new_room():
print("DEBUG: Entered create_new_room endpoint", file=sys.stderr)
try:
data = request.json or {}
except Exception as e:
print(f"DEBUG: Failed to parse JSON: {e}", file=sys.stderr)
data = {}
mode = data.get("mode", "pve")
is_public = data.get("public", False)
custom_decks = data.get("decks", None) # Optional initial decks
# Generate 4-char code
import string
chars = string.ascii_uppercase + string.digits
while True:
room_id = "".join(random.choices(chars, k=4))
if room_id not in ROOMS:
break
print(f"DEBUG: Generated room_id {room_id}, acquiring lock...", file=sys.stderr)
res = {}
with game_lock:
print("DEBUG: Lock acquired. Creating room internal...", file=sys.stderr)
ROOMS[room_id] = create_room_internal(room_id, mode, public=is_public, custom_decks=custom_decks)
print("DEBUG: Room created internally. Joining creator...", file=sys.stderr)
# Auto-join creator
join_res = join_room_logic(room_id)
print("DEBUG: Returning response.", file=sys.stderr)
return jsonify({"success": True, "room_id": room_id, "mode": mode, "session": join_res})
@app.route("/api/rooms/list", methods=["GET"])
def list_public_rooms():
"""Return a list of public rooms."""
public_rooms = []
with game_lock:
for rid, room in ROOMS.items():
if room.get("public", False):
# Calculate player count
sessions = room.get("sessions", {})
player_count = len(set(sessions.values())) # Approximate, might need better logic if spectators exist
# Or just count occupied slots (0 and 1)
occupied_slots = 0
taken_pids = set(sessions.values())
if 0 in taken_pids:
occupied_slots += 1
if 1 in taken_pids:
occupied_slots += 1
# Basic Info
gs = room.get("state")
turn = gs.turn_number if gs else 0
phase = str(gs.phase) if gs else "?"
public_rooms.append(
{
"room_id": rid,
"mode": room.get("mode", "pve"),
"players": occupied_slots,
"turn": turn,
"phase": phase,
"created_at": room.get("created_at", datetime.now()).isoformat(),
}
)
# Sort by creation time desc
public_rooms.sort(key=lambda x: x["created_at"], reverse=True)
return jsonify({"success": True, "rooms": public_rooms})
@app.route("/api/rooms/join", methods=["POST"])
def join_room():
print("DEBUG: Entered join_room", file=sys.stderr)
data = request.json or {}
room_id = data.get("room_id", "").upper().strip()
print(f"DEBUG: Entered join_room for ID: '{room_id}'", file=sys.stderr)
with game_lock:
if room_id in ROOMS:
mode = ROOMS[room_id]["mode"]
print(f"DEBUG: Found room {room_id}, mode={mode}", file=sys.stderr)
# Assign a session/seat to the joining player
join_res = join_room_logic(room_id)
if "error" in join_res:
return jsonify({"success": False, "error": join_res["error"]}), 400
return jsonify(
{
"success": True,
"room_id": room_id,
"mode": mode,
"session_id": join_res.get("session_id"),
"player_id": join_res.get("player_id"),
}
)
return jsonify({"success": False, "error": "Room not found"}), 404
@app.route("/")
def index():
return send_from_directory(WEB_UI_DIR, "index.html")
@app.route("/board")
def game_board():
return send_from_directory(WEB_UI_DIR, "game_board.html") # Assuming it exists there
@app.route("/js/<path:filename>")
def serve_js(filename):
return send_from_directory(os.path.join(WEB_UI_DIR, "js"), filename)
@app.route("/css/<path:filename>")
def serve_css(filename):
return send_from_directory(os.path.join(WEB_UI_DIR, "css"), filename)
@app.route("/icon_blade.png")
def serve_icon():
# If icon is in root or img, adjust. Assuming img for now or checking existence.
# Fallback to IMG_DIR or WEB_UI_DIR
return send_from_directory(IMG_DIR, "icon_blade.png")
@app.route("/deck_builder.html")
def serve_deck_builder():
return send_from_directory(WEB_UI_DIR, "deck_builder.html")
@app.route("/data/<path:filename>")
def serve_data(filename):
return send_from_directory(DATA_DIR, filename)
import threading
import time
# Threading setup
game_lock = threading.RLock() # Re-entrant lock to prevent self-deadlock
game_thread = None
def background_game_loop():
"""
Runs the game logic (AI and auto-phases) for ALL active rooms.
"""
print("Background Game Loop Started (Multi-Room)", file=sys.stderr)
while True:
try:
# print("DEBUG: Background Loop acquiring lock...", file=sys.stderr)
with game_lock:
# Iterate over a copy of keys to avoid modification issues if needed
active_room_ids = list(ROOMS.keys())
for rid in active_room_ids:
# print(f"DEBUG: Processing room {rid}...", file=sys.stderr)
room = ROOMS.get(rid)
if not room:
continue
gs = room["state"]
game_mode = room["mode"]
ai_agent = room["ai_agent"]
if not gs.is_terminal():
# 1. Auto-Advance Phases
if gs.phase in (
Phase.ACTIVE,
Phase.ENERGY,
Phase.DRAW,
Phase.PERFORMANCE_P1,
Phase.PERFORMANCE_P2,
):
# Safe attribute access for Rust engine compatibility
p_choices = getattr(gs, "pending_choices", [])
p_effects = getattr(gs, "pending_effects", [])
if not (p_choices or p_effects):
res = gs.step(0)
if res is not None:
room["state"] = res
gs = res
elif gs.current_player == 1 and game_mode == "pve":
is_continue_choice = False
if gs.pending_choices and gs.pending_choices[0][0].startswith("CONTINUE"):
is_continue_choice = True
if gs.phase == Phase.LIVE_RESULT and is_continue_choice:
# Wait for Human
pass
else:
if gs.phase in (Phase.MULLIGAN_P1, Phase.MULLIGAN_P2):
aid = 0
res = gs.step(aid)
if res is not None:
room["state"] = res
else:
if room.get("engine") == "rust":
# Use Greedy (1-ply) AI for Rust engine in PVE to maximize responsiveness
gs.step_opponent_greedy()
else:
aid = ai_agent.choose_action(gs, 1)
res = gs.step(aid)
if res is not None:
room["state"] = res
time.sleep(0.1)
except Exception as e:
print(f"Error in game loop: {e}")
import traceback
traceback.print_exc()
time.sleep(1.0)
@app.route("/api/state")
def get_state():
room_id = get_room_id()
session_token = request.headers.get("X-Session-Token")
with game_lock:
# Development convenience: Auto-create room if missing IF it's "SINGLE_PLAYER"
if room_id == "SINGLE_PLAYER" and room_id not in ROOMS:
ROOMS[room_id] = create_room_internal(room_id)
room = get_room(room_id)
if not room:
return jsonify({"success": False, "error": "Room not found or expired"}), 404
gs = room["state"]
mode = room["mode"]
viewer_idx = get_player_idx()
if room.get("engine") == "rust":
s_state = rust_serializer.serialize_state(gs, viewer_idx=viewer_idx, mode=mode, is_pvp=(mode == "pvp"))
else:
s_state = serialize_state(
gs,
viewer_idx=viewer_idx,
is_pvp=(mode == "pvp" and request.headers.get("X-Player-Idx") is None),
mode=mode,
)
# Meta info about decks
cdecks = room.get("custom_decks", {})
meta = {
"p0_deck_set": bool(cdecks.get(0, {}).get("main") or cdecks.get("0", {}).get("main")),
"p1_deck_set": bool(cdecks.get(1, {}).get("main") or cdecks.get("1", {}).get("main")),
"mode": mode,
}
return jsonify({"success": True, "state": s_state, "meta": meta})
@app.route("/api/set_deck", methods=["POST"])
def set_deck():
"""Accept a custom deck for a player in a specific room."""
data = request.json
player_id = data.get("player", 0)
deck_ids = data.get("deck", []) # List of card_no strings
energy_ids = data.get("energy_deck", [])
room_id = get_room_id()
with game_lock:
room = get_room(room_id)
# For setting deck, we might want to allow it even if room doesn't exist yet?
# But conceptually, you create a room, then set deck, then reset/start.
if not room:
# Auto-create for dev workflow
ROOMS[room_id] = create_room_internal(room_id)
room = ROOMS[room_id]
room["custom_decks"][player_id] = {"main": deck_ids, "energy": energy_ids}
return jsonify(
{
"status": "ok",
"player": player_id,
"deck_size": len(deck_ids),
"message": f"Deck set for Player {player_id + 1} in Room {room_id}. Reset game to apply.",
}
)
@app.route("/api/upload_deck", methods=["POST"])
def upload_deck():
"""Accept a raw deck file content (decktest.txt style) and load it."""
data = request.json
content = data.get("content", "")
player_id = data.get("player", 0)
room_id = get_room_id()
# Parse content
try:
if content.strip().startswith("{") or content.strip().startswith("["):
# JSON format
deck_data = json.loads(content)
# Support both simple list and object
if isinstance(deck_data, list):
main_deck = deck_data
energy_deck = [] # JSON list implies only main deck usually?
elif "main" in deck_data:
main_deck = deck_data["main"]
energy_deck = deck_data.get("energy", [])
else:
return jsonify(
{"success": False, "error": "Invalid JSON deck format. Expected list or object with 'main' key."}
)
else:
# HTML/Text format
card_db = {}
try:
cards_path = os.path.join(DATA_DIR, "cards.json")
with open(cards_path, "r", encoding="utf-8") as f:
card_db = json.load(f)
except Exception as e:
return jsonify({"success": False, "error": f"Failed to load card DB for validation: {e}"})
main_deck, energy_deck, _, errors = extract_deck_data(content, card_db) # Pass DB for validation
if errors:
return jsonify({"success": False, "error": "Validation Errors:\n" + "\n".join(errors)})
except json.JSONDecodeError:
return jsonify({"success": False, "error": "Invalid JSON format."})
except Exception as e:
print(f"Deck parsing error: {e}")
return jsonify({"success": False, "error": str(e)})
if not main_deck and not energy_deck:
return jsonify({"success": False, "error": "No cards found in file."})
with game_lock:
room = get_room(room_id)
if not room:
ROOMS[room_id] = create_room_internal(room_id)
room = ROOMS[room_id]
room["custom_decks"][player_id] = {"main": main_deck, "energy": energy_deck}
# Auto-apply?
# Re-init room with "custom" logic?
# For now, let's just create a new room state using these decks immediately for convenience
# But we need to respect the loop.
# Actually existing logic calls init_game(deck_type="custom").
# We'll just trigger a reset logic manually
# This duplicates logic in reset() but scoped to this room + custom deck applied.
# For simplicity, we just store it. User must click "Reset" or we call reset internal?
# The frontend usually expects upload to just work.
pass
# Trigger Reset via API logic simulation or just return success and let caller Reset?
# Existing behavior: calls init_game("custom").
# So we should probably do the same: reset the room's state using these custom decks.
# We can reuse the create_room_internal logic if we modify it to accept custom decks directly?
# Or just rely on the room["custom_decks"] being set.
# Let's call reset internal logic here?
# Better: Update endpoints first, then we can verify flow.
# For now, we assume user clicks Reset or we simulate it.
# Actually, let's just return success. The frontend typically reloads or resets.
return jsonify(
{
"success": True,
"main_count": len(main_deck),
"energy_count": len(energy_deck),
"room_id": room_id,
"message": f"Deck Loaded! ({len(main_deck)} Main, {len(energy_deck)} Energy). Please Reset.",
}
)
@app.route("/api/get_test_deck", methods=["GET"])
def get_test_deck_api():
"""Read deck files from ai/decks/ directory and return card list."""
from engine.game.deck_utils import extract_deck_data
deck_name = request.args.get("deck", "") # Optional deck name parameter
# Path to ai/decks directory
# Use PROJECT_ROOT for reliability
ai_decks_dir = os.path.join(PROJECT_ROOT, "ai", "decks")
if not os.path.exists(ai_decks_dir):
# Fallback: try CWD relative
ai_decks_dir = os.path.abspath(os.path.join("ai", "decks"))
if not os.path.exists(ai_decks_dir):
return jsonify({"success": False, "error": "ai/decks directory not found"})
# List available decks (excluding verify script)
available_decks = []
for f in os.listdir(ai_decks_dir):
if f.endswith(".txt") and not f.startswith("verify"):
available_decks.append(f.replace(".txt", ""))
# If no deck specified, return list of available decks
if not deck_name:
# Default to aqours_cup for "Load Test Deck" button compatibility
deck_name = "aqours_cup"
message = "Defaulting to 'aqours_cup'. Specify ?deck=NAME to load a specific deck."
else:
message = f"Loaded '{deck_name}'"
# Find matching deck file
deck_file = os.path.join(ai_decks_dir, f"{deck_name}.txt")
if not os.path.exists(deck_file):
return jsonify({"success": False, "error": f"Deck '{deck_name}' not found", "available_decks": available_decks})
try:
with open(deck_file, "r", encoding="utf-8") as f:
content = f.read()
# Load card DB for parsing
card_db_path = os.path.join(CURRENT_DIR, "..", "data", "cards.json")
card_db = {}
if os.path.exists(card_db_path):
with open(card_db_path, "r", encoding="utf-8") as f_db:
card_db = json.load(f_db)
# Use the unified parser
main_deck, energy_deck, type_counts, errors = extract_deck_data(content, card_db)
return jsonify(
{
"success": True,
"deck_name": deck_name,
"content": main_deck, # For compatibility with older frontend
"main_deck": main_deck,
"energy_deck": energy_deck,
"available_decks": available_decks,
"message": f"{message} ({len(main_deck)} Main, {len(energy_deck)} Energy)",
"errors": errors,
}
)
except Exception as e:
return jsonify({"success": False, "error": str(e)})
@app.route("/api/validate_cards", methods=["POST"])
def validate_cards():
"""Validate card IDs against the database and provide type breakdown."""
data = request.json
card_ids = data.get("card_ids", [])
card_counts = data.get("card_counts", {}) # Optional: {card_id: quantity}
# Ensure mapping is built
if not card_no_to_id:
print("DEBUG: validation - mapping empty, rebuilding...", flush=True)
build_card_no_mapping()
print(f"DEBUG: validation - map size: {len(card_no_to_id)}", flush=True)
test_key = "PL!SP-bp1-004-R"
if test_key in card_no_to_id:
print(f"DEBUG: validation - found {test_key}: {card_no_to_id[test_key]}", flush=True)
else:
print(f"DEBUG: validation - {test_key} NOT FOUND in map!", flush=True)
known = []
unknown = []
card_info = {} # card_id -> {type, name, internal_id}
# Type counters
member_count = 0
live_count = 0
energy_count = 0
for card_id in card_ids:
# print(f"DEBUG: Checking {card_id}", flush=True)
qty = card_counts.get(card_id, 1)
if card_id in card_no_to_id:
internal_id = card_no_to_id[card_id]
known.append(card_id)
# Determine type and get name
if internal_id in member_db:
card_info[card_id] = {"type": "Member", "name": member_db[internal_id].name}
member_count += qty
elif internal_id in live_db:
card_info[card_id] = {"type": "Live", "name": live_db[internal_id].name}
live_count += qty
elif internal_id in energy_db:
card_info[card_id] = {"type": "Energy", "name": energy_db[internal_id].name}
energy_count += qty
else:
unknown.append(card_id)
debug_info = {
"map_size": len(card_no_to_id),
"test_key_exists": "PL!SP-bp1-004-R" in card_no_to_id,
"test_key_val": card_no_to_id.get("PL!SP-bp1-004-R", "N/A"),
"first_5_keys": list(card_no_to_id.keys())[:5],
}
return jsonify(
{
"known": known,
"unknown": unknown,
"known_count": len(known),
"unknown_count": len(unknown),
"card_info": card_info,
"breakdown": {"member": member_count, "live": live_count, "energy": energy_count},
"_debug": debug_info,
}
)
@app.route("/api/clear_performance", methods=["POST"])
def clear_performance():
room_id = get_room_id()
with game_lock:
room = get_room(room_id)
if room:
gs = room["state"]
# Clear the results dictionary
gs.performance_results.clear()
return jsonify({"status": "ok"})
@app.route("/api/action", methods=["POST"])
def do_action():
room_id = get_room_id()
session_token = request.headers.get("X-Session-Token")
with game_lock:
start_time = time.time()
try:
room = get_room(room_id)
if not room:
return jsonify({"success": False, "error": "Room not found"}), 404
gs = room["state"]
game_mode = room["mode"]
ai_agent = room["ai_agent"]
sessions = room.get("sessions", {})
# Session Validation (Enforce Turn)
if session_token and session_token in sessions:
pid = sessions[session_token]
if pid != -1:
# Check Pending Choice Turn
p_choices = getattr(gs, "pending_choices", [])
if p_choices:
# Handle both Rust (str, str) and Python (str, dict) formats
params = p_choices[0][1]
if isinstance(params, str):
# Rust format: parse JSON
try:
params = json.loads(params)
except:
params = {}
choice_pid = params.get("player_id", gs.current_player)
if choice_pid != pid:
return jsonify(
{"success": False, "error": f"Not your turn to choose (Waiting for P{choice_pid})"}
), 403
# Check Main Turn
elif gs.current_player != pid:
return jsonify(
{"success": False, "error": f"Not your turn (Waiting for P{gs.current_player})"}
), 403
data = request.json
action_id = data.get("action_id", 0)
force = data.get("force", False)
legal_mask = gs.get_legal_actions()
# Validate Action
if not (0 <= action_id < len(legal_mask)):
return jsonify({"success": False, "error": "Invalid action ID"}), 400
# Enforce Perspective/Active Player consistency in PvP
requester_idx = get_player_idx()
if game_mode == "pvp":
if requester_idx != gs.current_player:
return jsonify(
{"success": False, "error": f"Not your turn! It's P{gs.current_player + 1}'s turn."}
), 403
elif game_mode == "pve":
# In PvE, if it's AI turn (P1), don't allow manual action from UI
if gs.current_player == 1:
return jsonify({"success": False, "error": "AI is playing, please wait."}), 403
is_legal = legal_mask[action_id]
if force or is_legal:
# Step 1: Execute User Action
res = gs.step(action_id)
if res is not None:
room["state"] = res
gs = res
# Step 2: Auto-Advance & AI Handling
max_safety = 50
while not gs.is_terminal() and max_safety > 0:
max_safety -= 1
# A. Automatic Phases (-2=Setup, 1=Active, 2=Energy, 3=Draw, 6=Perf1, 7=Perf2, 8=LiveResult)
if gs.phase in (-2, 1, 2, 3, 6, 7, 8):
res = gs.step(0)
if res is not None:
room["state"] = res
gs = res
continue
# B. AI Turn (P1) - ONLY if PVE
if gs.current_player == 1 and game_mode == "pve":
if room.get("engine") == "rust":
gs.step_opponent_mcts(10)
else:
# Python AI
aid = ai_agent.choose_action(gs, 1)
res = gs.step(aid)
if res is not None:
room["state"] = res
gs = res
continue
break
viewer_idx = get_player_idx()
duration = time.time() - start_time
print(f"[PERF] /api/action took {duration:.3f}s (Action: {action_id})")
return jsonify(
{
"success": True,
"state": rust_serializer.serialize_state(gs, viewer_idx=viewer_idx, mode=game_mode),
}
)
else:
return jsonify({"success": False, "error": f"Illegal action {action_id}"}), 400
except Exception as e:
import traceback
traceback.print_exc()
# Auto-report issue on crash
try:
report_dir = os.path.join(CURRENT_DIR, "reports")
os.makedirs(report_dir, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
crash_file = os.path.join(report_dir, f"crash_{timestamp}.json")
try:
if room is not None and room.get("engine") == "rust":
serialized_state = rust_serializer.serialize_state(
gs, viewer_idx=get_player_idx(), mode=game_mode
)
else:
serialized_state = serialize_state(
gs, viewer_idx=get_player_idx(), is_pvp=(game_mode == "pvp"), mode=game_mode
)
with open(crash_file, "w", encoding="utf-8") as f:
# Use app.json.dumps to handle Numpy types
f.write(
app.json.dumps(
{
"error": str(e),
"trace": traceback.format_exc(),
"state": serialized_state,
}
)
)
except Exception as inner_e:
# Fallback if serialization fails
with open(crash_file, "w", encoding="utf-8") as f:
f.write(
app.json.dumps(
{"error": str(e), "trace": traceback.format_exc(), "serialization_error": str(inner_e)}
)
)
except:
pass
return jsonify({"success": False, "error": str(e), "trace": traceback.format_exc()}), 500
@app.route("/")
def index_route():
return send_from_directory(app.static_folder, "index.html")
@app.route("/<path:path>")
def static_proxy(path):
return send_from_directory(app.static_folder, path)
@app.route("/api/exec", methods=["POST"])
def god_mode():
room_id = get_room_id()
code = request.json.get("code", "")
with game_lock:
room = get_room(room_id)
if not room:
return jsonify({"success": False, "error": "Room not found"})
gs = room["state"]
try:
p = gs.active_player
exec(code, {"state": gs, "p": p, "np": np})
return jsonify({"success": True, "state": serialize_state(gs, is_pvp=(room["mode"] == "pvp"))})
except Exception as e:
return jsonify({"success": False, "error": str(e)})
@app.route("/api/reset", methods=["POST"])
def reset():
room_id = get_room_id()
with game_lock:
data = request.json or {}
deck_type = data.get("deck_type", "normal")
# Allow changing mode on reset
new_mode = data.get("mode") # Optional
# Check if room exists to preserve existing params if not specified
old_room = ROOMS.get(room_id)
mode = new_mode if new_mode else (old_room["mode"] if old_room else "pve")
ROOMS[room_id] = create_room_internal(room_id, mode, deck_type)
room = ROOMS[room_id]
# Check for custom decks and apply them if they exist for this room
if old_room and "custom_decks" in old_room:
room["custom_decks"] = old_room["custom_decks"]
# Preserve sessions
if old_room and "sessions" in old_room:
room["sessions"] = old_room["sessions"]
if deck_type == "custom":
# Apply custom decks to the fresh state
gs = room["state"]
for pid in [0, 1]:
cdeck = room["custom_decks"].get(pid)
if cdeck and cdeck["main"]:
gs.players[pid].main_deck = convert_deck_strings_to_ids(cdeck["main"])
random.shuffle(gs.players[pid].main_deck)
# Re-draw hand?
gs.players[pid].hand = []
gs.players[pid].hand_added_turn = []
for _ in range(6):
if gs.players[pid].main_deck:
gs.players[pid].hand.append(gs.players[pid].main_deck.pop())
gs.players[pid].hand_added_turn.append(0)
if cdeck and cdeck["energy"]:
# Re-fill energy
gs.players[pid].energy_deck = convert_deck_strings_to_ids(cdeck["energy"])
gs.players[pid].energy_zone = []
for _ in range(3):
if gs.players[pid].energy_deck:
gs.players[pid].energy_zone.append(gs.players[pid].energy_deck.pop(0))
gs = room["state"]
game_mode = room["mode"]
# Auto-advance (AI goes first or Init steps)
max_safety = 100
while not gs.is_terminal() and max_safety > 0:
max_safety -= 1
# Automatic phases
if gs.phase in (-2, 1, 2, 3, 6, 7, 8):
gs.step(0)
continue
# AI Turn (P1)
if gs.current_player == 1 and game_mode == "pve":
gs.step_opponent_mcts(10)
continue
break # P0 turn or user input needed
return jsonify({"success": True, "state": rust_serializer.serialize_state(gs, mode=game_mode)})
@app.route("/api/ai_suggest", methods=["POST"])
def ai_suggest():
room_id = get_room_id()
data = request.json or {}
sims = data.get("sims", 10)
with game_lock:
room = get_room(room_id)
if not room:
return jsonify({"error": "Room not found"}), 404
gs = room["state"]
# Only run if not terminal
if gs.is_terminal():
return jsonify({"suggestions": []})
stats = gs.get_mcts_suggestions(sims)
# Shim for get_action_desc
class RustShim:
def __init__(self, gs):
self.phase = gs.phase
self.current_player = gs.current_player
self.active_player = gs.get_player(gs.current_player)
self.member_db = member_db
self.live_db = live_db
self.pending_choices = [] # TODO: expose from rust if needed
shim = RustShim(gs)
# Enrich stats with descriptions
enriched = []
for action, value, visits in stats:
desc = get_action_desc(action, shim)
enriched.append({"action_id": action, "value": float(value), "visits": int(visits), "desc": desc})
return jsonify({"success": True, "suggestions": enriched})
@app.route("/api/replays", methods=["GET"])
def list_replays():
# 1. Root replays
try:
if os.path.exists("replays"):
for f in os.listdir("replays"):
if f.endswith(".json") and os.path.isfile(os.path.join("replays", f)):
replays.append({"filename": f, "folder": ""})
# 2. Tournament subfolder
tourney_dir = os.path.join("replays", "tournament")
if os.path.exists(tourney_dir):
for f in os.listdir(tourney_dir):
if f.endswith(".json"):
# We need to handle pathing. The frontend might expect just filename.
# But get_replay takes "filename".
# We should probably update get_replay to handle subpaths or encode it.
# For now let's just use the relative path as the filename
replays.append({"filename": f"tournament/{f}", "folder": "tournament"})
except Exception as e:
print(f"Error listing replays: {e}")
return jsonify({"success": False, "error": str(e)})
# Sort by filename desc (usually timestamp)
replays.sort(key=lambda x: x["filename"], reverse=True)
return jsonify({"success": True, "replays": replays})
def get_replay(filename):
"""Serve replay JSON files"""
replay_path = f"replays/{filename}"
if os.path.exists(replay_path):
with open(replay_path, "r", encoding="utf-8") as f:
data = json.load(f)
# Auto-inflate if it's an optimized replay
if "registry" in data and "states" in data:
print(f"Inflating optimized replay: {filename}")
inflated_states = inflate_history(data, member_db, live_db, energy_db)
# Reconstruct standard format
data["states"] = inflated_states
# Remove registry to avoid confusing frontend if it doesn't expect it
del data["registry"]
return jsonify(data)
return jsonify({"error": "Replay not found"}), 404
@app.route("/api/advance", methods=["POST"])
def advance():
room_id = get_room_id()
with game_lock:
room = get_room(room_id)
if not room:
return jsonify({"success": False, "error": "Room not found"}), 404
gs = room["state"]
ai_agent = room["ai_agent"]
# Run auto-advance loop
max_safety = 50
while not gs.is_terminal() and max_safety > 0:
max_safety -= 1
# Advance if in an automatic phase (AND no choices pending)
if not gs.pending_choices and gs.phase in (
Phase.ACTIVE,
Phase.ENERGY,
Phase.DRAW,
Phase.PERFORMANCE_P1,
Phase.PERFORMANCE_P2,
):
gs = gs.step(0)
room["state"] = gs
continue
# Determine who should act (Check pending choices first)
next_actor = gs.current_player
if gs.pending_choices:
# Handle both Rust (str, str) and Python (str, dict) formats
params = gs.pending_choices[0][1]
if isinstance(params, str):
try:
params = json.loads(params)
except:
params = {}
next_actor = params.get("player_id", gs.current_player)
# If it's the AI's turn (P1) or the AI has a pending choice, let it act immediately
if next_actor == 1 and not gs.is_terminal():
aid = ai_agent.choose_action(gs, 1)
gs = gs.step(aid)
room["state"] = gs
continue
break
return jsonify(
{
"success": True,
"state": serialize_state(gs, is_pvp=(room["mode"] == "pvp"), mode=room["mode"]),
}
)
@app.route("/api/full_log", methods=["GET"])
def get_full_log():
"""Return the complete rule log without truncation."""
room_id = get_room_id()
with game_lock:
room = get_room(room_id)
if not room:
return jsonify({"log": [], "total_entries": 0})
gs = room["state"]
return jsonify({"log": gs.rule_log, "total_entries": len(gs.rule_log)})
@app.route("/api/set_ai", methods=["POST"])
def set_ai():
room_id = get_room_id()
data = request.json
mode = data.get("ai_mode", "smart")
with game_lock:
room = get_room(room_id)
if not room:
return jsonify({"success": False, "error": "Room not found"})
if mode == "random":
room["ai_agent"] = RandomAgent()
elif mode == "smart":
room["ai_agent"] = SmartAgent()
else:
return jsonify({"success": False, "error": f"Unknown AI mode: {mode}"})
return jsonify({"success": True, "mode": mode})
@app.route("/api/report_issue", methods=["POST"])
def report_issue():
"""Save the current game state and user explanation to a report file."""
try:
room_id = get_room_id()
room = get_room(room_id)
gs = room["state"] if room else None
data = request.json
explanation = data.get("explanation", "")
# We can take the current state from the request or just use our global game_state
# Providing it in the request is safer if the user is looking at a specific frame (e.g. in replay mode)
# But for now, let's use the provided state if it exists, otherwise capture the current one.
if room and room.get("engine") == "rust":
serialized = rust_serializer.serialize_state(gs, viewer_idx=0, mode=room.get("mode", "pve"))
else:
serialized = serialize_state(gs, is_pvp=(room["mode"] == "pvp" if room else False))
state_to_save = data.get("state") or serialized
history = data.get("history", [])
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
os.makedirs("reports", exist_ok=True)
filename = f"reports/report_{timestamp}.json"
with open(filename, "w", encoding="utf-8") as f:
json.dump(
{
"timestamp": timestamp,
"explanation": explanation,
"state": state_to_save,
"history": history,
"performance_history": state_to_save.get("performance_history", []),
"performance_results": state_to_save.get("performance_results", {}),
"action_desc": get_action_desc(state_to_save.get("last_action", 0), gs)
if gs and "last_action" in state_to_save
else "N/A",
},
f,
indent=2,
ensure_ascii=False,
)
return jsonify({"success": True, "filename": filename})
except Exception as e:
import traceback
traceback.print_exc()
return jsonify({"success": False, "error": str(e)}), 500
def generate_random_deck_list(member_db, live_db) -> list[str]:
"Generate a valid random deck list (card_no strings)."
deck = []
# 1. Select Members (48)
available_members = [c.card_no for c in member_db.values()]
if available_members:
member_bucket = []
for m_no in available_members:
member_bucket.extend([m_no] * 4)
random.shuffle(member_bucket)
while len(member_bucket) < 48:
member_bucket.extend(available_members)
deck.extend(member_bucket[:48])
# 2. Select Lives (12)
available_lives = [c.card_no for c in live_db.values()]
if available_lives:
live_bucket = []
for l_no in available_lives:
live_bucket.extend([l_no] * 4)
random.shuffle(live_bucket)
while len(live_bucket) < 12:
live_bucket.extend(available_lives)
deck.extend(live_bucket[:12])
return deck
@app.route("/api/get_random_deck", methods=["GET"])
def get_random_deck_api():
global member_db, live_db
deck_list = generate_random_deck_list(member_db, live_db)
return jsonify(
{"success": True, "content": deck_list, "message": f"Generated Random Deck ({len(deck_list)} cards)"}
)
@app.route("/api/presets", methods=["GET"])
def get_presets():
"""Return list of preset decks from tests/presets.json."""
try:
preset_path = os.path.join(CURRENT_DIR, "..", "tests", "presets.json")
if os.path.exists(preset_path):
with open(preset_path, "r", encoding="utf-8") as f:
data = json.load(f)
return jsonify({"success": True, "presets": data})
return jsonify({"success": False, "error": "presets.json not found", "presets": []})
except Exception as e:
return jsonify({"success": False, "error": str(e)})
if __name__ == "__main__":
# PyInstaller Bundle Check
if getattr(sys, "frozen", False):
# If frozen, we might need to adjust static folder or templates folder depending on how flask finds them.
# However, we added paths with --add-data, so they should be in sys._MEIPASS.
# Flask's root_path defaults to __main__ directory, which in onefile mode is temporary.
# We need to explicitly point static_folder to the MEIPASS location.
bundle_dir = getattr(sys, "_MEIPASS", ".") # type: ignore
app.static_folder = os.path.join(bundle_dir, "web_ui")
# app.template_folder = os.path.join(bundle_dir, 'templates') # if we used templates
# Also need to make sure data loader finds 'data/cards.json'
# CardDataLoader expects relative path. We might need to chdir or patch it.
# Easiest is to chdir to the bundle dir so relative paths work?
# BUT 'replays' need to be written to writable cwd, not temp dir.
# So we should NOT chdir globally.
# Instead, we should update filenames to be absolute paths based on bundle_dir if read-only.
# Monkey patch the loader path just for this instance if needed,
# but CardDataLoader takes a path arg.
# We need to ensure 'init_game' calls it with the correct absolute path.
pass
# Patched init_game for Frozen state to find data
original_init_game = init_game
def frozen_init_game(deck_type="normal"):
if getattr(sys, "frozen", False):
bundle_dir = getattr(sys, "_MEIPASS", ".") # type: ignore
os.path.join(bundle_dir, "data", "cards.json")
# We need to temporarily force the loader to use this path
# But init_game hardcodes "data/cards.json" in correct logic?
# actually checking init_game source:
# loader = CardDataLoader("data/cards.json")
# We need to change that line or intercept.
# Use os.chdir to temp dir for READS? No, we need writes to real dir.
# Best way: Just ensure data/cards.json exists in CWD? No, user won't have it.
# HACK: We can't easily change the hardcoded string inside init_game without rewriting it.
# However, we can patch CardDataLoader class to fix the path!
# Assuming CardDataLoader is imported from engine.game.data_loader
from engine.game.data_loader import CardDataLoader
ops_init = CardDataLoader.__init__
def new_init(self, filepath):
if not os.path.exists(filepath) and getattr(sys, "frozen", False):
# Try bundle path
bundle_path = os.path.join(sys._MEIPASS, filepath) # type: ignore
if os.path.exists(bundle_path):
filepath = bundle_path
ops_init(self, filepath)
CardDataLoader.__init__ = new_init # type: ignore[method-assign]
original_init_game(deck_type)
init_game = frozen_init_game
# Run Server
# use_reloader=False is crucial for PyInstaller to implicit avoid spawning subprocesses incorrectly
port = int(os.environ.get("PORT", 8000))
# Auto-open browser
import webbrowser
from threading import Timer
def open_browser():
webbrowser.open_new(f"http://localhost:{port}/")
if not getattr(sys, "frozen", False) or os.environ.get("OPEN_BROWSER", "true").lower() == "true":
Timer(1.5, open_browser).start()
# Start Background Game Loop
if game_thread is None:
game_thread = threading.Thread(target=background_game_loop, daemon=True)
game_thread.start()
if __name__ == "__main__":
port = int(os.environ.get("PORT", 7860))
# In production/container, usually don't want debug mode
debug_mode = os.environ.get("FLASK_DEBUG", "True").lower() == "true"
app.run(host="0.0.0.0", port=port, debug=debug_mode)