##################################################################################################################################################################### # import re # import json # import torch # import pandas as pd # import matplotlib.pyplot as plt # import seaborn as sns # import os # import uuid # from transformers import AutoTokenizer, AutoModelForSequenceClassification # from sentence_transformers import SentenceTransformer, util # import matplotlib.pyplot as plt # import numpy as np # def plot_radar_chart(metrics_df, agents, metrics, out_path="/tmp/radar.png"): # """ # Radar chart comparing multiple agents across metrics. # """ # labels = metrics # num_vars = len(labels) # # Compute angle for each axis # angles = np.linspace(0, 2 * np.pi, num_vars, endpoint=False).tolist() # angles += angles[:1] # close loop # fig, ax = plt.subplots(figsize=(6, 6), subplot_kw=dict(polar=True)) # for agent in agents: # values = [] # for m in metrics: # mean_val = metrics_df.loc[metrics_df['agent'] == agent, m].mean() # values.append(mean_val if not np.isnan(mean_val) else 0) # values += values[:1] # ax.plot(angles, values, label=agent, linewidth=2) # ax.fill(angles, values, alpha=0.25) # ax.set_xticks(angles[:-1]) # ax.set_xticklabels(labels) # ax.set_yticklabels([]) # ax.legend(loc="upper right", bbox_to_anchor=(1.3, 1.1)) # ax.set_title("Agent Performance Radar Chart") # plt.tight_layout() # plt.savefig(out_path) # plt.close() # return out_path # import seaborn as sns # def plot_heatmap(metrics_df, out_path="/tmp/heatmap.png"): # pivot = metrics_df.groupby("agent")[ # ["accuracy", "hallucination", "instruction_following", "coherence", "assumption"] # ].mean() # plt.figure(figsize=(8, 5)) # sns.heatmap(pivot, annot=True, cmap="viridis", fmt=".2f") # plt.title("Agent × Metric Heatmap") # plt.tight_layout() # plt.savefig(out_path) # plt.close() # return out_path # # -------------------------- # # MODEL LOADING # # -------------------------- # NLI_MODEL = 
"textattack/roberta-base-MNLI" # EMBED_MODEL = "sentence-transformers/all-MiniLM-L6-v2" # # Load NLI model & tokenizer # nli_tokenizer = AutoTokenizer.from_pretrained(NLI_MODEL) # nli_model = AutoModelForSequenceClassification.from_pretrained(NLI_MODEL) # nli_model.to("cpu") # nli_model.eval() # # Load embedding model # embed_model = SentenceTransformer(EMBED_MODEL) # # Label mapping from config # id2label = {int(k): v.upper() for k, v in nli_model.config.id2label.items()} # # -------------------------- # # METRIC FUNCTIONS # # -------------------------- # def check_instruction_following(prompt: str, response: str) -> float: # """Embedding-based similarity between prompt and response.""" # if not prompt or not response: # return 0.0 # p_emb = embed_model.encode(prompt, convert_to_tensor=True) # r_emb = embed_model.encode(response, convert_to_tensor=True) # sim = float(util.cos_sim(p_emb, r_emb).item()) # return round(max(0.0, min(1.0, sim)), 3) # def check_hallucination(reference: str, response: str) -> float: # """ # Single hallucination score: # Entailment prob - Contradiction prob (normalized to [0,1]). # Higher = less hallucination. 
# """ # if not reference or not response: # return 0.0 # with torch.no_grad(): # inputs = nli_tokenizer.encode_plus(reference, response, return_tensors="pt", truncation=True) # outputs = nli_model(**inputs) # probs = torch.softmax(outputs.logits, dim=-1).cpu().numpy()[0] # entail_prob, contra_prob = 0.0, 0.0 # for idx, p in enumerate(probs): # label = id2label.get(idx, "") # if "ENTAIL" in label: # entail_prob = float(p) # elif "CONTRA" in label: # contra_prob = float(p) # score = entail_prob - contra_prob # score = (score + 1) / 2 # normalize [-1,1] → [0,1] # return round(max(0.0, min(1.0, score)), 3) # def check_assumption(response: str) -> float: # """Detect speculative/hedging terms.""" # if not response: # return 0.0 # speculative_terms = ["maybe", "probably", "might", "perhaps", "i guess", "seems", "could"] # count = sum(1 for t in speculative_terms if t in response.lower()) # score = 1.0 - min(count / 5.0, 1.0) # smoother decay # return round(score, 3) # def check_coherence(response: str) -> float: # """Heuristic coherence metric: penalizes very short/long, rewards sentence balance.""" # if not response: # return 0.0 # words = len(re.findall(r"\w+", response)) # sents = max(1, len(re.split(r"[.!?]+", response)) - 1) # if words < 5: # return 0.3 # if words > 200: # return 0.5 # base = min(1.0, (words / 50.0) + (sents / 5.0)) # return round(max(0.4, min(base, 0.95)), 3) # def check_accuracy(reference: str, response: str) -> float: # """Semantic similarity between reference and response via embeddings (cosine).""" # if not reference or not response: # return 0.0 # ref_emb = embed_model.encode(reference, convert_to_tensor=True) # resp_emb = embed_model.encode(response, convert_to_tensor=True) # sim = float(util.cos_sim(ref_emb, resp_emb).item()) # return round(max(0.0, min(1.0, sim)), 3) # # -------------------------- # # ROW & DF EVALUATION # # -------------------------- # def evaluate_row(row): # prompt = row.get("prompt", "") # response = row.get("response", 
"") # reference = row.get("reference", "") # metrics = { # "task_id": row.get("task_id", ""), # "agent": row.get("agent", ""), # "instruction_following": check_instruction_following(prompt, response), # "hallucination": check_hallucination(reference, response), # "assumption": check_assumption(response), # "coherence": check_coherence(response), # "accuracy": check_accuracy(reference, response), # } # # Weighted avg score (you can adjust weights) # metrics["final_score"] = round( # 0.25 * metrics["instruction_following"] # + 0.25 * metrics["accuracy"] # + 0.2 * metrics["hallucination"] # + 0.15 * metrics["coherence"] # + 0.15 * metrics["assumption"], # 3, # ) # return metrics # def evaluate_dataframe(df: pd.DataFrame): # metrics_df = df.apply(evaluate_row, axis=1, result_type="expand") # # Leaderboard # leaderboard = ( # metrics_df.groupby(["agent", "task_id"])["final_score"] # .mean() # .reset_index() # ) # # # Plots # # images = [] # # Existing images list # images = [] # # Add radar chart # radar_path = plot_radar_chart(metrics_df, agents=df["agent"].unique(), # ############################################################################################################################### # evaluator.py """ Evaluator for Agentic Evaluation Framework """ import numpy as np import pandas as pd import matplotlib.pyplot as plt import seaborn as sns import math, uuid, re from sentence_transformers import SentenceTransformer, util from transformers import AutoTokenizer, AutoModelForSequenceClassification # ------------------------ # Models (lightweight) # ------------------------ EMBED_MODEL = "sentence-transformers/all-MiniLM-L6-v2" NLI_MODEL = "textattack/roberta-base-MNLI" _embed_model = SentenceTransformer(EMBED_MODEL) _nli_tokenizer = AutoTokenizer.from_pretrained(NLI_MODEL) _nli_model = AutoModelForSequenceClassification.from_pretrained(NLI_MODEL) _id2label = {int(k): v.upper() for k, v in _nli_model.config.id2label.items()} # ------------------------ # Metrics # 
# ------------------------
# Column names produced by evaluate_dataframe and consumed by the plots.
_SCORE_NAMES = (
    "instruction_following",
    "hallucination",
    "accuracy",
    "coherence",
    "fluency",
)
_METRIC_COLS = _SCORE_NAMES + ("final_score",)


