| """ |
| Example: MCP ReAct Agent |
| |
| A complete ReAct agent that uses MCP tools to play text adventure games. |
| This is a working example students can learn from. |
| """ |
|
|
|
|
# Number of consecutive turns the agent may spend in one room before
# StudentAgent._build_priority_guidance adds its stagnation directive.
MAX_STEPS_ALLOWED = 8
|
|
| import json |
| import os |
| import re |
| from dataclasses import dataclass, field |
| from typing import Optional |
|
|
| from dotenv import load_dotenv |
| from huggingface_hub import InferenceClient |
|
|
| from typing import List, Optional, Dict |
| from pydantic import BaseModel |
| import json |
|
|
| from adaptive_knowledge import AdaptiveKnowledgeManager |
| from spatial_memory import SpatialMemorySystem |
| from memory import HierarchicalMemoryManager |
| from prompts import ( |
| SYSTEM_PROMPT, |
| PLANNER_SYSTEM_PROMPT, |
| EXTRACTOR_SYSTEM_PROMPT, |
| CRITIC_SYSTEM_PROMPT, |
| MEMORY_SYNTHESIS_PROMPT |
| ) |
|
|
# Read settings (notably HF_TOKEN) from a local .env file, if one exists.
load_dotenv()


# Model identifier sent with every chat-completion request in call_llm().
LLM_MODEL = "Qwen/Qwen2.5-72B-Instruct"


# The token is mandatory: fail while the module is being imported, with an
# actionable message, instead of on the first LLM request.
_hf_token = os.getenv("HF_TOKEN")
if not _hf_token:
    raise ValueError("HF_TOKEN not found. Set it in your .env file.")


# Single shared client used by call_llm().
LLM_CLIENT = InferenceClient(token=_hf_token)
|
|
def call_llm(prompt: str, system_prompt: str, seed: int, max_tokens: int = 300) -> str:
    """Send one system + user exchange to the shared client and return the reply.

    Args:
        prompt: Content of the user message.
        system_prompt: Content of the system message.
        seed: Sampling seed forwarded to the API.
        max_tokens: Upper bound on the number of generated tokens.

    Returns:
        The message content of the first choice in the response.
    """
    completion = LLM_CLIENT.chat.completions.create(
        model=LLM_MODEL,
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": prompt},
        ],
        temperature=0.0,  # together with the seed, keeps replies as repeatable as the API allows
        max_tokens=max_tokens,
        seed=seed,
    )
    first_choice = completion.choices[0]
    return first_choice.message.content
|
|
|
|
@dataclass
class RunResult:
    """Result of running the agent. Do not modify this class."""
    final_score: int  # score reported for the run (StudentAgent.run passes self.score)
    max_score: int  # score ceiling reported for the game (StudentAgent.run passes 350)
    moves: int  # number of steps counted during the run
    locations_visited: set[str]  # set of location names supplied by the caller
    game_completed: bool  # StudentAgent.run sets this from _is_game_over(last observation)
    error: Optional[str] = None  # optional error description; defaults to None
    # One tuple per step; StudentAgent.run stores
    # (thought, "tool(args)" string, first 100 characters of the observation).
    history: list[tuple[str, str, str]] = field(default_factory=list)
|
|
|
|
| |
| |
| |
|
|
| class StudentAgent: |
| """ |
| MCP ReAct Agent - A complete working example. |
| |
| This agent demonstrates: |
| - ReAct loop (Thought -> Tool -> Observation) |
| - Loop detection |
| - Action validation |
| - Score tracking via memory tool |
| """ |
| |
def __init__(self) -> None:
    """Initialize the agent state."""
    # Step records kept by run(), which trims the list to the last 10 entries.
    self.history: list[dict] = []
    # Most recently proposed actions; run() keeps the last 5 for loop detection.
    self.recent_actions: list[str] = []


    # Highest score parsed from game text so far (see _update_score).
    self.score: int = 0
    self.strategist = StrategyModule()


    # Collaborators; both LLM-backed helpers receive the module-level call_llm.
    self.critic = CriticAgent(call_llm_func=call_llm, verbose=True)
    self.knowledge_manager = AdaptiveKnowledgeManager("knowledgebase.md")
    self.world_mapper = WorldMapper()


    self.extractor = ObservationExtractor(call_llm)


    # Learning cadence, counted in steps.
    self.learning_interval = 10
    self.last_learning_step = 0
    # NOTE(review): not read anywhere in the visible part of this file.
    self.seen_responses = {}


    # Last room id / world hash reported by the get_location_info tool; run()
    # compares against them to detect movement and world-state changes.
    self.last_room_id = None
    self.last_world_hash = None


    self.memory_manager = HierarchicalMemoryManager(call_llm)
    # Placeholder name until run() reads the first location.
    self.current_location = "Start"
    # location name -> list of {"action", "result", "step"} entries.
    self.location_action_memory = {}


    # Consecutive steps spent in the current room; reset by run() on a room change.
    self.steps_in_current_room = 0


    # Actions returned by the get_valid_actions_cheat tool for the current room.
    self.current_room_actions = set()
