# """ # Stage 2: MCP Executor - Parallel API Execution # """ # import asyncio # import time # from typing import List, Dict, Any # from .servers.weather import WeatherServer # from .servers.soil import SoilPropertiesServer # from .servers.water import WaterServer # from .servers.elevation import ElevationServer # from .servers.pests import PestsServer # # MCP Server Registry # MCP_SERVER_REGISTRY = { # "weather": { # "name": "Weather Server (Open-Meteo)", # "description": "Current weather and 7-day forecasts: temperature, precipitation, wind, humidity", # "capabilities": ["current_weather", "weather_forecast", "rainfall_prediction", "temperature_trends"], # "use_for": ["rain", "temperature", "weather", "forecast", "frost", "wind"] # }, # "soil_properties": { # "name": "Soil Properties Server (SoilGrids)", # "description": "Soil composition: clay, sand, silt, pH, organic matter from global soil database", # "capabilities": ["soil_texture", "soil_ph", "clay_content", "sand_content", "nutrients"], # "use_for": ["soil", "pH", "texture", "clay", "sand", "composition", "fertility", "nutrients"] # }, # "water": { # "name": "Groundwater Server (GRACE)", # "description": "Groundwater levels and drought indicators from NASA GRACE satellite data", # "capabilities": ["groundwater_levels", "drought_status", "water_storage", "soil_moisture"], # "use_for": ["groundwater", "drought", "water", "irrigation", "water stress", "moisture"] # }, # "elevation": { # "name": "Elevation Server (OpenElevation)", # "description": "Field elevation and terrain data for irrigation planning", # "capabilities": ["elevation", "terrain_analysis"], # "use_for": ["elevation", "slope", "terrain", "drainage"] # }, # "pests": { # "name": "Pest Observation Server (iNaturalist)", # "description": "Recent pest and insect observations from community reporting", # "capabilities": ["pest_observations", "disease_reports", "pest_distribution"], # "use_for": ["pests", "insects", "disease", "outbreak"] # } # } # 
class MCPExecutor: # """Stage 2: Execute API calls in parallel""" # def __init__(self): # self.servers = { # "weather": WeatherServer(), # "soil_properties": SoilPropertiesServer(), # "water": WaterServer(), # "elevation": ElevationServer(), # "pests": PestsServer() # } # async def execute_parallel(self, server_names: List[str], lat: float, lon: float) -> Dict[str, Any]: # """ # Call multiple servers simultaneously # Returns: # { # "results": { # "weather": {"status": "success", "data": {...}}, # ... # }, # "execution_time_seconds": float # } # """ # start_time = time.time() # tasks = [] # valid_servers = [] # for name in server_names: # if name in self.servers: # tasks.append(self.servers[name].get_data(lat, lon)) # valid_servers.append(name) # else: # print(f"⚠️ Unknown server: {name}") # # Execute all in parallel # results = await asyncio.gather(*tasks, return_exceptions=True) # # Format results # formatted_results = {} # for i, server_name in enumerate(valid_servers): # result = results[i] # if isinstance(result, Exception): # formatted_results[server_name] = { # "status": "error", # "error": str(result) # } # else: # formatted_results[server_name] = result # elapsed_time = time.time() - start_time # return { # "results": formatted_results, # "execution_time_seconds": round(elapsed_time, 2) # } # """ # MCP Executor - Stage 2 # Executes parallel calls to MCP servers based on routing decisions # """ # from typing import Dict, Any # from concurrent.futures import ThreadPoolExecutor, as_completed # import asyncio # class MCPExecutor: # """ # Executes MCP server calls based on routing decisions. # Integrates with existing server implementations in src/servers/ # Handles both sync and async server methods. # """ # def __init__(self, servers: Dict[str, Any]): # """ # Initialize executor with MCP server instances. 
# Args: # servers: Dict mapping server names to initialized server objects # e.g., {"weather": WeatherServer(), "soil": SoilPropertiesServer(), ...} # """ # self.servers = servers # def execute_parallel(self, routing: Dict[str, bool], location: Dict[str, float]) -> Dict[str, Any]: # """ # Execute MCP server calls in parallel based on routing. # Args: # routing: Simple dict with server names as keys and True/False as values # location: Dict with 'latitude' and 'longitude' keys # Returns: # Dict mapping server names to their results with metadata # """ # results = {} # tasks = [] # # Prepare tasks for servers marked for querying # for server_name, should_query in routing.items(): # if should_query and server_name in self.servers: # tasks.append({ # "server_name": server_name, # "server": self.servers[server_name], # "location": location # }) # # Execute in parallel using ThreadPoolExecutor # with ThreadPoolExecutor(max_workers=5) as executor: # futures = { # executor.submit(self._call_server_sync, task): task # for task in tasks # } # for future in as_completed(futures): # task = futures[future] # server_name = task["server_name"] # try: # result = future.result(timeout=30) # results[server_name] = { # "data": result, # "status": "success" # } # print(f"✓ {server_name.upper()}: Retrieved successfully") # except Exception as e: # results[server_name] = { # "data": None, # "status": "error", # "error": str(e) # } # print(f"✗ {server_name.upper()}: Error - {str(e)}") # return results # def _call_server_sync(self, task: Dict[str, Any]) -> Any: # """ # Call individual MCP server, handling both sync and async methods. 
# Args: # task: Dict containing server, location, and metadata # Returns: # Server response data # """ # server = task["server"] # location = task["location"] # # Try async method first (most of your servers use async) # if hasattr(server, 'get_data'): # method = getattr(server, 'get_data') # # Check if it's async # if asyncio.iscoroutinefunction(method): # # Run async method in new event loop # try: # loop = asyncio.new_event_loop() # asyncio.set_event_loop(loop) # result = loop.run_until_complete( # method(location['latitude'], location['longitude']) # ) # loop.close() # return result # except Exception as e: # raise Exception(f"Async execution failed: {str(e)}") # else: # # Sync method # return method(location['latitude'], location['longitude']) # # Fallback to other method names # elif hasattr(server, 'query'): # return server.query(location) # elif hasattr(server, 'fetch_data'): # return server.fetch_data(location['latitude'], location['longitude']) # else: # raise AttributeError(f"Server {task['server_name']} has no compatible query method") # def execute_sequential(self, routing: Dict[str, bool], location: Dict[str, float]) -> Dict[str, Any]: # """ # Execute MCP server calls sequentially (fallback if parallel fails). 
# Args: # routing: Simple dict with server names as keys and True/False as values # location: Dict with 'latitude' and 'longitude' keys # Returns: # Dict mapping server names to their results # """ # results = {} # for server_name, should_query in routing.items(): # if should_query and server_name in self.servers: # try: # task = { # "server_name": server_name, # "server": self.servers[server_name], # "location": location # } # result = self._call_server_sync(task) # results[server_name] = { # "data": result, # "status": "success" # } # print(f"✓ {server_name.upper()}: Retrieved successfully") # except Exception as e: # results[server_name] = { # "data": None, # "status": "error", # "error": str(e) # } # print(f"✗ {server_name.upper()}: Error - {str(e)}") # return results # return results
"""
MCP Executor - Stage 2

Executes parallel calls to MCP servers based on routing decisions.

Design notes:
1. Async servers are awaited on the caller's event loop; ``asyncio.run`` is
   only used by the sync wrapper when no loop is running (e.g. outside of
   FastAPI request handlers).
2. Server results that already use the ``{"status": ..., "data": ...}``
   envelope are unwrapped instead of being wrapped a second time.
"""

