Spaces:
Sleeping
Sleeping
| """ | |
| MCP Server - Enhanced Generalist Agent | |
| Features: | |
| - Graph-based mapping with BFS pathfinding | |
| - Structured notebook (clues, puzzles, NPCs, dangers) | |
| - Explicit inventory tracking | |
| - Unexplored exit tracking for systematic exploration | |
| - Stuck detection helpers | |
| """ | |
| import re | |
| import sys | |
| import os | |
| from collections import deque | |
| from fastmcp import FastMCP | |
| sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
| from games.zork_env import TextAdventureEnv | |
# Name of the game loaded when the server first needs a session; the GAME
# environment variable overrides the "zork1" default.
INITIAL_GAME = os.getenv("GAME", "zork1")

# FastMCP server instance that the __main__ guard starts.
mcp = FastMCP("Text Adventure Server")
# Every token treated as a movement command: one/two-letter abbreviations,
# the spelled-out compass/vertical directions, and the in/out/enter/exit
# specials.
DIRECTIONS = set(
    "n s e w u d ne nw se sw "
    "north south east west up down "
    "northeast northwest southeast southwest "
    "in out enter exit".split()
)
# Abbreviation -> full direction name, so the map graph stores exactly one
# spelling per direction.
DIR_CANONICAL = dict(
    n="north",
    s="south",
    e="east",
    w="west",
    u="up",
    d="down",
    ne="northeast",
    nw="northwest",
    se="southeast",
    sw="southwest",
)
# Canonical direction -> the direction that undoes it. Built from symmetric
# pairs so both halves of every pair are guaranteed to be present.
OPPOSITE_DIR = {
    src: dst
    for first, second in (
        ("north", "south"),
        ("east", "west"),
        ("up", "down"),
        ("northeast", "southwest"),
        ("northwest", "southeast"),
        ("in", "out"),
        ("enter", "exit"),
    )
    for src, dst in ((first, second), (second, first))
}
def canonicalize(direction: str) -> str:
    """Return the canonical spelling of a direction token.

    The token is lower-cased and stripped of surrounding whitespace; known
    abbreviations are expanded through DIR_CANONICAL, and anything else is
    returned in its normalized form unchanged.
    """
    token = direction.lower().strip()
    try:
        return DIR_CANONICAL[token]
    except KeyError:
        return token
