evaluation-framework / evaluator.py
manayporwal07's picture
Update evaluator.py
ccf526b verified
raw
history blame
23.5 kB
#####################################################################################################################################################################
# import re
# import json
# import torch
# import pandas as pd
# import matplotlib.pyplot as plt
# import seaborn as sns
# import os
# import uuid
# from transformers import AutoTokenizer, AutoModelForSequenceClassification
# from sentence_transformers import SentenceTransformer, util
# import matplotlib.pyplot as plt
# import numpy as np
# def plot_radar_chart(metrics_df, agents, metrics, out_path="/tmp/radar.png"):
# """
# Radar chart comparing multiple agents across metrics.
# """
# labels = metrics
# num_vars = len(labels)
# # Compute angle for each axis
# angles = np.linspace(0, 2 * np.pi, num_vars, endpoint=False).tolist()
# angles += angles[:1] # close loop
# fig, ax = plt.subplots(figsize=(6, 6), subplot_kw=dict(polar=True))
# for agent in agents:
# values = []
# for m in metrics:
# mean_val = metrics_df.loc[metrics_df['agent'] == agent, m].mean()
# values.append(mean_val if not np.isnan(mean_val) else 0)
# values += values[:1]
# ax.plot(angles, values, label=agent, linewidth=2)
# ax.fill(angles, values, alpha=0.25)
# ax.set_xticks(angles[:-1])
# ax.set_xticklabels(labels)
# ax.set_yticklabels([])
# ax.legend(loc="upper right", bbox_to_anchor=(1.3, 1.1))
# ax.set_title("Agent Performance Radar Chart")
# plt.tight_layout()
# plt.savefig(out_path)
# plt.close()
# return out_path
# import seaborn as sns
# def plot_heatmap(metrics_df, out_path="/tmp/heatmap.png"):
# pivot = metrics_df.groupby("agent")[
# ["accuracy", "hallucination", "instruction_following", "coherence", "assumption"]
# ].mean()
# plt.figure(figsize=(8, 5))
# sns.heatmap(pivot, annot=True, cmap="viridis", fmt=".2f")
# plt.title("Agent × Metric Heatmap")
# plt.tight_layout()
# plt.savefig(out_path)
# plt.close()
# return out_path
# # --------------------------
# # MODEL LOADING
# # --------------------------
# NLI_MODEL = "textattack/roberta-base-MNLI"
# EMBED_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
# # Load NLI model & tokenizer
# nli_tokenizer = AutoTokenizer.from_pretrained(NLI_MODEL)
# nli_model = AutoModelForSequenceClassification.from_pretrained(NLI_MODEL)
# nli_model.to("cpu")
# nli_model.eval()
# # Load embedding model
# embed_model = SentenceTransformer(EMBED_MODEL)
# # Label mapping from config
# id2label = {int(k): v.upper() for k, v in nli_model.config.id2label.items()}
# # --------------------------
# # METRIC FUNCTIONS
# # --------------------------
# def check_instruction_following(prompt: str, response: str) -> float:
# """Embedding-based similarity between prompt and response."""
# if not prompt or not response:
# return 0.0
# p_emb = embed_model.encode(prompt, convert_to_tensor=True)
# r_emb = embed_model.encode(response, convert_to_tensor=True)
# sim = float(util.cos_sim(p_emb, r_emb).item())
# return round(max(0.0, min(1.0, sim)), 3)
# def check_hallucination(reference: str, response: str) -> float:
# """
# Single hallucination score:
# Entailment prob - Contradiction prob (normalized to [0,1]).
# Higher = less hallucination.
# """
# if not reference or not response:
# return 0.0
# with torch.no_grad():
# inputs = nli_tokenizer.encode_plus(reference, response, return_tensors="pt", truncation=True)
# outputs = nli_model(**inputs)
# probs = torch.softmax(outputs.logits, dim=-1).cpu().numpy()[0]
# entail_prob, contra_prob = 0.0, 0.0
# for idx, p in enumerate(probs):
# label = id2label.get(idx, "")
# if "ENTAIL" in label:
# entail_prob = float(p)
# elif "CONTRA" in label:
# contra_prob = float(p)
# score = entail_prob - contra_prob
# score = (score + 1) / 2 # normalize [-1,1] → [0,1]
# return round(max(0.0, min(1.0, score)), 3)
# def check_assumption(response: str) -> float:
# """Detect speculative/hedging terms."""
# if not response:
# return 0.0
# speculative_terms = ["maybe", "probably", "might", "perhaps", "i guess", "seems", "could"]
# count = sum(1 for t in speculative_terms if t in response.lower())
# score = 1.0 - min(count / 5.0, 1.0) # smoother decay
# return round(score, 3)
# def check_coherence(response: str) -> float:
# """Heuristic coherence metric: penalizes very short/long, rewards sentence balance."""
# if not response:
# return 0.0
# words = len(re.findall(r"\w+", response))
# sents = max(1, len(re.split(r"[.!?]+", response)) - 1)
# if words < 5:
# return 0.3
# if words > 200:
# return 0.5
# base = min(1.0, (words / 50.0) + (sents / 5.0))
# return round(max(0.4, min(base, 0.95)), 3)
# def check_accuracy(reference: str, response: str) -> float:
# """Semantic similarity between reference and response via embeddings (cosine)."""
# if not reference or not response:
# return 0.0
# ref_emb = embed_model.encode(reference, convert_to_tensor=True)
# resp_emb = embed_model.encode(response, convert_to_tensor=True)
# sim = float(util.cos_sim(ref_emb, resp_emb).item())
# return round(max(0.0, min(1.0, sim)), 3)
# # --------------------------
# # ROW & DF EVALUATION
# # --------------------------
# def evaluate_row(row):
# prompt = row.get("prompt", "")
# response = row.get("response", "")
# reference = row.get("reference", "")
# metrics = {
# "task_id": row.get("task_id", ""),
# "agent": row.get("agent", ""),
# "instruction_following": check_instruction_following(prompt, response),
# "hallucination": check_hallucination(reference, response),
# "assumption": check_assumption(response),
# "coherence": check_coherence(response),
# "accuracy": check_accuracy(reference, response),
# }
# # Weighted avg score (you can adjust weights)
# metrics["final_score"] = round(
# 0.25 * metrics["instruction_following"]
# + 0.25 * metrics["accuracy"]
# + 0.2 * metrics["hallucination"]
# + 0.15 * metrics["coherence"]
# + 0.15 * metrics["assumption"],
# 3,
# )
# return metrics
# def evaluate_dataframe(df: pd.DataFrame):
# metrics_df = df.apply(evaluate_row, axis=1, result_type="expand")
# # Leaderboard
# leaderboard = (
# metrics_df.groupby(["agent", "task_id"])["final_score"]
# .mean()
# .reset_index()
# )
# # # Plots
# # images = []
# # Existing images list
# images = []
# # Add radar chart
# radar_path = plot_radar_chart(metrics_df, agents=df["agent"].unique(),
#
###############################################################################################################################
import re
import json
import torch
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import os
import uuid
import numpy as np
from transformers import (
AutoTokenizer,
AutoModelForSequenceClassification,
AutoModelForCausalLM,
pipeline
)
from sentence_transformers import SentenceTransformer, util
import evaluate
from sklearn.metrics import accuracy_score, f1_score
from collections import defaultdict
import warnings
warnings.filterwarnings('ignore')
# --------------------------
# MODEL LOADING
# --------------------------
# Everything in this section is executed at module level, i.e. when the
# module is imported: every model, tokenizer and metric below is created
# before any evaluation function can be called.

