Spaces:
Sleeping
Sleeping
import json
import time
import logging
from pathlib import Path

# Project modules (pipelines, metrics) are imported inside the functions that
# use them, to avoid dependency issues at startup.

# --- Pipeline selection -----------------------------------------------------
RUN_BASELINE = False
RUN_HYBRID = True
RUN_GRAPH = False

# --- Question selection -----------------------------------------------------
# 1-indexed position in the golden set to start from; set it to resume a run
# (e.g. 9 skips Q1-Q8).
START_QUESTION = 16
# Maximum number of questions to run from START_QUESTION; None runs the rest.
LIMIT_QUESTIONS = None

# --- Evaluation files -------------------------------------------------------
GOLDEN_SET_PATH = "evaluation_set.json"
RESULTS_PATH = "benchmark_results.json"

# --- Logging ----------------------------------------------------------------
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
# Raise these library loggers to WARNING so their INFO/DEBUG output is hidden.
for _lib_name in ("httpx", "httpcore", "sentence_transformers", "huggingface_hub"):
    logging.getLogger(_lib_name).setLevel(logging.WARNING)
del _lib_name
logger = logging.getLogger(__name__)
| # ------------------------------------------------------------------ | |
| # Incremental file I/O β every question saves immediately | |
| # ------------------------------------------------------------------ | |
| def _load_results() -> dict: | |
| """Load existing results from disk.""" | |
| if Path(RESULTS_PATH).exists(): | |
| try: | |
| with open(RESULTS_PATH, "r") as f: | |
| return json.load(f) | |
| except (json.JSONDecodeError, KeyError): | |
| pass | |
| return {"detailed_results": {}} | |
def _save_one_result(pipeline_name: str, result: dict):
    """Merge one result into the results file and persist it immediately.

    Within ``detailed_results[pipeline_name]``, entries are matched by
    ``question_number``: an existing entry for the same question is replaced
    (so re-runs overwrite stale data), otherwise the result is appended. The
    list is kept sorted by question number. Other top-level keys in the file
    are preserved.

    The document is first written to a sibling ``.tmp`` file and then moved
    over ``RESULTS_PATH`` with ``Path.replace`` (``os.replace``, atomic on
    POSIX). An interrupted write therefore cannot leave a truncated results
    file behind. Such a file would fail to parse on the next load and cause
    every earlier result to be dropped.

    Args:
        pipeline_name: Key under ``detailed_results`` for this pipeline.
        result: Record to store; expected to carry ``"question_number"``.
    """
    data = _load_results()
    detailed = data.get("detailed_results", {})
    entries = detailed.get(pipeline_name, [])

    qn = result.get("question_number")
    for i, existing in enumerate(entries):
        if existing.get("question_number") == qn:
            entries[i] = result
            break
    else:
        entries.append(result)

    # `or 0` keeps the sort working if a record ever has question_number=None.
    entries.sort(key=lambda r: r.get("question_number") or 0)
    detailed[pipeline_name] = entries
    # The summary is not recomputed here; it is produced at the end of a run.
    data["detailed_results"] = detailed

    target = Path(RESULTS_PATH)
    tmp = target.with_name(target.name + ".tmp")
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=4)
    tmp.replace(target)
def _bulk_bertscore(metrics_svc, pipe_name: str, pairs: list) -> float:
    """Return the mean F1 of one batched BERTScore call over (prediction, reference) pairs.

    Returns 0.0 when there are no pairs or the computation raises (the error is logged).
    """
    if not pairs:
        return 0.0
    preds = [pred for pred, _ in pairs]
    refs = [ref for _, ref in pairs]
    try:
        # One batched ("bulk") call over all pairs, rather than averaging the
        # per-question scores stored in the results.
        bert_results = metrics_svc.bertscore.compute(
            predictions=preds,
            references=refs,
            lang="en",
            rescale_with_baseline=True,
        )
        f1_scores = bert_results["f1"]
        return float(sum(f1_scores) / len(f1_scores))
    except Exception as e:
        logger.error(f"Error calculating bulk BERTScore for {pipe_name}: {e}")
        return 0.0