| class WorldMap: | |
| """Graph-based world map with BFS pathfinding.""" | |
| def __init__(self): | |
| # room_name -> {canonical_direction: destination_room_name} | |
| self.graph: dict[str, dict[str, str]] = {} | |
| # room_name -> brief description | |
| self.room_info: dict[str, str] = {} | |
| # room_name -> set of canonical directions mentioned but not yet taken | |
| self.known_exits: dict[str, set[str]] = {} | |
| def ensure_room(self, name: str): | |
| if name and name != "Unknown": | |
| if name not in self.graph: | |
| self.graph[name] = {} | |
| if name not in self.known_exits: | |
| self.known_exits[name] = set() | |
| def record_move(self, from_room: str, direction: str, to_room: str): | |
| """Record a successful movement between rooms.""" | |
| d = canonicalize(direction) | |
| self.ensure_room(from_room) | |
| self.ensure_room(to_room) | |
| if from_room != to_room and from_room != "Unknown" and to_room != "Unknown": | |
| self.graph[from_room][d] = to_room | |
| # Record reverse edge | |
| opp = OPPOSITE_DIR.get(d) | |
| if opp: | |
| self.graph[to_room][opp] = from_room | |
| # Remove from unexplored | |
| self.known_exits.get(from_room, set()).discard(d) | |
| def record_blocked(self, room: str, direction: str): | |
| """Record that a direction is blocked/doesn't work from a room.""" | |
| d = canonicalize(direction) | |
| self.ensure_room(room) | |
| self.graph[room][d] = "[BLOCKED]" | |
| self.known_exits.get(room, set()).discard(d) | |
| def register_exits(self, room: str, directions: list[str]): | |
| """Register exits mentioned in room description that we haven't explored yet.""" | |
| self.ensure_room(room) | |
| for d in directions: | |
| cd = canonicalize(d) | |
| if cd not in self.graph.get(room, {}): | |
| self.known_exits[room].add(cd) | |
| def set_room_info(self, room: str, info: str): | |
| self.room_info[room] = info[:200] | |
| def find_path(self, start: str, end: str) -> list[str] | None: | |
| """BFS shortest path. Returns list of directions, or None if no path.""" | |
| if start == end: | |
| return [] | |
| if start not in self.graph or end not in self.graph: | |
| return None | |
| queue = deque([(start, [])]) | |
| visited = {start} | |
| while queue: | |
| current, path = queue.popleft() | |
| for direction, neighbor in self.graph.get(current, {}).items(): | |
| if neighbor == "[BLOCKED]" or neighbor in visited: | |
| continue | |
| if neighbor == end: | |
| return path + [direction] | |
| visited.add(neighbor) | |
| queue.append((neighbor, path + [direction])) | |
| return None | |
| def get_unexplored(self) -> list[tuple[str, str]]: | |
| """Get all (room, direction) pairs that are known but unexplored.""" | |
| result = [] | |
| for room, dirs in self.known_exits.items(): | |
| for d in dirs: | |
| result.append((room, d)) | |
| return result | |
| def get_nearest_unexplored(self, current: str) -> tuple[list[str], str] | None: | |
| """Find the nearest unexplored exit from current position. | |
| Returns (path_to_room, unexplored_direction) or None.""" | |
| unexplored = self.get_unexplored() | |
| if not unexplored: | |
| return None | |
| best = None | |
| best_len = float("inf") | |
| for room, direction in unexplored: | |
| if room == current: | |
| return ([], direction) | |
| path = self.find_path(current, room) | |
| if path is not None and len(path) < best_len: | |
| best = (path, direction) | |
| best_len = len(path) | |
| return best | |
| def to_text(self, current: str = "") -> str: | |
| if not self.graph: | |
| return "Map is empty — no paths recorded yet." | |
| lines = [] | |
| for room in sorted(self.graph.keys()): | |
| exits = self.graph[room] | |
| marker = " << YOU ARE HERE" if room == current else "" | |
| exit_parts = [] | |
| for d, dest in sorted(exits.items()): | |
| if dest == "[BLOCKED]": | |
| exit_parts.append(f"{d}:BLOCKED") | |
| else: | |
| exit_parts.append(f"{d}->{dest}") | |
| unexplored = self.known_exits.get(room, set()) | |
| for d in sorted(unexplored): | |
| exit_parts.append(f"{d}:???") | |
| exits_str = ", ".join(exit_parts) if exit_parts else "no known exits" | |
| lines.append(f" [{room}]{marker}: {exits_str}") | |
| return "Known Map:\n" + "\n".join(lines) | |
class Notebook:
    """Append-only list of categorized notes (clues, items, puzzles, ...)."""

    def __init__(self):
        # Each entry is {"category": <upper-cased label>, "text": <note>}.
        self.entries: list[dict] = []

    def add(self, text: str, category: str = "General") -> str:
        """Store a note under *category* and return a confirmation message.

        The category is upper-cased and stripped. A note whose text and
        normalized category both match an existing entry is not stored again.
        """
        label = category.upper().strip()
        is_duplicate = any(
            note["text"] == text and note["category"] == label
            for note in self.entries
        )
        if is_duplicate:
            return f"(Already noted: {text})"
        self.entries.append({"category": label, "text": text})
        return f"Noted [{label}]: {text} (Total: {len(self.entries)} entries)"

    def to_text(self) -> str:
        """Return every note on its own line, or a placeholder when empty."""
        if self.entries:
            return "\n".join(
                f" [{note['category']}] {note['text']}" for note in self.entries
            )
        return "Notebook is empty."

    def search(self, keyword: str) -> str:
        """Return notes whose text or category contains *keyword* (case-insensitive)."""
        needle = keyword.lower()
        hits = []
        for note in self.entries:
            in_text = needle in note["text"].lower()
            if in_text or needle in note["category"].lower():
                hits.append(f" [{note['category']}] {note['text']}")
        if hits:
            return "\n".join(hits)
        return f"No notes matching '{keyword}'."
