Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| File Processor Agent for GAIA Agent System | |
| Handles file-based questions with intelligent processing strategies | |
| """ | |
| import os | |
| import logging | |
| from typing import Dict, List, Optional, Any | |
| from pathlib import Path | |
| from agents.state import GAIAAgentState, AgentRole, AgentResult, ToolResult | |
| from models.qwen_client import QwenClient, ModelTier | |
| from tools.file_processor import FileProcessorTool | |
| from tools.calculator import CalculatorTool | |
| logger = logging.getLogger(__name__) | |
class FileProcessorAgent:
    """
    Specialized agent for file processing tasks.

    Chooses a processing strategy from the file extension (image, audio,
    data, code, text, or a generic fallback), runs the file through
    ``FileProcessorTool`` and, for every strategy except the generic one,
    asks the LLM to answer the question from the extracted information.
    Every outcome is reported as an ``AgentResult`` added to the state.
    """

    # Extension groups (lower-case, leading dot), checked in this order by
    # _determine_processing_strategy. The groups are disjoint.
    _STRATEGY_EXTENSIONS = (
        ("image_analysis", frozenset({'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp'})),
        ("audio_analysis", frozenset({'.mp3', '.wav', '.ogg', '.flac', '.m4a', '.aac'})),
        ("data_analysis", frozenset({'.csv', '.xlsx', '.xls', '.json'})),
        ("code_analysis", frozenset({'.py', '.js', '.java', '.cpp', '.c', '.html', '.css'})),
        ("text_analysis", frozenset({'.txt', '.md', '.rst'})),
    )

    # Substrings of the lower-cased question that route a data file to the
    # two-step "calculation" flow instead of the plain structure analysis.
    _CALCULATION_TERMS = (
        'calculate', 'sum', 'total', 'average', 'mean', 'count',
        'maximum', 'minimum', 'how many', 'what is the',
    )

    def __init__(self, llm_client: QwenClient):
        """Store the LLM client and create the tools this agent relies on."""
        self.llm_client = llm_client
        self.file_processor = FileProcessorTool()
        self.calculator = CalculatorTool()  # For data analysis

    def process(self, state: GAIAAgentState) -> GAIAAgentState:
        """
        Process a file-based question using the file analysis tools.

        Args:
            state: Agent state carrying ``question`` and ``file_path``.

        Returns:
            The same ``state`` object, with processing steps recorded and
            exactly one ``AgentResult`` added (a failure result when the
            file is missing or when any handler raises).
        """
        logger.info("File processor processing: %s...", state.question[:100])
        state.add_processing_step("File Processor: Starting file analysis")
        try:
            # Check if file exists
            if not state.file_path or not os.path.exists(state.file_path):
                error_msg = f"File not found: {state.file_path}"
                state.add_error(error_msg)
                state.add_agent_result(self._create_failure_result(error_msg))
                return state

            # Determine processing strategy
            strategy = self._determine_processing_strategy(state.question, state.file_path)
            state.add_processing_step(f"File Processor: Strategy = {strategy}")

            # Execute processing based on strategy; unknown strategies use
            # the generic handler.
            handlers = {
                "image_analysis": self._process_image,
                "data_analysis": self._process_data_file,
                "code_analysis": self._process_code_file,
                "audio_analysis": self._process_audio_file,
                "text_analysis": self._process_text_file,
            }
            result = handlers.get(strategy, self._process_generic_file)(state)

            # Add result to state
            state.add_agent_result(result)
            state.add_processing_step(
                f"File Processor: Completed with confidence {result.confidence:.2f}"
            )
            return state
        except Exception as e:
            # Agent boundary: convert any failure into a failure result so
            # the caller always gets the state back.
            error_msg = f"File processing failed: {e}"
            state.add_error(error_msg)
            # logger.exception logs at ERROR level and keeps the traceback.
            logger.exception(error_msg)
            state.add_agent_result(self._create_failure_result(error_msg))
            return state

    def _determine_processing_strategy(self, question: str, file_path: str) -> str:
        """
        Determine the processing strategy from the file extension.

        Args:
            question: The user question. Currently not consulted; kept so
                the signature stays stable for callers.
            file_path: Path whose suffix (case-insensitive) selects the
                strategy.

        Returns:
            One of ``"image_analysis"``, ``"audio_analysis"``,
            ``"data_analysis"``, ``"code_analysis"``, ``"text_analysis"``
            or ``"generic_analysis"`` when the suffix is not recognised.
        """
        extension = Path(file_path).suffix.lower()
        for strategy, extensions in self._STRATEGY_EXTENSIONS:
            if extension in extensions:
                return strategy
        # Default to generic processing
        return "generic_analysis"

    @staticmethod
    def _build_prompt(*lines: str) -> str:
        """Join prompt lines with newlines, framed by a leading and trailing newline."""
        return "\n" + "\n".join(lines) + "\n"

    @staticmethod
    def _text_excerpt(content: Any, limit: int = 2000) -> str:
        """
        Return at most ``limit`` characters of the text held in ``content``.

        ``content`` is expected to be a dict with a ``'text'`` entry; any
        other value is stringified instead of raising. A trailing ``...``
        is appended only when the text was actually cut.
        """
        text = content.get('text', '') if isinstance(content, dict) else str(content)
        if len(text) > limit:
            return f"{text[:limit]}..."
        return text

    @staticmethod
    def _tool_succeeded(file_result: ToolResult) -> bool:
        """True when both the tool call and its inner payload report success."""
        return bool(file_result.success and file_result.result.get('success'))

    @staticmethod
    def _tool_record(file_data: Any, file_result: ToolResult) -> ToolResult:
        """Build the ``file_processor`` ToolResult attached to agent results."""
        return ToolResult(
            tool_name="file_processor",
            success=True,
            result=file_data,
            execution_time=file_result.execution_time,
        )

    @staticmethod
    def _llm_answer_result(llm_result: Any, file_result: ToolResult, tool_record: ToolResult,
                           *, confidence: float, reasoning: str) -> AgentResult:
        """Build a successful result whose answer is the LLM response."""
        return AgentResult(
            agent_role=AgentRole.FILE_PROCESSOR,
            success=True,
            result=llm_result.response,
            confidence=confidence,
            reasoning=reasoning,
            tools_used=[tool_record],
            model_used=llm_result.model_used,
            processing_time=file_result.execution_time + llm_result.response_time,
            cost_estimate=llm_result.cost_estimate,
        )

    @staticmethod
    def _fallback_result(answer: Any, file_result: ToolResult, tool_record: ToolResult,
                         *, reasoning: str) -> AgentResult:
        """Build the reduced-confidence result used when the LLM call failed."""
        return AgentResult(
            agent_role=AgentRole.FILE_PROCESSOR,
            success=True,
            result=answer,
            confidence=0.60,
            reasoning=reasoning,
            tools_used=[tool_record],
            model_used="fallback",
            processing_time=file_result.execution_time,
            cost_estimate=0.0,
        )

    def _process_image(self, state: GAIAAgentState) -> AgentResult:
        """
        Process an image file and answer the question about it.

        Returns the LLM answer (confidence 0.75), the tool's content
        description if the LLM call fails (confidence 0.60), or a failure
        result if the file processor itself fails.
        """
        logger.info("Processing image: %s", state.file_path)
        # Analyze image with file processor
        file_result = self.file_processor.execute(state.file_path)
        if not self._tool_succeeded(file_result):
            return self._create_failure_result("Image processing failed")

        file_data = file_result.result['result']
        image_metadata = file_data.get('metadata', {})
        # Create analysis prompt based on image metadata and question
        analysis_prompt = self._build_prompt(
            "Based on this image analysis, please answer the following question:",
            f"Question: {state.question}",
            "Image Information:",
            f"- File: {file_data.get('file_path', '')}",
            f"- Type: {file_data.get('file_type', '')}",
            f"- Content Description: {file_data.get('content', '')}",
            f"- Metadata: {image_metadata}",
            "Please provide a direct answer based on the image analysis.",
            "If the question asks about specific details that cannot be determined from the metadata alone,",
            "please indicate what information is available and what would require visual analysis.",
        )
        # Use main model for image analysis
        llm_result = self.llm_client.generate(analysis_prompt, tier=ModelTier.MAIN, max_tokens=400)
        tool_record = self._tool_record(file_data, file_result)
        if llm_result.success:
            return self._llm_answer_result(
                llm_result, file_result, tool_record,
                confidence=0.75,  # Good confidence for image metadata analysis
                reasoning="Analyzed image metadata and properties",
            )
        # Fallback to metadata description
        return self._fallback_result(
            file_data.get('content', 'Image analyzed'), file_result, tool_record,
            reasoning="Image processed but analysis failed",
        )

    def _process_data_file(self, state: GAIAAgentState) -> AgentResult:
        """
        Process a CSV/Excel/JSON file and perform data analysis.

        Questions containing one of ``_CALCULATION_TERMS`` go through the
        calculation flow when the tool supplied ``sample_data``; everything
        else gets the plain structure analysis.
        """
        logger.info("Processing data file: %s", state.file_path)
        # Analyze data file
        file_result = self.file_processor.execute(state.file_path)
        if not self._tool_succeeded(file_result):
            return self._create_failure_result("Data file processing failed")

        file_data = file_result.result['result']
        content = file_data.get('content', {})
        # Check if question requires calculations
        question_lower = state.question.lower()
        needs_calculation = any(term in question_lower for term in self._CALCULATION_TERMS)
        if needs_calculation and 'sample_data' in content:
            return self._perform_data_calculations(state, file_data, file_result)
        return self._analyze_data_structure(state, file_data, file_result)

    def _perform_data_calculations(self, state: GAIAAgentState, file_data: Dict, file_result: ToolResult) -> AgentResult:
        """
        Answer a calculation-style question about a data file.

        Makes two LLM calls: the first asks which calculations are needed,
        the second answers the question using the data summary plus that
        guidance. If either call fails, falls back to
        ``_analyze_data_structure``.
        """
        metadata = file_data.get('metadata', {})
        content = file_data.get('content', {})
        # Extract data for calculations
        sample_data = content.get('sample_data', [])
        # Only the first 3 rows go into the planning prompt. Slicing is
        # guarded because the shape of sample_data is not fixed here
        # (NOTE(review): presumably a list of rows; confirm against the tool).
        preview = sample_data[:3] if isinstance(sample_data, list) else sample_data

        # Use LLM to determine what calculations to perform
        calculation_prompt = self._build_prompt(
            "Based on this data file and question, determine what calculations are needed:",
            f"Question: {state.question}",
            "Data Structure:",
            f"- Columns: {metadata.get('columns', [])}",
            f"- Rows: {metadata.get('row_count', 0)}",
            f"- Sample Data: {preview}",
            "Please specify what calculations should be performed and on which columns.",
            "Respond with specific calculation instructions.",
        )
        llm_result = self.llm_client.generate(calculation_prompt, tier=ModelTier.MAIN, max_tokens=200)
        if llm_result.success:
            # For now, provide data summary with LLM analysis
            analysis_prompt = self._build_prompt(
                "Based on this data analysis, please answer the question:",
                f"Question: {state.question}",
                "Data Summary:",
                f"- File: {metadata.get('shape', [])} (rows x columns)",
                f"- Columns: {metadata.get('columns', [])}",
                f"- Numeric columns: {metadata.get('numeric_columns', [])}",
                f"- Statistics: {metadata.get('numeric_stats', {})}",
                f"- Sample data: {sample_data}",
                f"Calculation guidance: {llm_result.response}",
                "Please provide the answer based on the data.",
            )
            analysis_result = self.llm_client.generate(analysis_prompt, tier=ModelTier.MAIN, max_tokens=400)
            if analysis_result.success:
                # Time and cost cover both LLM calls.
                return AgentResult(
                    agent_role=AgentRole.FILE_PROCESSOR,
                    success=True,
                    result=analysis_result.response,
                    confidence=0.80,
                    reasoning="Performed data analysis and calculations",
                    tools_used=[file_result],
                    model_used=analysis_result.model_used,
                    processing_time=(
                        file_result.execution_time
                        + llm_result.response_time
                        + analysis_result.response_time
                    ),
                    cost_estimate=llm_result.cost_estimate + analysis_result.cost_estimate,
                )
        # Fallback to basic data summary
        return self._analyze_data_structure(state, file_data, file_result)

    def _analyze_data_structure(self, state: GAIAAgentState, file_data: Dict, file_result: ToolResult) -> AgentResult:
        """
        Answer the question from the data file's structure and sample content.

        Returns the LLM answer (confidence 0.75) or, if the LLM call fails,
        the tool's description of the file (confidence 0.60).
        """
        metadata = file_data.get('metadata', {})
        content = file_data.get('content', {})
        analysis_prompt = self._build_prompt(
            "Based on this data file analysis, please answer the question:",
            f"Question: {state.question}",
            "Data File Information:",
            f"- Structure: {metadata.get('shape', [])} (rows x columns)",
            f"- Columns: {metadata.get('columns', [])}",
            f"- Data types: {metadata.get('data_types', {})}",
            f"- Description: {content.get('description', '')}",
            f"- Sample data: {content.get('sample_data', [])}",
            "Please provide a direct answer based on the data structure and content.",
        )
        llm_result = self.llm_client.generate(analysis_prompt, tier=ModelTier.MAIN, max_tokens=400)
        # The data handlers attach the tool's own result object unchanged.
        if llm_result.success:
            return self._llm_answer_result(
                llm_result, file_result, file_result,
                confidence=0.75,
                reasoning="Analyzed data file structure and content",
            )
        return self._fallback_result(
            content.get('description', 'Data file analyzed'), file_result, file_result,
            reasoning="Data file processed but analysis failed",
        )

    def _process_code_file(self, state: GAIAAgentState) -> AgentResult:
        """
        Process a source-code file and answer the question about it.

        Returns the LLM answer (confidence 0.80), the tool's description if
        the LLM call fails (confidence 0.60), or a failure result if the
        file processor itself fails.
        """
        logger.info("Processing code file: %s", state.file_path)
        # Analyze code file
        file_result = self.file_processor.execute(state.file_path)
        if not self._tool_succeeded(file_result):
            return self._create_failure_result("Code file processing failed")

        file_data = file_result.result['result']
        metadata = file_data.get('metadata', {})
        content = file_data.get('content', {})
        analysis_prompt = self._build_prompt(
            "Based on this code analysis, please answer the question:",
            f"Question: {state.question}",
            "Code File Information:",
            f"- Type: {file_data.get('file_type', '')}",
            f"- Description: {content.get('description', '')}",
            f"- Metadata: {metadata}",
            f"- Code snippet: {content.get('code_snippet', '')}",
            "Please analyze the code and provide a direct answer.",
        )
        llm_result = self.llm_client.generate(analysis_prompt, tier=ModelTier.MAIN, max_tokens=500)
        tool_record = self._tool_record(file_data, file_result)
        if llm_result.success:
            return self._llm_answer_result(
                llm_result, file_result, tool_record,
                confidence=0.80,
                reasoning="Analyzed code structure and content",
            )
        return self._fallback_result(
            content.get('description', 'Code file analyzed'), file_result, tool_record,
            reasoning="Code file processed but analysis failed",
        )

    def _process_audio_file(self, state: GAIAAgentState) -> AgentResult:
        """
        Process an audio file (basic metadata for now).

        Returns the LLM answer with confidence 0.50. Unlike the image, data
        and code handlers there is no fallback answer: a failed tool call
        and a failed LLM call both yield the same failure result.
        """
        logger.info("Processing audio file: %s", state.file_path)
        # Analyze audio file
        file_result = self.file_processor.execute(state.file_path)
        if self._tool_succeeded(file_result):
            file_data = file_result.result['result']
            audio_metadata = file_data.get('metadata', {})
            analysis_prompt = self._build_prompt(
                "Based on this audio file information, please answer the question:",
                f"Question: {state.question}",
                "Audio File Information:",
                f"- Content: {file_data.get('content', '')}",
                f"- Metadata: {audio_metadata}",
                "Please provide an answer based on the available audio file information.",
                "Note: Full audio transcription is not currently available, but file metadata is provided.",
            )
            # Use lighter model for basic audio metadata
            llm_result = self.llm_client.generate(analysis_prompt, tier=ModelTier.ROUTER, max_tokens=300)
            if llm_result.success:
                return self._llm_answer_result(
                    llm_result, file_result, self._tool_record(file_data, file_result),
                    confidence=0.50,  # Lower confidence due to limited audio processing
                    reasoning="Analyzed audio file metadata (transcription not available)",
                )
        return self._create_failure_result("Audio file processing not fully supported")

    def _process_text_file(self, state: GAIAAgentState) -> AgentResult:
        """
        Process a text file and answer the question from its content.

        Only the first 2000 characters of the text are sent to the LLM.
        Returns the LLM answer with confidence 0.85; a failed tool call and
        a failed LLM call both yield the same failure result.
        """
        logger.info("Processing text file: %s", state.file_path)
        # Analyze text file
        file_result = self.file_processor.execute(state.file_path)
        if self._tool_succeeded(file_result):
            file_data = file_result.result['result']
            content = file_data.get('content', {})
            metadata = file_data.get('metadata', {})
            analysis_prompt = self._build_prompt(
                "Based on this text file content, please answer the question:",
                f"Question: {state.question}",
                "Text Content:",
                self._text_excerpt(content),
                "File Statistics:",
                f"- Word count: {metadata.get('word_count', 0)}",
                f"- Line count: {metadata.get('line_count', 0)}",
                "Please analyze the text and provide a direct answer.",
            )
            llm_result = self.llm_client.generate(analysis_prompt, tier=ModelTier.MAIN, max_tokens=400)
            if llm_result.success:
                return self._llm_answer_result(
                    llm_result, file_result, self._tool_record(file_data, file_result),
                    confidence=0.85,
                    reasoning="Analyzed text file content",
                )
        return self._create_failure_result("Text file processing failed")

    def _process_generic_file(self, state: GAIAAgentState) -> AgentResult:
        """
        Process an unrecognised file type with generic analysis.

        No LLM call is made: the result is a short description assembled
        from the tool output (confidence 0.40), or a failure result when
        the tool call itself fails.
        """
        logger.info("Processing generic file: %s", state.file_path)
        # Try generic file processing
        file_result = self.file_processor.execute(state.file_path)
        if not file_result.success:
            return self._create_failure_result("Generic file processing failed")

        file_data = file_result.result
        # Create basic response about file
        basic_info = f"File analyzed: {state.file_path}. "
        if file_data.get('success'):
            file_type = file_data.get('result', {}).get('file_type', 'unknown')
            basic_info += f"File type: {file_type}. "
            basic_info += "Generic file analysis completed."
        else:
            basic_info += f"Analysis result: {file_data.get('message', 'Processing completed')}"
        return AgentResult(
            agent_role=AgentRole.FILE_PROCESSOR,
            success=True,
            result=basic_info,
            confidence=0.40,
            reasoning="Generic file processing attempted",
            tools_used=[self._tool_record(file_data, file_result)],
            model_used="basic",
            processing_time=file_result.execution_time,
            cost_estimate=0.0,
        )

    def _create_failure_result(self, error_message: str) -> AgentResult:
        """Create a failure result carrying ``error_message`` as result and reasoning."""
        return AgentResult(
            agent_role=AgentRole.FILE_PROCESSOR,
            success=False,
            result=error_message,
            confidence=0.0,
            reasoning=error_message,
            model_used="error",
            processing_time=0.0,
            cost_estimate=0.0,
        )