File size: 8,553 Bytes
6afc01a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
424a0c8
6afc01a
 
 
 
 
 
 
 
 
 
 
 
 
424a0c8
 
6afc01a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
424a0c8
6afc01a
 
 
 
 
 
424a0c8
6afc01a
 
 
 
 
 
 
 
 
 
 
 
 
424a0c8
6afc01a
424a0c8
6afc01a
 
 
 
 
424a0c8
6afc01a
 
 
 
 
424a0c8
6afc01a
 
 
 
 
 
 
 
 
 
424a0c8
6afc01a
 
 
 
 
 
424a0c8
6afc01a
 
 
 
 
 
 
 
424a0c8
6afc01a
 
424a0c8
6afc01a
424a0c8
6afc01a
424a0c8
6afc01a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
424a0c8
6afc01a
 
 
 
424a0c8
6afc01a
 
 
 
 
 
424a0c8
6afc01a
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
# """
# Complete Multi-Stage MCP Pipeline
# Orchestrates: Router β†’ Executor β†’ Compiler β†’ Translator
# """

# import time
# from typing import Dict, Any
# from openai import OpenAI

# from .router import QueryRouter
# from .executor import MCPExecutor, MCP_SERVER_REGISTRY
# from .compiler import ResponseCompiler
# from .translator import FarmerTranslator


# class FarmerChatPipeline:
#     """Complete multi-stage MCP pipeline"""

#     def __init__(self, openai_client: OpenAI, location: Dict[str, Any]):
#         self.location = location
#         self.router = QueryRouter(openai_client, MCP_SERVER_REGISTRY)
#         self.executor = MCPExecutor()
#         self.compiler = ResponseCompiler()
#         self.translator = FarmerTranslator(openai_client)

#     async def process_query(self, query: str, verbose: bool = False) -> Dict[str, Any]:
#         """
#         Process farmer query through complete pipeline
        
#         Returns:
#             {
#                 "query": str,
#                 "routing": dict,
#                 "compiled_data": dict,
#                 "advice": str,
#                 "pipeline_time_seconds": float
#             }
#         """
#         if verbose:
#             print(f"\n🌾 Processing: {query}")
#             print(f"πŸ“ Location: {self.location['name']}")

#         pipeline_start = time.time()

#         # STAGE 1: Query Routing
#         if verbose:
#             print("🎯 Stage 1: Routing...")
        
#         routing = self.router.route(query, self.location)
        
#         if verbose:
#             print(f"   β†’ Servers: {', '.join(routing['required_servers'])}")

#         # STAGE 2: MCP Execution (Parallel)
#         if verbose:
#             print("βš™οΈ Stage 2: Executing MCP servers...")
        
#         raw_results = await self.executor.execute_parallel(
#             routing['required_servers'],
#             self.location['lat'],
#             self.location['lon']
#         )
        
#         if verbose:
#             print(f"   β†’ Completed in {raw_results['execution_time_seconds']}s")

#         # STAGE 3: Response Compilation
#         if verbose:
#             print("πŸ”— Stage 3: Compiling results...")
        
#         compiled = self.compiler.compile(raw_results)
        
#         if verbose:
#             print(f"   β†’ {compiled['completeness']}")

#         # STAGE 4: Farmer Translation
#         if verbose:
#             print("🌾 Stage 4: Generating advice...")
        
#         farmer_advice = self.translator.translate(query, compiled, self.location)

#         pipeline_time = time.time() - pipeline_start

#         if verbose:
#             print(f"βœ… Complete! Total: {pipeline_time:.2f}s\n")

#         return {
#             "query": query,
#             "routing": routing,
#             "compiled_data": compiled,
#             "advice": farmer_advice,
#             "pipeline_time_seconds": round(pipeline_time, 2)
#         }

"""
Farmer.chat Pipeline - Main Orchestrator
Coordinates Router β†’ Executor β†’ Compiler stages for alert generation
FIXED: Proper async handling for FastAPI integration
"""

from typing import Dict, Any, Optional
from .router import QueryRouter
from .executor import MCPExecutor
from .compiler import ResponseCompiler


