discovery-env / server.py
echoboi
Deploy Discovery Environment API
8c464db
#!/usr/bin/env python3
"""Discovery Environment MCP Server.
Exposes a black-box dynamical system to an LLM agent via MCP tools.
The agent can query the system but never sees the rule implementation.
Usage:
python discovery_env_server/server.py # default: G01
PROBLEM_ID=G02 python discovery_env_server/server.py
Logging: All interactions are written to experiments/logs/ for the dashboard.
Dashboard: http://127.0.0.1:8787
"""
import json
import sys
import os
import time
import numpy as np
from pathlib import Path
# Put the project root (the parent of this file's directory) at the front of
# sys.path so the `discovery_env` imports below can be resolved.
PROJECT_ROOT = str(Path(__file__).parent.parent)
sys.path.insert(0, PROJECT_ROOT)
from mcp.server.fastmcp import FastMCP
from discovery_env import get_problem
from discovery_env.scoring import compile_law, _generate_test_states, strip_comments, stripped_code_length
# ============================================================
# LOGGING (writes to experiments/logs/ for dashboard)
# ============================================================
# Directory that receives the JSONL log of this run; created at import time.
LOG_DIR = Path(PROJECT_ROOT) / "experiments" / "logs"
LOG_DIR.mkdir(parents=True, exist_ok=True)
# Problem to serve; falls back to "G01" when PROBLEM_ID is not set.
PROBLEM_ID = os.environ.get("PROBLEM_ID", "G01")
# Optional directory for data files written by _save_data; "" disables them.
AGENT_WORKSPACE = os.environ.get("AGENT_WORKSPACE", "")
# Single environment instance shared by every tool in this module.
_env = get_problem(PROBLEM_ID)
# Reference point for the relative "t" timestamps written by _log.
_start_time = time.time()
# Incremented by simulate and random_state; read by the efficiency bonus.
_query_count = 0
# Log file name combines the problem id with the integer start time.
_log_path = LOG_DIR / f"{PROBLEM_ID}_{int(_start_time)}.jsonl"
# Record the current log path in LATEST.txt so the dashboard can find it.
(LOG_DIR / "LATEST.txt").write_text(str(_log_path))
class _Enc(json.JSONEncoder):
    """JSON encoder that also understands numpy arrays and numpy scalars.

    Arrays become nested lists; numpy booleans, integers and floats become
    the matching built-in Python types. Anything else is handed to the base
    class, which raises ``TypeError``.
    """

    def default(self, o):
        """Return a JSON-serialisable stand-in for *o*."""
        if isinstance(o, np.ndarray):
            return o.tolist()
        # np.bool_ is neither np.integer nor np.floating, so without this
        # branch the result of a numpy comparison could not be logged.
        if isinstance(o, np.bool_):
            return bool(o)
        if isinstance(o, np.integer):
            return int(o)
        if isinstance(o, np.floating):
            return float(o)
        return super().default(o)
def _log(action: str, data: dict):
    """Append one JSON line describing *action* to the session log file.

    Every record carries the action name, the seconds elapsed since the
    server started (rounded to milliseconds) and the current query counter.
    The entries of *data* are merged in last, so a key in *data* overrides a
    standard field of the same name.
    """
    elapsed = round(time.time() - _start_time, 3)
    record = dict(action=action, t=elapsed, query_num=_query_count)
    record.update(data)
    with open(_log_path, "a") as log_file:
        # _Enc converts numpy values that the stock encoder rejects.
        log_file.write(f"{json.dumps(record, cls=_Enc)}\n")
