Spaces:
Sleeping
Sleeping
File size: 2,835 Bytes
b522293 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
"""
Complete Multi-Stage MCP Pipeline
Orchestrates: Router β Executor β Compiler β Translator
"""
import time
from typing import Dict, Any
from openai import OpenAI
from .router import QueryRouter
from .executor import MCPExecutor, MCP_SERVER_REGISTRY
from .compiler import ResponseCompiler
from .translator import FarmerTranslator
class FarmerChatPipeline:
"""Complete multi-stage MCP pipeline"""
def __init__(self, openai_client: OpenAI, location: Dict[str, Any]):
self.location = location
self.router = QueryRouter(openai_client, MCP_SERVER_REGISTRY)
self.executor = MCPExecutor()
self.compiler = ResponseCompiler()
self.translator = FarmerTranslator(openai_client)
async def process_query(self, query: str, verbose: bool = False) -> Dict[str, Any]:
"""
Process farmer query through complete pipeline
Returns:
{
"query": str,
"routing": dict,
"compiled_data": dict,
"advice": str,
"pipeline_time_seconds": float
}
"""
if verbose:
print(f"\nπΎ Processing: {query}")
print(f"π Location: {self.location['name']}")
pipeline_start = time.time()
# STAGE 1: Query Routing
if verbose:
print("π― Stage 1: Routing...")
routing = self.router.route(query, self.location)
if verbose:
print(f" β Servers: {', '.join(routing['required_servers'])}")
# STAGE 2: MCP Execution (Parallel)
if verbose:
print("βοΈ Stage 2: Executing MCP servers...")
raw_results = await self.executor.execute_parallel(
routing['required_servers'],
self.location['lat'],
self.location['lon']
)
if verbose:
print(f" β Completed in {raw_results['execution_time_seconds']}s")
# STAGE 3: Response Compilation
if verbose:
print("π Stage 3: Compiling results...")
compiled = self.compiler.compile(raw_results)
if verbose:
print(f" β {compiled['completeness']}")
# STAGE 4: Farmer Translation
if verbose:
print("πΎ Stage 4: Generating advice...")
farmer_advice = self.translator.translate(query, compiled, self.location)
pipeline_time = time.time() - pipeline_start
if verbose:
print(f"β
Complete! Total: {pipeline_time:.2f}s\n")
return {
"query": query,
"routing": routing,
"compiled_data": compiled,
"advice": farmer_advice,
"pipeline_time_seconds": round(pipeline_time, 2)
} |