# NOTE(review): the three lines above this module in the original paste
# ("Spaces:" / "Runtime error" x2) were Hugging Face Spaces build-log
# residue, not part of the source; recorded here for provenance only.
| ##################################################################################################################################################################### | |
| # import re | |
| # import json | |
| # import torch | |
| # import pandas as pd | |
| # import matplotlib.pyplot as plt | |
| # import seaborn as sns | |
| # import os | |
| # import uuid | |
| # from transformers import AutoTokenizer, AutoModelForSequenceClassification | |
| # from sentence_transformers import SentenceTransformer, util | |
| # import matplotlib.pyplot as plt | |
| # import numpy as np | |
| # def plot_radar_chart(metrics_df, agents, metrics, out_path="/tmp/radar.png"): | |
| # """ | |
| # Radar chart comparing multiple agents across metrics. | |
| # """ | |
| # labels = metrics | |
| # num_vars = len(labels) | |
| # # Compute angle for each axis | |
| # angles = np.linspace(0, 2 * np.pi, num_vars, endpoint=False).tolist() | |
| # angles += angles[:1] # close loop | |
| # fig, ax = plt.subplots(figsize=(6, 6), subplot_kw=dict(polar=True)) | |
| # for agent in agents: | |
| # values = [] | |
| # for m in metrics: | |
| # mean_val = metrics_df.loc[metrics_df['agent'] == agent, m].mean() | |
| # values.append(mean_val if not np.isnan(mean_val) else 0) | |
| # values += values[:1] | |
| # ax.plot(angles, values, label=agent, linewidth=2) | |
| # ax.fill(angles, values, alpha=0.25) | |
| # ax.set_xticks(angles[:-1]) | |
| # ax.set_xticklabels(labels) | |
| # ax.set_yticklabels([]) | |
| # ax.legend(loc="upper right", bbox_to_anchor=(1.3, 1.1)) | |
| # ax.set_title("Agent Performance Radar Chart") | |
| # plt.tight_layout() | |
| # plt.savefig(out_path) | |
| # plt.close() | |
| # return out_path | |
| # import seaborn as sns | |
| # def plot_heatmap(metrics_df, out_path="/tmp/heatmap.png"): | |
| # pivot = metrics_df.groupby("agent")[ | |
| # ["accuracy", "hallucination", "instruction_following", "coherence", "assumption"] | |
| # ].mean() | |
| # plt.figure(figsize=(8, 5)) | |
| # sns.heatmap(pivot, annot=True, cmap="viridis", fmt=".2f") | |
| # plt.title("Agent Γ Metric Heatmap") | |
| # plt.tight_layout() | |
| # plt.savefig(out_path) | |
| # plt.close() | |
| # return out_path | |
| # # -------------------------- | |
| # # MODEL LOADING | |
| # # -------------------------- | |
| # NLI_MODEL = "textattack/roberta-base-MNLI" | |
| # EMBED_MODEL = "sentence-transformers/all-MiniLM-L6-v2" | |
| # # Load NLI model & tokenizer | |
| # nli_tokenizer = AutoTokenizer.from_pretrained(NLI_MODEL) | |
| # nli_model = AutoModelForSequenceClassification.from_pretrained(NLI_MODEL) | |
| # nli_model.to("cpu") | |
| # nli_model.eval() | |
| # # Load embedding model | |
| # embed_model = SentenceTransformer(EMBED_MODEL) | |
| # # Label mapping from config | |
| # id2label = {int(k): v.upper() for k, v in nli_model.config.id2label.items()} | |
| # # -------------------------- | |
| # # METRIC FUNCTIONS | |
| # # -------------------------- | |
| # def check_instruction_following(prompt: str, response: str) -> float: | |
| # """Embedding-based similarity between prompt and response.""" | |
| # if not prompt or not response: | |
| # return 0.0 | |
| # p_emb = embed_model.encode(prompt, convert_to_tensor=True) | |
| # r_emb = embed_model.encode(response, convert_to_tensor=True) | |
| # sim = float(util.cos_sim(p_emb, r_emb).item()) | |
| # return round(max(0.0, min(1.0, sim)), 3) | |
| # def check_hallucination(reference: str, response: str) -> float: | |
| # """ | |
| # Single hallucination score: | |
| # Entailment prob - Contradiction prob (normalized to [0,1]). | |
| # Higher = less hallucination. | |
| # """ | |
| # if not reference or not response: | |
| # return 0.0 | |
| # with torch.no_grad(): | |
| # inputs = nli_tokenizer.encode_plus(reference, response, return_tensors="pt", truncation=True) | |
| # outputs = nli_model(**inputs) | |
| # probs = torch.softmax(outputs.logits, dim=-1).cpu().numpy()[0] | |
| # entail_prob, contra_prob = 0.0, 0.0 | |
| # for idx, p in enumerate(probs): | |
| # label = id2label.get(idx, "") | |
| # if "ENTAIL" in label: | |
| # entail_prob = float(p) | |
| # elif "CONTRA" in label: | |
| # contra_prob = float(p) | |
| # score = entail_prob - contra_prob | |
| # score = (score + 1) / 2 # normalize [-1,1] β [0,1] | |
| # return round(max(0.0, min(1.0, score)), 3) | |
| # def check_assumption(response: str) -> float: | |
| # """Detect speculative/hedging terms.""" | |
| # if not response: | |
| # return 0.0 | |
| # speculative_terms = ["maybe", "probably", "might", "perhaps", "i guess", "seems", "could"] | |
| # count = sum(1 for t in speculative_terms if t in response.lower()) | |
| # score = 1.0 - min(count / 5.0, 1.0) # smoother decay | |
| # return round(score, 3) | |
| # def check_coherence(response: str) -> float: | |
| # """Heuristic coherence metric: penalizes very short/long, rewards sentence balance.""" | |
| # if not response: | |
| # return 0.0 | |
| # words = len(re.findall(r"\w+", response)) | |
| # sents = max(1, len(re.split(r"[.!?]+", response)) - 1) | |
| # if words < 5: | |
| # return 0.3 | |
| # if words > 200: | |
| # return 0.5 | |
| # base = min(1.0, (words / 50.0) + (sents / 5.0)) | |
| # return round(max(0.4, min(base, 0.95)), 3) | |
| # def check_accuracy(reference: str, response: str) -> float: | |
| # """Semantic similarity between reference and response via embeddings (cosine).""" | |
| # if not reference or not response: | |
| # return 0.0 | |
| # ref_emb = embed_model.encode(reference, convert_to_tensor=True) | |
| # resp_emb = embed_model.encode(response, convert_to_tensor=True) | |
| # sim = float(util.cos_sim(ref_emb, resp_emb).item()) | |
| # return round(max(0.0, min(1.0, sim)), 3) | |
| # # -------------------------- | |
| # # ROW & DF EVALUATION | |
| # # -------------------------- | |
| # def evaluate_row(row): | |
| # prompt = row.get("prompt", "") | |
| # response = row.get("response", "") | |
| # reference = row.get("reference", "") | |
| # metrics = { | |
| # "task_id": row.get("task_id", ""), | |
| # "agent": row.get("agent", ""), | |
| # "instruction_following": check_instruction_following(prompt, response), | |
| # "hallucination": check_hallucination(reference, response), | |
| # "assumption": check_assumption(response), | |
| # "coherence": check_coherence(response), | |
| # "accuracy": check_accuracy(reference, response), | |
| # } | |
| # # Weighted avg score (you can adjust weights) | |
| # metrics["final_score"] = round( | |
| # 0.25 * metrics["instruction_following"] | |
| # + 0.25 * metrics["accuracy"] | |
| # + 0.2 * metrics["hallucination"] | |
| # + 0.15 * metrics["coherence"] | |
| # + 0.15 * metrics["assumption"], | |
| # 3, | |
| # ) | |
| # return metrics | |
| # def evaluate_dataframe(df: pd.DataFrame): | |
| # metrics_df = df.apply(evaluate_row, axis=1, result_type="expand") | |
| # # Leaderboard | |
| # leaderboard = ( | |
| # metrics_df.groupby(["agent", "task_id"])["final_score"] | |
| # .mean() | |
| # .reset_index() | |
| # ) | |
| # # # Plots | |
| # # images = [] | |
| # # Existing images list | |
| # images = [] | |
| # # Add radar chart | |
| # radar_path = plot_radar_chart(metrics_df, agents=df["agent"].unique(), | |
| # | |
| ############################################################################################################################### | |
| import re | |
| import json | |
| import torch | |
| import pandas as pd | |
| import matplotlib.pyplot as plt | |
| import seaborn as sns | |
| import os | |
| import uuid | |
| import numpy as np | |
| from transformers import ( | |
| AutoTokenizer, | |
| AutoModelForSequenceClassification, | |
| AutoModelForCausalLM, | |
| pipeline | |
| ) | |
| from sentence_transformers import SentenceTransformer, util | |
| import evaluate | |
| from sklearn.metrics import accuracy_score, f1_score | |
| from collections import defaultdict | |
| import warnings | |
| warnings.filterwarnings('ignore') | |
# --------------------------
# MODEL LOADING
# --------------------------
# Module-level model setup: these statements run once at import time and the
# resulting objects are shared by every metric function below.
# NOTE(review): all loads are eager — importing this module downloads several
# large checkpoints; consider lazy loading if startup cost matters.
NLI_MODEL = "microsoft/deberta-v2-xlarge-mnli"
EMBED_MODEL = "sentence-transformers/all-mpnet-base-v2"
LLM_JUDGE_MODEL = "microsoft/DialoGPT-large"  # Can be replaced with more powerful models
# Load NLI model & tokenizer (entailment / neutral / contradiction scoring).
nli_tokenizer = AutoTokenizer.from_pretrained(NLI_MODEL)
nli_model = AutoModelForSequenceClassification.from_pretrained(NLI_MODEL)
nli_model.to("cuda" if torch.cuda.is_available() else "cpu")
nli_model.eval()  # inference only — disables dropout
# Load embedding model (cosine-similarity based metrics).
embed_model = SentenceTransformer(EMBED_MODEL)
# Load LLM judge (binary yes/no judgment used by check_assumption).
judge_tokenizer = AutoTokenizer.from_pretrained(LLM_JUDGE_MODEL)
judge_model = AutoModelForCausalLM.from_pretrained(LLM_JUDGE_MODEL)
judge_model.to("cuda" if torch.cuda.is_available() else "cpu")
judge_model.eval()
# Load additional evaluation metrics (Hugging Face `evaluate` wrappers).
bertscore = evaluate.load("bertscore")
bleu = evaluate.load("bleu")
rouge = evaluate.load("rouge")
# Label mapping from config: class index -> upper-cased label name, so the
# metric functions can match on substrings like "ENTAIL" / "CONTRA" without
# depending on a specific checkpoint's exact label spelling.
id2label = {int(k): v.upper() for k, v in nli_model.config.id2label.items()}
| # -------------------------- | |
| # IMPROVED METRIC FUNCTIONS | |
| # -------------------------- | |
def check_instruction_following(prompt: str, response: str) -> float:
    """Score how well the response follows the prompt, in [0, 1].

    Blends an NLI entailment signal (70%) with sentence-embedding cosine
    similarity (30%); the NLI neutral probability counts at half weight.
    Returns 0.0 when either input is empty.
    """
    if not prompt or not response:
        return 0.0
    # NLI signal: P(entailment) + 0.5 * P(neutral).
    with torch.no_grad():
        encoded = nli_tokenizer.encode_plus(
            prompt,
            response,
            return_tensors="pt",
            truncation=True,
            max_length=512,
        ).to(nli_model.device)
        logits = nli_model(**encoded).logits
        probabilities = torch.softmax(logits, dim=-1).cpu().numpy()[0]
    entailment = 0.0
    neutral = 0.0
    for index, prob in enumerate(probabilities):
        tag = id2label.get(index, "")
        if "ENTAIL" in tag:
            entailment = float(prob)
        elif "NEUTRAL" in tag:
            neutral = float(prob)
    nli_component = entailment + (neutral * 0.5)
    # Semantic-similarity signal via sentence embeddings.
    prompt_vec = embed_model.encode(prompt, convert_to_tensor=True)
    response_vec = embed_model.encode(response, convert_to_tensor=True)
    similarity = float(util.cos_sim(prompt_vec, response_vec).item())
    combined = 0.7 * nli_component + 0.3 * similarity
    return round(max(0.0, min(1.0, combined)), 3)
