# ayushKishor's picture
# Add Pluto memory layer and pipeline fixes
# 23cdeed
"""
benchmark/compare.py - Pluto vs single-model baseline comparison helpers.
"""
from __future__ import annotations
import time
from pathlib import Path
from pluto.models import FinalAnswer, FinalOutput, Section, TraceSummary
from pluto.pipeline import PipelineRunner
def _normalize_selected_doc_ids(selected_doc_ids: list[str] | None) -> list[str]:
seen: set[str] = set()
normalized: list[str] = []
for raw_doc_id in selected_doc_ids or []:
doc_id = str(raw_doc_id or "").strip()
if not doc_id or doc_id in seen:
continue
seen.add(doc_id)
normalized.append(doc_id)
return normalized
def _normalize_detail_level(detail_level: str | None) -> str:
return "detailed" if str(detail_level or "").strip().lower() == "detailed" else "standard"
class SimpleRunner:
    """
    Single-model baseline: one LLM call over top keyword-matched chunks.
    No routing, no extraction schema, no evidence check.
    """

    # Size (in characters) of the fixed windows each document is cut into.
    _CHUNK_CHARS = 1000
    # Upper bound (in characters) on the context pasted into the prompt.
    _MAX_CONTEXT_CHARS = 7000

    def __init__(self, corpus_dir: str, doc_index=None):
        """Remember where the corpus lives.

        Args:
            corpus_dir: Directory whose ``*.md`` files form the corpus.
            doc_index: Stored on the instance; this class does not read it.
        """
        self.corpus_dir = Path(corpus_dir)
        self.doc_index = doc_index

    def run(
        self,
        query: str,
        selected_doc_ids: list[str] | None = None,
        detail_level: str = "standard",
    ) -> FinalOutput:
        """Answer ``query`` with a single ``MODE_QUICK`` call over keyword-ranked chunks.

        Args:
            query: The user question.
            selected_doc_ids: Optional file stems (``*.md`` names without the
                extension) to restrict the corpus to. Empty/``None`` means
                every markdown file in ``corpus_dir``.
            detail_level: ``"detailed"`` raises the chunk limits (3 per
                document, 8 overall) and asks for a thorough answer; any other
                value is treated as ``"standard"`` (2 per document, 5 overall).

        Returns:
            A ``FinalOutput`` with no evidence entries. Confidence is ``0.5``
            for a completed LLM call and ``0.0`` when no chunks were found or
            the LLM call raised; in the latter case the response text carries
            the failure message instead of an answer.
        """
        from pluto.dispatcher import dispatch
        from pluto.modes import MODE_REGISTRY

        selected_doc_ids = _normalize_selected_doc_ids(selected_doc_ids)
        detail_level = _normalize_detail_level(detail_level)
        detailed = detail_level == "detailed"

        top_chunks = self._select_chunks(
            query,
            set(selected_doc_ids),
            per_doc_limit=3 if detailed else 2,
            chunk_cap=8 if detailed else 5,
        )

        quick_model = MODE_REGISTRY["MODE_QUICK"].model_id
        # Identical on every return path; chunks_processed is 0 when nothing
        # was selected.
        trace_summary = TraceSummary(
            real_switching=False,
            modes_used_counts={"MODE_QUICK": 1},
            models_used=[quick_model],
            chunks_processed=len(top_chunks),
            search_queries=[query],
            budget_notes=f"Baseline ({detail_level} mode)",
        )

        if not top_chunks:
            return FinalOutput(
                final_answer=FinalAnswer(response="No documents found in corpus.", sections=[]),
                evidence=[],
                trace_summary=trace_summary,
                confidence=0.0,
            )

        context = "\n\n---\n\n".join(top_chunks)
        detail_instruction = (
            "Provide a thorough, evidence-grounded answer with methodology, findings, limitations, and implications when available."
            if detailed
            else "Provide a clear, direct answer."
        )
        prompt = f"""Answer the following question based ONLY on the provided context.
QUESTION: {query}
CONTEXT:
{context[:self._MAX_CONTEXT_CHARS]}
{detail_instruction}
If the context does not contain enough information, say so."""

        # Deliberately broad: the baseline must always hand back a FinalOutput
        # so a comparison run can still report something for this side.
        try:
            response = dispatch("MODE_QUICK", prompt)
        except Exception as exc:
            response = f"Baseline LLM call failed: {exc}"
            # A failed call carries no answer, so it must not be scored like
            # a successful one.
            confidence = 0.0
        else:
            confidence = 0.5

        return FinalOutput(
            final_answer=FinalAnswer(
                response=response,
                sections=[Section(title="Answer", content=response)],
            ),
            evidence=[],
            trace_summary=trace_summary,
            confidence=confidence,
        )

    def _select_chunks(
        self,
        query: str,
        selected_doc_set: set[str],
        per_doc_limit: int,
        chunk_cap: int,
    ) -> list[str]:
        """Pick the best keyword-matching chunks from the corpus.

        Every ``*.md`` file (optionally filtered by stem) is cut into fixed
        windows, each window is scored by how many distinct query words occur
        in it (case-insensitive substring match), and the ``per_doc_limit``
        best windows per file are kept. The survivors are then ranked across
        files so the overall ``chunk_cap`` keeps the highest-scoring ones
        instead of whichever files sort first by name. Ties keep file order
        and in-file order because both sorts are stable.
        """
        query_words = set(query.lower().split())
        ranked: list[tuple[int, str]] = []

        for md_file in sorted(self.corpus_dir.glob("*.md")):
            if selected_doc_set and md_file.stem not in selected_doc_set:
                continue
            text = md_file.read_text(encoding="utf-8", errors="replace")

            scored: list[tuple[int, str]] = []
            for start in range(0, len(text), self._CHUNK_CHARS):
                part = text[start : start + self._CHUNK_CHARS]
                lowered = part.lower()  # lowercase once per chunk, not once per word
                hits = sum(1 for word in query_words if word in lowered)
                scored.append((hits, part))

            scored.sort(key=lambda item: item[0], reverse=True)
            ranked.extend(scored[:per_doc_limit])

        ranked.sort(key=lambda item: item[0], reverse=True)
        return [part for _, part in ranked[:chunk_cap]]
