Spaces:
Running
Running
| """ | |
| Farmer.chat Pipeline - Main Orchestrator | |
| Coordinates Router → Executor → Compiler stages for alert generation | |
| FIXED: Proper async handling for FastAPI integration | |
| """ | |
| from typing import Dict, Any, Optional | |
| from .router import QueryRouter | |
| from .executor import MCPExecutor | |
| from .compiler import ResponseCompiler | |
class FarmerChatPipeline:
    """
    Main pipeline orchestrating the complete alert generation workflow.

    Architecture:
        Stage 1: QueryRouter - Determines which MCP servers to query
        Stage 2: MCPExecutor - Executes parallel calls to MCP servers
        Stage 3: ResponseCompiler - Compiles results into actionable alert summary
    """

    # Horizontal rule printed above and below each run's console banner.
    _RULE = "=" * 60

    def __init__(self, servers: Dict[str, Any], location: Dict[str, float]):
        """
        Initialize pipeline with MCP servers and default location.

        Args:
            servers: Dict mapping server names to initialized server instances
            location: Default location dict with 'latitude' and 'longitude' keys

        Raises:
            KeyError: If ``location`` lacks 'latitude' or 'longitude' (both are
                read when printing the initialization message).
        """
        self.servers = servers
        self.location = location

        # Initialize pipeline stages
        self.router = QueryRouter()
        self.executor = MCPExecutor(servers)
        self.compiler = ResponseCompiler()

        print(f"✓ Pipeline initialized for location: {self._format_coordinates(location)}")

    @staticmethod
    def _format_coordinates(location: Dict[str, float]) -> str:
        """Render a location dict as '<lat>°N, <lon>°E' with four decimals."""
        return f"{location['latitude']:.4f}°N, {location['longitude']:.4f}°E"

    @staticmethod
    def _count_successes(mcp_results: Dict[str, Any]) -> int:
        """Count the results whose 'status' entry equals 'success'."""
        return sum(1 for result in mcp_results.values() if result.get("status") == "success")

    @classmethod
    def _print_banner(cls, *lines: str) -> None:
        """Print the given lines framed by a rule above and below."""
        print(f"\n{cls._RULE}")
        for line in lines:
            print(line)
        print(f"{cls._RULE}\n")

    async def generate_alert(
        self,
        location: Optional[Dict[str, float]] = None,
        location_name: str = ""
    ) -> Dict[str, Any]:
        """
        Generate comprehensive alert summary for a location.
        ASYNC version for FastAPI endpoints.

        Args:
            location: Dict with 'latitude' and 'longitude' keys. A falsy value
                (None or an empty dict) falls back to the pipeline default.
            location_name: Label used in the console banner, passed on to the
                compiler and echoed in the result.

        Returns:
            Dict with keys 'alert_summary', 'location', 'location_name' and
            'mcp_results'.

        Raises:
            KeyError: If the effective location lacks 'latitude' or 'longitude'.
        """
        query_location = location or self.location

        self._print_banner(
            f"Generating Alert Summary for {location_name or 'Location'}",
            f"Coordinates: {self._format_coordinates(query_location)}",
        )

        # Stage 1: Route - Always query all servers for alerts
        print("Stage 1: Routing to all MCP servers...")
        routing = self.router.route_alert_query(query_location)
        # NOTE(review): the server count in this message is hard-coded; confirm
        # it matches what the router actually selects.
        print("✓ Routing complete: All 5 servers will be queried")

        # Stage 2: Execute - Query all MCP servers in parallel (ASYNC)
        print("\nStage 2: Executing parallel MCP server calls...")
        mcp_results = await self.executor.execute_parallel_async(routing, query_location)
        success_count = self._count_successes(mcp_results)
        print(f"✓ Execution complete: {success_count}/{len(mcp_results)} servers responded successfully")

        # Stage 3: Compile - Extract ONLY alerting information
        print("\nStage 3: Compiling alert summary...")
        alert_summary = self.compiler.compile_alert_summary(
            mcp_results,
            query_location,
            location_name
        )
        print("✓ Alert summary generated")
        print(f"\n{self._RULE}\n")

        return {
            "alert_summary": alert_summary,
            "location": query_location,
            "location_name": location_name,
            "mcp_results": mcp_results
        }

    async def process_query(
        self,
        query: str,
        location: Optional[Dict[str, float]] = None
    ) -> Dict[str, Any]:
        """
        Process a specific farmer query through the pipeline.
        ASYNC version for FastAPI endpoints.

        Args:
            query: The farmer's question; passed to the router and the compiler.
            location: Dict with 'latitude' and 'longitude' keys. A falsy value
                (None or an empty dict) falls back to the pipeline default.

        Returns:
            Dict with keys 'response', 'location' and 'mcp_results'.

        Raises:
            KeyError: If the effective location lacks 'latitude' or 'longitude'.
        """
        query_location = location or self.location

        self._print_banner(
            f"Processing Query: {query}",
            f"Location: {self._format_coordinates(query_location)}",
        )

        # Stage 1: Route
        print("Stage 1: Routing to all MCP servers...")
        routing = self.router.route_query(query, query_location)
        print("✓ Routing complete")

        # Stage 2: Execute MCP calls (ASYNC)
        print("\nStage 2: Executing MCP server calls...")
        mcp_results = await self.executor.execute_parallel_async(routing, query_location)
        success_count = self._count_successes(mcp_results)
        print(f"✓ Execution complete: {success_count}/{len(mcp_results)} servers responded")

        # Stage 3: Compile response
        print("\nStage 3: Compiling response...")
        response = self.compiler.compile_response(query, mcp_results, query_location)
        print("✓ Response compiled")
        print(f"\n{self._RULE}\n")

        return {
            "response": response,
            "location": query_location,
            "mcp_results": mcp_results
        }

    def update_location(self, location: Dict[str, float]):
        """
        Update default location for pipeline.

        Args:
            location: Dict with 'latitude' and 'longitude' keys.

        Raises:
            KeyError: If ``location`` lacks 'latitude' or 'longitude'. The new
                default is already stored by then, because the assignment
                happens before the message is formatted.
        """
        self.location = location
        print(f"✓ Default location updated to: {self._format_coordinates(location)}")

    def get_server_status(self) -> Dict[str, str]:
        """
        Get status of all MCP servers.

        A server without a ``health_check`` attribute is reported as
        "available". A server whose ``health_check()`` returns a falsy value
        or raises an ``Exception`` is reported as "unavailable".

        Returns:
            Dict mapping each server name to "available" or "unavailable".
        """
        status: Dict[str, str] = {}
        for server_name, server in self.servers.items():
            try:
                # NOTE(review): health_check is called synchronously; if a
                # server defines it as a coroutine function, the un-awaited
                # coroutine object is truthy and reads as "available" - confirm.
                if hasattr(server, 'health_check'):
                    status[server_name] = "available" if server.health_check() else "unavailable"
                else:
                    status[server_name] = "available"
            except Exception:
                # Deliberately not a bare `except:` so KeyboardInterrupt and
                # SystemExit propagate instead of being reported as an outage.
                status[server_name] = "unavailable"
        return status