"""Collect per-query benchmark data from the live lablab UI.

Runs each query through `/api/agent/stream`, accumulates the full SSE
trace, and emits a JSON record per query with everything the benchmark
page (docs/BENCHMARKS.md) needs:

  - briefing paragraph
  - per-Stone fired count (Cornerstone / Keystone / Touchstone /
    Lodestone / Capstone)
  - by-design / errored skip rows
  - Mellea attempts, rerolls, requirements passed/failed
  - emissions: total Wh, J, tokens, n_measured, by-kind / by-hardware
  - wall-clock start-to-final
  - geocode (lat/lon, BBL, BIN)

Output: JSON written to outputs/benchmarks.json (or `--out`).

Usage:
    PYTHONPATH=. uv run python scripts/probe_benchmarks.py
    PYTHONPATH=. uv run python scripts/probe_benchmarks.py \\
        --queries "80 Pioneer Street, Brooklyn" "2508 Beach Channel Drive"

Defaults to the canonical four addresses from CLAUDE.md.
"""
from __future__ import annotations

import argparse
import json
import sys
import time
from collections.abc import Iterable, Iterator
from pathlib import Path
from typing import Any
from urllib.parse import quote

import httpx

DEFAULT_BASE = "https://lablab-ai-amd-developer-hackathon-riprap-nyc.hf.space"
DEFAULT_QUERIES = [
    "80 Pioneer Street, Brooklyn",
    "2508 Beach Channel Drive, Queens",
    "Coney Island I Houses, Brooklyn",
    "Carleton Manor Houses, Queens",
]

# Maps each step name seen in "step" events to the Stone it is counted under.
STEP_TO_STONE: dict[str, str] = {
    "sandy_inundation": "Cornerstone",
    "dep_stormwater": "Cornerstone",
    "ida_hwm_2021": "Cornerstone",
    "prithvi_eo_v2": "Cornerstone",
    "microtopo_lidar": "Cornerstone",
    "sandy_nta": "Cornerstone",
    "dep_extreme_2080_nta": "Cornerstone",
    "dep_moderate_2050_nta": "Cornerstone",
    "dep_moderate_current_nta": "Cornerstone",
    "microtopo_nta": "Cornerstone",
    "mta_entrance_exposure": "Keystone",
    "nycha_development_exposure": "Keystone",
    "doe_school_exposure": "Keystone",
    "doh_hospital_exposure": "Keystone",
    "terramind_synthesis": "Keystone",
    "eo_chip_fetch": "Keystone",
    "terramind_buildings": "Keystone",
    "floodnet": "Touchstone",
    "nyc311": "Touchstone",
    "nws_obs": "Touchstone",
    "noaa_tides": "Touchstone",
    "prithvi_eo_live": "Touchstone",
    "terramind_lulc": "Touchstone",
    "nyc311_nta": "Touchstone",
    "nws_alerts": "Lodestone",
    "ttm_forecast": "Lodestone",
    "ttm_311_forecast": "Lodestone",
    "floodnet_forecast": "Lodestone",
    "ttm_battery_surge": "Lodestone",
    "reconcile_granite41": "Capstone",
    "mellea_reconcile_address": "Capstone",
    "reconcile_neighborhood": "Capstone",
    "reconcile_development": "Capstone",
    "reconcile_live_now": "Capstone",
}

# Order in which Stones appear in the per-query "stones" record.
_STONES = ("Cornerstone", "Keystone", "Touchstone", "Lodestone", "Capstone")

# Lower-case substrings that mark a failed step as an expected, by-design
# skip rather than a genuine error.
_DESIGN_SKIP_MARKERS = (
    "no entrances within radius",
    "only 2 historical",
    "no schools within radius",
    "no nycha",
    "no hospitals within radius",
    "out of nyc scope",
    "not in nyc pluto",
)


