# File size: 4,107 Bytes | 3e30d53
# agents/tool_calling_agents.py (Corrected with longer timeout)
import httpx
import logging
# --- Configuration ---
import os
# URL that every agent POSTs its requests to. It can be overridden with the
# MCP_GATEWAY_URL environment variable; otherwise a localhost address is used.
MCP_GATEWAY_URL = os.getenv("MCP_GATEWAY_URL", "http://127.0.0.1:8000/route_agent_request")
# --- Logging Setup ---
# NOTE: basicConfig runs at import time and configures the root logger
# (INFO level, timestamp/name/level/message format) if it has no handlers yet.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# Logger shared by the agent classes in this module.
logger = logging.getLogger("ToolCallingAgents")
class BaseAgent:
    """A base class for agents that call tools via the MCP Gateway.

    Each instance owns an ``httpx.Client``. Call :meth:`close` when the agent
    is no longer needed, or use the agent as a context manager, so the
    client's connections are released::

        with BaseAgent() as agent:
            agent.call_mcp_gateway("some_service", {...})
    """

    def __init__(self, timeout: float = 30.0):
        """Create the HTTP client used for all gateway calls.

        Args:
            timeout: Timeout in seconds handed to ``httpx.Client``. The
                default of 30 seconds is a reasonable value for fast,
                external APIs.
        """
        self.client = httpx.Client(timeout=timeout)

    def close(self) -> None:
        """Close the agent's current HTTP client."""
        self.client.close()

    def __enter__(self):
        """Return the agent itself so it can be bound by ``with ... as``."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the client on exit; exceptions from the body propagate."""
        self.close()

    def call_mcp_gateway(self, target_service: str, payload: dict) -> dict:
        """Make a standardized request to the MCP Gateway.

        The request body is ``{"target_service": ..., "payload": ...}``,
        POSTed as JSON to ``MCP_GATEWAY_URL``.

        Args:
            target_service: Name of the service the gateway should route to.
            payload: Service-specific request data.

        Returns:
            The gateway's response body decoded from JSON.

        Raises:
            httpx.HTTPStatusError: The gateway answered with an error status.
            httpx.RequestError: The request itself failed.
            ValueError: The response body was not valid JSON.

        Every failure is logged and then re-raised unchanged.
        """
        request_body = {"target_service": target_service, "payload": payload}
        try:
            logger.info(
                "Agent calling MCP Gateway for service '%s' with payload: %s",
                target_service,
                payload,
            )
            response = self.client.post(MCP_GATEWAY_URL, json=request_body)
            response.raise_for_status()
            # Decode inside the try so a malformed body is logged as well.
            result = response.json()
        except httpx.HTTPStatusError as e:
            logger.error(
                "Error response %s from MCP Gateway: %s",
                e.response.status_code,
                e.response.text,
            )
            raise
        except httpx.RequestError as e:
            logger.error("Failed to connect to MCP Gateway at %s: %s", MCP_GATEWAY_URL, e)
            raise
        except ValueError as e:
            logger.error(
                "MCP Gateway returned a non-JSON response for '%s': %s",
                target_service,
                e,
            )
            raise
        # Only report success once the body has actually been decoded.
        logger.info("Received successful response from MCP Gateway for '%s'.", target_service)
        return result
class WebResearchAgent(BaseAgent):
    """Agent that performs web research (Tavily) through the MCP Gateway."""

    def research(self, queries: list[str], search_depth: str = "basic") -> dict:
        """Send research queries to the ``tavily_research`` service.

        Args:
            queries: The search queries to forward.
            search_depth: Forwarded unchanged as ``search_depth``;
                defaults to ``"basic"``.

        Returns:
            Whatever ``call_mcp_gateway`` returns for this request.
        """
        return self.call_mcp_gateway(
            "tavily_research",
            {"queries": queries, "search_depth": search_depth},
        )
class MarketDataAgent(BaseAgent):
    """Agent that fetches financial market data through the MCP Gateway."""

    def get_market_data(self, symbol: str, time_range: str = "INTRADAY") -> dict:
        """Request market data from the ``alpha_vantage_market_data`` service.

        Args:
            symbol: Forwarded unchanged as ``symbol``.
            time_range: Forwarded unchanged as ``time_range``;
                defaults to ``"INTRADAY"``.

        Returns:
            Whatever ``call_mcp_gateway`` returns for this request.
        """
        request_payload = dict(symbol=symbol, time_range=time_range)
        return self.call_mcp_gateway("alpha_vantage_market_data", request_payload)
class InternalPortfolioAgent(BaseAgent):
    """An agent specialized in querying the internal portfolio database.

    Requests go to the gateway's ``internal_portfolio_data`` service with a
    180-second timeout, because local LLM calls can be slow.
    """

    def __init__(self):
        """Set up the agent with a long-timeout HTTP client."""
        super().__init__()
        # The base initializer has already created a default client. Close it
        # before replacing it so it is not left open and unreferenced.
        self.client.close()
        self.client = httpx.Client(timeout=180.0)  # Give it 180 seconds

    def query_portfolio(self, question: str) -> dict:
        """Send a question to the ``internal_portfolio_data`` service.

        Args:
            question: Forwarded unchanged as ``question``.

        Returns:
            Whatever ``call_mcp_gateway`` returns for this request.
        """
        payload = {"question": question}
        return self.call_mcp_gateway("internal_portfolio_data", payload)
# --- Example Usage (for testing this file directly) ---
if __name__ == '__main__':
    # Manual smoke test: runs each agent once against a live gateway and
    # prints the 'status' field of the reply, or the error if the call fails.
    print("--- Testing Agents ---")
    # Make sure all your MCP servers and the gateway are running.

    # 1. Test the Web Research Agent
    print("\n[1] Testing Web Research Agent...")
    try:
        web_agent = WebResearchAgent()
        research_results = web_agent.research(queries=["What is the current market sentiment on NVIDIA?"])
        print("Web Research Result:", research_results['status'])
    except Exception as e:
        print("Web Research Agent failed:", e)

    # 2. Test the Market Data Agent
    print("\n[2] Testing Market Data Agent...")
    try:
        market_agent = MarketDataAgent()
        # MarketDataAgent only defines get_market_data(symbol, time_range);
        # calling a non-existent method here would always report a failure.
        market_results = market_agent.get_market_data(symbol="TSLA", time_range="INTRADAY")
        print("Market Data Result:", market_results['status'])
    except Exception as e:
        print("Market Data Agent failed:", e)

    # 3. Test the Internal Portfolio Agent
    print("\n[3] Testing Internal Portfolio Agent...")
    try:
        portfolio_agent = InternalPortfolioAgent()
        portfolio_results = portfolio_agent.query_portfolio(question="How many shares of AAPL do we own?")
        print("Portfolio Query Result:", portfolio_results['status'])
    except Exception as e:
        print("Internal Portfolio Agent failed:", e)