"""
Index Caching Evaluator for Index-Based Element Selection Use Case

Evaluates predicted index caching results against expected results.
Compares all 5 fields (equal weight by default):
- is_index_based
- index_value
- parent_element_id
- element_id_of_nth_child_of_parent
- selected_element_is_correct
"""

import json
import logging
import re
from typing import Any, Dict, List, Optional, Union

from .base_evaluator import BaseEvaluator

# One row per scored field:
#   (key in the predicted/expected JSON, metric key in the result dict,
#    short label used in reasons and summaries, whether null is a valid value)
_FIELD_SPECS = (
    ("is_index_based", "is_index_based_match", "is_index_based", False),
    ("index_value", "index_value_match", "index_value", True),
    ("parent_element_id", "parent_element_id_match", "parent_element_id", True),
    (
        "element_id_of_nth_child_of_parent",
        "element_id_of_nth_child_match",
        "element_id_of_nth_child",
        True,
    ),
    (
        "selected_element_is_correct",
        "selected_element_correct_match",
        "selected_element_is_correct",
        False,
    ),
)

# Each field gets 20% weight (5 fields * 0.2 = 1.0)
_DEFAULT_WEIGHTS = {metric: 0.2 for _field, metric, _label, _nullable in _FIELD_SPECS}

# JSON object wrapped in a markdown code fence (optionally tagged "json").
_FENCED_JSON_RE = re.compile(r'```(?:json)?\s*(\{.*?\})\s*```', re.DOTALL)


def _values_match(predicted_value: Any, expected_value: Any, nullable: bool) -> bool:
    """
    Return True when a predicted field value equals the expected one.

    Args:
        predicted_value: Value taken from the prediction (None if absent)
        expected_value: Value taken from the expectation (None if absent)
        nullable: Whether None on both sides counts as a match
    """
    if not nullable and (predicted_value is None or expected_value is None):
        return False
    # bool is a subclass of int in Python (True == 1), so without this guard
    # a prediction of `true` would be accepted for an expected index of 1.
    if isinstance(predicted_value, bool) != isinstance(expected_value, bool):
        return False
    return bool(predicted_value == expected_value)


