# =====================================================================
# Agent-response evaluation pipeline.
#
# Computes per-response quality metrics (instruction following,
# hallucination, assumption, coherence, accuracy, relevance, fluency),
# aggregates them into a weighted final score, and renders comparison
# plots across agents.
#
# NOTE(review): a full commented-out copy of an earlier implementation
# was removed here; it duplicated the active code below and added no
# information.
# =====================================================================
import json
import os
import re
import uuid
import warnings
from collections import defaultdict

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
import evaluate
from sentence_transformers import SentenceTransformer, util
from sklearn.metrics import accuracy_score, f1_score
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    AutoModelForCausalLM,
    pipeline,
)

warnings.filterwarnings('ignore')

# --------------------------
# MODEL CONFIGURATION
# --------------------------
NLI_MODEL = "microsoft/deberta-v2-xlarge-mnli"
EMBED_MODEL = "sentence-transformers/all-mpnet-base-v2"
LLM_JUDGE_MODEL = "microsoft/DialoGPT-large"  # Can be replaced with more powerful models
# --------------------------
# MODEL LOADING
# --------------------------
# Single source of truth for device placement (was repeated per model).
_DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# NLI model & tokenizer: entailment/neutral/contradiction scoring.
nli_tokenizer = AutoTokenizer.from_pretrained(NLI_MODEL)
nli_model = AutoModelForSequenceClassification.from_pretrained(NLI_MODEL)
nli_model.to(_DEVICE)
nli_model.eval()

# Sentence-embedding model used for all cosine-similarity metrics.
embed_model = SentenceTransformer(EMBED_MODEL)

# Causal-LM "judge" used by check_assumption's yes/no query.
judge_tokenizer = AutoTokenizer.from_pretrained(LLM_JUDGE_MODEL)
judge_model = AutoModelForCausalLM.from_pretrained(LLM_JUDGE_MODEL)
judge_model.to(_DEVICE)
judge_model.eval()

# Reference-based text metrics from the Hugging Face `evaluate` hub.
bertscore = evaluate.load("bertscore")
bleu = evaluate.load("bleu")
rouge = evaluate.load("rouge")

# Map logit index -> upper-cased label name (e.g. "ENTAILMENT", "NEUTRAL",
# "CONTRADICTION") so metric code can match labels by substring.
id2label = {int(k): v.upper() for k, v in nli_model.config.id2label.items()}


# --------------------------
# IMPROVED METRIC FUNCTIONS
# --------------------------
def check_instruction_following(prompt: str, response: str) -> float:
    """Score how well `response` follows `prompt`, in [0, 1].

    Blends an NLI entailment signal (70%) with embedding cosine
    similarity (30%). Returns 0.0 for empty prompt or response.
    """
    if not prompt or not response:
        return 0.0

    # Method 1: NLI-based evaluation (prompt as premise, response as hypothesis).
    with torch.no_grad():
        inputs = nli_tokenizer.encode_plus(
            prompt, response, return_tensors="pt", truncation=True, max_length=512
        ).to(nli_model.device)
        outputs = nli_model(**inputs)
        probs = torch.softmax(outputs.logits, dim=-1).cpu().numpy()[0]

    entail_prob, neutral_prob = 0.0, 0.0
    for idx, p in enumerate(probs):
        label = id2label.get(idx, "")
        if "ENTAIL" in label:
            entail_prob = float(p)
        elif "NEUTRAL" in label:
            neutral_prob = float(p)
    # Neutral counts half: not contradicted, but not confirmed either.
    nli_score = entail_prob + (neutral_prob * 0.5)

    # Method 2: semantic similarity between prompt and response embeddings.
    p_emb = embed_model.encode(prompt, convert_to_tensor=True)
    r_emb = embed_model.encode(response, convert_to_tensor=True)
    sim_score = float(util.cos_sim(p_emb, r_emb).item())

    # Combined score (weighted average), clipped to [0, 1].
    final_score = 0.7 * nli_score + 0.3 * sim_score
    return round(max(0.0, min(1.0, final_score)), 3)


