Spaces:
Running
Running
File size: 71,546 Bytes
b872b7e b30643d b872b7e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719 1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734 1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762 1763 1764 1765 1766 1767 1768 1769 1770 1771 1772 1773 1774 1775 1776 1777 1778 1779 1780 1781 1782 1783 1784 1785 1786 1787 1788 1789 1790 1791 1792 1793 1794 1795 1796 1797 1798 1799 1800 1801 1802 1803 1804 1805 1806 1807 1808 1809 1810 1811 1812 1813 1814 1815 1816 1817 1818 1819 1820 1821 1822 1823 1824 1825 1826 1827 1828 1829 1830 1831 1832 1833 1834 1835 1836 1837 1838 1839 1840 1841 1842 1843 1844 1845 1846 1847 1848 1849 1850 1851 1852 1853 1854 1855 1856 1857 1858 1859 1860 1861 1862 1863 1864 1865 1866 1867 1868 1869 1870 1871 1872 1873 1874 1875 1876 1877 1878 |
"""
Flask Backend for Love Live Card Game Web UI
"""
import json
import os
import random
import sys
import threading
import uuid
from datetime import datetime
from typing import Any
import numpy as np
from flask import Flask, jsonify, request, send_from_directory
from flask.json.provider import DefaultJSONProvider
# Ensure project root is in sys.path for absolute imports
if getattr(sys, "frozen", False):
PROJECT_ROOT = sys._MEIPASS # type: ignore
CURRENT_DIR = os.path.join(PROJECT_ROOT, "backend")
else:
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
PROJECT_ROOT = os.path.abspath(os.path.join(CURRENT_DIR, ".."))
if PROJECT_ROOT not in sys.path:
sys.path.insert(0, PROJECT_ROOT)
# Rust Engine
import engine_rust
try:
from ai.headless_runner import RandomAgent, create_easy_cards
from ai.headless_runner import SmartHeuristicAgent as SmartAgent
AI_AVAILABLE = True
except ImportError:
print("Warning: AI modules not found. AI features will be disabled.")
AI_AVAILABLE = False
class RandomAgent: pass
class SmartAgent: pass
def create_easy_cards(): return None, None
from engine.game.data_loader import CardDataLoader
from engine.game.desc_utils import get_action_desc
from engine.game.enums import Phase
from engine.game.game_state import GameState
from engine.game.replay_manager import inflate_history, optimize_history
from engine.game.serializer import serialize_state
from engine.game.state_utils import create_uid
try:
from rust_serializer import RustGameStateSerializer
except ImportError:
from backend.rust_serializer import RustGameStateSerializer
INSTANCE_SHIFT = 20
BASE_ID_MASK = 0xFFFFF
# --- MODULE DIRECTORIES ---
ENGINE_DIR = os.path.join(PROJECT_ROOT, "engine")
AI_DIR = os.path.join(PROJECT_ROOT, "ai")
TOOLS_DIR = os.path.join(PROJECT_ROOT, "tools")
DATA_DIR = os.path.join(PROJECT_ROOT, "data")
# Tools imports (optional)
try:
from tools.deck_extractor import extract_deck_data
except ImportError:
print("Warning: Could not import deck_extractor from tools.")
def extract_deck_data(content, db):
return [], [], {}, ["Importer not found"]
# Static folder is now in frontend/web_ui
FRONTEND_DIR = os.path.join(PROJECT_ROOT, "frontend")
WEB_UI_DIR = os.path.join(FRONTEND_DIR, "web_ui")
IMG_DIR = os.path.join(FRONTEND_DIR, "img") # Images seem to be in frontend/img
# Note: frontend/web_ui has its own js/css folders which index.html likely uses
app = Flask(__name__, static_folder=WEB_UI_DIR)
class NumpyJSONProvider(DefaultJSONProvider):
def default(self, obj):
if isinstance(obj, np.integer):
return int(obj)
elif isinstance(obj, np.floating):
return float(obj)
elif isinstance(obj, np.bool_):
return bool(obj)
elif isinstance(obj, np.ndarray):
return obj.tolist()
return super().default(obj)
app.json = NumpyJSONProvider(app)
@app.route("/img/<path:filename>")
def serve_img(filename):
# Sanitize and normalize the filename
filename = filename.replace("\\", "/").lstrip("/")
# Check if this is a card image request
if filename.startswith("cards/") or filename.startswith("cards_webp/"):
# Remove old nested 'cards/' prefix if it's there
pure_filename = os.path.basename(filename)
webp_path = os.path.join(IMG_DIR, "cards_webp", pure_filename)
# Priority 1: Flat WebP folder
if os.path.exists(webp_path) and os.path.isfile(webp_path):
return send_from_directory(os.path.join(IMG_DIR, "cards_webp"), pure_filename)
# Priority 2: Try falling back to original nested PNGs for backward compatibility/backup
# (This is mostly for non-compiled access or manual links)
pass
# Define possible search directories relative to PROJECT_ROOT
search_dirs = [
os.path.join(IMG_DIR, "cards_webp"), # Flattened WebP first
IMG_DIR, # frontend/img
os.path.join(IMG_DIR, "texticon"), # frontend/img/texticon
os.path.join(WEB_UI_DIR, "img"), # frontend/web_ui/img
FRONTEND_DIR # Allow direct frontend access if needed
]
for base_dir in search_dirs:
full_path = os.path.join(base_dir, filename)
if os.path.exists(full_path) and os.path.isfile(full_path):
return send_from_directory(base_dir, filename)
# Fallback for .webp requesting .png or vice-versa
if filename.endswith(".webp"):
png_fallback = filename[:-5] + ".png"
full_png_path = os.path.join(base_dir, png_fallback)
if os.path.exists(full_png_path) and os.path.isfile(full_png_path):
return send_from_directory(base_dir, png_fallback)
# Extra fallback for common icons if they are misplaced
if filename == "icon_blade.png" or "icon_blade" in filename:
# Try to find it anywhere in frontend/img
for root, dirs, files in os.walk(IMG_DIR):
if "icon_blade.png" in files:
return send_from_directory(root, "icon_blade.png")
print(f"DEBUG_IMG_404: Could not find {filename} in {search_dirs}")
return "Image not found", 404
@app.route("/icon_blade.png")
def serve_icon_root():
return serve_img("icon_blade.png")
# ai_agent = SmartHeuristicAgent()
ai_agent = SmartAgent() # Use original heuristic AI
# Global game state
# Room Registry
ROOMS: dict[str, dict[str, Any]] = {}
game_lock = threading.Lock()
# Rust Card DB (Global Singleton for performance)
RUST_DB = None
try:
compiled_data_path = os.path.join(DATA_DIR, "cards_compiled.json")
with open(compiled_data_path, "r", encoding="utf-8") as f:
RUST_DB = engine_rust.PyCardDatabase(f.read())
except Exception as e:
print(f"Warning: Failed to load RUST_DB from {compiled_data_path}: {e}")
# Python DBs (for metadata/serialization)
member_db: dict[int, Any] = {}
live_db: dict[int, Any] = {}
energy_db: dict[int, Any] = {}
rust_serializer = None # Initialized after data load
game_history: list[dict] = [] # Global replay history (might need per-room later)
# Legacy custom deck globals (used by init_game)
custom_deck_p0: list[str] | None = None
custom_deck_p1: list[str] | None = None
custom_energy_deck_p0: list[str] | None = None
custom_energy_deck_p1: list[str] | None = None
def load_game_data():
"""Load card data into global databases."""
global member_db, live_db, energy_db, rust_serializer
try:
cards_path = os.path.join(DATA_DIR, "cards.json")
print(f"Loading card data from: {cards_path}")
loader = CardDataLoader(cards_path)
m, l, e = loader.load()
member_db.update(m)
live_db.update(l)
energy_db.update(e)
# Initialize rust_serializer
rust_serializer = RustGameStateSerializer(member_db, live_db, energy_db)
# Build mapping
build_card_no_mapping()
print(f"Data loaded: {len(member_db)} Members, {len(live_db)} Lives, {len(energy_db)} Energy")
print(f"DEBUG PATHS: PROJECT_ROOT={PROJECT_ROOT}")
print(f"DEBUG PATHS: FRONTEND_DIR={FRONTEND_DIR}")
print(f"DEBUG PATHS: WEB_UI_DIR={WEB_UI_DIR}")
print(f"DEBUG PATHS: IMG_DIR={IMG_DIR}")
except Exception as ex:
print(f"CRITICAL ERROR loading card data: {ex}")
import sys
sys.exit(1)
# Load data immediately on import
def get_room_id() -> str:
"""Extract room_id from request header or query param."""
# Priority: Header > Query Param > Default "SINGLE_PLAYER"
rid = request.headers.get("X-Room-Id") or request.args.get("room_id")
if not rid:
# Debug why no ID found
# print(f"DEBUG: No X-Room-Id or room_id param. Headers: {request.headers}", file=sys.stderr)
rid = "SINGLE_PLAYER"
return rid
def get_player_idx():
"""Extract player perspective from X-Player-Idx header or viewer query param."""
# Try query param 'viewer' first (commonly used by frontend)
viewer = request.args.get("viewer")
if viewer is not None:
try:
return int(viewer)
except (ValueError, TypeError):
pass
# Fallback to header
try:
return int(request.headers.get("X-Player-Idx", 0))
except (ValueError, TypeError):
return 0
def get_room(room_id: str) -> dict[str, Any] | None:
"""Get room data safely."""
with game_lock:
room = ROOMS.get(room_id)
if room:
room["last_active"] = datetime.now()
return room
# Reverse mapping: card_no string -> internal integer ID
card_no_to_id: dict[str, int] = {}
def build_card_no_mapping():
"""Build reverse lookup from card_no string to internal ID using compiled data.
Ensures consistency with the Rust engine's internal ID assignments.
"""
global card_no_to_id
card_no_to_id = {}
try:
compiled_path = os.path.join(DATA_DIR, "cards_compiled.json")
if not os.path.exists(compiled_path):
print(f"Warning: {compiled_path} not found. Mapping will be empty.")
return
with open(compiled_path, "r", encoding="utf-8") as f:
data = json.load(f)
# Build mapping from dbs
count = 0
for db_name in ["member_db", "live_db", "energy_db"]:
db = data.get(db_name, {})
for internal_id, card_data in db.items():
card_no = card_data.get("card_no")
if card_no:
# Convert string key to integer ID
card_no_to_id[card_no] = int(internal_id)
count += 1
print(f"Built card_no_to_id mapping from compiled data: {count} entries")
except Exception as e:
print(f"Error building mapping from compiled data: {e}")
# Load data immediately on import
load_game_data()
# Initialize mapping on startup
build_card_no_mapping()
def convert_deck_strings_to_ids(deck_strings):
"""Convert list of card_no strings to internal IDs (Unique Instance IDs)."""
ids = []
counts = {}
for card_no in deck_strings:
if card_no in card_no_to_id:
base_id = card_no_to_id[card_no]
count = counts.get(base_id, 0)
uid = create_uid(base_id, count)
counts[base_id] = count + 1
ids.append(uid)
else:
print(f"Warning: Unknown card_no '{card_no}', skipping.")
return ids
def save_replay(gs: GameState | None = None):
"""Save the provided game state's history to a file."""
if gs is None or not gs.rule_log:
return
try:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
os.makedirs("replays", exist_ok=True)
filename = f"replays/replay_{timestamp}.json"
filename_opt = f"replays/replay_{timestamp}_opt.json"
# Use historical states from rule_log or history if we maintain one
# For now, we assume GS has what we need or we pass history
history = [] # In this engine, standard replays are often built from logs or incremental states
# 1. Save Standard Replay (Compatible)
data = {
"game_id": 0,
"timestamp": timestamp,
"winner": gs.winner if gs else -1,
"states": history,
}
with open(filename, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False)
print(f"Replay saved to {filename}")
# 2. Save Optimized Replay (Dict Encoded)
try:
print("Optimizing replay...")
# Gather Level 3 Context
deck_info = None
if gs:
deck_info = {
"p0_deck": list(getattr(gs.players[0], "initial_deck_indices", [])),
"p1_deck": list(getattr(gs.players[1], "initial_deck_indices", [])),
}
opt_data = optimize_history(
history,
member_db,
live_db,
energy_db,
exclude_db_cards=True,
# seed=current_seed,
# action_log=action_log,
deck_info=deck_info,
)
final_opt = {
"game_id": 0,
"timestamp": timestamp,
"winner": gs.winner if gs else -1,
}
# Merge optimization data
if "level" in opt_data and opt_data["level"] == 3:
final_opt.update(opt_data) # seed, decks, action_log
print("Level 3 Optimization Active (Action Log)")
else:
final_opt["states"] = opt_data["states"]
with open(filename_opt, "w", encoding="utf-8") as f:
json.dump(final_opt, f, ensure_ascii=False)
# Calculate savings
size_std = os.path.getsize(filename)
size_opt = os.path.getsize(filename_opt)
savings = (1 - size_opt / size_std) * 100
print(f"Optimized replay saved to {filename_opt}")
print(f"Compression: {size_std / 1024:.1f}KB -> {size_opt / 1024:.1f}KB ({savings:.1f}% savings)")
except Exception as e:
print(f"Failed to save optimized replay: {e}")
import traceback
traceback.print_exc()
except Exception as e:
print(f"Failed to save replay: {e}")
game_history = [] # For replay recording
action_log = [] # For action-based replay
current_seed = 0 # For deterministic replay
def init_game(deck_type="normal"):
global game_state, member_db, live_db, energy_db, game_history, current_seed, action_log
# Ensure true randomness for each game
import time
real_seed = int(time.time() * 1000) % (2**31)
current_seed = real_seed
random.seed(real_seed)
# Store action history separately for Level 3 Replay
global action_log
action_log = []
# DATA PATH: data/cards.json
cards_path = os.path.join(DATA_DIR, "cards.json")
loader = CardDataLoader(cards_path)
member_db, live_db, energy_db = loader.load()
# CRITICAL: Populate GameState static DBs so validations work
# Use initialize_class_db to ensure proper wrapping with MaskedDB
GameState.initialize_class_db(member_db, live_db)
GameState.energy_db = energy_db
# Initialize JIT arrays for performance
GameState._init_jit_arrays()
# Build reverse mapping for custom deck support
build_card_no_mapping()
# Pre-calculate Start Deck card IDs
# Load raw JSON to check product field for filtering
cards_path = os.path.join(DATA_DIR, "cards.json")
with open(cards_path, "r", encoding="utf-8") as f:
json.load(f)
for _cid, _m in member_db.items():
# Find raw key by matching name/cost/type? Or better, DataLoader should store product.
# Since DataLoader doesn't verify product yet, we'll try to guess or just use ALL valid cards
# that are from Start Deck (usually ID < 100 for this mock loader or by string ID).
# Actually, let's just use ALL loaded members/lives for 'normal' and specific ones for 'starter'.
# For 'start_deck', we can filter by card string ID prefix 'PL!-sd1' or 'LL-E'.
# But 'member_db' keys are integers 0..N. We need a way to link back.
# The loader assigns IDs sequentially.
# Let's just build a random valid deck from ALL cards for now,
# unless 'easy' mode.
pass
# If deck_type is 'easy', we use the simple mock cards for logic testing.
# If deck_type is 'normal' or 'starter', we use REAL cards.
if deck_type == "easy":
easy_m, easy_l = create_easy_cards()
member_db[easy_m.card_id] = easy_m
live_db[easy_l.card_id] = easy_l
game_state = GameState()
# Setup players
for pidx, p in enumerate(game_state.players):
# Check for custom deck first
custom_deck = custom_deck_p0 if pidx == 0 else custom_deck_p1
if custom_deck:
# Use custom deck
p.main_deck = convert_deck_strings_to_ids(custom_deck)
random.shuffle(p.main_deck) # Shuffle custom deck for variety
print(f"Player {pidx}: Using custom deck ({len(p.main_deck)} cards, shuffled)")
elif deck_type == "easy":
# Use Easy Cards (888/999) but mapped to real images
p.main_deck = [888] * 48 + [999] * 12
else:
# NORMAL / STARTER MODE: Build a valid deck
# Rule: Max 4 copies of same card number.
# Total: 48 Members + 12 Lives (Total 60 in main deck per game_state spec)
p.main_deck = []
# 1. Select Members (48)
available_members = list(member_db.keys())
if available_members:
# Shuffle availability to vary decks
random.shuffle(available_members)
member_bucket = []
for mid in available_members:
# Add 4 copies of each until we have enough
# Use create_uid for unique instance IDs
for i in range(4):
uid = create_uid(mid, i)
member_bucket.append(uid)
if len(member_bucket) >= 150: # Optimization: Don't build massive list
break
# Pick 48 from the bucket
if len(member_bucket) < 48:
# Fallback if DB too small
while len(member_bucket) < 48:
member_bucket.extend(available_members)
# Ensure we don't accidentally pick >4 if we just slice
# Actually, simply taking the first 48 from our constructed bucket (which has 4 of each distinct card)
# guarantees validity if we shuffle the CARDS/TYPES, not the final list.
# Steps:
# 1. Shuffle types.
# 2. Add 4 of Type A, 4 of Type B...
# 3. Take first 48 cards.
p.main_deck.extend(member_bucket[:48])
# 2. Select Lives (12)
available_lives = list(live_db.keys())
if available_lives:
random.shuffle(available_lives)
live_bucket = []
for lid in available_lives:
# live_bucket.extend([lid] * 4)
for i in range(4):
uid = create_uid(lid, i)
live_bucket.append(uid)
if len(live_bucket) >= 50:
break
if len(live_bucket) < 12:
while len(live_bucket) < 12:
live_bucket.extend(available_lives)
p.main_deck.extend(live_bucket[:12])
random.shuffle(p.main_deck)
# Energy Deck (12 cards)
# Use actual Energy Card ID if available (2000+)
if energy_db:
eid = list(energy_db.keys())[0] # Take first energy card type found
p.energy_deck = [eid] * 12
else:
p.energy_deck = [40000] * 12 # Fallback
# Custom Energy Deck Override
custom_energy = custom_energy_deck_p0 if pidx == 0 else custom_energy_deck_p1
if custom_energy:
p.energy_deck = convert_deck_strings_to_ids(custom_energy)
print(f"Player {pidx}: Using custom energy deck ({len(p.energy_deck)} cards)")
# Explicit shuffle before drawing
random.shuffle(p.main_deck)
if game_state.players.index(p) == 0:
print(f"DEBUG: P0 Deck Shuffled. Top 5: {p.main_deck[-5:]}")
# Initial draw (6 cards - standard Mulligan start)
for _ in range(6):
if p.main_deck:
p.hand.append(p.main_deck.pop())
p.hand_added_turn.append(game_state.turn_number)
# Initial energy: 3 cards (Rule 6.2.1.7)
for _ in range(3):
if p.energy_deck:
p.energy_zone.append(p.energy_deck.pop(0))
# Randomly determine first player
game_state.first_player = random.randint(0, 1)
# For Mulligan Phase (P1/Index 0), Current Player MUST be 0
# The 'first_player' variable determines who acts first in ACTIVE phase (Round 1)
game_state.current_player = 0
# Start in MULLIGAN phase
game_state.phase = Phase.MULLIGAN_P1
def create_room_internal(
room_id: str,
mode: str = "pve",
deck_type: str = "normal",
public: bool = False,
custom_decks: dict = None,
) -> dict[str, Any]:
"""Helper to initialize a room using the RUST engine."""
print(
f"DEBUG: Creating Rust Room {room_id} (Mode: {mode}, Deck: {deck_type}, Public: {public}, CustomDecks: {bool(custom_decks)})"
)
if RUST_DB is None:
raise Exception("RUST_DB not initialized")
gs = engine_rust.PyGameState(RUST_DB)
# helper for deck generation
def get_random_decks():
m_ids = list(member_db.keys())
l_ids = list(live_db.keys())
random.shuffle(m_ids)
random.shuffle(l_ids)
main_ids = []
for mid in m_ids[:15]:
main_ids.extend([mid] * 4)
for lid in l_ids[:4]:
main_ids.extend([lid] * 4)
random.shuffle(main_ids)
e_deck = [list(energy_db.keys())[0]] * 12 if energy_db else [40000] * 12
l_deck = l_ids[
:3
] # Actually the Rust engine treats lives as a separate param in some versions or part of deck?
# Checked engine_rust/src/py_bindings.rs: initialize_game needs p0_deck, p1_deck, p0_energy, p1_energy, p0_lives, p1_lives
return main_ids[:60], e_deck, l_ids[:3]
# Defaults
p0_m, p0_e, p0_l = get_random_decks()
p1_m, p1_e, p1_l = get_random_decks()
# Override with custom decks if provided
final_custom_decks = {0: {"main": [], "energy": []}, 1: {"main": [], "energy": []}}
if custom_decks:
final_custom_decks.update(custom_decks)
for pid in [0, 1]:
cdeck = custom_decks.get(str(pid)) or custom_decks.get(pid)
if cdeck and cdeck.get("main"):
# Convert strings to IDs
main_ids = convert_deck_strings_to_ids(cdeck["main"])
random.shuffle(main_ids)
# Extract Live cards for the initial Live Zone (3 cards)
# Note: cid is a UID, so we must mask it to compare with live_db keys
live_ids = [cid for cid in main_ids if (cid & BASE_ID_MASK) in live_db]
if len(main_ids) > 0:
if pid == 0:
p0_m = main_ids
if len(live_ids) >= 3:
p0_l = live_ids[:3] # Pick first 3 as starting lives
elif len(live_ids) > 0:
p0_l = live_ids # Use whatever lives are available
else:
p1_m = main_ids
if len(live_ids) >= 3:
p1_l = live_ids[:3]
elif len(live_ids) > 0:
p1_l = live_ids
# Energy
if cdeck.get("energy"):
e_ids = convert_deck_strings_to_ids(cdeck["energy"])
if pid == 0:
p0_e = e_ids
else:
p1_e = e_ids
# Warning: We are not extracting initial lives from main deck for p0_l/p1_l if custom.
# The engine probably draws them?
# If `p0_l` is required, we should pick random 3 from lives in deck or DB?
# For now, let's keep random lives for the Live Zone if not specified, or just reuse random ones.
gs.initialize_game(p0_m, p1_m, p0_e, p1_e, p0_l, p1_l)
return {
"state": gs,
"mode": mode,
"public": public,
"created_at": datetime.now(),
"last_active": datetime.now(),
"ai_agent": None, # MCTS is built-in
"custom_decks": final_custom_decks,
"sessions": {},
"engine": "rust",
}
def join_room_logic(room_id: str) -> dict[str, Any]:
"""
Logic to add a user session to a room.
Returns {"session_id": str, "player_id": int}
"""
if room_id not in ROOMS:
return {"error": "Room not found"}
room = ROOMS[room_id]
sessions = room["sessions"]
# Simple assignment logic:
# If 0 is free, take 0.
# If 1 is free, take 1.
# Else, maybe return spectator? For now, just return -1 or error.
# Check current players
taken_pids = set(sessions.values())
new_pid = -1
if 0 not in taken_pids:
new_pid = 0
elif 1 not in taken_pids:
new_pid = 1
else:
# Both full. Spectator?
new_pid = -1
# For spectator, maybe we still give a session but with pid -1?
session_id = str(uuid.uuid4())
sessions[session_id] = new_pid
return {"session_id": session_id, "player_id": new_pid}
# --- ROOM MANAGEMENT API ---
@app.route("/api/rooms/create", methods=["POST"])
def create_new_room():
print("DEBUG: Entered create_new_room endpoint", file=sys.stderr)
try:
data = request.json or {}
except Exception as e:
print(f"DEBUG: Failed to parse JSON: {e}", file=sys.stderr)
data = {}
mode = data.get("mode", "pve")
is_public = data.get("public", False)
custom_decks = data.get("decks", None) # Optional initial decks
# Generate 4-char code
import string
chars = string.ascii_uppercase + string.digits
while True:
room_id = "".join(random.choices(chars, k=4))
if room_id not in ROOMS:
break
print(f"DEBUG: Generated room_id {room_id}, acquiring lock...", file=sys.stderr)
res = {}
with game_lock:
print("DEBUG: Lock acquired. Creating room internal...", file=sys.stderr)
ROOMS[room_id] = create_room_internal(room_id, mode, public=is_public, custom_decks=custom_decks)
print("DEBUG: Room created internally. Joining creator...", file=sys.stderr)
# Auto-join creator
join_res = join_room_logic(room_id)
print("DEBUG: Returning response.", file=sys.stderr)
return jsonify({"success": True, "room_id": room_id, "mode": mode, "session": join_res})
@app.route("/api/rooms/list", methods=["GET"])
def list_public_rooms():
"""Return a list of public rooms."""
public_rooms = []
with game_lock:
for rid, room in ROOMS.items():
if room.get("public", False):
# Calculate player count
sessions = room.get("sessions", {})
player_count = len(set(sessions.values())) # Approximate, might need better logic if spectators exist
# Or just count occupied slots (0 and 1)
occupied_slots = 0
taken_pids = set(sessions.values())
if 0 in taken_pids:
occupied_slots += 1
if 1 in taken_pids:
occupied_slots += 1
# Basic Info
gs = room.get("state")
turn = gs.turn_number if gs else 0
phase = str(gs.phase) if gs else "?"
public_rooms.append(
{
"room_id": rid,
"mode": room.get("mode", "pve"),
"players": occupied_slots,
"turn": turn,
"phase": phase,
"created_at": room.get("created_at", datetime.now()).isoformat(),
}
)
# Sort by creation time desc
public_rooms.sort(key=lambda x: x["created_at"], reverse=True)
return jsonify({"success": True, "rooms": public_rooms})
@app.route("/api/rooms/join", methods=["POST"])
def join_room():
print("DEBUG: Entered join_room", file=sys.stderr)
data = request.json or {}
room_id = data.get("room_id", "").upper().strip()
print(f"DEBUG: Entered join_room for ID: '{room_id}'", file=sys.stderr)
with game_lock:
if room_id in ROOMS:
mode = ROOMS[room_id]["mode"]
print(f"DEBUG: Found room {room_id}, mode={mode}", file=sys.stderr)
# Assign a session/seat to the joining player
join_res = join_room_logic(room_id)
if "error" in join_res:
return jsonify({"success": False, "error": join_res["error"]}), 400
return jsonify(
{
"success": True,
"room_id": room_id,
"mode": mode,
"session_id": join_res.get("session_id"),
"player_id": join_res.get("player_id"),
}
)
return jsonify({"success": False, "error": "Room not found"}), 404
@app.route("/")
def index():
return send_from_directory(WEB_UI_DIR, "index.html")
@app.route("/board")
def game_board():
return send_from_directory(WEB_UI_DIR, "game_board.html") # Assuming it exists there
@app.route("/js/<path:filename>")
def serve_js(filename):
return send_from_directory(os.path.join(WEB_UI_DIR, "js"), filename)
@app.route("/css/<path:filename>")
def serve_css(filename):
return send_from_directory(os.path.join(WEB_UI_DIR, "css"), filename)
@app.route("/icon_blade.png")
def serve_icon():
# If icon is in root or img, adjust. Assuming img for now or checking existence.
# Fallback to IMG_DIR or WEB_UI_DIR
return send_from_directory(IMG_DIR, "icon_blade.png")
@app.route("/deck_builder.html")
def serve_deck_builder():
return send_from_directory(WEB_UI_DIR, "deck_builder.html")
@app.route("/data/<path:filename>")
def serve_data(filename):
return send_from_directory(DATA_DIR, filename)
import threading
import time
# Threading setup
game_lock = threading.RLock() # Re-entrant lock to prevent self-deadlock
game_thread = None
def background_game_loop():
"""
Runs the game logic (AI and auto-phases) for ALL active rooms.
"""
print("Background Game Loop Started (Multi-Room)", file=sys.stderr)
while True:
try:
# print("DEBUG: Background Loop acquiring lock...", file=sys.stderr)
with game_lock:
# Iterate over a copy of keys to avoid modification issues if needed
active_room_ids = list(ROOMS.keys())
for rid in active_room_ids:
# print(f"DEBUG: Processing room {rid}...", file=sys.stderr)
room = ROOMS.get(rid)
if not room:
continue
gs = room["state"]
game_mode = room["mode"]
ai_agent = room["ai_agent"]
if not gs.is_terminal():
# 1. Auto-Advance Phases
if gs.phase in (
Phase.ACTIVE,
Phase.ENERGY,
Phase.DRAW,
Phase.PERFORMANCE_P1,
Phase.PERFORMANCE_P2,
):
# Safe attribute access for Rust engine compatibility
p_choices = getattr(gs, "pending_choices", [])
p_effects = getattr(gs, "pending_effects", [])
if not (p_choices or p_effects):
res = gs.step(0)
if res is not None:
room["state"] = res
gs = res
elif gs.current_player == 1 and game_mode == "pve":
is_continue_choice = False
if gs.pending_choices and gs.pending_choices[0][0].startswith("CONTINUE"):
is_continue_choice = True
if gs.phase == Phase.LIVE_RESULT and is_continue_choice:
# Wait for Human
pass
else:
if gs.phase in (Phase.MULLIGAN_P1, Phase.MULLIGAN_P2):
aid = 0
res = gs.step(aid)
if res is not None:
room["state"] = res
else:
if room.get("engine") == "rust":
# Use Greedy (1-ply) AI for Rust engine in PVE to maximize responsiveness
gs.step_opponent_greedy()
else:
aid = ai_agent.choose_action(gs, 1)
res = gs.step(aid)
if res is not None:
room["state"] = res
time.sleep(0.1)
except Exception as e:
print(f"Error in game loop: {e}")
import traceback
traceback.print_exc()
time.sleep(1.0)
@app.route("/api/state")
def get_state():
room_id = get_room_id()
session_token = request.headers.get("X-Session-Token")
with game_lock:
# Development convenience: Auto-create room if missing IF it's "SINGLE_PLAYER"
if room_id == "SINGLE_PLAYER" and room_id not in ROOMS:
ROOMS[room_id] = create_room_internal(room_id)
room = get_room(room_id)
if not room:
return jsonify({"success": False, "error": "Room not found or expired"}), 404
gs = room["state"]
mode = room["mode"]
viewer_idx = get_player_idx()
if room.get("engine") == "rust":
s_state = rust_serializer.serialize_state(gs, viewer_idx=viewer_idx, mode=mode, is_pvp=(mode == "pvp"))
else:
s_state = serialize_state(
gs,
viewer_idx=viewer_idx,
is_pvp=(mode == "pvp" and request.headers.get("X-Player-Idx") is None),
mode=mode,
)
# Meta info about decks
cdecks = room.get("custom_decks", {})
meta = {
"p0_deck_set": bool(cdecks.get(0, {}).get("main") or cdecks.get("0", {}).get("main")),
"p1_deck_set": bool(cdecks.get(1, {}).get("main") or cdecks.get("1", {}).get("main")),
"mode": mode,
}
return jsonify({"success": True, "state": s_state, "meta": meta})
@app.route("/api/set_deck", methods=["POST"])
def set_deck():
"""Accept a custom deck for a player in a specific room."""
data = request.json
player_id = data.get("player", 0)
deck_ids = data.get("deck", []) # List of card_no strings
energy_ids = data.get("energy_deck", [])
room_id = get_room_id()
with game_lock:
room = get_room(room_id)
# For setting deck, we might want to allow it even if room doesn't exist yet?
# But conceptually, you create a room, then set deck, then reset/start.
if not room:
# Auto-create for dev workflow
ROOMS[room_id] = create_room_internal(room_id)
room = ROOMS[room_id]
room["custom_decks"][player_id] = {"main": deck_ids, "energy": energy_ids}
return jsonify(
{
"status": "ok",
"player": player_id,
"deck_size": len(deck_ids),
"message": f"Deck set for Player {player_id + 1} in Room {room_id}. Reset game to apply.",
}
)
@app.route("/api/upload_deck", methods=["POST"])
def upload_deck():
"""Accept a raw deck file content (decktest.txt style) and load it."""
data = request.json
content = data.get("content", "")
player_id = data.get("player", 0)
room_id = get_room_id()
# Parse content
try:
if content.strip().startswith("{") or content.strip().startswith("["):
# JSON format
deck_data = json.loads(content)
# Support both simple list and object
if isinstance(deck_data, list):
main_deck = deck_data
energy_deck = [] # JSON list implies only main deck usually?
elif "main" in deck_data:
main_deck = deck_data["main"]
energy_deck = deck_data.get("energy", [])
else:
return jsonify(
{"success": False, "error": "Invalid JSON deck format. Expected list or object with 'main' key."}
)
else:
# HTML/Text format
card_db = {}
try:
cards_path = os.path.join(DATA_DIR, "cards.json")
with open(cards_path, "r", encoding="utf-8") as f:
card_db = json.load(f)
except Exception as e:
return jsonify({"success": False, "error": f"Failed to load card DB for validation: {e}"})
main_deck, energy_deck, _, errors = extract_deck_data(content, card_db) # Pass DB for validation
if errors:
return jsonify({"success": False, "error": "Validation Errors:\n" + "\n".join(errors)})
except json.JSONDecodeError:
return jsonify({"success": False, "error": "Invalid JSON format."})
except Exception as e:
print(f"Deck parsing error: {e}")
return jsonify({"success": False, "error": str(e)})
if not main_deck and not energy_deck:
return jsonify({"success": False, "error": "No cards found in file."})
with game_lock:
room = get_room(room_id)
if not room:
ROOMS[room_id] = create_room_internal(room_id)
room = ROOMS[room_id]
room["custom_decks"][player_id] = {"main": main_deck, "energy": energy_deck}
# Auto-apply?
# Re-init room with "custom" logic?
# For now, let's just create a new room state using these decks immediately for convenience
# But we need to respect the loop.
# Actually existing logic calls init_game(deck_type="custom").
# We'll just trigger a reset logic manually
# This duplicates logic in reset() but scoped to this room + custom deck applied.
# For simplicity, we just store it. User must click "Reset" or we call reset internal?
# The frontend usually expects upload to just work.
pass
# Trigger Reset via API logic simulation or just return success and let caller Reset?
# Existing behavior: calls init_game("custom").
# So we should probably do the same: reset the room's state using these custom decks.
# We can reuse the create_room_internal logic if we modify it to accept custom decks directly?
# Or just rely on the room["custom_decks"] being set.
# Let's call reset internal logic here?
# Better: Update endpoints first, then we can verify flow.
# For now, we assume user clicks Reset or we simulate it.
# Actually, let's just return success. The frontend typically reloads or resets.
return jsonify(
{
"success": True,
"main_count": len(main_deck),
"energy_count": len(energy_deck),
"room_id": room_id,
"message": f"Deck Loaded! ({len(main_deck)} Main, {len(energy_deck)} Energy). Please Reset.",
}
)
@app.route("/api/get_test_deck", methods=["GET"])
def get_test_deck_api():
"""Read deck files from ai/decks/ directory and return card list."""
from engine.game.deck_utils import extract_deck_data
deck_name = request.args.get("deck", "") # Optional deck name parameter
# Path to ai/decks directory
# Use PROJECT_ROOT for reliability
ai_decks_dir = os.path.join(PROJECT_ROOT, "ai", "decks")
if not os.path.exists(ai_decks_dir):
# Fallback: try CWD relative
ai_decks_dir = os.path.abspath(os.path.join("ai", "decks"))
if not os.path.exists(ai_decks_dir):
return jsonify({"success": False, "error": "ai/decks directory not found"})
# List available decks (excluding verify script)
available_decks = []
for f in os.listdir(ai_decks_dir):
if f.endswith(".txt") and not f.startswith("verify"):
available_decks.append(f.replace(".txt", ""))
# If no deck specified, return list of available decks
if not deck_name:
# Default to aqours_cup for "Load Test Deck" button compatibility
deck_name = "aqours_cup"
message = "Defaulting to 'aqours_cup'. Specify ?deck=NAME to load a specific deck."
else:
message = f"Loaded '{deck_name}'"
# Find matching deck file
deck_file = os.path.join(ai_decks_dir, f"{deck_name}.txt")
if not os.path.exists(deck_file):
return jsonify({"success": False, "error": f"Deck '{deck_name}' not found", "available_decks": available_decks})
try:
with open(deck_file, "r", encoding="utf-8") as f:
content = f.read()
# Load card DB for parsing
card_db_path = os.path.join(CURRENT_DIR, "..", "data", "cards.json")
card_db = {}
if os.path.exists(card_db_path):
with open(card_db_path, "r", encoding="utf-8") as f_db:
card_db = json.load(f_db)
# Use the unified parser
main_deck, energy_deck, type_counts, errors = extract_deck_data(content, card_db)
return jsonify(
{
"success": True,
"deck_name": deck_name,
"content": main_deck, # For compatibility with older frontend
"main_deck": main_deck,
"energy_deck": energy_deck,
"available_decks": available_decks,
"message": f"{message} ({len(main_deck)} Main, {len(energy_deck)} Energy)",
"errors": errors,
}
)
except Exception as e:
return jsonify({"success": False, "error": str(e)})
@app.route("/api/validate_cards", methods=["POST"])
def validate_cards():
"""Validate card IDs against the database and provide type breakdown."""
data = request.json
card_ids = data.get("card_ids", [])
card_counts = data.get("card_counts", {}) # Optional: {card_id: quantity}
# Ensure mapping is built
if not card_no_to_id:
print("DEBUG: validation - mapping empty, rebuilding...", flush=True)
build_card_no_mapping()
print(f"DEBUG: validation - map size: {len(card_no_to_id)}", flush=True)
test_key = "PL!SP-bp1-004-R"
if test_key in card_no_to_id:
print(f"DEBUG: validation - found {test_key}: {card_no_to_id[test_key]}", flush=True)
else:
print(f"DEBUG: validation - {test_key} NOT FOUND in map!", flush=True)
known = []
unknown = []
card_info = {} # card_id -> {type, name, internal_id}
# Type counters
member_count = 0
live_count = 0
energy_count = 0
for card_id in card_ids:
# print(f"DEBUG: Checking {card_id}", flush=True)
qty = card_counts.get(card_id, 1)
if card_id in card_no_to_id:
internal_id = card_no_to_id[card_id]
known.append(card_id)
# Determine type and get name
if internal_id in member_db:
card_info[card_id] = {"type": "Member", "name": member_db[internal_id].name}
member_count += qty
elif internal_id in live_db:
card_info[card_id] = {"type": "Live", "name": live_db[internal_id].name}
live_count += qty
elif internal_id in energy_db:
card_info[card_id] = {"type": "Energy", "name": energy_db[internal_id].name}
energy_count += qty
else:
unknown.append(card_id)
debug_info = {
"map_size": len(card_no_to_id),
"test_key_exists": "PL!SP-bp1-004-R" in card_no_to_id,
"test_key_val": card_no_to_id.get("PL!SP-bp1-004-R", "N/A"),
"first_5_keys": list(card_no_to_id.keys())[:5],
}
return jsonify(
{
"known": known,
"unknown": unknown,
"known_count": len(known),
"unknown_count": len(unknown),
"card_info": card_info,
"breakdown": {"member": member_count, "live": live_count, "energy": energy_count},
"_debug": debug_info,
}
)
@app.route("/api/clear_performance", methods=["POST"])
def clear_performance():
room_id = get_room_id()
with game_lock:
room = get_room(room_id)
if room:
gs = room["state"]
# Clear the results dictionary
gs.performance_results.clear()
return jsonify({"status": "ok"})
@app.route("/api/action", methods=["POST"])
def do_action():
room_id = get_room_id()
session_token = request.headers.get("X-Session-Token")
with game_lock:
start_time = time.time()
try:
room = get_room(room_id)
if not room:
return jsonify({"success": False, "error": "Room not found"}), 404
gs = room["state"]
game_mode = room["mode"]
ai_agent = room["ai_agent"]
sessions = room.get("sessions", {})
# Session Validation (Enforce Turn)
if session_token and session_token in sessions:
pid = sessions[session_token]
if pid != -1:
# Check Pending Choice Turn
p_choices = getattr(gs, "pending_choices", [])
if p_choices:
# Handle both Rust (str, str) and Python (str, dict) formats
params = p_choices[0][1]
if isinstance(params, str):
# Rust format: parse JSON
try:
params = json.loads(params)
except:
params = {}
choice_pid = params.get("player_id", gs.current_player)
if choice_pid != pid:
return jsonify(
{"success": False, "error": f"Not your turn to choose (Waiting for P{choice_pid})"}
), 403
# Check Main Turn
elif gs.current_player != pid:
return jsonify(
{"success": False, "error": f"Not your turn (Waiting for P{gs.current_player})"}
), 403
data = request.json
action_id = data.get("action_id", 0)
force = data.get("force", False)
legal_mask = gs.get_legal_actions()
# Validate Action
if not (0 <= action_id < len(legal_mask)):
return jsonify({"success": False, "error": "Invalid action ID"}), 400
# Enforce Perspective/Active Player consistency in PvP
requester_idx = get_player_idx()
if game_mode == "pvp":
if requester_idx != gs.current_player:
return jsonify(
{"success": False, "error": f"Not your turn! It's P{gs.current_player + 1}'s turn."}
), 403
elif game_mode == "pve":
# In PvE, if it's AI turn (P1), don't allow manual action from UI
if gs.current_player == 1:
return jsonify({"success": False, "error": "AI is playing, please wait."}), 403
is_legal = legal_mask[action_id]
if force or is_legal:
# Step 1: Execute User Action
res = gs.step(action_id)
if res is not None:
room["state"] = res
gs = res
# Step 2: Auto-Advance & AI Handling
max_safety = 50
while not gs.is_terminal() and max_safety > 0:
max_safety -= 1
# A. Automatic Phases (-2=Setup, 1=Active, 2=Energy, 3=Draw, 6=Perf1, 7=Perf2, 8=LiveResult)
if gs.phase in (-2, 1, 2, 3, 6, 7, 8):
res = gs.step(0)
if res is not None:
room["state"] = res
gs = res
continue
# B. AI Turn (P1) - ONLY if PVE
if gs.current_player == 1 and game_mode == "pve":
if room.get("engine") == "rust":
gs.step_opponent_mcts(10)
else:
# Python AI
aid = ai_agent.choose_action(gs, 1)
res = gs.step(aid)
if res is not None:
room["state"] = res
gs = res
continue
break
viewer_idx = get_player_idx()
duration = time.time() - start_time
print(f"[PERF] /api/action took {duration:.3f}s (Action: {action_id})")
return jsonify(
{
"success": True,
"state": rust_serializer.serialize_state(gs, viewer_idx=viewer_idx, mode=game_mode),
}
)
else:
return jsonify({"success": False, "error": f"Illegal action {action_id}"}), 400
except Exception as e:
import traceback
traceback.print_exc()
# Auto-report issue on crash
try:
report_dir = os.path.join(CURRENT_DIR, "reports")
os.makedirs(report_dir, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
crash_file = os.path.join(report_dir, f"crash_{timestamp}.json")
try:
if room is not None and room.get("engine") == "rust":
serialized_state = rust_serializer.serialize_state(
gs, viewer_idx=get_player_idx(), mode=game_mode
)
else:
serialized_state = serialize_state(
gs, viewer_idx=get_player_idx(), is_pvp=(game_mode == "pvp"), mode=game_mode
)
with open(crash_file, "w", encoding="utf-8") as f:
# Use app.json.dumps to handle Numpy types
f.write(
app.json.dumps(
{
"error": str(e),
"trace": traceback.format_exc(),
"state": serialized_state,
}
)
)
except Exception as inner_e:
# Fallback if serialization fails
with open(crash_file, "w", encoding="utf-8") as f:
f.write(
app.json.dumps(
{"error": str(e), "trace": traceback.format_exc(), "serialization_error": str(inner_e)}
)
)
except:
pass
return jsonify({"success": False, "error": str(e), "trace": traceback.format_exc()}), 500
@app.route("/")
def index_route():
return send_from_directory(app.static_folder, "index.html")
@app.route("/<path:path>")
def static_proxy(path):
return send_from_directory(app.static_folder, path)
@app.route("/api/exec", methods=["POST"])
def god_mode():
room_id = get_room_id()
code = request.json.get("code", "")
with game_lock:
room = get_room(room_id)
if not room:
return jsonify({"success": False, "error": "Room not found"})
gs = room["state"]
try:
p = gs.active_player
exec(code, {"state": gs, "p": p, "np": np})
return jsonify({"success": True, "state": serialize_state(gs, is_pvp=(room["mode"] == "pvp"))})
except Exception as e:
return jsonify({"success": False, "error": str(e)})
@app.route("/api/reset", methods=["POST"])
def reset():
room_id = get_room_id()
with game_lock:
data = request.json or {}
deck_type = data.get("deck_type", "normal")
# Allow changing mode on reset
new_mode = data.get("mode") # Optional
# Check if room exists to preserve existing params if not specified
old_room = ROOMS.get(room_id)
mode = new_mode if new_mode else (old_room["mode"] if old_room else "pve")
ROOMS[room_id] = create_room_internal(room_id, mode, deck_type)
room = ROOMS[room_id]
# Check for custom decks and apply them if they exist for this room
if old_room and "custom_decks" in old_room:
room["custom_decks"] = old_room["custom_decks"]
# Preserve sessions
if old_room and "sessions" in old_room:
room["sessions"] = old_room["sessions"]
if deck_type == "custom":
# Apply custom decks to the fresh state
gs = room["state"]
for pid in [0, 1]:
cdeck = room["custom_decks"].get(pid)
if cdeck and cdeck["main"]:
gs.players[pid].main_deck = convert_deck_strings_to_ids(cdeck["main"])
random.shuffle(gs.players[pid].main_deck)
# Re-draw hand?
gs.players[pid].hand = []
gs.players[pid].hand_added_turn = []
for _ in range(6):
if gs.players[pid].main_deck:
gs.players[pid].hand.append(gs.players[pid].main_deck.pop())
gs.players[pid].hand_added_turn.append(0)
if cdeck and cdeck["energy"]:
# Re-fill energy
gs.players[pid].energy_deck = convert_deck_strings_to_ids(cdeck["energy"])
gs.players[pid].energy_zone = []
for _ in range(3):
if gs.players[pid].energy_deck:
gs.players[pid].energy_zone.append(gs.players[pid].energy_deck.pop(0))
gs = room["state"]
game_mode = room["mode"]
# Auto-advance (AI goes first or Init steps)
max_safety = 100
while not gs.is_terminal() and max_safety > 0:
max_safety -= 1
# Automatic phases
if gs.phase in (-2, 1, 2, 3, 6, 7, 8):
gs.step(0)
continue
# AI Turn (P1)
if gs.current_player == 1 and game_mode == "pve":
gs.step_opponent_mcts(10)
continue
break # P0 turn or user input needed
return jsonify({"success": True, "state": rust_serializer.serialize_state(gs, mode=game_mode)})
@app.route("/api/ai_suggest", methods=["POST"])
def ai_suggest():
room_id = get_room_id()
data = request.json or {}
sims = data.get("sims", 10)
with game_lock:
room = get_room(room_id)
if not room:
return jsonify({"error": "Room not found"}), 404
gs = room["state"]
# Only run if not terminal
if gs.is_terminal():
return jsonify({"suggestions": []})
stats = gs.get_mcts_suggestions(sims)
# Shim for get_action_desc
class RustShim:
def __init__(self, gs):
self.phase = gs.phase
self.current_player = gs.current_player
self.active_player = gs.get_player(gs.current_player)
self.member_db = member_db
self.live_db = live_db
self.pending_choices = [] # TODO: expose from rust if needed
shim = RustShim(gs)
# Enrich stats with descriptions
enriched = []
for action, value, visits in stats:
desc = get_action_desc(action, shim)
enriched.append({"action_id": action, "value": float(value), "visits": int(visits), "desc": desc})
return jsonify({"success": True, "suggestions": enriched})
@app.route("/api/replays", methods=["GET"])
def list_replays():
# 1. Root replays
try:
if os.path.exists("replays"):
for f in os.listdir("replays"):
if f.endswith(".json") and os.path.isfile(os.path.join("replays", f)):
replays.append({"filename": f, "folder": ""})
# 2. Tournament subfolder
tourney_dir = os.path.join("replays", "tournament")
if os.path.exists(tourney_dir):
for f in os.listdir(tourney_dir):
if f.endswith(".json"):
# We need to handle pathing. The frontend might expect just filename.
# But get_replay takes "filename".
# We should probably update get_replay to handle subpaths or encode it.
# For now let's just use the relative path as the filename
replays.append({"filename": f"tournament/{f}", "folder": "tournament"})
except Exception as e:
print(f"Error listing replays: {e}")
return jsonify({"success": False, "error": str(e)})
# Sort by filename desc (usually timestamp)
replays.sort(key=lambda x: x["filename"], reverse=True)
return jsonify({"success": True, "replays": replays})
def get_replay(filename):
"""Serve replay JSON files"""
replay_path = f"replays/{filename}"
if os.path.exists(replay_path):
with open(replay_path, "r", encoding="utf-8") as f:
data = json.load(f)
# Auto-inflate if it's an optimized replay
if "registry" in data and "states" in data:
print(f"Inflating optimized replay: {filename}")
inflated_states = inflate_history(data, member_db, live_db, energy_db)
# Reconstruct standard format
data["states"] = inflated_states
# Remove registry to avoid confusing frontend if it doesn't expect it
del data["registry"]
return jsonify(data)
return jsonify({"error": "Replay not found"}), 404
@app.route("/api/advance", methods=["POST"])
def advance():
room_id = get_room_id()
with game_lock:
room = get_room(room_id)
if not room:
return jsonify({"success": False, "error": "Room not found"}), 404
gs = room["state"]
ai_agent = room["ai_agent"]
# Run auto-advance loop
max_safety = 50
while not gs.is_terminal() and max_safety > 0:
max_safety -= 1
# Advance if in an automatic phase (AND no choices pending)
if not gs.pending_choices and gs.phase in (
Phase.ACTIVE,
Phase.ENERGY,
Phase.DRAW,
Phase.PERFORMANCE_P1,
Phase.PERFORMANCE_P2,
):
gs = gs.step(0)
room["state"] = gs
continue
# Determine who should act (Check pending choices first)
next_actor = gs.current_player
if gs.pending_choices:
# Handle both Rust (str, str) and Python (str, dict) formats
params = gs.pending_choices[0][1]
if isinstance(params, str):
try:
params = json.loads(params)
except:
params = {}
next_actor = params.get("player_id", gs.current_player)
# If it's the AI's turn (P1) or the AI has a pending choice, let it act immediately
if next_actor == 1 and not gs.is_terminal():
aid = ai_agent.choose_action(gs, 1)
gs = gs.step(aid)
room["state"] = gs
continue
break
return jsonify(
{
"success": True,
"state": serialize_state(gs, is_pvp=(room["mode"] == "pvp"), mode=room["mode"]),
}
)
@app.route("/api/full_log", methods=["GET"])
def get_full_log():
"""Return the complete rule log without truncation."""
room_id = get_room_id()
with game_lock:
room = get_room(room_id)
if not room:
return jsonify({"log": [], "total_entries": 0})
gs = room["state"]
return jsonify({"log": gs.rule_log, "total_entries": len(gs.rule_log)})
@app.route("/api/set_ai", methods=["POST"])
def set_ai():
room_id = get_room_id()
data = request.json
mode = data.get("ai_mode", "smart")
with game_lock:
room = get_room(room_id)
if not room:
return jsonify({"success": False, "error": "Room not found"})
if mode == "random":
room["ai_agent"] = RandomAgent()
elif mode == "smart":
room["ai_agent"] = SmartAgent()
else:
return jsonify({"success": False, "error": f"Unknown AI mode: {mode}"})
return jsonify({"success": True, "mode": mode})
@app.route("/api/report_issue", methods=["POST"])
def report_issue():
"""Save the current game state and user explanation to a report file."""
try:
room_id = get_room_id()
room = get_room(room_id)
gs = room["state"] if room else None
data = request.json
explanation = data.get("explanation", "")
# We can take the current state from the request or just use our global game_state
# Providing it in the request is safer if the user is looking at a specific frame (e.g. in replay mode)
# But for now, let's use the provided state if it exists, otherwise capture the current one.
if room and room.get("engine") == "rust":
serialized = rust_serializer.serialize_state(gs, viewer_idx=0, mode=room.get("mode", "pve"))
else:
serialized = serialize_state(gs, is_pvp=(room["mode"] == "pvp" if room else False))
state_to_save = data.get("state") or serialized
history = data.get("history", [])
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
os.makedirs("reports", exist_ok=True)
filename = f"reports/report_{timestamp}.json"
with open(filename, "w", encoding="utf-8") as f:
json.dump(
{
"timestamp": timestamp,
"explanation": explanation,
"state": state_to_save,
"history": history,
"performance_history": state_to_save.get("performance_history", []),
"performance_results": state_to_save.get("performance_results", {}),
"action_desc": get_action_desc(state_to_save.get("last_action", 0), gs)
if gs and "last_action" in state_to_save
else "N/A",
},
f,
indent=2,
ensure_ascii=False,
)
return jsonify({"success": True, "filename": filename})
except Exception as e:
import traceback
traceback.print_exc()
return jsonify({"success": False, "error": str(e)}), 500
def generate_random_deck_list(member_db, live_db) -> list[str]:
"Generate a valid random deck list (card_no strings)."
deck = []
# 1. Select Members (48)
available_members = [c.card_no for c in member_db.values()]
if available_members:
member_bucket = []
for m_no in available_members:
member_bucket.extend([m_no] * 4)
random.shuffle(member_bucket)
while len(member_bucket) < 48:
member_bucket.extend(available_members)
deck.extend(member_bucket[:48])
# 2. Select Lives (12)
available_lives = [c.card_no for c in live_db.values()]
if available_lives:
live_bucket = []
for l_no in available_lives:
live_bucket.extend([l_no] * 4)
random.shuffle(live_bucket)
while len(live_bucket) < 12:
live_bucket.extend(available_lives)
deck.extend(live_bucket[:12])
return deck
@app.route("/api/get_random_deck", methods=["GET"])
def get_random_deck_api():
global member_db, live_db
deck_list = generate_random_deck_list(member_db, live_db)
return jsonify(
{"success": True, "content": deck_list, "message": f"Generated Random Deck ({len(deck_list)} cards)"}
)
@app.route("/api/presets", methods=["GET"])
def get_presets():
"""Return list of preset decks from tests/presets.json."""
try:
preset_path = os.path.join(CURRENT_DIR, "..", "tests", "presets.json")
if os.path.exists(preset_path):
with open(preset_path, "r", encoding="utf-8") as f:
data = json.load(f)
return jsonify({"success": True, "presets": data})
return jsonify({"success": False, "error": "presets.json not found", "presets": []})
except Exception as e:
return jsonify({"success": False, "error": str(e)})
if __name__ == "__main__":
# PyInstaller Bundle Check
if getattr(sys, "frozen", False):
# If frozen, we might need to adjust static folder or templates folder depending on how flask finds them.
# However, we added paths with --add-data, so they should be in sys._MEIPASS.
# Flask's root_path defaults to __main__ directory, which in onefile mode is temporary.
# We need to explicitly point static_folder to the MEIPASS location.
bundle_dir = getattr(sys, "_MEIPASS", ".") # type: ignore
app.static_folder = os.path.join(bundle_dir, "web_ui")
# app.template_folder = os.path.join(bundle_dir, 'templates') # if we used templates
# Also need to make sure data loader finds 'data/cards.json'
# CardDataLoader expects relative path. We might need to chdir or patch it.
# Easiest is to chdir to the bundle dir so relative paths work?
# BUT 'replays' need to be written to writable cwd, not temp dir.
# So we should NOT chdir globally.
# Instead, we should update filenames to be absolute paths based on bundle_dir if read-only.
# Monkey patch the loader path just for this instance if needed,
# but CardDataLoader takes a path arg.
# We need to ensure 'init_game' calls it with the correct absolute path.
pass
# Patched init_game for Frozen state to find data
original_init_game = init_game
def frozen_init_game(deck_type="normal"):
if getattr(sys, "frozen", False):
bundle_dir = getattr(sys, "_MEIPASS", ".") # type: ignore
os.path.join(bundle_dir, "data", "cards.json")
# We need to temporarily force the loader to use this path
# But init_game hardcodes "data/cards.json" in correct logic?
# actually checking init_game source:
# loader = CardDataLoader("data/cards.json")
# We need to change that line or intercept.
# Use os.chdir to temp dir for READS? No, we need writes to real dir.
# Best way: Just ensure data/cards.json exists in CWD? No, user won't have it.
# HACK: We can't easily change the hardcoded string inside init_game without rewriting it.
# However, we can patch CardDataLoader class to fix the path!
# Assuming CardDataLoader is imported from engine.game.data_loader
from engine.game.data_loader import CardDataLoader
ops_init = CardDataLoader.__init__
def new_init(self, filepath):
if not os.path.exists(filepath) and getattr(sys, "frozen", False):
# Try bundle path
bundle_path = os.path.join(sys._MEIPASS, filepath) # type: ignore
if os.path.exists(bundle_path):
filepath = bundle_path
ops_init(self, filepath)
CardDataLoader.__init__ = new_init # type: ignore[method-assign]
original_init_game(deck_type)
init_game = frozen_init_game
# Run Server
# use_reloader=False is crucial for PyInstaller to implicit avoid spawning subprocesses incorrectly
port = int(os.environ.get("PORT", 8000))
# Auto-open browser
import webbrowser
from threading import Timer
def open_browser():
webbrowser.open_new(f"http://localhost:{port}/")
if not getattr(sys, "frozen", False) or os.environ.get("OPEN_BROWSER", "true").lower() == "true":
Timer(1.5, open_browser).start()
# Start Background Game Loop
if game_thread is None:
game_thread = threading.Thread(target=background_game_loop, daemon=True)
game_thread.start()
if __name__ == "__main__":
port = int(os.environ.get("PORT", 7860))
# In production/container, usually don't want debug mode
debug_mode = os.environ.get("FLASK_DEBUG", "True").lower() == "true"
app.run(host="0.0.0.0", port=port, debug=debug_mode)
|