File size: 5,750 Bytes
7e66c6c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
from langchain_core.messages import SystemMessage, HumanMessage
from src.llms.groqllm import GroqLLM
from src.states.masterState import MasterState, ExecutorState
from src.nodes.actionNode import ExecutorNode
import asyncio
from typing import List, Dict
from src.utils.prompts import master_agent_prompt
from src.states.masterState import PlannerOutput
from langgraph.constants import Send
from src.graphs.actionGraph import graph

class MasterOrchestrator:
    def __init__(self, llm):
        self.llm = llm
        self.master_planner = llm.with_structured_output(PlannerOutput)
        self.compiled_worker_graph = graph

    def classify_execution_job(self, job_description: str) -> str:
        """Map job description to specific action identifier - now works with MongoDB tools"""
        job_lower = job_description.lower()
        
        # Updated to match the new MongoDB tools
        if 'track' in job_lower and ('package' in job_lower or 'parcel' in job_lower or 'shipment' in job_lower):
            return 'track_package'
        elif 'user' in job_lower and ('info' in job_lower or 'information' in job_lower or 'details' in job_lower):
            return 'get_user_information'
        elif 'estimate' in job_lower and ('time' in job_lower or 'delivery' in job_lower):
            return 'estimated_time_analysis'
        elif 'search' in job_lower and 'package' in job_lower:
            return 'search_packages'
        elif 'alert' in job_lower or 'service' in job_lower:
            return 'get_service_alerts'
        else:
            return 'general_query'

    def orchestrator(self, state: MasterState):
        """Generate a plan by breaking down the query into execution jobs"""
        
        system_prompt = """You are a master task planner for a parcel logistics system. Given a query, break it down into specific, actionable execution jobs.
        
        Available specialized tools include:
        - Track packages using tracking numbers
        - Get user information and shipping history
        - Estimate delivery times between locations
        - Search packages by various criteria
        - Check service alerts and delays
        
        Each job should be:
        1. Clear and specific
        2. Actionable by a specialized worker with database access
        3. Independent or clearly sequenced
        4. Focused on a single logistics objective
        
        Return a list of execution jobs as strings that can be completed using the available parcel logistics tools."""
        
        planner_result = self.master_planner.invoke([
            SystemMessage(content=system_prompt),
            HumanMessage(content=f"Here is the query brief: {state['query_brief']}")
        ])

        print("MongoDB-aware Execution Jobs Generated:", planner_result.executor_jobs)
        return {"execution_jobs": planner_result.executor_jobs}

    def worker_executor(self, worker_input: dict):
        """Execute a single job using the MongoDB-enabled worker graph"""
        
        job_description = worker_input["execution_job"]
        action_type = self.classify_execution_job(job_description)
        
        # Prepare the initial state for the worker
        worker_state = {
            "executor_messages": [HumanMessage(content=job_description)],
            "execution_job": action_type,  # This maps to MongoDB tool names
            "executor_data": []
        }
        
        print(f"Executing MongoDB job: {job_description} -> Action: {action_type}")
        
        # Execute the worker graph with MongoDB tools
        try:
            result = self.compiled_worker_graph.invoke(worker_state)
            
            # Return the completed job info
            return {
                "completed_jobs": [f"Job: {job_description} - Action: {action_type} - Status: Completed (MongoDB)"],
                "worker_outputs": [result]
            }
        except Exception as e:
            error_result = {
                "output": f"Error executing MongoDB job: {str(e)}",
                "executor_data": [f"MongoDB Error: {str(e)}"],
                "executor_messages": []
            }
            return {
                "completed_jobs": [f"Job: {job_description} - Action: {action_type} - Status: Failed - {str(e)}"],
                "worker_outputs": [error_result]
            }

    def assign_workers(self, state: MasterState):
        """Assign a worker to each execution job using Send"""
        return [
            Send("worker_executor", {"execution_job": job}) 
            for job in state["execution_jobs"]
        ]

    def synthesizer(self, state: MasterState):
        """Combine all completed jobs into a final output"""
        
        # Create a synthesis prompt
        synthesis_prompt = f"""
        Original Customer Query: {state['query_brief']}
        
        Completed Database Operations Summary:
        {chr(10).join([f"- {job}" for job in state['completed_jobs']])}
        
        Detailed Results from MongoDB Tools:
        {chr(10).join([f"Result {i+1}: {output.get('output', 'No output')}" for i, output in enumerate(state['worker_outputs'])])}
        
        Please synthesize all the database results into a comprehensive, friendly response that addresses the original customer query.
        Focus on providing helpful, actionable information from the logistics database.
        """
        
        synthesis_result = self.llm.invoke([
            SystemMessage(content="You are a friendly synthesis expert. Combine the MongoDB database results into a helpful final response for the customer."),
            HumanMessage(content=synthesis_prompt)
        ])
        
        return {"final_output": synthesis_result.content}