#!/usr/bin/env python3
"""Discovery Environment MCP Server.

Exposes a black-box dynamical system to an LLM agent via MCP tools.
The agent can query the system but never sees the rule implementation.

Usage:
    python discovery_env_server/server.py              # default: G01
    PROBLEM_ID=G02 python discovery_env_server/server.py

Logging: All interactions are written to experiments/logs/ for the dashboard.
Dashboard: http://127.0.0.1:8787
"""
import json
import sys
import os
import time
import numpy as np
from pathlib import Path
# Add project root so we can import discovery_env
PROJECT_ROOT = str(Path(__file__).parent.parent)
sys.path.insert(0, PROJECT_ROOT)
from mcp.server.fastmcp import FastMCP
from discovery_env import get_problem
from discovery_env.scoring import compile_law, _generate_test_states, strip_comments, stripped_code_length
# ============================================================
# LOGGING (writes to experiments/logs/ for dashboard)
# ============================================================
# Everything in this section runs once, at import time.

# Log directory under the project root; created (with parents) if missing.
LOG_DIR = Path(PROJECT_ROOT) / "experiments" / "logs"
LOG_DIR.mkdir(parents=True, exist_ok=True)
# Problem to serve; overridable through the PROBLEM_ID environment variable.
PROBLEM_ID = os.environ.get("PROBLEM_ID", "G01")
# Directory where query results are saved as JSON files. An empty string
# (the default) disables file output — see _save_data.
AGENT_WORKSPACE = os.environ.get("AGENT_WORKSPACE", "")
# Single environment instance shared by all tool functions in this module.
_env = get_problem(PROBLEM_ID)
# Wall-clock reference for the relative "t" field of each log entry.
_start_time = time.time()
# Incremented by simulate() and random_state(); read by submit_rule() for the
# efficiency bonus and recorded in every log entry as "query_num".
_query_count = 0
# One JSONL file per session, named <problem id>_<integer start timestamp>.
_log_path = LOG_DIR / f"{PROBLEM_ID}_{int(_start_time)}.jsonl"
# Point the LATEST.txt so dashboard can find us
(LOG_DIR / "LATEST.txt").write_text(str(_log_path))
class _Enc(json.JSONEncoder):
    """JSON encoder that also accepts NumPy arrays and NumPy scalars.

    Used when writing log entries, whose payloads may contain values that
    came straight out of NumPy computations.
    """

    def default(self, o):
        """Convert ``o`` to a plain Python value the JSON encoder can emit.

        Args:
            o: An object the base encoder could not serialize on its own.

        Returns:
            A nested list for arrays; a ``bool``, ``int`` or ``float`` for
            the corresponding NumPy scalar kinds.

        Raises:
            TypeError: For any other unsupported type (raised by the base
                class implementation).
        """
        if isinstance(o, np.ndarray):
            return o.tolist()
        # np.bool_ is neither a Python bool nor an np.integer, so it needs
        # its own branch; without it a logged comparison result would raise.
        if isinstance(o, np.bool_):
            return bool(o)
        if isinstance(o, np.integer):
            return int(o)
        if isinstance(o, np.floating):
            return float(o)
        return super().default(o)
def _log(action: str, data: dict):
    """Append a single event record to this session's JSONL log.

    Every record carries the action name, the seconds elapsed since the
    server started (rounded to milliseconds) and the current query counter.
    The entries of ``data`` are merged in afterwards, so a key in ``data``
    that collides with one of those three fields replaces its value.

    Args:
        action: Short label identifying the event.
        data: Extra fields to store with the event.
    """
    record = {
        "action": action,
        "t": round(time.time() - _start_time, 3),
        "query_num": _query_count,
    }
    record.update(data)
    with open(_log_path, "a") as sink:
        # One JSON document per line; _Enc takes care of NumPy values.
        sink.write(f"{json.dumps(record, cls=_Enc)}\n")
