File size: 2,835 Bytes
b522293
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
"""
Complete Multi-Stage MCP Pipeline
Orchestrates: Router β†’ Executor β†’ Compiler β†’ Translator
"""

import time
from typing import Dict, Any
from openai import OpenAI

from .router import QueryRouter
from .executor import MCPExecutor, MCP_SERVER_REGISTRY
from .compiler import ResponseCompiler
from .translator import FarmerTranslator


class FarmerChatPipeline:
    """Orchestrates the four MCP stages: Router β†’ Executor β†’ Compiler β†’ Translator."""

    def __init__(self, openai_client: OpenAI, location: Dict[str, Any]):
        # Wire up each stage once; router and translator share the same LLM client.
        self.location = location
        self.router = QueryRouter(openai_client, MCP_SERVER_REGISTRY)
        self.executor = MCPExecutor()
        self.compiler = ResponseCompiler()
        self.translator = FarmerTranslator(openai_client)

    async def process_query(self, query: str, verbose: bool = False) -> Dict[str, Any]:
        """
        Run a farmer query through routing, execution, compilation and translation.

        Args:
            query: The farmer's natural-language question.
            verbose: When True, print per-stage progress to stdout.

        Returns:
            {
                "query": str,
                "routing": dict,
                "compiled_data": dict,
                "advice": str,
                "pipeline_time_seconds": float
            }
        """
        if verbose:
            print(f"\n🌾 Processing: {query}")
            print(f"πŸ“ Location: {self.location['name']}")

        started_at = time.time()

        # Stage 1 β€” decide which MCP servers this query needs.
        if verbose: print("🎯 Stage 1: Routing...")
        routing = self.router.route(query, self.location)
        if verbose: print(f"   β†’ Servers: {', '.join(routing['required_servers'])}")

        # Stage 2 β€” query the selected servers concurrently.
        if verbose: print("βš™οΈ Stage 2: Executing MCP servers...")
        raw_results = await self.executor.execute_parallel(
            routing['required_servers'], self.location['lat'], self.location['lon'])
        if verbose: print(f"   β†’ Completed in {raw_results['execution_time_seconds']}s")

        # Stage 3 β€” merge the raw server payloads into one structure.
        if verbose: print("πŸ”— Stage 3: Compiling results...")
        compiled = self.compiler.compile(raw_results)
        if verbose: print(f"   β†’ {compiled['completeness']}")

        # Stage 4 β€” turn the compiled data into plain-language advice.
        if verbose: print("🌾 Stage 4: Generating advice...")
        farmer_advice = self.translator.translate(query, compiled, self.location)

        elapsed = time.time() - started_at
        if verbose: print(f"βœ… Complete! Total: {elapsed:.2f}s\n")

        return {
            "query": query,
            "routing": routing,
            "compiled_data": compiled,
            "advice": farmer_advice,
            "pipeline_time_seconds": round(elapsed, 2),
        }