Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """Discovery Environment MCP Server — HF Space proxy. | |
| Instead of running the env locally, this proxies all 4 tool calls to the | |
| Hugging Face Space REST API. Agents get the same MCP interface but the | |
| hidden rules live only on HF — they cannot be read from local files. | |
| Usage: | |
| HF_SPACE_URL=https://echoboi-discovery-env.hf.space \ | |
| PROBLEM_ID=G01 \ | |
| python3 discovery_env_server/server_hf.py | |
| Env vars: | |
| HF_SPACE_URL Base URL of the HF Space (no trailing slash) | |
| PROBLEM_ID G01–G08 (default: G01) | |
| HF_API_KEY Optional API key if the space has one set | |
| """ | |
| import json | |
| import os | |
| import sys | |
| import time | |
| from pathlib import Path | |
| import requests | |
| from mcp.server.fastmcp import FastMCP | |
# Connection settings, read once from the environment at import time.
HF_BASE = os.environ.get(
    "HF_SPACE_URL", "https://echoboi-discovery-env.hf.space"
).rstrip("/")  # strip trailing slashes so request paths can be appended as-is
PROBLEM_ID = os.environ.get("PROBLEM_ID", "G01")
HF_API_KEY = os.environ.get("HF_API_KEY", "")

# The API-key header is only sent when a key is actually configured.
HEADERS = {"X-Api-Key": HF_API_KEY} if HF_API_KEY else {}
def _post(path: str, body: dict | None = None) -> dict:
    """POST a JSON payload to the HF Space and return the decoded JSON reply.

    Args:
        path: Request path appended directly to ``HF_BASE``.
        body: JSON-serialisable payload; an empty object is sent when omitted
            (or when an empty dict is passed).

    Returns:
        The decoded JSON response, or ``{"error": <message>}`` when the
        request fails, the server answers with an HTTP error status, or the
        response body is not valid JSON.
    """
    try:
        r = requests.post(f"{HF_BASE}{path}", json=body or {}, headers=HEADERS, timeout=60)
        r.raise_for_status()
        return r.json()
    except (requests.RequestException, ValueError) as exc:
        # ValueError covers JSON decode failures: on older requests releases
        # the decode error raised by Response.json() is a plain ValueError
        # subclass rather than a RequestException, so it would otherwise
        # escape (e.g. when the Space serves an HTML page instead of JSON).
        return {"error": str(exc)}
def _get(path: str) -> dict:
    """GET a path on the HF Space and return the decoded JSON reply.

    Args:
        path: Request path appended directly to ``HF_BASE``.

    Returns:
        The decoded JSON response, or ``{"error": <message>}`` when the
        request fails, the server answers with an HTTP error status, or the
        response body is not valid JSON.
    """
    try:
        r = requests.get(f"{HF_BASE}{path}", headers=HEADERS, timeout=30)
        r.raise_for_status()
        return r.json()
    except (requests.RequestException, ValueError) as exc:
        # ValueError covers JSON decode failures: on older requests releases
        # the decode error raised by Response.json() is a plain ValueError
        # subclass rather than a RequestException, so it would otherwise
        # escape (e.g. when the Space serves an HTML page instead of JSON).
        return {"error": str(exc)}
# ── Create session on HF at startup ──────────────────────────────────────────
# Progress and errors go to stderr; the server is started with the stdio
# transport, so stdout is left alone.
print(f"Connecting to {HF_BASE} (problem={PROBLEM_ID}) ...", file=sys.stderr)
resp = _post("/session", {"problem_id": PROBLEM_ID})
if "session_id" in resp:
    SESSION_ID = resp["session_id"]
    print(f"Session ready: {SESSION_ID}", file=sys.stderr)
else:
    # Without a session every later call would fail, so abort immediately.
    print(f"ERROR: could not create session: {resp}", file=sys.stderr)
    sys.exit(1)
# ── Local logging (mirrors original server.py format) ────────────────────────
# Logs are written to <parent of AGENT_WORKSPACE>/experiments/logs.
_LOG_DIR = Path(
    os.environ.get("AGENT_WORKSPACE", "/project/agent_workspace")
).parent.joinpath("experiments", "logs")
_LOG_DIR.mkdir(parents=True, exist_ok=True)