def _summarize_pipeline(pipe_name: str, entries: list, ref_map: dict, metrics_svc):
    """Build the summary dict for one pipeline, or return None if no entry has a judge verdict."""
    valid_entries = [e for e in entries if e.get("judge") is not None]
    if not valid_entries:
        return None
    total_scored = len(valid_entries)
    passes = sum(1 for e in valid_entries if e.get("judge") == "PASS")

    # Pair each answered entry with its golden reference. Entries whose question
    # has no reference are left out, because a None reference would make the
    # whole batch fail.
    pairs = []
    missing_refs = 0
    for e in valid_entries:
        if not e.get("answer"):
            continue
        ref = ref_map.get(e.get("question"))
        if not ref:
            missing_refs += 1
            continue
        pairs.append((e["answer"], ref))
    if missing_refs:
        logger.warning(
            f"{pipe_name}: {missing_refs} answered question(s) have no reference answer "
            f"in {GOLDEN_SET_PATH}; excluded from bulk BERTScore."
        )
    bulk_bert = _bulk_bertscore(metrics_svc, pipe_name, pairs)

    # Stored latencies can be None (the pipeline may not report one), so average
    # only the entries that recorded a value.
    latencies = [e["latency"] for e in valid_entries if e.get("latency") is not None]
    avg_latency = sum(latencies) / len(latencies) if latencies else 0.0

    return {
        "pass_rate": f"{(passes / total_scored * 100):.1f}%",
        "avg_bert_score": f"{bulk_bert:.4f}",
        "avg_latency": f"{avg_latency:.2f}s",
        "total_questions": len(entries),
        "scored_questions": total_scored,
    }


def _compute_and_save_summary():
    """Compute per-pipeline summary metrics, store them in the results file, and print them.

    For every pipeline under ``detailed_results``, only entries with a
    non-``None`` ``judge`` are scored. The summary reports:

    * ``pass_rate``: share of scored entries judged ``"PASS"``.
    * ``avg_bert_score``: mean F1 of a single batched BERTScore call over all
      answered, scored entries that have a reference in the golden set.
    * ``avg_latency``: mean latency of scored entries that recorded one.
    * ``total_questions`` / ``scored_questions``: all stored entries versus
      scored entries.

    The result is written under the ``"summary"`` key of ``RESULTS_PATH``.
    Does nothing when there are no results on disk.
    """
    data = _load_results()
    results_dict = data.get("detailed_results", {})
    if not results_dict:
        return
    from services.metrics_service import MetricsService
    metrics_svc = MetricsService()

    # Reference answers are the same for every pipeline: read the golden set once.
    ref_map = {
        item["question"]: (item.get("correct_answer") or item.get("exact_answer"))
        for item in load_golden_set()
    }

    summary = {}
    for pipe_name, entries in results_dict.items():
        stats = _summarize_pipeline(pipe_name, entries or [], ref_map, metrics_svc)
        if stats is not None:
            summary[pipe_name] = stats

    # Persist via temp file + replace so an interrupted write can't truncate the file.
    data["summary"] = summary
    target = Path(RESULTS_PATH)
    tmp = target.with_name(target.name + ".tmp")
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=4)
    tmp.replace(target)

    print("="*50)
    for name, stats in summary.items():
        print(f"\nPipeline: {name}")
        print(f" - Pass Rate: {stats['pass_rate']}")
        print(f" - Bulk BERTScore: {stats['avg_bert_score']} (Threshold for bonus: 0.55)")
        print(f" - Avg Latency: {stats['avg_latency']}")
    print("="*50)
    print(f"Full results saved to {RESULTS_PATH}")
