Spaces:
Sleeping
Sleeping
| """ | |
| Example: MCP ReAct Agent | |
| A complete ReAct agent that uses MCP tools to play text adventure games. | |
| This is a working example students can learn from. | |
| """ | |
| import json | |
| import os | |
| import re | |
| from dataclasses import dataclass, field | |
| from typing import Optional | |
| from dotenv import load_dotenv | |
| from huggingface_hub import InferenceClient | |
| load_dotenv() | |
| # ============================================================================= | |
| # LLM Configuration - DO NOT MODIFY | |
| # ============================================================================= | |
# Identifier of the chat model requested on every completion call.
LLM_MODEL = "Qwen/Qwen2.5-72B-Instruct"

# The access token is read from the process environment; fail fast at import
# time when it is missing so the problem surfaces before any game is started.
_hf_token = os.environ.get("HF_TOKEN")
if not _hf_token:
    raise ValueError("HF_TOKEN not found. Set it in your .env file.")

# Shared client instance used by call_llm().
LLM_CLIENT = InferenceClient(token=_hf_token)
def call_llm(prompt: str, system_prompt: str, seed: int, max_tokens: int = 300) -> str:
    """Send one system + user exchange to the configured chat model.

    Args:
        prompt: Text placed in the user message.
        system_prompt: Text placed in the system message.
        seed: Sampling seed forwarded to the completion request.
        max_tokens: Upper bound on generated tokens forwarded to the request.

    Returns:
        The message content of the first choice in the response.
    """
    system_message = {"role": "system", "content": system_prompt}
    user_message = {"role": "user", "content": prompt}

    # temperature=0.0 together with an explicit seed is requested on each call.
    completion = LLM_CLIENT.chat.completions.create(
        model=LLM_MODEL,
        messages=[system_message, user_message],
        temperature=0.0,
        max_tokens=max_tokens,
        seed=seed,
    )

    first_choice = completion.choices[0]
    return first_choice.message.content
@dataclass
class RunResult:
    """Result of running the agent. Do not modify this class.

    The ``@dataclass`` decorator is required: the fields below are plain
    annotations (one uses ``field(default_factory=list)``) and the agent builds
    instances with keyword arguments, which only works with the generated
    ``__init__``.
    """

    # Score reported when the run ended.
    final_score: int
    # Maximum score attainable in the game.
    max_score: int
    # Number of move-consuming tool calls issued during the run.
    moves: int
    # Names of the locations recorded during the run.
    locations_visited: set[str]
    # Whether the run ended because the game reported it was over.
    game_completed: bool
    # Optional error description; None when nothing was reported.
    error: Optional[str] = None
    # Per-step string triples; a fresh list is created for every instance.
    history: list[tuple[str, str, str]] = field(default_factory=list)
