# TigerGraph-Hack / run_benchmarks.py
# Uploaded by Meshyboi ("Upload 27 files", commit 90645a4, 10.3 kB).
import json
import time
import logging
from pathlib import Path
# Heavy project imports live inside the functions that need them, so the
# script can start even when some optional dependencies are missing.

# --- Pipeline selection -------------------------------------------------
RUN_BASELINE = False
RUN_HYBRID = True
RUN_GRAPH = False
START_QUESTION = 16  # 1-indexed; e.g. 9 resumes at Q9 (skips Q1-8)
LIMIT_QUESTIONS = None  # None = run every question from START_QUESTION onward

# --- Evaluation files ---------------------------------------------------
GOLDEN_SET_PATH = "evaluation_set.json"
RESULTS_PATH = "benchmark_results.json"

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
# Silence per-request / model-download chatter from third-party libraries.
for _noisy_logger in ("httpx", "httpcore", "sentence_transformers", "huggingface_hub"):
    logging.getLogger(_noisy_logger).setLevel(logging.WARNING)
del _noisy_logger
logger = logging.getLogger(__name__)
# ------------------------------------------------------------------
# Incremental file I/O — every result is saved to disk immediately
# ------------------------------------------------------------------
def _load_results() -> dict:
    """Load the results file from RESULTS_PATH, or return an empty skeleton.

    Returns:
        The decoded results dict, guaranteed to contain a
        ``"detailed_results"`` key. A fresh ``{"detailed_results": {}}`` is
        returned when the file does not exist, cannot be read or decoded, or
        does not hold a JSON object. The unreadable/invalid cases are logged
        as warnings, because the next save will overwrite the file with only
        the new data.
    """
    path = Path(RESULTS_PATH)
    if not path.exists():
        return {"detailed_results": {}}
    try:
        # json.dump writes ASCII by default, so UTF-8 reads existing files safely.
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, json.JSONDecodeError, UnicodeDecodeError) as exc:
        logger.warning("Could not read %s (%s); starting from empty results.", RESULTS_PATH, exc)
        return {"detailed_results": {}}
    if not isinstance(data, dict):
        logger.warning("%s does not contain a JSON object; starting from empty results.", RESULTS_PATH)
        return {"detailed_results": {}}
    data.setdefault("detailed_results", {})
    return data
def _save_one_result(pipeline_name: str, result: dict):
    """Merge a single result into RESULTS_PATH and write it to disk immediately.

    Entries are merged by ``question_number``: a re-run replaces the stale
    entry for that question instead of adding a duplicate. Each pipeline's
    list is kept sorted by question number. The summary is not touched here;
    it is computed at the end of the run.

    The JSON is written to a sibling ``.tmp`` file and then moved over
    RESULTS_PATH with ``Path.replace``. An interruption mid-write therefore
    cannot leave a truncated file behind. A truncated file would be read back
    as empty and would discard every earlier result on the next save.

    Args:
        pipeline_name: Key under ``"detailed_results"`` for this pipeline.
        result: Result record; its ``"question_number"`` is the merge key.
    """
    data = _load_results()
    detailed = data.setdefault("detailed_results", {})
    entries = detailed.setdefault(pipeline_name, [])

    # Merge: replace an existing entry for the same question, else append.
    qn = result.get("question_number")
    for i, existing in enumerate(entries):
        if existing.get("question_number") == qn:
            entries[i] = result
            break
    else:
        entries.append(result)

    # `or 0` keeps the sort from comparing None with int for malformed entries.
    entries.sort(key=lambda r: r.get("question_number") or 0)

    tmp_path = Path(f"{RESULTS_PATH}.tmp")
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=4)
    tmp_path.replace(RESULTS_PATH)