def _clamp01(value):
    """Clamp *value* into the closed interval [0.0, 1.0]."""
    return max(0.0, min(1.0, value))


def _as_text(value):
    """Return *value* as a string, mapping missing cells (None/NaN/NA) to ""."""
    # str(float("nan")) is "nan" and str(None) is "None"; both are truthy and
    # would otherwise be scored as if they were real text.
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return ""
    return str(value)


def _embedding_similarity(text_a, text_b):
    """Cosine similarity of the two texts' embeddings, clamped to [0, 1].

    Returns 0.0 when either text is empty. The result is rounded to three
    decimals.
    """
    if not text_a or not text_b:
        return 0.0
    emb_a = _embed_model.encode(text_a, convert_to_tensor=True)
    emb_b = _embed_model.encode(text_b, convert_to_tensor=True)
    similarity = float(util.cos_sim(emb_a, emb_b).item())
    return round(_clamp01(similarity), 3)


def check_instruction_following(prompt: str, response: str) -> float:
    """Score how close *response* is to *prompt* in embedding space.

    Returns 0.0 if either argument is empty, otherwise the cosine similarity
    of the two embeddings clamped to [0, 1] and rounded to three decimals.
    """
    return _embedding_similarity(prompt, response)


def check_hallucination(prompt: str, response: str) -> float:
    """Return the NLI entailment probability of *response* given *prompt*.

    Returns 0.0 if either argument is empty. The pair is run through the
    module-level NLI model; the probability of the class labelled
    "ENTAILMENT" is returned, clamped to [0, 1] and rounded to three
    decimals.
    """
    if not prompt or not response:
        return 0.0

    # Local import: this module does not import torch at file level.
    # NOTE(review): assumes the NLI model is PyTorch-backed (return_tensors="pt").
    import torch

    inputs = _nli_tokenizer(
        prompt, response, return_tensors="pt", truncation=True
    )
    # Inference only: skip building the autograd graph.
    with torch.no_grad():
        logits = _nli_model(**inputs).logits
    probs = logits.softmax(dim=1).cpu().numpy()[0]

    labels = [_id2label.get(i, "") for i in range(len(probs))]
    if "ENTAILMENT" in labels:
        entailment_prob = float(probs[labels.index("ENTAILMENT")])
    else:
        # NOTE(review): when the model config carries no "ENTAILMENT" label
        # the highest class probability is used instead -- confirm that this
        # fallback is what is wanted for the configured NLI model.
        entailment_prob = float(probs.max())
    return round(_clamp01(entailment_prob), 3)