def check_hallucination(reference: str, response: str) -> float:
    """Score factual consistency of *response* against *reference*, in [0, 1].

    Higher means less hallucination.  Combines an NLI contradiction signal
    (70%, with neutral probability at 0.3 weight) and an embedding
    dissimilarity penalty (30%).  Returns 0.0 when either input is empty.
    """
    if not reference or not response:
        return 0.0
    # NLI signal: P(contradiction) + 0.3 * P(neutral).
    with torch.no_grad():
        encoded = nli_tokenizer.encode_plus(
            reference,
            response,
            return_tensors="pt",
            truncation=True,
            max_length=512,
        ).to(nli_model.device)
        logits = nli_model(**encoded).logits
        probabilities = torch.softmax(logits, dim=-1).cpu().numpy()[0]
    contradiction = 0.0
    neutral = 0.0
    for index, prob in enumerate(probabilities):
        tag = id2label.get(index, "")
        if "CONTRA" in tag:
            contradiction = float(prob)
        elif "NEUTRAL" in tag:
            neutral = float(prob)
    nli_penalty = contradiction + (neutral * 0.3)
    # Embedding similarity between reference and response.
    reference_vec = embed_model.encode(reference, convert_to_tensor=True)
    response_vec = embed_model.encode(response, convert_to_tensor=True)
    similarity = float(util.cos_sim(reference_vec, response_vec).item())
    # Invert so that 1.0 means "no hallucination detected".
    score = 1.0 - (0.7 * nli_penalty + 0.3 * (1 - similarity))
    return round(max(0.0, min(1.0, score)), 3)