class IndexCachingEvaluator(BaseEvaluator):
    """
    Evaluator for index caching use case.

    Features:
    - Compares all 5 fields (20% each unless custom weights are supplied)
    - Parses JSON from LLM response
    - Handles null values correctly
    - Returns detailed field-by-field comparison

    NOTE(review): this class logs through ``self.logger`` and never creates
    it, so it presumably comes from ``BaseEvaluator`` - confirm.
    """

    def __init__(self, metric_weights: Optional[Dict[str, float]] = None):
        """
        Initialize index caching evaluator.

        Args:
            metric_weights: Weights for evaluation metrics, keyed by the
                "*_match" metric names.
                Default: Equal weight for all 5 fields (0.2 each)
        """
        weights = metric_weights or dict(_DEFAULT_WEIGHTS)
        super().__init__(metric_weights=weights)
        # Keep a private copy so the composite score honours the configured
        # weights regardless of how the base class stores them.
        self._index_field_weights = dict(weights)

    def evaluate(
        self,
        predicted: Union[str, Dict[str, Any]],
        expected: Union[str, Dict[str, Any]],
    ) -> Dict[str, Any]:
        """
        Evaluate predicted index caching result against expected result.

        A key that is absent from either side is treated as null. If no
        non-empty JSON object can be extracted from ``predicted``, every
        field scores 0.0 (a failed parse must not earn credit for fields
        whose expected value happens to be null).

        Args:
            predicted: LLM's output (JSON string with all 5 fields; an
                already-parsed dict is also accepted)
            expected: Expected output (JSON string or dict with all 5 fields)

        Returns:
            Dictionary with evaluation metrics:
            {
                "is_index_based_match": 1.0 or 0.0,
                "index_value_match": 1.0 or 0.0,
                "parent_element_id_match": 1.0 or 0.0,
                "element_id_of_nth_child_match": 1.0 or 0.0,
                "selected_element_correct_match": 1.0 or 0.0,
                "composite_score": 0.0 to 1.0,
                "predicted_output": str,
                "expected_output": str,
                "predicted_dict": dict,
                "expected_dict": dict,
                "field_scores": {...},
                "evaluation_reason": str
            }
        """
        if not predicted or not expected:
            return self._empty_result(predicted, expected)

        expected_dict = self._coerce_expected(expected)

        if isinstance(predicted, dict):
            predicted_dict = predicted
        else:
            predicted_dict = self._parse_json_response(predicted)

        parsed_ok = bool(predicted_dict)
        if not parsed_ok:
            # Log the actual response for debugging
            response_preview = str(predicted)[:200]
            self.logger.warning(
                "Failed to parse predicted JSON: no non-empty JSON object found"
            )
            self.logger.warning(f"Response preview: {response_preview}...")

        # NOTE: a "notes" field may be present in the output but is NOT used
        # for scoring; only the 5 core fields are compared.
        field_scores: Dict[str, float] = {}
        field_reasons: List[str] = []
        for field, _metric, label, nullable in _FIELD_SPECS:
            pred_value = predicted_dict.get(field)
            exp_value = expected_dict.get(field)
            matched = parsed_ok and _values_match(pred_value, exp_value, nullable)
            field_scores[field] = 1.0 if matched else 0.0
            field_reasons.append(
                f"{label}: {pred_value} vs {exp_value} → {'✅' if matched else '❌'}"
            )

        composite_score = self._composite_score(field_scores)

        # Decide "perfect" from the field scores rather than by comparing the
        # floating-point composite against 1.0.
        all_match = all(score == 1.0 for score in field_scores.values())
        if all_match:
            reason = "✅ All fields match!"
        elif not parsed_ok:
            reason = "❌ Could not parse a JSON object from the predicted output"
        else:
            reason = f"❌ Partial match ({composite_score:.1%})"
        reason += "\n" + "\n".join(f" {r}" for r in field_reasons)

        # Log evaluation details
        rule = "─" * 70
        self.logger.info("\n" + rule)
        self.logger.info("📊 INDEX CACHING EVALUATION")
        self.logger.info(rule)
        self.logger.info(
            f" 🎯 COMPOSITE SCORE: {composite_score:.2f} ({composite_score:.1%})"
        )
        for field, score in field_scores.items():
            status = "✅" if score == 1.0 else "❌"
            self.logger.info(f" {status} {field}: {score:.0f}")
        self.logger.info(rule + "\n")

        result: Dict[str, Any] = {
            metric: field_scores[field]
            for field, metric, _label, _nullable in _FIELD_SPECS
        }
        result.update(
            {
                "composite_score": composite_score,
                "predicted_output": predicted,
                "expected_output": json.dumps(expected_dict),
                "predicted_dict": predicted_dict,
                "expected_dict": expected_dict,
                "field_scores": field_scores,
                "evaluation_reason": reason,
            }
        )
        return result

    @staticmethod
    def _empty_result(predicted: Any, expected: Any) -> Dict[str, Any]:
        """Build the all-zero result returned when either input is empty."""
        result: Dict[str, Any] = {
            metric: 0.0 for _field, metric, _label, _nullable in _FIELD_SPECS
        }
        result.update(
            {
                "composite_score": 0.0,
                "predicted_output": str(predicted).strip() if predicted else "",
                "expected_output": str(expected).strip() if expected else "",
                "predicted_dict": {},
                "expected_dict": {},
                "field_scores": {},
                "evaluation_reason": "❌ Empty or missing input/output",
            }
        )
        return result

    def _coerce_expected(self, expected: Any) -> Dict[str, Any]:
        """Return ``expected`` as a dict; anything that is not a JSON object becomes {}."""
        if isinstance(expected, dict):
            return expected
        if isinstance(expected, str):
            try:
                parsed = json.loads(expected)
            except json.JSONDecodeError:
                parsed = None
            if isinstance(parsed, dict):
                return parsed
        self.logger.warning(
            "Expected output is not a JSON object; treating it as empty"
        )
        return {}

    @staticmethod
    def _weighted_average(
        field_scores: Dict[str, float], weights: Dict[str, float]
    ) -> Optional[float]:
        """Return the weight-normalised score, or None if no weight applies."""
        weighted_sum = 0.0
        total_weight = 0.0
        # Plain left-to-right accumulation keeps the default-weight result
        # identical to summing the five "score * 0.2" terms in field order.
        for field, metric, _label, _nullable in _FIELD_SPECS:
            weight = weights.get(metric, 0.0)
            weighted_sum += field_scores[field] * weight
            total_weight += weight
        if total_weight <= 0:
            return None
        return weighted_sum / total_weight

    def _composite_score(self, field_scores: Dict[str, float]) -> float:
        """
        Combine per-field scores into a 0.0-1.0 composite.

        Uses the weights given at construction, normalised by their sum.
        Metrics missing from the weights count as weight 0; if no usable
        weight remains, the default equal weighting is used instead.
        """
        weights = getattr(self, "_index_field_weights", None) or _DEFAULT_WEIGHTS
        score = self._weighted_average(field_scores, weights)
        if score is None:
            score = self._weighted_average(field_scores, _DEFAULT_WEIGHTS)
        return score

    def _parse_json_response(self, response: str) -> Dict[str, Any]:
        """
        Parse JSON from LLM response, handling markdown code blocks and
        JSON objects embedded in surrounding prose.

        Args:
            response: LLM response string (may contain markdown)

        Returns:
            Parsed JSON dictionary (empty dict if parsing fails or the
            response holds no JSON object)
        """
        if not response or not isinstance(response, str):
            return {}

        response = response.strip()
        if not response:
            return {}

        # Strategy 1: prefer a JSON object inside a markdown code block
        fenced = _FENCED_JSON_RE.search(response)
        if fenced:
            try:
                parsed = json.loads(fenced.group(1).strip())
            except (json.JSONDecodeError, RecursionError):
                pass
            else:
                if isinstance(parsed, dict):
                    return parsed

        # Strategy 2: try to decode an object at every "{" in the response.
        # raw_decode uses the real JSON grammar, so braces inside string
        # values and trailing prose after the object are handled correctly.
        decoder = json.JSONDecoder()
        start = response.find('{')
        while start != -1:
            try:
                parsed, _end = decoder.raw_decode(response, start)
            except (json.JSONDecodeError, RecursionError):
                pass
            else:
                if isinstance(parsed, dict):
                    return parsed
            start = response.find('{', start + 1)

        # If all strategies fail, return empty dict
        self.logger.debug(f"Could not parse JSON from response: {response[:100]}...")
        return {}

    def get_evaluation_summary(self, results: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Get summary statistics for a batch of evaluations.

        A result counts as a perfect match when its "composite_score" is
        exactly 1.0; every other result is counted under "partial_matches".

        Args:
            results: List of evaluation result dictionaries

        Returns:
            Summary statistics including accuracy per field and overall
        """
        if not results:
            return {
                "total_samples": 0,
                "overall_accuracy": 0.0,
                "field_accuracies": {},
                "perfect_matches": 0,
                "partial_matches": 0,
            }

        total = len(results)
        perfect_matches = sum(
            1 for r in results if r.get("composite_score", 0.0) == 1.0
        )

        # Calculate accuracy per field
        field_accuracies = {
            label: sum(1 for r in results if r.get(metric, 0.0) == 1.0) / total
            for _field, metric, label, _nullable in _FIELD_SPECS
        }

        return {
            "total_samples": total,
            "overall_accuracy": perfect_matches / total,
            "field_accuracies": field_accuracies,
            "perfect_matches": perfect_matches,
            "partial_matches": total - perfect_matches,
        }


def _run_self_test() -> None:
    """Run the built-in example cases and print a short report."""
    print("🚀 Testing Index Caching Evaluator...")

    evaluator = IndexCachingEvaluator()

    # Test cases
    test_cases = [
        # (predicted, expected, should_be_perfect)
        (
            '{"is_index_based": true, "index_value": 1, "parent_element_id": "aaaabf", "element_id_of_nth_child_of_parent": "aaaabg", "selected_element_is_correct": true}',
            {"is_index_based": True, "index_value": 1, "parent_element_id": "aaaabf", "element_id_of_nth_child_of_parent": "aaaabg", "selected_element_is_correct": True},
            True
        ),
        (
            '{"is_index_based": false, "index_value": null, "parent_element_id": null, "element_id_of_nth_child_of_parent": null, "selected_element_is_correct": true}',
            {"is_index_based": False, "index_value": None, "parent_element_id": None, "element_id_of_nth_child_of_parent": None, "selected_element_is_correct": True},
            True
        ),
        (
            '{"is_index_based": true, "index_value": 3, "parent_element_id": null, "element_id_of_nth_child_of_parent": "aaaaaw", "selected_element_is_correct": true}',
            {"is_index_based": True, "index_value": 3, "parent_element_id": None, "element_id_of_nth_child_of_parent": "aaaaaw", "selected_element_is_correct": True},
            True
        ),
        (
            '{"is_index_based": true, "index_value": 2, "parent_element_id": "aaaabf", "element_id_of_nth_child_of_parent": "aaaabg", "selected_element_is_correct": true}',
            {"is_index_based": True, "index_value": 1, "parent_element_id": "aaaabf", "element_id_of_nth_child_of_parent": "aaaabg", "selected_element_is_correct": True},
            False  # index_value mismatch
        ),
    ]

    print("\n📝 Running test cases:")
    print("-" * 80)

    results = []
    for predicted, expected, should_be_perfect in test_cases:
        result = evaluator.evaluate(predicted, expected)
        is_perfect = result["composite_score"] == 1.0

        status = "✅" if is_perfect == should_be_perfect else "❌"
        print(f"{status} Test: Perfect match = {is_perfect} (expected {should_be_perfect})")
        print(f" Score: {result['composite_score']:.2f}")
        print()

        results.append(result)

    # Summary
    print("\n📊 Summary:")
    summary = evaluator.get_evaluation_summary(results)
    print(f" Total: {summary['total_samples']}")
    print(f" Perfect matches: {summary['perfect_matches']}")
    print(f" Overall accuracy: {summary['overall_accuracy']:.1%}")
    print(" Field accuracies:")
    for field, acc in summary['field_accuracies'].items():
        print(f" {field}: {acc:.1%}")


# Example usage and testing
if __name__ == "__main__":
    _run_self_test()