akashub
feat(v1): Added pipeline.py, compiler.py
b522293
raw
history blame
2.84 kB
"""
Complete Multi-Stage MCP Pipeline
Orchestrates: Router → Executor → Compiler → Translator
"""
import time
from typing import Dict, Any
from openai import OpenAI
from .router import QueryRouter
from .executor import MCPExecutor, MCP_SERVER_REGISTRY
from .compiler import ResponseCompiler
from .translator import FarmerTranslator
class FarmerChatPipeline:
    """Run a farmer's question through the four-stage MCP pipeline.

    Stages, in order: query routing, MCP server execution, response
    compilation, and translation into farmer-facing advice.
    """

    def __init__(self, openai_client: OpenAI, location: Dict[str, Any]):
        """Create the stage objects used by ``process_query``.

        Args:
            openai_client: Client passed to the router and the translator.
            location: Location mapping. ``process_query`` reads its
                ``'lat'`` and ``'lon'`` entries, and ``'name'`` for
                verbose output only.
        """
        self.location = location
        self.router = QueryRouter(openai_client, MCP_SERVER_REGISTRY)
        self.executor = MCPExecutor()
        self.compiler = ResponseCompiler()
        self.translator = FarmerTranslator(openai_client)

    async def process_query(self, query: str, verbose: bool = False) -> Dict[str, Any]:
        """Process a farmer query through the complete pipeline.

        Args:
            query: The farmer's question.
            verbose: When true, print progress for every stage to stdout.

        Returns:
            A dict with the keys::

                {
                    "query": str,
                    "routing": dict,
                    "compiled_data": dict,
                    "advice": str,
                    "pipeline_time_seconds": float
                }

        Raises:
            KeyError: If ``self.location`` lacks ``'lat'`` or ``'lon'``, or
                the routing result lacks ``'required_servers'``.
        """
        if verbose:
            print(f"\n🌾 Processing: {query}")
            # 'name' is display-only, so a missing key must not abort the run.
            print(f"📍 Location: {self.location.get('name', 'unknown')}")

        # perf_counter is monotonic, so the measured duration cannot be
        # skewed by wall-clock adjustments the way time.time() can.
        started = time.perf_counter()

        # STAGE 1: Query Routing
        if verbose:
            print("🎯 Stage 1: Routing...")
        # NOTE(review): route() and translate() are called synchronously; if
        # they perform network I/O they block the event loop while running.
        # Consider asyncio.to_thread — confirm against their implementations.
        routing = self.router.route(query, self.location)
        servers = routing['required_servers']

        # STAGE 2: MCP Execution (Parallel)
        if verbose:
            print(f" → Servers: {', '.join(servers)}")
            print("⚙️ Stage 2: Executing MCP servers...")
        raw_results = await self.executor.execute_parallel(
            servers,
            self.location['lat'],
            self.location['lon'],
        )

        # STAGE 3: Response Compilation
        if verbose:
            print(f" → Completed in {raw_results['execution_time_seconds']}s")
            print("🔗 Stage 3: Compiling results...")
        compiled = self.compiler.compile(raw_results)

        # STAGE 4: Farmer Translation
        if verbose:
            print(f" → {compiled['completeness']}")
            print("🌾 Stage 4: Generating advice...")
        farmer_advice = self.translator.translate(query, compiled, self.location)

        elapsed = time.perf_counter() - started
        if verbose:
            print(f"✅ Complete! Total: {elapsed:.2f}s\n")

        return {
            "query": query,
            "routing": routing,
            "compiled_data": compiled,
            "advice": farmer_advice,
            "pipeline_time_seconds": round(elapsed, 2),
        }