# Hugging Face model identifiers used by the metric functions.
NLI_MODEL = "microsoft/deberta-v2-xlarge-mnli"
EMBED_MODEL = "sentence-transformers/all-mpnet-base-v2"
LLM_JUDGE_MODEL = "microsoft/DialoGPT-large" # Can be replaced with more powerful models
# Load NLI model & tokenizer; the model is placed on CUDA when available,
# otherwise on CPU, and switched to eval mode.
nli_tokenizer = AutoTokenizer.from_pretrained(NLI_MODEL)
nli_model = AutoModelForSequenceClassification.from_pretrained(NLI_MODEL)
nli_model.to("cuda" if torch.cuda.is_available() else "cpu")
nli_model.eval()
# Load embedding model (used for the cosine-similarity based scores)
embed_model = SentenceTransformer(EMBED_MODEL)
# Load LLM judge (causal LM used by check_assumption); same device
# selection and eval mode as the NLI model.
judge_tokenizer = AutoTokenizer.from_pretrained(LLM_JUDGE_MODEL)
judge_model = AutoModelForCausalLM.from_pretrained(LLM_JUDGE_MODEL)
judge_model.to("cuda" if torch.cuda.is_available() else "cpu")
judge_model.eval()
# Load additional evaluation metrics from the `evaluate` package
bertscore = evaluate.load("bertscore")
bleu = evaluate.load("bleu")
rouge = evaluate.load("rouge")
# Label mapping from config: integer class index -> upper-cased label name.
# The metric functions match these names by substring ("ENTAIL", "NEUTRAL",
# "CONTRA"), so they do not depend on the exact index order of the model.
id2label = {int(k): v.upper() for k, v in nli_model.config.id2label.items()}
# --------------------------
# IMPROVED METRIC FUNCTIONS
# --------------------------
def check_instruction_following(prompt: str, response: str) -> float:
    """Score how well ``response`` follows ``prompt`` on a 0-1 scale.

    Two signals are blended with weights 0.7 / 0.3:

    * an NLI score, computed by feeding ``(prompt, response)`` to the NLI
      model and taking the entailment probability plus half of the neutral
      probability;
    * the cosine similarity between the sentence embeddings of both texts.

    Returns 0.0 when either text is empty; otherwise the blended value
    clamped to [0, 1] and rounded to three decimals.
    """
    if not (prompt and response):
        return 0.0

    # Signal 1: natural-language inference over the (prompt, response) pair.
    with torch.no_grad():
        encoded = nli_tokenizer.encode_plus(
            prompt,
            response,
            return_tensors="pt",
            truncation=True,
            max_length=512,
        ).to(nli_model.device)
        logits = nli_model(**encoded).logits
        class_probs = torch.softmax(logits, dim=-1).cpu().numpy()[0]

    entailment = 0.0
    neutral = 0.0
    for class_idx, probability in enumerate(class_probs):
        label_name = id2label.get(class_idx, "")
        if "ENTAIL" in label_name:
            entailment = float(probability)
        elif "NEUTRAL" in label_name:
            neutral = float(probability)
    nli_component = entailment + (neutral * 0.5)

    # Signal 2: embedding cosine similarity.
    prompt_vec = embed_model.encode(prompt, convert_to_tensor=True)
    response_vec = embed_model.encode(response, convert_to_tensor=True)
    similarity_component = float(util.cos_sim(prompt_vec, response_vec).item())

    blended = 0.7 * nli_component + 0.3 * similarity_component
    clamped = max(0.0, min(1.0, blended))
    return round(clamped, 3)
