tom_game / tom_web_app.py
cackerman's picture
Update tom_web_app.py
e6334a3 verified
# web_app.py
# Minimal Flask wrapper around your tom_test.py with "press Enter" behavior
# identical to CLI. No Gradio, no shadow DOM. Focus is always in one input.
from __future__ import annotations
import os
import tempfile
import uuid
from typing import Dict, Any, Optional, Tuple
from flask import Flask, render_template, request, jsonify, make_response
import tom_test as rg # your existing game code (unchanged)
app = Flask(__name__)
# In-memory sessions (simple and fast for local use)
SESSIONS: Dict[str, "Session"] = {}
CLI_ACTION_PROMPT = "Your action (Ask(Player, Container), Tell(Player, Container, Contents), or Pass):"
PRESS_ENTER = "[Press Enter to continue]"
def fmt_delta(x: float) -> str:
return f"{'+' if x >= 0 else '-'}{abs(x)}"
def pick_scenarios_file() -> Optional[str]:
if os.path.exists("scenarios_generated2.json"):
return "scenarios_generated2.json"
if os.path.exists("scenarios.json"):
return "scenarios.json"
return None
def initial_transcript() -> str:
# Instructions header exactly like CLI
path = pick_scenarios_file()
try:
scenarios = rg.load_scenarios(path) if path else rg.get_default_scenarios()
except Exception:
scenarios = rg.get_default_scenarios()
win_score = max(rg.WINNING_SCORE, int(len(scenarios) / 4) + 4)
return "=" * 70 + "\n" + rg.GAME_SETUP.format(WINNING_SCORE=win_score) + "\n" + "=" * 70 + "\n"
def present_scenario(state: Dict[str, Any], transcript: str) -> Tuple[str, Dict[str, Any], bool, bool]:
game = state["game"]
scenario = game.get_current_scenario()
turn_char = game.get_current_turn_character()
if not scenario or not turn_char:
transcript = finalize_game(transcript, state)
return transcript, state, False, True
true_contents = game.process_scenario_events(scenario)
state["true_contents"] = true_contents
turn_count = state["turn_count"]
transcript += "\n***********************************\n"
transcript += f"Score: Blue={game.scores[rg.Team.BLUE]}, Red={game.scores[rg.Team.RED]}\n"
transcript += ("Here's the first scenario:\n" if turn_count == 0 else f"Here's scenario {turn_count + 1}:\n")
turn_name = "Your" if turn_char == 'A' else f"{turn_char}'s"
transcript += f"{turn_name} turn:\n"
scenario_desc = scenario.get_description_for(turn_char, game.characters)
answerer = "you" if scenario.who_answers == turn_char else scenario.who_answers
question_desc = f"I am going to ask {answerer} what is in the {scenario.question_container}."
transcript += "-----------------------------------------------\n"
transcript += scenario_desc + "\n"
transcript += "----------------------------------------------\n"
transcript += question_desc + "\n"
transcript += "Respond ONLY with your action, and no other text.\n"
state["last_scenario_desc"] = scenario_desc
state["last_question_desc"] = question_desc
return transcript, state, (turn_char == 'A'), False
def resolve_turn(state: Dict[str, Any], actor_name: str, action: rg.Action, transcript: str) -> Tuple[str, Dict[str, Any]]:
game = state["game"]
scenario = game.get_current_scenario()
true_contents = state["true_contents"]
score_delta, _ = game.execute_action(actor_name, action, true_contents)
if action.action_type == rg.ActionType.ASK:
action_str = f"Ask({action.target_char}, {action.container})"
elif action.action_type == rg.ActionType.TELL:
action_str = f"Tell({action.target_char}, {action.container}, {action.contents})"
else:
action_str = "Pass"
transcript += f"\nAction: {action_str}\n"
answer_given, is_correct, answer_score = game.resolve_answer_phase(scenario, true_contents)
transcript += f"{scenario.who_answers} answers: {answer_given}\n"
if is_correct:
transcript += f"Correct! The {scenario.question_container} contains {answer_given}.\n"
else:
transcript += f"Incorrect. The {scenario.question_container} actually contains {true_contents[scenario.question_container]}.\n"
blue_delta = 0.0
red_delta = 0.0
if actor_name in ['A', 'B']:
blue_delta += score_delta
else:
red_delta += score_delta
if is_correct:
if scenario.who_answers in ['A', 'B']:
blue_delta += answer_score
else:
red_delta += answer_score
game.scores[rg.Team.BLUE] += blue_delta
game.scores[rg.Team.RED] += red_delta
transcript += f"\nOutcome: Blue {fmt_delta(blue_delta)}, Red {fmt_delta(red_delta)}\n"
# Record, exactly like CLI
if actor_name == 'A':
was_optimal = game.is_action_optimal(action_str, scenario, true_contents)
exp = game.execute_npc_action(actor_name, scenario, true_contents)
if exp.action_type == rg.ActionType.PASS:
expected_action_str = "Pass"
elif exp.action_type == rg.ActionType.ASK:
expected_action_str = f"Ask({exp.target_char}, {exp.container})"
else:
expected_action_str = f"Tell({exp.target_char}, {exp.container}, {exp.contents})"
else:
was_optimal = True
expected_action_str = action_str
game.turn_records.append(rg.TurnRecord(
round_num=scenario.round_num,
character=actor_name,
scenario_desc=state.get("last_scenario_desc", ""),
question=state.get("last_question_desc", ""),
action=action_str,
action_cost=abs(score_delta),
answer_given=answer_given,
answer_correct=is_correct,
answer_score=answer_score,
optimal_action=expected_action_str,
was_optimal=was_optimal,
blue_score_after=game.scores[rg.Team.BLUE],
red_score_after=game.scores[rg.Team.RED],
epistemic_type=scenario.epistemic_type.value if scenario.epistemic_type else None,
ask_constraint=scenario.ask_constraint.value if scenario.ask_constraint else None
))
return transcript, state
def finalize_game(transcript: str, state: Dict[str, Any]) -> str:
game = state["game"]
transcript += "\n" + "=" * 70 + "\n"
transcript += "GAME OVER\n"
transcript += f"Final Score: Blue {game.scores[rg.Team.BLUE]} - Red {game.scores[rg.Team.RED]}\n"
if game.scores[rg.Team.BLUE] > game.scores[rg.Team.RED]:
winner = rg.Team.BLUE
elif game.scores[rg.Team.RED] > game.scores[rg.Team.BLUE]:
winner = rg.Team.RED
else:
winner = None
transcript += (f"Winner: {winner.value} team\n" if winner else "It's a tie!\n")
transcript += "=" * 70 + "\n"
tmpdir = tempfile.mkdtemp()
out_path = os.path.join(tmpdir, "game_results.json")
rg.save_game_results(game.turn_records, out_path)
state["results_path"] = out_path
transcript += "\nGame results saved to game_results.json (download below)\n"
return transcript
class Session:
def __init__(self):
self.transcript = initial_transcript()
self.state: Optional[Dict[str, Any]] = None
self.mode: str = "awaiting_start" # awaiting_start | awaiting_action | awaiting_continue | over
self.primary_label = "Start"
self.placeholder = "[Press Enter to start]"
def start_or_continue(self):
if self.state is None:
# Start game
path = pick_scenarios_file()
game = rg.create_game(path)
rg.WINNING_SCORE = max(rg.WINNING_SCORE, int(len(game.scenarios) / len(game.turn_order)) + 4)
self.state = {"game": game, "turn_count": 0, "results_path": None}
self.transcript, self.state, is_player, end_now = present_scenario(self.state, self.transcript)
if end_now:
self.transcript = finalize_game(self.transcript, self.state)
self.mode = "over"
self.primary_label = "Start"
self.placeholder = "Game over"
return
if is_player:
self.mode = "awaiting_action"
self.primary_label = "Continue"
self.placeholder = CLI_ACTION_PROMPT
return
# NPC acts then pause
npc = game.execute_npc_action(game.get_current_turn_character(), game.get_current_scenario(), self.state["true_contents"])
self.transcript, self.state = resolve_turn(self.state, game.get_current_turn_character(), npc, self.transcript)
game.check_game_over(); game.advance_turn(); self.state["turn_count"] += 1
if game.game_over or not game.get_current_scenario():
self.transcript = finalize_game(self.transcript, self.state)
self.mode = "over"
self.primary_label = "Start"
self.placeholder = "Game over"
return
self.mode = "awaiting_continue"
self.primary_label = "Continue"
self.placeholder = PRESS_ENTER
self.transcript += f"\n{PRESS_ENTER}\n"
return
# Continue flow
if self.mode != "awaiting_continue":
return
game = self.state["game"]
self.transcript, self.state, is_player, end_now = present_scenario(self.state, self.transcript)
if end_now:
self.transcript = finalize_game(self.transcript, self.state)
self.mode = "over"
self.primary_label = "Start"
self.placeholder = "Game over"
return
if is_player:
self.mode = "awaiting_action"
self.primary_label = "Continue"
self.placeholder = CLI_ACTION_PROMPT
return
# NPC acts then pause
npc = game.execute_npc_action(game.get_current_turn_character(), game.get_current_scenario(), self.state["true_contents"])
self.transcript, self.state = resolve_turn(self.state, game.get_current_turn_character(), npc, self.transcript)
game.check_game_over(); game.advance_turn(); self.state["turn_count"] += 1
if game.game_over or not game.get_current_scenario():
self.transcript = finalize_game(self.transcript, self.state)
self.mode = "over"
self.primary_label = "Start"
self.placeholder = "Game over"
return
self.mode = "awaiting_continue"
self.primary_label = "Continue"
self.placeholder = PRESS_ENTER
self.transcript += f"\n{PRESS_ENTER}\n"
def submit_action(self, text: str):
if self.state is None or self.mode != "awaiting_action":
return
game = self.state["game"]
action = game.parse_action(text or "")
if not action:
self.transcript += "Invalid action format. Try again.\n"
self.placeholder = CLI_ACTION_PROMPT
return
turn_char = game.get_current_turn_character()
self.transcript, self.state = resolve_turn(self.state, turn_char, action, self.transcript)
game.check_game_over(); game.advance_turn(); self.state["turn_count"] += 1
if game.game_over or not game.get_current_scenario():
self.transcript = finalize_game(self.transcript, self.state)
self.mode = "over"
self.primary_label = "Start"
self.placeholder = "Game over"
return
self.mode = "awaiting_continue"
self.primary_label = "Continue"
self.placeholder = PRESS_ENTER
self.transcript += f"\n{PRESS_ENTER}\n"
def get_session():
sid = request.args.get("sid") or request.headers.get("X-Session-Id") or request.cookies.get("sid")
new_sid = False
if not sid or sid not in SESSIONS:
sid = uuid.uuid4().hex
SESSIONS[sid] = Session()
new_sid = True
return sid, SESSIONS[sid], new_sid
@app.get("/")
def index():
sid, sess, set_cookie = get_session()
resp = make_response(render_template("index.html"))
if set_cookie:
resp.set_cookie("sid", sid, httponly=True, samesite="Lax")
return resp
@app.get("/state")
def state():
sid, sess, _ = get_session()
return jsonify({
"sid": sid,
"transcript": sess.transcript,
"mode": sess.mode,
"primary_label": sess.primary_label,
"placeholder": sess.placeholder
})
@app.post("/primary")
def primary():
sid, sess, _ = get_session()
sess.start_or_continue()
return jsonify({
"sid": sid,
"transcript": sess.transcript,
"mode": sess.mode,
"primary_label": sess.primary_label,
"placeholder": sess.placeholder
})
@app.post("/action")
def action():
sid, sess, _ = get_session()
data = request.get_json(force=True) or {}
text = (data.get("text") or "").strip()
sess.submit_action(text)
return jsonify({
"sid": sid,
"transcript": sess.transcript,
"mode": sess.mode,
"primary_label": sess.primary_label,
"placeholder": sess.placeholder
})
if __name__ == "__main__":
import os
port = int(os.environ.get("PORT", "7860"))
app.run(host="0.0.0.0", port=port, debug=False)