def check_assumption(response: str) -> float:
    """Score the absence of speculation/hedging, in [0, 1] (higher = fewer assumptions).

    Combines regex hedge-word density (60%) with a binary LLM judgment (40%).
    Returns 0.0 for empty input.
    """
    if not response:
        return 0.0
    # Pattern-based detection.  Bug fix: the original searched lowercased
    # text with capitalized alternatives ("I think", "I believe", ...),
    # which could never match.  Match the raw text case-insensitively.
    speculative_patterns = [
        r"\b(maybe|perhaps|possibly|probably|might|could|would|should)\b",
        r"\b(I think|I believe|I guess|I suppose|I assume)\b",
        r"\b(it seems|it appears|it looks like)\b",
        r"\b(likely|unlikely|presumably|arguably)\b",
        r"\b(some|many|most|often|usually|generally|typically)\b",
    ]
    pattern_count = sum(
        len(re.findall(pattern, response, flags=re.IGNORECASE))
        for pattern in speculative_patterns
    )
    # Length normalization: roughly one hedge per five words saturates.
    word_count = len(response.split())
    pattern_score = min(1.0, pattern_count / max(1, word_count / 5))
    # LLM-based judgment.
    assumption_prompt = f"""
Determine if the following text contains assumptions, speculation, or hedging language.
Text: {response}
Answer with only 'yes' or 'no':
"""
    with torch.no_grad():
        inputs = judge_tokenizer.encode(assumption_prompt, return_tensors="pt")
        # Bug fix: move inputs to the judge model's device; the original
        # left them on CPU, crashing with a device mismatch under CUDA.
        inputs = inputs.to(judge_model.device)
        outputs = judge_model.generate(
            inputs,
            max_length=len(inputs[0]) + 3,
            pad_token_id=judge_tokenizer.eos_token_id,
        )
    judgment = judge_tokenizer.decode(outputs[0], skip_special_tokens=True)
    llm_score = 0.0 if "yes" in judgment.lower() else 1.0
    # Weighted combination of the two signals.
    final_score = 0.6 * (1 - pattern_score) + 0.4 * llm_score
    return round(final_score, 3)
