Agent_Course_Final_Assignment / src /agents /file_processor_agent.py
Chris
Final 4
225a75e
#!/usr/bin/env python3
"""
File Processor Agent for GAIA Agent System
Handles file-based questions with intelligent processing strategies
"""
import os
import logging
from typing import Dict, List, Optional, Any
from pathlib import Path
from agents.state import GAIAAgentState, AgentRole, AgentResult, ToolResult
from models.qwen_client import QwenClient, ModelTier
from tools.file_processor import FileProcessorTool
from tools.calculator import CalculatorTool
logger = logging.getLogger(__name__)
class FileProcessorAgent:
"""
Specialized agent for file processing tasks
Handles images, audio, CSV/Excel, Python code, and other file types
"""
def __init__(self, llm_client: QwenClient):
self.llm_client = llm_client
self.file_processor = FileProcessorTool()
self.calculator = CalculatorTool() # For data analysis
def process(self, state: GAIAAgentState) -> GAIAAgentState:
"""
Process file-based questions using file analysis tools
"""
logger.info(f"File processor processing: {state.question[:100]}...")
state.add_processing_step("File Processor: Starting file analysis")
try:
# Check if file exists
if not state.file_path or not os.path.exists(state.file_path):
error_msg = f"File not found: {state.file_path}"
state.add_error(error_msg)
result = self._create_failure_result(error_msg)
state.add_agent_result(result)
return state
# Determine processing strategy
strategy = self._determine_processing_strategy(state.question, state.file_path)
state.add_processing_step(f"File Processor: Strategy = {strategy}")
# Execute processing based on strategy
if strategy == "image_analysis":
result = self._process_image(state)
elif strategy == "data_analysis":
result = self._process_data_file(state)
elif strategy == "code_analysis":
result = self._process_code_file(state)
elif strategy == "audio_analysis":
result = self._process_audio_file(state)
elif strategy == "text_analysis":
result = self._process_text_file(state)
else:
result = self._process_generic_file(state)
# Add result to state
state.add_agent_result(result)
state.add_processing_step(f"File Processor: Completed with confidence {result.confidence:.2f}")
return state
except Exception as e:
error_msg = f"File processing failed: {str(e)}"
state.add_error(error_msg)
logger.error(error_msg)
# Create failure result
failure_result = self._create_failure_result(error_msg)
state.add_agent_result(failure_result)
return state
def _determine_processing_strategy(self, question: str, file_path: str) -> str:
"""Determine the best processing strategy based on file type and question"""
file_extension = Path(file_path).suffix.lower()
question_lower = question.lower()
# Image file analysis
if file_extension in {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp'}:
return "image_analysis"
# Audio file analysis
if file_extension in {'.mp3', '.wav', '.ogg', '.flac', '.m4a', '.aac'}:
return "audio_analysis"
# Data file analysis
if file_extension in {'.csv', '.xlsx', '.xls', '.json'}:
return "data_analysis"
# Code file analysis
if file_extension in {'.py', '.js', '.java', '.cpp', '.c', '.html', '.css'}:
return "code_analysis"
# Text file analysis
if file_extension in {'.txt', '.md', '.rst'}:
return "text_analysis"
# Default to generic processing
return "generic_analysis"
def _process_image(self, state: GAIAAgentState) -> AgentResult:
"""Process image files and answer questions about them"""
logger.info(f"Processing image: {state.file_path}")
# Analyze image with file processor
file_result = self.file_processor.execute(state.file_path)
if file_result.success and file_result.result.get('success'):
file_data = file_result.result['result']
# Create analysis prompt based on image metadata and question
analysis_prompt = f"""
Based on this image analysis, please answer the following question:
Question: {state.question}
Image Information:
- File: {file_data.get('file_path', '')}
- Type: {file_data.get('file_type', '')}
- Content Description: {file_data.get('content', '')}
- Metadata: {file_data.get('metadata', {})}
Please provide a direct answer based on the image analysis.
If the question asks about specific details that cannot be determined from the metadata alone,
please indicate what information is available and what would require visual analysis.
"""
# Use main model for image analysis
model_tier = ModelTier.MAIN
llm_result = self.llm_client.generate(analysis_prompt, tier=model_tier, max_tokens=400)
if llm_result.success:
confidence = 0.75 # Good confidence for image metadata analysis
return AgentResult(
agent_role=AgentRole.FILE_PROCESSOR,
success=True,
result=llm_result.response,
confidence=confidence,
reasoning="Analyzed image metadata and properties",
tools_used=[ToolResult(
tool_name="file_processor",
success=True,
result=file_data,
execution_time=file_result.execution_time
)],
model_used=llm_result.model_used,
processing_time=file_result.execution_time + llm_result.response_time,
cost_estimate=llm_result.cost_estimate
)
else:
# Fallback to metadata description
return AgentResult(
agent_role=AgentRole.FILE_PROCESSOR,
success=True,
result=file_data.get('content', 'Image analyzed'),
confidence=0.60,
reasoning="Image processed but analysis failed",
tools_used=[ToolResult(
tool_name="file_processor",
success=True,
result=file_data,
execution_time=file_result.execution_time
)],
model_used="fallback",
processing_time=file_result.execution_time,
cost_estimate=0.0
)
else:
return self._create_failure_result("Image processing failed")
def _process_data_file(self, state: GAIAAgentState) -> AgentResult:
"""Process CSV/Excel files and perform data analysis"""
logger.info(f"Processing data file: {state.file_path}")
# Analyze data file
file_result = self.file_processor.execute(state.file_path)
if file_result.success and file_result.result.get('success'):
file_data = file_result.result['result']
metadata = file_data.get('metadata', {})
content = file_data.get('content', {})
# Check if question requires calculations
question_lower = state.question.lower()
needs_calculation = any(term in question_lower for term in [
'calculate', 'sum', 'total', 'average', 'mean', 'count',
'maximum', 'minimum', 'how many', 'what is the'
])
if needs_calculation and 'sample_data' in content:
return self._perform_data_calculations(state, file_data, file_result)
else:
return self._analyze_data_structure(state, file_data, file_result)
else:
return self._create_failure_result("Data file processing failed")
def _perform_data_calculations(self, state: GAIAAgentState, file_data: Dict, file_result: ToolResult) -> AgentResult:
"""Perform calculations on data file content"""
metadata = file_data.get('metadata', {})
content = file_data.get('content', {})
# Extract data for calculations
sample_data = content.get('sample_data', [])
# Use LLM to determine what calculations to perform
calculation_prompt = f"""
Based on this data file and question, determine what calculations are needed:
Question: {state.question}
Data Structure:
- Columns: {metadata.get('columns', [])}
- Rows: {metadata.get('row_count', 0)}
- Sample Data: {sample_data[:3]} # First 3 rows
Please specify what calculations should be performed and on which columns.
Respond with specific calculation instructions.
"""
llm_result = self.llm_client.generate(calculation_prompt, tier=ModelTier.MAIN, max_tokens=200)
if llm_result.success:
# For now, provide data summary with LLM analysis
analysis_prompt = f"""
Based on this data analysis, please answer the question:
Question: {state.question}
Data Summary:
- File: {metadata.get('shape', [])} (rows x columns)
- Columns: {metadata.get('columns', [])}
- Numeric columns: {metadata.get('numeric_columns', [])}
- Statistics: {metadata.get('numeric_stats', {})}
- Sample data: {sample_data}
Calculation guidance: {llm_result.response}
Please provide the answer based on the data.
"""
analysis_result = self.llm_client.generate(analysis_prompt, tier=ModelTier.MAIN, max_tokens=400)
if analysis_result.success:
return AgentResult(
agent_role=AgentRole.FILE_PROCESSOR,
success=True,
result=analysis_result.response,
confidence=0.80,
reasoning="Performed data analysis and calculations",
tools_used=[file_result],
model_used=analysis_result.model_used,
processing_time=file_result.execution_time + llm_result.response_time + analysis_result.response_time,
cost_estimate=llm_result.cost_estimate + analysis_result.cost_estimate
)
# Fallback to basic data summary
return self._analyze_data_structure(state, file_data, file_result)
def _analyze_data_structure(self, state: GAIAAgentState, file_data: Dict, file_result: ToolResult) -> AgentResult:
"""Analyze data file structure and content"""
metadata = file_data.get('metadata', {})
content = file_data.get('content', {})
analysis_prompt = f"""
Based on this data file analysis, please answer the question:
Question: {state.question}
Data File Information:
- Structure: {metadata.get('shape', [])} (rows x columns)
- Columns: {metadata.get('columns', [])}
- Data types: {metadata.get('data_types', {})}
- Description: {content.get('description', '')}
- Sample data: {content.get('sample_data', [])}
Please provide a direct answer based on the data structure and content.
"""
model_tier = ModelTier.MAIN
llm_result = self.llm_client.generate(analysis_prompt, tier=model_tier, max_tokens=400)
if llm_result.success:
return AgentResult(
agent_role=AgentRole.FILE_PROCESSOR,
success=True,
result=llm_result.response,
confidence=0.75,
reasoning="Analyzed data file structure and content",
tools_used=[file_result],
model_used=llm_result.model_used,
processing_time=file_result.execution_time + llm_result.response_time,
cost_estimate=llm_result.cost_estimate
)
else:
return AgentResult(
agent_role=AgentRole.FILE_PROCESSOR,
success=True,
result=content.get('description', 'Data file analyzed'),
confidence=0.60,
reasoning="Data file processed but analysis failed",
tools_used=[file_result],
model_used="fallback",
processing_time=file_result.execution_time,
cost_estimate=0.0
)
def _process_code_file(self, state: GAIAAgentState) -> AgentResult:
"""Process code files and analyze their content"""
logger.info(f"Processing code file: {state.file_path}")
# Analyze code file
file_result = self.file_processor.execute(state.file_path)
if file_result.success and file_result.result.get('success'):
file_data = file_result.result['result']
metadata = file_data.get('metadata', {})
content = file_data.get('content', {})
analysis_prompt = f"""
Based on this code analysis, please answer the question:
Question: {state.question}
Code File Information:
- Type: {file_data.get('file_type', '')}
- Description: {content.get('description', '')}
- Metadata: {metadata}
- Code snippet: {content.get('code_snippet', '')}
Please analyze the code and provide a direct answer.
"""
model_tier = ModelTier.MAIN
llm_result = self.llm_client.generate(analysis_prompt, tier=model_tier, max_tokens=500)
if llm_result.success:
return AgentResult(
agent_role=AgentRole.FILE_PROCESSOR,
success=True,
result=llm_result.response,
confidence=0.80,
reasoning="Analyzed code structure and content",
tools_used=[ToolResult(
tool_name="file_processor",
success=True,
result=file_data,
execution_time=file_result.execution_time
)],
model_used=llm_result.model_used,
processing_time=file_result.execution_time + llm_result.response_time,
cost_estimate=llm_result.cost_estimate
)
else:
return AgentResult(
agent_role=AgentRole.FILE_PROCESSOR,
success=True,
result=content.get('description', 'Code file analyzed'),
confidence=0.60,
reasoning="Code file processed but analysis failed",
tools_used=[ToolResult(
tool_name="file_processor",
success=True,
result=file_data,
execution_time=file_result.execution_time
)],
model_used="fallback",
processing_time=file_result.execution_time,
cost_estimate=0.0
)
else:
return self._create_failure_result("Code file processing failed")
def _process_audio_file(self, state: GAIAAgentState) -> AgentResult:
"""Process audio files (basic metadata for now)"""
logger.info(f"Processing audio file: {state.file_path}")
# Analyze audio file
file_result = self.file_processor.execute(state.file_path)
if file_result.success and file_result.result.get('success'):
file_data = file_result.result['result']
analysis_prompt = f"""
Based on this audio file information, please answer the question:
Question: {state.question}
Audio File Information:
- Content: {file_data.get('content', '')}
- Metadata: {file_data.get('metadata', {})}
Please provide an answer based on the available audio file information.
Note: Full audio transcription is not currently available, but file metadata is provided.
"""
model_tier = ModelTier.ROUTER # Use lighter model for basic audio metadata
llm_result = self.llm_client.generate(analysis_prompt, tier=model_tier, max_tokens=300)
if llm_result.success:
return AgentResult(
agent_role=AgentRole.FILE_PROCESSOR,
success=True,
result=llm_result.response,
confidence=0.50, # Lower confidence due to limited audio processing
reasoning="Analyzed audio file metadata (transcription not available)",
tools_used=[ToolResult(
tool_name="file_processor",
success=True,
result=file_data,
execution_time=file_result.execution_time
)],
model_used=llm_result.model_used,
processing_time=file_result.execution_time + llm_result.response_time,
cost_estimate=llm_result.cost_estimate
)
return self._create_failure_result("Audio file processing not fully supported")
def _process_text_file(self, state: GAIAAgentState) -> AgentResult:
"""Process text files and analyze their content"""
logger.info(f"Processing text file: {state.file_path}")
# Analyze text file
file_result = self.file_processor.execute(state.file_path)
if file_result.success and file_result.result.get('success'):
file_data = file_result.result['result']
content = file_data.get('content', {})
analysis_prompt = f"""
Based on this text file content, please answer the question:
Question: {state.question}
Text Content:
{content.get('text', '')[:2000]}...
File Statistics:
- Word count: {file_data.get('metadata', {}).get('word_count', 0)}
- Line count: {file_data.get('metadata', {}).get('line_count', 0)}
Please analyze the text and provide a direct answer.
"""
model_tier = ModelTier.MAIN
llm_result = self.llm_client.generate(analysis_prompt, tier=model_tier, max_tokens=400)
if llm_result.success:
return AgentResult(
agent_role=AgentRole.FILE_PROCESSOR,
success=True,
result=llm_result.response,
confidence=0.85,
reasoning="Analyzed text file content",
tools_used=[ToolResult(
tool_name="file_processor",
success=True,
result=file_data,
execution_time=file_result.execution_time
)],
model_used=llm_result.model_used,
processing_time=file_result.execution_time + llm_result.response_time,
cost_estimate=llm_result.cost_estimate
)
return self._create_failure_result("Text file processing failed")
def _process_generic_file(self, state: GAIAAgentState) -> AgentResult:
"""Process unknown file types with generic analysis"""
logger.info(f"Processing generic file: {state.file_path}")
# Try generic file processing
file_result = self.file_processor.execute(state.file_path)
if file_result.success:
file_data = file_result.result
# Create basic response about file
basic_info = f"File analyzed: {state.file_path}. "
if file_data.get('success'):
basic_info += f"File type: {file_data.get('result', {}).get('file_type', 'unknown')}. "
basic_info += "Generic file analysis completed."
else:
basic_info += f"Analysis result: {file_data.get('message', 'Processing completed')}"
return AgentResult(
agent_role=AgentRole.FILE_PROCESSOR,
success=True,
result=basic_info,
confidence=0.40,
reasoning="Generic file processing attempted",
tools_used=[ToolResult(
tool_name="file_processor",
success=True,
result=file_data,
execution_time=file_result.execution_time
)],
model_used="basic",
processing_time=file_result.execution_time,
cost_estimate=0.0
)
else:
return self._create_failure_result("Generic file processing failed")
def _create_failure_result(self, error_message: str) -> AgentResult:
"""Create a failure result"""
return AgentResult(
agent_role=AgentRole.FILE_PROCESSOR,
success=False,
result=error_message,
confidence=0.0,
reasoning=error_message,
model_used="error",
processing_time=0.0,
cost_estimate=0.0
)