# Capture header preserved from the original paste (non-code):
#   "Spaces:" followed by two "Runtime error" reports from the runner.
from __future__ import annotations

import json
import os
import re
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Literal, Protocol

from dotenv import load_dotenv
from google import genai
from google.genai import types

from textworld.core import GameState

from agents.hero.cli import parse_cli_command

from .base import DMInterfaceError, SUPPORTED_DIRECTIONS

if TYPE_CHECKING:
    from .session import EpisodeSession
| DEFAULT_GEMINI_MODEL = "gemini-2.5-flash-lite" | |
| _TEXTWORLD_PROMPT_LINE_RE = re.compile(r"^\s*>\s.*-\=\s.*=\-(?:\d+/\d+)?\s*$") | |
| _TEXTWORLD_BANNER_CHAR_RE = re.compile(r"[\\|$_/]") | |
| _TEXTWORLD_ROOM_HEADER_RE = re.compile(r"^\s*-\=\s*(?P<label>.+?)\s*\=-\s*$") | |
| _TEXTWORLD_META_LINE_RE = re.compile(r"^\s*(?:score:|moves:|available commands:|type 'help')", re.IGNORECASE) | |
class InterfaceAdapter(Protocol):
    """Structural interface for adapters between player text and the game engine."""

    def translate_command(self, raw_command: str, session: EpisodeSession) -> str:
        """Turn free-form player input into one canonical game command."""
        ...

    def render_observation(self, feedback: str, state: GameState | None, session: EpisodeSession) -> str:
        """Turn raw game feedback into the text shown to the player."""
        ...
class SimpleInterfaceAdapter:
    """A deterministic parser for explicit non-LLM play."""

    _ARTICLE_RE = re.compile(r"\b(the|a|an)\b", re.IGNORECASE)

    def translate_command(self, raw_command: str, session: EpisodeSession) -> str:
        """Map free-form player text onto a canonical command, deterministically."""
        command = raw_command.strip()
        lowered = command.lower()

        # Bare directions and a handful of fixed synonyms.
        if lowered in SUPPORTED_DIRECTIONS:
            return "go " + lowered
        if lowered in {"look", "look around"}:
            return "look"
        if lowered in {"inventory", "check inventory", "show inventory"}:
            return "inventory"
        if lowered in {"wait", "pass"}:
            return "wait"

        # Guardian answers.
        if lowered.startswith("answer "):
            return "submit " + command[7:].strip()
        if lowered.startswith("say "):
            return "submit " + command[4:].strip().strip("\"'")

        # Conversation.
        if lowered.startswith("talk to "):
            return "talk " + command[8:].strip()
        if lowered.startswith("speak to "):
            return "talk " + command[9:].strip()

        # Two-object verbs, in the same precedence as before:
        # "use X on Y", "give X to Y", "combine X with Y", "combine X and Y".
        for prefix, joiner, template in (
            ("use ", "on", "use {} on {}"),
            ("give ", "to", "give {} to {}"),
            ("combine ", "with", "combine {} with {}"),
            ("combine ", "and", "combine {} with {}"),
        ):
            if lowered.startswith(prefix) and f" {joiner} " in lowered:
                left, right = re.split(
                    rf"\s+{joiner}\s+", command[len(prefix):].strip(), maxsplit=1, flags=re.IGNORECASE
                )
                return template.format(self._normalize_object_text(left), self._normalize_object_text(right))

        # Single-object verbs.
        parts = command.split(maxsplit=1)
        if len(parts) != 2:
            return lowered
        verb = parts[0].lower()
        if verb not in {"read", "talk", "open", "take", "unlock", "examine"}:
            return lowered
        normalized = self._normalize_object_text(parts[1])
        if verb == "examine":
            # "examine" is remapped based on what kind of entity the name resolves to;
            # if neither resolves, it falls through as a literal "examine" command.
            if session.node_id_for_command_name(normalized, node_types={"readable"}):
                return "read " + normalized
            if session.node_id_for_command_name(normalized, node_types={"npc"}):
                return "talk " + normalized
        return verb + " " + normalized

    def _normalize_object_text(self, text: str) -> str:
        """Strip articles, collapse whitespace, and lowercase an object name."""
        without_articles = self._ARTICLE_RE.sub(" ", text)
        return re.sub(r"\s+", " ", without_articles).strip().lower()

    def render_observation(self, feedback: str, state: GameState | None, session: EpisodeSession) -> str:
        """Pass sanitized, context-enriched feedback through unchanged."""
        del state
        return enrich_feedback_text(sanitize_feedback_text(feedback), session)
