File size: 10,011 Bytes
6afc01a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
# """
# Complete Multi-Stage MCP Pipeline
# Orchestrates: Router → Executor → Compiler → Translator
# """

# import time
# from typing import Dict, Any
# from openai import OpenAI

# from .router import QueryRouter
# from .executor import MCPExecutor, MCP_SERVER_REGISTRY
# from .compiler import ResponseCompiler
# from .translator import FarmerTranslator


# class FarmerChatPipeline:
#     """Complete multi-stage MCP pipeline"""

#     def __init__(self, openai_client: OpenAI, location: Dict[str, Any]):
#         self.location = location
#         self.router = QueryRouter(openai_client, MCP_SERVER_REGISTRY)
#         self.executor = MCPExecutor()
#         self.compiler = ResponseCompiler()
#         self.translator = FarmerTranslator(openai_client)

#     async def process_query(self, query: str, verbose: bool = False) -> Dict[str, Any]:
#         """
#         Process farmer query through complete pipeline
        
#         Returns:
#             {
#                 "query": str,
#                 "routing": dict,
#                 "compiled_data": dict,
#                 "advice": str,
#                 "pipeline_time_seconds": float
#             }
#         """
#         if verbose:
#             print(f"\n🌾 Processing: {query}")
#             print(f"πŸ“ Location: {self.location['name']}")

#         pipeline_start = time.time()

#         # STAGE 1: Query Routing
#         if verbose:
#             print("🎯 Stage 1: Routing...")
        
#         routing = self.router.route(query, self.location)
        
#         if verbose:
#             print(f"   β†’ Servers: {', '.join(routing['required_servers'])}")

#         # STAGE 2: MCP Execution (Parallel)
#         if verbose:
#             print("βš™οΈ Stage 2: Executing MCP servers...")
        
#         raw_results = await self.executor.execute_parallel(
#             routing['required_servers'],
#             self.location['lat'],
#             self.location['lon']
#         )
        
#         if verbose:
#             print(f"   β†’ Completed in {raw_results['execution_time_seconds']}s")

#         # STAGE 3: Response Compilation
#         if verbose:
#             print("πŸ”— Stage 3: Compiling results...")
        
#         compiled = self.compiler.compile(raw_results)
        
#         if verbose:
#             print(f"   β†’ {compiled['completeness']}")

#         # STAGE 4: Farmer Translation
#         if verbose:
#             print("🌾 Stage 4: Generating advice...")
        
#         farmer_advice = self.translator.translate(query, compiled, self.location)

#         pipeline_time = time.time() - pipeline_start

#         if verbose:
#             print(f"βœ… Complete! Total: {pipeline_time:.2f}s\n")

#         return {
#             "query": query,
#             "routing": routing,
#             "compiled_data": compiled,
#             "advice": farmer_advice,
#             "pipeline_time_seconds": round(pipeline_time, 2)
#         }

"""
Farmer.chat Pipeline - Main Orchestrator
Coordinates Router → Executor → Compiler stages for alert generation
"""

from typing import Dict, Any, Optional
from .router import QueryRouter
from .executor import MCPExecutor
from .compiler import ResponseCompiler


