| # # evaluator.py | |
| # import re | |
| # import math | |
| # import os | |
| # import numpy as np | |
| # import pandas as pd | |
| # import textstat | |
| # from typing import Tuple, Dict | |
| # # Use LanguageTool public API to avoid Java dependency in Spaces | |
| # import language_tool_python | |
| # try: | |
| # tool = language_tool_python.LanguageToolPublicAPI('en-US') | |
| # except Exception: | |
| # # final fallback: simple grammar placeholder if network issue | |
| # tool = None | |
| # # Import heavy dependencies lazily inside the hallucination detector to avoid startup OOM | |
| # HALLUCINATION_AVAILABLE = True | |
| # try: | |
| # # 'unieval' import may fail if package not installed; guard it | |
| # from unieval.metric.evaluator import get_evaluator # optional | |
| # import evaluate # required by hallucination detector | |
| # import torch | |
| # from transformers import AutoTokenizer, T5ForConditionalGeneration, AutoModelForQuestionAnswering, AutoModelForSequenceClassification, AutoModelForSeq2SeqLM | |
| # from sentence_transformers import SentenceTransformer, util | |
| # except Exception: | |
| # HALLUCINATION_AVAILABLE = False | |
| # # ------------------------- | |
| # # Rule-based metrics | |
| # # ------------------------- | |
| # def check_instruction_following(prompt: str, response: str) -> float: | |
| # prompt = (prompt or "").lower() | |
| # response = (response or "").lower() | |
| # keywords = re.findall(r"\b\w+\b", prompt) | |
| # if not keywords: | |
| # return 0.0 | |
| # matches = sum(1 for k in set(keywords) if k in response) | |
| # return round(matches / len(set(keywords)), 3) | |
| # def check_grammar(response: str) -> Tuple[int, float]: | |
| # """ | |
| # Returns (num_matches, grammar_score_in_0_1) | |
| # grammar_score = 1 - num_matches/10 clipped | |
| # If language tool unavailable, returns (0, 0.8) as a coarse default. | |
| # """ | |
| # if not response: | |
| # return 0, 0.0 | |
| # if tool is None: | |
| # return 0, 0.8 | |
| # try: | |
| # matches = tool.check(response) | |
| # num = len(matches) | |
| # score = max(0.0, 1 - num / 10) | |
| # return num, round(score, 3) | |
| # except Exception: | |
| # return 0, 0.8 | |
| # def check_coherence(response: str) -> float: | |
| # if not response: | |
| # return 0.0 | |
| # sents = max(1, len(re.split(r"[.!?]+", response)) - 1) | |
| # words = max(1, len(re.findall(r"\w+", response))) | |
| # base = min(1.0, (words / 50.0) + (sents / 5.0)) | |
| # val = max(0.5, min(base * 0.9, 0.98)) | |
| # return round(val, 3) | |
| # def check_accuracy_embeddings(reference: str, response: str, embed_model=None) -> float: | |
| # """ | |
| # If embed_model passed and reference provided, compute cosine sim. | |
| # Otherwise return 0 or a neutral value. | |
| # """ | |
| # if not reference or not response or embed_model is None: | |
| # return 0.0 | |
| # try: | |
| # ref_emb = embed_model.encode(reference, convert_to_tensor=True) | |
| # resp_emb = embed_model.encode(response, convert_to_tensor=True) | |
| # sim = float(util.cos_sim(ref_emb, resp_emb)) | |
| # sim = max(0.0, min(1.0, sim)) | |
| # return round(sim, 3) | |
| # except Exception: | |
| # return 0.0 | |
| # # ------------------------- | |
| # # Hallucination Detector wrapper | |
| # # ------------------------- | |
| # class HallucinationDetectorWrapper: | |
| # """ | |
| # Wraps the ComprehensiveHallucinationDetector logic. Loads heavy models lazily and sets | |
| # DETECTOR_AVAILABLE flag depending on success. If loading fails, methods return neutral stubs. | |
| # """ | |
| # def __init__(self): | |
| # self.ready = False | |
| # self._init_detector() | |
| # def _init_detector(self): | |
| # global HALLUCINATION_AVAILABLE | |
| # if not HALLUCINATION_AVAILABLE: | |
| # self.ready = False | |
| # return | |
| # try: | |
| # # Import inside to isolate errors | |
| # import evaluate | |
| # import torch | |
| # from transformers import AutoTokenizer, T5ForConditionalGeneration, AutoModelForQuestionAnswering, AutoModelForSequenceClassification, AutoModelForSeq2SeqLM | |
| # from unieval.metric.evaluator import get_evaluator | |
| # # Minimal lightweight choices could be substituted here if you want smaller models | |
| # self.device = "cuda" if torch.cuda.is_available() else "cpu" | |
| # # Load metrics | |
| # self.rouge = evaluate.load('rouge') | |
| # self.sacrebleu = evaluate.load('sacrebleu') | |
| # self.bertscore = evaluate.load('bertscore') | |
| # # load unieval if available | |
| # try: | |
| # self.unieval_evaluator = get_evaluator('fact') | |
| # except Exception: | |
| # self.unieval_evaluator = None | |
| # # Load QG / QA / NLI / knowledge gen models | |
| # # Note: These models may be large; this is inside try/except | |
| # try: | |
| # self.qg_tokenizer = AutoTokenizer.from_pretrained("mrm8488/t5-base-finetuned-question-generation") | |
| # self.qg_model = T5ForConditionalGeneration.from_pretrained("mrm8488/t5-base-finetuned-question-generation").to(self.device) | |
| # self.qa_tokenizer = AutoTokenizer.from_pretrained("deepset/roberta-base-squad2") | |
| # self.qa_model = AutoModelForQuestionAnswering.from_pretrained("deepset/roberta-base-squad2").to(self.device) | |
| # nli_model_name = "ynie/roberta-large-snli_mnli_fever_anli_R1_R2_R3-nli" | |
| # self.nli_tokenizer = AutoTokenizer.from_pretrained(nli_model_name) | |
| # self.nli_model = AutoModelForSequenceClassification.from_pretrained(nli_model_name).to(self.device) | |
| # judge_model_name = "google/flan-t5-large" | |
| # self.judge_tokenizer = AutoTokenizer.from_pretrained(judge_model_name) | |
| # self.judge_model = AutoModelForSeq2SeqLM.from_pretrained(judge_model_name).to(self.device) | |
| # self.ready = True | |
| # except Exception: | |
| # # If any heavy-model loading fails, disable the detector | |
| # self.ready = False | |
| # except Exception: | |
| # self.ready = False | |
| # def is_ready(self): | |
| # return self.ready | |
| # def detect(self, prompt: str, output: str) -> Dict: | |
| # """ | |
| # If ready, run the comprehensive detector and return dict of metrics. | |
| # If not ready, return neutral placeholder dict. | |
| # """ | |
| # if not self.ready: | |
| # # Neutral placeholders (so hallucination_score = 0.5 later) | |
| # return { | |
| # "knowledge_source": "", | |
| # "rouge_l": 0.0, | |
| # "sacrebleu": 0.0, | |
| # "bertscore_f1": 0.0, | |
| # "unieval_consistency": 0.0, | |
| # "q_squared_nli_contradiction": 0.5, | |
| # "critic_contradiction": 0.5 | |
| # } | |
| # # Actual detection implementation (mirrors the code you provided) | |
| # try: | |
| # # generate knowledge source using judge model | |
| # input_text = f"Provide a factual answer: {prompt}" | |
| # input_ids = self.judge_tokenizer(input_text, return_tensors="pt").input_ids.to(self.device) | |
| # outputs = self.judge_model.generate(input_ids, max_length=384, num_beams=5, early_stopping=True) | |
| # knowledge_source = self.judge_tokenizer.decode(outputs[0], skip_special_tokens=True) | |
| # # n-gram & semantic | |
| # rouge_l = self.rouge.compute(predictions=[output], references=[knowledge_source])['rougeL'] | |
| # sacre = self.sacrebleu.compute(predictions=[output], references=[[knowledge_source]])['score'] / 100.0 | |
| # bert_results = self.bertscore.compute(predictions=[output], references=[knowledge_source], lang='en') | |
| # bert_f1 = np.mean(bert_results.get('f1', [0.0])) | |
| # # unieval | |
| # if self.unieval_evaluator: | |
| # try: | |
| # ue = self.unieval_evaluator.evaluate([{'source': knowledge_source, 'system_output': output}])[0]['consistency'] | |
| # except Exception: | |
| # ue = 0.0 | |
| # else: | |
| # ue = 0.0 | |
| # # q^2 | |
| # qg_input = f"generate question: {output}" | |
| # qg_input_ids = self.qg_tokenizer(qg_input, return_tensors="pt").input_ids.to(self.device) | |
| # qg_out = self.qg_model.generate(qg_input_ids, max_length=64, num_beams=4) | |
| # question = self.qg_tokenizer.decode(qg_out[0], skip_special_tokens=True) | |
| # if not question: | |
| # q2_contra = 0.5 | |
| # else: | |
| # try: | |
| # qa_inputs = self.qa_tokenizer(question, knowledge_source, return_tensors="pt").to(self.device) | |
| # with torch.no_grad(): | |
| # qa_output = self.qa_model(**qa_inputs) | |
| # answer_start = torch.argmax(qa_output.start_logits) | |
| # answer_end = torch.argmax(qa_output.end_logits) + 1 | |
| # answer_from_knowledge = self.qa_tokenizer.decode(qa_inputs["input_ids"][0][answer_start:answer_end]) | |
| # if not answer_from_knowledge: | |
| # q2_contra = 0.5 | |
| # else: | |
| # # NLI: output vs answer_from_knowledge | |
| # tokenized = self.nli_tokenizer(output, answer_from_knowledge, return_tensors='pt', truncation=True, max_length=512).to(self.device) | |
| # with torch.no_grad(): | |
| # out = self.nli_model(**tokenized) | |
| # probs = torch.softmax(out.logits, dim=1)[0].tolist() | |
| # q2_contra = probs[0] # contradiction prob | |
| # except Exception: | |
| # q2_contra = 0.5 | |
| # # critic contradiction | |
| # try: | |
| # tokenized2 = self.nli_tokenizer(knowledge_source, output, return_tensors='pt', truncation=True, max_length=512).to(self.device) | |
| # with torch.no_grad(): | |
| # out2 = self.nli_model(**tokenized2) | |
| # probs2 = torch.softmax(out2.logits, dim=1)[0].tolist() | |
| # critic_contra = probs2[0] | |
| # except Exception: | |
| # critic_contra = 0.5 | |
| # return { | |
| # "knowledge_source": knowledge_source, | |
| # "rouge_l": rouge_l, | |
| # "sacrebleu": sacre, | |
| # "bertscore_f1": bert_f1, | |
| # "unieval_consistency": ue, | |
| # "q_squared_nli_contradiction": q2_contra, | |
| # "critic_contradiction": critic_contra | |
| # } | |
| # except Exception: | |
| # # On any runtime failure, return neutral placeholders | |
| # return { | |
| # "knowledge_source": "", | |
| # "rouge_l": 0.0, | |
| # "sacrebleu": 0.0, | |
| # "bertscore_f1": 0.0, | |
| # "unieval_consistency": 0.0, | |
| # "q_squared_nli_contradiction": 0.5, | |
| # "critic_contradiction": 0.5 | |
| # } | |
| # # Singleton detector instance | |
| # _DETECTOR = None | |
| # def get_detector(): | |
| # global _DETECTOR | |
| # if _DETECTOR is None: | |
| # _DETECTOR = HallucinationDetectorWrapper() | |
| # return _DETECTOR | |
| # def hallucination_score(prompt: str, output: str) -> float: | |
| # d = get_detector() | |
| # res = d.detect(prompt, output) | |
| # weights = { | |
| # "rouge_l": 0.2, "sacrebleu": 0.05, "bertscore_f1": 0.25, | |
| # "unieval_consistency": 0.25, | |
| # "q_squared_nli_contradiction": 0.15, | |
| # "critic_contradiction": 0.10 | |
| # } | |
| # total = sum(weights.values()) | |
| # weights = {k: v/total for k, v in weights.items()} | |
| # invert_metrics = {"rouge_l", "sacrebleu", "bertscore_f1", "unieval_consistency"} | |
| # final = 0.0 | |
| # for m, w in weights.items(): | |
| # v = res.get(m, 0.0) | |
| # if m in invert_metrics: | |
| # v = 1 - v | |
| # final += w * v | |
| # # final is in [0,1], higher -> more hallucination (worse) | |
| # return float(final) | |
| # # ------------------------- | |
| # # Main evaluation function (integrate hallucination as complementary metric) | |
| # # ------------------------- | |
| # def evaluate_dataframe(df: pd.DataFrame, use_llm_judge: bool = False) -> Tuple[pd.DataFrame, list, pd.DataFrame]: | |
| # """ | |
| # Input: df with columns prompt (or instruction), response, task, agent, reference (opt) | |
| # Returns: metrics_df (per row), list of visualization image paths (path, caption), leaderboard_df | |
| # """ | |
| # # Normalize column names | |
| # df = df.rename(columns={c: c.strip() for c in df.columns}) | |
| # # Accept alternate column names | |
| # if "instruction" not in df.columns and "prompt" in df.columns: | |
| # df = df.rename(columns={"prompt": "instruction"}) | |
| # if "response" not in df.columns and "output" in df.columns: | |
| # df = df.rename(columns={"output": "response"}) | |
| # if "agent" not in df.columns: | |
| # df["agent"] = df.get("metadata", {}).apply(lambda x: x.get("agent") if isinstance(x, dict) else "Unknown") | |
| # # optional embed model for accuracy: lazy load sentence-transformers if available | |
| # embed_model = None | |
| # try: | |
| # from sentence_transformers import SentenceTransformer, util | |
| # embed_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2") | |
| # except Exception: | |
| # embed_model = None | |
| # rows = [] | |
| # for _, r in df.iterrows(): | |
| # instr = str(r.get("instruction", "")) | |
| # response = str(r.get("response", "")) | |
| # reference = str(r.get("reference", "")) if "reference" in r else "" | |
| # agent = r.get("agent", "Unknown") | |
| # task = r.get("task", "Unknown") | |
| # inst_score = check_instruction_following(instr, response) | |
| # num_matches, grammar_score = check_grammar(response) | |
| # coh_score = check_coherence(response) | |
| # acc_emb = check_accuracy_embeddings(reference, response, embed_model) | |
| # base_components = [inst_score, coh_score, grammar_score, acc_emb] | |
| # base_final = float(sum(base_components) / max(1, len(base_components))) | |
| # row_entry = { | |
| # "Task": str(task), | |
| # "Agent": str(agent), | |
| # "Instruction": instr, | |
| # "Response": response, | |
| # "Reference": reference, | |
| # "score_instruction": inst_score, | |
| # "score_grammar": grammar_score, | |
| # "score_coherence": coh_score, | |
| # "score_accuracy": acc_emb, | |
| # "base_final_score": round(base_final, 4) | |
| # } | |
| # # optional LLM judge: compute hallucination_score | |
| # if use_llm_judge: | |
| # try: | |
| # h = hallucination_score(instr, response) | |
| # # convert to consistency (higher is better): 1 - hallucination | |
| # consistency = round(1.0 - float(h), 4) | |
| # row_entry["score_llm_consistency"] = consistency | |
| # # combine base_final and consistency (simple averaging) | |
| # final_score = round((base_final + consistency) / 2.0, 4) | |
| # row_entry["final_score"] = final_score | |
| # except Exception: | |
| # # fallback | |
| # row_entry["score_llm_consistency"] = 0.5 | |
| # row_entry["final_score"] = round(base_final, 4) | |
| # else: | |
| # row_entry["score_llm_consistency"] = np.nan | |
| # row_entry["final_score"] = round(base_final, 4) | |
| # rows.append(row_entry) | |
| # metrics_df = pd.DataFrame(rows) | |
| # # Create visualizations (saved to /tmp) | |
| # images = [] | |
| # import matplotlib.pyplot as plt | |
| # import seaborn as sns | |
| # import uuid | |
| # # Leaderboard (avg final score per agent) | |
| # try: | |
| # lb = metrics_df.groupby("Agent")["final_score"].mean().reset_index().sort_values("final_score", ascending=False) | |
| # fname = f"/tmp/{uuid.uuid4().hex}_leaderboard.png" | |
| # fig, ax = plt.subplots(figsize=(8, max(4, len(lb)*0.4))) | |
| # ax.barh(lb["Agent"], lb["final_score"], color="tab:blue") | |
| # ax.invert_yaxis() | |
| # ax.set_xlabel("Average final score") | |
| # ax.set_title("Leaderboard: Avg final score per agent") | |
| # plt.tight_layout() | |
| # fig.savefig(fname, bbox_inches="tight") | |
| # plt.close(fig) | |
| # images.append((fname, "Leaderboard (horizontal bar)")) | |
| # except Exception: | |
| # pass | |
| # # Combined spider / radar : compare all agents across metrics | |
| # try: | |
| # metric_cols = ["score_instruction", "score_coherence", "score_grammar", "score_accuracy"] | |
| # if use_llm_judge: | |
| # metric_cols.append("score_llm_consistency") | |
| # agg = metrics_df.groupby("Agent")[metric_cols].mean().reset_index() | |
| # labels = [c.replace("score_", "").replace("_", " ").capitalize() for c in metric_cols] | |
| # # Build rows as required | |
| # rows_for_plot = [] | |
| # for _, row in agg.iterrows(): | |
| # vals = [float(row[c]) * 100 for c in metric_cols] # scale to 0-100 | |
| # rows_for_plot.append({"name": row["Agent"], "values": vals}) | |
| # # draw radar using a small internal function | |
| # def spider_net_multi(labels, rows, title="Spider Chart"): | |
| # import math | |
| # N = len(labels) | |
| # angles = [n / float(N) * 2 * math.pi for n in range(N)] | |
| # angles += angles[:1] | |
| # fig = plt.figure(figsize=(6.5,6.5)) | |
| # ax = plt.subplot(111, polar=True) | |
| # ax.set_xticks(angles[:-1]) | |
| # ax.set_xticklabels(labels) | |
| # ax.set_ylim(0, 100) | |
| # for r in rows: | |
| # v = r["values"] + r["values"][:1] | |
| # ax.plot(angles, v, label=r["name"]) | |
| # ax.fill(angles, v, alpha=0.12) | |
| # ax.set_title(title) | |
| # ax.legend(loc="upper right", bbox_to_anchor=(1.3,1.1)) | |
| # return fig | |
| # fig = spider_net_multi(labels, rows_for_plot, title="All Agents Comparison (Radar)") | |
| # fname2 = f"/tmp/{uuid.uuid4().hex}_radar.png" | |
| # fig.savefig(fname2, bbox_inches="tight") | |
| # plt.close(fig) | |
| # images.append((fname2, "All agents radar chart")) | |
| # except Exception: | |
| # pass | |
| # # Per-task spider charts | |
| # try: | |
| # for task, subset in metrics_df.groupby("Task"): | |
| # agg = subset.groupby("Agent")[metric_cols].mean().reset_index() | |
| # if agg.shape[0] == 0: | |
| # continue | |
| # rows_for_plot = [] | |
| # for _, row in agg.iterrows(): | |
| # vals = [float(row[c]) * 100 for c in metric_cols] | |
| # rows_for_plot.append({"name": row["Agent"], "values": vals}) | |
| # fig = spider_net_multi(labels, rows_for_plot, title=f"{task} Agents (Radar)") | |
| # fname3 = f"/tmp/{uuid.uuid4().hex}_{task}_radar.png" | |
| # fig.savefig(fname3, bbox_inches="tight") | |
| # plt.close(fig) | |
| # images.append((fname3, f"{task} - radar")) | |
| # except Exception: | |
| # pass | |
| # # Heatmap for metric correlations | |
| # try: | |
| # metric_cols2 = ["score_instruction", "score_coherence", "score_grammar", "score_accuracy", "final_score"] | |
| # if use_llm_judge: | |
| # metric_cols2.append("score_llm_consistency") | |
| # fig, ax = plt.subplots(figsize=(7,6)) | |
| # sns.heatmap(metrics_df[metric_cols2].corr(), annot=True, fmt=".2f", cmap="coolwarm", ax=ax) | |
| # ax.set_title("Metric correlations") | |
| # fnameh = f"/tmp/{uuid.uuid4().hex}_heatmap.png" | |
| # fig.savefig(fnameh, bbox_inches="tight") | |
| # plt.close(fig) | |
| # images.append((fnameh, "Metric correlations")) | |
| # except Exception: | |
| # pass | |
| # # Leaderboard df return | |
| # leaderboard_df = metrics_df.groupby(["Agent", "Task"])["final_score"].mean().reset_index().sort_values("final_score", ascending=False) | |
| # return metrics_df, images, leaderboard_df | |
| import re | |
| import math | |
| import numpy as np | |
| import pandas as pd | |
| from typing import Tuple, Dict | |
# Grammar checker.
# The import lives inside the guard so that a missing `language_tool_python`
# package degrades to the same `tool = None` fallback as an unreachable public
# API, instead of crashing the whole module at import time.
try:
    import language_tool_python
    tool = language_tool_python.LanguageToolPublicAPI('en-US')