# Log session start
# Written once at import time, so each session log opens with the problem id
# and the state-shape metadata reported by the environment.
_log("session_start", {
    "problem_id": PROBLEM_ID,
    "shape_info": _env.get_state_shape(),
})
# ============================================================
# MCP SERVER
# ============================================================
# Server instance that the @mcp.tool() decorators below register against.
mcp = FastMCP("discovery-env")
def _save_data(name: str, data) -> str:
    """Save data to a JSON file in the agent workspace.

    The file is written to ``<AGENT_WORKSPACE>/data/<name>.json``.

    Args:
        name: File stem (without the ``.json`` extension).
        data: JSON-serializable payload.

    Returns:
        The path of the written file as a string, or ``""`` when no agent
        workspace is configured (nothing is written in that case).
    """
    if not AGENT_WORKSPACE:
        return ""
    data_dir = Path(AGENT_WORKSPACE) / "data"
    # parents=True: the workspace directory itself may not exist yet; without
    # it the first save into a fresh workspace raises FileNotFoundError.
    data_dir.mkdir(parents=True, exist_ok=True)
    fpath = data_dir / f"{name}.json"
    with open(fpath, "w") as f:
        json.dump(data, f)
    return str(fpath)
@mcp.tool()
def simulate(state_json: str, n_steps: int) -> str:
    """Simulate the system forward n_steps from the given initial state.

    Args:
        state_json: JSON array representing the current state.
                    For 2D grids: [[0,1,2],[3,4,0],...]
        n_steps: Number of timesteps to simulate (1-100).

    Returns:
        JSON with trajectory file path and summary. Load the file for full data.
    """
    # NOTE(review): FastMCP presumably publishes the docstring above as the
    # tool description shown to the agent, so its wording is kept unchanged.
    global _query_count
    _query_count += 1

    start = np.array(json.loads(state_json))
    # Private copy for the before/after comparison, in case the environment
    # keeps and mutates the array it is handed.
    baseline = start.copy()
    n_steps = min(max(1, n_steps), 100)  # clamp to the documented 1-100 range

    _env.set_initial_conditions(start)
    # Advance one step at a time so every intermediate state is captured.
    trajectory = [_env.step(1).tolist() for _ in range(n_steps)]
    final_state = trajectory[-1]

    # Number of cells whose value differs between the input and final state.
    cells_changed = int(np.sum(baseline != np.array(final_state)))

    _log("step", {
        "n": n_steps,
        "state": final_state,
        "cells_changed": cells_changed,
        "step_count": _env.step_count,
    })

    fpath = _save_data(f"trajectory_q{_query_count}", trajectory)
    if not fpath:
        # No workspace configured: return the whole trajectory inline.
        return json.dumps(trajectory)
    return json.dumps({
        "n_steps": n_steps,
        "cells_changed": cells_changed,
        "file": fpath,
        "hint": "Load the file with json.load() to access full trajectory data.",
    })
@mcp.tool()
def random_state(seed: int = 0) -> str:
    """Generate a random initial condition for the system.

    Args:
        seed: Random seed for reproducibility (0 = random).

    Returns:
        JSON with state file path. Load the file with json.load() for the grid data.
    """
    # NOTE(review): FastMCP presumably publishes the docstring above as the
    # tool description shown to the agent, so its wording is kept unchanged.
    global _query_count
    _query_count += 1

    meta = _env.get_state_shape()
    rows, cols = meta["rows"], meta["cols"]
    # "values" is an inclusive "lo-hi" range string, e.g. "0-1".
    vals = meta.get("values", "0-1")
    lo, hi = [int(x) for x in vals.split("-")]

    # Only strictly positive seeds are used; anything else draws fresh entropy.
    seeded = seed > 0
    rng = np.random.default_rng(seed if seeded else None)
    state = rng.integers(lo, hi + 1, size=(rows, cols))  # hi is inclusive
    _log("random_state", {"seed": seed, "state": state.tolist()})

    # A seeded state is reproducible, so re-saving it under the same name is
    # harmless. Unseeded states differ on every call; tag them with the query
    # number so a path returned earlier is not silently overwritten by a
    # later, different state.
    if seeded:
        file_stem = f"state_seed{seed}"
    else:
        file_stem = f"state_seed{seed}_q{_query_count}"

    fpath = _save_data(file_stem, state.tolist())
    if fpath:
        return json.dumps({
            "seed": seed,
            "shape": [rows, cols],
            "file": fpath,
            "hint": "Load the file with json.load() to access the state grid.",
        })
    # No workspace configured: return the grid inline.
    return json.dumps(state.tolist())
