Spaces:
Runtime error
Runtime error
| ##################################################################################################################################################################### | |
| # import re | |
| # import json | |
| # import torch | |
| # import pandas as pd | |
| # import matplotlib.pyplot as plt | |
| # import seaborn as sns | |
| # import os | |
| # import uuid | |
| # from transformers import AutoTokenizer, AutoModelForSequenceClassification | |
| # from sentence_transformers import SentenceTransformer, util | |
| # import matplotlib.pyplot as plt | |
| # import numpy as np | |
| # def plot_radar_chart(metrics_df, agents, metrics, out_path="/tmp/radar.png"): | |
| # """ | |
| # Radar chart comparing multiple agents across metrics. | |
| # """ | |
| # labels = metrics | |
| # num_vars = len(labels) | |
| # # Compute angle for each axis | |
| # angles = np.linspace(0, 2 * np.pi, num_vars, endpoint=False).tolist() | |
| # angles += angles[:1] # close loop | |
| # fig, ax = plt.subplots(figsize=(6, 6), subplot_kw=dict(polar=True)) | |
| # for agent in agents: | |
| # values = [] | |
| # for m in metrics: | |
| # mean_val = metrics_df.loc[metrics_df['agent'] == agent, m].mean() | |
| # values.append(mean_val if not np.isnan(mean_val) else 0) | |
| # values += values[:1] | |
| # ax.plot(angles, values, label=agent, linewidth=2) | |
| # ax.fill(angles, values, alpha=0.25) | |
| # ax.set_xticks(angles[:-1]) | |
| # ax.set_xticklabels(labels) | |
| # ax.set_yticklabels([]) | |
| # ax.legend(loc="upper right", bbox_to_anchor=(1.3, 1.1)) | |
| # ax.set_title("Agent Performance Radar Chart") | |
| # plt.tight_layout() | |
| # plt.savefig(out_path) | |
| # plt.close() | |
| # return out_path | |
| # import seaborn as sns | |
| # def plot_heatmap(metrics_df, out_path="/tmp/heatmap.png"): | |
| # pivot = metrics_df.groupby("agent")[ | |
| # ["accuracy", "hallucination", "instruction_following", "coherence", "assumption"] | |
| # ].mean() | |
| # plt.figure(figsize=(8, 5)) | |
| # sns.heatmap(pivot, annot=True, cmap="viridis", fmt=".2f") | |
| # plt.title("Agent × Metric Heatmap") | |
| # plt.tight_layout() | |
| # plt.savefig(out_path) | |
| # plt.close() | |
| # return out_path | |
| # # -------------------------- | |
| # # MODEL LOADING | |
| # # -------------------------- | |
| # NLI_MODEL = "textattack/roberta-base-MNLI" | |
| # EMBED_MODEL = "sentence-transformers/all-MiniLM-L6-v2" | |
| # # Load NLI model & tokenizer | |
| # nli_tokenizer = AutoTokenizer.from_pretrained(NLI_MODEL) | |
| # nli_model = AutoModelForSequenceClassification.from_pretrained(NLI_MODEL) | |
| # nli_model.to("cpu") | |
| # nli_model.eval() | |
| # # Load embedding model | |
| # embed_model = SentenceTransformer(EMBED_MODEL) | |
| # # Label mapping from config | |
| # id2label = {int(k): v.upper() for k, v in nli_model.config.id2label.items()} | |
| # # -------------------------- | |
| # # METRIC FUNCTIONS | |
| # # -------------------------- | |
| # def check_instruction_following(prompt: str, response: str) -> float: | |
| # """Embedding-based similarity between prompt and response.""" | |
| # if not prompt or not response: | |
| # return 0.0 | |
| # p_emb = embed_model.encode(prompt, convert_to_tensor=True) | |
| # r_emb = embed_model.encode(response, convert_to_tensor=True) | |
| # sim = float(util.cos_sim(p_emb, r_emb).item()) | |
| # return round(max(0.0, min(1.0, sim)), 3) | |
| # def check_hallucination(reference: str, response: str) -> float: | |
| # """ | |
| # Single hallucination score: | |
| # Entailment prob - Contradiction prob (normalized to [0,1]). | |
| # Higher = less hallucination. | |
| # """ | |
| # if not reference or not response: | |
| # return 0.0 | |
| # with torch.no_grad(): | |
| # inputs = nli_tokenizer.encode_plus(reference, response, return_tensors="pt", truncation=True) | |
| # outputs = nli_model(**inputs) | |
| # probs = torch.softmax(outputs.logits, dim=-1).cpu().numpy()[0] | |
| # entail_prob, contra_prob = 0.0, 0.0 | |
| # for idx, p in enumerate(probs): | |
| # label = id2label.get(idx, "") | |
| # if "ENTAIL" in label: | |
| # entail_prob = float(p) | |
| # elif "CONTRA" in label: | |
| # contra_prob = float(p) | |
| # score = entail_prob - contra_prob | |
| # score = (score + 1) / 2 # normalize [-1,1] β [0,1] | |
| # return round(max(0.0, min(1.0, score)), 3) | |
| # def check_assumption(response: str) -> float: | |
| # """Detect speculative/hedging terms.""" | |
| # if not response: | |
| # return 0.0 | |
| # speculative_terms = ["maybe", "probably", "might", "perhaps", "i guess", "seems", "could"] | |
| # count = sum(1 for t in speculative_terms if t in response.lower()) | |
| # score = 1.0 - min(count / 5.0, 1.0) # smoother decay | |
| # return round(score, 3) | |
| # def check_coherence(response: str) -> float: | |
| # """Heuristic coherence metric: penalizes very short/long, rewards sentence balance.""" | |
| # if not response: | |
| # return 0.0 | |
| # words = len(re.findall(r"\w+", response)) | |
| # sents = max(1, len(re.split(r"[.!?]+", response)) - 1) | |
| # if words < 5: | |
| # return 0.3 | |
| # if words > 200: | |
| # return 0.5 | |
| # base = min(1.0, (words / 50.0) + (sents / 5.0)) | |
| # return round(max(0.4, min(base, 0.95)), 3) | |
| # def check_accuracy(reference: str, response: str) -> float: | |
| # """Semantic similarity between reference and response via embeddings (cosine).""" | |
| # if not reference or not response: | |
| # return 0.0 | |
| # ref_emb = embed_model.encode(reference, convert_to_tensor=True) | |
| # resp_emb = embed_model.encode(response, convert_to_tensor=True) | |
| # sim = float(util.cos_sim(ref_emb, resp_emb).item()) | |
| # return round(max(0.0, min(1.0, sim)), 3) | |
| # # -------------------------- | |
| # # ROW & DF EVALUATION | |
| # # -------------------------- | |
| # def evaluate_row(row): | |
| # prompt = row.get("prompt", "") | |
| # response = row.get("response", "") | |
| # reference = row.get("reference", "") | |
| # metrics = { | |
| # "task_id": row.get("task_id", ""), | |
| # "agent": row.get("agent", ""), | |
| # "instruction_following": check_instruction_following(prompt, response), | |
| # "hallucination": check_hallucination(reference, response), | |
| # "assumption": check_assumption(response), | |
| # "coherence": check_coherence(response), | |
| # "accuracy": check_accuracy(reference, response), | |
| # } | |
| # # Weighted avg score (you can adjust weights) | |
| # metrics["final_score"] = round( | |
| # 0.25 * metrics["instruction_following"] | |
| # + 0.25 * metrics["accuracy"] | |
| # + 0.2 * metrics["hallucination"] | |
| # + 0.15 * metrics["coherence"] | |
| # + 0.15 * metrics["assumption"], | |
| # 3, | |
| # ) | |
| # return metrics | |
| # def evaluate_dataframe(df: pd.DataFrame): | |
| # metrics_df = df.apply(evaluate_row, axis=1, result_type="expand") | |
| # # Leaderboard | |
| # leaderboard = ( | |
| # metrics_df.groupby(["agent", "task_id"])["final_score"] | |
| # .mean() | |
| # .reset_index() | |
| # ) | |
| # # # Plots | |
| # # images = [] | |
| # # Existing images list | |
| # images = [] | |
| # # Add radar chart | |
| # radar_path = plot_radar_chart(metrics_df, agents=df["agent"].unique(), | |
| # | |
| ############################################################################################################################### | |
| # evaluator.py | |
| """ | |
| Upgraded Evaluation logic for the Agentic Evaluation Framework. | |
| Provides scoring functions, visualization generation, and summary outputs. | |
| """ | |
| import math | |
| import uuid | |
| from typing import List, Dict, Tuple | |
| import numpy as np | |
| import pandas as pd | |
| import matplotlib.pyplot as plt | |
| import seaborn as sns | |
| # ----------------------------- | |
| # Lazy model loading | |
| # ----------------------------- | |
| NLI_MODEL = "textattack/roberta-base-MNLI" | |
| EMBED_MODEL = "sentence-transformers/all-MiniLM-L6-v2" | |
| _nli_tokenizer = None | |
| _nli_model = None | |
| _embed_model = None | |
| _id2label = None | |
def ensure_models_loaded():
    """Lazily initialize the embedding and NLI models (idempotent).

    Fills the module-level caches ``_embed_model``, ``_nli_tokenizer``,
    ``_nli_model`` and ``_id2label``. Also injects ``util`` (from
    sentence_transformers) into module globals so the check_* helpers can
    call ``util.cos_sim`` without an eager top-level import.
    """
    global _nli_tokenizer, _nli_model, _embed_model, _id2label

    if _embed_model is None:
        from sentence_transformers import SentenceTransformer, util

        _embed_model = SentenceTransformer(EMBED_MODEL)
        # Make `util` visible at module scope for the cosine-similarity helpers.
        globals()["util"] = util

    if _nli_model is None:
        from transformers import AutoTokenizer, AutoModelForSequenceClassification

        _nli_tokenizer = AutoTokenizer.from_pretrained(NLI_MODEL)
        _nli_model = AutoModelForSequenceClassification.from_pretrained(NLI_MODEL)
        _nli_model.to("cpu")
        _nli_model.eval()
        # Upper-case the label names so lookups like "ENTAILMENT" are stable
        # across model configs that vary in casing.
        _id2label = {int(k): v.upper() for k, v in _nli_model.config.id2label.items()}
