aakashdg commited on
Commit
9bdf3bb
·
verified ·
1 Parent(s): 07ff834
Files changed (1) hide show
  1. src/executor.py +0 -270
src/executor.py CHANGED
@@ -1,273 +1,3 @@
1
- # """
2
- # Stage 2: MCP Executor - Parallel API Execution
3
- # """
4
-
5
- # import asyncio
6
- # import time
7
- # from typing import List, Dict, Any
8
-
9
- # from .servers.weather import WeatherServer
10
- # from .servers.soil import SoilPropertiesServer
11
- # from .servers.water import WaterServer
12
- # from .servers.elevation import ElevationServer
13
- # from .servers.pests import PestsServer
14
-
15
-
16
- # # MCP Server Registry
17
- # MCP_SERVER_REGISTRY = {
18
- # "weather": {
19
- # "name": "Weather Server (Open-Meteo)",
20
- # "description": "Current weather and 7-day forecasts: temperature, precipitation, wind, humidity",
21
- # "capabilities": ["current_weather", "weather_forecast", "rainfall_prediction", "temperature_trends"],
22
- # "use_for": ["rain", "temperature", "weather", "forecast", "frost", "wind"]
23
- # },
24
- # "soil_properties": {
25
- # "name": "Soil Properties Server (SoilGrids)",
26
- # "description": "Soil composition: clay, sand, silt, pH, organic matter from global soil database",
27
- # "capabilities": ["soil_texture", "soil_ph", "clay_content", "sand_content", "nutrients"],
28
- # "use_for": ["soil", "pH", "texture", "clay", "sand", "composition", "fertility", "nutrients"]
29
- # },
30
- # "water": {
31
- # "name": "Groundwater Server (GRACE)",
32
- # "description": "Groundwater levels and drought indicators from NASA GRACE satellite data",
33
- # "capabilities": ["groundwater_levels", "drought_status", "water_storage", "soil_moisture"],
34
- # "use_for": ["groundwater", "drought", "water", "irrigation", "water stress", "moisture"]
35
- # },
36
- # "elevation": {
37
- # "name": "Elevation Server (OpenElevation)",
38
- # "description": "Field elevation and terrain data for irrigation planning",
39
- # "capabilities": ["elevation", "terrain_analysis"],
40
- # "use_for": ["elevation", "slope", "terrain", "drainage"]
41
- # },
42
- # "pests": {
43
- # "name": "Pest Observation Server (iNaturalist)",
44
- # "description": "Recent pest and insect observations from community reporting",
45
- # "capabilities": ["pest_observations", "disease_reports", "pest_distribution"],
46
- # "use_for": ["pests", "insects", "disease", "outbreak"]
47
- # }
48
- # }
49
-
50
-
51
- # class MCPExecutor:
52
- # """Stage 2: Execute API calls in parallel"""
53
-
54
- # def __init__(self):
55
- # self.servers = {
56
- # "weather": WeatherServer(),
57
- # "soil_properties": SoilPropertiesServer(),
58
- # "water": WaterServer(),
59
- # "elevation": ElevationServer(),
60
- # "pests": PestsServer()
61
- # }
62
-
63
- # async def execute_parallel(self, server_names: List[str], lat: float, lon: float) -> Dict[str, Any]:
64
- # """
65
- # Call multiple servers simultaneously
66
-
67
- # Returns:
68
- # {
69
- # "results": {
70
- # "weather": {"status": "success", "data": {...}},
71
- # ...
72
- # },
73
- # "execution_time_seconds": float
74
- # }
75
- # """
76
- # start_time = time.time()
77
-
78
- # tasks = []
79
- # valid_servers = []
80
-
81
- # for name in server_names:
82
- # if name in self.servers:
83
- # tasks.append(self.servers[name].get_data(lat, lon))
84
- # valid_servers.append(name)
85
- # else:
86
- # print(f"⚠️ Unknown server: {name}")
87
-
88
- # # Execute all in parallel
89
- # results = await asyncio.gather(*tasks, return_exceptions=True)
90
-
91
- # # Format results
92
- # formatted_results = {}
93
- # for i, server_name in enumerate(valid_servers):
94
- # result = results[i]
95
- # if isinstance(result, Exception):
96
- # formatted_results[server_name] = {
97
- # "status": "error",
98
- # "error": str(result)
99
- # }
100
- # else:
101
- # formatted_results[server_name] = result
102
-
103
- # elapsed_time = time.time() - start_time
104
-
105
- # return {
106
- # "results": formatted_results,
107
- # "execution_time_seconds": round(elapsed_time, 2)
108
- # }
109
-
110
- # """
111
- # MCP Executor - Stage 2
112
- # Executes parallel calls to MCP servers based on routing decisions
113
- # """
114
-
115
- # from typing import Dict, Any
116
- # from concurrent.futures import ThreadPoolExecutor, as_completed
117
- # import asyncio
118
-
119
-
120
- # class MCPExecutor:
121
- # """
122
- # Executes MCP server calls based on routing decisions.
123
- # Integrates with existing server implementations in src/servers/
124
- # Handles both sync and async server methods.
125
- # """
126
-
127
- # def __init__(self, servers: Dict[str, Any]):
128
- # """
129
- # Initialize executor with MCP server instances.
130
-
131
- # Args:
132
- # servers: Dict mapping server names to initialized server objects
133
- # e.g., {"weather": WeatherServer(), "soil": SoilPropertiesServer(), ...}
134
- # """
135
- # self.servers = servers
136
-
137
- # def execute_parallel(self, routing: Dict[str, bool], location: Dict[str, float]) -> Dict[str, Any]:
138
- # """
139
- # Execute MCP server calls in parallel based on routing.
140
-
141
- # Args:
142
- # routing: Simple dict with server names as keys and True/False as values
143
- # location: Dict with 'latitude' and 'longitude' keys
144
-
145
- # Returns:
146
- # Dict mapping server names to their results with metadata
147
- # """
148
- # results = {}
149
- # tasks = []
150
-
151
- # # Prepare tasks for servers marked for querying
152
- # for server_name, should_query in routing.items():
153
- # if should_query and server_name in self.servers:
154
- # tasks.append({
155
- # "server_name": server_name,
156
- # "server": self.servers[server_name],
157
- # "location": location
158
- # })
159
-
160
- # # Execute in parallel using ThreadPoolExecutor
161
- # with ThreadPoolExecutor(max_workers=5) as executor:
162
- # futures = {
163
- # executor.submit(self._call_server_sync, task): task
164
- # for task in tasks
165
- # }
166
-
167
- # for future in as_completed(futures):
168
- # task = futures[future]
169
- # server_name = task["server_name"]
170
-
171
- # try:
172
- # result = future.result(timeout=30)
173
- # results[server_name] = {
174
- # "data": result,
175
- # "status": "success"
176
- # }
177
- # print(f"✓ {server_name.upper()}: Retrieved successfully")
178
-
179
- # except Exception as e:
180
- # results[server_name] = {
181
- # "data": None,
182
- # "status": "error",
183
- # "error": str(e)
184
- # }
185
- # print(f"✗ {server_name.upper()}: Error - {str(e)}")
186
-
187
- # return results
188
-
189
- # def _call_server_sync(self, task: Dict[str, Any]) -> Any:
190
- # """
191
- # Call individual MCP server, handling both sync and async methods.
192
-
193
- # Args:
194
- # task: Dict containing server, location, and metadata
195
-
196
- # Returns:
197
- # Server response data
198
- # """
199
- # server = task["server"]
200
- # location = task["location"]
201
-
202
- # # Try async method first (most of your servers use async)
203
- # if hasattr(server, 'get_data'):
204
- # method = getattr(server, 'get_data')
205
-
206
- # # Check if it's async
207
- # if asyncio.iscoroutinefunction(method):
208
- # # Run async method in new event loop
209
- # try:
210
- # loop = asyncio.new_event_loop()
211
- # asyncio.set_event_loop(loop)
212
- # result = loop.run_until_complete(
213
- # method(location['latitude'], location['longitude'])
214
- # )
215
- # loop.close()
216
- # return result
217
- # except Exception as e:
218
- # raise Exception(f"Async execution failed: {str(e)}")
219
- # else:
220
- # # Sync method
221
- # return method(location['latitude'], location['longitude'])
222
-
223
- # # Fallback to other method names
224
- # elif hasattr(server, 'query'):
225
- # return server.query(location)
226
- # elif hasattr(server, 'fetch_data'):
227
- # return server.fetch_data(location['latitude'], location['longitude'])
228
- # else:
229
- # raise AttributeError(f"Server {task['server_name']} has no compatible query method")
230
-
231
- # def execute_sequential(self, routing: Dict[str, bool], location: Dict[str, float]) -> Dict[str, Any]:
232
- # """
233
- # Execute MCP server calls sequentially (fallback if parallel fails).
234
-
235
- # Args:
236
- # routing: Simple dict with server names as keys and True/False as values
237
- # location: Dict with 'latitude' and 'longitude' keys
238
-
239
- # Returns:
240
- # Dict mapping server names to their results
241
- # """
242
- # results = {}
243
-
244
- # for server_name, should_query in routing.items():
245
- # if should_query and server_name in self.servers:
246
- # try:
247
- # task = {
248
- # "server_name": server_name,
249
- # "server": self.servers[server_name],
250
- # "location": location
251
- # }
252
-
253
- # result = self._call_server_sync(task)
254
- # results[server_name] = {
255
- # "data": result,
256
- # "status": "success"
257
- # }
258
- # print(f"✓ {server_name.upper()}: Retrieved successfully")
259
-
260
- # except Exception as e:
261
- # results[server_name] = {
262
- # "data": None,
263
- # "status": "error",
264
- # "error": str(e)
265
- # }
266
- # print(f"✗ {server_name.upper()}: Error - {str(e)}")
267
-
268
- # return results
269
-
270
- # return results
271
  """
272
  MCP Executor - Stage 2
273
  Executes parallel calls to MCP servers based on routing decisions
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  """
2
  MCP Executor - Stage 2
3
  Executes parallel calls to MCP servers based on routing decisions