File size: 16,166 Bytes
f504b2e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
"""
GAIA Error Analysis Framework

Categorizes questions, failure modes, and generates actionable improvement recommendations.
Implements TDD test suite specifications from tests/test_error_analysis.py
"""

import csv
import json
import re
from dataclasses import dataclass, asdict
from enum import Enum
from typing import List, Dict, Optional, Any
from collections import defaultdict, Counter


class QuestionType(Enum):
    """Kinds of GAIA question the analyzer can assign to a question.

    Each member's value is the lowercase string label used when results
    are exported.
    """
    # Single-resource categories.
    MATH = "math"              # arithmetic / calculation wording
    FILE = "file"              # refers to a document, table or attachment
    WEB = "web"                # lookup-style factual questions
    IMAGE = "image"            # refers to a picture or screenshot
    AUDIO = "audio"            # refers to a recording or speech
    REASONING = "reasoning"    # comparisons and conditional logic

    # Combination and fallback categories.
    MULTIMODAL = "multimodal"  # mixes an image with a file/data source
    UNKNOWN = "unknown"        # no classification pattern matched


class FailureMode(Enum):
    """Reasons an agent answer can be counted as a failure.

    Each member's value is the lowercase string label used when results
    are exported.
    """
    # Failures judged from the answer text.
    WRONG_ANSWER = "wrong_answer"          # content does not match
    FORMATTING_ERROR = "formatting_error"  # right content, wrong presentation
    EMPTY_RESPONSE = "empty_response"      # nothing usable was returned

    # Failures signalled by an exception.
    TIMEOUT = "timeout"                    # a TimeoutError was raised
    TOOL_FAILURE = "tool_failure"          # any other exception was raised


@dataclass
class TestResult:
    """Outcome of running the agent on a single question.

    Attributes:
        question_id: Identifier of the question.
        question: The question text.
        question_type: Category assigned to the question.
        expected: The correct answer.
        actual: The answer the agent produced.
        success: Whether the answer was counted as correct.
        failure_mode: Why the answer failed; None when it succeeded.
        time_elapsed: Time taken to answer.
        tools_used: Names of the tools the agent used; None is normalised
            to a fresh empty list.
        error: Exception raised while answering, if any.
    """
    # The ``Test`` prefix makes pytest try to collect this class whenever it
    # is imported into a test module, which emits a PytestCollectionWarning.
    # Opt out explicitly. The attribute is un-annotated, so it is not a
    # dataclass field and does not change the constructor.
    __test__ = False

    question_id: str
    question: str
    question_type: QuestionType
    expected: str
    actual: str
    success: bool
    failure_mode: Optional[FailureMode] = None
    time_elapsed: float = 0.0
    tools_used: Optional[List[str]] = None
    error: Optional[Exception] = None

    def __post_init__(self):
        # Give every instance its own list so callers can append safely,
        # including when None was passed explicitly.
        if self.tools_used is None:
            self.tools_used = []