def check_hallucination(reference: str, response: str) -> float:
    """Score the response against the reference; higher means less hallucination.

    A penalty is built from two parts and subtracted from 1.0:

    * 0.7 x (contradiction probability + 0.3 x neutral probability) from
      the NLI model applied to ``(reference, response)``;
    * 0.3 x (1 - cosine similarity) of the two sentence embeddings.

    Returns 0.0 when either text is empty; otherwise the result clamped to
    [0, 1] and rounded to three decimals.
    """
    if not (reference and response):
        return 0.0

    # Part 1: NLI contradiction / neutrality between reference and response.
    with torch.no_grad():
        encoded = nli_tokenizer.encode_plus(
            reference,
            response,
            return_tensors="pt",
            truncation=True,
            max_length=512,
        ).to(nli_model.device)
        logits = nli_model(**encoded).logits
        class_probs = torch.softmax(logits, dim=-1).cpu().numpy()[0]

    contradiction = 0.0
    neutral = 0.0
    for class_idx, probability in enumerate(class_probs):
        label_name = id2label.get(class_idx, "")
        if "CONTRA" in label_name:
            contradiction = float(probability)
        elif "NEUTRAL" in label_name:
            neutral = float(probability)
    nli_penalty = contradiction + (neutral * 0.3)

    # Part 2: semantic distance between reference and response.
    reference_vec = embed_model.encode(reference, convert_to_tensor=True)
    response_vec = embed_model.encode(response, convert_to_tensor=True)
    similarity = float(util.cos_sim(reference_vec, response_vec).item())

    raw_score = 1.0 - (0.7 * nli_penalty + 0.3 * (1 - similarity))
    clamped = max(0.0, min(1.0, raw_score))
    return round(clamped, 3)
