# Suhasdev's picture
# Remove hardcoded debug.log file writes - fix file not found errors
# c0b1bdf
"""
Universal Semantic Evaluator for ANY prompt optimization use case.
This evaluator uses LLM-powered semantic analysis to compare predicted vs expected outputs,
enabling prompt optimization for ANY task without requiring custom evaluator code.
Key Features:
- Semantic understanding (not just string matching)
- Works with text, JSON, numbers, structured outputs
- Provides rich feedback for GEPA reflection
- No task-specific assumptions
"""
import json
import re
import logging
from typing import Dict, Any, Optional, List
from difflib import SequenceMatcher
from .base_evaluator import BaseEvaluator
logger = logging.getLogger(__name__)
class UniversalSemanticEvaluator(BaseEvaluator):
"""
Universal evaluator using LLM for semantic comparison.
Works for ANY task without hardcoded assumptions:
- Text outputs: "The answer is 42" vs "42"
- JSON outputs: {"count": 23} vs {"count": 22}
- Structured data: Lists, nested objects
- Multi-modal: Image descriptions, analysis results
Evaluation Strategy:
1. Quick checks (exact match, empty handling)
2. Structural comparison (for JSON/structured data)
3. LLM semantic analysis (for meaning understanding)
4. Combine into composite score with rich feedback
"""
def __init__(
    self,
    llm_client=None,
    use_llm_analysis: bool = True,
    semantic_weight: float = 0.6,
    structural_weight: float = 0.25,
    exact_match_bonus: float = 0.15,
    metric_weights: Optional[Dict[str, float]] = None
):
    """
    Set up the evaluator and its metric weighting.

    Args:
        llm_client: Client used for LLM-based semantic analysis; when
            omitted, heuristic comparison is used instead.
        use_llm_analysis: Enable LLM-based semantic comparison (only takes
            effect when llm_client is supplied).
        semantic_weight: Weight applied to semantic similarity (0.0-1.0).
        structural_weight: Weight applied to structural similarity (0.0-1.0).
        exact_match_bonus: Extra weight granted to exact matches (0.0-1.0).
        metric_weights: Custom weight mapping; when truthy it overrides the
            three individual weight arguments.
    """
    # A falsy mapping (None or {}) falls back to the individual weights.
    weights = metric_weights or {
        "semantic_similarity": semantic_weight,
        "structural_similarity": structural_weight,
        "exact_match": exact_match_bonus
    }
    super().__init__(metric_weights=weights)
    self.llm_client = llm_client
    self.use_llm_analysis = use_llm_analysis and llm_client is not None
    # Memoize LLM analyses so repeated comparisons avoid extra API calls.
    self._analysis_cache: Dict[str, Dict] = {}
    mode = 'enabled' if self.use_llm_analysis else 'disabled (using heuristics)'
    logger.info(f"🎯 Universal Semantic Evaluator initialized")
    logger.info(f"   LLM analysis: {mode}")
    logger.info(f"   Weights: semantic={semantic_weight}, structural={structural_weight}, exact={exact_match_bonus}")
