Spaces:
Sleeping
Sleeping
File size: 5,360 Bytes
"""
LangGraph Workflow Orchestration
Manages the flow between agents with persistence and breakpoints.
Workflow:
1. Agent 0 (Super Agent): Validate inputs and create test plan
2. Agent 1 (Design Inspector): Capture Figma screenshots + extract elements
3. Agent 2 (Website Inspector): Capture website screenshots + extract DOM elements
4. Agent 3 (Visual Comparator): Pixel-level visual diff analysis
5. Agent 4 (Element Matcher): Match Figma elements to DOM elements
6. Agent 5 (AI Analyzer): AI-powered analysis using HF models
"""
from typing import Dict, Any
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from state_schema import WorkflowState, create_initial_state
from agents import (
agent_0_node,
agent_1_node,
agent_2_node,
agent_3_node,
agent_4_node,
agent_5_node
)
# Module-level checkpointer handed to every graph compiled by create_workflow().
# Because each call to create_workflow() builds a fresh graph, sharing this one
# instance is what lets state saved under a thread_id by one function call be
# read back by a later call (e.g. run_workflow_step_1 followed by resume_workflow).
# NOTE(review): presumably MemorySaver keeps checkpoints in process memory only,
# so they would not survive a restart — confirm against the langgraph docs.
_global_checkpointer = MemorySaver()
def create_workflow():
    """
    Build and compile the six-agent LangGraph pipeline.

    The graph is strictly linear: every stage has exactly one outgoing edge
    to the next stage, and the last stage leads to END. The compiled graph
    is bound to the module-level checkpointer so that runs are persisted
    per thread_id.

    Returns:
        The compiled graph returned by ``StateGraph.compile``.
    """
    graph = StateGraph(WorkflowState)

    # (node name, agent callable) pairs, listed in execution order.
    stages = (
        ("initialize", agent_0_node),       # Validate & plan
        ("capture_figma", agent_1_node),    # Figma screenshots + elements
        ("capture_website", agent_2_node),  # Website screenshots + elements
        ("visual_compare", agent_3_node),   # Visual diff analysis
        ("match_elements", agent_4_node),   # Element matching
        ("ai_analyze", agent_5_node),       # AI-powered analysis
    )

    # Register every agent as a node.
    for node_name, agent in stages:
        graph.add_node(node_name, agent)

    # Execution starts at the first stage.
    graph.set_entry_point(stages[0][0])

    # Chain each stage to its successor; the final stage is wired to END.
    path = [node_name for node_name, _ in stages] + [END]
    for source, target in zip(path, path[1:]):
        graph.add_edge(source, target)

    # Compile with persistence. The checkpointer is only read here, so no
    # `global` declaration is needed.
    return graph.compile(checkpointer=_global_checkpointer)
def run_workflow_step_1(
    figma_id: str,
    figma_key: str,
    url: str,
    execution_id: str,
    thread_id: str,
    hf_token: str = ""
):
    """
    Run the first part of the workflow (capture screenshots).

    Compiles a graph with create_workflow(), seeds it with the state built
    by create_initial_state(), and streams it under the given thread_id,
    printing every non-empty "status" value that appears in the streamed
    state.

    NOTE(review): this step is meant to stop at a breakpoint before the
    analysis stages, but create_workflow() as written compiles the graph
    without any interrupt configured, so the stream appears to run all the
    way to END. Confirm whether a breakpoint is still intended.

    Args:
        figma_id: Forwarded to create_initial_state as ``figma_file_key``.
        figma_key: Forwarded to create_initial_state as ``figma_access_token``.
        url: Forwarded to create_initial_state as ``website_url``.
        execution_id: Forwarded to create_initial_state as ``execution_id``.
        thread_id: Checkpoint thread under which this run is stored.
        hf_token: Forwarded to create_initial_state as ``hf_token``.

    Returns:
        The value of ``app.get_state(config)`` once streaming has finished.

    Raises:
        Exception: Any error raised while streaming is printed and then
            re-raised unchanged.
    """
    print(f" βοΈ Initializing workflow for thread: {thread_id}")
    graph_app = create_workflow()
    run_config = {"configurable": {"thread_id": thread_id}}
    seed_state = create_initial_state(
        figma_file_key=figma_id,
        figma_access_token=figma_key,
        website_url=url,
        execution_id=execution_id,
        hf_token=hf_token
    )

    print(" π Running workflow (Step 1: Capture)...")
    try:
        for snapshot in graph_app.stream(seed_state, run_config, stream_mode="values"):
            # Empty snapshots and snapshots without a status are skipped.
            current_status = snapshot.get("status", "") if snapshot else ""
            if current_status:
                print(f" π Status: {current_status}")
    except Exception as e:
        print(f" β Workflow error: {e!s}")
        raise

    return graph_app.get_state(run_config)