class GameState:
    """One game session: the environment plus the agent's map, notes and plan."""

    def __init__(self, game_name: str):
        """Start *game_name* and record the starting room on the map."""
        self.env = TextAdventureEnv(game_name)
        self.state = self.env.reset()
        self.world_map = WorldMap()
        self.notebook = Notebook()
        self.current_location = "Unknown"
        self._update_location(self.state.location)
        self.world_map.ensure_room(self.current_location)
        self.current_goal = (
            "Explore the environment, collect useful items, and increase score."
        )
        self.plan = ""
        self.action_history: list[str] = []  # last N actions for stuck detection

    def _clean_location_name(self, raw: str) -> str:
        """Reduce a raw location value to a bare room name.

        Drops everything up to the first ":" and any trailing
        Parent/Sibling/Child/Attributes/Properties fields. Returns "Unknown"
        when nothing usable remains.
        """
        if not raw:
            return "Unknown"
        # NOTE(review): the fields stripped below look like the text form of a
        # game object, so coerce in case the environment hands over the object
        # itself rather than a string — confirm against TextAdventureEnv.
        raw = str(raw)
        if ":" in raw:
            raw = raw.split(":", 1)[1].strip()
        raw = re.sub(
            r"\s*(Parent\d+|Sibling\d+|Child\d+|Attributes\s*\[.*?\]|Properties\s*\[.*?\]).*",
            "",
            raw,
        )
        return raw.strip() or "Unknown"

    def _update_location(self, raw_loc: str):
        """Set current_location from a raw location value."""
        self.current_location = self._clean_location_name(raw_loc)

    def step(self, action: str) -> str:
        """Send *action* to the game, update location/history/map, return the observation.

        Only actions that are exactly a token from DIRECTIONS update the map:
        a changed location records an edge, an unchanged one records the
        direction as blocked. Nothing is recorded while the previous
        location is "Unknown".
        """
        prev_loc = self.current_location
        action_clean = action.strip()
        self.state = self.env.step(action_clean)
        self._update_location(self.state.location)

        # Track action history for stuck detection (keep the last 20).
        action_lower = action_clean.lower()
        self.action_history.append(action_lower)
        if len(self.action_history) > 20:
            self.action_history = self.action_history[-20:]

        if action_lower in DIRECTIONS and prev_loc != "Unknown":
            if prev_loc != self.current_location:
                self.world_map.record_move(
                    prev_loc, action_lower, self.current_location
                )
            else:
                self.world_map.record_blocked(prev_loc, action_lower)
        return self.state.observation

    def get_inventory_text(self) -> str:
        """Describe the inventory exposed by the current state.

        An empty list/tuple means the player carries nothing and yields
        "empty-handed"; a missing, None or otherwise empty value means the
        state does not report an inventory.
        """
        items = getattr(self.state, "inventory", None)
        if isinstance(items, (list, tuple)):
            if not items:
                return "empty-handed"
            # str() guards against non-string item objects.
            return ", ".join(str(item) for item in items)
        if not items:
            return "(unknown — try 'inventory' command)"
        return str(items)

    def is_stuck(self) -> bool:
        """Detect if agent is looping: same action repeated 3+ times in last 6."""
        if len(self.action_history) < 6:
            return False
        recent = self.action_history[-6:]
        return any(recent.count(action) >= 3 for action in set(recent))
# Process-wide game session, created on first use by get_game().
_game: GameState | None = None


def get_game() -> GameState:
    """Return the shared GameState, starting INITIAL_GAME on the first call."""
    global _game
    if _game is not None:
        return _game
    _game = GameState(INITIAL_GAME)
    return _game
| # ============================================================================= | |
| # TOOLS | |
| # ============================================================================= | |
# Registered on the FastMCP server; without the decorator mcp.run() would
# expose no tool of this name. The docstring is kept as the tool description.
@mcp.tool()
def play_action(action: str) -> str:
    """Execute a game command. Examples: 'north', 'take lamp', 'examine rug', 'open door', 'inventory'.
    For directions, just use: north/south/east/west/up/down/in/out/ne/nw/se/sw.
    Returns the game's response plus your current location and score."""
    g = get_game()
    obs = g.step(action)
    # Observation first, then a structured status line.
    parts = [
        obs,
        f"\n[Location: {g.current_location} | Score: {g.state.score}/{g.state.max_score}]",
    ]
    # If the recent history looks like a loop, add a gentle nudge.
    if g.is_stuck():
        parts.append(
            "[WARNING: You seem to be repeating actions. Try something different!]"
        )
    return "\n".join(parts)
# Registered on the FastMCP server; without the decorator mcp.run() would
# expose no tool of this name. The docstring is kept as the tool description.
@mcp.tool()
def think(goal: str, thought: str) -> str:
    """Plan your strategy. Update your current goal and reasoning.
    Use this to organize what you want to accomplish and why."""
    g = get_game()
    # Both values are stored on the session and echoed back by memory().
    g.current_goal = goal
    g.plan = thought
    return f"Goal updated: {goal}\nPlan: {thought}\nLocation: {g.current_location}"
# Registered on the FastMCP server; without the decorator mcp.run() would
# expose no tool of this name. The docstring is kept as the tool description.
@mcp.tool()
def notebook_write(text: str, category: str = "Clue") -> str:
    """Save important information to your permanent notebook.
    Categories: Clue, Puzzle, Item, Danger, NPC, Code, Goal, Map.
    Use this to remember puzzle hints, codes, locked doors, NPC dialogue, etc."""
    g = get_game()
    # Notebook.add normalizes the category and skips exact duplicates.
    return g.notebook.add(text, category)
# Registered on the FastMCP server; without the decorator mcp.run() would
# expose no tool of this name. The docstring is kept as the tool description.
@mcp.tool()
def notebook_read(keyword: str = "") -> str:
    """Read your notebook. Optionally filter by keyword or category.
    Call with no keyword to see everything."""
    g = get_game()
    # An empty keyword means "show everything" rather than "search for ''".
    if keyword:
        return g.notebook.search(keyword)
    return g.notebook.to_text()
