#!/usr/bin/env python3
"""Discovery Environment MCP Server.

Exposes a black-box dynamical system to an LLM agent via MCP tools.
The agent can query the system but never sees the rule implementation.

Usage:
    python discovery_env_server/server.py                    # default: G01
    PROBLEM_ID=G02 python discovery_env_server/server.py

Logging:
    All interactions are written to experiments/logs/ for the dashboard.
    Dashboard: http://127.0.0.1:8787
"""

import json
import sys
import os
import time
import numpy as np
from pathlib import Path

# Add project root so we can import discovery_env
PROJECT_ROOT = str(Path(__file__).parent.parent)
sys.path.insert(0, PROJECT_ROOT)

from mcp.server.fastmcp import FastMCP
from discovery_env import get_problem
from discovery_env.scoring import (
    compile_law,
    _generate_test_states,
    strip_comments,
    stripped_code_length,
)

# ============================================================
# LOGGING (writes to experiments/logs/ for dashboard)
# ============================================================

LOG_DIR = Path(PROJECT_ROOT) / "experiments" / "logs"
LOG_DIR.mkdir(parents=True, exist_ok=True)

PROBLEM_ID = os.environ.get("PROBLEM_ID", "G01")
# Directory where tool results are dumped as JSON files; empty string means
# "no workspace", in which case tools return their data inline instead.
AGENT_WORKSPACE = os.environ.get("AGENT_WORKSPACE", "")

_env = get_problem(PROBLEM_ID)
_start_time = time.time()
# Incremented by simulate() and random_state(); feeds the efficiency bonus.
_query_count = 0
_log_path = LOG_DIR / f"{PROBLEM_ID}_{int(_start_time)}.jsonl"

# Point the LATEST.txt so dashboard can find us
(LOG_DIR / "LATEST.txt").write_text(str(_log_path))

# Scoring parameters used by submit_rule().
_N_TEST_STATES = 500      # number of held-out states requested for scoring
_TEST_STATES_SEED = 9999  # fixed seed passed to the test-state generator
_MAX_DELTA_DL = 300       # code-length excess at which parsimony bonus hits 0
_QUERY_BUDGET = 60        # query count at which efficiency bonus hits 0


class _Enc(json.JSONEncoder):
    """JSON encoder that converts numpy arrays and scalars to builtins."""

    def default(self, o):
        if isinstance(o, np.ndarray):
            return o.tolist()
        # np.bool_ is not a subclass of np.integer, so handle it explicitly;
        # otherwise a logged numpy boolean would raise TypeError.
        if isinstance(o, np.bool_):
            return bool(o)
        if isinstance(o, np.integer):
            return int(o)
        if isinstance(o, np.floating):
            return float(o)
        return super().default(o)


def _log(action: str, data: dict):
    """Append one JSON line describing an interaction to the session log.

    Args:
        action: Short label for the event (e.g. "step", "submit").
        data: Extra fields merged into the entry. Keys in ``data`` override
            the standard "action", "t" and "query_num" fields on collision.
    """
    entry = {
        "action": action,
        "t": round(time.time() - _start_time, 3),
        "query_num": _query_count,
        **data,
    }
    with open(_log_path, "a", encoding="utf-8") as f:
        f.write(json.dumps(entry, cls=_Enc) + "\n")


# Log session start
_log("session_start", {
    "problem_id": PROBLEM_ID,
    "shape_info": _env.get_state_shape(),
})

# ============================================================
# MCP SERVER
# ============================================================

mcp = FastMCP("discovery-env")


def _save_data(name: str, data) -> str:
    """Save data to a JSON file in the agent workspace. Returns the file path."""
    if not AGENT_WORKSPACE:
        return ""
    data_dir = Path(AGENT_WORKSPACE) / "data"
    # parents=True: the workspace directory itself may not exist yet.
    data_dir.mkdir(parents=True, exist_ok=True)
    fpath = data_dir / f"{name}.json"
    with open(fpath, "w") as f:
        json.dump(data, f)
    return str(fpath)


@mcp.tool()
def simulate(state_json: str, n_steps: int) -> str:
    """Simulate the system forward n_steps from the given initial state.

    Args:
        state_json: JSON array representing the current state.
                    For 2D grids: [[0,1,2],[3,4,0],...]
        n_steps: Number of timesteps to simulate (1-100).

    Returns:
        JSON with trajectory file path and summary. Load the file for full data.
    """
    global _query_count
    _query_count += 1

    initial = np.array(json.loads(state_json))
    n_steps = min(max(1, n_steps), 100)  # clamp into the documented 1-100 range

    # Give the environment its own copy so `initial` is guaranteed to remain
    # the untouched baseline for the cells_changed diff below.
    _env.set_initial_conditions(initial.copy())
    trajectory = [_env.step(1).tolist() for _ in range(n_steps)]

    # Number of cells that differ between the initial and the final state.
    cells_changed = int(np.sum(initial != np.array(trajectory[-1])))
    _log("step", {
        "n": n_steps,
        "state": trajectory[-1],
        "cells_changed": cells_changed,
        "step_count": _env.step_count,
    })

    fpath = _save_data(f"trajectory_q{_query_count}", trajectory)
    if fpath:
        return json.dumps({
            "n_steps": n_steps,
            "cells_changed": cells_changed,
            "file": fpath,
            "hint": "Load the file with json.load() to access full trajectory data.",
        })
    # No workspace configured: return the full trajectory inline.
    return json.dumps(trajectory)


