"""
LangGraph Workflow Orchestration
Design System Extractor v2

Defines the main workflow graph with agents, checkpoints, and transitions.
"""

from typing import Literal
from datetime import datetime

from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver

from agents.state import AgentState, create_initial_state, get_stage_progress
from core.token_schema import Viewport


# =============================================================================
# NODE FUNCTIONS (Agent Entry Points)
# =============================================================================

async def discover_pages(state: AgentState) -> AgentState:
    """
    Agent 1 - Part 1: Discover pages from base URL.

    This node:
    1. Takes the base URL
    2. Crawls to find linked pages
    3. Classifies page types (homepage, listing, detail, etc.)
    4. Returns discovered pages for user confirmation

    On success the state is flagged as awaiting human input at the
    "confirm_pages" checkpoint. On failure the error is appended to
    ``state["errors"]`` and the node returns normally.
    """
    from agents.crawler import PageDiscoverer

    state["current_stage"] = "discover"
    state["stage_started_at"] = datetime.now()

    try:
        discoverer = PageDiscoverer()
        pages = await discoverer.discover(state["base_url"])

        state["discovered_pages"] = pages
        state["awaiting_human_input"] = True
        state["checkpoint_name"] = "confirm_pages"

    except Exception as e:
        # Node boundary: record the failure in state instead of aborting the graph.
        state["errors"].append(f"Discovery failed: {e}")

    return state


async def extract_tokens_desktop(state: AgentState) -> AgentState:
    """
    Agent 1 - Part 2a: Extract tokens from desktop viewport.

    This is the first node of the "extract" stage, so it also stamps
    ``stage_started_at``. Crawl progress is written into
    ``state["desktop_crawl_progress"]`` via the extractor's callback.
    """
    from agents.extractor import TokenExtractor

    state["current_stage"] = "extract"
    # Every other stage-entry node records its start time; do the same here.
    state["stage_started_at"] = datetime.now()

    try:
        extractor = TokenExtractor(viewport=Viewport.DESKTOP)
        result = await extractor.extract(
            pages=state["pages_to_crawl"],
            progress_callback=lambda p: state.update({"desktop_crawl_progress": p}),
        )

        state["desktop_extraction"] = result

    except Exception as e:
        state["errors"].append(f"Desktop extraction failed: {e}")

    return state


async def extract_tokens_mobile(state: AgentState) -> AgentState:
    """
    Agent 1 - Part 2b: Extract tokens from mobile viewport.

    Runs after ``extract_tokens_desktop`` in the graph, so ``current_stage``
    is already "extract" when this node is reached through the workflow.
    """
    from agents.extractor import TokenExtractor

    try:
        extractor = TokenExtractor(viewport=Viewport.MOBILE)
        result = await extractor.extract(
            pages=state["pages_to_crawl"],
            progress_callback=lambda p: state.update({"mobile_crawl_progress": p}),
        )

        state["mobile_extraction"] = result

    except Exception as e:
        state["errors"].append(f"Mobile extraction failed: {e}")

    return state


async def normalize_tokens(state: AgentState) -> AgentState:
    """
    Agent 2: Normalize and structure extracted tokens.

    Normalizes whichever of the desktop/mobile extractions is truthy, then
    flags the "review_tokens" human checkpoint.
    """
    from agents.normalizer import TokenNormalizer

    state["current_stage"] = "normalize"
    state["stage_started_at"] = datetime.now()

    try:
        normalizer = TokenNormalizer()

        if state["desktop_extraction"]:
            state["desktop_normalized"] = normalizer.normalize(state["desktop_extraction"])

        if state["mobile_extraction"]:
            state["mobile_normalized"] = normalizer.normalize(state["mobile_extraction"])

        # After normalization, wait for human review
        state["awaiting_human_input"] = True
        state["checkpoint_name"] = "review_tokens"

    except Exception as e:
        state["errors"].append(f"Normalization failed: {e}")

    return state