def check_accuracy(reference: str, response: str) -> float:
    """Score how close *response* is to *reference* in embedding space.

    Returns 0.0 if either argument is empty, otherwise the cosine similarity
    of the two embeddings clamped to [0, 1] and rounded to three decimals.
    """
    return _embedding_similarity(reference, response)


def check_coherence(response: str) -> float:
    """Return the mean pairwise sentence similarity of *response* in [0, 1].

    The text is split on runs of ".", "!" and "?". An empty response scores
    0.0; a response with at most one non-blank sentence scores 1.0.
    Otherwise the average cosine similarity over all sentence pairs is
    mapped from [-1, 1] to [0, 1] and rounded to three decimals.
    """
    if not response:
        return 0.0
    sentences = [
        part.strip() for part in re.split(r"[.!?]+", response) if part.strip()
    ]
    if len(sentences) <= 1:
        return 1.0

    embs = _embed_model.encode(sentences, convert_to_tensor=True)
    # One matrix call instead of one cos_sim call per sentence pair.
    sim_matrix = util.cos_sim(embs, embs).detach().cpu().numpy()
    # Strict upper triangle == every unordered pair (i < j) exactly once.
    pair_index = np.triu_indices(len(sentences), k=1)
    avg = float(sim_matrix[pair_index].mean())
    return round((avg + 1) / 2, 3)  # normalize to [0,1]


def check_fluency(response: str) -> float:
    """Return the fraction of characters in *response* that are alphabetic.

    An empty response scores 0.0. The ratio is rounded to three decimals.
    """
    if not response:
        return 0.0
    letters = sum(ch.isalpha() for ch in response)
    return round(letters / len(response), 3)


# ------------------------
# Evaluation
# ------------------------
def _score_row(row):
    """Compute every metric plus ``final_score`` for one dataframe row."""
    prompt = _as_text(row.get("prompt", ""))
    response = _as_text(row.get("response", ""))
    reference = _as_text(row.get("reference", ""))

    raw = {
        "instruction_following": check_instruction_following(prompt, response),
        "hallucination": check_hallucination(prompt, response),
        "accuracy": check_accuracy(reference, response),
        "coherence": check_coherence(response),
        "fluency": check_fluency(response),
    }
    scores = {name: _clamp01(raw[name]) for name in _SCORE_NAMES}
    # final_score is the unweighted mean of the five clamped metrics.
    scores["final_score"] = round(
        float(np.mean([scores[name] for name in _SCORE_NAMES])), 3
    )
    return scores


def evaluate_dataframe(df: pd.DataFrame):
    """Score every row of *df* and build a per-agent/per-task leaderboard.

    Each row's "prompt", "response" and "reference" cells are read (missing
    columns and missing cells are treated as empty text) and scored with the
    ``check_*`` functions. *df* must contain "agent" and "task_type"
    columns, which are used for grouping.

    Returns:
        A 3-tuple ``(metrics_df, images, leaderboard)`` where ``metrics_df``
        is *df* (index reset) with the metric columns appended, ``images``
        is always an empty list, and ``leaderboard`` holds the mean of each
        metric per ("agent", "task_type") group.
    """
    metric_cols = list(_METRIC_COLS)
    scores = [_score_row(row) for _, row in df.iterrows()]
    # Explicit columns/dtype keep the frame well-formed when *df* is empty.
    score_df = pd.DataFrame(scores, columns=metric_cols, dtype=float)

    metrics_df = pd.concat([df.reset_index(drop=True), score_df], axis=1)
    leaderboard = (
        metrics_df.groupby(["agent", "task_type"])[metric_cols]
        .mean()
        .reset_index()
    )
    return metrics_df, [], leaderboard


