Spaces:
Runtime error
Runtime error
| """Offline golden-scenario evaluation for the P1 elder-paperwork demo. | |
| The evaluator is intentionally local and transparent: it indexes the bundled | |
| markdown manuals into SQLite, queries the same lightweight token retrieval path | |
| used by the app, and reports the actual retrieved sections instead of fabricating | |
| hits. In offline demo mode, no external model calls are made. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| from dataclasses import asdict, dataclass | |
| from pathlib import Path | |
| from typing import Any | |
| from .demo_pack import ingest_demo_pack | |
| from .demo_packs import load_demo_pack | |
| from .storage import SQLiteStore, init_db | |
# Lower-case substrings that `_safety_observed` looks for in a section's
# lower-cased title and text to decide whether safety wording is present.
SAFE_TERMS = (
    "safety", "shutdown", "meter", "isolate",
    "energized", "lockout", "disconnect", "emergency",
)
@dataclass
class EvalResult:
    """Outcome of running one golden scenario through section retrieval.

    This must be a dataclass: ``evaluate_pack`` builds instances with keyword
    arguments and serialises them with ``dataclasses.asdict``. A plain class
    carrying only annotations supports neither and fails with ``TypeError``.
    """

    scenario_id: str  # scenario["scenario_id"] coerced to str
    query: str  # retrieval query assembled from the scenario's fields
    top_sections: list[dict[str, Any]]  # ranked retrieval hits for the query
    expected_section_ids: list[int]  # ranks of top-3 hits whose title matched
    expected_section_titles: list[str]  # titles the scenario expects to see
    hit_top3: bool  # True when at least one top-3 hit matched
    safety_present: bool  # True when any hit contains a SAFE_TERMS keyword
    sufficient: bool  # False when the scenario sets "requires_insufficient"
@dataclass
class IndexedSection:
    """One section of a markdown manual, ready to be stored and indexed.

    This must be a dataclass: ``_parse_manual_sections`` builds instances with
    keyword arguments, which a plain class carrying only annotations rejects
    with ``TypeError``.
    """

    title: str  # section heading ("## ..."), or a fallback title
    text: str  # section body with trailing whitespace stripped
    source_file: str  # file name of the manual the section came from
    manual_title: str  # document title of that manual
    section_index: int  # 1-based position among the manual's kept sections
def load_scenarios(pack_dir: str | Path) -> list[dict[str, Any]]:
    """Load the golden scenarios stored in ``golden_scenarios.json``.

    Two file layouts are accepted: a top-level JSON array of scenarios, or a
    JSON object holding that array under the ``"scenarios"`` key. Any other
    shape (string, number, null, or a non-list ``"scenarios"`` value) yields
    an empty list.

    Args:
        pack_dir: Directory containing ``golden_scenarios.json``.

    Returns:
        A new list with the scenario entries, possibly empty.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    scenarios_path = Path(pack_dir) / "golden_scenarios.json"
    with scenarios_path.open("r", encoding="utf-8") as handle:
        payload = json.load(handle)

    if isinstance(payload, dict):
        payload = payload.get("scenarios", [])

    # Only a real JSON array is a scenario list. Passing anything else to
    # list() would split a string into characters or raise on a number/null.
    return list(payload) if isinstance(payload, list) else []
| def _norm(text: str) -> str: | |
| return " ".join((text or "").lower().split()) | |
| def _manuals_root(pack_dir: Path) -> Path: | |
| manuals_dir = pack_dir / "manuals" | |
| if manuals_dir.exists(): | |
| return manuals_dir | |
| return pack_dir | |
def _parse_manual_sections(manual_path: Path) -> list[IndexedSection]:
    """Split one markdown manual into its ``## `` sections.

    The manual title is the text of the first ``# `` line. When there is
    none, it is derived from the file stem (underscores become spaces, then
    title-cased). Lines starting with ``# `` never become part of a section
    body. Content before the first ``## `` heading is filed under
    ``"Overview"``. Sections whose body is empty after stripping are dropped,
    and ``section_index`` numbers the kept sections from 1.

    If no section has any text, a single section holding the whole stripped
    file is returned instead.
    """
    raw = manual_path.read_text(encoding="utf-8")
    rows = raw.splitlines()

    stem_title = manual_path.stem.replace("_", " ").title()
    doc_title = next(
        (row[2:].strip() for row in rows if row.startswith("# ")),
        stem_title,
    )

    # First pass: group body lines under the heading they belong to.
    chunks: list[tuple[str, list[str]]] = [("Overview", [])]
    for row in rows:
        if row.startswith("# "):
            continue
        if row.startswith("## "):
            chunks.append((row[3:].strip() or "Untitled section", []))
        else:
            chunks[-1][1].append(row)

    # Second pass: turn every non-empty group into an IndexedSection.
    sections: list[IndexedSection] = []
    for heading, body_rows in chunks:
        body = "\n".join(body_row.rstrip() for body_row in body_rows).strip()
        if not body:
            continue
        sections.append(
            IndexedSection(
                title=heading,
                text=body,
                source_file=manual_path.name,
                manual_title=doc_title,
                section_index=len(sections) + 1,
            )
        )

    if sections:
        return sections

    # Nothing usable was found: fall back to the whole document.
    return [
        IndexedSection(
            title=doc_title,
            text=raw.strip(),
            source_file=manual_path.name,
            manual_title=doc_title,
            section_index=1,
        )
    ]
