File size: 5,232 Bytes
d0e30e9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
#!/usr/bin/env python3
"""Discovery Environment MCP Server β€” HF Space proxy.

Instead of running the env locally, this proxies all 4 tool calls to the
Hugging Face Space REST API. Agents get the same MCP interface but the
hidden rules live only on HF — they cannot be read from local files.

Usage:
    HF_SPACE_URL=https://echoboi-discovery-env.hf.space \
    PROBLEM_ID=G01 \
    python3 discovery_env_server/server_hf.py

Env vars:
    HF_SPACE_URL   Base URL of the HF Space (no trailing slash)
    PROBLEM_ID     G01–G08  (default: G01)
    HF_API_KEY     Optional API key if the space has one set
"""

import json
import os
import sys
import time
from pathlib import Path

import requests
from mcp.server.fastmcp import FastMCP

# Runtime configuration, read once from the environment at import time.
# The base URL is normalised so endpoint paths can always start with "/".
HF_BASE = os.getenv("HF_SPACE_URL", "https://echoboi-discovery-env.hf.space").rstrip("/")
PROBLEM_ID = os.getenv("PROBLEM_ID", "G01")
HF_API_KEY = os.getenv("HF_API_KEY", "")

# The API-key header is only sent when a key has actually been configured.
HEADERS = {"X-Api-Key": HF_API_KEY} if HF_API_KEY else {}


def _post(path: str, body: dict | None = None) -> dict:
    """POST a JSON payload to the HF Space and return the decoded JSON reply.

    Args:
        path: Endpoint path appended directly to ``HF_BASE``.
        body: JSON-serialisable payload; ``None`` (or an empty dict) sends ``{}``.

    Returns:
        The decoded JSON response on success. If the request fails, the server
        answers with an HTTP error status, or the response body is not valid
        JSON, ``{"error": <message>}`` is returned instead of raising.
    """
    try:
        r = requests.post(f"{HF_BASE}{path}", json=body or {}, headers=HEADERS, timeout=60)
        r.raise_for_status()
        return r.json()
    except (requests.RequestException, ValueError) as exc:
        # ValueError covers a non-JSON body on requests releases where the
        # decode error is not a RequestException subclass.
        return {"error": str(exc)}


def _get(path: str) -> dict:
    """GET an endpoint on the HF Space and return the decoded JSON reply.

    Args:
        path: Endpoint path appended directly to ``HF_BASE``.

    Returns:
        The decoded JSON response on success. If the request fails, the server
        answers with an HTTP error status, or the response body is not valid
        JSON, ``{"error": <message>}`` is returned instead of raising.
    """
    try:
        r = requests.get(f"{HF_BASE}{path}", headers=HEADERS, timeout=30)
        r.raise_for_status()
        return r.json()
    except (requests.RequestException, ValueError) as exc:
        # ValueError covers a non-JSON body on requests releases where the
        # decode error is not a RequestException subclass.
        return {"error": str(exc)}


# ── Create session on HF at startup ──────────────────────────────────────────

# Diagnostics go to stderr; the server is later run over the stdio transport.
print(f"Connecting to {HF_BASE} (problem={PROBLEM_ID}) ...", file=sys.stderr)
resp = _post("/session", {"problem_id": PROBLEM_ID})
if "session_id" in resp:
    SESSION_ID = resp["session_id"]
else:
    # Without a session there is nothing to proxy to, so abort startup.
    print(f"ERROR: could not create session: {resp}", file=sys.stderr)
    sys.exit(1)
print(f"Session ready: {SESSION_ID}", file=sys.stderr)

# ── Local logging (mirrors original server.py format) ─────────────────────────

# Logs live in "experiments/logs" next to the agent workspace directory.
_LOG_DIR = (
    Path(os.environ.get("AGENT_WORKSPACE", "/project/agent_workspace")).parent
    / "experiments"
    / "logs"
)
_LOG_DIR.mkdir(parents=True, exist_ok=True)

# One JSONL file per run, named after the problem and the start timestamp.
_LOG_FILE = _LOG_DIR / "{}_{}.jsonl".format(PROBLEM_ID, time.strftime("%Y-%m-%d_%H-%M-%S"))
# LATEST.txt always holds the path of the most recently started log file.
_LOG_DIR.joinpath("LATEST.txt").write_text(str(_LOG_FILE))

