Spaces:
Sleeping
Sleeping
File size: 10,270 Bytes
90645a4 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 | import json
import time
import logging
from pathlib import Path
# Imports are moved inside functions to avoid dependency issues on startup
# Configuration: choose which pipelines this run executes.
RUN_BASELINE = False
RUN_HYBRID = True
RUN_GRAPH = False

# 1-indexed question to start from; raise it to resume a run (e.g. 9 skips Q1-8).
START_QUESTION = 16
# Cap on how many questions to run from START_QUESTION onward; None runs them all.
LIMIT_QUESTIONS = None

# Evaluation config: input question set and output results file.
GOLDEN_SET_PATH = "evaluation_set.json"
RESULTS_PATH = "benchmark_results.json"

logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)
# Raise these library loggers to WARNING so their INFO output stays out of the benchmark log.
for _noisy_logger in ("httpx", "httpcore", "sentence_transformers", "huggingface_hub"):
    logging.getLogger(_noisy_logger).setLevel(logging.WARNING)

logger = logging.getLogger(__name__)
# ------------------------------------------------------------------
# Incremental file I/O — every question saves immediately
# ------------------------------------------------------------------
def _load_results() -> dict:
    """Load previously saved benchmark results from RESULTS_PATH.

    Returns:
        The parsed JSON object from the results file, or a fresh
        ``{"detailed_results": {}}`` skeleton when the file is missing,
        cannot be decoded as JSON, or does not hold a JSON object at the
        top level.
    """
    empty = {"detailed_results": {}}
    results_file = Path(RESULTS_PATH)
    if not results_file.exists():
        return empty

    try:
        with open(results_file, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        # Best effort: keep the run going, but say so loudly, because the next
        # save will replace whatever is currently in the unreadable file.
        logger.warning(
            f"Could not parse {RESULTS_PATH} ({exc}); starting from empty results."
        )
        return empty

    if not isinstance(data, dict):
        # Callers use dict methods on the return value; anything else is unusable.
        logger.warning(
            f"{RESULTS_PATH} does not contain a JSON object; starting from empty results."
        )
        return empty
    return data
def _save_one_result(pipeline_name: str, result: dict):
    """Persist a single question result to RESULTS_PATH immediately.

    The result is merged into the list stored under ``pipeline_name``: an
    existing entry with the same ``question_number`` is replaced, otherwise
    the result is appended. Entries are kept sorted by question number.

    The file is written to a temporary sibling and then moved over the real
    results file, so an interruption or a serialisation error during the
    write leaves the previous results file intact.

    Args:
        pipeline_name: Key under ``detailed_results`` that owns this result.
        result: Result record; its ``question_number`` is the merge key.
    """
    import os  # function-scope import, matching this file's convention

    data = _load_results()
    detailed = data.get("detailed_results", {})
    entries = detailed.get(pipeline_name, [])

    # Merge: overwrite a stale entry for the same question, else append.
    qn = result.get("question_number")
    for i, existing in enumerate(entries):
        if existing.get("question_number") == qn:
            entries[i] = result
            break
    else:
        entries.append(result)

    # `or 0` also covers an explicit None, which would not compare with ints.
    entries.sort(key=lambda r: r.get("question_number") or 0)
    detailed[pipeline_name] = entries
    data["detailed_results"] = detailed

    # Write to a temp file first; os.replace swaps it in as a single step.
    tmp_path = f"{RESULTS_PATH}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=4)
        os.replace(tmp_path, RESULTS_PATH)
    except Exception:
        # Do not leave a half-written temp file behind; the real file is untouched.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
def _compute_and_save_summary():
    """Compute per-pipeline summary metrics, save them, and print a report.

    For every pipeline in the results file, only entries that have a judge
    verdict are scored. The summary holds the pass rate, a BERTScore F1
    averaged over one batched call across all scored answers, and the
    average latency. The summary is stored under the ``"summary"`` key of
    the results file and printed to stdout.

    Does nothing when there are no detailed results on disk.
    """
    import os  # function-scope import, matching this file's convention

    data = _load_results()
    results_dict = data.get("detailed_results", {})
    if not results_dict:
        return

    from services.metrics_service import MetricsService

    metrics_svc = MetricsService()

    # Reference answers keyed by question text; loaded once for all pipelines.
    ref_map = {
        item["question"]: (item.get("correct_answer") or item.get("exact_answer"))
        for item in load_golden_set()
    }

    summary = {}
    for pipe_name, entries in results_dict.items():
        if not entries:
            continue
        # Only score entries that have a judge result.
        valid_entries = [e for e in entries if e.get("judge") is not None]
        if not valid_entries:
            continue

        total_scored = len(valid_entries)
        pass_count = sum(1 for e in valid_entries if e.get("judge") == "PASS")

        # Keep predictions and references aligned, and skip pairs where either
        # side is missing so the batched scorer never receives None.
        preds = []
        refs = []
        for e in valid_entries:
            answer = e.get("answer")
            reference = ref_map.get(e.get("question"))
            if answer and reference:
                preds.append(answer)
                refs.append(reference)

        bulk_bert = 0.0
        if preds and refs:
            try:
                # One batched call over all answers rather than per-question scores.
                bert_results = metrics_svc.bertscore.compute(
                    predictions=preds,
                    references=refs,
                    lang="en",
                    rescale_with_baseline=True,
                )
                bulk_bert = float(sum(bert_results["f1"]) / len(bert_results["f1"]))
            except Exception as e:
                logger.error(f"Error calculating bulk BERTScore for {pipe_name}: {e}")

        # The latency key is always present in saved records but may hold None.
        avg_latency = sum((e.get("latency") or 0) for e in valid_entries) / total_scored

        summary[pipe_name] = {
            "pass_rate": f"{(pass_count/total_scored*100):.1f}%",
            "avg_bert_score": f"{bulk_bert:.4f}",
            "avg_latency": f"{avg_latency:.2f}s",
            "total_questions": len(entries),
            "scored_questions": total_scored,
        }

    # Persist the summary next to the detailed results (temp file + replace so
    # an interrupted write cannot truncate the existing results file).
    data["summary"] = summary
    tmp_path = f"{RESULTS_PATH}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=4)
    os.replace(tmp_path, RESULTS_PATH)

    print("="*50)
    for name, stats in summary.items():
        print(f"\nPipeline: {name}")
        print(f" - Pass Rate: {stats['pass_rate']}")
        print(f" - Bulk BERTScore: {stats['avg_bert_score']} (Threshold for bonus: 0.55)")
        print(f" - Avg Latency: {stats['avg_latency']}")
    print("="*50)
    print(f"Full results saved to {RESULTS_PATH}")
def _print_checkpoint(questions_done: int):
    """Print a running pass-rate / BERT summary for each pipeline on disk.

    Only entries that carry both a judge verdict and a per-question
    ``bert_score`` are counted; pipelines without any such entry are skipped.

    Args:
        questions_done: Number of questions processed so far in this run,
            shown in the checkpoint header.
    """
    per_pipeline = _load_results().get("detailed_results", {})
    for name, entries in per_pipeline.items():
        scored = []
        for record in entries:
            if record.get("judge") is None or record.get("bert_score") is None:
                continue
            scored.append(record)
        if not scored:
            continue

        scored_count = len(scored)
        passes = sum(1 for record in scored if record["judge"] == "PASS")
        avg_bert = sum(float(record["bert_score"]) for record in scored) / scored_count
        pct = (passes / scored_count) * 100

        print(f"\nπ CHECKPOINT after {questions_done} questions "
              f"({scored_count} scored) β {name}")
        print(f" Pass Rate: {pct:.1f}% | Avg BERT: {avg_bert:.4f}")
# ------------------------------------------------------------------
# Rate limit detection
# ------------------------------------------------------------------
def _is_rate_limit(text: str) -> bool:
text = str(text).lower()
return "429" in text and ("rate_limit" in text or "rate limit" in text)
# ------------------------------------------------------------------
# Main
# ------------------------------------------------------------------
def load_golden_set():
    """Load the evaluation question set from GOLDEN_SET_PATH.

    Returns:
        The parsed JSON content of the golden-set file.
    """
    # Explicit UTF-8: the default text encoding depends on the platform locale.
    with open(GOLDEN_SET_PATH, "r", encoding="utf-8") as f:
        return json.load(f)
def run_benchmark():
    """Run every enabled pipeline over the selected questions.

    Questions are taken from the golden set starting at START_QUESTION
    (1-indexed) and optionally capped by LIMIT_QUESTIONS. Each pipeline result
    is written to disk as soon as it is produced. A running checkpoint is
    printed every 5 questions, and the summary is computed at the end.

    The run stops early when a pipeline output or exception looks like a
    rate-limit error; a hint with the question number to resume from is logged.
    """
    golden_set = load_golden_set()

    # Apply start offset (1-indexed -> 0-indexed); values below 1 clamp to the start.
    start_idx = max(0, START_QUESTION - 1)
    golden_set = golden_set[start_idx:]
    if LIMIT_QUESTIONS:
        golden_set = golden_set[:LIMIT_QUESTIONS]
    total = len(golden_set)

    if total == 0:
        # Nothing selected: skip the pipeline and model setup, but still
        # refresh the summary from whatever is already on disk.
        logger.warning(
            f"No questions selected (START_QUESTION = {START_QUESTION}); nothing to run."
        )
        _compute_and_save_summary()
        return

    # Report the range from the clamped index so it matches the Q numbers below.
    logger.info(f"Running benchmark on questions {start_idx + 1} to {start_idx + total} "
                f"({total} questions)...")

    # Initialize pipelines; imports stay local so disabled pipelines are never imported.
    pipelines = {}
    if RUN_BASELINE:
        from core.pipeline_1.logic import PipelineLLMOnly
        logger.info("Initializing Baseline Pipeline...")
        pipelines["Baseline (LLM Only)"] = PipelineLLMOnly(top_n=5, max_full_text=5)
    if RUN_HYBRID:
        from core.pipeline_2.logic import PipelineRAG
        logger.info("Initializing Hybrid RAG Pipeline...")
        pipelines["Hybrid RAG (Pipeline 2)"] = PipelineRAG(retrieval_top_k=50, rerank_top_n=10)
    if RUN_GRAPH:
        from core.pipeline_3.logic import PipelineGraphRAG
        logger.info("Initializing GraphRAG Pipeline...")
        pipelines["GraphRAG (Pipeline 3)"] = PipelineGraphRAG(rerank_top_n=10)

    logger.info("Warming up BERTScore model (avoids cold-start penalty on Q1)...")
    from services.metrics_service import MetricsService as _MS
    _ms_warmup = _MS()
    _ms_warmup.bertscore.compute(predictions=["warmup"], references=["warmup"], lang="en", rescale_with_baseline=True)
    logger.info("BERTScore ready.")

    rate_limited = False
    questions_done = 0
    q_num = start_idx + 1

    for i, item in enumerate(golden_set):
        q_num = start_idx + i + 1
        question = item["question"]
        correct_answer = item.get("correct_answer") or item.get("exact_answer")
        q_type = item.get("type", "factual")
        logger.info(f"\n[Q{q_num}/{start_idx + total}] Testing: {question[:80]}...")

        for name, pipe in pipelines.items():
            logger.info(f"Running {name}...")
            try:
                output = pipe.run(question, ground_truth=correct_answer)

                # Rate limit reported inside the pipeline output.
                if _is_rate_limit(output.get("error", "")):
                    logger.warning(f"π Rate limit hit on Q{q_num}. Saving and stopping.")
                    rate_limited = True
                    break

                # `or {}` also covers sections that are present but None, so a
                # partially filled metrics dict still yields a saved record.
                metrics = output.get("metrics") or {}
                accuracy = metrics.get("accuracy") or {}
                tokens = metrics.get("tokens") or {}
                res = {
                    "question_number": q_num,
                    "question": question,
                    "type": q_type,
                    "answer": output.get("answer"),
                    "judge": accuracy.get("llm_judge"),
                    "bert_score": accuracy.get("bert_score"),
                    "latency": metrics.get("latency_seconds"),
                    "cost": metrics.get("cost_usd"),
                    "tokens": tokens.get("total"),
                }

                # Save to disk right away so an interrupted run keeps this result.
                _save_one_result(name, res)
                logger.info(f"✅ Q{q_num} saved | {res['judge']} | BERT: {res['bert_score']}")
            except Exception as e:
                # Rate limit surfaced as an exception instead of an error field.
                if _is_rate_limit(str(e)):
                    logger.warning(f"π Rate limit exception on Q{q_num}. Saving and stopping.")
                    rate_limited = True
                    break
                # Any other failure: log it and move on to the next pipeline.
                logger.error(f"Error running {name} on Q{q_num}: {e}")

        if rate_limited:
            break

        questions_done += 1
        # Checkpoint every 5 questions.
        if questions_done % 5 == 0:
            _print_checkpoint(questions_done)

    # Compute final averages.
    _compute_and_save_summary()

    if rate_limited:
        logger.info(f"\nπ‘ To resume, set START_QUESTION = {q_num} in run_benchmarks.py")
# Script entry point: start the benchmark only when this file is executed directly.
if __name__ == "__main__":
    run_benchmark()
|