"""
LangGraph Workflow Orchestrations
Updated for dictionary-based state and persistence.
"""

from typing import Dict, Any, List

from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver

from state_schema import WorkflowState, create_initial_state
from agents import (
    agent_0_node,
    agent_1_node,
    agent_2_node,
    agent_3_integrated_node,
)

# Global checkpointer to persist state between function calls.
# Every graph compiled by create_workflow() shares this one instance, which is
# what lets resume_workflow() pick up a thread started by run_workflow_step_1().
_global_checkpointer = MemorySaver()


def create_workflow():
    """Build and compile the LangGraph workflow.

    The graph is a straight pipeline:
    ``setup -> capture_design -> capture_website -> analysis -> END``.
    It is compiled with the module-level checkpointer and with
    ``interrupt_before=["analysis"]`` as the human-in-the-loop breakpoint.

    Returns:
        The compiled graph produced by ``StateGraph.compile``.
    """
    workflow = StateGraph(WorkflowState)

    # Add nodes
    workflow.add_node("setup", agent_0_node)
    workflow.add_node("capture_design", agent_1_node)
    workflow.add_node("capture_website", agent_2_node)
    workflow.add_node("analysis", agent_3_integrated_node)

    # Define edges
    workflow.set_entry_point("setup")
    workflow.add_edge("setup", "capture_design")
    workflow.add_edge("capture_design", "capture_website")
    # Human-in-the-loop breakpoint before analysis (see interrupt_before below)
    workflow.add_edge("capture_website", "analysis")
    workflow.add_edge("analysis", END)

    # No `global` statement needed: the checkpointer is only read, never rebound.
    return workflow.compile(
        checkpointer=_global_checkpointer,
        interrupt_before=["analysis"],
    )


def run_workflow_step_1(
    figma_id,
    figma_key,
    url,
    execution_id,
    thread_id,
    hf_token: str = "",
):
    """Run the first part of the workflow until the breakpoint.

    Args:
        figma_id: Passed to ``create_initial_state`` as ``figma_file_key``.
        figma_key: Passed to ``create_initial_state`` as ``figma_access_token``.
        url: Passed to ``create_initial_state`` as ``website_url``.
        execution_id: Passed to ``create_initial_state`` as ``execution_id``.
        thread_id: Checkpoint thread identifier; reuse the same value when
            calling ``resume_workflow``.
        hf_token: Passed to ``create_initial_state`` as ``hf_token``.

    Returns:
        The result of ``app.get_state(config)`` for ``thread_id`` once
        streaming has stopped.

    Raises:
        Exception: Any error raised while streaming is printed and then
            re-raised unchanged.
    """
    print(f" ⚙️ Initializing workflow for thread: {thread_id}")

    app = create_workflow()
    config = {"configurable": {"thread_id": thread_id}}

    initial_state = create_initial_state(
        figma_file_key=figma_id,
        figma_access_token=figma_key,
        website_url=url,
        hf_token=hf_token,
        execution_id=execution_id,
    )

    # Run until interrupt
    print(" 🏃 Running workflow nodes...")
    try:
        for event in app.stream(initial_state, config, stream_mode="values"):
            # Only announce non-empty state updates; the event content itself
            # is not inspected here.
            if event:
                print(" 📍 Current state updated")
    except Exception as e:
        print(f" ❌ Error during workflow execution: {e!s}")
        raise

    return app.get_state(config)


def resume_workflow(thread_id, user_approval: bool = True):
    """Resume the workflow after human approval.

    Args:
        thread_id: Checkpoint thread identifier used by the earlier
            ``run_workflow_step_1`` call.
        user_approval: Value written to the ``"user_approval"`` key of the
            thread's state before execution resumes.

    Returns:
        The result of ``app.get_state(config)`` for ``thread_id`` after the
        resumed stream has been fully consumed.
    """
    app = create_workflow()
    config = {"configurable": {"thread_id": thread_id}}

    # Update state with approval
    app.update_state(config, {"user_approval": user_approval})

    # Resume execution: stream with no new input and drain every event. The
    # events are not needed here because the state is read back below.
    for _ in app.stream(None, config, stream_mode="values"):
        pass

    return app.get_state(config)