def _compute_and_save_summary():
    """Compute per-pipeline summary metrics, save them to RESULTS_PATH and print them.

    For each pipeline, only entries with a non-None ``judge`` are scored.
    - Pass rate and mean latency are taken over those scored entries.
    - BERTScore is computed once over the whole batch of (answer, reference)
      pairs ("bulk" BERTScore) rather than averaged from per-question scores.
      Pairs whose answer or reference answer is missing are skipped.

    The summary is stored under the ``"summary"`` key next to
    ``"detailed_results"``. It is written via a temp file plus
    ``Path.replace``, so a crash cannot truncate the results file.
    """
    data = _load_results()
    results_dict = data.get("detailed_results", {})
    if not results_dict:
        return

    from services.metrics_service import MetricsService
    metrics_svc = MetricsService()

    # Reference answers keyed by question text, loaded once for all pipelines.
    ref_map = {
        item["question"]: (item.get("correct_answer") or item.get("exact_answer"))
        for item in load_golden_set()
    }

    summary = {}
    for pipe_name, entries in results_dict.items():
        if not entries:
            continue
        # Only score entries that have a judge result.
        valid_entries = [e for e in entries if e.get("judge") is not None]
        if not valid_entries:
            continue
        total_scored = len(valid_entries)
        passes = sum(1 for e in valid_entries if e.get("judge") == "PASS")

        # Build aligned (prediction, reference) pairs. Dropping pairs with a
        # missing reference keeps one unmatched question from making the whole
        # batch BERTScore call fail.
        pairs = []
        for e in valid_entries:
            answer = e.get("answer")
            reference = ref_map.get(e.get("question"))
            if answer and reference:
                pairs.append((answer, reference))

        bulk_bert = 0.0
        if pairs:
            preds = [p for p, _ in pairs]
            refs = [r for _, r in pairs]
            try:
                # Single batch call: the "Bulk" BERTScore over all pairs.
                bert_results = metrics_svc.bertscore.compute(
                    predictions=preds,
                    references=refs,
                    lang="en",
                    rescale_with_baseline=True,
                )
                f1_scores = bert_results["f1"]
                bulk_bert = float(sum(f1_scores) / len(f1_scores))
            except Exception as exc:
                logger.error(f"Error calculating bulk BERTScore for {pipe_name}: {exc}")

        # Stored latency can be None when a pipeline did not report it.
        avg_latency = sum(e.get("latency") or 0 for e in valid_entries) / total_scored

        summary[pipe_name] = {
            "pass_rate": f"{(passes / total_scored * 100):.1f}%",
            "avg_bert_score": f"{bulk_bert:.4f}",
            "avg_latency": f"{avg_latency:.2f}s",
            "total_questions": len(entries),
            "scored_questions": total_scored,
        }

    # Persist the summary alongside the detailed results.
    data["summary"] = summary
    tmp_path = Path(f"{RESULTS_PATH}.tmp")
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=4)
    tmp_path.replace(RESULTS_PATH)

    print("="*50)
    for name, stats in summary.items():
        print(f"\nPipeline: {name}")
        print(f" - Pass Rate: {stats['pass_rate']}")
        print(f" - Bulk BERTScore: {stats['avg_bert_score']} (Threshold for bonus: 0.55)")
        print(f" - Avg Latency: {stats['avg_latency']}")
    print("="*50)
    print(f"Full results saved to {RESULTS_PATH}")
def _print_checkpoint(questions_done: int):
    """Print each pipeline's running pass rate and mean BERTScore from the results on disk.

    An entry counts as scored only if it has both a judge verdict and a
    BERTScore. Pipelines with no scored entries are skipped.
    """
    per_pipeline = _load_results().get("detailed_results", {})
    for pipeline, records in per_pipeline.items():
        scored = []
        for rec in records:
            if rec.get("judge") is None or rec.get("bert_score") is None:
                continue
            scored.append(rec)
        if len(scored) == 0:
            continue
        n_scored = len(scored)
        n_pass = len([rec for rec in scored if rec["judge"] == "PASS"])
        mean_bert = sum(map(lambda rec: float(rec["bert_score"]), scored)) / n_scored
        pass_pct = n_pass / n_scored * 100
        print(f"\nπŸ“Š CHECKPOINT after {questions_done} questions "
              f"({n_scored} scored) β€” {pipeline}")
        print(f" Pass Rate: {pass_pct:.1f}% | Avg BERT: {mean_bert:.4f}")
# ------------------------------------------------------------------
# Rate limit detection
# ------------------------------------------------------------------
def _is_rate_limit(text: str) -> bool:
    """Return True if *text* looks like an HTTP 429 rate-limit error.

    *text* may be any object (error string, exception, ``None``). It is
    stringified and lower-cased before matching. A match requires the status
    code ``429`` plus one of the common rate-limit phrasings. Examples:
    ``rate_limit_exceeded``, ``Rate limit reached``, ``RateLimitError`` or
    ``429 Too Many Requests``.
    """
    lowered = str(text).lower()
    if "429" not in lowered:
        return False
    markers = ("rate_limit", "rate limit", "ratelimit", "too many requests")
    return any(marker in lowered for marker in markers)
# ------------------------------------------------------------------
# Main
# ------------------------------------------------------------------
def load_golden_set(path=None):
    """Load the evaluation ("golden") set from a JSON file.

    Args:
        path: File to read. Defaults to GOLDEN_SET_PATH.

    Returns:
        The decoded JSON. Callers in this script treat it as a list of dicts
        with a ``"question"`` key and a ``"correct_answer"`` or
        ``"exact_answer"`` key.
    """
    if path is None:
        path = GOLDEN_SET_PATH
    # JSON is UTF-8; don't depend on the platform locale encoding (e.g.
    # cp1252 on Windows), which would garble non-ASCII questions.
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)
def _init_pipelines() -> dict:
    """Instantiate the pipelines enabled by the RUN_* flags, keyed by display name."""
    pipelines = {}
    if RUN_BASELINE:
        from core.pipeline_1.logic import PipelineLLMOnly
        logger.info("Initializing Baseline Pipeline...")
        pipelines["Baseline (LLM Only)"] = PipelineLLMOnly(top_n=5, max_full_text=5)
    if RUN_HYBRID:
        from core.pipeline_2.logic import PipelineRAG
        logger.info("Initializing Hybrid RAG Pipeline...")
        pipelines["Hybrid RAG (Pipeline 2)"] = PipelineRAG(retrieval_top_k=50, rerank_top_n=10)
    if RUN_GRAPH:
        from core.pipeline_3.logic import PipelineGraphRAG
        logger.info("Initializing GraphRAG Pipeline...")
        pipelines["GraphRAG (Pipeline 3)"] = PipelineGraphRAG(rerank_top_n=10)
    return pipelines