def get_embed_model():
    """Return the shared SentenceTransformer instance, loading it on first call."""
    ensure_models_loaded()
    return _embed_model
def get_nli_tokenizer_and_model():
    """Return ``(tokenizer, model, id2label)`` for the NLI checker, loading lazily."""
    ensure_models_loaded()
    return _nli_tokenizer, _nli_model, _id2label
| # ----------------------------- | |
| # Metric functions | |
| # ----------------------------- | |
def check_instruction_following(prompt: str, response: str) -> float:
    """Embedding cosine similarity between prompt and response, clamped to [0, 1].

    Returns 0.0 for an empty prompt or response; otherwise the similarity
    rounded to 3 decimals.
    """
    if not prompt or not response:
        return 0.0
    model = get_embed_model()
    prompt_vec = model.encode(prompt, convert_to_tensor=True)
    response_vec = model.encode(response, convert_to_tensor=True)
    similarity = float(util.cos_sim(prompt_vec, response_vec).item())
    return round(min(1.0, max(0.0, similarity)), 3)
def check_hallucination(prompt: str, response: str) -> float:
    """Score how strongly `response` is entailed by `prompt` via the NLI model.

    Higher means less hallucination. Returns the ENTAILMENT probability in
    [0, 1] rounded to 3 decimals; 0.0 for empty inputs. If the model config
    exposes no "ENTAILMENT" label, falls back to the max class probability.
    """
    if not prompt or not response:
        return 0.0
    # Local import matches the file's lazy-loading style; torch is already a
    # transitive dependency of the transformers model loaded below.
    import torch

    tokenizer, model, id2label = get_nli_tokenizer_and_model()
    inputs = tokenizer.encode_plus(prompt, response, return_tensors="pt", truncation=True)
    # Inference only: no_grad avoids building an autograd graph (the original
    # relied on .detach() after the fact, paying the graph-construction cost).
    with torch.no_grad():
        outputs = model(**inputs)
    probs = outputs.logits.softmax(dim=1).cpu().numpy()[0]
    labels = [id2label[i] for i in range(len(probs))]
    if "ENTAILMENT" in labels:
        entailment_prob = float(probs[labels.index("ENTAILMENT")])
    else:
        entailment_prob = float(probs.max())
    return round(entailment_prob, 3)
