""" LangGraph Workflow Orchestration Manages the flow between agents with persistence and breakpoints. Workflow: 1. Agent 0 (Super Agent): Validate inputs and create test plan 2. Agent 1 (Design Inspector): Capture Figma screenshots + extract elements 3. Agent 2 (Website Inspector): Capture website screenshots + extract DOM elements 4. Agent 3 (Visual Comparator): Pixel-level visual diff analysis 5. Agent 4 (Element Matcher): Match Figma elements to DOM elements 6. Agent 5 (AI Analyzer): AI-powered analysis using HF models """ from typing import Dict, Any from langgraph.graph import StateGraph, END from langgraph.checkpoint.memory import MemorySaver from state_schema import WorkflowState, create_initial_state from agents import ( agent_0_node, agent_1_node, agent_2_node, agent_3_node, agent_4_node, agent_5_node ) # Global checkpointer to persist state between function calls _global_checkpointer = MemorySaver() def create_workflow(): """Create the LangGraph workflow with persistence.""" global _global_checkpointer workflow = StateGraph(WorkflowState) # Add nodes (agents) workflow.add_node("initialize", agent_0_node) # Validate & plan workflow.add_node("capture_figma", agent_1_node) # Figma screenshots + elements workflow.add_node("capture_website", agent_2_node) # Website screenshots + elements workflow.add_node("visual_compare", agent_3_node) # Visual diff analysis workflow.add_node("match_elements", agent_4_node) # Element matching workflow.add_node("ai_analyze", agent_5_node) # AI-powered analysis # Define flow workflow.set_entry_point("initialize") workflow.add_edge("initialize", "capture_figma") workflow.add_edge("capture_figma", "capture_website") workflow.add_edge("capture_website", "visual_compare") workflow.add_edge("visual_compare", "match_elements") workflow.add_edge("match_elements", "ai_analyze") workflow.add_edge("ai_analyze", END) # Compile with persistence return workflow.compile( checkpointer=_global_checkpointer ) def run_workflow_step_1( figma_id: str, figma_key: str, url: str, execution_id: str, thread_id: str, hf_token: str = "" ): """ Run the first part of the workflow (capture screenshots). Stops at the breakpoint before analysis. """ print(f" āš™ļø Initializing workflow for thread: {thread_id}") app = create_workflow() config = {"configurable": {"thread_id": thread_id}} initial_state = create_initial_state( figma_file_key=figma_id, figma_access_token=figma_key, website_url=url, execution_id=execution_id, hf_token=hf_token ) print(" šŸƒ Running workflow (Step 1: Capture)...") try: for event in app.stream(initial_state, config, stream_mode="values"): if event: status = event.get("status", "") if status: print(f" šŸ“ Status: {status}") except Exception as e: print(f" āŒ Workflow error: {str(e)}") raise return app.get_state(config) def resume_workflow(thread_id: str, user_approval: bool = True): """ Resume the workflow after human approval. Continues from the breakpoint to run analysis. """ print(f" šŸ”„ Resuming workflow for thread: {thread_id}") app = create_workflow() config = {"configurable": {"thread_id": thread_id}} # Update state with approval app.update_state(config, {"user_approval": user_approval}) print(" šŸƒ Running workflow (Step 2: Analysis)...") # Resume execution state = None for event in app.stream(None, config, stream_mode="values"): state = event return app.get_state(config) def run_workflow( figma_id: str, figma_key: str, url: str, execution_id: str, thread_id: str, hf_token: str = "" ): """ Run the complete workflow in one go. This is the main entry point for the v2 interface. Runs all agents sequentially: capture -> compare -> match -> analyze """ print(f"\n{'='*60}") print(f"āš™ļø Starting Complete Workflow") print(f"{'='*60}") print(f" Thread: {thread_id}") print(f" Execution: {execution_id}") app = create_workflow() config = {"configurable": {"thread_id": thread_id}} initial_state = create_initial_state( figma_file_key=figma_id, figma_access_token=figma_key, website_url=url, execution_id=execution_id, hf_token=hf_token ) print("\nšŸƒ Running complete workflow...") try: final_state = None for event in app.stream(initial_state, config, stream_mode="values"): if event: status = event.get("status", "") if status: print(f" šŸ“ {status}") final_state = event print(f"\n{'='*60}") print(f"āœ… Workflow Complete!") print(f"{'='*60}") return app.get_state(config) except Exception as e: print(f"\nāŒ Workflow error: {str(e)}") import traceback traceback.print_exc() raise