Spaces:
Sleeping
Sleeping
| from langgraph.graph import StateGraph, START, END | |
| from src.states.masterState import ExecutorState, ExecutorOutputState | |
| from src.nodes.actionNode import ExecutorNode | |
| from src.llms.groqllm import GroqLLM | |
| from src.utils.prompts import execution_agent_prompt, compress_execution_human_message, compress_execution_system_prompt | |
| from src.utils.utils import think_tool, track_package, estimated_time_analysis | |
| tools = [think_tool, track_package, estimated_time_analysis] | |
| class ExecutorGraphBuilder: | |
| def __init__(self, llm ): | |
| self.llm = llm | |
| self.graph = StateGraph(ExecutorState, output=ExecutorOutputState) | |
| self.tools = tools | |
| self.execution_agent_prompt = execution_agent_prompt | |
| self.compress_execution_system_prompt = compress_execution_system_prompt | |
| self.compress_execution_human_message = compress_execution_human_message | |
| def build_executor_graph(self): | |
| """Build a graph to build the executor""" | |
| self.executor_node_obj = ExecutorNode( | |
| self.llm, | |
| ) | |
| self.graph.add_node("llm_call", self.executor_node_obj.llm_call) | |
| self.graph.add_node("tool_node", self.executor_node_obj.tool_node) | |
| self.graph.add_node("compress_execution", self.executor_node_obj.compress_execution) | |
| # Flow | |
| self.graph.add_edge(START, "llm_call") | |
| self.graph.add_conditional_edges( | |
| "llm_call", | |
| self.executor_node_obj.guard_llm, | |
| { | |
| "tool_node": "tool_node", | |
| "compress_execution": "compress_execution", | |
| }, | |
| ) | |
| self.graph.add_edge("tool_node", "llm_call") | |
| self.graph.add_edge("compress_execution", END) | |
| return self.graph | |
| def setup_graph(self): | |
| return self.graph.compile() | |
| llm=GroqLLM().get_llm() | |
| ## Creating the graph | |
| graph_builder=ExecutorGraphBuilder(llm) | |
| graph=graph_builder.build_executor_graph().compile() | |