File size: 4,172 Bytes
782bbd9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
from langchain_core.messages import SystemMessage, HumanMessage
from src.llms.groqllm import GroqLLM
from src.states.masterState import MasterState, ExecutorState
from src.nodes.actionNode import ExecutorNode
import asyncio
from typing import List, Dict
from src.utils.prompts import master_agent_prompt
from src.utils.utils import get_today_str
from src.states.masterState import PlannerOutput
from langgraph.constants import Send
from src.graphs.actionGraph import graph

class MasterOrchestrator:
    def __init__(self, llm):
        self.llm = llm
        self.master_planner = llm.with_structured_output(PlannerOutput)
        self.compiled_worker_graph = graph
        self.master_agent_prompt_template = master_agent_prompt

    def orchestrator(self, state: MasterState):
        """Generate a plan by breaking down the query into execution jobs"""
        
        system_prompt = """You are a master task planner. Given a query, break it down into specific, actionable execution jobs.
        
        Each job should be:
        1. Clear and specific
        2. Actionable by a specialized worker
        3. Independent or clearly sequenced
        4. Focused on a single objective
        
        Return a list of execution jobs as strings."""
        
        planner_result = self.master_planner.invoke([
            SystemMessage(content=system_prompt),
            HumanMessage(content=f"Here is the query brief: {state['query_brief']}")
        ])

        print("Execution Jobs Generated:", planner_result.executor_jobs)
        return {"execution_jobs": planner_result.executor_jobs}

    def worker_executor(self, worker_input: dict):
        """Execute a single job using the worker graph"""
        
        job_description = worker_input["execution_job"]
        
        # Prepare the initial state for the worker
        # Pass the full job description as the execution_job - the worker will use available tools
        worker_state = {
            "executor_messages": [HumanMessage(content=job_description)],
            "execution_job": job_description,  # Pass the full job description
            "executor_data": []
        }
        
        print(f"Executing job: {job_description}")
        
        # Execute the worker graph
        try:
            result = self.compiled_worker_graph.invoke(worker_state)
            
            # Return the completed job info
            return {
                "completed_jobs": [f"Job: {job_description} - Status: Completed"],
                "worker_outputs": [result]
            }
        except Exception as e:
            error_result = {
                "output": f"Error executing job: {str(e)}",
                "executor_data": [f"Error: {str(e)}"],
                "executor_messages": []
            }
            return {
                "completed_jobs": [f"Job: {job_description} - Status: Failed - {str(e)}"],
                "worker_outputs": [error_result]
            }

    def assign_workers(self, state: MasterState):
        """Assign a worker to each execution job using Send"""
        return [
            Send("worker_executor", {"execution_job": job}) 
            for job in state["execution_jobs"]
        ]

    def synthesizer(self, state: MasterState):
        """Combine all completed jobs into a final output"""
        
        # Create a synthesis prompt
        synthesis_prompt = f"""
        Original Query: {state['query_brief']}
        
        Completed Jobs Summary:
        {chr(10).join([f"- {job}" for job in state['completed_jobs']])}
        
        Detailed Worker Outputs:
        {chr(10).join([f"Output {i+1}: {output.get('output', 'No output')}" for i, output in enumerate(state['worker_outputs'])])}
        
        Please synthesize all the work into a comprehensive final response that addresses the original query.
        """
        
        synthesis_result = self.llm.invoke([
            SystemMessage(content="You are a synthesis expert. Combine the worker outputs into a coherent final response."),
            HumanMessage(content=synthesis_prompt)
        ])
        
        return {"final_output": synthesis_result.content}