def evaluate(self, predicted: Any, expected: Any) -> Dict[str, float]:
    """
    Evaluate predicted output against expected output using semantic understanding.

    Pipeline:
        1. Fast exact-match check on normalized strings.
        2. Format-mismatch detection (expected JSON vs narrative text).
        3. Structural comparison (JSON-aware when both sides parse).
        4. Semantic analysis (LLM-backed when enabled, else heuristic).
        5. Weighted composite score, capped at 0.30 on format mismatch.

    Args:
        predicted: The model's predicted output (string, dict, or any serializable type).
        expected: The ground truth expected output.

    Returns:
        Dictionary with metrics including 'composite_score' (required for GEPA).
    """
    # Convert to strings for comparison.
    predicted_str = self._to_string(predicted)
    expected_str = self._to_string(expected)
    result = {
        "composite_score": 0.0,
        "exact_match": 0.0,
        "semantic_similarity": 0.0,
        "structural_similarity": 0.0,
        "predicted_output": predicted_str[:500],  # Truncate for logging
        "expected_output": expected_str[:500],
        "analysis": {},
        "improvement_feedback": ""
    }
    # Handle empty/missing outputs.
    if not predicted_str or not predicted_str.strip():
        result["improvement_feedback"] = "❌ Output is EMPTY. The prompt must instruct the model to produce output."
        result["analysis"] = {"status": "empty_predicted"}
        return result
    if not expected_str or not expected_str.strip():
        result["improvement_feedback"] = "⚠️ Expected output is empty - cannot evaluate."
        result["analysis"] = {"status": "empty_expected"}
        result["composite_score"] = 0.5  # Neutral score
        return result
    # STEP 1: exact match check (fast path).
    normalized_pred = self._normalize(predicted_str)
    normalized_exp = self._normalize(expected_str)
    if normalized_pred == normalized_exp:
        result["exact_match"] = 1.0
        result["semantic_similarity"] = 1.0
        result["structural_similarity"] = 1.0
        result["composite_score"] = 1.0
        result["improvement_feedback"] = "✅ Perfect match! Output exactly matches expected."
        result["analysis"] = {"status": "exact_match"}
        return result
    # STEP 1.5: format mismatch detection — expected JSON but narrative
    # text was returned. This is handled explicitly because it would
    # otherwise produce a catastrophically low, uninformative score.
    expected_is_json = self._try_parse_json(expected_str) is not None
    predicted_is_json = self._try_parse_json(predicted_str) is not None
    format_mismatch = expected_is_json and not predicted_is_json
    if format_mismatch:
        result["analysis"]["format_mismatch"] = True
        result["improvement_feedback"] = (
            "❌ FORMAT ERROR: Expected JSON output but received narrative text. "
            "The prompt MUST enforce JSON output format. "
            "Add explicit instructions like: 'Output ONLY valid JSON, no explanations.' "
            "Consider adding: 'Do NOT write prose or explanations.'"
        )
        logger.warning(f"⚠️ Format mismatch: expected JSON ({len(expected_str)} chars), got narrative ({len(predicted_str)} chars)")
    # STEP 2: structural comparison (for JSON/structured data).
    structural_result = self._compare_structure(predicted_str, expected_str)
    result["structural_similarity"] = structural_result["score"]
    result["analysis"]["structural"] = structural_result.get("details", {})
    # STEP 3: semantic analysis.
    if self.use_llm_analysis:
        semantic_result = self._llm_semantic_analysis(predicted_str, expected_str)
    else:
        semantic_result = self._heuristic_semantic_analysis(predicted_str, expected_str)
    result["semantic_similarity"] = semantic_result["score"]
    result["analysis"]["semantic"] = semantic_result.get("details", {})
    # BUG FIX: the semantic feedback previously overwrote the
    # format-mismatch feedback set above (often with an empty string),
    # losing the primary actionable signal for prompt reflection. Keep
    # the format error first and append the semantic feedback after it.
    semantic_feedback = semantic_result.get("feedback", "")
    if result["improvement_feedback"] and semantic_feedback:
        result["improvement_feedback"] += " | " + semantic_feedback
    elif semantic_feedback:
        result["improvement_feedback"] = semantic_feedback
    # STEP 4: weighted composite score.
    weights = self.metric_weights
    composite = (
        result["semantic_similarity"] * weights.get("semantic_similarity", 0.6) +
        result["structural_similarity"] * weights.get("structural_similarity", 0.25) +
        result["exact_match"] * weights.get("exact_match", 0.15)
    )
    # Cap the score on format mismatch so format-correct outputs always
    # rank higher, while still granting partial semantic credit.
    if result.get("analysis", {}).get("format_mismatch"):
        composite = min(composite, 0.30)
        logger.debug(f"📊 Format mismatch penalty applied: score capped at {composite:.3f}")
    result["composite_score"] = min(max(composite, 0.0), 1.0)
    # Fall back to a tiered default message when nothing else produced feedback.
    if not result["improvement_feedback"]:
        result["improvement_feedback"] = self._generate_default_feedback(result)
    logger.debug(f"📊 Evaluation: composite={result['composite_score']:.3f}, "
                 f"semantic={result['semantic_similarity']:.3f}, "
                 f"structural={result['structural_similarity']:.3f}")
    return result
def _to_string(self, value: Any) -> str:
    """Render an arbitrary value as a string suitable for comparison."""
    if value is None:
        return ""
    if isinstance(value, str):
        return value.strip()
    # Containers are serialized deterministically via JSON; anything that
    # cannot be serialized falls back to str().
    if isinstance(value, dict):
        serialize = lambda: json.dumps(value, sort_keys=True, indent=2)
    elif isinstance(value, (list, tuple)):
        serialize = lambda: json.dumps(list(value), sort_keys=True)
    else:
        return str(value).strip()
    try:
        return serialize()
    except (TypeError, ValueError):
        return str(value)
