#!/usr/bin/env python3
# Export eval question/answer rows found in JSON files under a folder into
# one flat CSV file, one row per answered case.
from __future__ import annotations

import argparse
import csv
import json
import logging
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional

logger = logging.getLogger(__name__)

# Column order of the exported CSV. "exported_at" is stamped at write time;
# every other column is filled from the row dicts built below.
_CSV_HEADERS = [
    "exported_at",
    "source_file",
    "run_generated_at",
    "suite_total",
    "suite_pass_rate",
    "case_id",
    "query",
    "assistant",
    "assistant_preview",
    "why_extracted",
    "domain",
    "mode",
    "difficulty",
    "web_assisted",
    "pass",
    "grade",
    "rule_grade",
    "rule_score",
    "semantic_score",
    "semantic_grade",
    "semantic_reason",
    "final_score",
    "latency_ms",
    "timing_total_ms",
    "timing_ms_json",
    "issues",
    "notes",
    "source_count",
    "sources_summary",
    "sources_json",
]

# Patterns for the "why" section of an answer, tried in this order; the first
# one that matches wins. Compiled once instead of on every call.
_WHY_PATTERNS = tuple(
    re.compile(pattern, flags=re.IGNORECASE | re.DOTALL)
    for pattern in (
        r"\*\*Why\*\*\s*(.*?)(?:\n\s*\*\*Next action\*\*|\Z)",
        r"###\s*Why this recommendation\s*(.*?)(?:\n\s*###\s*Next step|\Z)",
        r"###\s*Why\s*(.*?)(?:\n\s*###\s*Next|\Z)",
    )
)
_WHITESPACE_RUN = re.compile(r"\s+")


def _safe_str(value: Any) -> str:
    """Return *value* as a CSV-friendly string.

    ``None`` becomes ``""``, dicts and lists are serialized as JSON (non-ASCII
    characters kept as-is), and anything else goes through ``str()``.
    """
    if value is None:
        return ""
    if isinstance(value, (dict, list)):
        return json.dumps(value, ensure_ascii=False)
    return str(value)