def check_coherence(response: str) -> float:
    """Heuristic coherence score in [0, 1].

    Blends four signals: sentence count (0.3), sentence-length balance
    (0.2), discourse connectives (0.3), and lexical diversity (0.2).
    Returns 0.0 for empty input or input with no real sentences.
    """
    if not response:
        return 0.0
    # Split into non-empty sentences.
    raw_parts = re.split(r'[.!?]+', response)
    parts = [part.strip() for part in raw_parts if part.strip()]
    if not parts:
        return 0.0
    sentence_count = len(parts)
    # Penalize highly uneven sentence lengths (population variance of
    # per-sentence word counts; saturates at variance 100).
    lengths = [len(part.split()) for part in parts]
    variance = np.var(lengths) if len(lengths) > 1 else 0
    balance = 1.0 - min(1.0, variance / 100)
    # Reward discourse connectives; saturates at three occurrences.
    connectives = (
        'however', 'therefore', 'moreover', 'furthermore', 'consequently',
        'additionally', 'likewise', 'similarly', 'nevertheless', 'nonetheless',
    )
    lowered = response.lower()
    hits = sum(1 for word in connectives if word in lowered)
    transition = min(1.0, hits / 3)
    # Penalize word repetition via type/token ratio.
    tokens = lowered.split()
    diversity = len(set(tokens)) / max(1, len(tokens))
    score = (
        0.3 * min(1.0, sentence_count / 5) +
        0.2 * balance +
        0.3 * transition +
        0.2 * diversity
    )
    return round(max(0.0, min(1.0, score)), 3)
