Spaces:
Runtime error
Runtime error
| """Agent service for managing financial analysis and reporting.""" | |
| import os | |
| from typing import AsyncGenerator, Any | |
| from mistralai import Mistral | |
| from mistralai.extra.run.context import RunContext | |
| from mcp import StdioServerParameters | |
| from mistralai.extra.mcp.stdio import MCPClientSTDIO | |
| from app.models.schemas import FinanceOutputFormat | |
| from app.utils.config import MODEL, prompt_map | |
| from app.services.base_agent import BaseAgent | |
| from app.mcp_servers import mcp_servers_dir | |
class FinanceAgent(BaseAgent):
    """Agent that analyzes financial data and generates financial reports.

    This agent specializes in financial data analysis, stock market research,
    and generating comprehensive financial reports and insights.

    Attributes:
        client: The Mistral AI client instance (``None`` until ``initialize`` runs)
        agent: The configured agent instance (``None`` until ``initialize`` runs)
    """

    def __init__(self) -> None:
        """Initialize the FinanceAgent with default values."""
        self.client = None
        self.agent = None

    async def initialize(self) -> None:
        """Initialize the MistralAI client and create the financial agent.

        Raises:
            KeyError: If the ``MISTRAL_API_KEY`` environment variable is not set.
        """
        api_key = os.environ["MISTRAL_API_KEY"]
        self.client = Mistral(api_key=api_key)

        agent_config = prompt_map["finance_agent"]
        # Await the async variant of the SDK call: the synchronous ``create``
        # would block the event loop for the whole HTTP round trip.
        self.agent = await self.client.beta.agents.create_async(
            model=MODEL,
            name=agent_config["name"],
            instructions=agent_config["instructions"],
            description=agent_config["description"],
        )

    async def process_query(self, query: str) -> AsyncGenerator[Any, None]:
        """Process user query using the agent with MCP servers for financial data and reporting.

        Awaiting this coroutine returns an async generator. Nothing is
        executed (including lazy initialization of the client and agent)
        until that generator is iterated.

        Args:
            query: The user's question or request for financial analysis

        Returns:
            An async generator yielding events from the agent processing pipeline
        """

        async def run_in_context():
            # Lazily create the client/agent on first use.
            if not self.client or not self.agent:
                await self.initialize()

            # Configure MCP servers for different capabilities
            server_params = [
                # Yahoo Finance server for stock data and financial metrics.
                # NOTE: relies on ``python`` on PATH resolving to an interpreter
                # that can run the server script.
                StdioServerParameters(
                    command="python",
                    args=[str((mcp_servers_dir / "stdio_yfinance_server.py").resolve())],
                    env=dict(os.environ),
                ),
            ]

            async with RunContext(
                agent_id=self.agent.id,
                continue_on_fn_error=False,
                output_format=FinanceOutputFormat,
            ) as run_ctx:
                # Register all MCP clients with the run context
                mcp_clients = [MCPClientSTDIO(stdio_params=params) for params in server_params]
                await run_ctx.register_mcp_clients(mcp_clients=mcp_clients)

                # Stream agent responses with tool execution events
                result_events = await self.client.beta.conversations.run_stream_async(
                    run_ctx=run_ctx,
                    # instructions, # NOTE: instructions are ignored if agent_id is provided
                    inputs=[
                        {"role": "user", "content": query},
                    ],
                )
                # Events are forwarded while the run context is still open.
                async for event in result_events:
                    yield event

        return run_in_context()