| |
|
|
async def run(
    self,
    client,
    game: str,
    max_steps: int,
    seed: int,
    verbose: bool = False,
) -> RunResult:
    """Play one game session through the MCP client and report the outcome.

    Every step (1) reads inventory and location state through MCP tools,
    (2) builds a prompt from the observation, guidance and action kit,
    (3) asks the LLM for a tool call, (4) executes it and (5) records the
    outcome in the agent's memories and map.

    Args:
        client: Connected MCP client exposing ``list_tools`` and ``call_tool``.
        game: Name of the game being played (not used by this method).
        max_steps: Maximum number of agent steps.
        seed: Base seed; per-step LLM and extractor calls derive theirs from it.
        verbose: Print progress and debugging output when True.

    Returns:
        A RunResult carrying the best score seen, the number of executed
        steps, the location names visited and a per-step history.
    """
    import random  # function-scope import: only the loop breaker needs it

    locations_visited: set[str] = set()
    history: list[tuple[str, str, str]] = []
    moves = 0

    knowledge_content = self.knowledge_manager.load_knowledge()
    if verbose and knowledge_content:
        print(f"\n[INIT] Loaded strategic knowledge ({len(knowledge_content)} chars)")

    tools = await client.list_tools()
    tool_names = [t.name for t in tools]

    # Opening "look" so the first prompt has an observation to reason about.
    result = await client.call_tool("play_action", {"action": "look"})
    observation = self._extract_result(result)

    loc_result = await client.call_tool("get_location_info", {})
    loc_dict = self._decode_location_info(self._extract_result(loc_result))
    structured_data = self.extractor.extract(
        raw_text=observation,
        seed=seed,
        ram_data=loc_dict,
        last_location=self.current_location,
    )
    self.world_mapper.update_map(structured_data, "look", observation)

    if verbose:
        print(f"\n{observation}")

    for step in range(1, max_steps + 1):
        structured_data = {"location_name": self.current_location, "is_new_location": False}
        priority_guidance = ""
        current_inv = "Unknown"

        # ---- 1. Sense: inventory, location, valid actions, guidance ----
        try:
            inv_result = await client.call_tool("inventory", {})
            current_inv = self._extract_result(inv_result)

            loc_result = await client.call_tool("get_location_info", {})
            loc_dict = self._decode_location_info(self._extract_result(loc_result))

            # Start from the last known state so a failed lookup reads as
            # "nothing changed" instead of leaving these names unbound.
            current_loc_id = self.last_room_id
            current_loc_name = self.current_location
            current_world_hash = self.last_world_hash
            location = loc_dict.get("location")
            if loc_dict.get("status") == "success" and isinstance(location, dict):
                current_loc_id = location.get("id")
                current_loc_name = location.get("name")
                current_world_hash = loc_dict.get("world_hash")
                if current_loc_name:
                    locations_visited.add(str(current_loc_name))
            else:
                print("⚠️ Erreur lors de la récupération des données RAM")

            is_new_room = current_loc_id != self.last_room_id
            if is_new_room:
                self.steps_in_current_room = 0
                if verbose:
                    print(f"🚀 Mouvement détecté vers : {current_loc_name} (ID: {current_loc_id})")
                self.last_room_id = current_loc_id
                self.current_location = current_loc_name
            else:
                self.steps_in_current_room += 1

            world_changed = current_world_hash != self.last_world_hash
            if world_changed:
                if verbose:
                    print("🔍 Le monde a changé (objet déplacé/modifié)")
                self.last_world_hash = current_world_hash

            structured_data = self.extractor.extract(
                raw_text=observation,
                seed=seed + step,
                ram_data=loc_dict,
                last_location=self.current_location,
            )
            if verbose:
                print("structured_data run loop ", structured_data)

            structured_data["location_id"] = current_loc_id
            structured_data["location_name"] = current_loc_name
            structured_data["is_new_location"] = is_new_room
            structured_data["world_changed"] = world_changed

            # The valid-action list is only re-queried when something changed.
            if is_new_room or world_changed:
                cheat_result = await client.call_tool("get_valid_actions_cheat", {})
                new_actions = self._extract_result(cheat_result)
                if is_new_room:
                    self.current_room_actions = set()
                if isinstance(new_actions, str):
                    new_actions = new_actions.split(",")
                if isinstance(new_actions, list):
                    self.current_room_actions.update(
                        str(a).strip() for a in new_actions if str(a).strip()
                    )

            # Sorted so the prompt does not depend on set iteration order.
            structured_data["cheat_actions"] = sorted(self.current_room_actions)

            priority_guidance = self._build_priority_guidance(structured_data)
            if verbose:
                print("priority guidance", priority_guidance)
                print("fin PRIORITY")
        except Exception as e:
            # Sensing is best-effort: carry on with an empty picture of the room.
            print(f"⚠️ Sensory Error: {e}")
            structured_data = {}

        enriched_actions = self._generate_enriched_actions(structured_data)

        # ---- 2. Think: build the prompt and obtain a validated tool call ----
        guidance_text = priority_guidance or "- [STATUS] Standard exploration."
        rich_context = (
            f"\nCURRENT SITUATION : {observation}\n"
            f"INVENTORY : {current_inv}\n"
            f"PRIORITY GUIDANCE : {guidance_text}\n"
            f"POSSIBLE ACTIONS: {enriched_actions}\n"
        )
        if verbose:
            print(f"Context {rich_context}")

        prompt = self._build_prompt(rich_context, knowledge="")
        thought, tool_name, tool_args = self._request_tool_call(prompt, tool_names, seed + step)

        if verbose:
            print(f"\n--- Step {step} ---")
            print(f"[THOUGHT] {thought}")
            print(f"[TOOL] {tool_name}({tool_args})")

        # ---- 3. Loop detection on the proposed action ----
        if tool_name == "play_action":
            proposed_action = tool_args.get("action", "look")
        else:
            proposed_action = "tool_use"
        self.recent_actions.append(proposed_action)
        self.recent_actions = self.recent_actions[-5:]

        last_three = self.recent_actions[-3:]
        if len(last_three) == 3 and len(set(last_three)) == 1:
            # Seeded generator keeps the forced move reproducible for a given seed.
            forced_move = self._pick_loop_breaker(proposed_action, random.Random(seed + step))
            # Always force a game action, even when the loop was on another tool.
            tool_name = "play_action"
            tool_args = {"action": forced_move}
            if verbose:
                print(f"🔄 [LOOP BREAK] Agent stuck on '{proposed_action}'. Forcing move to: {forced_move}")

        moves += 1

        # ---- 4. Act ----
        try:
            result = await client.call_tool(tool_name, tool_args)
            observation = self._extract_result(result)
        except Exception as e:
            observation = f"Error: {e}"
            if verbose:
                print(f"[ERROR] {e}")
        else:
            # ---- 5. Remember: a bookkeeping failure must not replace the
            # real game observation with an error string. ----
            try:
                await self._record_outcome(
                    client, tool_name, tool_args, observation, step, seed, verbose
                )
            except Exception as e:
                print(f"⚠️ Memory update failed: {e}")
            if verbose:
                print(f"[RESULT] {observation}...")

        self.history.append({
            "step": step,
            "thought": thought,
            "tool": tool_name,
            "args": tool_args,
            "result": observation[:200],
        })
        self.history = self.history[-10:]

        self._update_score(observation)
        history.append((thought, f"{tool_name}({tool_args})", observation[:100]))

        if self._is_game_over(observation):
            if verbose:
                print("\n*** GAME OVER ***")
            break

    return RunResult(
        final_score=self.score,
        max_score=350,
        moves=moves,
        locations_visited=locations_visited,
        game_completed=self._is_game_over(observation),
        history=history,
    )


def _decode_location_info(self, raw) -> dict:
    """Turn a get_location_info payload into a dict; return {} when it is unusable."""
    if isinstance(raw, dict):
        return raw
    if not isinstance(raw, str):
        return {}
    try:
        parsed = json.loads(raw)
    except ValueError:  # json.JSONDecodeError is a ValueError
        return {}
    return parsed if isinstance(parsed, dict) else {}


def _request_tool_call(self, prompt: str, tool_names: list, base_seed: int, max_retries: int = 3) -> tuple:
    """Ask the LLM for the next tool call, retrying only when the reply is empty.

    Returns (thought, tool_name, tool_args) after validation, or a default
    "look" action when every attempt came back empty.
    """
    for attempt in range(max_retries):
        response = call_llm(prompt, SYSTEM_PROMPT, base_seed + attempt)
        if not response or not response.strip():
            continue
        thought, tool_name, tool_args = self._parse_response(response, tool_names)
        tool_name, tool_args = self._validate_tool_call(tool_name, tool_args, tool_names)
        if not isinstance(tool_args, dict):
            tool_args = {}
        return thought, tool_name, tool_args
    return "No thought generated", "play_action", {"action": "look"}


def _pick_loop_breaker(self, stuck_action: str, rng) -> str:
    """Choose a movement (or "wait") from the known valid actions to escape a loop."""
    move_words = {
        "north", "south", "east", "west",
        "northeast", "northwest", "southeast", "southwest",
        "n", "s", "e", "w", "ne", "nw", "se", "sw",
        "up", "down", "u", "d", "in", "out", "wait",
    }
    # Sorted so the seeded choice is independent of set iteration order.
    candidates = sorted(
        a for a in self.current_room_actions
        if a in move_words or a.startswith("go ")
    )
    alternatives = [a for a in candidates if a != stuck_action]
    pool = alternatives or candidates
    return rng.choice(pool) if pool else "wait"


async def _record_outcome(self, client, tool_name: str, tool_args: dict, observation: str,
                          step: int, seed: int, verbose: bool) -> None:
    """Store the executed action in the per-location memory and update the map."""
    self.location_action_memory.setdefault(self.current_location, []).append({
        "action": tool_args.get("action", tool_name),
        "result": self._clean_memory_result(observation),
        "step": step,
    })

    if tool_name != "play_action":
        return

    loc_res = await client.call_tool("get_location_info", {})
    new_loc_dict = self._decode_location_info(self._extract_result(loc_res))
    if verbose:
        print("new loc dict", new_loc_dict)

    location = new_loc_dict.get("location")
    if not isinstance(location, dict):
        # Position unknown: skip the map update rather than guess.
        return
    new_id = str(location.get("id"))

    action_text = tool_args.get("action", "")
    direction_attempted = self.world_mapper._extract_direction(action_text)

    if direction_attempted and new_id == str(self.last_room_id):
        # A movement command that left us in the same room: remember the exit as blocked.
        reason = self._clean_memory_result(observation)
        self.world_mapper.mark_blocked_exit(new_id, direction_attempted, reason)
        if verbose:
            print(f"🚫 BLOCAGE : {direction_attempted} n'a pas fonctionné.")
    else:
        if verbose:
            print("direction attempted", direction_attempted, action_text)
        new_structured = self.extractor.extract(
            raw_text=observation,
            seed=seed,
            ram_data=new_loc_dict,
            last_location=self.current_location,
        )
        self.world_mapper.update_map(new_structured, action_text, observation)