def check_accuracy(reference: str, response: str) -> float:
    """Accuracy of *response* against *reference*, in [0, 1].

    Weighted blend: BERTScore F1 (0.4), ROUGE-L (0.3), BLEU (0.1), and
    sentence-embedding cosine similarity (0.2).  Returns 0.0 when either
    input is empty.
    """
    if not reference or not response:
        return 0.0
    # BERTScore (token-level semantic overlap).
    bert_results = bertscore.compute(
        predictions=[response],
        references=[reference],
        lang="en",
        model_type=EMBED_MODEL,
    )
    bert_f1 = bert_results['f1'][0]
    # ROUGE-L (longest-common-subsequence overlap).
    rouge_results = rouge.compute(
        predictions=[response],
        references=[reference],
        use_stemmer=True,
    )
    rouge_l = rouge_results['rougeL']
    # BLEU.  Bug fix: `evaluate`'s bleu metric expects raw strings, not
    # pre-split token lists — the original call always raised and a bare
    # `except:` silently forced the BLEU term to 0.
    try:
        bleu_results = bleu.compute(
            predictions=[response],
            references=[[reference]],
        )
        bleu_score = bleu_results['bleu']
    except (ValueError, ZeroDivisionError):
        # e.g. no n-gram overlap possible on very short responses.
        bleu_score = 0.0
    # Sentence-embedding cosine similarity.
    ref_emb = embed_model.encode(reference, convert_to_tensor=True)
    resp_emb = embed_model.encode(response, convert_to_tensor=True)
    semantic_sim = float(util.cos_sim(ref_emb, resp_emb).item())
    # Weighted combination.  Renamed local from `accuracy_score`, which
    # shadowed the sklearn function imported at module level.
    combined = (
        0.4 * bert_f1 +
        0.3 * rouge_l +
        0.1 * bleu_score +
        0.2 * semantic_sim
    )
    return round(max(0.0, min(1.0, combined)), 3)
def check_relevance(prompt: str, response: str) -> float:
    """Relevance of the response to the prompt, in [0, 1].

    Clamped cosine similarity between the two sentence embeddings.
    Returns 0.0 when either input is empty.
    """
    if not prompt or not response:
        return 0.0
    # Embed both texts and compare directions.
    prompt_vec = embed_model.encode(prompt, convert_to_tensor=True)
    response_vec = embed_model.encode(response, convert_to_tensor=True)
    score = float(util.cos_sim(prompt_vec, response_vec).item())
    return round(min(1.0, max(0.0, score)), 3)
