clemdrl committed on
Commit
c1eb31c
·
1 Parent(s): 615a63b

Working MCP tools + ReAct loop

Browse files
Files changed (2) hide show
  1. agent.py +135 -8
  2. mcp_server.py +94 -44
agent.py CHANGED
@@ -172,10 +172,9 @@ class StudentAgent:
172
 
173
  def __init__(self):
174
  """Initialize your agent here."""
175
- # TODO: Initialize any state tracking you need
176
- # self.history = []
177
- # self.visited_locations = set()
178
- pass
179
 
180
  async def run(
181
  self,
@@ -224,20 +223,97 @@ class StudentAgent:
224
 
225
  # Placeholder implementation - replace with your code
226
  locations_visited = set()
227
- history = []
228
  final_score = 0
229
  moves = 0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
230
 
231
  # TODO: Your implementation here
232
  # ...
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
233
 
 
 
 
 
 
 
 
 
 
 
 
 
 
234
  return RunResult(
235
  final_score=final_score,
236
  max_score=350, # Zork1 max score, adjust if needed
237
  moves=moves,
238
  locations_visited=locations_visited,
239
  game_completed=False,
240
- history=history,
241
  )
242
 
243
  def _build_prompt(self, observation: str, history: list) -> str:
@@ -247,7 +323,25 @@ class StudentAgent:
247
  TODO: Implement this to create effective prompts
248
  """
249
  # TODO: Combine system prompt, history, and current observation
250
- pass
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
251
 
252
  def _parse_response(self, response: str) -> tuple[str, str, dict]:
253
  """
@@ -262,7 +356,40 @@ class StudentAgent:
262
  # THOUGHT: ...
263
  # TOOL: ...
264
  # ARGS: {...}
265
- pass
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
266
 
267
  def _call_llm(self, prompt: str, system_prompt: str, seed: int) -> str:
268
  """
 
172
 
173
  def __init__(self):
174
  """Initialize your agent here."""
175
+ self.history: list[tuple[str, str, str]] = [] # (thought, action, observation)
176
+ self.last_observation: str = ""
177
+ self.visited_obs_hashes: list[str] = [] # small, basic anti-loop window; despite the name, run() stores truncated lowercased observation text here, not hashes
 
178
 
179
  async def run(
180
  self,
 
223
 
224
  # Placeholder implementation - replace with your code
225
  locations_visited = set()
 
226
  final_score = 0
227
  moves = 0
228
+
229
+ r = await client.call_tool("play_action", {"action": "look"})
230
+
231
+ # print("DEBUG type(r):", type(r))
232
+ # print("DEBUG dir(r) sample:", [a for a in dir(r) if a in ("content","result","data","message","output","text","value")])
233
+ # print("DEBUG repr(r):", repr(r))
234
+
235
+ # # si r a un attribut content, affiche-le
236
+ # if hasattr(r, "content"):
237
+ # print("DEBUG type(r.content):", type(r.content))
238
+ # print("DEBUG repr(r.content):", repr(r.content))
239
+ # if r.content:
240
+ # print("DEBUG type(r.content[0]):", type(r.content[0]))
241
+ # print("DEBUG repr(r.content[0]):", repr(r.content[0]))
242
+ # print("DEBUG dir(r.content[0]) sample:", [a for a in dir(r.content[0]) if a in ("text","value","data","content")])
243
+ observation = r.data if r else "No response"
244
+ self.last_observation = observation
245
+
246
 
247
  # TODO: Your implementation here
248
  # ...
249
+
250
+ for step in range(max_steps):
251
+ prompt = self._build_prompt(observation, self.history)
252
+
253
+ # 2) Appel LLM
254
+ llm_text = self._call_llm(prompt=prompt, system_prompt=SYSTEM_PROMPT, seed=seed + step)
255
+
256
+ # 3) Parse -> tool + args
257
+ thought, tool_name, args = self._parse_response(llm_text)
258
+
259
+ # fallback si le modèle sort n’importe quoi
260
+ if tool_name not in ("play_action", "inventory", "memory"):
261
+ tool_name = "play_action"
262
+ args = {"action": "look"}
263
+
264
+ if tool_name == "play_action" and ("action" not in args or not isinstance(args["action"], str)):
265
+ args = {"action": "look"}
266
+
267
+ # 4) Anti-loop très simple: si observation identique trop souvent, force look/inventory
268
+ obs_key = (observation.strip()[:200]).lower()
269
+ self.visited_obs_hashes.append(obs_key)
270
+ if len(self.visited_obs_hashes) > 6:
271
+ self.visited_obs_hashes.pop(0)
272
+ if self.visited_obs_hashes.count(obs_key) >= 3:
273
+ tool_name = "play_action"
274
+ args = {"action": "inventory" if step % 2 == 0 else "look"}
275
+ thought = thought + " (anti-loop fallback)"
276
+
277
+ # 5) Appel tool MCP
278
+ res = await client.call_tool(tool_name, args)
279
+ new_observation = res.data if res else "No response"
280
+
281
+ # 6) Update state
282
+ action_str = args.get("action", tool_name)
283
+ self.history.append((thought, action_str, new_observation))
284
+ if len(self.history) > 20:
285
+ self.history = self.history[-20:]
286
+
287
+ if verbose:
288
+ print(f"\nSTEP {step+1}/{max_steps}")
289
+ print(f"THOUGHT: {thought}")
290
+ print(f"TOOL: {tool_name}")
291
+ print(f"ARGS: {args}")
292
+ print(f"OBS:\n{new_observation}\n")
293
+
294
+ observation = new_observation
295
+ self.last_observation = observation
296
 
297
+ status = await client.call_tool("get_status", {})
298
+ status_txt = status.data if status else ""
299
+ m = re.search(r"SCORE:\s*(\d+)", status_txt)
300
+ if m: final_score = int(m.group(1))
301
+ m = re.search(r"MOVES:\s*(\d+)", status_txt)
302
+ if m: moves = int(m.group(1))
303
+
304
+ # location -> set
305
+ m = re.search(r"LOCATION:\s*(.*)", status_txt)
306
+ if m and m.group(1).strip():
307
+ locations_visited.add(m.group(1).strip())
308
+
309
+
310
  return RunResult(
311
  final_score=final_score,
312
  max_score=350, # Zork1 max score, adjust if needed
313
  moves=moves,
314
  locations_visited=locations_visited,
315
  game_completed=False,
316
+ history=self.history,
317
  )
318
 
319
  def _build_prompt(self, observation: str, history: list) -> str:
 
323
  TODO: Implement this to create effective prompts
324
  """
325
  # TODO: Combine system prompt, history, and current observation
326
+
327
+ recent = history[-6:]
328
+ hist_txt = ""
329
+ for i, (t, a, o) in enumerate(recent, 1):
330
+ o_short = o.strip().replace("\n", " ")
331
+ if len(o_short) > 300:
332
+ o_short = o_short[:300] + "..."
333
+ hist_txt += f"{i}. THOUGHT: {t}\n ACTION: {a}\n OBS: {o_short}\n"
334
+
335
+ obs_short = observation.strip()
336
+ if len(obs_short) > 1200:
337
+ obs_short = obs_short[:1200] + "..."
338
+
339
+ return (
340
+ f"GAME: {observation}\n\n"
341
+ f"RECENT HISTORY:\n{hist_txt if hist_txt else '(none)'}\n"
342
+ f"CURRENT OBSERVATION:\n{obs_short}\n\n"
343
+ f"Choose ONE next tool call."
344
+ )
345
 
346
  def _parse_response(self, response: str) -> tuple[str, str, dict]:
347
  """
 
356
  # THOUGHT: ...
357
  # TOOL: ...
358
  # ARGS: {...}
359
+ thought = ""
360
+ tool = ""
361
+ args: dict = {}
362
+
363
+ # tolérant aux espaces/variantes
364
+ thought_m = re.search(r"THOUGHT:\s*(.*)", response)
365
+ tool_m = re.search(r"TOOL:\s*(.*)", response)
366
+ args_m = re.search(r"ARGS:\s*(\{.*\})", response, flags=re.DOTALL)
367
+
368
+ if thought_m:
369
+ thought = thought_m.group(1).strip()
370
+ if tool_m:
371
+ tool = tool_m.group(1).strip()
372
+
373
+ if args_m:
374
+ raw = args_m.group(1).strip()
375
+ try:
376
+ args = json.loads(raw)
377
+ except Exception:
378
+ args = {}
379
+
380
+ # fallback si pas trouvé
381
+ if not tool:
382
+ tool = "play_action"
383
+ if tool == "play_action" and "action" not in args:
384
+ # essaie de deviner une action simple depuis la réponse
385
+ # ex: le modèle écrit "ACTION: look"
386
+ act_m = re.search(r"ACTION:\s*(.*)", response)
387
+ args = {"action": act_m.group(1).strip()} if act_m else {"action": "look"}
388
+
389
+ if not thought:
390
+ thought = "No thought"
391
+
392
+ return thought, tool, args
393
 
394
  def _call_llm(self, prompt: str, system_prompt: str, seed: int) -> str:
395
  """
mcp_server.py CHANGED
@@ -33,14 +33,12 @@ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
33
  from fastmcp import FastMCP
34
  from games.zork_env import TextAdventureEnv
35
 
36
-
37
  # =============================================================================
38
  # Create the MCP Server
39
  # =============================================================================
40
 
41
  mcp = FastMCP("Student Text Adventure Server")
42
 
43
-
44
  # =============================================================================
45
  # Game State Management
46
  # =============================================================================
@@ -60,9 +58,11 @@ class GameManager:
60
  self.state = None
61
  self.game_name: str = ""
62
  # TODO: Add more state tracking
63
- # self.history: list[tuple[str, str]] = []
64
- # self.explored_locations: dict[str, set[str]] = {}
65
- # self.current_location: str = ""
 
 
66
 
67
  def initialize(self, game: str = "zork1"):
68
  """Initialize or reset the game."""
@@ -70,6 +70,12 @@ class GameManager:
70
  self.env = TextAdventureEnv(game)
71
  self.state = self.env.reset()
72
  # TODO: Reset your state tracking here
 
 
 
 
 
 
73
  return self.state.observation
74
 
75
  def step(self, action: str) -> str:
@@ -77,13 +83,25 @@ class GameManager:
77
  if self.env is None:
78
  self.initialize()
79
 
 
80
  self.state = self.env.step(action)
81
-
82
- # TODO: Update your state tracking here
83
- # self.history.append((action, self.state.observation))
84
- # Update location tracking, etc.
85
-
86
- return self.state.observation
 
 
 
 
 
 
 
 
 
 
 
87
 
88
  def get_score(self) -> int:
89
  """Get current score."""
@@ -93,6 +111,15 @@ class GameManager:
93
  """Get number of moves taken."""
94
  return self.state.moves if self.state else 0
95
 
 
 
 
 
 
 
 
 
 
96
 
97
  # Global game manager
98
  _game = GameManager()
@@ -145,44 +172,67 @@ def play_action(action: str) -> str:
145
 
146
  # TODO: Implement additional tools to help your agent
147
 
148
- # @mcp.tool()
149
- # def memory() -> str:
150
- # """
151
- # Get the current game state summary.
152
- #
153
- # Returns:
154
- # A summary including current location, score, moves, and recent history
155
- # """
156
- # game = get_game()
157
- # # TODO: Return useful state information
158
- # pass
 
 
 
 
159
 
 
 
 
 
160
 
161
- # @mcp.tool()
162
- # def inventory() -> str:
163
- # """
164
- # Check what the player is carrying.
165
- #
166
- # Returns:
167
- # List of items in the player's inventory
168
- # """
169
- # game = get_game()
170
- # result = game.step("inventory")
171
- # return result
172
 
 
 
 
 
 
 
 
 
 
 
 
173
 
174
- # @mcp.tool()
175
- # def get_map() -> str:
176
- # """
177
- # Get a map of explored locations.
178
- #
179
- # Returns:
180
- # A text representation of explored locations and connections
181
- # """
182
- # game = get_game()
183
- # # TODO: Return map of explored locations
184
- # pass
185
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
186
 
187
  # @mcp.tool()
188
  # def get_valid_actions() -> str:
 
33
  from fastmcp import FastMCP
34
  from games.zork_env import TextAdventureEnv
35
 
 
36
  # =============================================================================
37
  # Create the MCP Server
38
  # =============================================================================
39
 
40
  mcp = FastMCP("Student Text Adventure Server")
41
 
 
42
  # =============================================================================
43
  # Game State Management
44
  # =============================================================================
 
58
  self.state = None
59
  self.game_name: str = ""
60
  # TODO: Add more state tracking
61
+ self.history: list[tuple[str, str]] = []
62
+ self.explored_locations: dict[str, set[str]] = {}
63
+ self.current_location: str = ""
64
+ self.map_edges: dict[str, dict[str, str]] = {}
65
+ self.last_location: str = ""
66
 
67
  def initialize(self, game: str = "zork1"):
68
  """Initialize or reset the game."""
 
70
  self.env = TextAdventureEnv(game)
71
  self.state = self.env.reset()
72
  # TODO: Reset your state tracking here
73
+ self.history = []
74
+ self.map_edges = {}
75
+ self.last_location = self._extract_location(self.state.observation)
76
+
77
+ obs0 = self.state.observation or ""
78
+ self.history.append(("[RESET]", obs0.strip().replace("\n", " ")[:240]))
79
  return self.state.observation
80
 
81
  def step(self, action: str) -> str:
 
83
  if self.env is None:
84
  self.initialize()
85
 
86
+ prev_loc = self.last_location
87
  self.state = self.env.step(action)
88
+ obs = self.state.observation or ""
89
+
90
+ # update location + map
91
+ new_loc = self._extract_location(obs)
92
+ if prev_loc and new_loc and action:
93
+ self.map_edges.setdefault(prev_loc, {})[action.strip().lower()] = new_loc
94
+ self.last_location = new_loc
95
+
96
+ # update history (court)
97
+ obs_short = obs.strip().replace("\n", " ")
98
+ if len(obs_short) > 240:
99
+ obs_short = obs_short[:240] + "..."
100
+ self.history.append((action, obs_short))
101
+ if len(self.history) > 30:
102
+ self.history = self.history[-30:]
103
+
104
+ return obs
105
 
106
  def get_score(self) -> int:
107
  """Get current score."""
 
111
  """Get number of moves taken."""
112
  return self.state.moves if self.state else 0
113
 
114
+ def _extract_location(self, observation: str) -> str:
115
+ """Heuristic: the first non-empty line is often the room name."""
116
+ for line in (observation or "").splitlines():
117
+ line = line.strip()
118
+ if line:
119
+ # cap the length to avoid overly long lines (descriptions)
120
+ return line[:80]
121
+ return ""
122
+
123
 
124
  # Global game manager
125
  _game = GameManager()
 
172
 
173
  # TODO: Implement additional tools to help your agent
174
 
175
+ @mcp.tool()
176
+ def memory() -> str:
177
+ """
178
+ Get the current game state summary.
179
+
180
+ Returns:
181
+ A summary including current location, score, moves, and recent history
182
+ """
183
+ game = get_game()
184
+ # Build the summary line by line: game name, location, score/moves, recent history
185
+ lines = []
186
+ lines.append(f"GAME: {game.game_name}")
187
+ if game.last_location: # omit the LOCATION line when no location is known
188
+ lines.append(f"LOCATION: {game.last_location}")
189
+ lines.append(f"SCORE: {game.get_score()} MOVES: {game.get_moves()}")
190
 
191
+ lines.append("RECENT:")
192
+ for a, o in game.history[-8:]: # only the 8 most recent (action, observation) entries
193
+ lines.append(f"- action: {a}")
194
+ lines.append(f" obs: {o}")
195
 
196
+ return "\n".join(lines)
 
 
 
 
 
 
 
 
 
 
197
 
198
+ @mcp.tool()
199
+ def inventory() -> str:
200
+ """
201
+ Check what the player is carrying.
202
+
203
+ Returns:
204
+ List of items in the player's inventory
205
+ """
206
+ game = get_game()
207
+ result = game.step("inventory") # NOTE(review): sent as a regular game command; if get_game() returns the GameManager, step() also records it in history and map_edges - confirm that is intended
208
+ return result
209
 
 
 
 
 
 
 
 
 
 
 
 
210
 
211
+ @mcp.tool()
212
+ def get_map() -> str:
213
+ """
214
+ Get a map of explored locations.
215
+
216
+ Returns:
217
+ A text representation of explored locations and connections
218
+ """
219
+ game = get_game()
220
+ if not game.map_edges: # no transitions recorded yet
221
+ return "MAP: (empty)"
222
+
223
+ out = ["MAP:"]
224
+ for src, edges in game.map_edges.items():
225
+ for act, dst in edges.items():
226
+ out.append(f"- {src} --{act}--> {dst}") # one line per recorded source/action/destination edge
227
+ return "\n".join(out)
228
+
229
+ @mcp.tool()
230
+ def get_status() -> str:
231
+ """
232
+ Simple, stable tool: score/moves/location without polluting the game.
233
+ """
234
+ game = get_game()
235
+ return f"LOCATION: {game.last_location}\nSCORE: {game.get_score()}\nMOVES: {game.get_moves()}"
236
 
237
  # @mcp.tool()
238
  # def get_valid_actions() -> str: