"""
GAIA Error Analysis Framework

Categorizes questions, failure modes, and generates actionable improvement recommendations.
Implements the TDD test suite specifications from tests/test_error_analysis.py.
"""
| import csv | |
| import json | |
| import re | |
| from dataclasses import dataclass, asdict | |
| from enum import Enum | |
| from typing import List, Dict, Optional, Any | |
| from collections import defaultdict, Counter | |
class QuestionType(Enum):
    """Categories of GAIA questions.

    Used by GAIATestAnalyzer.classify_question_type(); the string values
    are what gets serialized in CSV/JSON exports.
    """
    MATH = "math"              # arithmetic / counting questions
    FILE = "file"              # requires reading an attached document (PDF/CSV/Excel)
    WEB = "web"                # requires a web/knowledge lookup
    IMAGE = "image"            # requires looking at a picture/screenshot
    AUDIO = "audio"            # requires listening to a recording
    REASONING = "reasoning"    # pure logical/comparative reasoning
    MULTIMODAL = "multimodal"  # combines two or more modalities (e.g. image + file)
    UNKNOWN = "unknown"        # fallback when no classification pattern matches
class FailureMode(Enum):
    """Categories of answer failures.

    Assigned by GAIATestAnalyzer.classify_failure_mode() when an answer
    does not match the expected value.
    """
    WRONG_ANSWER = "wrong_answer"          # substantively incorrect answer
    FORMATTING_ERROR = "formatting_error"  # right answer, wrong formatting (commas/units/articles)
    TIMEOUT = "timeout"                    # a TimeoutError was raised
    TOOL_FAILURE = "tool_failure"          # any other exception during answering
    EMPTY_RESPONSE = "empty_response"      # blank answer or "unable to determine"
@dataclass
class TestResult:
    """Represents a single test result for one GAIA question.

    BUG FIX: the @dataclass decorator was missing. Without it the field
    annotations below are inert class-level annotations, the keyword-argument
    construction used by GAIATestAnalyzer.analyze_response() raises
    TypeError, and __post_init__ is never invoked.
    """
    question_id: str                            # unique identifier for the question
    question: str                               # the question text
    question_type: QuestionType                 # category assigned by the analyzer
    expected: str                               # ground-truth answer
    actual: str                                 # agent's answer
    success: bool                               # True when actual matched expected
    failure_mode: Optional[FailureMode] = None  # set only when success is False
    time_elapsed: float = 0.0                   # seconds spent answering
    # Mutable default handled in __post_init__ (dataclasses forbid list defaults).
    tools_used: Optional[List[str]] = None
    error: Optional[Exception] = None           # exception raised, if any

    def __post_init__(self):
        # Normalize the None sentinel to an empty list so callers can
        # iterate/append without a None check.
        if self.tools_used is None:
            self.tools_used = []
