riazmo's picture
Upload 17 files
cfec14d verified
"""
LangGraph Workflow Orchestration
Manages the flow between agents with persistence and breakpoints.
"""
from typing import Dict, Any
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from state_schema import WorkflowState, create_initial_state
from agents import (
agent_0_node,
agent_1_node,
agent_2_node,
agent_3_node
)
# Module-level checkpointer shared by every graph that create_workflow()
# compiles. Each call to create_workflow() builds a new graph object, so the
# checkpointer must live outside it for a later call (e.g. resume_workflow)
# to find the state saved under the same thread_id by an earlier call.
# MemorySaver keeps checkpoints in process memory only.
_global_checkpointer = MemorySaver()
def create_workflow():
    """Build and compile the four-stage comparison graph.

    The stages run in a straight line:
    ``initialize`` -> ``capture_figma`` -> ``capture_website`` -> ``compare``.
    The graph is compiled against the shared module-level checkpointer and is
    configured to interrupt before ``compare``, so a run pauses there until
    it is resumed.

    Returns:
        The compiled graph returned by ``StateGraph.compile``.
    """
    graph = StateGraph(WorkflowState)

    # (node name, agent callable) in pipeline order.
    stages = (
        ("initialize", agent_0_node),        # Validate & plan
        ("capture_figma", agent_1_node),     # Figma screenshots
        ("capture_website", agent_2_node),   # Website screenshots
        ("compare", agent_3_node),           # Visual comparison
    )
    for stage_name, agent in stages:
        graph.add_node(stage_name, agent)

    graph.set_entry_point("initialize")

    # Chain each stage to the one after it; the last stage leads to END.
    sequence = [stage_name for stage_name, _ in stages] + [END]
    for source, target in zip(sequence, sequence[1:]):
        graph.add_edge(source, target)

    # Persist through the shared checkpointer and pause for human review
    # right before the comparison stage.
    return graph.compile(
        checkpointer=_global_checkpointer,
        interrupt_before=["compare"],
    )
def run_workflow_step_1(
    figma_id: str,
    figma_key: str,
    url: str,
    execution_id: str,
    thread_id: str,
    hf_token: str = ""
):
    """Run the capture phase of the workflow up to the review breakpoint.

    Builds the initial state from the arguments, streams the compiled graph
    under ``thread_id`` and prints the ``status`` field of every streamed
    state that has a non-empty one. The graph is compiled to interrupt
    before ``compare``, so the stream ends before the analysis stage.

    Args:
        figma_id: Passed to ``create_initial_state`` as ``figma_file_key``.
        figma_key: Passed to ``create_initial_state`` as
            ``figma_access_token``.
        url: Passed to ``create_initial_state`` as ``website_url``.
        execution_id: Passed through to ``create_initial_state``.
        thread_id: Checkpointer thread id; the same id must be given to
            ``resume_workflow`` to continue this run.
        hf_token: Passed through to ``create_initial_state``; defaults to
            an empty string.

    Returns:
        The result of ``app.get_state(config)`` for this thread after the
        stream has stopped.

    Raises:
        Exception: Any error raised while streaming is printed and then
            re-raised unchanged.
    """
    print(f" ⚙️ Initializing workflow for thread: {thread_id}")
    app = create_workflow()
    config = {"configurable": {"thread_id": thread_id}}
    initial_state = create_initial_state(
        figma_file_key=figma_id,
        figma_access_token=figma_key,
        website_url=url,
        execution_id=execution_id,
        hf_token=hf_token,
    )

    print(" 🏃 Running workflow (Step 1: Capture)...")
    try:
        # stream_mode="values" yields the state after each step.
        for event in app.stream(initial_state, config, stream_mode="values"):
            status = event.get("status", "") if event else ""
            if status:
                print(f" 📍 Status: {status}")
    except Exception as e:
        # Report the failure on the console, then let the caller handle it.
        print(f" ❌ Workflow error: {e}")
        raise

    return app.get_state(config)
def resume_workflow(thread_id: str, user_approval: bool = True):
    """Resume a paused workflow from its breakpoint and run the analysis.

    Writes ``user_approval`` into the checkpointed state for ``thread_id``,
    then streams the graph with ``None`` as input so it continues from the
    saved checkpoint instead of starting over.

    Args:
        thread_id: Checkpointer thread id used by the earlier
            ``run_workflow_step_1`` call.
        user_approval: Value stored under the ``user_approval`` state key
            before resuming. This function resumes the graph whatever the
            value is. NOTE(review): confirm that the compare agent reads
            this flag if a rejection is meant to skip the analysis.

    Returns:
        The result of ``app.get_state(config)`` for this thread after the
        stream has finished.

    Raises:
        Exception: Any error raised while streaming is printed and then
            re-raised unchanged.
    """
    print(f" 🔄 Resuming workflow for thread: {thread_id}")
    app = create_workflow()
    config = {"configurable": {"thread_id": thread_id}}

    # Record the reviewer's decision in the persisted state.
    app.update_state(config, {"user_approval": user_approval})

    print(" 🏃 Running workflow (Step 2: Analysis)...")
    try:
        # Iterating the stream is what drives execution. The events are
        # discarded because the final state is read from the checkpointer.
        for _ in app.stream(None, config, stream_mode="values"):
            pass
    except Exception as e:
        # Same reporting as step 1: print, then let the caller handle it.
        print(f" ❌ Workflow error: {e}")
        raise

    return app.get_state(config)