riazmo's picture
Upload 22 files
57026c7 verified
"""
LangGraph Workflow Orchestration
Manages the flow between agents with persistence and breakpoints.
Workflow:
1. Agent 0 (Super Agent): Validate inputs and create test plan
2. Agent 1 (Design Inspector): Capture Figma screenshots + extract elements
3. Agent 2 (Website Inspector): Capture website screenshots + extract DOM elements
4. Agent 3 (Visual Comparator): Pixel-level visual diff analysis
5. Agent 4 (Element Matcher): Match Figma elements to DOM elements
6. Agent 5 (AI Analyzer): AI-powered analysis using HF models
"""
from typing import Dict, Any
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from state_schema import WorkflowState, create_initial_state
from agents import (
agent_0_node,
agent_1_node,
agent_2_node,
agent_3_node,
agent_4_node,
agent_5_node
)
# Global checkpointer to persist state between function calls
_global_checkpointer = MemorySaver()
def create_workflow():
"""Create the LangGraph workflow with persistence."""
global _global_checkpointer
workflow = StateGraph(WorkflowState)
# Add nodes (agents)
workflow.add_node("initialize", agent_0_node) # Validate & plan
workflow.add_node("capture_figma", agent_1_node) # Figma screenshots + elements
workflow.add_node("capture_website", agent_2_node) # Website screenshots + elements
workflow.add_node("visual_compare", agent_3_node) # Visual diff analysis
workflow.add_node("match_elements", agent_4_node) # Element matching
workflow.add_node("ai_analyze", agent_5_node) # AI-powered analysis
# Define flow
workflow.set_entry_point("initialize")
workflow.add_edge("initialize", "capture_figma")
workflow.add_edge("capture_figma", "capture_website")
workflow.add_edge("capture_website", "visual_compare")
workflow.add_edge("visual_compare", "match_elements")
workflow.add_edge("match_elements", "ai_analyze")
workflow.add_edge("ai_analyze", END)
# Compile with persistence
return workflow.compile(
checkpointer=_global_checkpointer
)
def run_workflow_step_1(
figma_id: str,
figma_key: str,
url: str,
execution_id: str,
thread_id: str,
hf_token: str = ""
):
"""
Run the first part of the workflow (capture screenshots).
Stops at the breakpoint before analysis.
"""
print(f" βš™οΈ Initializing workflow for thread: {thread_id}")
app = create_workflow()
config = {"configurable": {"thread_id": thread_id}}
initial_state = create_initial_state(
figma_file_key=figma_id,
figma_access_token=figma_key,
website_url=url,
execution_id=execution_id,
hf_token=hf_token
)
print(" πŸƒ Running workflow (Step 1: Capture)...")
try:
for event in app.stream(initial_state, config, stream_mode="values"):
if event:
status = event.get("status", "")
if status:
print(f" πŸ“ Status: {status}")
except Exception as e:
print(f" ❌ Workflow error: {str(e)}")
raise
return app.get_state(config)
def resume_workflow(thread_id: str, user_approval: bool = True):
"""
Resume the workflow after human approval.
Continues from the breakpoint to run analysis.
"""
print(f" πŸ”„ Resuming workflow for thread: {thread_id}")
app = create_workflow()
config = {"configurable": {"thread_id": thread_id}}
# Update state with approval
app.update_state(config, {"user_approval": user_approval})
print(" πŸƒ Running workflow (Step 2: Analysis)...")
# Resume execution
state = None
for event in app.stream(None, config, stream_mode="values"):
state = event
return app.get_state(config)
def run_workflow(
figma_id: str,
figma_key: str,
url: str,
execution_id: str,
thread_id: str,
hf_token: str = ""
):
"""
Run the complete workflow in one go.
This is the main entry point for the v2 interface.
Runs all agents sequentially: capture -> compare -> match -> analyze
"""
print(f"\n{'='*60}")
print(f"βš™οΈ Starting Complete Workflow")
print(f"{'='*60}")
print(f" Thread: {thread_id}")
print(f" Execution: {execution_id}")
app = create_workflow()
config = {"configurable": {"thread_id": thread_id}}
initial_state = create_initial_state(
figma_file_key=figma_id,
figma_access_token=figma_key,
website_url=url,
execution_id=execution_id,
hf_token=hf_token
)
print("\nπŸƒ Running complete workflow...")
try:
final_state = None
for event in app.stream(initial_state, config, stream_mode="values"):
if event:
status = event.get("status", "")
if status:
print(f" πŸ“ {status}")
final_state = event
print(f"\n{'='*60}")
print(f"βœ… Workflow Complete!")
print(f"{'='*60}")
return app.get_state(config)
except Exception as e:
print(f"\n❌ Workflow error: {str(e)}")
import traceback
traceback.print_exc()
raise