Spaces:
Running
Running
| # """ | |
| # Stage 2: MCP Executor - Parallel API Execution | |
| # """ | |
| # import asyncio | |
| # import time | |
| # from typing import List, Dict, Any | |
| # from .servers.weather import WeatherServer | |
| # from .servers.soil import SoilPropertiesServer | |
| # from .servers.water import WaterServer | |
| # from .servers.elevation import ElevationServer | |
| # from .servers.pests import PestsServer | |
| # # MCP Server Registry | |
| # MCP_SERVER_REGISTRY = { | |
| # "weather": { | |
| # "name": "Weather Server (Open-Meteo)", | |
| # "description": "Current weather and 7-day forecasts: temperature, precipitation, wind, humidity", | |
| # "capabilities": ["current_weather", "weather_forecast", "rainfall_prediction", "temperature_trends"], | |
| # "use_for": ["rain", "temperature", "weather", "forecast", "frost", "wind"] | |
| # }, | |
| # "soil_properties": { | |
| # "name": "Soil Properties Server (SoilGrids)", | |
| # "description": "Soil composition: clay, sand, silt, pH, organic matter from global soil database", | |
| # "capabilities": ["soil_texture", "soil_ph", "clay_content", "sand_content", "nutrients"], | |
| # "use_for": ["soil", "pH", "texture", "clay", "sand", "composition", "fertility", "nutrients"] | |
| # }, | |
| # "water": { | |
| # "name": "Groundwater Server (GRACE)", | |
| # "description": "Groundwater levels and drought indicators from NASA GRACE satellite data", | |
| # "capabilities": ["groundwater_levels", "drought_status", "water_storage", "soil_moisture"], | |
| # "use_for": ["groundwater", "drought", "water", "irrigation", "water stress", "moisture"] | |
| # }, | |
| # "elevation": { | |
| # "name": "Elevation Server (OpenElevation)", | |
| # "description": "Field elevation and terrain data for irrigation planning", | |
| # "capabilities": ["elevation", "terrain_analysis"], | |
| # "use_for": ["elevation", "slope", "terrain", "drainage"] | |
| # }, | |
| # "pests": { | |
| # "name": "Pest Observation Server (iNaturalist)", | |
| # "description": "Recent pest and insect observations from community reporting", | |
| # "capabilities": ["pest_observations", "disease_reports", "pest_distribution"], | |
| # "use_for": ["pests", "insects", "disease", "outbreak"] | |
| # } | |
| # } | |
| # class MCPExecutor: | |
| # """Stage 2: Execute API calls in parallel""" | |
| # def __init__(self): | |
| # self.servers = { | |
| # "weather": WeatherServer(), | |
| # "soil_properties": SoilPropertiesServer(), | |
| # "water": WaterServer(), | |
| # "elevation": ElevationServer(), | |
| # "pests": PestsServer() | |
| # } | |
| # async def execute_parallel(self, server_names: List[str], lat: float, lon: float) -> Dict[str, Any]: | |
| # """ | |
| # Call multiple servers simultaneously | |
| # Returns: | |
| # { | |
| # "results": { | |
| # "weather": {"status": "success", "data": {...}}, | |
| # ... | |
| # }, | |
| # "execution_time_seconds": float | |
| # } | |
| # """ | |
| # start_time = time.time() | |
| # tasks = [] | |
| # valid_servers = [] | |
| # for name in server_names: | |
| # if name in self.servers: | |
| # tasks.append(self.servers[name].get_data(lat, lon)) | |
| # valid_servers.append(name) | |
| # else: | |
| # print(f"⚠️ Unknown server: {name}") | |
| # # Execute all in parallel | |
| # results = await asyncio.gather(*tasks, return_exceptions=True) | |
| # # Format results | |
| # formatted_results = {} | |
| # for i, server_name in enumerate(valid_servers): | |
| # result = results[i] | |
| # if isinstance(result, Exception): | |
| # formatted_results[server_name] = { | |
| # "status": "error", | |
| # "error": str(result) | |
| # } | |
| # else: | |
| # formatted_results[server_name] = result | |
| # elapsed_time = time.time() - start_time | |
| # return { | |
| # "results": formatted_results, | |
| # "execution_time_seconds": round(elapsed_time, 2) | |
| # } | |
| # """ | |
| # MCP Executor - Stage 2 | |
| # Executes parallel calls to MCP servers based on routing decisions | |
| # """ | |
| # from typing import Dict, Any | |
| # from concurrent.futures import ThreadPoolExecutor, as_completed | |
| # import asyncio | |
| # class MCPExecutor: | |
| # """ | |
| # Executes MCP server calls based on routing decisions. | |
| # Integrates with existing server implementations in src/servers/ | |
| # Handles both sync and async server methods. | |
| # """ | |
| # def __init__(self, servers: Dict[str, Any]): | |
| # """ | |
| # Initialize executor with MCP server instances. | |
| # Args: | |
| # servers: Dict mapping server names to initialized server objects | |
| # e.g., {"weather": WeatherServer(), "soil": SoilPropertiesServer(), ...} | |
| # """ | |
| # self.servers = servers | |
| # def execute_parallel(self, routing: Dict[str, bool], location: Dict[str, float]) -> Dict[str, Any]: | |
| # """ | |
| # Execute MCP server calls in parallel based on routing. | |
| # Args: | |
| # routing: Simple dict with server names as keys and True/False as values | |
| # location: Dict with 'latitude' and 'longitude' keys | |
| # Returns: | |
| # Dict mapping server names to their results with metadata | |
| # """ | |
| # results = {} | |
| # tasks = [] | |
| # # Prepare tasks for servers marked for querying | |
| # for server_name, should_query in routing.items(): | |
| # if should_query and server_name in self.servers: | |
| # tasks.append({ | |
| # "server_name": server_name, | |
| # "server": self.servers[server_name], | |
| # "location": location | |
| # }) | |
| # # Execute in parallel using ThreadPoolExecutor | |
| # with ThreadPoolExecutor(max_workers=5) as executor: | |
| # futures = { | |
| # executor.submit(self._call_server_sync, task): task | |
| # for task in tasks | |
| # } | |
| # for future in as_completed(futures): | |
| # task = futures[future] | |
| # server_name = task["server_name"] | |
| # try: | |
| # result = future.result(timeout=30) | |
| # results[server_name] = { | |
| # "data": result, | |
| # "status": "success" | |
| # } | |
| # print(f"✅ {server_name.upper()}: Retrieved successfully") | |
| # except Exception as e: | |
| # results[server_name] = { | |
| # "data": None, | |
| # "status": "error", | |
| # "error": str(e) | |
| # } | |
| # print(f"❌ {server_name.upper()}: Error - {str(e)}") | |
| # return results | |
| # def _call_server_sync(self, task: Dict[str, Any]) -> Any: | |
| # """ | |
| # Call individual MCP server, handling both sync and async methods. | |
| # Args: | |
| # task: Dict containing server, location, and metadata | |
| # Returns: | |
| # Server response data | |
| # """ | |
| # server = task["server"] | |
| # location = task["location"] | |
| # # Try async method first (most of your servers use async) | |
| # if hasattr(server, 'get_data'): | |
| # method = getattr(server, 'get_data') | |
| # # Check if it's async | |
| # if asyncio.iscoroutinefunction(method): | |
| # # Run async method in new event loop | |
| # try: | |
| # loop = asyncio.new_event_loop() | |
| # asyncio.set_event_loop(loop) | |
| # result = loop.run_until_complete( | |
| # method(location['latitude'], location['longitude']) | |
| # ) | |
| # loop.close() | |
| # return result | |
| # except Exception as e: | |
| # raise Exception(f"Async execution failed: {str(e)}") | |
| # else: | |
| # # Sync method | |
| # return method(location['latitude'], location['longitude']) | |
| # # Fallback to other method names | |
| # elif hasattr(server, 'query'): | |
| # return server.query(location) | |
| # elif hasattr(server, 'fetch_data'): | |
| # return server.fetch_data(location['latitude'], location['longitude']) | |
| # else: | |
| # raise AttributeError(f"Server {task['server_name']} has no compatible query method") | |
| # def execute_sequential(self, routing: Dict[str, bool], location: Dict[str, float]) -> Dict[str, Any]: | |
| # """ | |
| # Execute MCP server calls sequentially (fallback if parallel fails). | |
| # Args: | |
| # routing: Simple dict with server names as keys and True/False as values | |
| # location: Dict with 'latitude' and 'longitude' keys | |
| # Returns: | |
| # Dict mapping server names to their results | |
| # """ | |
| # results = {} | |
| # for server_name, should_query in routing.items(): | |
| # if should_query and server_name in self.servers: | |
| # try: | |
| # task = { | |
| # "server_name": server_name, | |
| # "server": self.servers[server_name], | |
| # "location": location | |
| # } | |
| # result = self._call_server_sync(task) | |
| # results[server_name] = { | |
| # "data": result, | |
| # "status": "success" | |
| # } | |
| # print(f"✅ {server_name.upper()}: Retrieved successfully") | |
| # except Exception as e: | |
| # results[server_name] = { | |
| # "data": None, | |
| # "status": "error", | |
| # "error": str(e) | |
| # } | |
| # print(f"❌ {server_name.upper()}: Error - {str(e)}") | |
| # return results | |
| # return results | |
| """ | |
| MCP Executor - Stage 2 | |
| Executes parallel calls to MCP servers based on routing decisions | |
| FIXED: Simpler async handling to prevent deadlocks | |
| """ | |
| from typing import Dict, Any | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| import asyncio | |
| import inspect | |
class MCPExecutor:
    """
    Stage 2 executor: query the MCP servers selected by a routing decision.

    A server object is called through the first of these methods it exposes:
    ``get_data(latitude, longitude)`` (plain or ``async def``),
    ``query(location)`` or ``fetch_data(latitude, longitude)``.

    Every outcome is reported as a per-server envelope::

        {"data": <payload>, "status": "success"}
        {"data": None, "status": "error", "error": "<message>"}

    An exception raised by one server is captured in that server's envelope
    and does not abort the other calls. A progress line is printed to stdout
    for every server.
    """

    def __init__(self, servers: Dict[str, Any], max_workers: int = 5):
        """
        Initialize executor with MCP server instances.

        Args:
            servers: Dict mapping server names to initialized server objects,
                e.g. {"weather": WeatherServer(), "soil": SoilPropertiesServer(), ...}
            max_workers: Upper bound on threads used for synchronous servers
                in execute_parallel (values below 1 are treated as 1).
        """
        self.servers = servers
        self.max_workers = max_workers

    def execute_parallel(self, routing: Dict[str, bool], location: Dict[str, float]) -> Dict[str, Any]:
        """
        Execute MCP server calls in parallel based on routing.

        Synchronous servers run on a thread pool. Servers whose ``get_data``
        is a coroutine function are gathered in one event loop, and that
        batch runs while the pool is working on the synchronous servers.

        Args:
            routing: Dict with server names as keys and True/False as values.
                Names with a falsy flag, and names missing from
                ``self.servers``, are skipped and get no entry in the result.
                ``None`` is treated as an empty routing.
            location: Dict with 'latitude' and 'longitude' keys.

        Returns:
            Dict mapping each queried server name to its result envelope.
        """
        sync_tasks = []
        async_tasks = []
        for task in self._build_tasks(routing, location):
            if self._is_async_server(task["server"]):
                async_tasks.append(task)
            else:
                sync_tasks.append(task)

        results: Dict[str, Any] = {}
        async_results: Dict[str, Any] = {}

        # Worker threads are only spawned on submit, so an empty pool is cheap.
        with ThreadPoolExecutor(max_workers=max(1, self.max_workers)) as pool:
            futures = {
                pool.submit(self._call_sync_server, task): task
                for task in sync_tasks
            }

            # Drive the async batch while the pool handles the sync servers,
            # instead of waiting for the sync batch to finish first.
            if async_tasks:
                try:
                    async_results = self._run_coroutine(
                        self._execute_async_batch(async_tasks)
                    )
                except Exception as e:
                    # The batch itself could not be run: mark every async
                    # server as failed.
                    for task in async_tasks:
                        async_results[task["server_name"]] = {
                            "data": None,
                            "status": "error",
                            "error": f"Async batch execution failed: {str(e)}"
                        }
                        print(f"❌ {task['server_name'].upper()}: Async batch error")

            for future in as_completed(futures):
                server_name = futures[future]["server_name"]
                try:
                    # Futures yielded by as_completed() are already finished,
                    # so result() returns (or raises) immediately; a timeout
                    # argument here would never take effect.
                    data = future.result()
                except Exception as e:
                    results[server_name] = self._failure(server_name, e)
                else:
                    results[server_name] = self._success(server_name, data)

        results.update(async_results)
        return results

    def execute_sequential(self, routing: Dict[str, bool], location: Dict[str, float]) -> Dict[str, Any]:
        """
        Execute MCP server calls sequentially (fallback if parallel fails).

        Args:
            routing: Dict with server names as keys and True/False as values;
                filtered exactly as in execute_parallel.
            location: Dict with 'latitude' and 'longitude' keys.

        Returns:
            Dict mapping each queried server name to its result envelope.
        """
        results: Dict[str, Any] = {}
        for task in self._build_tasks(routing, location):
            server_name = task["server_name"]
            try:
                if self._is_async_server(task["server"]):
                    data = self._run_coroutine(self._call_async_server(task))
                else:
                    data = self._call_sync_server(task)
            except Exception as e:
                results[server_name] = self._failure(server_name, e)
            else:
                results[server_name] = self._success(server_name, data)
        return results

    def _build_tasks(self, routing: Dict[str, bool], location: Dict[str, float]) -> list:
        """Return one task dict per routed server that is registered in self.servers."""
        return [
            {
                "server_name": server_name,
                "server": self.servers[server_name],
                "location": location
            }
            for server_name, should_query in (routing or {}).items()
            if should_query and server_name in self.servers
        ]

    @staticmethod
    def _is_async_server(server: Any) -> bool:
        """Return True when the server's ``get_data`` is a coroutine function."""
        return inspect.iscoroutinefunction(getattr(server, 'get_data', None))

    @staticmethod
    def _success(server_name: str, data: Any) -> Dict[str, Any]:
        """Print the success line for a server and build its success envelope."""
        print(f"✅ {server_name.upper()}: Retrieved successfully")
        return {"data": data, "status": "success"}

    @staticmethod
    def _failure(server_name: str, error: BaseException) -> Dict[str, Any]:
        """Print the error line for a server and build its error envelope."""
        print(f"❌ {server_name.upper()}: Error - {str(error)}")
        return {"data": None, "status": "error", "error": str(error)}

    @staticmethod
    async def _await_result(awaitable: Any) -> Any:
        """Wrap an arbitrary awaitable in a coroutine so asyncio.run() accepts it."""
        return await awaitable

    @staticmethod
    def _run_coroutine(coro: Any) -> Any:
        """
        Run a coroutine to completion from synchronous code and return its result.

        asyncio.run() raises RuntimeError when the calling thread already has
        a running event loop. In that case the coroutine is run by
        asyncio.run() on a short-lived helper thread and this call blocks
        until it finishes.
        """
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running in this thread: asyncio.run() is safe here.
            return asyncio.run(coro)
        with ThreadPoolExecutor(max_workers=1) as helper:
            return helper.submit(asyncio.run, coro).result()

    async def _execute_async_batch(self, tasks: list) -> Dict[str, Any]:
        """
        Execute multiple async server calls concurrently in a single event loop.

        Args:
            tasks: Task dicts as produced by _build_tasks.

        Returns:
            Dict mapping server names to their result envelopes.
        """
        outcomes = await asyncio.gather(
            *(self._call_async_server(task) for task in tasks),
            return_exceptions=True
        )
        results: Dict[str, Any] = {}
        for task, outcome in zip(tasks, outcomes):
            server_name = task["server_name"]
            # BaseException rather than Exception: asyncio.CancelledError is
            # not an Exception subclass on Python 3.8+ and must not be
            # reported as successful data.
            if isinstance(outcome, BaseException):
                results[server_name] = self._failure(server_name, outcome)
            else:
                results[server_name] = self._success(server_name, outcome)
        return results

    async def _call_async_server(self, task: Dict[str, Any]) -> Any:
        """
        Call individual async MCP server.

        Raises:
            AttributeError: if the server has no ``get_data`` method.
        """
        server = task["server"]
        location = task["location"]
        if hasattr(server, 'get_data'):
            return await server.get_data(location['latitude'], location['longitude'])
        raise AttributeError(f"Server {task['server_name']} has no get_data method")

    def _call_sync_server(self, task: Dict[str, Any]) -> Any:
        """
        Call individual sync MCP server.

        Tries ``get_data``, then ``query``, then ``fetch_data``. If the call
        returns an awaitable (for example an ``async def query``), it is run
        to completion so the caller receives data, not a coroutine object.

        Raises:
            AttributeError: if the server exposes none of the three methods.
        """
        server = task["server"]
        location = task["location"]
        if hasattr(server, 'get_data'):
            result = server.get_data(location['latitude'], location['longitude'])
        elif hasattr(server, 'query'):
            result = server.query(location)
        elif hasattr(server, 'fetch_data'):
            result = server.fetch_data(location['latitude'], location['longitude'])
        else:
            raise AttributeError(f"Server {task['server_name']} has no compatible query method")
        if inspect.isawaitable(result):
            result = self._run_coroutine(self._await_result(result))
        return result