class ComparisonRunner:
    """Run Pluto vs baseline and return comparable metrics."""

    def __init__(self, corpus_dir: str, doc_index=None):
        """Build both runners over the same corpus directory and doc index."""
        self.pluto = PipelineRunner(corpus_dir, doc_index=doc_index)
        self.baseline = SimpleRunner(corpus_dir, doc_index=doc_index)

    def compare(
        self,
        query: str,
        selected_doc_ids: list[str] | None = None,
        detail_level: str = "standard",
    ) -> dict:
        """Run the same query through both runners and summarize the outcome.

        Args:
            query: The user question passed to both runners.
            selected_doc_ids: Optional document ids to restrict both runs to;
                normalized (stripped, de-duplicated) before use.
            detail_level: Normalized to ``"detailed"`` or ``"standard"``.

        Returns:
            A dict with the query, the normalized ``detail_level`` and
            ``selected_doc_ids``, one metrics dict per side (``"pluto"``,
            ``"baseline"``) and a ``"winner"`` of ``"Pluto"``, ``"Baseline"``
            or ``"Unavailable"``.
        """
        selected_doc_ids = _normalize_selected_doc_ids(selected_doc_ids)
        detail_level = _normalize_detail_level(detail_level)

        pluto_metrics = self._run_side(
            "Pluto",
            lambda: self.pluto.run(
                query,
                selected_doc_ids=selected_doc_ids,
                detail_level=detail_level,
            ),
            evidence_checked=True,
        )
        baseline_metrics = self._run_side(
            "Baseline",
            lambda: self.baseline.run(
                query,
                selected_doc_ids=selected_doc_ids,
                detail_level=detail_level,
            ),
            evidence_checked=False,
        )

        pluto_ok = not pluto_metrics.get("error")
        baseline_ok = not baseline_metrics.get("error")
        # Pluto wins ties; a side that errored can never win.
        if pluto_ok and (
            not baseline_ok or pluto_metrics["confidence"] >= baseline_metrics["confidence"]
        ):
            winner = "Pluto"
        elif baseline_ok:
            winner = "Baseline"
        else:
            winner = "Unavailable"

        return {
            "query": query,
            "detail_level": detail_level,
            "selected_doc_ids": selected_doc_ids,
            "pluto": pluto_metrics,
            "baseline": baseline_metrics,
            "winner": winner,
        }

    def _run_side(self, label: str, runner, evidence_checked: bool) -> dict:
        """Execute one side of the comparison and flatten its result into metrics.

        Args:
            label: Human-readable side name, used in the failure preview.
            runner: Zero-argument callable returning an object that exposes
                ``confidence``, ``evidence``, ``trace_summary`` and
                ``final_answer``.
            evidence_checked: Echoed into the metrics; when false,
                ``real_switching`` is reported as ``False`` regardless of the
                trace.

        Returns:
            A metrics dict. On success ``"error"`` is ``None``. If the runner
            or the metric extraction raises, the dict holds zeroed metrics and
            a non-empty ``"error"`` string.
        """
        # perf_counter is monotonic, so the measured latency cannot be skewed
        # by wall-clock adjustments during the run.
        start_time = time.perf_counter()
        try:
            result = runner()
            return {
                "latency_s": round(time.perf_counter() - start_time, 2),
                "confidence": round(result.confidence, 2),
                "evidence_count": len(result.evidence),
                "chunks_processed": result.trace_summary.chunks_processed,
                "evidence_checked": evidence_checked,
                "answer_preview": (result.final_answer.response or "")[:300],
                "models_used": result.trace_summary.models_used,
                "real_switching": result.trace_summary.real_switching if evidence_checked else False,
                "error": None,
            }
        except Exception as exc:
            # Some exceptions stringify to "" (e.g. raised without arguments).
            # An empty "error" would look like success to compare(), so fall
            # back to the exception class name.
            message = str(exc) or type(exc).__name__
            return {
                "latency_s": round(time.perf_counter() - start_time, 2),
                "confidence": 0.0,
                "evidence_count": 0,
                "chunks_processed": 0,
                "evidence_checked": evidence_checked,
                "answer_preview": f"{label} failed: {message}"[:300],
                "models_used": [],
                "real_switching": False,
                "error": message,
            }
if __name__ == "__main__":
    # Smoke run: compare both runners on a generic question over ./corpus and
    # print the resulting metrics as indented JSON.
    import json

    comparison = ComparisonRunner("./corpus").compare("What is this paper about?")
    print(json.dumps(comparison, indent=2))