# One JSONL file per server start, named after the problem and start time.
_LOG_FILE = _LOG_DIR.joinpath(
    f"{PROBLEM_ID}_{time.strftime('%Y-%m-%d_%H-%M-%S')}.jsonl"
)
# LATEST.txt holds the path of the most recently created log file.
_LOG_DIR.joinpath("LATEST.txt").write_text(str(_LOG_FILE))

_START_T = time.time()  # reference point for the relative "t" field in log entries
_QUERY_NUM = 0  # running count of logged calls, incremented by _log()
def _log(entry: dict) -> None:
    """Append one record to the session's JSONL log file.

    Increments the module-level query counter, stamps ``entry`` in place with
    ``t`` (seconds elapsed since ``_START_T``, rounded to 3 decimals) and
    ``query_num``, then writes it as a single JSON line.

    Args:
        entry: Record to log; it is modified in place.
    """
    global _QUERY_NUM
    _QUERY_NUM += 1
    entry["t"] = round(time.time() - _START_T, 3)
    entry["query_num"] = _QUERY_NUM
    with open(_LOG_FILE, "a") as log_fh:
        log_fh.write(json.dumps(entry) + "\n")
# Write session start. The file is (re)created here, so this record is always
# the first line of the log.
_LOG_FILE.write_text(
    json.dumps(
        {
            "action": "session_start",
            "problem_id": PROBLEM_ID,
            "session_id": SESSION_ID,
            "t": 0,
            "query_num": 0,
        }
    )
    + "\n"
)

# ── MCP server ───────────────────────────────────────────────────────────────
mcp = FastMCP("discovery-env")
# Register with the MCP server; an undecorated function is never exposed as a tool.
@mcp.tool()
def get_system_info() -> str:
    """Get a description of the system being investigated.

    Returns information about the state space (dimensions, value range).
    Does NOT reveal the update rule.
    """
    result = _get(f"/session/{SESSION_ID}/info")
    # Only the action is logged, not the returned description.
    _log({"action": "get_info"})
    return json.dumps(result)
# Register with the MCP server; an undecorated function is never exposed as a tool.
@mcp.tool()
def random_state(seed: int = 0) -> str:
    """Generate a random initial condition for the system.

    Args:
        seed: Random seed for reproducibility (0 = random).

    Returns:
        JSON with state array and shape.
    """
    result = _post(f"/session/{SESSION_ID}/random_state", {"seed": seed})
    # The seed is logged; the generated state is not.
    _log({"action": "random_state", "seed": seed})
    return json.dumps(result)
# Register with the MCP server; an undecorated function is never exposed as a tool.
@mcp.tool()
def simulate(state_json: str, n_steps: int) -> str:
    """Simulate the system forward n_steps from the given initial state.

    Args:
        state_json: JSON array representing the current state.
            For 2D grids: [[0,1,2],[3,4,0],...]
        n_steps: Number of timesteps to simulate (1-100).

    Returns:
        JSON with trajectory and summary.
    """
    result = _post(f"/session/{SESSION_ID}/simulate", {
        "state_json": state_json,
        "n_steps": n_steps,
    })
    # Guard the lookup so a reply that decodes to a non-object cannot raise
    # AttributeError here; "?" is the placeholder for "not reported".
    cells_changed = result.get("cells_changed", "?") if isinstance(result, dict) else "?"
    _log({"action": "simulate", "n_steps": n_steps,
          "cells_changed": cells_changed})
    return json.dumps(result)
# Register with the MCP server; an undecorated function is never exposed as a tool.
@mcp.tool()
def submit_rule(code: str) -> str:
    """Submit a proposed update rule for scoring.

    Args:
        code: Python code defining a function predict_next(grid) -> next_grid.
            The function receives a numpy array and must return the predicted
            next state as a numpy array.

    Returns:
        JSON with scoring results: functional_accuracy, parsimony_bonus, etc.
    """
    result = _post(f"/session/{SESSION_ID}/submit_rule", {"code": code})
    # The full scoring reply is logged; the submitted code is not.
    _log({"action": "submit", "result": result})
    return json.dumps(result)
# Serve over the stdio transport only when executed as a script.
if __name__ == "__main__":
    mcp.run(transport="stdio")