"""
MCP Executor - Stage 2

Executes calls to MCP servers based on routing decisions.
Async servers are batched into a single event loop to prevent deadlocks.
"""

import asyncio
import inspect
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import TimeoutError as FuturesTimeoutError
from typing import Any, Dict, List, Optional, Tuple

# NOTE: two earlier, fully commented-out revisions of this module (a pure
# ``asyncio.gather`` executor and a thread-pool executor that opened one event
# loop per worker thread) used to sit above this point. They were dead code
# and have been removed; recover them from version control if needed.


class MCPExecutor:
    """
    Executes MCP server calls based on routing decisions.

    A server is any object exposing ``get_data(latitude, longitude)`` (sync
    or ``async``), or, for sync servers only, ``query(location)`` or
    ``fetch_data(latitude, longitude)``.

    Every public method returns a dict keyed by server name whose values are
    either ``{"data": <payload>, "status": "success"}`` or
    ``{"data": None, "status": "error", "error": <message>}``. Server
    failures are captured in that dict, not raised.
    """

    # Class-level defaults so instances created without __init__ still work.
    timeout: Optional[float] = 30.0
    max_workers: int = 5

    def __init__(
        self,
        servers: Dict[str, Any],
        *,
        timeout: Optional[float] = 30.0,
        max_workers: int = 5,
    ):
        """
        Initialize executor with MCP server instances.

        Args:
            servers: Dict mapping server names to initialized server objects
                e.g., {"weather": WeatherServer(), "soil": SoilPropertiesServer(), ...}
            timeout: Seconds to wait for the sync batch as a whole and for
                each async call; ``None`` disables the limit.
            max_workers: Thread-pool size used for sync servers.
        """
        self.servers = servers
        self.timeout = timeout
        self.max_workers = max_workers

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def execute_parallel(self, routing: Dict[str, bool], location: Dict[str, float]) -> Dict[str, Any]:
        """
        Execute MCP server calls in parallel based on routing.

        Sync servers run first in a thread pool; async servers then run
        together inside one event loop.

        Args:
            routing: Simple dict with server names as keys and True/False as values.
                Names that are falsy or not registered in ``self.servers``
                are skipped.
            location: Dict with 'latitude' and 'longitude' keys

        Returns:
            Dict mapping server names to their results with metadata
        """
        sync_tasks, async_tasks = self._partition_tasks(routing, location)

        results: Dict[str, Any] = {}
        results.update(self._run_sync_batch(sync_tasks))

        if async_tasks:
            try:
                results.update(
                    self._run_coroutine(self._execute_async_batch(async_tasks))
                )
            except Exception as e:
                # If batch fails, mark all as failed
                for task in async_tasks:
                    results[task["server_name"]] = {
                        "data": None,
                        "status": "error",
                        "error": f"Async batch execution failed: {str(e)}",
                    }
                    print(f"✗ {task['server_name'].upper()}: Async batch error")

        return results

    def execute_sequential(self, routing: Dict[str, bool], location: Dict[str, float]) -> Dict[str, Any]:
        """
        Execute MCP server calls sequentially (fallback if parallel fails).

        Servers are called one at a time in ``routing`` order. The timeout
        applies to async servers only; a sync call runs in the calling
        thread and cannot be interrupted.

        Args:
            routing: Simple dict with server names as keys and True/False as values
            location: Dict with 'latitude' and 'longitude' keys

        Returns:
            Dict mapping server names to their results
        """
        results: Dict[str, Any] = {}

        for task in self._build_tasks(routing, location):
            server_name = task["server_name"]
            try:
                if self._is_async_server(task["server"]):
                    data = self._run_coroutine(self._call_async_server(task))
                else:
                    data = self._call_sync_server(task)
            except Exception as e:
                results[server_name] = self._failure(server_name, e)
            else:
                results[server_name] = self._success(server_name, data)

        return results

    # ------------------------------------------------------------------
    # Task preparation
    # ------------------------------------------------------------------

    def _build_tasks(self, routing: Dict[str, bool], location: Dict[str, float]) -> List[Dict[str, Any]]:
        """Return one task dict per routed server that is actually registered."""
        return [
            {
                "server_name": server_name,
                "server": self.servers[server_name],
                "location": location,
            }
            for server_name, should_query in routing.items()
            if should_query and server_name in self.servers
        ]

    def _partition_tasks(
        self, routing: Dict[str, bool], location: Dict[str, float]
    ) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
        """Split the routed tasks into ``(sync_tasks, async_tasks)``."""
        sync_tasks: List[Dict[str, Any]] = []
        async_tasks: List[Dict[str, Any]] = []
        for task in self._build_tasks(routing, location):
            if self._is_async_server(task["server"]):
                async_tasks.append(task)
            else:
                sync_tasks.append(task)
        return sync_tasks, async_tasks

    @staticmethod
    def _is_async_server(server: Any) -> bool:
        """Return True when ``server.get_data`` exists and is a coroutine function."""
        return inspect.iscoroutinefunction(getattr(server, "get_data", None))

    # ------------------------------------------------------------------
    # Result formatting (also emits the console log line)
    # ------------------------------------------------------------------

    @staticmethod
    def _success(server_name: str, data: Any) -> Dict[str, Any]:
        """Log and build the success entry for ``server_name``."""
        print(f"✓ {server_name.upper()}: Retrieved successfully")
        return {"data": data, "status": "success"}

    @staticmethod
    def _failure(server_name: str, error: Any) -> Dict[str, Any]:
        """Log and build the error entry for ``server_name``."""
        message = str(error)
        print(f"✗ {server_name.upper()}: Error - {message}")
        return {"data": None, "status": "error", "error": message}

    def _collect_future(self, server_name: str, future: Any) -> Dict[str, Any]:
        """Turn a finished future into a success or error entry."""
        try:
            data = future.result()
        except Exception as e:
            return self._failure(server_name, e)
        return self._success(server_name, data)

    # ------------------------------------------------------------------
    # Execution helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _run_coroutine(coro: Any) -> Any:
        """
        Run ``coro`` to completion from synchronous code and return its result.

        ``asyncio.run`` refuses to start while another event loop is running
        in the same thread, so in that case the coroutine is run on a
        short-lived helper thread that owns its own loop. The calling thread
        still blocks until the coroutine finishes.
        """
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No loop in this thread: the plain call is safe.
            return asyncio.run(coro)

        with ThreadPoolExecutor(max_workers=1) as pool:
            return pool.submit(asyncio.run, coro).result()

    def _run_sync_batch(self, tasks: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Run sync server calls concurrently in a thread pool.

        ``self.timeout`` bounds the batch as a whole. Calls still outstanding
        at the deadline are reported as errors; queued ones are cancelled,
        while one already running cannot be stopped and finishes in the
        background with its result discarded.
        """
        results: Dict[str, Any] = {}
        if not tasks:
            return results

        # No ``with`` block: its implicit shutdown(wait=True) would block on
        # a hung server and defeat the timeout.
        executor = ThreadPoolExecutor(max_workers=self.max_workers)
        try:
            futures = {
                executor.submit(self._call_sync_server, task): task
                for task in tasks
            }
            try:
                for future in as_completed(futures, timeout=self.timeout):
                    server_name = futures[future]["server_name"]
                    results[server_name] = self._collect_future(server_name, future)
            except FuturesTimeoutError:
                for future, task in futures.items():
                    server_name = task["server_name"]
                    if server_name in results:
                        continue
                    if future.done() and not future.cancelled():
                        # Finished just as the deadline expired: keep it.
                        results[server_name] = self._collect_future(server_name, future)
                    else:
                        future.cancel()
                        results[server_name] = self._failure(
                            server_name, f"Timed out after {self.timeout} seconds"
                        )
        finally:
            executor.shutdown(wait=False)

        return results

    async def _execute_async_batch(self, tasks: list) -> Dict[str, Any]:
        """
        Execute multiple async server calls concurrently in a single event loop.
        This is safer than creating multiple event loops.

        Args:
            tasks: Task dicts with "server_name", "server" and "location" keys.

        Returns:
            Dict mapping each task's server name to its success/error entry.
        """
        task_results = await asyncio.gather(
            *(self._call_async_server(task) for task in tasks),
            return_exceptions=True,
        )

        results: Dict[str, Any] = {}
        for task, result in zip(tasks, task_results):
            server_name = task["server_name"]
            # BaseException, not Exception: gather can also hand back
            # CancelledError, which must not be reported as data.
            if isinstance(result, BaseException):
                results[server_name] = self._failure(server_name, result)
            else:
                results[server_name] = self._success(server_name, result)
        return results

    async def _call_async_server(self, task: Dict[str, Any]) -> Any:
        """
        Call individual async MCP server, bounded by ``self.timeout``.

        Raises:
            AttributeError: The server has no ``get_data`` method.
            TimeoutError: The call did not finish within ``self.timeout``.
        """
        server = task["server"]
        location = task["location"]

        if not hasattr(server, 'get_data'):
            raise AttributeError(f"Server {task['server_name']} has no get_data method")

        call = server.get_data(location['latitude'], location['longitude'])
        try:
            return await asyncio.wait_for(call, timeout=self.timeout)
        except asyncio.TimeoutError as e:
            if str(e):
                # Raised by the server itself with its own message: keep it.
                raise
            # wait_for's own timeout has an empty message; make it useful.
            raise TimeoutError(f"Timed out after {self.timeout} seconds") from e

    def _call_sync_server(self, task: Dict[str, Any]) -> Any:
        """
        Call individual sync MCP server.

        Tries ``get_data(lat, lon)``, then ``query(location)``, then
        ``fetch_data(lat, lon)`` and uses the first method the server has.

        Raises:
            AttributeError: The server exposes none of those methods.
        """
        server = task["server"]
        location = task["location"]

        if hasattr(server, 'get_data'):
            return server.get_data(location['latitude'], location['longitude'])
        elif hasattr(server, 'query'):
            return server.query(location)
        elif hasattr(server, 'fetch_data'):
            return server.fetch_data(location['latitude'], location['longitude'])
        else:
            raise AttributeError(f"Server {task['server_name']} has no compatible query method")