| """ |
| benchmark/compare.py - Pluto vs single-model baseline comparison helpers. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import time |
| from pathlib import Path |
|
|
| from pluto.models import FinalAnswer, FinalOutput, Section, TraceSummary |
| from pluto.pipeline import PipelineRunner |
|
|
|
|
| def _normalize_selected_doc_ids(selected_doc_ids: list[str] | None) -> list[str]: |
| seen: set[str] = set() |
| normalized: list[str] = [] |
| for raw_doc_id in selected_doc_ids or []: |
| doc_id = str(raw_doc_id or "").strip() |
| if not doc_id or doc_id in seen: |
| continue |
| seen.add(doc_id) |
| normalized.append(doc_id) |
| return normalized |
|
|
|
|
| def _normalize_detail_level(detail_level: str | None) -> str: |
| return "detailed" if str(detail_level or "").strip().lower() == "detailed" else "standard" |
|
|
|
|
class SimpleRunner:
    """
    Single-model baseline: one LLM call over top keyword-matched chunks.
    No routing, no extraction schema, no evidence check.

    Markdown files (``*.md``) in the corpus directory are cut into fixed-size
    character windows, scored by how many query words they contain, and the
    best-scoring windows are sent to the ``MODE_QUICK`` model in one prompt.
    """

    # Documents are cut into consecutive, non-overlapping windows of this many characters.
    _CHUNK_CHARS = 1000
    # Hard cap on the number of context characters placed in the prompt.
    _CONTEXT_CHAR_LIMIT = 7000
    _CHUNK_SEPARATOR = "\n\n---\n\n"
    # detail level -> (chunks kept per document, chunks kept overall)
    _CHUNK_LIMITS = {"standard": (2, 5), "detailed": (3, 8)}

    def __init__(self, corpus_dir: str, doc_index=None):
        """Remember where the corpus lives.

        Args:
            corpus_dir: Directory that is scanned for ``*.md`` files on each run.
            doc_index: Stored on the instance; not read by this class.
        """
        self.corpus_dir = Path(corpus_dir)
        self.doc_index = doc_index

    def run(
        self,
        query: str,
        selected_doc_ids: list[str] | None = None,
        detail_level: str = "standard",
    ) -> FinalOutput:
        """Answer ``query`` with a single ``MODE_QUICK`` call over keyword-ranked chunks.

        Args:
            query: The user question; also recorded as the only search query.
            selected_doc_ids: Optional file stems to restrict the corpus to.
                Empty or ``None`` means every ``*.md`` file is considered.
            detail_level: ``"detailed"`` widens the chunk limits and asks for a
                more thorough answer; anything else is treated as "standard".

        Returns:
            A ``FinalOutput`` with no evidence entries. Confidence is 0.5 for
            a completed LLM call and 0.0 when no chunks were found or the call
            raised (the failure text then becomes the response).
        """
        # Imported lazily, as in the original module layout.
        from pluto.dispatcher import dispatch
        from pluto.modes import MODE_REGISTRY

        selected_doc_ids = _normalize_selected_doc_ids(selected_doc_ids)
        detail_level = _normalize_detail_level(detail_level)

        top_chunks = self._select_chunks(query, set(selected_doc_ids), detail_level)
        quick_model = MODE_REGISTRY["MODE_QUICK"].model_id

        if not top_chunks:
            return self._build_output(
                "No documents found in corpus.",
                [],
                query=query,
                detail_level=detail_level,
                model_id=quick_model,
                chunk_count=0,
                confidence=0.0,
            )

        context = self._CHUNK_SEPARATOR.join(top_chunks)
        if detail_level == "detailed":
            detail_instruction = (
                "Provide a thorough, evidence-grounded answer with methodology, "
                "findings, limitations, and implications when available."
            )
        else:
            detail_instruction = "Provide a clear, direct answer."

        # In detailed mode 8 chunks of up to 1000 chars plus 7 separators can
        # reach 8049 chars, so the context cap may cut the tail. Chunks are
        # ordered best-first, so the lowest-ranked text is what gets dropped.
        prompt = (
            "Answer the following question based ONLY on the provided context.\n"
            "\n"
            f"QUESTION: {query}\n"
            "\n"
            "CONTEXT:\n"
            f"{context[: self._CONTEXT_CHAR_LIMIT]}\n"
            "\n"
            f"{detail_instruction}\n"
            "If the context does not contain enough information, say so."
        )

        try:
            response = dispatch("MODE_QUICK", prompt)
            confidence = 0.5
        except Exception as exc:
            # Best-effort baseline: surface the failure as the answer text, but
            # do not let a failed call carry the same confidence as a success.
            response = f"Baseline LLM call failed: {exc}"
            confidence = 0.0

        return self._build_output(
            response,
            [Section(title="Answer", content=response)],
            query=query,
            detail_level=detail_level,
            model_id=quick_model,
            chunk_count=len(top_chunks),
            confidence=confidence,
        )

    def _select_chunks(
        self,
        query: str,
        selected_doc_set: set[str],
        detail_level: str,
    ) -> list[str]:
        """Return the best keyword-matching chunks across the corpus, best first.

        A chunk's score is the number of distinct lower-cased query words that
        occur in it as substrings. Each document contributes at most its
        per-document limit of top chunks; the survivors are then ranked across
        all documents before the overall cap is applied, so a relevant file is
        not excluded merely because its name sorts late. Ties keep filename
        order, then position within the file (both sorts are stable).
        """
        per_doc_limit, chunk_cap = self._CHUNK_LIMITS.get(
            detail_level, self._CHUNK_LIMITS["standard"]
        )
        query_words = set(query.lower().split())
        size = self._CHUNK_CHARS

        ranked: list[tuple[int, str]] = []
        for md_file in sorted(self.corpus_dir.glob("*.md")):
            if selected_doc_set and md_file.stem not in selected_doc_set:
                continue
            text = md_file.read_text(encoding="utf-8", errors="replace")
            doc_ranked: list[tuple[int, str]] = []
            for start in range(0, len(text), size):
                part = text[start : start + size]
                lowered = part.lower()  # lower-case once per chunk, not once per word
                doc_ranked.append((sum(word in lowered for word in query_words), part))
            doc_ranked.sort(key=lambda pair: pair[0], reverse=True)
            ranked.extend(doc_ranked[:per_doc_limit])

        ranked.sort(key=lambda pair: pair[0], reverse=True)
        return [part for _, part in ranked[:chunk_cap]]

    @staticmethod
    def _build_output(
        response,
        sections: list,
        *,
        query: str,
        detail_level: str,
        model_id,
        chunk_count: int,
        confidence: float,
    ) -> FinalOutput:
        """Wrap a response in a ``FinalOutput`` with the fixed baseline trace fields."""
        return FinalOutput(
            final_answer=FinalAnswer(response=response, sections=sections),
            evidence=[],
            trace_summary=TraceSummary(
                real_switching=False,
                modes_used_counts={"MODE_QUICK": 1},
                models_used=[model_id],
                chunks_processed=chunk_count,
                search_queries=[query],
                budget_notes=f"Baseline ({detail_level} mode)",
            ),
            confidence=confidence,
        )
|
|
|
|
class ComparisonRunner:
    """Run Pluto vs baseline and return comparable metrics.

    Both runners are built over the same corpus and receive the same
    normalized query parameters. Pluto runs first, then the baseline.
    """

    def __init__(self, corpus_dir: str, doc_index=None):
        """Create the Pluto pipeline runner and the single-model baseline runner.

        Args:
            corpus_dir: Corpus directory handed to both runners.
            doc_index: Optional index object forwarded unchanged to both runners.
        """
        self.pluto = PipelineRunner(corpus_dir, doc_index=doc_index)
        self.baseline = SimpleRunner(corpus_dir, doc_index=doc_index)

    def compare(
        self,
        query: str,
        selected_doc_ids: list[str] | None = None,
        detail_level: str = "standard",
    ) -> dict:
        """Run both sides on ``query`` and report their metrics plus a winner.

        Args:
            query: The question passed to both runners.
            selected_doc_ids: Optional document IDs; normalized before use.
            detail_level: Requested detail level; normalized before use.

        Returns:
            A dict with the query, the normalized ``detail_level`` and
            ``selected_doc_ids``, the per-side metric dicts under ``"pluto"``
            and ``"baseline"``, and ``"winner"``: ``"Pluto"``, ``"Baseline"``,
            or ``"Unavailable"`` when both sides failed. A side that raised
            never wins; ties on confidence go to Pluto.
        """
        selected_doc_ids = _normalize_selected_doc_ids(selected_doc_ids)
        detail_level = _normalize_detail_level(detail_level)

        pluto_metrics = self._run_side(
            "Pluto",
            lambda: self.pluto.run(
                query,
                selected_doc_ids=selected_doc_ids,
                detail_level=detail_level,
            ),
            evidence_checked=True,
        )
        baseline_metrics = self._run_side(
            "Baseline",
            lambda: self.baseline.run(
                query,
                selected_doc_ids=selected_doc_ids,
                detail_level=detail_level,
            ),
            evidence_checked=False,
        )

        # Compare against None explicitly so the outcome does not depend on
        # the truthiness of the error text.
        pluto_ok = pluto_metrics.get("error") is None
        baseline_ok = baseline_metrics.get("error") is None
        if pluto_ok and (
            not baseline_ok
            or pluto_metrics["confidence"] >= baseline_metrics["confidence"]
        ):
            winner = "Pluto"
        elif baseline_ok:
            winner = "Baseline"
        else:
            winner = "Unavailable"

        return {
            "query": query,
            "detail_level": detail_level,
            "selected_doc_ids": selected_doc_ids,
            "pluto": pluto_metrics,
            "baseline": baseline_metrics,
            "winner": winner,
        }

    def _run_side(self, label: str, runner, evidence_checked: bool) -> dict:
        """Time one runner call and flatten its result into a metrics dict.

        Args:
            label: Side name, used only in the failure preview text.
            runner: Zero-argument callable returning a result object.
            evidence_checked: Recorded as-is. When false, ``real_switching``
                is reported as ``False`` regardless of the result's trace.

        Returns:
            A dict with the same keys on success and failure. On failure
            (any ``Exception`` from the call or from reading the result)
            the numeric fields are zeroed and ``"error"`` is a non-empty string.
        """
        # perf_counter is monotonic, so the latency cannot be skewed by
        # system clock adjustments during the run.
        started = time.perf_counter()
        try:
            result = runner()
            trace = result.trace_summary
            return {
                "latency_s": round(time.perf_counter() - started, 2),
                "confidence": round(result.confidence, 2),
                "evidence_count": len(result.evidence),
                "chunks_processed": trace.chunks_processed,
                "evidence_checked": evidence_checked,
                "answer_preview": (result.final_answer.response or "")[:300],
                "models_used": trace.models_used,
                "real_switching": trace.real_switching if evidence_checked else False,
                "error": None,
            }
        except Exception as exc:
            # Some exceptions carry no message; fall back to the class name so
            # a failure is never recorded as an empty (falsy) error string.
            message = str(exc) or type(exc).__name__
            return {
                "latency_s": round(time.perf_counter() - started, 2),
                "confidence": 0.0,
                "evidence_count": 0,
                "chunks_processed": 0,
                "evidence_checked": evidence_checked,
                "answer_preview": f"{label} failed: {message}"[:300],
                "models_used": [],
                "real_switching": False,
                "error": message,
            }
|
|
|
|
if __name__ == "__main__":
    # Smoke run: compare both runners on a generic question over ./corpus
    # and print the resulting metrics as indented JSON.
    import json

    comparison = ComparisonRunner("./corpus").compare("What is this paper about?")
    print(json.dumps(comparison, indent=2))
|
|