| |
| from __future__ import annotations |
|
|
| import argparse |
| import csv |
| import json |
| import re |
| from datetime import datetime, timezone |
| from pathlib import Path |
| from typing import Any, Dict, Iterable, List, Optional |
|
|
|
|
| def _safe_str(value: Any) -> str: |
| if value is None: |
| return "" |
| if isinstance(value, (dict, list)): |
| return json.dumps(value, ensure_ascii=False) |
| return str(value) |
|
|
|
|
| def _extract_why(answer: str) -> str: |
| text = str(answer or "") |
| if not text: |
| return "" |
| patterns = [ |
| r"\*\*Why\*\*\s*(.*?)(?:\n\s*\*\*Next action\*\*|\Z)", |
| r"###\s*Why this recommendation\s*(.*?)(?:\n\s*###\s*Next step|\Z)", |
| r"###\s*Why\s*(.*?)(?:\n\s*###\s*Next|\Z)", |
| ] |
| for p in patterns: |
| m = re.search(p, text, flags=re.IGNORECASE | re.DOTALL) |
| if m: |
| return re.sub(r"\s+", " ", m.group(1)).strip() |
| return "" |
|
|
|
|
| def _sources_summary(sources: Any) -> str: |
| if not isinstance(sources, list): |
| return "" |
| names: List[str] = [] |
| for src in sources: |
| if not isinstance(src, dict): |
| continue |
| doc = src.get("doc") or src.get("document") or src.get("relative_path") or src.get("href") or src.get("id") |
| if doc: |
| names.append(str(doc)) |
| |
| seen = set() |
| out = [] |
| for n in names: |
| if n in seen: |
| continue |
| seen.add(n) |
| out.append(n) |
| return " | ".join(out) |
|
|
|
|
| def _iter_result_rows(data: Dict[str, Any], source_file: Path) -> Iterable[Dict[str, Any]]: |
| results = data.get("results") |
| if not isinstance(results, list): |
| return |
| run_generated_at = _safe_str(data.get("generated_at")) |
| suite_total = _safe_str(data.get("total")) |
| suite_pass_rate = _safe_str(data.get("pass_rate")) |
| for r in results: |
| if not isinstance(r, dict): |
| continue |
| assistant = _safe_str(r.get("assistant")) |
| assistant_preview = _safe_str(r.get("assistant_preview")) |
| query = _safe_str(r.get("query") or r.get("question") or r.get("message")) |
| |
| if not assistant and not assistant_preview: |
| continue |
| semantic = r.get("semantic") if isinstance(r.get("semantic"), dict) else {} |
| timing = r.get("timing_ms") if isinstance(r.get("timing_ms"), dict) else {} |
| sources = r.get("sources") if isinstance(r.get("sources"), list) else [] |
| issues = r.get("issues") if isinstance(r.get("issues"), list) else [] |
| notes = r.get("notes") if isinstance(r.get("notes"), list) else [] |
| yield { |
| "source_file": str(source_file), |
| "run_generated_at": run_generated_at, |
| "suite_total": suite_total, |
| "suite_pass_rate": suite_pass_rate, |
| "case_id": _safe_str(r.get("id")), |
| "query": query, |
| "assistant": assistant, |
| "assistant_preview": assistant_preview, |
| "why_extracted": _extract_why(assistant), |
| "domain": _safe_str(r.get("domain")), |
| "mode": _safe_str(r.get("mode")), |
| "difficulty": _safe_str(r.get("difficulty")), |
| "web_assisted": _safe_str(r.get("web_assisted")), |
| "pass": _safe_str(r.get("pass")), |
| "grade": _safe_str(r.get("grade")), |
| "rule_grade": _safe_str(r.get("rule_grade")), |
| "rule_score": _safe_str(r.get("rule_score")), |
| "semantic_score": _safe_str(semantic.get("score")), |
| "semantic_grade": _safe_str(semantic.get("grade")), |
| "semantic_reason": _safe_str(semantic.get("reason") or semantic.get("notes") or semantic.get("explanation")), |
| "final_score": _safe_str(r.get("final_score")), |
| "latency_ms": _safe_str(r.get("latency_ms")), |
| "timing_total_ms": _safe_str(timing.get("total")), |
| "timing_ms_json": _safe_str(timing), |
| "issues": " | ".join(_safe_str(x) for x in issues), |
| "notes": " | ".join(_safe_str(x) for x in notes), |
| "source_count": str(len(sources)), |
| "sources_summary": _sources_summary(sources), |
| "sources_json": _safe_str(sources), |
| } |
|
|
|
|
| def _iter_cases_rows(data: Dict[str, Any], source_file: Path) -> Iterable[Dict[str, Any]]: |
| cases = data.get("cases") |
| if not isinstance(cases, list): |
| return |
| for r in cases: |
| if not isinstance(r, dict): |
| continue |
| assistant = _safe_str(r.get("assistant")) |
| assistant_preview = _safe_str(r.get("assistant_preview")) |
| query = _safe_str(r.get("query") or r.get("question") or r.get("q") or r.get("message")) |
| if not assistant and not assistant_preview: |
| continue |
| yield { |
| "source_file": str(source_file), |
| "run_generated_at": _safe_str(data.get("generated_at")), |
| "suite_total": "", |
| "suite_pass_rate": "", |
| "case_id": _safe_str(r.get("id")), |
| "query": query, |
| "assistant": assistant, |
| "assistant_preview": assistant_preview, |
| "why_extracted": _extract_why(assistant), |
| "domain": _safe_str(r.get("domain")), |
| "mode": _safe_str(r.get("mode")), |
| "difficulty": _safe_str(r.get("difficulty")), |
| "web_assisted": _safe_str(r.get("web_assisted")), |
| "pass": _safe_str(r.get("pass")), |
| "grade": "", |
| "rule_grade": "", |
| "rule_score": "", |
| "semantic_score": "", |
| "semantic_grade": "", |
| "semantic_reason": _safe_str(r.get("weak_reason")), |
| "final_score": "", |
| "latency_ms": _safe_str(r.get("latency_ms")), |
| "timing_total_ms": "", |
| "timing_ms_json": "", |
| "issues": "", |
| "notes": _safe_str(r.get("notes")), |
| "source_count": _safe_str(r.get("source_count")), |
| "sources_summary": "", |
| "sources_json": "", |
| } |
|
|
|
|
def export_eval_csv(evals_root: Path, out_csv: Path) -> int:
    """Flatten every eval JSON file under *evals_root* into a single CSV.

    All ``*.json`` files below *evals_root* are scanned recursively.  Files
    that cannot be read or parsed, and files whose top level is not a JSON
    object, are skipped.  Rows are collected from both the ``"results"`` and
    the ``"cases"`` layouts, sorted by source file and case id (compared as
    text), and written with one shared ``exported_at`` UTC timestamp.

    Args:
        evals_root: Directory to scan.
        out_csv: Destination CSV file; missing parent directories are created
            and an existing file is overwritten.

    Returns:
        The number of data rows written (the header line is not counted).
    """
    rows: List[Dict[str, Any]] = []
    for json_path in sorted(evals_root.rglob("*.json")):
        try:
            # utf-8-sig reads plain UTF-8 unchanged but also strips a leading
            # BOM, which json.load would otherwise reject -- such files used
            # to vanish from the export without a trace.
            with json_path.open("r", encoding="utf-8-sig") as f:
                data = json.load(f)
        except (OSError, ValueError, RecursionError):
            # Best-effort scan: unreadable files, bad encodings and malformed
            # JSON (UnicodeDecodeError and JSONDecodeError are ValueErrors) or
            # pathologically nested documents are skipped.  Anything else is a
            # programming error and is allowed to surface.
            continue
        if not isinstance(data, dict):
            continue
        rows.extend(_iter_result_rows(data, json_path))
        rows.extend(_iter_cases_rows(data, json_path))

    # case_id is compared as text so rows with mixed id types still sort.
    rows.sort(key=lambda r: (r.get("source_file", ""), str(r.get("case_id", ""))))

    out_csv.parent.mkdir(parents=True, exist_ok=True)
    headers = [
        "exported_at",
        "source_file",
        "run_generated_at",
        "suite_total",
        "suite_pass_rate",
        "case_id",
        "query",
        "assistant",
        "assistant_preview",
        "why_extracted",
        "domain",
        "mode",
        "difficulty",
        "web_assisted",
        "pass",
        "grade",
        "rule_grade",
        "rule_score",
        "semantic_score",
        "semantic_grade",
        "semantic_reason",
        "final_score",
        "latency_ms",
        "timing_total_ms",
        "timing_ms_json",
        "issues",
        "notes",
        "source_count",
        "sources_summary",
        "sources_json",
    ]
    exported_at = datetime.now(timezone.utc).isoformat()
    # newline="" lets the csv module control line endings itself.
    with out_csv.open("w", encoding="utf-8", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=headers)
        writer.writeheader()
        for row in rows:
            # Project each row onto the header set so stray keys cannot make
            # DictWriter raise and missing keys become empty cells.
            out = {k: row.get(k, "") for k in headers}
            out["exported_at"] = exported_at
            writer.writerow(out)
    return len(rows)
|
|
|
|
def main() -> int:
    """Command-line entry point.

    Parses ``--evals-root`` (folder to scan) and ``--out`` (CSV destination),
    runs the export, prints how many rows were written, and returns ``0``.
    """
    cli = argparse.ArgumentParser(description="Export eval Q/A rows to CSV")
    cli.add_argument("--evals-root", default="docs/evals", help="Folder to scan for eval JSON files")
    cli.add_argument("--out", default="docs/evals/all_eval_questions_answers.csv", help="Output CSV path")
    options = cli.parse_args()

    destination = Path(options.out)
    written = export_eval_csv(Path(options.evals_root), destination)
    print(f"Wrote {written} rows to {destination}")
    return 0
|
|
|
|
if __name__ == "__main__":
    # Run only when executed as a script; main()'s return value becomes the
    # process exit status.
    raise SystemExit(main())
|
|