Spaces:
Sleeping
Sleeping
| """Spreadsheet Environment β MCPEnvironment with 13 real MCP tools.""" | |
| from __future__ import annotations | |
| import json | |
| import os | |
| from typing import Any, Optional | |
| from uuid import uuid4 | |
| from fastmcp import FastMCP | |
| from openenv.core.env_server.mcp_environment import MCPEnvironment | |
| from openenv.core.env_server.types import Action, EnvironmentMetadata, Observation, State | |
| from .scenario_loader import list_scenarios as _list_scenarios | |
| from .scenario_loader import load_scenario_def, prepare_workbook_for_session | |
| from .workbook_engine import WorkbookEngine, WorkbookSession | |
| WRITE_TOOLS = frozenset({"write_cell", "write_range"}) | |
| READ_TOOLS = frozenset({"read_range", "read_cell"}) | |
| class SpreadsheetEnvironment(MCPEnvironment): | |
| """Workbook manipulation environment β 13 MCP tools exposed via OpenEnv.""" | |
| SUPPORTS_CONCURRENT_SESSIONS = True | |
| def __init__(self): | |
| mcp = FastMCP("spreadsheet") | |
| self._session_id: Optional[str] = None | |
| self._state = State(episode_id=str(uuid4()), step_count=0) | |
| self._action_history: list[dict] = [] | |
| self._engine = WorkbookEngine() | |
| self._scenario: Optional[dict] = None | |
| self._last_validate_passed: int = 0 | |
| def _record(tool_name: str, **kwargs: Any) -> None: | |
| self._action_history.append({"tool": tool_name, "arguments": kwargs}) | |
| # ββ Tool 1: get_session_info ββββββββββββββββββββββββββββββββββ | |
| def get_session_info() -> dict: | |
| """Return current session metadata: session ID, loaded scenario, step count, edit count, and solve status.""" | |
| _record("get_session_info") | |
| if not self._session_id: | |
| return {"status": "no_session", "message": "Reset the environment first."} | |
| return self._engine.get_session_info(self._session_id) | |
| # ββ Tool 2: list_scenarios ββββββββββββββββββββββββββββββββββββ | |
| def list_scenarios() -> dict: | |
| """List all available spreadsheet task scenarios. Each entry has a scenario_id, description, workbook name, and max_steps.""" | |
| _record("list_scenarios") | |
| scenarios = _list_scenarios() | |
| return {"scenarios": scenarios, "count": len(scenarios)} | |
| # ββ Tool 3: load_scenario βββββββββββββββββββββββββββββββββββββ | |
| def load_scenario(scenario_id: str) -> dict: | |
| """Load a scenario and its workbook to begin working on a task. | |
| Args: | |
| scenario_id: The ID of the scenario to load (from list_scenarios). | |
| Returns the scenario description, instructions, sheet list, and target regions. | |
| """ | |
| _record("load_scenario", scenario_id=scenario_id) | |
| try: | |
| scenario_def = load_scenario_def(scenario_id) | |
| except FileNotFoundError as e: | |
| return {"error": str(e)} | |
| wb_path = prepare_workbook_for_session(scenario_id, self._session_id) | |
| session = WorkbookSession( | |
| session_id=self._session_id, | |
| scenario_id=scenario_id, | |
| workbook_path=wb_path, | |
| ) | |
| self._engine.load_workbook(session) | |
| self._scenario = scenario_def | |
| self._last_validate_passed = 0 | |
| sheets = self._engine.list_sheets(self._session_id) | |
| targets = self._engine.get_named_targets(self._session_id) | |
| return { | |
| "scenario_id": scenario_id, | |
| "description": scenario_def.get("description", ""), | |
| "instructions": scenario_def.get("instructions", ""), | |
| "max_steps": scenario_def.get("max_steps", 50), | |
| "sheets": sheets, | |
| "target_regions": targets, | |
| } | |
| # ββ Tool 4: list_sheets βββββββββββββββββββββββββββββββββββββββ | |
| def list_sheets() -> dict: | |
| """List all sheets in the current workbook with their names, row/column dimensions, and visibility state. | |
| Returns an error if no scenario is loaded. | |
| """ | |
| _record("list_sheets") | |
| if not self._session_id or self._session_id not in self._engine._sessions: | |
| return {"error": "No workbook loaded. Use load_scenario first."} | |
| sheets = self._engine.list_sheets(self._session_id) | |
| return {"sheets": sheets} | |
| # ββ Tool 5: read_range ββββββββββββββββββββββββββββββββββββββββ | |
| def read_range(sheet: str, range: str) -> dict: | |
| """Read a rectangular range of cells from a sheet. | |
| Args: | |
| sheet: Sheet name (e.g. "Summary", "Engineering"). | |
| range: Cell range in A1 notation (e.g. "A1", "B2:D10", "A1:Z100"). | |
| Returns a 2D array of cell values. Formulas are shown as their formula strings (e.g. "=SUM(A1:A10)"). | |
| """ | |
| _record("read_range", sheet=sheet, range=range) | |
| if not self._session_id or self._session_id not in self._engine._sessions: | |
| return {"error": "No workbook loaded. Use load_scenario first."} | |
| try: | |
| data = self._engine.read_range(self._session_id, sheet, range) | |
| return {"sheet": sheet, "range": range, "data": data} | |
| except (ValueError, KeyError) as e: | |
| return {"error": str(e)} | |
| # ββ Tool 6: write_cell ββββββββββββββββββββββββββββββββββββββββ | |
| def write_cell(sheet: str, cell: str, value: str) -> dict: | |
| """Write a value or formula to a single cell. | |
| Args: | |
| sheet: Sheet name. | |
| cell: Cell reference in A1 notation (e.g. "C15"). | |
| value: The value to write. Use "=" prefix for formulas (e.g. "=SUM(A1:A10)"). | |
| Numeric strings are auto-converted to numbers. | |
| Returns confirmation of the write. | |
| """ | |
| _record("write_cell", sheet=sheet, cell=cell, value=value) | |
| if not self._session_id or self._session_id not in self._engine._sessions: | |
| return {"error": "No workbook loaded. Use load_scenario first."} | |
| try: | |
| parsed = _parse_value(value) | |
| result = self._engine.write_cell(self._session_id, sheet, cell, parsed) | |
| return result | |
| except (ValueError, KeyError) as e: | |
| return {"error": str(e)} | |
| # ββ Tool 7: write_range βββββββββββββββββββββββββββββββββββββββ | |
| def write_range(sheet: str, start_cell: str, data: str) -> dict: | |
| """Write a 2D block of values starting from a cell. | |
| Args: | |
| sheet: Sheet name. | |
| start_cell: Top-left cell in A1 notation (e.g. "A1"). | |
| data: JSON string of a 2D array, e.g. '[[1, 2], [3, 4]]'. | |
| Use "=" prefix for formulas within cells. | |
| Returns the range written and cell count. | |
| """ | |
| _record("write_range", sheet=sheet, start_cell=start_cell, data=data) | |
| if not self._session_id or self._session_id not in self._engine._sessions: | |
| return {"error": "No workbook loaded. Use load_scenario first."} | |
| try: | |
| parsed_data = json.loads(data) | |
| if not isinstance(parsed_data, list): | |
| return {"error": "data must be a JSON 2D array, e.g. '[[1, 2], [3, 4]]'"} | |
| converted = [[_parse_value(str(v)) for v in row] for row in parsed_data] | |
| result = self._engine.write_range(self._session_id, sheet, start_cell, converted) | |
| return result | |
| except json.JSONDecodeError: | |
| return {"error": "Invalid JSON in data parameter."} | |
| except (ValueError, KeyError) as e: | |
| return {"error": str(e)} | |
| # ββ Tool 8: inspect_formula βββββββββββββββββββββββββββββββββββ | |
| def inspect_formula(sheet: str, cell: str) -> dict: | |
| """Return the raw formula string from a cell, or indicate it's not a formula. | |
| Args: | |
| sheet: Sheet name. | |
| cell: Cell reference (e.g. "C15"). | |
| Returns the formula string if the cell contains one, or is_formula=false otherwise. | |
| """ | |
| _record("inspect_formula", sheet=sheet, cell=cell) | |
| if not self._session_id or self._session_id not in self._engine._sessions: | |
| return {"error": "No workbook loaded. Use load_scenario first."} | |
| try: | |
| return self._engine.inspect_formula(self._session_id, sheet, cell) | |
| except (ValueError, KeyError) as e: | |
| return {"error": str(e)} | |
| # ββ Tool 9: list_named_targets ββββββββββββββββββββββββββββββββ | |
| def list_named_targets() -> dict: | |
| """Show the target areas and allowed output zones for the current scenario. | |
| Target regions are the cells/ranges where the agent is expected to write. | |
| Writing outside these areas may incur a penalty. | |
| """ | |
| _record("list_named_targets") | |
| if not self._session_id or self._session_id not in self._engine._sessions: | |
| return {"error": "No workbook loaded. Use load_scenario first."} | |
| targets = self._engine.get_named_targets(self._session_id) | |
| return {"target_regions": targets} | |
| # ββ Tool 10: validate_partial βββββββββββββββββββββββββββββββββ | |
| def validate_partial() -> dict: | |
| """Check partial progress on the current scenario. | |
| Returns the number of hidden test checks that pass and fail, | |
| without revealing the specific expected answers. Use this to | |
| gauge progress before submitting. | |
| """ | |
| _record("validate_partial") | |
| if not self._session_id or self._session_id not in self._engine._sessions: | |
| return {"error": "No workbook loaded. Use load_scenario first."} | |
| result = self._engine.validate_partial(self._session_id) | |
| self._last_validate_passed = result.get("passed", 0) | |
| return result | |
| # ββ Tool 11: submit_workbook ββββββββββββββββββββββββββββββββββ | |
| def submit_workbook() -> dict: | |
| """Submit the workbook for final evaluation against hidden tests. | |
| Runs all hidden test checks and returns structured results including | |
| pass rate, per-check pass/fail, and whether the scenario is fully solved. | |
| """ | |
| _record("submit_workbook") | |
| if not self._session_id or self._session_id not in self._engine._sessions: | |
| return {"error": "No workbook loaded. Use load_scenario first."} | |
| result = self._engine.run_hidden_tests(self._session_id) | |
| return result | |
| # ββ Tool 12: get_edit_history βββββββββββββββββββββββββββββββββ | |
| def get_edit_history() -> dict: | |
| """Return the full list of cell edits made in this session, in order. | |
| Each entry shows the sheet, cell, value written, and the step number. | |
| """ | |
| _record("get_edit_history") | |
| if not self._session_id or self._session_id not in self._engine._sessions: | |
| return {"error": "No workbook loaded. Use load_scenario first."} | |
| history = self._engine.get_edit_history(self._session_id) | |
| return {"edits": history, "count": len(history)} | |
| # ββ Tool 13: reset_scenario βββββββββββββββββββββββββββββββββββ | |
| def reset_scenario() -> dict: | |
| """Restore the workbook to its original state, discarding all edits. | |
| The scenario remains loaded; you do not need to call load_scenario again. | |
| """ | |
| _record("reset_scenario") | |
| if not self._session_id or self._session_id not in self._engine._sessions: | |
| return {"error": "No workbook loaded. Use load_scenario first."} | |
| self._engine.reset_workbook(self._session_id) | |
| self._last_validate_passed = 0 | |
| sheets = self._engine.list_sheets(self._session_id) | |
| return {"message": "Workbook reset to original state.", "sheets": sheets} | |
| super().__init__(mcp) | |
| # ββ Lifecycle βββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def reset( | |
| self, | |
| seed: Optional[int] = None, | |
| episode_id: Optional[str] = None, | |
| **kwargs: Any, | |
| ) -> Observation: | |
| if self._session_id and self._session_id in self._engine._sessions: | |
| self._engine.close_session(self._session_id) | |
| self._session_id = str(uuid4()) | |
| self._state = State( | |
| episode_id=episode_id or self._session_id, | |
| step_count=0, | |
| ) | |
| self._scenario = None | |
| self._action_history = [] | |
| self._last_validate_passed = 0 | |
| return Observation( | |
| done=False, | |
| reward=0.0, | |
| metadata={ | |
| "status": "ready", | |
| "session_id": self._session_id, | |
| "instructions": ( | |
| "Use list_scenarios to see available tasks, then load_scenario to begin. " | |
| "Read the workbook structure with list_sheets and read_range before making edits. " | |
| "Use submit_workbook when done." | |
| ), | |
| }, | |
| ) | |
| def step(self, action: Action, timeout_s: Optional[float] = None, **kwargs: Any) -> Observation: | |
| self._state.step_count += 1 | |
| if hasattr(action, "to_mcp_action"): | |
| action = action.to_mcp_action() | |
| obs = super().step(action, timeout_s=timeout_s, **kwargs) | |
| tool_name = getattr(action, "tool_name", None) | |
| args = getattr(action, "arguments", None) or {} | |
| result = getattr(obs, "result", None) | |
| if hasattr(result, "data"): | |
| result = result.data | |
| elif isinstance(result, dict) and "data" in result: | |
| result = result["data"] | |
| if not isinstance(result, dict): | |
| result = {} | |
| reward = self._compute_step_reward(tool_name, args, result) | |
| if reward != 0: | |
| obs.reward = (obs.reward or 0) + reward | |
| session = self._engine._sessions.get(self._session_id) | |
| if session: | |
| obs.done = session.solved | |
| return obs | |
| def _compute_step_reward(self, tool_name: Optional[str], args: dict, result: dict) -> float: | |
| """Layer 1 per-step reward heuristics (internal, approximate).""" | |
| if isinstance(result, dict) and result.get("error"): | |
| return 0.0 | |
| if tool_name == "inspect_formula": | |
| return 0.05 | |
| if tool_name == "validate_partial": | |
| new_passed = result.get("passed", 0) | |
| if new_passed > self._last_validate_passed: | |
| return 0.10 | |
| return 0.05 | |
| if tool_name in WRITE_TOOLS: | |
| sheet = args.get("sheet", "") | |
| cell = args.get("cell", args.get("start_cell", "")) | |
| in_target = True | |
| if self._session_id and self._session_id in self._engine._sessions: | |
| in_target = self._engine.is_in_target_region(self._session_id, sheet, cell) | |
| if not in_target: | |
| return -0.10 | |
| recent_reads = any( | |
| a["tool"] in ("read_range", "read_cell") | |
| for a in self._action_history[-4:-1] | |
| ) | |
| reward = 0.05 | |
| if recent_reads: | |
| reward += 0.05 | |
| if self._session_id and self._session_id in self._engine._sessions: | |
| cell_ref = cell.upper() | |
| write_count = sum( | |
| 1 for a in self._action_history | |
| if a["tool"] in WRITE_TOOLS | |
| and a["arguments"].get("cell", a["arguments"].get("start_cell", "")).upper() == cell_ref | |
| and a["arguments"].get("sheet", "") == sheet | |
| ) | |
| if write_count >= 3: | |
| reward -= 0.05 | |
| return reward | |
| if tool_name in READ_TOOLS: | |
| return 0.0 | |
| if tool_name == "submit_workbook": | |
| pass_rate = result.get("pass_rate", 0) | |
| if pass_rate == 1.0: | |
| return 0.50 | |
| if pass_rate > 0.5: | |
| return 0.20 | |
| if pass_rate < 0.3: | |
| return -0.10 | |
| return 0.0 | |
| return 0.0 | |
| def _step_impl(self, action: Action, timeout_s: Optional[float] = None, **kwargs: Any) -> Observation: | |
| return Observation( | |
| done=False, | |
| reward=0.0, | |
| metadata={ | |
| "error": f"Unknown action type: {type(action).__name__}. " | |
| "Use ListToolsAction or CallToolAction." | |
| }, | |
| ) | |
| def state(self) -> State: | |
| return self._state | |
| def get_metadata(self) -> EnvironmentMetadata: | |
| return EnvironmentMetadata( | |
| name="spreadsheet", | |
| description="Spreadsheet β exact workbook manipulation and reasoning over realistic spreadsheet tasks", | |
| version="0.1.0", | |
| ) | |
| def _parse_value(value: str) -> Any: | |
| """Convert string input to appropriate Python type for cell writing.""" | |
| if isinstance(value, str) and value.startswith("="): | |
| return value | |
| try: | |
| if "." in value: | |
| return float(value) | |
| return int(value) | |
| except (ValueError, TypeError): | |
| pass | |
| if value.lower() in ("true",): | |
| return True | |
| if value.lower() in ("false",): | |
| return False | |
| if value.lower() in ("none", "null", ""): | |
| return None | |
| return value | |