Spaces:
Sleeping
Sleeping
| # """ | |
| # Complete Multi-Stage MCP Pipeline | |
| # Orchestrates: Router → Executor → Compiler → Translator | |
| # """ | |
| # import time | |
| # from typing import Dict, Any | |
| # from openai import OpenAI | |
| # from .router import QueryRouter | |
| # from .executor import MCPExecutor, MCP_SERVER_REGISTRY | |
| # from .compiler import ResponseCompiler | |
| # from .translator import FarmerTranslator | |
| # class FarmerChatPipeline: | |
| # """Complete multi-stage MCP pipeline""" | |
| # def __init__(self, openai_client: OpenAI, location: Dict[str, Any]): | |
| # self.location = location | |
| # self.router = QueryRouter(openai_client, MCP_SERVER_REGISTRY) | |
| # self.executor = MCPExecutor() | |
| # self.compiler = ResponseCompiler() | |
| # self.translator = FarmerTranslator(openai_client) | |
| # async def process_query(self, query: str, verbose: bool = False) -> Dict[str, Any]: | |
| # """ | |
| # Process farmer query through complete pipeline | |
| # Returns: | |
| # { | |
| # "query": str, | |
| # "routing": dict, | |
| # "compiled_data": dict, | |
| # "advice": str, | |
| # "pipeline_time_seconds": float | |
| # } | |
| # """ | |
| # if verbose: | |
| # print(f"\n🌾 Processing: {query}") | |
| # print(f"📍 Location: {self.location['name']}") | |
| # pipeline_start = time.time() | |
| # # STAGE 1: Query Routing | |
| # if verbose: | |
| # print("🎯 Stage 1: Routing...") | |
| # routing = self.router.route(query, self.location) | |
| # if verbose: | |
| # print(f" → Servers: {', '.join(routing['required_servers'])}") | |
| # # STAGE 2: MCP Execution (Parallel) | |
| # if verbose: | |
| # print("⚙️ Stage 2: Executing MCP servers...") | |
| # raw_results = await self.executor.execute_parallel( | |
| # routing['required_servers'], | |
| # self.location['lat'], | |
| # self.location['lon'] | |
| # ) | |
| # if verbose: | |
| # print(f" → Completed in {raw_results['execution_time_seconds']}s") | |
| # # STAGE 3: Response Compilation | |
| # if verbose: | |
| # print("🔗 Stage 3: Compiling results...") | |
| # compiled = self.compiler.compile(raw_results) | |
| # if verbose: | |
| # print(f" → {compiled['completeness']}") | |
| # # STAGE 4: Farmer Translation | |
| # if verbose: | |
| # print("🌾 Stage 4: Generating advice...") | |
| # farmer_advice = self.translator.translate(query, compiled, self.location) | |
| # pipeline_time = time.time() - pipeline_start | |
| # if verbose: | |
| # print(f"✅ Complete! Total: {pipeline_time:.2f}s\n") | |
| # return { | |
| # "query": query, | |
| # "routing": routing, | |
| # "compiled_data": compiled, | |
| # "advice": farmer_advice, | |
| # "pipeline_time_seconds": round(pipeline_time, 2) | |
| # } | |
| """ | |
| Farmer.chat Pipeline - Main Orchestrator | |
| Coordinates Router → Executor → Compiler stages for alert generation | |
| FIXED: Proper async handling for FastAPI integration | |
| """ | |
| from typing import Dict, Any, Optional | |
| from .router import QueryRouter | |
| from .executor import MCPExecutor | |
| from .compiler import ResponseCompiler | |
class FarmerChatPipeline:
    """
    Main pipeline orchestrating the complete alert generation workflow.

    Architecture:
        Stage 1: QueryRouter      - Determines which MCP servers to query
        Stage 2: MCPExecutor      - Executes parallel calls to MCP servers
        Stage 3: ResponseCompiler - Compiles results into actionable alert summary
    """

    def __init__(self, servers: Dict[str, Any], location: Dict[str, float]):
        """
        Initialize pipeline with MCP servers and default location.

        Args:
            servers: Dict mapping server names to initialized server instances
            location: Default location dict with 'latitude' and 'longitude' keys
        """
        self.servers = servers
        self.location = location

        # Wire up the three pipeline stages; the executor owns the server handles.
        self.router = QueryRouter()
        self.executor = MCPExecutor(servers)
        self.compiler = ResponseCompiler()

        print(f"✓ Pipeline initialized for location: {location['latitude']:.4f}°N, {location['longitude']:.4f}°E")

    async def generate_alert(
        self,
        location: Optional[Dict[str, float]] = None,
        location_name: str = ""
    ) -> Dict[str, Any]:
        """
        Generate comprehensive alert summary for a location.

        ASYNC version for FastAPI endpoints.

        Args:
            location: Optional override location; falls back to the pipeline default.
            location_name: Human-readable place name used in log output and summary.

        Returns:
            Dict with keys 'alert_summary', 'location', 'location_name', 'mcp_results'.
        """
        query_location = location or self.location

        print(f"\n{'='*60}")
        print(f"Generating Alert Summary for {location_name or 'Location'}")
        print(f"Coordinates: {query_location['latitude']:.4f}°N, {query_location['longitude']:.4f}°E")
        print(f"{'='*60}\n")

        # Stage 1: Route - Always query all servers for alerts
        print("Stage 1: Routing to all MCP servers...")
        routing = self.router.route_alert_query(query_location)
        # FIX: report the actual registry size instead of a hard-coded "5",
        # which silently goes stale when servers are added or removed.
        print(f"✓ Routing complete: All {len(self.servers)} servers will be queried")

        # Stage 2: Execute - Query all MCP servers in parallel (ASYNC)
        print("\nStage 2: Executing parallel MCP server calls...")
        mcp_results = await self.executor.execute_parallel_async(routing, query_location)

        success_count = sum(1 for r in mcp_results.values() if r.get("status") == "success")
        print(f"✓ Execution complete: {success_count}/{len(mcp_results)} servers responded successfully")

        # Stage 3: Compile - Extract ONLY alerting information
        print("\nStage 3: Compiling alert summary...")
        alert_summary = self.compiler.compile_alert_summary(
            mcp_results,
            query_location,
            location_name
        )
        print("✓ Alert summary generated")
        print(f"\n{'='*60}\n")

        return {
            "alert_summary": alert_summary,
            "location": query_location,
            "location_name": location_name,
            "mcp_results": mcp_results
        }

    async def process_query(
        self,
        query: str,
        location: Optional[Dict[str, float]] = None
    ) -> Dict[str, Any]:
        """
        Process a specific farmer query through the pipeline.

        ASYNC version for FastAPI endpoints.

        Args:
            query: Free-text farmer question to route and answer.
            location: Optional override location; falls back to the pipeline default.

        Returns:
            Dict with keys 'response', 'location', 'mcp_results'.
        """
        query_location = location or self.location

        print(f"\n{'='*60}")
        print(f"Processing Query: {query}")
        print(f"Location: {query_location['latitude']:.4f}°N, {query_location['longitude']:.4f}°E")
        print(f"{'='*60}\n")

        # Stage 1: Route
        print("Stage 1: Routing to all MCP servers...")
        routing = self.router.route_query(query, query_location)
        print("✓ Routing complete")

        # Stage 2: Execute MCP calls (ASYNC)
        print("\nStage 2: Executing MCP server calls...")
        mcp_results = await self.executor.execute_parallel_async(routing, query_location)

        success_count = sum(1 for r in mcp_results.values() if r.get("status") == "success")
        print(f"✓ Execution complete: {success_count}/{len(mcp_results)} servers responded")

        # Stage 3: Compile response
        print("\nStage 3: Compiling response...")
        response = self.compiler.compile_response(query, mcp_results, query_location)
        print("✓ Response compiled")
        print(f"\n{'='*60}\n")

        return {
            "response": response,
            "location": query_location,
            "mcp_results": mcp_results
        }

    def update_location(self, location: Dict[str, float]):
        """Update default location for pipeline."""
        self.location = location
        print(f"✓ Default location updated to: {location['latitude']:.4f}°N, {location['longitude']:.4f}°E")

    def get_server_status(self) -> Dict[str, str]:
        """
        Get status of all MCP servers.

        Returns:
            Dict mapping each server name to "available" or "unavailable".
        """
        status = {}
        for server_name, server in self.servers.items():
            try:
                if hasattr(server, 'health_check'):
                    status[server_name] = "available" if server.health_check() else "unavailable"
                else:
                    # Servers exposing no health_check probe are assumed reachable.
                    status[server_name] = "available"
            # FIX: a bare `except:` also swallowed SystemExit/KeyboardInterrupt;
            # catch Exception so a failing probe marks the server unavailable
            # without masking interpreter-level signals.
            except Exception:
                status[server_name] = "unavailable"
        return status