|
|
""" |
|
|
LangGraph Workflow Orchestration |
|
|
Design System Extractor v2 |
|
|
|
|
|
Defines the main workflow graph with agents, checkpoints, and transitions. |
|
|
""" |
|
|
|
|
|
from typing import Literal |
|
|
from datetime import datetime |
|
|
from langgraph.graph import StateGraph, END |
|
|
from langgraph.checkpoint.memory import MemorySaver |
|
|
|
|
|
from agents.state import AgentState, create_initial_state, get_stage_progress |
|
|
from core.token_schema import Viewport |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def discover_pages(state: AgentState) -> AgentState:
    """
    Agent 1 - Part 1: discover pages starting from the base URL.

    Marks the state as being in the "discover" stage, timestamps the stage
    start, and asks ``PageDiscoverer.discover`` for the pages it finds for
    ``state["base_url"]``. On success the result is stored under
    ``discovered_pages`` and the ``confirm_pages`` checkpoint is raised so a
    human can confirm what to crawl. Any exception is recorded in
    ``state["errors"]`` instead of propagating.

    Returns:
        The same state object, mutated in place.
    """
    from agents.crawler import PageDiscoverer

    state["current_stage"] = "discover"
    state["stage_started_at"] = datetime.now()

    try:
        found_pages = await PageDiscoverer().discover(state["base_url"])
        # Publish the result and raise the human checkpoint together.
        state.update(
            {
                "discovered_pages": found_pages,
                "awaiting_human_input": True,
                "checkpoint_name": "confirm_pages",
            }
        )
    except Exception as exc:
        # Failures are collected on the state; the router decides what next.
        state["errors"].append(f"Discovery failed: {exc!s}")

    return state
|
|
|
|
|
|
|
|
async def extract_tokens_desktop(state: AgentState) -> AgentState:
    """
    Agent 1 - Part 2a: extract tokens using the desktop viewport.

    Sets the current stage to "extract", runs a ``TokenExtractor`` built with
    ``Viewport.DESKTOP`` over ``state["pages_to_crawl"]`` and stores the
    result under ``desktop_extraction``. Progress values reported by the
    extractor are written to ``desktop_crawl_progress``. Any exception is
    appended to ``state["errors"]`` rather than raised.

    Returns:
        The same state object, mutated in place.
    """
    from agents.extractor import TokenExtractor

    state["current_stage"] = "extract"

    def record_progress(p):
        # Mirror each reported progress value into the shared state.
        return state.update({"desktop_crawl_progress": p})

    try:
        desktop_extractor = TokenExtractor(viewport=Viewport.DESKTOP)
        state["desktop_extraction"] = await desktop_extractor.extract(
            pages=state["pages_to_crawl"],
            progress_callback=record_progress,
        )
    except Exception as exc:
        state["errors"].append(f"Desktop extraction failed: {exc!s}")

    return state
|
|
|
|
|
|
|
|
async def extract_tokens_mobile(state: AgentState) -> AgentState:
    """
    Agent 1 - Part 2b: extract tokens using the mobile viewport.

    Runs a ``TokenExtractor`` built with ``Viewport.MOBILE`` over
    ``state["pages_to_crawl"]`` and stores the result under
    ``mobile_extraction``. Progress values reported by the extractor are
    written to ``mobile_crawl_progress``. Any exception is appended to
    ``state["errors"]`` rather than raised. The stage name is not touched
    here.

    Returns:
        The same state object, mutated in place.
    """
    from agents.extractor import TokenExtractor

    def record_progress(p):
        # Mirror each reported progress value into the shared state.
        return state.update({"mobile_crawl_progress": p})

    try:
        mobile_extractor = TokenExtractor(viewport=Viewport.MOBILE)
        state["mobile_extraction"] = await mobile_extractor.extract(
            pages=state["pages_to_crawl"],
            progress_callback=record_progress,
        )
    except Exception as exc:
        state["errors"].append(f"Mobile extraction failed: {exc!s}")

    return state
