Spaces:
Running
Running
| # """ | |
| # Complete Multi-Stage MCP Pipeline | |
| # Orchestrates: Router → Executor → Compiler → Translator | |
| # """ | |
| # import time | |
| # from typing import Dict, Any | |
| # from openai import OpenAI | |
| # from .router import QueryRouter | |
| # from .executor import MCPExecutor, MCP_SERVER_REGISTRY | |
| # from .compiler import ResponseCompiler | |
| # from .translator import FarmerTranslator | |
| # class FarmerChatPipeline: | |
| # """Complete multi-stage MCP pipeline""" | |
| # def __init__(self, openai_client: OpenAI, location: Dict[str, Any]): | |
| # self.location = location | |
| # self.router = QueryRouter(openai_client, MCP_SERVER_REGISTRY) | |
| # self.executor = MCPExecutor() | |
| # self.compiler = ResponseCompiler() | |
| # self.translator = FarmerTranslator(openai_client) | |
| # async def process_query(self, query: str, verbose: bool = False) -> Dict[str, Any]: | |
| # """ | |
| # Process farmer query through complete pipeline | |
| # Returns: | |
| # { | |
| # "query": str, | |
| # "routing": dict, | |
| # "compiled_data": dict, | |
| # "advice": str, | |
| # "pipeline_time_seconds": float | |
| # } | |
| # """ | |
| # if verbose: | |
| # print(f"\n🌾 Processing: {query}") | |
| # print(f"📍 Location: {self.location['name']}") | |
| # pipeline_start = time.time() | |
| # # STAGE 1: Query Routing | |
| # if verbose: | |
| # print("🎯 Stage 1: Routing...") | |
| # routing = self.router.route(query, self.location) | |
| # if verbose: | |
| # print(f" → Servers: {', '.join(routing['required_servers'])}") | |
| # # STAGE 2: MCP Execution (Parallel) | |
| # if verbose: | |
| # print("⚙️ Stage 2: Executing MCP servers...") | |
| # raw_results = await self.executor.execute_parallel( | |
| # routing['required_servers'], | |
| # self.location['lat'], | |
| # self.location['lon'] | |
| # ) | |
| # if verbose: | |
| # print(f" → Completed in {raw_results['execution_time_seconds']}s") | |
| # # STAGE 3: Response Compilation | |
| # if verbose: | |
| # print("🔗 Stage 3: Compiling results...") | |
| # compiled = self.compiler.compile(raw_results) | |
| # if verbose: | |
| # print(f" → {compiled['completeness']}") | |
| # # STAGE 4: Farmer Translation | |
| # if verbose: | |
| # print("🌾 Stage 4: Generating advice...") | |
| # farmer_advice = self.translator.translate(query, compiled, self.location) | |
| # pipeline_time = time.time() - pipeline_start | |
| # if verbose: | |
| # print(f"✅ Complete! Total: {pipeline_time:.2f}s\n") | |
| # return { | |
| # "query": query, | |
| # "routing": routing, | |
| # "compiled_data": compiled, | |
| # "advice": farmer_advice, | |
| # "pipeline_time_seconds": round(pipeline_time, 2) | |
| # } | |
| """ | |
| Farmer.chat Pipeline - Main Orchestrator | |
| Coordinates Router → Executor → Compiler stages for alert generation | |
| """ | |
| from typing import Dict, Any, Optional | |
| from .router import QueryRouter | |
| from .executor import MCPExecutor | |
| from .compiler import ResponseCompiler | |
| class FarmerChatPipeline: | |
| """ | |
| Main pipeline orchestrating the complete alert generation workflow. | |
| Architecture: | |
| Stage 1: QueryRouter - Determines which MCP servers to query with priorities | |
| Stage 2: MCPExecutor - Executes parallel calls to selected MCP servers | |
| Stage 3: ResponseCompiler - Compiles results into actionable alert summary | |
| """ | |
| def __init__(self, servers: Dict[str, Any], location: Dict[str, float]): | |
| """ | |
| Initialize pipeline with MCP servers and default location. | |
| Args: | |
| servers: Dict mapping server names to initialized server instances | |
| location: Default location dict with 'latitude' and 'longitude' keys | |
| """ | |
| self.servers = servers | |
| self.location = location | |
| # Initialize pipeline stages | |
| self.router = QueryRouter() | |
| self.executor = MCPExecutor(servers) | |
| self.compiler = ResponseCompiler() | |
| print(f"✓ Pipeline initialized for location: {location['latitude']:.4f}°N, {location['longitude']:.4f}°E") | |
| def generate_alert( | |
| self, | |
| location: Optional[Dict[str, float]] = None, | |
| location_name: str = "" | |
| ) -> Dict[str, Any]: | |
| """ | |
| Generate comprehensive alert summary for a location. | |
| Always queries ALL MCP servers. The compiler extracts only | |
| the alerting/concerning information from the comprehensive data. | |
| Args: | |
| location: Optional location override. Uses default if not provided. | |
| location_name: Human-readable location name for context | |
| Returns: | |
| Dict containing: | |
| - alert_summary: Compiled alert text (only concerning info) | |
| - location: Location coordinates used | |
| - mcp_results: Raw results from each MCP server | |
| """ | |
| # Use provided location or default | |
| query_location = location or self.location | |
| print(f"\n{'='*60}") | |
| print(f"Generating Alert Summary for {location_name or 'Location'}") | |
| print(f"Coordinates: {query_location['latitude']:.4f}°N, {query_location['longitude']:.4f}°E") | |
| print(f"{'='*60}\n") | |
| # Stage 1: Route - Always query all servers for alerts | |
| print("Stage 1: Routing to all MCP servers...") | |
| routing = self.router.route_alert_query(query_location) | |
| print(f"✓ Routing complete: All 5 servers will be queried") | |
| # Stage 2: Execute - Query all MCP servers in parallel | |
| print("\nStage 2: Executing parallel MCP server calls...") | |
| mcp_results = self.executor.execute_parallel(routing, query_location) | |
| success_count = sum(1 for r in mcp_results.values() if r.get("status") == "success") | |
| print(f"✓ Execution complete: {success_count}/{len(mcp_results)} servers responded successfully") | |
| # Stage 3: Compile - Extract ONLY alerting information | |
| print("\nStage 3: Compiling alert summary (extracting concerning info only)...") | |
| alert_summary = self.compiler.compile_alert_summary( | |
| mcp_results, | |
| query_location, | |
| location_name | |
| ) | |
| print("✓ Alert summary generated (focusing on alerts/concerns only)") | |
| print(f"\n{'='*60}\n") | |
| return { | |
| "alert_summary": alert_summary, | |
| "location": query_location, | |
| "location_name": location_name, | |
| "mcp_results": mcp_results | |
| } | |
| def process_query( | |
| self, | |
| query: str, | |
| location: Optional[Dict[str, float]] = None | |
| ) -> Dict[str, Any]: | |
| """ | |
| Process a specific farmer query through the pipeline. | |
| For now, also queries all servers to ensure comprehensive data. | |
| The compiler extracts query-relevant information. | |
| Args: | |
| query: Farmer's question or request | |
| location: Optional location override | |
| Returns: | |
| Dict containing: | |
| - response: Compiled response text | |
| - location: Location coordinates used | |
| - mcp_results: Raw results from MCP servers | |
| """ | |
| # Use provided location or default | |
| query_location = location or self.location | |
| print(f"\n{'='*60}") | |
| print(f"Processing Query: {query}") | |
| print(f"Location: {query_location['latitude']:.4f}°N, {query_location['longitude']:.4f}°E") | |
| print(f"{'='*60}\n") | |
| # Stage 1: Route - For now, query all servers | |
| print("Stage 1: Routing to all MCP servers...") | |
| routing = self.router.route_query(query, query_location) | |
| print(f"✓ Routing complete: All servers will be queried") | |
| # Stage 2: Execute MCP calls | |
| print("\nStage 2: Executing MCP server calls...") | |
| mcp_results = self.executor.execute_parallel(routing, query_location) | |
| success_count = sum(1 for r in mcp_results.values() if r.get("status") == "success") | |
| print(f"✓ Execution complete: {success_count}/{len(mcp_results)} servers responded") | |
| # Stage 3: Compile response | |
| print("\nStage 3: Compiling response...") | |
| response = self.compiler.compile_response(query, mcp_results, query_location) | |
| print("✓ Response compiled") | |
| print(f"\n{'='*60}\n") | |
| return { | |
| "response": response, | |
| "location": query_location, | |
| "mcp_results": mcp_results | |
| } | |
| def update_location(self, location: Dict[str, float]): | |
| """ | |
| Update default location for pipeline. | |
| Args: | |
| location: New default location dict with 'latitude' and 'longitude' keys | |
| """ | |
| self.location = location | |
| print(f"✓ Default location updated to: {location['latitude']:.4f}°N, {location['longitude']:.4f}°E") | |
| def get_server_status(self) -> Dict[str, str]: | |
| """ | |
| Get status of all MCP servers. | |
| Returns: | |
| Dict mapping server names to "available" or "unavailable" | |
| """ | |
| status = {} | |
| for server_name, server in self.servers.items(): | |
| try: | |
| # Try to check if server is responsive | |
| if hasattr(server, 'health_check'): | |
| status[server_name] = "available" if server.health_check() else "unavailable" | |
| else: | |
| status[server_name] = "available" # Assume available if no health check | |
| except: | |
| status[server_name] = "unavailable" | |
| return status |