# evaluator_module.py
import re
import gc
import json
import hashlib
import concurrent.futures
from collections import defaultdict
from datetime import datetime
from typing import Dict, List, Optional, Tuple

import numpy as np
import torch
import spacy
import evaluate
from transformers import AutoModelForSequenceClassification, AutoTokenizer, pipeline
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity

class AetherScoreEvaluator:
    """Scores LLM responses on instruction following, hallucination,
    assumption control, coherence, and accuracy."""

    def __init__(self):
        # spaCy pipeline, used for sentence segmentation in coherence scoring
        try:
            self.nlp = spacy.load("en_core_web_sm")
        except OSError:
            print("Downloading 'en_core_web_sm' spacy model...")
            spacy.cli.download("en_core_web_sm")
            self.nlp = spacy.load("en_core_web_sm")

        # Initialize models with error handling
        self._initialize_models()

        # Scoring weights (sum to 1.0); every component is oriented so that
        # higher is better
        self.weights = {
            'instruction_following': 0.25,
            'hallucination_score': 0.20,
            'assumption_control': 0.20,
            'coherence': 0.20,
            'accuracy': 0.15,
        }
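
        # Worked example (illustrative numbers, not from the original code):
        # instruction_following=0.8, hallucination_score=0.9,
        # assumption_control=0.7, coherence=0.85, accuracy=0.6 gives
        # overall = 0.25*0.8 + 0.20*0.9 + 0.20*0.7 + 0.20*0.85 + 0.15*0.6 = 0.78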

        # In-memory cache (reserved; not yet read or written elsewhere in this module)
        self.cache = {}

    def _initialize_models(self):
        """Initialize all models, falling back to basic methods on failure."""
        try:
            # LLM judge model (loaded for future use; the current scoring
            # pipeline does not call it)
            self.judge_model = pipeline(
                "text2text-generation",
                model="google/flan-t5-base",
                device=-1  # CPU only for stability
            )

            # Sentence embeddings for semantic similarity
            self.sentence_model = SentenceTransformer('all-MiniLM-L6-v2')

            # Overlap metrics
            self.rouge = evaluate.load("rouge")
            self.sacrebleu = evaluate.load("sacrebleu")

            # NLI model for entailment/neutral/contradiction checks
            self.nli_tokenizer = AutoTokenizer.from_pretrained("prajjwal1/bert-mini-mnli")
            self.nli_model = AutoModelForSequenceClassification.from_pretrained("prajjwal1/bert-mini-mnli")

            print("All models initialized successfully")
        except Exception as e:
            print(f"Error initializing models: {e}")
            # Fall back to basic functionality
            self._use_fallback_models()

    def _use_fallback_models(self):
        """Fall back to basic evaluation if model loading fails."""
        print("Using fallback evaluation methods")
        self.judge_model = None
        self.sentence_model = None
        self.rouge = None
        self.sacrebleu = None
        self.nli_tokenizer = None
        self.nli_model = None

    def _cleanup_models(self):
        """Release model memory."""
        if getattr(self, 'nli_model', None) is not None:
            del self.nli_model
        if getattr(self, 'judge_model', None) is not None:
            del self.judge_model
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        gc.collect()

    def _evaluate_with_llm_judge(self, prompt: str, response: str) -> Dict:
        """
        Hallucination and assumption scoring from NLI, embedding similarity,
        and n-gram overlap, with robust error handling.
        """
        try:
            # Step 1: Embedding similarity (with fallback)
            if self.sentence_model is not None:
                emb_sim = self._semantic_similarity(prompt, response)
            else:
                emb_sim = 0.5  # neutral fallback

            # Step 2: NLI check (with error handling)
            if self.nli_tokenizer is not None and self.nli_model is not None:
                try:
                    inputs = self.nli_tokenizer.encode_plus(
                        prompt, response,
                        return_tensors="pt",
                        truncation=True,
                        max_length=512  # limit token length
                    )
                    with torch.no_grad():
                        logits = self.nli_model(**inputs).logits
                    probs = torch.softmax(logits, dim=-1).cpu().numpy()[0]
                    # Assumed label order: [contradiction, neutral, entailment].
                    # MNLI checkpoints differ in label ordering, so verify this
                    # against self.nli_model.config.id2label.
                    entailment, neutral, contradiction = probs[2], probs[1], probs[0]
                except Exception as nli_error:
                    print(f"NLI evaluation failed: {nli_error}")
                    entailment, neutral, contradiction = 0.33, 0.33, 0.34
            else:
                entailment, neutral, contradiction = 0.33, 0.33, 0.34

            # Step 3: ROUGE-L overlap between response and prompt (with error handling)
            if self.rouge is not None:
                try:
                    rouge_l = self.rouge.compute(predictions=[response], references=[prompt])["rougeL"]
                except Exception as rouge_error:
                    print(f"ROUGE evaluation failed: {rouge_error}")
                    rouge_l = 0.5
            else:
                rouge_l = 0.5

            # Step 4: SacreBLEU, normalized to [0, 1] (with error handling)
            if self.sacrebleu is not None:
                try:
                    bleu = self.sacrebleu.compute(predictions=[response], references=[[prompt]])["score"] / 100.0
                except Exception as bleu_error:
                    print(f"BLEU evaluation failed: {bleu_error}")
                    bleu = 0.5
            else:
                bleu = 0.5

            # Step 5: Weighted grounding score. Higher means better grounded
            # (less hallucination), which keeps this component oriented the
            # same way as the others in the overall weighted average and in
            # generate_explanation's thresholds.
            weights = {"entailment": 0.4, "embedding": 0.2, "rouge": 0.2, "sacrebleu": 0.2}
            halluc_score = (
                weights["entailment"] * entailment +
                weights["embedding"] * emb_sim +
                weights["rouge"] * rouge_l +
                weights["sacrebleu"] * bleu
            )

            # Step 6: Assumption control from NLI neutrality
            assumption_score = 1 - neutral

            # Clamp scores to [0, 1]
            halluc_score = max(0.0, min(1.0, float(halluc_score)))
            assumption_score = max(0.0, min(1.0, float(assumption_score)))

            # Step 7: Explanations
            halluc_expl = (
                f"Entailment={entailment:.2f}, Embedding={emb_sim:.2f}, "
                f"ROUGE-L={rouge_l:.2f}, SacreBLEU={bleu:.2f}, Neutral={neutral:.2f}"
            )
            assumption_expl = (
                f"Assumption control derived from NLI neutrality={neutral:.2f}. "
                "Lower neutrality → stronger confidence."
            )

            return {
                "hallucination_score": (halluc_score, halluc_expl),
                "assumption_control": (assumption_score, assumption_expl),
            }
        except Exception as e:
            print(f"Evaluation error: {e}")
            # Return fallback scores
            return {
                "hallucination_score": (0.5, f"Evaluation failed: {str(e)}"),
                "assumption_control": (0.5, f"Evaluation failed: {str(e)}"),
            }
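
    # Worked example (illustrative numbers): entailment=0.8, embedding=0.7,
    # ROUGE-L=0.5, SacreBLEU=0.3 gives a grounding score of
    # 0.4*0.8 + 0.2*0.7 + 0.2*0.5 + 0.2*0.3 = 0.62, and neutrality=0.15 gives
    # an assumption-control score of 1 - 0.15 = 0.85.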

    def evaluate_single(self, prompt: str, response: str, expected_answer: Optional[str] = None, task_type: str = "general") -> Dict:
        """Evaluate a single prompt/response pair with enhanced error handling."""
        try:
            # Input validation
            if not prompt or not response:
                return {
                    "scores": {"overall_score": 0.0},
                    "reasons": {"error": "Empty prompt or response"}
                }

            # Generate evaluation ID
            eval_id = self._generate_eval_id(prompt, response)
            scores, reasons = {}, {}

            # Hallucination and assumption-control scores
            llm_judge_results = self._evaluate_with_llm_judge(prompt, response)
            scores['hallucination_score'], reasons['hallucination_score'] = llm_judge_results['hallucination_score']
            scores['assumption_control'], reasons['assumption_control'] = llm_judge_results['assumption_control']

            # Other evaluations
            scores['instruction_following'], reasons['instruction_following'] = self._evaluate_instruction_following(prompt, response)
            scores['coherence'], reasons['coherence'] = self._evaluate_coherence(response)
            if expected_answer:
                scores['accuracy'], reasons['accuracy'] = self._evaluate_accuracy(response, expected_answer, task_type)
            else:
                scores['accuracy'], reasons['accuracy'] = (0.5, "No expected answer provided.")

            # Calculate overall score
            scores['overall_score'] = self._calculate_overall_score(scores)
            reasons['overall_score'] = "Weighted average of component scores."

            # Add metadata
            scores.update({
                'eval_id': eval_id,
                'timestamp': datetime.now().isoformat(),
                'task_type': task_type
            })
            return {"scores": scores, "reasons": reasons}
        except Exception as e:
            print(f"Single evaluation error: {e}")
            return {
                "scores": {"overall_score": 0.0, "eval_id": "error"},
                "reasons": {"error": str(e)}
            }

    def evaluate_batch(self, data: List[Dict], mode: str = "comprehensive") -> List[Dict]:
        """Process a batch of items with error handling and cleanup.

        Each item may carry 'prompt', 'response', 'expected_answer',
        'task_type', 'task_id', and 'agent_name'. Results are returned in
        completion order, not input order.
        """
        if not data:
            return []

        results = []
        failed_count = 0

        def process_item(item):
            try:
                return self.evaluate_single(
                    prompt=item.get('prompt', ''),
                    response=item.get('response', ''),
                    expected_answer=item.get('expected_answer') or None,
                    task_type=item.get('task_type', 'general')
                )
            except Exception as e:
                print(f"Item processing failed: {e}")
                return None

        # Use a small thread pool and bound the wait with timeouts
        max_workers = min(4, len(data))  # limit concurrent threads
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_item = {
                executor.submit(process_item, item): (i, item)
                for i, item in enumerate(data)
            }
            try:
                for future in concurrent.futures.as_completed(future_to_item, timeout=300):  # 5 minute batch timeout
                    try:
                        result = future.result(timeout=30)  # 30 second per item timeout
                        if result:
                            idx, item = future_to_item[future]
                            result.update({
                                'task_id': item.get('task_id', result['scores'].get('eval_id', f'task_{idx}')),
                                'agent_name': item.get('agent_name', 'Unknown'),
                            })
                            results.append(result)
                        else:
                            failed_count += 1
                    except Exception as exc:
                        failed_count += 1
                        print(f'Item generated exception: {exc}')
            except concurrent.futures.TimeoutError:
                # Count any items still pending when the batch timeout expires,
                # instead of letting the TimeoutError escape the loop unhandled
                pending = sum(1 for f in future_to_item if not f.done())
                failed_count += pending
                print(f"Batch timed out with {pending} items unfinished")

        if failed_count > 0:
            print(f"Warning: {failed_count} items failed to process")

        # Cleanup after larger batches
        if len(data) > 10:
            gc.collect()
        return results

    def _evaluate_instruction_following(self, prompt: str, response: str) -> Tuple[float, str]:
        """Check negative constraints in the prompt against the response."""
        try:
            checks, passed = 0, 0

            # Check for negative constraints, e.g. "don't mention X" or "avoid Y, Z"
            negations = re.findall(r"(don't|do not|avoid|without) ([\w\s,]+)", prompt.lower())
            for _, constraint_phrase in negations:
                checks += 1
                words_to_avoid = [w.strip() for w in constraint_phrase.split(',')]
                if not any(word in response.lower() for word in words_to_avoid if len(word) > 2):
                    passed += 1

            # Fall back to semantic similarity if no explicit constraints were found
            if checks == 0:
                sim = self._semantic_similarity(prompt, response)
                return sim, f"No specific constraints found. Score based on semantic similarity ({sim:.2f}) to prompt."

            score = passed / checks
            reason = f"{passed}/{checks} specific constraints were followed."
            return score, reason
        except Exception as e:
            return 0.5, f"Instruction evaluation failed: {str(e)}"

    def _evaluate_coherence(self, response: str) -> Tuple[float, str]:
        """Score coherence as mean adjacent-sentence embedding similarity."""
        try:
            if not response.strip():
                return 0.1, "Empty response"

            doc = self.nlp(response)
            sentences = [sent.text for sent in doc.sents if sent.text.strip()]
            if len(sentences) < 2:
                return 0.7, "Coherence is neutral for single-sentence responses."

            if self.sentence_model is not None:
                embeddings = self.sentence_model.encode(sentences)
                # Cosine similarity between each pair of adjacent sentences
                sims = [cosine_similarity([embeddings[i]], [embeddings[i + 1]])[0][0] for i in range(len(sentences) - 1)]
                score = np.mean(sims)
            else:
                score = 0.7  # fallback when embeddings are unavailable

            reason = f"Average sentence-to-sentence similarity score is {score:.2f} across {len(sentences)} sentences."
            return float(score), reason
        except Exception as e:
            return 0.5, f"Coherence evaluation failed: {str(e)}"

    def _evaluate_accuracy(self, response: str, expected: str, task_type: str) -> Tuple[float, str]:
        """Score accuracy as semantic similarity to the expected answer.

        task_type is accepted for interface symmetry but not yet used.
        """
        try:
            sim = self._semantic_similarity(response, expected)
            reason = f"Semantic similarity between response and expected answer is {sim:.2f}."
            if sim > 0.95:
                reason += " (High match)"
            elif sim < 0.5:
                reason += " (Low match)"
            return sim, reason
        except Exception as e:
            return 0.5, f"Accuracy evaluation failed: {str(e)}"

    def _calculate_overall_score(self, scores: Dict) -> float:
        """Weighted average of whichever component scores are present."""
        try:
            total, weight_sum = 0.0, 0.0
            for metric, weight in self.weights.items():
                if metric in scores and isinstance(scores[metric], (int, float)):
                    total += float(scores[metric]) * weight
                    weight_sum += weight
            return total / weight_sum if weight_sum > 0 else 0.5
        except Exception:
            return 0.5

    def generate_explanation(self, scores: Dict) -> str:
        """Generate a human-readable explanation from the score dict."""
        try:
            explanation = []
            overall = scores.get('overall_score', 0)
            explanation.append(f"Overall Score: {overall:.2f}/1.00 - Reflects a weighted average of all dimensions.")
            if scores.get('instruction_following', 0) < 0.6:
                explanation.append("Low Instruction Following: The response may have ignored key constraints or parts of the prompt.")
            if scores.get('hallucination_score', 0) < 0.6:
                explanation.append("Potential Hallucination: The response might contain unverified or fabricated information.")
            # 0.5 is the sentinel for "no expected answer provided", so skip it
            if scores.get('accuracy', 0) < 0.6 and scores.get('accuracy', 0.5) != 0.5:
                explanation.append("Low Accuracy: The response significantly differs from the provided expected answer.")
            if len(explanation) == 1:
                explanation.append("Great Performance: The agent performed well across the primary evaluation dimensions.")
            return "\n".join(explanation)
        except Exception as e:
            return f"Explanation generation failed: {str(e)}"

    def get_agent_scores_from_results(self, results: List[Dict]) -> Dict[str, List[float]]:
        """Group overall scores by agent name."""
        agent_scores = defaultdict(list)
        for result in results:
            try:
                agent_name = result.get('agent_name', 'Unknown')
                overall_score = result.get('scores', {}).get('overall_score', 0)
                if isinstance(overall_score, (int, float)) and not np.isnan(overall_score):
                    agent_scores[agent_name].append(float(overall_score))
            except Exception as e:
                print(f"Error processing result: {e}")
                continue
        return agent_scores

    def _generate_eval_id(self, prompt: str, response: str) -> str:
        """Generate a short, deterministic evaluation ID."""
        try:
            return hashlib.md5(f"{prompt}{response}".encode()).hexdigest()[:12]
        except Exception:
            return hashlib.md5(f"fallback{datetime.now()}".encode()).hexdigest()[:12]

    def _semantic_similarity(self, text1: str, text2: str) -> float:
        """Cosine similarity between sentence embeddings of the two texts."""
        try:
            if not text1 or not text2 or self.sentence_model is None:
                return 0.0
            emb1 = self.sentence_model.encode([text1])
            emb2 = self.sentence_model.encode([text2])
            sim = cosine_similarity(emb1, emb2)[0][0]
            return float(sim) if not np.isnan(sim) else 0.0
        except Exception as e:
            print(f"Similarity calculation failed: {e}")
            return 0.0

    def __del__(self):
        """Clean up model memory when the object is destroyed."""
        try:
            self._cleanup_models()
        except Exception:
            pass
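

# Minimal usage sketch. The prompts, responses, and expected answer below are
# illustrative placeholders, not part of the original module; running this
# assumes the models above download successfully on first use.
if __name__ == "__main__":
    evaluator = AetherScoreEvaluator()

    # Single evaluation
    single = evaluator.evaluate_single(
        prompt="Summarize the water cycle in two sentences. Do not mention clouds.",
        response=("Water evaporates from oceans and lakes, then condenses and "
                  "falls back as precipitation. Runoff returns it to the oceans, "
                  "and the cycle repeats."),
        expected_answer="Water evaporates, condenses, and falls as precipitation.",
        task_type="summarization",
    )
    print(json.dumps(single["scores"], indent=2, default=str))
    print(evaluator.generate_explanation(single["scores"]))

    # Batch evaluation with per-agent aggregation
    batch = evaluator.evaluate_batch([
        {"prompt": "Name three primary colors.",
         "response": "Red, yellow, and blue.",
         "agent_name": "demo-agent",
         "task_id": "t1"},
    ])
    print(evaluator.get_agent_scores_from_results(batch))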