def check_fluency(response: str) -> float:
    """Fluency score in [0, 1] via a CoLA acceptability classifier.

    Averages per-sentence acceptability over the first three sentences.
    Falls back to a simple length heuristic if classification fails.
    Returns 0.0 for empty input.
    """
    if not response:
        return 0.0
    # Bug fix: the original built the HF pipeline (a full model load) on
    # every call.  Cache it on the function object and reuse it.
    checker = getattr(check_fluency, "_pipeline", None)
    if checker is None:
        checker = pipeline(
            "text-classification",
            model="textattack/roberta-base-CoLA",
            device=0 if torch.cuda.is_available() else -1,
        )
        check_fluency._pipeline = checker
    try:
        # Split into sentences; ignore fragments of five chars or fewer.
        sentences = re.split(r'[.!?]+', response)
        sentences = [s.strip() for s in sentences if len(s.strip()) > 5]
        if not sentences:
            return 0.5
        fluency_scores = []
        for sent in sentences[:3]:  # first three sentences are enough
            result = checker(sent[:512])  # truncate over-long sentences
            # For the CoLA head, LABEL_1 == linguistically acceptable.
            score = result[0]['score'] if result[0]['label'] == 'LABEL_1' else 1 - result[0]['score']
            fluency_scores.append(score)
        return round(sum(fluency_scores) / len(fluency_scores), 3)
    except Exception:  # narrowed from bare `except:`; keep best-effort fallback
        words = response.split()
        return 0.3 if len(words) < 3 else 0.7
| # -------------------------- | |
| # ROW & DF EVALUATION | |
| # -------------------------- | |
def evaluate_row(row):
    """Compute all per-response metrics for one record.

    Expects mapping-like *row* with keys: prompt, response, reference,
    task_id, agent.  Returns a dict of metric scores plus a weighted
    `final_score`.
    """
    prompt = row.get("prompt", "")
    response = row.get("response", "")
    reference = row.get("reference", "")
    scores = {
        "task_id": row.get("task_id", ""),
        "agent": row.get("agent", ""),
        "instruction_following": check_instruction_following(prompt, response),
        "hallucination": check_hallucination(reference, response),
        "assumption": check_assumption(response),
        "coherence": check_coherence(response),
        "accuracy": check_accuracy(reference, response),
        "relevance": check_relevance(prompt, response),
        "fluency": check_fluency(response),
    }
    # Weighted average; order mirrors the metric dict so the float sum is
    # reproducible.  Adjust weights as needed (they sum to 1.0).
    weights = (
        ("instruction_following", 0.20),
        ("accuracy", 0.20),
        ("hallucination", 0.15),
        ("coherence", 0.10),
        ("assumption", 0.10),
        ("relevance", 0.15),
        ("fluency", 0.10),
    )
    total = 0.0
    for key, weight in weights:
        total += weight * scores[key]
    scores["final_score"] = round(total, 3)
    return scores
| # -------------------------- | |
| # VISUALIZATION FUNCTIONS | |
| # -------------------------- | |
def plot_radar_chart(metrics_df, agents, metrics, out_path="/tmp/radar.png"):
    """Render a polar (radar) chart of per-agent mean metric values.

    One spoke per metric, one polygon per agent; NaN means plot as 0.
    Saves a PNG to *out_path* and returns that path.
    """
    axis_count = len(metrics)
    # Evenly spaced spokes, then repeat the first to close each polygon.
    spokes = np.linspace(0, 2 * np.pi, axis_count, endpoint=False).tolist()
    spokes.append(spokes[0])
    fig, ax = plt.subplots(figsize=(8, 8), subplot_kw=dict(polar=True))
    for agent in agents:
        agent_rows = metrics_df[metrics_df['agent'] == agent]
        means = []
        for metric in metrics:
            value = agent_rows[metric].mean()
            means.append(0 if np.isnan(value) else value)
        means.append(means[0])  # close the loop
        ax.plot(spokes, means, label=agent, linewidth=2)
        ax.fill(spokes, means, alpha=0.25)
    ax.set_xticks(spokes[:-1])
    ax.set_xticklabels(metrics)
    ax.set_yticklabels([])  # radial tick labels add clutter
    ax.legend(loc="upper right", bbox_to_anchor=(1.3, 1.1))
    ax.set_title("Agent Performance Radar Chart")
    plt.tight_layout()
    plt.savefig(out_path)
    plt.close()
    return out_path
