Spaces:
Running
Running
File size: 10,011 Bytes
6afc01a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 |
# """
# Complete Multi-Stage MCP Pipeline
# Orchestrates: Router → Executor → Compiler → Translator
# """
# import time
# from typing import Dict, Any
# from openai import OpenAI
# from .router import QueryRouter
# from .executor import MCPExecutor, MCP_SERVER_REGISTRY
# from .compiler import ResponseCompiler
# from .translator import FarmerTranslator
# class FarmerChatPipeline:
# """Complete multi-stage MCP pipeline"""
# def __init__(self, openai_client: OpenAI, location: Dict[str, Any]):
# self.location = location
# self.router = QueryRouter(openai_client, MCP_SERVER_REGISTRY)
# self.executor = MCPExecutor()
# self.compiler = ResponseCompiler()
# self.translator = FarmerTranslator(openai_client)
# async def process_query(self, query: str, verbose: bool = False) -> Dict[str, Any]:
# """
# Process farmer query through complete pipeline
# Returns:
# {
# "query": str,
# "routing": dict,
# "compiled_data": dict,
# "advice": str,
# "pipeline_time_seconds": float
# }
# """
# if verbose:
# print(f"\nπΎ Processing: {query}")
# print(f"π Location: {self.location['name']}")
# pipeline_start = time.time()
# # STAGE 1: Query Routing
# if verbose:
# print("π― Stage 1: Routing...")
# routing = self.router.route(query, self.location)
# if verbose:
# print(f" β Servers: {', '.join(routing['required_servers'])}")
# # STAGE 2: MCP Execution (Parallel)
# if verbose:
# print("βοΈ Stage 2: Executing MCP servers...")
# raw_results = await self.executor.execute_parallel(
# routing['required_servers'],
# self.location['lat'],
# self.location['lon']
# )
# if verbose:
# print(f" β Completed in {raw_results['execution_time_seconds']}s")
# # STAGE 3: Response Compilation
# if verbose:
# print("π Stage 3: Compiling results...")
# compiled = self.compiler.compile(raw_results)
# if verbose:
# print(f" β {compiled['completeness']}")
# # STAGE 4: Farmer Translation
# if verbose:
# print("πΎ Stage 4: Generating advice...")
# farmer_advice = self.translator.translate(query, compiled, self.location)
# pipeline_time = time.time() - pipeline_start
# if verbose:
# print(f"✅ Complete! Total: {pipeline_time:.2f}s\n")
# return {
# "query": query,
# "routing": routing,
# "compiled_data": compiled,
# "advice": farmer_advice,
# "pipeline_time_seconds": round(pipeline_time, 2)
# }
"""
Farmer.chat Pipeline - Main Orchestrator
Coordinates Router → Executor → Compiler stages for alert generation
"""
from typing import Dict, Any, Optional
from .router import QueryRouter
from .executor import MCPExecutor
from .compiler import ResponseCompiler
class FarmerChatPipeline:
    """
    Main pipeline orchestrating the complete alert generation workflow.

    Architecture:
        Stage 1: QueryRouter - Determines which MCP servers to query with priorities
        Stage 2: MCPExecutor - Executes parallel calls to selected MCP servers
        Stage 3: ResponseCompiler - Compiles results into actionable alert summary

    Progress for every stage is reported on stdout via ``print``.
    """

    def __init__(self, servers: Dict[str, Any], location: Dict[str, float]):
        """
        Initialize pipeline with MCP servers and default location.

        Args:
            servers: Dict mapping server names to initialized server instances
            location: Default location dict with 'latitude' and 'longitude' keys

        Raises:
            KeyError: If ``location`` lacks 'latitude' or 'longitude' (both are
                read when printing the initialization banner).
        """
        self.servers = servers
        self.location = location

        # Initialize pipeline stages
        self.router = QueryRouter()
        self.executor = MCPExecutor(servers)
        self.compiler = ResponseCompiler()

        print(f"✓ Pipeline initialized for location: {self._format_coords(location)}")

    @staticmethod
    def _format_coords(location: Dict[str, float]) -> str:
        """Render a location dict as 'LAT°N, LON°E' with four decimal places."""
        return f"{location['latitude']:.4f}°N, {location['longitude']:.4f}°E"

    @staticmethod
    def _count_successes(mcp_results: Dict[str, Any]) -> int:
        """Count result entries whose 'status' field equals 'success'."""
        return sum(1 for r in mcp_results.values() if r.get("status") == "success")

    def generate_alert(
        self,
        location: Optional[Dict[str, float]] = None,
        location_name: str = ""
    ) -> Dict[str, Any]:
        """
        Generate comprehensive alert summary for a location.

        Always queries ALL MCP servers. The compiler extracts only
        the alerting/concerning information from the comprehensive data.

        Args:
            location: Optional location override. Uses default if not provided
                (any falsy value, including an empty dict, selects the default).
            location_name: Human-readable location name for context

        Returns:
            Dict containing:
                - alert_summary: Compiled alert text (only concerning info)
                - location: Location coordinates used
                - location_name: The name passed in, unchanged
                - mcp_results: Raw results from each MCP server
        """
        # Use provided location or default
        query_location = location or self.location

        print(f"\n{'='*60}")
        print(f"Generating Alert Summary for {location_name or 'Location'}")
        print(f"Coordinates: {self._format_coords(query_location)}")
        print(f"{'='*60}\n")

        # Stage 1: Route - Always query all servers for alerts
        print("Stage 1: Routing to all MCP servers...")
        routing = self.router.route_alert_query(query_location)
        # Report the configured server count instead of a hard-coded number so
        # the message stays truthful when the pipeline is built with more or
        # fewer servers.
        print(f"✓ Routing complete: All {len(self.servers)} servers will be queried")

        # Stage 2: Execute - Query all MCP servers in parallel
        print("\nStage 2: Executing parallel MCP server calls...")
        mcp_results = self.executor.execute_parallel(routing, query_location)
        success_count = self._count_successes(mcp_results)
        print(f"✓ Execution complete: {success_count}/{len(mcp_results)} servers responded successfully")

        # Stage 3: Compile - Extract ONLY alerting information
        print("\nStage 3: Compiling alert summary (extracting concerning info only)...")
        alert_summary = self.compiler.compile_alert_summary(
            mcp_results,
            query_location,
            location_name
        )
        print("✓ Alert summary generated (focusing on alerts/concerns only)")
        print(f"\n{'='*60}\n")

        return {
            "alert_summary": alert_summary,
            "location": query_location,
            "location_name": location_name,
            "mcp_results": mcp_results
        }

    def process_query(
        self,
        query: str,
        location: Optional[Dict[str, float]] = None
    ) -> Dict[str, Any]:
        """
        Process a specific farmer query through the pipeline.

        For now, also queries all servers to ensure comprehensive data.
        The compiler extracts query-relevant information.

        Args:
            query: Farmer's question or request
            location: Optional location override (any falsy value selects the
                pipeline default)

        Returns:
            Dict containing:
                - response: Compiled response text
                - location: Location coordinates used
                - mcp_results: Raw results from MCP servers
        """
        # Use provided location or default
        query_location = location or self.location

        print(f"\n{'='*60}")
        print(f"Processing Query: {query}")
        print(f"Location: {self._format_coords(query_location)}")
        print(f"{'='*60}\n")

        # Stage 1: Route - For now, query all servers
        print("Stage 1: Routing to all MCP servers...")
        routing = self.router.route_query(query, query_location)
        print("✓ Routing complete: All servers will be queried")

        # Stage 2: Execute MCP calls
        print("\nStage 2: Executing MCP server calls...")
        mcp_results = self.executor.execute_parallel(routing, query_location)
        success_count = self._count_successes(mcp_results)
        print(f"✓ Execution complete: {success_count}/{len(mcp_results)} servers responded")

        # Stage 3: Compile response
        print("\nStage 3: Compiling response...")
        response = self.compiler.compile_response(query, mcp_results, query_location)
        print("✓ Response compiled")
        print(f"\n{'='*60}\n")

        return {
            "response": response,
            "location": query_location,
            "mcp_results": mcp_results
        }

    def update_location(self, location: Dict[str, float]):
        """
        Update default location for pipeline.

        Args:
            location: New default location dict with 'latitude' and 'longitude' keys
        """
        self.location = location
        print(f"✓ Default location updated to: {self._format_coords(location)}")

    def get_server_status(self) -> Dict[str, str]:
        """
        Get status of all MCP servers.

        A server exposing ``health_check`` is "available" when that call
        returns a truthy value and "unavailable" when it returns a falsy value
        or raises an ordinary exception. Servers without ``health_check`` are
        assumed to be available.

        Returns:
            Dict mapping server names to "available" or "unavailable"
        """
        status: Dict[str, str] = {}
        for server_name, server in self.servers.items():
            try:
                if hasattr(server, "health_check"):
                    healthy = bool(server.health_check())
                else:
                    healthy = True  # Assume available if no health check
            except Exception:
                # Only ordinary failures mark a server as down; a bare
                # ``except`` would also swallow KeyboardInterrupt/SystemExit.
                healthy = False
            status[server_name] = "available" if healthy else "unavailable"
        return status