"""
Evaluation logic for Agentic Evaluation Framework.
"""
import re
import numpy as np
import pandas as pd
import torch
import matplotlib.pyplot as plt
from transformers import (
AutoTokenizer,
AutoModelForSequenceClassification,
AutoModelForCausalLM,
pipeline,
)
from sentence_transformers import SentenceTransformer
import evaluate
# -----------------------------
# Global Config
# -----------------------------
NLI_MODEL = "microsoft/deberta-v2-xlarge-mnli"
EMBED_MODEL = "all-MiniLM-L6-v2"
LLM_JUDGE_MODEL = "microsoft/DialoGPT-small"
FLUENCY_MODEL = "textattack/roberta-base-CoLA"
device = 0 if torch.cuda.is_available() else -1
# Caches
_nli_model, _nli_tokenizer = None, None
_embed_model = None
_judge_model, _judge_tokenizer = None, None
_fluency_checker = None
# Metrics
bertscore = evaluate.load("bertscore")
bleu = evaluate.load("bleu")
rouge = evaluate.load("rouge")
# -----------------------------
# Lazy Model Loaders
# -----------------------------
def get_nli_model():
global _nli_model, _nli_tokenizer
if _nli_model is None:
_nli_tokenizer = AutoTokenizer.from_pretrained(NLI_MODEL)
_nli_model = AutoModelForSequenceClassification.from_pretrained(NLI_MODEL).to(
torch.device("cuda" if torch.cuda.is_available() else "cpu")
)
_nli_model.eval()
return _nli_model, _nli_tokenizer
def get_embed_model():
global _embed_model
if _embed_model is None:
_embed_model = SentenceTransformer(EMBED_MODEL, device="cuda" if torch.cuda.is_available() else "cpu")
return _embed_model
def get_judge_model():
global _judge_model, _judge_tokenizer
if _judge_model is None:
_judge_tokenizer = AutoTokenizer.from_pretrained(LLM_JUDGE_MODEL)
_judge_model = AutoModelForCausalLM.from_pretrained(LLM_JUDGE_MODEL).to(
torch.device("cuda" if torch.cuda.is_available() else "cpu")
)
return _judge_model, _judge_tokenizer
def get_fluency_checker():
global _fluency_checker
if _fluency_checker is None:
_fluency_checker = pipeline(
"text-classification", model=FLUENCY_MODEL, device=device
)
return _fluency_checker
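# The loaders above are lazy, so the first evaluated row pays the full model
# download / load cost. This warm-up helper is an illustrative addition (it is
# not called anywhere in this module): it simply touches each cached loader once
# so a caller can front-load that cost before timing an evaluation run.
def warm_up_models():
    get_nli_model()
    get_embed_model()
    get_judge_model()
    get_fluency_checker()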
# -----------------------------
# Evaluation Functions
# -----------------------------
def check_instruction_following(prompt, response):
try:
nli_model, nli_tokenizer = get_nli_model()
inputs = nli_tokenizer(prompt, response, return_tensors="pt", truncation=True, padding=True).to(
nli_model.device
)
with torch.no_grad():
logits = nli_model(**inputs).logits
probs = torch.softmax(logits, dim=-1).cpu().numpy()[0]
        entailment_score = probs[2]  # index 2 = ENTAILMENT in this MNLI checkpoint's label order
return float(entailment_score)
except Exception:
return 0.0
def check_hallucination(reference, response):
try:
nli_model, nli_tokenizer = get_nli_model()
inputs = nli_tokenizer(reference, response, return_tensors="pt", truncation=True, padding=True).to(
nli_model.device
)
with torch.no_grad():
logits = nli_model(**inputs).logits
probs = torch.softmax(logits, dim=-1).cpu().numpy()[0]
        contradiction_score = probs[0]  # index 0 = CONTRADICTION in this MNLI checkpoint's label order
return 1.0 - float(contradiction_score)
except Exception:
return 0.0
def check_assumption(prompt, response):
    try:
        judge_model, judge_tokenizer = get_judge_model()
        input_text = f"Does this response make assumptions not in the prompt?\nPrompt: {prompt}\nResponse: {response}\nAnswer yes or no:"
        inputs = judge_tokenizer.encode(input_text, return_tensors="pt").to(judge_model.device)
        outputs = judge_model.generate(inputs, max_new_tokens=10, pad_token_id=judge_tokenizer.eos_token_id)
        # Decode only the newly generated tokens: the prompt itself ends with
        # "Answer yes or no:", so decoding the full sequence would always match "yes".
        judgment = judge_tokenizer.decode(outputs[0][inputs.shape[-1]:], skip_special_tokens=True).lower()
        if "yes" in judgment:
            return 0.0
        elif "no" in judgment:
            return 1.0
        return 0.5
    except Exception:
        return 0.5
def check_coherence(response):
    # Coherence as mean cosine similarity between embeddings of adjacent sentences.
    try:
        sentences = [s.strip() for s in re.split(r"[.!?]+", response) if s.strip()]
        if not sentences:
            return 0.0
        if len(sentences) == 1:
            return 1.0
        embs = get_embed_model().encode(sentences, convert_to_tensor=True, normalize_embeddings=True)
        sims = torch.nn.functional.cosine_similarity(embs[:-1], embs[1:], dim=1)
        return float(torch.clamp(sims.mean(), 0.0, 1.0).item())
    except Exception:
        return 0.0
def check_accuracy(reference, response):
try:
bert_results = bertscore.compute(predictions=[response], references=[reference], lang="en")
bert_f1 = bert_results["f1"][0]
except Exception:
bert_f1 = 0.0
try:
bleu_results = bleu.compute(predictions=[response], references=[[reference]])
bleu_score = bleu_results["bleu"]
except Exception:
bleu_score = 0.0
try:
rouge_results = rouge.compute(predictions=[response], references=[reference])
rouge_l = rouge_results["rougeL"]
except Exception:
rouge_l = 0.0
return float((bert_f1 + bleu_score + rouge_l) / 3)
def check_relevance(prompt, response):
try:
model = get_embed_model()
emb1 = model.encode(prompt, convert_to_tensor=True)
emb2 = model.encode(response, convert_to_tensor=True)
cos_sim = torch.nn.functional.cosine_similarity(emb1, emb2, dim=0)
        return float(max(0.0, cos_sim.item()))  # clamp negatives so the score stays in [0, 1]
except Exception:
return 0.0
def check_fluency(response):
    try:
        fluency_checker = get_fluency_checker()
        # Truncate long responses so inputs past the model's 512-token limit do not error out.
        result = fluency_checker(response, truncation=True)[0]
        return float(result["score"]) if result["label"] == "LABEL_1" else 1.0 - float(result["score"])
    except Exception:
        return 0.5
# -----------------------------
# Row Evaluation
# -----------------------------
def evaluate_row(row):
    # Missing columns fall back to empty strings so one malformed row does not abort the run.
    prompt = row.get("prompt", "")
    response = row.get("response", "")
    reference = row.get("reference", "")
    scores = {
        "instruction_following": check_instruction_following(prompt, response),
        "hallucination": check_hallucination(reference, response),
        "assumption": check_assumption(prompt, response),
        "coherence": check_coherence(response),
        "accuracy": check_accuracy(reference, response),
        "relevance": check_relevance(prompt, response),
        "fluency": check_fluency(response),
    }
    scores["final_score"] = float(np.mean(list(scores.values())))
    return pd.Series(scores)
# -----------------------------
# Visualization Helpers
# -----------------------------
def plot_radar_chart(metrics_df, out_path="/tmp/radar.png"):
    mean_scores = metrics_df.mean(numeric_only=True).drop("final_score", errors="ignore")
    categories = list(mean_scores.index)
    values = mean_scores.values.tolist()
    # One angle per metric; close the polygon by repeating the first point of both lists.
    angles = np.linspace(0, 2 * np.pi, len(categories), endpoint=False).tolist()
    values += values[:1]
    angles += angles[:1]
    plt.figure(figsize=(6, 6))
    ax = plt.subplot(111, polar=True)
    ax.plot(angles, values, "o-", linewidth=2)
    ax.fill(angles, values, alpha=0.25)
    ax.set_thetagrids(np.degrees(angles[:-1]), categories)
    plt.savefig(out_path)
    plt.close()
    return out_path, "Radar Chart (Mean Scores)"
def plot_leaderboard(metrics_df, out_path="/tmp/leaderboard.png"):
agent_means = metrics_df.groupby("agent")["final_score"].mean().sort_values(ascending=False)
plt.figure(figsize=(10, 5))
agent_means.plot(kind="bar", colormap="Set3", ax=plt.gca())
plt.title("Leaderboard: Avg Final Score per Agent")
plt.ylabel("Score")
plt.tight_layout()
plt.savefig(out_path)
plt.close()
return out_path, "Leaderboard"
# -----------------------------
# Main Evaluation Entry
# -----------------------------
def evaluate_dataframe(df: pd.DataFrame):
metrics_df = df.apply(evaluate_row, axis=1, result_type="expand")
metrics_df = pd.concat([df, metrics_df], axis=1)
leaderboard = (
metrics_df.groupby("agent")["final_score"]
.mean()
.reset_index()
.sort_values("final_score", ascending=False)
)
images = []
images.append(plot_radar_chart(metrics_df))
images.append(plot_leaderboard(metrics_df))
return metrics_df, images, leaderboard
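# -----------------------------
# Example usage (illustrative)
# -----------------------------
# Minimal sketch of how this module is meant to be driven. It assumes the input
# DataFrame carries the "agent", "prompt", "response", and "reference" columns
# that evaluate_row() reads; the sample rows below are made up for illustration,
# and the first run is slow because every model has to be downloaded.
if __name__ == "__main__":
    sample = pd.DataFrame(
        [
            {
                "agent": "agent_a",
                "prompt": "Where is the Eiffel Tower?",
                "response": "The Eiffel Tower is located in Paris, France.",
                "reference": "The Eiffel Tower is in Paris.",
            },
            {
                "agent": "agent_b",
                "prompt": "Where is the Eiffel Tower?",
                "response": "I guess it might be somewhere in Berlin.",
                "reference": "The Eiffel Tower is in Paris.",
            },
        ]
    )
    scored_df, chart_outputs, leaderboard_df = evaluate_dataframe(sample)
    print(leaderboard_df)
    print("Charts written to:", [path for path, _ in chart_outputs])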