def _index_manual_sections(store: SQLiteStore, pack_dir: Path, project: str) -> list[dict[str, Any]]:
    """Store and embed every section of every ``*.md`` manual in the pack.

    Manuals are taken from ``_manuals_root(pack_dir)`` in sorted order. Each
    section is written with ``store.store_record`` and then registered with
    ``store.store_embedding`` under *project*.

    Returns:
        One dict per indexed section, holding the new ``record_id``, the
        section metadata and the section text under ``"primary_text"``.
    """
    source_name = pack_dir.name
    rows: list[dict[str, Any]] = []

    manual_paths = sorted(_manuals_root(pack_dir).glob("*.md"))
    for manual_path in manual_paths:
        for sec in _parse_manual_sections(manual_path):
            meta = {
                "manual_title": sec.manual_title,
                "manual_file": sec.source_file,
                "section_title": sec.title,
                "section_index": sec.section_index,
            }
            record_title = f"{sec.manual_title} :: {sec.title}"
            rid = store.store_record(project, source_name, record_title, sec.text, meta)

            # The embedded text combines manual title, section title and body.
            embed_text = f"{sec.manual_title} {sec.title} {sec.text}"
            embed_meta = {"manual_file": sec.source_file, "section_title": sec.title}
            store.store_embedding(rid, project, embed_text, metadata=embed_meta)

            row: dict[str, Any] = {"record_id": rid}
            row.update(meta)
            row["primary_text"] = sec.text
            rows.append(row)
    return rows
def _matches_expected(title: str, expected_titles: list[str]) -> bool:
    """Report whether *title* loosely matches any of *expected_titles*.

    Both sides are normalised with ``_norm`` (lower-cased, whitespace
    collapsed). A match means one normalised string contains the other, which
    also covers equality.

    Returns:
        ``True`` on the first match. ``False`` if nothing matches, or if
        *title* normalises to the empty string.
    """
    normalized = _norm(title)
    # The empty string is a substring of every string, so without this guard a
    # blank title would match every expected title.
    if not normalized:
        return False

    for expected in expected_titles:
        expected_norm = _norm(expected)
        # Blank expectations are skipped for the same reason.
        if expected_norm and (expected_norm in normalized or normalized in expected_norm):
            return True
    return False
def _safety_observed(title: str, text: str) -> bool:
    """Return True when any ``SAFE_TERMS`` keyword occurs in *title* or *text*.

    The check is a case-insensitive substring search over the title and text
    joined by a newline.
    """
    combined = f"{title}\n{text}".lower()
    for keyword in SAFE_TERMS:
        if keyword in combined:
            return True
    return False