# Reference point for relative timestamps and the running query counter.
_START_T = time.time()
_QUERY_NUM = 0


def _log(entry: dict) -> None:
    """Append one record to the JSONL log file.

    Increments the module-level query counter, then stamps ``entry`` in place
    with ``"t"`` (seconds since ``_START_T``, rounded to milliseconds) and
    ``"query_num"`` before writing it as a single JSON line.

    Args:
        entry: Record to log; it is mutated by this call.
    """
    global _QUERY_NUM
    _QUERY_NUM += 1
    entry["t"] = round(time.time() - _START_T, 3)
    entry["query_num"] = _QUERY_NUM
    with open(_LOG_FILE, "a") as sink:
        sink.write(f"{json.dumps(entry)}\n")


# Opening in "w" mode creates (or truncates) the log file, so the
# session-start marker is always its first line.
with open(_LOG_FILE, "w") as fh:
    fh.write(
        json.dumps(
            {
                "action": "session_start",
                "problem_id": PROBLEM_ID,
                "session_id": SESSION_ID,
                "t": 0,
                "query_num": 0,
            }
        )
        + "\n"
    )

# ── MCP server ────────────────────────────────────────────────────────────────

mcp = FastMCP("discovery-env")


@mcp.tool()
def get_system_info() -> str:
    """Get a description of the system being investigated.

    Returns information about the state space (dimensions, value range).
    Does NOT reveal the update rule.
    """
    # NOTE(review): docstring kept verbatim; FastMCP presumably surfaces it to
    # clients as the tool description. Confirm before rewording.
    endpoint = f"/session/{SESSION_ID}/info"
    info = _get(endpoint)
    # The call is logged whether or not the remote request succeeded.
    _log({"action": "get_info"})
    return json.dumps(info)


@mcp.tool()
def random_state(seed: int = 0) -> str:
    """Generate a random initial condition for the system.

    Args:
        seed: Random seed for reproducibility (0 = random).

    Returns:
        JSON with state array and shape.
    """
    # NOTE(review): docstring kept verbatim; FastMCP presumably surfaces it to
    # clients as the tool description. Confirm before rewording.
    endpoint = f"/session/{SESSION_ID}/random_state"
    reply = _post(endpoint, {"seed": seed})
    # Only the requested seed is logged, not the generated state.
    _log({"action": "random_state", "seed": seed})
    return json.dumps(reply)


@mcp.tool()
def simulate(state_json: str, n_steps: int) -> str:
    """Simulate the system forward n_steps from the given initial state.

    Args:
        state_json: JSON array representing the current state.
                    For 2D grids: [[0,1,2],[3,4,0],...]
        n_steps: Number of timesteps to simulate (1-100).

    Returns:
        JSON with trajectory and summary.
    """
    # NOTE(review): docstring kept verbatim; FastMCP presumably surfaces it to
    # clients as the tool description. Confirm before rewording.
    payload = {"state_json": state_json, "n_steps": n_steps}
    outcome = _post(f"/session/{SESSION_ID}/simulate", payload)
    # "?" marks replies (e.g. error replies) that carry no "cells_changed" field.
    changed = outcome.get("cells_changed", "?")
    _log({"action": "simulate", "n_steps": n_steps, "cells_changed": changed})
    return json.dumps(outcome)


@mcp.tool()
def submit_rule(code: str) -> str:
    """Submit a proposed update rule for scoring.

    Args:
        code: Python code defining a function predict_next(grid) -> next_grid.
              The function receives a numpy array and must return the predicted
              next state as a numpy array.

    Returns:
        JSON with scoring results: functional_accuracy, parsimony_bonus, etc.
    """
    # NOTE(review): docstring kept verbatim; FastMCP presumably surfaces it to
    # clients as the tool description. Confirm before rewording.
    endpoint = f"/session/{SESSION_ID}/submit_rule"
    verdict = _post(endpoint, {"code": code})
    # The full reply is stored in the log; the submitted code is not.
    _log({"action": "submit", "result": verdict})
    return json.dumps(verdict)


if __name__ == "__main__":
    # Serve the registered tools over the stdio transport when run as a script.
    mcp.run(transport="stdio")