""" Universal Semantic Evaluator for ANY prompt optimization use case. This evaluator uses LLM-powered semantic analysis to compare predicted vs expected outputs, enabling prompt optimization for ANY task without requiring custom evaluator code. Key Features: - Semantic understanding (not just string matching) - Works with text, JSON, numbers, structured outputs - Provides rich feedback for GEPA reflection - No task-specific assumptions """ import json import re import logging from typing import Dict, Any, Optional, List from difflib import SequenceMatcher from .base_evaluator import BaseEvaluator logger = logging.getLogger(__name__) class UniversalSemanticEvaluator(BaseEvaluator): """ Universal evaluator using LLM for semantic comparison. Works for ANY task without hardcoded assumptions: - Text outputs: "The answer is 42" vs "42" - JSON outputs: {"count": 23} vs {"count": 22} - Structured data: Lists, nested objects - Multi-modal: Image descriptions, analysis results Evaluation Strategy: 1. Quick checks (exact match, empty handling) 2. Structural comparison (for JSON/structured data) 3. LLM semantic analysis (for meaning understanding) 4. Combine into composite score with rich feedback """ def __init__( self, llm_client=None, use_llm_analysis: bool = True, semantic_weight: float = 0.6, structural_weight: float = 0.25, exact_match_bonus: float = 0.15, metric_weights: Optional[Dict[str, float]] = None ): """ Initialize Universal Semantic Evaluator. Args: llm_client: LLM client for semantic analysis (optional, falls back to heuristics) use_llm_analysis: Whether to use LLM for semantic comparison semantic_weight: Weight for semantic similarity (0.0-1.0) structural_weight: Weight for structural similarity (0.0-1.0) exact_match_bonus: Bonus weight for exact matches (0.0-1.0) metric_weights: Optional custom weights (overrides above) """ default_weights = metric_weights or { "semantic_similarity": semantic_weight, "structural_similarity": structural_weight, "exact_match": exact_match_bonus } super().__init__(metric_weights=default_weights) self.llm_client = llm_client self.use_llm_analysis = use_llm_analysis and llm_client is not None # Cache for LLM analysis to reduce API calls self._analysis_cache: Dict[str, Dict] = {} logger.info(f"🎯 Universal Semantic Evaluator initialized") logger.info(f" LLM analysis: {'enabled' if self.use_llm_analysis else 'disabled (using heuristics)'}") logger.info(f" Weights: semantic={semantic_weight}, structural={structural_weight}, exact={exact_match_bonus}") def evaluate(self, predicted: Any, expected: Any) -> Dict[str, float]: """ Evaluate predicted output against expected output using semantic understanding. Args: predicted: The model's predicted output (string, dict, or any serializable type) expected: The ground truth expected output Returns: Dictionary with metrics including 'composite_score' (required for GEPA) """ # Convert to strings for comparison predicted_str = self._to_string(predicted) expected_str = self._to_string(expected) # Initialize result result = { "composite_score": 0.0, "exact_match": 0.0, "semantic_similarity": 0.0, "structural_similarity": 0.0, "predicted_output": predicted_str[:500], # Truncate for logging "expected_output": expected_str[:500], "analysis": {}, "improvement_feedback": "" } # Handle empty/missing outputs if not predicted_str or not predicted_str.strip(): result["improvement_feedback"] = "❌ Output is EMPTY. The prompt must instruct the model to produce output." result["analysis"] = {"status": "empty_predicted"} return result if not expected_str or not expected_str.strip(): result["improvement_feedback"] = "⚠️ Expected output is empty - cannot evaluate." result["analysis"] = {"status": "empty_expected"} result["composite_score"] = 0.5 # Neutral score return result # ───────────────────────────────────────────────────── # STEP 1: Exact Match Check (Fast Path) # ───────────────────────────────────────────────────── normalized_pred = self._normalize(predicted_str) normalized_exp = self._normalize(expected_str) if normalized_pred == normalized_exp: result["exact_match"] = 1.0 result["semantic_similarity"] = 1.0 result["structural_similarity"] = 1.0 result["composite_score"] = 1.0 result["improvement_feedback"] = "✅ Perfect match! Output exactly matches expected." result["analysis"] = {"status": "exact_match"} return result # ───────────────────────────────────────────────────── # STEP 1.5: FORMAT MISMATCH DETECTION (CRITICAL FIX) # ───────────────────────────────────────────────────── # 🔥 CRITICAL: Detect when expected is JSON but predicted is narrative text # This causes catastrophically low scores and needs explicit handling expected_is_json = self._try_parse_json(expected_str) is not None predicted_is_json = self._try_parse_json(predicted_str) is not None format_mismatch = expected_is_json and not predicted_is_json if format_mismatch: # Expected JSON but got narrative - this is a CRITICAL format error # Give partial credit for semantic content but penalize heavily for format result["analysis"]["format_mismatch"] = True result["improvement_feedback"] = ( "❌ FORMAT ERROR: Expected JSON output but received narrative text. " "The prompt MUST enforce JSON output format. " "Add explicit instructions like: 'Output ONLY valid JSON, no explanations.' " "Consider adding: 'Do NOT write prose or explanations.'" ) # Still evaluate semantic content but cap the score # This gives feedback for improving the prompt logger.warning(f"⚠️ Format mismatch: expected JSON ({len(expected_str)} chars), got narrative ({len(predicted_str)} chars)") # ───────────────────────────────────────────────────── # STEP 2: Structural Comparison (for JSON/structured data) # ───────────────────────────────────────────────────── structural_result = self._compare_structure(predicted_str, expected_str) result["structural_similarity"] = structural_result["score"] result["analysis"]["structural"] = structural_result.get("details", {}) # ───────────────────────────────────────────────────── # STEP 3: Semantic Analysis # ───────────────────────────────────────────────────── if self.use_llm_analysis: semantic_result = self._llm_semantic_analysis(predicted_str, expected_str) else: semantic_result = self._heuristic_semantic_analysis(predicted_str, expected_str) result["semantic_similarity"] = semantic_result["score"] result["analysis"]["semantic"] = semantic_result.get("details", {}) result["improvement_feedback"] = semantic_result.get("feedback", "") # ───────────────────────────────────────────────────── # STEP 4: Compute Composite Score # ───────────────────────────────────────────────────── weights = self.metric_weights composite = ( result["semantic_similarity"] * weights.get("semantic_similarity", 0.6) + result["structural_similarity"] * weights.get("structural_similarity", 0.25) + result["exact_match"] * weights.get("exact_match", 0.15) ) # 🔥 CRITICAL FIX: Apply format mismatch penalty # If expected JSON but got narrative, cap the score to encourage format compliance if result.get("analysis", {}).get("format_mismatch"): # Cap at 0.3 to indicate "partial semantic match but wrong format" # This ensures format-correct outputs always score higher composite = min(composite, 0.30) logger.debug(f"📊 Format mismatch penalty applied: score capped at {composite:.3f}") result["composite_score"] = min(max(composite, 0.0), 1.0) # Add score breakdown to feedback if not result["improvement_feedback"]: result["improvement_feedback"] = self._generate_default_feedback(result) # Log evaluation logger.debug(f"📊 Evaluation: composite={result['composite_score']:.3f}, " f"semantic={result['semantic_similarity']:.3f}, " f"structural={result['structural_similarity']:.3f}") # Debug logging removed - not needed in production return result def _to_string(self, value: Any) -> str: """Convert any value to string for comparison.""" if value is None: return "" if isinstance(value, str): return value.strip() if isinstance(value, dict): try: return json.dumps(value, sort_keys=True, indent=2) except (TypeError, ValueError): return str(value) if isinstance(value, (list, tuple)): try: return json.dumps(list(value), sort_keys=True) except (TypeError, ValueError): return str(value) return str(value).strip() def _normalize(self, text: str) -> str: """Normalize text for comparison (lowercase, whitespace).""" # Lowercase and normalize whitespace normalized = ' '.join(text.lower().split()) # Remove common punctuation that doesn't affect meaning normalized = re.sub(r'[.,;:!?\'"]+$', '', normalized) return normalized def _compare_structure(self, predicted: str, expected: str) -> Dict[str, Any]: """ Compare structural similarity (especially for JSON/structured outputs). Returns: Dict with 'score' (0.0-1.0) and 'details' """ result = {"score": 0.0, "details": {}} # Try to parse as JSON pred_json = self._try_parse_json(predicted) exp_json = self._try_parse_json(expected) if pred_json is not None and exp_json is not None: # Both are valid JSON - do structural comparison return self._compare_json_structures(pred_json, exp_json) # Fallback: Compare as text structure return self._compare_text_structure(predicted, expected) def _try_parse_json(self, text: str) -> Optional[Any]: """ Try to parse text as JSON with robust extraction. 🔥 FIX: LLMs often wrap JSON in markdown code blocks or add extra text. This method now handles multiple formats: - Direct JSON - ```json ... ``` blocks - ``` ... ``` blocks (no language tag) - JSON embedded in prose - Escaped newlines and quotes """ if not text or not isinstance(text, str): return None # 🔥 PREPROCESSING: Clean common LLM output issues cleaned = text.strip() # Remove BOM and other invisible characters cleaned = cleaned.lstrip('\ufeff\u200b\u200c\u200d') # Strategy 1: Try direct parse (cleanest case) try: return json.loads(cleaned) except json.JSONDecodeError: pass # Strategy 2: Extract JSON from markdown code block (```json ... ```) # More permissive regex that handles optional language tags json_match = re.search(r'```(?:json|JSON)?\s*([\{|\[].*?[\}|\]])\s*```', cleaned, re.DOTALL) if json_match: try: return json.loads(json_match.group(1)) except json.JSONDecodeError: pass # Strategy 3: Find JSON using balanced brace matching (handles nested objects) def extract_balanced_json(s: str, start_char: str, end_char: str) -> Optional[str]: """Extract JSON with balanced braces/brackets.""" count = 0 start_idx = -1 for i, char in enumerate(s): if char == start_char: if count == 0: start_idx = i count += 1 elif char == end_char: count -= 1 if count == 0 and start_idx >= 0: return s[start_idx:i+1] return None # Try to find JSON object json_obj = extract_balanced_json(cleaned, '{', '}') if json_obj: try: return json.loads(json_obj) except json.JSONDecodeError: # Try to repair common issues repaired = self._repair_json(json_obj) try: return json.loads(repaired) except json.JSONDecodeError: pass # Try to find JSON array json_arr = extract_balanced_json(cleaned, '[', ']') if json_arr: try: return json.loads(json_arr) except json.JSONDecodeError: repaired = self._repair_json(json_arr) try: return json.loads(repaired) except json.JSONDecodeError: pass return None def _repair_json(self, json_str: str) -> str: """ Attempt to repair common JSON issues from LLM output. Fixes: - Trailing commas before } or ] - Single quotes instead of double quotes - Unquoted keys - Comments (// and /* */) """ repaired = json_str # Remove trailing commas repaired = re.sub(r',\s*}', '}', repaired) repaired = re.sub(r',\s*]', ']', repaired) # Remove single-line comments repaired = re.sub(r'//[^\n]*', '', repaired) # Remove multi-line comments repaired = re.sub(r'/\*.*?\*/', '', repaired, flags=re.DOTALL) # Replace single quotes with double quotes (but be careful with apostrophes) # Only replace when it looks like a JSON delimiter def replace_single_quotes(match): content = match.group(0) # Skip if it looks like an apostrophe in a word if re.match(r"'\w+'\s*:", content) or re.match(r":\s*'[^']*'", content): return content.replace("'", '"') return content # Basic single quote replacement for keys repaired = re.sub(r"'([^']+)'\s*:", r'"\1":', repaired) return repaired def _compare_json_structures(self, pred: Any, exp: Any) -> Dict[str, Any]: """Compare two JSON structures.""" result = {"score": 0.0, "details": {"type": "json", "matches": [], "mismatches": []}} if type(pred) != type(exp): result["details"]["mismatches"].append(f"Type mismatch: predicted={type(pred).__name__}, expected={type(exp).__name__}") result["score"] = 0.2 # Some credit for being JSON return result if isinstance(pred, dict) and isinstance(exp, dict): return self._compare_dicts(pred, exp) elif isinstance(pred, list) and isinstance(exp, list): return self._compare_lists(pred, exp) else: # Primitive types if pred == exp: result["score"] = 1.0 result["details"]["matches"].append(f"Values match: {pred}") else: result["score"] = self._value_similarity(pred, exp) result["details"]["mismatches"].append(f"Value mismatch: predicted={pred}, expected={exp}") return result def _compare_dicts(self, pred: dict, exp: dict) -> Dict[str, Any]: """ Compare two dictionaries with CASE-INSENSITIVE key matching. 🔥 FIX: LLMs often produce keys like 'Category' when expected is 'category'. This method now normalizes keys before comparison for fair scoring. """ result = {"score": 0.0, "details": {"type": "dict", "matches": [], "mismatches": [], "missing_keys": [], "extra_keys": []}} # 🔥 NORMALIZE: Convert all keys to lowercase for comparison # Also handle common variations like underscores vs camelCase def normalize_key(key: str) -> str: """Normalize key: lowercase, underscores to nothing, strip spaces.""" return re.sub(r'[_\s-]', '', str(key).lower()) # Build normalized key mappings pred_normalized = {normalize_key(k): (k, v) for k, v in pred.items()} exp_normalized = {normalize_key(k): (k, v) for k, v in exp.items()} pred_norm_keys = set(pred_normalized.keys()) exp_norm_keys = set(exp_normalized.keys()) # Check for missing/extra keys (using normalized comparison) missing_norm = exp_norm_keys - pred_norm_keys extra_norm = pred_norm_keys - exp_norm_keys common_norm = pred_norm_keys & exp_norm_keys # Convert back to original key names for reporting missing = [exp_normalized[k][0] for k in missing_norm] extra = [pred_normalized[k][0] for k in extra_norm] result["details"]["missing_keys"] = missing result["details"]["extra_keys"] = extra if not exp_norm_keys: result["score"] = 1.0 if not pred_norm_keys else 0.5 return result # Score based on key overlap (normalized) key_score = len(common_norm) / len(exp_norm_keys) if exp_norm_keys else 1.0 # Score based on value matches value_scores = [] for norm_key in common_norm: pred_orig_key, pred_val = pred_normalized[norm_key] exp_orig_key, exp_val = exp_normalized[norm_key] if pred_val == exp_val: value_scores.append(1.0) result["details"]["matches"].append(f"{exp_orig_key}: {exp_val}") else: sim = self._value_similarity(pred_val, exp_val) value_scores.append(sim) if sim < 0.8: result["details"]["mismatches"].append(f"{exp_orig_key}: predicted={pred_val}, expected={exp_val}") value_score = sum(value_scores) / len(value_scores) if value_scores else 0.0 # Combine scores result["score"] = 0.3 * key_score + 0.7 * value_score # Penalty for missing keys (reduced from 0.1 to 0.05 per key) if missing: result["score"] *= (1 - 0.05 * len(missing)) result["score"] = max(0.0, min(1.0, result["score"])) return result def _compare_lists(self, pred: list, exp: list) -> Dict[str, Any]: """Compare two lists.""" result = {"score": 0.0, "details": {"type": "list", "length_match": False, "item_matches": 0}} if not exp: result["score"] = 1.0 if not pred else 0.5 return result result["details"]["length_match"] = len(pred) == len(exp) # Compare items (order-sensitive) matches = 0 for i, exp_item in enumerate(exp): if i < len(pred): if pred[i] == exp_item: matches += 1 else: # Check if item exists elsewhere if exp_item in pred: matches += 0.5 # Partial credit for wrong position result["details"]["item_matches"] = matches result["score"] = matches / len(exp) # Penalty for length mismatch if len(pred) != len(exp): len_ratio = min(len(pred), len(exp)) / max(len(pred), len(exp)) result["score"] *= (0.7 + 0.3 * len_ratio) return result def _value_similarity(self, pred: Any, exp: Any) -> float: """ Calculate similarity between two values. 🔥 ENHANCED: Now handles: - Case-insensitive string comparison - Semantic similarity for common variations - Underscore/space/dash normalization - Numeric comparison with tolerance """ # Same value (exact match) if pred == exp: return 1.0 # Numeric comparison try: pred_num = float(pred) exp_num = float(exp) if exp_num == 0: return 1.0 if pred_num == 0 else 0.0 # Relative error with tolerance error = abs(pred_num - exp_num) / abs(exp_num) return max(0.0, 1.0 - error) except (ValueError, TypeError): pass # String comparison with normalization pred_str = str(pred).strip() exp_str = str(exp).strip() # Case-insensitive exact match if pred_str.lower() == exp_str.lower(): return 0.98 # Slight penalty for case mismatch # Normalize strings (remove underscores, spaces, dashes for comparison) def normalize_str(s: str) -> str: return re.sub(r'[_\s\-]+', '', s.lower()) pred_norm = normalize_str(pred_str) exp_norm = normalize_str(exp_str) if pred_norm == exp_norm: return 0.95 # Good match despite formatting differences # Check if one contains the other (partial match) if pred_norm in exp_norm or exp_norm in pred_norm: ratio = min(len(pred_norm), len(exp_norm)) / max(len(pred_norm), len(exp_norm)) return 0.7 + (0.2 * ratio) # 0.7-0.9 for partial matches # 🔥 SEMANTIC SIMILARITY: Check for common equivalent terms semantic_equivalents = { # Priority levels 'low': ['low', 'minor', 'trivial', 'p3', 'p4'], 'medium': ['medium', 'normal', 'moderate', 'p2'], 'high': ['high', 'important', 'major', 'p1', 'critical', 'urgent'], # Boolean variations 'true': ['true', 'yes', '1', 'on', 'enabled'], 'false': ['false', 'no', '0', 'off', 'disabled'], # Status variations 'success': ['success', 'succeeded', 'completed', 'done', 'passed'], 'failure': ['failure', 'failed', 'error', 'crashed'], 'pending': ['pending', 'waiting', 'queued', 'in_progress', 'processing'], } for canonical, equivalents in semantic_equivalents.items(): pred_match = any(eq in pred_norm for eq in equivalents) exp_match = any(eq in exp_norm for eq in equivalents) if pred_match and exp_match: return 0.85 # Semantic match # Sequence matching (character-level similarity) ratio = SequenceMatcher(None, pred_str.lower(), exp_str.lower()).ratio() # 🔥 WORD-LEVEL SIMILARITY: Check word overlap pred_words = set(re.findall(r'\w+', pred_str.lower())) exp_words = set(re.findall(r'\w+', exp_str.lower())) if pred_words and exp_words: word_overlap = len(pred_words & exp_words) / max(len(pred_words), len(exp_words)) # Combine character and word similarity return max(ratio, word_overlap * 0.9) def _compare_text_structure(self, predicted: str, expected: str) -> Dict[str, Any]: """Compare text structure when not JSON.""" result = {"score": 0.0, "details": {"type": "text"}} # Word overlap pred_words = set(predicted.lower().split()) exp_words = set(expected.lower().split()) if not exp_words: result["score"] = 1.0 if not pred_words else 0.5 return result overlap = len(pred_words & exp_words) result["details"]["word_overlap"] = overlap result["details"]["expected_words"] = len(exp_words) # Jaccard similarity union = len(pred_words | exp_words) result["score"] = overlap / union if union > 0 else 0.0 return result def _llm_semantic_analysis(self, predicted: str, expected: str) -> Dict[str, Any]: """ Use LLM for semantic analysis of predicted vs expected. Uses XML-delimited prompt structure to prevent context bleeding and Multi-Dimensional Scoring (Semantics vs. Syntax). Returns: Dict with 'score' (0.0-1.0), 'details', and 'feedback' """ # Check cache cache_key = f"{hash(predicted)}:{hash(expected)}" if cache_key in self._analysis_cache: return self._analysis_cache[cache_key] result = {"score": 0.0, "details": {}, "feedback": ""} try: # Truncate for token limits but preserve enough context expected_truncated = expected[:10000] predicted_truncated = predicted[:10000] # OPTIMIZED: Penalty-based scoring with self-verification # Starts at 1.0 and deducts for failures - more consistent than subjective scoring analysis_prompt = f""" You are a **Semantic Logic Engine** tasked with grading AI performance. You must compare a [PREDICTED] output against a [EXPECTED] truth. {expected_truncated} {predicted_truncated} Calculate the score based on these STRICT rules. Start with 1.0 and deduct penalties. 1. **Information Completeness (Max -0.5)**: - If key facts/fields are missing, deduct proportional to importance. - If a nested JSON field is missing, deduct 0.1 per field. 2. **Accuracy & Hallucination (Max -1.0)**: - If factual numbers/IDs are wrong: Score = 0 immediately. - If the model invents information NOT in the input: Deduct 0.3. 3. **Format Compliance (Max -0.3)**: - If JSON is requested but Markdown is returned: Deduct 0.3. - If keys are lowercase instead of snake_case: Deduct 0.1. 4. **Semantic Equivalence (No Penalty)**: - Synonyms are ACCEPTED (e.g., "Purchase" == "Buy"). - Formatting differences (whitespace) are IGNORED. Before finalizing the score, ask: "If I used the predicted output in code expecting the original output, would the code crash?" - If YES (Crash) -> Score must be < 0.5. - If NO (Safe) -> Score can be high. Return JSON ONLY: {{ "semantic_similarity": 0.0-1.0, "structural_similarity": 0.0-1.0, "verdict": "PERFECT" | "ACCEPTABLE" | "FORMAT_ERROR" | "DATA_CORRUPTION", "critical_failures": ["List specific failures that caused score < 1.0"], "penalty_breakdown": {{"completeness": -0.0, "accuracy": -0.0, "format": -0.0}}, "fix_directive": "Imperative command to fix the prompt" }} """ response = self.llm_client.generate( system_prompt="You are a Semantic Logic Engine. Calculate scores using penalty-based deduction from 1.0. Respond only with valid JSON.", user_prompt=analysis_prompt, image_base64="" ) content = response.get("content", str(response)) if isinstance(response, dict) else str(response) # Parse JSON response analysis = self._extract_json_from_response(content) if analysis: # Extract semantic similarity (primary score) semantic_sim = float(analysis.get("semantic_similarity", 0.5)) structural_sim = float(analysis.get("structural_similarity", semantic_sim)) # Compute weighted score based on verdict (updated for new schema) verdict = analysis.get("verdict", "ACCEPTABLE") verdict_multiplier = { "PERFECT": 1.0, "ACCEPTABLE": 0.85, "FORMAT_ERROR": 0.6, # New: was WRONG_FORMAT "DATA_CORRUPTION": 0.1, # New: replaces WRONG_CONTENT + HALLUCINATION # Legacy support "WRONG_FORMAT": 0.6, "WRONG_CONTENT": 0.3, "HALLUCINATION": 0.1 }.get(verdict, 0.5) # Final score: weighted combination result["score"] = min(1.0, semantic_sim * 0.6 + structural_sim * 0.3 + verdict_multiplier * 0.1) # Extract penalty breakdown if available penalty_breakdown = analysis.get("penalty_breakdown", {}) critical_failures = analysis.get("critical_failures", []) result["details"] = { "verdict": verdict, "semantic_similarity": semantic_sim, "structural_similarity": structural_sim, "critical_failures": critical_failures, "penalty_breakdown": penalty_breakdown, # Legacy field support "key_matches": analysis.get("key_matches", []), "key_differences": analysis.get("key_differences", critical_failures), "value_errors": analysis.get("value_errors", {}), "reasoning": analysis.get("reasoning", "") } result["feedback"] = analysis.get("fix_directive", "") else: # Fallback if JSON parsing fails result = self._heuristic_semantic_analysis(predicted, expected) # Cache result self._analysis_cache[cache_key] = result except Exception as e: logger.warning(f"LLM semantic analysis failed: {e}, falling back to heuristics") result = self._heuristic_semantic_analysis(predicted, expected) return result def _extract_json_from_response(self, content: str) -> Optional[Dict]: """Extract JSON from LLM response.""" # Try to find JSON in response json_match = re.search(r'\{[\s\S]*\}', content) if json_match: try: return json.loads(json_match.group(0)) except json.JSONDecodeError: pass return None def _heuristic_semantic_analysis(self, predicted: str, expected: str) -> Dict[str, Any]: """ Heuristic-based semantic analysis when LLM is not available. Uses multiple signals: - Word overlap (Jaccard) - Sequence matching (SequenceMatcher) - Number extraction and comparison - Key phrase matching """ result = {"score": 0.0, "details": {}, "feedback": ""} pred_lower = predicted.lower() exp_lower = expected.lower() # 1. Sequence similarity seq_sim = SequenceMatcher(None, pred_lower, exp_lower).ratio() # 2. Word overlap (Jaccard) pred_words = set(pred_lower.split()) exp_words = set(exp_lower.split()) jaccard = len(pred_words & exp_words) / len(pred_words | exp_words) if (pred_words | exp_words) else 0.0 # 3. Number comparison pred_nums = re.findall(r'-?\d+\.?\d*', predicted) exp_nums = re.findall(r'-?\d+\.?\d*', expected) num_score = 1.0 num_errors = [] if exp_nums: matches = 0 for exp_num in exp_nums: if exp_num in pred_nums: matches += 1 else: # Check for close matches try: exp_val = float(exp_num) for pred_num in pred_nums: pred_val = float(pred_num) if abs(pred_val - exp_val) <= 1: # Off by 1 matches += 0.9 num_errors.append(f"Number close: expected {exp_num}, got {pred_num}") break else: num_errors.append(f"Number missing: expected {exp_num}") except ValueError: pass num_score = matches / len(exp_nums) if exp_nums else 1.0 # 4. Key entity extraction (simple approach) # Look for capitalized words, quoted strings, etc. pred_entities = set(re.findall(r'\b[A-Z][a-z]+\b', predicted)) exp_entities = set(re.findall(r'\b[A-Z][a-z]+\b', expected)) entity_overlap = len(pred_entities & exp_entities) / len(exp_entities) if exp_entities else 1.0 # Combine scores result["score"] = ( 0.3 * seq_sim + 0.25 * jaccard + 0.25 * num_score + 0.2 * entity_overlap ) result["details"] = { "sequence_similarity": seq_sim, "word_overlap": jaccard, "number_accuracy": num_score, "entity_overlap": entity_overlap, "number_errors": num_errors } # Generate feedback feedback_parts = [] if jaccard < 0.5: feedback_parts.append("Low word overlap - output may be missing key terms.") if num_errors: feedback_parts.append(f"Number issues: {'; '.join(num_errors[:3])}") if entity_overlap < 0.5 and exp_entities: missing = exp_entities - pred_entities feedback_parts.append(f"Missing entities: {', '.join(list(missing)[:3])}") if feedback_parts: result["feedback"] = " | ".join(feedback_parts) else: result["feedback"] = "Output is semantically similar but not exact match." return result def _generate_default_feedback(self, result: Dict) -> str: """Generate default feedback based on scores.""" score = result["composite_score"] semantic = result["semantic_similarity"] structural = result["structural_similarity"] if score >= 0.9: return "✅ Excellent match! Minor differences only." elif score >= 0.7: return f"⚠️ Good match (semantic={semantic:.0%}, structural={structural:.0%}). Some differences to address." elif score >= 0.5: return f"⚠️ Partial match (semantic={semantic:.0%}, structural={structural:.0%}). Significant differences found." else: return f"❌ Poor match (semantic={semantic:.0%}, structural={structural:.0%}). Major issues to fix." def get_evaluation_summary(self, results: List[Dict]) -> Dict[str, Any]: """ Get summary statistics for a batch of evaluations. Args: results: List of evaluation result dictionaries Returns: Summary statistics """ if not results: return { "total_samples": 0, "accuracy": 0.0, "avg_semantic_similarity": 0.0, "avg_structural_similarity": 0.0 } total = len(results) scores = [r.get("composite_score", 0.0) for r in results] semantic_scores = [r.get("semantic_similarity", 0.0) for r in results] structural_scores = [r.get("structural_similarity", 0.0) for r in results] return { "total_samples": total, "accuracy": sum(1 for s in scores if s >= 0.8) / total, "avg_composite_score": sum(scores) / total, "avg_semantic_similarity": sum(semantic_scores) / total, "avg_structural_similarity": sum(structural_scores) / total, "min_score": min(scores), "max_score": max(scores) } # Convenience function to create evaluator def create_universal_evaluator(llm_client=None) -> UniversalSemanticEvaluator: """ Create a Universal Semantic Evaluator. Args: llm_client: Optional LLM client for semantic analysis. If not provided, uses heuristic-based analysis. Returns: Configured UniversalSemanticEvaluator instance """ return UniversalSemanticEvaluator( llm_client=llm_client, use_llm_analysis=llm_client is not None )