def check_assumption(response: str) -> float:
    """Score how free of speculation/hedging ``response`` is (higher = fewer).

    The score combines two signals:

    * a pattern score: the number of hedging-phrase matches, normalised by
      one fifth of the word count and capped at 1.0 (weight 0.6, inverted);
    * a yes/no verdict generated by the judge language model (weight 0.4,
      1.0 when the model does not answer "yes").

    Args:
        response: Text to analyse.

    Returns:
        0.0 for an empty response, otherwise the combined score rounded to
        three decimals.
    """
    if not response:
        return 0.0

    # Pattern-based detection
    speculative_patterns = [
        r"\b(maybe|perhaps|possibly|probably|might|could|would|should)\b",
        r"\b(I think|I believe|I guess|I suppose|I assume)\b",
        r"\b(it seems|it appears|it looks like)\b",
        r"\b(likely|unlikely|presumably|arguably)\b",
        r"\b(some|many|most|often|usually|generally|typically)\b",
    ]
    # Match case-insensitively instead of lower-casing the text: the
    # "I think ..." pattern contains a capital "I" and could never match a
    # lower-cased string.
    pattern_count = sum(
        len(re.findall(pattern, response, flags=re.IGNORECASE))
        for pattern in speculative_patterns
    )

    # Length normalization
    word_count = len(response.split())
    pattern_score = min(1.0, pattern_count / max(1, word_count / 5))

    # LLM-based judgment
    assumption_prompt = (
        "\n"
        "Determine if the following text contains assumptions, speculation, or hedging language.\n"
        f"Text: {response}\n"
        "Answer with only 'yes' or 'no':\n"
    )
    # NOTE(review): the prompt is not truncated, so a very long response may
    # exceed the judge model's context window -- confirm against the model.
    with torch.no_grad():
        # The inputs must live on the same device as the judge model.
        inputs = judge_tokenizer.encode(
            assumption_prompt, return_tensors="pt"
        ).to(judge_model.device)
        prompt_length = inputs.shape[-1]
        outputs = judge_model.generate(
            inputs,
            max_length=prompt_length + 3,
            pad_token_id=judge_tokenizer.eos_token_id,
        )

    # Decode only the newly generated tokens. The prompt itself contains the
    # word 'yes', so inspecting the full sequence would always report "yes".
    judgment = judge_tokenizer.decode(
        outputs[0][prompt_length:], skip_special_tokens=True
    )
    said_yes = re.search(r"\byes\b", judgment, flags=re.IGNORECASE) is not None
    llm_score = 0.0 if said_yes else 1.0

    # Combined score
    final_score = 0.6 * (1 - pattern_score) + 0.4 * llm_score
    return round(final_score, 3)