|
|
|
|
|
|
|
|
async def normalize_tokens(state: AgentState) -> AgentState:
    """
    Agent 2: normalize the extracted tokens for each viewport.

    Sets the stage to "normalize" and timestamps it. For each of the desktop
    and mobile extractions that is truthy, the output of
    ``TokenNormalizer.normalize`` is stored under the matching
    ``*_normalized`` key. The ``review_tokens`` checkpoint is then raised.
    Any exception is appended to ``state["errors"]`` rather than raised.

    Returns:
        The same state object, mutated in place.
    """
    from agents.normalizer import TokenNormalizer

    state["current_stage"] = "normalize"
    state["stage_started_at"] = datetime.now()

    # (source key, destination key), processed desktop first.
    viewport_keys = (
        ("desktop_extraction", "desktop_normalized"),
        ("mobile_extraction", "mobile_normalized"),
    )

    try:
        normalizer = TokenNormalizer()
        for raw_key, normalized_key in viewport_keys:
            raw_tokens = state[raw_key]
            if raw_tokens:
                state[normalized_key] = normalizer.normalize(raw_tokens)

        # Pause for a human to review what was normalized.
        state["awaiting_human_input"] = True
        state["checkpoint_name"] = "review_tokens"
    except Exception as exc:
        state["errors"].append(f"Normalization failed: {exc!s}")

    return state
|
|
|
|
|
|
|
|
async def generate_recommendations(state: AgentState) -> AgentState:
    """
    Agent 3: produce upgrade recommendations from the normalized tokens.

    Sets the stage to "advise" and timestamps it, then awaits
    ``DesignSystemAdvisor.analyze_and_recommend`` with the desktop and mobile
    normalized tokens. The result is stored under
    ``upgrade_recommendations`` and the ``select_upgrades`` checkpoint is
    raised. Any exception is appended to ``state["errors"]`` rather than
    raised.

    Returns:
        The same state object, mutated in place.
    """
    from agents.advisor import DesignSystemAdvisor

    state["current_stage"] = "advise"
    state["stage_started_at"] = datetime.now()

    try:
        state["upgrade_recommendations"] = await DesignSystemAdvisor().analyze_and_recommend(
            desktop=state["desktop_normalized"],
            mobile=state["mobile_normalized"],
        )

        # Pause for a human to choose which upgrades to apply.
        state["awaiting_human_input"] = True
        state["checkpoint_name"] = "select_upgrades"
    except Exception as exc:
        state["errors"].append(f"Recommendation generation failed: {exc!s}")

    return state
|
|
|
|
|
|
|
|
async def generate_final_tokens(state: AgentState) -> AgentState:
    """
    Agent 4: generate the final tokens for each viewport.

    Sets the stage to "generate" and timestamps it. The human-selected
    upgrade options are gathered into one ``selections`` mapping. For each
    of the desktop and mobile normalized token sets that is truthy,
    ``TokenGenerator.generate`` is called with it, the selections and
    ``state["version_label"]``, and the output is stored under the matching
    ``*_final`` key. The ``approve_export`` checkpoint is then raised. Any
    exception is appended to ``state["errors"]`` rather than raised.

    Returns:
        The same state object, mutated in place.
    """
    from agents.generator import TokenGenerator

    state["current_stage"] = "generate"
    state["stage_started_at"] = datetime.now()

    # (source key, destination key), processed desktop first.
    viewport_keys = (
        ("desktop_normalized", "desktop_final"),
        ("mobile_normalized", "mobile_final"),
    )

    try:
        generator = TokenGenerator()

        selections = {
            "type_scale": state["selected_type_scale"],
            "spacing_system": state["selected_spacing_system"],
            "naming_convention": state["selected_naming_convention"],
            "color_ramps": state["selected_color_ramps"],
            "a11y_fixes": state["selected_a11y_fixes"],
        }

        for normalized_key, final_key in viewport_keys:
            normalized_tokens = state[normalized_key]
            if normalized_tokens:
                state[final_key] = generator.generate(
                    normalized=normalized_tokens,
                    selections=selections,
                    version=state["version_label"],
                )

        # Pause for a human to approve the export.
        state["awaiting_human_input"] = True
        state["checkpoint_name"] = "approve_export"
    except Exception as exc:
        state["errors"].append(f"Token generation failed: {exc!s}")

    return state