except Exception:
    tool = None  # fallback if the package or the API is not available
# Optional heavy dependencies.
# HALLUCINATION_AVAILABLE ends up True only when every required import below
# succeeds; UNIEVAL_AVAILABLE additionally requires the optional UniEval package.
HALLUCINATION_AVAILABLE = False
UNIEVAL_AVAILABLE = False
try:
    import evaluate
    import torch
    from transformers import (
        AutoTokenizer,
        T5ForConditionalGeneration,
        AutoModelForQuestionAnswering,
        AutoModelForSequenceClassification,
        AutoModelForSeq2SeqLM
    )
    from sentence_transformers import SentenceTransformer, util
    try:
        from unieval.metric.evaluator import get_evaluator  # optional
    except ImportError:
        print("[Warning] UniEval not installed – skipping UniEval metrics.")
    else:
        UNIEVAL_AVAILABLE = True
    HALLUCINATION_AVAILABLE = True
except Exception:
    # Any failure among the required imports disables both feature flags.
    HALLUCINATION_AVAILABLE = False
    UNIEVAL_AVAILABLE = False
| # ------------------------- | |
| # Rule-based metrics | |
| # ------------------------- | |
def check_instruction_following(prompt: str, response: str) -> float:
    """Return the share of distinct prompt words that reappear in the response.

    Both texts are lower-cased and split into word tokens. A prompt word only
    counts when it occurs as a whole word in the response; a plain substring
    test would let "cat" match "concatenate" and let one-letter words match
    almost any text, inflating the score.

    Args:
        prompt: Instruction text; ``None`` is treated as empty.
        response: Model answer; ``None`` is treated as empty.

    Returns:
        A value in [0, 1] rounded to three decimals; 0.0 when the prompt
        contains no word tokens.
    """
    prompt_words = set(re.findall(r"\w+", (prompt or "").lower()))
    if not prompt_words:
        return 0.0
    response_words = set(re.findall(r"\w+", (response or "").lower()))
    overlap = prompt_words & response_words
    return round(len(overlap) / len(prompt_words), 3)