class FarmerChatPipeline:
    """
    Main pipeline orchestrating the complete alert generation workflow.

    Architecture:
    Stage 1: QueryRouter - Determines which MCP servers to query
    Stage 2: MCPExecutor - Executes parallel calls to MCP servers
    Stage 3: ResponseCompiler - Compiles results into actionable alert summary
    """

    def __init__(self, servers: Dict[str, Any], location: Dict[str, float]):
        """
        Initialize pipeline with MCP servers and default location.

        Args:
            servers: Dict mapping server names to initialized server instances
            location: Default location dict with 'latitude' and 'longitude' keys
        """
        self.servers = servers
        self.location = location

        # Initialize pipeline stages
        self.router = QueryRouter()
        self.executor = MCPExecutor(servers)
        self.compiler = ResponseCompiler()

        print(f"βœ“ Pipeline initialized for location: {location['latitude']:.4f}Β°N, {location['longitude']:.4f}Β°E")

    @staticmethod
    def _count_successes(mcp_results: Dict[str, Any]) -> int:
        """Count MCP results whose 'status' field equals 'success'."""
        return sum(1 for r in mcp_results.values() if r.get("status") == "success")

    async def generate_alert(
        self,
        location: Optional[Dict[str, float]] = None,
        location_name: str = ""
    ) -> Dict[str, Any]:
        """
        Generate comprehensive alert summary for a location.
        ASYNC version for FastAPI endpoints.

        Args:
            location: Optional override location; falls back to the pipeline
                default when omitted.
            location_name: Human-readable name used in logs and the summary.

        Returns:
            Dict with 'alert_summary', 'location', 'location_name', and the
            raw 'mcp_results' from the executor stage.
        """
        query_location = location or self.location

        print(f"\n{'='*60}")
        print(f"Generating Alert Summary for {location_name or 'Location'}")
        print(f"Coordinates: {query_location['latitude']:.4f}Β°N, {query_location['longitude']:.4f}Β°E")
        print(f"{'='*60}\n")

        # Stage 1: Route - Always query all servers for alerts
        print("Stage 1: Routing to all MCP servers...")
        routing = self.router.route_alert_query(query_location)
        # NOTE(review): message hardcodes "5" regardless of how many servers
        # are actually registered — confirm against the server registry.
        print(f"βœ“ Routing complete: All 5 servers will be queried")

        # Stage 2: Execute - Query all MCP servers in parallel (ASYNC)
        print("\nStage 2: Executing parallel MCP server calls...")
        mcp_results = await self.executor.execute_parallel_async(routing, query_location)

        success_count = self._count_successes(mcp_results)
        print(f"βœ“ Execution complete: {success_count}/{len(mcp_results)} servers responded successfully")

        # Stage 3: Compile - Extract ONLY alerting information
        print("\nStage 3: Compiling alert summary...")
        alert_summary = self.compiler.compile_alert_summary(
            mcp_results,
            query_location,
            location_name
        )
        print("βœ“ Alert summary generated")

        print(f"\n{'='*60}\n")

        return {
            "alert_summary": alert_summary,
            "location": query_location,
            "location_name": location_name,
            "mcp_results": mcp_results
        }

    async def process_query(
        self,
        query: str,
        location: Optional[Dict[str, float]] = None
    ) -> Dict[str, Any]:
        """
        Process a specific farmer query through the pipeline.
        ASYNC version for FastAPI endpoints.

        Args:
            query: Free-text farmer question to route and answer.
            location: Optional override location; falls back to the pipeline
                default when omitted.

        Returns:
            Dict with the compiled 'response', the 'location' used, and the
            raw 'mcp_results' from the executor stage.
        """
        query_location = location or self.location

        print(f"\n{'='*60}")
        print(f"Processing Query: {query}")
        print(f"Location: {query_location['latitude']:.4f}Β°N, {query_location['longitude']:.4f}Β°E")
        print(f"{'='*60}\n")

        # Stage 1: Route
        print("Stage 1: Routing to all MCP servers...")
        routing = self.router.route_query(query, query_location)
        print(f"βœ“ Routing complete")

        # Stage 2: Execute MCP calls (ASYNC)
        print("\nStage 2: Executing MCP server calls...")
        mcp_results = await self.executor.execute_parallel_async(routing, query_location)

        success_count = self._count_successes(mcp_results)
        print(f"βœ“ Execution complete: {success_count}/{len(mcp_results)} servers responded")

        # Stage 3: Compile response
        print("\nStage 3: Compiling response...")
        response = self.compiler.compile_response(query, mcp_results, query_location)
        print("βœ“ Response compiled")

        print(f"\n{'='*60}\n")

        return {
            "response": response,
            "location": query_location,
            "mcp_results": mcp_results
        }

    def update_location(self, location: Dict[str, float]):
        """Update default location for pipeline."""
        self.location = location
        print(f"βœ“ Default location updated to: {location['latitude']:.4f}Β°N, {location['longitude']:.4f}Β°E")

    def get_server_status(self) -> Dict[str, str]:
        """
        Get status of all MCP servers.

        Returns:
            Dict mapping server name to "available" or "unavailable".
        """
        status = {}
        for server_name, server in self.servers.items():
            try:
                if hasattr(server, 'health_check'):
                    status[server_name] = "available" if server.health_check() else "unavailable"
                else:
                    # No health_check exposed; optimistically assume usable.
                    status[server_name] = "available"
            except Exception:
                # Narrowed from bare `except:` so SystemExit/KeyboardInterrupt
                # are not swallowed; any server error marks it unavailable.
                status[server_name] = "unavailable"
        return status