Spaces:
Runtime error
Runtime error
| # agent/graph.py | |
| """ | |
| LangGraph Workflow Definition | |
| This module defines the LangGraph workflow for the operations agent. | |
| It specifies how nodes are connected and under what conditions the | |
| workflow transitions between different processing steps. | |
| Purpose: | |
| - Define the agent workflow as a LangGraph StateGraph | |
| - Specify node connections and conditional routing | |
| - Handle workflow compilation and execution | |
| - Support different execution paths based on intent and results | |
| Dependencies: | |
| - langgraph: Core graph framework | |
| - state.py: Agent state definitions | |
| - nodes.py: Node function implementations | |
| Used by: | |
| - agent.py: Main agent execution | |
| - Direct usage: Workflow testing and debugging | |
| """ | |
| from typing import Dict, Any | |
| from langgraph.graph import StateGraph, END | |
| from .state import AgentState | |
| from .nodes import ( | |
| llm_routing_node, | |
| tool_execution_node, | |
| response_generation_node, | |
| error_handling_node, | |
| llm_summarization_node | |
| ) | |
| def create_operations_workflow() -> Any: | |
| """ | |
| Create and compile the operations agent workflow with LLM router | |
| This function defines the complete workflow for processing user requests, | |
| including LLM-based routing, tool execution, and response generation. | |
| Returns: | |
| Compiled LangGraph workflow ready for execution | |
| """ | |
| print("π§ Creating operations agent workflow with LLM router...") | |
| # Create the state graph | |
| workflow = StateGraph(AgentState) | |
| # Add nodes to the workflow (tool_planning node removed as redundant) | |
| workflow.add_node("llm_routing", llm_routing_node) | |
| workflow.add_node("tool_execution", tool_execution_node) | |
| workflow.add_node("llm_summarization", llm_summarization_node) | |
| workflow.add_node("response_generation", response_generation_node) | |
| workflow.add_node("error_handling", error_handling_node) | |
| # Set the entry point | |
| workflow.set_entry_point("llm_routing") | |
| # Define workflow transitions using simplified routing | |
| workflow.add_conditional_edges( | |
| "llm_routing", | |
| _simplified_llm_router, | |
| { | |
| "execute_tools": "tool_execution", | |
| "direct_response": "response_generation", | |
| "error": "error_handling" | |
| } | |
| ) | |
| # Tool execution can lead to error handling or LLM summarization | |
| workflow.add_conditional_edges( | |
| "tool_execution", | |
| _simple_execution_router, | |
| { | |
| "has_errors": "error_handling", | |
| "success": "llm_summarization" | |
| } | |
| ) | |
| # LLM summarization leads to response generation | |
| workflow.add_edge("llm_summarization", "response_generation") | |
| # Both response generation and error handling end the workflow | |
| workflow.add_edge("response_generation", END) | |
| workflow.add_edge("error_handling", END) | |
| # Compile the workflow | |
| compiled_workflow = workflow.compile() | |
| print("β Operations workflow compiled successfully") | |
| return compiled_workflow | |
| def create_simple_workflow() -> Any: | |
| """ | |
| Create a simplified workflow for testing and development with LLM router | |
| This workflow is useful for debugging and development as it | |
| follows a more linear path with the LLM router. | |
| Returns: | |
| Compiled simplified workflow | |
| """ | |
| print("π§ Creating simple operations workflow with LLM router...") | |
| workflow = StateGraph(AgentState) | |
| # Add nodes | |
| workflow.add_node("llm_routing", llm_routing_node) | |
| workflow.add_node("tool_execution", tool_execution_node) | |
| workflow.add_node("llm_summarization", llm_summarization_node) | |
| workflow.add_node("response_generation", response_generation_node) | |
| # Set entry point | |
| workflow.set_entry_point("llm_routing") | |
| # Simple conditional flow | |
| workflow.add_conditional_edges( | |
| "llm_routing", | |
| _simple_router, | |
| { | |
| "execute_tools": "tool_execution", | |
| "direct_response": "response_generation" | |
| } | |
| ) | |
| workflow.add_edge("tool_execution", "llm_summarization") | |
| workflow.add_edge("llm_summarization", "response_generation") | |
| workflow.add_edge("response_generation", END) | |
| compiled_workflow = workflow.compile() | |
| print("β Simple workflow compiled successfully") | |
| return compiled_workflow | |
| def _simple_router(state: AgentState) -> str: | |
| """Simple router for the simplified workflow""" | |
| if state.get("processing_status") == "approved" and state.get("planned_tools"): | |
| return "execute_tools" | |
| return "direct_response" | |
| def _simplified_llm_router(state: AgentState) -> str: | |
| """ | |
| Simplified router function to determine path after LLM routing | |
| Trust the LLM router decisions with minimal additional logic. | |
| Args: | |
| state: Current agent state after LLM routing | |
| Returns: | |
| "execute_tools" if tools are planned, "direct_response" for help/clarification, "error" for errors | |
| """ | |
| status = state.get("processing_status", "unknown") | |
| # Check for errors from LLM routing | |
| if status == "error": | |
| print("π¨ Routing to error handling") | |
| return "error" | |
| # Check if tools are planned and approved | |
| if status == "approved" and state.get("planned_tools"): | |
| print("π― Routing to tool execution") | |
| return "execute_tools" | |
| # Help requests, clarification needs, or rejections go to direct response | |
| if status in ["needs_clarification", "help_requested", "rejected"]: | |
| print(f"βΉοΈ Routing to direct response ({status})") | |
| return "direct_response" | |
| # Default fallback | |
| print("β Routing to direct response (fallback)") | |
| return "direct_response" | |
| def _simple_execution_router(state: AgentState) -> str: | |
| """ | |
| Simplified router function to determine post-execution path | |
| Args: | |
| state: Current agent state after tool execution | |
| Returns: | |
| "has_errors" if errors occurred, "success" otherwise | |
| """ | |
| # Check for errors in execution | |
| errors = state.get("errors", []) | |
| if errors: | |
| print("π¨ Routing to error handling") | |
| return "has_errors" | |
| # Check if all tool results are successful | |
| tool_results = state.get("tool_results", []) | |
| if tool_results and all(result.success for result in tool_results): | |
| print("β Routing to success response") | |
| return "success" | |
| # If no tool results or mixed results, treat as partial success | |
| print("β οΈ Routing to response generation (partial success)") | |
| return "success" | |
| # Workflow introspection functions | |
| def get_workflow_nodes(workflow: Any) -> list: | |
| """ | |
| Get list of nodes in the workflow | |
| Args: | |
| workflow: Compiled workflow | |
| Returns: | |
| List of node names | |
| """ | |
| # Return the actual nodes in the simplified workflow | |
| return [ | |
| "llm_routing", | |
| "tool_execution", | |
| "response_generation", | |
| "error_handling" | |
| ] | |
| def print_workflow_summary(workflow: Any): | |
| """ | |
| Print a summary of the workflow structure | |
| Args: | |
| workflow: Compiled workflow to summarize | |
| """ | |
| print("π Operations Agent Workflow Summary:") | |
| print(" Entry Point: llm_routing") | |
| print(" Nodes:") | |
| print(" β’ llm_routing β [tool_execution | response_generation | error_handling]") | |
| print(" β’ tool_execution β [error_handling | llm_summarization]") | |
| print(" β’ llm_summarization β response_generation") | |
| print(" β’ response_generation β END") | |
| print(" β’ error_handling β END") | |
| print() | |
| print(" Conditional Routing:") | |
| print(" β’ LLM approved with tools? β Tool Execution") | |
| print(" β’ LLM clarification/help/rejected? β Direct Response") | |
| print(" β’ Tool execution errors? β Error Handling") | |
| print(" β’ Tool execution success? β LLM Summarization β Response") | |
| print() | |
| # Workflow validation | |
| def validate_workflow_state(state: AgentState, node_name: str) -> bool: | |
| """ | |
| Validate that the state is appropriate for a given node | |
| Args: | |
| state: Agent state to validate | |
| node_name: Name of the node about to be executed | |
| Returns: | |
| True if state is valid for the node, False otherwise | |
| """ | |
| if node_name == "llm_routing": | |
| return "user_message" in state and state["user_message"] | |
| elif node_name == "tool_execution": | |
| return "planned_tools" in state and state.get("planned_tools") | |
| elif node_name == "response_generation": | |
| return "tool_results" in state or "processing_status" in state | |
| elif node_name == "error_handling": | |
| return "errors" in state and len(state["errors"]) > 0 | |
| return True | |
| # Factory functions for different use cases | |
| def create_omirl_focused_workflow() -> Any: | |
| """ | |
| Create a workflow optimized for OMIRL operations | |
| This workflow assumes most requests are OMIRL-related | |
| and optimizes the routing accordingly. | |
| Returns: | |
| Compiled OMIRL-focused workflow | |
| """ | |
| # For now, use the standard workflow | |
| # Could be customized with different routing logic | |
| return create_operations_workflow() | |
| def create_debug_workflow() -> Any: | |
| """ | |
| Create a workflow with extensive debugging and logging | |
| Returns: | |
| Compiled debug workflow | |
| """ | |
| # For now, use the simple workflow which is easier to debug | |
| return create_simple_workflow() | |
| # Default workflow factory | |
| def get_default_workflow() -> Any: | |
| """ | |
| Get the default operations workflow | |
| Returns: | |
| Default compiled workflow | |
| """ | |
| return create_operations_workflow() | |
| if __name__ == "__main__": | |
| # Test workflow compilation | |
| workflow = create_operations_workflow() | |
| print_workflow_summary(workflow) | |
| # Test simple workflow | |
| simple_workflow = create_simple_workflow() | |
| print("\n" + "="*50) | |
| print("Simple workflow compiled successfully") | |