| """ |
| Phase 3 β Evaluation Framework |
| ================================ |
| Multi-metric evaluation pipeline: |
| β’ ROUGE (rouge1, rouge2, rougeL) |
| β’ BERTScore |
| β’ Faithfulness / hallucination detection |
| β’ LLM-as-Judge (model comparison) |
| β’ Knowledge gap detector (feeds Phase 4) |
| |
| Usage: |
| python -m phase3_evaluation.evaluate --model ./merged --dataset squad --output ./evals |
| """ |
|
|
from __future__ import annotations
|
|
import json
import re
from dataclasses import dataclass, field, asdict
from typing import Optional
|
|
import torch
import typer
from rich.console import Console
from rich.table import Table
from rich.progress import track
|
|
from configs.settings import EVAL_DIR, ROUGE_TYPES, JUDGE_MODEL, HF_TOKEN
from utils.logger import logger
|
|
app = typer.Typer(help="Phase 3: Evaluation framework")
console = Console()
|
|
|
|
# ---------------------------------------------------------------------------
# Data models
# ---------------------------------------------------------------------------
|
|
@dataclass
class Sample:
    id: str
    context: str
    question: str
    reference: str
    prediction: str = ""
|
|
|
|
@dataclass
class SampleScore:
    sample_id: str
    rouge1: float = 0.0
    rouge2: float = 0.0
    rougeL: float = 0.0
    bertscore_f1: float = 0.0
    faithfulness: float = 0.0
    hallucinated: bool = False
    judge_score: float = 0.0
    gap_signals: list[str] = field(default_factory=list)
|
|
|
|
@dataclass
class EvalResult:
    model_id: str
    dataset: str
    n_samples: int
    avg_rouge1: float = 0.0
    avg_rouge2: float = 0.0
    avg_rougeL: float = 0.0
    avg_bertscore: float = 0.0
    avg_faithfulness: float = 0.0
    hallucination_rate: float = 0.0
    avg_judge_score: float = 0.0
    gap_categories: list[str] = field(default_factory=list)
    sample_scores: list[SampleScore] = field(default_factory=list)
|
|
|
|
# ---------------------------------------------------------------------------
# ROUGE
# ---------------------------------------------------------------------------
|
|
class RougeScorer:
    def __init__(self):
        from rouge_score import rouge_scorer as rs
        self._scorer = rs.RougeScorer(ROUGE_TYPES, use_stemmer=True)

    def score(self, prediction: str, reference: str) -> dict[str, float]:
        # rouge_score expects (target, prediction) argument order.
        scores = self._scorer.score(reference, prediction)
        return {k: scores[k].fmeasure for k in ROUGE_TYPES}
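# Illustrative usage (a sketch; assumes ROUGE_TYPES = ["rouge1", "rouge2",
# "rougeL"] in configs.settings):
#
#   rouge = RougeScorer()
#   rouge.score(prediction="a cat sat on a mat", reference="the cat sat on the mat")
#   # -> approx {"rouge1": 0.67, "rouge2": 0.4, "rougeL": 0.67} with stemming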
|
|
|
|
# ---------------------------------------------------------------------------
# BERTScore
# ---------------------------------------------------------------------------
|
|
class BertScorer:
    def __init__(self, lang: str = "en", device: Optional[str] = None):
        self.lang = lang
        self.device = device or ("cuda" if torch.cuda.is_available() else "cpu")
        self._model = None

    def _load(self):
        # Lazy load: bert_score pulls in a large encoder checkpoint.
        if self._model is None:
            from bert_score import BERTScorer
            self._model = BERTScorer(lang=self.lang, device=self.device, rescale_with_baseline=True)

    def score_batch(self, predictions: list[str], references: list[str]) -> list[float]:
        self._load()
        # BERTScorer.score returns (precision, recall, f1) tensors.
        _, _, f1 = self._model.score(predictions, references)
        return f1.tolist()
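# Illustrative usage (first call downloads the underlying encoder, typically
# roberta-large for lang="en", plus a baseline file for rescaling):
#
#   bert = BertScorer()
#   bert.score_batch(["Paris is the capital of France."],
#                    ["The capital of France is Paris."])
#   # -> one rescaled F1 per prediction/reference pair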
|
|
|
|
# ---------------------------------------------------------------------------
# Faithfulness / hallucination detection
# ---------------------------------------------------------------------------
|
|
HALLUCINATION_PATTERNS = [
    r"\b(definitely|certainly|always|never|everyone|nobody)\b",
    r"\b(100%|absolutely|guaranteed)\b",
    r"\bin \d{4},? .{0,30} (invented|discovered|created|founded)\b",
]
|
|
class FaithfulnessChecker:
    """
    Lightweight faithfulness checker using:
      1. Heuristic hallucination patterns
      2. NLI-based entailment (context → prediction)
    """

    def __init__(self):
        self._nli = None

    def _load_nli(self):
        if self._nli is None:
            from transformers import pipeline
            self._nli = pipeline(
                "text-classification",
                model="cross-encoder/nli-deberta-v3-small",
                device=0 if torch.cuda.is_available() else -1,
            )

    def _heuristic_hallucination(self, prediction: str) -> bool:
        return any(re.search(p, prediction, re.IGNORECASE) for p in HALLUCINATION_PATTERNS)

    def check(self, context: str, prediction: str) -> tuple[float, bool]:
        """
        Returns (faithfulness_score 0-1, is_hallucinated).
        faithfulness_score = NLI entailment probability.
        """
        hallucinated = self._heuristic_hallucination(prediction)
        try:
            self._load_nli()
            # Pass premise/hypothesis as an explicit pair so the cross-encoder's
            # tokenizer inserts its own separator tokens.
            result = self._nli(
                {"text": context[:512], "text_pair": prediction[:256]},
                truncation=True, max_length=512,
            )[0]
            label = result["label"].upper()
            score = result["score"]
            if label == "ENTAILMENT":
                faith = score
            elif label == "CONTRADICTION":
                faith = 1.0 - score
            else:
                faith = 0.5
            if faith < 0.3:
                hallucinated = True
        except Exception as e:
            logger.warning(f"NLI check failed: {e}")
            faith = 0.5
        return faith, hallucinated

    def check_batch(self, contexts: list[str], predictions: list[str]) -> list[tuple[float, bool]]:
        return [self.check(c, p) for c, p in zip(contexts, predictions)]
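# Illustrative usage (the NLI cross-encoder is downloaded on first use):
#
#   checker = FaithfulnessChecker()
#   checker.check(
#       context="Marie Curie won the Nobel Prize in Physics in 1903.",
#       prediction="Marie Curie definitely invented radium in 1903.",
#   )
#   # -> (entailment-based score, True): "definitely" / "in 1903 ... invented"
#   #    trip the heuristic patterns regardless of the NLI verdict.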
|
|
|
|
# ---------------------------------------------------------------------------
# LLM-as-Judge
# ---------------------------------------------------------------------------
|
|
| JUDGE_PROMPT = """You are an expert evaluator. Score the following answer on a scale of 0-10. |
| |
| Question: {question} |
| Reference Answer: {reference} |
| Model Answer: {prediction} |
| |
| Evaluation criteria: |
| - Accuracy (does it match the reference?) |
| - Completeness (does it cover all key points?) |
| - Clarity (is it well-expressed?) |
| - Faithfulness (no hallucinations?) |
| |
| Respond ONLY with: SCORE: <integer 0-10>\nREASON: <one sentence> |
| """ |
|
|
class LLMJudge:
    def __init__(self, judge_model_id: str = JUDGE_MODEL):
        self.model_id = judge_model_id
        self._pipe = None

    def _load(self):
        if self._pipe is None:
            from transformers import pipeline, BitsAndBytesConfig
            bnb = BitsAndBytesConfig(load_in_4bit=True, bnb_4bit_compute_dtype=torch.bfloat16)
            self._pipe = pipeline(
                "text-generation",
                model=self.model_id,
                model_kwargs={"quantization_config": bnb},
                device_map="auto",
                token=HF_TOKEN or None,
                trust_remote_code=True,
            )

    def score(self, question: str, reference: str, prediction: str) -> tuple[float, str]:
        """Returns (score 0-10, reason string)."""
        try:
            self._load()
            prompt = JUDGE_PROMPT.format(
                question=question, reference=reference, prediction=prediction
            )
            # return_full_text=False keeps the prompt (which itself contains
            # literal "SCORE:" / "REASON:" templates) out of the regex search.
            out = self._pipe(
                prompt, max_new_tokens=64, do_sample=False, return_full_text=False
            )[0]["generated_text"]

            score_match = re.search(r"SCORE:\s*(\d+)", out)
            reason_match = re.search(r"REASON:\s*(.+)", out)
            score = int(score_match.group(1)) if score_match else 5
            reason = reason_match.group(1).strip() if reason_match else ""
            return min(10.0, max(0.0, float(score))), reason
        except Exception as e:
            logger.warning(f"Judge scoring failed: {e}")
            return 5.0, ""

    def compare_models(
        self,
        question: str, reference: str,
        pred_a: str, pred_b: str,
        model_a_name: str = "A", model_b_name: str = "B",
    ) -> dict:
        score_a, _ = self.score(question, reference, pred_a)
        score_b, _ = self.score(question, reference, pred_b)
        # Ties go to model A via the >= comparison.
        winner = model_a_name if score_a >= score_b else model_b_name
        return {
            "winner": winner,
            model_a_name: score_a,
            model_b_name: score_b,
            "delta": abs(score_a - score_b),
        }
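# Illustrative usage (JUDGE_MODEL comes from configs.settings; any instruct
# model the pipeline can load in 4-bit should work as a drop-in):
#
#   judge = LLMJudge()
#   score, reason = judge.score(
#       question="What is the capital of France?",
#       reference="Paris",
#       prediction="The capital of France is Paris.",
#   )
#   verdict = judge.compare_models("Capital of France?", "Paris",
#                                  pred_a="Paris", pred_b="Lyon",
#                                  model_a_name="merged", model_b_name="base")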
|
|
|
|
# ---------------------------------------------------------------------------
# Knowledge gap detection (feeds Phase 4)
# ---------------------------------------------------------------------------
|
|
GAP_THRESHOLDS = {
    "rouge": 0.30,
    "bertscore": 0.50,
    "faithfulness": 0.50,
    "judge": 5.0,
}


GAP_CATEGORY_KEYWORDS = {
    "factual_recall": ["who", "when", "where", "what year", "which country"],
    "multi_step_reasoning": ["therefore", "because", "if", "then", "step"],
    "numerical": ["how many", "calculate", "percent", "number", "count"],
    "code_generation": ["write a function", "implement", "code", "python", "sql"],
    "summarization": ["summarize", "summary", "main points", "briefly"],
}
|
|
|
|
def detect_gaps(sample_scores: list[SampleScore], samples: list[Sample]) -> list[str]:
    """
    Analyze per-sample scores to identify systematic weaknesses.
    Returns list of gap category labels.
    """
    gap_counts: dict[str, int] = {}
    failing: list[int] = []

    for i, ss in enumerate(sample_scores):
        is_failing = (
            ss.rouge1 < GAP_THRESHOLDS["rouge"] or
            ss.bertscore_f1 < GAP_THRESHOLDS["bertscore"] or
            ss.faithfulness < GAP_THRESHOLDS["faithfulness"] or
            ss.judge_score < GAP_THRESHOLDS["judge"] or
            ss.hallucinated
        )
        if is_failing:
            failing.append(i)

    for idx in failing:
        question = (samples[idx].question + " " + samples[idx].context).lower()
        for cat, keywords in GAP_CATEGORY_KEYWORDS.items():
            if any(kw in question for kw in keywords):
                gap_counts[cat] = gap_counts.get(cat, 0) + 1

    # Require at least two failing samples per category before calling it a gap.
    gaps = [cat for cat, cnt in gap_counts.items() if cnt >= 2]
    logger.info(f"[GapDetect] Identified gaps: {gaps or ['none']}")
    return gaps
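# Illustrative sketch with synthetic scores (all-default SampleScore fields
# fall below every threshold, so both samples count as failing):
#
#   s = Sample(id="0", context="", question="How many moons does Mars have?",
#              reference="two")
#   detect_gaps([SampleScore(sample_id="0"), SampleScore(sample_id="0")], [s, s])
#   # -> ["numerical"]  ("how many" matches, and the category hits >= 2 times)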
|
|
|
|
# ---------------------------------------------------------------------------
# Inference
# ---------------------------------------------------------------------------
|
|
def run_inference(
    model_id: str,
    samples: list[Sample],
    batch_size: int = 4,
) -> list[Sample]:
    """Fill sample.prediction for each sample using the target model."""
    from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
    bnb = BitsAndBytesConfig(load_in_4bit=True, bnb_4bit_compute_dtype=torch.bfloat16)
    tok = AutoTokenizer.from_pretrained(model_id, token=HF_TOKEN or None, trust_remote_code=True)
    if tok.pad_token is None:
        tok.pad_token = tok.eos_token
    # Decoder-only models must be left-padded for batched generation, otherwise
    # the new-token slice below is wrong for the shorter prompts in a batch.
    tok.padding_side = "left"

    model = AutoModelForCausalLM.from_pretrained(
        model_id,
        quantization_config=bnb,
        device_map="auto",
        token=HF_TOKEN or None,
        trust_remote_code=True,
    )
    model.eval()

    for i in range(0, len(samples), batch_size):
        batch = samples[i : i + batch_size]
        prompts = [
            f"Context: {s.context}\nQuestion: {s.question}\nAnswer:"
            for s in batch
        ]
        inputs = tok(
            prompts, return_tensors="pt", padding=True,
            truncation=True, max_length=1024,
        ).to(model.device)
        with torch.no_grad():
            out_ids = model.generate(
                **inputs, max_new_tokens=128, do_sample=False,
                pad_token_id=tok.pad_token_id,
            )
        for j, sample in enumerate(batch):
            new_tokens = out_ids[j][inputs["input_ids"].shape[1]:]
            sample.prediction = tok.decode(new_tokens, skip_special_tokens=True).strip()

    del model
    torch.cuda.empty_cache()
    return samples
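# Illustrative usage (a sketch; assumes "./merged" is a local causal-LM
# checkpoint):
#
#   filled = run_inference("./merged", load_squad(8), batch_size=4)
#   print(filled[0].prediction)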
|
|
|
|
# ---------------------------------------------------------------------------
# Evaluation pipeline
# ---------------------------------------------------------------------------
|
|
def evaluate(
    model_id: str,
    samples: list[Sample],
    dataset_name: str = "custom",
    run_judge: bool = True,
    save: bool = True,
) -> EvalResult:
    """
    Full evaluation pipeline for one model on a dataset.
    """
    n = len(samples)
    logger.info(f"[Eval] Evaluating '{model_id}' on {n} samples...")

    # 1. Generate predictions with the target model.
    samples = run_inference(model_id, samples)

    # 2. Set up scorers (the judge is optional: it loads a second LLM).
    rouge = RougeScorer()
    bert = BertScorer()
    faith = FaithfulnessChecker()
    judge = LLMJudge() if run_judge else None

    predictions = [s.prediction for s in samples]
    references = [s.reference for s in samples]

    # 3. BERTScore is computed in one batch; the remaining metrics are per-sample.
    bs_f1s = bert.score_batch(predictions, references)

    sample_scores: list[SampleScore] = []

    for i, s in enumerate(track(samples, description="Scoring")):
        rscores = rouge.score(s.prediction, s.reference)
        faith_sc, hallu = faith.check(s.context, s.prediction)
        j_score = 5.0
        if judge:
            j_score, _ = judge.score(s.question, s.reference, s.prediction)

        ss = SampleScore(
            sample_id=s.id,
            rouge1=rscores["rouge1"],
            rouge2=rscores["rouge2"],
            rougeL=rscores["rougeL"],
            bertscore_f1=bs_f1s[i],
            faithfulness=faith_sc,
            hallucinated=hallu,
            judge_score=j_score,
        )
        sample_scores.append(ss)

    # 4. Aggregate per-sample scores into dataset-level averages.
    def avg(lst):
        return sum(lst) / len(lst) if lst else 0.0

    result = EvalResult(
        model_id=model_id,
        dataset=dataset_name,
        n_samples=n,
        avg_rouge1=avg([s.rouge1 for s in sample_scores]),
        avg_rouge2=avg([s.rouge2 for s in sample_scores]),
        avg_rougeL=avg([s.rougeL for s in sample_scores]),
        avg_bertscore=avg([s.bertscore_f1 for s in sample_scores]),
        avg_faithfulness=avg([s.faithfulness for s in sample_scores]),
        hallucination_rate=avg([float(s.hallucinated) for s in sample_scores]),
        avg_judge_score=avg([s.judge_score for s in sample_scores]),
        gap_categories=detect_gaps(sample_scores, samples),
        sample_scores=sample_scores,
    )

    # 5. Report.
    _print_eval_table(result)

    # 6. Persist full results (including per-sample scores) as JSON.
    if save:
        EVAL_DIR.mkdir(parents=True, exist_ok=True)
        out = EVAL_DIR / f"eval_{model_id.replace('/', '_')}_{dataset_name}.json"
        with open(out, "w") as f:
            json.dump(asdict(result), f, indent=2)
        logger.info(f"[Eval] Results saved → {out}")

    return result
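# Illustrative end-to-end run (a sketch; assumes "./merged" exists locally):
#
#   result = evaluate("./merged", load_squad(20), dataset_name="squad", run_judge=False)
#   print(result.avg_rougeL, result.gap_categories)
#
# compare_models(["./merged", "./baseline"]) prints the same metrics side by
# side for each model ("./baseline" is a hypothetical second checkpoint).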
|
|
|
|
def compare_models(
    model_ids: list[str],
    samples: list[Sample],
    dataset_name: str = "custom",
) -> list[EvalResult]:
    """Evaluate multiple models and print a side-by-side comparison."""
    results = [evaluate(mid, samples, dataset_name) for mid in model_ids]
    _print_comparison_table(results)
    return results
|
|
|
|
def _print_eval_table(r: EvalResult) -> None:
    table = Table(title=f"Evaluation – {r.model_id}", show_lines=True)
    table.add_column("Metric", style="cyan")
    table.add_column("Score", justify="right", style="green")
    table.add_column("Threshold", justify="right", style="dim")

    rows = [
        ("ROUGE-1", f"{r.avg_rouge1:.3f}", f"{GAP_THRESHOLDS['rouge']:.2f}"),
        ("ROUGE-2", f"{r.avg_rouge2:.3f}", "–"),
        ("ROUGE-L", f"{r.avg_rougeL:.3f}", "–"),
        ("BERTScore F1", f"{r.avg_bertscore:.3f}", f"{GAP_THRESHOLDS['bertscore']:.2f}"),
        ("Faithfulness", f"{r.avg_faithfulness:.3f}", f"{GAP_THRESHOLDS['faithfulness']:.2f}"),
        ("Hallucination %", f"{r.hallucination_rate*100:.1f}%", "< 10%"),
        ("Judge Score", f"{r.avg_judge_score:.1f}/10", f"{GAP_THRESHOLDS['judge']:.0f}/10"),
    ]
    for row in rows:
        table.add_row(*row)
    console.print(table)
    if r.gap_categories:
        console.print(f"[yellow]⚠ Gaps detected:[/yellow] {', '.join(r.gap_categories)}")
|
|
|
|
def _print_comparison_table(results: list[EvalResult]) -> None:
    table = Table(title="Model Comparison", show_lines=True)
    table.add_column("Metric", style="cyan")
    for r in results:
        table.add_column(r.model_id.split("/")[-1], justify="right")
    metrics = [
        ("ROUGE-1", lambda r: f"{r.avg_rouge1:.3f}"),
        ("BERTScore", lambda r: f"{r.avg_bertscore:.3f}"),
        ("Faithfulness", lambda r: f"{r.avg_faithfulness:.3f}"),
        ("Halluc. %", lambda r: f"{r.hallucination_rate*100:.1f}%"),
        ("Judge", lambda r: f"{r.avg_judge_score:.1f}"),
    ]
    for label, fn in metrics:
        table.add_row(label, *[fn(r) for r in results])
    console.print(table)
|
|
|
|
# ---------------------------------------------------------------------------
# Dataset loaders
# ---------------------------------------------------------------------------
|
|
def load_squad(n: int = 100) -> list[Sample]:
    """Load SQuAD v2 validation set as Sample list."""
    from datasets import load_dataset
    # Over-fetch: unanswerable questions are skipped below, so [:n] alone
    # would usually return fewer than n samples.
    ds = load_dataset("squad_v2", split=f"validation[:{n * 2}]")
    samples = []
    for i, row in enumerate(ds):
        if not row["answers"]["text"]:
            continue
        samples.append(Sample(
            id=str(i),
            context=row["context"],
            question=row["question"],
            reference=row["answers"]["text"][0],
        ))
    return samples[:n]
|
|
|
|
def load_custom(path: str) -> list[Sample]:
    """Load custom JSONL dataset: {id, context, question, reference}"""
    with open(path) as f:
        return [Sample(**json.loads(line)) for line in f if line.strip()]
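# Expected JSONL schema, one object per line (illustrative values):
#
#   {"id": "ex-1", "context": "Mount Everest is 8,849 m tall.",
#    "question": "How tall is Mount Everest?", "reference": "8,849 m"}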
|
|
|
|
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
|
|
@app.command()
def run(
    model: str = typer.Argument(..., help="Model ID or local path"),
    dataset: str = typer.Option("squad", help="squad | custom"),
    data_path: Optional[str] = typer.Option(None, help="Path to custom JSONL (if dataset=custom)"),
    n_samples: int = typer.Option(100, help="Number of samples to evaluate"),
    judge: bool = typer.Option(True, "--judge/--no-judge"),
    compare: list[str] = typer.Option([], "--compare", help="Additional models for comparison"),
):
    if dataset == "squad":
        samples = load_squad(n_samples)
    elif dataset == "custom" and data_path:
        samples = load_custom(data_path)[:n_samples]
    else:
        raise typer.BadParameter("Use --dataset squad, or --dataset custom with --data-path")

    if compare:
        compare_models([model] + list(compare), samples, dataset)
    else:
        evaluate(model, samples, dataset, run_judge=judge)
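# Illustrative invocations (with a single @app.command(), Typer runs it as the
# app's entry point, so no subcommand name is needed):
#
#   python -m phase3_evaluation.evaluate ./merged --n-samples 50 --no-judge
#   python -m phase3_evaluation.evaluate ./merged --compare org/other-model
#   python -m phase3_evaluation.evaluate ./merged --dataset custom --data-path ./my_eval.jsonl
#
# ("org/other-model" and "./my_eval.jsonl" are placeholders for your own
# comparison model and dataset.)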
|
|
|
|
if __name__ == "__main__":
    app()
|
|