"""Standalone play server for manual Phantom Grid gameplay.

Completely separate from the OpenEnv app.py — does NOT affect
HuggingFace deployment, Docker builds, or run_eval.py in any way.

Listens on port 8001. Uses the game engine and renderer directly.

Usage:
    cd visual-memory
    python play_server.py
    # Then open play.html in a browser
"""
from __future__ import annotations
import json
import os
import sys
from pathlib import Path
import uvicorn
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, FileResponse
from pydantic import BaseModel
sys.path.insert(0, str(Path(__file__).resolve().parent))
from server.engine import GameEngine
from server.renderer import Renderer
# Directory holding the scenario JSON files, located next to this file.
SCENARIOS_DIR = os.path.join(os.path.dirname(__file__), "scenarios")

app = FastAPI(title="Phantom Grid — Play Server")
# CORS is fully open: any origin, any method, any header.
# NOTE(review): presumably so play.html can call the API when it is opened
# from a different origin (e.g. file://) — confirm this is intended.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# The active game. Stays None until POST /load replaces it; all requests share
# this single module-level engine.
engine: GameEngine | None = None
renderer = Renderer()
def _load_scenario_file(scenario_id: str) -> dict:
    """Load and parse the JSON definition of one scenario.

    Args:
        scenario_id: Bare scenario name, i.e. the file name inside
            ``SCENARIOS_DIR`` without its ``.json`` extension.

    Returns:
        The decoded content of ``<SCENARIOS_DIR>/<scenario_id>.json``.

    Raises:
        FileNotFoundError: If ``scenario_id`` is empty or is not a plain file
            name, or if no such scenario file exists.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    # The id is supplied by HTTP clients (POST /load). A value with a directory
    # part ("../x", "sub/x", an absolute path) would let the join below leave
    # SCENARIOS_DIR, so it is reported as a scenario that does not exist.
    if not scenario_id or os.path.basename(scenario_id) != scenario_id:
        raise FileNotFoundError(
            f"Scenario '{scenario_id}' not found: not a plain scenario name"
        )
    path = os.path.join(SCENARIOS_DIR, f"{scenario_id}.json")
    if not os.path.isfile(path):
        raise FileNotFoundError(f"Scenario '{scenario_id}' not found at {path}")
    # JSON is UTF-8 by specification; do not depend on the locale's default.
    with open(path, encoding="utf-8") as f:
        return json.load(f)
def _board_response() -> dict:
    """Return the rendered board view together with the engine status.

    When no scenario has been loaded yet, an ``{"error": ...}`` dict is
    returned instead.
    """
    if engine is None:
        return {"error": "No scenario loaded."}

    state = engine.get_board_state()
    rendered = renderer.get_board_view(
        state.visible_cells,
        state.board_width,
        state.board_height,
        scenario_type=state.scenario_type,
        step_count=state.step_count,
    )
    # The status is read after rendering, in the same order as before.
    return {"board": rendered, "status": engine.get_status()}
@app.get("/")
async def index():
    """Serve the play.html front end that sits next to this file.

    Returns:
        The page as ``text/html`` when the file exists, otherwise a short HTML
        message with status 404.
    """
    html_path = os.path.join(os.path.dirname(__file__), "play.html")
    if os.path.isfile(html_path):
        return FileResponse(html_path, media_type="text/html")
    # Keep the fallback markup on one line: a single-quoted string literal
    # cannot contain a raw line break, and splitting it breaks the module.
    return HTMLResponse("<h1>play.html not found</h1>", status_code=404)
@app.get("/scenarios")
async def list_scenarios():
    """List the scenarios available in ``SCENARIOS_DIR``.

    Returns:
        ``{"scenarios": [...]}`` holding one summary dict per ``*.json`` file,
        ordered by file name. Files that cannot be read or parsed, or whose
        top-level JSON value is not an object, are left out.
    """
    suffix = ".json"
    results = []
    for fname in sorted(os.listdir(SCENARIOS_DIR)):
        if not fname.endswith(suffix):
            continue
        # Remove only the trailing extension. str.replace would also delete
        # ".json" from the middle of a name such as "a.json.bak.json".
        stem = fname[: -len(suffix)]
        try:
            data = _load_scenario_file(stem)
        except (OSError, ValueError):
            # Best effort: one unreadable or malformed file must not hide the
            # others. JSON and Unicode decode errors are ValueError subclasses.
            continue
        if not isinstance(data, dict):
            # Valid JSON, but not an object, so there are no fields to report.
            continue
        results.append({
            "scenario_id": data.get("scenario_id", stem),
            "type": data.get("type", "hidden_grid"),
            "difficulty": data.get("difficulty", "hard"),
            "board_size": f"{data.get('board_width', '?')}x{data.get('board_height', '?')}",
            "description": data.get("description", ""),
        })
    return {"scenarios": results}
class LoadReq(BaseModel):
    """Request body for POST /load."""

    # Handed to _load_scenario_file, which opens "<scenario_id>.json" inside
    # SCENARIOS_DIR.
    scenario_id: str
@app.post("/load")
async def load_scenario(req: LoadReq):
    """Load a scenario from disk and make it the active game.

    Replaces the module-level ``engine`` with a new ``GameEngine`` built from
    the scenario data.

    Returns:
        The board response extended with ``loaded``, ``how_to_play`` and
        ``scenario_description``, or ``{"error": ...}`` when the scenario file
        is missing, unreadable, or not valid JSON. The previous game, if any,
        is left in place when an error is returned.
    """
    global engine
    try:
        data = _load_scenario_file(req.scenario_id)
    except FileNotFoundError as e:
        return {"error": str(e)}
    except (OSError, ValueError) as e:
        # Unreadable or malformed file: answer in the endpoint's usual error
        # shape instead of letting the request fail with an unhandled 500.
        return {"error": f"Scenario '{req.scenario_id}' could not be loaded: {e}"}
    engine = GameEngine(data)
    resp = _board_response()
    resp["loaded"] = True
    resp["how_to_play"] = data.get("how_to_play", "")
    resp["scenario_description"] = data.get("description", "")
    return resp
class CellReq(BaseModel):
    """Request body carrying one (row, col) pair.

    Shared by POST /reveal, /flag, /unflag and /move_viewport, each of which
    passes both values to the corresponding engine method.
    """

    row: int
    col: int
@app.post("/reveal")
async def reveal(req: CellReq):
    """Reveal the requested cell and return the refreshed board.

    The value returned by the engine is attached under ``action_result``.
    """
    if engine is None:
        return {"error": "No scenario loaded."}
    outcome = engine.reveal_cell(req.row, req.col)
    # Build the board only after the action so it reflects the new state.
    return {**_board_response(), "action_result": outcome}
@app.post("/flag")
async def flag(req: CellReq):
    """Flag the requested cell and return the refreshed board.

    The value returned by the engine is attached under ``action_result``.
    """
    if engine is None:
        return {"error": "No scenario loaded."}
    outcome = engine.flag_cell(req.row, req.col)
    # Build the board only after the action so it reflects the new state.
    return {**_board_response(), "action_result": outcome}
@app.post("/unflag")
async def unflag(req: CellReq):
    """Remove the flag from the requested cell and return the refreshed board.

    The value returned by the engine is attached under ``action_result``.
    """
    if engine is None:
        return {"error": "No scenario loaded."}
    outcome = engine.unflag_cell(req.row, req.col)
    # Build the board only after the action so it reflects the new state.
    return {**_board_response(), "action_result": outcome}
@app.post("/move_viewport")
async def move_viewport(req: CellReq):
    """Ask the engine to move the viewport and return the refreshed board.

    ``req.row`` and ``req.col`` are passed to ``engine.move_viewport``; its
    return value is attached under ``action_result``.
    """
    if engine is None:
        return {"error": "No scenario loaded."}
    outcome = engine.move_viewport(req.row, req.col)
    # Build the board only after the action so it reflects the new state.
    return {**_board_response(), "action_result": outcome}
class InspectReq(BaseModel):
    """Request body for POST /inspect: a window around a center cell."""

    center_row: int
    center_col: int
    # The window spans center ± radius in both directions; the /inspect
    # handler rejects values outside 1..3.
    radius: int = 1
@app.post("/inspect")
async def inspect(req: InspectReq):
    """Report the visible cells in a window around a center cell.

    The window covers ``center ± radius`` in both directions, clipped to the
    board. A successful call increments the engine's step counter and calls
    its ``_tick_pattern_memory`` hook before the board is read.

    Returns:
        The board response with ``action_result`` set to ``{"cells": [...]}``,
        or ``{"error": ...}`` when no scenario is loaded, the game is over, or
        the radius is outside 1..3.
    """
    if engine is None:
        return {"error": "No scenario loaded."}
    if engine.game_over:
        return {"error": "Game is already over."}
    if not 1 <= req.radius <= 3:
        return {"error": "Radius must be between 1 and 3."}

    engine.step_count += 1
    engine._tick_pattern_memory()

    grid = engine.get_visible_board()
    row_span = range(
        max(0, req.center_row - req.radius),
        min(engine.height, req.center_row + req.radius + 1),
    )
    col_span = range(
        max(0, req.center_col - req.radius),
        min(engine.width, req.center_col + req.radius + 1),
    )
    # Row-major order, matching a nested row/column loop.
    cells = [
        {
            "row": r,
            "col": c,
            "state": grid[r][c]["state"],
            "content": grid[r][c].get("content"),
        }
        for r in row_span
        for c in col_span
    ]
    return {**_board_response(), "action_result": {"cells": cells}}
@app.get("/status")
async def status():
    """Return the engine's status, or an error dict when nothing is loaded."""
    return {"error": "No scenario loaded."} if engine is None else engine.get_status()