def _normalize(self, text: str) -> str:
    """Canonicalize text: lowercase, collapse whitespace, drop trailing punctuation."""
    collapsed = ' '.join(text.lower().split())
    # Trailing punctuation rarely changes meaning, so strip it before comparing.
    return re.sub(r'[.,;:!?\'"]+$', '', collapsed)
def _compare_structure(self, predicted: str, expected: str) -> Dict[str, Any]:
    """
    Score structural similarity between two outputs.

    When both sides parse as JSON a structural JSON comparison is used;
    otherwise a word-overlap text comparison is the fallback.

    Returns:
        Dict with 'score' (0.0-1.0) and 'details'.
    """
    parsed_pred = self._try_parse_json(predicted)
    parsed_exp = self._try_parse_json(expected)
    if parsed_pred is None or parsed_exp is None:
        # At least one side is not valid JSON: compare as plain text.
        return self._compare_text_structure(predicted, expected)
    return self._compare_json_structures(parsed_pred, parsed_exp)
def _try_parse_json(self, text: str) -> Optional[Any]:
    """
    Try to parse text as JSON with robust extraction.

    LLMs often wrap JSON in markdown code blocks or add extra prose, so
    several strategies are tried in order:
        1. Direct parse of the cleaned text.
        2. ```json ... ``` / ``` ... ``` markdown code blocks.
        3. Balanced-delimiter extraction of an embedded object (then
           array), with a repair pass for common LLM JSON mistakes.

    Returns:
        The parsed value, or None if nothing parseable was found.
    """
    if not text or not isinstance(text, str):
        return None
    # Clean common LLM output issues: surrounding whitespace, BOM and
    # zero-width characters that break json.loads.
    cleaned = text.strip()
    cleaned = cleaned.lstrip('\ufeff\u200b\u200c\u200d')
    # Strategy 1: direct parse (cleanest case).
    try:
        return json.loads(cleaned)
    except json.JSONDecodeError:
        pass
    # Strategy 2: JSON inside a markdown code block, optional language tag.
    # BUG FIX: the character classes were written as [\{|\[] and [\}|\]],
    # which also matched a literal '|' character; the stray '|' is removed.
    json_match = re.search(r'```(?:json|JSON)?\s*([\{\[].*?[\}\]])\s*```', cleaned, re.DOTALL)
    if json_match:
        try:
            return json.loads(json_match.group(1))
        except json.JSONDecodeError:
            pass

    # Strategy 3: balanced-delimiter scan (handles nested JSON inside prose).
    # NOTE: the counter does not account for braces inside string literals;
    # this is a best-effort heuristic, same as the original.
    def extract_balanced_json(s: str, start_char: str, end_char: str) -> Optional[str]:
        """Return the first span of s with balanced start/end delimiters."""
        count = 0
        start_idx = -1
        for i, char in enumerate(s):
            if char == start_char:
                if count == 0:
                    start_idx = i
                count += 1
            elif char == end_char:
                count -= 1
                if count == 0 and start_idx >= 0:
                    return s[start_idx:i + 1]
        return None

    # Objects are tried before arrays, matching the original precedence.
    # When the raw extraction fails to parse, attempt a repair pass.
    for start_char, end_char in (('{', '}'), ('[', ']')):
        candidate = extract_balanced_json(cleaned, start_char, end_char)
        if not candidate:
            continue
        try:
            return json.loads(candidate)
        except json.JSONDecodeError:
            repaired = self._repair_json(candidate)
            try:
                return json.loads(repaired)
            except json.JSONDecodeError:
                pass
    return None
