Spaces:
Running
Running
File size: 5,574 Bytes
6afc01a 424a0c8 6afc01a 424a0c8 6afc01a 424a0c8 6afc01a 424a0c8 6afc01a 424a0c8 6afc01a 424a0c8 6afc01a 424a0c8 6afc01a 424a0c8 6afc01a 424a0c8 6afc01a 424a0c8 6afc01a 424a0c8 6afc01a 424a0c8 6afc01a 424a0c8 6afc01a 424a0c8 6afc01a 424a0c8 6afc01a 424a0c8 6afc01a 424a0c8 6afc01a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
"""
Farmer.chat Pipeline - Main Orchestrator
Coordinates Router → Executor → Compiler stages for alert generation
FIXED: Proper async handling for FastAPI integration
"""
from typing import Dict, Any, Optional
from .router import QueryRouter
from .executor import MCPExecutor
from .compiler import ResponseCompiler
class FarmerChatPipeline:
    """
    Main pipeline orchestrating the complete alert generation workflow.

    Architecture:
        Stage 1: QueryRouter - Determines which MCP servers to query
        Stage 2: MCPExecutor - Executes parallel calls to MCP servers
        Stage 3: ResponseCompiler - Compiles results into actionable alert summary
    """

    def __init__(self, servers: Dict[str, Any], location: Dict[str, float]):
        """
        Initialize pipeline with MCP servers and default location.

        Args:
            servers: Dict mapping server names to initialized server instances
            location: Default location dict with 'latitude' and 'longitude' keys
        """
        self.servers = servers
        self.location = location
        # Initialize the three pipeline stages; the executor is the only one
        # that needs direct access to the server instances.
        self.router = QueryRouter()
        self.executor = MCPExecutor(servers)
        self.compiler = ResponseCompiler()
        print(f"✓ Pipeline initialized for location: {location['latitude']:.4f}°N, {location['longitude']:.4f}°E")

    async def generate_alert(
        self,
        location: Optional[Dict[str, float]] = None,
        location_name: str = ""
    ) -> Dict[str, Any]:
        """
        Generate comprehensive alert summary for a location.

        ASYNC version for FastAPI endpoints.

        Args:
            location: Optional override location; falls back to the pipeline's
                default location when omitted.
            location_name: Human-readable name used in logs and the summary.

        Returns:
            Dict with keys 'alert_summary', 'location', 'location_name', and
            the raw 'mcp_results' from each queried server.
        """
        # Fall back to the default location configured at construction time.
        query_location = location or self.location
        print(f"\n{'='*60}")
        print(f"Generating Alert Summary for {location_name or 'Location'}")
        print(f"Coordinates: {query_location['latitude']:.4f}°N, {query_location['longitude']:.4f}°E")
        print(f"{'='*60}\n")

        # Stage 1: Route - Always query all servers for alerts
        print("Stage 1: Routing to all MCP servers...")
        routing = self.router.route_alert_query(query_location)
        print(f"✓ Routing complete: All 5 servers will be queried")

        # Stage 2: Execute - Query all MCP servers in parallel (ASYNC)
        print("\nStage 2: Executing parallel MCP server calls...")
        mcp_results = await self.executor.execute_parallel_async(routing, query_location)
        success_count = sum(1 for r in mcp_results.values() if r.get("status") == "success")
        print(f"✓ Execution complete: {success_count}/{len(mcp_results)} servers responded successfully")

        # Stage 3: Compile - Extract ONLY alerting information
        print("\nStage 3: Compiling alert summary...")
        alert_summary = self.compiler.compile_alert_summary(
            mcp_results,
            query_location,
            location_name
        )
        print("✓ Alert summary generated")
        print(f"\n{'='*60}\n")

        return {
            "alert_summary": alert_summary,
            "location": query_location,
            "location_name": location_name,
            "mcp_results": mcp_results
        }

    async def process_query(
        self,
        query: str,
        location: Optional[Dict[str, float]] = None
    ) -> Dict[str, Any]:
        """
        Process a specific farmer query through the pipeline.

        ASYNC version for FastAPI endpoints.

        Args:
            query: The farmer's free-text question.
            location: Optional override location; falls back to the pipeline's
                default location when omitted.

        Returns:
            Dict with keys 'response', 'location', and the raw 'mcp_results'.
        """
        query_location = location or self.location
        print(f"\n{'='*60}")
        print(f"Processing Query: {query}")
        print(f"Location: {query_location['latitude']:.4f}°N, {query_location['longitude']:.4f}°E")
        print(f"{'='*60}\n")

        # Stage 1: Route
        print("Stage 1: Routing to all MCP servers...")
        routing = self.router.route_query(query, query_location)
        print(f"✓ Routing complete")

        # Stage 2: Execute MCP calls (ASYNC)
        print("\nStage 2: Executing MCP server calls...")
        mcp_results = await self.executor.execute_parallel_async(routing, query_location)
        success_count = sum(1 for r in mcp_results.values() if r.get("status") == "success")
        print(f"✓ Execution complete: {success_count}/{len(mcp_results)} servers responded")

        # Stage 3: Compile response
        print("\nStage 3: Compiling response...")
        response = self.compiler.compile_response(query, mcp_results, query_location)
        print("✓ Response compiled")
        print(f"\n{'='*60}\n")

        return {
            "response": response,
            "location": query_location,
            "mcp_results": mcp_results
        }

    def update_location(self, location: Dict[str, float]):
        """Update default location for pipeline."""
        self.location = location
        print(f"✓ Default location updated to: {location['latitude']:.4f}°N, {location['longitude']:.4f}°E")

    def get_server_status(self) -> Dict[str, str]:
        """
        Get status of all MCP servers.

        Returns:
            Dict mapping each server name to 'available' or 'unavailable'.
            Servers without a health_check method are assumed available.
        """
        status = {}
        for server_name, server in self.servers.items():
            try:
                if hasattr(server, 'health_check'):
                    status[server_name] = "available" if server.health_check() else "unavailable"
                else:
                    # No health probe exposed: assume the server is reachable.
                    status[server_name] = "available"
            # FIX: was a bare `except:`, which also swallowed SystemExit and
            # KeyboardInterrupt. A failing health check should only downgrade
            # the status, not hide interpreter-level signals.
            except Exception:
                status[server_name] = "unavailable"
        return status