def _print_checkpoint(questions_done: int):
    """Print an interim pass rate and mean BERTScore for each pipeline.

    The numbers are computed from whatever is currently stored in the results
    file, so they include every saved entry. An entry is counted only if it has
    both a judge verdict and a BERT score. Pipelines without such entries print
    nothing.

    Args:
        questions_done: Questions completed so far in this run; only shown in
            the printed header.
    """
    results_by_pipeline = _load_results().get("detailed_results", {})
    for pipe, rows in results_by_pipeline.items():
        scored = [
            row
            for row in rows
            if row.get("judge") is not None and row.get("bert_score") is not None
        ]
        n_scored = len(scored)
        if n_scored == 0:
            continue
        n_pass = len([row for row in scored if row["judge"] == "PASS"])
        pass_pct = (n_pass / n_scored) * 100
        mean_bert = sum(float(row["bert_score"]) for row in scored) / n_scored
        print(f"\nπ CHECKPOINT after {questions_done} questions "
              f"({n_scored} scored) β {pipe}")
        print(f" Pass Rate: {pass_pct:.1f}% | Avg BERT: {mean_bert:.4f}")
| # ------------------------------------------------------------------ | |
| # Rate limit detection | |
| # ------------------------------------------------------------------ | |
| def _is_rate_limit(text: str) -> bool: | |
| text = str(text).lower() | |
| return "429" in text and ("rate_limit" in text or "rate limit" in text) | |
# ------------------------------------------------------------------
# Main
# ------------------------------------------------------------------
def load_golden_set(path=None):
    """Load the evaluation ("golden") question set from JSON.

    The file is read as UTF-8 explicitly, so the result does not depend on the
    platform's locale encoding.

    Args:
        path: JSON file to read; defaults to ``GOLDEN_SET_PATH`` when ``None``.

    Returns:
        The parsed JSON document. Callers in this module treat it as a list of
        dicts with a ``"question"`` key and optional ``"correct_answer"``,
        ``"exact_answer"`` and ``"type"`` keys.
    """
    with open(GOLDEN_SET_PATH if path is None else path, "r", encoding="utf-8") as f:
        return json.load(f)
def _init_pipelines() -> dict:
    """Instantiate the pipelines enabled by RUN_BASELINE / RUN_HYBRID / RUN_GRAPH.

    Returns:
        Mapping of display name to pipeline instance, in baseline, hybrid,
        graph order. Each pipeline module is imported only when enabled.
    """
    pipelines = {}
    if RUN_BASELINE:
        from core.pipeline_1.logic import PipelineLLMOnly
        logger.info("Initializing Baseline Pipeline...")
        pipelines["Baseline (LLM Only)"] = PipelineLLMOnly(top_n=5, max_full_text=5)
    if RUN_HYBRID:
        from core.pipeline_2.logic import PipelineRAG
        logger.info("Initializing Hybrid RAG Pipeline...")
        pipelines["Hybrid RAG (Pipeline 2)"] = PipelineRAG(retrieval_top_k=50, rerank_top_n=10)
    if RUN_GRAPH:
        from core.pipeline_3.logic import PipelineGraphRAG
        logger.info("Initializing GraphRAG Pipeline...")
        pipelines["GraphRAG (Pipeline 3)"] = PipelineGraphRAG(rerank_top_n=10)
    return pipelines


def _warm_up_bertscore():
    """Run one throwaway BERTScore computation before the timed questions start.

    Per the log message, this is meant to avoid a cold-start penalty on the
    first question.
    """
    logger.info("Warming up BERTScore model (avoids cold-start penalty on Q1)...")
    from services.metrics_service import MetricsService
    MetricsService().bertscore.compute(
        predictions=["warmup"], references=["warmup"], lang="en", rescale_with_baseline=True
    )
    logger.info("BERTScore ready.")