@mcp.tool()
def random_state(seed: int = 0) -> str:
    """Generate a random initial condition for the system.

    Args:
        seed: Random seed for reproducibility (0 = random).

    Returns:
        JSON with state file path. Load the file with json.load() for the grid data.
    """
    global _query_count
    _query_count += 1

    meta = _env.get_state_shape()
    rows, cols = meta["rows"], meta["cols"]
    # "values" is parsed as an inclusive "lo-hi" range string.
    vals = meta.get("values", "0-1")
    lo, hi = [int(x) for x in vals.split("-")]

    # Any non-positive seed selects OS entropy (non-reproducible state).
    rng = np.random.default_rng(seed if seed > 0 else None)
    state = rng.integers(lo, hi + 1, size=(rows, cols))

    _log("random_state", {"seed": seed, "state": state.tolist()})

    # NOTE(review): the file name depends only on the seed, so repeated
    # unseeded calls (seed=0) overwrite state_seed0.json with new content.
    fpath = _save_data(f"state_seed{seed}", state.tolist())
    if fpath:
        return json.dumps({
            "seed": seed,
            "shape": [rows, cols],
            "file": fpath,
            "hint": "Load the file with json.load() to access the state grid.",
        })
    return json.dumps(state.tolist())


@mcp.tool()
def get_system_info() -> str:
    """Get a description of the system being investigated.

    Returns information about the state space (dimensions, value range).
    Does NOT reveal the update rule.
    """
    meta = _env.get_state_shape()
    _log("get_info", {"info": meta})
    return json.dumps({
        "type": meta["type"],
        "rows": meta["rows"],
        "cols": meta["cols"],
        "values": meta["values"],
        "description": meta["description"],
    })


@mcp.tool()
def submit_rule(code: str) -> str:
    """Submit a proposed update rule for scoring.

    Args:
        code: Python code defining a function predict_next(grid) -> next_grid.
              The function receives a numpy array and must return the
              predicted next state as a numpy array.

    Returns:
        JSON with scoring results: functional_accuracy, parsimony_bonus, etc.
    """
    # SECURITY NOTE(review): `code` is agent-supplied and is handed to
    # compile_law, which presumably executes it — confirm that sandboxing
    # happens there or at the process level.
    fn = compile_law(code)
    if fn is None:
        result = {
            "functional_accuracy": 0.0,
            "error": "Could not compile code",
            "total": 0.0,
        }
        _log("submit", {"result": result})
        return json.dumps(result)

    test_states = _generate_test_states(
        _env, n=_N_TEST_STATES, seed=_TEST_STATES_SEED
    )
    correct = 0
    for state in test_states:
        expected = _env.get_true_next(state)
        try:
            # Pass a copy so submitted code cannot mutate the test state.
            predicted = fn(state.copy())
        except Exception:
            # Deliberate best-effort: submitted code may raise anything; a
            # crash on a state simply counts as an incorrect prediction.
            continue
        if isinstance(predicted, np.ndarray) and np.array_equal(predicted, expected):
            correct += 1

    n_states = len(test_states)
    # Guard against an empty test set instead of raising ZeroDivisionError.
    accuracy = correct / n_states if n_states else 0.0
    code_len = len(code.strip())

    # Delta-DL parsimony
    agent_dl = stripped_code_length(code)
    try:
        ref_dl = stripped_code_length(_env.__class__.reference_code())
    except NotImplementedError:
        # No reference implementation available: compare against length 0.
        ref_dl = 0

    delta_dl = max(0, agent_dl - ref_dl)
    # Both bonuses decay linearly to zero and never go negative.
    parsimony = 0.2 * max(0.0, 1.0 - delta_dl / _MAX_DELTA_DL)
    efficiency = 0.1 * max(0.0, 1.0 - _query_count / _QUERY_BUDGET)

    result = {
        "functional_accuracy": accuracy,
        "parsimony_bonus": round(parsimony, 4),
        "efficiency_bonus": round(efficiency, 4),
        "total": round(accuracy + parsimony + efficiency, 4),
        "correct_states": correct,
        "total_states": n_states,
        "queries_used": _query_count,
        "code_length": code_len,
        "agent_dl": agent_dl,
        "reference_dl": ref_dl,
        "delta_dl": delta_dl,
    }
    _log("submit", {"result": result})
    return json.dumps(result)


if __name__ == "__main__":
    # stdout carries the MCP stdio transport, so diagnostics go to stderr.
    print(f"Discovery Env MCP Server: {PROBLEM_ID}", file=sys.stderr)
    print(f"Log: {_log_path}", file=sys.stderr)
    mcp.run(transport="stdio")