class FarmerChatPipeline:
    """
    Main pipeline orchestrating the complete alert generation workflow.

    Architecture:
    Stage 1: QueryRouter - Determines which MCP servers to query with priorities
    Stage 2: MCPExecutor - Executes parallel calls to selected MCP servers
    Stage 3: ResponseCompiler - Compiles results into actionable alert summary

    Progress is reported on stdout via ``print``.
    """

    def __init__(self, servers: Dict[str, Any], location: Dict[str, float]):
        """
        Initialize pipeline with MCP servers and default location.

        Args:
            servers: Dict mapping server names to initialized server instances
            location: Default location dict with 'latitude' and 'longitude' keys

        Raises:
            KeyError: If ``location`` lacks 'latitude' or 'longitude'.
        """
        self.servers = servers
        self.location = location

        # Initialize pipeline stages
        self.router = QueryRouter()
        self.executor = MCPExecutor(servers)
        self.compiler = ResponseCompiler()

        print(f"✓ Pipeline initialized for location: {self._format_coords(location)}")

    @staticmethod
    def _format_coords(location: Dict[str, float]) -> str:
        """Render a location dict as ``'<lat>°N, <lon>°E'`` with four decimals."""
        return f"{location['latitude']:.4f}°N, {location['longitude']:.4f}°E"

    @staticmethod
    def _count_successes(mcp_results: Dict[str, Any]) -> int:
        """Count results whose ``status`` entry equals ``"success"``.

        Entries that are not dict-like (for example ``None`` for a server
        that produced nothing) are counted as failures instead of raising.
        """
        return sum(
            1
            for result in mcp_results.values()
            if hasattr(result, "get") and result.get("status") == "success"
        )

    def generate_alert(
        self,
        location: Optional[Dict[str, float]] = None,
        location_name: str = ""
    ) -> Dict[str, Any]:
        """
        Generate comprehensive alert summary for a location.

        Runs the three stages in order: ``router.route_alert_query``,
        ``executor.execute_parallel`` and ``compiler.compile_alert_summary``.

        Args:
            location: Optional location override. Uses default if not provided
                (a falsy value such as ``None`` or ``{}`` selects the default).
            location_name: Human-readable location name for context

        Returns:
            Dict containing:
                - alert_summary: Value returned by the compiler
                - location: Location coordinates used
                - location_name: The ``location_name`` argument, unchanged
                - mcp_results: Raw results returned by the executor
        """
        # Use provided location or default
        query_location = location or self.location
        banner = "=" * 60

        print(f"\n{banner}")
        print(f"Generating Alert Summary for {location_name or 'Location'}")
        print(f"Coordinates: {self._format_coords(query_location)}")
        print(f"{banner}\n")

        # Stage 1: Route - Always query all servers for alerts
        print("Stage 1: Routing to all MCP servers...")
        routing = self.router.route_alert_query(query_location)
        # NOTE(review): the count in this message is hard-coded; confirm it
        # matches what the router actually selects.
        print("✓ Routing complete: All 5 servers will be queried")

        # Stage 2: Execute - Query all MCP servers in parallel
        print("\nStage 2: Executing parallel MCP server calls...")
        mcp_results = self.executor.execute_parallel(routing, query_location)

        success_count = self._count_successes(mcp_results)
        print(f"✓ Execution complete: {success_count}/{len(mcp_results)} servers responded successfully")

        # Stage 3: Compile - Extract ONLY alerting information
        print("\nStage 3: Compiling alert summary (extracting concerning info only)...")
        alert_summary = self.compiler.compile_alert_summary(
            mcp_results,
            query_location,
            location_name
        )
        print("✓ Alert summary generated (focusing on alerts/concerns only)")

        print(f"\n{banner}\n")

        return {
            "alert_summary": alert_summary,
            "location": query_location,
            "location_name": location_name,
            "mcp_results": mcp_results
        }

    def process_query(
        self,
        query: str,
        location: Optional[Dict[str, float]] = None
    ) -> Dict[str, Any]:
        """
        Process a specific farmer query through the pipeline.

        Runs the three stages in order: ``router.route_query``,
        ``executor.execute_parallel`` and ``compiler.compile_response``.

        Args:
            query: Farmer's question or request
            location: Optional location override (a falsy value selects the
                pipeline default)

        Returns:
            Dict containing:
                - response: Value returned by the compiler
                - location: Location coordinates used
                - mcp_results: Raw results returned by the executor
        """
        # Use provided location or default
        query_location = location or self.location
        banner = "=" * 60

        print(f"\n{banner}")
        print(f"Processing Query: {query}")
        print(f"Location: {self._format_coords(query_location)}")
        print(f"{banner}\n")

        # Stage 1: Route - For now, query all servers
        print("Stage 1: Routing to all MCP servers...")
        routing = self.router.route_query(query, query_location)
        print("✓ Routing complete: All servers will be queried")

        # Stage 2: Execute MCP calls
        print("\nStage 2: Executing MCP server calls...")
        mcp_results = self.executor.execute_parallel(routing, query_location)

        success_count = self._count_successes(mcp_results)
        print(f"✓ Execution complete: {success_count}/{len(mcp_results)} servers responded")

        # Stage 3: Compile response
        print("\nStage 3: Compiling response...")
        response = self.compiler.compile_response(query, mcp_results, query_location)
        print("✓ Response compiled")

        print(f"\n{banner}\n")

        return {
            "response": response,
            "location": query_location,
            "mcp_results": mcp_results
        }

    def update_location(self, location: Dict[str, float]) -> None:
        """
        Update default location for pipeline.

        The confirmation text is built before the default is replaced, so a
        malformed location raises without changing the current default.

        Args:
            location: New default location dict with 'latitude' and 'longitude' keys

        Raises:
            KeyError: If ``location`` lacks 'latitude' or 'longitude'.
        """
        formatted = self._format_coords(location)
        self.location = location
        print(f"✓ Default location updated to: {formatted}")

    def get_server_status(self) -> Dict[str, str]:
        """
        Get status of all MCP servers.

        A server without a ``health_check`` attribute is assumed available.
        A ``health_check`` that raises an ``Exception`` marks the server
        unavailable; ``KeyboardInterrupt`` and ``SystemExit`` propagate.

        Returns:
            Dict mapping server names to "available" or "unavailable"
        """
        status: Dict[str, str] = {}
        for server_name, server in self.servers.items():
            try:
                if hasattr(server, 'health_check'):
                    healthy = bool(server.health_check())
                else:
                    healthy = True  # Assume available if no health check
            except Exception:
                # Best-effort probe: any ordinary failure means "unavailable".
                healthy = False
            status[server_name] = "available" if healthy else "unavailable"

        return status