aakashdg commited on
Commit
424a0c8
·
verified ·
1 Parent(s): b1d2ecb

fix (double wrapping of server results)

Browse files
Files changed (1) hide show
  1. src/pipeline.py +18 -57
src/pipeline.py CHANGED
@@ -95,6 +95,7 @@
95
  """
96
  Farmer.chat Pipeline - Main Orchestrator
97
  Coordinates Router → Executor → Compiler stages for alert generation
 
98
  """
99
 
100
  from typing import Dict, Any, Optional
@@ -108,8 +109,8 @@ class FarmerChatPipeline:
108
  Main pipeline orchestrating the complete alert generation workflow.
109
 
110
  Architecture:
111
- Stage 1: QueryRouter - Determines which MCP servers to query with priorities
112
- Stage 2: MCPExecutor - Executes parallel calls to selected MCP servers
113
  Stage 3: ResponseCompiler - Compiles results into actionable alert summary
114
  """
115
 
@@ -131,29 +132,15 @@ class FarmerChatPipeline:
131
 
132
  print(f"✓ Pipeline initialized for location: {location['latitude']:.4f}°N, {location['longitude']:.4f}°E")
133
 
134
- def generate_alert(
135
  self,
136
  location: Optional[Dict[str, float]] = None,
137
  location_name: str = ""
138
  ) -> Dict[str, Any]:
139
  """
140
  Generate comprehensive alert summary for a location.
141
-
142
- Always queries ALL MCP servers. The compiler extracts only
143
- the alerting/concerning information from the comprehensive data.
144
-
145
- Args:
146
- location: Optional location override. Uses default if not provided.
147
- location_name: Human-readable location name for context
148
-
149
- Returns:
150
- Dict containing:
151
- - alert_summary: Compiled alert text (only concerning info)
152
- - location: Location coordinates used
153
- - mcp_results: Raw results from each MCP server
154
  """
155
-
156
- # Use provided location or default
157
  query_location = location or self.location
158
 
159
  print(f"\n{'='*60}")
@@ -166,21 +153,21 @@ class FarmerChatPipeline:
166
  routing = self.router.route_alert_query(query_location)
167
  print(f"✓ Routing complete: All 5 servers will be queried")
168
 
169
- # Stage 2: Execute - Query all MCP servers in parallel
170
  print("\nStage 2: Executing parallel MCP server calls...")
171
- mcp_results = self.executor.execute_parallel(routing, query_location)
172
 
173
  success_count = sum(1 for r in mcp_results.values() if r.get("status") == "success")
174
  print(f"✓ Execution complete: {success_count}/{len(mcp_results)} servers responded successfully")
175
 
176
  # Stage 3: Compile - Extract ONLY alerting information
177
- print("\nStage 3: Compiling alert summary (extracting concerning info only)...")
178
  alert_summary = self.compiler.compile_alert_summary(
179
  mcp_results,
180
  query_location,
181
  location_name
182
  )
183
- print("✓ Alert summary generated (focusing on alerts/concerns only)")
184
 
185
  print(f"\n{'='*60}\n")
186
 
@@ -191,29 +178,15 @@ class FarmerChatPipeline:
191
  "mcp_results": mcp_results
192
  }
193
 
194
- def process_query(
195
  self,
196
  query: str,
197
  location: Optional[Dict[str, float]] = None
198
  ) -> Dict[str, Any]:
199
  """
200
  Process a specific farmer query through the pipeline.
201
-
202
- For now, also queries all servers to ensure comprehensive data.
203
- The compiler extracts query-relevant information.
204
-
205
- Args:
206
- query: Farmer's question or request
207
- location: Optional location override
208
-
209
- Returns:
210
- Dict containing:
211
- - response: Compiled response text
212
- - location: Location coordinates used
213
- - mcp_results: Raw results from MCP servers
214
  """
215
-
216
- # Use provided location or default
217
  query_location = location or self.location
218
 
219
  print(f"\n{'='*60}")
@@ -221,14 +194,14 @@ class FarmerChatPipeline:
221
  print(f"Location: {query_location['latitude']:.4f}°N, {query_location['longitude']:.4f}°E")
222
  print(f"{'='*60}\n")
223
 
224
- # Stage 1: Route - For now, query all servers
225
  print("Stage 1: Routing to all MCP servers...")
226
  routing = self.router.route_query(query, query_location)
227
- print(f"✓ Routing complete: All servers will be queried")
228
 
229
- # Stage 2: Execute MCP calls
230
  print("\nStage 2: Executing MCP server calls...")
231
- mcp_results = self.executor.execute_parallel(routing, query_location)
232
 
233
  success_count = sum(1 for r in mcp_results.values() if r.get("status") == "success")
234
  print(f"✓ Execution complete: {success_count}/{len(mcp_results)} servers responded")
@@ -247,31 +220,19 @@ class FarmerChatPipeline:
247
  }
248
 
249
  def update_location(self, location: Dict[str, float]):
250
- """
251
- Update default location for pipeline.
252
-
253
- Args:
254
- location: New default location dict with 'latitude' and 'longitude' keys
255
- """
256
  self.location = location
257
  print(f"✓ Default location updated to: {location['latitude']:.4f}°N, {location['longitude']:.4f}°E")
258
 
259
  def get_server_status(self) -> Dict[str, str]:
260
- """
261
- Get status of all MCP servers.
262
-
263
- Returns:
264
- Dict mapping server names to "available" or "unavailable"
265
- """
266
  status = {}
267
  for server_name, server in self.servers.items():
268
  try:
269
- # Try to check if server is responsive
270
  if hasattr(server, 'health_check'):
271
  status[server_name] = "available" if server.health_check() else "unavailable"
272
  else:
273
- status[server_name] = "available" # Assume available if no health check
274
  except:
275
  status[server_name] = "unavailable"
276
-
277
  return status
 
95
  """
