dspy-zmachine / mcp_server.py
janisaiad's picture
everything working
6e1e03a
"""
Student MCP Server for Text Adventure Games
Full Z-machine integration via Jericho: inventory, location, score, moves,
valid_actions, and state hash come directly from the Z-machine (no LLM parsing).
"""
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from fastmcp import FastMCP
from games.zork_env import TextAdventureEnv
# =============================================================================
# Create the MCP Server
# =============================================================================
# Single server instance for this module; every @mcp.tool() function below
# is declared against it, and mcp.run() at the bottom of the file starts it.
mcp = FastMCP("Student Text Adventure Server")
# =============================================================================
# Game State Management (Z-machine direct access via Jericho)
# =============================================================================
class GameManager:
"""
Manages game state with direct Z-machine access through Jericho FrotzEnv.
All structured data (inventory, location, score, valid_actions) comes from
the Z-machine, not from text parsing.
Tracks visited locations and failed actions per state for loop/deadlock avoidance.
"""
def __init__(self):
self.env: TextAdventureEnv | None = None
self.state = None
self.game_name: str = ""
self.history: list[tuple[str, str]] = []
self.explored_locations: dict[str, set[str]] = {}
self.visited_location_hashes: set[str] = set()
self.failed_actions_at_state: dict[str, set[str]] = {}
def initialize(self, game: str = "lostpig"):
"""Initialize or reset the game."""
self.game_name = game
self.env = TextAdventureEnv(game)
self.state = self.env.reset()
self.history = []
self.explored_locations = {}
self.visited_location_hashes = set()
self.failed_actions_at_state = {}
return self.state.observation
def step(self, action: str) -> str:
"""Execute an action and return the result."""
if self.env is None:
self.initialize(os.environ.get("GAME", "lostpig"))
self.state = self.env.step(action)
self.history.append((action, self.state.observation))
if len(self.history) > 50:
self.history = self.history[-50:]
self._update_map(action)
state_hash = self.get_state_hash()
self.visited_location_hashes.add(state_hash[:64] if state_hash else "")
return self.state.observation
def record_failed_action(self, state_hash: str, action: str) -> None:
"""Record that action failed at this state (for loop/deadlock avoidance)."""
key = (state_hash or "")[:64]
if key not in self.failed_actions_at_state:
self.failed_actions_at_state[key] = set()
self.failed_actions_at_state[key].add(action.lower().strip())
if len(self.failed_actions_at_state[key]) > 20:
self.failed_actions_at_state[key] = set(list(self.failed_actions_at_state[key])[-15:])
def get_failed_actions_at_current_state(self) -> list[str]:
"""Return actions that failed at the current state (do not repeat)."""
h = self.get_state_hash()
key = (h or "")[:64]
return list(self.failed_actions_at_state.get(key, []))[:15]
def get_locations_count(self) -> int:
"""Return number of unique locations visited (evaluation metric)."""
current = self.get_player_location_zmachine()
rooms = set(self.explored_locations.keys()) | {current}
return len(rooms)
def _update_map(self, action: str):
"""Update explored locations from Z-machine state."""
if action.lower() in ("north", "south", "east", "west", "up", "down", "enter", "exit",
"n", "s", "e", "w", "u", "d"):
prev_loc = self._get_location()
new_loc = self.state.location if hasattr(self.state, "location") else self._extract_location(self.state.observation)
if prev_loc not in self.explored_locations:
self.explored_locations[prev_loc] = set()
if new_loc != prev_loc:
self.explored_locations[prev_loc].add(f"{action} -> {new_loc}")
def _extract_location(self, observation: str) -> str:
"""Fallback: extract location from first line of observation."""
lines = observation.strip().split("\n")
return lines[0] if lines else "Unknown"
def _get_location(self) -> str:
"""Get current location from Z-machine (state.location) or fallback."""
if self.state and hasattr(self.state, "location") and self.state.location:
return self.state.location
if self.history:
return self._extract_location(self.history[-1][1])
return "Unknown"
def get_score(self) -> int:
"""Get current score from Z-machine."""
return self.state.score if self.state else 0
def get_moves(self) -> int:
"""Get number of moves from Z-machine."""
return self.state.moves if self.state else 0
def get_max_score(self) -> int:
"""Get max possible score from Z-machine."""
if self.state and hasattr(self.state, "max_score"):
return self.state.max_score
try:
return self.env.env.get_max_score() if self.env else 0
except Exception:
return 0
def get_inventory_zmachine(self) -> list:
"""Get inventory directly from Z-machine (list of objects)."""
try:
return [str(obj) for obj in self.env.env.get_inventory()]
except Exception:
return self.state.inventory if (self.state and hasattr(self.state, "inventory")) else []
def get_valid_actions_zmachine(self) -> list[str]:
"""Get valid actions directly from Z-machine (object tree)."""
try:
return self.env.get_valid_actions()
except Exception:
return ["north", "south", "east", "west", "up", "down", "look", "inventory", "take all"]
def get_state_hash(self) -> str:
"""Get world state hash from Z-machine for loop detection."""
try:
return str(self.env.env.get_state())
except Exception:
return ""
def get_player_location_zmachine(self) -> str:
"""Get player location directly from Z-machine."""
try:
loc = self.env.env.get_player_location()
return str(loc) if loc else self._get_location()
except Exception:
return self._get_location()
def format_inventory(self, items: list) -> str:
"""Format inventory items (clean Z-machine object names)."""
if not items:
return "Inventory: You are empty-handed."
names = []
for item in items:
s = str(item).lower()
if "parent" in s:
idx = s.index("parent")
name = str(item)[:idx].strip()
if ":" in name:
name = name.split(":", 1)[1].strip()
names.append(name)
elif ":" in str(item):
names.append(str(item).split(":", 1)[1].strip())
else:
names.append(str(item))
return f"Inventory: {', '.join(names)}"
def get_memory(self) -> str:
"""Get game state summary (location/score/moves from Z-machine)."""
recent = self.history[-5:] if self.history else []
recent_str = "\n".join([f" > {a} -> {r[:60]}..." for a, r in recent]) if recent else " (none yet)"
loc = self.get_player_location_zmachine()
return f"""Current State (Z-machine):
- Location: {loc}
- Score: {self.get_score()} / {self.get_max_score()} points
- Moves: {self.get_moves()}
- Game: {self.game_name}
Recent Actions:
{recent_str}
Current Observation:
{self.state.observation if self.state else 'N/A'}"""
def get_map(self) -> str:
"""Get map of explored locations."""
if not self.explored_locations:
return "Map: No locations explored yet. Try moving around!"
lines = ["Explored Locations and Exits:"]
for loc, exits in sorted(self.explored_locations.items()):
lines.append(f"\n* {loc}")
for exit_info in sorted(exits):
lines.append(f" -> {exit_info}")
lines.append(f"\n[Current] {self.get_player_location_zmachine()}")
return "\n".join(lines)
# Global game manager
# Module-wide singleton; created on first use by get_game().
_game: GameManager | None = None


