| |
| """Summarize per-scale Biomni MCP scaling metrics into a table-shaped JSON/CSV.""" |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import csv |
| import json |
| from pathlib import Path |
| from typing import Any |
|
|
# Default locations, all derived from this file's resolved path.
# ``parents[1]`` is the parent of the directory that contains this file.
DEFAULT_EXP_DIR = Path(__file__).resolve().parents[1]
DEFAULT_RESULTS_DIR = DEFAULT_EXP_DIR.joinpath("results")
# The summary table is written next to the per-scale metric files by default.
DEFAULT_OUT_JSON = DEFAULT_RESULTS_DIR.joinpath("experiment1_scaling_table.json")
DEFAULT_OUT_CSV = DEFAULT_RESULTS_DIR.joinpath("experiment1_scaling_table.csv")
|
|
|
|
def load_json(path: Path) -> dict[str, Any]:
    """Open ``path`` as UTF-8 text and return the parsed JSON document.

    The parsed value is returned as-is; it is not checked to be a dict.
    """
    with path.open("r", encoding="utf-8") as handle:
        return json.load(handle)
|
|
|
|
def metric_path(results_dir: Path, scale: int) -> Path | None:
    """Locate the metrics file for ``scale`` inside ``results_dir``.

    The ``_v2`` file name is tried first, then the unversioned name.

    Returns:
        The first of the two candidate paths that exists, or ``None`` when
        neither exists.
    """
    file_names = (
        f"scale_{scale}_mcp_metrics_v2.json",
        f"scale_{scale}_mcp_metrics.json",
    )
    candidates = (results_dir / name for name in file_names)
    # ``next`` stops at the first existing candidate, preserving the order above.
    return next((found for found in candidates if found.exists()), None)
|
|
|
|
def row_from_payload(scale: int, payload: dict[str, Any]) -> dict[str, Any]:
    """Build one summary-table row for ``scale`` from a metrics payload.

    Metric values are read from ``payload["aggregate"]``. A metric that is
    absent from the aggregate is reported as ``None``. A missing or ``null``
    aggregate section yields a row whose metric values are all ``None``.

    Args:
        scale: Value stored under the ``"Scales"`` column.
        payload: Parsed metrics document; only its ``"aggregate"`` entry is read.

    Returns:
        A dict with ``"Scales"`` first, followed by the metric columns in a
        fixed order.
    """
    metric_keys = (
        "Retrieval Recall@k",
        "Workflow Validity",
        "Execution Success Rate",
        "Context Tokens",
        "Planning Latency",
        "Hallucinated Tool Rate",
        "Data-Type Mismatch Rate",
        "Biological Constraint Error Rate",
        "retrieval_recall_k",
        "run_count",
    )
    agg = payload.get("aggregate")
    if agg is None:
        # ``"aggregate": null`` in the JSON would otherwise fail on ``.get``.
        agg = {}
    row: dict[str, Any] = {"Scales": scale}
    row.update((key, agg.get(key)) for key in metric_keys)
    return row
|
|
|
|
def main(argv: list[str] | None = None) -> int:
    """Collect per-scale metrics into one table and write it as JSON and CSV.

    For every requested scale, the metrics file is looked up with
    ``metric_path``; scales with no file are skipped with a warning on
    stderr. The resulting rows are written to ``--out-json`` as
    ``{"rows": [...]}`` and to ``--out-csv`` with a header row, and the JSON
    text is also printed to stdout.

    Args:
        argv: Command-line arguments to parse. ``None`` (the default) makes
            argparse read ``sys.argv[1:]``, as before.

    Returns:
        ``0`` on completion.
    """
    import sys  # local import: only needed for the stderr warning below

    parser = argparse.ArgumentParser()
    parser.add_argument("--results-dir", type=Path, default=DEFAULT_RESULTS_DIR)
    parser.add_argument("--scales", type=int, nargs="+", default=[100, 500, 1000])
    parser.add_argument("--out-json", type=Path, default=DEFAULT_OUT_JSON)
    parser.add_argument("--out-csv", type=Path, default=DEFAULT_OUT_CSV)
    args = parser.parse_args(argv)

    rows = []
    for scale in args.scales:
        path = metric_path(args.results_dir, scale)
        if path is None:
            # Keep going, but make the gap visible; stdout stays pure JSON.
            print(
                f"warning: no metrics file for scale {scale} in {args.results_dir}; skipping",
                file=sys.stderr,
            )
            continue
        rows.append(row_from_payload(scale, load_json(path)))

    payload = {"rows": rows}
    # Serialise once; the same text goes to the file and to stdout.
    rendered = json.dumps(payload, indent=2, ensure_ascii=False)
    args.out_json.parent.mkdir(parents=True, exist_ok=True)
    args.out_json.write_text(rendered, encoding="utf-8")

    fieldnames = [
        "Scales",
        "Retrieval Recall@k",
        "Workflow Validity",
        "Execution Success Rate",
        "Context Tokens",
        "Planning Latency",
        "Hallucinated Tool Rate",
        "Data-Type Mismatch Rate",
        "Biological Constraint Error Rate",
        "retrieval_recall_k",
        "run_count",
    ]
    # The CSV may live in a different directory than the JSON output.
    args.out_csv.parent.mkdir(parents=True, exist_ok=True)
    with args.out_csv.open("w", encoding="utf-8", newline="") as fh:
        writer = csv.DictWriter(fh, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)

    print(rendered)
    return 0
|
|
|
|
# Script entry point: run ``main`` and use its return value as the exit status.
if __name__ == "__main__":
    exit_code = main()
    raise SystemExit(exit_code)
|
|