@mcp.tool()
def get_system_info() -> str:
    """Get a description of the system being investigated.

    Returns information about the state space (dimensions, value range).
    Does NOT reveal the update rule.
    """
    # NOTE(review): FastMCP presumably publishes the docstring above as the
    # tool description shown to the agent, so its wording is kept unchanged.
    meta = _env.get_state_shape()
    _log("get_info", {"info": meta})
    # Only these fields are forwarded to the agent, in this order.
    exposed_fields = ("type", "rows", "cols", "values", "description")
    return json.dumps({field: meta[field] for field in exposed_fields})
@mcp.tool()
def submit_rule(code: str) -> str:
    """Submit a proposed update rule for scoring.

    Args:
        code: Python code defining a function predict_next(grid) -> next_grid.
              The function receives a numpy array and must return the predicted
              next state as a numpy array.

    Returns:
        JSON with scoring results: functional_accuracy, parsimony_bonus, etc.
    """
    # NOTE(review): FastMCP presumably publishes the docstring above as the
    # tool description shown to the agent, so its wording is kept unchanged.
    # NOTE(review): `code` is agent-supplied and is handed to compile_law and
    # then called in this process; whether compile_law sandboxes it is not
    # visible from this file — confirm before exposing to untrusted agents.
    fn = compile_law(code)
    if fn is None:
        result = {"functional_accuracy": 0.0, "error": "Could not compile code", "total": 0.0}
        _log("submit", {"result": result})
        return json.dumps(result)

    test_states = _generate_test_states(_env, n=500, seed=9999)
    n_states = len(test_states)
    correct = 0
    runtime_errors = 0
    for state in test_states:
        expected = _env.get_true_next(state)
        try:
            # The submitted function gets a copy so it cannot alter the state.
            predicted = fn(state.copy())
            if isinstance(predicted, np.ndarray) and np.array_equal(predicted, expected):
                correct += 1
        except Exception:
            # Deliberately best-effort: a crashing prediction simply counts as
            # wrong, but the crash is tallied so it shows up in the result.
            runtime_errors += 1

    # Guard against an empty test set instead of dividing by zero.
    accuracy = correct / n_states if n_states else 0.0
    code_len = len(code.strip())

    # Delta-DL parsimony: extra stripped code length over the reference rule.
    agent_dl = stripped_code_length(code)
    try:
        ref_dl = stripped_code_length(_env.__class__.reference_code())
    except NotImplementedError:
        # No reference available: the whole submission length counts as delta.
        ref_dl = 0
    delta_dl = max(0, agent_dl - ref_dl)
    max_delta = 300
    # Up to 0.2 bonus, falling linearly to 0 at max_delta extra length.
    parsimony = 0.2 * max(0.0, 1.0 - delta_dl / max_delta)
    # Up to 0.1 bonus, falling linearly to 0 at 60 queries.
    efficiency = 0.1 * max(0.0, 1.0 - _query_count / 60)

    result = {
        "functional_accuracy": accuracy,
        "parsimony_bonus": round(parsimony, 4),
        "efficiency_bonus": round(efficiency, 4),
        "total": round(accuracy + parsimony + efficiency, 4),
        "correct_states": correct,
        "total_states": n_states,
        "queries_used": _query_count,
        "code_length": code_len,
        "agent_dl": agent_dl,
        "reference_dl": ref_dl,
        "delta_dl": delta_dl,
        "runtime_errors": runtime_errors,
    }
    _log("submit", {"result": result})
    return json.dumps(result)
if __name__ == "__main__":
    # Startup banners go to stderr; the server itself runs on the stdio
    # transport, so stdout is presumably reserved for protocol traffic.
    startup_lines = (
        f"Discovery Env MCP Server: {PROBLEM_ID}",
        f"Log: {_log_path}",
    )
    for banner in startup_lines:
        print(banner, file=sys.stderr)
    mcp.run(transport="stdio")