def check_grammar(response: str) -> Tuple[int, float]:
    """Count grammar issues in *response* and turn the count into a score.

    Returns:
        ``(issue_count, score)``. The score is ``1 - issue_count / 10`` floored
        at 0.0 and rounded to three decimals. An empty response yields
        ``(0, 0.0)``. When the module-level ``tool`` is ``None`` or the check
        raises, the coarse default ``(0, 0.8)`` is returned.
    """
    if not response:
        return 0, 0.0

    fallback = (0, 0.8)
    if tool is None:
        return fallback

    try:
        issue_count = len(tool.check(response))
    except Exception:
        return fallback

    penalty = issue_count / 10
    return issue_count, round(max(0.0, 1 - penalty), 3)
def check_coherence(response: str) -> float:
    """Return a length-based coherence heuristic for *response*.

    The signal is ``words / 50 + sentences / 5`` capped at 1.0, scaled by 0.9,
    then limited to the range [0.5, 0.98] and rounded to three decimals.
    Sentences are counted as the number of pieces produced by splitting on
    runs of ``.``, ``!`` or ``?``, minus one; both counts are at least 1.
    An empty response scores 0.0.
    """
    if not response:
        return 0.0

    sentence_pieces = re.split(r"[.!?]+", response)
    sentence_count = max(1, len(sentence_pieces) - 1)
    word_count = max(1, len(re.findall(r"\w+", response)))

    length_signal = min(1.0, (word_count / 50.0) + (sentence_count / 5.0))
    scaled = length_signal * 0.9
    upper_limited = min(scaled, 0.98)
    return round(max(0.5, upper_limited), 3)