def resume_workflow(thread_id: str, user_approval: bool = True):
    """
    Resume the workflow after human approval.

    Recompiles the graph, writes ``user_approval`` into the state stored for
    ``thread_id``, then streams with ``None`` as input so execution picks up
    from that thread's saved state rather than from a new initial state.

    Args:
        thread_id: Checkpoint thread to resume.
        user_approval: Value stored under the "user_approval" state key
            before streaming resumes.

    Returns:
        The value of ``app.get_state(config)`` once streaming has finished.
    """
    print(f" π Resuming workflow for thread: {thread_id}")
    graph_app = create_workflow()
    run_config = {"configurable": {"thread_id": thread_id}}

    # Record the approval decision on the persisted state.
    graph_app.update_state(run_config, {"user_approval": user_approval})

    print(" π Running workflow (Step 2: Analysis)...")
    # Drain the stream purely to drive execution; the yielded values are not
    # needed because the state is read back from the checkpointer below.
    for _ in graph_app.stream(None, run_config, stream_mode="values"):
        pass

    return graph_app.get_state(run_config)
def run_workflow(
    figma_id: str,
    figma_key: str,
    url: str,
    execution_id: str,
    thread_id: str,
    hf_token: str = ""
):
    """
    Run the complete workflow in one go.

    This is the main entry point for the v2 interface. It compiles a graph
    with create_workflow(), seeds it with the state built by
    create_initial_state(), and streams it until the stream is exhausted,
    printing every non-empty "status" value along the way.
    Runs all agents sequentially: capture -> compare -> match -> analyze

    Args:
        figma_id: Forwarded to create_initial_state as ``figma_file_key``.
        figma_key: Forwarded to create_initial_state as ``figma_access_token``.
        url: Forwarded to create_initial_state as ``website_url``.
        execution_id: Forwarded to create_initial_state as ``execution_id``.
        thread_id: Checkpoint thread under which this run is stored.
        hf_token: Forwarded to create_initial_state as ``hf_token``.

    Returns:
        The value of ``app.get_state(config)`` once streaming has finished.

    Raises:
        Exception: Any error raised while streaming or reading the final
            state is printed together with its traceback and re-raised
            unchanged.
    """
    banner = "=" * 60
    print(f"\n{banner}")
    print("βοΈ Starting Complete Workflow")
    print(banner)
    print(f" Thread: {thread_id}")
    print(f" Execution: {execution_id}")

    app = create_workflow()
    config = {"configurable": {"thread_id": thread_id}}
    initial_state = create_initial_state(
        figma_file_key=figma_id,
        figma_access_token=figma_key,
        website_url=url,
        execution_id=execution_id,
        hf_token=hf_token
    )

    print("\nπ Running complete workflow...")
    try:
        for event in app.stream(initial_state, config, stream_mode="values"):
            # Empty events and events without a status are skipped.
            status = event.get("status", "") if event else ""
            if status:
                print(f" π {status}")

        print(f"\n{banner}")
        # Kept on a single physical line: a single-quoted string literal
        # cannot contain a raw line break.
        print("✅ Workflow Complete!")
        print(banner)

        # The authoritative final state is read from the checkpointer rather
        # than taken from the last streamed event.
        return app.get_state(config)
    except Exception as e:
        print(f"\nβ Workflow error: {str(e)}")
        import traceback
        traceback.print_exc()
        raise
|