# mcp-alert-generator/src/pipeline.py
# aakashdg's picture
# cleanup
# 6fbe737 verified
"""
Farmer.chat Pipeline - Main Orchestrator
Coordinates Router → Executor → Compiler stages for alert generation
FIXED: Proper async handling for FastAPI integration
"""
from typing import Dict, Any, Optional
from .router import QueryRouter
from .executor import MCPExecutor
from .compiler import ResponseCompiler
class FarmerChatPipeline:
    """
    Main pipeline orchestrating the complete alert generation workflow.

    Architecture:
        Stage 1: QueryRouter - Determines which MCP servers to query
        Stage 2: MCPExecutor - Executes parallel calls to MCP servers
        Stage 3: ResponseCompiler - Compiles results into actionable alert summary

    Progress of every stage is reported on stdout via ``print``.
    """

    # Horizontal rule framing the per-request progress output.
    _RULE = "=" * 60

    def __init__(self, servers: Dict[str, Any], location: Dict[str, float]):
        """
        Initialize pipeline with MCP servers and default location.

        Args:
            servers: Dict mapping server names to initialized server instances
            location: Default location dict with 'latitude' and 'longitude' keys
        """
        self.servers = servers
        self.location = location

        # Initialize pipeline stages
        self.router = QueryRouter()
        self.executor = MCPExecutor(servers)
        self.compiler = ResponseCompiler()

        print(f"✓ Pipeline initialized for location: {location['latitude']:.4f}°N, {location['longitude']:.4f}°E")

    @staticmethod
    def _count_successes(mcp_results: Dict[str, Any]) -> int:
        """Return how many entries of *mcp_results* have ``status == "success"``."""
        return sum(1 for result in mcp_results.values() if result.get("status") == "success")

    async def generate_alert(
        self,
        location: Optional[Dict[str, float]] = None,
        location_name: str = ""
    ) -> Dict[str, Any]:
        """
        Generate comprehensive alert summary for a location.
        ASYNC version for FastAPI endpoints.

        Args:
            location: Dict with 'latitude' and 'longitude' keys. A falsy value
                (None or an empty dict) falls back to the pipeline default.
            location_name: Label used in the progress output and passed to the
                compiler; may be empty.

        Returns:
            Dict with keys 'alert_summary', 'location', 'location_name' and
            'mcp_results' (the executor output, keyed as the executor returns it).
        """
        query_location = location or self.location

        print(f"\n{self._RULE}")
        print(f"Generating Alert Summary for {location_name or 'Location'}")
        print(f"Coordinates: {query_location['latitude']:.4f}°N, {query_location['longitude']:.4f}°E")
        print(f"{self._RULE}\n")

        # Stage 1: Route - Always query all servers for alerts
        print("Stage 1: Routing to all MCP servers...")
        routing = self.router.route_alert_query(query_location)
        # Report the number of configured servers instead of a hard-coded count.
        print(f"✓ Routing complete: All {len(self.servers)} servers will be queried")

        # Stage 2: Execute - Query all MCP servers in parallel (ASYNC)
        print("\nStage 2: Executing parallel MCP server calls...")
        mcp_results = await self.executor.execute_parallel_async(routing, query_location)
        success_count = self._count_successes(mcp_results)
        print(f"✓ Execution complete: {success_count}/{len(mcp_results)} servers responded successfully")

        # Stage 3: Compile - Extract ONLY alerting information
        print("\nStage 3: Compiling alert summary...")
        alert_summary = self.compiler.compile_alert_summary(
            mcp_results,
            query_location,
            location_name
        )
        print("✓ Alert summary generated")
        print(f"\n{self._RULE}\n")

        return {
            "alert_summary": alert_summary,
            "location": query_location,
            "location_name": location_name,
            "mcp_results": mcp_results
        }

    async def process_query(
        self,
        query: str,
        location: Optional[Dict[str, float]] = None
    ) -> Dict[str, Any]:
        """
        Process a specific farmer query through the pipeline.
        ASYNC version for FastAPI endpoints.

        Args:
            query: The query text; handed to the router and the compiler.
            location: Dict with 'latitude' and 'longitude' keys. A falsy value
                (None or an empty dict) falls back to the pipeline default.

        Returns:
            Dict with keys 'response', 'location' and 'mcp_results'.
        """
        query_location = location or self.location

        print(f"\n{self._RULE}")
        print(f"Processing Query: {query}")
        print(f"Location: {query_location['latitude']:.4f}°N, {query_location['longitude']:.4f}°E")
        print(f"{self._RULE}\n")

        # Stage 1: Route
        print("Stage 1: Routing to all MCP servers...")
        routing = self.router.route_query(query, query_location)
        print("✓ Routing complete")

        # Stage 2: Execute MCP calls (ASYNC)
        print("\nStage 2: Executing MCP server calls...")
        mcp_results = await self.executor.execute_parallel_async(routing, query_location)
        success_count = self._count_successes(mcp_results)
        print(f"✓ Execution complete: {success_count}/{len(mcp_results)} servers responded")

        # Stage 3: Compile response
        print("\nStage 3: Compiling response...")
        response = self.compiler.compile_response(query, mcp_results, query_location)
        print("✓ Response compiled")
        print(f"\n{self._RULE}\n")

        return {
            "response": response,
            "location": query_location,
            "mcp_results": mcp_results
        }

    def update_location(self, location: Dict[str, float]):
        """Update default location for pipeline.

        Args:
            location: Dict with 'latitude' and 'longitude' keys.
        """
        self.location = location
        print(f"✓ Default location updated to: {location['latitude']:.4f}°N, {location['longitude']:.4f}°E")

    def get_server_status(self) -> Dict[str, str]:
        """Get status of all MCP servers.

        A server without a ``health_check`` attribute is reported as
        "available". Otherwise the truthiness of ``health_check()`` decides,
        and a check that raises is reported as "unavailable".

        Returns:
            Dict mapping each server name to "available" or "unavailable".
        """
        # NOTE(review): health_check() is called synchronously; if a server
        # implements it as a coroutine function, the un-awaited coroutine is
        # truthy and the server would be reported as available - confirm.
        status: Dict[str, str] = {}
        for server_name, server in self.servers.items():
            try:
                if hasattr(server, 'health_check'):
                    status[server_name] = "available" if server.health_check() else "unavailable"
                else:
                    status[server_name] = "available"
            except Exception:
                # Only ordinary errors mean "unavailable"; KeyboardInterrupt
                # and SystemExit must propagate rather than be swallowed.
                status[server_name] = "unavailable"
        return status