| | """ |
| | Financial Document Analysis Workflow - Agno Workflow 2.0 Implementation (Fixed) |
| | |
| | This workflow processes financial documents through a multi-agent system using the new |
| | step-based architecture introduced in Agno Workflow 2.0: |
| | 1. Data Extractor Agent: Extracts structured financial data |
| | 2. Data Arrangement Function: Organizes data into Excel-ready format |
| | 3. Code Generator Agent: Creates professional Excel reports |
| | |
| | Built according to Agno Workflow 2.0 standards with simple sequential execution. |
| | """ |
| |
|
| | import json |
| | import time |
| | from pathlib import Path |
| | from typing import Optional, Dict, Any |
| | from textwrap import dedent |
| | import os |
| |
|
| | from agno.agent import Agent |
| | from agno.models.google import Gemini |
| | from agno.tools.file import FileTools |
| | from agno.tools.shell import ShellTools |
| | from agno.tools.python import PythonTools |
| | from agno.workflow.v2.workflow import Workflow |
| | from agno.workflow.v2.types import StepInput, StepOutput |
| | from agno.workflow.v2.step import Step |
| | from agno.storage.sqlite import SqliteStorage |
| | from agno.utils.log import logger |
| | from pydantic import BaseModel, Field |
| |
|
| | from config.settings import settings |
| | from utils.prompt_loader import prompt_loader |
| | from utils.shell_toolkit import RestrictedShellTools |
| | from utils.restricted_python_tools import RestrictedPythonTools |
| |
|
| |
|
class DataPoint(BaseModel):
    """Individual financial data point."""

    # NOTE: the class docstring and every Field description are emitted by
    # pydantic into the model's JSON schema, so their text is left unchanged.

    # Required: what was extracted, its raw value, and how it is classified.
    field_name: str = Field(
        description="Name of the financial data field",
    )
    value: str = Field(
        description="Value of the field",
    )
    category: str = Field(
        description="Financial category (revenue, expenses, assets, etc.)",
    )

    # Optional qualifiers; an empty string is the declared default.
    period: str = Field(
        default="",
        description="Time period if applicable",
    )
    unit: str = Field(
        default="",
        description="Currency or measurement unit",
    )

    # Defaults to 0.9 when omitted; the 0-1 range is stated in the
    # description only and is not enforced by a validator here.
    confidence: float = Field(
        default=0.9,
        description="Confidence score 0-1",
    )
| |
|
| |
|
class Metadata(BaseModel):
    """Metadata for extracted financial data."""

    # Every field carries a default, so Metadata() can be built with no
    # arguments. Descriptions are part of the generated JSON schema and are
    # kept verbatim.
    company_name: str = Field(
        default="Unknown Company",
        description="Company name",
    )
    document_type: str = Field(
        default="Unknown",
        description="Type of financial document",
    )
    reporting_period: str = Field(
        default="",
        description="Reporting period",
    )
    currency: str = Field(
        default="",
        description="Primary currency used",
    )
| |
|
| |
|
class ExtractedFinancialData(BaseModel):
    """Structured model for extracted financial data."""

    # Required: the individual extracted values.
    data_points: list[DataPoint] = Field(
        description="List of extracted financial data points",
    )

    # Required: free-text summary of the extraction.
    summary: str = Field(
        description="Summary of the extracted data",
    )

    # Optional: a fresh Metadata instance (all defaults) is created per model
    # when the field is omitted.
    metadata: Metadata = Field(
        default_factory=Metadata,
        description="Additional metadata",
    )