def _repair_json(self, json_str: str) -> str:
    """
    Attempt to repair common JSON issues in LLM output.

    Fixes, in order:
    - // and /* */ comments (removed FIRST, so that any trailing commas
      they were hiding are then visible to the comma pass — the original
      stripped commas before comments and missed e.g. '"a": 1, // x\\n}')
    - Trailing commas before } or ]
    - Single-quoted keys ('key': -> "key":); single-quoted VALUES are left
      alone to avoid mangling apostrophes.

    Returns:
        The repaired JSON string (may still be invalid; callers re-parse).
    """
    repaired = json_str
    # Strip comments first: removing them can expose trailing commas.
    repaired = re.sub(r'//[^\n]*', '', repaired)
    repaired = re.sub(r'/\*.*?\*/', '', repaired, flags=re.DOTALL)
    # Remove trailing commas before closing braces/brackets.
    repaired = re.sub(r',\s*}', '}', repaired)
    repaired = re.sub(r',\s*]', ']', repaired)
    # Requote single-quoted keys. (The original also defined an unused
    # replace_single_quotes helper; that dead code has been removed.)
    repaired = re.sub(r"'([^']+)'\s*:", r'"\1":', repaired)
    return repaired
def _compare_json_structures(self, pred: Any, exp: Any) -> Dict[str, Any]:
    """Score two parsed JSON values against each other (0.0-1.0)."""
    outcome = {"score": 0.0, "details": {"type": "json", "matches": [], "mismatches": []}}
    if type(pred) is not type(exp):
        outcome["details"]["mismatches"].append(
            f"Type mismatch: predicted={type(pred).__name__}, expected={type(exp).__name__}"
        )
        outcome["score"] = 0.2  # Some credit for being JSON
        return outcome
    # Types match from here on, so checking one side suffices.
    if isinstance(pred, dict):
        return self._compare_dicts(pred, exp)
    if isinstance(pred, list):
        return self._compare_lists(pred, exp)
    # Primitive leaf values.
    if pred == exp:
        outcome["score"] = 1.0
        outcome["details"]["matches"].append(f"Values match: {pred}")
        return outcome
    outcome["score"] = self._value_similarity(pred, exp)
    outcome["details"]["mismatches"].append(f"Value mismatch: predicted={pred}, expected={exp}")
    return outcome
def _compare_dicts(self, pred: dict, exp: dict) -> Dict[str, Any]:
    """
    Compare two dicts using case- and separator-insensitive key matching.

    LLMs often emit 'Category' where 'category' is expected, or swap
    underscores/dashes/spaces; keys are normalized before matching so such
    variations are not penalized as missing keys.
    """
    details = {
        "type": "dict",
        "matches": [],
        "mismatches": [],
        "missing_keys": [],
        "extra_keys": []
    }
    outcome = {"score": 0.0, "details": details}

    def canon(key: str) -> str:
        # Lowercase and drop underscores/spaces/dashes.
        return re.sub(r'[_\s-]', '', str(key).lower())

    # Map normalized key -> (original key, value) for both sides.
    pred_by_norm = {canon(k): (k, v) for k, v in pred.items()}
    exp_by_norm = {canon(k): (k, v) for k, v in exp.items()}

    shared = set(pred_by_norm) & set(exp_by_norm)
    # Report original (un-normalized) key names back to the caller.
    missing = [exp_by_norm[k][0] for k in set(exp_by_norm) - set(pred_by_norm)]
    extra = [pred_by_norm[k][0] for k in set(pred_by_norm) - set(exp_by_norm)]
    details["missing_keys"] = missing
    details["extra_keys"] = extra

    if not exp_by_norm:
        outcome["score"] = 1.0 if not pred_by_norm else 0.5
        return outcome

    # Key coverage relative to the expected key set.
    key_score = len(shared) / len(exp_by_norm)

    # Value agreement over the shared keys.
    value_scores = []
    for norm_key in shared:
        exp_name, exp_val = exp_by_norm[norm_key]
        _, pred_val = pred_by_norm[norm_key]
        if pred_val == exp_val:
            value_scores.append(1.0)
            details["matches"].append(f"{exp_name}: {exp_val}")
            continue
        sim = self._value_similarity(pred_val, exp_val)
        value_scores.append(sim)
        if sim < 0.8:
            details["mismatches"].append(f"{exp_name}: predicted={pred_val}, expected={exp_val}")
    value_score = sum(value_scores) / len(value_scores) if value_scores else 0.0

    # Keys count 30%, values 70%; each missing key shaves off 5%.
    score = 0.3 * key_score + 0.7 * value_score
    if missing:
        score *= (1 - 0.05 * len(missing))
    outcome["score"] = max(0.0, min(1.0, score))
    return outcome