def _dict_or_empty(value: Any) -> Dict[str, Any]:
    """Return *value* if it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _list_or_empty(value: Any) -> List[Any]:
    """Return *value* if it is a list, otherwise an empty list."""
    return value if isinstance(value, list) else []


def _extract_why(answer: str) -> str:
    """Return the "why" section of *answer* with whitespace collapsed.

    The known heading styles in ``_WHY_PATTERNS`` are tried in order. Returns
    ``""`` when *answer* is empty or none of the patterns match.
    """
    text = str(answer or "")
    if not text:
        return ""
    for pattern in _WHY_PATTERNS:
        match = pattern.search(text)
        if match:
            return _WHITESPACE_RUN.sub(" ", match.group(1)).strip()
    return ""


def _sources_summary(sources: Any) -> str:
    """Return the distinct source names in *sources* joined by ``" | "``.

    For each dict entry the first truthy value among ``doc``, ``document``,
    ``relative_path``, ``href`` and ``id`` is used as its name. Non-dict
    entries and entries without a name are ignored. Returns ``""`` when
    *sources* is not a list.
    """
    if not isinstance(sources, list):
        return ""
    names: List[str] = []
    for src in sources:
        if not isinstance(src, dict):
            continue
        doc = (
            src.get("doc")
            or src.get("document")
            or src.get("relative_path")
            or src.get("href")
            or src.get("id")
        )
        if doc:
            names.append(str(doc))
    # dict.fromkeys drops duplicates while keeping first-seen order.
    return " | ".join(dict.fromkeys(names))


def _iter_result_rows(data: Dict[str, Any], source_file: Path) -> Iterable[Dict[str, Any]]:
    """Yield one CSV row dict per answered entry of ``data["results"]``.

    Yields nothing when ``results`` is not a list. Entries that are not dicts,
    or that have neither ``assistant`` nor ``assistant_preview`` text, are
    skipped. Suite-level fields (``generated_at``, ``total``, ``pass_rate``)
    are repeated on every row.
    """
    results = data.get("results")
    if not isinstance(results, list):
        return

    run_generated_at = _safe_str(data.get("generated_at"))
    suite_total = _safe_str(data.get("total"))
    suite_pass_rate = _safe_str(data.get("pass_rate"))

    for r in results:
        if not isinstance(r, dict):
            continue

        assistant = _safe_str(r.get("assistant"))
        assistant_preview = _safe_str(r.get("assistant_preview"))
        # Only include rows where an answer was produced.
        if not assistant and not assistant_preview:
            continue

        query = _safe_str(r.get("query") or r.get("question") or r.get("message"))
        semantic = _dict_or_empty(r.get("semantic"))
        timing = _dict_or_empty(r.get("timing_ms"))
        sources = _list_or_empty(r.get("sources"))
        issues = _list_or_empty(r.get("issues"))
        notes = _list_or_empty(r.get("notes"))

        yield {
            "source_file": str(source_file),
            "run_generated_at": run_generated_at,
            "suite_total": suite_total,
            "suite_pass_rate": suite_pass_rate,
            "case_id": _safe_str(r.get("id")),
            "query": query,
            "assistant": assistant,
            "assistant_preview": assistant_preview,
            "why_extracted": _extract_why(assistant),
            "domain": _safe_str(r.get("domain")),
            "mode": _safe_str(r.get("mode")),
            "difficulty": _safe_str(r.get("difficulty")),
            "web_assisted": _safe_str(r.get("web_assisted")),
            "pass": _safe_str(r.get("pass")),
            "grade": _safe_str(r.get("grade")),
            "rule_grade": _safe_str(r.get("rule_grade")),
            "rule_score": _safe_str(r.get("rule_score")),
            "semantic_score": _safe_str(semantic.get("score")),
            "semantic_grade": _safe_str(semantic.get("grade")),
            "semantic_reason": _safe_str(
                semantic.get("reason") or semantic.get("notes") or semantic.get("explanation")
            ),
            "final_score": _safe_str(r.get("final_score")),
            "latency_ms": _safe_str(r.get("latency_ms")),
            "timing_total_ms": _safe_str(timing.get("total")),
            "timing_ms_json": _safe_str(timing),
            "issues": " | ".join(_safe_str(x) for x in issues),
            "notes": " | ".join(_safe_str(x) for x in notes),
            "source_count": str(len(sources)),
            "sources_summary": _sources_summary(sources),
            "sources_json": _safe_str(sources),
        }


def _iter_cases_rows(data: Dict[str, Any], source_file: Path) -> Iterable[Dict[str, Any]]:
    """Yield one CSV row dict per answered entry of ``data["cases"]``.

    Yields nothing when ``cases`` is not a list. Entries that are not dicts,
    or that have neither ``assistant`` nor ``assistant_preview`` text, are
    skipped. Columns this function does not read from the entry (grades,
    scores, timings, issues, source details) are left empty.
    """
    cases = data.get("cases")
    if not isinstance(cases, list):
        return

    run_generated_at = _safe_str(data.get("generated_at"))

    for r in cases:
        if not isinstance(r, dict):
            continue

        assistant = _safe_str(r.get("assistant"))
        assistant_preview = _safe_str(r.get("assistant_preview"))
        if not assistant and not assistant_preview:
            continue

        query = _safe_str(
            r.get("query") or r.get("question") or r.get("q") or r.get("message")
        )

        yield {
            "source_file": str(source_file),
            "run_generated_at": run_generated_at,
            "suite_total": "",
            "suite_pass_rate": "",
            "case_id": _safe_str(r.get("id")),
            "query": query,
            "assistant": assistant,
            "assistant_preview": assistant_preview,
            "why_extracted": _extract_why(assistant),
            "domain": _safe_str(r.get("domain")),
            "mode": _safe_str(r.get("mode")),
            "difficulty": _safe_str(r.get("difficulty")),
            "web_assisted": _safe_str(r.get("web_assisted")),
            "pass": _safe_str(r.get("pass")),
            "grade": "",
            "rule_grade": "",
            "rule_score": "",
            "semantic_score": "",
            "semantic_grade": "",
            "semantic_reason": _safe_str(r.get("weak_reason")),
            "final_score": "",
            "latency_ms": _safe_str(r.get("latency_ms")),
            "timing_total_ms": "",
            "timing_ms_json": "",
            "issues": "",
            "notes": _safe_str(r.get("notes")),
            "source_count": _safe_str(r.get("source_count")),
            "sources_summary": "",
            "sources_json": "",
        }


def export_eval_csv(evals_root: Path, out_csv: Path) -> int:
    """Collect answered eval rows under *evals_root* and write them to *out_csv*.

    Every ``*.json`` file below *evals_root* is scanned recursively. Files
    that cannot be read or parsed are skipped with a logged warning, and files
    whose top-level value is not a JSON object are skipped silently. Rows are
    sorted by source file and case id, and each row is stamped with the same
    UTC ``exported_at`` timestamp. Parent folders of *out_csv* are created as
    needed.

    Returns:
        The number of data rows written (the header is not counted).
    """
    rows: List[Dict[str, Any]] = []
    for json_path in sorted(evals_root.rglob("*.json")):
        try:
            with json_path.open("r", encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError, RecursionError) as exc:
            # ValueError covers both invalid JSON and undecodable bytes.
            # The export stays best-effort, but the skipped file is reported.
            logger.warning("Skipping %s: %s", json_path, exc)
            continue
        if not isinstance(data, dict):
            continue
        rows.extend(_iter_result_rows(data, json_path))
        rows.extend(_iter_cases_rows(data, json_path))

    # Stable ordering for review.
    rows.sort(key=lambda r: (r.get("source_file", ""), str(r.get("case_id", ""))))

    out_csv.parent.mkdir(parents=True, exist_ok=True)
    exported_at = datetime.now(timezone.utc).isoformat()
    with out_csv.open("w", encoding="utf-8", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=_CSV_HEADERS)
        writer.writeheader()
        for row in rows:
            out = {k: row.get(k, "") for k in _CSV_HEADERS}
            out["exported_at"] = exported_at
            writer.writerow(out)
    return len(rows)


def main() -> int:
    """Parse command-line arguments, run the export and print a summary."""
    parser = argparse.ArgumentParser(description="Export eval Q/A rows to CSV")
    parser.add_argument("--evals-root", default="docs/evals", help="Folder to scan for eval JSON files")
    parser.add_argument("--out", default="docs/evals/all_eval_questions_answers.csv", help="Output CSV path")
    args = parser.parse_args()

    root = Path(args.evals_root)
    out = Path(args.out)
    count = export_eval_csv(root, out)
    print(f"Wrote {count} rows to {out}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())