SparrowAgent / src /nodes /masterNode.py
nivakaran's picture
Create masterNode.py
7e66c6c verified
from langchain_core.messages import SystemMessage, HumanMessage
from src.llms.groqllm import GroqLLM
from src.states.masterState import MasterState, ExecutorState
from src.nodes.actionNode import ExecutorNode
import asyncio
from typing import List, Dict
from src.utils.prompts import master_agent_prompt
from src.states.masterState import PlannerOutput
from langgraph.constants import Send
from src.graphs.actionGraph import graph
class MasterOrchestrator:
def __init__(self, llm):
self.llm = llm
self.master_planner = llm.with_structured_output(PlannerOutput)
self.compiled_worker_graph = graph
def classify_execution_job(self, job_description: str) -> str:
"""Map job description to specific action identifier - now works with MongoDB tools"""
job_lower = job_description.lower()
# Updated to match the new MongoDB tools
if 'track' in job_lower and ('package' in job_lower or 'parcel' in job_lower or 'shipment' in job_lower):
return 'track_package'
elif 'user' in job_lower and ('info' in job_lower or 'information' in job_lower or 'details' in job_lower):
return 'get_user_information'
elif 'estimate' in job_lower and ('time' in job_lower or 'delivery' in job_lower):
return 'estimated_time_analysis'
elif 'search' in job_lower and 'package' in job_lower:
return 'search_packages'
elif 'alert' in job_lower or 'service' in job_lower:
return 'get_service_alerts'
else:
return 'general_query'
def orchestrator(self, state: MasterState):
"""Generate a plan by breaking down the query into execution jobs"""
system_prompt = """You are a master task planner for a parcel logistics system. Given a query, break it down into specific, actionable execution jobs.
Available specialized tools include:
- Track packages using tracking numbers
- Get user information and shipping history
- Estimate delivery times between locations
- Search packages by various criteria
- Check service alerts and delays
Each job should be:
1. Clear and specific
2. Actionable by a specialized worker with database access
3. Independent or clearly sequenced
4. Focused on a single logistics objective
Return a list of execution jobs as strings that can be completed using the available parcel logistics tools."""
planner_result = self.master_planner.invoke([
SystemMessage(content=system_prompt),
HumanMessage(content=f"Here is the query brief: {state['query_brief']}")
])
print("MongoDB-aware Execution Jobs Generated:", planner_result.executor_jobs)
return {"execution_jobs": planner_result.executor_jobs}
def worker_executor(self, worker_input: dict):
"""Execute a single job using the MongoDB-enabled worker graph"""
job_description = worker_input["execution_job"]
action_type = self.classify_execution_job(job_description)
# Prepare the initial state for the worker
worker_state = {
"executor_messages": [HumanMessage(content=job_description)],
"execution_job": action_type, # This maps to MongoDB tool names
"executor_data": []
}
print(f"Executing MongoDB job: {job_description} -> Action: {action_type}")
# Execute the worker graph with MongoDB tools
try:
result = self.compiled_worker_graph.invoke(worker_state)
# Return the completed job info
return {
"completed_jobs": [f"Job: {job_description} - Action: {action_type} - Status: Completed (MongoDB)"],
"worker_outputs": [result]
}
except Exception as e:
error_result = {
"output": f"Error executing MongoDB job: {str(e)}",
"executor_data": [f"MongoDB Error: {str(e)}"],
"executor_messages": []
}
return {
"completed_jobs": [f"Job: {job_description} - Action: {action_type} - Status: Failed - {str(e)}"],
"worker_outputs": [error_result]
}
def assign_workers(self, state: MasterState):
"""Assign a worker to each execution job using Send"""
return [
Send("worker_executor", {"execution_job": job})
for job in state["execution_jobs"]
]
def synthesizer(self, state: MasterState):
"""Combine all completed jobs into a final output"""
# Create a synthesis prompt
synthesis_prompt = f"""
Original Customer Query: {state['query_brief']}
Completed Database Operations Summary:
{chr(10).join([f"- {job}" for job in state['completed_jobs']])}
Detailed Results from MongoDB Tools:
{chr(10).join([f"Result {i+1}: {output.get('output', 'No output')}" for i, output in enumerate(state['worker_outputs'])])}
Please synthesize all the database results into a comprehensive, friendly response that addresses the original customer query.
Focus on providing helpful, actionable information from the logistics database.
"""
synthesis_result = self.llm.invoke([
SystemMessage(content="You are a friendly synthesis expert. Combine the MongoDB database results into a helpful final response for the customer."),
HumanMessage(content=synthesis_prompt)
])
return {"final_output": synthesis_result.content}