def _build_result(q_num: int, question: str, q_type: str, output: dict) -> dict:
    """Flatten one pipeline output into the record stored in the results file."""
    # `or {}` (not `.get(key, {})`) so a key that is present but None does not
    # raise AttributeError and lose the whole result.
    metrics = output.get("metrics") or {}
    accuracy = metrics.get("accuracy") or {}
    tokens = metrics.get("tokens") or {}
    return {
        "question_number": q_num,
        "question": question,
        "type": q_type,
        "answer": output.get("answer"),
        "judge": accuracy.get("llm_judge"),
        "bert_score": accuracy.get("bert_score"),
        "latency": metrics.get("latency_seconds"),
        "cost": metrics.get("cost_usd"),
        "tokens": tokens.get("total"),
    }


def run_benchmark():
    """Run every enabled pipeline over the selected slice of the golden set.

    Questions are numbered 1-based by their position in the golden set. The
    slice starts at ``START_QUESTION`` (values below 1 are treated as 1) and is
    capped at ``LIMIT_QUESTIONS`` when that is not ``None``.

    Each pipeline result is written to disk as soon as it is produced (via
    ``_save_one_result``), and a checkpoint is printed after every 5 completed
    questions. A rate-limit error, whether reported in the pipeline output's
    ``"error"`` field or raised as an exception, stops the run and logs the
    ``START_QUESTION`` value to resume from. The final summary is computed
    either way. Any other exception from a pipeline is logged with its
    traceback, and the run continues.
    """
    golden_set = load_golden_set()

    start_idx = max(0, START_QUESTION - 1)  # 1-indexed -> 0-indexed
    golden_set = golden_set[start_idx:]
    if LIMIT_QUESTIONS is not None:
        golden_set = golden_set[:LIMIT_QUESTIONS]
    total = len(golden_set)
    if total == 0:
        logger.warning(
            f"No questions selected (START_QUESTION={START_QUESTION}, "
            f"LIMIT_QUESTIONS={LIMIT_QUESTIONS})."
        )
    # Use the clamped start index so the reported range matches the q_num values below.
    logger.info(f"Running benchmark on questions {start_idx + 1} to {start_idx + total} "
                f"({total} questions)...")

    pipelines = _init_pipelines()
    _warm_up_bertscore()

    rate_limited = False
    questions_done = 0
    for i, item in enumerate(golden_set):
        q_num = start_idx + i + 1
        question = item["question"]
        correct_answer = item.get("correct_answer") or item.get("exact_answer")
        q_type = item.get("type", "factual")
        logger.info(f"\n[Q{q_num}/{start_idx + total}] Testing: {question[:80]}...")

        for name, pipe in pipelines.items():
            logger.info(f"Running {name}...")
            try:
                output = pipe.run(question, ground_truth=correct_answer)
                # Some pipelines report a rate limit in the output instead of raising.
                if _is_rate_limit(output.get("error", "")):
                    logger.warning(f"π Rate limit hit on Q{q_num}. Saving and stopping.")
                    rate_limited = True
                    break
                res = _build_result(q_num, question, q_type, output)
                # Save right away so a crash loses at most the current question.
                _save_one_result(name, res)
                logger.info(f"β Q{q_num} saved | {res['judge']} | BERT: {res['bert_score']}")
            except Exception as e:
                if _is_rate_limit(str(e)):
                    logger.warning(f"π Rate limit exception on Q{q_num}. Saving and stopping.")
                    rate_limited = True
                    break
                # Best-effort: log with traceback and move on to the next pipeline.
                logger.exception(f"Error running {name} on Q{q_num}: {e}")

        if rate_limited:
            break
        questions_done += 1
        # Checkpoint every 5 questions
        if questions_done % 5 == 0:
            _print_checkpoint(questions_done)

    # Compute final averages (also after an early rate-limit stop)
    _compute_and_save_summary()
    if rate_limited:
        logger.info(f"\nπ‘ To resume, set START_QUESTION = {q_num} in run_benchmarks.py")


if __name__ == "__main__":
    run_benchmark()