| """ |
| Self-contained MCP Server for Text Adventure Games |
| |
| Tools: |
| - play_action(action) -> observation (with score delta / game over suffix) |
| - status() -> JSON snapshot for deterministic agent parsing |
| - peek_action(action) -> JSON snapshot after simulating action (no commit) |
| - valid_actions() -> JSON list (Jericho analyzer; safe fallback) |
| - inventory(), memory(), get_map(), reset() |
| |
| IMPORTANT: FastMCP uses stdio JSON-RPC framing. Do NOT print to stdout. |
| Any debug must go to stderr only. |
| """ |
|
|
| import atexit |
| import contextlib |
| import glob |
| import io |
| import json |
| import os |
| import re |
| import sys |
| from dataclasses import dataclass |
| from typing import Optional |
|
|
| from fastmcp import FastMCP |
|
|
| |
| try: |
| from jericho import FrotzEnv |
| except Exception as e: |
| FrotzEnv = None |
| _JERICHO_IMPORT_ERROR = e |
| else: |
| _JERICHO_IMPORT_ERROR = None |
|
|
| mcp = FastMCP("Agentic Zork Server (self-contained)") |
|
|
| DEBUG = False |
|
|
|
|
| |
| |
| |
| def _safe_call(fn, *args, **kwargs): |
| """ |
| Call fn with Python-level stdout captured to avoid corrupting MCP stdio. |
| Captured output is forwarded to stderr. |
| """ |
| buf = io.StringIO() |
| try: |
| with contextlib.redirect_stdout(buf): |
| return fn(*args, **kwargs) |
| finally: |
| out = buf.getvalue() |
| if out: |
| print(out, file=sys.stderr, end="") |
|
|
|
|
| |
| |
| |
| CANONICAL_DIRECTIONS = [ |
| "north", "south", "east", "west", |
| "northeast", "northwest", "southeast", "southwest", |
| "up", "down", "in", "out", |
| ] |
| CANONICAL_DIR_SET = set(CANONICAL_DIRECTIONS) |
|
|
| DIRECTION_ALIASES = { |
| "n": "north", "s": "south", "e": "east", "w": "west", |
| "ne": "northeast", "nw": "northwest", "se": "southeast", "sw": "southwest", |
| "u": "up", "d": "down", |
| "enter": "in", "exit": "out", |
| } |
|
|
| FAILURE_PATTERNS = [ |
| "you can't go", "can't go that way", "there is no way to go", |
| "you can't", "cannot", "not possible", |
| "i don't understand", "don't understand", |
| "beg your pardon", "huh?", "what?", |
| "nothing happens", "that doesn't work", "doesn't seem to", |
| "not here", "you don't see", "you see nothing", |
| "too dark to see", "it's too dark", |
| |
| "grunk not know", "not know where that", "not understand that", |
| ] |
|
|
| UNSAFE_STARTS = ( |
| "burn ", "set fire", "ignite ", |
| "attack ", "kill ", "hit ", "stab ", "shoot ", "punch ", "fight ", |
| "destroy ", "break ", "smash ", |
| "eat ", |
| ) |
|
|
| _DIR_WORD_RE = re.compile( |
| r"\b(north(?:east|west)?|south(?:east|west)?|east|west|" |
| r"northeast|northwest|southeast|southwest|up|down|in|out)\b", |
| re.IGNORECASE, |
| ) |
|
|
|
|
| def canonicalize_direction(action: str) -> Optional[str]: |
| """Map action -> canonical direction, tolerant to 'go <dir>' and extra words.""" |
| a = (action or "").strip().lower() |
| if not a: |
| return None |
| if a in CANONICAL_DIR_SET: |
| return a |
| if a in DIRECTION_ALIASES: |
| return DIRECTION_ALIASES[a] |
| if a.startswith("go "): |
| rest = a[3:].strip() |
| if not rest: |
| return None |
| first = rest.split()[0] |
| if first in CANONICAL_DIR_SET: |
| return first |
| return DIRECTION_ALIASES.get(first) |
| |
| first = a.split()[0] |
| if first in CANONICAL_DIR_SET: |
| return first |
| return DIRECTION_ALIASES.get(first) |
|
|
|
|
| def is_direction(action: str) -> bool: |
| return canonicalize_direction(action) is not None |
|
|
|
|
| def looks_like_failure(obs: str) -> bool: |
| t = (obs or "").lower() |
| return any(p in t for p in FAILURE_PATTERNS) |
|
|
|
|
| def _extract_obs_exits(obs: str) -> list[str]: |
| """Fallback: extract direction words from observation prose.""" |
| found = [] |
| for m in _DIR_WORD_RE.finditer(obs or ""): |
| d = m.group(1).lower() |
| if d in CANONICAL_DIR_SET and d not in found: |
| found.append(d) |
| return found |
|
|
|
|
| def _clean_jericho_name(raw: str) -> str: |
| """Extract a clean room name from Jericho object repr.""" |
| if not raw or raw == "None": |
| return "" |
| m = re.match(r"Obj\d+:\s*(.+?)\s+Parent", raw) |
| if m: |
| return m.group(1).strip() |
| return raw.strip() |
|
|
|
|
| def _first_obs_line(obs: str) -> str: |
| for line in (obs or "").splitlines(): |
| s = line.strip() |
| if s: |
| return s |
| return "Unknown" |
|
|
|
|
| def _find_rom_path(game: str) -> str: |
| """ |
| Find <game>.z* in common locations. |
| Works with BYU-PCCL z-machine-games repo layout: |
| z-machine-games-master/jericho-game-suite/<game>.z* |
| """ |
| game = (game or "").strip().lower() |
| env_dir = os.environ.get("Z_MACHINE_GAMES_DIR") or os.environ.get("Z_MACHINE_GAMES_PATH") |
|
|
| patterns = [] |
| if env_dir: |
| patterns.append(os.path.join(env_dir, "jericho-game-suite", f"{game}.z*")) |
| patterns.append(os.path.join(env_dir, f"{game}.z*")) |
|
|
| |
| patterns.append(os.path.join("z-machine-games-master", "jericho-game-suite", f"{game}.z*")) |
|
|
| |
| here = os.path.dirname(os.path.abspath(__file__)) |
| patterns.append(os.path.join(here, "z-machine-games-master", "jericho-game-suite", f"{game}.z*")) |
|
|
| |
| p = here |
| for _ in range(8): |
| p = os.path.dirname(p) |
| patterns.append(os.path.join(p, "z-machine-games-master", "jericho-game-suite", f"{game}.z*")) |
|
|
| |
| patterns.append(os.path.join(os.getcwd(), "**", f"{game}.z*")) |
|
|
| for pat in patterns: |
| hits = glob.glob(pat, recursive=True) |
| if hits: |
| return sorted(hits)[0] |
|
|
| raise FileNotFoundError( |
| f"ROM not found for game='{game}'. Expected z-machine-games-master/jericho-game-suite/{game}.z*" |
| ) |
|
|
|
|
| |
| |
| |
| @dataclass |
| class OutcomeEntry: |
| count: int = 0 |
| moved: bool = False |
| reward_delta: int = 0 |
| new_loc_id: int = -1 |
| failed: bool = False |
| observation_snippet: str = "" |
|
|
|
|
| class GameManager: |
| def __init__(self) -> None: |
| self.env: Optional[FrotzEnv] = None |
| self.game_name: str = "" |
| self.rom_path: str = "" |
|
|
| self.last_observation: str = "" |
| self.current_loc_id: int = -1 |
| self.current_loc_name: str = "Unknown" |
|
|
| self.loc_names: dict[int, str] = {} |
| self.outcomes: dict[int, dict[str, OutcomeEntry]] = {} |
| self.edges: dict[int, dict[str, int]] = {} |
|
|
| self._score: int = 0 |
| self._max_score: int = 0 |
| self._moves: int = 0 |
| self._done: bool = False |
|
|
| def initialize(self, game: str) -> str: |
| if FrotzEnv is None: |
| raise ModuleNotFoundError(f"jericho not available: {_JERICHO_IMPORT_ERROR}") |
|
|
| self.game_name = game |
| self.rom_path = _find_rom_path(game) |
| self.env = _safe_call(FrotzEnv, self.rom_path) |
|
|
| |
| seed_env = os.environ.get("SEED") |
| if seed_env is not None: |
| try: |
| _safe_call(self.env.seed, int(seed_env)) |
| except Exception: |
| pass |
|
|
| obs = self._reset_env() |
| self.last_observation = obs |
|
|
| self.loc_names = {} |
| self.outcomes = {} |
| self.edges = {} |
|
|
| self._refresh_counters() |
| self.current_loc_id, self.current_loc_name = self._get_loc(obs) |
| self.loc_names[self.current_loc_id] = self.current_loc_name |
| return obs |
|
|
| def _reset_env(self) -> str: |
| r = _safe_call(self.env.reset) |
| if isinstance(r, tuple) and len(r) >= 1: |
| return r[0] or "" |
| return r or "" |
|
|
| def _refresh_counters(self) -> None: |
| try: |
| self._score = int(_safe_call(self.env.get_score)) |
| except Exception: |
| self._score = 0 |
| try: |
| self._moves = int(_safe_call(self.env.get_moves)) |
| except Exception: |
| self._moves = 0 |
| try: |
| self._max_score = int(_safe_call(self.env.get_max_score)) |
| except Exception: |
| self._max_score = 0 |
| try: |
| self._done = bool(_safe_call(self.env.game_over)) |
| except Exception: |
| self._done = False |
|
|
| def _get_loc(self, obs: str) -> tuple[int, str]: |
| |
| try: |
| obj = _safe_call(self.env.get_player_location) |
| num = int(obj.num) |
| name = _clean_jericho_name(str(obj)) |
| if num > 0 and name and name.lower() not in ("unknown", ""): |
| return num, name |
| except Exception: |
| pass |
|
|
| |
| name = _first_obs_line(obs) |
| if name and name.lower() not in ("unknown", ""): |
| return hash(name) & 0x7FFF_FFFF, name |
|
|
| return -1, "Unknown" |
|
|
| def get_inventory_items(self) -> list[str]: |
| try: |
| inv = _safe_call(self.env.get_inventory) or [] |
| except Exception: |
| inv = [] |
| out = [] |
| for it in inv: |
| s = _clean_jericho_name(str(it)) |
| if ":" in s: |
| s = s.split(":", 1)[1].strip() |
| if "Parent" in s: |
| s = s.split("Parent", 1)[0].strip() |
| if s: |
| out.append(s) |
| return out |
|
|
| def _get_valid_actions(self) -> list[str]: |
| raw = [] |
| try: |
| raw = _safe_call(self.env.get_valid_actions) or [] |
| except Exception: |
| raw = [] |
| if raw: |
| return [str(a) for a in raw] |
| return list(CANONICAL_DIRECTIONS) + ["look", "inventory"] |
|
|
| def step(self, action: str) -> str: |
| if self.env is None: |
| self.initialize(os.environ.get("GAME", "zork1")) |
|
|
| action_norm = (action or "").strip().lower() |
| prev_loc = self.current_loc_id |
| prev_score = self._score |
|
|
| obs, _r, done, _info = _safe_call(self.env.step, action_norm) |
| obs = obs or "" |
| self.last_observation = obs |
|
|
| self._refresh_counters() |
| delta = int(self._score - prev_score) |
|
|
| new_loc_id, new_loc_name = self._get_loc(obs) |
| self.loc_names[new_loc_id] = new_loc_name |
| moved = new_loc_id != prev_loc |
|
|
| canon_dir = canonicalize_direction(action_norm) |
| key = canon_dir if canon_dir else action_norm |
|
|
| self.outcomes.setdefault(prev_loc, {}) |
| entry = self.outcomes[prev_loc].get(key, OutcomeEntry()) |
| entry.count += 1 |
| entry.moved = moved |
| entry.reward_delta = delta |
| entry.new_loc_id = new_loc_id if moved else -1 |
| failed_now = (not moved) and (delta == 0) and looks_like_failure(obs) |
| entry.failed = entry.failed or failed_now |
| entry.observation_snippet = obs[:120].replace("\n", " ") |
| self.outcomes[prev_loc][key] = entry |
|
|
| if canon_dir and moved: |
| self.edges.setdefault(prev_loc, {})[canon_dir] = new_loc_id |
|
|
| self.current_loc_id = new_loc_id |
| self.current_loc_name = new_loc_name |
| self._done = bool(done) or self._done |
| return obs |
|
|
| def get_status_dict(self) -> dict: |
| loc_id = self.current_loc_id |
| raw_out = self.outcomes.get(loc_id, {}) |
|
|
| outcomes_here = { |
| act: { |
| "count": e.count, |
| "moved": e.moved, |
| "reward_delta": e.reward_delta, |
| "new_loc_id": e.new_loc_id, |
| "failed": e.failed, |
| "observation_snippet": e.observation_snippet, |
| } |
| for act, e in raw_out.items() |
| } |
|
|
| tried_dirs = {a for a in outcomes_here.keys() if a in CANONICAL_DIR_SET} |
| untried = [d for d in CANONICAL_DIRECTIONS if d not in tried_dirs] |
| banned = [a for a, info in outcomes_here.items() if info.get("failed")] |
|
|
| edges_here = dict(self.edges.get(loc_id, {})) |
| valid_all = self._get_valid_actions() |
| tried_set = set(outcomes_here.keys()) |
|
|
| suggested = [] |
| for a in valid_all: |
| al = a.lower().strip() |
| if is_direction(a): |
| continue |
| if a in tried_set: |
| continue |
| if any(al.startswith(x) for x in UNSAFE_STARTS): |
| continue |
| suggested.append(a) |
| if len(suggested) >= 8: |
| break |
|
|
| valid_exits_set = set() |
| for a in valid_all: |
| d = canonicalize_direction(a) |
| if d and d not in tried_dirs: |
| valid_exits_set.add(d) |
| if not valid_exits_set: |
| for d in _extract_obs_exits(self.last_observation): |
| if d not in tried_dirs: |
| valid_exits_set.add(d) |
|
|
| return { |
| "game": self.game_name, |
| "loc_id": int(loc_id), |
| "loc_name": self.current_loc_name, |
| "score": int(self._score), |
| "max_score": int(self._max_score), |
| "moves": int(self._moves), |
| "done": bool(self._done), |
| "inventory": self.get_inventory_items(), |
| "last_observation": self.last_observation, |
| "outcomes_here": outcomes_here, |
| "edges_here": edges_here, |
| "untried_directions": untried, |
| "valid_exits": sorted(valid_exits_set), |
| "banned_actions_here": banned, |
| "suggested_interactions": suggested, |
| } |
|
|
| def get_memory(self) -> str: |
| st = self.get_status_dict() |
| edges = st.get("edges_here", {}) |
| exits_str = ", ".join(f"{d}->{edges[d]}" for d in sorted(edges)) or "none" |
| banned = ", ".join(st.get("banned_actions_here", [])) or "none" |
| inv = ", ".join(st.get("inventory", [])) or "empty" |
| return ( |
| f"Game: {self.game_name}\n" |
| f"Location: {st['loc_name']} (id={st['loc_id']})\n" |
| f"Score: {st['score']}/{st['max_score']} Moves: {st['moves']}\n" |
| f"Inventory: {inv}\n" |
| f"Known exits: {exits_str}\n" |
| f"Banned here: {banned}\n\n" |
| f"Observation:\n{st['last_observation']}" |
| ) |
|
|
| def get_map(self) -> str: |
| if not self.edges: |
| return "No map data yet." |
| lines = ["Directed exploration map:"] |
| for lid, exits in sorted(self.edges.items()): |
| src = self.loc_names.get(lid, f"loc#{lid}") |
| for d, dst_id in sorted(exits.items()): |
| dst = self.loc_names.get(dst_id, f"loc#{dst_id}") |
| lines.append(f" [{lid}] {src} --{d}--> [{dst_id}] {dst}") |
| lines.append(f"\nCurrent: [{self.current_loc_id}] {self.current_loc_name}") |
| return "\n".join(lines) |
|
|
| def get_state_snapshot(self): |
| return _safe_call(self.env.get_state) |
|
|
| def set_state_snapshot(self, st): |
| _safe_call(self.env.set_state, st) |
|
|
|
|
| _game = GameManager() |
|
|
|
|
| def _cleanup() -> None: |
| if _game.env is not None: |
| try: |
| close_fn = getattr(_game.env, "close", None) |
| if callable(close_fn): |
| close_fn() |
| except Exception: |
| pass |
|
|
|
|
| atexit.register(_cleanup) |
|
|
|
|
| def get_game() -> GameManager: |
| if _game.env is None: |
| _game.initialize(os.environ.get("GAME", "zork1")) |
| return _game |
|
|
|
|
| @mcp.tool() |
| def play_action(action: str) -> str: |
| game = get_game() |
| prev_score = game._score |
| obs = game.step(action) |
| suffix = "" |
| if game._score > prev_score: |
| suffix += f"\n\n+{game._score - prev_score} points! (Total: {game._score})" |
| if game._done: |
| suffix += "\n\nGAME OVER" |
| return (obs or "") + suffix |
|
|
|
|
| @mcp.tool() |
| def status() -> str: |
| return json.dumps(get_game().get_status_dict(), indent=2) |
|
|
|
|
| @mcp.tool() |
| def peek_action(action: str) -> str: |
| """ |
| Simulate an action and return the resulting status dict as JSON. |
| No commit: restores Jericho state + server tracking. |
| """ |
| game = get_game() |
| st = game.get_state_snapshot() |
|
|
| |
| saved_outcomes = {} |
| for lid, d in game.outcomes.items(): |
| saved_outcomes[lid] = {} |
| for k, v in d.items(): |
| saved_outcomes[lid][k] = OutcomeEntry( |
| count=v.count, |
| moved=v.moved, |
| reward_delta=v.reward_delta, |
| new_loc_id=v.new_loc_id, |
| failed=v.failed, |
| observation_snippet=v.observation_snippet, |
| ) |
|
|
| saved = { |
| "last_observation": game.last_observation, |
| "current_loc_id": game.current_loc_id, |
| "current_loc_name": game.current_loc_name, |
| "loc_names": dict(game.loc_names), |
| "outcomes": saved_outcomes, |
| "edges": {lid: dict(d) for lid, d in game.edges.items()}, |
| "_score": game._score, |
| "_max_score": game._max_score, |
| "_moves": game._moves, |
| "_done": game._done, |
| "game_name": game.game_name, |
| "rom_path": game.rom_path, |
| } |
|
|
| try: |
| _ = game.step(action) |
| return json.dumps(game.get_status_dict()) |
| finally: |
| game.set_state_snapshot(st) |
| game.last_observation = saved["last_observation"] |
| game.current_loc_id = saved["current_loc_id"] |
| game.current_loc_name = saved["current_loc_name"] |
| game.loc_names = saved["loc_names"] |
| game.outcomes = saved["outcomes"] |
| game.edges = saved["edges"] |
| game._score = saved["_score"] |
| game._max_score = saved["_max_score"] |
| game._moves = saved["_moves"] |
| game._done = saved["_done"] |
| game.game_name = saved["game_name"] |
| game.rom_path = saved["rom_path"] |
|
|
|
|
| @mcp.tool() |
| def valid_actions() -> str: |
| game = get_game() |
| return json.dumps(game._get_valid_actions()) |
|
|
|
|
| @mcp.tool() |
| def memory() -> str: |
| return get_game().get_memory() |
|
|
|
|
| @mcp.tool() |
| def inventory() -> str: |
| items = get_game().get_inventory_items() |
| return "Inventory: " + (", ".join(items) if items else "empty") |
|
|
|
|
| @mcp.tool() |
| def get_map() -> str: |
| return get_game().get_map() |
|
|
|
|
| @mcp.tool() |
| def reset(game: Optional[str] = None) -> str: |
| g = (game or os.environ.get("GAME", "zork1")).strip() |
| _game.initialize(g) |
| return _game.last_observation |
|
|
|
|
| if __name__ == "__main__": |
| mcp.run() |