# Registered on the FastMCP server; without the decorator mcp.run() would
# expose no tool of this name. The docstring is kept as the tool description.
@mcp.tool()
def memory() -> str:
    """Get a full status dump: location, goal, inventory, notebook, and map."""
    g = get_game()
    return f"""=== STATUS ===
Location: {g.current_location}
Score: {g.state.score}/{g.state.max_score}
Goal: {g.current_goal}
Plan: {g.plan}
=== INVENTORY ===
{g.get_inventory_text()}
=== NOTEBOOK ({len(g.notebook.entries)} entries) ===
{g.notebook.to_text()}
=== MAP ===
{g.world_map.to_text(g.current_location)}
"""
# Registered on the FastMCP server; without the decorator mcp.run() would
# expose no tool of this name. The docstring is kept as the tool description.
@mcp.tool()
def get_map() -> str:
    """View your explored map with all known connections and unexplored exits."""
    g = get_game()
    txt = g.world_map.to_text(g.current_location)
    unexplored = g.world_map.get_unexplored()
    if not unexplored:
        return txt
    # Sorted so the listing is stable between calls (exits are held in sets).
    pieces = [f"\n\nUnexplored exits ({len(unexplored)}):"]
    pieces.extend(
        f"\n {room} -> {d} (not yet visited)" for room, d in sorted(unexplored)
    )
    return txt + "".join(pieces)
# Registered on the FastMCP server; without the decorator mcp.run() would
# expose no tool of this name. The docstring is kept as the tool description.
@mcp.tool()
def find_path(target_room: str) -> str:
    """Find the shortest path from your current location to a target room.
    Returns step-by-step directions, or says if no path is known."""
    g = get_game()
    world = g.world_map
    here = g.current_location
    # Recorded room names carry no surrounding whitespace, so trim the query.
    target_room = target_room.strip()
    path = world.find_path(here, target_room) if target_room else None
    # Fall back to a case-insensitive substring match on known room names.
    # A blank target is skipped: it is a substring of every name and would
    # otherwise "match" an arbitrary room.
    if path is None and target_room:
        needle = target_room.lower()
        for room in world.graph:
            if needle in room.lower():
                path = world.find_path(here, room)
                if path is not None:
                    target_room = room
                    break
    if path is None:
        return f"No known path from '{here}' to '{target_room}'. You may need to explore more."
    if not path:
        return f"You are already at '{target_room}'!"
    return f"Path to '{target_room}': {' -> '.join(path)} ({len(path)} steps)"
# Registered on the FastMCP server; without the decorator mcp.run() would
# expose no tool of this name. The docstring is kept as the tool description.
@mcp.tool()
def suggest_exploration() -> str:
    """Get a suggestion for where to explore next, based on unexplored exits.
    Finds the nearest unexplored direction and tells you how to get there."""
    g = get_game()
    result = g.world_map.get_nearest_unexplored(g.current_location)
    if result is None:
        return "No unexplored exits recorded. Try: look (to spot exits), or try directions manually."
    path, unexplored_dir = result
    if not path:
        return f"There's an unexplored exit right here: go '{unexplored_dir}'!"
    # The map only returns the route, so replay it over the graph to name the
    # room it ends in (the message previously read "go to via [...]").
    room = g.current_location
    for step in path:
        room = g.world_map.graph.get(room, {}).get(step, room)
    path_str = " -> ".join(path)
    return (
        f"Nearest unexplored exit: go to '{room}' via [{path_str}], "
        f"then try '{unexplored_dir}'."
    )
# Registered on the FastMCP server; without the decorator mcp.run() would
# expose no tool of this name. The docstring is kept as the tool description.
@mcp.tool()
def register_exits(directions: str) -> str:
    """Tell the map about exits you see in the current room description.
    Pass a comma-separated list of directions, e.g. 'north, south, up'.
    This helps track what you haven't explored yet."""
    g = get_game()
    dirs = [d.strip().lower() for d in directions.split(",") if d.strip()]
    # Every canonical name is itself in DIRECTIONS, so one membership test on
    # the canonical form accepts both abbreviations and full names.
    valid = [d for d in dirs if canonicalize(d) in DIRECTIONS]
    if not valid:
        return "No valid directions recognized. Use: north, south, east, west, up, down, in, out, etc."
    # The map keeps no entry for the "Unknown" placeholder room; registering
    # exits there would raise KeyError inside the map.
    if g.current_location == "Unknown":
        return "Current location is unknown, so exits cannot be registered yet. Try: look."
    g.world_map.register_exits(g.current_location, valid)
    return f"Registered exits at {g.current_location}: {', '.join(valid)}"
# Start the MCP server only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    mcp.run()