# Abhishek
# Add all folders and files
# f9a9b47
# Raw
# History Blame Contribute Delete
# 9.5 kB
"""Offline golden-scenario evaluation for the P1 elder-paperwork demo.
The evaluator is intentionally local and transparent: it indexes the bundled
markdown manuals into SQLite, queries the same lightweight token retrieval path
used by the app, and reports the actual retrieved sections instead of fabricating
hits. In offline demo mode, no external model calls are made.
"""
from __future__ import annotations
import json
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Any
from .demo_pack import ingest_demo_pack
from .demo_packs import load_demo_pack
from .storage import SQLiteStore, init_db
# Keywords that mark a retrieved section as carrying safety guidance.
# ``_safety_observed`` looks for each one as a lowercase substring of the
# section title and text it is given.
SAFE_TERMS = (
    "safety", "shutdown", "meter", "isolate",
    "energized", "lockout", "disconnect", "emergency",
)
@dataclass
class EvalResult:
    """Per-scenario outcome recorded by ``evaluate_pack``.

    Instances are converted with ``dataclasses.asdict`` when the report is
    assembled, so every field is expected to hold JSON-friendly values.
    """

    # Identifier copied from the scenario and the query text built from it.
    scenario_id: str
    query: str

    # Ranked retrieval hits for the query (one dict per retrieved section).
    top_sections: list[dict[str, Any]]

    # As filled in by ``evaluate_pack``: the 1-based ranks of the top-three
    # hits whose title matched one of the expected titles.
    expected_section_ids: list[int]

    # Section titles the scenario says should be retrieved.
    expected_section_titles: list[str]

    # True when at least one expected title appeared among the top three hits.
    hit_top3: bool

    # True when any retrieved hit contained one of the safety keywords.
    safety_present: bool

    # False when the scenario is flagged with ``requires_insufficient``.
    sufficient: bool
@dataclass(frozen=True)
class IndexedSection:
    """Immutable description of one section parsed out of a markdown manual."""

    # Section heading text.
    title: str
    # Section body text.
    text: str
    # File name (not the full path) of the manual the section came from.
    source_file: str
    # Title of the manual as a whole.
    manual_title: str
    # 1-based position of the section among those kept for its manual.
    section_index: int