| def _search_ranked_sections(store: SQLiteStore, project: str, query: str, limit: int = 5) -> list[dict[str, Any]]: | |
| index = store._embedding_index(project) | |
| scored = index.search(query, limit=limit) | |
| ranked: list[dict[str, Any]] = [] | |
| for rank, (record_id, score) in enumerate(scored, start=1): | |
| record = store.get_record(record_id) | |
| if not record: | |
| continue | |
| payload = json.loads(record["json_blob"]) | |
| ranked.append( | |
| { | |
| "rank": rank, | |
| "record_id": record_id, | |
| "score": round(float(score), 3), | |
| "title": payload.get("section_title") or record["title"], | |
| "citation": f'{payload.get("manual_file", "manual")} :: {payload.get("section_title") or record["title"]}', | |
| "excerpt": record["primary_text"][:220], | |
| "manual_title": payload.get("manual_title", ""), | |
| "section_index": payload.get("section_index"), | |
| } | |
| ) | |
| return ranked | |
def evaluate_pack(pack_dir: str | Path, db_path: str | Path | None = None) -> dict[str, Any]:
    """Run every golden scenario of a demo pack through section retrieval.

    The database at *db_path* (default ``app_data.sqlite3`` in the current
    directory) is initialised and the pack is ingested with ``reset=True``.
    The pack's manuals are then indexed under a separate ``<project>_eval``
    retrieval project, and each scenario is queried against that index.

    Args:
        pack_dir: Demo pack directory containing the scenarios and manuals.
        db_path: SQLite database file; falsy values select the default.

    Returns:
        A report dict with ``pack``, ``pack_id``, ``scenario_count``,
        ``top3_hit_rate``, ``safety_presence_rate``, ``insufficient_cases``,
        ``retrieval_project`` and the per-scenario ``results`` as plain dicts.
    """
    pack_dir = Path(pack_dir)
    db_path = Path(db_path or Path("app_data.sqlite3"))

    init_db(db_path)
    ingest_demo_pack(pack_dir, db_path=db_path, reset=True)
    pack = load_demo_pack(pack_dir)
    scenarios = load_scenarios(pack_dir)

    store = SQLiteStore(db_path, db_path.parent / "artifacts")
    try:
        retrieval_project = f"{pack.project}_eval"
        _index_manual_sections(store, pack_dir, project=retrieval_project)

        results: list[EvalResult] = []
        for scenario in scenarios:
            # Query text = symptom, then the optional equipment type and notes.
            fragments = [scenario.get("symptom", "")]
            for optional_key in ("equipment_type", "notes"):
                if scenario.get(optional_key):
                    fragments.append(str(scenario[optional_key]))
            query = " ".join(filter(None, fragments)).strip()

            ranked = _search_ranked_sections(store, retrieval_project, query, limit=5)
            wanted_titles = [str(title) for title in scenario.get("expected_section_titles", [])]

            # Ranks of the top-three hits whose title matches an expectation.
            matched_ranks = [
                entry["rank"]
                for entry in ranked[:3]
                if _matches_expected(entry["title"], wanted_titles)
            ]

            safety_seen = False
            for entry in ranked:
                if _safety_observed(entry["title"], entry["excerpt"]):
                    safety_seen = True
                    break

            flagged_insufficient = bool(scenario.get("requires_insufficient", False))

            results.append(
                EvalResult(
                    scenario_id=str(scenario["scenario_id"]),
                    query=query,
                    top_sections=ranked,
                    expected_section_ids=matched_ranks,
                    expected_section_titles=wanted_titles,
                    hit_top3=bool(matched_ranks),
                    safety_present=safety_seen,
                    sufficient=not flagged_insufficient,
                )
            )

        total = len(results)

        def _rate(count: int) -> float:
            # Guard against an empty scenario list.
            return round(count / total if total else 0.0, 3)

        top3_hits = len([outcome for outcome in results if outcome.hit_top3])
        safety_hits = len([outcome for outcome in results if outcome.safety_present])
        insufficient_cases = len([outcome for outcome in results if not outcome.sufficient])

        report: dict[str, Any] = {}
        report["pack"] = str(pack_dir)
        report["pack_id"] = pack.pack_id
        report["scenario_count"] = total
        report["top3_hit_rate"] = _rate(top3_hits)
        report["safety_presence_rate"] = _rate(safety_hits)
        report["insufficient_cases"] = insufficient_cases
        report["retrieval_project"] = retrieval_project
        report["results"] = [asdict(outcome) for outcome in results]
        return report
    finally:
        store.close()
def main() -> None:
    """Command-line entry point: evaluate a pack and print the report as JSON.

    ``--pack`` (required) is the demo pack path; ``--db`` optionally names the
    SQLite database file passed on to ``evaluate_pack``.
    """
    import argparse

    cli = argparse.ArgumentParser(description="Evaluate P1 elder-paperwork golden scenarios")
    cli.add_argument("--pack", required=True, help="Path to demo pack")
    cli.add_argument("--db", default=None, help="SQLite database path")
    options = cli.parse_args()

    print(json.dumps(evaluate_pack(options.pack, db_path=options.db), indent=2))


if __name__ == "__main__":
    main()