|
|
|
|
| def _clean_memory_result(self, text: str) -> str: |
| """Nettoie le résultat pour la mémoire : une seule ligne, max 80 chars.""" |
| if not text: return "" |
| |
| clean = text.replace('\n', ' ').strip() |
| |
| clean = " ".join(clean.split()) |
| return clean |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| def _generate_enriched_actions(self, structured_data: dict) -> str: |
| """ |
| Génère un kit d'action utilisant les noms traduits (friendly) |
| ou signalant les noms techniques si nécessaire. |
| """ |
| ram_objects = structured_data.get("raw_ram_objects", []) |
| mapping = structured_data.get("name_translation", {}) |
| inventory_raw = structured_data.get("inventory", []) |
| inventory = [obj["name"] for obj in inventory_raw] |
| |
| kit = ["### 🛠️ ACTION CONSTRUCTION KIT"] |
|
|
| cheat_actions = structured_data.get("cheat_actions", []) |
| if cheat_actions: |
| |
| kit.append(f"**DIRECT ACTIONS (Proven Valid)**: {', '.join(cheat_actions)}") |
| kit.append(f"ABOVE ACTIONS ARE VALID BUT CAN BE DUMB") |
|
|
| |
| verbs = ["examine", "take", "drop", "look", "inventory", "wait", "listen", "search"] |
| |
| |
| |
| all_context = (structured_data.get("description_summary", "") + " ".join(ram_objects)).lower() |
| |
| if any(x in all_context for x in ["door", "gate", "mailbox", "chest", "box", "case", "window"]): |
| verbs += ["open", "close", "unlock", "lock"] |
| |
| if any(x in all_context or "torch" in str(inventory).lower() for x in ["lamp", "torch", "switch", "device"]): |
| verbs += ["turn on", "turn off", "light", "extinguish"] |
|
|
| kit.append(f"**CORE VERBS**: {', '.join(sorted(set(verbs)))}") |
|
|
| |
| visible_ram = [o for o in ram_objects if "(missing" not in o.lower()] |
| hidden_ram = [o for o in ram_objects if "(missing" in o.lower()] |
| |
| if visible_ram: |
| formatted_objects = [] |
| for tech_name in visible_ram: |
| |
| if tech_name in mapping: |
| friendly_name = mapping[tech_name] |
| formatted_objects.append(f"{friendly_name.upper()}") |
| else: |
| |
| formatted_objects.append(f"{tech_name} [! technical name] map to possible real and simple words") |
| |
| kit.append(f"**OBJECTS AROUND YOU**: {', '.join(formatted_objects)}") |
| kit.append("_Note: Use names exactly as shown above._") |
|
|
|
|
| |
| |
| |
| |
|
|
| |
| |
| |
|
|
| return "\n".join(kit) |
|
|
def classify_interaction(self, action: str, result: str,observation: str,inventory : str, name_mapping : dict, possible_actions : str) -> dict:
    """Ask the LLM whether an action failed and, if so, why.

    Args:
        action: Command that was executed.
        result: Game response to that command.
        observation: Current room description given to the model as context.
        inventory: Current inventory, as text.
        name_mapping: Technical object name -> friendly name.
        possible_actions: Action kit text shown to the model.

    Returns:
        A dict that always contains ``is_failure`` (bool), ``reason`` and
        ``suggestion``. When the reply cannot be parsed the neutral verdict
        (not a failure, empty texts) is returned.
    """
    neutral = {"is_failure": False, "reason": "", "suggestion": ""}

    valid_names = ", ".join(
        f"'{friendly}' (ID: {tech})" for tech, friendly in (name_mapping or {}).items()
    )

    prompt = (
        "\nAnalyze this text adventure interaction:\n"
        f"Current Observation : {observation}\n"
        f"Inventory : {inventory}\n"
        f"valid objects to use: {valid_names}\n"
        f'ACTION: "{action}"\n'
        f'RESULT: "{result}"\n'
        "\n"
        f"Here is possible actions to take {possible_actions}\n"
        "\n"
        "Is this interaction a FAILURE (e.g., object not found, too dark, locked, invalid name, generic response)?\n"
        "If it's a failure, provide a short reason and a suggestion.\n"
        "\n"
        "Return ONLY JSON:\n"
        "{\n"
        '"is_failure": true/false,\n'
        '"reason": "short explanation",\n'
        '"suggestion": "what to try instead"\n'
        "}\n"
    )

    response = call_llm(prompt, "You are a Game Logic Analyzer.", seed=42)

    # Models often wrap JSON in a markdown fence; unwrap it before parsing.
    payload = (response or "").strip()
    if "```json" in payload:
        payload = payload.split("```json", 1)[1].split("```", 1)[0]
    elif "```" in payload:
        payload = payload.split("```", 1)[1].split("```", 1)[0]

    try:
        verdict = json.loads(payload)
    except ValueError:  # json.JSONDecodeError is a ValueError
        return neutral
    if not isinstance(verdict, dict):
        return neutral

    flag = verdict.get("is_failure", False)
    if isinstance(flag, str):
        # Tolerate "true"/"false" given as strings.
        flag = flag.strip().lower() == "true"

    return {
        "is_failure": bool(flag),
        "reason": str(verdict.get("reason") or ""),
        "suggestion": str(verdict.get("suggestion") or ""),
    }
