"""
Exploration-first hybrid ReAct agent (score + locations) for text adventures.
Key points:
- Deterministic policy driven by server status() JSON.
- ReAct loop explicit each step: THOUGHT -> TOOL(play_action) -> OBSERVATION
- Priority:
    A) Valid untried exits (Jericho-validated) + obs-boosted directions
    B) Bounded suggested_interactions (game-validated)
    C) BFS backtrack to nearest frontier (room with untried exits)
    D) Stuck recovery (look/inventory/examine noun)
    E) Optional single LLM fallback if HF_TOKEN is present (never required)
- Uses peek_action (if available) to score a small candidate set quickly.
- All verbose/debug output goes to stderr only.
"""
import json
import os
import re
import sys
from collections import deque
from dataclasses import dataclass, field
from typing import Optional
from dotenv import load_dotenv
from huggingface_hub import InferenceClient
load_dotenv()
# =============================================================================
# LLM Configuration (fixed model for fairness)
# =============================================================================
# Model identifier passed to every chat-completion request.
LLM_MODEL = "Qwen/Qwen2.5-72B-Instruct"

# The inference client is only created when HF_TOKEN is set to a non-empty
# value; otherwise LLM_CLIENT stays None and the LLM path is unavailable.
_hf_token = os.environ.get("HF_TOKEN")
LLM_CLIENT = None if not _hf_token else InferenceClient(token=_hf_token)
def call_llm(prompt: str, system_prompt: str, seed: int, max_tokens: int = 120) -> str:
    """Send one chat-completion request and return the reply text.

    Used only as a last-resort fallback by the agent (optional).

    Args:
        prompt: Content of the user message.
        system_prompt: Content of the system message.
        seed: Seed forwarded to the completion request.
        max_tokens: Upper bound on generated tokens forwarded to the request.

    Returns:
        The content of the first returned choice, or "" when that content
        is empty or None.

    Raises:
        RuntimeError: If no client was configured (HF_TOKEN missing).
    """
    if LLM_CLIENT is None:
        raise RuntimeError("HF_TOKEN missing => LLM unavailable")

    conversation = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": prompt},
    ]
    completion = LLM_CLIENT.chat.completions.create(
        model=LLM_MODEL,
        messages=conversation,
        temperature=0.0,  # greedy decoding
        max_tokens=max_tokens,
        seed=seed,
    )
    reply = completion.choices[0].message.content
    return reply if reply else ""
@dataclass
class RunResult:
    """Summary of one agent run, as returned by ``StudentAgent.run``."""

    # Highest score observed for the run.
    final_score: int
    # Maximum achievable score as reported by the status tool (0 if unknown).
    max_score: int
    # Number of play_action calls issued.
    moves: int
    # Identifiers of every location the agent observed itself in.
    locations_visited: set
    # True when the run ended because the game reported completion/game over.
    game_completed: bool
    # Optional error description; None when nothing was recorded.
    error: Optional[str] = None
    # One (thought, tool call, observation snippet) triple per step.
    history: list[tuple[str, str, str]] = field(default_factory=list)
# =============================================================================
# Tunables
# =============================================================================
# Maximum number of suggested interactions attempted per location.
MAX_INTERACTIONS = 4
# Consecutive steps without a score or location change before recovery kicks in.
STUCK_THRESHOLD = 10
# Number of recent (action, location, score, observation) entries remembered.
MEMORY_LEN = 20
PEEK_K = 6  # lower if too slow; higher can improve decisions but costs time

# Command prefixes the deterministic policy refuses to try on its own.
UNSAFE_STARTS = (
    "burn ", "set fire", "ignite ",
    "attack ", "kill ", "hit ", "stab ", "shoot ", "punch ", "fight ",
    "destroy ", "break ", "smash ",
    "eat ",
)

# Compass/vertical/in-out direction words. The diagonal forms are covered by
# the optional (east|west) suffix, so they need no separate alternatives.
DIR_WORD_RE = re.compile(
    r"\b(north(?:east|west)?|south(?:east|west)?|east|west|up|down|in|out)\b",
    re.IGNORECASE,
)

