Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """Discovery Environment MCP Server. | |
| Exposes a black-box dynamical system to an LLM agent via MCP tools. | |
| The agent can query the system but never sees the rule implementation. | |
| Usage: | |
| python discovery_env_server/server.py # default: G01 | |
| PROBLEM_ID=G02 python discovery_env_server/server.py | |
| Logging: All interactions are written to experiments/logs/ for the dashboard. | |
| Dashboard: http://127.0.0.1:8787 | |
| """ | |
| import json | |
| import sys | |
| import os | |
| import time | |
| import numpy as np | |
| from pathlib import Path | |
| # Add project root so we can import discovery_env | |
| PROJECT_ROOT = str(Path(__file__).parent.parent) | |
| sys.path.insert(0, PROJECT_ROOT) | |
| from mcp.server.fastmcp import FastMCP | |
| from discovery_env import get_problem | |
| from discovery_env.scoring import compile_law, _generate_test_states, strip_comments, stripped_code_length | |
# ============================================================
# SESSION SETUP (logs go to experiments/logs/ for the dashboard)
# ============================================================
# All interaction logs for every session live under this directory.
LOG_DIR = Path(PROJECT_ROOT).joinpath("experiments", "logs")
LOG_DIR.mkdir(parents=True, exist_ok=True)

# Both settings come from the environment: the problem to serve (default
# "G01") and an optional agent workspace (empty string when unset).
PROBLEM_ID = os.getenv("PROBLEM_ID", "G01")
AGENT_WORKSPACE = os.getenv("AGENT_WORKSPACE", "")

# Per-session state: the environment under study, the wall-clock start used
# for relative timestamps, and the number of queries issued so far.
_env = get_problem(PROBLEM_ID)
_start_time = time.time()
_query_count = 0

# One JSONL file per session, named after the problem and the start second.
_log_path = LOG_DIR / f"{PROBLEM_ID}_{int(_start_time)}.jsonl"

# LATEST.txt holds the path of the current log so the dashboard can find it.
LOG_DIR.joinpath("LATEST.txt").write_text(str(_log_path))
class _Enc(json.JSONEncoder):
    """JSON encoder that converts NumPy values into built-in Python types.

    Arrays become (nested) lists; NumPy integer, floating-point and boolean
    scalars become ``int``, ``float`` and ``bool``. Any other unsupported
    object is handed to the base class, which raises ``TypeError``.
    """

    def default(self, o):
        """Return a JSON-serializable stand-in for ``o``."""
        if isinstance(o, np.ndarray):
            return o.tolist()
        if isinstance(o, np.integer):
            return int(o)
        if isinstance(o, np.floating):
            return float(o)
        # np.bool_ is not a subclass of Python's bool, so the stock encoder
        # rejects it; convert it explicitly.
        if isinstance(o, np.bool_):
            return bool(o)
        return super().default(o)
def _log(action: str, data: dict):
    """Append one interaction record to the session's JSONL log file.

    Args:
        action: Label stored under the "action" key.
        data: Extra fields merged into the record; a key in ``data`` that
            matches a built-in field ("action", "t", "query_num") replaces it.
    """
    record = {
        "action": action,
        # Seconds elapsed since the server started, to millisecond precision.
        "t": round(time.time() - _start_time, 3),
        "query_num": _query_count,
    }
    record.update(data)
    with open(_log_path, "a") as handle:
        serialized = json.dumps(record, cls=_Enc)
        handle.write(serialized + "\n")
# First log record of the session: which problem is served and its state shape.
_log(
    "session_start",
    {"problem_id": PROBLEM_ID, "shape_info": _env.get_state_shape()},
)

# ============================================================
# MCP SERVER
# ============================================================
# Server instance that is started from the __main__ guard of this file.
mcp = FastMCP("discovery-env")
def _save_data(name: str, data) -> str:
    """Save data to a JSON file in the agent workspace.

    Args:
        name: File stem; the file is written as ``<workspace>/data/<name>.json``.
        data: JSON-serializable payload.

    Returns:
        The path of the written file, or "" when no agent workspace is
        configured (nothing is written in that case).
    """
    if not AGENT_WORKSPACE:
        return ""
    data_dir = Path(AGENT_WORKSPACE) / "data"
    # parents=True so that a workspace directory that has not been created
    # yet does not make the call fail (matches how LOG_DIR is created).
    data_dir.mkdir(parents=True, exist_ok=True)
    fpath = data_dir / f"{name}.json"
    with open(fpath, "w") as f:
        json.dump(data, f)
    return str(fpath)
@mcp.tool()
def simulate(state_json: str, n_steps: int) -> str:
    """Simulate the system forward n_steps from the given initial state.

    Args:
        state_json: JSON array representing the current state.
            For 2D grids: [[0,1,2],[3,4,0],...]
        n_steps: Number of timesteps to simulate (1-100). Values outside
            that range are clamped to it.

    Returns:
        When an agent workspace is configured: JSON with the trajectory file
        path and a summary (load the file for full data). Otherwise: the
        full trajectory as a JSON array, one state per step.
    """
    global _query_count
    _query_count += 1

    # Parse once and keep this array untouched as the comparison baseline;
    # the environment receives its own copy.
    initial = np.array(json.loads(state_json))
    n_steps = min(max(1, n_steps), 100)

    _env.set_initial_conditions(initial.copy())
    # Advance one step at a time so every intermediate state is recorded.
    trajectory = [_env.step(1).tolist() for _ in range(n_steps)]
    final_state = trajectory[-1]

    # Number of cells that differ between the initial and the final state.
    cells_changed = int(np.sum(initial != np.array(final_state)))

    _log("step", {
        "n": n_steps,
        "state": final_state,
        "cells_changed": cells_changed,
        "step_count": _env.step_count,
    })

    fpath = _save_data(f"trajectory_q{_query_count}", trajectory)
    if fpath:
        return json.dumps({
            "n_steps": n_steps,
            "cells_changed": cells_changed,
            "file": fpath,
            "hint": "Load the file with json.load() to access full trajectory data.",
        })
    return json.dumps(trajectory)