@app.get("/board")
async def board():
    """Return the current board view and status without performing an action."""
    payload = _board_response()
    return payload
@app.get("/recall")
async def recall():
    """Return the discovered signals and memory events from the board state.

    Gives ``{"error": ...}`` when no scenario has been loaded.
    """
    if engine is None:
        return {"error": "No scenario loaded."}
    state = engine.get_board_state()
    return dict(
        discovered_signals=state.discovered_signals,
        memory_events=state.memory_events,
    )
class SubmitReq(BaseModel):
    """Request body for POST /submit."""

    # Both lists are forwarded unchanged to engine.submit_solution.
    # NOTE(review): each inner list is presumably a [row, col] pair — confirm
    # against the engine.
    flagged_positions: list[list[int]] = []
    safe_positions: list[list[int]] = []
@app.post("/submit")
async def submit(req: SubmitReq):
    """Submit a final answer to the engine and return the refreshed board.

    Both position lists are forwarded as given; the engine's return value is
    attached under ``action_result``.
    """
    if engine is None:
        return {"error": "No scenario loaded."}
    verdict = engine.submit_solution(
        flagged_positions=req.flagged_positions,
        safe_positions=req.safe_positions,
    )
    # Build the board only after the submission so it reflects the new state.
    return {**_board_response(), "action_result": verdict}
if __name__ == "__main__":
    # Script entry point: serve the app on every network interface (0.0.0.0),
    # port 8001.
    uvicorn.run(app, host="0.0.0.0", port=8001)