| | |
| | import httpx |
| | import logging |
| |
|
| | |
| | import os |
| | MCP_GATEWAY_URL = os.getenv("MCP_GATEWAY_URL", "http://127.0.0.1:8000/route_agent_request") |
| |
|
| | |
| | logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') |
| | logger = logging.getLogger("ToolCallingAgents") |
| |
|
| | class BaseAgent: |
| | """A base class for agents that call tools via the MCP Gateway.""" |
| | def __init__(self): |
| | |
| | self.client = httpx.Client(timeout=30.0) |
| |
|
| | def call_mcp_gateway(self, target_service: str, payload: dict) -> dict: |
| | """A standardized method to make a request to the MCP Gateway.""" |
| | request_body = { "target_service": target_service, "payload": payload } |
| | try: |
| | logger.info(f"Agent calling MCP Gateway for service '{target_service}' with payload: {payload}") |
| | response = self.client.post(MCP_GATEWAY_URL, json=request_body) |
| | response.raise_for_status() |
| | logger.info(f"Received successful response from MCP Gateway for '{target_service}'.") |
| | return response.json() |
| | except httpx.HTTPStatusError as e: |
| | logger.error(f"Error response {e.response.status_code} from MCP Gateway: {e.response.text}") |
| | raise |
| | except httpx.RequestError as e: |
| | logger.error(f"Failed to connect to MCP Gateway at {MCP_GATEWAY_URL}: {e}") |
| | raise |
| |
|
| | class WebResearchAgent(BaseAgent): |
| | """An agent specialized in performing web research using Tavily.""" |
| | def research(self, queries: list[str], search_depth: str = "basic") -> dict: |
| | payload = { "queries": queries, "search_depth": search_depth } |
| | return self.call_mcp_gateway("tavily_research", payload) |
| |
|
| | class MarketDataAgent(BaseAgent): |
| | """An agent specialized in fetching financial market data.""" |
| | def get_market_data(self, symbol: str, time_range: str = "INTRADAY") -> dict: |
| | payload = { "symbol": symbol, "time_range": time_range } |
| | return self.call_mcp_gateway("alpha_vantage_market_data", payload) |
| |
|
| | class InternalPortfolioAgent(BaseAgent): |
| | """An agent specialized in securely querying the internal portfolio database.""" |
| |
|
| | |
| | def __init__(self): |
| | |
| | |
| | super().__init__() |
| | self.client = httpx.Client(timeout=180.0) |
| |
|
| | def query_portfolio(self, question: str) -> dict: |
| | payload = { "question": question } |
| | return self.call_mcp_gateway("internal_portfolio_data", payload) |
| |
|
| | |
| | if __name__ == '__main__': |
| | print("--- Testing Agents ---") |
| | |
| | |
| | |
| | |
| | print("\n[1] Testing Web Research Agent...") |
| | try: |
| | web_agent = WebResearchAgent() |
| | research_results = web_agent.research(queries=["What is the current market sentiment on NVIDIA?"]) |
| | print("Web Research Result:", research_results['status']) |
| | except Exception as e: |
| | print("Web Research Agent failed:", e) |
| |
|
| | |
| | print("\n[2] Testing Market Data Agent...") |
| | try: |
| | market_agent = MarketDataAgent() |
| | market_results = market_agent.get_intraday_data(symbol="TSLA", interval="15min") |
| | print("Market Data Result:", market_results['status']) |
| | except Exception as e: |
| | print("Market Data Agent failed:", e) |
| |
|
| | |
| | print("\n[3] Testing Internal Portfolio Agent...") |
| | try: |
| | portfolio_agent = InternalPortfolioAgent() |
| | portfolio_results = portfolio_agent.query_portfolio(question="How many shares of AAPL do we own?") |
| | print("Portfolio Query Result:", portfolio_results['status']) |
| | except Exception as e: |
| | print("Internal Portfolio Agent failed:", e) |