Spaces:
Sleeping
Sleeping
File size: 3,228 Bytes
cfec14d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 |
"""
LangGraph Workflow Orchestration
Manages the flow between agents with persistence and breakpoints.
"""
from typing import Dict, Any
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from state_schema import WorkflowState, create_initial_state
from agents import (
agent_0_node,
agent_1_node,
agent_2_node,
agent_3_node
)
# Global checkpointer to persist state between function calls
_global_checkpointer = MemorySaver()
def create_workflow():
"""Create the LangGraph workflow with persistence."""
global _global_checkpointer
workflow = StateGraph(WorkflowState)
# Add nodes (agents)
workflow.add_node("initialize", agent_0_node) # Validate & plan
workflow.add_node("capture_figma", agent_1_node) # Figma screenshots
workflow.add_node("capture_website", agent_2_node) # Website screenshots
workflow.add_node("compare", agent_3_node) # Visual comparison
# Define flow
workflow.set_entry_point("initialize")
workflow.add_edge("initialize", "capture_figma")
workflow.add_edge("capture_figma", "capture_website")
workflow.add_edge("capture_website", "compare") # Breakpoint before compare
workflow.add_edge("compare", END)
# Compile with persistence and breakpoint
return workflow.compile(
checkpointer=_global_checkpointer,
interrupt_before=["compare"] # Pause before analysis for human review
)
def run_workflow_step_1(
figma_id: str,
figma_key: str,
url: str,
execution_id: str,
thread_id: str,
hf_token: str = ""
):
"""
Run the first part of the workflow (capture screenshots).
Stops at the breakpoint before analysis.
"""
print(f" βοΈ Initializing workflow for thread: {thread_id}")
app = create_workflow()
config = {"configurable": {"thread_id": thread_id}}
initial_state = create_initial_state(
figma_file_key=figma_id,
figma_access_token=figma_key,
website_url=url,
execution_id=execution_id,
hf_token=hf_token
)
print(" π Running workflow (Step 1: Capture)...")
try:
for event in app.stream(initial_state, config, stream_mode="values"):
if event:
status = event.get("status", "")
if status:
print(f" π Status: {status}")
except Exception as e:
print(f" β Workflow error: {str(e)}")
raise
return app.get_state(config)
def resume_workflow(thread_id: str, user_approval: bool = True):
"""
Resume the workflow after human approval.
Continues from the breakpoint to run analysis.
"""
print(f" π Resuming workflow for thread: {thread_id}")
app = create_workflow()
config = {"configurable": {"thread_id": thread_id}}
# Update state with approval
app.update_state(config, {"user_approval": user_approval})
print(" π Running workflow (Step 2: Analysis)...")
# Resume execution
state = None
for event in app.stream(None, config, stream_mode="values"):
state = event
return app.get_state(config)
|