|
|
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
|
|
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
|
|
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
|
|
|
|
| |
| |
| |
|
|
| |
| |
|
|
| |
|
|
def _build_priority_guidance(self, structured_data: dict) -> str:
    """Assemble the guidance section of the prompt for the current room.

    Sections, in order: a stagnation directive once the agent has spent
    MAX_STEPS_ALLOWED turns in the room, feedback on the last action taken
    here, the world mapper's summary, a per-object scan with the last
    interaction recorded for each object, and the recent room history.

    Args:
        structured_data: Extractor output for the current step. Reads
            ``location_id``, ``name_translation``, ``inventory``,
            ``description_summary``, ``world_changed`` and ``objects_in_room``.

    Returns:
        The guidance text, one entry per line.
    """
    guidance: list[str] = []
    loc_name = self.current_location
    loc_id = str(structured_data.get("location_id", self.last_room_id or -1))
    mapping = structured_data.get("name_translation") or {}
    past_actions = self.location_action_memory.get(loc_name, [])

    current_inventory = str(structured_data.get("inventory", "Unknown"))
    current_obs = structured_data.get("description_summary", "No description")
    # Built once and reused for every classification below.
    enriched_actions = self._generate_enriched_actions(structured_data)

    # Each classification is an LLM call with a fixed seed, so identical
    # (action, result) pairs are only classified once per invocation.
    verdict_cache: dict = {}

    def classify(entry: dict) -> dict:
        action_text = str(entry.get("action", ""))
        result_text = str(entry.get("result", ""))
        key = (action_text, result_text)
        if key not in verdict_cache:
            verdict = self.classify_interaction(
                action=action_text,
                result=result_text,
                observation=current_obs,
                inventory=current_inventory,
                name_mapping=mapping,
                possible_actions=enriched_actions,
            )
            verdict_cache[key] = verdict if isinstance(verdict, dict) else {}
        return verdict_cache[key]

    # ---- Stagnation directive ----
    if self.steps_in_current_room >= MAX_STEPS_ALLOWED:
        guidance.append("\n### 🚀 URGENT STRATEGIC DIRECTIVE:")
        guidance.append(f"- [STAGNATION ALERT] {self.steps_in_current_room} turns in this room.")

        known_room = self.world_mapper.graph.get(loc_id, {})
        potential = known_room.get("potential_exits", [])
        already_linked = known_room.get("exits", {})
        new_paths = [p for p in potential if p not in already_linked]

        if new_paths:
            guidance.append(f"- [ACTION] Move to a NEW area. Priority: {', '.join(new_paths).upper()}")
        elif already_linked:
            guidance.append("- [ACTION] Room exhausted. Backtrack or move to a known adjacent room.")
            directions_list = [d.upper() for d in already_linked]
            guidance.append(f"- [HINT] Known exits: {', '.join(directions_list)}")
        else:
            guidance.append("- [CRITICAL] No exits found in memory or observation.")
            guidance.append("- [ACTION] Use 'SEARCH', 'LISTEN', or 'EXAMINE' on the scenery to find hidden passages.")
            guidance.append("- [HINT] Try common directions anyway: NORTH, SOUTH, EAST, WEST,NORTHEAST,NORTHWEST,SOUTHEAST,SOUTHWEST, UP, DOWN.")

    # ---- Feedback on the last action recorded in this room ----
    if self.recent_actions and past_actions:
        last_entry = past_actions[-1]

        guidance.append("### ⚡ LAST ACTION FEEDBACK:")
        guidance.append(f"- Command: '{self.recent_actions[-1]}'")

        immediate_analysis = classify(last_entry)
        if immediate_analysis.get("is_failure"):
            guidance.append("- [!] STATUS: FAILURE")
            guidance.append(f"- [!] REASON: {immediate_analysis.get('reason', '')}")
            guidance.append(f"- [!] SUGGESTION: {immediate_analysis.get('suggestion', '')}")
        elif structured_data.get("world_changed"):
            guidance.append("- [!] STATUS: SUCCESS (World state updated)")
        else:
            guidance.append(f"- [!] STATUS: NEUTRAL / INFO: {last_entry.get('result', '')}")

        guidance.append("IF ANOTHER ENTITIES IS INTERESTED BY AN ITEM IT'S A HINT. EX : pig climb fountain -> fountain must be an important object to examine or search")

    # ---- Map summary ----
    guidance.append(str(self.world_mapper.generate_summary(loc_id) or ""))
    guidance.append("")

    # ---- Per-object scan ----
    guidance.append("### 🔍 LOCAL OBJECT SCAN (RAM):")
    for obj_data in structured_data.get("objects_in_room") or []:
        if not isinstance(obj_data, dict):
            continue
        raw_name = str(obj_data.get("name", "Unknown"))
        friendly_name = str(mapping.get(raw_name, "")).upper()
        present = friendly_name != ""

        # Most recent action in this room that mentions the object by either name.
        last_obj_entry = None
        for entry in reversed(past_actions):
            act = str(entry.get("action", "")).lower()
            if (raw_name and raw_name.lower() in act) or (present and friendly_name.lower() in act):
                last_obj_entry = entry
                break

        contents = obj_data.get("contents") or []
        content_str = ""
        if contents:
            child_names = [
                str(c.get("name", "Unknown")).upper() if isinstance(c, dict) else str(c).upper()
                for c in contents
            ]
            content_str = f" [Contains: {', '.join(child_names)}]"

        if present:
            display_line = f"- {friendly_name}{content_str}"
        else:
            display_line = f"- {raw_name}{content_str} [!] Technical Name use the real name or a full word. Ex : fountabowl -> bowl, brokstair -> stairs ... CHECK YOUR HISTORY AND OBSERVATION"

        if last_obj_entry:
            last_act_text = last_obj_entry.get("action", "")
            last_res_text = last_obj_entry.get("result", "")
            analysis = classify(last_obj_entry)

            if analysis.get("is_failure"):
                guidance.append(f"{display_line}\n ❌ Last tried: '{last_act_text}' -> {analysis.get('reason', '')}")
                guidance.append(f" 💡 Suggestion: {analysis.get('suggestion', '')}")
            else:
                # Show what the action produced, not the action a second time.
                guidance.append(f"{display_line} | ✅ Last: '{last_act_text}' (Success) {last_res_text}")
        elif present or contents:
            guidance.append(f"{display_line} | ✨ UNEXPLORED")

    # ---- Recent history for this room ----
    if past_actions:
        guidance.append("\n### 🛑 ROOM HISTORY (Last steps):")
        for entry in past_actions[-10:]:
            guidance.append(f"- '{entry.get('action')}' -> {entry.get('result')}...")

    return "\n".join(guidance)
|
|
| def _build_prompt(self, observation: str,knowledge:str) -> str: |
| """Build the prompt for the LLM with context.""" |
| parts = [] |
|
|
| |
| |
| |
|
|
| parts.append(f"Current Score: {self.score}") |
| |
| |
| if self.history: |
| parts.append("\nRecent actions:") |
| for entry in self.history[-3:]: |
| action = entry.get("args", {}).get("action", entry["tool"]) |
| result_short = entry["result"][:80] + "..." if len(entry["result"]) > 80 else entry["result"] |
| parts.append(f" > {action} -> {result_short}") |
| |
| |
| if self.recent_actions and len(set(self.recent_actions[-3:])) == 1: |
| parts.append(f"\n[WARNING: You've been doing '{self.recent_actions[-1]}' repeatedly. TRY SOMETHING DIFFERENT!]") |
| |
| |
| |
| |
| |
|
|
| parts.append(f"\nCURRENT SITUATION:\n{observation}") |
| parts.append("\nWhat is your next specific command?") |
| |
| return "\n".join(parts) |
| |
| def _parse_response(self, response: str, valid_tools: list[str]) -> tuple[str, str, dict]: |
| """Parse the LLM response to extract thought, tool, and arguments.""" |
| thought = "No reasoning provided" |
| tool_name = "play_action" |
| tool_args = {"action": "look"} |
| |
| lines = response.strip().split("\n") |
| |
| for line in lines: |
| line_clean = line.strip() |
| line_upper = line_clean.upper() |
| |
| if line_upper.startswith("THOUGHT:"): |
| thought = line_clean.split(":", 1)[1].strip() |
| |
| elif line_upper.startswith("TOOL:"): |
| raw_tool = line_clean.split(":", 1)[1].strip().lower() |
| raw_tool = raw_tool.replace("**", "").replace("*", "").replace("`", "") |
| raw_tool = raw_tool.split()[0] if raw_tool else "play_action" |
| tool_name = raw_tool |
| |
| elif line_upper.startswith("ARGS:"): |
| args_part = line_clean.split(":", 1)[1].strip() |
| try: |
| args_part = args_part.replace("'", '"') |
| tool_args = json.loads(args_part) |
| except json.JSONDecodeError: |
| match = re.search(r'"action"\s*:\s*"([^"]+)"', args_part) |
| if match: |
| tool_args = {"action": match.group(1)} |
| else: |
| tool_args = {"action": "look"} |
| |
| return thought, tool_name, tool_args |
| |
| def _validate_tool_call(self, tool_name: str, tool_args: dict, valid_tools: list[str]) -> tuple[str, dict]: |
| """Validate and fix common tool call issues.""" |
| |
| if tool_name not in valid_tools: |
| if tool_name in ["action", "do", "command"]: |
| tool_name = "play_action" |
| elif tool_name in ["map", "location"]: |
| tool_name = "get_map" |
| elif tool_name in ["mem", "state", "status"]: |
| tool_name = "memory" |
| elif tool_name in ["inv", "items"]: |
| tool_name = "inventory" |
| else: |
| tool_name = "play_action" |
| |
| |
| if tool_name == "play_action": |
| action = str(tool_args.get("action", "look")).lower().strip() |
| direction = tool_args.get("direction") |
|
|
| |
| if direction and str(direction).lower() not in action: |
| action = f"{action} {direction}" |
|
|
| nav_map = { |
| "north": "n", "south": "s", "east": "e", "west": "w", |
| "northeast": "ne", "northwest": "nw", |
| "southeast": "se", "southwest": "sw", |
| "up": "u", "down": "d" |
| } |
|
|
| invalid_verb_map = { |
| "check": "examine", |
| "inspect": "examine", |
| "search": "look", |
| "grab": "take", |
| "pick": "take", |
| "use": "examine", |
| "investigate": "examine", |
| } |
| |
| if action.startswith("go "): |
| action = action.replace("go ", "").strip() |
|
|
| words = action.split() |
| if words: |
| if words[0] in invalid_verb_map: |
| words[0] = invalid_verb_map[words[0]] |
| action = " ".join(words) |
|
|
| if words[0] == "examine": |
| words = words[:2] |
| action = " ".join(words) |
| else: |
| action = " ".join(words) |
| |
| if action in nav_map: |
| action = nav_map[action] |
| |
| action = action.replace("**", "").replace("*", "").replace("`", "") |
| action = " ".join(action.split()) |
| |
| return tool_name, {"action": action} |
| |
| return tool_name, tool_args |
| |
| def _extract_result(self, result) -> str: |
| """Extract text from MCP tool result.""" |
| if hasattr(result, 'content') and result.content: |
| return result.content[0].text |
| if isinstance(result, list) and result: |
| return result[0].text if hasattr(result[0], 'text') else str(result[0]) |
| return str(result) |
| |
| def _update_score(self, text: str) -> None: |
| """Update score from game text.""" |
| patterns = [ |
| r'Score:\s*(\d+)', |
| r'score[:\s]+(\d+)', |
| r'\[Score:\s*(\d+)', |
| ] |
| |
| for pattern in patterns: |
| match = re.search(pattern, text, re.IGNORECASE) |
| if match: |
| self.score = max(self.score, int(match.group(1))) |
| |
| def _is_game_over(self, text: str) -> bool: |
| """Check if the game is over.""" |
| game_over_phrases = [ |
| "game over", |
| "you have died", |
| "you are dead", |
| "*** you have died ***", |
| ] |
| text_lower = text.lower() |
| return any(phrase in text_lower for phrase in game_over_phrases) |
|
|
|
|
| |
| |
| |
|
|
async def test_agent():
    """Run a short local zork1 session against mcp_server.py and print a summary."""
    from fastmcp import Client

    agent = StudentAgent()
    session_options = {"game": "zork1", "max_steps": 20, "seed": 42, "verbose": True}

    async with Client("mcp_server.py") as client:
        result = await agent.run(client=client, **session_options)

        separator = "=" * 50
        print(f"\n{separator}")
        print(f"Final Score: {result.final_score}")
        print(f"Moves: {result.moves}")
        print(f"Locations: {len(result.locations_visited)}")
