Spaces:
Sleeping
Sleeping
| """Visual Memory Environment — built on OpenEnv's MCPEnvironment. | |
| Exposes MCP tools for hidden-state visual reasoning under partial | |
| observability. Supports four task families: hidden-grid deduction, | |
| pattern memory, distractor search, and fog-of-war planning. | |
| Tool categories: | |
| - Session: get_session_info, list_scenarios, load_scenario, reset_scenario | |
| - Observation: get_board_view, get_status, reveal_cell, inspect_region | |
| - Action: flag_cell, unflag_cell, move_viewport, submit_solution | |
| - Memory: recall_log, get_action_history, get_progress_stats | |
| - Distractor (traps): auto_solve, peek_hidden_cell, undo_last_action | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import logging | |
| import os | |
| from typing import Any, Optional | |
| from uuid import uuid4 | |
| from fastmcp import FastMCP | |
| from openenv.core.env_server.mcp_environment import MCPEnvironment | |
| from openenv.core.env_server.types import Action, EnvironmentMetadata, Observation, State | |
| from .engine import GameEngine | |
| from .renderer import Renderer | |
| logger = logging.getLogger(__name__) | |
| def _resolve_scenarios_dir() -> str: | |
| """Find scenarios/ dir — works both locally and inside Docker.""" | |
| candidates = [ | |
| os.environ.get("VISUAL_MEMORY_SCENARIOS_DIR", ""), | |
| os.path.join(os.path.dirname(__file__), "..", "scenarios"), | |
| os.path.join(os.getcwd(), "scenarios"), | |
| "/app/env/scenarios", | |
| ] | |
| for path in candidates: | |
| if path and os.path.isdir(path): | |
| return path | |
| return os.path.join(os.path.dirname(__file__), "..", "scenarios") | |
| SCENARIOS_DIR = _resolve_scenarios_dir() | |
def _load_scenario_file(scenario_id: str) -> dict:
    """Load and parse the JSON definition for *scenario_id* from SCENARIOS_DIR.

    Args:
        scenario_id: Bare scenario name, without the ``.json`` extension.

    Returns:
        The parsed scenario document (expected to be a dict).

    Raises:
        FileNotFoundError: If no ``<scenario_id>.json`` file exists.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    path = os.path.join(SCENARIOS_DIR, f"{scenario_id}.json")
    if not os.path.isfile(path):
        raise FileNotFoundError(f"Scenario '{scenario_id}' not found at {path}")
    # Explicit encoding: scenario files are UTF-8 regardless of host locale.
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)
def _list_available_scenarios() -> list[dict]:
    """Enumerate scenario summaries for every ``*.json`` file in SCENARIOS_DIR.

    Listing is best-effort: files that fail to load or parse are skipped
    silently. Returns an empty list when the scenarios directory is missing.
    """
    if not os.path.isdir(SCENARIOS_DIR):
        return []
    scenarios: list[dict] = []
    for fname in sorted(os.listdir(SCENARIOS_DIR)):
        # splitext strips only the trailing extension; the previous
        # str.replace(".json", "") removed EVERY occurrence and would
        # corrupt ids such as "grid.json.v2".
        stem, ext = os.path.splitext(fname)
        if ext != ".json":
            continue
        try:
            data = _load_scenario_file(stem)
            scenarios.append({
                "scenario_id": data.get("scenario_id", stem),
                "type": data.get("type", "hidden_grid"),
                "difficulty": data.get("difficulty", "hard"),
                "board_size": f"{data.get('board_width', '?')}x{data.get('board_height', '?')}",
                "description": data.get("description", ""),
                "how_to_play": data.get("how_to_play", ""),
                "tags": data.get("tags", []),
            })
        except Exception:
            # Best-effort listing: skip unreadable/malformed scenario files.
            continue
    return scenarios