| |
|
| |
|
class FinancialDocumentWorkflow(Workflow):
    """
    Financial document analysis workflow using Agno Workflow 2.0 step-based architecture.

    This workflow processes financial documents through three specialized steps:
    - Data extraction with structured outputs
    - Data arrangement for Excel compatibility
    - Excel report generation with formatting
    """

    def __init__(self, session_id: Optional[str] = None, **kwargs):
        """Initialize workflow with session management and step-based architecture.

        Args:
            session_id: Identifier used to create the session directories and
                forwarded to the base ``Workflow``.
            **kwargs: Extra keyword arguments forwarded to ``Workflow.__init__``.
                ``storage`` and ``debug_mode`` may be supplied to override the
                defaults (SQLite storage in ``tmp/agno_workflows.db`` and
                ``debug_mode=True``).
        """
        # The agents' tools are rooted at the session output directory, so the
        # directories have to be resolved before any agent is built.
        # NOTE(review): when session_id is None the directories are created for
        # whatever settings.create_session_directories(None) returns - confirm
        # that matches the session id the base Workflow ends up using.
        self._setup_session_directories(session_id)

        # storage / debug_mode are defaults, not fixed values. Passing them
        # explicitly alongside **kwargs raised "got multiple values for keyword
        # argument" whenever a caller supplied either one.
        if "storage" not in kwargs:
            kwargs["storage"] = SqliteStorage(
                table_name="financial_workflows",
                db_file="tmp/agno_workflows.db",
                mode="workflow_v2",
                auto_upgrade_schema=True,
            )
        kwargs.setdefault("debug_mode", True)

        self.data_extractor = self._create_data_extractor()
        self.data_arranger = self._create_data_arranger()
        self.code_generator = self._create_code_generator()

        # Step 1: agent-backed extraction with structured output.
        data_extraction_step = Step(
            name="FinancialDataExtractor",
            agent=self.data_extractor,
            description="Expert financial data extraction specialist optimized for Gemini",
        )

        # Step 2: plain callable that drives the arranger agent itself.
        data_arrangement_step = Step(
            name="DataArrangement",
            executor=self._arrangement_function,
            description="User-defined callable step for data arrangement",
        )

        # Step 3: agent-backed Excel report generation.
        excel_generation_step = Step(
            name="ExcelReportGenerator",
            agent=self.code_generator,
            description="Excel report generator optimized for Gemini with cross-platform support",
        )

        super().__init__(
            name="FinancialDocumentWorkflow",
            description=dedent("""\
                Financial document analysis workflow using Agno Workflow 2.0 with step-based execution.
                Processes financial documents through extraction, arrangement, and Excel report generation.
                Uses session state for caching and proper error recovery mechanisms.
                """),
            steps=[
                data_extraction_step,
                data_arrangement_step,
                excel_generation_step,
            ],
            session_id=session_id,
            **kwargs,
        )

        logger.info(f"FinancialDocumentWorkflow v2.0 initialized with session: {self.session_id}")
        logger.info(f"Session directories: {list(self.session_directories.keys())}")

    def _setup_session_directories(self, session_id: Optional[str] = None):
        """Setup session-specific directories.

        Stores the mapping returned by ``settings.create_session_directories``
        and exposes its "output", "input", "temp" and "cache" entries as
        attributes.
        """
        self.session_id = session_id
        self.session_directories = settings.create_session_directories(self.session_id)
        self.session_output_dir = self.session_directories["output"]
        self.session_input_dir = self.session_directories["input"]
        self.session_temp_dir = self.session_directories["temp"]
        self.session_cache_dir = self.session_directories["cache"]

    @staticmethod
    def _gemini_model(model_id, thinking_budget) -> Gemini:
        """Build a Gemini model with the shared API key from settings."""
        return Gemini(
            id=model_id,
            thinking_budget=thinking_budget,
            api_key=settings.GOOGLE_API_KEY,
        )

    @staticmethod
    def _render_previous_content(content: Any) -> str:
        """Render a previous step's content as text suitable for a prompt.

        Pydantic models and dict/list payloads are serialized as indented JSON
        so the next prompt receives well-formed data instead of a Python repr;
        anything else goes through ``str()``.
        """
        if isinstance(content, BaseModel):
            return content.model_dump_json(indent=2)
        if isinstance(content, (dict, list)):
            # default=str keeps rendering best-effort for values json cannot
            # encode natively; this text is only ever read by the model.
            return json.dumps(content, indent=2, ensure_ascii=False, default=str)
        return str(content)

    def _create_data_extractor(self) -> Agent:
        """Create the data extraction agent.

        The agent returns ``ExtractedFinancialData`` as structured output.
        """
        return Agent(
            model=self._gemini_model(
                settings.DATA_EXTRACTOR_MODEL,
                settings.DATA_EXTRACTOR_MODEL_THINKING_BUDGET,
            ),
            name="FinancialDataExtractor",
            description="Expert financial data extraction specialist optimized for Gemini",
            instructions=prompt_loader.load_instructions_as_list("agents/data_extractor"),
            response_model=ExtractedFinancialData,
            structured_outputs=True,
            debug_mode=True,
            retries=10,
            delay_between_retries=10,
            exponential_backoff=True,
        )

    def _create_data_arranger(self) -> Agent:
        """Create the data arrangement agent.

        Its shell and file tools are rooted at the session output directory;
        the directory's state is logged first to ease debugging of tool errors.
        """
        logger.info(f"Data arranger base directory: {self.session_output_dir}")
        logger.info(f"Directory exists: {self.session_output_dir.exists()}")
        logger.info(f"Directory is writable: {os.access(self.session_output_dir, os.W_OK)}")
        return Agent(
            model=self._gemini_model(
                settings.DATA_ARRANGER_MODEL,
                settings.DATA_ARRANGER_MODEL_THINKING_BUDGET,
            ),
            name="FinancialDataArranger",
            description="Financial data organization specialist optimized for Gemini",
            instructions=prompt_loader.load_instructions_as_list("agents/data_arranger"),
            tools=[
                RestrictedShellTools(base_dir=self.session_output_dir),
                FileTools(base_dir=self.session_output_dir, save_files=True, read_files=True, list_files=True),
            ],
            markdown=False,
            debug_mode=True,
            add_memory_references=True,
            add_session_summary_references=True,
            retries=10,
            delay_between_retries=10,
            exponential_backoff=True,
        )

    def _create_code_generator(self) -> Agent:
        """Create the code generation agent.

        Its shell, Python and file tools are all rooted at the session output
        directory, which is also stated in the agent's additional context.
        """
        return Agent(
            model=self._gemini_model(
                settings.CODE_GENERATOR_MODEL,
                settings.CODE_GENERATOR_MODEL_THINKING_BUDGET,
            ),
            name="ExcelReportGenerator",
            description="Excel report generator optimized for Gemini with cross-platform support",
            goal="Generate professional Excel reports from arranged financial data with multiple worksheets and formatting",
            instructions=prompt_loader.load_instructions_as_list("agents/code_generator"),
            expected_output="A professionally formatted Excel file with multiple worksheets, charts, and proper styling",
            additional_context=f"Working directory: {self.session_output_dir}. All files must be saved in this directory only.",
            tools=[
                RestrictedShellTools(base_dir=self.session_output_dir),
                RestrictedPythonTools(base_dir=self.session_output_dir),
                FileTools(base_dir=self.session_output_dir, save_files=True, read_files=True, list_files=True),
            ],
            markdown=False,
            show_tool_calls=True,
            debug_mode=True,
            add_datetime_to_instructions=True,
            retries=10,
            delay_between_retries=10,
            exponential_backoff=True,
        )

    def _arrangement_function(self, step_input: StepInput) -> StepOutput:
        """Custom function for data arrangement step.

        Builds the arrangement prompt from the previous step's content, runs
        the data arranger agent on it and wraps the answer in a ``StepOutput``.

        Args:
            step_input: Step input; only ``previous_step_content`` is read.

        Returns:
            ``StepOutput`` with ``success=True`` and the agent's content, or
            ``success=False`` with an error message when there is nothing to
            arrange or any exception is raised.
        """
        try:
            previous_step_content = step_input.previous_step_content

            logger.info("Starting data arrangement step")

            # Without extracted data the prompt would literally contain "None"
            # and the agent (with its retries) would run on nothing useful.
            if previous_step_content is None:
                raise ValueError("no extracted data received from the previous step")

            arrangement_prompt = prompt_loader.load_prompt("workflow/data_arrangement")
            extracted_text = self._render_previous_content(previous_step_content)
            full_arrangement_prompt = (
                f"{arrangement_prompt}\n\nHere is the extracted financial data to arrange:\n\n{extracted_text}"
            )

            response = self.data_arranger.run(full_arrangement_prompt)

            # Compare against None rather than truthiness: an empty dict is a
            # valid session state and must still receive its first entry.
            if getattr(self, "session_state", None) is not None:
                cache_key = f"arrangement_{int(time.time())}"
                self.session_state[cache_key] = response.content
                logger.info(f"Cached arrangement results with key: {cache_key}")

            logger.info("Data arrangement completed successfully")

            return StepOutput(
                content=response.content,
                response=response,
                success=True,
            )

        except Exception as e:
            # Step boundary: report the failure through StepOutput instead of
            # letting the exception escape the executor.
            logger.error(f"Data arrangement failed: {str(e)}")
            return StepOutput(
                content=f"Data arrangement failed: {str(e)}",
                success=False,
            )

    def run(self, file_path: Optional[str] = None, **kwargs):
        """
        Main workflow execution using Workflow 2.0 step-based architecture.

        Args:
            file_path: Path to the financial document to process
            **kwargs: Additional parameters forwarded to ``Workflow.run``

        Returns:
            Workflow execution result using the new step-based system

        Raises:
            ValueError: If ``file_path`` is not provided.
            FileNotFoundError: If ``file_path`` does not exist.
        """
        # file_path is a named parameter, so it can never arrive through
        # **kwargs; a missing value is simply an error.
        if file_path is None:
            logger.error("file_path is required but not provided")
            raise ValueError("file_path is required but not provided")

        # perf_counter is monotonic, so the reported duration cannot be skewed
        # by wall-clock adjustments.
        start_time = time.perf_counter()

        try:
            source_path = Path(file_path).resolve()
            if not source_path.exists():
                logger.error(f"File not found: {source_path}")
                raise FileNotFoundError(f"File not found: {source_path}")

            # Keep a copy of the document in the session input directory,
            # unless the source already is that file.
            input_file = self.session_input_dir / source_path.name
            if input_file.resolve() != source_path:
                input_file.write_bytes(source_path.read_bytes())

            logger.info(f"Starting financial document analysis for: {source_path.name}")

            # Imported here, as in the original code, rather than at module level.
            from agno.media import File
            document = File(filepath=str(source_path))

            extraction_prompt = prompt_loader.load_prompt(
                "workflow/data_extraction",
                file_path=str(source_path),
                output_directory=str(self.session_output_dir),
            )

            result = super().run(
                message=extraction_prompt,
                files=[document],
                **kwargs,
            )

            execution_time = time.perf_counter() - start_time
            status = self._get_workflow_status()

            logger.info(f"Workflow completed successfully in {execution_time:.2f} seconds")
            logger.info(f"Results: {status}")

            return result

        except Exception as e:
            # Log for visibility, then let the caller handle the failure.
            logger.error(f"Workflow execution failed: {str(e)}")
            raise

    def _get_workflow_status(self) -> Dict[str, Any]:
        """Get current workflow status and file counts.

        Returns:
            Dict with the session id, the output directory, and the number of
            ``*.json`` / ``*.xlsx`` files directly inside the output directory.
            ``data_points`` is always reported as 0 by this method.
        """
        status = {
            "session_id": self.session_id,
            "output_directory": str(self.session_output_dir),
            "json_files": 0,
            "excel_files": 0,
            "data_points": 0,
        }

        if self.session_output_dir.exists():
            # Count matches lazily instead of materializing lists.
            status["json_files"] = sum(1 for _ in self.session_output_dir.glob("*.json"))
            status["excel_files"] = sum(1 for _ in self.session_output_dir.glob("*.xlsx"))

        return status
| |
|
| |
|
| | |
def create_financial_workflow(session_id: Optional[str] = None, **kwargs) -> FinancialDocumentWorkflow:
    """
    Build and return a FinancialDocumentWorkflow (Workflow 2.0).

    Args:
        session_id: Optional session ID, passed through to the workflow
        **kwargs: Any further keyword arguments, passed through unchanged

    Returns:
        FinancialDocumentWorkflow: The newly constructed workflow instance
    """
    workflow = FinancialDocumentWorkflow(session_id=session_id, **kwargs)
    return workflow