czty's picture
Add files using upload-large-folder tool
9473841 verified
Raw
History Blame Contribute Delete
3.25 kB
#!/usr/bin/env python3
"""Summarize per-scale Biomni MCP scaling metrics into a table-shaped JSON/CSV."""
from __future__ import annotations
import argparse
import csv
import json
import sys
from pathlib import Path
from typing import Any
# Experiment root: the directory one level above the folder holding this script.
DEFAULT_EXP_DIR = Path(__file__).resolve().parents[1]
# Default directory searched for per-scale metric files and used for the outputs.
DEFAULT_RESULTS_DIR = DEFAULT_EXP_DIR.joinpath("results")
# The summary table is written twice under the same stem: once as JSON, once as CSV.
DEFAULT_OUT_JSON = DEFAULT_RESULTS_DIR.joinpath("experiment1_scaling_table.json")
DEFAULT_OUT_CSV = DEFAULT_OUT_JSON.with_suffix(".csv")
def load_json(path: Path) -> dict[str, Any]:
    """Read ``path`` as UTF-8 text and return the decoded JSON document.

    Errors raised while opening the file or decoding its contents propagate
    to the caller unchanged.
    """
    with path.open("r", encoding="utf-8") as handle:
        return json.load(handle)
def metric_path(results_dir: Path, scale: int) -> Path | None:
    """Locate the metrics file for ``scale`` inside ``results_dir``.

    The ``_v2`` file name is tried before the unversioned one, and the first
    path that exists is returned. Returns ``None`` when neither exists.
    """
    file_names = (
        f"scale_{scale}_mcp_metrics_v2.json",
        f"scale_{scale}_mcp_metrics.json",
    )
    located = (results_dir / file_name for file_name in file_names)
    return next((found for found in located if found.exists()), None)
def row_from_payload(scale: int, payload: dict[str, Any]) -> dict[str, Any]:
    """Build one summary-table row from a per-scale metrics payload.

    Args:
        scale: Value stored under the ``"Scales"`` key of the row.
        payload: Decoded metrics document; its ``"aggregate"`` mapping
            supplies the metric values.

    Returns:
        A dict with ``"Scales"`` first, followed by one entry per aggregate
        field in table column order. A field that is absent from the
        aggregate section maps to ``None``.
    """
    aggregate_fields = (
        "Retrieval Recall@k",
        "Workflow Validity",
        "Execution Success Rate",
        "Context Tokens",
        "Planning Latency",
        "Hallucinated Tool Rate",
        "Data-Type Mismatch Rate",
        "Biological Constraint Error Rate",
        "retrieval_recall_k",
        "run_count",
    )
    # A JSON ``"aggregate": null`` decodes to None; treat it like a missing
    # section rather than failing on ``None.get``.
    aggregate = payload.get("aggregate")
    if aggregate is None:
        aggregate = {}
    row: dict[str, Any] = {"Scales": scale}
    row.update((name, aggregate.get(name)) for name in aggregate_fields)
    return row
def main(argv: list[str] | None = None) -> int:
    """Collect per-scale metric files into one table and write it as JSON and CSV.

    For every requested scale the matching metrics file is looked up in the
    results directory, reduced to a single row, and appended to the table.
    The table is written to the JSON and CSV output paths and the JSON form
    is also printed to stdout.

    Args:
        argv: Command-line arguments to parse. ``None`` (the default) lets
            argparse read them from ``sys.argv``.

    Returns:
        ``0``, used by the caller as the process exit status.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--results-dir", type=Path, default=DEFAULT_RESULTS_DIR)
    parser.add_argument("--scales", type=int, nargs="+", default=[100, 500, 1000])
    parser.add_argument("--out-json", type=Path, default=DEFAULT_OUT_JSON)
    parser.add_argument("--out-csv", type=Path, default=DEFAULT_OUT_CSV)
    args = parser.parse_args(argv)

    rows = []
    for scale in args.scales:
        path = metric_path(args.results_dir, scale)
        if path is None:
            # Keep going so the remaining scales are still summarized, but
            # report the gap. The warning goes to stderr so stdout stays
            # pure JSON.
            print(
                f"warning: no metrics file for scale {scale} in {args.results_dir}; skipping",
                file=sys.stderr,
            )
            continue
        rows.append(row_from_payload(scale, load_json(path)))

    payload = {"rows": rows}

    # The two outputs may live in different directories, so create both parents.
    for out_path in (args.out_json, args.out_csv):
        out_path.parent.mkdir(parents=True, exist_ok=True)

    args.out_json.write_text(json.dumps(payload, indent=2, ensure_ascii=False), encoding="utf-8")

    # Column order of the CSV table.
    fieldnames = [
        "Scales",
        "Retrieval Recall@k",
        "Workflow Validity",
        "Execution Success Rate",
        "Context Tokens",
        "Planning Latency",
        "Hallucinated Tool Rate",
        "Data-Type Mismatch Rate",
        "Biological Constraint Error Rate",
        "retrieval_recall_k",
        "run_count",
    ]
    # newline="" lets the csv module control line endings itself.
    with args.out_csv.open("w", encoding="utf-8", newline="") as fh:
        writer = csv.DictWriter(fh, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)

    print(json.dumps(payload, indent=2, ensure_ascii=False))
    return 0
if __name__ == "__main__":
    # Hand main()'s return value to the interpreter as the process exit status.
    exit_status = main()
    raise SystemExit(exit_status)