def _compare_lists(self, pred: list, exp: list) -> Dict[str, Any]:
    """Score two lists, order-sensitive with partial credit for misplaced items."""
    outcome = {"score": 0.0, "details": {"type": "list", "length_match": False, "item_matches": 0}}
    if not exp:
        outcome["score"] = 1.0 if not pred else 0.5
        return outcome
    outcome["details"]["length_match"] = len(pred) == len(exp)
    hits = 0
    for idx, wanted in enumerate(exp):
        if idx >= len(pred):
            continue
        if pred[idx] == wanted:
            hits += 1
        elif wanted in pred:
            hits += 0.5  # Right item, wrong position: half credit.
    outcome["details"]["item_matches"] = hits
    outcome["score"] = hits / len(exp)
    # Dampen the score by how far the lengths diverge.
    if len(pred) != len(exp):
        len_ratio = min(len(pred), len(exp)) / max(len(pred), len(exp))
        outcome["score"] *= (0.7 + 0.3 * len_ratio)
    return outcome
def _value_similarity(self, pred: Any, exp: Any) -> float:
    """
    Score similarity between two scalar values (0.0-1.0).

    Strategies, in order:
    - exact equality
    - numeric comparison with relative-error tolerance
    - case-insensitive and separator-insensitive string matching
    - containment (partial) matching
    - a small table of semantically equivalent categorical terms
    - character- and word-level fuzzy similarity as the last resort

    BUG FIX: the original fell off the end (implicitly returning None)
    when either string contained no word characters, breaking callers'
    arithmetic; a float is now always returned.
    """
    # Exact match.
    if pred == exp:
        return 1.0
    # Numeric comparison with relative-error tolerance.
    try:
        pred_num = float(pred)
        exp_num = float(exp)
        if exp_num == 0:
            return 1.0 if pred_num == 0 else 0.0
        error = abs(pred_num - exp_num) / abs(exp_num)
        return max(0.0, 1.0 - error)
    except (ValueError, TypeError):
        pass
    # String comparison with normalization.
    pred_str = str(pred).strip()
    exp_str = str(exp).strip()
    if pred_str.lower() == exp_str.lower():
        return 0.98  # Slight penalty for case mismatch

    def normalize_str(s: str) -> str:
        # Drop underscores/spaces/dashes for separator-insensitive matching.
        return re.sub(r'[_\s\-]+', '', s.lower())

    pred_norm = normalize_str(pred_str)
    exp_norm = normalize_str(exp_str)
    if pred_norm == exp_norm:
        return 0.95  # Good match despite formatting differences
    # Partial credit when one string contains the other.
    if pred_norm in exp_norm or exp_norm in pred_norm:
        ratio = min(len(pred_norm), len(exp_norm)) / max(len(pred_norm), len(exp_norm))
        return 0.7 + (0.2 * ratio)  # 0.7-0.9 for partial matches
    # Semantic equivalence for common categorical values.
    semantic_equivalents = {
        # Priority levels
        'low': ['low', 'minor', 'trivial', 'p3', 'p4'],
        'medium': ['medium', 'normal', 'moderate', 'p2'],
        'high': ['high', 'important', 'major', 'p1', 'critical', 'urgent'],
        # Boolean variations
        'true': ['true', 'yes', '1', 'on', 'enabled'],
        'false': ['false', 'no', '0', 'off', 'disabled'],
        # Status variations
        'success': ['success', 'succeeded', 'completed', 'done', 'passed'],
        'failure': ['failure', 'failed', 'error', 'crashed'],
        'pending': ['pending', 'waiting', 'queued', 'in_progress', 'processing'],
    }
    for canonical, equivalents in semantic_equivalents.items():
        pred_match = any(eq in pred_norm for eq in equivalents)
        exp_match = any(eq in exp_norm for eq in equivalents)
        if pred_match and exp_match:
            return 0.85  # Semantic match
    # Character-level similarity.
    ratio = SequenceMatcher(None, pred_str.lower(), exp_str.lower()).ratio()
    # Word-level overlap similarity, when both sides have words.
    pred_words = set(re.findall(r'\w+', pred_str.lower()))
    exp_words = set(re.findall(r'\w+', exp_str.lower()))
    if pred_words and exp_words:
        word_overlap = len(pred_words & exp_words) / max(len(pred_words), len(exp_words))
        return max(ratio, word_overlap * 0.9)
    # FIX: when either side has no words, fall back to character similarity
    # instead of returning None.
    return ratio
