"""benchmark/compare.py - Pluto vs single-model baseline comparison helpers.

``SimpleRunner`` is a single-call keyword-retrieval baseline and
``ComparisonRunner`` runs Pluto and that baseline on the same query and
returns side-by-side metrics.
"""

from __future__ import annotations

import time
from pathlib import Path

from pluto.models import FinalAnswer, FinalOutput, Section, TraceSummary
from pluto.pipeline import PipelineRunner

# Baseline retrieval / prompt sizing knobs.
_CHUNK_CHARS = 1000            # size of each fixed-width text slice
_CONTEXT_CHAR_LIMIT = 7000     # max characters of context placed in the prompt
_PER_DOC_LIMIT_STANDARD = 2    # chunks kept per document ("standard")
_PER_DOC_LIMIT_DETAILED = 3    # chunks kept per document ("detailed")
_CHUNK_CAP_STANDARD = 5        # chunks kept overall ("standard")
_CHUNK_CAP_DETAILED = 8        # chunks kept overall ("detailed")
_ANSWER_PREVIEW_CHARS = 300    # length of the answer preview in metrics


def _normalize_selected_doc_ids(selected_doc_ids: list[str] | None) -> list[str]:
    """Return the doc ids stripped, with blanks and duplicates removed.

    ``None`` (or an empty list) yields ``[]``. Each entry is coerced with
    ``str()`` and stripped; falsy entries become empty and are dropped.
    First-seen order is preserved.
    """
    cleaned = (str(raw_doc_id or "").strip() for raw_doc_id in selected_doc_ids or [])
    # dict.fromkeys keeps insertion order, giving an order-preserving dedupe.
    return list(dict.fromkeys(doc_id for doc_id in cleaned if doc_id))


def _normalize_detail_level(detail_level: str | None) -> str:
    """Return ``"detailed"`` if requested (case/whitespace-insensitive), else ``"standard"``."""
    requested = str(detail_level or "").strip().lower()
    return "detailed" if requested == "detailed" else "standard"


def _keyword_score(part: str, query_words: set[str]) -> int:
    """Count how many distinct query words occur as substrings of *part*."""
    lowered = part.lower()  # lower-case once instead of once per query word
    return sum(1 for word in query_words if word in lowered)


class SimpleRunner:
    """
    Single-model baseline: one LLM call over top keyword-matched chunks.
    No routing, no extraction schema, no evidence check.
    """

    def __init__(self, corpus_dir: str, doc_index=None):
        """Store the corpus directory (as a ``Path``) and the optional doc index."""
        self.corpus_dir = Path(corpus_dir)
        self.doc_index = doc_index

    def run(
        self,
        query: str,
        selected_doc_ids: list[str] | None = None,
        detail_level: str = "standard",
    ) -> FinalOutput:
        """Answer *query* with one ``dispatch("MODE_QUICK", ...)`` call.

        Args:
            query: The user question; also used for keyword scoring.
            selected_doc_ids: Optional file stems restricting which ``*.md``
                files in the corpus directory are read. Empty/``None`` means
                every file.
            detail_level: ``"detailed"`` widens the chunk limits and asks for
                a more thorough answer; anything else is treated as
                ``"standard"``.

        Returns:
            A ``FinalOutput`` with no evidence. Confidence is ``0.5`` for a
            completed call and ``0.0`` when no chunks were found or the
            dispatch call raised (the failure text becomes the response).
        """
        from pluto.dispatcher import dispatch
        from pluto.modes import MODE_REGISTRY

        selected_doc_ids = _normalize_selected_doc_ids(selected_doc_ids)
        detail_level = _normalize_detail_level(detail_level)

        top_chunks = self._select_chunks(query, set(selected_doc_ids), detail_level)
        quick_model = MODE_REGISTRY["MODE_QUICK"].model_id

        if not top_chunks:
            return FinalOutput(
                final_answer=FinalAnswer(response="No documents found in corpus.", sections=[]),
                evidence=[],
                trace_summary=self._trace(query, detail_level, quick_model, 0),
                confidence=0.0,
            )

        context = "\n\n---\n\n".join(top_chunks)
        if detail_level == "detailed":
            detail_instruction = (
                "Provide a thorough, evidence-grounded answer with methodology, "
                "findings, limitations, and implications when available."
            )
        else:
            detail_instruction = "Provide a clear, direct answer."

        prompt = (
            "Answer the following question based ONLY on the provided context. "
            f"QUESTION: {query} "
            f"CONTEXT: {context[:_CONTEXT_CHAR_LIMIT]} "
            f"{detail_instruction} "
            "If the context does not contain enough information, say so."
        )

        try:
            response = dispatch("MODE_QUICK", prompt)
        except Exception as exc:
            # Best-effort: surface the failure as the answer text, but do not
            # let a failed call carry the same confidence as a real answer.
            response = f"Baseline LLM call failed: {exc}"
            confidence = 0.0
        else:
            confidence = 0.5

        return FinalOutput(
            final_answer=FinalAnswer(
                response=response,
                sections=[Section(title="Answer", content=response)],
            ),
            evidence=[],
            trace_summary=self._trace(query, detail_level, quick_model, len(top_chunks)),
            confidence=confidence,
        )

    def _select_chunks(
        self,
        query: str,
        selected_doc_set: set[str],
        detail_level: str,
    ) -> list[str]:
        """Return the best keyword-matched chunks across the corpus, best first."""
        detailed = detail_level == "detailed"
        per_doc_limit = _PER_DOC_LIMIT_DETAILED if detailed else _PER_DOC_LIMIT_STANDARD
        chunk_cap = _CHUNK_CAP_DETAILED if detailed else _CHUNK_CAP_STANDARD
        query_words = set(query.lower().split())

        candidates: list[tuple[int, str]] = []
        for md_file in sorted(self.corpus_dir.glob("*.md")):
            if selected_doc_set and md_file.stem not in selected_doc_set:
                continue
            text = md_file.read_text(encoding="utf-8", errors="replace")
            scored = [
                (_keyword_score(text[start : start + _CHUNK_CHARS], query_words),
                 text[start : start + _CHUNK_CHARS])
                for start in range(0, len(text), _CHUNK_CHARS)
            ]
            # Stable sort: equally scored chunks keep their in-document order.
            scored.sort(key=lambda item: item[0], reverse=True)
            candidates.extend(scored[:per_doc_limit])

        # Rank across documents before applying the overall cap; otherwise the
        # cap would always favour the alphabetically first files regardless of
        # how well later files match. Stability keeps file order among ties.
        candidates.sort(key=lambda item: item[0], reverse=True)
        return [part for _, part in candidates[:chunk_cap]]

    @staticmethod
    def _trace(
        query: str,
        detail_level: str,
        quick_model,
        chunks_processed: int,
    ) -> TraceSummary:
        """Build the fixed-shape trace summary reported by the baseline."""
        return TraceSummary(
            real_switching=False,
            modes_used_counts={"MODE_QUICK": 1},
            models_used=[quick_model],
            chunks_processed=chunks_processed,
            search_queries=[query],
            budget_notes=f"Baseline ({detail_level} mode)",
        )