async def generate_recommendations(state: AgentState) -> AgentState:
    """
    Agent 3: Generate upgrade recommendations.

    Stores the advisor output in ``state["upgrade_recommendations"]`` and
    flags the "select_upgrades" human checkpoint.
    """
    from agents.advisor import DesignSystemAdvisor

    state["current_stage"] = "advise"
    state["stage_started_at"] = datetime.now()

    try:
        advisor = DesignSystemAdvisor()
        recommendations = await advisor.analyze_and_recommend(
            desktop=state["desktop_normalized"],
            mobile=state["mobile_normalized"],
        )

        state["upgrade_recommendations"] = recommendations

        # Wait for human to select upgrades
        state["awaiting_human_input"] = True
        state["checkpoint_name"] = "select_upgrades"

    except Exception as e:
        state["errors"].append(f"Recommendation generation failed: {e}")

    return state


async def generate_final_tokens(state: AgentState) -> AgentState:
    """
    Agent 4: Generate final token JSON.

    Builds the selection config from the user's upgrade choices and
    generates final tokens for each viewport that has normalized tokens,
    then flags the "approve_export" human checkpoint.
    """
    from agents.generator import TokenGenerator

    state["current_stage"] = "generate"
    state["stage_started_at"] = datetime.now()

    try:
        generator = TokenGenerator()

        # Build selection config from user choices
        selections = {
            "type_scale": state["selected_type_scale"],
            "spacing_system": state["selected_spacing_system"],
            "naming_convention": state["selected_naming_convention"],
            "color_ramps": state["selected_color_ramps"],
            "a11y_fixes": state["selected_a11y_fixes"],
        }

        # NOTE(review): version_label is read here, but handle_export_approval
        # only sets it at the later "approve_export" checkpoint, so the label
        # chosen at approval is not baked into these generated tokens.
        if state["desktop_normalized"]:
            state["desktop_final"] = generator.generate(
                normalized=state["desktop_normalized"],
                selections=selections,
                version=state["version_label"],
            )

        if state["mobile_normalized"]:
            state["mobile_final"] = generator.generate(
                normalized=state["mobile_normalized"],
                selections=selections,
                version=state["version_label"],
            )

        # Wait for human to approve export
        state["awaiting_human_input"] = True
        state["checkpoint_name"] = "approve_export"

    except Exception as e:
        state["errors"].append(f"Token generation failed: {e}")

    return state


async def complete_workflow(state: AgentState) -> AgentState:
    """
    Final node: Mark workflow as complete.

    Sets the stage to "export" and clears any pending human checkpoint.
    """
    state["current_stage"] = "export"
    state["awaiting_human_input"] = False
    state["checkpoint_name"] = None

    return state


# =============================================================================
# HUMAN CHECKPOINT HANDLERS
# =============================================================================

def handle_page_confirmation(state: AgentState, confirmed_pages: list[str]) -> AgentState:
    """Handle human confirmation of pages to crawl."""
    state["pages_to_crawl"] = confirmed_pages
    state["awaiting_human_input"] = False
    state["checkpoint_name"] = None
    return state


def handle_token_review(
    state: AgentState,
    color_decisions: dict[str, bool],
    typography_decisions: dict[str, bool],
    spacing_decisions: dict[str, bool],
) -> AgentState:
    """
    Handle human review of extracted tokens.

    Each ``*_decisions`` dict maps a token key to True (accept) or False
    (reject); keys are split into the matching accepted/rejected lists.
    """
    state["accepted_colors"] = [k for k, v in color_decisions.items() if v]
    state["rejected_colors"] = [k for k, v in color_decisions.items() if not v]

    state["accepted_typography"] = [k for k, v in typography_decisions.items() if v]
    state["rejected_typography"] = [k for k, v in typography_decisions.items() if not v]

    state["accepted_spacing"] = [k for k, v in spacing_decisions.items() if v]
    state["rejected_spacing"] = [k for k, v in spacing_decisions.items() if not v]

    state["awaiting_human_input"] = False
    state["checkpoint_name"] = None

    return state