|
|
|
|
class StrategyModule:
    """High-level planner (the "brain"): keeps one strategic plan produced by the LLM."""

    def __init__(self):
        # Last successfully parsed plan (a dict), or None when no plan exists yet.
        self.current_plan = None
        # Step at which current_plan was last replaced.
        self.last_update_step = 0

    def generate_plan(self, observation: str, history: list, step: int,knowledge:str) -> Optional[dict]:
        """Create or refresh the strategic plan.

        Args:
            observation: Current game observation.
            history: Step records; the ``thought`` and ``result`` of the last
                five are summarised for the model.
            step: Current step number, also used as the LLM seed.
            knowledge: Accumulated tips to include in the prompt (may be empty).

        Returns:
            The new plan as a dict, or None when the reply is not a JSON
            object. The previous plan is kept in that case.
        """
        history_summary = "\n".join(
            f"- {h.get('thought', '')} -> {str(h.get('result', ''))[:50]}..."
            for h in history[-5:]
        )

        prompt = (
            "\nCURRENT SITUATION:\n"
            f"{observation}\n"
            "\n"
            "RECENT HISTORY:\n"
            f"{history_summary}\n"
            "\n"
            "ACQUIRED KNOWLEDGE (Tips & Rules from previous games):\n"
            f"{knowledge if knowledge else 'No prior knowledge available.'}\n"
            "\n"
            "TASK:\n"
            "Based on the Situation and Knowledge, create a strategic plan.\n"
            'If the Knowledge says "Trolls fear swords", and you see a Troll, your plan must be "Find sword".\n'
        )

        response = call_llm(prompt, PLANNER_SYSTEM_PROMPT, seed=step, max_tokens=400)

        try:
            # Unwrap a markdown code fence when the model added one.
            json_str = (response or "").strip()
            if "```json" in json_str:
                json_str = json_str.split("```json", 1)[1].split("```", 1)[0]
            elif "```" in json_str:
                json_str = json_str.split("```", 1)[1].split("```", 1)[0]

            plan = json.loads(json_str)
            if not isinstance(plan, dict):
                # get_strategy_context() relies on dict access.
                raise ValueError(f"expected a JSON object, got {type(plan).__name__}")
        except ValueError as e:  # includes json.JSONDecodeError
            print(f"[Strategy Error] Failed to parse plan: {e}")
            return None

        self.current_plan = plan
        self.last_update_step = step
        return plan

    def get_strategy_context(self) -> str:
        """Return the text block to inject into the actor's prompt."""
        plan = self.current_plan
        if not plan or not isinstance(plan, dict):
            return "NO ACTIVE PLAN. Explore cautiously."

        steps = plan.get('suggested_steps') or []
        if isinstance(steps, str):
            # A single step given as a string must not be split into characters.
            steps = [steps]
        step_lines = "\n".join(f"- {s}" for s in steps)

        return (
            "\n*** STRATEGIC GUIDANCE ***\n"
            f"CURRENT OBJECTIVE: {plan.get('current_objective', 'Unknown')}\n"
            f"STRATEGIC REASONING: {plan.get('reasoning', 'None')}\n"
            "STEPS TO TAKE:\n"
            f"{step_lines}\n"
        )
|
|
| |
|
|
@dataclass
class CriticResponse:
    """Verdict type returned by CriticAgent.evaluate_with_llm."""
    score: float  # numeric rating (scale is not visible in this part of the file — TODO confirm)
    justification: str  # text accompanying the score
    is_fatal: bool = False  # defaults to False; presumably marks a game-ending action — verify against the critic
|
|
class ActionHistoryTracker:
    """Rolling record of recent actions, used to detect loops and known failures."""

    # Substrings of a game response that mark the action as failed.
    _FAILURE_KEYWORDS = ("can't", "don't", "nothing happens", "impossible", "failed", "no such")
    # Number of most recent actions/locations kept.
    _MAX_HISTORY = 20

    def __init__(self):
        self.recent_actions: list[str] = []
        self.location_history: list[str] = []
        # location -> actions whose response contained a failure keyword there
        self.failed_actions_per_location: dict[str, set[str]] = {}

    def update(self, action: str, location: str, result: str):
        """Record an executed action, where it happened and whether it failed.

        Args:
            action: Command that was executed.
            location: Location in which it was executed.
            result: Game response; None is treated as an empty response.
        """
        self.recent_actions.append(action)
        self.location_history.append(location)

        lowered = (result or "").lower()
        if any(keyword in lowered for keyword in self._FAILURE_KEYWORDS):
            self.failed_actions_per_location.setdefault(location, set()).add(action)

        # One element is appended per call, so dropping one keeps the cap.
        if len(self.recent_actions) > self._MAX_HISTORY:
            self.recent_actions.pop(0)
            self.location_history.pop(0)

    def is_looping(self, proposed_action: str) -> bool:
        """Return True when the last three recorded actions all equal ``proposed_action``."""
        last_three = self.recent_actions[-3:]
        return len(last_three) == 3 and all(a == proposed_action for a in last_three)

    def is_known_failure(self, proposed_action: str, current_location: str) -> bool:
        """Return True when this action was already recorded as failed in this location."""
        return proposed_action in self.failed_actions_per_location.get(current_location, ())
