|
|
""" |
|
|
Stage 2: MCP Executor - Parallel API Execution
|
|
""" |
|
|
|
|
|
import asyncio |
|
|
import time |
|
|
from typing import List, Dict, Any |
|
|
|
|
|
from .servers.weather import WeatherServer |
|
|
from .servers.soil import SoilPropertiesServer |
|
|
from .servers.water import WaterServer |
|
|
from .servers.elevation import ElevationServer |
|
|
from .servers.pests import PestsServer |
|
|
|
|
|
|
|
|
|
|
|
# Descriptive catalogue of the available MCP data servers.
#
# Each entry is keyed by the server identifier (the same identifiers that
# MCPExecutor registers in ``self.servers``) and carries four fields:
#   name         - human-readable label, including the upstream data source
#   description  - one-line summary of what the server provides
#   capabilities - list of capability tags
#   use_for      - list of topic keywords associated with the server
#
# NOTE(review): nothing in this module reads the registry; it is presumably
# consumed by a routing/selection stage elsewhere - confirm against callers.
MCP_SERVER_REGISTRY: Dict[str, Dict[str, Any]] = {
    "weather": {
        "name": "Weather Server (Open-Meteo)",
        "description": "Current weather and 7-day forecasts: temperature, precipitation, wind, humidity",
        "capabilities": [
            "current_weather",
            "weather_forecast",
            "rainfall_prediction",
            "temperature_trends",
        ],
        "use_for": [
            "rain",
            "temperature",
            "weather",
            "forecast",
            "frost",
            "wind",
        ],
    },
    "soil_properties": {
        "name": "Soil Properties Server (SoilGrids)",
        "description": "Soil composition: clay, sand, silt, pH, organic matter from global soil database",
        "capabilities": [
            "soil_texture",
            "soil_ph",
            "clay_content",
            "sand_content",
            "nutrients",
        ],
        "use_for": [
            "soil",
            "pH",
            "texture",
            "clay",
            "sand",
            "composition",
            "fertility",
            "nutrients",
        ],
    },
    "water": {
        "name": "Groundwater Server (GRACE)",
        "description": "Groundwater levels and drought indicators from NASA GRACE satellite data",
        "capabilities": [
            "groundwater_levels",
            "drought_status",
            "water_storage",
            "soil_moisture",
        ],
        "use_for": [
            "groundwater",
            "drought",
            "water",
            "irrigation",
            "water stress",
            "moisture",
        ],
    },
    "elevation": {
        "name": "Elevation Server (OpenElevation)",
        "description": "Field elevation and terrain data for irrigation planning",
        "capabilities": [
            "elevation",
            "terrain_analysis",
        ],
        "use_for": [
            "elevation",
            "slope",
            "terrain",
            "drainage",
        ],
    },
    "pests": {
        "name": "Pest Observation Server (iNaturalist)",
        "description": "Recent pest and insect observations from community reporting",
        "capabilities": [
            "pest_observations",
            "disease_reports",
            "pest_distribution",
        ],
        "use_for": [
            "pests",
            "insects",
            "disease",
            "outbreak",
        ],
    },
}
|
|
|
|
|
|
|
|
class MCPExecutor:
    """Stage 2: Execute API calls in parallel.

    Holds one client object per data server, keyed by server name, and fans a
    single (lat, lon) query out to any subset of them concurrently.
    """

    def __init__(self):
        # Keys are the names callers pass to execute_parallel().
        self.servers = {
            "weather": WeatherServer(),
            "soil_properties": SoilPropertiesServer(),
            "water": WaterServer(),
            "elevation": ElevationServer(),
            "pests": PestsServer(),
        }

    async def execute_parallel(self, server_names: List[str], lat: float, lon: float) -> Dict[str, Any]:
        """Call multiple servers simultaneously.

        Args:
            server_names: Names of the servers to query (keys of
                ``self.servers``). Names that are not registered are reported
                on stdout and skipped. A name listed more than once is
                queried only once.
            lat: Latitude forwarded unchanged to each server's ``get_data``.
            lon: Longitude forwarded unchanged to each server's ``get_data``.

        Returns:
            {
                "results": {
                    "weather": {"status": "success", "data": {...}},
                    ...
                },
                "execution_time_seconds": float
            }

            ``results`` has one entry per valid server, in request order. On
            success the entry is whatever that server's ``get_data`` returned;
            if the call raised or was cancelled, the entry is
            ``{"status": "error", "error": <message>}``.
        """
        # perf_counter is monotonic, so the measured duration cannot be
        # skewed by system clock adjustments the way time.time() can.
        start_time = time.perf_counter()

        # De-duplicate while keeping first-seen order: a repeated name would
        # otherwise trigger a redundant API call whose result is overwritten.
        valid_servers = []
        for name in dict.fromkeys(server_names):
            if name in self.servers:
                valid_servers.append(name)
            else:
                print(f"⚠️ Unknown server: {name}")

        # return_exceptions=True keeps one failing server from aborting the
        # whole batch; failures come back as exception objects in the list.
        results = await asyncio.gather(
            *(self.servers[name].get_data(lat, lon) for name in valid_servers),
            return_exceptions=True,
        )

        formatted_results = {}
        for server_name, result in zip(valid_servers, results):
            # BaseException (not Exception): asyncio.CancelledError derives
            # from BaseException since Python 3.8, and a cancelled child task
            # is delivered here as a CancelledError instance.
            if isinstance(result, BaseException):
                formatted_results[server_name] = {
                    "status": "error",
                    # Some exceptions stringify to "" (e.g. CancelledError());
                    # fall back to the class name so the error is never blank.
                    "error": str(result) or type(result).__name__,
                }
            else:
                formatted_results[server_name] = result

        elapsed_time = time.perf_counter() - start_time

        return {
            "results": formatted_results,
            "execution_time_seconds": round(elapsed_time, 2),
        }