Spaces:
Sleeping
Sleeping
| """ | |
| Universal Semantic Evaluator for ANY prompt optimization use case. | |
| This evaluator uses LLM-powered semantic analysis to compare predicted vs expected outputs, | |
| enabling prompt optimization for ANY task without requiring custom evaluator code. | |
| Key Features: | |
| - Semantic understanding (not just string matching) | |
| - Works with text, JSON, numbers, structured outputs | |
| - Provides rich feedback for GEPA reflection | |
| - No task-specific assumptions | |
| """ | |
import json
import logging
import re
from difflib import SequenceMatcher
from typing import Any, Dict, List, Optional

from .base_evaluator import BaseEvaluator
| logger = logging.getLogger(__name__) | |
class UniversalSemanticEvaluator(BaseEvaluator):
    """
    Universal evaluator using LLM for semantic comparison.

    Works for ANY task without hardcoded assumptions:
    - Text outputs: "The answer is 42" vs "42"
    - JSON outputs: {"count": 23} vs {"count": 22}
    - Structured data: lists, nested objects
    - Multi-modal: image descriptions, analysis results

    Evaluation strategy:
    1. Quick checks (exact match, empty handling)
    2. Structural comparison (for JSON/structured data)
    3. LLM semantic analysis (for meaning understanding)
    4. Combine into composite score with rich feedback
    """

    def __init__(
        self,
        llm_client=None,
        use_llm_analysis: bool = True,
        semantic_weight: float = 0.6,
        structural_weight: float = 0.25,
        exact_match_bonus: float = 0.15,
        metric_weights: Optional[Dict[str, float]] = None
    ):
        """
        Initialize Universal Semantic Evaluator.

        Args:
            llm_client: LLM client for semantic analysis (optional, falls back to heuristics)
            use_llm_analysis: Whether to use LLM for semantic comparison
            semantic_weight: Weight for semantic similarity (0.0-1.0)
            structural_weight: Weight for structural similarity (0.0-1.0)
            exact_match_bonus: Bonus weight for exact matches (0.0-1.0)
            metric_weights: Optional custom weights (overrides the three above)
        """
        weights = metric_weights or {
            "semantic_similarity": semantic_weight,
            "structural_similarity": structural_weight,
            "exact_match": exact_match_bonus,
        }
        super().__init__(metric_weights=weights)

        self.llm_client = llm_client
        # LLM-based grading only makes sense when a client was actually supplied.
        self.use_llm_analysis = use_llm_analysis and llm_client is not None
        # Memoizes LLM verdicts keyed by the compared pair, to cut API calls.
        self._analysis_cache: Dict[str, Dict] = {}

        logger.info(f"🎯 Universal Semantic Evaluator initialized")
        logger.info(f" LLM analysis: {'enabled' if self.use_llm_analysis else 'disabled (using heuristics)'}")
        logger.info(f" Weights: semantic={semantic_weight}, structural={structural_weight}, exact={exact_match_bonus}")