|
|
|
|
|
|
|
|
async def complete_workflow(state: AgentState) -> AgentState:
    """
    Final node: mark the workflow as complete.

    Sets the stage to "export" and clears the human-input flag and the
    checkpoint name.

    Returns:
        The same state object, mutated in place.
    """
    state.update(
        {
            "current_stage": "export",
            "awaiting_human_input": False,
            "checkpoint_name": None,
        }
    )
    return state
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def handle_page_confirmation(state: AgentState, confirmed_pages: list[str]) -> AgentState:
    """Record the pages a human confirmed for crawling and clear the checkpoint.

    Args:
        state: Workflow state; mutated in place.
        confirmed_pages: Pages to crawl, stored as-is under ``pages_to_crawl``.

    Returns:
        The same state object.
    """
    state.update(
        {
            "pages_to_crawl": confirmed_pages,
            "awaiting_human_input": False,
            "checkpoint_name": None,
        }
    )
    return state
|
|
|
|
|
|
|
|
def handle_token_review(
    state: AgentState,
    color_decisions: dict[str, bool],
    typography_decisions: dict[str, bool],
    spacing_decisions: dict[str, bool],
) -> AgentState:
    """Record a human's accept/reject decisions on extracted tokens.

    Each decisions mapping is split into the keys whose value is truthy
    (accepted) and the keys whose value is falsy (rejected), preserving the
    mapping's iteration order. The checkpoint is then cleared.

    Args:
        state: Workflow state; mutated in place.
        color_decisions: Token key -> keep flag for colors.
        typography_decisions: Token key -> keep flag for typography.
        spacing_decisions: Token key -> keep flag for spacing.

    Returns:
        The same state object.
    """
    reviewed = (
        ("accepted_colors", "rejected_colors", color_decisions),
        ("accepted_typography", "rejected_typography", typography_decisions),
        ("accepted_spacing", "rejected_spacing", spacing_decisions),
    )

    for accepted_key, rejected_key, decisions in reviewed:
        kept, dropped = [], []
        for token_name, keep in decisions.items():
            (kept if keep else dropped).append(token_name)
        state[accepted_key] = kept
        state[rejected_key] = dropped

    state["awaiting_human_input"] = False
    state["checkpoint_name"] = None
    return state
|
|
|
|
|
|
|
|
def handle_upgrade_selection(
    state: AgentState,
    type_scale: str | None,
    spacing_system: str | None,
    naming_convention: str | None,
    color_ramps: dict[str, bool],
    a11y_fixes: list[str],
) -> AgentState:
    """Record the upgrade options a human selected and clear the checkpoint.

    Every argument is stored unchanged under its ``selected_*`` state key.

    Args:
        state: Workflow state; mutated in place.
        type_scale: Chosen type scale, or None.
        spacing_system: Chosen spacing system, or None.
        naming_convention: Chosen naming convention, or None.
        color_ramps: Per-ramp selection flags.
        a11y_fixes: Selected accessibility fixes.

    Returns:
        The same state object.
    """
    state.update(
        {
            "selected_type_scale": type_scale,
            "selected_spacing_system": spacing_system,
            "selected_naming_convention": naming_convention,
            "selected_color_ramps": color_ramps,
            "selected_a11y_fixes": a11y_fixes,
            "awaiting_human_input": False,
            "checkpoint_name": None,
        }
    )
    return state
|
|
|
|
|
|
|
|
def handle_export_approval(state: AgentState, version_label: str) -> AgentState:
    """Record the approved version label and clear the checkpoint.

    Args:
        state: Workflow state; mutated in place.
        version_label: Label stored under ``version_label``.

    Returns:
        The same state object.
    """
    state.update(
        {
            "version_label": version_label,
            "awaiting_human_input": False,
            "checkpoint_name": None,
        }
    )
    return state
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def route_after_discovery(state: AgentState) -> Literal["wait_for_pages", "extract"]:
    """Pick the branch after discovery.

    Returns "wait_for_pages" while human input is pending, otherwise
    "extract".
    """
    return "wait_for_pages" if state["awaiting_human_input"] else "extract"
|
|
|
|
|
|
|
|
def route_after_extraction(state: AgentState) -> Literal["normalize", "error"]:
    """Pick the branch after extraction.

    Returns "error" only when both the desktop and the mobile extraction are
    None; a single available extraction is enough to continue to
    "normalize".
    """
    extraction_keys = ("desktop_extraction", "mobile_extraction")
    # Desktop is checked first; mobile is only read when desktop is None.
    if any(state[key] is not None for key in extraction_keys):
        return "normalize"
    return "error"