def check_coherence(response: str) -> float:
    """Heuristic coherence score in [0, 1] built from surface features.

    The weighted components are:

    * 0.3 x sentence count, saturating at five sentences;
    * 0.2 x sentence-length uniformity (1 - variance / 100, floored at 0);
    * 0.3 x number of distinct transition words present, saturating at three;
    * 0.2 x ratio of unique whitespace-separated tokens to all tokens.

    Returns 0.0 for an empty response or one without any non-blank sentence.
    """
    if not response:
        return 0.0

    # Sentences are the non-blank pieces between runs of ., ! or ?.
    stripped_chunks = (chunk.strip() for chunk in re.split(r'[.!?]+', response))
    sentence_list = [chunk for chunk in stripped_chunks if len(chunk) > 0]
    sentence_total = len(sentence_list)
    if sentence_total == 0:
        return 0.0

    # Uniformity of sentence lengths (variance only defined for 2+ sentences).
    words_per_sentence = [len(sentence.split()) for sentence in sentence_list]
    if len(words_per_sentence) > 1:
        length_variance = np.var(words_per_sentence)
    else:
        length_variance = 0
    length_score = 1.0 - min(1.0, length_variance / 100)

    # Distinct transition words that occur anywhere in the text.
    transition_words = [
        'however', 'therefore', 'moreover', 'furthermore', 'consequently',
        'additionally', 'likewise', 'similarly', 'nevertheless', 'nonetheless'
    ]
    lowered = response.lower()
    transition_hits = 0
    for marker in transition_words:
        if marker in lowered:
            transition_hits += 1
    transition_score = min(1.0, transition_hits / 3)

    # Vocabulary diversity: fewer repeated tokens gives a higher ratio.
    tokens = lowered.split()
    repetition_ratio = len(set(tokens)) / max(1, len(tokens))

    sentence_score = min(1.0, sentence_total / 5)
    coherence_score = (
        0.3 * sentence_score +
        0.2 * length_score +
        0.3 * transition_score +
        0.2 * repetition_ratio
    )
    return round(max(0.0, min(1.0, coherence_score)), 3)
def check_accuracy(reference: str, response: str) -> float:
    """Score how closely ``response`` matches ``reference`` on a 0-1 scale.

    Weighted blend of four similarity measures:

    * 0.4 x BERTScore F1
    * 0.3 x ROUGE-L (with stemming)
    * 0.1 x BLEU (0.0 if BLEU cannot be computed)
    * 0.2 x cosine similarity of the sentence embeddings

    Args:
        reference: Ground-truth text.
        response: Candidate text produced by the agent.

    Returns:
        0.0 when either text is empty, otherwise the blended score clamped
        to [0, 1] and rounded to three decimals.
    """
    if not reference or not response:
        return 0.0

    # BERTScore. Rely on the library's default English model: passing the
    # sentence-transformers EMBED_MODEL as ``model_type`` without an explicit
    # ``num_layers`` is not supported, because that model is not in
    # bert_score's built-in layer table.
    bert_results = bertscore.compute(
        predictions=[response],
        references=[reference],
        lang="en",
    )
    bert_f1 = bert_results['f1'][0]

    # ROUGE-L
    rouge_results = rouge.compute(
        predictions=[response],
        references=[reference],
        use_stemmer=True,
    )
    rouge_l = rouge_results['rougeL']

    # BLEU. The `evaluate` BLEU metric tokenizes internally and expects raw
    # strings (one list of reference strings per prediction); pre-split
    # token lists are rejected, which previously forced this term to 0.0.
    try:
        bleu_results = bleu.compute(
            predictions=[response],
            references=[[reference]],
        )
        bleu_score = bleu_results['bleu']
    except Exception:
        # Deliberate best-effort: BLEU is a minor term, so degenerate inputs
        # must not abort the evaluation.
        bleu_score = 0.0

    # Semantic similarity
    ref_emb = embed_model.encode(reference, convert_to_tensor=True)
    resp_emb = embed_model.encode(response, convert_to_tensor=True)
    semantic_sim = float(util.cos_sim(ref_emb, resp_emb).item())

    # Combined score (weighted average). Named ``combined`` so the local does
    # not shadow sklearn's ``accuracy_score`` imported at file level.
    combined = (
        0.4 * bert_f1 +
        0.3 * rouge_l +
        0.1 * bleu_score +
        0.2 * semantic_sim
    )
    return round(max(0.0, min(1.0, combined)), 3)