|
|
class CriticAgent:
    """Critic module that validates an action before it is executed.

    Rule-based checks run first (``check_heuristics``). Plain moves and
    ``look``/``inventory`` are then accepted without an LLM call; every other
    action is scored by the LLM (``evaluate_with_llm``).
    """

    # Every recognised spelling of a movement command -> canonical direction.
    _DIRECTION_ALIASES = {
        "n": "north", "s": "south", "e": "east", "w": "west",
        "u": "up", "d": "down",
        "ne": "northeast", "nw": "northwest",
        "se": "southeast", "sw": "southwest",
        "north": "north", "south": "south", "east": "east", "west": "west",
        "up": "up", "down": "down",
        "northeast": "northeast", "northwest": "northwest",
        "southeast": "southeast", "southwest": "southwest",
    }
    _FULL_DIRECTIONS = frozenset(_DIRECTION_ALIASES.values())

    # Actions accepted without an LLM round-trip.
    _SIMPLE_MOVES = frozenset(
        {"north", "south", "east", "west", "up", "down", "look", "inventory"}
    )

    def __init__(self, call_llm_func, verbose: bool = False):
        self.history_tracker = ActionHistoryTracker()
        self.verbose = verbose
        self.call_llm = call_llm_func

    def _reachable_directions(self, valid_exits) -> set:
        """Return the canonical directions named by the given exit labels."""
        reachable = set()
        for exit_name in valid_exits:
            label = str(exit_name).lower().strip()
            if label in self._DIRECTION_ALIASES:
                reachable.add(self._DIRECTION_ALIASES[label])
                continue
            # Abbreviations only count when they are the whole label, so the
            # "s" in "king's door" is not mistaken for "south".
            reachable.update(
                word for word in re.findall(r"[a-z]+", label)
                if word in self._FULL_DIRECTIONS
            )
        return reachable

    def check_heuristics(self, action: str, current_location: str, valid_exits: list[str]) -> tuple[bool, str]:
        """Fast rule-based validation (no LLM call).

        Args:
            action: Proposed command.
            current_location: Location the command would be issued from.
            valid_exits: Exit labels visible in the room; empty/None disables
                the exit check.

        Returns:
            ``(is_valid, reason)``.
        """
        action_word = action.lower().strip() if action else ""
        target = self._DIRECTION_ALIASES.get(action_word)

        # One-letter commands are only meaningful as direction abbreviations;
        # the plain length test used to reject "n", "e", ... outright.
        if not action_word or (len(action_word) < 2 and target is None):
            return False, "Action too short or empty"

        if self.history_tracker.is_looping(action):
            return False, "Detected infinite loop (action repeated too many times)"

        if self.history_tracker.is_known_failure(action, current_location):
            return False, f"Action '{action}' previously failed in this location"

        # Compare canonical names rather than substrings: "north" must not be
        # accepted just because "northeast" is an exit.
        if target is not None and valid_exits:
            if target not in self._reachable_directions(valid_exits):
                return False, f"You can't go '{action_word}'. Visible exits are: {valid_exits}"

        return True, "Heuristics passed"

    @staticmethod
    def _extract_json_payload(text: str) -> str:
        """Strip an optional Markdown code fence around the JSON reply."""
        payload = text.strip()
        if "```json" in payload:
            return payload.split("```json")[1].split("```")[0]
        if "```" in payload:
            return payload.split("```")[1].split("```")[0]
        return payload

    @staticmethod
    def _as_bool(value) -> bool:
        """Interpret a JSON value as a flag; the string "false" is not true."""
        if isinstance(value, str):
            return value.strip().lower() in ("true", "yes", "1")
        return bool(value)

    def evaluate_with_llm(self, action: str, observation: str, inventory: str, seed: int) -> "CriticResponse":
        """Slow semantic evaluation through the LLM.

        Any failure (LLM call, JSON parsing, bad field types) is treated as
        "allow": a ``CriticResponse`` with score 1.0 is returned.
        """
        # Only the first 1000 characters of the observation are sent.
        prompt = f"""
OBSERVATION:
{(observation or "")[:1000]}...

INVENTORY:
{inventory}

PROPOSED ACTION:
{action}

Evaluate this action.
"""

        try:
            response_text = self.call_llm(prompt, CRITIC_SYSTEM_PROMPT, seed=seed, max_tokens=150)
            data = json.loads(self._extract_json_payload(response_text))
            return CriticResponse(
                score=float(data.get("score", 0.5)),
                justification=data.get("justification", "No reason provided"),
                is_fatal=self._as_bool(data.get("is_fatal", False)),
            )
        except Exception as e:
            # Deliberate best-effort: a broken critic must not block the agent.
            if self.verbose:
                print(f"[Critic Error] LLM validation failed: {e}")
            return CriticResponse(score=1.0, justification="Validation failed, allowing action")

    def critique_action(self, action: str, observation: str, inventory: str, current_location: str, seed: int, valid_exits: Optional[list[str]] = None) -> bool:
        """Main entry point called by the agent.

        Returns:
            True when the action is accepted, False when it is rejected.
        """
        is_valid, reason = self.check_heuristics(action, current_location, valid_exits)
        if not is_valid:
            if self.verbose:
                print(f"🛑 [CRITIC REJECT - RULE] {reason}")
            return False

        # Basic moves and look/inventory skip the LLM call.
        if action.lower().strip() in self._SIMPLE_MOVES:
            return True

        evaluation = self.evaluate_with_llm(action, observation, inventory, seed)
        if evaluation.score < 0.4 or evaluation.is_fatal:
            if self.verbose:
                print(f"🛑 [CRITIC REJECT - LLM] Score: {evaluation.score} | Reason: {evaluation.justification}")
            return False

        return True

    def record_result(self, action: str, current_location: str, result_text: str):
        """Update the critic's history after the action has been executed."""
        self.history_tracker.update(action, current_location, result_text)
|
|
|
|
| |
|
|
class StructuredObservation(BaseModel):
    """Schema the extractor prompt asks the LLM to follow for one observation."""
    # Overwritten by ObservationExtractor.extract with the RAM location id.
    location_id: int
    # Overwritten by ObservationExtractor.extract with the RAM location name.
    location_name: str
    is_new_location: bool
    world_changed: bool
    description_summary: str
    # Filtered by the extractor to names of objects detected in RAM.
    takeable_objects: List[str]
    # WorldMapper.update_map keeps only entries found in VALID_DIRECTIONS.
    visible_exits: List[str]
    # Filtered by the extractor to names of objects detected in RAM.
    interactable_features: List[str]
    # The extractor may append a "RAM Alert" clue when hidden objects are detected.
    puzzle_clues: List[str]
    entities: List[str]
    in_combat: bool
    # Overwritten by the extractor with the flat list of RAM object names.
    raw_ram_objects : List[str]
    # Technical (RAM) name -> friendly name; the extractor keeps only pairs
    # whose technical name is in RAM and whose friendly name occurs in the text.
    name_translation: Dict[str, str]
