# evaluation-framework / evaluator.py
# NOTE: file-viewer metadata (author manayporwal07, commit 260dd1f, "16.7 kB",
# raw/history/blame links) removed from executable scope — kept here as a comment
# so the module remains valid Python.
#####################################################################################################################################################################
# import re
# import json
# import torch
# import pandas as pd
# import matplotlib.pyplot as plt
# import seaborn as sns
# import os
# import uuid
# from transformers import AutoTokenizer, AutoModelForSequenceClassification
# from sentence_transformers import SentenceTransformer, util
# import matplotlib.pyplot as plt
# import numpy as np
# def plot_radar_chart(metrics_df, agents, metrics, out_path="/tmp/radar.png"):
# """
# Radar chart comparing multiple agents across metrics.
# """
# labels = metrics
# num_vars = len(labels)
# # Compute angle for each axis
# angles = np.linspace(0, 2 * np.pi, num_vars, endpoint=False).tolist()
# angles += angles[:1] # close loop
# fig, ax = plt.subplots(figsize=(6, 6), subplot_kw=dict(polar=True))
# for agent in agents:
# values = []
# for m in metrics:
# mean_val = metrics_df.loc[metrics_df['agent'] == agent, m].mean()
# values.append(mean_val if not np.isnan(mean_val) else 0)
# values += values[:1]
# ax.plot(angles, values, label=agent, linewidth=2)
# ax.fill(angles, values, alpha=0.25)
# ax.set_xticks(angles[:-1])
# ax.set_xticklabels(labels)
# ax.set_yticklabels([])
# ax.legend(loc="upper right", bbox_to_anchor=(1.3, 1.1))
# ax.set_title("Agent Performance Radar Chart")
# plt.tight_layout()
# plt.savefig(out_path)
# plt.close()
# return out_path
# import seaborn as sns
# def plot_heatmap(metrics_df, out_path="/tmp/heatmap.png"):
# pivot = metrics_df.groupby("agent")[
# ["accuracy", "hallucination", "instruction_following", "coherence", "assumption"]
# ].mean()
# plt.figure(figsize=(8, 5))
# sns.heatmap(pivot, annot=True, cmap="viridis", fmt=".2f")
# plt.title("Agent Γ— Metric Heatmap")
# plt.tight_layout()
# plt.savefig(out_path)
# plt.close()
# return out_path
# # --------------------------
# # MODEL LOADING
# # --------------------------
# NLI_MODEL = "textattack/roberta-base-MNLI"
# EMBED_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
# # Load NLI model & tokenizer
# nli_tokenizer = AutoTokenizer.from_pretrained(NLI_MODEL)
# nli_model = AutoModelForSequenceClassification.from_pretrained(NLI_MODEL)
# nli_model.to("cpu")
# nli_model.eval()
# # Load embedding model
# embed_model = SentenceTransformer(EMBED_MODEL)
# # Label mapping from config
# id2label = {int(k): v.upper() for k, v in nli_model.config.id2label.items()}
# # --------------------------
# # METRIC FUNCTIONS
# # --------------------------
# def check_instruction_following(prompt: str, response: str) -> float:
# """Embedding-based similarity between prompt and response."""
# if not prompt or not response:
# return 0.0
# p_emb = embed_model.encode(prompt, convert_to_tensor=True)
# r_emb = embed_model.encode(response, convert_to_tensor=True)
# sim = float(util.cos_sim(p_emb, r_emb).item())
# return round(max(0.0, min(1.0, sim)), 3)
# def check_hallucination(reference: str, response: str) -> float:
# """
# Single hallucination score:
# Entailment prob - Contradiction prob (normalized to [0,1]).
# Higher = less hallucination.
# """
# if not reference or not response:
# return 0.0
# with torch.no_grad():
# inputs = nli_tokenizer.encode_plus(reference, response, return_tensors="pt", truncation=True)
# outputs = nli_model(**inputs)
# probs = torch.softmax(outputs.logits, dim=-1).cpu().numpy()[0]
# entail_prob, contra_prob = 0.0, 0.0
# for idx, p in enumerate(probs):
# label = id2label.get(idx, "")
# if "ENTAIL" in label:
# entail_prob = float(p)
# elif "CONTRA" in label:
# contra_prob = float(p)
# score = entail_prob - contra_prob
# score = (score + 1) / 2 # normalize [-1,1] β†’ [0,1]
# return round(max(0.0, min(1.0, score)), 3)
# def check_assumption(response: str) -> float:
# """Detect speculative/hedging terms."""
# if not response:
# return 0.0
# speculative_terms = ["maybe", "probably", "might", "perhaps", "i guess", "seems", "could"]
# count = sum(1 for t in speculative_terms if t in response.lower())
# score = 1.0 - min(count / 5.0, 1.0) # smoother decay
# return round(score, 3)
# def check_coherence(response: str) -> float:
# """Heuristic coherence metric: penalizes very short/long, rewards sentence balance."""
# if not response:
# return 0.0
# words = len(re.findall(r"\w+", response))
# sents = max(1, len(re.split(r"[.!?]+", response)) - 1)
# if words < 5:
# return 0.3
# if words > 200:
# return 0.5
# base = min(1.0, (words / 50.0) + (sents / 5.0))
# return round(max(0.4, min(base, 0.95)), 3)
# def check_accuracy(reference: str, response: str) -> float:
# """Semantic similarity between reference and response via embeddings (cosine)."""
# if not reference or not response:
# return 0.0
# ref_emb = embed_model.encode(reference, convert_to_tensor=True)
# resp_emb = embed_model.encode(response, convert_to_tensor=True)
# sim = float(util.cos_sim(ref_emb, resp_emb).item())
# return round(max(0.0, min(1.0, sim)), 3)
# # --------------------------
# # ROW & DF EVALUATION
# # --------------------------
# def evaluate_row(row):
# prompt = row.get("prompt", "")
# response = row.get("response", "")
# reference = row.get("reference", "")
# metrics = {
# "task_id": row.get("task_id", ""),
# "agent": row.get("agent", ""),
# "instruction_following": check_instruction_following(prompt, response),
# "hallucination": check_hallucination(reference, response),
# "assumption": check_assumption(response),
# "coherence": check_coherence(response),
# "accuracy": check_accuracy(reference, response),
# }
# # Weighted avg score (you can adjust weights)
# metrics["final_score"] = round(
# 0.25 * metrics["instruction_following"]
# + 0.25 * metrics["accuracy"]
# + 0.2 * metrics["hallucination"]
# + 0.15 * metrics["coherence"]
# + 0.15 * metrics["assumption"],
# 3,
# )
# return metrics
# def evaluate_dataframe(df: pd.DataFrame):
# metrics_df = df.apply(evaluate_row, axis=1, result_type="expand")
# # Leaderboard
# leaderboard = (
# metrics_df.groupby(["agent", "task_id"])["final_score"]
# .mean()
# .reset_index()
# )
# # # Plots
# # images = []
# # Existing images list
# images = []
# # Add radar chart
# radar_path = plot_radar_chart(metrics_df, agents=df["agent"].unique(),
#
###############################################################################################################################
# evaluator.py
"""
Upgraded Evaluation logic for the Agentic Evaluation Framework.
Provides scoring functions, visualization generation, and summary outputs.
"""
import math
import uuid
from typing import Dict, List, Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
# -----------------------------
# Lazy model loading
# -----------------------------
# Hugging Face model ids: an NLI classifier for hallucination scoring and a
# sentence encoder for the similarity-based metrics.
NLI_MODEL = "textattack/roberta-base-MNLI"
EMBED_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
# Module-level singletons, populated lazily by ensure_models_loaded() so that
# importing this module does not trigger model downloads.
_nli_tokenizer = None
_nli_model = None
_embed_model = None
_id2label = None
def ensure_models_loaded():
    """Lazily load the embedding and NLI models into module-level singletons.

    Idempotent: each model is loaded at most once; subsequent calls are no-ops.
    Side effects: mutates the module globals `_nli_tokenizer`, `_nli_model`,
    `_embed_model`, `_id2label`, and injects `util` (sentence_transformers.util)
    into the module namespace so the metric functions can call `util.cos_sim`.
    """
    global _nli_tokenizer, _nli_model, _embed_model, _id2label
    if _embed_model is None:
        # Imported here (not at module top) to keep module import cheap.
        from sentence_transformers import SentenceTransformer, util
        _embed_model = SentenceTransformer(EMBED_MODEL)
        # Expose `util` at module scope for check_instruction_following/check_accuracy.
        globals()["util"] = util
    if _nli_model is None:
        from transformers import AutoTokenizer, AutoModelForSequenceClassification
        _nli_tokenizer = AutoTokenizer.from_pretrained(NLI_MODEL)
        _nli_model = AutoModelForSequenceClassification.from_pretrained(NLI_MODEL)
        _nli_model.to("cpu")  # inference-only; CPU keeps deployment simple
        _nli_model.eval()
        # Upper-cased label map, e.g. {0: "CONTRADICTION", ...} — exact labels
        # depend on the checkpoint config; consumers match by substring/name.
        _id2label = {int(k): v.upper() for k, v in _nli_model.config.id2label.items()}
