File size: 5,574 Bytes
6afc01a
 
 
424a0c8
6afc01a
 
 
 
 
 
 
 
 
 
 
 
 
424a0c8
 
6afc01a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
424a0c8
6afc01a
 
 
 
 
 
424a0c8
6afc01a
 
 
 
 
 
 
 
 
 
 
 
 
424a0c8
6afc01a
424a0c8
6afc01a
 
 
 
 
424a0c8
6afc01a
 
 
 
 
424a0c8
6afc01a
 
 
 
 
 
 
 
 
 
424a0c8
6afc01a
 
 
 
 
 
424a0c8
6afc01a
 
 
 
 
 
 
 
424a0c8
6afc01a
 
424a0c8
6afc01a
424a0c8
6afc01a
424a0c8
6afc01a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
424a0c8
6afc01a
 
 
 
424a0c8
6afc01a
 
 
 
 
 
424a0c8
6afc01a
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
"""
Farmer.chat Pipeline - Main Orchestrator
Coordinates Router → Executor → Compiler stages for alert generation
FIXED: Proper async handling for FastAPI integration
"""

from typing import Dict, Any, Optional
from .router import QueryRouter
from .executor import MCPExecutor
from .compiler import ResponseCompiler


class FarmerChatPipeline:
    """
    Main pipeline orchestrating the complete alert generation workflow.
    
    Architecture:
    Stage 1: QueryRouter - Determines which MCP servers to query
    Stage 2: MCPExecutor - Executes parallel calls to MCP servers  
    Stage 3: ResponseCompiler - Compiles results into actionable alert summary
    """
    
    def __init__(self, servers: Dict[str, Any], location: Dict[str, float]):
        """
        Initialize pipeline with MCP servers and default location.
        
        Args:
            servers: Dict mapping server names to initialized server instances
            location: Default location dict with 'latitude' and 'longitude' keys
        """
        self.servers = servers
        self.location = location
        
        # Initialize pipeline stages
        self.router = QueryRouter()
        self.executor = MCPExecutor(servers)
        self.compiler = ResponseCompiler()
        
        print(f"✓ Pipeline initialized for location: {location['latitude']:.4f}°N, {location['longitude']:.4f}°E")
    
    async def generate_alert(
        self, 
        location: Optional[Dict[str, float]] = None,
        location_name: str = ""
    ) -> Dict[str, Any]:
        """
        Generate comprehensive alert summary for a location.
        ASYNC version for FastAPI endpoints.
        """
        query_location = location or self.location
        
        print(f"\n{'='*60}")
        print(f"Generating Alert Summary for {location_name or 'Location'}")
        print(f"Coordinates: {query_location['latitude']:.4f}°N, {query_location['longitude']:.4f}°E")
        print(f"{'='*60}\n")
        
        # Stage 1: Route - Always query all servers for alerts
        print("Stage 1: Routing to all MCP servers...")
        routing = self.router.route_alert_query(query_location)
        print(f"✓ Routing complete: All 5 servers will be queried")
        
        # Stage 2: Execute - Query all MCP servers in parallel (ASYNC)
        print("\nStage 2: Executing parallel MCP server calls...")
        mcp_results = await self.executor.execute_parallel_async(routing, query_location)
        
        success_count = sum(1 for r in mcp_results.values() if r.get("status") == "success")
        print(f"✓ Execution complete: {success_count}/{len(mcp_results)} servers responded successfully")
        
        # Stage 3: Compile - Extract ONLY alerting information
        print("\nStage 3: Compiling alert summary...")
        alert_summary = self.compiler.compile_alert_summary(
            mcp_results, 
            query_location,
            location_name
        )
        print("✓ Alert summary generated")
        
        print(f"\n{'='*60}\n")
        
        return {
            "alert_summary": alert_summary,
            "location": query_location,
            "location_name": location_name,
            "mcp_results": mcp_results
        }
    
    async def process_query(
        self, 
        query: str, 
        location: Optional[Dict[str, float]] = None
    ) -> Dict[str, Any]:
        """
        Process a specific farmer query through the pipeline.
        ASYNC version for FastAPI endpoints.
        """
        query_location = location or self.location
        
        print(f"\n{'='*60}")
        print(f"Processing Query: {query}")
        print(f"Location: {query_location['latitude']:.4f}°N, {query_location['longitude']:.4f}°E")
        print(f"{'='*60}\n")
        
        # Stage 1: Route
        print("Stage 1: Routing to all MCP servers...")
        routing = self.router.route_query(query, query_location)
        print(f"✓ Routing complete")
        
        # Stage 2: Execute MCP calls (ASYNC)
        print("\nStage 2: Executing MCP server calls...")
        mcp_results = await self.executor.execute_parallel_async(routing, query_location)
        
        success_count = sum(1 for r in mcp_results.values() if r.get("status") == "success")
        print(f"✓ Execution complete: {success_count}/{len(mcp_results)} servers responded")
        
        # Stage 3: Compile response
        print("\nStage 3: Compiling response...")
        response = self.compiler.compile_response(query, mcp_results, query_location)
        print("✓ Response compiled")
        
        print(f"\n{'='*60}\n")
        
        return {
            "response": response,
            "location": query_location,
            "mcp_results": mcp_results
        }
    
    def update_location(self, location: Dict[str, float]):
        """Update default location for pipeline."""
        self.location = location
        print(f"✓ Default location updated to: {location['latitude']:.4f}°N, {location['longitude']:.4f}°E")
    
    def get_server_status(self) -> Dict[str, str]:
        """Get status of all MCP servers."""
        status = {}
        for server_name, server in self.servers.items():
            try:
                if hasattr(server, 'health_check'):
                    status[server_name] = "available" if server.health_check() else "unavailable"
                else:
                    status[server_name] = "available"
            except:
                status[server_name] = "unavailable"
        return status