class StrictCliInterfaceAdapter:
    """A deterministic adapter for parser-style CLI commands."""

    def translate_command(self, raw_command: str, session: EpisodeSession) -> str:
        """Accept only input that the strict CLI grammar can parse.

        Raises:
            DMInterfaceError: If the text does not parse to a normalized command.
        """
        del session
        parsed = parse_cli_command(raw_command)
        if parsed.valid and parsed.normalized_command is not None:
            return parsed.normalized_command
        raise DMInterfaceError(parsed.error or "Command does not match the strict CLI grammar.")

    def render_observation(self, feedback: str, state: GameState | None, session: EpisodeSession) -> str:
        """Pass sanitized, context-enriched feedback through unchanged."""
        del state
        return enrich_feedback_text(sanitize_feedback_text(feedback), session)
| class _TranslationGlossary: | |
| canonical_to_alias: dict[str, str] | |
| alias_to_canonical: dict[str, str] | |
class GeminiInterfaceAdapter:
    """Gemini-backed adapter: translates player text and optionally restyles feedback.

    Deterministic fast paths (normalization, direction extraction) run before any
    model call; Gemini is only consulted as a fallback. A ``translation_mode`` of
    ``"corporate_app"`` re-skins the dungeon through a cached, one-to-one alias
    glossary that the model generates per episode.

    Fix: the stateless helpers below were defined without their
    ``@staticmethod``/``@classmethod`` decorators (their signatures take no
    ``self``, or take ``cls``), so calls such as ``self._repo_root()`` passed the
    instance as an unexpected positional argument and raised ``TypeError`` at
    runtime. The decorators are restored; no behavior is otherwise changed.
    """

    _ARTICLE_RE = re.compile(r"\b(the|a|an)\b", re.IGNORECASE)
    # Aliases and object names must be lowercase word sequences the parser accepts.
    _PARSER_SAFE_NAME_RE = re.compile(r"^[a-z0-9]+(?: [a-z0-9]+)*$")
    _TRAILING_POLITENESS_RE = re.compile(r"(?:\s+(?:please|for me|thanks|thank you))+[.!?]*$", re.IGNORECASE)

    _COMMAND_SYSTEM = (
        "Translate the player's text into exactly one canonical dungeon command. "
        "Return only the command and nothing else."
    )
    _OBSERVATION_SYSTEM = (
        "Rewrite dungeon feedback in at most two short sentences. "
        "Preserve facts exactly. Do not infer, solve, explain, or add implications."
    )
    _TRANSLATED_COMMAND_SYSTEM = (
        "The player is using a corporate app metaphor layered over a fantasy dungeon. "
        "Translate the player's text back into exactly one canonical dungeon command from the underlying fantasy world. "
        "Return only the canonical command and nothing else."
    )
    _TRANSLATED_OBSERVATION_SYSTEM = (
        "Rewrite the dungeon observation as a corporate app interface while preserving facts one-to-one. "
        "Use the provided aliases exactly, keep directions unchanged, and do not add hints, solutions, or new mechanics."
    )
    _TRANSLATION_GLOSSARY_SYSTEM = (
        "Create a one-to-one alias glossary that maps fantasy dungeon terms into a corporate app metaphor. "
        "Return JSON only."
    )

    def __init__(
        self,
        api_key: str | None = None,
        model: str = DEFAULT_GEMINI_MODEL,
        narrate_observations: bool = False,
        translation_mode: Literal["none", "corporate_app"] = "none",
        max_admissible_commands: int = 18,
    ) -> None:
        """Create the adapter and its Gemini client.

        Args:
            api_key: Explicit API key; falls back to GEMINI_API_KEY/GOOGLE_API_KEY.
            model: Gemini model name.
            narrate_observations: Rewrite plain observations through the model.
            translation_mode: "none", or "corporate_app" for the re-skin.
            max_admissible_commands: Cap on visible commands listed in prompts.

        Raises:
            ValueError: If ``translation_mode`` is unsupported.
            DMInterfaceError: If no API key can be found.
        """
        if translation_mode not in {"none", "corporate_app"}:
            raise ValueError(f"Unsupported Gemini translation mode: {translation_mode}")
        self.model = model
        self.narrate_observations = narrate_observations
        self.translation_mode = translation_mode
        self.max_admissible_commands = max_admissible_commands
        self._client = self._create_client(api_key)
        # Glossaries and translated observations are cached per episode.
        self._translation_glossary_cache: dict[str, _TranslationGlossary] = {}
        self._translation_observation_cache: dict[tuple[str, str], str] = {}

    def translate_command(self, raw_command: str, session: EpisodeSession) -> str:
        """Resolve player text to an admissible command, using Gemini as a fallback.

        Raises:
            DMInterfaceError: If the text is empty or no valid command results.
        """
        lowered = raw_command.strip().lower()
        if not lowered:
            raise DMInterfaceError("Command must not be empty.")
        admissible = set(session.available_commands())
        # Fast path: deterministic normalization often avoids a model call.
        direct = self._normalize_generated_command(self._preprocess_player_text(lowered))
        if resolved := self._resolve_candidate_command(direct, session, admissible):
            return resolved
        # Fast path: a single unambiguous movement verb + direction.
        movement = self._extract_direction_command(lowered, admissible)
        if movement is not None:
            return movement
        prompt = self._command_prompt(raw_command, session, admissible)
        generated = self._generate_command(
            system_instruction=self._TRANSLATED_COMMAND_SYSTEM if self._translation_enabled() else self._COMMAND_SYSTEM,
            prompt=prompt,
            max_output_tokens=48,
            temperature=0.1,
        )
        if resolved := self._resolve_candidate_command(generated, session, admissible):
            return resolved
        raise DMInterfaceError(f"Gemini returned an invalid command: {generated or '<empty>'}")

    def render_observation(self, feedback: str, state: GameState | None, session: EpisodeSession) -> str:
        """Render feedback for the player, optionally translated or narrated.

        Raises:
            DMInterfaceError: If Gemini returns an empty rewrite.
        """
        sanitized = sanitize_feedback_text(feedback)
        enriched = enrich_feedback_text(sanitized, session)
        if not sanitized:
            return enriched
        if self._translation_enabled():
            cache_key = (self._translation_cache_key(session), enriched)
            cached = self._translation_observation_cache.get(cache_key)
            if cached is not None:
                return cached
            prompt = self._observation_prompt(enriched, session)
            generated = self._generate_observation(
                system_instruction=self._TRANSLATED_OBSERVATION_SYSTEM,
                prompt=prompt,
                # NOTE(review): more tokens are allowed when narration is OFF —
                # confirm this inversion is intentional.
                max_output_tokens=220 if not self.narrate_observations else 120,
                temperature=0.2,
            )
            if not generated:
                raise DMInterfaceError("Gemini returned an empty translated observation.")
            self._translation_observation_cache[cache_key] = generated
            return generated
        if not self.narrate_observations:
            return enriched
        # Quoted text and just-read documents stay verbatim.
        if self._should_preserve_feedback(sanitized, state):
            return enriched
        prompt = self._observation_prompt(sanitized, session)
        generated = self._generate_observation(
            system_instruction=self._OBSERVATION_SYSTEM,
            prompt=prompt,
            max_output_tokens=80,
            temperature=0.2,
        )
        if not generated:
            raise DMInterfaceError("Gemini returned an empty observation.")
        return enrich_feedback_text(generated, session)

    def _create_client(self, api_key: str | None) -> genai.Client:
        """Build a Gemini client, sourcing the key from args, env, or the repo .env.

        Raises:
            DMInterfaceError: If no API key is available.
        """
        load_dotenv(self._repo_root() / ".env", override=False)
        key = api_key or os.getenv("GEMINI_API_KEY") or os.getenv("GOOGLE_API_KEY")
        if not key:
            raise DMInterfaceError("Missing GEMINI_API_KEY or GOOGLE_API_KEY.")
        return genai.Client(api_key=key)

    @staticmethod
    def _repo_root() -> Path:
        """Repository root, resolved relative to this file.

        Assumes this module sits two package levels below the repo root — TODO
        confirm against the actual layout.
        """
        return Path(__file__).resolve().parents[2]

    def _command_prompt(self, raw_command: str, session: EpisodeSession, admissible: set[str]) -> str:
        """Build the command-translation prompt, including glossary when translating."""
        commands = sorted(admissible)[: self.max_admissible_commands]
        interactables = self._interactables(session)
        current_room = session.state.location or session.current_room_id
        lines: list[str] = []
        if self._translation_enabled():
            glossary = self._translation_glossary(session)
            lines.extend(
                [
                    "The player only sees the translated corporate-app interface.",
                    "Map their request back to the underlying dungeon command.",
                    "Treat rooms as apps/workspaces, NPCs as coworkers or reviewers, and items as files, tools, credentials, or tickets.",
                    "Translated aliases (alias => canonical):",
                    *[f"- {alias} => {canonical}" for alias, canonical in sorted(glossary.alias_to_canonical.items())],
                ]
            )
        lines.extend(
            [
                "Use an exact visible command whenever possible.",
                "Allowed verbs: go, open, unlock, take, read, use, combine, give, talk, submit, look, inventory, wait",
                f"Room: {current_room}",
                "Visible commands:",
                *[f"- {command}" for command in commands],
            ]
        )
        if interactables:
            lines.append(f"Objects here: {', '.join(interactables)}")
        lines.append("If the player is answering the guardian, use: submit <answer>")
        lines.append("If no valid mapping exists, return INVALID")
        lines.append(f"Player text: {raw_command.strip()}")
        return "\n".join(lines)

    def _observation_prompt(self, feedback: str, session: EpisodeSession) -> str:
        """Build the observation-rewrite prompt (translated or plain narration)."""
        current_room = session.state.location or session.current_room_id
        if self._translation_enabled():
            glossary = self._translation_glossary(session)
            lines = [
                f"Canonical room: {current_room}",
                "Use this exact alias glossary (canonical => alias):",
                *[f"- {canonical} => {alias}" for canonical, alias in sorted(glossary.canonical_to_alias.items())],
                "Preserve the same facts, object counts, and navigation affordances.",
                "Keep any 'Visible here:' and 'Exits:' sections, but rewrite the entity names with the aliases above.",
            ]
            if self.narrate_observations:
                lines.append("Keep the response compact.")
            lines.append("Canonical observation:")
            lines.append(feedback)
            return "\n".join(lines)
        return (
            f"Room: {current_room}\n"
            "Describe only what the game text explicitly says.\n"
            "Never reveal what a clue means or what answer it implies.\n"
            f"Feedback: {feedback}"
        )

    def _translation_glossary_prompt(self, session: EpisodeSession) -> str:
        """Build the JSON-glossary generation prompt listing every term to alias."""
        lines = [
            "Return JSON with shape: {\"aliases\": [{\"source\": \"...\", \"alias\": \"...\"}]}",
            "Rules:",
            "- Every alias must be unique.",
            "- Use lowercase letters, numbers, and spaces only.",
            "- Do not use articles like a, an, or the.",
            "- Keep aliases short and parser-safe.",
            "- Rooms should feel like apps, dashboards, workspaces, portals, or queues.",
            "- NPCs should feel like coworkers, reviewers, owners, admins, or operators.",
            "- Items should feel like files, tickets, tokens, credentials, tools, or documents.",
            "- Preserve identity one-to-one. Do not merge multiple source terms into one alias.",
            "Terms:",
        ]
        for kind, source in self._translation_terms(session):
            lines.append(f"- {kind}: {source}")
        return "\n".join(lines)

    def _interactables(self, session: EpisodeSession) -> list[str]:
        """Parser-safe names of interactable nodes in the current room (max 8)."""
        names: list[str] = []
        for node in session.compiled.world.nodes:
            if getattr(node, "parent_id", None) != session.current_room_id:
                continue
            safe_name = session.compiled.node_command_names.get(node.id)
            if safe_name is not None and node.type in {"container", "readable", "npc", "door", "fixture"}:
                names.append(safe_name)
        return sorted(names)[:8]

    def _generate_response(
        self,
        *,
        system_instruction: str,
        prompt: str,
        max_output_tokens: int,
        temperature: float,
    ) -> str:
        """Issue one Gemini request and return its raw text (empty on no text)."""
        response = self._client.models.generate_content(
            model=self.model,
            contents=f"{system_instruction}\n\n{prompt}",
            config=types.GenerateContentConfig(
                temperature=temperature,
                max_output_tokens=max_output_tokens,
                candidate_count=1,
            ),
        )
        return getattr(response, "text", "") or ""

    def _generate_command(
        self,
        *,
        system_instruction: str,
        prompt: str,
        max_output_tokens: int,
        temperature: float,
    ) -> str:
        """Generate and sanitize a single-line command response."""
        return self._sanitize_command_response(
            self._generate_response(
                system_instruction=system_instruction,
                prompt=prompt,
                max_output_tokens=max_output_tokens,
                temperature=temperature,
            )
        )

    def _generate_observation(
        self,
        *,
        system_instruction: str,
        prompt: str,
        max_output_tokens: int,
        temperature: float,
    ) -> str:
        """Generate and sanitize a multi-line observation response."""
        return self._sanitize_multiline_response(
            self._generate_response(
                system_instruction=system_instruction,
                prompt=prompt,
                max_output_tokens=max_output_tokens,
                temperature=temperature,
            )
        )

    def _generate_json(
        self,
        *,
        system_instruction: str,
        prompt: str,
        max_output_tokens: int,
        temperature: float,
    ) -> str:
        """Generate a response and strip any Markdown code fences around JSON."""
        return self._sanitize_json_response(
            self._generate_response(
                system_instruction=system_instruction,
                prompt=prompt,
                max_output_tokens=max_output_tokens,
                temperature=temperature,
            )
        )

    def _resolve_candidate_command(
        self,
        candidate: str,
        session: EpisodeSession,
        admissible: set[str],
    ) -> str | None:
        """Try candidate spellings against the admissible set; None if all fail."""
        for option in self._candidate_variants(candidate, session):
            if not option:
                continue
            if option == "invalid":
                # The model's explicit "no valid mapping" sentinel.
                continue
            if resolved := self._resolve_admissible_command(option, admissible):
                return resolved
            if self._allow_unlisted_canonical(option):
                return option
        return None

    def _candidate_variants(self, candidate: str, session: EpisodeSession) -> list[str]:
        """Candidate spellings to try; the de-aliased form goes first when translating."""
        variants = [self._normalize_generated_command(candidate)]
        if self._translation_enabled():
            canonicalized = self._canonicalize_translated_command(variants[0], session)
            if canonicalized not in variants:
                variants.insert(0, canonicalized)
        return variants

    def _canonicalize_translated_command(self, command: str, session: EpisodeSession) -> str:
        """Rewrite glossary aliases in ``command`` back to canonical terms."""
        glossary = self._translation_glossary(session)
        rewritten = command
        # Longest aliases first so multi-word aliases win over their substrings.
        for alias, canonical in sorted(glossary.alias_to_canonical.items(), key=lambda item: (-len(item[0]), item[0])):
            rewritten = re.sub(
                rf"(?<![a-z0-9]){re.escape(alias)}(?![a-z0-9])",
                canonical,
                rewritten,
            )
        return self._normalize_generated_command(rewritten)

    def _translation_glossary(self, session: EpisodeSession) -> _TranslationGlossary:
        """Return the episode's alias glossary, generating it on first use."""
        cache_key = self._translation_cache_key(session)
        cached = self._translation_glossary_cache.get(cache_key)
        if cached is not None:
            return cached
        terms = self._translation_terms(session)
        generated = self._generate_json(
            system_instruction=self._TRANSLATION_GLOSSARY_SYSTEM,
            prompt=self._translation_glossary_prompt(session),
            max_output_tokens=700,
            temperature=0.2,
        )
        glossary = self._parse_translation_glossary(generated, terms)
        self._translation_glossary_cache[cache_key] = glossary
        return glossary

    def _parse_translation_glossary(
        self,
        payload: str,
        terms: list[tuple[str, str]],
    ) -> _TranslationGlossary:
        """Parse the model's glossary JSON into a validated two-way mapping.

        Raises:
            DMInterfaceError: On invalid JSON or an empty alias set.
        """
        try:
            data = json.loads(payload)
        except json.JSONDecodeError as exc:
            raise DMInterfaceError("Gemini returned invalid translation glossary JSON.") from exc
        raw_aliases: dict[str, str] = {}
        if isinstance(data, dict):
            # Accept {"aliases": {...}}, {"aliases": [...]}, or a bare mapping.
            aliases = data.get("aliases", data)
            if isinstance(aliases, dict):
                raw_aliases = {
                    self._normalize_object_text(str(source)): str(alias)
                    for source, alias in aliases.items()
                    if isinstance(source, str)
                }
            elif isinstance(aliases, list):
                for entry in aliases:
                    if not isinstance(entry, dict):
                        continue
                    source = entry.get("source")
                    alias = entry.get("alias")
                    if isinstance(source, str) and isinstance(alias, str):
                        raw_aliases[self._normalize_object_text(source)] = alias
        if not raw_aliases:
            raise DMInterfaceError("Gemini returned an empty translation glossary.")
        canonical_to_alias: dict[str, str] = {}
        alias_to_canonical: dict[str, str] = {}
        used_aliases: set[str] = set()
        # Walk the canonical term list so every term gets exactly one alias.
        for _kind, source in terms:
            requested_alias = self._normalize_parser_safe_alias(raw_aliases.get(source, ""))
            alias = self._dedupe_alias(source, requested_alias, used_aliases)
            canonical_to_alias[source] = alias
            alias_to_canonical[alias] = source
            used_aliases.add(alias)
        return _TranslationGlossary(
            canonical_to_alias=canonical_to_alias,
            alias_to_canonical=alias_to_canonical,
        )

    def _translation_terms(self, session: EpisodeSession) -> list[tuple[str, str]]:
        """Collect (kind, canonical name) pairs needing aliases, sorted stably."""
        terms: list[tuple[str, str]] = []
        seen: set[str] = set()
        for node in session.compiled.world.nodes:
            source = session.compiled.node_command_names.get(node.id)
            if source is None or source in seen:
                continue
            kind = "room" if node.type in {"location", "junction"} else node.type
            seen.add(source)
            terms.append((kind, source))
        for item in session.compiled.world.items:
            source = session.compiled.item_command_names.get(item.id)
            if source is None or source in seen:
                continue
            seen.add(source)
            terms.append(("item", source))
        answer = session.compiled.correct_answer_normalized
        if answer and answer not in seen:
            terms.append(("answer", answer))
        return sorted(terms, key=lambda item: (item[0], item[1]))

    def _dedupe_alias(self, source: str, alias: str, used_aliases: set[str]) -> str:
        """Pick a unique alias: requested, then the source itself, then "source N"."""
        for candidate in (alias, source):
            if candidate and candidate not in used_aliases:
                return candidate
        suffix = 2
        while True:
            candidate = f"{source} {suffix}"
            if candidate not in used_aliases and self._PARSER_SAFE_NAME_RE.fullmatch(candidate):
                return candidate
            suffix += 1

    def _normalize_parser_safe_alias(self, value: str) -> str:
        """Normalize an alias; return "" if it is not parser-safe."""
        alias = self._normalize_object_text(value)
        if not alias or not self._PARSER_SAFE_NAME_RE.fullmatch(alias):
            return ""
        return alias

    def _translation_cache_key(self, session: EpisodeSession) -> str:
        """Cache key identifying the episode; stable across calls within a session."""
        episode_id = getattr(session.compiled, "episode_id", "") or "session"
        return f"{episode_id}:{session.compiled.game_file}"

    def _translation_enabled(self) -> bool:
        """Whether a non-"none" translation mode is active."""
        return self.translation_mode != "none"

    @classmethod
    def _preprocess_player_text(cls, text: str) -> str:
        """Canonicalize casual phrasing: synonyms and politeness prefixes/suffixes."""
        normalized = re.sub(r"\s+", " ", text.strip().lower())
        replacements = (
            ("pick up ", "take "),
            ("grab ", "take "),
            ("using ", "with "),
            ("talk to ", "talk "),
            ("speak to ", "talk "),
        )
        for source, target in replacements:
            normalized = normalized.replace(source, target)
        prefixes = (
            "please ",
            "please, ",
            "can you ",
            "could you ",
            "would you ",
            "will you ",
            "go ahead and ",
            "i want to ",
            "i'd like to ",
            "try to ",
        )
        # Strip stacked politeness prefixes until no prefix matches.
        stripped = True
        while stripped:
            stripped = False
            for prefix in prefixes:
                if normalized.startswith(prefix):
                    normalized = normalized[len(prefix) :].strip()
                    stripped = True
        normalized = cls._TRAILING_POLITENESS_RE.sub("", normalized).strip()
        return normalized

    @staticmethod
    def _extract_direction_command(text: str, admissible: set[str]) -> str | None:
        """Fast path: map "go/head/move ... <direction>" onto an admissible command."""
        directions = [direction for direction in SUPPORTED_DIRECTIONS if re.search(rf"\b{direction}\b", text)]
        if len(directions) != 1:
            return None
        if not re.search(r"\b(go|head|move|walk|run|travel|enter|step)\b", text):
            return None
        candidate = f"go {directions[0]}"
        return candidate if candidate in admissible else None

    @staticmethod
    def _allow_unlisted_canonical(command: str) -> bool:
        """Accept well-formed canonical commands even when not in the visible list."""
        return GeminiInterfaceAdapter._is_canonical_command(command) and not GeminiInterfaceAdapter._contains_conversational_fluff(command)

    @staticmethod
    def _contains_conversational_fluff(command: str) -> bool:
        """Detect leftover politeness/request phrasing inside a command."""
        return bool(
            re.search(
                r"\b(for me|please|thanks|thank you|could you|can you|would you|will you)\b",
                command,
            )
        )

    @staticmethod
    def _normalize_generated_command(text: str) -> str:
        """Normalize a command string into the parser's canonical verb forms."""
        normalized = re.sub(r"\s+", " ", text.strip().lower())
        normalized = normalized.removeprefix("command: ").removeprefix("response: ").strip()
        normalized = normalized.rstrip(".!?")
        if normalized in SUPPORTED_DIRECTIONS:
            return "go " + normalized
        if normalized.startswith("talk to "):
            return "talk " + GeminiInterfaceAdapter._normalize_object_text(normalized[8:].strip())
        if normalized.startswith("speak to "):
            return "talk " + GeminiInterfaceAdapter._normalize_object_text(normalized[9:].strip())
        if normalized.startswith("answer "):
            return "submit " + normalized[7:].strip()
        if normalized.startswith("say "):
            return "submit " + normalized[4:].strip().strip("\"'")
        if normalized.startswith("combine ") and " and " in normalized:
            item_a, item_b = normalized[8:].split(" and ", 1)
            return "combine " + GeminiInterfaceAdapter._normalize_object_text(item_a) + " with " + GeminiInterfaceAdapter._normalize_object_text(item_b)
        if normalized.startswith("unlock ") and " with " in normalized:
            target, key = normalized[7:].split(" with ", 1)
            return "unlock " + GeminiInterfaceAdapter._normalize_object_text(target) + " with " + GeminiInterfaceAdapter._normalize_object_text(key)
        if normalized.startswith("use ") and " on " in normalized:
            item, target = normalized[4:].split(" on ", 1)
            return "use " + GeminiInterfaceAdapter._normalize_object_text(item) + " on " + GeminiInterfaceAdapter._normalize_object_text(target)
        if normalized.startswith("give ") and " to " in normalized:
            item, target = normalized[5:].split(" to ", 1)
            return "give " + GeminiInterfaceAdapter._normalize_object_text(item) + " to " + GeminiInterfaceAdapter._normalize_object_text(target)
        if normalized.startswith("combine ") and " with " in normalized:
            item_a, item_b = normalized[8:].split(" with ", 1)
            return "combine " + GeminiInterfaceAdapter._normalize_object_text(item_a) + " with " + GeminiInterfaceAdapter._normalize_object_text(item_b)
        if normalized.startswith(("open ", "read ", "talk ", "take ", "examine ")):
            verb, obj = normalized.split(" ", 1)
            return verb + " " + GeminiInterfaceAdapter._normalize_object_text(obj)
        return normalized

    @staticmethod
    def _normalize_object_text(text: str) -> str:
        """Strip articles, collapse whitespace, and lowercase an object name."""
        object_text = GeminiInterfaceAdapter._ARTICLE_RE.sub(" ", text)
        return re.sub(r"\s+", " ", object_text).strip().lower()

    @staticmethod
    def _is_canonical_command(command: str) -> bool:
        """Whether ``command`` is syntactically valid in the dungeon grammar."""
        if command in {"look", "inventory", "wait"}:
            return True
        if command.startswith("go "):
            return command[3:] in SUPPORTED_DIRECTIONS
        if command.startswith(("open ", "read ", "talk ", "submit ")):
            return bool(command.split(maxsplit=1)[1].strip())
        if command.startswith("use "):
            return " on " in command and all(part.strip() for part in command[4:].split(" on ", 1))
        if command.startswith("combine "):
            return " with " in command and all(part.strip() for part in command[8:].split(" with ", 1))
        if command.startswith("give "):
            return " to " in command and all(part.strip() for part in command[5:].split(" to ", 1))
        if command.startswith("take "):
            return bool(command.split(maxsplit=1)[1].strip())
        if command.startswith("unlock "):
            if " with " not in command:
                return False
            door_text, key_text = command[7:].split(" with ", 1)
            return bool(door_text.strip() and key_text.strip())
        return False

    @staticmethod
    def _sanitize_command_response(text: str) -> str:
        """Reduce a model reply to one clean, lowercase command line."""
        cleaned = text.strip().strip("`").strip().strip("\"'")
        if not cleaned:
            return ""
        first_line = cleaned.splitlines()[0].strip()
        if ":" in first_line:
            prefix, suffix = first_line.split(":", 1)
            if prefix.lower() in {"command", "response"}:
                first_line = suffix.strip()
        return re.sub(r"\s+", " ", first_line).strip().lower()

    @staticmethod
    def _sanitize_multiline_response(text: str) -> str:
        """Clean a multi-line model reply: fences, labels, excess blank lines."""
        cleaned = GeminiInterfaceAdapter._sanitize_json_response(text)
        if not cleaned:
            return ""
        lines: list[str] = []
        blank_run = 0
        for raw_line in cleaned.splitlines():
            line = raw_line.strip()
            if not line:
                blank_run += 1
                if blank_run <= 1:
                    lines.append("")
                continue
            blank_run = 0
            if ":" in line:
                prefix, suffix = line.split(":", 1)
                if prefix.lower() == "observation":
                    line = suffix.strip()
            lines.append(line)
        return "\n".join(lines).strip().strip("\"'")

    @staticmethod
    def _sanitize_json_response(text: str) -> str:
        """Strip Markdown code fences (```json / ```text) from a model reply."""
        cleaned = text.strip()
        if cleaned.startswith("```"):
            cleaned = re.sub(r"^```(?:json|text)?\s*", "", cleaned)
            cleaned = re.sub(r"\s*```$", "", cleaned)
        return cleaned.strip()

    @staticmethod
    def _should_preserve_feedback(feedback: str, state: GameState | None) -> bool:
        """Keep quoted text and "read" results verbatim instead of narrating them."""
        if '"' in feedback or "'" in feedback:
            return True
        if state is not None and (state.last_command or "").startswith("read"):
            return True
        return False

    @staticmethod
    def _resolve_admissible_command(candidate: str, admissible: set[str]) -> str | None:
        """Match a candidate to an admissible command, tolerating partial names.

        Token containment (ignoring connector words) selects the shortest
        matching option so e.g. "take key" can resolve to "take brass key".
        """
        if candidate in admissible:
            return candidate
        if " " not in candidate:
            return None
        verb, remainder = candidate.split(" ", 1)
        candidate_tokens = [token for token in re.split(r"\s+", remainder) if token and token not in {"from", "with", "on", "to"}]
        matches: list[tuple[int, str]] = []
        for option in admissible:
            if not option.startswith(verb + " "):
                continue
            option_tokens = [token for token in re.split(r"\s+", option[len(verb) + 1 :]) if token and token not in {"from", "with", "on", "to"}]
            if candidate_tokens and all(token in option_tokens for token in candidate_tokens):
                matches.append((len(option_tokens), option))
        if not matches:
            return None
        matches.sort(key=lambda item: (item[0], item[1]))
        return matches[0][1]