class GAIATestAnalyzer:
    """
    Analyzes GAIA agent test results to identify failure patterns and recommend improvements.

    This class implements error categorization, performance tracking, and reporting
    to guide agent optimization efforts.
    """

    def __init__(self):
        self.results: List[TestResult] = []

        # Patterns for question classification
        self.math_patterns = [
            r'\d+\s*[\+\-\*\/]\s*\d+',  # Arithmetic operations with numbers
            r'calculate|compute|sum|multiply|divide|subtract|add',
            r'what is \d+',
            r'how many|how much'
        ]

        self.file_patterns = [
            r'pdf|csv|excel|spreadsheet|document|table|file',
            r'attached|according to the',
        ]

        self.image_patterns = [
            r'image|picture|photo|screenshot|attached.*color|in the (attached )?image'
        ]

        self.audio_patterns = [
            r'audio|recording|sound|said in|spoken|voice'
        ]

        self.web_patterns = [
            r'who is|what is the (current|latest)|CEO|president|founded|website',
            r'according to.*wikipedia|look up'
        ]

        self.reasoning_patterns = [
            r'if .+ then|taller than|shorter than|before|after',
            r'who is the (tallest|shortest|oldest|youngest)',
        ]

        self.multimodal_patterns = [
            r'(image|picture|photo).*(csv|file|data|spreadsheet)',
            r'(csv|file|data|spreadsheet).*(image|picture|photo)',
            r'using the .+ and the'
        ]

    def classify_question_type(self, question: str) -> QuestionType:
        """
        Decide which QuestionType a question belongs to.

        The question is lowercased and tested against each pattern group in
        a fixed priority order; the first group with any matching pattern
        determines the result.

        Args:
            question: The question text to classify

        Returns:
            The matching QuestionType, or QuestionType.UNKNOWN when no
            pattern matches
        """
        text = question.lower()

        # Most specific first: multimodal must win over its image/file parts,
        # and media/file wording wins over the generic math, reasoning and
        # web keywords.
        priority = (
            (self.multimodal_patterns, QuestionType.MULTIMODAL),
            (self.image_patterns, QuestionType.IMAGE),
            (self.audio_patterns, QuestionType.AUDIO),
            (self.file_patterns, QuestionType.FILE),
            (self.math_patterns, QuestionType.MATH),
            (self.reasoning_patterns, QuestionType.REASONING),
            (self.web_patterns, QuestionType.WEB),
        )

        for patterns, qtype in priority:
            for pattern in patterns:
                if re.search(pattern, text, re.IGNORECASE):
                    return qtype

        return QuestionType.UNKNOWN

    def classify_failure_mode(
        self,
        expected: str,
        actual: Optional[str],
        error: Optional[Exception] = None
    ) -> FailureMode:
        """
        Classify why an answer failed.

        Checks run in priority order and the first match wins:

        1. ``error`` is set: TIMEOUT for a ``TimeoutError``, otherwise
           TOOL_FAILURE.
        2. ``actual`` is None, blank, or contains "unable to determine":
           EMPTY_RESPONSE.
        3. ``actual`` carries the expected content but differs in
           presentation (letter case, surrounding whitespace, commas,
           trailing text such as units or punctuation, or a leading
           article): FORMATTING_ERROR.
        4. Anything else: WRONG_ANSWER.

        Args:
            expected: The correct answer
            actual: The agent's answer (None if error occurred)
            error: Exception if one occurred

        Returns:
            FailureMode enum value
        """
        # An exception outranks whatever text was produced.
        if error is not None:
            if isinstance(error, TimeoutError):
                return FailureMode.TIMEOUT
            return FailureMode.TOOL_FAILURE

        # Missing, blank, or an explicit "gave up" answer.
        if actual is None or not actual.strip():
            return FailureMode.EMPTY_RESPONSE
        if "unable to determine" in actual.lower():
            return FailureMode.EMPTY_RESPONSE

        expected_clean = expected.strip().lower()
        actual_clean = actual.strip().lower()

        if expected_clean == actual_clean:
            # Only case and/or surrounding whitespace differ. Exact-match
            # scoring rejects such answers, but the content is right.
            if expected != actual:
                return FailureMode.FORMATTING_ERROR
            # Identical strings are not really a failure; keep the historical
            # result for callers that pass them anyway.
            return FailureMode.WRONG_ANSWER

        # Same answer once commas are ignored (e.g. "1,000" vs "1000").
        if expected_clean.replace(',', '') == actual_clean.replace(',', ''):
            return FailureMode.FORMATTING_ERROR

        # Expected answer followed by extra text (units, punctuation, ...).
        # An empty expected answer is skipped because it is a prefix of
        # everything.
        if expected_clean and actual_clean.startswith(expected_clean):
            tail = actual_clean[len(expected_clean):]
            # "5" -> "50" or "3" -> "3.14" is a different number, not the
            # right number with something appended.
            continues_number = (
                expected_clean[-1].isdigit()
                and re.match(r'[.,]?\d', tail) is not None
            )
            if not continues_number:
                return FailureMode.FORMATTING_ERROR

        # Expected answer preceded by an article ("the cat" vs "cat").
        for article in ('the ', 'a ', 'an '):
            if (actual_clean.startswith(article)
                    and actual_clean[len(article):] == expected_clean):
                return FailureMode.FORMATTING_ERROR

        # If none of the above, it's a wrong answer
        return FailureMode.WRONG_ANSWER

    def log_result(self, result: TestResult):
        """
        Add a test result to the analyzer.

        Args:
            result: TestResult object to log
        """
        self.results.append(result)

    def analyze_response(
        self,
        question_id: str,
        question: str,
        expected: str,
        actual: str,
        time_elapsed: float = 0.0,
        tools_used: Optional[List[str]] = None,
        error: Optional[Exception] = None
    ) -> TestResult:
        """
        Classify one agent response, log it, and return the record.

        The question is categorised, the answer is scored by exact string
        equality with ``expected``, and a failure mode is assigned only when
        the answer is not correct.

        Args:
            question_id: Unique identifier for the question
            question: The question text
            expected: The correct answer
            actual: The agent's answer
            time_elapsed: Time taken to answer
            tools_used: List of tools used by the agent
            error: Exception if one occurred

        Returns:
            The TestResult that was created and logged
        """
        qtype = self.classify_question_type(question)

        # A missing answer is never correct; otherwise require an exact match.
        is_correct = actual is not None and actual == expected

        if is_correct:
            mode = None
        else:
            mode = self.classify_failure_mode(expected, actual, error)

        record = TestResult(
            question_id=question_id,
            question=question,
            question_type=qtype,
            expected=expected,
            actual=actual,
            success=is_correct,
            failure_mode=mode,
            time_elapsed=time_elapsed,
            tools_used=tools_used or [],
            error=error
        )
        self.log_result(record)
        return record

    def generate_summary(self) -> Dict[str, Any]:
        """
        Generate summary statistics for all logged results.

        Returns:
            Dictionary with summary statistics
        """
        if not self.results:
            return {
                "total_questions": 0,
                "correct_count": 0,
                "accuracy": 0.0,
                "avg_time": 0.0
            }

        total = len(self.results)
        correct = sum(1 for r in self.results if r.success)
        total_time = sum(r.time_elapsed for r in self.results)

        return {
            "total_questions": total,
            "correct_count": correct,
            "accuracy": correct / total if total > 0 else 0.0,
            "avg_time": total_time / total if total > 0 else 0.0
        }

    def get_accuracy_by_type(self) -> Dict[QuestionType, float]:
        """
        Calculate accuracy broken down by question type.

        Returns:
            Dictionary mapping QuestionType to accuracy (0.0-1.0)
        """
        type_stats = defaultdict(lambda: {"correct": 0, "total": 0})

        for result in self.results:
            stats = type_stats[result.question_type]
            stats["total"] += 1
            if result.success:
                stats["correct"] += 1

        accuracy_by_type = {}
        for qtype, stats in type_stats.items():
            accuracy_by_type[qtype] = (
                stats["correct"] / stats["total"] if stats["total"] > 0 else 0.0
            )

        return accuracy_by_type

    def get_failures_by_mode(self) -> Dict[FailureMode, int]:
        """
        Count failures by failure mode.

        Returns:
            Dictionary mapping FailureMode to count
        """
        failure_counts = Counter()

        for result in self.results:
            if not result.success and result.failure_mode:
                failure_counts[result.failure_mode] += 1

        return dict(failure_counts)

    def export_to_csv(self, filepath: str):
        """
        Export all results to a CSV file.

        Args:
            filepath: Path to output CSV file
        """
        with open(filepath, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)

            # Write header
            writer.writerow([
                'question_id', 'question', 'question_type', 'expected', 'actual',
                'success', 'failure_mode', 'time_elapsed', 'tools_used'
            ])

            # Write results
            for result in self.results:
                writer.writerow([
                    result.question_id,
                    result.question,
                    result.question_type.value.upper(),
                    result.expected,
                    result.actual,
                    result.success,
                    result.failure_mode.value.upper() if result.failure_mode else '',
                    result.time_elapsed,
                    ','.join(result.tools_used) if result.tools_used else ''
                ])

    def export_to_json(self, filepath: str):
        """
        Export all results and summary to a JSON file.

        Args:
            filepath: Path to output JSON file
        """
        data = {
            "summary": self.generate_summary(),
            "accuracy_by_type": {
                qtype.value: acc
                for qtype, acc in self.get_accuracy_by_type().items()
            },
            "failures_by_mode": {
                mode.value: count
                for mode, count in self.get_failures_by_mode().items()
            },
            "results": [
                {
                    "question_id": r.question_id,
                    "question": r.question,
                    "question_type": r.question_type.value,
                    "expected": r.expected,
                    "actual": r.actual,
                    "success": r.success,
                    "failure_mode": r.failure_mode.value if r.failure_mode else None,
                    "time_elapsed": r.time_elapsed,
                    "tools_used": r.tools_used
                }
                for r in self.results
            ]
        }

        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2)

    def get_recommendations(self) -> List[str]:
        """
        Turn the logged results into a list of suggested improvements.

        A suggestion is added when a question type that occurs in the
        results falls below its accuracy floor, or when a failure mode
        accounts for more than its allowed share of all results.

        Returns:
            List of recommendation strings, in the order the checks run
        """
        type_accuracy = self.get_accuracy_by_type()
        mode_counts = self.get_failures_by_mode()
        advice = []

        def flag_weak_type(qtype, floor, message):
            # Only judge a question type that actually occurs in the results.
            occurs = any(r.question_type == qtype for r in self.results)
            if occurs and type_accuracy.get(qtype, 0.0) < floor:
                advice.append(message)

        def flag_frequent_mode(mode, share, message):
            # `share` is the fraction of all results the mode may not exceed.
            if mode_counts.get(mode, 0) > len(self.results) * share:
                advice.append(message)

        # Capability gaps by question type.
        flag_weak_type(
            QuestionType.IMAGE, 0.5,
            "Add vision capabilities (Gemini 2.5 Pro) to handle image questions"
        )
        flag_weak_type(
            QuestionType.FILE, 0.5,
            "Implement file processing capabilities (PDF/CSV/Excel parsing)"
        )
        flag_weak_type(
            QuestionType.MATH, 0.7,
            "Add code execution capabilities for reliable math calculations"
        )

        # Recurring failure modes across all questions.
        flag_frequent_mode(
            FailureMode.FORMATTING_ERROR, 0.1,
            "Improve answer formatting logic to handle commas, units, and articles"
        )
        flag_frequent_mode(
            FailureMode.EMPTY_RESPONSE, 0.1,
            "Improve tool reliability and add fallback mechanisms for empty responses"
        )
        flag_frequent_mode(
            FailureMode.TIMEOUT, 0.05,
            "Optimize query speed and increase timeout thresholds for complex questions"
        )

        # Remaining question types.
        flag_weak_type(
            QuestionType.AUDIO, 0.5,
            "Add audio transcription capabilities (Whisper)"
        )
        flag_weak_type(
            QuestionType.MULTIMODAL, 0.5,
            "Improve multimodal reasoning by integrating multiple tool outputs"
        )

        return advice