# Parser replies that ask the player to clarify the previous command.
DISAMBIG_RE = re.compile(
    r"which do you mean|do you mean|be more specific|what do you want",
    re.IGNORECASE,
)

# First "the <word> [<word>]" phrase of a disambiguation prompt. The second
# word must not be a connective: in "the lamp or the sword" the option is
# "lamp", not "lamp or".
OPTION_RE = re.compile(
    r"\bthe\s+([a-z]+(?:\s+(?!(?:or|and)\b)[a-z]+)?)",
    re.IGNORECASE,
)

# System prompt for the optional LLM fallback.
LLM_SYSTEM = (
    "You play a text adventure game. Propose ONE action (<= 5 words) that helps "
    "explore a new location or gain points. Reply with exactly one line:\n"
    "ACTION: <command>"
)
class StudentAgent:
    """Exploration-first agent driven by the server's ``status`` JSON.

    The agent keeps a small world model (visited locations, a direction graph,
    untried exits per location) plus a bounded memory of recent actions used
    to avoid repeating commands and oscillating between two rooms.
    """

    # Words that name a direction and must never be "examined" as an object.
    _DIRECTION_WORDS = frozenset({
        "north", "south", "east", "west",
        "northeast", "northwest", "southeast", "southwest",
        "up", "down", "in", "out",
    })

    def __init__(self) -> None:
        self.visited: set[int] = set()
        # loc_id -> {direction: destination loc_id}
        self.graph: dict[int, dict[str, int]] = {}
        # loc_id -> directions reported as untried the last time we were there
        self.loc_untried: dict[int, list[str]] = {}
        # loc_id -> number of suggested interactions already spent there
        self.interactions_done: dict[int, int] = {}
        # recent_memory: (action, loc_id, score_before, obs_snip_after)
        self.recent_memory = deque(maxlen=MEMORY_LEN)
        self.no_progress_steps = 0
        self.llm_calls = 0
        self.last_action = ""

    # =========================================================================
    # ReAct loop
    # =========================================================================
    async def run(self, client, game: str, max_steps: int, seed: int, verbose: bool = False) -> "RunResult":
        """Play until the game ends or ``max_steps`` play actions are used.

        Each step is recorded as THOUGHT -> TOOL(play_action) -> OBSERVATION.

        Args:
            client: Tool client exposing ``list_tools`` and ``call_tool``.
            game: Game name; not used by the loop itself.
            max_steps: Budget of ``play_action`` calls (the initial ``look``
                counts as one).
            seed: Base seed forwarded to the optional LLM fallback.
            verbose: When true, print one debug line per step to stderr.

        Returns:
            A ``RunResult`` with score, move count, visited locations and the
            per-step history.
        """
        history: list[tuple[str, str, str]] = []
        moves_taken = 0
        final_score = 0
        max_score = 0
        game_completed = False
        last_status: dict = {}

        tools = await client.list_tools()
        has_peek = "peek_action" in {t.name for t in tools}

        # Initial observation
        init_obs = await client.call_tool("play_action", {"action": "look"})
        moves_taken += 1
        self.last_action = "look"
        history.append((
            "THOUGHT: Start by looking around to ground the state.",
            "TOOL: play_action ARGS: {'action': 'look'}",
            self._text(init_obs)[:160],
        ))

        prev_score = 0
        prev_loc = -1
        while moves_taken < max_steps:
            # Observation/telemetry (does not consume moves). A failed or
            # malformed status falls back to the last good one.
            fresh = await self._fetch_status(client)
            if fresh is not None:
                last_status = fresh
            status = last_status

            if not status:
                # Emergency fallback: no usable status has ever been received.
                thought = "THOUGHT: Status unavailable; use a safe action to recover."
                tool_call = "TOOL: play_action ARGS: {'action': 'look'}"
                res = await client.call_tool("play_action", {"action": "look"})
                moves_taken += 1
                obs_txt = self._text(res)
                history.append((thought, tool_call, obs_txt[:160]))
                if self._is_game_over(obs_txt):
                    game_completed = True
                    break
                continue

            loc_id = int(status["loc_id"])
            score = int(status.get("score") or 0)
            final_score = score
            max_score = int(status.get("max_score", max_score) or max_score)
            done = bool(status.get("done", False))

            self.visited.add(loc_id)
            self._merge_edges(loc_id, status.get("edges_here", {}) or {})
            self.loc_untried[loc_id] = list(status.get("untried_directions", []) or [])

            # Progress = the score or the location changed since the last step.
            if score == prev_score and loc_id == prev_loc:
                self.no_progress_steps += 1
            else:
                self.no_progress_steps = 0
            prev_score, prev_loc = score, loc_id

            if done:
                game_completed = True
                break

            # Decide next action (deterministic heuristics + optional LLM fallback)
            thought_reason, decided = self._decide(status, seed)

            # Optional look-ahead improvement
            action = decided
            if has_peek:
                action = await self._peek_pick(client, status, decided)
            action = self._sanitize_action(action)
            if action != self._sanitize_action(decided):
                # Keep the recorded reasoning consistent with what is executed.
                thought_reason = f"{thought_reason} Look-ahead preferred: {action}."

            # ReAct record (explicit)
            thought = f"THOUGHT: {thought_reason}"
            tool_call = f"TOOL: play_action ARGS: {{'action': '{action}'}}"

            # Execute action
            res = await client.call_tool("play_action", {"action": action})
            moves_taken += 1
            obs2 = self._text(res)

            # Update recent memory for loop avoidance
            self.recent_memory.append((action.lower().strip(), loc_id, score, obs2[:60]))
            self.last_action = action

            if verbose:
                print(
                    f"[step] loc={loc_id} score={score} stuck={self.no_progress_steps} -> {action!r}",
                    file=sys.stderr,
                )

            history.append((thought, tool_call, obs2[:160]))
            if self._is_game_over(obs2):
                game_completed = True
                break

        # final status (best effort)
        try:
            raw = await client.call_tool("status", {})
            st2 = json.loads(self._text(raw))
            final_score = max(final_score, int(st2.get("score", 0)))
            max_score = max_score or int(st2.get("max_score", 0))
            self.visited.add(int(st2["loc_id"]))
        except Exception:
            # The run result is still valid without the closing refresh.
            pass

        return RunResult(
            final_score=final_score,
            max_score=max_score,
            moves=moves_taken,
            locations_visited=self.visited,
            game_completed=game_completed,
            history=history,
        )

    async def _fetch_status(self, client) -> Optional[dict]:
        """Return the parsed status dict, or None when it is unusable.

        A status is usable when it is a JSON object whose ``loc_id`` converts
        to int; anything else (tool error, bad JSON, missing key) yields None.
        """
        try:
            raw = await client.call_tool("status", {})
            status = json.loads(self._text(raw))
            if isinstance(status, dict):
                int(status["loc_id"])  # validation only
                return status
        except Exception:
            # Best effort: the caller falls back to the previous status.
            pass
        return None

    # =========================================================================
    # Decision logic
    # =========================================================================
    def _decide(self, status: dict, seed: int) -> tuple[str, str]:
        """Pick the next command and the reason for it.

        Priority: disambiguation reply, untried exits, bounded safe
        interactions, BFS backtracking, stuck recovery, optional LLM
        suggestion, and finally a plain ``look``.

        Returns:
            ``(reason, action)``.
        """
        loc_id = int(status["loc_id"])
        obs = status.get("last_observation", "") or ""
        outcomes = status.get("outcomes_here", {}) or {}
        banned = {str(x).lower().strip() for x in (status.get("banned_actions_here", []) or [])}
        untried = status.get("untried_directions", []) or []
        valid_exits = status.get("valid_exits", []) or []
        suggested = status.get("suggested_interactions", []) or []

        # 0) Disambiguation
        if DISAMBIG_RE.search(obs):
            opt = self._extract_option(obs)
            if opt and not self._repeat_noop(opt, loc_id):
                return "Disambiguation requested by the game; answer with the first plausible option.", opt

        # A1) Jericho-validated untried exits
        untried_set = set(untried)
        obs_dirs = self._mentioned_dirs(obs)
        for d in valid_exits:
            if d in untried_set and d.lower().strip() not in banned and not self._repeat_noop(d, loc_id):
                return f"Take a valid untried exit to explore: {d}.", d

        # A2) Observation-boosted untried dirs
        for d in obs_dirs:
            if d in untried_set and d.lower() not in banned and not self._repeat_noop(d, loc_id):
                return f"Direction mentioned in observation and untried; explore: {d}.", d

        # A3) Any untried direction
        for d in untried:
            if d.lower() not in banned and not self._repeat_noop(d, loc_id):
                return f"No strong cue; systematically try untried direction: {d}.", d

        # B) Bounded safe interactions
        n = self.interactions_done.get(loc_id, 0)
        if n < MAX_INTERACTIONS:
            for a in suggested:
                al = a.lower().strip()
                if al in banned or al.startswith(UNSAFE_STARTS):
                    continue
                if a in outcomes or self._repeat_noop(a, loc_id):
                    continue
                self.interactions_done[loc_id] = n + 1
                return f"Try a game-validated interaction in this room (#{n+1}): {a}.", a

        # C) BFS backtrack to frontier
        step_dir = self._bfs_step(loc_id, self._oscillation_avoid())
        if step_dir:
            return "No local frontier; backtrack via BFS to nearest unexplored frontier.", step_dir

        # D) Stuck recovery
        if self.no_progress_steps >= STUCK_THRESHOLD:
            for a in ("look", "inventory"):
                if not self._repeat_noop(a, loc_id):
                    return "Stuck for many steps; run a safe recovery action.", a
            noun = self._extract_noun(obs)
            if noun and not self._repeat_noop(f"examine {noun}", loc_id):
                return "Stuck; examine a likely noun from the observation.", f"examine {noun}"

        # E) Optional LLM fallback
        if LLM_CLIENT is not None:
            try:
                self.llm_calls += 1
                resp = call_llm(self._llm_prompt(status), LLM_SYSTEM, seed + self.llm_calls)
                act = self._parse_llm(resp)
                if act and act.lower().strip() not in banned and not self._repeat_noop(act, loc_id):
                    return "Heuristics exhausted; use one short LLM suggestion (optional fallback).", act
            except Exception:
                # The LLM is never required; any failure falls through to "look".
                pass

        return "Fallback to a safe neutral action.", "look"

    async def _peek_pick(self, client, status: dict, current_action: str) -> str:
        """Use peek_action to score a small candidate set and pick the best.

        Candidates are the already-decided action plus a few untried
        directions and suggested interactions. Banned commands and commands
        with an unsafe prefix are not added, so look-ahead cannot reintroduce
        what ``_decide`` filtered out. Utility rewards score gain first, then
        reaching a new location; repeating a recent command is penalised.

        Returns:
            The highest-utility candidate; ``current_action`` when no peek
            succeeds or nothing scores higher.
        """
        loc_id = int(status["loc_id"])
        score = int(status.get("score") or 0)
        banned = {str(x).lower().strip() for x in (status.get("banned_actions_here") or [])}

        extra_dirs = [
            d for d in (status.get("untried_directions") or [])
            if str(d).lower().strip() not in banned
        ][:4]
        extra_acts = [
            a for a in (status.get("suggested_interactions") or [])
            if str(a).lower().strip() not in banned
            and not str(a).lower().strip().startswith(UNSAFE_STARTS)
        ][:4]

        candidates = [current_action] if current_action else []
        for cmd in (*extra_dirs, *extra_acts):
            if cmd not in candidates:
                candidates.append(cmd)
        candidates = candidates[:PEEK_K]

        best = current_action
        best_u = None
        for cmd in candidates:
            try:
                raw = await client.call_tool("peek_action", {"action": cmd})
                peeked = json.loads(self._text(raw))
                new_score = int(peeked.get("score", score))
                new_loc = int(peeked.get("loc_id", loc_id))

                gain = max(0, new_score - score)
                if new_loc != loc_id:
                    moved_bonus = 600 if new_loc not in self.visited else 80
                else:
                    moved_bonus = 0
                repeat_pen = 120 if self._repeat_noop(cmd, loc_id) else 0

                u = gain * 900 + moved_bonus - repeat_pen
                # Strict comparison keeps the earliest candidate on ties.
                if best_u is None or u > best_u:
                    best_u = u
                    best = cmd
            except Exception:
                # A failed peek only removes this candidate from consideration.
                continue
        return best

    # =========================================================================
    # Graph / BFS
    # =========================================================================
    def _merge_edges(self, loc_id: int, edges_here: dict) -> None:
        """Record ``direction -> destination`` edges for ``loc_id``.

        Entries whose destination does not convert to int are skipped.
        """
        if not edges_here:
            return
        node = self.graph.setdefault(loc_id, {})
        for direction, dest in edges_here.items():
            try:
                node[str(direction)] = int(dest)
            except (TypeError, ValueError):
                continue

    def _oscillation_avoid(self) -> Optional[int]:
        """Return a location to avoid when the last four steps ping-ponged.

        The pattern A, B, A, B in the remembered locations yields A (the
        second-to-last one); otherwise None.
        """
        locs = [entry[1] for entry in self.recent_memory]
        if len(locs) >= 4 and locs[-1] == locs[-3] and locs[-2] == locs[-4]:
            return locs[-2]
        return None

    def _bfs_step(self, from_loc: int, avoid_loc: Optional[int]) -> Optional[str]:
        """Return the first direction on a shortest path to a frontier room.

        A frontier room is any other known location that still has untried
        directions. ``avoid_loc`` is excluded only as the first hop.

        Returns:
            The direction to take from ``from_loc``, or None when no frontier
            is reachable through the known graph.
        """
        frontier = {lid for lid, dirs in self.loc_untried.items() if dirs and lid != from_loc}
        if not frontier:
            return None

        seen = {from_loc}
        queue = deque()
        for direction, dest in self.graph.get(from_loc, {}).items():
            if dest not in seen and dest != avoid_loc:
                seen.add(dest)
                queue.append((dest, direction))

        while queue:
            cur, first_dir = queue.popleft()
            if cur in frontier:
                return first_dir
            for dest in self.graph.get(cur, {}).values():
                if dest not in seen:
                    seen.add(dest)
                    queue.append((dest, first_dir))
        return None

    # =========================================================================
    # Parsing / loop helpers
    # =========================================================================
    def _repeat_noop(self, action: str, loc_id: int) -> bool:
        """Return True if ``action`` was recently issued at ``loc_id``."""
        wanted = (action or "").lower().strip()
        return any(
            prev_a == wanted and prev_loc == loc_id
            for (prev_a, prev_loc, _sc, _o) in self.recent_memory
        )

    def _mentioned_dirs(self, obs: str) -> list[str]:
        """Return the distinct direction words in ``obs``, in order of appearance."""
        out: list[str] = []
        for m in DIR_WORD_RE.finditer(obs or ""):
            d = m.group(1).lower()
            if d not in out:
                out.append(d)
        return out

    def _extract_option(self, obs: str) -> Optional[str]:
        """Return the first "the <...>" option of a disambiguation prompt.

        A trailing connective ("or"/"and") is dropped so that
        "the lamp or the sword" answers with "lamp".
        """
        m = OPTION_RE.search(obs or "")
        if not m:
            return None
        words = m.group(1).lower().split()
        if len(words) > 1 and words[-1] in ("or", "and"):
            words.pop()
        return " ".join(words) or None

    def _extract_noun(self, obs: str) -> Optional[str]:
        """Return the first word following "the" that is not a direction.

        Only words of three or more letters are considered. Returns None when
        the observation contains no such word.
        """
        for m in re.finditer(r"\bthe\s+([a-z]{3,})\b", (obs or "").lower()):
            noun = m.group(1)
            # avoid directions being interpreted as nouns
            if noun not in self._DIRECTION_WORDS:
                return noun
        return None

    def _sanitize_action(self, a: str) -> str:
        """Normalise a command: drop quotes/backticks, collapse spaces, cap at 6 words.

        Returns "look" when nothing is left.
        """
        cleaned = re.sub(r"[`\"']", "", (a or "").strip())
        words = cleaned.split()[:6]
        return " ".join(words) if words else "look"

    def _llm_prompt(self, status: dict) -> str:
        """Build the user prompt for the LLM fallback from the status dict.

        Missing or null fields are rendered as "empty"/"none" instead of
        raising.
        """
        inv = ", ".join(str(x) for x in (status.get("inventory") or [])) or "empty"
        tried = ", ".join(str(k) for k in list((status.get("outcomes_here") or {}).keys())[:20]) or "none"
        banned = ", ".join(str(x) for x in (status.get("banned_actions_here") or [])) or "none"
        untried = ", ".join(str(x) for x in (status.get("untried_directions") or [])[:12])
        return (
            f"Location: {status.get('loc_name')} (id={status.get('loc_id')})\n"
            f"Score: {status.get('score')}/{status.get('max_score')} Moves: {status.get('moves')}\n"
            f"Inventory: {inv}\n"
            f"Untried dirs: {untried}\n"
            f"Tried here: {tried}\n"
            f"BANNED: {banned}\n\n"
            f"Observation:\n{(status.get('last_observation') or '')[:500]}\n"
        )

    def _parse_llm(self, resp: str) -> str:
        """Extract one command from an LLM reply.

        The first line starting with "ACTION:" wins; if there is none, the
        first non-empty line is used. A leading direction (optionally prefixed
        by "go") is reduced to the bare direction; anything else is lowercased
        and capped at five words. Returns "look" when no command is found.
        """
        lines = [ln.strip() for ln in (resp or "").splitlines() if ln.strip()]
        if not lines:
            return "look"

        chosen = next((ln for ln in lines if ln.upper().startswith("ACTION:")), lines[0])
        if chosen.upper().startswith("ACTION:"):
            chosen = chosen.split(":", 1)[1].strip()
        chosen = chosen.lower()

        m = re.match(
            r"^(?:go\s+)?(north(?:east|west)?|south(?:east|west)?|east|west|up|down|in|out)\b",
            chosen,
        )
        if m:
            return m.group(1)
        return " ".join(chosen.split()[:5]) or "look"

    def _is_game_over(self, text: str) -> bool:
        """Return True if ``text`` contains a known end-of-game phrase."""
        lowered = (text or "").lower()
        return any(
            marker in lowered
            for marker in ("game over", "you have died", "you are dead", "you have won")
        )

    def _text(self, result) -> str:
        """Return the text of a tool result.

        Handles an object with a non-empty ``content`` list or a non-empty
        plain list (first item's ``.text``); anything else is stringified.
        """
        try:
            if hasattr(result, "content") and result.content:
                return result.content[0].text
            if isinstance(result, list) and result:
                return result[0].text
        except Exception:
            # Unexpected result shape: fall back to its string form.
            pass
        return str(result)
# Optional smoke-test (does not run during evaluation import)
async def _test() -> None:
    """Run the agent for 30 steps on "lostpig" against a local MCP server.

    Starts ``mcp_server.py`` (located next to this file) as a stdio
    subprocess with ``GAME=lostpig`` and prints a one-line summary to stderr.
    """
    from fastmcp import Client
    from fastmcp.client.transports import StdioTransport
    import sys as _sys
    import os as _os

    server_script = _os.path.join(_os.path.dirname(__file__), "mcp_server.py")
    child_env = {**_os.environ, "GAME": "lostpig"}
    transport = StdioTransport(
        command=_sys.executable,
        args=[server_script],
        env=child_env,
    )

    agent = StudentAgent()
    async with Client(transport) as client:
        res = await agent.run(client, game="lostpig", max_steps=30, seed=42, verbose=True)
        print(
            f"Score: {res.final_score}/{res.max_score} | Moves: {res.moves} | Locations: {len(res.locations_visited)}",
            file=sys.stderr,
        )
# Script entry point: run the smoke test only when executed directly.
if __name__ == "__main__":
    import asyncio

    asyncio.run(_test())