def get_embed_model():
    """Return the shared SentenceTransformer, loading it on first call."""
    ensure_models_loaded()
    return _embed_model
def get_nli_tokenizer_and_model():
    """Return (tokenizer, model, id2label) for the NLI model, loading lazily."""
    ensure_models_loaded()
    return _nli_tokenizer, _nli_model, _id2label
# -----------------------------
# Metric functions
# -----------------------------
def check_instruction_following(prompt: str, response: str) -> float:
    """Score how closely the response tracks the prompt.

    Uses cosine similarity between sentence embeddings, clamped to [0, 1]
    and rounded to 3 decimals. Returns 0.0 when either input is empty.
    """
    if not prompt or not response:
        return 0.0
    encoder = get_embed_model()
    prompt_vec = encoder.encode(prompt, convert_to_tensor=True)
    response_vec = encoder.encode(response, convert_to_tensor=True)
    similarity = float(util.cos_sim(prompt_vec, response_vec).item())
    clamped = min(1.0, max(0.0, similarity))
    return round(clamped, 3)
def check_hallucination(prompt: str, response: str) -> float:
    """Estimate how well the response is grounded in the prompt via NLI.

    Runs the (prompt, response) pair through the NLI classifier and returns
    the entailment probability in [0, 1] — higher means less hallucination.
    Returns 0.0 when either input is empty.
    """
    if not prompt or not response:
        return 0.0
    tokenizer, model, id2label = get_nli_tokenizer_and_model()
    inputs = tokenizer.encode_plus(prompt, response, return_tensors="pt", truncation=True)
    # Inference only: disable autograd so no gradient graph is built
    # (previously the forward pass tracked gradients and relied on .detach()).
    with torch.no_grad():
        outputs = model(**inputs)
        probs = outputs.logits.softmax(dim=-1).cpu().numpy()[0]
    labels = [id2label[i] for i in range(len(probs))]
    if "ENTAILMENT" in labels:
        entailment_prob = float(probs[labels.index("ENTAILMENT")])
    else:
        # NOTE(review): some checkpoints expose labels like LABEL_0/1/2; the
        # max-probability fallback is a weak proxy — verify the label mapping
        # for NLI_MODEL actually contains "ENTAILMENT".
        entailment_prob = float(probs.max())
    return round(entailment_prob, 3)