def check_relevance(prompt: str, response: str) -> float:
    """Return the prompt/response embedding similarity as a 0-1 relevance score.

    Both texts are embedded with the shared sentence-embedding model and
    compared by cosine similarity. The value is clamped to [0, 1] and
    rounded to three decimals; an empty prompt or response yields 0.0.
    """
    if not (prompt and response):
        return 0.0

    prompt_vec = embed_model.encode(prompt, convert_to_tensor=True)
    response_vec = embed_model.encode(response, convert_to_tensor=True)

    cosine = float(util.cos_sim(prompt_vec, response_vec).item())
    bounded = max(0.0, min(1.0, cosine))
    return round(bounded, 3)
# Lazily created classifier shared by every check_fluency call.
_FLUENCY_CHECKER = None


def _get_fluency_checker():
    """Return the shared CoLA classifier pipeline, building it on first use."""
    global _FLUENCY_CHECKER
    if _FLUENCY_CHECKER is None:
        _FLUENCY_CHECKER = pipeline(
            "text-classification",
            model="textattack/roberta-base-CoLA",
            device=0 if torch.cuda.is_available() else -1,
        )
    return _FLUENCY_CHECKER


def check_fluency(response: str) -> float:
    """Estimate fluency with a CoLA text classifier.

    The response is split into sentences on ``.``, ``!`` and ``?``; pieces of
    five characters or fewer are ignored. Up to the first three remaining
    sentences are classified and the probability assigned to ``LABEL_1``
    (treated here as the fluent class) is averaged.

    Args:
        response: Text to score.

    Returns:
        0.0 for an empty response; 0.5 when no sentence is long enough to
        classify; otherwise the mean score rounded to three decimals. If
        classification fails, a length heuristic is used instead (0.3 for
        fewer than three words, else 0.7).
    """
    if not response:
        return 0.0

    # Split into sentences and drop fragments too short to judge.
    sentences = re.split(r'[.!?]+', response)
    sentences = [s.strip() for s in sentences if len(s.strip()) > 5]
    if not sentences:
        return 0.5

    # Reuse one pipeline across calls: rebuilding it per call reloads the
    # model for every evaluated row.
    fluency_checker = _get_fluency_checker()

    try:
        fluency_scores = []
        for sent in sentences[:3]:  # Limit to first 3 sentences
            result = fluency_checker(sent[:512])  # Truncate if too long
            top = result[0]
            score = top['score'] if top['label'] == 'LABEL_1' else 1 - top['score']
            fluency_scores.append(score)
        avg_fluency = sum(fluency_scores) / len(fluency_scores)
        return round(avg_fluency, 3)
    except Exception:
        # Best-effort fallback to a simple length heuristic when inference
        # fails; KeyboardInterrupt/SystemExit are no longer swallowed.
        words = response.split()
        if len(words) < 3:
            return 0.3
        return 0.7
