""" Stage 2: MCP Executor - Parallel API Execution """ import asyncio import time from typing import List, Dict, Any from .servers.weather import WeatherServer from .servers.soil import SoilPropertiesServer from .servers.water import WaterServer from .servers.elevation import ElevationServer from .servers.pests import PestsServer # MCP Server Registry MCP_SERVER_REGISTRY = { "weather": { "name": "Weather Server (Open-Meteo)", "description": "Current weather and 7-day forecasts: temperature, precipitation, wind, humidity", "capabilities": ["current_weather", "weather_forecast", "rainfall_prediction", "temperature_trends"], "use_for": ["rain", "temperature", "weather", "forecast", "frost", "wind"] }, "soil_properties": { "name": "Soil Properties Server (SoilGrids)", "description": "Soil composition: clay, sand, silt, pH, organic matter from global soil database", "capabilities": ["soil_texture", "soil_ph", "clay_content", "sand_content", "nutrients"], "use_for": ["soil", "pH", "texture", "clay", "sand", "composition", "fertility", "nutrients"] }, "water": { "name": "Groundwater Server (GRACE)", "description": "Groundwater levels and drought indicators from NASA GRACE satellite data", "capabilities": ["groundwater_levels", "drought_status", "water_storage", "soil_moisture"], "use_for": ["groundwater", "drought", "water", "irrigation", "water stress", "moisture"] }, "elevation": { "name": "Elevation Server (OpenElevation)", "description": "Field elevation and terrain data for irrigation planning", "capabilities": ["elevation", "terrain_analysis"], "use_for": ["elevation", "slope", "terrain", "drainage"] }, "pests": { "name": "Pest Observation Server (iNaturalist)", "description": "Recent pest and insect observations from community reporting", "capabilities": ["pest_observations", "disease_reports", "pest_distribution"], "use_for": ["pests", "insects", "disease", "outbreak"] } } class MCPExecutor: """Stage 2: Execute API calls in parallel""" def __init__(self): self.servers = { "weather": WeatherServer(), "soil_properties": SoilPropertiesServer(), "water": WaterServer(), "elevation": ElevationServer(), "pests": PestsServer() } async def execute_parallel(self, server_names: List[str], lat: float, lon: float) -> Dict[str, Any]: """ Call multiple servers simultaneously Returns: { "results": { "weather": {"status": "success", "data": {...}}, ... }, "execution_time_seconds": float } """ start_time = time.time() tasks = [] valid_servers = [] for name in server_names: if name in self.servers: tasks.append(self.servers[name].get_data(lat, lon)) valid_servers.append(name) else: print(f"⚠️ Unknown server: {name}") # Execute all in parallel results = await asyncio.gather(*tasks, return_exceptions=True) # Format results formatted_results = {} for i, server_name in enumerate(valid_servers): result = results[i] if isinstance(result, Exception): formatted_results[server_name] = { "status": "error", "error": str(result) } else: formatted_results[server_name] = result elapsed_time = time.time() - start_time return { "results": formatted_results, "execution_time_seconds": round(elapsed_time, 2) }