akashub commited on
Commit
b522293
Β·
1 Parent(s): f4ffd7e

feat(v1): Added pipeline.py, compiler.py

Browse files
Files changed (3) hide show
  1. requirements.txt +31 -0
  2. src/compiler.py +57 -0
  3. src/pipeline.py +93 -0
requirements.txt CHANGED
@@ -0,0 +1,31 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # Farmer.Chat Backend Dependencies
2
+
3
+ # Web Framework
4
+ fastapi==0.109.0
5
+ uvicorn[standard]==0.27.0
6
+ python-multipart==0.0.6
7
+
8
+ # OpenAI
9
+ openai==1.10.0
10
+
11
+ # Async HTTP
12
+ aiohttp==3.9.1
13
+ aiofiles==23.2.1
14
+
15
+ # Data Processing
16
+ pandas==2.1.4
17
+ numpy==1.26.3
18
+ xarray==2023.12.0
19
+ netCDF4==1.6.5
20
+
21
+ # PDF Generation
22
+ reportlab==4.0.9
23
+ pillow==10.2.0
24
+
25
+ # Utilities
26
+ python-dotenv==1.0.0
27
+ pydantic==2.5.3
28
+ pydantic-settings==2.1.0
29
+
30
+ # Requests (for sync operations)
31
+ requests==2.31.0
src/compiler.py ADDED
@@ -0,0 +1,57 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Stage 3: Response Compiler - Data Fusion
3
+ """
4
+
5
+ from typing import Dict, Any, List
6
+
7
+
8
+ class ResponseCompiler:
9
+ """Stage 3: Compile results from multiple servers"""
10
+
11
+ def compile(self, raw_results: Dict[str, Any]) -> Dict[str, Any]:
12
+ """
13
+ Merge results into structured format
14
+
15
+ Args:
16
+ raw_results: Dictionary containing results from MCPExecutor
17
+ {
18
+ "results": {
19
+ "weather": {"status": "success", "data": {...}},
20
+ "soil_properties": {"status": "success", "data": {...}},
21
+ ...
22
+ },
23
+ "execution_time_seconds": 3.5
24
+ }
25
+
26
+ Returns:
27
+ {
28
+ "successful_servers": List[str],
29
+ "failed_servers": List[dict],
30
+ "data": Dict[str, Any],
31
+ "execution_time": float,
32
+ "completeness": str
33
+ }
34
+ """
35
+ results_dict = raw_results.get("results", {})
36
+
37
+ successful = []
38
+ failed = []
39
+ compiled_data = {}
40
+
41
+ for server_name, result in results_dict.items():
42
+ if result.get("status") == "success":
43
+ successful.append(server_name)
44
+ compiled_data[server_name] = result.get("data", {})
45
+ else:
46
+ failed.append({
47
+ "server": server_name,
48
+ "error": result.get("error", "Unknown error")
49
+ })
50
+
51
+ return {
52
+ "successful_servers": successful,
53
+ "failed_servers": failed,
54
+ "data": compiled_data,
55
+ "execution_time": raw_results.get("execution_time_seconds", 0),
56
+ "completeness": f"{len(successful)}/{len(results_dict)} servers"
57
+ }
src/pipeline.py CHANGED
@@ -0,0 +1,93 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Complete Multi-Stage MCP Pipeline
3
+ Orchestrates: Router β†’ Executor β†’ Compiler β†’ Translator
4
+ """
5
+
6
+ import time
7
+ from typing import Dict, Any
8
+ from openai import OpenAI
9
+
10
+ from .router import QueryRouter
11
+ from .executor import MCPExecutor, MCP_SERVER_REGISTRY
12
+ from .compiler import ResponseCompiler
13
+ from .translator import FarmerTranslator
14
+
15
+
16
+ class FarmerChatPipeline:
17
+ """Complete multi-stage MCP pipeline"""
18
+
19
+ def __init__(self, openai_client: OpenAI, location: Dict[str, Any]):
20
+ self.location = location
21
+ self.router = QueryRouter(openai_client, MCP_SERVER_REGISTRY)
22
+ self.executor = MCPExecutor()
23
+ self.compiler = ResponseCompiler()
24
+ self.translator = FarmerTranslator(openai_client)
25
+
26
+ async def process_query(self, query: str, verbose: bool = False) -> Dict[str, Any]:
27
+ """
28
+ Process farmer query through complete pipeline
29
+
30
+ Returns:
31
+ {
32
+ "query": str,
33
+ "routing": dict,
34
+ "compiled_data": dict,
35
+ "advice": str,
36
+ "pipeline_time_seconds": float
37
+ }
38
+ """
39
+ if verbose:
40
+ print(f"\n🌾 Processing: {query}")
41
+ print(f"πŸ“ Location: {self.location['name']}")
42
+
43
+ pipeline_start = time.time()
44
+
45
+ # STAGE 1: Query Routing
46
+ if verbose:
47
+ print("🎯 Stage 1: Routing...")
48
+
49
+ routing = self.router.route(query, self.location)
50
+
51
+ if verbose:
52
+ print(f" β†’ Servers: {', '.join(routing['required_servers'])}")
53
+
54
+ # STAGE 2: MCP Execution (Parallel)
55
+ if verbose:
56
+ print("βš™οΈ Stage 2: Executing MCP servers...")
57
+
58
+ raw_results = await self.executor.execute_parallel(
59
+ routing['required_servers'],
60
+ self.location['lat'],
61
+ self.location['lon']
62
+ )
63
+
64
+ if verbose:
65
+ print(f" β†’ Completed in {raw_results['execution_time_seconds']}s")
66
+
67
+ # STAGE 3: Response Compilation
68
+ if verbose:
69
+ print("πŸ”— Stage 3: Compiling results...")
70
+
71
+ compiled = self.compiler.compile(raw_results)
72
+
73
+ if verbose:
74
+ print(f" β†’ {compiled['completeness']}")
75
+
76
+ # STAGE 4: Farmer Translation
77
+ if verbose:
78
+ print("🌾 Stage 4: Generating advice...")
79
+
80
+ farmer_advice = self.translator.translate(query, compiled, self.location)
81
+
82
+ pipeline_time = time.time() - pipeline_start
83
+
84
+ if verbose:
85
+ print(f"βœ… Complete! Total: {pipeline_time:.2f}s\n")
86
+
87
+ return {
88
+ "query": query,
89
+ "routing": routing,
90
+ "compiled_data": compiled,
91
+ "advice": farmer_advice,
92
+ "pipeline_time_seconds": round(pipeline_time, 2)
93
+ }