def check_accuracy_embeddings(reference: str, response: str, embed_model=None) -> float:
    """Return the cosine similarity between reference and response embeddings.

    Args:
        reference: Gold answer text.
        response: Model answer text.
        embed_model: Object exposing ``encode(text, convert_to_tensor=True)``.

    Returns:
        The similarity clipped to [0, 1] and rounded to three decimals, or 0.0
        when any argument is missing/empty or when encoding or the similarity
        computation raises.
    """
    if embed_model is None or not reference or not response:
        return 0.0

    try:
        reference_vec, response_vec = (
            embed_model.encode(text, convert_to_tensor=True)
            for text in (reference, response)
        )
        similarity = float(util.cos_sim(reference_vec, response_vec))
    except Exception:
        return 0.0

    clipped = max(0.0, min(1.0, similarity))
    return round(clipped, 3)
| # ------------------------- | |
| # Hallucination Detector | |
| # ------------------------- | |
class HallucinationDetectorWrapper:
    """Best-effort wrapper around the model-based hallucination metrics.

    Construction tries to load the metrics and models; on any failure the
    instance stays usable but ``ready`` is False and ``detect`` returns neutral
    placeholder values instead of raising.
    """

    # Values reported whenever detection cannot run. `detect` always hands out
    # a copy so callers cannot mutate this shared template.
    _NEUTRAL_RESULT = {
        "rouge_l": 0.0,
        "sacrebleu": 0.0,
        "bertscore_f1": 0.0,
        "unieval_consistency": 0.0,
        "q_squared_nli_contradiction": 0.5,
        "critic_contradiction": 0.5,
    }

    def __init__(self):
        self.ready = False
        self._init_detector()

    def _init_detector(self):
        """Load metrics and models; set ``self.ready`` to True only on full success."""
        if not HALLUCINATION_AVAILABLE:
            return
        try:
            self.device = "cuda" if torch.cuda.is_available() else "cpu"

            # n-gram and embedding based metrics
            self.rouge = evaluate.load('rouge')
            self.sacrebleu = evaluate.load('sacrebleu')
            self.bertscore = evaluate.load('bertscore')

            # UniEval is optional; without it `detect` reports 0.0 for consistency.
            self.unieval_evaluator = None
            if UNIEVAL_AVAILABLE:
                try:
                    from unieval.metric.evaluator import get_evaluator
                    self.unieval_evaluator = get_evaluator('fact')
                except Exception:
                    self.unieval_evaluator = None

            # NOTE(review): the QG, QA and NLI models below are loaded but not
            # used by `detect` in this class; they are kept so the instance
            # attributes stay available. Confirm whether they can be dropped
            # to save memory.
            self.qg_tokenizer = AutoTokenizer.from_pretrained("mrm8488/t5-base-finetuned-question-generation")
            self.qg_model = T5ForConditionalGeneration.from_pretrained("mrm8488/t5-base-finetuned-question-generation").to(self.device)
            self.qa_tokenizer = AutoTokenizer.from_pretrained("deepset/roberta-base-squad2")
            self.qa_model = AutoModelForQuestionAnswering.from_pretrained("deepset/roberta-base-squad2").to(self.device)
            nli_model_name = "ynie/roberta-large-snli_mnli_fever_anli_R1_R2_R3-nli"
            self.nli_tokenizer = AutoTokenizer.from_pretrained(nli_model_name)
            self.nli_model = AutoModelForSequenceClassification.from_pretrained(nli_model_name).to(self.device)

            # Judge model: generates the knowledge source `detect` compares against.
            judge_model_name = "google/flan-t5-large"
            self.judge_tokenizer = AutoTokenizer.from_pretrained(judge_model_name)
            self.judge_model = AutoModelForSeq2SeqLM.from_pretrained(judge_model_name).to(self.device)

            self.ready = True
        except Exception as exc:
            # Stay best-effort, but say why the detector is disabled.
            print(f"[Warning] Hallucination detector disabled – initialisation failed: {exc}")
            self.ready = False

    def is_ready(self):
        """Return True when all metrics and models were loaded successfully."""
        return self.ready

    def detect(self, prompt: str, output: str) -> Dict:
        """Compare *output* with a judge-generated answer to *prompt*.

        Returns:
            A dict with the keys ``rouge_l``, ``sacrebleu`` (divided by 100),
            ``bertscore_f1``, ``unieval_consistency``,
            ``q_squared_nli_contradiction`` and ``critic_contradiction``.
            The two contradiction entries are always the constant 0.5 here.
            When the detector is not ready, or anything raises during
            detection, a copy of the neutral placeholder values is returned.
        """
        if not self.ready:
            return dict(self._NEUTRAL_RESULT)
        try:
            # The judge model's answer serves as the reference text.
            input_text = f"Provide a factual answer: {prompt}"
            input_ids = self.judge_tokenizer(input_text, return_tensors="pt").input_ids.to(self.device)
            outputs = self.judge_model.generate(input_ids, max_length=384, num_beams=5, early_stopping=True)
            knowledge_source = self.judge_tokenizer.decode(outputs[0], skip_special_tokens=True)

            rouge_l = self.rouge.compute(predictions=[output], references=[knowledge_source])['rougeL']
            sacre = self.sacrebleu.compute(predictions=[output], references=[[knowledge_source]])['score'] / 100.0
            bert_scores = self.bertscore.compute(predictions=[output], references=[knowledge_source], lang='en')['f1']
            bert_f1 = np.mean(bert_scores)

            ue = 0.0
            if self.unieval_evaluator:
                try:
                    ue = self.unieval_evaluator.evaluate([{'source': knowledge_source, 'system_output': output}])[0]['consistency']
                except Exception:
                    ue = 0.0

            return {
                # Plain floats keep the result free of numpy scalar types.
                "rouge_l": float(rouge_l),
                "sacrebleu": float(sacre),
                "bertscore_f1": float(bert_f1),
                "unieval_consistency": ue,
                "q_squared_nli_contradiction": 0.5,
                "critic_contradiction": 0.5
            }
        except Exception:
            # Deliberate best-effort: a runtime failure yields neutral values.
            return dict(self._NEUTRAL_RESULT)