def stream_events(base: str, q: str, timeout_s: float) -> Iterator[tuple[str, Any]]:
    """Yield ``(event_name, payload)`` pairs from the agent's SSE stream.

    Issues ``GET {base}/api/agent/stream?q=<url-quoted q>`` and parses the
    response line by line. A ``data:`` line is only emitted when an
    ``event:`` line preceded it within the same blank-line-delimited
    record; each ``data:`` line is decoded on its own (multi-line data
    fields are not concatenated).

    Args:
        base: Base URL of the service; a trailing slash is tolerated.
        q: Free-text query, URL-quoted before being sent.
        timeout_s: Timeout in seconds handed to ``httpx.Client``.

    Yields:
        The event name and the JSON-decoded data. When the data is not
        valid JSON the payload is ``{"_raw": <text>}`` instead.

    Raises:
        httpx.HTTPStatusError: If the response status is 4xx/5xx.
    """
    url = f"{base.rstrip('/')}/api/agent/stream?q={quote(q)}"
    with httpx.Client(timeout=timeout_s) as client:
        with client.stream("GET", url) as r:
            r.raise_for_status()
            event = None
            for line in r.iter_lines():
                if not line:
                    # A blank line terminates the current SSE record.
                    event = None
                    continue
                if line.startswith("event:"):
                    event = line.removeprefix("event:").strip()
                elif line.startswith("data:") and event:
                    body = line.removeprefix("data:").strip()
                    # Keep the try body to the parse alone so errors raised
                    # around the yield are never mistaken for bad JSON.
                    try:
                        payload = json.loads(body)
                    except ValueError:
                        payload = {"_raw": body}
                    yield event, payload


def _pick(src: dict, keys: Iterable[str]) -> dict:
    """Return ``{key: src.get(key)}`` for *keys*, preserving their order."""
    return {key: src.get(key) for key in keys}


def _failure_reason(payload: dict) -> Any:
    """Return the first truthy err/skipped value of a failed step, else ``""``.

    Looks at ``payload["err"]``, then ``result["err"]``, then
    ``result["skipped"]``. The value is returned as received, so it is not
    guaranteed to be a string.
    """
    result = payload.get("result")
    if not isinstance(result, dict):
        # A missing or non-mapping result carries no reason fields.
        result = {}
    return payload.get("err") or result.get("err") or result.get("skipped") or ""


def _is_design_skip(reason: Any) -> bool:
    """Tell whether *reason* reads like an expected, by-design skip.

    Heuristic: by-design skips use neutral language; genuine errors usually
    contain a Python exception type.
    """
    # Reasons are not always strings (e.g. a bare ``True`` flag), so coerce
    # before matching instead of failing the whole query.
    blob = str(reason).lower()
    return any(marker in blob for marker in _DESIGN_SKIP_MARKERS)


