""" Farmer.chat Pipeline - Main Orchestrator Coordinates Router → Executor → Compiler stages for alert generation FIXED: Proper async handling for FastAPI integration """ from typing import Dict, Any, Optional from .router import QueryRouter from .executor import MCPExecutor from .compiler import ResponseCompiler class FarmerChatPipeline: """ Main pipeline orchestrating the complete alert generation workflow. Architecture: Stage 1: QueryRouter - Determines which MCP servers to query Stage 2: MCPExecutor - Executes parallel calls to MCP servers Stage 3: ResponseCompiler - Compiles results into actionable alert summary """ def __init__(self, servers: Dict[str, Any], location: Dict[str, float]): """ Initialize pipeline with MCP servers and default location. Args: servers: Dict mapping server names to initialized server instances location: Default location dict with 'latitude' and 'longitude' keys """ self.servers = servers self.location = location # Initialize pipeline stages self.router = QueryRouter() self.executor = MCPExecutor(servers) self.compiler = ResponseCompiler() print(f"✓ Pipeline initialized for location: {location['latitude']:.4f}°N, {location['longitude']:.4f}°E") async def generate_alert( self, location: Optional[Dict[str, float]] = None, location_name: str = "" ) -> Dict[str, Any]: """ Generate comprehensive alert summary for a location. ASYNC version for FastAPI endpoints. """ query_location = location or self.location print(f"\n{'='*60}") print(f"Generating Alert Summary for {location_name or 'Location'}") print(f"Coordinates: {query_location['latitude']:.4f}°N, {query_location['longitude']:.4f}°E") print(f"{'='*60}\n") # Stage 1: Route - Always query all servers for alerts print("Stage 1: Routing to all MCP servers...") routing = self.router.route_alert_query(query_location) print(f"✓ Routing complete: All 5 servers will be queried") # Stage 2: Execute - Query all MCP servers in parallel (ASYNC) print("\nStage 2: Executing parallel MCP server calls...") mcp_results = await self.executor.execute_parallel_async(routing, query_location) success_count = sum(1 for r in mcp_results.values() if r.get("status") == "success") print(f"✓ Execution complete: {success_count}/{len(mcp_results)} servers responded successfully") # Stage 3: Compile - Extract ONLY alerting information print("\nStage 3: Compiling alert summary...") alert_summary = self.compiler.compile_alert_summary( mcp_results, query_location, location_name ) print("✓ Alert summary generated") print(f"\n{'='*60}\n") return { "alert_summary": alert_summary, "location": query_location, "location_name": location_name, "mcp_results": mcp_results } async def process_query( self, query: str, location: Optional[Dict[str, float]] = None ) -> Dict[str, Any]: """ Process a specific farmer query through the pipeline. ASYNC version for FastAPI endpoints. """ query_location = location or self.location print(f"\n{'='*60}") print(f"Processing Query: {query}") print(f"Location: {query_location['latitude']:.4f}°N, {query_location['longitude']:.4f}°E") print(f"{'='*60}\n") # Stage 1: Route print("Stage 1: Routing to all MCP servers...") routing = self.router.route_query(query, query_location) print(f"✓ Routing complete") # Stage 2: Execute MCP calls (ASYNC) print("\nStage 2: Executing MCP server calls...") mcp_results = await self.executor.execute_parallel_async(routing, query_location) success_count = sum(1 for r in mcp_results.values() if r.get("status") == "success") print(f"✓ Execution complete: {success_count}/{len(mcp_results)} servers responded") # Stage 3: Compile response print("\nStage 3: Compiling response...") response = self.compiler.compile_response(query, mcp_results, query_location) print("✓ Response compiled") print(f"\n{'='*60}\n") return { "response": response, "location": query_location, "mcp_results": mcp_results } def update_location(self, location: Dict[str, float]): """Update default location for pipeline.""" self.location = location print(f"✓ Default location updated to: {location['latitude']:.4f}°N, {location['longitude']:.4f}°E") def get_server_status(self) -> Dict[str, str]: """Get status of all MCP servers.""" status = {} for server_name, server in self.servers.items(): try: if hasattr(server, 'health_check'): status[server_name] = "available" if server.health_check() else "unavailable" else: status[server_name] = "available" except: status[server_name] = "unavailable" return status