# Write a session_start record as soon as the module is loaded.
_log(
    "session_start",
    dict(problem_id=PROBLEM_ID, shape_info=_env.get_state_shape()),
)
# ============================================================
# MCP SERVER
# ============================================================
# FastMCP application named "discovery-env". The @mcp.tool() functions below
# register themselves on it and the __main__ guard starts it.
mcp = FastMCP("discovery-env")
def _save_data(name: str, data) -> str:
    """Save data to a JSON file in the agent workspace.

    Args:
        name: File stem; the file is written as ``<workspace>/data/<name>.json``.
        data: Any object ``json.dump`` can serialise.

    Returns:
        The path of the written file as a string, or ``""`` when no
        AGENT_WORKSPACE is configured (nothing is written in that case).
    """
    if not AGENT_WORKSPACE:
        return ""
    data_dir = Path(AGENT_WORKSPACE) / "data"
    # parents=True: the workspace directory itself may not exist yet, and
    # without it mkdir raises FileNotFoundError on the first save.
    data_dir.mkdir(parents=True, exist_ok=True)
    fpath = data_dir / f"{name}.json"
    with open(fpath, "w") as f:
        json.dump(data, f)
    return str(fpath)
@mcp.tool()
def simulate(state_json: str, n_steps: int) -> str:
    """Simulate the system forward n_steps from the given initial state.

    Args:
        state_json: JSON array representing the current state.
            For 2D grids: [[0,1,2],[3,4,0],...]
        n_steps: Number of timesteps to simulate (1-100).

    Returns:
        JSON with trajectory file path and summary. Load the file for full data.
    """
    # NOTE(review): the docstring above is left untouched because FastMCP
    # presumably publishes it as the agent-facing tool description.
    global _query_count
    _query_count += 1

    start_grid = np.array(json.loads(state_json))
    # Private copy for the final diff, in case the environment modifies the
    # array it is handed.
    baseline = start_grid.copy()
    steps = min(max(1, n_steps), 100)  # clamp into the supported 1-100 range

    _env.set_initial_conditions(start_grid)
    # One entry per timestep; the initial state itself is not included.
    trajectory = [_env.step(1).tolist() for _ in range(steps)]
    final_grid = trajectory[-1]

    # Cells whose value differs between the initial and the final state.
    cells_changed = int(np.sum(baseline != np.array(final_grid)))
    _log(
        "step",
        {
            "n": steps,
            "state": final_grid,
            "cells_changed": cells_changed,
            "step_count": _env.step_count,
        },
    )

    fpath = _save_data(f"trajectory_q{_query_count}", trajectory)
    if not fpath:
        # No workspace configured: return the whole trajectory inline.
        return json.dumps(trajectory)
    summary = {
        "n_steps": steps,
        "cells_changed": cells_changed,
        "file": fpath,
        "hint": "Load the file with json.load() to access full trajectory data.",
    }
    return json.dumps(summary)
@mcp.tool()
def random_state(seed: int = 0) -> str:
    """Generate a random initial condition for the system.

    Args:
        seed: Random seed for reproducibility (0 = random).

    Returns:
        JSON with state file path. Load the file with json.load() for the grid data.
    """
    global _query_count
    _query_count += 1

    meta = _env.get_state_shape()
    rows, cols = meta["rows"], meta["cols"]
    # "values" is read as "<lo>-<hi>" with both bounds inclusive.
    # NOTE(review): this split cannot parse a negative bound -- confirm that
    # no problem advertises one.
    vals = meta.get("values", "0-1")
    lo, hi = [int(x) for x in vals.split("-")]

    # Only positive seeds are reproducible; 0 and negative values draw fresh
    # entropy from the OS.
    seeded = seed > 0
    rng = np.random.default_rng(seed if seeded else None)
    state = rng.integers(lo, hi + 1, size=(rows, cols))
    _log("random_state", {"seed": seed, "state": state.tolist()})

    # A seeded state is identical on every call, so reusing its file name is
    # harmless. Unseeded states differ on every call: give each one its own
    # file so a path handed out earlier keeps pointing at the same grid.
    file_stem = f"state_seed{seed}"
    if not seeded:
        file_stem += f"_q{_query_count}"
    fpath = _save_data(file_stem, state.tolist())
    if fpath:
        return json.dumps({
            "seed": seed,
            "shape": [rows, cols],
            "file": fpath,
            "hint": "Load the file with json.load() to access the state grid.",
        })
    # No workspace configured: return the grid inline.
    return json.dumps(state.tolist())