# --------------------------
# ROW & DF EVALUATION
# --------------------------
def evaluate_row(row):
    """Compute every metric and the weighted final score for one record.

    Args:
        row: Mapping-like record (anything with ``.get``) that may provide
            the keys ``prompt``, ``response``, ``reference``, ``task_id``
            and ``agent``.

    Returns:
        dict with ``task_id``, ``agent``, the seven individual metric scores
        and ``final_score`` (their weighted sum rounded to three decimals).
    """

    def _text(key):
        """Return the field as a string, mapping missing/NaN values to ''."""
        value = row.get(key, "")
        if isinstance(value, str):
            return value
        # Missing cells in a DataFrame row arrive as NaN (a truthy float),
        # which would slip past the "empty text" guards of the metric
        # functions and reach the tokenizers as a non-string.
        if value is None or value is pd.NA:
            return ""
        if isinstance(value, float) and value != value:
            return ""
        return str(value)

    prompt = _text("prompt")
    response = _text("response")
    reference = _text("reference")

    metrics = {
        "task_id": row.get("task_id", ""),
        "agent": row.get("agent", ""),
        "instruction_following": check_instruction_following(prompt, response),
        "hallucination": check_hallucination(reference, response),
        "assumption": check_assumption(response),
        "coherence": check_coherence(response),
        "accuracy": check_accuracy(reference, response),
        "relevance": check_relevance(prompt, response),
        "fluency": check_fluency(response),
    }

    # Weighted avg score (adjust weights as needed); the weights sum to 1.0.
    metrics["final_score"] = round(
        0.20 * metrics["instruction_following"] +
        0.20 * metrics["accuracy"] +
        0.15 * metrics["hallucination"] +
        0.10 * metrics["coherence"] +
        0.10 * metrics["assumption"] +
        0.15 * metrics["relevance"] +
        0.10 * metrics["fluency"],
        3,
    )
    return metrics
# --------------------------
# VISUALIZATION FUNCTIONS
# --------------------------
def plot_radar_chart(metrics_df, agents, metrics, out_path="/tmp/radar.png"):
    """Draw one radar polygon per agent over the given metrics and save it.

    For every agent the mean of each metric column is plotted on its own
    spoke (a NaN mean is drawn as 0). The figure is written to ``out_path``,
    which is also the return value.
    """
    axis_labels = metrics
    axis_count = len(axis_labels)

    # One evenly spaced angle per metric; repeat the first to close the ring.
    spoke_angles = np.linspace(0, 2 * np.pi, axis_count, endpoint=False).tolist()
    spoke_angles += spoke_angles[:1]

    fig, ax = plt.subplots(figsize=(8, 8), subplot_kw=dict(polar=True))

    for agent in agents:
        ring = []
        for metric_name in metrics:
            metric_mean = metrics_df.loc[metrics_df['agent'] == agent, metric_name].mean()
            if np.isnan(metric_mean):
                ring.append(0)
            else:
                ring.append(metric_mean)
        ring += ring[:1]
        ax.plot(spoke_angles, ring, label=agent, linewidth=2)
        ax.fill(spoke_angles, ring, alpha=0.25)

    ax.set_xticks(spoke_angles[:-1])
    ax.set_xticklabels(axis_labels)
    ax.set_yticklabels([])
    ax.legend(loc="upper right", bbox_to_anchor=(1.3, 1.1))
    ax.set_title("Agent Performance Radar Chart")

    plt.tight_layout()
    plt.savefig(out_path)
    plt.close()
    return out_path
def plot_heatmap(metrics_df, out_path="/tmp/heatmap.png"):
    """Save a heatmap of the per-agent mean of every metric.

    Args:
        metrics_df: DataFrame with an ``agent`` column and one column per
            metric listed below.
        out_path: File the figure is written to.

    Returns:
        ``out_path``.
    """
    metrics = ["accuracy", "hallucination", "instruction_following",
               "coherence", "assumption", "relevance", "fluency"]
    pivot = metrics_df.groupby("agent")[metrics].mean()
    plt.figure(figsize=(10, 6))
    sns.heatmap(pivot, annot=True, cmap="YlGnBu", fmt=".3f", center=0.5)
    # The multiplication sign is written as an escape so the title cannot be
    # corrupted by a mis-decoded source file (it previously rendered as
    # mojibake).
    plt.title("Agent \u00d7 Metric Heatmap")
    plt.tight_layout()
    plt.savefig(out_path)
    plt.close()
    return out_path
def plot_score_distribution(metrics_df, out_path="/tmp/distribution.png"):
    """Save overlaid density curves of ``final_score``, one curve per agent.

    The figure is written to ``out_path``, which is returned.
    """
    plt.figure(figsize=(10, 6))

    for agent_name in metrics_df['agent'].unique():
        belongs_to_agent = metrics_df['agent'] == agent_name
        scores = metrics_df[belongs_to_agent]['final_score']
        sns.kdeplot(scores, label=agent_name, fill=True, alpha=0.3)

    plt.xlabel('Final Score')
    plt.ylabel('Density')
    plt.title('Distribution of Final Scores by Agent')
    plt.legend()

    plt.tight_layout()
    plt.savefig(out_path)
    plt.close()
    return out_path
