"""
Evaluation logic for Agentic Evaluation Framework.
""" import os import numpy as np import pandas as pd import torch import matplotlib.pyplot as plt from transformers import ( AutoTokenizer, AutoModelForSequenceClassification, AutoModelForCausalLM, pipeline, ) from sentence_transformers import SentenceTransformer import evaluate # ----------------------------- # Global Config # ----------------------------- NLI_MODEL = "microsoft/deberta-v2-xlarge-mnli" EMBED_MODEL = "all-MiniLM-L6-v2" LLM_JUDGE_MODEL = "microsoft/DialoGPT-small" FLUENCY_MODEL = "textattack/roberta-base-CoLA" device = 0 if torch.cuda.is_available() else -1 # Caches _nli_model, _nli_tokenizer = None, None _embed_model = None _judge_model, _judge_tokenizer = None, None _fluency_checker = None # Metrics bertscore = evaluate.load("bertscore") bleu = evaluate.load("bleu") rouge = evaluate.load("rouge") # ----------------------------- # Lazy Model Loaders # ----------------------------- def get_nli_model(): global _nli_model, _nli_tokenizer if _nli_model is None: _nli_tokenizer = AutoTokenizer.from_pretrained(NLI_MODEL) _nli_model = AutoModelForSequenceClassification.from_pretrained(NLI_MODEL).to( torch.device("cuda" if torch.cuda.is_available() else "cpu") ) _nli_model.eval() return _nli_model, _nli_tokenizer def get_embed_model(): global _embed_model if _embed_model is None: _embed_model = SentenceTransformer(EMBED_MODEL, device="cuda" if torch.cuda.is_available() else "cpu") return _embed_model def get_judge_model(): global _judge_model, _judge_tokenizer if _judge_model is None: _judge_tokenizer = AutoTokenizer.from_pretrained(LLM_JUDGE_MODEL) _judge_model = AutoModelForCausalLM.from_pretrained(LLM_JUDGE_MODEL).to( torch.device("cuda" if torch.cuda.is_available() else "cpu") ) return _judge_model, _judge_tokenizer def get_fluency_checker(): global _fluency_checker if _fluency_checker is None: _fluency_checker = pipeline( "text-classification", model=FLUENCY_MODEL, device=device ) return _fluency_checker # ----------------------------- # Evaluation Functions # ----------------------------- def check_instruction_following(prompt, response): try: nli_model, nli_tokenizer = get_nli_model() inputs = nli_tokenizer(prompt, response, return_tensors="pt", truncation=True, padding=True).to( nli_model.device ) with torch.no_grad(): logits = nli_model(**inputs).logits probs = torch.softmax(logits, dim=-1).cpu().numpy()[0] entailment_score = probs[2] # entailment index return float(entailment_score) except Exception: return 0.0 def check_hallucination(reference, response): try: nli_model, nli_tokenizer = get_nli_model() inputs = nli_tokenizer(reference, response, return_tensors="pt", truncation=True, padding=True).to( nli_model.device ) with torch.no_grad(): logits = nli_model(**inputs).logits probs = torch.softmax(logits, dim=-1).cpu().numpy()[0] contradiction_score = probs[0] # contradiction index return 1.0 - float(contradiction_score) except Exception: return 0.0 def check_assumption(prompt, response): try: judge_model, judge_tokenizer = get_judge_model() input_text = f"Does this response make assumptions not in the prompt?\nPrompt: {prompt}\nResponse: {response}\nAnswer yes or no:" inputs = judge_tokenizer.encode(input_text, return_tensors="pt").to(judge_model.device) outputs = judge_model.generate(inputs, max_length=50) judgment = judge_tokenizer.decode(outputs[0], skip_special_tokens=True).lower() if "yes" in judgment: return 0.0 elif "no" in judgment: return 1.0 return 0.5 except Exception: return 0.5 def check_coherence(response): try: emb = get_embed_model().encode(response, 


def check_coherence(response):
    """Mean adjacent-sentence embedding similarity, rescaled to [0, 1].

    Single-sentence responses score 1.0; empty responses score 0.0.
    """
    try:
        sents = [s.strip() for s in re.split(r"[.!?]+", response) if s.strip()]
        if len(sents) < 2:
            return 1.0 if sents else 0.0
        embs = get_embed_model().encode(sents, convert_to_tensor=True, normalize_embeddings=True)
        sims = torch.nn.functional.cosine_similarity(embs[:-1], embs[1:], dim=1)
        # Cosine similarity lies in [-1, 1]; rescale to match the other metrics.
        return float((sims.mean().item() + 1) / 2)
    except Exception:
        return 0.0


def check_accuracy(reference, response):
    """Average of BERTScore F1, BLEU, and ROUGE-L against the reference."""
    try:
        bert_results = bertscore.compute(predictions=[response], references=[reference], lang="en")
        bert_f1 = bert_results["f1"][0]
    except Exception:
        bert_f1 = 0.0
    try:
        bleu_results = bleu.compute(predictions=[response], references=[[reference]])
        bleu_score = bleu_results["bleu"]
    except Exception:
        bleu_score = 0.0
    try:
        rouge_results = rouge.compute(predictions=[response], references=[reference])
        rouge_l = rouge_results["rougeL"]
    except Exception:
        rouge_l = 0.0
    return float((bert_f1 + bleu_score + rouge_l) / 3)


def check_relevance(prompt, response):
    """Cosine similarity between prompt and response embeddings."""
    try:
        model = get_embed_model()
        emb1 = model.encode(prompt, convert_to_tensor=True)
        emb2 = model.encode(response, convert_to_tensor=True)
        cos_sim = torch.nn.functional.cosine_similarity(emb1, emb2, dim=0)
        return float(cos_sim.item())
    except Exception:
        return 0.0


def check_fluency(response):
    """Probability that the response is grammatically acceptable (CoLA)."""
    try:
        fluency_checker = get_fluency_checker()
        result = fluency_checker(response)[0]
        # For this checkpoint, LABEL_1 = acceptable, LABEL_0 = unacceptable.
        if result["label"] == "LABEL_1":
            return float(result["score"])
        return 1.0 - float(result["score"])
    except Exception:
        return 0.5


# -----------------------------
# Row Evaluation
# -----------------------------
def evaluate_row(row):
    scores = {
        "instruction_following": check_instruction_following(row["prompt"], row["response"]),
        "hallucination": check_hallucination(row["reference"], row["response"]),
        "assumption": check_assumption(row["prompt"], row["response"]),
        "coherence": check_coherence(row["response"]),
        "accuracy": check_accuracy(row["reference"], row["response"]),
        "relevance": check_relevance(row["prompt"], row["response"]),
        "fluency": check_fluency(row["response"]),
    }
    # Unweighted mean of all metric scores.
    scores["final_score"] = float(np.mean(list(scores.values())))
    return pd.Series(scores)


# -----------------------------
# Visualization Helpers
# -----------------------------
def plot_radar_chart(metrics_df, out_path="/tmp/radar.png"):
    mean_scores = metrics_df.mean(numeric_only=True).drop("final_score", errors="ignore")
    categories = list(mean_scores.index)
    values = mean_scores.values.tolist()
    # One angle per category; repeat only the first point to close the polygon,
    # so the angle and value lists stay the same length.
    angles = np.linspace(0, 2 * np.pi, len(categories), endpoint=False).tolist()
    values += values[:1]
    angles += angles[:1]
    plt.figure(figsize=(6, 6))
    ax = plt.subplot(111, polar=True)
    ax.plot(angles, values, "o-", linewidth=2)
    ax.fill(angles, values, alpha=0.25)
    ax.set_thetagrids(np.degrees(angles[:-1]), categories)
    plt.savefig(out_path)
    plt.close()
    return out_path, "Radar Chart (Mean Scores)"


def plot_leaderboard(metrics_df, out_path="/tmp/leaderboard.png"):
    agent_means = metrics_df.groupby("agent")["final_score"].mean().sort_values(ascending=False)
    plt.figure(figsize=(10, 5))
    agent_means.plot(kind="bar", colormap="Set3", ax=plt.gca())
    plt.title("Leaderboard: Avg Final Score per Agent")
    plt.ylabel("Score")
    plt.tight_layout()
    plt.savefig(out_path)
    plt.close()
    return out_path, "Leaderboard"


# -----------------------------
# Main Evaluation Entry
# -----------------------------
def evaluate_dataframe(df: pd.DataFrame):
    metrics_df = df.apply(evaluate_row, axis=1, result_type="expand")
    metrics_df = pd.concat([df, metrics_df], axis=1)

    leaderboard = (
        metrics_df.groupby("agent")["final_score"]
        .mean()
        .reset_index()
        .sort_values("final_score", ascending=False)
    )

    images = [plot_radar_chart(metrics_df), plot_leaderboard(metrics_df)]

    return metrics_df, images, leaderboard
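

# Minimal usage sketch with hypothetical data: the column names below are the
# ones evaluate_row actually reads ("agent", "prompt", "response", "reference").
# Running this downloads the NLI/embedding/judge/fluency models on first use.
if __name__ == "__main__":
    demo = pd.DataFrame(
        [
            {
                "agent": "agent_a",
                "prompt": "Name the capital of France.",
                "response": "The capital of France is Paris.",
                "reference": "Paris is the capital of France.",
            },
            {
                "agent": "agent_b",
                "prompt": "Name the capital of France.",
                "response": "It might be Lyon.",
                "reference": "Paris is the capital of France.",
            },
        ]
    )
    metrics_df, images, leaderboard = evaluate_dataframe(demo)
    print(leaderboard)
    for path, title in images:
        print(f"{title}: {path}")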