def load_scenarios(pack_dir: str | Path) -> list[dict[str, Any]]:
    """Load the golden scenarios shipped with a demo pack.

    Reads ``golden_scenarios.json`` from *pack_dir*. Two layouts are accepted:
    a top-level JSON array of scenarios, or a JSON object whose
    ``"scenarios"`` key holds that array.

    Args:
        pack_dir: Directory containing ``golden_scenarios.json``.

    Returns:
        A new list with the scenario entries, or an empty list when the file
        does not contain an array in either accepted position.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    scenarios_path = Path(pack_dir) / "golden_scenarios.json"
    with scenarios_path.open(encoding="utf-8") as handle:
        payload = json.load(handle)

    candidates = payload.get("scenarios", []) if isinstance(payload, dict) else payload

    # Only a JSON array is a scenario collection. Calling list() on anything
    # else would explode a string into characters or raise on null/numbers,
    # so every non-array shape is treated as "no scenarios".
    return list(candidates) if isinstance(candidates, list) else []
def _norm(text: str) -> str:
return " ".join((text or "").lower().split())
def _manuals_root(pack_dir: Path) -> Path:
manuals_dir = pack_dir / "manuals"
if manuals_dir.exists():
return manuals_dir
return pack_dir
def _parse_manual_sections(manual_path: Path) -> list[IndexedSection]:
    """Split one markdown manual into ``IndexedSection`` records.

    The manual title is the text of the first line starting with ``"# "``;
    without such a line it is derived from the file stem (underscores become
    spaces, then title-cased). Every line starting with ``"# "`` is left out
    of the section bodies. Each line starting with ``"## "`` opens a new
    section; text that precedes the first such heading is filed under
    ``"Overview"``. Sections whose body is blank are skipped, and
    ``section_index`` counts the kept sections from 1.

    When no section is kept at all, the whole file text becomes a single
    section titled with the manual title.
    """
    raw = manual_path.read_text(encoding="utf-8")
    all_lines = raw.splitlines()

    manual_title = next(
        (ln[2:].strip() for ln in all_lines if ln.startswith("# ")),
        manual_path.stem.replace("_", " ").title(),
    )

    parsed: list[IndexedSection] = []

    def emit(heading: str, body_lines: list[str]) -> None:
        # Trailing whitespace is trimmed per line, then the body as a whole.
        body = "\n".join(ln.rstrip() for ln in body_lines).strip()
        if not body:
            return
        parsed.append(
            IndexedSection(
                title=heading,
                text=body,
                source_file=manual_path.name,
                manual_title=manual_title,
                section_index=len(parsed) + 1,
            )
        )

    heading = "Overview"
    pending: list[str] = []
    for ln in all_lines:
        if ln.startswith("## "):
            # Close the section collected so far before starting the next one.
            emit(heading, pending)
            pending = []
            heading = ln[3:].strip() or "Untitled section"
        elif not ln.startswith("# "):
            pending.append(ln)
    emit(heading, pending)

    if parsed:
        return parsed

    # Nothing survived the split: index the file as one section.
    return [
        IndexedSection(
            title=manual_title,
            text=raw.strip(),
            source_file=manual_path.name,
            manual_title=manual_title,
            section_index=1,
        )
    ]
def _index_manual_sections(store: SQLiteStore, pack_dir: Path, project: str) -> list[dict[str, Any]]:
    """Store every manual section of a pack as a record plus an embedding.

    The ``*.md`` files directly inside the manuals root are processed in
    sorted order. For each parsed section a record is written with
    ``store.store_record`` and the combined manual title, section title and
    text is passed to ``store.store_embedding`` under *project*.

    Returns:
        One dict per stored section: its ``record_id``, the metadata payload
        saved with the record, and the section text as ``primary_text``.
    """
    catalog: list[dict[str, Any]] = []
    manual_files = sorted(_manuals_root(pack_dir).glob("*.md"))
    for manual_file in manual_files:
        for sec in _parse_manual_sections(manual_file):
            meta = {
                "manual_title": sec.manual_title,
                "manual_file": sec.source_file,
                "section_title": sec.title,
                "section_index": sec.section_index,
            }
            record_title = f"{sec.manual_title} :: {sec.title}"
            record_id = store.store_record(project, pack_dir.name, record_title, sec.text, meta)

            embedding_text = f"{sec.manual_title} {sec.title} {sec.text}"
            embedding_meta = {"manual_file": sec.source_file, "section_title": sec.title}
            store.store_embedding(record_id, project, embedding_text, metadata=embedding_meta)

            entry: dict[str, Any] = {"record_id": record_id}
            entry.update(meta)
            entry["primary_text"] = sec.text
            catalog.append(entry)
    return catalog
def _matches_expected(title: str, expected_titles: list[str]) -> bool:
    """Report whether *title* corresponds to any of *expected_titles*.

    Both sides are normalised with ``_norm`` first. A title matches an
    expectation when either normalised string contains the other (which
    includes exact equality).

    Args:
        title: Title of a retrieved section.
        expected_titles: Titles the scenario expects to see.

    Returns:
        ``True`` on the first match; ``False`` when nothing matches or when
        the title or every expectation normalises to an empty string.
    """
    candidate = _norm(title)
    if not candidate:
        # "" is a substring of every string, so without this guard a blank
        # title would count as a hit for any non-empty expectation.
        return False
    return any(
        wanted and (wanted in candidate or candidate in wanted)
        for wanted in map(_norm, expected_titles)
    )
def _safety_observed(title: str, text: str) -> bool:
    """Return ``True`` when any ``SAFE_TERMS`` keyword occurs in the title or text.

    The comparison is a case-insensitive substring check over the title and
    text joined by a newline.
    """
    combined = f"{title}\n{text}".lower()
    for keyword in SAFE_TERMS:
        if keyword in combined:
            return True
    return False
def _search_ranked_sections(store: SQLiteStore, project: str, query: str, limit: int = 5) -> list[dict[str, Any]]:
index = store._embedding_index(project)
scored = index.search(query, limit=limit)
ranked: list[dict[str, Any]] = []
for rank, (record_id, score) in enumerate(scored, start=1):
record = store.get_record(record_id)
if not record:
continue
payload = json.loads(record["json_blob"])
ranked.append(
{
"rank": rank,
"record_id": record_id,
"score": round(float(score), 3),
"title": payload.get("section_title") or record["title"],
"citation": f'{payload.get("manual_file", "manual")} :: {payload.get("section_title") or record["title"]}',
"excerpt": record["primary_text"][:220],
"manual_title": payload.get("manual_title", ""),
"section_index": payload.get("section_index"),
}
)
return ranked
def _scenario_query(scenario: dict[str, Any]) -> str:
    """Build the retrieval query from a scenario's symptom, equipment type and notes."""
    fields = (scenario.get(key) for key in ("symptom", "equipment_type", "notes"))
    # Every present field is stringified, so a non-string symptom (for
    # example a number) can no longer make the join raise TypeError.
    return " ".join(str(value) for value in fields if value).strip()