def check_accuracy(reference: str, response: str) -> float:
    """Semantic similarity (embedding cosine) between reference and response.

    Returns 0.0 when either string is empty; otherwise the similarity
    clamped to [0, 1] and rounded to 3 decimals.
    """
    if not reference or not response:
        return 0.0
    model = get_embed_model()
    reference_vec = model.encode(reference, convert_to_tensor=True)
    response_vec = model.encode(response, convert_to_tensor=True)
    similarity = float(util.cos_sim(reference_vec, response_vec).item())
    return round(min(1.0, max(0.0, similarity)), 3)
def check_coherence(response: str) -> float:
    """Heuristic coherence score in [0, 1] based on sentence-length uniformity.

    Splits on periods and rewards low relative spread of per-sentence word
    counts. Returns 0.0 for empty/non-string input or no sentences.
    """
    if not isinstance(response, str) or not response:
        return 0.0
    # Period-delimited fragments with surrounding whitespace stripped;
    # empty fragments (e.g. from "...") are dropped.
    sentences = [part.strip() for part in response.split(".") if part.strip()]
    if not sentences:
        return 0.0
    word_counts = [len(sentence.split()) for sentence in sentences]
    mean_len = np.mean(word_counts)
    spread = np.std(word_counts)
    # Lower relative spread => more uniform sentences => higher score.
    raw = 1.0 - (spread / (mean_len + 1e-6))
    return round(max(0.0, min(1.0, raw)), 3)
