"""
File Processor Agent for GAIA Agent System
Handles file-based questions with intelligent processing strategies
"""

import os
import logging
from typing import Dict, List, Optional, Any
from pathlib import Path

from agents.state import GAIAAgentState, AgentRole, AgentResult, ToolResult
from models.qwen_client import QwenClient, ModelTier
from tools.file_processor import FileProcessorTool
from tools.calculator import CalculatorTool

# Module-level logger shared by every method of the agent below.
logger = logging.getLogger(__name__)


class FileProcessorAgent:
    """
    Specialized agent for file processing tasks.

    Handles images, audio, CSV/Excel, Python code, and other file types.
    Each file is first run through ``FileProcessorTool``; for every strategy
    except the generic one, the extracted information is then put into a
    prompt and the LLM client is asked to answer the question from it.
    """

    # (strategy name, file extensions) pairs, checked in this order by
    # _determine_processing_strategy. Extensions are lower-case with the dot.
    _STRATEGY_EXTENSIONS = (
        ("image_analysis",
         frozenset({'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp'})),
        ("audio_analysis",
         frozenset({'.mp3', '.wav', '.ogg', '.flac', '.m4a', '.aac'})),
        ("data_analysis",
         frozenset({'.csv', '.xlsx', '.xls', '.json'})),
        ("code_analysis",
         frozenset({'.py', '.js', '.java', '.cpp', '.c', '.html', '.css'})),
        ("text_analysis",
         frozenset({'.txt', '.md', '.rst'})),
    )

    # Question phrases that send a data file down the calculation path.
    _CALCULATION_TERMS = (
        'calculate', 'sum', 'total', 'average', 'mean', 'count',
        'maximum', 'minimum', 'how many', 'what is the',
    )

    # Maximum number of characters of a text file quoted in the LLM prompt.
    _TEXT_PROMPT_LIMIT = 2000

    def __init__(self, llm_client: "QwenClient"):
        """Store the LLM client and create the file-processor and calculator tools."""
        self.llm_client = llm_client
        self.file_processor = FileProcessorTool()
        self.calculator = CalculatorTool()

    def process(self, state: "GAIAAgentState") -> "GAIAAgentState":
        """
        Process file-based questions using file analysis tools.

        Picks a strategy from the file extension, runs the matching handler,
        and records the resulting ``AgentResult`` on ``state``. A missing file
        or any exception raised by a handler is recorded as an error plus a
        failure result instead of propagating.

        Args:
            state: Agent state; ``question`` and ``file_path`` are read, and
                processing steps, errors and agent results are appended.

        Returns:
            The same ``state`` object that was passed in.
        """
        logger.info("File processor processing: %s...", state.question[:100])
        state.add_processing_step("File Processor: Starting file analysis")

        try:
            # Bail out early when there is no usable file on disk.
            if not state.file_path or not os.path.exists(state.file_path):
                error_msg = f"File not found: {state.file_path}"
                state.add_error(error_msg)
                state.add_agent_result(self._create_failure_result(error_msg))
                return state

            strategy = self._determine_processing_strategy(state.question, state.file_path)
            state.add_processing_step(f"File Processor: Strategy = {strategy}")

            # Any strategy without a dedicated handler uses the generic one.
            handlers = {
                "image_analysis": self._process_image,
                "data_analysis": self._process_data_file,
                "code_analysis": self._process_code_file,
                "audio_analysis": self._process_audio_file,
                "text_analysis": self._process_text_file,
            }
            result = handlers.get(strategy, self._process_generic_file)(state)

            state.add_agent_result(result)
            state.add_processing_step(
                f"File Processor: Completed with confidence {result.confidence:.2f}"
            )
            return state

        except Exception as e:
            # Agent boundary: convert any handler failure into a recorded
            # failure result. logger.exception keeps the traceback in the log.
            error_msg = f"File processing failed: {e}"
            state.add_error(error_msg)
            logger.exception(error_msg)
            state.add_agent_result(self._create_failure_result(error_msg))
            return state

    def _determine_processing_strategy(self, question: str, file_path: str) -> str:
        """
        Determine the processing strategy from the file extension.

        Args:
            question: The question being asked. Accepted for interface
                compatibility; the decision currently depends only on the
                file extension.
            file_path: Path of the file; only its final suffix is inspected,
                case-insensitively.

        Returns:
            One of ``"image_analysis"``, ``"audio_analysis"``,
            ``"data_analysis"``, ``"code_analysis"``, ``"text_analysis"``,
            or ``"generic_analysis"`` when the extension is not recognised.
        """
        file_extension = Path(file_path).suffix.lower()
        for strategy, extensions in self._STRATEGY_EXTENSIONS:
            if file_extension in extensions:
                return strategy
        return "generic_analysis"

    def _file_tool_result(self, file_data: Dict, file_result: "ToolResult") -> "ToolResult":
        """Build the ``ToolResult`` describing a successful file_processor run."""
        return ToolResult(
            tool_name="file_processor",
            success=True,
            result=file_data,
            execution_time=file_result.execution_time,
        )

    def _llm_answer_result(self, llm_result, file_result, tools_used: list,
                           confidence: float, reasoning: str) -> "AgentResult":
        """Build a successful ``AgentResult`` whose answer is a single LLM response."""
        return AgentResult(
            agent_role=AgentRole.FILE_PROCESSOR,
            success=True,
            result=llm_result.response,
            confidence=confidence,
            reasoning=reasoning,
            tools_used=tools_used,
            model_used=llm_result.model_used,
            processing_time=file_result.execution_time + llm_result.response_time,
            cost_estimate=llm_result.cost_estimate,
        )

    def _fallback_result(self, answer, file_result, tools_used: list,
                         reasoning: str) -> "AgentResult":
        """Build the 0.60-confidence result used when the file was read but the LLM call failed."""
        return AgentResult(
            agent_role=AgentRole.FILE_PROCESSOR,
            success=True,
            result=answer,
            confidence=0.60,
            reasoning=reasoning,
            tools_used=tools_used,
            model_used="fallback",
            processing_time=file_result.execution_time,
            cost_estimate=0.0,
        )

    def _process_image(self, state: "GAIAAgentState") -> "AgentResult":
        """
        Process image files and answer questions about them.

        The prompt is built from the tool's file path, file type, content
        description and metadata. If the LLM call fails, the tool's content
        description is returned with lower confidence.
        """
        logger.info("Processing image: %s", state.file_path)

        file_result = self.file_processor.execute(state.file_path)
        if not (file_result.success and file_result.result.get('success')):
            return self._create_failure_result("Image processing failed")

        file_data = file_result.result['result']

        analysis_prompt = f"""
        Based on this image analysis, please answer the following question:

        Question: {state.question}

        Image Information:
        - File: {file_data.get('file_path', '')}
        - Type: {file_data.get('file_type', '')}
        - Content Description: {file_data.get('content', '')}
        - Metadata: {file_data.get('metadata', {})}

        Please provide a direct answer based on the image analysis.
        If the question asks about specific details that cannot be determined from the metadata alone,
        please indicate what information is available and what would require visual analysis.
        """

        llm_result = self.llm_client.generate(analysis_prompt, tier=ModelTier.MAIN, max_tokens=400)
        tools_used = [self._file_tool_result(file_data, file_result)]

        if llm_result.success:
            return self._llm_answer_result(
                llm_result, file_result, tools_used,
                confidence=0.75,
                reasoning="Analyzed image metadata and properties",
            )
        return self._fallback_result(
            file_data.get('content', 'Image analyzed'), file_result, tools_used,
            reasoning="Image processed but analysis failed",
        )

    def _process_data_file(self, state: "GAIAAgentState") -> "AgentResult":
        """
        Process CSV/Excel files and perform data analysis.

        Routes to ``_perform_data_calculations`` when the question contains a
        calculation-style phrase and the tool output has ``sample_data``;
        otherwise routes to ``_analyze_data_structure``.
        """
        logger.info("Processing data file: %s", state.file_path)

        file_result = self.file_processor.execute(state.file_path)
        if not (file_result.success and file_result.result.get('success')):
            return self._create_failure_result("Data file processing failed")

        file_data = file_result.result['result']
        content = file_data.get('content', {})

        question_lower = state.question.lower()
        needs_calculation = any(term in question_lower for term in self._CALCULATION_TERMS)

        if needs_calculation and 'sample_data' in content:
            return self._perform_data_calculations(state, file_data, file_result)
        return self._analyze_data_structure(state, file_data, file_result)

    def _perform_data_calculations(self, state: "GAIAAgentState", file_data: Dict,
                                   file_result: "ToolResult") -> "AgentResult":
        """
        Perform calculations on data file content.

        Makes two LLM calls: the first asks which calculations are needed,
        the second answers the question using that guidance plus the data
        summary. If either call fails, falls back to
        ``_analyze_data_structure``.

        Args:
            state: Agent state supplying the question.
            file_data: The tool's ``result`` payload; ``metadata`` and
                ``content`` are read from it.
            file_result: The raw tool result, reported in ``tools_used``.
        """
        metadata = file_data.get('metadata', {})
        content = file_data.get('content', {})
        sample_data = content.get('sample_data', [])

        # Step 1: ask the model what needs to be computed.
        calculation_prompt = f"""
        Based on this data file and question, determine what calculations are needed:

        Question: {state.question}

        Data Structure:
        - Columns: {metadata.get('columns', [])}
        - Rows: {metadata.get('row_count', 0)}
        - Sample Data (first 3 rows): {sample_data[:3]}

        Please specify what calculations should be performed and on which columns.
        Respond with specific calculation instructions.
        """

        llm_result = self.llm_client.generate(calculation_prompt, tier=ModelTier.MAIN, max_tokens=200)

        if llm_result.success:
            # Step 2: answer the question using the guidance from step 1.
            analysis_prompt = f"""
            Based on this data analysis, please answer the question:

            Question: {state.question}

            Data Summary:
            - File: {metadata.get('shape', [])} (rows x columns)
            - Columns: {metadata.get('columns', [])}
            - Numeric columns: {metadata.get('numeric_columns', [])}
            - Statistics: {metadata.get('numeric_stats', {})}
            - Sample data: {sample_data}

            Calculation guidance: {llm_result.response}

            Please provide the answer based on the data.
            """

            analysis_result = self.llm_client.generate(analysis_prompt, tier=ModelTier.MAIN, max_tokens=400)

            if analysis_result.success:
                # Time and cost cover both LLM calls.
                return AgentResult(
                    agent_role=AgentRole.FILE_PROCESSOR,
                    success=True,
                    result=analysis_result.response,
                    confidence=0.80,
                    reasoning="Performed data analysis and calculations",
                    tools_used=[file_result],
                    model_used=analysis_result.model_used,
                    processing_time=(file_result.execution_time
                                     + llm_result.response_time
                                     + analysis_result.response_time),
                    cost_estimate=llm_result.cost_estimate + analysis_result.cost_estimate,
                )

        # Either LLM call failed: fall back to the structural analysis.
        return self._analyze_data_structure(state, file_data, file_result)

    def _analyze_data_structure(self, state: "GAIAAgentState", file_data: Dict,
                                file_result: "ToolResult") -> "AgentResult":
        """
        Analyze data file structure and content.

        Asks the LLM to answer from the shape, columns, data types,
        description and sample data. If the LLM call fails, the tool's
        description is returned with lower confidence.
        """
        metadata = file_data.get('metadata', {})
        content = file_data.get('content', {})

        analysis_prompt = f"""
        Based on this data file analysis, please answer the question:

        Question: {state.question}

        Data File Information:
        - Structure: {metadata.get('shape', [])} (rows x columns)
        - Columns: {metadata.get('columns', [])}
        - Data types: {metadata.get('data_types', {})}
        - Description: {content.get('description', '')}
        - Sample data: {content.get('sample_data', [])}

        Please provide a direct answer based on the data structure and content.
        """

        llm_result = self.llm_client.generate(analysis_prompt, tier=ModelTier.MAIN, max_tokens=400)

        if llm_result.success:
            return self._llm_answer_result(
                llm_result, file_result, [file_result],
                confidence=0.75,
                reasoning="Analyzed data file structure and content",
            )
        return self._fallback_result(
            content.get('description', 'Data file analyzed'), file_result, [file_result],
            reasoning="Data file processed but analysis failed",
        )

    def _process_code_file(self, state: "GAIAAgentState") -> "AgentResult":
        """
        Process code files and analyze their content.

        The prompt is built from the tool's file type, description, metadata
        and code snippet. If the LLM call fails, the tool's description is
        returned with lower confidence.
        """
        logger.info("Processing code file: %s", state.file_path)

        file_result = self.file_processor.execute(state.file_path)
        if not (file_result.success and file_result.result.get('success')):
            return self._create_failure_result("Code file processing failed")

        file_data = file_result.result['result']
        metadata = file_data.get('metadata', {})
        content = file_data.get('content', {})

        analysis_prompt = f"""
        Based on this code analysis, please answer the question:

        Question: {state.question}

        Code File Information:
        - Type: {file_data.get('file_type', '')}
        - Description: {content.get('description', '')}
        - Metadata: {metadata}
        - Code snippet: {content.get('code_snippet', '')}

        Please analyze the code and provide a direct answer.
        """

        llm_result = self.llm_client.generate(analysis_prompt, tier=ModelTier.MAIN, max_tokens=500)
        tools_used = [self._file_tool_result(file_data, file_result)]

        if llm_result.success:
            return self._llm_answer_result(
                llm_result, file_result, tools_used,
                confidence=0.80,
                reasoning="Analyzed code structure and content",
            )
        return self._fallback_result(
            content.get('description', 'Code file analyzed'), file_result, tools_used,
            reasoning="Code file processed but analysis failed",
        )

    def _process_audio_file(self, state: "GAIAAgentState") -> "AgentResult":
        """
        Process audio files (basic metadata for now).

        Uses the ROUTER model tier. Unlike the image/code handlers there is
        no fallback answer: if either the tool or the LLM call fails, a
        failure result is returned.
        """
        logger.info("Processing audio file: %s", state.file_path)

        file_result = self.file_processor.execute(state.file_path)

        if file_result.success and file_result.result.get('success'):
            file_data = file_result.result['result']

            analysis_prompt = f"""
            Based on this audio file information, please answer the question:

            Question: {state.question}

            Audio File Information:
            - Content: {file_data.get('content', '')}
            - Metadata: {file_data.get('metadata', {})}

            Please provide an answer based on the available audio file information.
            Note: Full audio transcription is not currently available, but file metadata is provided.
            """

            llm_result = self.llm_client.generate(analysis_prompt, tier=ModelTier.ROUTER, max_tokens=300)

            if llm_result.success:
                return self._llm_answer_result(
                    llm_result, file_result,
                    [self._file_tool_result(file_data, file_result)],
                    confidence=0.50,
                    reasoning="Analyzed audio file metadata (transcription not available)",
                )

        return self._create_failure_result("Audio file processing not fully supported")

    def _process_text_file(self, state: "GAIAAgentState") -> "AgentResult":
        """
        Process text files and analyze their content.

        At most ``_TEXT_PROMPT_LIMIT`` characters of the text are quoted in
        the prompt; an ellipsis is appended only when the text was actually
        cut. If either the tool or the LLM call fails, a failure result is
        returned.
        """
        logger.info("Processing text file: %s", state.file_path)

        file_result = self.file_processor.execute(state.file_path)

        if file_result.success and file_result.result.get('success'):
            file_data = file_result.result['result']
            content = file_data.get('content', {})
            metadata = file_data.get('metadata', {})

            text = content.get('text', '')
            excerpt = text[:self._TEXT_PROMPT_LIMIT]
            if len(text) > self._TEXT_PROMPT_LIMIT:
                # Mark truncation so the model knows the text continues.
                excerpt += "..."

            analysis_prompt = f"""
            Based on this text file content, please answer the question:

            Question: {state.question}

            Text Content:
            {excerpt}

            File Statistics:
            - Word count: {metadata.get('word_count', 0)}
            - Line count: {metadata.get('line_count', 0)}

            Please analyze the text and provide a direct answer.
            """

            llm_result = self.llm_client.generate(analysis_prompt, tier=ModelTier.MAIN, max_tokens=400)

            if llm_result.success:
                return self._llm_answer_result(
                    llm_result, file_result,
                    [self._file_tool_result(file_data, file_result)],
                    confidence=0.85,
                    reasoning="Analyzed text file content",
                )

        return self._create_failure_result("Text file processing failed")

    def _process_generic_file(self, state: "GAIAAgentState") -> "AgentResult":
        """
        Process unknown file types with generic analysis.

        No LLM call is made. As long as the tool call itself succeeded, a
        low-confidence successful result is returned whose text reports the
        detected file type, or the tool's message when its payload is not
        marked successful.
        """
        logger.info("Processing generic file: %s", state.file_path)

        file_result = self.file_processor.execute(state.file_path)
        if not file_result.success:
            return self._create_failure_result("Generic file processing failed")

        # Here the whole tool payload is kept, not just its 'result' entry.
        file_data = file_result.result

        basic_info = f"File analyzed: {state.file_path}. "
        if file_data.get('success'):
            basic_info += f"File type: {file_data.get('result', {}).get('file_type', 'unknown')}. "
            basic_info += "Generic file analysis completed."
        else:
            basic_info += f"Analysis result: {file_data.get('message', 'Processing completed')}"

        return AgentResult(
            agent_role=AgentRole.FILE_PROCESSOR,
            success=True,
            result=basic_info,
            confidence=0.40,
            reasoning="Generic file processing attempted",
            tools_used=[self._file_tool_result(file_data, file_result)],
            model_used="basic",
            processing_time=file_result.execution_time,
            cost_estimate=0.0,
        )

    def _create_failure_result(self, error_message: str) -> "AgentResult":
        """Create a failure result carrying ``error_message`` as both result and reasoning."""
        return AgentResult(
            agent_role=AgentRole.FILE_PROCESSOR,
            success=False,
            result=error_message,
            confidence=0.0,
            reasoning=error_message,
            model_used="error",
            processing_time=0.0,
            cost_estimate=0.0,
        )