| # Singleton | |
_DETECTOR = None


def get_detector():
    """Return the module-wide detector instance, creating it on first use."""
    global _DETECTOR
    if _DETECTOR is not None:
        return _DETECTOR
    _DETECTOR = HallucinationDetectorWrapper()
    return _DETECTOR
def hallucination_score(prompt: str, output: str) -> float:
    """Combine the detector metrics into one hallucination score.

    Similarity-style metrics (ROUGE-L, SacreBLEU, BERTScore F1, UniEval
    consistency) are inverted so that a higher combined value always means
    "more hallucination". Each metric is clamped to [0, 1] before weighting,
    so the result stays in [0, 1].

    Returns:
        The weighted score, or the neutral value 0.5 when the detector is not
        ready. Without that guard the all-zero placeholder similarities would
        be inverted to 1.0 and an unavailable detector would report 0.875,
        i.e. "heavily hallucinated", for every input.
    """
    detector = get_detector()
    if not detector.is_ready():
        return 0.5

    res = detector.detect(prompt, output)
    weights = {
        "rouge_l": 0.2, "sacrebleu": 0.05, "bertscore_f1": 0.25,
        "unieval_consistency": 0.25,
        "q_squared_nli_contradiction": 0.15,
        "critic_contradiction": 0.10
    }
    total = sum(weights.values())
    invert = {"rouge_l", "sacrebleu", "bertscore_f1", "unieval_consistency"}

    final = 0.0
    for metric, weight in weights.items():
        value = min(1.0, max(0.0, float(res.get(metric, 0.0))))
        if metric in invert:
            value = 1 - value
        # Dividing by the total keeps the result normalised even if the
        # weights above are edited and no longer sum to 1.
        final += (weight / total) * value
    return float(final)