| def evaluate(self, predicted: Any, expected: Any) -> Dict[str, float]: | |
| """ | |
| Evaluate predicted output against expected output using semantic understanding. | |
| Args: | |
| predicted: The model's predicted output (string, dict, or any serializable type) | |
| expected: The ground truth expected output | |
| Returns: | |
| Dictionary with metrics including 'composite_score' (required for GEPA) | |
| """ | |
| # Convert to strings for comparison | |
| predicted_str = self._to_string(predicted) | |
| expected_str = self._to_string(expected) | |
| # Initialize result | |
| result = { | |
| "composite_score": 0.0, | |
| "exact_match": 0.0, | |
| "semantic_similarity": 0.0, | |
| "structural_similarity": 0.0, | |
| "predicted_output": predicted_str[:500], # Truncate for logging | |
| "expected_output": expected_str[:500], | |
| "analysis": {}, | |
| "improvement_feedback": "" | |
| } | |
| # Handle empty/missing outputs | |
| if not predicted_str or not predicted_str.strip(): | |
| result["improvement_feedback"] = "❌ Output is EMPTY. The prompt must instruct the model to produce output." | |
| result["analysis"] = {"status": "empty_predicted"} | |
| return result | |
| if not expected_str or not expected_str.strip(): | |
| result["improvement_feedback"] = "⚠️ Expected output is empty - cannot evaluate." | |
| result["analysis"] = {"status": "empty_expected"} | |
| result["composite_score"] = 0.5 # Neutral score | |
| return result | |
| # ───────────────────────────────────────────────────── | |
| # STEP 1: Exact Match Check (Fast Path) | |
| # ───────────────────────────────────────────────────── | |
| normalized_pred = self._normalize(predicted_str) | |
| normalized_exp = self._normalize(expected_str) | |
| if normalized_pred == normalized_exp: | |
| result["exact_match"] = 1.0 | |
| result["semantic_similarity"] = 1.0 | |
| result["structural_similarity"] = 1.0 | |
| result["composite_score"] = 1.0 | |
| result["improvement_feedback"] = "✅ Perfect match! Output exactly matches expected." | |
| result["analysis"] = {"status": "exact_match"} | |
| return result | |
| # ───────────────────────────────────────────────────── | |
| # STEP 1.5: FORMAT MISMATCH DETECTION (CRITICAL FIX) | |
| # ───────────────────────────────────────────────────── | |
| # 🔥 CRITICAL: Detect when expected is JSON but predicted is narrative text | |
| # This causes catastrophically low scores and needs explicit handling | |
| expected_is_json = self._try_parse_json(expected_str) is not None | |
| predicted_is_json = self._try_parse_json(predicted_str) is not None | |
| format_mismatch = expected_is_json and not predicted_is_json | |
| if format_mismatch: | |
| # Expected JSON but got narrative - this is a CRITICAL format error | |
| # Give partial credit for semantic content but penalize heavily for format | |
| result["analysis"]["format_mismatch"] = True | |
| result["improvement_feedback"] = ( | |
| "❌ FORMAT ERROR: Expected JSON output but received narrative text. " | |
| "The prompt MUST enforce JSON output format. " | |
| "Add explicit instructions like: 'Output ONLY valid JSON, no explanations.' " | |
| "Consider adding: 'Do NOT write prose or explanations.'" | |
| ) | |
| # Still evaluate semantic content but cap the score | |
| # This gives feedback for improving the prompt | |
| logger.warning(f"⚠️ Format mismatch: expected JSON ({len(expected_str)} chars), got narrative ({len(predicted_str)} chars)") | |
| # ───────────────────────────────────────────────────── | |
| # STEP 2: Structural Comparison (for JSON/structured data) | |
| # ───────────────────────────────────────────────────── | |
| structural_result = self._compare_structure(predicted_str, expected_str) | |
| result["structural_similarity"] = structural_result["score"] | |
| result["analysis"]["structural"] = structural_result.get("details", {}) | |
| # ───────────────────────────────────────────────────── | |
| # STEP 3: Semantic Analysis | |
| # ───────────────────────────────────────────────────── | |
| if self.use_llm_analysis: | |
| semantic_result = self._llm_semantic_analysis(predicted_str, expected_str) | |
| else: | |
| semantic_result = self._heuristic_semantic_analysis(predicted_str, expected_str) | |
| result["semantic_similarity"] = semantic_result["score"] | |
| result["analysis"]["semantic"] = semantic_result.get("details", {}) | |
| result["improvement_feedback"] = semantic_result.get("feedback", "") | |
| # ───────────────────────────────────────────────────── | |
| # STEP 4: Compute Composite Score | |
| # ───────────────────────────────────────────────────── | |
| weights = self.metric_weights | |
| composite = ( | |
| result["semantic_similarity"] * weights.get("semantic_similarity", 0.6) + | |
| result["structural_similarity"] * weights.get("structural_similarity", 0.25) + | |
| result["exact_match"] * weights.get("exact_match", 0.15) | |
| ) | |
| # 🔥 CRITICAL FIX: Apply format mismatch penalty | |
| # If expected JSON but got narrative, cap the score to encourage format compliance | |
| if result.get("analysis", {}).get("format_mismatch"): | |
| # Cap at 0.3 to indicate "partial semantic match but wrong format" | |
| # This ensures format-correct outputs always score higher | |
| composite = min(composite, 0.30) | |
| logger.debug(f"📊 Format mismatch penalty applied: score capped at {composite:.3f}") | |
| result["composite_score"] = min(max(composite, 0.0), 1.0) | |
| # Add score breakdown to feedback | |
| if not result["improvement_feedback"]: | |
| result["improvement_feedback"] = self._generate_default_feedback(result) | |
| # Log evaluation | |
| logger.debug(f"📊 Evaluation: composite={result['composite_score']:.3f}, " | |
| f"semantic={result['semantic_similarity']:.3f}, " | |
| f"structural={result['structural_similarity']:.3f}") | |
| # Debug logging removed - not needed in production | |
| return result | |
| def _to_string(self, value: Any) -> str: | |
| """Convert any value to string for comparison.""" | |
| if value is None: | |
| return "" | |
| if isinstance(value, str): | |
| return value.strip() | |
| if isinstance(value, dict): | |
| try: | |
| return json.dumps(value, sort_keys=True, indent=2) | |
| except (TypeError, ValueError): | |
| return str(value) | |
| if isinstance(value, (list, tuple)): | |
| try: | |
| return json.dumps(list(value), sort_keys=True) | |
| except (TypeError, ValueError): | |
| return str(value) | |
| return str(value).strip() | |
| def _normalize(self, text: str) -> str: | |
| """Normalize text for comparison (lowercase, whitespace).""" | |
| # Lowercase and normalize whitespace | |
| normalized = ' '.join(text.lower().split()) | |
| # Remove common punctuation that doesn't affect meaning | |
| normalized = re.sub(r'[.,;:!?\'"]+$', '', normalized) | |
| return normalized | |
| def _compare_structure(self, predicted: str, expected: str) -> Dict[str, Any]: | |
| """ | |
| Compare structural similarity (especially for JSON/structured outputs). | |
| Returns: | |
| Dict with 'score' (0.0-1.0) and 'details' | |
| """ | |
| result = {"score": 0.0, "details": {}} | |
| # Try to parse as JSON | |
| pred_json = self._try_parse_json(predicted) | |
| exp_json = self._try_parse_json(expected) | |
| if pred_json is not None and exp_json is not None: | |
| # Both are valid JSON - do structural comparison | |
| return self._compare_json_structures(pred_json, exp_json) | |
| # Fallback: Compare as text structure | |
| return self._compare_text_structure(predicted, expected) | |
| def _try_parse_json(self, text: str) -> Optional[Any]: | |
| """ | |
| Try to parse text as JSON with robust extraction. | |
| 🔥 FIX: LLMs often wrap JSON in markdown code blocks or add extra text. | |
| This method now handles multiple formats: | |
| - Direct JSON | |
| - ```json ... ``` blocks | |
| - ``` ... ``` blocks (no language tag) | |
| - JSON embedded in prose | |
| - Escaped newlines and quotes | |
| """ | |
| if not text or not isinstance(text, str): | |
| return None | |
| # 🔥 PREPROCESSING: Clean common LLM output issues | |
| cleaned = text.strip() | |
| # Remove BOM and other invisible characters | |
| cleaned = cleaned.lstrip('\ufeff\u200b\u200c\u200d') | |
| # Strategy 1: Try direct parse (cleanest case) | |
| try: | |
| return json.loads(cleaned) | |
| except json.JSONDecodeError: | |
| pass | |
| # Strategy 2: Extract JSON from markdown code block (```json ... ```) | |
| # More permissive regex that handles optional language tags | |
| json_match = re.search(r'```(?:json|JSON)?\s*([\{|\[].*?[\}|\]])\s*```', cleaned, re.DOTALL) | |
| if json_match: | |
| try: | |
| return json.loads(json_match.group(1)) | |
| except json.JSONDecodeError: | |
| pass | |
| # Strategy 3: Find JSON using balanced brace matching (handles nested objects) | |
| def extract_balanced_json(s: str, start_char: str, end_char: str) -> Optional[str]: | |
| """Extract JSON with balanced braces/brackets.""" | |
| count = 0 | |
| start_idx = -1 | |
| for i, char in enumerate(s): | |
| if char == start_char: | |
| if count == 0: | |
| start_idx = i | |
| count += 1 | |
| elif char == end_char: | |
| count -= 1 | |
| if count == 0 and start_idx >= 0: | |
| return s[start_idx:i+1] | |
| return None | |
| # Try to find JSON object | |
| json_obj = extract_balanced_json(cleaned, '{', '}') | |
| if json_obj: | |
| try: | |
| return json.loads(json_obj) | |
| except json.JSONDecodeError: | |
| # Try to repair common issues | |
| repaired = self._repair_json(json_obj) | |
| try: | |
| return json.loads(repaired) | |
| except json.JSONDecodeError: | |
| pass | |
| # Try to find JSON array | |
| json_arr = extract_balanced_json(cleaned, '[', ']') | |
| if json_arr: | |
| try: | |
| return json.loads(json_arr) | |
| except json.JSONDecodeError: | |
| repaired = self._repair_json(json_arr) | |
| try: | |
| return json.loads(repaired) | |
| except json.JSONDecodeError: | |
| pass | |
| return None | |
| def _repair_json(self, json_str: str) -> str: | |
| """ | |
| Attempt to repair common JSON issues from LLM output. | |
| Fixes: | |
| - Trailing commas before } or ] | |
| - Single quotes instead of double quotes | |
| - Unquoted keys | |
| - Comments (// and /* */) | |
| """ | |
| repaired = json_str | |
| # Remove trailing commas | |
| repaired = re.sub(r',\s*}', '}', repaired) | |
| repaired = re.sub(r',\s*]', ']', repaired) | |
| # Remove single-line comments | |
| repaired = re.sub(r'//[^\n]*', '', repaired) | |
| # Remove multi-line comments | |
| repaired = re.sub(r'/\*.*?\*/', '', repaired, flags=re.DOTALL) | |
| # Replace single quotes with double quotes (but be careful with apostrophes) | |
| # Only replace when it looks like a JSON delimiter | |
| def replace_single_quotes(match): | |
| content = match.group(0) | |
| # Skip if it looks like an apostrophe in a word | |
| if re.match(r"'\w+'\s*:", content) or re.match(r":\s*'[^']*'", content): | |
| return content.replace("'", '"') | |
| return content | |
| # Basic single quote replacement for keys | |
| repaired = re.sub(r"'([^']+)'\s*:", r'"\1":', repaired) | |
| return repaired | |
| def _compare_json_structures(self, pred: Any, exp: Any) -> Dict[str, Any]: | |
| """Compare two JSON structures.""" | |
| result = {"score": 0.0, "details": {"type": "json", "matches": [], "mismatches": []}} | |
| if type(pred) != type(exp): | |
| result["details"]["mismatches"].append(f"Type mismatch: predicted={type(pred).__name__}, expected={type(exp).__name__}") | |
| result["score"] = 0.2 # Some credit for being JSON | |
| return result | |
| if isinstance(pred, dict) and isinstance(exp, dict): | |
| return self._compare_dicts(pred, exp) | |
| elif isinstance(pred, list) and isinstance(exp, list): | |
| return self._compare_lists(pred, exp) | |
| else: | |
| # Primitive types | |
| if pred == exp: | |
| result["score"] = 1.0 | |
| result["details"]["matches"].append(f"Values match: {pred}") | |
| else: | |
| result["score"] = self._value_similarity(pred, exp) | |
| result["details"]["mismatches"].append(f"Value mismatch: predicted={pred}, expected={exp}") | |
| return result | |
| def _compare_dicts(self, pred: dict, exp: dict) -> Dict[str, Any]: | |
| """ | |
| Compare two dictionaries with CASE-INSENSITIVE key matching. | |
| 🔥 FIX: LLMs often produce keys like 'Category' when expected is 'category'. | |
| This method now normalizes keys before comparison for fair scoring. | |
| """ | |
| result = {"score": 0.0, "details": {"type": "dict", "matches": [], "mismatches": [], "missing_keys": [], "extra_keys": []}} | |
| # 🔥 NORMALIZE: Convert all keys to lowercase for comparison | |
| # Also handle common variations like underscores vs camelCase | |
| def normalize_key(key: str) -> str: | |
| """Normalize key: lowercase, underscores to nothing, strip spaces.""" | |
| return re.sub(r'[_\s-]', '', str(key).lower()) | |
| # Build normalized key mappings | |
| pred_normalized = {normalize_key(k): (k, v) for k, v in pred.items()} | |
| exp_normalized = {normalize_key(k): (k, v) for k, v in exp.items()} | |
| pred_norm_keys = set(pred_normalized.keys()) | |
| exp_norm_keys = set(exp_normalized.keys()) | |
| # Check for missing/extra keys (using normalized comparison) | |
| missing_norm = exp_norm_keys - pred_norm_keys | |
| extra_norm = pred_norm_keys - exp_norm_keys | |
| common_norm = pred_norm_keys & exp_norm_keys | |
| # Convert back to original key names for reporting | |
| missing = [exp_normalized[k][0] for k in missing_norm] | |
| extra = [pred_normalized[k][0] for k in extra_norm] | |
| result["details"]["missing_keys"] = missing | |
| result["details"]["extra_keys"] = extra | |
| if not exp_norm_keys: | |
| result["score"] = 1.0 if not pred_norm_keys else 0.5 | |
| return result | |
| # Score based on key overlap (normalized) | |
| key_score = len(common_norm) / len(exp_norm_keys) if exp_norm_keys else 1.0 | |
| # Score based on value matches | |
| value_scores = [] | |
| for norm_key in common_norm: | |
| pred_orig_key, pred_val = pred_normalized[norm_key] | |
| exp_orig_key, exp_val = exp_normalized[norm_key] | |
| if pred_val == exp_val: | |
| value_scores.append(1.0) | |
| result["details"]["matches"].append(f"{exp_orig_key}: {exp_val}") | |
| else: | |
| sim = self._value_similarity(pred_val, exp_val) | |
| value_scores.append(sim) | |
| if sim < 0.8: | |
| result["details"]["mismatches"].append(f"{exp_orig_key}: predicted={pred_val}, expected={exp_val}") | |
| value_score = sum(value_scores) / len(value_scores) if value_scores else 0.0 | |
| # Combine scores | |
| result["score"] = 0.3 * key_score + 0.7 * value_score | |
| # Penalty for missing keys (reduced from 0.1 to 0.05 per key) | |
| if missing: | |
| result["score"] *= (1 - 0.05 * len(missing)) | |
| result["score"] = max(0.0, min(1.0, result["score"])) | |
| return result | |
| def _compare_lists(self, pred: list, exp: list) -> Dict[str, Any]: | |
| """Compare two lists.""" | |
| result = {"score": 0.0, "details": {"type": "list", "length_match": False, "item_matches": 0}} | |
| if not exp: | |
| result["score"] = 1.0 if not pred else 0.5 | |
| return result | |
| result["details"]["length_match"] = len(pred) == len(exp) | |
| # Compare items (order-sensitive) | |
| matches = 0 | |
| for i, exp_item in enumerate(exp): | |
| if i < len(pred): | |
| if pred[i] == exp_item: | |
| matches += 1 | |
| else: | |
| # Check if item exists elsewhere | |
| if exp_item in pred: | |
| matches += 0.5 # Partial credit for wrong position | |
| result["details"]["item_matches"] = matches | |
| result["score"] = matches / len(exp) | |
| # Penalty for length mismatch | |
| if len(pred) != len(exp): | |
| len_ratio = min(len(pred), len(exp)) / max(len(pred), len(exp)) | |
| result["score"] *= (0.7 + 0.3 * len_ratio) | |
| return result | |
| def _value_similarity(self, pred: Any, exp: Any) -> float: | |
| """ | |
| Calculate similarity between two values. | |
| 🔥 ENHANCED: Now handles: | |
| - Case-insensitive string comparison | |
| - Semantic similarity for common variations | |
| - Underscore/space/dash normalization | |
| - Numeric comparison with tolerance | |
| """ | |
| # Same value (exact match) | |
| if pred == exp: | |
| return 1.0 | |
| # Numeric comparison | |
| try: | |
| pred_num = float(pred) | |
| exp_num = float(exp) | |
| if exp_num == 0: | |
| return 1.0 if pred_num == 0 else 0.0 | |
| # Relative error with tolerance | |
| error = abs(pred_num - exp_num) / abs(exp_num) | |
| return max(0.0, 1.0 - error) | |
| except (ValueError, TypeError): | |
| pass | |
| # String comparison with normalization | |
| pred_str = str(pred).strip() | |
| exp_str = str(exp).strip() | |
| # Case-insensitive exact match | |
| if pred_str.lower() == exp_str.lower(): | |
| return 0.98 # Slight penalty for case mismatch | |
| # Normalize strings (remove underscores, spaces, dashes for comparison) | |
| def normalize_str(s: str) -> str: | |
| return re.sub(r'[_\s\-]+', '', s.lower()) | |
| pred_norm = normalize_str(pred_str) | |
| exp_norm = normalize_str(exp_str) | |
| if pred_norm == exp_norm: | |
| return 0.95 # Good match despite formatting differences | |
| # Check if one contains the other (partial match) | |
| if pred_norm in exp_norm or exp_norm in pred_norm: | |
| ratio = min(len(pred_norm), len(exp_norm)) / max(len(pred_norm), len(exp_norm)) | |
| return 0.7 + (0.2 * ratio) # 0.7-0.9 for partial matches | |
| # 🔥 SEMANTIC SIMILARITY: Check for common equivalent terms | |
| semantic_equivalents = { | |
| # Priority levels | |
| 'low': ['low', 'minor', 'trivial', 'p3', 'p4'], | |
| 'medium': ['medium', 'normal', 'moderate', 'p2'], | |
| 'high': ['high', 'important', 'major', 'p1', 'critical', 'urgent'], | |
| # Boolean variations | |
| 'true': ['true', 'yes', '1', 'on', 'enabled'], | |
| 'false': ['false', 'no', '0', 'off', 'disabled'], | |
| # Status variations | |
| 'success': ['success', 'succeeded', 'completed', 'done', 'passed'], | |
| 'failure': ['failure', 'failed', 'error', 'crashed'], | |
| 'pending': ['pending', 'waiting', 'queued', 'in_progress', 'processing'], | |
| } | |
| for canonical, equivalents in semantic_equivalents.items(): | |
| pred_match = any(eq in pred_norm for eq in equivalents) | |
| exp_match = any(eq in exp_norm for eq in equivalents) | |
| if pred_match and exp_match: | |
| return 0.85 # Semantic match | |
| # Sequence matching (character-level similarity) | |
| ratio = SequenceMatcher(None, pred_str.lower(), exp_str.lower()).ratio() | |
| # 🔥 WORD-LEVEL SIMILARITY: Check word overlap | |
| pred_words = set(re.findall(r'\w+', pred_str.lower())) | |
| exp_words = set(re.findall(r'\w+', exp_str.lower())) | |
| if pred_words and exp_words: | |
| word_overlap = len(pred_words & exp_words) / max(len(pred_words), len(exp_words)) | |
| # Combine character and word similarity | |
| return max(ratio, word_overlap * 0.9) | |
| def _compare_text_structure(self, predicted: str, expected: str) -> Dict[str, Any]: | |
| """Compare text structure when not JSON.""" | |
| result = {"score": 0.0, "details": {"type": "text"}} | |
| # Word overlap | |
| pred_words = set(predicted.lower().split()) | |
| exp_words = set(expected.lower().split()) | |
| if not exp_words: | |
| result["score"] = 1.0 if not pred_words else 0.5 | |
| return result | |
| overlap = len(pred_words & exp_words) | |
| result["details"]["word_overlap"] = overlap | |
| result["details"]["expected_words"] = len(exp_words) | |
| # Jaccard similarity | |
| union = len(pred_words | exp_words) | |
| result["score"] = overlap / union if union > 0 else 0.0 | |
| return result | |
| def _llm_semantic_analysis(self, predicted: str, expected: str) -> Dict[str, Any]: | |
| """ | |
| Use LLM for semantic analysis of predicted vs expected. | |
| Uses XML-delimited prompt structure to prevent context bleeding | |
| and Multi-Dimensional Scoring (Semantics vs. Syntax). | |
| Returns: | |
| Dict with 'score' (0.0-1.0), 'details', and 'feedback' | |
| """ | |
| # Check cache | |
| cache_key = f"{hash(predicted)}:{hash(expected)}" | |
| if cache_key in self._analysis_cache: | |
| return self._analysis_cache[cache_key] | |
| result = {"score": 0.0, "details": {}, "feedback": ""} | |
| try: | |
| # Truncate for token limits but preserve enough context | |
| expected_truncated = expected[:10000] | |
| predicted_truncated = predicted[:10000] | |
| # OPTIMIZED: Penalty-based scoring with self-verification | |
| # Starts at 1.0 and deducts for failures - more consistent than subjective scoring | |
| analysis_prompt = f"""<system_role> | |
| You are a **Semantic Logic Engine** tasked with grading AI performance. | |
| You must compare a [PREDICTED] output against a [EXPECTED] truth. | |
| </system_role> | |
| <input_data> | |
| <expected_output> | |
| {expected_truncated} | |
| </expected_output> | |
| <predicted_output> | |
| {predicted_truncated} | |
| </predicted_output> | |
| </input_data> | |
| <scoring_algorithm> | |
| Calculate the score based on these STRICT rules. Start with 1.0 and deduct penalties. | |
| 1. **Information Completeness (Max -0.5)**: | |
| - If key facts/fields are missing, deduct proportional to importance. | |
| - If a nested JSON field is missing, deduct 0.1 per field. | |
| 2. **Accuracy & Hallucination (Max -1.0)**: | |
| - If factual numbers/IDs are wrong: Score = 0 immediately. | |
| - If the model invents information NOT in the input: Deduct 0.3. | |
| 3. **Format Compliance (Max -0.3)**: | |
| - If JSON is requested but Markdown is returned: Deduct 0.3. | |
| - If keys are lowercase instead of snake_case: Deduct 0.1. | |
| 4. **Semantic Equivalence (No Penalty)**: | |
| - Synonyms are ACCEPTED (e.g., "Purchase" == "Buy"). | |
| - Formatting differences (whitespace) are IGNORED. | |
| </scoring_algorithm> | |
| <self_verification> | |
| Before finalizing the score, ask: "If I used the predicted output in code expecting the original output, would the code crash?" | |
| - If YES (Crash) -> Score must be < 0.5. | |
| - If NO (Safe) -> Score can be high. | |
| </self_verification> | |
| <output_schema> | |
| Return JSON ONLY: | |
| {{ | |
| "semantic_similarity": 0.0-1.0, | |
| "structural_similarity": 0.0-1.0, | |
| "verdict": "PERFECT" | "ACCEPTABLE" | "FORMAT_ERROR" | "DATA_CORRUPTION", | |
| "critical_failures": ["List specific failures that caused score < 1.0"], | |
| "penalty_breakdown": {{"completeness": -0.0, "accuracy": -0.0, "format": -0.0}}, | |
| "fix_directive": "Imperative command to fix the prompt" | |
| }} | |
| </output_schema> | |
| """ | |
| response = self.llm_client.generate( | |
| system_prompt="You are a Semantic Logic Engine. Calculate scores using penalty-based deduction from 1.0. Respond only with valid JSON.", | |
| user_prompt=analysis_prompt, | |
| image_base64="" | |
| ) | |
| content = response.get("content", str(response)) if isinstance(response, dict) else str(response) | |
| # Parse JSON response | |
| analysis = self._extract_json_from_response(content) | |
| if analysis: | |
| # Extract semantic similarity (primary score) | |
| semantic_sim = float(analysis.get("semantic_similarity", 0.5)) | |
| structural_sim = float(analysis.get("structural_similarity", semantic_sim)) | |
| # Compute weighted score based on verdict (updated for new schema) | |
| verdict = analysis.get("verdict", "ACCEPTABLE") | |
| verdict_multiplier = { | |
| "PERFECT": 1.0, | |
| "ACCEPTABLE": 0.85, | |
| "FORMAT_ERROR": 0.6, # New: was WRONG_FORMAT | |
| "DATA_CORRUPTION": 0.1, # New: replaces WRONG_CONTENT + HALLUCINATION | |
| # Legacy support | |
| "WRONG_FORMAT": 0.6, | |
| "WRONG_CONTENT": 0.3, | |
| "HALLUCINATION": 0.1 | |
| }.get(verdict, 0.5) | |
| # Final score: weighted combination | |
| result["score"] = min(1.0, semantic_sim * 0.6 + structural_sim * 0.3 + verdict_multiplier * 0.1) | |
| # Extract penalty breakdown if available | |
| penalty_breakdown = analysis.get("penalty_breakdown", {}) | |
| critical_failures = analysis.get("critical_failures", []) | |
| result["details"] = { | |
| "verdict": verdict, | |
| "semantic_similarity": semantic_sim, | |
| "structural_similarity": structural_sim, | |
| "critical_failures": critical_failures, | |
| "penalty_breakdown": penalty_breakdown, | |
| # Legacy field support | |
| "key_matches": analysis.get("key_matches", []), | |
| "key_differences": analysis.get("key_differences", critical_failures), | |
| "value_errors": analysis.get("value_errors", {}), | |
| "reasoning": analysis.get("reasoning", "") | |
| } | |
| result["feedback"] = analysis.get("fix_directive", "") | |
| else: | |
| # Fallback if JSON parsing fails | |
| result = self._heuristic_semantic_analysis(predicted, expected) | |
| # Cache result | |
| self._analysis_cache[cache_key] = result | |
| except Exception as e: | |
| logger.warning(f"LLM semantic analysis failed: {e}, falling back to heuristics") | |
| result = self._heuristic_semantic_analysis(predicted, expected) | |
| return result | |
| def _extract_json_from_response(self, content: str) -> Optional[Dict]: | |
| """Extract JSON from LLM response.""" | |
| # Try to find JSON in response | |
| json_match = re.search(r'\{[\s\S]*\}', content) | |
| if json_match: | |
| try: | |
| return json.loads(json_match.group(0)) | |
| except json.JSONDecodeError: | |
| pass | |
| return None | |
| def _heuristic_semantic_analysis(self, predicted: str, expected: str) -> Dict[str, Any]: | |
| """ | |
| Heuristic-based semantic analysis when LLM is not available. | |
| Uses multiple signals: | |
| - Word overlap (Jaccard) | |
| - Sequence matching (SequenceMatcher) | |
| - Number extraction and comparison | |
| - Key phrase matching | |
| """ | |
| result = {"score": 0.0, "details": {}, "feedback": ""} | |
| pred_lower = predicted.lower() | |
| exp_lower = expected.lower() | |
| # 1. Sequence similarity | |
| seq_sim = SequenceMatcher(None, pred_lower, exp_lower).ratio() | |
| # 2. Word overlap (Jaccard) | |
| pred_words = set(pred_lower.split()) | |
| exp_words = set(exp_lower.split()) | |
| jaccard = len(pred_words & exp_words) / len(pred_words | exp_words) if (pred_words | exp_words) else 0.0 | |
| # 3. Number comparison | |
| pred_nums = re.findall(r'-?\d+\.?\d*', predicted) | |
| exp_nums = re.findall(r'-?\d+\.?\d*', expected) | |
| num_score = 1.0 | |
| num_errors = [] | |
| if exp_nums: | |
| matches = 0 | |
| for exp_num in exp_nums: | |
| if exp_num in pred_nums: | |
| matches += 1 | |
| else: | |
| # Check for close matches | |
| try: | |
| exp_val = float(exp_num) | |
| for pred_num in pred_nums: | |
| pred_val = float(pred_num) | |
| if abs(pred_val - exp_val) <= 1: # Off by 1 | |
| matches += 0.9 | |
| num_errors.append(f"Number close: expected {exp_num}, got {pred_num}") | |
| break | |
| else: | |
| num_errors.append(f"Number missing: expected {exp_num}") | |
| except ValueError: | |
| pass | |
| num_score = matches / len(exp_nums) if exp_nums else 1.0 | |
| # 4. Key entity extraction (simple approach) | |
| # Look for capitalized words, quoted strings, etc. | |
| pred_entities = set(re.findall(r'\b[A-Z][a-z]+\b', predicted)) | |
| exp_entities = set(re.findall(r'\b[A-Z][a-z]+\b', expected)) | |
| entity_overlap = len(pred_entities & exp_entities) / len(exp_entities) if exp_entities else 1.0 | |
| # Combine scores | |
| result["score"] = ( | |
| 0.3 * seq_sim + | |
| 0.25 * jaccard + | |
| 0.25 * num_score + | |
| 0.2 * entity_overlap | |
| ) | |
| result["details"] = { | |
| "sequence_similarity": seq_sim, | |
| "word_overlap": jaccard, | |
| "number_accuracy": num_score, | |
| "entity_overlap": entity_overlap, | |
| "number_errors": num_errors | |
| } | |
| # Generate feedback | |
| feedback_parts = [] | |
| if jaccard < 0.5: | |
| feedback_parts.append("Low word overlap - output may be missing key terms.") | |
| if num_errors: | |
| feedback_parts.append(f"Number issues: {'; '.join(num_errors[:3])}") | |
| if entity_overlap < 0.5 and exp_entities: | |
| missing = exp_entities - pred_entities | |
| feedback_parts.append(f"Missing entities: {', '.join(list(missing)[:3])}") | |
| if feedback_parts: | |
| result["feedback"] = " | ".join(feedback_parts) | |
| else: | |
| result["feedback"] = "Output is semantically similar but not exact match." | |
| return result | |
| def _generate_default_feedback(self, result: Dict) -> str: | |
| """Generate default feedback based on scores.""" | |
| score = result["composite_score"] | |
| semantic = result["semantic_similarity"] | |
| structural = result["structural_similarity"] | |
| if score >= 0.9: | |
| return "✅ Excellent match! Minor differences only." | |
| elif score >= 0.7: | |
| return f"⚠️ Good match (semantic={semantic:.0%}, structural={structural:.0%}). Some differences to address." | |
| elif score >= 0.5: | |
| return f"⚠️ Partial match (semantic={semantic:.0%}, structural={structural:.0%}). Significant differences found." | |
| else: | |
| return f"❌ Poor match (semantic={semantic:.0%}, structural={structural:.0%}). Major issues to fix." | |
| def get_evaluation_summary(self, results: List[Dict]) -> Dict[str, Any]: | |
| """ | |
| Get summary statistics for a batch of evaluations. | |
| Args: | |
| results: List of evaluation result dictionaries | |
| Returns: | |
| Summary statistics | |
| """ | |
| if not results: | |
| return { | |
| "total_samples": 0, | |
| "accuracy": 0.0, | |
| "avg_semantic_similarity": 0.0, | |
| "avg_structural_similarity": 0.0 | |
| } | |
| total = len(results) | |
| scores = [r.get("composite_score", 0.0) for r in results] | |
| semantic_scores = [r.get("semantic_similarity", 0.0) for r in results] | |
| structural_scores = [r.get("structural_similarity", 0.0) for r in results] | |
| return { | |
| "total_samples": total, | |
| "accuracy": sum(1 for s in scores if s >= 0.8) / total, | |
| "avg_composite_score": sum(scores) / total, | |
| "avg_semantic_similarity": sum(semantic_scores) / total, | |
| "avg_structural_similarity": sum(structural_scores) / total, | |
| "min_score": min(scores), | |
| "max_score": max(scores) | |
| } | |
# Convenience factory for the evaluator.
def create_universal_evaluator(llm_client=None) -> UniversalSemanticEvaluator:
    """
    Create a Universal Semantic Evaluator.

    Args:
        llm_client: Optional LLM client for semantic analysis.
            Heuristic-based analysis is used when omitted.

    Returns:
        Configured UniversalSemanticEvaluator instance.
    """
    use_llm = llm_client is not None
    return UniversalSemanticEvaluator(llm_client=llm_client, use_llm_analysis=use_llm)