96
  Farmer.chat Pipeline - Main Orchestrator
97
  Coordinates Router → Executor → Compiler stages for alert generation
98
+ FIXED: Proper async handling for FastAPI integration
99
  """
100
 
101
  from typing import Dict, Any, Optional
 
109
  Main pipeline orchestrating the complete alert generation workflow.
110
 
111
  Architecture:
112
+ Stage 1: QueryRouter - Determines which MCP servers to query
113
+ Stage 2: MCPExecutor - Executes parallel calls to MCP servers
114
  Stage 3: ResponseCompiler - Compiles results into actionable alert summary
115
  """
116
 
 
132
 
133
  print(f"✓ Pipeline initialized for location: {location['latitude']:.4f}°N, {location['longitude']:.4f}°E")
134
 
135
+ async def generate_alert(
136
  self,
137
  location: Optional[Dict[str, float]] = None,
138
  location_name: str = ""
139
  ) -> Dict[str, Any]:
140
  """
141
  Generate comprehensive alert summary for a location.
142
+ ASYNC version for FastAPI endpoints.
 
 
 
 
 
 
 
 
 
 
 
 
143
  """
 
 
144
  query_location = location or self.location
145
 
146
  print(f"\n{'='*60}")
 
153
  routing = self.router.route_alert_query(query_location)
154
  print(f"✓ Routing complete: All 5 servers will be queried")
155
 
156
+ # Stage 2: Execute - Query all MCP servers in parallel (ASYNC)
157
  print("\nStage 2: Executing parallel MCP server calls...")
158
+ mcp_results = await self.executor.execute_parallel_async(routing, query_location)
159
 
160
  success_count = sum(1 for r in mcp_results.values() if r.get("status") == "success")
161
  print(f"✓ Execution complete: {success_count}/{len(mcp_results)} servers responded successfully")
162
 
163
  # Stage 3: Compile - Extract ONLY alerting information
164
+ print("\nStage 3: Compiling alert summary...")
165
  alert_summary = self.compiler.compile_alert_summary(
166
  mcp_results,
167
  query_location,
168
  location_name
169
  )
170
+ print("✓ Alert summary generated")
171
 
172
  print(f"\n{'='*60}\n")
173
 
 
178
  "mcp_results": mcp_results
179
  }
180
 
181
+ async def process_query(
182
  self,
183
  query: str,
184
  location: Optional[Dict[str, float]] = None
185
  ) -> Dict[str, Any]:
186
  """
187
  Process a specific farmer query through the pipeline.
188
+ ASYNC version for FastAPI endpoints.
 
 
 
 
 
 
 
 
 
 
 
 
189
  """
 
 
190
  query_location = location or self.location
191
 
192
  print(f"\n{'='*60}")
 
194
  print(f"Location: {query_location['latitude']:.4f}°N, {query_location['longitude']:.4f}°E")
195
  print(f"{'='*60}\n")
196
 
197
+ # Stage 1: Route
198
  print("Stage 1: Routing to all MCP servers...")
199
  routing = self.router.route_query(query, query_location)
200
+ print(f"✓ Routing complete")
201
 
202
+ # Stage 2: Execute MCP calls (ASYNC)
203
  print("\nStage 2: Executing MCP server calls...")
204
+ mcp_results = await self.executor.execute_parallel_async(routing, query_location)
205
 
206
  success_count = sum(1 for r in mcp_results.values() if r.get("status") == "success")
207
  print(f"✓ Execution complete: {success_count}/{len(mcp_results)} servers responded")
 
220
  }
221
 
222
  def update_location(self, location: Dict[str, float]):
223
+ """Update default location for pipeline."""
 
 
 
 
 
224
  self.location = location
225
  print(f"✓ Default location updated to: {location['latitude']:.4f}°N, {location['longitude']:.4f}°E")
226
 
227
  def get_server_status(self) -> Dict[str, str]:
228
+ """Get status of all MCP servers."""
 
 
 
 
 
229
  status = {}
230
  for server_name, server in self.servers.items():
231
  try:
 
232
  if hasattr(server, 'health_check'):
233
  status[server_name] = "available" if server.health_check() else "unavailable"
234
  else:
235
+ status[server_name] = "available"
236
  except:
237
  status[server_name] = "unavailable"
 
238
  return status