@mcp.tool()
def get_system_info() -> str:
    """Get a description of the system being investigated.

    Returns information about the state space (dimensions, value range).
    Does NOT reveal the update rule.
    """
    meta = _env.get_state_shape()
    # The log receives the full metadata dict, not just the exposed subset.
    _log("get_info", {"info": meta})
    # Only these fields are forwarded to the agent, in this order.
    exposed_keys = ("type", "rows", "cols", "values", "description")
    return json.dumps({key: meta[key] for key in exposed_keys})
@mcp.tool()
def submit_rule(code: str) -> str:
    """Submit a proposed update rule for scoring.

    Args:
        code: Python code defining a function predict_next(grid) -> next_grid.
            The function receives a numpy array and must return the predicted
            next state as a numpy array.

    Returns:
        JSON with scoring results: functional_accuracy, parsimony_bonus, etc.
    """
    # NOTE(review): compile_law presumably executes the agent-supplied code;
    # confirm that it is sandboxed appropriately.
    fn = compile_law(code)
    if fn is None:
        result = {"functional_accuracy": 0.0, "error": "Could not compile code", "total": 0.0}
        _log("submit", {"result": result})
        return json.dumps(result)

    # Fixed n and seed: every submission is scored with the same arguments.
    test_states = _generate_test_states(_env, n=500, seed=9999)
    correct = 0
    runtime_errors = 0
    for state in test_states:
        expected = _env.get_true_next(state)
        try:
            # The rule gets a copy so it cannot alter the test state.
            predicted = fn(state.copy())
            if isinstance(predicted, np.ndarray) and np.array_equal(predicted, expected):
                correct += 1
        except Exception:
            # A crashing rule simply scores no point for this state, but the
            # crash is counted so it is visible in the result.
            runtime_errors += 1

    total_states = len(test_states)
    # Guard against an empty test set instead of dividing by zero.
    accuracy = correct / total_states if total_states else 0.0
    code_len = len(code.strip())

    # Delta-DL parsimony: only the length in excess of the reference rule is
    # penalised. Without a reference the full agent length counts.
    agent_dl = stripped_code_length(code)
    try:
        ref_dl = stripped_code_length(_env.__class__.reference_code())
    except NotImplementedError:
        ref_dl = 0
    delta_dl = max(0, agent_dl - ref_dl)
    max_delta = 300  # excess length at which the parsimony bonus reaches 0
    parsimony = 0.2 * max(0.0, 1.0 - delta_dl / max_delta)
    # Efficiency bonus shrinks linearly and is gone after 60 queries.
    efficiency = 0.1 * max(0.0, 1.0 - _query_count / 60)

    result = {
        "functional_accuracy": accuracy,
        "parsimony_bonus": round(parsimony, 4),
        "efficiency_bonus": round(efficiency, 4),
        "total": round(accuracy + parsimony + efficiency, 4),
        "correct_states": correct,
        "total_states": total_states,
        "queries_used": _query_count,
        "code_length": code_len,
        "agent_dl": agent_dl,
        "reference_dl": ref_dl,
        "delta_dl": delta_dl,
        "runtime_errors": runtime_errors,
    }
    _log("submit", {"result": result})
    return json.dumps(result)
if __name__ == "__main__":
    # Start-up banner goes to stderr, presumably because the stdio transport
    # uses stdout for protocol traffic.
    startup_lines = (
        f"Discovery Env MCP Server: {PROBLEM_ID}",
        f"Log: {_log_path}",
    )
    for banner in startup_lines:
        print(banner, file=sys.stderr)
    mcp.run(transport="stdio")