| # ------------------------- | |
| # Main evaluation | |
| # ------------------------- | |
# Column layout of the per-row metrics frame returned by `evaluate_dataframe`.
_RESULT_COLUMNS = [
    "Task", "Agent", "Instruction", "Response", "Reference",
    "score_instruction", "score_grammar", "score_coherence", "score_accuracy",
    "base_final_score", "score_llm_consistency", "final_score",
]
# Per-metric score columns plotted on the radar charts, in plotting order.
_SCORE_COLUMNS = ["score_instruction", "score_coherence", "score_grammar", "score_accuracy"]


def _is_missing(value) -> bool:
    """Return True when *value* is a missing scalar cell (None, NaN, NaT, pd.NA)."""
    return bool(pd.api.types.is_scalar(value) and pd.isna(value))


def _cell_text(value) -> str:
    """Return *value* as text, mapping missing cells to "" instead of "nan"."""
    return "" if _is_missing(value) else str(value)


def _save_chart(fig, stem: str) -> str:
    """Write *fig* as a uniquely named PNG in the temp directory and return its path."""
    import os
    import tempfile
    import uuid

    # The stem may contain a user-supplied task name; keep it path-safe.
    safe_stem = re.sub(r"[^\w.-]+", "_", stem)
    path = os.path.join(tempfile.gettempdir(), f"{uuid.uuid4().hex}_{safe_stem}.png")
    fig.savefig(path, bbox_inches="tight")
    return path