def get_game() -> GameManager:
    """Return the shared GameManager, creating and starting it when needed."""
    global _game
    manager = GameManager() if _game is None else _game
    _game = manager
    # A manager without an environment has never been started (or was left
    # half-built), so start the game named by the GAME environment variable.
    if manager.env is None:
        manager.initialize(os.environ.get("GAME", "lostpig"))
    return manager
# =============================================================================
# MCP Tools (all use Z-machine data where available)
# =============================================================================
@mcp.tool()
def play_action(action: str) -> str:
    """
    Execute a game command and return the result.

    Args:
        action: The command to execute (e.g., "north", "take lamp", "open mailbox")

    Returns:
        The game's response to the action
    """
    manager = get_game()
    observation = manager.step(action)
    state = manager.state
    # A positive reward replaces the usual score/moves footer with a
    # points announcement.
    if state and state.reward > 0:
        footer = f"\n\n+{state.reward} points! (Total: {manager.get_score()})"
    else:
        footer = f"\n\n[Score: {manager.get_score()} | Moves: {manager.get_moves()}]"
    ending = "\n\nGAME OVER" if (state and state.done) else ""
    return observation + footer + ending
@mcp.tool()
def memory() -> str:
    """
    Get current game state summary (location, score, moves, recent history).
    Location and score come from Z-machine directly.
    """
    manager = get_game()
    # The summary text is assembled entirely by the manager.
    return manager.get_memory()
@mcp.tool()
def inventory() -> str:
    """
    Check what the player is carrying.
    Data comes directly from Z-machine get_inventory().
    """
    manager = get_game()
    # Fetch the carried objects, then let the manager render them as one line.
    return manager.format_inventory(manager.get_inventory_zmachine())
@mcp.tool()
def get_map() -> str:
    """
    Get a map of explored locations and connections.
    """
    manager = get_game()
    # The manager owns the explored-locations table and its text rendering.
    return manager.get_map()
@mcp.tool()
def get_valid_actions() -> str:
    """
    Get a list of valid actions from the Z-machine object tree.
    Used by Critic for fast validation before LLM evaluation.
    """
    manager = get_game()
    try:
        actions = manager.get_valid_actions_zmachine()
        # Only the first 30 entries are reported.
        shown = [str(entry) for entry in actions[:30]]
        return "Valid actions: " + ", ".join(shown)
    except Exception:
        return "Could not get valid actions (spacy may be required)."
@mcp.tool()
def get_state_hash() -> str:
    """
    Get a hash of the current Z-machine world state for loop detection.
    """
    digest = get_game().get_state_hash()
    # Values longer than 80 characters are cut and marked with an ellipsis.
    if len(digest) > 80:
        return f"State hash: {digest[:80]}..."
    return f"State hash: {digest}"
@mcp.tool()
def record_failed_action(state_hash: str, action: str) -> str:
    """
    Record that an action failed at the given state. Never repeat this action when returning to this state.
    Call this after play_action when the result indicates failure (no progress, rejection, etc).
    """
    manager = get_game()
    # Missing arguments are passed on as empty strings rather than None.
    safe_hash = state_hash or ""
    safe_action = action or ""
    manager.record_failed_action(safe_hash, safe_action)
    return "Recorded."
@mcp.tool()
def get_context_for_agent() -> str:
    """
    Get context summary for agent: locations count, failed actions at current state.
    Use this to avoid loops and deadlocks.
    """
    manager = get_game()
    visited = manager.get_locations_count()
    blocked = manager.get_failed_actions_at_current_state()
    digest = manager.get_state_hash() or ""
    key = digest[:64]
    # Only the first 32 characters of the key are shown in the summary.
    report = [
        f"Locations visited: {visited}",
        f"State hash (for loop detection): {key[:32]}...",
    ]
    if blocked:
        # At most ten failed actions are listed.
        report.append(f"DO NOT repeat at current location: {', '.join(blocked[:10])}")
    return "\n".join(report)
# =============================================================================
# Run the server
# =============================================================================
# Start the server only when this file is executed as a script; importing the
# module just defines the tools.
if __name__ == "__main__":
    mcp.run()