def _compare_text_structure(self, predicted: str, expected: str) -> Dict[str, Any]:
    """Score non-JSON text by Jaccard overlap of whitespace-split words."""
    outcome = {"score": 0.0, "details": {"type": "text"}}
    pred_words = set(predicted.lower().split())
    exp_words = set(expected.lower().split())
    if not exp_words:
        outcome["score"] = 1.0 if not pred_words else 0.5
        return outcome
    shared = len(pred_words & exp_words)
    combined = len(pred_words | exp_words)
    outcome["details"]["word_overlap"] = shared
    outcome["details"]["expected_words"] = len(exp_words)
    # Jaccard similarity: |intersection| / |union|.
    outcome["score"] = shared / combined if combined > 0 else 0.0
    return outcome
def _llm_semantic_analysis(self, predicted: str, expected: str) -> Dict[str, Any]:
    """
    Use the LLM client for semantic analysis of predicted vs expected.

    Builds an XML-delimited grading prompt (penalty-based scoring starting
    at 1.0 with deductions), asks the LLM for a JSON verdict, and converts
    that verdict into a 0.0-1.0 score. Falls back to heuristic analysis
    when the LLM call raises or its response contains no parseable JSON.

    Returns:
        Dict with 'score' (0.0-1.0), 'details', and 'feedback'.
    """
    # Check cache. NOTE: str hash() is salted per process, so this key is
    # only stable within a single run — which matches the cache's lifetime.
    cache_key = f"{hash(predicted)}:{hash(expected)}"
    if cache_key in self._analysis_cache:
        return self._analysis_cache[cache_key]
    result = {"score": 0.0, "details": {}, "feedback": ""}
    try:
        # Truncate for token limits but preserve enough context.
        expected_truncated = expected[:10000]
        predicted_truncated = predicted[:10000]
        # Penalty-based scoring with self-verification: starts at 1.0 and
        # deducts for failures — more consistent than subjective scoring.
        analysis_prompt = f"""<system_role>
You are a **Semantic Logic Engine** tasked with grading AI performance.
You must compare a [PREDICTED] output against a [EXPECTED] truth.
</system_role>
<input_data>
<expected_output>
{expected_truncated}
</expected_output>
<predicted_output>
{predicted_truncated}
</predicted_output>
</input_data>
<scoring_algorithm>
Calculate the score based on these STRICT rules. Start with 1.0 and deduct penalties.
1. **Information Completeness (Max -0.5)**:
- If key facts/fields are missing, deduct proportional to importance.
- If a nested JSON field is missing, deduct 0.1 per field.
2. **Accuracy & Hallucination (Max -1.0)**:
- If factual numbers/IDs are wrong: Score = 0 immediately.
- If the model invents information NOT in the input: Deduct 0.3.
3. **Format Compliance (Max -0.3)**:
- If JSON is requested but Markdown is returned: Deduct 0.3.
- If keys are lowercase instead of snake_case: Deduct 0.1.
4. **Semantic Equivalence (No Penalty)**:
- Synonyms are ACCEPTED (e.g., "Purchase" == "Buy").
- Formatting differences (whitespace) are IGNORED.
</scoring_algorithm>
<self_verification>
Before finalizing the score, ask: "If I used the predicted output in code expecting the original output, would the code crash?"
- If YES (Crash) -> Score must be < 0.5.
- If NO (Safe) -> Score can be high.
</self_verification>
<output_schema>
Return JSON ONLY:
{{
"semantic_similarity": 0.0-1.0,
"structural_similarity": 0.0-1.0,
"verdict": "PERFECT" | "ACCEPTABLE" | "FORMAT_ERROR" | "DATA_CORRUPTION",
"critical_failures": ["List specific failures that caused score < 1.0"],
"penalty_breakdown": {{"completeness": -0.0, "accuracy": -0.0, "format": -0.0}},
"fix_directive": "Imperative command to fix the prompt"
}}
</output_schema>
"""
        response = self.llm_client.generate(
            system_prompt="You are a Semantic Logic Engine. Calculate scores using penalty-based deduction from 1.0. Respond only with valid JSON.",
            user_prompt=analysis_prompt,
            image_base64=""
        )
        # The client may return a dict with a 'content' key or a bare string.
        content = response.get("content", str(response)) if isinstance(response, dict) else str(response)
        # Parse JSON response.
        analysis = self._extract_json_from_response(content)
        if analysis:
            # Extract semantic similarity (primary score).
            semantic_sim = float(analysis.get("semantic_similarity", 0.5))
            structural_sim = float(analysis.get("structural_similarity", semantic_sim))
            # Map the verdict onto a multiplier; legacy verdict names from
            # the older schema are still accepted.
            verdict = analysis.get("verdict", "ACCEPTABLE")
            verdict_multiplier = {
                "PERFECT": 1.0,
                "ACCEPTABLE": 0.85,
                "FORMAT_ERROR": 0.6,  # New: was WRONG_FORMAT
                "DATA_CORRUPTION": 0.1,  # New: replaces WRONG_CONTENT + HALLUCINATION
                # Legacy support
                "WRONG_FORMAT": 0.6,
                "WRONG_CONTENT": 0.3,
                "HALLUCINATION": 0.1
            }.get(verdict, 0.5)
            # Final score: weighted combination of similarity and verdict.
            result["score"] = min(1.0, semantic_sim * 0.6 + structural_sim * 0.3 + verdict_multiplier * 0.1)
            # Extract penalty breakdown if available.
            penalty_breakdown = analysis.get("penalty_breakdown", {})
            critical_failures = analysis.get("critical_failures", [])
            result["details"] = {
                "verdict": verdict,
                "semantic_similarity": semantic_sim,
                "structural_similarity": structural_sim,
                "critical_failures": critical_failures,
                "penalty_breakdown": penalty_breakdown,
                # Legacy field support
                "key_matches": analysis.get("key_matches", []),
                "key_differences": analysis.get("key_differences", critical_failures),
                "value_errors": analysis.get("value_errors", {}),
                "reasoning": analysis.get("reasoning", "")
            }
            result["feedback"] = analysis.get("fix_directive", "")
        else:
            # Fallback if JSON parsing fails.
            result = self._heuristic_semantic_analysis(predicted, expected)
        # Cache result (only reached when no exception occurred).
        self._analysis_cache[cache_key] = result
    except Exception as e:
        logger.warning(f"LLM semantic analysis failed: {e}, falling back to heuristics")
        result = self._heuristic_semantic_analysis(predicted, expected)
    return result
