"""
Index Caching Evaluator for Index-Based Element Selection Use Case
Evaluates predicted index caching results against expected results.
Compares all 5 fields with equal weight:
- is_index_based
- index_value
- parent_element_id
- element_id_of_nth_child_of_parent
- selected_element_is_correct
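Example payload (illustrative values only, mirroring the test cases at the bottom of this file):
    {
        "is_index_based": true,
        "index_value": 1,
        "parent_element_id": "aaaabf",
        "element_id_of_nth_child_of_parent": "aaaabg",
        "selected_element_is_correct": true
    }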
"""
from typing import Dict, Any, Optional
import json
import re
import logging
from .base_evaluator import BaseEvaluator
class IndexCachingEvaluator(BaseEvaluator):
"""
Evaluator for index caching use case.
Features:
- Compares all 5 fields with equal weight (20% each)
- Parses JSON from LLM response
- Handles null values correctly
- Returns detailed field-by-field comparison
"""
def __init__(self, metric_weights: Optional[Dict[str, float]] = None):
"""
Initialize index caching evaluator.
Args:
metric_weights: Weights for evaluation metrics
Default: Equal weight for all 5 fields (0.2 each)
"""
# Each field gets 20% weight (5 fields * 0.2 = 1.0)
default_weights = {
"is_index_based_match": 0.2,
"index_value_match": 0.2,
"parent_element_id_match": 0.2,
"element_id_of_nth_child_match": 0.2,
"selected_element_correct_match": 0.2,
}
weights = metric_weights or default_weights
super().__init__(metric_weights=weights)
def evaluate(self, predicted: str, expected: str) -> Dict[str, float]:
"""
Evaluate predicted index caching result against expected result.
Args:
predicted: LLM's output (JSON string with all 5 fields)
expected: Expected output (JSON string or dict with all 5 fields)
Returns:
Dictionary with evaluation metrics:
{
"is_index_based_match": 1.0 or 0.0,
"index_value_match": 1.0 or 0.0,
"parent_element_id_match": 1.0 or 0.0,
"element_id_of_nth_child_match": 1.0 or 0.0,
"selected_element_correct_match": 1.0 or 0.0,
"composite_score": 0.0 to 1.0,
"predicted_output": str,
"expected_output": str,
"field_scores": {...},
"evaluation_reason": str
}
"""
if not predicted or not expected:
return {
"is_index_based_match": 0.0,
"index_value_match": 0.0,
"parent_element_id_match": 0.0,
"element_id_of_nth_child_match": 0.0,
"selected_element_correct_match": 0.0,
"composite_score": 0.0,
"predicted_output": str(predicted).strip() if predicted else "",
"expected_output": str(expected).strip() if expected else "",
"field_scores": {},
"evaluation_reason": "❌ Empty or missing input/output"
}
# Parse expected (could be JSON string or dict)
try:
if isinstance(expected, str):
expected_dict = json.loads(expected)
else:
expected_dict = expected
except (json.JSONDecodeError, TypeError):
            # json.loads failed (malformed string); fall back to the raw value if it is already a dict
expected_dict = expected if isinstance(expected, dict) else {}
# Parse predicted (must be JSON string)
try:
predicted_dict = self._parse_json_response(predicted)
except Exception as e:
# Log the actual response for debugging
response_preview = predicted[:200] if predicted else "(empty)"
self.logger.warning(f"Failed to parse predicted JSON: {e}")
self.logger.warning(f"Response preview: {response_preview}...")
predicted_dict = {}
# NOTE: "notes" field is present in the output but is NOT used for scoring or reflection
# It's kept for reference but ignored in evaluation
# Compare each field (only the 5 core fields, ignoring "notes")
field_scores = {}
field_reasons = []
# 1. is_index_based (boolean)
pred_is_index = predicted_dict.get("is_index_based")
exp_is_index = expected_dict.get("is_index_based")
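        # Booleans must be present on both sides; a missing value counts as a mismatch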
is_index_match = (pred_is_index == exp_is_index) if (pred_is_index is not None and exp_is_index is not None) else False
field_scores["is_index_based"] = 1.0 if is_index_match else 0.0
field_reasons.append(f"is_index_based: {pred_is_index} vs {exp_is_index}{'✅' if is_index_match else '❌'}")
# 2. index_value (int or null)
pred_index_val = predicted_dict.get("index_value")
exp_index_val = expected_dict.get("index_value")
# Handle null/None comparison
index_val_match = (pred_index_val == exp_index_val) or (pred_index_val is None and exp_index_val is None)
field_scores["index_value"] = 1.0 if index_val_match else 0.0
field_reasons.append(f"index_value: {pred_index_val} vs {exp_index_val}{'✅' if index_val_match else '❌'}")
# 3. parent_element_id (string or null)
pred_parent = predicted_dict.get("parent_element_id")
exp_parent = expected_dict.get("parent_element_id")
# Handle null/None comparison
parent_match = (pred_parent == exp_parent) or (pred_parent is None and exp_parent is None)
field_scores["parent_element_id"] = 1.0 if parent_match else 0.0
field_reasons.append(f"parent_element_id: {pred_parent} vs {exp_parent}{'✅' if parent_match else '❌'}")
# 4. element_id_of_nth_child_of_parent (string or null)
pred_element = predicted_dict.get("element_id_of_nth_child_of_parent")
exp_element = expected_dict.get("element_id_of_nth_child_of_parent")
# Handle null/None comparison
element_match = (pred_element == exp_element) or (pred_element is None and exp_element is None)
field_scores["element_id_of_nth_child_of_parent"] = 1.0 if element_match else 0.0
field_reasons.append(f"element_id_of_nth_child: {pred_element} vs {exp_element}{'✅' if element_match else '❌'}")
# 5. selected_element_is_correct (boolean)
pred_selected = predicted_dict.get("selected_element_is_correct")
exp_selected = expected_dict.get("selected_element_is_correct")
selected_match = (pred_selected == exp_selected) if (pred_selected is not None and exp_selected is not None) else False
field_scores["selected_element_is_correct"] = 1.0 if selected_match else 0.0
field_reasons.append(f"selected_element_is_correct: {pred_selected} vs {exp_selected}{'✅' if selected_match else '❌'}")
        # Calculate composite score as an equal-weighted average (0.2 per field, matching the default weights)
composite_score = (
field_scores["is_index_based"] * 0.2 +
field_scores["index_value"] * 0.2 +
field_scores["parent_element_id"] * 0.2 +
field_scores["element_id_of_nth_child_of_parent"] * 0.2 +
field_scores["selected_element_is_correct"] * 0.2
)
# Build evaluation reason
all_match = composite_score == 1.0
reason = "✅ All fields match!" if all_match else f"❌ Partial match ({composite_score:.1%})"
reason += "\n" + "\n".join(f" {r}" for r in field_reasons)
# Log evaluation details
self.logger.info(f"\n{'─'*70}")
self.logger.info(f"📊 INDEX CACHING EVALUATION")
self.logger.info(f"{'─'*70}")
self.logger.info(f" 🎯 COMPOSITE SCORE: {composite_score:.2f} ({composite_score:.1%})")
for field, score in field_scores.items():
status = "✅" if score == 1.0 else "❌"
self.logger.info(f" {status} {field}: {score:.0f}")
self.logger.info(f"{'─'*70}\n")
return {
"is_index_based_match": field_scores["is_index_based"],
"index_value_match": field_scores["index_value"],
"parent_element_id_match": field_scores["parent_element_id"],
"element_id_of_nth_child_match": field_scores["element_id_of_nth_child_of_parent"],
"selected_element_correct_match": field_scores["selected_element_is_correct"],
"composite_score": composite_score,
"predicted_output": predicted,
"expected_output": json.dumps(expected_dict) if isinstance(expected_dict, dict) else str(expected),
"predicted_dict": predicted_dict,
"expected_dict": expected_dict,
"field_scores": field_scores,
"evaluation_reason": reason
}
def _parse_json_response(self, response: str) -> Dict[str, Any]:
"""
Parse JSON from LLM response, handling markdown code blocks and various formats.
Args:
response: LLM response string (may contain markdown)
Returns:
Parsed JSON dictionary (empty dict if parsing fails)
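        Example (illustrative, showing two of the supported formats):
            A fenced response such as '```json\n{"index_value": 2}\n```' yields
            {"index_value": 2}; a bare '{"index_value": 2}' yields the same result.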
"""
if not response or not isinstance(response, str):
return {}
response = response.strip()
# If response is empty, return empty dict
if not response:
return {}
# Strategy 1: Try to extract JSON from markdown code block
json_match = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', response, re.DOTALL)
if json_match:
try:
json_str = json_match.group(1).strip()
return json.loads(json_str)
except json.JSONDecodeError:
pass
# Strategy 2: Find JSON object in response (handle nested braces)
json_start = response.find('{')
if json_start != -1:
# Find matching closing brace
brace_count = 0
json_end = json_start
for i in range(json_start, len(response)):
if response[i] == '{':
brace_count += 1
elif response[i] == '}':
brace_count -= 1
if brace_count == 0:
json_end = i + 1
break
if brace_count == 0:
json_str = response[json_start:json_end]
try:
return json.loads(json_str)
except json.JSONDecodeError:
pass
# Strategy 3: Try to find any JSON-like structure (more lenient)
# Look for patterns like {"key": "value"} even if not perfectly formatted
json_pattern = re.search(r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}', response, re.DOTALL)
if json_pattern:
try:
return json.loads(json_pattern.group(0))
except json.JSONDecodeError:
pass
# Strategy 4: Try parsing entire response as JSON
try:
return json.loads(response)
except json.JSONDecodeError:
pass
# If all strategies fail, return empty dict
self.logger.debug(f"Could not parse JSON from response: {response[:100]}...")
return {}
def get_evaluation_summary(self, results: list) -> Dict[str, Any]:
"""
Get summary statistics for a batch of evaluations.
Args:
results: List of evaluation result dictionaries
Returns:
Summary statistics including accuracy per field and overall
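        Example return value (illustrative; these numbers correspond to the __main__ test cases):
            {
                "total_samples": 4,
                "overall_accuracy": 0.75,
                "field_accuracies": {"is_index_based": 1.0, ...},
                "perfect_matches": 3,
                "partial_matches": 1
            }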
"""
if not results:
return {
"total_samples": 0,
"overall_accuracy": 0.0,
"field_accuracies": {},
"perfect_matches": 0
}
total = len(results)
perfect_matches = sum(1 for r in results if r.get("composite_score", 0.0) == 1.0)
overall_accuracy = perfect_matches / total if total > 0 else 0.0
# Calculate accuracy per field
field_accuracies = {
"is_index_based": sum(1 for r in results if r.get("is_index_based_match", 0.0) == 1.0) / total,
"index_value": sum(1 for r in results if r.get("index_value_match", 0.0) == 1.0) / total,
"parent_element_id": sum(1 for r in results if r.get("parent_element_id_match", 0.0) == 1.0) / total,
"element_id_of_nth_child": sum(1 for r in results if r.get("element_id_of_nth_child_match", 0.0) == 1.0) / total,
"selected_element_is_correct": sum(1 for r in results if r.get("selected_element_correct_match", 0.0) == 1.0) / total,
}
return {
"total_samples": total,
"overall_accuracy": overall_accuracy,
"field_accuracies": field_accuracies,
"perfect_matches": perfect_matches,
"partial_matches": total - perfect_matches
}
# Example usage and testing
if __name__ == "__main__":
print("🚀 Testing Index Caching Evaluator...")
evaluator = IndexCachingEvaluator()
# Test cases
test_cases = [
# (predicted, expected, should_be_perfect)
(
'{"is_index_based": true, "index_value": 1, "parent_element_id": "aaaabf", "element_id_of_nth_child_of_parent": "aaaabg", "selected_element_is_correct": true}',
{"is_index_based": True, "index_value": 1, "parent_element_id": "aaaabf", "element_id_of_nth_child_of_parent": "aaaabg", "selected_element_is_correct": True},
True
),
(
'{"is_index_based": false, "index_value": null, "parent_element_id": null, "element_id_of_nth_child_of_parent": null, "selected_element_is_correct": true}',
{"is_index_based": False, "index_value": None, "parent_element_id": None, "element_id_of_nth_child_of_parent": None, "selected_element_is_correct": True},
True
),
(
'{"is_index_based": true, "index_value": 3, "parent_element_id": null, "element_id_of_nth_child_of_parent": "aaaaaw", "selected_element_is_correct": true}',
{"is_index_based": True, "index_value": 3, "parent_element_id": None, "element_id_of_nth_child_of_parent": "aaaaaw", "selected_element_is_correct": True},
True
),
(
'{"is_index_based": true, "index_value": 2, "parent_element_id": "aaaabf", "element_id_of_nth_child_of_parent": "aaaabg", "selected_element_is_correct": true}',
{"is_index_based": True, "index_value": 1, "parent_element_id": "aaaabf", "element_id_of_nth_child_of_parent": "aaaabg", "selected_element_is_correct": True},
False # index_value mismatch
),
]
print("\n📝 Running test cases:")
print("-" * 80)
results = []
for predicted, expected, should_be_perfect in test_cases:
result = evaluator.evaluate(predicted, expected)
is_perfect = result["composite_score"] == 1.0
status = "✅" if is_perfect == should_be_perfect else "❌"
print(f"{status} Test: Perfect match = {is_perfect} (expected {should_be_perfect})")
print(f" Score: {result['composite_score']:.2f}")
print()
results.append(result)
# Summary
print("\n📊 Summary:")
summary = evaluator.get_evaluation_summary(results)
print(f" Total: {summary['total_samples']}")
print(f" Perfect matches: {summary['perfect_matches']}")
print(f" Overall accuracy: {summary['overall_accuracy']:.1%}")
print(f" Field accuracies:")
for field, acc in summary['field_accuracies'].items():
print(f" {field}: {acc:.1%}")