def handle_upgrade_selection(
    state: AgentState,
    type_scale: str | None,
    spacing_system: str | None,
    naming_convention: str | None,
    color_ramps: dict[str, bool],
    a11y_fixes: list[str],
) -> AgentState:
    """Handle human selection of upgrade options."""
    state["selected_type_scale"] = type_scale
    state["selected_spacing_system"] = spacing_system
    state["selected_naming_convention"] = naming_convention
    state["selected_color_ramps"] = color_ramps
    state["selected_a11y_fixes"] = a11y_fixes

    state["awaiting_human_input"] = False
    state["checkpoint_name"] = None

    return state


def handle_export_approval(state: AgentState, version_label: str) -> AgentState:
    """Handle human approval of final export."""
    state["version_label"] = version_label
    state["awaiting_human_input"] = False
    state["checkpoint_name"] = None
    return state


# =============================================================================
# ROUTING FUNCTIONS
# =============================================================================

# Maps each checkpoint_name set by an agent node to the pass-through graph node
# the workflow pauses in front of while waiting for that human input.
CHECKPOINT_WAIT_NODES: dict[str, str] = {
    "confirm_pages": "wait_for_pages",
    "review_tokens": "wait_for_review",
    "select_upgrades": "wait_for_selection",
    "approve_export": "wait_for_approval",
}


def route_after_discovery(state: AgentState) -> Literal["wait_for_pages", "extract"]:
    """Route after discovery: wait for human or continue."""
    if state["awaiting_human_input"]:
        return "wait_for_pages"
    return "extract"


def route_after_extraction(state: AgentState) -> Literal["normalize", "error"]:
    """Route after extraction: normalize, or stop if neither viewport produced a result."""
    if state["desktop_extraction"] is None and state["mobile_extraction"] is None:
        return "error"
    return "normalize"


def route_after_normalization(state: AgentState) -> Literal["wait_for_review", "advise"]:
    """Route after normalization: wait for review or continue."""
    if state["awaiting_human_input"]:
        return "wait_for_review"
    return "advise"


def route_after_recommendations(state: AgentState) -> Literal["wait_for_selection", "generate"]:
    """Route after recommendations: wait for selection or continue."""
    if state["awaiting_human_input"]:
        return "wait_for_selection"
    return "generate"


def route_after_generation(state: AgentState) -> Literal["wait_for_approval", "complete"]:
    """Route after generation: wait for approval or complete."""
    if state["awaiting_human_input"]:
        return "wait_for_approval"
    return "complete"


# =============================================================================
# GRAPH BUILDER
# =============================================================================