def check_hallucination(reference: str, response: str) -> float:
    """Score factual grounding of `response` against `reference`, in [0, 1].

    Higher = LESS hallucination. Combines NLI contradiction probability
    (neutral weighted 0.3) with an embedding-dissimilarity penalty.
    """
    if not reference or not response:
        return 0.0

    # Method 1: NLI-based contradiction detection.
    with torch.no_grad():
        inputs = nli_tokenizer.encode_plus(
            reference, response, return_tensors="pt", truncation=True, max_length=512
        ).to(nli_model.device)
        outputs = nli_model(**inputs)
        probs = torch.softmax(outputs.logits, dim=-1).cpu().numpy()[0]

    contra_prob, neutral_prob = 0.0, 0.0
    for idx, p in enumerate(probs):
        label = id2label.get(idx, "")
        if "CONTRA" in label:
            contra_prob = float(p)
        elif "NEUTRAL" in label:
            neutral_prob = float(p)
    nli_hallucination_score = contra_prob + (neutral_prob * 0.3)

    # Method 2: semantic-similarity penalty (low similarity -> higher penalty).
    ref_emb = embed_model.encode(reference, convert_to_tensor=True)
    resp_emb = embed_model.encode(response, convert_to_tensor=True)
    semantic_sim = float(util.cos_sim(ref_emb, resp_emb).item())

    # Combined score: higher when less hallucination.
    hallucination_score = 1.0 - (0.7 * nli_hallucination_score + 0.3 * (1 - semantic_sim))
    return round(max(0.0, min(1.0, hallucination_score)), 3)
def check_assumption(response: str) -> float:
    """Score absence of speculation/hedging in `response`, in [0, 1].

    Higher = fewer assumptions. Blends a regex hedging-term density
    heuristic (60%) with a yes/no verdict from the LLM judge (40%).
    Returns 0.0 for an empty response.
    """
    if not response:
        return 0.0

    # Pattern-based detection. BUGFIX: alternatives are matched against
    # response.lower(), so they must be lower-case themselves — the original
    # "I think"/"I believe"/... variants could never match.
    speculative_patterns = [
        r"\b(maybe|perhaps|possibly|probably|might|could|would|should)\b",
        r"\b(i think|i believe|i guess|i suppose|i assume)\b",
        r"\b(it seems|it appears|it looks like)\b",
        r"\b(likely|unlikely|presumably|arguably)\b",
        r"\b(some|many|most|often|usually|generally|typically)\b",
    ]
    pattern_count = sum(
        len(re.findall(pattern, response.lower())) for pattern in speculative_patterns
    )

    # Normalize by length so longer answers are not over-penalized.
    word_count = len(response.split())
    pattern_score = min(1.0, pattern_count / max(1, word_count / 5))

    # LLM-based judgment.
    assumption_prompt = f"""
Determine if the following text contains assumptions, speculation, or hedging language.

Text: {response}

Answer with only 'yes' or 'no':
"""
    with torch.no_grad():
        # BUGFIX: move inputs onto the judge's device; the original left them
        # on CPU, which crashes whenever the model was placed on CUDA.
        inputs = judge_tokenizer.encode(
            assumption_prompt, return_tensors="pt"
        ).to(judge_model.device)
        outputs = judge_model.generate(
            inputs,
            max_length=len(inputs[0]) + 3,
            pad_token_id=judge_tokenizer.eos_token_id,
        )
    # BUGFIX: decode only the generated continuation. The original decoded
    # prompt + continuation, and since the prompt contains the word 'yes'
    # ("Answer with only 'yes' or 'no'"), llm_score was always 0.0.
    generated = outputs[0][len(inputs[0]):]
    judgment = judge_tokenizer.decode(generated, skip_special_tokens=True)
    llm_score = 0.0 if "yes" in judgment.lower() else 1.0

    # Combined score: heuristic (inverted: fewer hedges -> higher) + judge.
    final_score = 0.6 * (1 - pattern_score) + 0.4 * llm_score
    return round(final_score, 3)
