nivakaran commited on
Commit
7e66c6c
·
verified ·
1 Parent(s): 77017c6

Create masterNode.py

Browse files
Files changed (1) hide show
  1. src/nodes/masterNode.py +128 -0
src/nodes/masterNode.py ADDED
@@ -0,0 +1,128 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from langchain_core.messages import SystemMessage, HumanMessage
2
+ from src.llms.groqllm import GroqLLM
3
+ from src.states.masterState import MasterState, ExecutorState
4
+ from src.nodes.actionNode import ExecutorNode
5
+ import asyncio
6
+ from typing import List, Dict
7
+ from src.utils.prompts import master_agent_prompt
8
+ from src.states.masterState import PlannerOutput
9
+ from langgraph.constants import Send
10
+ from src.graphs.actionGraph import graph
11
+
12
+ class MasterOrchestrator:
13
+ def __init__(self, llm):
14
+ self.llm = llm
15
+ self.master_planner = llm.with_structured_output(PlannerOutput)
16
+ self.compiled_worker_graph = graph
17
+
18
+ def classify_execution_job(self, job_description: str) -> str:
19
+ """Map job description to specific action identifier - now works with MongoDB tools"""
20
+ job_lower = job_description.lower()
21
+
22
+ # Updated to match the new MongoDB tools
23
+ if 'track' in job_lower and ('package' in job_lower or 'parcel' in job_lower or 'shipment' in job_lower):
24
+ return 'track_package'
25
+ elif 'user' in job_lower and ('info' in job_lower or 'information' in job_lower or 'details' in job_lower):
26
+ return 'get_user_information'
27
+ elif 'estimate' in job_lower and ('time' in job_lower or 'delivery' in job_lower):
28
+ return 'estimated_time_analysis'
29
+ elif 'search' in job_lower and 'package' in job_lower:
30
+ return 'search_packages'
31
+ elif 'alert' in job_lower or 'service' in job_lower:
32
+ return 'get_service_alerts'
33
+ else:
34
+ return 'general_query'
35
+
36
+ def orchestrator(self, state: MasterState):
37
+ """Generate a plan by breaking down the query into execution jobs"""
38
+
39
+ system_prompt = """You are a master task planner for a parcel logistics system. Given a query, break it down into specific, actionable execution jobs.
40
+
41
+ Available specialized tools include:
42
+ - Track packages using tracking numbers
43
+ - Get user information and shipping history
44
+ - Estimate delivery times between locations
45
+ - Search packages by various criteria
46
+ - Check service alerts and delays
47
+
48
+ Each job should be:
49
+ 1. Clear and specific
50
+ 2. Actionable by a specialized worker with database access
51
+ 3. Independent or clearly sequenced
52
+ 4. Focused on a single logistics objective
53
+
54
+ Return a list of execution jobs as strings that can be completed using the available parcel logistics tools."""
55
+
56
+ planner_result = self.master_planner.invoke([
57
+ SystemMessage(content=system_prompt),
58
+ HumanMessage(content=f"Here is the query brief: {state['query_brief']}")
59
+ ])
60
+
61
+ print("MongoDB-aware Execution Jobs Generated:", planner_result.executor_jobs)
62
+ return {"execution_jobs": planner_result.executor_jobs}
63
+
64
+ def worker_executor(self, worker_input: dict):
65
+ """Execute a single job using the MongoDB-enabled worker graph"""
66
+
67
+ job_description = worker_input["execution_job"]
68
+ action_type = self.classify_execution_job(job_description)
69
+
70
+ # Prepare the initial state for the worker
71
+ worker_state = {
72
+ "executor_messages": [HumanMessage(content=job_description)],
73
+ "execution_job": action_type, # This maps to MongoDB tool names
74
+ "executor_data": []
75
+ }
76
+
77
+ print(f"Executing MongoDB job: {job_description} -> Action: {action_type}")
78
+
79
+ # Execute the worker graph with MongoDB tools
80
+ try:
81
+ result = self.compiled_worker_graph.invoke(worker_state)
82
+
83
+ # Return the completed job info
84
+ return {
85
+ "completed_jobs": [f"Job: {job_description} - Action: {action_type} - Status: Completed (MongoDB)"],
86
+ "worker_outputs": [result]
87
+ }
88
+ except Exception as e:
89
+ error_result = {
90
+ "output": f"Error executing MongoDB job: {str(e)}",
91
+ "executor_data": [f"MongoDB Error: {str(e)}"],
92
+ "executor_messages": []
93
+ }
94
+ return {
95
+ "completed_jobs": [f"Job: {job_description} - Action: {action_type} - Status: Failed - {str(e)}"],
96
+ "worker_outputs": [error_result]
97
+ }
98
+
99
+ def assign_workers(self, state: MasterState):
100
+ """Assign a worker to each execution job using Send"""
101
+ return [
102
+ Send("worker_executor", {"execution_job": job})
103
+ for job in state["execution_jobs"]
104
+ ]
105
+
106
+ def synthesizer(self, state: MasterState):
107
+ """Combine all completed jobs into a final output"""
108
+
109
+ # Create a synthesis prompt
110
+ synthesis_prompt = f"""
111
+ Original Customer Query: {state['query_brief']}
112
+
113
+ Completed Database Operations Summary:
114
+ {chr(10).join([f"- {job}" for job in state['completed_jobs']])}
115
+
116
+ Detailed Results from MongoDB Tools:
117
+ {chr(10).join([f"Result {i+1}: {output.get('output', 'No output')}" for i, output in enumerate(state['worker_outputs'])])}
118
+
119
+ Please synthesize all the database results into a comprehensive, friendly response that addresses the original customer query.
120
+ Focus on providing helpful, actionable information from the logistics database.
121
+ """
122
+
123
+ synthesis_result = self.llm.invoke([
124
+ SystemMessage(content="You are a friendly synthesis expert. Combine the MongoDB database results into a helpful final response for the customer."),
125
+ HumanMessage(content=synthesis_prompt)
126
+ ])
127
+
128
+ return {"final_output": synthesis_result.content}