llms-txt-mcp / src /app /services /finance_agent.py
shern
update agents to inherit env var
70b6a2a
"""Agent service for managing financial analysis and reporting."""
import os
from typing import AsyncGenerator, Any
from mistralai import Mistral
from mistralai.extra.run.context import RunContext
from mcp import StdioServerParameters
from mistralai.extra.mcp.stdio import MCPClientSTDIO
from app.models.schemas import FinanceOutputFormat
from app.utils.config import MODEL, prompt_map
from app.services.base_agent import BaseAgent
from app.mcp_servers import mcp_servers_dir
class FinanceAgent(BaseAgent):
"""Agent that analyzes financial data and generates financial reports.
This agent specializes in financial data analysis, stock market research,
and generating comprehensive financial reports and insights.
Attributes:
client: The Mistral AI client instance
agent: The configured agent instance
"""
def __init__(self) -> None:
"""Initialize the FinanceAgent with default values."""
self.client = None
self.agent = None
async def initialize(self) -> None:
"""Initialize the MistralAI client and create the financial agent."""
api_key = os.environ["MISTRAL_API_KEY"]
self.client = Mistral(api_key=api_key)
self.agent = self.client.beta.agents.create(
model=MODEL,
name=prompt_map["finance_agent"]["name"],
instructions=prompt_map["finance_agent"]["instructions"],
description=prompt_map["finance_agent"]["description"],
)
async def process_query(self, query: str) -> AsyncGenerator[Any, None]:
"""Process user query using the agent with MCP servers for financial data and reporting.
Args:
query: The user's question or request for financial analysis
Yields:
Events from the agent processing pipeline
"""
async def run_in_context():
if not self.client or not self.agent:
await self.initialize()
# Configure MCP servers for different capabilities
server_params = [
# Yahoo Finance server for stock data and financial metrics
StdioServerParameters(
command="python",
args=[str((mcp_servers_dir / "stdio_yfinance_server.py").resolve())],
env=dict(os.environ),
),
]
async with RunContext(
agent_id=self.agent.id,
continue_on_fn_error=False,
output_format=FinanceOutputFormat,
) as run_ctx:
# Register all MCP clients with the run context
mcp_clients = [MCPClientSTDIO(stdio_params=params) for params in server_params]
await run_ctx.register_mcp_clients(mcp_clients=mcp_clients)
# Stream agent responses with tool execution events
result_events = await self.client.beta.conversations.run_stream_async(
run_ctx=run_ctx,
# instructions, # NOTE: instructions are ignored if agent_id is provided
inputs=[
{"role": "user", "content": query},
],
)
async for event in result_events:
yield event
return run_in_context()