"""Benchmark runner: executes the enabled pipelines over the golden question set.

Each per-question result is persisted to RESULTS_PATH as soon as it is
produced, so an interrupted run can be resumed by adjusting START_QUESTION.
"""

import json
import logging
import os
import time
from pathlib import Path

# Project-specific imports (pipelines, MetricsService) are done inside the
# functions that use them, to avoid dependency issues on startup.

# Configuration: toggle which pipelines to run
RUN_BASELINE = False
RUN_HYBRID = True
RUN_GRAPH = False
START_QUESTION = 16  # 1-indexed. Set to resume from a specific question (e.g. 9 to skip Q1-8)
LIMIT_QUESTIONS = None  # Set to None to run all questions from START_QUESTION onward

# Evaluation config
GOLDEN_SET_PATH = "evaluation_set.json"
RESULTS_PATH = "benchmark_results.json"

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
logging.getLogger("sentence_transformers").setLevel(logging.WARNING)
logging.getLogger("huggingface_hub").setLevel(logging.WARNING)
logger = logging.getLogger(__name__)


# ------------------------------------------------------------------
# Incremental file I/O — every question saves immediately
# ------------------------------------------------------------------

def _load_results() -> dict:
    """Load existing results from disk.

    Returns:
        The parsed JSON object stored at RESULTS_PATH, or a fresh
        ``{"detailed_results": {}}`` skeleton when the file is missing,
        cannot be parsed, or does not contain a JSON object.
    """
    if Path(RESULTS_PATH).exists():
        try:
            with open(RESULTS_PATH, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            # Best-effort: keep running with an empty result set, but say so
            # instead of hiding the fact that previous results were unreadable.
            logger.warning(f"Could not parse {RESULTS_PATH} ({e}); starting from empty results.")
        else:
            if isinstance(data, dict):
                return data
            logger.warning(f"{RESULTS_PATH} does not contain a JSON object; "
                           f"starting from empty results.")
    return {"detailed_results": {}}


def _write_results(data: dict) -> None:
    """Atomically replace RESULTS_PATH with ``data`` serialised as JSON.

    The payload is written to a sibling temp file first and then swapped in
    with ``os.replace``, so a crash or a serialisation error mid-write never
    leaves a truncated results file behind.

    Raises:
        Whatever ``open`` / ``json.dump`` / ``os.replace`` raise; the temp
        file is removed before the exception propagates.
    """
    tmp_path = f"{RESULTS_PATH}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=4)
        os.replace(tmp_path, RESULTS_PATH)
    except Exception:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


def _save_one_result(pipeline_name: str, result: dict):
    """Persist a single result to disk immediately (crash-safe).

    Merges by ``question_number`` so re-runs overwrite stale entries, keeps
    each pipeline's entries sorted by question number, and leaves any other
    top-level keys already present in the results file untouched.

    Args:
        pipeline_name: Key under ``detailed_results`` for this pipeline.
        result: The per-question record to store.
    """
    data = _load_results()
    detailed = data.get("detailed_results") or {}

    # Get or create the list for this pipeline
    entries = detailed.get(pipeline_name) or []

    # Merge: replace if the same question_number exists, else append
    qn = result.get("question_number")
    for i, existing in enumerate(entries):
        if existing.get("question_number") == qn:
            entries[i] = result
            break
    else:
        entries.append(result)

    # `or 0` keeps the sort working when question_number is present but None.
    entries.sort(key=lambda r: r.get("question_number") or 0)
    detailed[pipeline_name] = entries

    # Write back (no summary yet — that comes at the end)
    data["detailed_results"] = detailed
    _write_results(data)


def _paired_predictions(entries: list, ref_map: dict) -> tuple:
    """Return aligned ``(predictions, references)`` lists for batch scoring.

    An entry contributes a pair only when it has a non-empty answer AND its
    question has a non-empty reference in ``ref_map``; this keeps the two
    lists the same length and free of ``None`` values.
    """
    preds, refs = [], []
    for entry in entries:
        answer = entry.get("answer")
        reference = ref_map.get(entry.get("question"))
        if answer and reference:
            preds.append(answer)
            refs.append(reference)
    return preds, refs


def _compute_and_save_summary():
    """Compute per-pipeline summary metrics, print them, and save them to disk.

    For every pipeline with at least one judged entry this computes the pass
    rate, the average latency, and a BERTScore F1 averaged over one batch
    call ("bulk" mode, per the project's Notion spec). The summary is stored
    under the ``"summary"`` key of RESULTS_PATH. Does nothing when there are
    no detailed results.
    """
    data = _load_results()
    results_dict = data.get("detailed_results") or {}
    if not results_dict:
        return

    from services.metrics_service import MetricsService
    metrics_svc = MetricsService()

    ref_map = None  # question -> reference answer; loaded once, on first need
    summary = {}
    for pipe_name, entries in results_dict.items():
        if not entries:
            continue

        # Only score entries that have a judge result
        valid_entries = [e for e in entries if e.get("judge") is not None]
        if not valid_entries:
            continue

        passes = [e for e in valid_entries if e.get("judge") == "PASS"]
        total_scored = len(valid_entries)

        if ref_map is None:
            ref_map = {
                item["question"]: (item.get("correct_answer") or item.get("exact_answer"))
                for item in load_golden_set()
            }

        # Collective BERTScore over all answered entries with a known reference
        preds, refs = _paired_predictions(valid_entries, ref_map)
        answered = sum(1 for e in valid_entries if e.get("answer"))
        if len(preds) < answered:
            logger.warning(f"{pipe_name}: {answered - len(preds)} answered entries have no "
                           f"reference in {GOLDEN_SET_PATH}; excluded from bulk BERTScore.")

        bulk_bert = 0.0
        if preds and refs:
            try:
                # Single batch call rather than averaging per-question scores
                bert_results = metrics_svc.bertscore.compute(
                    predictions=preds,
                    references=refs,
                    lang="en",
                    rescale_with_baseline=True,
                )
                bulk_bert = float(sum(bert_results["f1"]) / len(bert_results["f1"]))
            except Exception as e:
                # Best-effort: a scoring failure leaves the score at 0.0
                logger.error(f"Error calculating bulk BERTScore for {pipe_name}: {e}")

        # `or 0` covers entries whose latency was recorded as None.
        avg_latency = sum(e.get("latency") or 0 for e in valid_entries) / total_scored

        summary[pipe_name] = {
            "pass_rate": f"{(len(passes)/total_scored*100):.1f}%",
            "avg_bert_score": f"{bulk_bert:.4f}",
            "avg_latency": f"{avg_latency:.2f}s",
            "total_questions": len(entries),
            "scored_questions": total_scored,
        }

    # Persist the summary alongside the detailed results.
    data["summary"] = summary
    _write_results(data)

    print("="*50)
    for name, stats in summary.items():
        print(f"\nPipeline: {name}")
        print(f" - Pass Rate: {stats['pass_rate']}")
        print(f" - Bulk BERTScore: {stats['avg_bert_score']} (Threshold for bonus: 0.55)")
        print(f" - Avg Latency: {stats['avg_latency']}")
    print("="*50)
    print(f"Full results saved to {RESULTS_PATH}")


def _print_checkpoint(questions_done: int):
    """Print a quick running summary from whatever's on disk.

    Only entries that have both a judge verdict and a per-question
    ``bert_score`` are counted; pipelines with no such entries are skipped.

    Args:
        questions_done: Number of questions completed so far in this run
            (used in the printed message only).
    """
    data = _load_results()
    detailed = data.get("detailed_results") or {}
    for name, entries in detailed.items():
        valid = [r for r in entries
                 if r.get("judge") is not None and r.get("bert_score") is not None]
        if not valid:
            continue
        passes = sum(1 for r in valid if r["judge"] == "PASS")
        avg_bert = sum(float(r["bert_score"]) for r in valid) / len(valid)
        pct = (passes / len(valid)) * 100
        print(f"\nšŸ“Š CHECKPOINT after {questions_done} questions "
              f"({len(valid)} scored) — {name}")
        print(f" Pass Rate: {pct:.1f}% | Avg BERT: {avg_bert:.4f}")


# ------------------------------------------------------------------
# Rate limit detection
# ------------------------------------------------------------------

def _is_rate_limit(text: str) -> bool:
    """Return True when ``text`` mentions both "429" and a rate-limit marker.

    The input is stringified and lower-cased first, so exceptions, ``None``
    and other non-string values are accepted.
    """
    text = str(text).lower()
    return "429" in text and ("rate_limit" in text or "rate limit" in text)


# ------------------------------------------------------------------
# Main
# ------------------------------------------------------------------

def load_golden_set():
    """Load and return the parsed JSON content of GOLDEN_SET_PATH."""
    with open(GOLDEN_SET_PATH, "r", encoding="utf-8") as f:
        return json.load(f)


def run_benchmark():
    """Run every enabled pipeline over the selected slice of the golden set.

    Questions are taken from START_QUESTION (1-indexed) onward, optionally
    capped by LIMIT_QUESTIONS. Each result is saved to disk as soon as it is
    produced, a checkpoint is printed every 5 questions, and the run stops
    early when a rate-limit response or exception is detected. The summary is
    computed at the end in either case.
    """
    golden_set = load_golden_set()

    # Apply start offset (1-indexed → 0-indexed)
    start_idx = max(0, START_QUESTION - 1)
    golden_set = golden_set[start_idx:]
    if LIMIT_QUESTIONS:
        golden_set = golden_set[:LIMIT_QUESTIONS]

    total = len(golden_set)
    # Report the clamped start so the message matches the q_num values below.
    first_q = start_idx + 1
    logger.info(f"Running benchmark on questions {first_q} to {first_q + total - 1} "
                f"({total} questions)...")

    # Initialize pipelines
    pipelines = {}
    if RUN_BASELINE:
        from core.pipeline_1.logic import PipelineLLMOnly
        logger.info("Initializing Baseline Pipeline...")
        pipelines["Baseline (LLM Only)"] = PipelineLLMOnly(top_n=5, max_full_text=5)

    if RUN_HYBRID:
        from core.pipeline_2.logic import PipelineRAG
        logger.info("Initializing Hybrid RAG Pipeline...")
        pipelines["Hybrid RAG (Pipeline 2)"] = PipelineRAG(retrieval_top_k=50, rerank_top_n=10)

    if RUN_GRAPH:
        from core.pipeline_3.logic import PipelineGraphRAG
        logger.info("Initializing GraphRAG Pipeline...")
        pipelines["GraphRAG (Pipeline 3)"] = PipelineGraphRAG(rerank_top_n=10)

    logger.info("Warming up BERTScore model (avoids cold-start penalty on Q1)...")
    from services.metrics_service import MetricsService as _MS
    _ms_warmup = _MS()
    _ms_warmup.bertscore.compute(predictions=["warmup"], references=["warmup"],
                                 lang="en", rescale_with_baseline=True)
    logger.info("BERTScore ready.")

    rate_limited = False
    questions_done = 0

    for q_num, item in enumerate(golden_set, start=start_idx + 1):
        question = item["question"]
        correct_answer = item.get("correct_answer") or item.get("exact_answer")
        q_type = item.get("type", "factual")

        logger.info(f"\n[Q{q_num}/{start_idx + total}] Testing: {question[:80]}...")

        for name, pipe in pipelines.items():
            logger.info(f"Running {name}...")
            try:
                output = pipe.run(question, ground_truth=correct_answer)

                # Rate limit check
                if _is_rate_limit(output.get("error", "")):
                    logger.warning(f"šŸ›‘ Rate limit hit on Q{q_num}. Saving and stopping.")
                    rate_limited = True
                    break

                # Extract metrics and save immediately. `or {}` guards against
                # keys that are present but set to None.
                m = output.get("metrics") or {}
                accuracy = m.get("accuracy") or {}
                tokens = m.get("tokens") or {}
                res = {
                    "question_number": q_num,
                    "question": question,
                    "type": q_type,
                    "answer": output.get("answer"),
                    "judge": accuracy.get("llm_judge"),
                    "bert_score": accuracy.get("bert_score"),
                    "latency": m.get("latency_seconds"),
                    "cost": m.get("cost_usd"),
                    "tokens": tokens.get("total"),
                }

                # Save to disk right away so a later crash cannot lose this result
                _save_one_result(name, res)
                logger.info(f"āœ… Q{q_num} saved | {res['judge']} | BERT: {res['bert_score']}")

            except Exception as e:
                # Loop boundary: one failing pipeline/question must not abort
                # the whole run, unless the failure is a rate limit.
                if _is_rate_limit(str(e)):
                    logger.warning(f"šŸ›‘ Rate limit exception on Q{q_num}. Saving and stopping.")
                    rate_limited = True
                    break
                logger.error(f"Error running {name} on Q{q_num}: {e}")

        if rate_limited:
            break

        questions_done += 1

        # ── Checkpoint every 5 questions ──
        if questions_done % 5 == 0:
            _print_checkpoint(questions_done)

    # Compute final averages
    _compute_and_save_summary()

    if rate_limited:
        # q_num is the question that hit the limit, so resuming there re-runs it.
        logger.info(f"\nšŸ’” To resume, set START_QUESTION = {q_num} in run_benchmarks.py")


if __name__ == "__main__":
    run_benchmark()