def _extract_json_from_response(self, content: str) -> Optional[Dict]:
    """Pull the first {...} span out of an LLM response and parse it as JSON."""
    candidate = re.search(r'\{[\s\S]*\}', content)
    if candidate is None:
        return None
    try:
        return json.loads(candidate.group(0))
    except json.JSONDecodeError:
        return None
def _heuristic_semantic_analysis(self, predicted: str, expected: str) -> Dict[str, Any]:
    """
    Heuristic-based semantic analysis when the LLM is not available.

    Combines four signals into a weighted score:
    - Sequence similarity (difflib.SequenceMatcher, weight 0.3)
    - Word overlap / Jaccard similarity (weight 0.25)
    - Number extraction and comparison (weight 0.25)
    - Capitalized-word "entity" overlap (weight 0.2)

    Returns:
        Dict with 'score' (0.0-1.0), 'details', and human-readable 'feedback'.
    """
    result = {"score": 0.0, "details": {}, "feedback": ""}
    pred_lower = predicted.lower()
    exp_lower = expected.lower()
    # 1. Sequence similarity (character-level ratio).
    seq_sim = SequenceMatcher(None, pred_lower, exp_lower).ratio()
    # 2. Word overlap (Jaccard index over whitespace-split tokens).
    pred_words = set(pred_lower.split())
    exp_words = set(exp_lower.split())
    jaccard = len(pred_words & exp_words) / len(pred_words | exp_words) if (pred_words | exp_words) else 0.0
    # 3. Number comparison: every expected number should appear in the
    #    prediction; values within +/- 1 earn near-full credit.
    pred_nums = re.findall(r'-?\d+\.?\d*', predicted)
    exp_nums = re.findall(r'-?\d+\.?\d*', expected)
    num_score = 1.0
    num_errors = []
    if exp_nums:
        matches = 0
        for exp_num in exp_nums:
            if exp_num in pred_nums:
                matches += 1
            else:
                # Check for close matches.
                try:
                    exp_val = float(exp_num)
                    for pred_num in pred_nums:
                        pred_val = float(pred_num)
                        if abs(pred_val - exp_val) <= 1:  # Off by 1
                            matches += 0.9
                            num_errors.append(f"Number close: expected {exp_num}, got {pred_num}")
                            break
                    else:
                        # for-else: no close match found anywhere in pred_nums.
                        num_errors.append(f"Number missing: expected {exp_num}")
                except ValueError:
                    pass
        num_score = matches / len(exp_nums) if exp_nums else 1.0
    # 4. Key entity extraction (simple approach): capitalized words serve
    #    as a cheap proxy for named entities / quoted identifiers.
    pred_entities = set(re.findall(r'\b[A-Z][a-z]+\b', predicted))
    exp_entities = set(re.findall(r'\b[A-Z][a-z]+\b', expected))
    entity_overlap = len(pred_entities & exp_entities) / len(exp_entities) if exp_entities else 1.0
    # Combine scores with fixed weights.
    result["score"] = (
        0.3 * seq_sim +
        0.25 * jaccard +
        0.25 * num_score +
        0.2 * entity_overlap
    )
    result["details"] = {
        "sequence_similarity": seq_sim,
        "word_overlap": jaccard,
        "number_accuracy": num_score,
        "entity_overlap": entity_overlap,
        "number_errors": num_errors
    }
    # Generate feedback highlighting the weakest signals.
    feedback_parts = []
    if jaccard < 0.5:
        feedback_parts.append("Low word overlap - output may be missing key terms.")
    if num_errors:
        feedback_parts.append(f"Number issues: {'; '.join(num_errors[:3])}")
    if entity_overlap < 0.5 and exp_entities:
        missing = exp_entities - pred_entities
        feedback_parts.append(f"Missing entities: {', '.join(list(missing)[:3])}")
    if feedback_parts:
        result["feedback"] = " | ".join(feedback_parts)
    else:
        result["feedback"] = "Output is semantically similar but not exact match."
    return result