def evaluate_pack(pack_dir: str | Path, db_path: str | Path | None = None) -> dict[str, Any]:
    """Evaluate a demo pack's golden scenarios against its indexed manuals.

    The database is initialised, the pack is ingested with ``reset=True``,
    the manuals are indexed under a dedicated ``"<project>_eval"`` project,
    and each scenario's query is run through the retrieval path.

    Args:
        pack_dir: Directory of the demo pack to evaluate.
        db_path: SQLite database file to use; defaults to
            ``app_data.sqlite3`` relative to the working directory.

    Returns:
        A report dict with ``pack``, ``pack_id``, ``scenario_count``,
        ``top3_hit_rate``, ``safety_presence_rate`` (both rounded to three
        decimals, ``0.0`` when there are no scenarios),
        ``insufficient_cases``, ``retrieval_project`` and ``results`` (one
        ``EvalResult`` per scenario, converted with ``asdict``).

    Raises:
        KeyError: If a scenario has no ``scenario_id``.
    """
    pack_dir = Path(pack_dir)
    db_path = Path(db_path or Path("app_data.sqlite3"))
    init_db(db_path)
    # NOTE(review): reset=True is passed for whichever database db_path
    # resolves to, including the default app_data.sqlite3 -- confirm that
    # resetting that file is acceptable when no --db is given.
    ingest_demo_pack(pack_dir, db_path=db_path, reset=True)
    pack = load_demo_pack(pack_dir)
    scenarios = load_scenarios(pack_dir)
    store = SQLiteStore(db_path, db_path.parent / "artifacts")
    try:
        retrieval_project = f"{pack.project}_eval"
        _index_manual_sections(store, pack_dir, project=retrieval_project)

        results: list[EvalResult] = []
        for scenario in scenarios:
            query = _scenario_query(scenario)
            top_sections = _search_ranked_sections(store, retrieval_project, query, limit=5)
            # "or []" tolerates an explicit null in the scenario file.
            expected_titles = [str(title) for title in scenario.get("expected_section_titles") or []]

            # Match the top three hits once; the ranks of the matching hits
            # drive both the hit flag and the reported ids.
            matched_ranks = [
                section["rank"]
                for section in top_sections[:3]
                if _matches_expected(section["title"], expected_titles)
            ]
            # The safety check looks at all retrieved hits, using the title
            # and the excerpt of each one.
            safety_present = any(
                _safety_observed(section["title"], section["excerpt"]) for section in top_sections
            )

            results.append(
                EvalResult(
                    scenario_id=str(scenario["scenario_id"]),
                    query=query,
                    top_sections=top_sections,
                    expected_section_ids=matched_ranks,
                    expected_section_titles=expected_titles,
                    hit_top3=bool(matched_ranks),
                    safety_present=safety_present,
                    sufficient=not bool(scenario.get("requires_insufficient", False)),
                )
            )

        total = len(results)
        top3_hits = sum(1 for result in results if result.hit_top3)
        safety_hits = sum(1 for result in results if result.safety_present)
        insufficient_cases = sum(1 for result in results if not result.sufficient)
        return {
            "pack": str(pack_dir),
            "pack_id": pack.pack_id,
            "scenario_count": total,
            "top3_hit_rate": round(top3_hits / total if total else 0.0, 3),
            "safety_presence_rate": round(safety_hits / total if total else 0.0, 3),
            "insufficient_cases": insufficient_cases,
            "retrieval_project": retrieval_project,
            "results": [asdict(result) for result in results],
        }
    finally:
        # Always release the store, even when a scenario fails mid-run.
        store.close()
def main() -> None:
    """Command-line entry point: evaluate a pack and print the report as JSON.

    ``--pack`` (required) names the demo pack directory and ``--db``
    optionally names the SQLite database file. The report is printed with a
    two-space indent.
    """
    from argparse import ArgumentParser

    cli = ArgumentParser(description="Evaluate P1 elder-paperwork golden scenarios")
    cli.add_argument("--pack", required=True, help="Path to demo pack")
    cli.add_argument("--db", default=None, help="SQLite database path")
    options = cli.parse_args()

    summary = evaluate_pack(options.pack, db_path=options.db)
    print(json.dumps(summary, indent=2))


if __name__ == "__main__":
    main()