def sanitize_feedback_text(feedback: str) -> str:
    """Strip TextWorld chrome (prompt echoes, banners, meta lines) from feedback.

    Room header banners become "Location: <name>" lines, leading banner noise is
    dropped, and runs of blank lines are collapsed to a single blank line.
    """
    cleaned: list[str] = []
    for raw in feedback.replace("\r\n", "\n").splitlines():
        line = raw.rstrip()
        stripped = line.strip()
        if not stripped:
            cleaned.append("")
            continue
        # Prompt echoes, ">"-prefixed input lines, and score/moves meta lines.
        if _TEXTWORLD_PROMPT_LINE_RE.match(line) or stripped.startswith(">"):
            continue
        if _TEXTWORLD_META_LINE_RE.match(stripped):
            continue
        header = _TEXTWORLD_ROOM_HEADER_RE.match(stripped)
        if header:
            cleaned.append(f"Location: {header.group('label').strip()}")
            continue
        if _is_probable_banner_line(stripped):
            continue
        cleaned.append(stripped)

    # Skip everything before the first line of real content.
    start = 0
    for index, line in enumerate(cleaned):
        stripped = line.strip()
        if not stripped:
            continue
        if stripped.startswith("Explore ") or stripped.startswith("Location: ") or not _is_probable_banner_line(stripped):
            start = index
            break

    # Collapse blank-line runs down to a single blank line.
    collapsed: list[str] = []
    blanks = 0
    for line in cleaned[start:]:
        stripped = line.strip()
        if not stripped:
            blanks += 1
            if blanks <= 1:
                collapsed.append("")
            continue
        blanks = 0
        collapsed.append(stripped)
    return "\n".join(collapsed).strip()