def _generate_default_feedback(self, result: Dict) -> str:
    """Produce a score-tier feedback message when no specific feedback was set."""
    score = result["composite_score"]
    semantic = result["semantic_similarity"]
    structural = result["structural_similarity"]
    detail = f"(semantic={semantic:.0%}, structural={structural:.0%})"
    if score >= 0.9:
        return "✅ Excellent match! Minor differences only."
    if score >= 0.7:
        return f"⚠️ Good match {detail}. Some differences to address."
    if score >= 0.5:
        return f"⚠️ Partial match {detail}. Significant differences found."
    return f"❌ Poor match {detail}. Major issues to fix."
def get_evaluation_summary(self, results: List[Dict]) -> Dict[str, Any]:
    """
    Summarize a batch of evaluation results.

    Args:
        results: Evaluation result dicts as produced by evaluate().

    Returns:
        Aggregate statistics: counts, accuracy (share of scores >= 0.8),
        per-metric averages, and min/max composite scores.
    """
    if not results:
        return {
            "total_samples": 0,
            "accuracy": 0.0,
            "avg_semantic_similarity": 0.0,
            "avg_structural_similarity": 0.0
        }
    count = len(results)
    composite = [r.get("composite_score", 0.0) for r in results]
    semantic = [r.get("semantic_similarity", 0.0) for r in results]
    structural = [r.get("structural_similarity", 0.0) for r in results]
    return {
        "total_samples": count,
        # A sample counts as "accurate" when its composite score is >= 0.8.
        "accuracy": sum(1 for s in composite if s >= 0.8) / count,
        "avg_composite_score": sum(composite) / count,
        "avg_semantic_similarity": sum(semantic) / count,
        "avg_structural_similarity": sum(structural) / count,
        "min_score": min(composite),
        "max_score": max(composite)
    }
# Convenience factory for the evaluator.
def create_universal_evaluator(llm_client=None) -> UniversalSemanticEvaluator:
    """
    Build a Universal Semantic Evaluator.

    Args:
        llm_client: Optional LLM client for semantic analysis. When omitted,
            heuristic-based analysis is used instead.

    Returns:
        A configured UniversalSemanticEvaluator instance.
    """
    use_llm = llm_client is not None
    return UniversalSemanticEvaluator(llm_client=llm_client, use_llm_analysis=use_llm)