Spaces:
Sleeping
Sleeping
| import re | |
| import numpy as np | |
| from typing import Dict, List, Optional, Tuple | |
| import json | |
| import torch | |
| from collections import defaultdict | |
| import spacy | |
| import evaluate | |
| from transformers import pipeline | |
| from sentence_transformers import SentenceTransformer | |
| from transformers import AutoTokenizer, AutoModelForSequenceClassification | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| import hashlib | |
| from datetime import datetime | |
| import concurrent.futures | |
| import random | |
| import gc | |
class AetherScoreEvaluator:
    """Score an agent response to a prompt on five weighted dimensions.

    The dimensions are ``instruction_following``, ``hallucination_score``,
    ``assumption_control``, ``coherence`` and ``accuracy``.  Every component
    score lies in ``[0, 1]`` and follows the same convention: higher is
    better.  In particular ``hallucination_score`` is high when the response
    is well supported by the prompt (i.e. little hallucination), which is what
    ``_calculate_overall_score`` and ``generate_explanation`` rely on.

    Heavy models are loaded once in the constructor.  If loading fails, all
    model attributes are set to ``None`` and each metric falls back to a
    neutral value instead of raising.
    """

    # Upper bound on worker threads used by ``evaluate_batch``.
    MAX_BATCH_WORKERS = 4
    # Seconds ``evaluate_batch`` waits for the whole batch before it stops
    # collecting results.
    BATCH_TIMEOUT_SECONDS = 300

    def __init__(self):
        """Load the spaCy pipeline and the scoring models, and set the weights."""
        # NLP models
        try:
            self.nlp = spacy.load("en_core_web_sm")
        except OSError:
            # spaCy raises OSError when the model package is not installed.
            print("Downloading 'en_core_web_sm' spacy model...")
            spacy.cli.download("en_core_web_sm")
            self.nlp = spacy.load("en_core_web_sm")

        # Initialize models with error handling
        self._initialize_models()

        # Scoring weights used by _calculate_overall_score.
        self.weights = {
            'instruction_following': 0.25,
            'hallucination_score': 0.20,
            'assumption_control': 0.20,
            'coherence': 0.20,
            'accuracy': 0.15,
        }

        # In-memory cache (not read by any method of this class).
        self.cache = {}

    # ------------------------------------------------------------------
    # Model lifecycle
    # ------------------------------------------------------------------
    def _initialize_models(self):
        """Load every scoring model; on any failure switch all of them to None."""
        try:
            # LLM Judge Model
            self.judge_model = pipeline(
                "text2text-generation",
                model="google/flan-t5-base",
                device=-1,  # CPU only for stability
            )
            # Sentence Transformer
            self.sentence_model = SentenceTransformer('all-MiniLM-L6-v2')
            # Evaluation metrics
            self.rouge = evaluate.load("rouge")
            self.sacrebleu = evaluate.load("sacrebleu")
            # NLI models
            self.nli_tokenizer = AutoTokenizer.from_pretrained("prajjwal1/bert-mini-mnli")
            self.nli_model = AutoModelForSequenceClassification.from_pretrained("prajjwal1/bert-mini-mnli")
            print("All models initialized successfully")
        except Exception as e:
            print(f"Error initializing models: {e}")
            # Fallback to basic functionality
            self._use_fallback_models()

    def _use_fallback_models(self):
        """Disable every model so that the metrics use their neutral fallbacks."""
        print("Using fallback evaluation methods")
        self.judge_model = None
        self.sentence_model = None
        self.rouge = None
        self.sacrebleu = None
        self.nli_tokenizer = None
        self.nli_model = None

    def _cleanup_models(self):
        """Drop the references to the NLI and judge models and free memory.

        The attributes are set to ``None`` rather than deleted so that the
        ``is None`` checks elsewhere in the class keep working if the instance
        is used again after cleanup.
        """
        for attr in ('nli_model', 'judge_model'):
            if getattr(self, attr, None) is not None:
                setattr(self, attr, None)
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        gc.collect()

    def __del__(self):
        """Best-effort cleanup when the object is destroyed."""
        try:
            self._cleanup_models()
        except Exception:
            # A finalizer must never raise (it may run during interpreter
            # shutdown when globals are already gone), so failures are ignored.
            pass

    # ------------------------------------------------------------------
    # Small helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _clamp01(value) -> float:
        """Return *value* as a float limited to the range [0, 1]."""
        return max(0.0, min(1.0, float(value)))

    def _nli_probabilities(self, premise: str, hypothesis: str) -> Tuple[float, float, float]:
        """Return ``(entailment, neutral, contradiction)`` probabilities.

        Falls back to an almost uniform distribution when the NLI model is
        unavailable or inference fails.
        """
        uniform = (0.33, 0.33, 0.34)
        if self.nli_tokenizer is None or self.nli_model is None:
            return uniform
        try:
            inputs = self.nli_tokenizer.encode_plus(
                premise, hypothesis,
                return_tensors="pt",
                truncation=True,
                max_length=512,  # Limit token length
            )
            with torch.no_grad():
                logits = self.nli_model(**inputs).logits
            probs = torch.softmax(logits, dim=-1).cpu().numpy()[0]
            # NOTE(review): assumes the label order is 0=contradiction,
            # 1=neutral, 2=entailment -- confirm against the model's id2label.
            return float(probs[2]), float(probs[1]), float(probs[0])
        except Exception as nli_error:
            print(f"NLI evaluation failed: {nli_error}")
            return uniform

    def _rouge_l(self, prompt: str, response: str) -> float:
        """Return ROUGE-L of the response against the prompt (0.5 if unavailable)."""
        if self.rouge is None:
            return 0.5
        try:
            return self.rouge.compute(predictions=[response], references=[prompt])["rougeL"]
        except Exception as rouge_error:
            print(f"ROUGE evaluation failed: {rouge_error}")
            return 0.5

    def _sacrebleu_score(self, prompt: str, response: str) -> float:
        """Return SacreBLEU of the response against the prompt scaled to [0, 1]."""
        if self.sacrebleu is None:
            return 0.5
        try:
            return self.sacrebleu.compute(predictions=[response], references=[[prompt]])["score"] / 100.0
        except Exception as bleu_error:
            print(f"BLEU evaluation failed: {bleu_error}")
            return 0.5

    # ------------------------------------------------------------------
    # Component metrics
    # ------------------------------------------------------------------
    def _evaluate_with_llm_judge(self, prompt: str, response: str) -> dict:
        """Score hallucination and assumption control for one response.

        Despite the name, the generative judge model is not called here; the
        score combines NLI entailment, embedding similarity, ROUGE-L and
        SacreBLEU of the response measured against the prompt.

        Returns:
            A dict with keys ``hallucination_score`` and ``assumption_control``,
            each mapping to a ``(score, explanation)`` tuple.  Both scores are
            in [0, 1] and higher is better: a high ``hallucination_score``
            means the response is well supported by the prompt.  On an
            unexpected error both scores are 0.5.
        """
        try:
            # Step 1: Embedding similarity (neutral when no model is loaded)
            if self.sentence_model is not None:
                emb_sim = self._semantic_similarity(prompt, response)
            else:
                emb_sim = 0.5

            # Steps 2-4: NLI, ROUGE-L and SacreBLEU, each with its own fallback
            entailment, neutral, _contradiction = self._nli_probabilities(prompt, response)
            rouge_l = self._rouge_l(prompt, response)
            sacrebleu = self._sacrebleu_score(prompt, response)

            # Step 5: Weighted support score.  It is reported directly (not as
            # 1 - support) because the overall score and the explanation text
            # both treat a LOW hallucination_score as the bad case.
            weights = {"entailment": 0.4, "embedding": 0.2, "rouge": 0.2, "sacrebleu": 0.2}
            support = (
                weights["entailment"] * entailment
                + weights["embedding"] * emb_sim
                + weights["rouge"] * rouge_l
                + weights["sacrebleu"] * sacrebleu
            )

            # Step 6: Assumption control from neutrality; clamp both scores
            halluc_score = self._clamp01(support)
            assumption_score = self._clamp01(1 - neutral)

            # Step 7: Explanations
            halluc_expl = (
                f"Entailment={entailment:.2f}, Embedding={emb_sim:.2f}, "
                f"ROUGE-L={rouge_l:.2f}, SacreBLEU={sacrebleu:.2f}, Neutral={neutral:.2f}"
            )
            assumption_expl = (
                f"Assumption control derived from NLI neutrality={neutral:.2f}. "
                "Lower neutrality → stronger confidence."
            )
            return {
                "hallucination_score": (halluc_score, halluc_expl),
                "assumption_control": (assumption_score, assumption_expl),
            }
        except Exception as e:
            print(f"Evaluation error: {e}")
            # Return fallback scores
            return {
                "hallucination_score": (0.5, f"Evaluation failed: {str(e)}"),
                "assumption_control": (0.5, f"Evaluation failed: {str(e)}"),
            }

    def _evaluate_instruction_following(self, prompt: str, response: str) -> Tuple[float, str]:
        """Score how well the response respects negative constraints in the prompt.

        Phrases following "don't", "do not", "avoid" or "without" are split on
        commas; a constraint passes when none of the resulting terms (longer
        than two characters) occurs as a substring of the response.  When the
        prompt has no such phrase the score is the prompt/response semantic
        similarity, or a neutral 0.5 if no sentence model is loaded.

        Returns:
            ``(score, reason)``; ``(0.5, <error text>)`` on unexpected errors.
        """
        try:
            prompt_lc = prompt.lower()
            response_lc = response.lower()
            negations = re.findall(r"(don't|do not|avoid|without) ([\w\s,]+)", prompt_lc)

            if not negations:
                if self.sentence_model is None:
                    # Same neutral convention as the other model-less fallbacks.
                    return 0.5, "No specific constraints found and no sentence model is available; neutral score assigned."
                sim = self._clamp01(self._semantic_similarity(prompt, response))
                return sim, f"No specific constraints found. Score based on semantic similarity ({sim:.2f}) to prompt."

            passed = 0
            for _, constraint_phrase in negations:
                terms = [term.strip() for term in constraint_phrase.split(',')]
                violated = any(term in response_lc for term in terms if len(term) > 2)
                if not violated:
                    passed += 1

            checks = len(negations)
            return passed / checks, f"{passed}/{checks} specific constraints were followed."
        except Exception as e:
            return 0.5, f"Instruction evaluation failed: {str(e)}"

    def _evaluate_coherence(self, response: str) -> Tuple[float, str]:
        """Score coherence as the mean similarity of adjacent sentences.

        Returns:
            ``(0.1, ...)`` for a blank response, ``(0.7, ...)`` for a single
            sentence or when no sentence model is loaded, otherwise the mean
            cosine similarity of neighbouring sentence embeddings clamped to
            [0, 1].  ``(0.5, <error text>)`` on unexpected errors.
        """
        try:
            if not response.strip():
                return 0.1, "Empty response"

            doc = self.nlp(response)
            sentences = [sent.text for sent in doc.sents if sent.text.strip()]
            if len(sentences) < 2:
                return 0.7, "Coherence is neutral for single-sentence responses."

            if self.sentence_model is None:
                # No similarity was computed, so do not report one.
                return 0.7, "Sentence model unavailable; neutral coherence score assigned."

            embeddings = self.sentence_model.encode(sentences)
            sims = [
                cosine_similarity([current], [following])[0][0]
                for current, following in zip(embeddings, embeddings[1:])
            ]
            score = self._clamp01(np.mean(sims))
            reason = f"Average sentence-to-sentence similarity score is {score:.2f} across {len(sentences)} sentences."
            return score, reason
        except Exception as e:
            return 0.5, f"Coherence evaluation failed: {str(e)}"

    def _evaluate_accuracy(self, response: str, expected: str, task_type: str) -> Tuple[float, str]:
        """Score accuracy as semantic similarity between response and expected answer.

        ``task_type`` is accepted for interface compatibility and is not used.

        Returns:
            ``(score, reason)`` with the score clamped to [0, 1]; a neutral
            ``0.5`` when no sentence model is loaded or on unexpected errors.
        """
        try:
            if self.sentence_model is None:
                # Without a model the similarity would be a meaningless 0.0.
                return 0.5, "Sentence model unavailable; accuracy could not be evaluated."

            sim = self._clamp01(self._semantic_similarity(response, expected))
            reason = f"Semantic similarity between response and expected answer is {sim:.2f}."
            if sim > 0.95:
                reason += " (High match)"
            elif sim < 0.5:
                reason += " (Low match)"
            return sim, reason
        except Exception as e:
            return 0.5, f"Accuracy evaluation failed: {str(e)}"

    def _semantic_similarity(self, text1: str, text2: str) -> float:
        """Return the cosine similarity of the two texts' sentence embeddings.

        Returns 0.0 when either text is empty, no sentence model is loaded,
        the similarity is NaN, or the computation fails.
        """
        try:
            if not text1 or not text2 or self.sentence_model is None:
                return 0.0
            emb1 = self.sentence_model.encode([text1])
            emb2 = self.sentence_model.encode([text2])
            sim = cosine_similarity(emb1, emb2)[0][0]
            return float(sim) if not np.isnan(sim) else 0.0
        except Exception as e:
            print(f"Similarity calculation failed: {e}")
            return 0.0

    # ------------------------------------------------------------------
    # Aggregation
    # ------------------------------------------------------------------
    def _calculate_overall_score(self, scores: Dict) -> float:
        """Return the weighted average of the numeric component scores present.

        Metrics missing from ``scores`` (or non-numeric) are skipped and the
        remaining weights are renormalised.  Returns 0.5 when nothing can be
        averaged or on unexpected errors.
        """
        try:
            total, weight_sum = 0.0, 0.0
            for metric, weight in self.weights.items():
                value = scores.get(metric)
                if isinstance(value, (int, float)):
                    total += float(value) * weight
                    weight_sum += weight
            return total / weight_sum if weight_sum > 0 else 0.5
        except Exception:
            return 0.5

    def _generate_eval_id(self, prompt: str, response: str) -> str:
        """Return a 12-character hex fingerprint of the prompt/response pair.

        MD5 is used only as a short, non-cryptographic identifier.
        """
        try:
            return hashlib.md5(f"{prompt}{response}".encode()).hexdigest()[:12]
        except Exception:
            return hashlib.md5(f"fallback{datetime.now()}".encode()).hexdigest()[:12]

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def evaluate_single(self, prompt: str, response: str, expected_answer: Optional[str] = None, task_type: str = "general") -> Dict:
        """Evaluate one prompt/response pair.

        Args:
            prompt: The instruction given to the agent.
            response: The agent's answer.
            expected_answer: Optional reference answer; when empty or ``None``
                the accuracy score is a neutral 0.5.
            task_type: Free-form label copied into the result metadata.

        Returns:
            ``{"scores": {...}, "reasons": {...}}``.  ``scores`` holds the
            five component scores, ``overall_score`` and the metadata keys
            ``eval_id``, ``timestamp`` and ``task_type``.  For an empty prompt
            or response, or on an unexpected error, ``overall_score`` is 0.0
            and ``reasons`` contains an ``error`` entry.
        """
        try:
            # Input validation
            if not prompt or not response:
                return {
                    "scores": {"overall_score": 0.0},
                    "reasons": {"error": "Empty prompt or response"},
                }

            eval_id = self._generate_eval_id(prompt, response)
            scores, reasons = {}, {}

            judged = self._evaluate_with_llm_judge(prompt, response)
            for key in ('hallucination_score', 'assumption_control'):
                scores[key], reasons[key] = judged[key]

            scores['instruction_following'], reasons['instruction_following'] = (
                self._evaluate_instruction_following(prompt, response)
            )
            scores['coherence'], reasons['coherence'] = self._evaluate_coherence(response)

            if expected_answer:
                scores['accuracy'], reasons['accuracy'] = self._evaluate_accuracy(response, expected_answer, task_type)
            else:
                scores['accuracy'], reasons['accuracy'] = (0.5, "No expected answer provided.")

            # The overall score must be computed before the metadata is added.
            scores['overall_score'] = self._calculate_overall_score(scores)
            reasons['overall_score'] = "Weighted average of component scores."

            scores.update({
                'eval_id': eval_id,
                'timestamp': datetime.now().isoformat(),
                'task_type': task_type,
            })
            return {"scores": scores, "reasons": reasons}
        except Exception as e:
            print(f"Single evaluation error: {e}")
            return {
                "scores": {"overall_score": 0.0, "eval_id": "error"},
                "reasons": {"error": str(e)},
            }

    def evaluate_batch(self, data: List[Dict], mode: str = "comprehensive") -> List[Dict]:
        """Evaluate many items concurrently.

        Args:
            data: Items with the keys ``prompt`` and ``response`` and the
                optional keys ``expected_answer``, ``task_type``, ``task_id``
                and ``agent_name``.
            mode: Accepted for interface compatibility; currently unused.

        Returns:
            One ``evaluate_single`` result per successfully processed item,
            extended with ``task_id`` and ``agent_name``, in the same order as
            ``data``.  Items that fail, or that are still unfinished when the
            batch timeout expires, are omitted and reported in a warning.
        """
        if not data:
            return []

        def process_item(item):
            try:
                return self.evaluate_single(
                    prompt=item.get('prompt', ''),
                    response=item.get('response', ''),
                    expected_answer=item.get('expected_answer', ''),
                    task_type=item.get('task_type', 'general'),
                )
            except Exception as e:
                print(f"Item processing failed: {e}")
                return None

        # One slot per input item so the output keeps the input order even
        # though futures complete in arbitrary order.
        slots: List[Optional[Dict]] = [None] * len(data)
        max_workers = min(self.MAX_BATCH_WORKERS, len(data))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_item = {
                executor.submit(process_item, item): (idx, item)
                for idx, item in enumerate(data)
            }
            try:
                for future in concurrent.futures.as_completed(
                    future_to_item, timeout=self.BATCH_TIMEOUT_SECONDS
                ):
                    idx, item = future_to_item[future]
                    try:
                        result = future.result()
                    except Exception as exc:
                        print(f'Item generated exception: {exc}')
                        continue
                    if not result:
                        continue
                    result.update({
                        'task_id': item.get('task_id', result['scores'].get('eval_id', f'task_{idx}')),
                        'agent_name': item.get('agent_name', 'Unknown'),
                    })
                    slots[idx] = result
            except concurrent.futures.TimeoutError:
                # as_completed raises once the batch deadline passes.  Keep
                # what finished and cancel work that has not started yet;
                # tasks already running are still awaited when the executor
                # context exits.
                print(f"Batch timed out after {self.BATCH_TIMEOUT_SECONDS} seconds")
                for future in future_to_item:
                    future.cancel()

        results = [result for result in slots if result is not None]
        failed_count = len(data) - len(results)
        if failed_count > 0:
            print(f"Warning: {failed_count} items failed to process")

        # Cleanup after batch processing
        if len(data) > 10:  # Only cleanup for larger batches
            gc.collect()
        return results

    def generate_explanation(self, scores: Dict) -> str:
        """Return a multi-line, human-readable summary of a ``scores`` dict.

        A line is added for each of instruction following, hallucination and
        accuracy whose score is below 0.6 (accuracy of exactly 0.5 is treated
        as "not evaluated").  If none is flagged, a positive line is added.
        """
        try:
            overall = scores.get('overall_score', 0)
            lines = [f"Overall Score: {overall:.2f}/1.00 - Reflects a weighted average of all dimensions."]

            if scores.get('instruction_following', 0) < 0.6:
                lines.append("Low Instruction Following: The response may have ignored key constraints or parts of the prompt.")
            if scores.get('hallucination_score', 0) < 0.6:
                lines.append("Potential Hallucination: The response might contain unverified or fabricated information.")
            if scores.get('accuracy', 0) < 0.6 and scores.get('accuracy', 0.5) != 0.5:
                lines.append("Low Accuracy: The response significantly differs from the provided expected answer.")

            if len(lines) == 1:
                lines.append("Great Performance: The agent performed well across the primary evaluation dimensions.")
            return "\n".join(lines)
        except Exception as e:
            return f"Explanation generation failed: {str(e)}"

    def get_agent_scores_from_results(self, results: List[Dict]) -> Dict[str, List[float]]:
        """Group the overall scores in ``results`` by ``agent_name``.

        Results without an agent name are filed under ``'Unknown'``; scores
        that are non-numeric or NaN are skipped.  The returned mapping is a
        ``defaultdict(list)``.
        """
        agent_scores = defaultdict(list)
        for result in results:
            try:
                agent_name = result.get('agent_name', 'Unknown')
                overall_score = result.get('scores', {}).get('overall_score', 0)
                if isinstance(overall_score, (int, float)) and not np.isnan(overall_score):
                    agent_scores[agent_name].append(float(overall_score))
            except Exception as e:
                print(f"Error processing result: {e}")
                continue
        return agent_scores