import asyncio
import inspect
from typing import Any, Dict


class MCPExecutor:
    """
    Executes MCP server calls based on routing decisions.

    Every selected server is called through its ``get_data(lat, lon)``
    method, which may be either a coroutine function or a plain function.
    """

    def __init__(self, servers: Dict[str, Any]):
        """
        Initialize executor with MCP server instances.

        Args:
            servers: Dict mapping server names to initialized server objects
        """
        self.servers = servers

    async def execute_parallel_async(
        self, routing: Dict[str, bool], location: Dict[str, float]
    ) -> Dict[str, Any]:
        """
        Execute MCP server calls concurrently (async version).

        Args:
            routing: Dict with server names as keys and True/False as values.
                Names that are falsy or not registered in ``self.servers``
                are skipped silently.
            location: Dict with 'latitude' and 'longitude' keys

        Returns:
            Dict mapping each queried server name to
            ``{"data": ..., "status": "success"}`` or
            ``{"data": None, "status": "error", "error": "<message>"}``.
            Empty dict when no server was selected.
        """
        selected = [
            name
            for name, should_query in routing.items()
            if should_query and name in self.servers
        ]
        if not selected:
            return {}

        # return_exceptions=True keeps one failing server from aborting the
        # others; failures come back as exception objects in the result list.
        outcomes = await asyncio.gather(
            *(
                self._call_server(self.servers[name], name, location)
                for name in selected
            ),
            return_exceptions=True,
        )

        results: Dict[str, Any] = {}
        for name, outcome in zip(selected, outcomes):
            entry = self._normalize_result(outcome)
            results[name] = entry
            # Log according to the final status so that a server-reported
            # error is never announced as a successful retrieval.
            if entry["status"] == "success":
                print(f"✓ {name.upper()}: Retrieved successfully")
            else:
                print(f"✗ {name.upper()}: Error - {entry['error']}")

        return results

    @staticmethod
    def _normalize_result(outcome: Any) -> Dict[str, Any]:
        """Convert a raw gather() outcome into the executor's result envelope."""
        # BaseException (not Exception): asyncio.CancelledError derives from
        # BaseException on Python 3.8+ and must not be reported as data.
        if isinstance(outcome, BaseException):
            return {
                "data": None,
                "status": "error",
                # Some exceptions stringify to ""; fall back to the type name.
                "error": str(outcome) or type(outcome).__name__,
            }

        # Server already returned an envelope - unwrap it, don't double-wrap.
        if isinstance(outcome, dict) and "status" in outcome:
            if outcome.get("status") == "success":
                return {"data": outcome.get("data"), "status": "success"}
            return {
                "data": None,
                "status": "error",
                "error": outcome.get("error", "Unknown error"),
            }

        # Server returned raw data.
        return {"data": outcome, "status": "success"}

    def execute_parallel(
        self, routing: Dict[str, bool], location: Dict[str, float]
    ) -> Dict[str, Any]:
        """
        Execute MCP server calls in parallel (sync wrapper).

        Args:
            routing: Dict with server names as keys and True/False as values
            location: Dict with 'latitude' and 'longitude' keys

        Returns:
            Same mapping as ``execute_parallel_async``.

        Raises:
            RuntimeError: If called while an event loop is already running
                in this thread; async callers must await
                ``execute_parallel_async`` instead.
        """
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No running loop - safe to use asyncio.run
            return asyncio.run(self.execute_parallel_async(routing, location))

        # Raised outside the try block on purpose: inside it, the
        # ``except RuntimeError`` clause above would swallow this error.
        raise RuntimeError(
            "execute_parallel called from async context. "
            "Use 'await executor.execute_parallel_async()' instead."
        )

    async def _call_server(
        self, server: Any, server_name: str, location: Dict[str, float]
    ) -> Any:
        """
        Call individual MCP server, handling both sync and async methods.

        Args:
            server: Server object expected to expose ``get_data(lat, lon)``
            server_name: Name used in the error message
            location: Dict with 'latitude' and 'longitude' keys

        Returns:
            Whatever the server's ``get_data`` returns.

        Raises:
            KeyError: If ``location`` lacks 'latitude' or 'longitude'.
            AttributeError: If the server has no ``get_data`` method.
        """
        lat = location['latitude']
        lon = location['longitude']

        method = getattr(server, 'get_data', None)
        if method is None:
            raise AttributeError(f"Server {server_name} has no get_data method")

        if inspect.iscoroutinefunction(method):
            # Async method - await it
            return await method(lat, lon)

        # Sync method - run in the default executor so it does not block
        # the event loop.
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(None, method, lat, lon)
        # A plain callable may still hand back an awaitable (e.g. a wrapper
        # around an async function); resolve it rather than returning it.
        if inspect.isawaitable(result):
            result = await result
        return result