class GAIATestAnalyzer:
    """
    Analyzes GAIA agent test results to identify failure patterns and recommend improvements.

    This class implements error categorization, performance tracking, and reporting
    to guide agent optimization efforts.
    """

    def __init__(self):
        # Chronological log of every analyzed answer.
        self.results: List[TestResult] = []
        # Regex groups consulted by classify_question_type() in a fixed
        # priority order (multimodal first, web last).
        self.math_patterns = [
            r'\d+\s*[\+\-\*\/]\s*\d+',  # Arithmetic operations with numbers
            r'calculate|compute|sum|multiply|divide|subtract|add',
            r'what is \d+',
            r'how many|how much'
        ]
        self.file_patterns = [
            r'pdf|csv|excel|spreadsheet|document|table|file',
            r'attached|according to the',
        ]
        self.image_patterns = [
            r'image|picture|photo|screenshot|attached.*color|in the (attached )?image'
        ]
        self.audio_patterns = [
            r'audio|recording|sound|said in|spoken|voice'
        ]
        self.web_patterns = [
            r'who is|what is the (current|latest)|CEO|president|founded|website',
            r'according to.*wikipedia|look up'
        ]
        self.reasoning_patterns = [
            r'if .+ then|taller than|shorter than|before|after',
            r'who is the (tallest|shortest|oldest|youngest)',
        ]
        self.multimodal_patterns = [
            r'(image|picture|photo).*(csv|file|data|spreadsheet)',
            r'(csv|file|data|spreadsheet).*(image|picture|photo)',
            r'using the .+ and the'
        ]

    def classify_question_type(self, question: str) -> QuestionType:
        """
        Classify a question into a QuestionType based on its content.

        Args:
            question: The question text to classify

        Returns:
            QuestionType enum value
        """
        question_lower = question.lower()
        # Priority order matters: multimodal must win over image/file alone,
        # and media/file categories must win over generic math/web phrasing.
        # (re.IGNORECASE is redundant on lowered text but harmless.)
        if any(re.search(pattern, question_lower, re.IGNORECASE)
               for pattern in self.multimodal_patterns):
            return QuestionType.MULTIMODAL
        if any(re.search(pattern, question_lower, re.IGNORECASE)
               for pattern in self.image_patterns):
            return QuestionType.IMAGE
        if any(re.search(pattern, question_lower, re.IGNORECASE)
               for pattern in self.audio_patterns):
            return QuestionType.AUDIO
        if any(re.search(pattern, question_lower, re.IGNORECASE)
               for pattern in self.file_patterns):
            return QuestionType.FILE
        if any(re.search(pattern, question_lower, re.IGNORECASE)
               for pattern in self.math_patterns):
            return QuestionType.MATH
        if any(re.search(pattern, question_lower, re.IGNORECASE)
               for pattern in self.reasoning_patterns):
            return QuestionType.REASONING
        if any(re.search(pattern, question_lower, re.IGNORECASE)
               for pattern in self.web_patterns):
            return QuestionType.WEB
        return QuestionType.UNKNOWN

    def classify_failure_mode(
        self,
        expected: str,
        actual: Optional[str],
        error: Optional[Exception] = None
    ) -> FailureMode:
        """
        Classify why an answer failed.

        Args:
            expected: The correct answer
            actual: The agent's answer (None if error occurred)
            error: Exception if one occurred

        Returns:
            FailureMode enum value
        """
        # Exceptions take precedence over answer-content analysis.
        if error is not None:
            if isinstance(error, TimeoutError):
                return FailureMode.TIMEOUT
            else:
                return FailureMode.TOOL_FAILURE
        # Empty or "gave up" responses.
        if actual is None or actual.strip() == "":
            return FailureMode.EMPTY_RESPONSE
        if "unable to determine" in actual.lower():
            return FailureMode.EMPTY_RESPONSE
        # Formatting analysis is done on normalized (stripped, lowered) text.
        expected_clean = expected.strip().lower()
        actual_clean = actual.strip().lower()
        # Thousands separators: "1,000" vs "1000" differs only by commas.
        expected_no_comma = expected_clean.replace(',', '')
        actual_no_comma = actual_clean.replace(',', '')
        if expected_no_comma == actual_no_comma and expected_clean != actual_clean:
            return FailureMode.FORMATTING_ERROR
        # Extra trailing content (likely units) counts as a formatting error,
        # but only across a token boundary.
        # BUG FIX: previously ANY prefix match with a remainder was accepted,
        # so "1000" vs expected "100" (or "parisian" vs "paris") was
        # misclassified as FORMATTING_ERROR instead of WRONG_ANSWER. Now the
        # remainder must not continue the same digit/alpha token; genuine
        # unit suffixes ("100 km", "100km") are still formatting errors.
        if expected_clean and actual_clean.startswith(expected_clean):
            remainder = actual_clean[len(expected_clean):].strip()
            if remainder:
                last = expected_clean[-1]
                first = actual_clean[len(expected_clean)]
                same_token = (
                    (last.isdigit() and first.isdigit())
                    or (last.isalpha() and first.isalpha())
                )
                if not same_token:
                    return FailureMode.FORMATTING_ERROR
        # Leading articles (the/a/an) before an otherwise-correct answer.
        articles = ['the ', 'a ', 'an ']
        for article in articles:
            if actual_clean.startswith(article):
                without_article = actual_clean[len(article):]
                if without_article == expected_clean:
                    return FailureMode.FORMATTING_ERROR
        # If none of the above, it's a wrong answer.
        return FailureMode.WRONG_ANSWER

    def log_result(self, result: TestResult):
        """
        Add a test result to the analyzer.

        Args:
            result: TestResult object to log
        """
        self.results.append(result)

    def analyze_response(
        self,
        question_id: str,
        question: str,
        expected: str,
        actual: str,
        time_elapsed: float = 0.0,
        tools_used: Optional[List[str]] = None,
        error: Optional[Exception] = None
    ) -> TestResult:
        """
        Analyze a single agent response and create a TestResult.

        This is a convenience method that combines classification and logging.

        Args:
            question_id: Unique identifier for the question
            question: The question text
            expected: The correct answer
            actual: The agent's answer
            time_elapsed: Time taken to answer
            tools_used: List of tools used by the agent
            error: Exception if one occurred

        Returns:
            TestResult object with all classifications
        """
        question_type = self.classify_question_type(question)
        # Success requires an exact match; formatting leniency is only used
        # for *classifying* failures, never for granting success.
        success = (actual == expected) if actual is not None else False
        failure_mode = None
        if not success:
            failure_mode = self.classify_failure_mode(expected, actual, error)
        result = TestResult(
            question_id=question_id,
            question=question,
            question_type=question_type,
            expected=expected,
            actual=actual,
            success=success,
            failure_mode=failure_mode,
            time_elapsed=time_elapsed,
            tools_used=tools_used or [],
            error=error
        )
        self.log_result(result)
        return result

    def generate_summary(self) -> Dict[str, Any]:
        """
        Generate summary statistics for all logged results.

        Returns:
            Dictionary with summary statistics
        """
        # Empty log short-circuits to avoid division by zero.
        if not self.results:
            return {
                "total_questions": 0,
                "correct_count": 0,
                "accuracy": 0.0,
                "avg_time": 0.0
            }
        total = len(self.results)
        correct = sum(1 for r in self.results if r.success)
        total_time = sum(r.time_elapsed for r in self.results)
        return {
            "total_questions": total,
            "correct_count": correct,
            "accuracy": correct / total if total > 0 else 0.0,
            "avg_time": total_time / total if total > 0 else 0.0
        }

    def get_accuracy_by_type(self) -> Dict[QuestionType, float]:
        """
        Calculate accuracy broken down by question type.

        Returns:
            Dictionary mapping QuestionType to accuracy (0.0-1.0).
            Only types that appear in the logged results are included.
        """
        type_stats = defaultdict(lambda: {"correct": 0, "total": 0})
        for result in self.results:
            stats = type_stats[result.question_type]
            stats["total"] += 1
            if result.success:
                stats["correct"] += 1
        accuracy_by_type = {}
        for qtype, stats in type_stats.items():
            accuracy_by_type[qtype] = (
                stats["correct"] / stats["total"] if stats["total"] > 0 else 0.0
            )
        return accuracy_by_type

    def get_failures_by_mode(self) -> Dict[FailureMode, int]:
        """
        Count failures by failure mode.

        Returns:
            Dictionary mapping FailureMode to count
        """
        failure_counts = Counter()
        for result in self.results:
            if not result.success and result.failure_mode:
                failure_counts[result.failure_mode] += 1
        return dict(failure_counts)

    def export_to_csv(self, filepath: str):
        """
        Export all results to a CSV file.

        Args:
            filepath: Path to output CSV file
        """
        with open(filepath, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            # Write header
            writer.writerow([
                'question_id', 'question', 'question_type', 'expected', 'actual',
                'success', 'failure_mode', 'time_elapsed', 'tools_used'
            ])
            # Write results
            for result in self.results:
                # NOTE(review): CSV upper-cases enum values while
                # export_to_json() emits the raw .value — confirm which
                # casing downstream consumers expect.
                writer.writerow([
                    result.question_id,
                    result.question,
                    result.question_type.value.upper(),
                    result.expected,
                    result.actual,
                    result.success,
                    result.failure_mode.value.upper() if result.failure_mode else '',
                    result.time_elapsed,
                    ','.join(result.tools_used) if result.tools_used else ''
                ])

    def export_to_json(self, filepath: str):
        """
        Export all results and summary to a JSON file.

        Args:
            filepath: Path to output JSON file
        """
        # Enum keys are converted to their string values so the structure
        # is JSON-serializable.
        data = {
            "summary": self.generate_summary(),
            "accuracy_by_type": {
                qtype.value: acc
                for qtype, acc in self.get_accuracy_by_type().items()
            },
            "failures_by_mode": {
                mode.value: count
                for mode, count in self.get_failures_by_mode().items()
            },
            "results": [
                {
                    "question_id": r.question_id,
                    "question": r.question,
                    "question_type": r.question_type.value,
                    "expected": r.expected,
                    "actual": r.actual,
                    "success": r.success,
                    "failure_mode": r.failure_mode.value if r.failure_mode else None,
                    "time_elapsed": r.time_elapsed,
                    "tools_used": r.tools_used
                }
                for r in self.results
            ]
        }
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2)

    def get_recommendations(self) -> List[str]:
        """
        Generate actionable recommendations based on failure analysis.

        Returns:
            List of recommendation strings
        """
        recommendations = []
        accuracy_by_type = self.get_accuracy_by_type()
        failures_by_mode = self.get_failures_by_mode()
        # Per-type recommendations fire only when the type was actually seen
        # and its accuracy falls below a threshold (0.5, or 0.7 for math).
        image_results = [r for r in self.results if r.question_type == QuestionType.IMAGE]
        if image_results:
            image_accuracy = accuracy_by_type.get(QuestionType.IMAGE, 0.0)
            if image_accuracy < 0.5:
                recommendations.append(
                    "Add vision capabilities (Gemini 2.5 Pro) to handle image questions"
                )
        file_results = [r for r in self.results if r.question_type == QuestionType.FILE]
        if file_results:
            file_accuracy = accuracy_by_type.get(QuestionType.FILE, 0.0)
            if file_accuracy < 0.5:
                recommendations.append(
                    "Implement file processing capabilities (PDF/CSV/Excel parsing)"
                )
        math_results = [r for r in self.results if r.question_type == QuestionType.MATH]
        if math_results:
            math_accuracy = accuracy_by_type.get(QuestionType.MATH, 0.0)
            if math_accuracy < 0.7:
                recommendations.append(
                    "Add code execution capabilities for reliable math calculations"
                )
        # Failure-mode recommendations fire when a mode exceeds a share of
        # ALL results (10% formatting/empty, 5% timeouts).
        formatting_errors = failures_by_mode.get(FailureMode.FORMATTING_ERROR, 0)
        if formatting_errors > len(self.results) * 0.1:
            recommendations.append(
                "Improve answer formatting logic to handle commas, units, and articles"
            )
        empty_responses = failures_by_mode.get(FailureMode.EMPTY_RESPONSE, 0)
        if empty_responses > len(self.results) * 0.1:
            recommendations.append(
                "Improve tool reliability and add fallback mechanisms for empty responses"
            )
        timeouts = failures_by_mode.get(FailureMode.TIMEOUT, 0)
        if timeouts > len(self.results) * 0.05:
            recommendations.append(
                "Optimize query speed and increase timeout thresholds for complex questions"
            )
        audio_results = [r for r in self.results if r.question_type == QuestionType.AUDIO]
        if audio_results:
            audio_accuracy = accuracy_by_type.get(QuestionType.AUDIO, 0.0)
            if audio_accuracy < 0.5:
                recommendations.append(
                    "Add audio transcription capabilities (Whisper)"
                )
        multimodal_results = [r for r in self.results if r.question_type == QuestionType.MULTIMODAL]
        if multimodal_results:
            multimodal_accuracy = accuracy_by_type.get(QuestionType.MULTIMODAL, 0.0)
            if multimodal_accuracy < 0.5:
                recommendations.append(
                    "Improve multimodal reasoning by integrating multiple tool outputs"
                )
        return recommendations