riazmo's picture
Update workflow.py
e151643 verified
"""
LangGraph Workflow Orchestrations
Updated for dictionary-based state and persistence.
"""
from typing import Dict, Any, List
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from state_schema import WorkflowState, create_initial_state
from agents import (
agent_0_node,
agent_1_node,
agent_2_node,
agent_3_integrated_node
)
# Global checkpointer to persist state between function calls
_global_checkpointer = MemorySaver()
def create_workflow():
"""Create the LangGraph workflow."""
global _global_checkpointer
workflow = StateGraph(WorkflowState)
# Add Nodes
workflow.add_node("setup", agent_0_node)
workflow.add_node("capture_design", agent_1_node)
workflow.add_node("capture_website", agent_2_node)
workflow.add_node("analysis", agent_3_integrated_node)
# Define Edges
workflow.set_entry_point("setup")
workflow.add_edge("setup", "capture_design")
workflow.add_edge("capture_design", "capture_website")
# Human-in-the-loop breakpoint before analysis
workflow.add_edge("capture_website", "analysis")
workflow.add_edge("analysis", END)
return workflow.compile(
checkpointer=_global_checkpointer,
interrupt_before=["analysis"]
)
def run_workflow_step_1(figma_id, figma_key, url, execution_id, thread_id, hf_token=""):
"""Run the first part of the workflow until the breakpoint."""
print(f" βš™οΈ Initializing workflow for thread: {thread_id}")
app = create_workflow()
config = {"configurable": {"thread_id": thread_id}}
initial_state = create_initial_state(
figma_file_key=figma_id,
figma_access_token=figma_key,
website_url=url,
hf_token=hf_token,
execution_id=execution_id
)
# Run until interrupt
print(" πŸƒ Running workflow nodes...")
try:
for event in app.stream(initial_state, config, stream_mode="values"):
# Log the current node if possible
if event:
print(f" πŸ“ Current state updated")
except Exception as e:
print(f" ❌ Error during workflow execution: {str(e)}")
raise
return app.get_state(config)
def resume_workflow(thread_id, user_approval=True):
"""Resume the workflow after human approval."""
app = create_workflow()
config = {"configurable": {"thread_id": thread_id}}
# Update state with approval
app.update_state(config, {"user_approval": user_approval})
# Resume execution
for event in app.stream(None, config, stream_mode="values"):
state = event
return app.get_state(config)