def build_workflow_graph() -> StateGraph:
    """
    Build the main LangGraph workflow (uncompiled).

    Flow:
    1. discover_pages -> [human confirms pages]
    2. extract_desktop -> extract_mobile (sequential)
    3. normalize_tokens -> [human reviews tokens]
    4. generate_recommendations -> [human selects upgrades]
    5. generate_final_tokens -> [human approves export]
    6. complete

    The ``wait_for_*`` nodes are pass-throughs. They only act as pause points
    when the graph is compiled with ``interrupt_before`` on them, as
    ``WorkflowRunner`` does.
    """
    # Create the graph
    workflow = StateGraph(AgentState)

    # -------------------------------------------------------------------------
    # ADD NODES
    # -------------------------------------------------------------------------

    # Discovery
    workflow.add_node("discover", discover_pages)

    # Extraction (currently sequential: desktop, then mobile)
    workflow.add_node("extract_desktop", extract_tokens_desktop)
    workflow.add_node("extract_mobile", extract_tokens_mobile)

    # Normalization
    workflow.add_node("normalize", normalize_tokens)

    # Advisor
    workflow.add_node("advise", generate_recommendations)

    # Generator
    workflow.add_node("generate", generate_final_tokens)

    # Completion
    workflow.add_node("complete", complete_workflow)

    # Human checkpoint placeholder nodes (these just pass through)
    workflow.add_node("wait_for_pages", lambda s: s)
    workflow.add_node("wait_for_review", lambda s: s)
    workflow.add_node("wait_for_selection", lambda s: s)
    workflow.add_node("wait_for_approval", lambda s: s)

    # -------------------------------------------------------------------------
    # ADD EDGES
    # -------------------------------------------------------------------------

    # Entry point
    workflow.set_entry_point("discover")

    # Discovery -> (wait or extract)
    workflow.add_conditional_edges(
        "discover",
        route_after_discovery,
        {
            "wait_for_pages": "wait_for_pages",
            "extract": "extract_desktop",
        },
    )

    # After human confirms pages -> extract
    workflow.add_edge("wait_for_pages", "extract_desktop")

    # Sequential extraction: desktop first, then mobile
    workflow.add_edge("extract_desktop", "extract_mobile")

    # After extraction -> normalize (or end if nothing was extracted)
    workflow.add_conditional_edges(
        "extract_mobile",
        route_after_extraction,
        {
            "normalize": "normalize",
            "error": END,
        },
    )

    # Normalization -> (wait or advise)
    workflow.add_conditional_edges(
        "normalize",
        route_after_normalization,
        {
            "wait_for_review": "wait_for_review",
            "advise": "advise",
        },
    )

    # After human reviews -> advise
    workflow.add_edge("wait_for_review", "advise")

    # Advisor -> (wait or generate)
    workflow.add_conditional_edges(
        "advise",
        route_after_recommendations,
        {
            "wait_for_selection": "wait_for_selection",
            "generate": "generate",
        },
    )

    # After human selects upgrades -> generate
    workflow.add_edge("wait_for_selection", "generate")

    # Generation -> (wait or complete)
    workflow.add_conditional_edges(
        "generate",
        route_after_generation,
        {
            "wait_for_approval": "wait_for_approval",
            "complete": "complete",
        },
    )

    # After human approves -> complete
    workflow.add_edge("wait_for_approval", "complete")

    # Complete -> END
    workflow.add_edge("complete", END)

    return workflow


# =============================================================================
# WORKFLOW RUNNER
# =============================================================================