def check_coherence(response: str) -> float:
    """Heuristic coherence score in [0, 1] from four linguistic features.

    Features: sentence count (capped at 5), sentence-length variance,
    transition-word usage, and vocabulary repetition ratio.
    Returns 0.0 for empty input or input with no sentences.
    """
    if not response:
        return 0.0

    # Feature 1: sentence structure.
    sentences = re.split(r'[.!?]+', response)
    sentences = [s.strip() for s in sentences if len(s.strip()) > 0]
    num_sentences = len(sentences)
    if num_sentences == 0:
        return 0.0

    # Feature 2: sentence-length variation (lower variance -> higher score).
    sent_lengths = [len(s.split()) for s in sentences]
    length_variance = np.var(sent_lengths) if len(sent_lengths) > 1 else 0
    length_score = 1.0 - min(1.0, length_variance / 100)

    # Feature 3: explicit discourse transitions.
    transition_words = [
        'however', 'therefore', 'moreover', 'furthermore', 'consequently',
        'additionally', 'likewise', 'similarly', 'nevertheless', 'nonetheless'
    ]
    transition_count = sum(1 for word in transition_words if word in response.lower())
    transition_score = min(1.0, transition_count / 3)

    # Feature 4: repetition penalty (unique-token ratio).
    words = response.lower().split()
    unique_words = set(words)
    repetition_ratio = len(unique_words) / max(1, len(words))

    # Weighted combination, clipped to [0, 1].
    coherence_score = (
        0.3 * min(1.0, num_sentences / 5)
        + 0.2 * length_score
        + 0.3 * transition_score
        + 0.2 * repetition_ratio
    )
    return round(max(0.0, min(1.0, coherence_score)), 3)


def check_accuracy(reference: str, response: str) -> float:
    """Reference-based accuracy score in [0, 1].

    Weighted blend: BERTScore F1 (0.4), ROUGE-L (0.3), BLEU (0.1),
    embedding cosine similarity (0.2). Returns 0.0 if either input is empty.
    """
    if not reference or not response:
        return 0.0

    # BERTScore. NOTE(review): model_type is given a sentence-transformers
    # checkpoint; confirm bert-score accepts it, otherwise drop model_type
    # and let the library pick its default for lang="en".
    bert_results = bertscore.compute(
        predictions=[response],
        references=[reference],
        lang="en",
        model_type=EMBED_MODEL,
    )
    bert_f1 = bert_results['f1'][0]

    # ROUGE-L.
    rouge_results = rouge.compute(
        predictions=[response], references=[reference], use_stemmer=True
    )
    rouge_l = rouge_results['rougeL']

    # BLEU (mostly meaningful for shorter responses).
    # NOTE(review): recent `evaluate` BLEU expects raw strings, not
    # pre-tokenized lists — the except branch may be firing every call;
    # verify and pass [response] / [[reference]] if so.
    try:
        bleu_results = bleu.compute(
            predictions=[response.split()], references=[[reference.split()]]
        )
        bleu_score = bleu_results['bleu']
    except Exception:  # was a bare except:, which also swallowed SystemExit
        bleu_score = 0.0

    # Embedding cosine similarity.
    ref_emb = embed_model.encode(reference, convert_to_tensor=True)
    resp_emb = embed_model.encode(response, convert_to_tensor=True)
    semantic_sim = float(util.cos_sim(ref_emb, resp_emb).item())

    # Weighted blend. Renamed from `accuracy_score`, which shadowed
    # sklearn.metrics.accuracy_score imported at module level.
    combined = (
        0.4 * bert_f1
        + 0.3 * rouge_l
        + 0.1 * bleu_score
        + 0.2 * semantic_sim
    )
    return round(max(0.0, min(1.0, combined)), 3)
def check_relevance(prompt: str, response: str) -> float:
    """Embedding cosine similarity between prompt and response, clipped to [0, 1]."""
    if not prompt or not response:
        return 0.0
    # Encode both prompt and response.
    p_emb = embed_model.encode(prompt, convert_to_tensor=True)
    r_emb = embed_model.encode(response, convert_to_tensor=True)
    # Cosine similarity, clipped into [0, 1].
    similarity = float(util.cos_sim(p_emb, r_emb).item())
    return round(max(0.0, min(1.0, similarity)), 3)


def _get_fluency_checker():
    """Build the CoLA acceptability pipeline once and cache it.

    PERF FIX: the original constructed the pipeline inside check_fluency on
    every call, reloading the model each time.
    """
    if not hasattr(_get_fluency_checker, "_cached"):
        _get_fluency_checker._cached = pipeline(
            "text-classification",
            model="textattack/roberta-base-CoLA",
            device=0 if torch.cuda.is_available() else -1,
        )
    return _get_fluency_checker._cached


