Spaces:
Running
Running
| # agents/tool_calling_agents.py (Corrected with longer timeout) | |
| import httpx | |
| import logging | |
| # --- Configuration --- | |
| import os | |
| MCP_GATEWAY_URL = os.getenv("MCP_GATEWAY_URL", "http://127.0.0.1:8000/route_agent_request") | |
| # --- Logging Setup --- | |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
| logger = logging.getLogger("ToolCallingAgents") | |
| class BaseAgent: | |
| """A base class for agents that call tools via the MCP Gateway.""" | |
| def __init__(self): | |
| # A reasonable default timeout for fast, external APIs | |
| self.client = httpx.Client(timeout=30.0) | |
| def call_mcp_gateway(self, target_service: str, payload: dict) -> dict: | |
| """A standardized method to make a request to the MCP Gateway.""" | |
| request_body = { "target_service": target_service, "payload": payload } | |
| try: | |
| logger.info(f"Agent calling MCP Gateway for service '{target_service}' with payload: {payload}") | |
| response = self.client.post(MCP_GATEWAY_URL, json=request_body) | |
| response.raise_for_status() | |
| logger.info(f"Received successful response from MCP Gateway for '{target_service}'.") | |
| return response.json() | |
| except httpx.HTTPStatusError as e: | |
| logger.error(f"Error response {e.response.status_code} from MCP Gateway: {e.response.text}") | |
| raise | |
| except httpx.RequestError as e: | |
| logger.error(f"Failed to connect to MCP Gateway at {MCP_GATEWAY_URL}: {e}") | |
| raise | |
| class WebResearchAgent(BaseAgent): | |
| """An agent specialized in performing web research using Tavily.""" | |
| def research(self, queries: list[str], search_depth: str = "basic") -> dict: | |
| payload = { "queries": queries, "search_depth": search_depth } | |
| return self.call_mcp_gateway("tavily_research", payload) | |
| class MarketDataAgent(BaseAgent): | |
| """An agent specialized in fetching financial market data.""" | |
| def get_market_data(self, symbol: str, time_range: str = "DAILY") -> dict: | |
| payload = { "symbol": symbol, "time_range": time_range } | |
| return self.call_mcp_gateway("alpha_vantage_market_data", payload) | |
| def get_company_overview(self, symbol: str) -> dict: | |
| """Fetch company fundamentals (Revenue, EPS, P/E, Market Cap, etc.) - FREE tier.""" | |
| payload = { "symbol": symbol } | |
| return self.call_mcp_gateway("alpha_vantage_overview", payload) | |
| def get_global_quote(self, symbol: str) -> dict: | |
| """Fetch real-time price quote (price, change, volume) - FREE tier.""" | |
| payload = { "symbol": symbol } | |
| return self.call_mcp_gateway("alpha_vantage_quote", payload) | |
| class InternalPortfolioAgent(BaseAgent): | |
| """An agent specialized in securely querying the internal portfolio database.""" | |
| # --- THIS IS THE FIX --- | |
| def __init__(self): | |
| # Override the default client with one that has a longer timeout | |
| # because local LLM calls can be slow. | |
| super().__init__() | |
| self.client = httpx.Client(timeout=180.0) # Give it 180 seconds | |
| def query_portfolio(self, question: str) -> dict: | |
| payload = { "question": question } | |
| return self.call_mcp_gateway("internal_portfolio_data", payload) | |
| # --- Example Usage (for testing this file directly) --- | |
| if __name__ == '__main__': | |
| print("--- Testing Agents ---") | |
| # Make sure all your MCP servers and the gateway are running. | |
| # 1. Test the Web Research Agent | |
| print("\n[1] Testing Web Research Agent...") | |
| try: | |
| web_agent = WebResearchAgent() | |
| research_results = web_agent.research(queries=["What is the current market sentiment on NVIDIA?"]) | |
| print("Web Research Result:", research_results['status']) | |
| except Exception as e: | |
| print("Web Research Agent failed:", e) | |
| # 2. Test the Market Data Agent | |
| print("\n[2] Testing Market Data Agent...") | |
| try: | |
| market_agent = MarketDataAgent() | |
| market_results = market_agent.get_intraday_data(symbol="TSLA", interval="15min") | |
| print("Market Data Result:", market_results['status']) | |
| except Exception as e: | |
| print("Market Data Agent failed:", e) | |
| # 3. Test the Internal Portfolio Agent | |
| print("\n[3] Testing Internal Portfolio Agent...") | |
| try: | |
| portfolio_agent = InternalPortfolioAgent() | |
| portfolio_results = portfolio_agent.query_portfolio(question="How many shares of AAPL do we own?") | |
| print("Portfolio Query Result:", portfolio_results['status']) | |
| except Exception as e: | |
| print("Internal Portfolio Agent failed:", e) |