|
|
|
|
class ObservationExtractor:
    """Convert raw game text plus RAM data into a structured observation dict."""

    # Object names that denote the player itself rather than a room object.
    _PLAYER_NAMES = frozenset({"inconnu", "self", "player", "me", "grunk"})

    def __init__(self, call_llm_func):
        self.call_llm = call_llm_func

    @classmethod
    def _scan_ram_objects(cls, ram_data: dict):
        """Split the RAM object list into room objects and their LLM listing.

        Returns:
            ``(room_objects, flat_names, tree_view)`` where ``room_objects``
            are the raw RAM entries that are not the player, ``flat_names``
            the names (parents and contents) shown to the LLM, and
            ``tree_view`` one "- parent (contains: ...)" line per parent.
        """
        # Single, tolerant inventory lookup: an item without a "name" no
        # longer raises before the LLM is even called.
        inventory = {
            str(item.get("name", "")).lower()
            for item in ram_data.get("inventory", [])
        }

        room_objects, flat_names, tree_view = [], [], []
        for obj in ram_data.get("detected_objects", []):
            name = obj["name"]
            name_l = name.lower()
            child_names = [child["name"] for child in obj.get("contents", [])]

            # Whatever holds an inventory item is the player object.
            holds_inventory = any(c.lower() in inventory for c in child_names)
            if name_l in cls._PLAYER_NAMES or holds_inventory:
                continue
            room_objects.append(obj)

            # Carried items stay in room_objects but are hidden from the LLM listing.
            if name_l == "inventory" or name_l in inventory:
                continue

            flat_names.append(name)
            if child_names:
                flat_names.extend(child_names)
                tree_view.append(f"- {name} (contains: {', '.join(child_names)})")
            else:
                tree_view.append(f"- {name}")

        return room_objects, flat_names, tree_view

    def extract(self, raw_text: str, seed: int, ram_data: dict, last_location: str = "Unknown") -> dict:
        """Build the structured observation for the current turn.

        Args:
            raw_text: Text returned by the game.
            seed: Seed forwarded to the LLM call.
            ram_data: Dict with optional "inventory", "location" and
                "detected_objects" entries.
            last_location: Name used when RAM carries no location name.

        Returns:
            The LLM's JSON object with location, RAM objects and validated
            name translations filled in; on any LLM/parsing failure, a
            reduced dict built from the raw text and RAM data only.
        """
        ram_data = ram_data or {}
        location_info = ram_data.get("location", {})
        curr_id = location_info.get("id", -1)
        curr_name = location_info.get("name", last_location)

        detected_objs_raw, objects_in_ram, tree_view = self._scan_ram_objects(ram_data)
        lower_text = (raw_text or "").lower()

        ram_listing = ", ".join(objects_in_ram)
        ram_tree = chr(10).join(tree_view)
        prompt = f"""
RAW GAME TEXT:
{raw_text}

--- RAM DATA (TECHNICAL TRUTH) ---
CURRENT LOCATION ID: {curr_id}
OBJECTS DETECTED IN RAM: {ram_listing}
OBJECTS DETECTED (HIERARCHY):
{ram_tree}


JSON SCHEMA:
Follow the StructuredObservation model.
"""

        try:
            response = self.call_llm(
                prompt,
                EXTRACTOR_SYSTEM_PROMPT,
                seed=seed
            )

            # Take the outermost {...} span so surrounding chatter is ignored.
            json_match = re.search(r'\{.*\}', response, re.DOTALL)
            data = json.loads(json_match.group(0)) if json_match else json.loads(response)
            if not isinstance(data, dict):
                raise ValueError("extractor reply is not a JSON object")

            # RAM is the source of truth for the location and object list.
            data["location_id"] = curr_id
            data["location_name"] = curr_name
            data["raw_ram_objects"] = objects_in_ram

            # Keep a translation only when the technical name exists in RAM
            # and the friendly name really appears in the game text.
            raw_mapping = data.get("name_translation")
            if not isinstance(raw_mapping, dict):
                raw_mapping = {}
            known_names = set(objects_in_ram)
            valid_mapping = {
                tech_name: friendly_name
                for tech_name, friendly_name in raw_mapping.items()
                if tech_name in known_names
                and isinstance(friendly_name, str)
                and friendly_name.lower() in lower_text
            }

            parent_names = {item["name"] for item in detected_objs_raw}
            for key in ("takeable_objects", "interactable_features"):
                data[key] = [
                    o for o in (data.get(key) or [])
                    if isinstance(o, str) and o in parent_names
                ]
            data["objects_in_room"] = detected_objs_raw
            data["name_translation"] = valid_mapping

            secrets = [o for o in objects_in_ram if "(missing" in o.lower()]
            if secrets:
                clues = data.get("puzzle_clues")
                if not isinstance(clues, list):
                    clues = data["puzzle_clues"] = []
                clues.append(f"RAM Alert: {len(secrets)} hidden object(s) detected. Search the area.")

            return data

        except Exception as e:
            # Best-effort fallback: never let an extraction failure stop the agent.
            print(f"[Extractor Error] {e}")

            # sorted() makes the exit order reproducible (the constant is a set).
            manual_exits = [
                d for d in sorted(VALID_DIRECTIONS)
                if re.search(rf"\b{d}\b", lower_text)
            ]

            return {
                "location_id": curr_id,
                "location_name": curr_name,
                "description_summary": raw_text,
                "raw_ram_objects": objects_in_ram,
                "objects_in_room" : detected_objs_raw,
                "visible_exits": manual_exits,
                "name_translation": {},
                "takeable_objects": [],
                "interactable_features": [],
                "puzzle_clues": ["Erreur d'extraction LLM."]
            }
|
|
| |
|
|
class SectionUtils:
    """Helpers for reading and rewriting ``## <name>`` sections of a Markdown document."""

    @staticmethod
    def _section_pattern(section_name: str):
        """Compile the pattern matching one whole section.

        The header must be a complete line (``## name`` plus optional
        trailing whitespace), so "Foo" matches neither "## Foobar" nor
        "### Foo". The section runs up to the next ``## `` header or the end
        of the text, leaving a single trailing newline outside the match.
        """
        return re.compile(
            rf"^## {re.escape(section_name)}[ \t\r]*(?=\n|\Z)(.*?)(?=\n## |\n?\Z)",
            re.DOTALL | re.MULTILINE,
        )

    @staticmethod
    def extract_section_content(content: str, section_name: str) -> str:
        """Return the stripped body of the section, or "" when it is absent."""
        if not content:
            return ""
        match = SectionUtils._section_pattern(section_name).search(content)
        return match.group(1).strip() if match else ""

    @staticmethod
    def update_section_content(content: str, section_name: str, new_content: str) -> str:
        """Replace the section body, or append the section when it is missing.

        Empty ``content`` starts a new document with the default title.
        """
        if not content:
            content = "# Zork Strategic Knowledge Base\n\n"
        full_new_section = f"## {section_name}\n\n{new_content}\n"

        match = SectionUtils._section_pattern(section_name).search(content)
        if match is None:
            return f"{content}\n\n{full_new_section}\n"
        # Splice by position so only the matched section is replaced, even if
        # identical text occurs earlier in the document.
        return content[:match.start()] + full_new_section + content[match.end():]

    @staticmethod
    def extract_cross_episode_section(content: str) -> str:
        """Return the "CROSS-EPISODE INSIGHTS" section, which must persist between games."""
        return SectionUtils.extract_section_content(content, "CROSS-EPISODE INSIGHTS")