@mcp.tool()
def random_state(seed: int = 0) -> str:
    """Generate a random initial condition for the system.

    Args:
        seed: Random seed for reproducibility. Any value <= 0 (default 0)
            draws an unseeded, non-reproducible state.

    Returns:
        When an agent workspace is configured: JSON with the state file path
        (load the file with json.load() for the grid data). Otherwise: the
        grid itself as a JSON array.
    """
    global _query_count
    _query_count += 1

    meta = _env.get_state_shape()
    rows, cols = meta["rows"], meta["cols"]
    # "values" is an inclusive "lo-hi" range string; defaults to binary cells.
    lo, hi = (int(x) for x in meta.get("values", "0-1").split("-"))

    seeded = seed > 0
    rng = np.random.default_rng(seed if seeded else None)
    # integers() excludes the upper bound, hence hi + 1.
    state = rng.integers(lo, hi + 1, size=(rows, cols)).tolist()

    _log("random_state", {"seed": seed, "state": state})

    # Seeded states are reproducible, so reusing one file per seed is safe.
    # Unseeded states differ on every call; tag them with the query number so
    # a later call does not overwrite a file already handed to the agent.
    if seeded:
        file_stem = f"state_seed{seed}"
    else:
        file_stem = f"state_seed{seed}_q{_query_count}"

    fpath = _save_data(file_stem, state)
    if fpath:
        return json.dumps({
            "seed": seed,
            "shape": [rows, cols],
            "file": fpath,
            "hint": "Load the file with json.load() to access the state grid.",
        })
    return json.dumps(state)
@mcp.tool()
def get_system_info() -> str:
    """Get a description of the system being investigated.

    Returns information about the state space (dimensions, value range).
    Does NOT reveal the update rule.

    Returns:
        JSON object with the keys "type", "rows", "cols", "values" and
        "description", copied from the environment's state-shape metadata.
    """
    meta = _env.get_state_shape()
    _log("get_info", {"info": meta})
    # Only these fields are forwarded to the agent; the log above keeps the
    # full metadata dictionary.
    exposed_keys = ("type", "rows", "cols", "values", "description")
    return json.dumps({key: meta[key] for key in exposed_keys})
@mcp.tool()
def submit_rule(code: str) -> str:
    """Submit a proposed update rule for scoring.

    Args:
        code: Python code defining a function predict_next(grid) -> next_grid.
            The function receives a numpy array and must return the predicted
            next state as a numpy array.

    Returns:
        JSON with scoring results: functional_accuracy, parsimony_bonus,
        efficiency_bonus, total, plus the raw counts and code-length figures
        they were derived from. If the code cannot be compiled, a JSON object
        with zero accuracy/total and an "error" message.
    """
    # NOTE(review): compile_law presumably executes the submitted source;
    # confirm it is sandboxed appropriately for untrusted agent code.
    fn = compile_law(code)
    if fn is None:
        result = {"functional_accuracy": 0.0, "error": "Could not compile code", "total": 0.0}
        _log("submit", {"result": result})
        return json.dumps(result)

    # Fixed seed: every submission is scored on the same test states.
    test_states = _generate_test_states(_env, n=500, seed=9999)
    correct = 0
    for state in test_states:
        expected = _env.get_true_next(state)
        try:
            # Pass a copy so a rule that mutates its input cannot alter the
            # test state itself.
            predicted = fn(state.copy())
            if isinstance(predicted, np.ndarray) and np.array_equal(predicted, expected):
                correct += 1
        except Exception:
            # Deliberate best-effort: a submission that raises on a state is
            # simply scored as wrong for that state.
            pass

    total_states = len(test_states)
    # Guard against an empty test set instead of dividing by zero.
    accuracy = correct / total_states if total_states else 0.0
    code_len = len(code.strip())

    # Delta-DL parsimony: penalize only the length in excess of the
    # reference implementation (when the problem provides one).
    agent_dl = stripped_code_length(code)
    try:
        ref_dl = stripped_code_length(type(_env).reference_code())
    except NotImplementedError:
        ref_dl = 0
    delta_dl = max(0, agent_dl - ref_dl)
    max_delta = 300
    # Up to 0.2 bonus, falling linearly to 0 at max_delta excess length.
    parsimony = 0.2 * max(0.0, 1.0 - delta_dl / max_delta)
    # Up to 0.1 bonus, falling linearly to 0 at 60 queries.
    efficiency = 0.1 * max(0.0, 1.0 - _query_count / 60)

    result = {
        "functional_accuracy": accuracy,
        "parsimony_bonus": round(parsimony, 4),
        "efficiency_bonus": round(efficiency, 4),
        "total": round(accuracy + parsimony + efficiency, 4),
        "correct_states": correct,
        "total_states": total_states,
        "queries_used": _query_count,
        "code_length": code_len,
        "agent_dl": agent_dl,
        "reference_dl": ref_dl,
        "delta_dl": delta_dl,
    }
    _log("submit", {"result": result})
    return json.dumps(result)
if __name__ == "__main__":
    # Diagnostics go to stderr; presumably stdout is reserved for the stdio
    # transport used below.
    sys.stderr.write(f"Discovery Env MCP Server: {PROBLEM_ID}\n")
    sys.stderr.write(f"Log: {_log_path}\n")
    mcp.run(transport="stdio")