class WorkflowRunner:
    """
    Manages workflow execution with human-in-the-loop support.

    The graph is compiled with an in-memory checkpointer. It is interrupted
    before every ``wait_for_*`` node, so ``start`` and ``resume`` return as soon
    as the workflow needs human input or has finished. Human input is written
    back into the checkpointed state, and execution then continues from the
    paused node. It is never restarted from the entry point.
    """

    def __init__(self):
        self.graph = build_workflow_graph()
        self.checkpointer = MemorySaver()
        # Pause *before* each human checkpoint node. Without this the
        # pass-through wait nodes run straight into the next agent and the
        # workflow never actually stops for human input.
        self.app = self.graph.compile(
            checkpointer=self.checkpointer,
            interrupt_before=list(CHECKPOINT_WAIT_NODES.values()),
        )
        self.current_state: AgentState | None = None
        self.thread_id: str | None = None

    def _config(self) -> dict:
        """Return the LangGraph run config that selects this runner's thread."""
        return {"configurable": {"thread_id": self.thread_id}}

    async def _run(self, graph_input: AgentState | None) -> AgentState:
        """
        Stream the graph until it pauses at a checkpoint or finishes.

        ``graph_input`` is the initial state for a fresh run, or None to
        continue the thread from its last checkpoint.
        """
        config = self._config()

        # "values" mode yields the full state after each step. The default
        # mode for compiled state graphs yields per-node updates keyed by node
        # name, which are not AgentState dicts.
        async for values in self.app.astream(graph_input, config, stream_mode="values"):
            self.current_state = values

        # The checkpointer is the source of truth once the run has stopped.
        snapshot = await self.app.aget_state(config)
        self.current_state = snapshot.values
        return self.current_state

    @staticmethod
    def _apply_human_input(state: AgentState, checkpoint: str, human_input: dict) -> AgentState:
        """Apply ``human_input`` to ``state`` using the handler for ``checkpoint``."""
        if checkpoint == "confirm_pages":
            return handle_page_confirmation(
                state,
                human_input.get("confirmed_pages", []),
            )
        if checkpoint == "review_tokens":
            return handle_token_review(
                state,
                human_input.get("color_decisions", {}),
                human_input.get("typography_decisions", {}),
                human_input.get("spacing_decisions", {}),
            )
        if checkpoint == "select_upgrades":
            return handle_upgrade_selection(
                state,
                human_input.get("type_scale"),
                human_input.get("spacing_system"),
                human_input.get("naming_convention"),
                human_input.get("color_ramps", {}),
                human_input.get("a11y_fixes", []),
            )
        if checkpoint == "approve_export":
            return handle_export_approval(
                state,
                human_input.get("version_label", "v1"),
            )
        raise ValueError(f"Unknown human checkpoint: {checkpoint!r}")

    async def start(self, base_url: str, thread_id: str | None = None) -> AgentState:
        """
        Start a new workflow and run it until the first human checkpoint.

        Args:
            base_url: Site to discover pages from.
            thread_id: Checkpointer thread id; a timestamp-based id is
                generated when omitted.

        Returns:
            The state at the first pause point (or at completion).
        """
        self.thread_id = thread_id or f"workflow_{datetime.now().timestamp()}"
        self.current_state = create_initial_state(base_url)
        return await self._run(self.current_state)

    async def resume(self, human_input: dict) -> AgentState:
        """
        Resume the workflow after human input.

        ``human_input`` is interpreted according to the state's current
        ``checkpoint_name`` (see ``_apply_human_input`` for the keys read).

        Raises:
            ValueError: If no workflow has been started, or the workflow is
                not paused at a known human checkpoint.
        """
        if not self.current_state or not self.thread_id:
            raise ValueError("No active workflow to resume")

        checkpoint = self.current_state.get("checkpoint_name")
        wait_node = CHECKPOINT_WAIT_NODES.get(checkpoint)
        if wait_node is None:
            raise ValueError(f"Workflow is not waiting for human input (checkpoint={checkpoint!r})")

        # Shallow copy: the handlers only reassign top-level keys.
        updated = self._apply_human_input(dict(self.current_state), checkpoint, human_input)

        # Write the human's answer as if the paused wait node had run. Its
        # outgoing edge then selects the next agent, and streaming with
        # input=None continues from there instead of restarting at "discover".
        await self.app.aupdate_state(self._config(), updated, as_node=wait_node)

        return await self._run(None)

    def get_progress(self) -> dict:
        """Get current workflow progress."""
        if not self.current_state:
            return {"status": "not_started"}
        return get_stage_progress(self.current_state)

    def get_state(self) -> AgentState | None:
        """Get current state."""
        return self.current_state


# =============================================================================
# CONVENIENCE FUNCTIONS
# =============================================================================

def create_workflow() -> WorkflowRunner:
    """Create a new workflow runner instance."""
    return WorkflowRunner()


async def run_discovery_only(base_url: str) -> list:
    """Run only the discovery phase (for testing)."""
    from agents.crawler import PageDiscoverer

    discoverer = PageDiscoverer()
    return await discoverer.discover(base_url)


async def run_extraction_only(pages: list[str], viewport: Viewport) -> dict:
    """Run only the extraction phase (for testing)."""
    from agents.extractor import TokenExtractor

    extractor = TokenExtractor(viewport=viewport)
    return await extractor.extract(pages)