def check_fluency(response: str) -> float:
    """Fluency proxy in [0, 1]: fraction of characters that are alphabetic.

    Returns 0.0 for empty or non-string input.
    """
    if not isinstance(response, str) or not response:
        return 0.0
    alpha_count = sum(1 for ch in response if ch.isalpha())
    ratio = alpha_count / max(1, len(response))
    return round(max(0.0, min(1.0, ratio)), 3)
| # ----------------------------- | |
| # Visualization helpers | |
| # ----------------------------- | |
def spider_net_multi(labels: List[str], rows: List[Dict], title: str, fill_alpha: float = 0.12):
    """Radar (spider) chart comparing several series on the same axes.

    Each entry of `rows` is {"name": str, "values": floats in [0, 1]}
    ordered like `labels`. Returns the matplotlib Figure.
    """
    axis_count = len(labels)
    # One spoke per label; repeat the first angle so the polygon closes.
    angles = [i * 2 * math.pi / axis_count for i in range(axis_count)]
    angles += angles[:1]

    fig = plt.figure(figsize=(6.5, 6.5))
    ax = plt.subplot(111, polar=True)
    ax.set_xticks(angles[:-1])
    ax.set_xticklabels(labels, fontsize=9)
    ax.set_ylim(0, 1)
    ax.set_yticks([0, 0.25, 0.5, 0.75, 1.0])

    for series in rows:
        closed = series["values"] + series["values"][:1]
        ax.plot(angles, closed, linewidth=1.5, label=series["name"])
        ax.fill(angles, closed, alpha=fill_alpha)

    ax.set_title(title, y=1.08, fontsize=12)
    ax.legend(loc="upper right", bbox_to_anchor=(1.25, 1.1))
    return fig
def heatmap_plot(df: pd.DataFrame, metric_cols: List[str], title: str = "Metric Correlations"):
    """Annotated heatmap of pairwise correlations between metric columns."""
    fig, ax = plt.subplots(figsize=(7, 5))
    correlations = df[metric_cols].corr()
    sns.heatmap(correlations, annot=True, fmt=".2f", cmap="coolwarm", ax=ax)
    ax.set_title(title)
    return fig
def task_agent_heatmap(leaderboard: pd.DataFrame, metric: str):
    """Heatmap of one metric pivoted to tasks (rows) x agents (columns)."""
    table = leaderboard.pivot(index="task", columns="agent", values=metric)
    fig, ax = plt.subplots(figsize=(7, 5))
    sns.heatmap(table, annot=True, fmt=".2f", cmap="YlGnBu", ax=ax)
    ax.set_title(f"Task-Agent Performance ({metric})")
    return fig
def leaderboard_barplot(leaderboard: pd.DataFrame, metric_cols: List[str]):
    """Grouped bar chart of per-agent scores for each metric (y fixed to [0, 1])."""
    long_form = leaderboard.melt(
        id_vars=["agent"], value_vars=metric_cols, var_name="metric", value_name="score"
    )
    fig, ax = plt.subplots(figsize=(8, 5))
    sns.barplot(x="metric", y="score", hue="agent", data=long_form, ax=ax)
    ax.set_title("Leaderboard Bar Chart")
    ax.set_ylim(0, 1)
    return fig
def distribution_plot(metrics_df: pd.DataFrame, metric: str):
    """Box plot with an overlaid strip plot of per-row scores, grouped by agent."""
    fig, ax = plt.subplots(figsize=(7, 5))
    sns.boxplot(x="agent", y=metric, data=metrics_df, ax=ax)
    # Strip plot shows the individual samples behind the box summary.
    sns.stripplot(x="agent", y=metric, data=metrics_df, ax=ax, color="black", alpha=0.4, jitter=True)
    ax.set_title(f"Distribution of {metric} Scores per Agent")
    ax.set_ylim(0, 1)
    return fig