|
|
# Direction words (abbreviated and full) accepted as exits: WorldMapper.update_map
# keeps only visible exits found here, and ObservationExtractor's fallback
# scans the raw game text for these words.
VALID_DIRECTIONS = {
    "n", "s", "e", "w", "ne", "nw", "se", "sw", "u", "d", "in", "out",
    "north", "south", "east", "west", "northeast", "northwest",
    "southeast", "southwest", "up", "down"
}
|
|
class WorldMapper:
    """Room graph built incrementally from structured observations.

    ``graph`` maps a room id (always a ``str``) to a dict with the keys
    name, exits, blocked_exits, potential_exits, puzzles, items, scenery,
    visited_count, items_scanned and description. ``exits`` is keyed by full
    direction names ("north", "inside", ...).
    """

    # Every accepted spelling of a direction -> full direction name.
    _DIRECTION_ALIASES = {
        "n": "north", "s": "south", "e": "east", "w": "west",
        "u": "up", "d": "down",
        "ne": "northeast", "nw": "northwest",
        "se": "southeast", "sw": "southwest",
        "in": "inside", "out": "outside",
        "enter": "inside", "exit": "outside",
        "north": "north", "south": "south", "east": "east", "west": "west",
        "up": "up", "down": "down",
        "northeast": "northeast", "northwest": "northwest",
        "southeast": "southeast", "southwest": "southwest",
        "inside": "inside", "outside": "outside",
    }
    _FULL_DIRECTIONS = frozenset(_DIRECTION_ALIASES.values())

    _OPPOSITES = {
        "north": "south", "south": "north",
        "east": "west", "west": "east",
        "up": "down", "down": "up",
        "northeast": "southwest", "southwest": "northeast",
        "northwest": "southeast", "southeast": "northwest",
        "inside": "outside", "outside": "inside",
        "n": "s", "s": "n", "e": "w", "w": "e",
        "u": "d", "d": "u", "ne": "sw", "sw": "ne",
        "nw": "se", "se": "nw", "in": "out", "out": "in",
    }

    def __init__(self):
        self.graph = {}
        self.last_room_id = None
        self.last_direction = None

    def _canonical_direction(self, direction) -> str:
        """Map any spelling ("n", "North") to its full name; unknown text is lower-cased."""
        key = str(direction).lower().strip()
        return self._DIRECTION_ALIASES.get(key, key)

    def update_map(self, structured_data: dict, last_action: str, observation: str):
        """Record the current room and link it to the previous one.

        Args:
            structured_data: Observation dict (location_id, location_name,
                visible_exits, puzzle_clues, takeable_objects, ...).
            last_action: Command that led here; used to name the link.
            observation: Raw text stored as the room description on first visit.
        """
        curr_id = str(structured_data.get("location_id"))
        curr_name = structured_data.get("location_name")
        # LLM output may carry null or non-string exits; ignore those.
        new_exits = structured_data.get("visible_exits") or []
        clean_directions = [
            d.lower() for d in new_exits
            if isinstance(d, str) and d.lower() in VALID_DIRECTIONS
        ]
        print("cleand directions", clean_directions)

        clues = structured_data.get("puzzle_clues") or []
        room = self.graph.get(curr_id)
        if room is None:
            room = self.graph[curr_id] = {
                "name": curr_name,
                "exits": {},
                "blocked_exits": {},
                "potential_exits": [],
                "puzzles": clues,
                "items": structured_data.get("takeable_objects", []),
                "scenery": structured_data.get("interactable_features", []),
                "visited_count": 1,
                "items_scanned": False,
                "description": observation,
            }
        else:
            room["visited_count"] += 1
            room["items"] = structured_data.get("takeable_objects", [])
            # dict.fromkeys de-duplicates while keeping a stable order.
            room["puzzles"] = list(dict.fromkeys(room["puzzles"] + clues))

        room["potential_exits"] = list(
            dict.fromkeys(list(room.get("potential_exits", [])) + clean_directions)
        )

        direction_taken = self._extract_direction(last_action)
        previous_id = self.last_room_id
        if previous_id and previous_id != curr_id and direction_taken:
            self.graph[previous_id]["exits"][direction_taken] = curr_id
            # Assume the way back is the opposite direction.
            opp = self._get_opposite(direction_taken)
            if opp:
                room["exits"][opp] = previous_id
            print(f"🗺️ Map Link: {self.graph[previous_id]['name']} <({direction_taken})--({opp})> {curr_name}")

        self.last_room_id = curr_id

    def mark_as_scanned(self, room_id: str):
        """Flag a room once the agent has examined all of its objects."""
        # Graph keys are strings; accept int ids like the other methods do.
        room_id = str(room_id)
        if room_id in self.graph:
            self.graph[room_id]["items_scanned"] = True

    def mark_blocked_exit(self, room_id: str, direction: str, reason: str):
        """Store a direction that failed so it is not retried needlessly."""
        room_id = str(room_id)
        if room_id in self.graph:
            blocked = self.graph[room_id].setdefault("blocked_exits", {})
            blocked[direction.lower()] = reason

    def _extract_direction(self, action: str) -> Optional[str]:
        """Return the full direction name contained in a command, or None.

        A command that is exactly an alias ("n", "enter") maps directly;
        otherwise the first whole word that is a full direction name wins.
        Whole-word matching keeps "open cupboard" from being read as "up".
        """
        if not action:
            return None
        action = action.lower().strip()

        if action in self._DIRECTION_ALIASES:
            return self._DIRECTION_ALIASES[action]

        for word in re.findall(r"[a-z]+", action):
            if word in self._FULL_DIRECTIONS:
                return word

        return None

    def get_navigation_guidance(self, current_id: str, visible_exits: list) -> list:
        """Describe, for each visible exit, whether it has already been taken."""
        current_id = str(current_id)
        room_data = self.graph.get(current_id, {})
        known_exits = room_data.get("exits", {})

        nav_lines = []
        for direction in visible_exits:
            label = str(direction)
            # Known exits are keyed by full names, so "n" must resolve to "north".
            key = self._canonical_direction(label)
            if key not in known_exits:
                key = label.lower()
            if key in known_exits:
                target_name = self.graph.get(known_exits[key], {}).get("name", "Unknown")
                nav_lines.append(f"- {label.upper()} : Already taken (leads to {target_name})")
            else:
                nav_lines.append(f"- {label.upper()} : 🌟 [NEW PATHWAY - TRY THIS]")

        return nav_lines

    def _get_opposite(self, direction: str) -> Optional[str]:
        """Return the opposite direction, or None when it is not known."""
        return self._OPPOSITES.get(direction)

    def generate_summary(self, current_id: str) -> str:
        """Generate the full strategic map text for the agent prompt."""
        current_id = str(current_id)
        room = self.graph.get(current_id)
        if room is None:
            return "### 🗺️ MAP: Position current unknown in strategic memory."

        summary = ["### 🗺️ STRATEGIC MAP & SPATIAL MEMORY:"]

        # --- Explored links from the current room ---
        summary.append(f"**Current Location:** {room['name']}")
        exits = room.get("exits", {})
        if exits:
            for direction, target_id in exits.items():
                target = self.graph.get(target_id, {})
                name = target.get("name", "Unknown Area")
                status = "✅ Scanned" if target.get("items_scanned") else "🔎 Items left"
                summary.append(f"- {direction.upper()} -> {name} ({status})")
        else:
            summary.append("- No known exits explored yet from here.")

        # --- Objects remembered in other rooms ---
        other_rooms_with_stuff = []
        for r_id, r_data in self.graph.items():
            if r_id == current_id:
                continue
            items = r_data.get("items", [])
            features = r_data.get("scenery", [])
            if items or features:
                room_info = f"- In {r_data['name']}:"
                if items:
                    room_info += f" Items: [{', '.join(map(str, items))}]"
                if features:
                    room_info += f" Scenery: {', '.join(map(str, features))}"
                other_rooms_with_stuff.append(room_info)

        if other_rooms_with_stuff:
            summary.append("\n**🌍 WORLD OBJECTS (Memory):**")
            summary.extend(other_rooms_with_stuff)

        # --- Immediate navigation: one line per distinct direction ---
        # Canonicalise first so "n" and "north" are reported as one exit.
        potential = {self._canonical_direction(d) for d in room.get("potential_exits", [])}
        known = {self._canonical_direction(d): t for d, t in exits.items()}
        blocked = {
            self._canonical_direction(d): reason
            for d, reason in room.get("blocked_exits", {}).items()
        }

        summary.append("**Immediate Navigation:**")
        all_possible = potential | set(known) | set(blocked)

        if not all_possible:
            summary.append("- ⚠️ NO EXITS DETECTED IN SCAN.")
            summary.append("- **HYPOTHETICAL DIRECTIONS:** [NORTH, SOUTH, EAST, WEST,NORTHEAST,NORTHWEST, SOUTHEAST, SOUTHWEST , UP, DOWN]")
            summary.append("- **ADVICE:** Use 'LOOK' or 'SEARCH' to confirm exits before moving, otherwise you might hit a wall.")

        # sorted() keeps the prompt text stable from one call to the next.
        for d in sorted(all_possible):
            if d in blocked:
                summary.append(f"- {d.upper()} : 🚫 BLOCKED ({blocked[d]})")
            elif d in known:
                target_name = self.graph.get(known[d], {}).get("name", "Unknown")
                summary.append(f"- {d.upper()} : Leads to {target_name} ✅")
            else:
                summary.append(f"- {d.upper()} : 🌟 [NEW - NEVER TESTED]")

        if blocked:
            summary.append("\n**🚫 BLOCKED / FAILED DIRECTIONS:**")
            for d, reason in blocked.items():
                summary.append(f"- {d.upper()} : {reason}")

        for note in room.get("notes", []):
            summary.append(f"- ⚠️ {note}")

        return "\n".join(summary)
|
|
|
|
# Script entry point: when the file is executed directly, run the
# `test_agent` coroutine (expected to be defined elsewhere in this module).
if __name__ == "__main__":
    import asyncio
    asyncio.run(test_agent())
|
|