class ComparisonRunner:
    """Run Pluto vs baseline and return comparable metrics."""

    def __init__(self, corpus_dir: str, doc_index=None):
        """Create a ``PipelineRunner`` and a ``SimpleRunner`` over the same corpus."""
        self.pluto = PipelineRunner(corpus_dir, doc_index=doc_index)
        self.baseline = SimpleRunner(corpus_dir, doc_index=doc_index)

    def compare(
        self,
        query: str,
        selected_doc_ids: list[str] | None = None,
        detail_level: str = "standard",
    ) -> dict:
        """Run both sides on *query* and return their metrics plus a winner.

        The winner is ``"Pluto"`` when Pluto ran without error and either the
        baseline errored or Pluto's confidence is at least the baseline's;
        ``"Baseline"`` when the baseline ran without error otherwise; and
        ``"Unavailable"`` when both sides errored.

        Returns:
            A dict with keys ``query``, ``detail_level``, ``selected_doc_ids``,
            ``pluto``, ``baseline`` and ``winner``.
        """
        selected_doc_ids = _normalize_selected_doc_ids(selected_doc_ids)
        detail_level = _normalize_detail_level(detail_level)

        pluto_metrics = self._run_side(
            "Pluto",
            lambda: self.pluto.run(
                query,
                selected_doc_ids=selected_doc_ids,
                detail_level=detail_level,
            ),
            evidence_checked=True,
        )
        baseline_metrics = self._run_side(
            "Baseline",
            lambda: self.baseline.run(
                query,
                selected_doc_ids=selected_doc_ids,
                detail_level=detail_level,
            ),
            evidence_checked=False,
        )

        pluto_ok = not pluto_metrics.get("error")
        baseline_ok = not baseline_metrics.get("error")
        if pluto_ok and (
            not baseline_ok
            or pluto_metrics["confidence"] >= baseline_metrics["confidence"]
        ):
            winner = "Pluto"
        elif baseline_ok:
            winner = "Baseline"
        else:
            winner = "Unavailable"

        return {
            "query": query,
            "detail_level": detail_level,
            "selected_doc_ids": selected_doc_ids,
            "pluto": pluto_metrics,
            "baseline": baseline_metrics,
            "winner": winner,
        }

    def _run_side(self, label: str, runner, evidence_checked: bool) -> dict:
        """Call *runner* and convert its result (or failure) into a metrics dict.

        Args:
            label: Side name, used in the failure preview text.
            runner: Zero-argument callable returning the side's result object.
            evidence_checked: Recorded as-is; when false, ``real_switching``
                is reported as ``False`` regardless of the trace.

        Returns:
            A metrics dict. Any exception raised while running or while
            reading the result is captured in ``error`` with zeroed metrics.
        """
        # perf_counter is monotonic, so latency is immune to wall-clock jumps.
        started = time.perf_counter()
        try:
            result = runner()
            trace = result.trace_summary
            return {
                "latency_s": round(time.perf_counter() - started, 2),
                "confidence": round(result.confidence, 2),
                "evidence_count": len(result.evidence),
                "chunks_processed": trace.chunks_processed,
                "evidence_checked": evidence_checked,
                "answer_preview": (result.final_answer.response or "")[:_ANSWER_PREVIEW_CHARS],
                "models_used": trace.models_used,
                "real_switching": trace.real_switching if evidence_checked else False,
                "error": None,
            }
        except Exception as exc:
            return {
                "latency_s": round(time.perf_counter() - started, 2),
                "confidence": 0.0,
                "evidence_count": 0,
                "chunks_processed": 0,
                "evidence_checked": evidence_checked,
                "answer_preview": f"{label} failed: {exc}"[:_ANSWER_PREVIEW_CHARS],
                "models_used": [],
                "real_switching": False,
                "error": str(exc),
            }


def _main() -> None:
    """Run a sample comparison against ``./corpus`` and print it as JSON."""
    import json

    runner = ComparisonRunner("./corpus")
    results = runner.compare("What is this paper about?")
    print(json.dumps(results, indent=2))


if __name__ == "__main__":
    _main()