File size: 3,905 Bytes
b20698b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
"""
Stage 2: MCP Executor - Parallel API Execution
"""

import asyncio
import time
from typing import List, Dict, Any

from .servers.weather import WeatherServer
from .servers.soil import SoilPropertiesServer
from .servers.water import WaterServer
from .servers.elevation import ElevationServer
from .servers.pests import PestsServer


# MCP Server Registry
#
# Static, descriptive metadata for each data server. The top-level keys are
# the same identifiers used as keys of MCPExecutor.servers ("weather",
# "soil_properties", "water", "elevation", "pests"). Each entry holds:
#   name         - human-readable label, including the upstream data source
#   description  - one-line summary of the data the server provides
#   capabilities - list of capability identifiers for the server
#   use_for      - list of topic keywords associated with the server
# NOTE(review): nothing in this module reads the registry; presumably another
# stage uses it to decide which servers to call - confirm against callers.
MCP_SERVER_REGISTRY = {
    "weather": {
        "name": "Weather Server (Open-Meteo)",
        "description": "Current weather and 7-day forecasts: temperature, precipitation, wind, humidity",
        "capabilities": ["current_weather", "weather_forecast", "rainfall_prediction", "temperature_trends"],
        "use_for": ["rain", "temperature", "weather", "forecast", "frost", "wind"]
    },
    "soil_properties": {
        "name": "Soil Properties Server (SoilGrids)",
        "description": "Soil composition: clay, sand, silt, pH, organic matter from global soil database",
        "capabilities": ["soil_texture", "soil_ph", "clay_content", "sand_content", "nutrients"],
        "use_for": ["soil", "pH", "texture", "clay", "sand", "composition", "fertility", "nutrients"]
    },
    "water": {
        "name": "Groundwater Server (GRACE)",
        "description": "Groundwater levels and drought indicators from NASA GRACE satellite data",
        "capabilities": ["groundwater_levels", "drought_status", "water_storage", "soil_moisture"],
        "use_for": ["groundwater", "drought", "water", "irrigation", "water stress", "moisture"]
    },
    "elevation": {
        "name": "Elevation Server (OpenElevation)",
        "description": "Field elevation and terrain data for irrigation planning",
        "capabilities": ["elevation", "terrain_analysis"],
        "use_for": ["elevation", "slope", "terrain", "drainage"]
    },
    "pests": {
        "name": "Pest Observation Server (iNaturalist)",
        "description": "Recent pest and insect observations from community reporting",
        "capabilities": ["pest_observations", "disease_reports", "pest_distribution"],
        "use_for": ["pests", "insects", "disease", "outbreak"]
    }
}


class MCPExecutor:
    """Stage 2: call the selected data servers concurrently and collect results.

    Attributes:
        servers: Mapping of server key -> server instance. Every instance is
            called as ``await server.get_data(lat, lon)``.
    """

    def __init__(self):
        """Instantiate one client per known server key."""
        self.servers = {
            "weather": WeatherServer(),
            "soil_properties": SoilPropertiesServer(),
            "water": WaterServer(),
            "elevation": ElevationServer(),
            "pests": PestsServer()
        }

    async def execute_parallel(self, server_names: List[str], lat: float, lon: float) -> Dict[str, Any]:
        """
        Call multiple servers simultaneously.

        Args:
            server_names: Keys of ``self.servers`` to query. Unknown keys are
                reported with a printed warning and skipped; repeated keys
                are queried only once (first occurrence fixes the order).
            lat: Latitude forwarded unchanged to every server's ``get_data``.
            lon: Longitude forwarded unchanged to every server's ``get_data``.

        Returns:
            {
                "results": {
                    "weather": {"status": "success", "data": {...}},
                    ...
                },
                "execution_time_seconds": float
            }

            A server whose call raised (or was cancelled) is reported as
            ``{"status": "error", "error": "<message>"}`` instead of aborting
            the whole batch; any other value is passed through as returned
            by the server.
        """
        # Monotonic clock: elapsed time cannot be skewed by system clock changes.
        start_time = time.perf_counter()

        # dict.fromkeys() de-duplicates while preserving order, so a repeated
        # name does not trigger a second, redundant API call whose result
        # would only overwrite the first one in the results dict.
        valid_servers = []
        for name in dict.fromkeys(server_names):
            if name in self.servers:
                valid_servers.append(name)
            else:
                print(f"⚠️ Unknown server: {name}")

        # Execute all in parallel; failures come back as values, in the same
        # order as the awaitables that were passed in.
        results = await asyncio.gather(
            *(self.servers[name].get_data(lat, lon) for name in valid_servers),
            return_exceptions=True,
        )

        # Format results
        formatted_results = {}
        for server_name, result in zip(valid_servers, results):
            # BaseException (not Exception): asyncio.CancelledError derives
            # from BaseException on Python 3.8+ and must not be mistaken for
            # a successful payload.
            if isinstance(result, BaseException):
                formatted_results[server_name] = {
                    "status": "error",
                    # Some exceptions (e.g. TimeoutError()) have an empty
                    # message; fall back to the class name so the error is
                    # never a blank string.
                    "error": str(result) or type(result).__name__,
                }
            else:
                formatted_results[server_name] = result

        elapsed_time = time.perf_counter() - start_time

        return {
            "results": formatted_results,
            "execution_time_seconds": round(elapsed_time, 2)
        }