def check_fluency(response: str) -> float:
    """Fluency score in [0, 1] via a CoLA grammatical-acceptability classifier.

    Averages acceptability over the first three sentences; falls back to a
    length heuristic (0.3 short / 0.7 otherwise) if the model path fails.
    """
    if not response:
        return 0.0
    try:
        # Pipeline creation now inside the try: a model-load failure uses the
        # heuristic fallback instead of crashing (the original crashed here).
        fluency_checker = _get_fluency_checker()

        # Split into sentences; skip fragments of 5 chars or fewer.
        sentences = re.split(r'[.!?]+', response)
        sentences = [s.strip() for s in sentences if len(s.strip()) > 5]
        if not sentences:
            return 0.5

        fluency_scores = []
        for sent in sentences[:3]:  # limit to first 3 sentences for speed
            result = fluency_checker(sent[:512])  # truncate overly long input
            # LABEL_1 = acceptable; otherwise use the complementary probability.
            score = (
                result[0]['score']
                if result[0]['label'] == 'LABEL_1'
                else 1 - result[0]['score']
            )
            fluency_scores.append(score)

        avg_fluency = sum(fluency_scores) / len(fluency_scores)
        return round(avg_fluency, 3)
    except Exception:  # was a bare except:; keep the best-effort fallback
        words = response.split()
        if len(words) < 3:
            return 0.3
        return 0.7
# --------------------------
# ROW & DF EVALUATION
# --------------------------
def evaluate_row(row):
    """Compute all metrics for one dataframe row.

    Expects the row (dict-like) to provide "prompt", "response",
    "reference", "task_id" and "agent" keys; missing keys default to "".
    Returns a dict of metric scores plus a weighted "final_score".
    """
    prompt = row.get("prompt", "")
    response = row.get("response", "")
    reference = row.get("reference", "")
    metrics = {
        "task_id": row.get("task_id", ""),
        "agent": row.get("agent", ""),
        "instruction_following": check_instruction_following(prompt, response),
        "hallucination": check_hallucination(reference, response),
        "assumption": check_assumption(response),
        "coherence": check_coherence(response),
        "accuracy": check_accuracy(reference, response),
        "relevance": check_relevance(prompt, response),
        "fluency": check_fluency(response),
    }
    # Weighted average; the weights below sum to 1.0 (adjust as needed).
    metrics["final_score"] = round(
        0.20 * metrics["instruction_following"]
        + 0.20 * metrics["accuracy"]
        + 0.15 * metrics["hallucination"]
        + 0.10 * metrics["coherence"]
        + 0.10 * metrics["assumption"]
        + 0.15 * metrics["relevance"]
        + 0.10 * metrics["fluency"],
        3,
    )
    return metrics


# --------------------------
# VISUALIZATION FUNCTIONS
# --------------------------
def plot_radar_chart(metrics_df, agents, metrics, out_path="/tmp/radar.png"):
    """Radar chart comparing multiple agents across metrics.

    Plots each agent's per-metric mean on a polar axis; NaN means
    (agent missing from metrics_df) are drawn as 0. Saves to out_path
    and returns the path.
    """
    labels = metrics
    num_vars = len(labels)

    # One evenly spaced axis per metric; repeat the first angle to close
    # the polygon.
    angles = np.linspace(0, 2 * np.pi, num_vars, endpoint=False).tolist()
    angles += angles[:1]

    fig, ax = plt.subplots(figsize=(8, 8), subplot_kw=dict(polar=True))
    for agent in agents:
        values = []
        for m in metrics:
            mean_val = metrics_df.loc[metrics_df['agent'] == agent, m].mean()
            values.append(mean_val if not np.isnan(mean_val) else 0)
        values += values[:1]  # close the polygon
        ax.plot(angles, values, label=agent, linewidth=2)
        ax.fill(angles, values, alpha=0.25)

    ax.set_xticks(angles[:-1])
    ax.set_xticklabels(labels)
    ax.set_yticklabels([])  # radial tick labels add clutter
    ax.legend(loc="upper right", bbox_to_anchor=(1.3, 1.1))
    ax.set_title("Agent Performance Radar Chart")
    plt.tight_layout()
    plt.savefig(out_path)
    plt.close()
    return out_path
# Metric columns shared by the plots below (kept in one place so the
# heatmap, correlation matrix and bar chart cannot drift out of sync).
_PLOT_METRICS = [
    "accuracy", "hallucination", "instruction_following",
    "coherence", "assumption", "relevance", "fluency",
]