def check_accuracy(reference: str, response: str) -> float:
    """Semantic similarity (cosine over embeddings) of response vs. reference.

    Result is clamped to [0, 1] and rounded to 3 decimals; empty inputs
    score 0.0.
    """
    if not reference or not response:
        return 0.0
    encoder = get_embed_model()
    reference_vec = encoder.encode(reference, convert_to_tensor=True)
    response_vec = encoder.encode(response, convert_to_tensor=True)
    similarity = float(util.cos_sim(reference_vec, response_vec).item())
    clamped = min(1.0, max(0.0, similarity))
    return round(clamped, 3)
def check_coherence(response: str) -> float:
    """Heuristic coherence score based on sentence-length consistency.

    Splits on "." and compares the spread of sentence word counts to their
    mean: uniform sentence lengths score near 1.0, highly uneven lengths
    near 0.0. Non-string or empty input scores 0.0.
    """
    if not isinstance(response, str) or not response:
        return 0.0
    word_counts = [len(part.split()) for part in response.split(".") if part.strip()]
    if not word_counts:
        return 0.0
    mean_len = np.mean(word_counts)
    spread = np.std(word_counts)
    # Normalized dispersion, inverted so "consistent" is high; epsilon
    # guards against division by zero.
    raw = 1.0 - spread / (mean_len + 1e-6)
    return round(min(1.0, max(0.0, raw)), 3)
