operations / agent /graph.py
jbbove's picture
🎯 Complete OMIRL web services refactoring with workflow enhancement
36f8fda
# agent/graph.py
"""
LangGraph Workflow Definition
This module defines the LangGraph workflow for the operations agent.
It specifies how nodes are connected and under what conditions the
workflow transitions between different processing steps.
Purpose:
- Define the agent workflow as a LangGraph StateGraph
- Specify node connections and conditional routing
- Handle workflow compilation and execution
- Support different execution paths based on intent and results
Dependencies:
- langgraph: Core graph framework
- state.py: Agent state definitions
- nodes.py: Node function implementations
Used by:
- agent.py: Main agent execution
- Direct usage: Workflow testing and debugging
"""
from typing import Dict, Any
from langgraph.graph import StateGraph, END
from .state import AgentState
from .nodes import (
llm_routing_node,
tool_execution_node,
response_generation_node,
error_handling_node,
llm_summarization_node
)
def create_operations_workflow() -> Any:
"""
Create and compile the operations agent workflow with LLM router
This function defines the complete workflow for processing user requests,
including LLM-based routing, tool execution, and response generation.
Returns:
Compiled LangGraph workflow ready for execution
"""
print("πŸ”§ Creating operations agent workflow with LLM router...")
# Create the state graph
workflow = StateGraph(AgentState)
# Add nodes to the workflow (tool_planning node removed as redundant)
workflow.add_node("llm_routing", llm_routing_node)
workflow.add_node("tool_execution", tool_execution_node)
workflow.add_node("llm_summarization", llm_summarization_node)
workflow.add_node("response_generation", response_generation_node)
workflow.add_node("error_handling", error_handling_node)
# Set the entry point
workflow.set_entry_point("llm_routing")
# Define workflow transitions using simplified routing
workflow.add_conditional_edges(
"llm_routing",
_simplified_llm_router,
{
"execute_tools": "tool_execution",
"direct_response": "response_generation",
"error": "error_handling"
}
)
# Tool execution can lead to error handling or LLM summarization
workflow.add_conditional_edges(
"tool_execution",
_simple_execution_router,
{
"has_errors": "error_handling",
"success": "llm_summarization"
}
)
# LLM summarization leads to response generation
workflow.add_edge("llm_summarization", "response_generation")
# Both response generation and error handling end the workflow
workflow.add_edge("response_generation", END)
workflow.add_edge("error_handling", END)
# Compile the workflow
compiled_workflow = workflow.compile()
print("βœ… Operations workflow compiled successfully")
return compiled_workflow
def create_simple_workflow() -> Any:
"""
Create a simplified workflow for testing and development with LLM router
This workflow is useful for debugging and development as it
follows a more linear path with the LLM router.
Returns:
Compiled simplified workflow
"""
print("πŸ”§ Creating simple operations workflow with LLM router...")
workflow = StateGraph(AgentState)
# Add nodes
workflow.add_node("llm_routing", llm_routing_node)
workflow.add_node("tool_execution", tool_execution_node)
workflow.add_node("llm_summarization", llm_summarization_node)
workflow.add_node("response_generation", response_generation_node)
# Set entry point
workflow.set_entry_point("llm_routing")
# Simple conditional flow
workflow.add_conditional_edges(
"llm_routing",
_simple_router,
{
"execute_tools": "tool_execution",
"direct_response": "response_generation"
}
)
workflow.add_edge("tool_execution", "llm_summarization")
workflow.add_edge("llm_summarization", "response_generation")
workflow.add_edge("response_generation", END)
compiled_workflow = workflow.compile()
print("βœ… Simple workflow compiled successfully")
return compiled_workflow
def _simple_router(state: AgentState) -> str:
"""Simple router for the simplified workflow"""
if state.get("processing_status") == "approved" and state.get("planned_tools"):
return "execute_tools"
return "direct_response"
def _simplified_llm_router(state: AgentState) -> str:
"""
Simplified router function to determine path after LLM routing
Trust the LLM router decisions with minimal additional logic.
Args:
state: Current agent state after LLM routing
Returns:
"execute_tools" if tools are planned, "direct_response" for help/clarification, "error" for errors
"""
status = state.get("processing_status", "unknown")
# Check for errors from LLM routing
if status == "error":
print("🚨 Routing to error handling")
return "error"
# Check if tools are planned and approved
if status == "approved" and state.get("planned_tools"):
print("🎯 Routing to tool execution")
return "execute_tools"
# Help requests, clarification needs, or rejections go to direct response
if status in ["needs_clarification", "help_requested", "rejected"]:
print(f"ℹ️ Routing to direct response ({status})")
return "direct_response"
# Default fallback
print("❓ Routing to direct response (fallback)")
return "direct_response"
def _simple_execution_router(state: AgentState) -> str:
"""
Simplified router function to determine post-execution path
Args:
state: Current agent state after tool execution
Returns:
"has_errors" if errors occurred, "success" otherwise
"""
# Check for errors in execution
errors = state.get("errors", [])
if errors:
print("🚨 Routing to error handling")
return "has_errors"
# Check if all tool results are successful
tool_results = state.get("tool_results", [])
if tool_results and all(result.success for result in tool_results):
print("βœ… Routing to success response")
return "success"
# If no tool results or mixed results, treat as partial success
print("⚠️ Routing to response generation (partial success)")
return "success"
# Workflow introspection functions
def get_workflow_nodes(workflow: Any) -> list:
"""
Get list of nodes in the workflow
Args:
workflow: Compiled workflow
Returns:
List of node names
"""
# Return the actual nodes in the simplified workflow
return [
"llm_routing",
"tool_execution",
"response_generation",
"error_handling"
]
def print_workflow_summary(workflow: Any):
"""
Print a summary of the workflow structure
Args:
workflow: Compiled workflow to summarize
"""
print("πŸ”„ Operations Agent Workflow Summary:")
print(" Entry Point: llm_routing")
print(" Nodes:")
print(" β€’ llm_routing β†’ [tool_execution | response_generation | error_handling]")
print(" β€’ tool_execution β†’ [error_handling | llm_summarization]")
print(" β€’ llm_summarization β†’ response_generation")
print(" β€’ response_generation β†’ END")
print(" β€’ error_handling β†’ END")
print()
print(" Conditional Routing:")
print(" β€’ LLM approved with tools? β†’ Tool Execution")
print(" β€’ LLM clarification/help/rejected? β†’ Direct Response")
print(" β€’ Tool execution errors? β†’ Error Handling")
print(" β€’ Tool execution success? β†’ LLM Summarization β†’ Response")
print()
# Workflow validation
def validate_workflow_state(state: AgentState, node_name: str) -> bool:
"""
Validate that the state is appropriate for a given node
Args:
state: Agent state to validate
node_name: Name of the node about to be executed
Returns:
True if state is valid for the node, False otherwise
"""
if node_name == "llm_routing":
return "user_message" in state and state["user_message"]
elif node_name == "tool_execution":
return "planned_tools" in state and state.get("planned_tools")
elif node_name == "response_generation":
return "tool_results" in state or "processing_status" in state
elif node_name == "error_handling":
return "errors" in state and len(state["errors"]) > 0
return True
# Factory functions for different use cases
def create_omirl_focused_workflow() -> Any:
"""
Create a workflow optimized for OMIRL operations
This workflow assumes most requests are OMIRL-related
and optimizes the routing accordingly.
Returns:
Compiled OMIRL-focused workflow
"""
# For now, use the standard workflow
# Could be customized with different routing logic
return create_operations_workflow()
def create_debug_workflow() -> Any:
"""
Create a workflow with extensive debugging and logging
Returns:
Compiled debug workflow
"""
# For now, use the simple workflow which is easier to debug
return create_simple_workflow()
# Default workflow factory
def get_default_workflow() -> Any:
"""
Get the default operations workflow
Returns:
Default compiled workflow
"""
return create_operations_workflow()
if __name__ == "__main__":
# Test workflow compilation
workflow = create_operations_workflow()
print_workflow_summary(workflow)
# Test simple workflow
simple_workflow = create_simple_workflow()
print("\n" + "="*50)
print("Simple workflow compiled successfully")