"""
Complete Multi-Stage MCP Pipeline

Orchestrates: Router → Executor → Compiler → Translator
"""

import time
from typing import Dict, Any

from openai import OpenAI

from .router import QueryRouter
from .executor import MCPExecutor, MCP_SERVER_REGISTRY
from .compiler import ResponseCompiler
from .translator import FarmerTranslator


class FarmerChatPipeline:
    """Run a farmer query through the four pipeline stages in order.

    The stages are routing (``QueryRouter``), MCP server execution
    (``MCPExecutor``), result compilation (``ResponseCompiler``) and advice
    generation (``FarmerTranslator``).
    """

    def __init__(self, openai_client: "OpenAI", location: Dict[str, Any]):
        """Create the stage objects used by :meth:`process_query`.

        Args:
            openai_client: Client handed to both the router and the
                translator.
            location: Location mapping. ``process_query`` reads its ``lat``
                and ``lon`` entries, and ``name`` when ``verbose`` is set.
        """
        self.location = location
        self.router = QueryRouter(openai_client, MCP_SERVER_REGISTRY)
        self.executor = MCPExecutor()
        self.compiler = ResponseCompiler()
        self.translator = FarmerTranslator(openai_client)

    async def process_query(self, query: str, verbose: bool = False) -> Dict[str, Any]:
        """Process a farmer query through the complete pipeline.

        Args:
            query: The farmer's question.
            verbose: When true, print a progress line for each stage.

        Returns:
            A dict of the form::

                {
                    "query": str,
                    "routing": dict,
                    "compiled_data": dict,
                    "advice": str,
                    "pipeline_time_seconds": float
                }

            ``routing``, ``compiled_data`` and ``advice`` are whatever the
            router, compiler and translator returned;
            ``pipeline_time_seconds`` is the elapsed time rounded to two
            decimals.
        """
        if verbose:
            print(f"\n🌾 Processing: {query}")
            print(f"📍 Location: {self.location['name']}")

        # perf_counter() is monotonic, so the measured duration cannot be
        # skewed by wall-clock adjustments the way time.time() deltas can.
        pipeline_start = time.perf_counter()

        # Stage 1: ask the router which MCP servers the query needs.
        # NOTE(review): route() and translate() are called without await; if
        # they perform blocking I/O they will stall the event loop - confirm.
        if verbose:
            print("🎯 Stage 1: Routing...")
        routing = self.router.route(query, self.location)
        required_servers = routing['required_servers']
        if verbose:
            print(f" ✓ Servers: {', '.join(required_servers)}")

        # Stage 2: run the selected servers for this location's coordinates.
        if verbose:
            print("⚙️ Stage 2: Executing MCP servers...")
        raw_results = await self.executor.execute_parallel(
            required_servers,
            self.location['lat'],
            self.location['lon'],
        )
        if verbose:
            print(f" ✓ Completed in {raw_results['execution_time_seconds']}s")

        # Stage 3: compile the raw server output.
        if verbose:
            print("📊 Stage 3: Compiling results...")
        compiled = self.compiler.compile(raw_results)
        if verbose:
            print(f" ✓ {compiled['completeness']}")

        # Stage 4: turn the compiled data into advice for the farmer.
        if verbose:
            print("🌾 Stage 4: Generating advice...")
        farmer_advice = self.translator.translate(query, compiled, self.location)

        pipeline_time = time.perf_counter() - pipeline_start
        if verbose:
            print(f"✅ Complete! Total: {pipeline_time:.2f}s\n")

        return {
            "query": query,
            "routing": routing,
            "compiled_data": compiled,
            "advice": farmer_advice,
            "pipeline_time_seconds": round(pipeline_time, 2),
        }