|
|
|
|
|
|
|
|
def route_after_normalization(state: AgentState) -> Literal["wait_for_review", "advise"]:
    """Pick the branch after normalization.

    Returns "wait_for_review" while human input is pending, otherwise
    "advise".
    """
    return "wait_for_review" if state["awaiting_human_input"] else "advise"
|
|
|
|
|
|
|
|
def route_after_recommendations(state: AgentState) -> Literal["wait_for_selection", "generate"]:
    """Pick the branch after recommendations.

    Returns "wait_for_selection" while human input is pending, otherwise
    "generate".
    """
    return "wait_for_selection" if state["awaiting_human_input"] else "generate"
|
|
|
|
|
|
|
|
def route_after_generation(state: AgentState) -> Literal["wait_for_approval", "complete"]:
    """Pick the branch after final token generation.

    Returns "wait_for_approval" while human input is pending, otherwise
    "complete".
    """
    return "wait_for_approval" if state["awaiting_human_input"] else "complete"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def build_workflow_graph() -> StateGraph:
    """
    Build the main LangGraph workflow (uncompiled).

    Flow, as wired by the edges below:
    1. discover -> [wait_for_pages while human input is pending]
    2. extract_desktop -> extract_mobile (one after the other)
    3. normalize -> [wait_for_review], or END when neither extraction
       produced a result
    4. advise -> [wait_for_selection]
    5. generate -> [wait_for_approval]
    6. complete -> END

    The ``wait_for_*`` nodes are pass-through nodes that return the state
    unchanged; each one has a plain edge to the stage that follows it.

    Returns:
        The ``StateGraph`` with all nodes and edges registered.
    """
    graph = StateGraph(AgentState)

    # Stage nodes, registered in pipeline order.
    stage_handlers = (
        ("discover", discover_pages),
        ("extract_desktop", extract_tokens_desktop),
        ("extract_mobile", extract_tokens_mobile),
        ("normalize", normalize_tokens),
        ("advise", generate_recommendations),
        ("generate", generate_final_tokens),
        ("complete", complete_workflow),
    )
    for node_name, handler in stage_handlers:
        graph.add_node(node_name, handler)

    # Human-in-the-loop placeholder nodes: each returns the state untouched.
    for waiting_node in (
        "wait_for_pages",
        "wait_for_review",
        "wait_for_selection",
        "wait_for_approval",
    ):
        graph.add_node(waiting_node, lambda s: s)

    graph.set_entry_point("discover")

    def add_checkpoint(source, router, waiting_node, proceed_label, next_node):
        # Branch to the waiting node or straight on, then join the waiting
        # node back onto the same next stage.
        graph.add_conditional_edges(
            source,
            router,
            {
                waiting_node: waiting_node,
                proceed_label: next_node,
            },
        )
        graph.add_edge(waiting_node, next_node)

    add_checkpoint(
        "discover", route_after_discovery, "wait_for_pages", "extract", "extract_desktop"
    )

    # Desktop extraction always runs before mobile extraction.
    graph.add_edge("extract_desktop", "extract_mobile")

    # Stop the run when extraction produced nothing to normalize.
    graph.add_conditional_edges(
        "extract_mobile",
        route_after_extraction,
        {
            "normalize": "normalize",
            "error": END,
        },
    )

    add_checkpoint(
        "normalize", route_after_normalization, "wait_for_review", "advise", "advise"
    )
    add_checkpoint(
        "advise", route_after_recommendations, "wait_for_selection", "generate", "generate"
    )
    add_checkpoint(
        "generate", route_after_generation, "wait_for_approval", "complete", "complete"
    )

    graph.add_edge("complete", END)

    return graph
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class WorkflowRunner:
    """
    Manages workflow execution with human-in-the-loop support.

    Holds the compiled graph, an in-memory checkpointer, the most recent
    state snapshot, and the thread id used for the run.
    """

    def __init__(self):
        self.graph = build_workflow_graph()
        self.checkpointer = MemorySaver()
        self.app = self.graph.compile(checkpointer=self.checkpointer)
        self.current_state: AgentState | None = None
        self.thread_id: str | None = None

    async def _run_until_checkpoint(self) -> AgentState:
        """Stream the graph from ``current_state`` until a checkpoint or the end.

        Returns:
            The last full state snapshot seen.
        """
        config = {"configurable": {"thread_id": self.thread_id}}

        # stream_mode="values" makes every event a full state snapshot. The
        # default for a compiled StateGraph is "updates", whose events are
        # keyed by node name, so "awaiting_human_input" would never be found
        # at the top level and the run would never pause.
        async for snapshot in self.app.astream(
            self.current_state, config, stream_mode="values"
        ):
            self.current_state = snapshot
            if snapshot.get("awaiting_human_input"):
                break

        return self.current_state

    async def start(self, base_url: str, thread_id: str | None = None) -> AgentState:
        """Start a new workflow.

        Args:
            base_url: URL used to build the initial state.
            thread_id: Optional thread id; a timestamp-based id is generated
                when omitted or empty.

        Returns:
            The state at the first human checkpoint, or the last state seen
            if the run finished without pausing.
        """
        self.thread_id = thread_id or f"workflow_{datetime.now().timestamp()}"
        self.current_state = create_initial_state(base_url)
        return await self._run_until_checkpoint()

    async def resume(self, human_input: dict) -> AgentState:
        """Resume workflow after human input.

        The pending checkpoint name selects which handler applies
        ``human_input`` to the state; missing keys fall back to empty
        containers, None, or "v1" for the version label. An unrecognised
        checkpoint leaves the state untouched. The graph is then streamed
        again.

        Args:
            human_input: Values supplied by the human for the checkpoint.

        Returns:
            The state at the next human checkpoint, or the last state seen.

        Raises:
            ValueError: If no workflow has been started.
        """
        if not self.current_state or not self.thread_id:
            raise ValueError("No active workflow to resume")

        checkpoint = self.current_state.get("checkpoint_name")

        if checkpoint == "confirm_pages":
            self.current_state = handle_page_confirmation(
                self.current_state,
                human_input.get("confirmed_pages", []),
            )
        elif checkpoint == "review_tokens":
            self.current_state = handle_token_review(
                self.current_state,
                human_input.get("color_decisions", {}),
                human_input.get("typography_decisions", {}),
                human_input.get("spacing_decisions", {}),
            )
        elif checkpoint == "select_upgrades":
            self.current_state = handle_upgrade_selection(
                self.current_state,
                human_input.get("type_scale"),
                human_input.get("spacing_system"),
                human_input.get("naming_convention"),
                human_input.get("color_ramps", {}),
                human_input.get("a11y_fixes", []),
            )
        elif checkpoint == "approve_export":
            self.current_state = handle_export_approval(
                self.current_state,
                human_input.get("version_label", "v1"),
            )

        # NOTE(review): the updated state is passed to astream as a fresh
        # input. LangGraph appears to treat a non-None input as a new
        # invocation from the entry point rather than a continuation of the
        # paused run; confirm this is intended. Continuing in place would
        # need interrupts plus update_state and a None input.
        return await self._run_until_checkpoint()

    def get_progress(self) -> dict:
        """Get current workflow progress.

        Returns:
            ``{"status": "not_started"}`` when there is no state yet,
            otherwise the result of ``get_stage_progress`` for the state.
        """
        if not self.current_state:
            return {"status": "not_started"}
        return get_stage_progress(self.current_state)

    def get_state(self) -> AgentState | None:
        """Get current state, or None before the workflow has started."""
        return self.current_state
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_workflow() -> WorkflowRunner:
    """Build and return a fresh ``WorkflowRunner``."""
    runner = WorkflowRunner()
    return runner
|
|
|
|
|
|
|
|
async def run_discovery_only(base_url: str) -> list:
    """Run only the discovery phase (for testing).

    Args:
        base_url: URL handed to ``PageDiscoverer.discover``.

    Returns:
        Whatever ``PageDiscoverer.discover`` returns for ``base_url``.
    """
    from agents.crawler import PageDiscoverer

    discovered = await PageDiscoverer().discover(base_url)
    return discovered
|
|
|
|
|
|
|
|
async def run_extraction_only(pages: list[str], viewport: Viewport) -> dict:
    """Run only the extraction phase (for testing).

    Args:
        pages: Pages handed to ``TokenExtractor.extract``.
        viewport: Viewport the extractor is constructed with.

    Returns:
        Whatever ``TokenExtractor.extract`` returns for ``pages``.
    """
    from agents.extractor import TokenExtractor

    extraction = await TokenExtractor(viewport=viewport).extract(pages)
    return extraction
|
|
|