def _warm_up_bertscore():
    """Run one throwaway BERTScore computation so the first question avoids the cold start."""
    logger.info("Warming up BERTScore model (avoids cold-start penalty on Q1)...")
    from services.metrics_service import MetricsService
    MetricsService().bertscore.compute(
        predictions=["warmup"], references=["warmup"], lang="en", rescale_with_baseline=True
    )
    logger.info("BERTScore ready.")


def _build_result(q_num: int, question: str, q_type: str, output: dict) -> dict:
    """Flatten one pipeline output dict into the record stored in RESULTS_PATH."""
    # `or {}` tolerates pipelines that return None for a missing metrics section.
    metrics = output.get("metrics") or {}
    accuracy = metrics.get("accuracy") or {}
    tokens = metrics.get("tokens") or {}
    return {
        "question_number": q_num,
        "question": question,
        "type": q_type,
        "answer": output.get("answer"),
        "judge": accuracy.get("llm_judge"),
        "bert_score": accuracy.get("bert_score"),
        "latency": metrics.get("latency_seconds"),
        "cost": metrics.get("cost_usd"),
        "tokens": tokens.get("total"),
    }


def run_benchmark():
    """Run every enabled pipeline over the selected slice of the golden set.

    The run proceeds as follows:
    - Questions start at START_QUESTION (1-indexed). The slice is optionally
      capped at LIMIT_QUESTIONS.
    - Each (pipeline, question) result is saved to disk as soon as it is
      produced.
    - A checkpoint is printed every 5 questions.
    - If a rate limit is detected, the run stops and logs the question number
      to resume from.
    - The summary is computed at the end.
    """
    golden_set = load_golden_set()

    # Apply start offset (1-indexed → 0-indexed).
    start_idx = max(0, START_QUESTION - 1)
    golden_set = golden_set[start_idx:]
    # `is not None` so LIMIT_QUESTIONS = 0 means "no questions", not "all".
    if LIMIT_QUESTIONS is not None:
        golden_set = golden_set[:LIMIT_QUESTIONS]
    total = len(golden_set)
    # Report the clamped start so the range is right even if START_QUESTION < 1.
    logger.info(f"Running benchmark on questions {start_idx + 1} to {start_idx + total} "
                f"({total} questions)...")

    pipelines = _init_pipelines()
    _warm_up_bertscore()

    rate_limited = False
    questions_done = 0
    for i, item in enumerate(golden_set):
        q_num = start_idx + i + 1
        question = item["question"]
        correct_answer = item.get("correct_answer") or item.get("exact_answer")
        q_type = item.get("type", "factual")
        logger.info(f"\n[Q{q_num}/{start_idx + total}] Testing: {question[:80]}...")

        for name, pipe in pipelines.items():
            logger.info(f"Running {name}...")
            try:
                output = pipe.run(question, ground_truth=correct_answer)
                # A rate-limited answer is not saved; the resume hint below
                # points back at this question so it is re-run.
                if _is_rate_limit(output.get("error") or ""):
                    logger.warning(f"πŸ›‘ Rate limit hit on Q{q_num}. Saving and stopping.")
                    rate_limited = True
                    break
                res = _build_result(q_num, question, q_type, output)
                # Save immediately so earlier results survive a later crash.
                _save_one_result(name, res)
                logger.info(f"βœ… Q{q_num} saved | {res['judge']} | BERT: {res['bert_score']}")
            except Exception as e:
                if _is_rate_limit(str(e)):
                    logger.warning(f"πŸ›‘ Rate limit exception on Q{q_num}. Saving and stopping.")
                    rate_limited = True
                    break
                # Other failures are logged with a traceback; the run continues.
                logger.exception(f"Error running {name} on Q{q_num}: {e}")

        if rate_limited:
            break
        questions_done += 1
        # Checkpoint every 5 questions.
        if questions_done % 5 == 0:
            _print_checkpoint(questions_done)

    # Compute final averages.
    _compute_and_save_summary()
    if rate_limited:
        logger.info(f"\nπŸ’‘ To resume, set START_QUESTION = {q_num} in run_benchmarks.py")
if __name__ == "__main__":
    # Script entry point: run the benchmark configured at the top of the file.
    run_benchmark()