def check_fluency(response: str) -> float:
    """Crude fluency proxy: fraction of characters that are alphabetic.

    Returns a value in [0, 1] rounded to 3 decimals; non-string or empty
    input scores 0.0.
    """
    if not isinstance(response, str) or not response:
        return 0.0
    alpha_count = sum(1 for ch in response if ch.isalpha())
    ratio = alpha_count / max(1, len(response))
    return round(min(1.0, max(0.0, ratio)), 3)
# -----------------------------
# Visualization helpers
# -----------------------------
def spider_net_multi(labels: List[str], rows: List[Dict], title: str, fill_alpha: float = 0.12):
    """Draw a radar (polar) chart with one closed polygon per row.

    Each element of *rows* is a dict with "name" (legend label) and "values"
    (one score per entry in *labels*, assumed in [0, 1]). Returns the figure.
    """
    axis_count = len(labels)
    angles = [idx / float(axis_count) * 2 * math.pi for idx in range(axis_count)]
    angles = angles + angles[:1]  # repeat first angle to close each polygon
    fig = plt.figure(figsize=(6.5, 6.5))
    ax = plt.subplot(111, polar=True)
    ax.set_xticks(angles[:-1])
    ax.set_xticklabels(labels, fontsize=9)
    ax.set_ylim(0, 1)
    ax.set_yticks([0, 0.25, 0.5, 0.75, 1.0])
    for row in rows:
        closed_values = row["values"] + row["values"][:1]
        ax.plot(angles, closed_values, linewidth=1.5, label=row["name"])
        ax.fill(angles, closed_values, alpha=fill_alpha)
    ax.set_title(title, y=1.08, fontsize=12)
    ax.legend(loc="upper right", bbox_to_anchor=(1.25, 1.1))
    return fig
def heatmap_plot(df: pd.DataFrame, metric_cols: List[str], title: str = "Metric Correlations"):
    """Annotated heatmap of pairwise correlations between metric columns."""
    correlations = df[metric_cols].corr()
    fig, ax = plt.subplots(figsize=(7, 5))
    sns.heatmap(correlations, annot=True, fmt=".2f", cmap="coolwarm", ax=ax)
    ax.set_title(title)
    return fig
def task_agent_heatmap(leaderboard: pd.DataFrame, metric: str):
    """Heatmap of *metric* for each (task, agent) cell of the leaderboard."""
    scores = leaderboard.pivot(index="task", columns="agent", values=metric)
    fig, ax = plt.subplots(figsize=(7, 5))
    sns.heatmap(scores, annot=True, fmt=".2f", cmap="YlGnBu", ax=ax)
    ax.set_title(f"Task-Agent Performance ({metric})")
    return fig
def leaderboard_barplot(leaderboard: pd.DataFrame, metric_cols: List[str]):
    """Grouped bar chart of leaderboard scores: one bar per (metric, agent)."""
    long_form = leaderboard.melt(
        id_vars=["agent"],
        value_vars=metric_cols,
        var_name="metric",
        value_name="score",
    )
    fig, ax = plt.subplots(figsize=(8, 5))
    sns.barplot(x="metric", y="score", hue="agent", data=long_form, ax=ax)
    ax.set_ylim(0, 1)
    ax.set_title("Leaderboard Bar Chart")
    return fig
def distribution_plot(metrics_df: pd.DataFrame, metric: str):
    """Box plot plus jittered points of per-row *metric* scores, by agent."""
    fig, ax = plt.subplots(figsize=(7, 5))
    sns.boxplot(x="agent", y=metric, data=metrics_df, ax=ax)
    sns.stripplot(
        x="agent", y=metric, data=metrics_df, ax=ax,
        color="black", alpha=0.4, jitter=True,
    )
    ax.set_ylim(0, 1)
    ax.set_title(f"Distribution of {metric} Scores per Agent")
    return fig
