# """ # Complete Multi-Stage MCP Pipeline # Orchestrates: Router → Executor → Compiler → Translator # """ # import time # from typing import Dict, Any # from openai import OpenAI # from .router import QueryRouter # from .executor import MCPExecutor, MCP_SERVER_REGISTRY # from .compiler import ResponseCompiler # from .translator import FarmerTranslator # class FarmerChatPipeline: # """Complete multi-stage MCP pipeline""" # def __init__(self, openai_client: OpenAI, location: Dict[str, Any]): # self.location = location # self.router = QueryRouter(openai_client, MCP_SERVER_REGISTRY) # self.executor = MCPExecutor() # self.compiler = ResponseCompiler() # self.translator = FarmerTranslator(openai_client) # async def process_query(self, query: str, verbose: bool = False) -> Dict[str, Any]: # """ # Process farmer query through complete pipeline # Returns: # { # "query": str, # "routing": dict, # "compiled_data": dict, # "advice": str, # "pipeline_time_seconds": float # } # """ # if verbose: # print(f"\n🌾 Processing: {query}") # print(f"šŸ“ Location: {self.location['name']}") # pipeline_start = time.time() # # STAGE 1: Query Routing # if verbose: # print("šŸŽÆ Stage 1: Routing...") # routing = self.router.route(query, self.location) # if verbose: # print(f" → Servers: {', '.join(routing['required_servers'])}") # # STAGE 2: MCP Execution (Parallel) # if verbose: # print("āš™ļø Stage 2: Executing MCP servers...") # raw_results = await self.executor.execute_parallel( # routing['required_servers'], # self.location['lat'], # self.location['lon'] # ) # if verbose: # print(f" → Completed in {raw_results['execution_time_seconds']}s") # # STAGE 3: Response Compilation # if verbose: # print("šŸ”— Stage 3: Compiling results...") # compiled = self.compiler.compile(raw_results) # if verbose: # print(f" → {compiled['completeness']}") # # STAGE 4: Farmer Translation # if verbose: # print("🌾 Stage 4: Generating advice...") # farmer_advice = self.translator.translate(query, compiled, self.location) # pipeline_time = time.time() - pipeline_start # if verbose: # print(f"āœ… Complete! Total: {pipeline_time:.2f}s\n") # return { # "query": query, # "routing": routing, # "compiled_data": compiled, # "advice": farmer_advice, # "pipeline_time_seconds": round(pipeline_time, 2) # } """ Farmer.chat Pipeline - Main Orchestrator Coordinates Router → Executor → Compiler stages for alert generation FIXED: Proper async handling for FastAPI integration """ from typing import Dict, Any, Optional from .router import QueryRouter from .executor import MCPExecutor from .compiler import ResponseCompiler class FarmerChatPipeline: """ Main pipeline orchestrating the complete alert generation workflow. Architecture: Stage 1: QueryRouter - Determines which MCP servers to query Stage 2: MCPExecutor - Executes parallel calls to MCP servers Stage 3: ResponseCompiler - Compiles results into actionable alert summary """ def __init__(self, servers: Dict[str, Any], location: Dict[str, float]): """ Initialize pipeline with MCP servers and default location. Args: servers: Dict mapping server names to initialized server instances location: Default location dict with 'latitude' and 'longitude' keys """ self.servers = servers self.location = location # Initialize pipeline stages self.router = QueryRouter() self.executor = MCPExecutor(servers) self.compiler = ResponseCompiler() print(f"āœ“ Pipeline initialized for location: {location['latitude']:.4f}°N, {location['longitude']:.4f}°E") async def generate_alert( self, location: Optional[Dict[str, float]] = None, location_name: str = "" ) -> Dict[str, Any]: """ Generate comprehensive alert summary for a location. ASYNC version for FastAPI endpoints. """ query_location = location or self.location print(f"\n{'='*60}") print(f"Generating Alert Summary for {location_name or 'Location'}") print(f"Coordinates: {query_location['latitude']:.4f}°N, {query_location['longitude']:.4f}°E") print(f"{'='*60}\n") # Stage 1: Route - Always query all servers for alerts print("Stage 1: Routing to all MCP servers...") routing = self.router.route_alert_query(query_location) print(f"āœ“ Routing complete: All 5 servers will be queried") # Stage 2: Execute - Query all MCP servers in parallel (ASYNC) print("\nStage 2: Executing parallel MCP server calls...") mcp_results = await self.executor.execute_parallel_async(routing, query_location) success_count = sum(1 for r in mcp_results.values() if r.get("status") == "success") print(f"āœ“ Execution complete: {success_count}/{len(mcp_results)} servers responded successfully") # Stage 3: Compile - Extract ONLY alerting information print("\nStage 3: Compiling alert summary...") alert_summary = self.compiler.compile_alert_summary( mcp_results, query_location, location_name ) print("āœ“ Alert summary generated") print(f"\n{'='*60}\n") return { "alert_summary": alert_summary, "location": query_location, "location_name": location_name, "mcp_results": mcp_results } async def process_query( self, query: str, location: Optional[Dict[str, float]] = None ) -> Dict[str, Any]: """ Process a specific farmer query through the pipeline. ASYNC version for FastAPI endpoints. """ query_location = location or self.location print(f"\n{'='*60}") print(f"Processing Query: {query}") print(f"Location: {query_location['latitude']:.4f}°N, {query_location['longitude']:.4f}°E") print(f"{'='*60}\n") # Stage 1: Route print("Stage 1: Routing to all MCP servers...") routing = self.router.route_query(query, query_location) print(f"āœ“ Routing complete") # Stage 2: Execute MCP calls (ASYNC) print("\nStage 2: Executing MCP server calls...") mcp_results = await self.executor.execute_parallel_async(routing, query_location) success_count = sum(1 for r in mcp_results.values() if r.get("status") == "success") print(f"āœ“ Execution complete: {success_count}/{len(mcp_results)} servers responded") # Stage 3: Compile response print("\nStage 3: Compiling response...") response = self.compiler.compile_response(query, mcp_results, query_location) print("āœ“ Response compiled") print(f"\n{'='*60}\n") return { "response": response, "location": query_location, "mcp_results": mcp_results } def update_location(self, location: Dict[str, float]): """Update default location for pipeline.""" self.location = location print(f"āœ“ Default location updated to: {location['latitude']:.4f}°N, {location['longitude']:.4f}°E") def get_server_status(self) -> Dict[str, str]: """Get status of all MCP servers.""" status = {} for server_name, server in self.servers.items(): try: if hasattr(server, 'health_check'): status[server_name] = "available" if server.health_check() else "unavailable" else: status[server_name] = "available" except: status[server_name] = "unavailable" return status