def collect_one(base: str, q: str, timeout_s: float) -> dict:
    """Run one query against the service and summarise its event trace.

    Consumes ``stream_events`` to the end, counting ``token`` events,
    keeping the last ``plan`` and ``final`` payloads, and sorting ``step``
    events into fired / skipped-by-design / errored. Progress is printed
    to stdout.

    Args:
        base: Base URL of the service.
        q: Query text.
        timeout_s: HTTP timeout in seconds.

    Returns:
        A JSON-serialisable record with the keys ``query``, ``wallclock_s``,
        ``n_token_events``, ``geocode``, ``plan``, ``stones``, ``errored``,
        ``skipped_by_design``, ``mellea``, ``emissions``, ``paragraph``,
        ``paragraph_chars`` and ``tier``. Fields absent from the stream are
        ``None``.

    Raises:
        Whatever ``stream_events`` raises (HTTP and transport errors).
    """
    print(f"\n== {q!r} ==", flush=True)
    # perf_counter is monotonic, so the measured duration cannot be skewed
    # by system clock adjustments during a long-running query.
    t0 = time.perf_counter()
    fired: dict[str, list[str]] = {stone: [] for stone in _STONES}
    errored: list[dict] = []
    skipped: list[dict] = []
    final: dict | None = None
    plan: dict | None = None
    n_token_events = 0

    for event, payload in stream_events(base, q, timeout_s):
        if event == "plan":
            plan = payload
        elif event == "token":
            n_token_events += 1
        elif event == "step":
            step = payload.get("step", "")
            ok = bool(payload.get("ok"))
            stone = STEP_TO_STONE.get(step)
            if stone and ok:
                fired[stone].append(step)
            elif not ok:
                reason = _failure_reason(payload)
                row = {
                    "step": step,
                    "stone": stone,
                    "reason": reason,
                    "elapsed_s": payload.get("elapsed_s"),
                }
                if _is_design_skip(reason):
                    skipped.append(row)
                else:
                    errored.append(row)
            # Successful steps that map to no Stone are deliberately ignored.
        elif event == "final":
            final = payload

    elapsed_s = round(time.perf_counter() - t0, 2)
    print(f" {elapsed_s}s · token events={n_token_events}", flush=True)

    # Every section may be missing or null when the run ended early.
    final_d = final or {}
    plan_d = plan or {}
    em = final_d.get("emissions") or {}
    mel = final_d.get("mellea") or {}
    geo = final_d.get("geocode") or {}
    paragraph = final_d.get("paragraph")

    return {
        "query": q,
        "wallclock_s": elapsed_s,
        "n_token_events": n_token_events,
        "geocode": _pick(
            geo, ("address", "lat", "lon", "bbl", "bin", "borough")
        ),
        "plan": _pick(plan_d, ("intent", "specialists", "rationale")),
        "stones": {
            stone: {"n_fired": len(steps), "steps": steps}
            for stone, steps in fired.items()
        },
        "errored": errored,
        "skipped_by_design": skipped,
        "mellea": _pick(
            mel,
            (
                "n_attempts",
                "rerolls",
                "requirements_passed",
                "requirements_failed",
                "requirements_total",
                "model",
            ),
        ),
        "emissions": _pick(
            em,
            (
                "n_calls",
                "n_measured",
                "total_wh",
                "total_mwh",
                "total_joules",
                "total_duration_s",
                "tokens",
                "by_kind",
                "by_hardware",
            ),
        ),
        "paragraph": paragraph,
        "paragraph_chars": len(paragraph or ""),
        "tier": final_d.get("tier"),
    }


def main() -> int:
    """Parse CLI arguments, probe every query, and write the JSON report.

    A query that raises is recorded as ``{"query": ..., "error": ...}`` so
    one failure does not abort the remaining queries.

    Returns:
        Process exit status; always ``0``, including when queries failed.
    """
    p = argparse.ArgumentParser()
    p.add_argument("--base", default=DEFAULT_BASE)
    p.add_argument("--queries", nargs="*", default=DEFAULT_QUERIES)
    p.add_argument("--timeout", type=float, default=600.0)
    p.add_argument("--out", default="outputs/benchmarks.json")
    args = p.parse_args()

    out_path = Path(args.out)
    out_path.parent.mkdir(parents=True, exist_ok=True)

    print("== probe_benchmarks ==")
    print(f" base : {args.base}")
    print(f" queries: {len(args.queries)}")

    runs: list[dict] = []
    for q in args.queries:
        # Deliberately broad: this is the per-query boundary, and any
        # failure is reported in the output instead of stopping the batch.
        try:
            runs.append(collect_one(args.base, q, args.timeout))
        except Exception as e:
            print(f" FAIL {type(e).__name__}: {e}", flush=True)
            runs.append({"query": q, "error": f"{type(e).__name__}: {e}"})

    out = {"base": args.base, "ts": time.time(), "runs": runs}
    # Pin the encoding so the report does not depend on the platform locale.
    out_path.write_text(json.dumps(out, indent=2, default=str), encoding="utf-8")
    print(f"\nwrote {out_path} ({len(runs)} runs)")
    return 0


if __name__ == "__main__":
    sys.exit(main())