Spaces:
Running
Running
File size: 8,553 Bytes
6afc01a 424a0c8 6afc01a 424a0c8 6afc01a 424a0c8 6afc01a 424a0c8 6afc01a 424a0c8 6afc01a 424a0c8 6afc01a 424a0c8 6afc01a 424a0c8 6afc01a 424a0c8 6afc01a 424a0c8 6afc01a 424a0c8 6afc01a 424a0c8 6afc01a 424a0c8 6afc01a 424a0c8 6afc01a 424a0c8 6afc01a 424a0c8 6afc01a 424a0c8 6afc01a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 |
# """
# Complete Multi-Stage MCP Pipeline
# Orchestrates: Router → Executor → Compiler → Translator
# """
# import time
# from typing import Dict, Any
# from openai import OpenAI
# from .router import QueryRouter
# from .executor import MCPExecutor, MCP_SERVER_REGISTRY
# from .compiler import ResponseCompiler
# from .translator import FarmerTranslator
# class FarmerChatPipeline:
# """Complete multi-stage MCP pipeline"""
# def __init__(self, openai_client: OpenAI, location: Dict[str, Any]):
# self.location = location
# self.router = QueryRouter(openai_client, MCP_SERVER_REGISTRY)
# self.executor = MCPExecutor()
# self.compiler = ResponseCompiler()
# self.translator = FarmerTranslator(openai_client)
# async def process_query(self, query: str, verbose: bool = False) -> Dict[str, Any]:
# """
# Process farmer query through complete pipeline
# Returns:
# {
# "query": str,
# "routing": dict,
# "compiled_data": dict,
# "advice": str,
# "pipeline_time_seconds": float
# }
# """
# if verbose:
# print(f"\nπΎ Processing: {query}")
# print(f"π Location: {self.location['name']}")
# pipeline_start = time.time()
# # STAGE 1: Query Routing
# if verbose:
# print("π― Stage 1: Routing...")
# routing = self.router.route(query, self.location)
# if verbose:
# print(f" β Servers: {', '.join(routing['required_servers'])}")
# # STAGE 2: MCP Execution (Parallel)
# if verbose:
# print("βοΈ Stage 2: Executing MCP servers...")
# raw_results = await self.executor.execute_parallel(
# routing['required_servers'],
# self.location['lat'],
# self.location['lon']
# )
# if verbose:
# print(f" β Completed in {raw_results['execution_time_seconds']}s")
# # STAGE 3: Response Compilation
# if verbose:
# print("π Stage 3: Compiling results...")
# compiled = self.compiler.compile(raw_results)
# if verbose:
# print(f" β {compiled['completeness']}")
# # STAGE 4: Farmer Translation
# if verbose:
# print("πΎ Stage 4: Generating advice...")
# farmer_advice = self.translator.translate(query, compiled, self.location)
# pipeline_time = time.time() - pipeline_start
# if verbose:
# print(f"✅ Complete! Total: {pipeline_time:.2f}s\n")
# return {
# "query": query,
# "routing": routing,
# "compiled_data": compiled,
# "advice": farmer_advice,
# "pipeline_time_seconds": round(pipeline_time, 2)
# }
"""
Farmer.chat Pipeline - Main Orchestrator
Coordinates Router → Executor → Compiler stages for alert generation
FIXED: Proper async handling for FastAPI integration
"""
from typing import Dict, Any, Optional
from .router import QueryRouter
from .executor import MCPExecutor
from .compiler import ResponseCompiler
class FarmerChatPipeline:
    """
    Main pipeline orchestrating the complete alert generation workflow.

    Architecture:
        Stage 1: QueryRouter - Determines which MCP servers to query
        Stage 2: MCPExecutor - Executes calls to the MCP servers
        Stage 3: ResponseCompiler - Compiles results into a summary/response

    Progress is reported with ``print`` at every stage.
    """

    def __init__(self, servers: Dict[str, Any], location: Dict[str, float]):
        """
        Initialize pipeline with MCP servers and default location.

        Args:
            servers: Dict mapping server names to initialized server instances.
            location: Default location dict with 'latitude' and 'longitude' keys.
        """
        self.servers = servers
        self.location = location

        # Initialize pipeline stages
        self.router = QueryRouter()
        self.executor = MCPExecutor(servers)
        self.compiler = ResponseCompiler()

        print(f"✓ Pipeline initialized for location: {self._format_coords(location)}")

    @staticmethod
    def _format_coords(location: Dict[str, float]) -> str:
        """Render a location dict as '<lat>°N, <lon>°E' with 4 decimal places."""
        return f"{location['latitude']:.4f}°N, {location['longitude']:.4f}°E"

    @staticmethod
    def _count_successes(mcp_results: Dict[str, Any]) -> int:
        """Count result entries whose 'status' field equals 'success'."""
        return sum(1 for r in mcp_results.values() if r.get("status") == "success")

    async def generate_alert(
        self,
        location: Optional[Dict[str, float]] = None,
        location_name: str = ""
    ) -> Dict[str, Any]:
        """
        Generate comprehensive alert summary for a location.

        ASYNC version for FastAPI endpoints.

        Args:
            location: Location dict with 'latitude' and 'longitude' keys.
                A falsy value (None or an empty dict) selects the pipeline's
                default location.
            location_name: Human-readable label used in the progress output
                and passed through to the compiler.

        Returns:
            Dict with keys 'alert_summary', 'location', 'location_name'
            and 'mcp_results'.
        """
        query_location = location or self.location

        print(f"\n{'='*60}")
        print(f"Generating Alert Summary for {location_name or 'Location'}")
        print(f"Coordinates: {self._format_coords(query_location)}")
        print(f"{'='*60}\n")

        # Stage 1: Route - Always query all servers for alerts
        print("Stage 1: Routing to all MCP servers...")
        routing = self.router.route_alert_query(query_location)
        # Report the configured server count rather than a hard-coded number,
        # since the server dict is supplied by the caller.
        print(f"✓ Routing complete: All {len(self.servers)} servers will be queried")

        # Stage 2: Execute - Query the MCP servers (ASYNC)
        print("\nStage 2: Executing parallel MCP server calls...")
        mcp_results = await self.executor.execute_parallel_async(routing, query_location)
        success_count = self._count_successes(mcp_results)
        print(f"✓ Execution complete: {success_count}/{len(mcp_results)} servers responded successfully")

        # Stage 3: Compile - Extract ONLY alerting information
        print("\nStage 3: Compiling alert summary...")
        alert_summary = self.compiler.compile_alert_summary(
            mcp_results,
            query_location,
            location_name
        )
        print("✓ Alert summary generated")
        print(f"\n{'='*60}\n")

        return {
            "alert_summary": alert_summary,
            "location": query_location,
            "location_name": location_name,
            "mcp_results": mcp_results
        }

    async def process_query(
        self,
        query: str,
        location: Optional[Dict[str, float]] = None
    ) -> Dict[str, Any]:
        """
        Process a specific farmer query through the pipeline.

        ASYNC version for FastAPI endpoints.

        Args:
            query: The farmer's question, forwarded to the router and compiler.
            location: Location dict with 'latitude' and 'longitude' keys.
                A falsy value (None or an empty dict) selects the pipeline's
                default location.

        Returns:
            Dict with keys 'response', 'location' and 'mcp_results'.
        """
        query_location = location or self.location

        print(f"\n{'='*60}")
        print(f"Processing Query: {query}")
        print(f"Location: {self._format_coords(query_location)}")
        print(f"{'='*60}\n")

        # Stage 1: Route
        print("Stage 1: Routing to all MCP servers...")
        routing = self.router.route_query(query, query_location)
        print("✓ Routing complete")

        # Stage 2: Execute MCP calls (ASYNC)
        print("\nStage 2: Executing MCP server calls...")
        mcp_results = await self.executor.execute_parallel_async(routing, query_location)
        success_count = self._count_successes(mcp_results)
        print(f"✓ Execution complete: {success_count}/{len(mcp_results)} servers responded")

        # Stage 3: Compile response
        print("\nStage 3: Compiling response...")
        response = self.compiler.compile_response(query, mcp_results, query_location)
        print("✓ Response compiled")
        print(f"\n{'='*60}\n")

        return {
            "response": response,
            "location": query_location,
            "mcp_results": mcp_results
        }

    def update_location(self, location: Dict[str, float]):
        """Update default location for pipeline.

        Args:
            location: New default location dict with 'latitude' and
                'longitude' keys.
        """
        self.location = location
        print(f"✓ Default location updated to: {self._format_coords(location)}")

    def get_server_status(self) -> Dict[str, str]:
        """Get status of all MCP servers.

        A server exposing ``health_check`` is 'available' when that call
        returns a truthy value and 'unavailable' when it returns a falsy value
        or raises. A server without ``health_check`` is reported 'available'.

        Returns:
            Dict mapping each server name to 'available' or 'unavailable'.
        """
        status = {}
        for server_name, server in self.servers.items():
            try:
                if hasattr(server, 'health_check'):
                    status[server_name] = "available" if server.health_check() else "unavailable"
                else:
                    status[server_name] = "available"
            except Exception:
                # Best-effort probe: a failing health check marks the server
                # unavailable. Deliberately not a bare ``except`` so that
                # SystemExit / KeyboardInterrupt still propagate.
                status[server_name] = "unavailable"
        return status