| # ============================================================================= | |
| # System Prompt | |
| # ============================================================================= | |
# System message sent with every LLM call. It lists the MCP tools, the game
# commands accepted by play_action, and the THOUGHT/TOOL/ARGS reply format that
# the agent's response parser looks for.
# NOTE(review): the destination in the travel example has an unbalanced "(" -
# confirm against the location names the MCP server actually returns.
SYSTEM_PROMPT = """You are an expert text adventure game player. Your goal is to explore, collect treasures, and maximize your score as fast as possible.

AVAILABLE TOOLS (use these via MCP):
1. play_action - Execute game commands and physically interact with your environment (north, take lamp, open mailbox, etc).
2. get_locations - List nearby locations that you visited or that are adjacent to locations you visited.
3. get_unexplored_locations - List nearby unexplored locations adjacent to locations you visited.
4. travel - Fast travel to a given location you previously visited through backtracking.
5. memory - Get a summary of the current game state, in case you feel lost.
6. inventory - Check your inventory. You have no inventory size limit.

VALID GAME COMMANDS for play_action:
- Movement: north, south, east, west, up, down, enter, exit
- Objects: take <item>, drop <item>, open <thing>, close <thing>, examine <thing>
- Light: turn on lamp, turn off lamp
- Combat: attack <enemy> with <weapon>
- Other: inventory, look, read <thing>, wait

FORBIDDEN (will NOT work): check, inspect, search, grab, use, help

RESPOND IN THIS EXACT FORMAT (no markdown):
THOUGHT: <brief reasoning about what to do next>
TOOL: <tool_name>
ARGS: <JSON arguments>

Examples:
THOUGHT: I need to see what's around me.
TOOL: play_action
ARGS: {"action": "look"}

THOUGHT: I'm completely lost and don't know where to go next. I will check for nearby unexplored locations.
TOOL: get_unexplored_locations
ARGS: {}

THOUGHT: I need to explore new locations. I travel north of the burnt forest directly.
TOOL: travel
ARGS: {"destination": "Unexplored (North Of Burnt Forest"}

STRATEGY:
1. Explore systematically and travel to unexplored places. When relevant, explore up and down before exploring other directions.
2. Pick up useful items. They will not be collected automatically; you have to manually collect them (e.g. "take sword").
3. Open containers (mailbox, window, etc.)
4. Use get_locations and get_unexplored_locations to avoid getting lost. Use 'travel' for faster travel.
5. Turn on lamp before dark areas!

DO NOT repeat the same action multiple times in a row."""
| # ============================================================================= | |
| # Student Agent Implementation | |
| # ============================================================================= | |
class StudentAgent:
    """ReAct-style agent that plays a text adventure through MCP tools.

    Each step builds a prompt from the most recent history entries, asks the
    LLM for a THOUGHT/TOOL/ARGS reply, repairs common mistakes in that reply
    and executes the resulting tool call on the MCP client.
    """

    # Tools whose invocation is counted as a game move.
    _MOVE_TOOLS = ("play_action", "travel")

    # Shorthand tool names the LLM tends to produce, mapped to candidate real
    # tool names in order of preference. A candidate is only used when the
    # server actually advertises it.
    _TOOL_ALIASES = {
        "action": ("play_action",),
        "do": ("play_action",),
        "command": ("play_action",),
        "map": ("get_map", "get_locations"),
        "location": ("get_map", "get_locations"),
        "mem": ("memory",),
        "state": ("memory",),
        "status": ("memory",),
        "inv": ("inventory",),
        "items": ("inventory",),
    }

    # Verbs the game parser rejects, mapped to accepted equivalents.
    _VERB_FIXES = {
        "check": "examine",
        "inspect": "examine",
        "search": "look",
        "grab": "take",
        "pick": "take",
        "use": "examine",
        "investigate": "examine",
    }

    _SCORE_PATTERNS = (
        r'Score:\s*(\d+)',
        r'score[:\s]+(\d+)',
        r'\[Score:\s*(\d+)',
    )

    _GAME_OVER_PHRASES = (
        "game over",
        "you have died",
        "you are dead",
        "*** you have died ***",
    )

    def __init__(self):
        """Initialize the agent state."""
        self.history: list[dict] = []
        self.score: int = 0

    async def run(
        self,
        client,
        game: str,
        max_steps: int,
        seed: int,
        verbose: bool = False,
    ) -> "RunResult":
        """Run the agent for a game session.

        Args:
            client: MCP client exposing ``list_tools()`` and ``call_tool()``.
                The ``.data`` of every tool result is unpacked as
                ``(observation, score, is_game_over)``.
            game: Name of the game; not used by this method.
            max_steps: Maximum number of LLM-driven steps.
            seed: Base seed; step ``n`` calls the LLM with ``seed + n``.
            verbose: Print every step to stdout when true.

        Returns:
            A ``RunResult`` with the final score, move count, recorded
            locations and one ``(thought, tool call, observation)`` tuple per
            executed step.
        """
        # Start from a clean slate so a reused agent does not feed a previous
        # session's history into this session's prompts.
        self.history = []
        self.score = 0
        locations_visited: set[str] = set()
        history: list[tuple[str, str, str]] = []
        moves = 0

        # Get list of available tools
        tools = await client.list_tools()
        tool_names = [t.name for t in tools]

        # Get initial observation (not counted as a move)
        observation, self.score, is_game_over = (
            await client.call_tool("play_action", {"action": "look"})
        ).data
        self.history.append({
            "step": 0,
            "thought": "This is the start of the game. I need to see what is around me.",
            "tool": 'play_action',
            "args": {'action': 'look'},
            "result": observation,
        })

        # Track initial location
        locations_visited.add(self._first_line(observation))

        if verbose:
            print(self._entry_to_str(self.history[-1]))

        # Main ReAct loop
        for step in range(1, max_steps + 1):
            prompt = self._make_prompt()
            response = call_llm(prompt, SYSTEM_PROMPT, seed + step)

            thought, tool_name, tool_args = self._parse_response(response, tool_names)
            # Repair before logging so the log shows the call that is executed.
            tool_name, tool_args = self._validate_tool_call(tool_name, tool_args, tool_names)

            if verbose:
                print(f"\n--- Step {step} ---")
                print(f"THOUGHT: {thought}")
                print(f"TOOL: {tool_name}")
                print(f"ARGS: {tool_args}")

            is_move = tool_name in self._MOVE_TOOLS
            try:
                if is_move:
                    moves += 1
                observation, self.score, is_game_over = (
                    await client.call_tool(tool_name, tool_args)
                ).data
            except Exception as e:
                # Best effort: report the failure to the LLM as the observation
                # and keep playing; score and game-over flag stay unchanged.
                observation = f"Error: {e}"
            else:
                # Only successful move tools describe a location; error text
                # and informational tool output must not count as places.
                if is_move:
                    locations_visited.add(self._first_line(observation))

            self.history.append({
                'step': step,
                'thought': thought,
                'tool': tool_name,
                'args': tool_args,
                'result': observation,
                'score': self.score,
                'game_over': is_game_over,
            })
            history.append((thought, f"{tool_name}({tool_args})", observation))

            if verbose:
                print(f"GAME: {observation}")

            if is_game_over:
                if verbose:
                    print("\n*** GAME OVER ***")
                break

        return RunResult(
            final_score=self.score,
            # NOTE(review): 350 is hard-coded regardless of `game` - confirm.
            max_score=350,
            moves=moves,
            locations_visited=locations_visited,
            game_completed=is_game_over,
            history=history,
        )

    @staticmethod
    def _first_line(observation) -> str:
        """Return the first line of an observation, or "Unknown" if empty."""
        return observation.split("\n")[0] if observation else "Unknown"

    def _entry_to_str(self, entry: dict) -> str:
        """Render one history entry in the THOUGHT/TOOL/ARGS/GAME layout."""
        return '\n'.join([
            f"THOUGHT: {entry['thought']}",
            f"TOOL: {entry['tool']}",
            f"ARGS: {entry['args']}",
            f"GAME: {entry['result']}",
        ])

    def _make_prompt(self, n_past_steps: int = 4) -> str:
        """Build the prompt for the LLM with context.

        Args:
            n_past_steps: Number of most recent history entries to include;
                zero or a negative value includes none.
        """
        # A bare ``[-0:]`` slice would return the whole history, so guard it.
        recent = self.history[-n_past_steps:] if n_past_steps > 0 else []

        parts = ["\nHere are the last things that happened:"]
        parts.extend(self._entry_to_str(entry) for entry in recent)
        parts.append(
            f"\nYour current score is {self.score}. Now it's your turn! What do you do next?"
        )
        return '\n'.join(parts)

    def _parse_response(self, response: str, valid_tools: list[str]) -> tuple[str, str, dict]:
        """Parse the LLM response to extract thought, tool, and arguments.

        Falls back to ``play_action`` with ``{"action": "look"}`` for any part
        that is missing or unparsable, including an empty or ``None`` response.
        ``valid_tools`` is accepted for interface compatibility and not used.
        """
        thought = "No reasoning provided"
        tool_name = "play_action"
        tool_args: dict = {"action": "look"}

        if not response:
            return thought, tool_name, tool_args

        for line in response.strip().split("\n"):
            # Tolerate markdown decoration such as "**TOOL:** play_action".
            line_clean = line.strip().lstrip("*`#> ").strip()
            line_upper = line_clean.upper()

            if line_upper.startswith("THOUGHT:"):
                thought = line_clean.split(":", 1)[1].strip().lstrip("*").strip()
            elif line_upper.startswith("TOOL:"):
                raw_tool = line_clean.split(":", 1)[1].lower()
                tokens = raw_tool.replace("*", "").replace("`", "").split()
                tool_name = tokens[0] if tokens else "play_action"
            elif line_upper.startswith("ARGS:"):
                raw_args = line_clean.split(":", 1)[1].strip().strip("*`").strip()
                tool_args = self._parse_args(raw_args)

        return thought, tool_name, tool_args

    @staticmethod
    def _parse_args(raw: str) -> dict:
        """Decode an ARGS payload into a dict, defaulting to a "look" action."""
        # Strict JSON first so apostrophes inside values survive; only then
        # retry with single quotes swapped for Python-dict style output.
        for candidate in (raw, raw.replace("'", '"')):
            try:
                parsed = json.loads(candidate)
            except json.JSONDecodeError:
                continue
            if isinstance(parsed, dict):
                return parsed
            break  # valid JSON but not an object: nothing more to decode

        # Last resort: pull an "action" value out of malformed text.
        for candidate in (raw, raw.replace("'", '"')):
            match = re.search(r'"action"\s*:\s*"([^"]+)"', candidate)
            if match:
                return {"action": match.group(1)}
        return {"action": "look"}

    def _validate_tool_call(self, tool_name: str, tool_args: dict, valid_tools: list[str]) -> tuple[str, dict]:
        """Validate and fix common tool call issues.

        Unknown tool names are resolved through the alias table, but only to a
        tool present in ``valid_tools``; anything else becomes ``play_action``.
        For ``play_action`` the action is lower-cased, stripped of markdown,
        whitespace-normalized and has rejected verbs replaced.

        Returns:
            The repaired ``(tool_name, tool_args)`` pair; ``tool_args`` is
            updated in place when it is a dict.
        """
        if not isinstance(tool_args, dict):
            tool_args = {}

        # Fix tool name
        if tool_name not in valid_tools:
            candidates = self._TOOL_ALIASES.get(tool_name, ())
            tool_name = next((c for c in candidates if c in valid_tools), "play_action")

        # Fix action verbs
        if tool_name == "play_action":
            action = tool_args.get("action", "look")
            if not isinstance(action, str):
                action = "look"

            # Strip markdown before inspecting the verb so "**grab** x" is seen
            # as "grab x".
            words = action.lower().replace("*", "").replace("`", "").split()
            # "pick up lamp" must become "take lamp", not "take up lamp".
            if len(words) > 1 and words[0] == "pick" and words[1] == "up":
                del words[1]
            if words and words[0] in self._VERB_FIXES:
                words[0] = self._VERB_FIXES[words[0]]

            tool_args["action"] = " ".join(words) or "look"

        return tool_name, tool_args

    def _extract_result(self, result) -> str:
        """Extract text from MCP tool result."""
        if hasattr(result, 'content') and result.content:
            return result.content[0].text
        if isinstance(result, list) and result:
            first = result[0]
            return first.text if hasattr(first, 'text') else str(first)
        return str(result)

    def _update_score(self, text: str) -> None:
        """Update score from game text; the stored score never decreases."""
        for pattern in self._SCORE_PATTERNS:
            match = re.search(pattern, text, re.IGNORECASE)
            if match:
                self.score = max(self.score, int(match.group(1)))

    def _is_game_over(self, text: str) -> bool:
        """Check if the game is over."""
        text_lower = text.lower()
        return any(phrase in text_lower for phrase in self._GAME_OVER_PHRASES)
| # ============================================================================= | |
| # Local Testing | |
| # ============================================================================= | |
async def test_agent():
    """Play a short session against a fastmcp Client and print a summary.

    Opens a ``Client`` on "mcp_server.py", runs a fresh ``StudentAgent`` with
    fixed settings, then prints the final score, move count and number of
    recorded locations.
    """
    from fastmcp import Client

    run_options = {
        "game": "zork1",
        "max_steps": 20,
        "seed": 42,
        "verbose": True,
    }

    student = StudentAgent()
    async with Client("mcp_server.py") as mcp_client:
        outcome = await student.run(client=mcp_client, **run_options)

        separator = "=" * 50
        print(f"\n{separator}")
        print(f"Final Score: {outcome.final_score}")
        print(f"Moves: {outcome.moves}")
        print(f"Locations: {len(outcome.locations_visited)}")
if __name__ == "__main__":
    # asyncio is only needed when the module is executed as a script, so it is
    # imported here rather than at the top of the file.
    from asyncio import run as run_event_loop

    run_event_loop(test_agent())