def _render_charts(metrics_df: pd.DataFrame, use_llm_judge: bool) -> list:
    """Render all charts best-effort and return a list of ``(path, caption)`` tuples."""
    try:
        import matplotlib.pyplot as plt
    except Exception as exc:
        print(f"[Warning] matplotlib unavailable – skipping charts: {exc}")
        return []

    images = []
    score_cols = list(_SCORE_COLUMNS)
    if use_llm_judge:
        score_cols.append("score_llm_consistency")
    labels = [c.replace("score_", "").capitalize() for c in score_cols]

    def attempt(description, build, *args):
        """Run one chart builder; a failure is reported and skipped."""
        open_before = set(plt.get_fignums())
        try:
            result = build(*args)
            if result is None:
                return
            fig, stem, caption = result
            images.append((_save_chart(fig, stem), caption))
        except Exception as exc:
            print(f"[Warning] Could not render {description}: {exc}")
        finally:
            # Close every figure opened during this attempt, even after an error.
            for num in set(plt.get_fignums()) - open_before:
                plt.close(num)

    def leaderboard_chart():
        lb = (
            metrics_df.groupby("Agent")["final_score"].mean()
            .reset_index()
            .sort_values("final_score", ascending=False)
        )
        fig, ax = plt.subplots(figsize=(8, max(4, len(lb) * 0.4)))
        ax.barh(lb["Agent"].astype(str), lb["final_score"], color="tab:blue")
        ax.invert_yaxis()
        ax.set_xlabel("Average final score")
        ax.set_title("Leaderboard")
        fig.tight_layout()
        return fig, "leaderboard", "Leaderboard"

    def radar_chart(frame, title, stem, caption):
        agg = frame.groupby("Agent")[score_cols].mean().reset_index()
        if agg.shape[0] == 0:
            return None
        count = len(labels)
        angles = [i / float(count) * 2 * math.pi for i in range(count)]
        angles += angles[:1]  # repeat the first angle to close the polygon
        fig = plt.figure(figsize=(6.5, 6.5))
        ax = fig.add_subplot(111, polar=True)
        ax.set_xticks(angles[:-1])
        ax.set_xticklabels(labels)
        ax.set_ylim(0, 100)
        for _, row in agg.iterrows():
            values = [float(row[c]) * 100 for c in score_cols]  # scale to 0-100
            closed = values + values[:1]
            ax.plot(angles, closed, label=row["Agent"])
            ax.fill(angles, closed, alpha=0.1)
        ax.set_title(title)
        ax.legend(loc="upper right", bbox_to_anchor=(1.3, 1.1))
        return fig, stem, caption

    def correlation_heatmap():
        import seaborn as sns

        cols = list(_SCORE_COLUMNS) + ["final_score"]
        if use_llm_judge:
            cols.append("score_llm_consistency")
        fig, ax = plt.subplots(figsize=(7, 6))
        sns.heatmap(metrics_df[cols].corr(), annot=True, fmt=".2f", cmap="coolwarm", ax=ax)
        ax.set_title("Metric correlations")
        return fig, "heatmap", "Metric correlations"

    attempt("leaderboard chart", leaderboard_chart)
    attempt("all-agents radar chart", radar_chart,
            metrics_df, "All Agents Comparison", "radar", "All agents radar")

    try:
        task_groups = list(metrics_df.groupby("Task"))
    except Exception as exc:
        print(f"[Warning] Could not group rows by task: {exc}")
        task_groups = []
    for task, subset in task_groups:
        # Each task is attempted separately so one bad task does not drop the rest.
        attempt(f"radar chart for task {task!r}", radar_chart,
                subset, f"{task} Agents", f"{task}_radar", f"{task} radar")

    attempt("correlation heatmap", correlation_heatmap)
    return images