def enrich_feedback_text(feedback: str, session: EpisodeSession) -> str:
    """Append visible-entity and exit context lines the feedback does not mention."""
    context = _observation_context_lines(session)
    base = feedback.strip()
    if not context:
        return base
    sections = [base] if base else []
    # Only add context lines not already present verbatim in the feedback.
    sections.extend(line for line in context if line not in base)
    return "\n\n".join(sections).strip()
def _observation_context_lines(session: EpisodeSession) -> list[str]:
    """Build "Visible here:" and "Exits:" summary lines for the current room."""
    lines: list[str] = []
    entities = _visible_entities(session)
    if entities:
        lines.append("Visible here: " + ", ".join(entities))
    directions = sorted(
        command[3:] for command in session.available_commands() if command.startswith("go ")
    )
    if directions:
        lines.append("Exits: " + ", ".join(directions))
    return lines
| def _visible_entities(session: EpisodeSession) -> list[str]: | |
| visible: list[str] = [] | |
| seen: set[str] = set() | |
| for node in session.compiled.world.nodes: | |
| if getattr(node, "parent_id", None) != session.current_room_id: | |
| continue | |
| if node.type == "readable" and node.id not in session.revealed_readables: | |
| continue | |
| name = session.compiled.node_command_names.get(node.id) | |
| if name and name not in seen: | |
| seen.add(name) | |
| visible.append(name) | |
| for edge in session.compiled.world.edges: | |
| if edge.from_node_id != session.current_room_id or not edge.door_node_id: | |
| continue | |
| name = session.compiled.node_command_names.get(edge.door_node_id) | |
| if name and name not in seen: | |
| seen.add(name) | |
| visible.append(name) | |
| for item in session.compiled.world.items: | |
| if session.item_locations.get(item.id) != session.current_room_id: | |
| continue | |
| name = session.compiled.item_command_names.get(item.id) | |
| if name and name not in seen: | |
| seen.add(name) | |
| visible.append(name) | |
| return visible | |
def _is_probable_banner_line(line: str) -> bool:
    """Heuristically decide whether a line is TextWorld ASCII-banner art."""
    # Short lines and known content prefixes are never treated as banners.
    if len(line) < 12 or line.startswith(("Explore ", "Location: ")):
        return False
    decoration = len(_TEXTWORLD_BANNER_CHAR_RE.findall(line))
    return decoration >= max(4, len(line) // 6)