def plot_metric_correlation(metrics_df, out_path="/tmp/correlation.png"):
    """Save an annotated heatmap of the correlations between score columns.

    Covers the seven individual metrics plus ``final_score``. The figure is
    written to ``out_path``, which is returned.
    """
    score_columns = [
        "accuracy", "hallucination", "instruction_following",
        "coherence", "assumption", "relevance", "fluency", "final_score",
    ]

    plt.figure(figsize=(10, 8))
    correlations = metrics_df[score_columns].corr()
    sns.heatmap(
        correlations,
        annot=True,
        cmap="coolwarm",
        center=0,
        fmt=".2f",
        square=True,
    )
    plt.title('Correlation Between Metrics')

    plt.tight_layout()
    plt.savefig(out_path)
    plt.close()
    return out_path
def plot_agent_comparison(metrics_df, out_path="/tmp/agent_comparison.png"):
    """Save a grouped bar chart of each agent's mean score per metric.

    Args:
        metrics_df: DataFrame with an ``agent`` column and one column per
            metric listed below.
        out_path: File the figure is written to.

    Returns:
        ``out_path``.
    """
    metrics = ["accuracy", "hallucination", "instruction_following",
               "coherence", "assumption", "relevance", "fluency"]
    agent_means = metrics_df.groupby('agent')[metrics].mean()

    # Draw on an explicit Axes. DataFrame.plot opens its own figure when no
    # ``ax`` is passed, so a preceding plt.figure(figsize=...) would be left
    # empty and never closed, and the requested size would not be applied.
    fig, ax = plt.subplots(figsize=(12, 6))
    agent_means.plot(kind='bar', colormap='Set3', ax=ax)

    ax.set_title('Agent Performance Across Metrics')
    ax.set_xlabel('Agent')
    ax.set_ylabel('Score')
    ax.tick_params(axis='x', labelrotation=45)
    ax.legend(bbox_to_anchor=(1.05, 1), loc='upper left')

    fig.tight_layout()
    fig.savefig(out_path)
    plt.close(fig)
    return out_path
# --------------------------
# MAIN EVALUATION FUNCTION
# --------------------------
def evaluate_dataframe(df: pd.DataFrame):
    """Score every row of ``df`` and produce the summary artefacts.

    Returns a 3-tuple ``(metrics_df, images, leaderboard)``:

    * ``metrics_df`` - one row of metric scores per input row;
    * ``images`` - list of ``(file_path, caption)`` pairs, one per chart;
    * ``leaderboard`` - mean ``final_score`` per ``(agent, task_id)`` pair.
    """
    metrics_df = df.apply(evaluate_row, axis=1, result_type='expand')

    # Leaderboard: average final score for each agent/task combination.
    grouped_scores = metrics_df.groupby(["agent", "task_id"])["final_score"]
    leaderboard = grouped_scores.mean().reset_index()

    agents = df["agent"].unique()
    metric_names = ["accuracy", "hallucination", "instruction_following",
                    "coherence", "assumption", "relevance", "fluency"]

    # Charts are rendered in list order; each entry pairs the saved file path
    # with its caption.
    images = [
        (plot_radar_chart(metrics_df, agents, metric_names),
         "Radar Chart: Agent vs Metrics"),
        (plot_heatmap(metrics_df), "Heatmap: Agent vs Metrics"),
        (plot_score_distribution(metrics_df), "Score Distribution by Agent"),
        (plot_metric_correlation(metrics_df), "Metric Correlation Matrix"),
        (plot_agent_comparison(metrics_df), "Agent Comparison Chart"),
    ]

    return metrics_df, images, leaderboard