def plot_heatmap(metrics_df, out_path="/tmp/heatmap.png"):
    """Heatmap of mean per-agent scores for each metric.

    Saves a PNG to *out_path* and returns that path.
    """
    metrics = ["accuracy", "hallucination", "instruction_following",
               "coherence", "assumption", "relevance", "fluency"]
    pivot = metrics_df.groupby("agent")[metrics].mean()
    plt.figure(figsize=(10, 6))
    # center=0.5 anchors the colormap at the midpoint of the [0, 1] scores.
    sns.heatmap(pivot, annot=True, cmap="YlGnBu", fmt=".3f", center=0.5)
    # Bug fix: the title's multiplication sign had been mojibake-corrupted
    # to the Greek letter Gamma ("Agent Γ Metric Heatmap").
    plt.title("Agent × Metric Heatmap")
    plt.tight_layout()
    plt.savefig(out_path)
    plt.close()
    return out_path
def plot_score_distribution(metrics_df, out_path="/tmp/distribution.png"):
    """KDE plot of final-score distributions, one curve per agent.

    Saves a PNG to *out_path* and returns that path.
    """
    plt.figure(figsize=(10, 6))
    for agent in metrics_df['agent'].unique():
        scores = metrics_df.loc[metrics_df['agent'] == agent, 'final_score']
        sns.kdeplot(scores, label=agent, fill=True, alpha=0.3)
    plt.xlabel('Final Score')
    plt.ylabel('Density')
    plt.title('Distribution of Final Scores by Agent')
    plt.legend()
    plt.tight_layout()
    plt.savefig(out_path)
    plt.close()
    return out_path
def plot_metric_correlation(metrics_df, out_path="/tmp/correlation.png"):
    """Heatmap of pairwise correlations between all metric columns.

    Saves a PNG to *out_path* and returns that path.
    """
    columns = ["accuracy", "hallucination", "instruction_following",
               "coherence", "assumption", "relevance", "fluency", "final_score"]
    plt.figure(figsize=(10, 8))
    corr = metrics_df[columns].corr()
    # Diverging colormap centred at zero so sign of correlation is obvious.
    sns.heatmap(corr, annot=True, cmap="coolwarm", center=0,
                fmt=".2f", square=True)
    plt.title('Correlation Between Metrics')
    plt.tight_layout()
    plt.savefig(out_path)
    plt.close()
    return out_path
def plot_agent_comparison(metrics_df, out_path="/tmp/agent_comparison.png"):
    """Grouped bar chart of per-agent mean metric values.

    Saves a PNG to *out_path* and returns that path.
    """
    metrics = ["accuracy", "hallucination", "instruction_following",
               "coherence", "assumption", "relevance", "fluency"]
    agent_means = metrics_df.groupby('agent')[metrics].mean()
    # Bug fix: DataFrame.plot creates its own figure, so the original's
    # separate plt.figure(figsize=(12, 6)) was left empty (leaked) and its
    # figsize ignored.  Pass figsize to .plot() instead.
    agent_means.plot(kind='bar', colormap='Set3', figsize=(12, 6))
    plt.title('Agent Performance Across Metrics')
    plt.xlabel('Agent')
    plt.ylabel('Score')
    plt.xticks(rotation=45)
    plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
    plt.tight_layout()
    plt.savefig(out_path)
    plt.close()
    return out_path
| # -------------------------- | |
| # MAIN EVALUATION FUNCTION | |
| # -------------------------- | |
def evaluate_dataframe(df: pd.DataFrame):
    """Score every row of *df*, build a leaderboard, and render all charts.

    Returns (metrics_df, images, leaderboard), where *images* is a list of
    (file_path, caption) tuples for the generated plots.
    """
    metrics_df = df.apply(evaluate_row, axis=1, result_type='expand')
    # Leaderboard: mean final score per (agent, task).
    leaderboard = (
        metrics_df
        .groupby(["agent", "task_id"])["final_score"]
        .mean()
        .reset_index()
    )
    # Generate every visualization in a fixed order.
    agents = df["agent"].unique()
    metric_names = ["accuracy", "hallucination", "instruction_following",
                    "coherence", "assumption", "relevance", "fluency"]
    images = [
        (plot_radar_chart(metrics_df, agents, metric_names),
         "Radar Chart: Agent vs Metrics"),
        (plot_heatmap(metrics_df), "Heatmap: Agent vs Metrics"),
        (plot_score_distribution(metrics_df), "Score Distribution by Agent"),
        (plot_metric_correlation(metrics_df), "Metric Correlation Matrix"),
        (plot_agent_comparison(metrics_df), "Agent Comparison Chart"),
    ]
    return metrics_df, images, leaderboard