def plot_heatmap(metrics_df, out_path="/tmp/heatmap.png"):
    """Heatmap of mean per-agent performance across metrics; returns out_path."""
    pivot = metrics_df.groupby("agent")[_PLOT_METRICS].mean()
    plt.figure(figsize=(10, 6))
    # center=0.5: scores live in [0, 1], so 0.5 is the neutral midpoint.
    sns.heatmap(pivot, annot=True, cmap="YlGnBu", fmt=".3f", center=0.5)
    plt.title("Agent × Metric Heatmap")
    plt.tight_layout()
    plt.savefig(out_path)
    plt.close()
    return out_path


def plot_score_distribution(metrics_df, out_path="/tmp/distribution.png"):
    """KDE of final-score distribution per agent; returns out_path."""
    plt.figure(figsize=(10, 6))
    for agent in metrics_df['agent'].unique():
        agent_scores = metrics_df[metrics_df['agent'] == agent]['final_score']
        sns.kdeplot(agent_scores, label=agent, fill=True, alpha=0.3)
    plt.xlabel('Final Score')
    plt.ylabel('Density')
    plt.title('Distribution of Final Scores by Agent')
    plt.legend()
    plt.tight_layout()
    plt.savefig(out_path)
    plt.close()
    return out_path


def plot_metric_correlation(metrics_df, out_path="/tmp/correlation.png"):
    """Correlation matrix between metrics (including final_score); returns out_path."""
    columns = _PLOT_METRICS + ["final_score"]
    plt.figure(figsize=(10, 8))
    correlation_matrix = metrics_df[columns].corr()
    sns.heatmap(
        correlation_matrix, annot=True, cmap="coolwarm", center=0,
        fmt=".2f", square=True,
    )
    plt.title('Correlation Between Metrics')
    plt.tight_layout()
    plt.savefig(out_path)
    plt.close()
    return out_path


def plot_agent_comparison(metrics_df, out_path="/tmp/agent_comparison.png"):
    """Grouped bar chart of mean metric values per agent; returns out_path."""
    agent_means = metrics_df.groupby('agent')[_PLOT_METRICS].mean()
    plt.figure(figsize=(12, 6))
    agent_means.plot(kind='bar', colormap='Set3')
    plt.title('Agent Performance Across Metrics')
    plt.xlabel('Agent')
    plt.ylabel('Score')
    plt.xticks(rotation=45)
    plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
    plt.tight_layout()
    plt.savefig(out_path)
    plt.close()
    return out_path
# --------------------------
# MAIN EVALUATION FUNCTION
# --------------------------
def evaluate_dataframe(df: pd.DataFrame):
    """Evaluate a dataframe of agent responses.

    Expects columns used by evaluate_row ("prompt", "response",
    "reference", "task_id", "agent").

    Returns a tuple (metrics_df, images, leaderboard):
      - metrics_df: one row of metric scores per input row;
      - images: list of (file_path, caption) tuples for the saved plots;
      - leaderboard: mean final_score per (agent, task_id), best first.
    """
    metrics_df = df.apply(evaluate_row, axis=1, result_type='expand')

    # Leaderboard. FIX: the original returned it unsorted, which defeats
    # the purpose of a leaderboard — sort best-first.
    leaderboard = (
        metrics_df.groupby(["agent", "task_id"])["final_score"]
        .mean()
        .reset_index()
        .sort_values("final_score", ascending=False)
        .reset_index(drop=True)
    )

    # Generate all visualizations as (path, caption) pairs.
    agents = df["agent"].unique()
    metric_names = [
        "accuracy", "hallucination", "instruction_following",
        "coherence", "assumption", "relevance", "fluency",
    ]
    images = [
        (plot_radar_chart(metrics_df, agents, metric_names),
         "Radar Chart: Agent vs Metrics"),
        (plot_heatmap(metrics_df), "Heatmap: Agent vs Metrics"),
        (plot_score_distribution(metrics_df), "Score Distribution by Agent"),
        (plot_metric_correlation(metrics_df), "Metric Correlation Matrix"),
        (plot_agent_comparison(metrics_df), "Agent Comparison Chart"),
    ]

    return metrics_df, images, leaderboard