# ------------------------
# Visualizations
# ------------------------
def plot_radar_chart(leaderboard, metric_cols):
    """Draw one radar polygon per agent over *metric_cols*; return the figure.

    Each agent's value on an axis is the mean of that metric over the
    agent's rows in *leaderboard*.
    """
    categories = list(metric_cols)
    angles = np.linspace(0, 2 * np.pi, len(categories), endpoint=False).tolist()
    angles += angles[:1]  # repeat the first angle to close the polygon

    fig = plt.figure(figsize=(6, 6))
    # Attach the axes to this figure explicitly instead of relying on
    # pyplot's implicit "current figure".
    ax = fig.add_subplot(111, polar=True)
    for agent in leaderboard["agent"].unique():
        agent_rows = leaderboard[leaderboard["agent"] == agent]
        vals = agent_rows[metric_cols].mean().tolist()
        vals += vals[:1]
        ax.plot(angles, vals, label=agent)
        ax.fill(angles, vals, alpha=0.1)
    ax.set_xticks(angles[:-1])
    ax.set_xticklabels(categories)
    ax.set_ylim(0, 1)
    ax.legend(loc="upper right")
    return fig


def plot_heatmap(metrics_df, metric_cols):
    """Draw an annotated correlation heatmap of *metric_cols*; return the figure."""
    fig, ax = plt.subplots(figsize=(7, 5))
    sns.heatmap(
        metrics_df[metric_cols].corr(),
        annot=True,
        fmt=".2f",
        cmap="coolwarm",
        ax=ax,
    )
    return fig


def plot_boxplot(metrics_df, metric_cols):
    """Draw a box plot of each column in *metric_cols*; return the figure."""
    fig, ax = plt.subplots(figsize=(7, 5))
    sns.boxplot(data=metrics_df[metric_cols], ax=ax)
    return fig


def plot_bar(leaderboard, metric_cols):
    """Draw a bar chart of mean ``final_score`` per agent; return the figure.

    *metric_cols* is accepted for signature parity with the other plot
    functions and is not used.
    """
    fig, ax = plt.subplots(figsize=(8, 5))
    # The leaderboard has one row per (agent, task_type); average them so
    # each agent gets a single bar rather than several identically-labelled
    # ones. With one row per agent this is a no-op.
    per_agent = (
        leaderboard.groupby("agent", sort=False)["final_score"]
        .mean()
        .reset_index()
    )
    per_agent.plot(x="agent", y="final_score", kind="bar", ax=ax, legend=False)
    ax.set_ylabel("Final Score")
    return fig


def generate_visualizations(metrics_df, leaderboard, out_dir="/tmp"):
    """Render all charts to PNG files and return their paths.

    Each chart is attempted independently; a chart that raises is reported
    on stdout and skipped so the remaining charts are still produced.

    Args:
        metrics_df: Per-row metrics, passed to the heatmap and box plot.
        leaderboard: Aggregated metrics, passed to the radar and bar charts.
        out_dir: Directory the PNG files are written to.

    Returns:
        List of file paths, one per chart that was rendered and saved.
    """
    import os  # local import: this module does not import os at file level

    metric_cols = list(_METRIC_COLS)
    plots = (
        ("Radar", plot_radar_chart, leaderboard),
        ("Heatmap", plot_heatmap, metrics_df),
        ("Boxplot", plot_boxplot, metrics_df),
        ("Bar", plot_bar, leaderboard),
    )

    figs = []
    for label, plot_fn, data in plots:
        # Deliberately broad: one failing chart must not block the others.
        try:
            figs.append(plot_fn(data, metric_cols))
        except Exception as e:
            print(f"{label} failed:", e)

    # Save to temp and return as gallery list
    images = []
    for fig in figs:
        path = os.path.join(out_dir, f"viz_{uuid.uuid4().hex}.png")
        try:
            fig.savefig(path, bbox_inches="tight")
        finally:
            # Always release the figure, even if saving failed.
            plt.close(fig)
        images.append(path)
    return images