Spaces:
Sleeping
Sleeping
File size: 3,905 Bytes
b20698b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 |
"""
Stage 2: MCP Executor - Parallel API Execution
"""
import asyncio
import time
from typing import List, Dict, Any
from .servers.weather import WeatherServer
from .servers.soil import SoilPropertiesServer
from .servers.water import WaterServer
from .servers.elevation import ElevationServer
from .servers.pests import PestsServer
# MCP server registry: descriptive metadata for each data source, keyed by
# the same identifiers used in MCPExecutor.servers. Each entry carries a
# display name, a short description, a list of capabilities, and the query
# keywords ("use_for") associated with that source.
MCP_SERVER_REGISTRY = {
    "weather": {
        "name": "Weather Server (Open-Meteo)",
        "description": "Current weather and 7-day forecasts: temperature, precipitation, wind, humidity",
        "capabilities": [
            "current_weather",
            "weather_forecast",
            "rainfall_prediction",
            "temperature_trends",
        ],
        "use_for": ["rain", "temperature", "weather", "forecast", "frost", "wind"],
    },
    "soil_properties": {
        "name": "Soil Properties Server (SoilGrids)",
        "description": "Soil composition: clay, sand, silt, pH, organic matter from global soil database",
        "capabilities": [
            "soil_texture",
            "soil_ph",
            "clay_content",
            "sand_content",
            "nutrients",
        ],
        "use_for": [
            "soil",
            "pH",
            "texture",
            "clay",
            "sand",
            "composition",
            "fertility",
            "nutrients",
        ],
    },
    "water": {
        "name": "Groundwater Server (GRACE)",
        "description": "Groundwater levels and drought indicators from NASA GRACE satellite data",
        "capabilities": [
            "groundwater_levels",
            "drought_status",
            "water_storage",
            "soil_moisture",
        ],
        "use_for": [
            "groundwater",
            "drought",
            "water",
            "irrigation",
            "water stress",
            "moisture",
        ],
    },
    "elevation": {
        "name": "Elevation Server (OpenElevation)",
        "description": "Field elevation and terrain data for irrigation planning",
        "capabilities": ["elevation", "terrain_analysis"],
        "use_for": ["elevation", "slope", "terrain", "drainage"],
    },
    "pests": {
        "name": "Pest Observation Server (iNaturalist)",
        "description": "Recent pest and insect observations from community reporting",
        "capabilities": ["pest_observations", "disease_reports", "pest_distribution"],
        "use_for": ["pests", "insects", "disease", "outbreak"],
    },
}
class MCPExecutor:
    """Stage 2: execute API calls in parallel.

    ``self.servers`` maps a server key to a server instance. Every server is
    called as ``await server.get_data(lat, lon)`` by ``execute_parallel``.
    """

    def __init__(self):
        self.servers = {
            "weather": WeatherServer(),
            "soil_properties": SoilPropertiesServer(),
            "water": WaterServer(),
            "elevation": ElevationServer(),
            "pests": PestsServer(),
        }

    async def execute_parallel(self, server_names: List[str], lat: float, lon: float) -> Dict[str, Any]:
        """Call several servers concurrently and collect their results.

        Args:
            server_names: Keys of ``self.servers`` to query. Unknown names are
                reported on stdout and skipped; repeated names are queried
                only once.
            lat: Latitude forwarded to each server's ``get_data``.
            lon: Longitude forwarded to each server's ``get_data``.

        Returns:
            {
                "results": {
                    "<server name>": <whatever get_data returned>,
                    "<failed server>": {"status": "error", "error": "<message>"},
                    ...
                },
                "execution_time_seconds": float  # rounded to 2 decimals
            }

        A failure in one server never aborts the others: exceptions are
        captured per server and reported in its ``results`` entry.
        """
        # perf_counter is monotonic, so the measured duration cannot be
        # skewed by system clock adjustments the way time.time() can.
        started = time.perf_counter()

        # dict.fromkeys drops repeated names while keeping first-seen order,
        # so no server is queried twice only to have its result overwritten.
        requested = []
        for name in dict.fromkeys(server_names):
            if name in self.servers:
                requested.append(name)
            else:
                print(f"⚠️ Unknown server: {name}")

        # return_exceptions=True makes gather hand back raised exceptions as
        # ordinary results instead of propagating the first one.
        outcomes = await asyncio.gather(
            *(self.servers[name].get_data(lat, lon) for name in requested),
            return_exceptions=True,
        )

        formatted_results = {}
        for name, outcome in zip(requested, outcomes):
            # BaseException (not Exception) so a returned CancelledError is
            # also reported as an error rather than passed through as data.
            if isinstance(outcome, BaseException):
                formatted_results[name] = {
                    "status": "error",
                    # Some exceptions (e.g. bare timeouts) stringify to "";
                    # fall back to the class name so the error is never blank.
                    "error": str(outcome) or type(outcome).__name__,
                }
            else:
                formatted_results[name] = outcome

        elapsed_time = time.perf_counter() - started
        return {
            "results": formatted_results,
            "execution_time_seconds": round(elapsed_time, 2),
        }