def scatter_two_metrics(metrics_df: pd.DataFrame, metric_x: str, metric_y: str):
    """Scatter plot of two metrics colored by agent; both axes fixed to [0, 1]."""
    fig, ax = plt.subplots(figsize=(6, 6))
    sns.scatterplot(x=metric_x, y=metric_y, hue="agent", data=metrics_df, ax=ax, alpha=0.7)
    ax.set_title(f"{metric_x} vs {metric_y}")
    ax.set_xlim(0, 1)
    ax.set_ylim(0, 1)
    return fig
| # ----------------------------- | |
| # Main evaluation entrypoint | |
| # ----------------------------- | |
| def evaluate_dataframe(df: pd.DataFrame) -> Tuple[pd.DataFrame, List[Tuple[str,str]], pd.DataFrame]: | |
| df = df.copy() | |
| # compute scores per row | |
| scores = [] | |
| for _, row in df.iterrows(): | |
| s = {} | |
| s["instruction_following"] = check_instruction_following(str(row.get("prompt", "")), str(row.get("response", ""))) | |
| s["hallucination"] = check_hallucination(str(row.get("prompt", "")), str(row.get("response", ""))) | |
| s["accuracy"] = check_accuracy(str(row.get("reference", "")), str(row.get("response", ""))) | |
| s["coherence"] = check_coherence(str(row.get("response", ""))) | |
| s["fluency"] = check_fluency(str(row.get("response", ""))) | |
| scores.append(s) | |
| metrics_df = pd.concat([df.reset_index(drop=True), pd.DataFrame(scores)], axis=1) | |
| # leaderboard: average per agent & task | |
| metric_cols = ["instruction_following", "hallucination", "accuracy", "coherence", "fluency"] | |
| leaderboard = ( | |
| metrics_df.groupby(["agent", "task"])[metric_cols] | |
| .mean() | |
| .reset_index() | |
| ) | |
| # ------------------- | |
| # Visualization images | |
| # ------------------- | |
| images = [] | |
| try: | |
| rows = [] | |
| for agent in leaderboard["agent"].unique(): | |
| vals = leaderboard[leaderboard["agent"] == agent][metric_cols].mean().tolist() | |
| rows.append({"name": agent, "values": vals}) | |
| fig1 = spider_net_multi(metric_cols, rows, "Agent Performance Radar") | |
| path1 = f"/tmp/radar_{uuid.uuid4().hex}.png" | |
| fig1.savefig(path1, bbox_inches="tight") | |
| plt.close(fig1) | |
| images.append((path1, "Radar Plot")) | |
| except Exception as e: | |
| print("Radar plot failed:", e) | |
| try: | |
| fig2 = heatmap_plot(metrics_df, metric_cols, title="Metric Correlation Heatmap") | |
| path2 = f"/tmp/heatmap_{uuid.uuid4().hex}.png" | |
| fig2.savefig(path2, bbox_inches="tight") | |
| plt.close(fig2) | |
| images.append((path2, "Correlation Heatmap")) | |
| except Exception as e: | |
| print("Heatmap failed:", e) | |
| try: | |
| fig3 = task_agent_heatmap(leaderboard, "accuracy") | |
| path3 = f"/tmp/task_agent_{uuid.uuid4().hex}.png" | |
| fig3.savefig(path3, bbox_inches="tight") | |
| plt.close(fig3) | |
| images.append((path3, "Task-Agent Heatmap (Accuracy)")) | |
| except Exception as e: | |
| print("Task-agent heatmap failed:", e) | |
| try: | |
| fig4 = leaderboard_barplot(leaderboard, metric_cols) | |
| path4 = f"/tmp/barplot_{uuid.uuid4().hex}.png" | |
| fig4.savefig(path4, bbox_inches="tight") | |
| plt.close(fig4) | |
| images.append((path4, "Leaderboard Bar Chart")) | |
| except Exception as e: | |
| print("Barplot failed:", e) | |
| try: | |
| fig5 = distribution_plot(metrics_df, "accuracy") | |
| path5 = f"/tmp/distribution_{uuid.uuid4().hex}.png" | |
| fig5.savefig(path5, bbox_inches="tight") | |
| plt.close(fig5) | |
| images.append((path5, "Accuracy Distribution")) | |
| except Exception as e: | |
| print("Distribution plot failed:", e) | |
| try: | |
| fig6 = scatter_two_metrics(metrics_df, "instruction_following", "accuracy") | |
| path6 = f"/tmp/scatter_{uuid.uuid4().hex}.png" | |
| fig6.savefig(path6, bbox_inches="tight") | |
| plt.close(fig6) | |
| images.append((path6, "Instruction Following vs Accuracy")) | |
| except Exception as e: | |
| print("Scatter plot failed:", e) | |
| return metrics_df, images, leaderboard | |