""" LangGraph Workflow Orchestration Manages the flow between agents with persistence and breakpoints. """ from typing import Dict, Any from langgraph.graph import StateGraph, END from langgraph.checkpoint.memory import MemorySaver from state_schema import WorkflowState, create_initial_state from agents import ( agent_0_node, agent_1_node, agent_2_node, agent_3_node ) # Global checkpointer to persist state between function calls _global_checkpointer = MemorySaver() def create_workflow(): """Create the LangGraph workflow with persistence.""" global _global_checkpointer workflow = StateGraph(WorkflowState) # Add nodes (agents) workflow.add_node("initialize", agent_0_node) # Validate & plan workflow.add_node("capture_figma", agent_1_node) # Figma screenshots workflow.add_node("capture_website", agent_2_node) # Website screenshots workflow.add_node("compare", agent_3_node) # Visual comparison # Define flow workflow.set_entry_point("initialize") workflow.add_edge("initialize", "capture_figma") workflow.add_edge("capture_figma", "capture_website") workflow.add_edge("capture_website", "compare") # Breakpoint before compare workflow.add_edge("compare", END) # Compile with persistence and breakpoint return workflow.compile( checkpointer=_global_checkpointer, interrupt_before=["compare"] # Pause before analysis for human review ) def run_workflow_step_1( figma_id: str, figma_key: str, url: str, execution_id: str, thread_id: str, hf_token: str = "" ): """ Run the first part of the workflow (capture screenshots). Stops at the breakpoint before analysis. """ print(f" ⚙️ Initializing workflow for thread: {thread_id}") app = create_workflow() config = {"configurable": {"thread_id": thread_id}} initial_state = create_initial_state( figma_file_key=figma_id, figma_access_token=figma_key, website_url=url, execution_id=execution_id, hf_token=hf_token ) print(" 🏃 Running workflow (Step 1: Capture)...") try: for event in app.stream(initial_state, config, stream_mode="values"): if event: status = event.get("status", "") if status: print(f" 📍 Status: {status}") except Exception as e: print(f" ❌ Workflow error: {str(e)}") raise return app.get_state(config) def resume_workflow(thread_id: str, user_approval: bool = True): """ Resume the workflow after human approval. Continues from the breakpoint to run analysis. """ print(f" 🔄 Resuming workflow for thread: {thread_id}") app = create_workflow() config = {"configurable": {"thread_id": thread_id}} # Update state with approval app.update_state(config, {"user_approval": user_approval}) print(" 🏃 Running workflow (Step 2: Analysis)...") # Resume execution state = None for event in app.stream(None, config, stream_mode="values"): state = event return app.get_state(config)