Spaces:
Running
Running
File size: 3,745 Bytes
245f73a e81bd91 245f73a e81bd91 245f73a 771e544 245f73a e81bd91 949c4d4 245f73a 949c4d4 245f73a | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 | from __future__ import annotations
import asyncio
import tempfile
import zipfile
from pathlib import Path
from fastapi.testclient import TestClient
from frontend import local_server
def main() -> int:
with tempfile.TemporaryDirectory(prefix="rh_space_check_") as tmp:
runs_dir = Path(tmp) / "runs"
local_server.configure_frontend(
managed_runs_dir=str(runs_dir),
cleanup_retention_seconds=60,
cleanup_max_runs=2,
cleanup_interval_seconds=60,
collection_enabled=True,
collection_dataset_repo="InternScience/ResearchHarness-Data",
collection_batch_size=5,
collection_max_bundle_bytes=1024 * 1024,
)
collection_root = local_server._collection_root()
if collection_root is None or not collection_root.exists():
raise RuntimeError("collection root was not created")
token = local_server._collection_token()
if token is not None and not isinstance(token, str):
raise RuntimeError("collection token helper returned a non-string value")
loop = asyncio.new_event_loop()
try:
bridge = local_server.FrontendRunBridge(loop=loop)
workspace_root, trace_dir = local_server._create_managed_run(bridge)
run_root = Path(bridge.managed_run_root)
(workspace_root / "answer.txt").write_text("ok\n", encoding="utf-8")
Path(trace_dir, "trace.jsonl").write_text('{"ok": true}\n', encoding="utf-8")
client = TestClient(local_server.app)
zip_response = client.get(f"/api/workspace.zip?token={bridge.download_token}")
if zip_response.status_code != 200:
raise RuntimeError(f"workspace zip endpoint returned {zip_response.status_code}")
workspace_zip = Path(tmp) / "workspace.zip"
workspace_zip.write_bytes(zip_response.content)
with zipfile.ZipFile(workspace_zip) as archive:
names = set(archive.namelist())
if "answer.txt" not in names:
raise RuntimeError("workspace zip did not include agent workspace file")
if any(name.startswith("agent_trace/") for name in names):
raise RuntimeError("workspace zip must not include trace files")
empty_bridge = local_server.FrontendRunBridge(loop=loop)
local_server._create_managed_run(empty_bridge)
empty_response = client.get(f"/api/workspace.zip?token={empty_bridge.download_token}")
if empty_response.status_code != 404:
raise RuntimeError("empty workspace download should return 404")
if "no downloadable files" not in empty_response.text:
raise RuntimeError("empty workspace response should explain that no files are downloadable")
app_js = Path("frontend/static/app.js").read_text(encoding="utf-8")
if "Workspace is empty. Create or handle a file with the agent" not in app_js:
raise RuntimeError("frontend must show an inline empty-workspace message")
bundle = local_server._write_collection_bundle(
run_root,
{"result_text": "ok", "termination": "result"},
)
if bundle is None or not bundle.exists():
raise RuntimeError("collection bundle was not created")
local_server._release_managed_run(empty_bridge)
local_server._release_managed_run(bridge)
local_server.cleanup_managed_runs_once()
finally:
loop.close()
print("space runtime ok")
return 0
if __name__ == "__main__":
raise SystemExit(main())
|