mcp-alert-generator / src /pipeline.py
aakashdg's picture
fix (double wrapping of server results)
424a0c8 verified
raw
history blame
8.55 kB
# """
# Complete Multi-Stage MCP Pipeline
# Orchestrates: Router → Executor → Compiler → Translator
# """
# import time
# from typing import Dict, Any
# from openai import OpenAI
# from .router import QueryRouter
# from .executor import MCPExecutor, MCP_SERVER_REGISTRY
# from .compiler import ResponseCompiler
# from .translator import FarmerTranslator
# class FarmerChatPipeline:
# """Complete multi-stage MCP pipeline"""
# def __init__(self, openai_client: OpenAI, location: Dict[str, Any]):
# self.location = location
# self.router = QueryRouter(openai_client, MCP_SERVER_REGISTRY)
# self.executor = MCPExecutor()
# self.compiler = ResponseCompiler()
# self.translator = FarmerTranslator(openai_client)
# async def process_query(self, query: str, verbose: bool = False) -> Dict[str, Any]:
# """
# Process farmer query through complete pipeline
# Returns:
# {
# "query": str,
# "routing": dict,
# "compiled_data": dict,
# "advice": str,
# "pipeline_time_seconds": float
# }
# """
# if verbose:
# print(f"\n🌾 Processing: {query}")
# print(f"📍 Location: {self.location['name']}")
# pipeline_start = time.time()
# # STAGE 1: Query Routing
# if verbose:
# print("🎯 Stage 1: Routing...")
# routing = self.router.route(query, self.location)
# if verbose:
# print(f" → Servers: {', '.join(routing['required_servers'])}")
# # STAGE 2: MCP Execution (Parallel)
# if verbose:
# print("⚙️ Stage 2: Executing MCP servers...")
# raw_results = await self.executor.execute_parallel(
# routing['required_servers'],
# self.location['lat'],
# self.location['lon']
# )
# if verbose:
# print(f" → Completed in {raw_results['execution_time_seconds']}s")
# # STAGE 3: Response Compilation
# if verbose:
# print("🔗 Stage 3: Compiling results...")
# compiled = self.compiler.compile(raw_results)
# if verbose:
# print(f" → {compiled['completeness']}")
# # STAGE 4: Farmer Translation
# if verbose:
# print("🌾 Stage 4: Generating advice...")
# farmer_advice = self.translator.translate(query, compiled, self.location)
# pipeline_time = time.time() - pipeline_start
# if verbose:
# print(f"✅ Complete! Total: {pipeline_time:.2f}s\n")
# return {
# "query": query,
# "routing": routing,
# "compiled_data": compiled,
# "advice": farmer_advice,
# "pipeline_time_seconds": round(pipeline_time, 2)
# }
"""
Farmer.chat Pipeline - Main Orchestrator
Coordinates Router → Executor → Compiler stages for alert generation
FIXED: Proper async handling for FastAPI integration
"""
from typing import Dict, Any, Optional
from .router import QueryRouter
from .executor import MCPExecutor
from .compiler import ResponseCompiler
class FarmerChatPipeline:
    """
    Main pipeline orchestrating the complete alert generation workflow.

    Architecture:
        Stage 1: QueryRouter - Determines which MCP servers to query
        Stage 2: MCPExecutor - Executes parallel calls to MCP servers
        Stage 3: ResponseCompiler - Compiles results into actionable alert summary

    Progress for every stage is reported on stdout via ``print``.
    """

    def __init__(self, servers: Dict[str, Any], location: Dict[str, float]):
        """
        Initialize pipeline with MCP servers and default location.

        Args:
            servers: Dict mapping server names to initialized server instances
            location: Default location dict with 'latitude' and 'longitude' keys

        Raises:
            KeyError: If ``location`` lacks 'latitude' or 'longitude'
                (both are read when printing the start-up message).
        """
        self.servers = servers
        self.location = location

        # Initialize pipeline stages
        self.router = QueryRouter()
        self.executor = MCPExecutor(servers)
        self.compiler = ResponseCompiler()

        print(f"✓ Pipeline initialized for location: {location['latitude']:.4f}°N, {location['longitude']:.4f}°E")

    @staticmethod
    def _count_successes(mcp_results: Dict[str, Any]) -> int:
        """Count the entries of ``mcp_results`` whose "status" is "success".

        Entries that are not dicts (for example ``None``) are counted as
        failures rather than raising ``AttributeError``.
        """
        return sum(
            1
            for result in mcp_results.values()
            if isinstance(result, dict) and result.get("status") == "success"
        )

    async def generate_alert(
        self,
        location: Optional[Dict[str, float]] = None,
        location_name: str = ""
    ) -> Dict[str, Any]:
        """
        Generate comprehensive alert summary for a location.

        ASYNC version for FastAPI endpoints.

        Args:
            location: Dict with 'latitude' and 'longitude' keys. When omitted
                (or otherwise falsy) the pipeline's default location is used.
            location_name: Human-readable label used in the printed banner and
                forwarded to the compiler; may be empty.

        Returns:
            Dict with keys "alert_summary", "location", "location_name" and
            "mcp_results" (the executor output, passed through unchanged).
        """
        query_location = location or self.location
        banner = "=" * 60

        print(f"\n{banner}")
        print(f"Generating Alert Summary for {location_name or 'Location'}")
        print(f"Coordinates: {query_location['latitude']:.4f}°N, {query_location['longitude']:.4f}°E")
        print(f"{banner}\n")

        # Stage 1: Route - Always query all servers for alerts
        print("Stage 1: Routing to all MCP servers...")
        routing = self.router.route_alert_query(query_location)
        # NOTE(review): the server count in this message is hard-coded and is
        # not derived from self.servers or from `routing` - confirm it is
        # still accurate.
        print("✓ Routing complete: All 5 servers will be queried")

        # Stage 2: Execute - Query all MCP servers in parallel (ASYNC)
        print("\nStage 2: Executing parallel MCP server calls...")
        mcp_results = await self.executor.execute_parallel_async(routing, query_location)
        success_count = self._count_successes(mcp_results)
        print(f"✓ Execution complete: {success_count}/{len(mcp_results)} servers responded successfully")

        # Stage 3: Compile - Extract ONLY alerting information
        print("\nStage 3: Compiling alert summary...")
        alert_summary = self.compiler.compile_alert_summary(
            mcp_results,
            query_location,
            location_name
        )
        print("✓ Alert summary generated")
        print(f"\n{banner}\n")

        return {
            "alert_summary": alert_summary,
            "location": query_location,
            "location_name": location_name,
            "mcp_results": mcp_results
        }

    async def process_query(
        self,
        query: str,
        location: Optional[Dict[str, float]] = None
    ) -> Dict[str, Any]:
        """
        Process a specific farmer query through the pipeline.

        ASYNC version for FastAPI endpoints.

        Args:
            query: The farmer's question, forwarded to the router and compiler.
            location: Dict with 'latitude' and 'longitude' keys. When omitted
                (or otherwise falsy) the pipeline's default location is used.

        Returns:
            Dict with keys "response", "location" and "mcp_results" (the
            executor output, passed through unchanged).
        """
        query_location = location or self.location
        banner = "=" * 60

        print(f"\n{banner}")
        print(f"Processing Query: {query}")
        print(f"Location: {query_location['latitude']:.4f}°N, {query_location['longitude']:.4f}°E")
        print(f"{banner}\n")

        # Stage 1: Route
        print("Stage 1: Routing to all MCP servers...")
        routing = self.router.route_query(query, query_location)
        print("✓ Routing complete")

        # Stage 2: Execute MCP calls (ASYNC)
        print("\nStage 2: Executing MCP server calls...")
        mcp_results = await self.executor.execute_parallel_async(routing, query_location)
        success_count = self._count_successes(mcp_results)
        print(f"✓ Execution complete: {success_count}/{len(mcp_results)} servers responded")

        # Stage 3: Compile response
        print("\nStage 3: Compiling response...")
        response = self.compiler.compile_response(query, mcp_results, query_location)
        print("✓ Response compiled")
        print(f"\n{banner}\n")

        return {
            "response": response,
            "location": query_location,
            "mcp_results": mcp_results
        }

    def update_location(self, location: Dict[str, float]) -> None:
        """Update default location for pipeline.

        Args:
            location: Dict with 'latitude' and 'longitude' keys; both are read
                when printing the confirmation message.
        """
        self.location = location
        print(f"✓ Default location updated to: {location['latitude']:.4f}°N, {location['longitude']:.4f}°E")

    def get_server_status(self) -> Dict[str, str]:
        """Get status of all MCP servers.

        Returns:
            Dict mapping each server name to "available" or "unavailable".
            A server without a ``health_check`` attribute is reported as
            "available"; one whose ``health_check()`` returns a falsy value
            or raises is reported as "unavailable".
        """
        status = {}
        for server_name, server in self.servers.items():
            try:
                if hasattr(server, 'health_check'):
                    status[server_name] = "available" if server.health_check() else "unavailable"
                else:
                    status[server_name] = "available"
            except Exception:
                # Best effort: a failing health check marks only that server
                # as down. KeyboardInterrupt/SystemExit are deliberately not
                # caught so the process can still be stopped.
                status[server_name] = "unavailable"
        return status