def scatter_two_metrics(metrics_df: pd.DataFrame, metric_x: str, metric_y: str):
    """Per-row scatter of two metrics on the unit square, colored by agent."""
    fig, ax = plt.subplots(figsize=(6, 6))
    sns.scatterplot(x=metric_x, y=metric_y, hue="agent", data=metrics_df, ax=ax, alpha=0.7)
    ax.set_xlim(0, 1)
    ax.set_ylim(0, 1)
    ax.set_title(f"{metric_x} vs {metric_y}")
    return fig
# -----------------------------
# Main evaluation entrypoint
# -----------------------------
def _save_figure(fig, prefix: str, caption: str, images: List[Tuple[str, str]]) -> None:
    """Save *fig* to /tmp under a unique name, close it, record (path, caption)."""
    path = f"/tmp/{prefix}_{uuid.uuid4().hex}.png"
    fig.savefig(path, bbox_inches="tight")
    plt.close(fig)
    images.append((path, caption))


def evaluate_dataframe(df: pd.DataFrame) -> Tuple[pd.DataFrame, List[Tuple[str,str]], pd.DataFrame]:
    """Score every row of *df* and produce leaderboard tables and plots.

    Expects columns "agent", "task", "prompt", "response", "reference"
    (missing values are treated as empty strings).

    Returns:
        metrics_df: input rows plus one column per metric.
        images: list of (file_path, caption) for each plot that rendered.
        leaderboard: per-(agent, task) mean of every metric column.
    """
    df = df.copy()
    # --- Per-row metric scores -------------------------------------------
    scores = []
    for _, row in df.iterrows():
        prompt = str(row.get("prompt", ""))
        response = str(row.get("response", ""))
        reference = str(row.get("reference", ""))
        scores.append({
            "instruction_following": check_instruction_following(prompt, response),
            "hallucination": check_hallucination(prompt, response),
            "accuracy": check_accuracy(reference, response),
            "coherence": check_coherence(response),
            "fluency": check_fluency(response),
        })
    metrics_df = pd.concat([df.reset_index(drop=True), pd.DataFrame(scores)], axis=1)
    # --- Leaderboard: mean metric per (agent, task) ----------------------
    metric_cols = ["instruction_following", "hallucination", "accuracy", "coherence", "fluency"]
    leaderboard = (
        metrics_df.groupby(["agent", "task"])[metric_cols]
        .mean()
        .reset_index()
    )
    # --- Visualizations (each is best-effort; a failure skips that plot) -
    radar_rows = [
        {
            "name": agent,
            "values": leaderboard.loc[leaderboard["agent"] == agent, metric_cols].mean().tolist(),
        }
        for agent in leaderboard["agent"].unique()
    ]
    # (file prefix, caption, zero-arg figure factory)
    plot_specs = [
        ("radar", "Radar Plot",
         lambda: spider_net_multi(metric_cols, radar_rows, "Agent Performance Radar")),
        ("heatmap", "Correlation Heatmap",
         lambda: heatmap_plot(metrics_df, metric_cols, title="Metric Correlation Heatmap")),
        ("task_agent", "Task-Agent Heatmap (Accuracy)",
         lambda: task_agent_heatmap(leaderboard, "accuracy")),
        ("barplot", "Leaderboard Bar Chart",
         lambda: leaderboard_barplot(leaderboard, metric_cols)),
        ("distribution", "Accuracy Distribution",
         lambda: distribution_plot(metrics_df, "accuracy")),
        ("scatter", "Instruction Following vs Accuracy",
         lambda: scatter_two_metrics(metrics_df, "instruction_following", "accuracy")),
    ]
    images: List[Tuple[str, str]] = []
    for prefix, caption, make_fig in plot_specs:
        try:
            _save_figure(make_fig(), prefix, caption, images)
        except Exception as exc:
            # Plotting is non-essential; report and continue with the rest.
            print(f"{caption} failed:", exc)
    return metrics_df, images, leaderboard