import re import numpy as np from typing import Dict, List, Optional, Tuple import json import torch from collections import defaultdict import spacy import evaluate from transformers import pipeline from sentence_transformers import SentenceTransformer from transformers import AutoTokenizer, AutoModelForSequenceClassification from sklearn.metrics.pairwise import cosine_similarity import hashlib from datetime import datetime import concurrent.futures import random import gc class AetherScoreEvaluator: def __init__(self): # NLP models try: self.nlp = spacy.load("en_core_web_sm") except OSError: print("Downloading 'en_core_web_sm' spacy model...") spacy.cli.download("en_core_web_sm") self.nlp = spacy.load("en_core_web_sm") # Initialize models with error handling self._initialize_models() # Scoring weights self.weights = { 'instruction_following': 0.25, 'hallucination_score': 0.20, 'assumption_control': 0.20, 'coherence': 0.20, 'accuracy': 0.15 } # In-memory cache self.cache = {} def _initialize_models(self): """Initialize all models with proper error handling""" try: # LLM Judge Model self.judge_model = pipeline( "text2text-generation", model="google/flan-t5-base", device=-1 # CPU only for stability ) # Sentence Transformer self.sentence_model = SentenceTransformer('all-MiniLM-L6-v2') # Evaluation metrics self.rouge = evaluate.load("rouge") self.sacrebleu = evaluate.load("sacrebleu") # NLI models self.nli_tokenizer = AutoTokenizer.from_pretrained("prajjwal1/bert-mini-mnli") self.nli_model = AutoModelForSequenceClassification.from_pretrained("prajjwal1/bert-mini-mnli") print("All models initialized successfully") except Exception as e: print(f"Error initializing models: {e}") # Fallback to basic functionality self._use_fallback_models() def _use_fallback_models(self): """Fallback to basic evaluation if model loading fails""" print("Using fallback evaluation methods") self.judge_model = None self.sentence_model = None self.rouge = None self.sacrebleu = None self.nli_tokenizer = None self.nli_model = None def _cleanup_models(self): """Clean up model memory""" if hasattr(self, 'nli_model') and self.nli_model is not None: del self.nli_model if hasattr(self, 'judge_model') and self.judge_model is not None: del self.judge_model torch.cuda.empty_cache() if torch.cuda.is_available() else None gc.collect() def _evaluate_with_llm_judge(self, prompt: str, response: str) -> dict: """ Hallucination detection with robust error handling """ try: # Step 1: Embedding similarity (with fallback) if self.sentence_model is not None: emb_sim = self._semantic_similarity(prompt, response) else: emb_sim = 0.5 # neutral fallback # Step 2: NLI check (with error handling) if self.nli_tokenizer is not None and self.nli_model is not None: try: inputs = self.nli_tokenizer.encode_plus( prompt, response, return_tensors="pt", truncation=True, max_length=512 # Limit token length ) with torch.no_grad(): logits = self.nli_model(**inputs).logits probs = torch.softmax(logits, dim=-1).cpu().numpy()[0] entailment, neutral, contradiction = probs[2], probs[1], probs[0] except Exception as nli_error: print(f"NLI evaluation failed: {nli_error}") entailment, neutral, contradiction = 0.33, 0.33, 0.34 else: entailment, neutral, contradiction = 0.33, 0.33, 0.34 # Step 3: ROUGE-L (with error handling) if self.rouge is not None: try: rouge_l = self.rouge.compute(predictions=[response], references=[prompt])["rougeL"] except Exception as rouge_error: print(f"ROUGE evaluation failed: {rouge_error}") rouge_l = 0.5 else: rouge_l = 0.5 # Step 4: SacreBLEU (with error handling) if self.sacrebleu is not None: try: sacrebleu = self.sacrebleu.compute(predictions=[response], references=[[prompt]])["score"] / 100.0 except Exception as bleu_error: print(f"BLEU evaluation failed: {bleu_error}") sacrebleu = 0.5 else: sacrebleu = 0.5 # Step 5: Weighted hallucination score weights = {"entailment": 0.4, "embedding": 0.2, "rouge": 0.2, "sacrebleu": 0.2} halluc_score = 1 - ( weights["entailment"] * entailment + weights["embedding"] * emb_sim + weights["rouge"] * rouge_l + weights["sacrebleu"] * sacrebleu ) # Step 6: Assumption control from neutrality assumption_score = 1 - neutral # Ensure scores are in valid range halluc_score = max(0.0, min(1.0, float(halluc_score))) assumption_score = max(0.0, min(1.0, float(assumption_score))) # Step 7: Explanations halluc_expl = ( f"Entailment={entailment:.2f}, Embedding={emb_sim:.2f}, " f"ROUGE-L={rouge_l:.2f}, SacreBLEU={sacrebleu:.2f}, Neutral={neutral:.2f}" ) assumption_expl = ( f"Assumption control derived from NLI neutrality={neutral:.2f}. " "Lower neutrality → stronger confidence." ) return { "hallucination_score": (halluc_score, halluc_expl), "assumption_control": (assumption_score, assumption_expl), } except Exception as e: print(f"Evaluation error: {e}") # Return fallback scores return { "hallucination_score": (0.5, f"Evaluation failed: {str(e)}"), "assumption_control": (0.5, f"Evaluation failed: {str(e)}"), } def evaluate_single(self, prompt: str, response: str, expected_answer: Optional[str] = None, task_type: str = "general") -> Dict: """Single evaluation with enhanced error handling""" try: # Input validation if not prompt or not response: return { "scores": {"overall_score": 0.0}, "reasons": {"error": "Empty prompt or response"} } # Generating Eval ID eval_id = self._generate_eval_id(prompt, response) scores, reasons = {}, {} # LLM Judge evaluation llm_judge_results = self._evaluate_with_llm_judge(prompt, response) scores['hallucination_score'], reasons['hallucination_score'] = llm_judge_results['hallucination_score'] scores['assumption_control'], reasons['assumption_control'] = llm_judge_results['assumption_control'] # Other evaluations scores['instruction_following'], reasons['instruction_following'] = self._evaluate_instruction_following(prompt, response) scores['coherence'], reasons['coherence'] = self._evaluate_coherence(response) if expected_answer: scores['accuracy'], reasons['accuracy'] = self._evaluate_accuracy(response, expected_answer, task_type) else: scores['accuracy'], reasons['accuracy'] = (0.5, "No expected answer provided.") # Calculate overall score scores['overall_score'] = self._calculate_overall_score(scores) reasons['overall_score'] = "Weighted average of component scores." # Add metadata scores.update({ 'eval_id': eval_id, 'timestamp': datetime.now().isoformat(), 'task_type': task_type }) return {"scores": scores, "reasons": reasons} except Exception as e: print(f"Single evaluation error: {e}") return { "scores": {"overall_score": 0.0, "eval_id": "error"}, "reasons": {"error": str(e)} } def evaluate_batch(self, data: List[Dict], mode: str = "comprehensive") -> List[Dict]: """Process batch with improved error handling and cleanup""" if not data: return [] results = [] failed_count = 0 def process_item(item): try: return self.evaluate_single( prompt=item.get('prompt', ''), response=item.get('response', ''), expected_answer=item.get('expected_answer', ''), task_type=item.get('task_type', 'general') ) except Exception as e: print(f"Item processing failed: {e}") return None # Use smaller thread pool and add timeout max_workers = min(4, len(data)) # Limit concurrent threads with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: # Submit all tasks with timeout future_to_item = { executor.submit(process_item, item): (i, item) for i, item in enumerate(data) } for future in concurrent.futures.as_completed(future_to_item, timeout=300): # 5 minute timeout try: result = future.result(timeout=30) # 30 second per item timeout if result: idx, item = future_to_item[future] result.update({ 'task_id': item.get('task_id', result['scores'].get('eval_id', f'task_{idx}')), 'agent_name': item.get('agent_name', 'Unknown'), }) results.append(result) else: failed_count += 1 except Exception as exc: failed_count += 1 print(f'Item generated exception: {exc}') if failed_count > 0: print(f"Warning: {failed_count} items failed to process") # Cleanup after batch processing if len(data) > 10: # Only cleanup for larger batches gc.collect() return results def _evaluate_instruction_following(self, prompt: str, response: str) -> Tuple[float, str]: """Evaluate instruction following with better error handling""" try: score, checks, passed = 1.0, 0, 0 # Check for negative constraints negations = re.findall(r"(don't|do not|avoid|without) ([\w\s,]+)", prompt.lower()) for _, constraint_phrase in negations: checks += 1 words_to_avoid = [w.strip() for w in constraint_phrase.split(',')] if not any(word in response.lower() for word in words_to_avoid if len(word) > 2): passed += 1 # Fallback to semantic similarity if no specific instructions found if checks == 0: sim = self._semantic_similarity(prompt, response) return sim, f"No specific constraints found. Score based on semantic similarity ({sim:.2f}) to prompt." score = passed / checks if checks > 0 else 1.0 reason = f"{passed}/{checks} specific constraints were followed." return score, reason except Exception as e: return 0.5, f"Instruction evaluation failed: {str(e)}" def _evaluate_coherence(self, response: str) -> Tuple[float, str]: """Evaluate coherence with error handling""" try: if not response.strip(): return 0.1, "Empty response" doc = self.nlp(response) sentences = [sent.text for sent in doc.sents if sent.text.strip()] if len(sentences) < 2: return 0.7, "Coherence is neutral for single-sentence responses." if self.sentence_model is not None: embeddings = self.sentence_model.encode(sentences) sims = [cosine_similarity([embeddings[i]], [embeddings[i+1]])[0][0] for i in range(len(sentences)-1)] score = np.mean(sims) else: score = 0.7 # fallback reason = f"Average sentence-to-sentence similarity score is {score:.2f} across {len(sentences)} sentences." return float(score), reason except Exception as e: return 0.5, f"Coherence evaluation failed: {str(e)}" def _evaluate_accuracy(self, response: str, expected: str, task_type: str) -> Tuple[float, str]: """Evaluate accuracy with error handling""" try: sim = self._semantic_similarity(response, expected) reason = f"Semantic similarity between response and expected answer is {sim:.2f}." if sim > 0.95: reason += " (High match)" elif sim < 0.5: reason += " (Low match)" return sim, reason except Exception as e: return 0.5, f"Accuracy evaluation failed: {str(e)}" def _calculate_overall_score(self, scores: Dict) -> float: """Calculate overall score with error handling""" try: total, weight_sum = 0.0, 0.0 for metric, weight in self.weights.items(): if metric in scores and isinstance(scores[metric], (int, float)): total += float(scores[metric]) * weight weight_sum += weight return total / weight_sum if weight_sum > 0 else 0.5 except Exception: return 0.5 def generate_explanation(self, scores: Dict) -> str: """Generate explanation with error handling""" try: explanation = [] overall = scores.get('overall_score', 0) explanation.append(f"Overall Score: {overall:.2f}/1.00 - Reflects a weighted average of all dimensions.") if scores.get('instruction_following', 0) < 0.6: explanation.append("Low Instruction Following: The response may have ignored key constraints or parts of the prompt.") if scores.get('hallucination_score', 0) < 0.6: explanation.append("Potential Hallucination: The response might contain unverified or fabricated information.") if scores.get('accuracy', 0) < 0.6 and scores.get('accuracy', 0.5) != 0.5: explanation.append("Low Accuracy: The response significantly differs from the provided expected answer.") if len(explanation) == 1: explanation.append("Great Performance: The agent performed well across the primary evaluation dimensions.") return "\n".join(explanation) except Exception as e: return f"Explanation generation failed: {str(e)}" def get_agent_scores_from_results(self, results: List[Dict]) -> Dict[str, List[float]]: """Get agent scores with error handling""" agent_scores = defaultdict(list) for result in results: try: agent_name = result.get('agent_name', 'Unknown') overall_score = result.get('scores', {}).get('overall_score', 0) if isinstance(overall_score, (int, float)) and not np.isnan(overall_score): agent_scores[agent_name].append(float(overall_score)) except Exception as e: print(f"Error processing result: {e}") continue return agent_scores def _generate_eval_id(self, prompt: str, response: str) -> str: """Generate evaluation ID""" try: return hashlib.md5(f"{prompt}{response}".encode()).hexdigest()[:12] except Exception: return hashlib.md5(f"fallback{datetime.now()}".encode()).hexdigest()[:12] def _semantic_similarity(self, text1: str, text2: str) -> float: """Calculate semantic similarity with error handling""" try: if not text1 or not text2 or self.sentence_model is None: return 0.0 emb1 = self.sentence_model.encode([text1]) emb2 = self.sentence_model.encode([text2]) sim = cosine_similarity(emb1, emb2)[0][0] return float(sim) if not np.isnan(sim) else 0.0 except Exception as e: print(f"Similarity calculation failed: {e}") return 0.0 def __del__(self): """Cleanup when object is destroyed""" try: self._cleanup_models() except Exception: pass