mcp-alert-generator / src /executor.py
akashub
fix: re-adding local code files
6afc01a
raw
history blame
18.5 kB
# """
# Stage 2: MCP Executor - Parallel API Execution
# """
# import asyncio
# import time
# from typing import List, Dict, Any
# from .servers.weather import WeatherServer
# from .servers.soil import SoilPropertiesServer
# from .servers.water import WaterServer
# from .servers.elevation import ElevationServer
# from .servers.pests import PestsServer
# # MCP Server Registry
# MCP_SERVER_REGISTRY = {
# "weather": {
# "name": "Weather Server (Open-Meteo)",
# "description": "Current weather and 7-day forecasts: temperature, precipitation, wind, humidity",
# "capabilities": ["current_weather", "weather_forecast", "rainfall_prediction", "temperature_trends"],
# "use_for": ["rain", "temperature", "weather", "forecast", "frost", "wind"]
# },
# "soil_properties": {
# "name": "Soil Properties Server (SoilGrids)",
# "description": "Soil composition: clay, sand, silt, pH, organic matter from global soil database",
# "capabilities": ["soil_texture", "soil_ph", "clay_content", "sand_content", "nutrients"],
# "use_for": ["soil", "pH", "texture", "clay", "sand", "composition", "fertility", "nutrients"]
# },
# "water": {
# "name": "Groundwater Server (GRACE)",
# "description": "Groundwater levels and drought indicators from NASA GRACE satellite data",
# "capabilities": ["groundwater_levels", "drought_status", "water_storage", "soil_moisture"],
# "use_for": ["groundwater", "drought", "water", "irrigation", "water stress", "moisture"]
# },
# "elevation": {
# "name": "Elevation Server (OpenElevation)",
# "description": "Field elevation and terrain data for irrigation planning",
# "capabilities": ["elevation", "terrain_analysis"],
# "use_for": ["elevation", "slope", "terrain", "drainage"]
# },
# "pests": {
# "name": "Pest Observation Server (iNaturalist)",
# "description": "Recent pest and insect observations from community reporting",
# "capabilities": ["pest_observations", "disease_reports", "pest_distribution"],
# "use_for": ["pests", "insects", "disease", "outbreak"]
# }
# }
# class MCPExecutor:
# """Stage 2: Execute API calls in parallel"""
# def __init__(self):
# self.servers = {
# "weather": WeatherServer(),
# "soil_properties": SoilPropertiesServer(),
# "water": WaterServer(),
# "elevation": ElevationServer(),
# "pests": PestsServer()
# }
# async def execute_parallel(self, server_names: List[str], lat: float, lon: float) -> Dict[str, Any]:
# """
# Call multiple servers simultaneously
# Returns:
# {
# "results": {
# "weather": {"status": "success", "data": {...}},
# ...
# },
# "execution_time_seconds": float
# }
# """
# start_time = time.time()
# tasks = []
# valid_servers = []
# for name in server_names:
# if name in self.servers:
# tasks.append(self.servers[name].get_data(lat, lon))
# valid_servers.append(name)
# else:
# print(f"⚠️ Unknown server: {name}")
# # Execute all in parallel
# results = await asyncio.gather(*tasks, return_exceptions=True)
# # Format results
# formatted_results = {}
# for i, server_name in enumerate(valid_servers):
# result = results[i]
# if isinstance(result, Exception):
# formatted_results[server_name] = {
# "status": "error",
# "error": str(result)
# }
# else:
# formatted_results[server_name] = result
# elapsed_time = time.time() - start_time
# return {
# "results": formatted_results,
# "execution_time_seconds": round(elapsed_time, 2)
# }
# """
# MCP Executor - Stage 2
# Executes parallel calls to MCP servers based on routing decisions
# """
# from typing import Dict, Any
# from concurrent.futures import ThreadPoolExecutor, as_completed
# import asyncio
# class MCPExecutor:
# """
# Executes MCP server calls based on routing decisions.
# Integrates with existing server implementations in src/servers/
# Handles both sync and async server methods.
# """
# def __init__(self, servers: Dict[str, Any]):
# """
# Initialize executor with MCP server instances.
# Args:
# servers: Dict mapping server names to initialized server objects
# e.g., {"weather": WeatherServer(), "soil": SoilPropertiesServer(), ...}
# """
# self.servers = servers
# def execute_parallel(self, routing: Dict[str, bool], location: Dict[str, float]) -> Dict[str, Any]:
# """
# Execute MCP server calls in parallel based on routing.
# Args:
# routing: Simple dict with server names as keys and True/False as values
# location: Dict with 'latitude' and 'longitude' keys
# Returns:
# Dict mapping server names to their results with metadata
# """
# results = {}
# tasks = []
# # Prepare tasks for servers marked for querying
# for server_name, should_query in routing.items():
# if should_query and server_name in self.servers:
# tasks.append({
# "server_name": server_name,
# "server": self.servers[server_name],
# "location": location
# })
# # Execute in parallel using ThreadPoolExecutor
# with ThreadPoolExecutor(max_workers=5) as executor:
# futures = {
# executor.submit(self._call_server_sync, task): task
# for task in tasks
# }
# for future in as_completed(futures):
# task = futures[future]
# server_name = task["server_name"]
# try:
# result = future.result(timeout=30)
# results[server_name] = {
# "data": result,
# "status": "success"
# }
# print(f"βœ“ {server_name.upper()}: Retrieved successfully")
# except Exception as e:
# results[server_name] = {
# "data": None,
# "status": "error",
# "error": str(e)
# }
# print(f"βœ— {server_name.upper()}: Error - {str(e)}")
# return results
# def _call_server_sync(self, task: Dict[str, Any]) -> Any:
# """
# Call individual MCP server, handling both sync and async methods.
# Args:
# task: Dict containing server, location, and metadata
# Returns:
# Server response data
# """
# server = task["server"]
# location = task["location"]
# # Try async method first (most of your servers use async)
# if hasattr(server, 'get_data'):
# method = getattr(server, 'get_data')
# # Check if it's async
# if asyncio.iscoroutinefunction(method):
# # Run async method in new event loop
# try:
# loop = asyncio.new_event_loop()
# asyncio.set_event_loop(loop)
# result = loop.run_until_complete(
# method(location['latitude'], location['longitude'])
# )
# loop.close()
# return result
# except Exception as e:
# raise Exception(f"Async execution failed: {str(e)}")
# else:
# # Sync method
# return method(location['latitude'], location['longitude'])
# # Fallback to other method names
# elif hasattr(server, 'query'):
# return server.query(location)
# elif hasattr(server, 'fetch_data'):
# return server.fetch_data(location['latitude'], location['longitude'])
# else:
# raise AttributeError(f"Server {task['server_name']} has no compatible query method")
# def execute_sequential(self, routing: Dict[str, bool], location: Dict[str, float]) -> Dict[str, Any]:
# """
# Execute MCP server calls sequentially (fallback if parallel fails).
# Args:
# routing: Simple dict with server names as keys and True/False as values
# location: Dict with 'latitude' and 'longitude' keys
# Returns:
# Dict mapping server names to their results
# """
# results = {}
# for server_name, should_query in routing.items():
# if should_query and server_name in self.servers:
# try:
# task = {
# "server_name": server_name,
# "server": self.servers[server_name],
# "location": location
# }
# result = self._call_server_sync(task)
# results[server_name] = {
# "data": result,
# "status": "success"
# }
# print(f"βœ“ {server_name.upper()}: Retrieved successfully")
# except Exception as e:
# results[server_name] = {
# "data": None,
# "status": "error",
# "error": str(e)
# }
# print(f"βœ— {server_name.upper()}: Error - {str(e)}")
# return results
# return results
"""
MCP Executor - Stage 2
Executes parallel calls to MCP servers based on routing decisions
FIXED: Simpler async handling to prevent deadlocks
"""
from typing import Dict, Any
from concurrent.futures import ThreadPoolExecutor, as_completed
import asyncio
import inspect
class MCPExecutor:
"""
Executes MCP server calls based on routing decisions.
Integrates with existing server implementations in src/servers/
Handles both sync and async server methods safely.
"""
def __init__(self, servers: Dict[str, Any]):
"""
Initialize executor with MCP server instances.
Args:
servers: Dict mapping server names to initialized server objects
e.g., {"weather": WeatherServer(), "soil": SoilPropertiesServer(), ...}
"""
self.servers = servers
def execute_parallel(self, routing: Dict[str, bool], location: Dict[str, float]) -> Dict[str, Any]:
"""
Execute MCP server calls in parallel based on routing.
Args:
routing: Simple dict with server names as keys and True/False as values
location: Dict with 'latitude' and 'longitude' keys
Returns:
Dict mapping server names to their results with metadata
"""
results = {}
# For async servers, we need to run them differently
# Separate sync and async servers
sync_tasks = []
async_tasks = []
for server_name, should_query in routing.items():
if should_query and server_name in self.servers:
server = self.servers[server_name]
task = {
"server_name": server_name,
"server": server,
"location": location
}
# Check if server method is async
if hasattr(server, 'get_data'):
method = getattr(server, 'get_data')
if inspect.iscoroutinefunction(method):
async_tasks.append(task)
else:
sync_tasks.append(task)
else:
sync_tasks.append(task)
# Execute sync servers in parallel with ThreadPoolExecutor
if sync_tasks:
with ThreadPoolExecutor(max_workers=5) as executor:
futures = {
executor.submit(self._call_sync_server, task): task
for task in sync_tasks
}
for future in as_completed(futures):
task = futures[future]
server_name = task["server_name"]
try:
result = future.result(timeout=30)
results[server_name] = {
"data": result,
"status": "success"
}
print(f"βœ“ {server_name.upper()}: Retrieved successfully")
except Exception as e:
results[server_name] = {
"data": None,
"status": "error",
"error": str(e)
}
print(f"βœ— {server_name.upper()}: Error - {str(e)}")
# Execute async servers together in single event loop
if async_tasks:
try:
async_results = asyncio.run(self._execute_async_batch(async_tasks))
results.update(async_results)
except Exception as e:
# If batch fails, mark all as failed
for task in async_tasks:
results[task["server_name"]] = {
"data": None,
"status": "error",
"error": f"Async batch execution failed: {str(e)}"
}
print(f"βœ— {task['server_name'].upper()}: Async batch error")
return results
async def _execute_async_batch(self, tasks: list) -> Dict[str, Any]:
"""
Execute multiple async server calls concurrently in a single event loop.
This is safer than creating multiple event loops.
"""
results = {}
# Create async tasks for all servers
async_calls = []
for task in tasks:
async_calls.append(self._call_async_server(task))
# Execute all async calls concurrently
task_results = await asyncio.gather(*async_calls, return_exceptions=True)
# Process results
for task, result in zip(tasks, task_results):
server_name = task["server_name"]
if isinstance(result, Exception):
results[server_name] = {
"data": None,
"status": "error",
"error": str(result)
}
print(f"βœ— {server_name.upper()}: Error - {str(result)}")
else:
results[server_name] = {
"data": result,
"status": "success"
}
print(f"βœ“ {server_name.upper()}: Retrieved successfully")
return results
async def _call_async_server(self, task: Dict[str, Any]) -> Any:
"""Call individual async MCP server"""
server = task["server"]
location = task["location"]
if hasattr(server, 'get_data'):
return await server.get_data(location['latitude'], location['longitude'])
else:
raise AttributeError(f"Server {task['server_name']} has no get_data method")
def _call_sync_server(self, task: Dict[str, Any]) -> Any:
"""Call individual sync MCP server"""
server = task["server"]
location = task["location"]
if hasattr(server, 'get_data'):
return server.get_data(location['latitude'], location['longitude'])
elif hasattr(server, 'query'):
return server.query(location)
elif hasattr(server, 'fetch_data'):
return server.fetch_data(location['latitude'], location['longitude'])
else:
raise AttributeError(f"Server {task['server_name']} has no compatible query method")
def execute_sequential(self, routing: Dict[str, bool], location: Dict[str, float]) -> Dict[str, Any]:
"""
Execute MCP server calls sequentially (fallback if parallel fails).
"""
results = {}
for server_name, should_query in routing.items():
if should_query and server_name in self.servers:
try:
server = self.servers[server_name]
# Check if async
if hasattr(server, 'get_data') and inspect.iscoroutinefunction(server.get_data):
# Run async method
result = asyncio.run(server.get_data(location['latitude'], location['longitude']))
else:
# Run sync method
task = {
"server_name": server_name,
"server": server,
"location": location
}
result = self._call_sync_server(task)
results[server_name] = {
"data": result,
"status": "success"
}
print(f"βœ“ {server_name.upper()}: Retrieved successfully")
except Exception as e:
results[server_name] = {
"data": None,
"status": "error",
"error": str(e)
}
print(f"βœ— {server_name.upper()}: Error - {str(e)}")
return results
return results