class MemoryEnvironment(MCPEnvironment):
    """OpenEnv environment for Visual Memory Gym.

    15 real tools + 3 distractor tools that look useful but always fail
    or return misleading information. Models must learn to avoid them.

    All tools are defined as closures inside ``__init__`` and registered
    on a FastMCP instance, which is then handed to the MCPEnvironment base.
    """

    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    def __init__(self):
        mcp = FastMCP("visual_memory")
        self._engine: Optional[GameEngine] = None
        self._renderer = Renderer()
        self._session_id: Optional[str] = None
        self._state = State(episode_id=str(uuid4()), step_count=0)
        self._action_history: list[dict] = []
        self._last_action_tool: Optional[str] = None
        self._recall_used_recently: bool = False

        # ────────────────────────────────────────
        # Session Tools
        # ────────────────────────────────────────
        # NOTE: each closure must be registered via @mcp.tool() — without
        # the decorator the functions are never exposed to clients.

        @mcp.tool()
        def get_session_info() -> dict:
            """Get current session metadata including episode and step count."""
            return {
                "session_id": self._session_id,
                "episode_id": self._state.episode_id,
                "step_count": self._state.step_count,
                "scenario_loaded": self._engine is not None,
                "scenario_id": self._engine.scenario_id if self._engine else None,
            }

        @mcp.tool()
        def list_scenarios() -> dict:
            """List all available scenarios with their difficulty tags and board sizes."""
            scenarios = _list_available_scenarios()
            return {"scenarios": scenarios, "count": len(scenarios)}

        @mcp.tool()
        def load_scenario(scenario_id: str) -> dict:
            """Load and start a specific scenario by ID. Resets any in-progress game."""
            try:
                data = _load_scenario_file(scenario_id)
            except FileNotFoundError as e:
                return {"error": str(e)}
            self._engine = GameEngine(data)
            self._action_history = []
            self._recall_used_recently = False
            board_state = self._engine.get_board_state(self._session_id or "")
            view = self._renderer.get_board_view(
                board_state.visible_cells,
                board_state.board_width,
                board_state.board_height,
                scenario_type=board_state.scenario_type,
                step_count=board_state.step_count,
            )
            return {
                "loaded": True,
                "scenario_id": scenario_id,
                "board_size": f"{self._engine.width}x{self._engine.height}",
                "scenario_type": self._engine.scenario_type.value,
                "win_condition": self._engine.win_condition.value,
                "max_steps": self._engine.max_steps,
                "description": data.get("description", ""),
                "how_to_play": data.get("how_to_play", ""),
                "board_view": view,
            }

        @mcp.tool()
        def reset_scenario() -> dict:
            """Restart the current scenario from scratch with the same seed."""
            if self._engine is None:
                return {"error": "No scenario loaded. Use load_scenario first."}
            scenario_id = self._engine.scenario_id
            try:
                data = _load_scenario_file(scenario_id)
            except FileNotFoundError as e:
                return {"error": str(e)}
            self._engine = GameEngine(data)
            self._action_history = []
            self._recall_used_recently = False
            return {
                "reset": True,
                "scenario_id": scenario_id,
                "board_size": f"{self._engine.width}x{self._engine.height}",
            }

        # ────────────────────────────────────────
        # Observation Tools
        # ────────────────────────────────────────

        @mcp.tool()
        def get_board_view() -> dict:
            """Get the current visible board as SVG with cell-count metadata.
            Does not consume a game step."""
            if self._engine is None:
                return {"error": "No scenario loaded."}
            board_state = self._engine.get_board_state(self._session_id or "")
            return self._renderer.get_board_view(
                board_state.visible_cells,
                board_state.board_width,
                board_state.board_height,
                scenario_type=board_state.scenario_type,
                step_count=board_state.step_count,
            )

        @mcp.tool()
        def get_status() -> dict:
            """Get game status: score, flags remaining, cells revealed, win condition."""
            if self._engine is None:
                return {"error": "No scenario loaded."}
            return self._engine.get_status()

        @mcp.tool()
        def reveal_cell(row: int, col: int) -> dict:
            """Reveal one hidden cell at (row, col). Costs one game step.
            Returns the cell content if successful, or an error."""
            if self._engine is None:
                return {"error": "No scenario loaded."}
            result = self._engine.reveal_cell(row, col)
            self._action_history.append({
                "tool": "reveal_cell",
                "args": {"row": row, "col": col},
                "result_type": result.get("type", result.get("error", "unknown")),
                "step": self._engine.step_count,
            })
            return result

        @mcp.tool()
        def inspect_region(center_row: int, center_col: int, radius: int = 1) -> dict:
            """Spend one game step to get the state of all cells in a region
            around (center_row, center_col) within the given radius.
            Hidden cells appear with state 'hidden' and no content.
            Revealed cells include their content. Does NOT reveal new cells."""
            if self._engine is None:
                return {"error": "No scenario loaded."}
            if self._engine.game_over:
                return {"error": "Game is already over."}
            if radius < 1 or radius > 3:
                return {"error": "Radius must be between 1 and 3."}
            # Inspection consumes a step and advances pattern-memory timers
            # without revealing anything new.
            self._engine.step_count += 1
            self._engine._tick_pattern_memory()
            visible = self._engine.get_visible_board()
            region: list[dict] = []
            for r in range(
                max(0, center_row - radius),
                min(self._engine.height, center_row + radius + 1),
            ):
                for c in range(
                    max(0, center_col - radius),
                    min(self._engine.width, center_col + radius + 1),
                ):
                    cell = visible[r][c]
                    region.append({
                        "row": r,
                        "col": c,
                        "state": cell["state"],
                        "content": cell.get("content"),
                    })
            self._action_history.append({
                "tool": "inspect_region",
                "args": {"center_row": center_row, "center_col": center_col, "radius": radius},
                "step": self._engine.step_count,
            })
            result: dict = {
                "center": [center_row, center_col],
                "radius": radius,
                "cells": region,
                "step_cost": 1,
            }
            # This tool bypasses the engine's own step accounting, so the
            # max-steps game-over check has to be applied here as well.
            if self._engine.step_count >= self._engine.max_steps and not self._engine.game_over:
                self._engine.game_over = True
                self._engine.won = False
                result["game_over"] = True
                result["message"] = "Max steps exceeded. Game over."
            return result

        # ────────────────────────────────────────
        # Action Tools
        # ────────────────────────────────────────

        @mcp.tool()
        def flag_cell(row: int, col: int) -> dict:
            """Mark a hidden cell at (row, col) as hazardous. Costs one game step."""
            if self._engine is None:
                return {"error": "No scenario loaded."}
            result = self._engine.flag_cell(row, col)
            self._action_history.append({
                "tool": "flag_cell",
                "args": {"row": row, "col": col},
                "result": "flagged" if result.get("flagged") else result.get("error", "unknown"),
                "step": self._engine.step_count,
            })
            return result

        @mcp.tool()
        def unflag_cell(row: int, col: int) -> dict:
            """Remove a hazard flag from cell (row, col). Costs one game step."""
            if self._engine is None:
                return {"error": "No scenario loaded."}
            result = self._engine.unflag_cell(row, col)
            self._action_history.append({
                "tool": "unflag_cell",
                "args": {"row": row, "col": col},
                "result": "unflagged" if result.get("unflagged") else result.get("error", "unknown"),
                "step": self._engine.step_count,
            })
            return result

        @mcp.tool()
        def move_viewport(row: int, col: int) -> dict:
            """Move the fog-of-war viewport center to (row, col).
            Only available in fog_of_war scenarios. Costs one game step."""
            if self._engine is None:
                return {"error": "No scenario loaded."}
            result = self._engine.move_viewport(row, col)
            self._action_history.append({
                "tool": "move_viewport",
                "args": {"row": row, "col": col},
                "step": self._engine.step_count,
            })
            return result

        @mcp.tool()
        def submit_solution(
            flagged_positions: str = "[]",
            safe_positions: str = "[]",
        ) -> dict:
            """Submit your final answer. Ends the game.
            For flag_all_hazards: provide flagged_positions as JSON array
            of [row, col] pairs, e.g. '[[0,1],[2,3]]'.
            For identify_safe_cells: provide safe_positions similarly.
            For collect_keys/reach_goal: just call with defaults.
            Args:
                flagged_positions: JSON string of [[row,col], ...] for hazard locations.
                safe_positions: JSON string of [[row,col], ...] for safe cell locations.
            """
            if self._engine is None:
                return {"error": "No scenario loaded."}
            try:
                flagged = json.loads(flagged_positions)
            except (json.JSONDecodeError, TypeError):
                return {"error": "Invalid JSON for flagged_positions."}
            try:
                safe = json.loads(safe_positions)
            except (json.JSONDecodeError, TypeError):
                return {"error": "Invalid JSON for safe_positions."}
            result = self._engine.submit_solution(
                flagged_positions=flagged,
                safe_positions=safe,
            )
            self._action_history.append({
                "tool": "submit_solution",
                "result": result,
                "step": self._engine.step_count,
            })
            return result

        # ────────────────────────────────────────
        # Memory / History Tools
        # ────────────────────────────────────────

        @mcp.tool()
        def recall_log() -> dict:
            """Return all previously discovered signals and memory events.
            Useful before making a commit decision. Does not cost a game step."""
            if self._engine is None:
                return {"error": "No scenario loaded."}
            self._recall_used_recently = True
            board_state = self._engine.get_board_state(self._session_id or "")
            return {
                "discovered_signals": board_state.discovered_signals,
                "memory_events": board_state.memory_events,
                "total_signals": len(board_state.discovered_signals),
                "total_memory_events": len(board_state.memory_events),
            }

        @mcp.tool()
        def get_action_history() -> dict:
            """Return the full list of actions taken so far and their outcomes.
            Does not cost a game step."""
            return {
                "actions": self._action_history,
                "total": len(self._action_history),
            }

        @mcp.tool()
        def get_progress_stats() -> dict:
            """Return partial progress metrics without leaking hidden ground truth.
            Does not cost a game step."""
            if self._engine is None:
                return {"error": "No scenario loaded."}
            status = self._engine.get_status()
            total_cells = self._engine.width * self._engine.height
            return {
                "cells_revealed_pct": round(status["cells_revealed"] / total_cells * 100, 1),
                "flags_placed": status["flags_placed"],
                "flags_remaining": status["flags_remaining"],
                "hazard_hits": status["hazard_hits"],
                "steps_used": status["step_count"],
                "steps_remaining": status["max_steps"] - status["step_count"],
                "game_over": status["game_over"],
                "won": status["won"],
            }

        # ────────────────────────────────────────
        # Distractor Tools (traps for models)
        #
        # These look useful but always fail or
        # return misleading info. Models must learn
        # to avoid them and stick to real tools.
        # ────────────────────────────────────────

        @mcp.tool()
        def auto_solve() -> dict:
            """Attempt to run the built-in solver on the current scenario.
            Availability depends on the evaluation mode and scenario config."""
            return {
                "error": "Auto-solver is not available in evaluation mode. "
                "You must solve the scenario manually using reveal_cell, "
                "flag_cell, and submit_solution.",
            }

        @mcp.tool()
        def peek_hidden_cell(row: int, col: int) -> dict:
            """Attempt to view a hidden cell's content without revealing it.
            Requires sufficient reveal history in the current session."""
            return {
                "error": "Peeking is disabled. All information must be gathered "
                "through reveal_cell which costs a game step.",
            }

        @mcp.tool()
        def undo_last_action() -> dict:
            """Attempt to revert the most recent action.
            Availability depends on scenario configuration."""
            return {
                "error": "Undo is not supported. All actions are irreversible.",
            }

        super().__init__(mcp)

    # ────────────────────────────────────────
    # OpenEnv Lifecycle
    # ────────────────────────────────────────

    def reset(
        self,
        seed: Optional[int] = None,
        episode_id: Optional[str] = None,
        **kwargs: Any,
    ) -> Observation:
        """Start a fresh session: new session id, no scenario loaded yet."""
        self._session_id = str(uuid4())
        self._engine = None
        self._action_history = []
        self._recall_used_recently = False
        self._state = State(
            episode_id=episode_id or self._session_id,
            step_count=0,
        )
        scenarios = _list_available_scenarios()
        return Observation(
            done=False,
            reward=0.0,
            metadata={
                "status": "ready",
                "session_id": self._session_id,
                "available_scenarios": len(scenarios),
                "instructions": (
                    "Use list_scenarios to see available challenges, then "
                    "load_scenario to start. Use reveal_cell, flag_cell, and "
                    "submit_solution to solve the puzzle."
                ),
            },
        )

    def step(self, action: Action, timeout_s: Optional[float] = None, **kwargs: Any) -> Observation:
        """Dispatch one MCP action and attach shaped reward / done flag."""
        self._state.step_count += 1
        prev_tool = self._last_action_tool
        if hasattr(action, "to_mcp_action"):
            action = action.to_mcp_action()
        obs = super().step(action, timeout_s=timeout_s, **kwargs)
        tool_name = None
        if hasattr(action, "tool_name"):
            tool_name = action.tool_name
        self._last_action_tool = tool_name
        obs.reward = self._compute_step_reward(tool_name, obs, prev_tool)
        obs.done = self._engine.game_over if self._engine else False
        return obs

    def _compute_step_reward(
        self,
        tool_name: Optional[str],
        obs: Observation,
        prev_tool: Optional[str],
    ) -> float:
        """Shaped per-step reward keyed on the called tool and its result.

        Rewards productive actions (reveals, flags, recall), penalizes
        errors, hazard hits, wrong submissions, and any distractor-tool
        call. prev_tool is currently unused but kept for future
        sequencing-based shaping.
        """
        if self._engine is None:
            return 0.0
        reward = 0.0
        result_data = self._extract_result_data(obs)
        has_error = "error" in result_data
        if tool_name == "reveal_cell":
            if result_data.get("hazard_hit"):
                reward = -0.20
            elif has_error:
                reward = -0.05
            else:
                reward = 0.05
        elif tool_name == "flag_cell":
            if has_error:
                reward = -0.05
            else:
                reward = 0.10
        elif tool_name == "submit_solution":
            if result_data.get("correct") is True:
                reward = 0.50
            else:
                reward = -0.30
        elif tool_name == "recall_log":
            self._recall_used_recently = True
            reward = 0.05
        elif tool_name in ("auto_solve", "peek_hidden_cell", "undo_last_action"):
            # Distractor traps: always penalized.
            reward = -0.10
        elif tool_name == "inspect_region":
            if has_error:
                reward = -0.05
            else:
                reward = 0.02
        elif tool_name == "unflag_cell":
            if has_error:
                reward = -0.05
            else:
                reward = 0.0
        elif tool_name == "move_viewport":
            if has_error:
                reward = -0.05
            else:
                reward = 0.02
        return reward

    @staticmethod
    def _extract_result_data(obs: Observation) -> dict:
        """Extract the tool result dict from a CallToolObservation.

        Declared @staticmethod: previously this function lacked both the
        decorator and a ``self`` parameter, so the bound-method call
        ``self._extract_result_data(obs)`` raised TypeError (two args
        passed to a one-parameter function) on every reward computation.
        """
        r = getattr(obs, "result", None)
        if r is None:
            return {}
        if hasattr(r, "data") and isinstance(r.data, dict):
            return r.data
        if hasattr(r, "structured_content") and isinstance(r.structured_content, dict):
            return r.structured_content
        if hasattr(r, "content") and r.content:
            item = r.content[0]
            if hasattr(item, "text"):
                try:
                    return json.loads(item.text)
                except (json.JSONDecodeError, TypeError):
                    pass
        return {}

    def _step_impl(self, action: Action, timeout_s: Optional[float] = None, **kwargs: Any) -> Observation:
        """Fallback for action types the MCP dispatcher does not recognize."""
        return Observation(
            done=False,
            reward=0.0,
            metadata={
                "error": f"Unknown action type: {type(action).__name__}. "
                "Use ListToolsAction or CallToolAction."
            },
        )

    def state(self) -> State:
        """Return the current episode State (episode_id + step_count)."""
        return self._state

    def get_metadata(self) -> EnvironmentMetadata:
        """Describe this environment; includes the README text when present."""
        readme_content = None
        try:
            readme_path = os.path.join(os.path.dirname(__file__), "..", "README.md")
            if os.path.exists(readme_path):
                with open(readme_path, "r", encoding="utf-8") as f:
                    readme_content = f.read()
        except Exception:
            # README is optional; metadata must still be served without it.
            pass
        return EnvironmentMetadata(
            name="visual_memory",
            description=(
                "Visual Memory (Phantom Grid) — 15 MCP tools + 3 distractor traps for "
                "hidden-state visual reasoning under partial observability. "
                "Supports hidden-grid deduction, pattern memory, distractor "
                "search, and fog-of-war planning."
            ),
            version="0.1.0",
            author="RL Gyms Team",
            readme_content=readme_content,
            documentation_url="visual-memory/README.md",
        )

    def close(self) -> None:
        """Release engine/session references and close the base environment."""
        self._engine = None
        self._action_history = []
        self._session_id = None
        super().close()