def evaluate_dataframe(df: pd.DataFrame, use_llm_judge: bool = False) -> Tuple[pd.DataFrame, list, pd.DataFrame]:
    """Score every row of *df* and build the leaderboard and charts.

    Args:
        df: One row per response. Columns used: ``instruction`` (or ``prompt``),
            ``response`` (or ``output``), and optionally ``task``, ``agent`` and
            ``reference``. Column names are stripped of surrounding whitespace.
            Missing cells (None/NaN) are treated as empty text, and a missing
            agent or task becomes "Unknown".
        use_llm_judge: When True, the hallucination score is computed per row
            and averaged into ``final_score`` as a consistency value.

    Returns:
        ``(metrics_df, images, leaderboard_df)`` where ``metrics_df`` has one
        row per input row, ``images`` is a list of ``(png_path, caption)``
        tuples, and ``leaderboard_df`` holds the mean ``final_score`` per
        (Agent, Task), best first. Charts are best-effort: a chart that cannot
        be rendered is reported with a warning and left out. An input without
        rows yields empty frames and no images.
    """
    # Normalize column names and accept the alternate spellings.
    df = df.rename(columns={c: c.strip() if isinstance(c, str) else c for c in df.columns})
    if "instruction" not in df.columns and "prompt" in df.columns:
        df = df.rename(columns={"prompt": "instruction"})
    if "response" not in df.columns and "output" in df.columns:
        df = df.rename(columns={"output": "response"})
    if "agent" not in df.columns:
        df["agent"] = "Unknown"

    if len(df) == 0:
        # Nothing to score; grouping an empty, column-less frame would raise.
        empty_metrics = pd.DataFrame(columns=_RESULT_COLUMNS)
        empty_leaderboard = pd.DataFrame(columns=["Agent", "Task", "final_score"])
        return empty_metrics, [], empty_leaderboard

    # The embedding model only matters when references exist, so skip the
    # load otherwise (the accuracy score is 0.0 without a reference anyway).
    embed_model = None
    if "reference" in df.columns:
        try:
            embed_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
        except Exception:
            embed_model = None

    rows = []
    for _, r in df.iterrows():
        instr = _cell_text(r.get("instruction"))
        response = _cell_text(r.get("response"))
        reference = _cell_text(r.get("reference"))
        agent = r.get("agent", "Unknown")
        task = r.get("task", "Unknown")
        if _is_missing(agent):
            agent = "Unknown"
        if _is_missing(task):
            task = "Unknown"

        inst_score = check_instruction_following(instr, response)
        _, grammar_score = check_grammar(response)
        coh_score = check_coherence(response)
        acc_emb = check_accuracy_embeddings(reference, response, embed_model)
        base_final = float(np.mean([inst_score, grammar_score, coh_score, acc_emb]))

        row_entry = {
            "Task": task,
            "Agent": agent,
            "Instruction": instr,
            "Response": response,
            "Reference": reference,
            "score_instruction": inst_score,
            "score_grammar": grammar_score,
            "score_coherence": coh_score,
            "score_accuracy": acc_emb,
            "base_final_score": round(base_final, 4),
            # Defaults for "no judge"; overwritten below when the judge runs.
            "score_llm_consistency": np.nan,
            "final_score": round(base_final, 4),
        }
        if use_llm_judge:
            try:
                # Consistency is the complement of hallucination (higher is better).
                consistency = 1.0 - hallucination_score(instr, response)
                row_entry["score_llm_consistency"] = round(consistency, 4)
                row_entry["final_score"] = round((base_final + consistency) / 2, 4)
            except Exception:
                # Judge failed for this row: neutral consistency, base score only.
                row_entry["score_llm_consistency"] = 0.5
        rows.append(row_entry)

    metrics_df = pd.DataFrame(rows, columns=_RESULT_COLUMNS)
    images = _render_charts(metrics_df, use_llm_judge)
    leaderboard_df = (
        metrics_df.groupby(["Agent", "Task"])["final_score"].mean()
        .reset_index()
        .sort_values("final_score", ascending=False)
    )
    return metrics_df, images, leaderboard_df