Spaces:
Sleeping
Sleeping
| """ | |
| LangGraph Workflow Orchestration | |
| Design System Extractor v2 | |
| Defines the main workflow graph with agents, checkpoints, and transitions. | |
| """ | |
| from typing import Literal | |
| from datetime import datetime | |
| from langgraph.graph import StateGraph, END | |
| from langgraph.checkpoint.memory import MemorySaver | |
| from agents.state import AgentState, create_initial_state, get_stage_progress | |
| from core.token_schema import Viewport | |
| # ============================================================================= | |
| # NODE FUNCTIONS (Agent Entry Points) | |
| # ============================================================================= | |
| async def discover_pages(state: AgentState) -> AgentState: | |
| """ | |
| Agent 1 - Part 1: Discover pages from base URL. | |
| This node: | |
| 1. Takes the base URL | |
| 2. Crawls to find linked pages | |
| 3. Classifies page types (homepage, listing, detail, etc.) | |
| 4. Returns discovered pages for user confirmation | |
| """ | |
| from agents.crawler import PageDiscoverer | |
| state["current_stage"] = "discover" | |
| state["stage_started_at"] = datetime.now() | |
| try: | |
| discoverer = PageDiscoverer() | |
| pages = await discoverer.discover(state["base_url"]) | |
| state["discovered_pages"] = pages | |
| state["awaiting_human_input"] = True | |
| state["checkpoint_name"] = "confirm_pages" | |
| except Exception as e: | |
| state["errors"].append(f"Discovery failed: {str(e)}") | |
| return state | |
| async def extract_tokens_desktop(state: AgentState) -> AgentState: | |
| """ | |
| Agent 1 - Part 2a: Extract tokens from desktop viewport. | |
| """ | |
| from agents.extractor import TokenExtractor | |
| state["current_stage"] = "extract" | |
| try: | |
| extractor = TokenExtractor(viewport=Viewport.DESKTOP) | |
| result = await extractor.extract( | |
| pages=state["pages_to_crawl"], | |
| progress_callback=lambda p: state.update({"desktop_crawl_progress": p}) | |
| ) | |
| state["desktop_extraction"] = result | |
| except Exception as e: | |
| state["errors"].append(f"Desktop extraction failed: {str(e)}") | |
| return state | |
| async def extract_tokens_mobile(state: AgentState) -> AgentState: | |
| """ | |
| Agent 1 - Part 2b: Extract tokens from mobile viewport. | |
| """ | |
| from agents.extractor import TokenExtractor | |
| try: | |
| extractor = TokenExtractor(viewport=Viewport.MOBILE) | |
| result = await extractor.extract( | |
| pages=state["pages_to_crawl"], | |
| progress_callback=lambda p: state.update({"mobile_crawl_progress": p}) | |
| ) | |
| state["mobile_extraction"] = result | |
| except Exception as e: | |
| state["errors"].append(f"Mobile extraction failed: {str(e)}") | |
| return state | |
| async def normalize_tokens(state: AgentState) -> AgentState: | |
| """ | |
| Agent 2: Normalize and structure extracted tokens. | |
| """ | |
| from agents.normalizer import TokenNormalizer | |
| state["current_stage"] = "normalize" | |
| state["stage_started_at"] = datetime.now() | |
| try: | |
| normalizer = TokenNormalizer() | |
| if state["desktop_extraction"]: | |
| state["desktop_normalized"] = normalizer.normalize(state["desktop_extraction"]) | |
| if state["mobile_extraction"]: | |
| state["mobile_normalized"] = normalizer.normalize(state["mobile_extraction"]) | |
| # After normalization, wait for human review | |
| state["awaiting_human_input"] = True | |
| state["checkpoint_name"] = "review_tokens" | |
| except Exception as e: | |
| state["errors"].append(f"Normalization failed: {str(e)}") | |
| return state | |
| async def generate_recommendations(state: AgentState) -> AgentState: | |
| """ | |
| Agent 3: Generate upgrade recommendations. | |
| """ | |
| from agents.advisor import DesignSystemAdvisor | |
| state["current_stage"] = "advise" | |
| state["stage_started_at"] = datetime.now() | |
| try: | |
| advisor = DesignSystemAdvisor() | |
| recommendations = await advisor.analyze_and_recommend( | |
| desktop=state["desktop_normalized"], | |
| mobile=state["mobile_normalized"], | |
| ) | |
| state["upgrade_recommendations"] = recommendations | |
| # Wait for human to select upgrades | |
| state["awaiting_human_input"] = True | |
| state["checkpoint_name"] = "select_upgrades" | |
| except Exception as e: | |
| state["errors"].append(f"Recommendation generation failed: {str(e)}") | |
| return state | |
| async def generate_final_tokens(state: AgentState) -> AgentState: | |
| """ | |
| Agent 4: Generate final token JSON. | |
| """ | |
| from agents.generator import TokenGenerator | |
| state["current_stage"] = "generate" | |
| state["stage_started_at"] = datetime.now() | |
| try: | |
| generator = TokenGenerator() | |
| # Build selection config from user choices | |
| selections = { | |
| "type_scale": state["selected_type_scale"], | |
| "spacing_system": state["selected_spacing_system"], | |
| "naming_convention": state["selected_naming_convention"], | |
| "color_ramps": state["selected_color_ramps"], | |
| "a11y_fixes": state["selected_a11y_fixes"], | |
| } | |
| if state["desktop_normalized"]: | |
| state["desktop_final"] = generator.generate( | |
| normalized=state["desktop_normalized"], | |
| selections=selections, | |
| version=state["version_label"], | |
| ) | |
| if state["mobile_normalized"]: | |
| state["mobile_final"] = generator.generate( | |
| normalized=state["mobile_normalized"], | |
| selections=selections, | |
| version=state["version_label"], | |
| ) | |
| # Wait for human to approve export | |
| state["awaiting_human_input"] = True | |
| state["checkpoint_name"] = "approve_export" | |
| except Exception as e: | |
| state["errors"].append(f"Token generation failed: {str(e)}") | |
| return state | |
| async def complete_workflow(state: AgentState) -> AgentState: | |
| """ | |
| Final node: Mark workflow as complete. | |
| """ | |
| state["current_stage"] = "export" | |
| state["awaiting_human_input"] = False | |
| state["checkpoint_name"] = None | |
| return state | |
| # ============================================================================= | |
| # HUMAN CHECKPOINT HANDLERS | |
| # ============================================================================= | |
| def handle_page_confirmation(state: AgentState, confirmed_pages: list[str]) -> AgentState: | |
| """Handle human confirmation of pages to crawl.""" | |
| state["pages_to_crawl"] = confirmed_pages | |
| state["awaiting_human_input"] = False | |
| state["checkpoint_name"] = None | |
| return state | |
| def handle_token_review( | |
| state: AgentState, | |
| color_decisions: dict[str, bool], | |
| typography_decisions: dict[str, bool], | |
| spacing_decisions: dict[str, bool], | |
| ) -> AgentState: | |
| """Handle human review of extracted tokens.""" | |
| state["accepted_colors"] = [k for k, v in color_decisions.items() if v] | |
| state["rejected_colors"] = [k for k, v in color_decisions.items() if not v] | |
| state["accepted_typography"] = [k for k, v in typography_decisions.items() if v] | |
| state["rejected_typography"] = [k for k, v in typography_decisions.items() if not v] | |
| state["accepted_spacing"] = [k for k, v in spacing_decisions.items() if v] | |
| state["rejected_spacing"] = [k for k, v in spacing_decisions.items() if not v] | |
| state["awaiting_human_input"] = False | |
| state["checkpoint_name"] = None | |
| return state | |
| def handle_upgrade_selection( | |
| state: AgentState, | |
| type_scale: str | None, | |
| spacing_system: str | None, | |
| naming_convention: str | None, | |
| color_ramps: dict[str, bool], | |
| a11y_fixes: list[str], | |
| ) -> AgentState: | |
| """Handle human selection of upgrade options.""" | |
| state["selected_type_scale"] = type_scale | |
| state["selected_spacing_system"] = spacing_system | |
| state["selected_naming_convention"] = naming_convention | |
| state["selected_color_ramps"] = color_ramps | |
| state["selected_a11y_fixes"] = a11y_fixes | |
| state["awaiting_human_input"] = False | |
| state["checkpoint_name"] = None | |
| return state | |
| def handle_export_approval(state: AgentState, version_label: str) -> AgentState: | |
| """Handle human approval of final export.""" | |
| state["version_label"] = version_label | |
| state["awaiting_human_input"] = False | |
| state["checkpoint_name"] = None | |
| return state | |
| # ============================================================================= | |
| # ROUTING FUNCTIONS | |
| # ============================================================================= | |
| def route_after_discovery(state: AgentState) -> Literal["wait_for_pages", "extract"]: | |
| """Route after discovery: wait for human or continue.""" | |
| if state["awaiting_human_input"]: | |
| return "wait_for_pages" | |
| return "extract" | |
| def route_after_extraction(state: AgentState) -> Literal["normalize", "error"]: | |
| """Route after extraction: normalize or handle error.""" | |
| if state["desktop_extraction"] is None and state["mobile_extraction"] is None: | |
| return "error" | |
| return "normalize" | |
| def route_after_normalization(state: AgentState) -> Literal["wait_for_review", "advise"]: | |
| """Route after normalization: wait for review or continue.""" | |
| if state["awaiting_human_input"]: | |
| return "wait_for_review" | |
| return "advise" | |
| def route_after_recommendations(state: AgentState) -> Literal["wait_for_selection", "generate"]: | |
| """Route after recommendations: wait for selection or continue.""" | |
| if state["awaiting_human_input"]: | |
| return "wait_for_selection" | |
| return "generate" | |
| def route_after_generation(state: AgentState) -> Literal["wait_for_approval", "complete"]: | |
| """Route after generation: wait for approval or complete.""" | |
| if state["awaiting_human_input"]: | |
| return "wait_for_approval" | |
| return "complete" | |
| # ============================================================================= | |
| # GRAPH BUILDER | |
| # ============================================================================= | |
| def build_workflow_graph() -> StateGraph: | |
| """ | |
| Build the main LangGraph workflow. | |
| Flow: | |
| 1. discover_pages -> [human confirms pages] | |
| 2. extract_desktop + extract_mobile (parallel) | |
| 3. normalize_tokens -> [human reviews tokens] | |
| 4. generate_recommendations -> [human selects upgrades] | |
| 5. generate_final_tokens -> [human approves export] | |
| 6. complete | |
| """ | |
| # Create the graph | |
| workflow = StateGraph(AgentState) | |
| # ------------------------------------------------------------------------- | |
| # ADD NODES | |
| # ------------------------------------------------------------------------- | |
| # Discovery | |
| workflow.add_node("discover", discover_pages) | |
| # Extraction (will be parallel in subgraph) | |
| workflow.add_node("extract_desktop", extract_tokens_desktop) | |
| workflow.add_node("extract_mobile", extract_tokens_mobile) | |
| # Normalization | |
| workflow.add_node("normalize", normalize_tokens) | |
| # Advisor | |
| workflow.add_node("advise", generate_recommendations) | |
| # Generator | |
| workflow.add_node("generate", generate_final_tokens) | |
| # Completion | |
| workflow.add_node("complete", complete_workflow) | |
| # Human checkpoint placeholder nodes (these just pass through) | |
| workflow.add_node("wait_for_pages", lambda s: s) | |
| workflow.add_node("wait_for_review", lambda s: s) | |
| workflow.add_node("wait_for_selection", lambda s: s) | |
| workflow.add_node("wait_for_approval", lambda s: s) | |
| # ------------------------------------------------------------------------- | |
| # ADD EDGES | |
| # ------------------------------------------------------------------------- | |
| # Entry point | |
| workflow.set_entry_point("discover") | |
| # Discovery -> (wait or extract) | |
| workflow.add_conditional_edges( | |
| "discover", | |
| route_after_discovery, | |
| { | |
| "wait_for_pages": "wait_for_pages", | |
| "extract": "extract_desktop", | |
| } | |
| ) | |
| # After human confirms pages -> extract | |
| workflow.add_edge("wait_for_pages", "extract_desktop") | |
| # Parallel extraction | |
| workflow.add_edge("extract_desktop", "extract_mobile") | |
| # After extraction -> normalize | |
| workflow.add_conditional_edges( | |
| "extract_mobile", | |
| route_after_extraction, | |
| { | |
| "normalize": "normalize", | |
| "error": END, | |
| } | |
| ) | |
| # Normalization -> (wait or advise) | |
| workflow.add_conditional_edges( | |
| "normalize", | |
| route_after_normalization, | |
| { | |
| "wait_for_review": "wait_for_review", | |
| "advise": "advise", | |
| } | |
| ) | |
| # After human reviews -> advise | |
| workflow.add_edge("wait_for_review", "advise") | |
| # Advisor -> (wait or generate) | |
| workflow.add_conditional_edges( | |
| "advise", | |
| route_after_recommendations, | |
| { | |
| "wait_for_selection": "wait_for_selection", | |
| "generate": "generate", | |
| } | |
| ) | |
| # After human selects upgrades -> generate | |
| workflow.add_edge("wait_for_selection", "generate") | |
| # Generation -> (wait or complete) | |
| workflow.add_conditional_edges( | |
| "generate", | |
| route_after_generation, | |
| { | |
| "wait_for_approval": "wait_for_approval", | |
| "complete": "complete", | |
| } | |
| ) | |
| # After human approves -> complete | |
| workflow.add_edge("wait_for_approval", "complete") | |
| # Complete -> END | |
| workflow.add_edge("complete", END) | |
| return workflow | |
| # ============================================================================= | |
| # WORKFLOW RUNNER | |
| # ============================================================================= | |
| class WorkflowRunner: | |
| """ | |
| Manages workflow execution with human-in-the-loop support. | |
| """ | |
| def __init__(self): | |
| self.graph = build_workflow_graph() | |
| self.checkpointer = MemorySaver() | |
| self.app = self.graph.compile(checkpointer=self.checkpointer) | |
| self.current_state: AgentState | None = None | |
| self.thread_id: str | None = None | |
| async def start(self, base_url: str, thread_id: str | None = None) -> AgentState: | |
| """Start a new workflow.""" | |
| self.thread_id = thread_id or f"workflow_{datetime.now().timestamp()}" | |
| self.current_state = create_initial_state(base_url) | |
| config = {"configurable": {"thread_id": self.thread_id}} | |
| # Run until first human checkpoint | |
| async for event in self.app.astream(self.current_state, config): | |
| self.current_state = event | |
| if self.current_state.get("awaiting_human_input"): | |
| break | |
| return self.current_state | |
| async def resume(self, human_input: dict) -> AgentState: | |
| """Resume workflow after human input.""" | |
| if not self.current_state or not self.thread_id: | |
| raise ValueError("No active workflow to resume") | |
| checkpoint = self.current_state.get("checkpoint_name") | |
| # Apply human input based on checkpoint | |
| if checkpoint == "confirm_pages": | |
| self.current_state = handle_page_confirmation( | |
| self.current_state, | |
| human_input.get("confirmed_pages", []) | |
| ) | |
| elif checkpoint == "review_tokens": | |
| self.current_state = handle_token_review( | |
| self.current_state, | |
| human_input.get("color_decisions", {}), | |
| human_input.get("typography_decisions", {}), | |
| human_input.get("spacing_decisions", {}), | |
| ) | |
| elif checkpoint == "select_upgrades": | |
| self.current_state = handle_upgrade_selection( | |
| self.current_state, | |
| human_input.get("type_scale"), | |
| human_input.get("spacing_system"), | |
| human_input.get("naming_convention"), | |
| human_input.get("color_ramps", {}), | |
| human_input.get("a11y_fixes", []), | |
| ) | |
| elif checkpoint == "approve_export": | |
| self.current_state = handle_export_approval( | |
| self.current_state, | |
| human_input.get("version_label", "v1") | |
| ) | |
| config = {"configurable": {"thread_id": self.thread_id}} | |
| # Continue until next checkpoint or completion | |
| async for event in self.app.astream(self.current_state, config): | |
| self.current_state = event | |
| if self.current_state.get("awaiting_human_input"): | |
| break | |
| return self.current_state | |
| def get_progress(self) -> dict: | |
| """Get current workflow progress.""" | |
| if not self.current_state: | |
| return {"status": "not_started"} | |
| return get_stage_progress(self.current_state) | |
| def get_state(self) -> AgentState | None: | |
| """Get current state.""" | |
| return self.current_state | |
| # ============================================================================= | |
| # CONVENIENCE FUNCTIONS | |
| # ============================================================================= | |
| def create_workflow() -> WorkflowRunner: | |
| """Create a new workflow runner instance.""" | |
| return WorkflowRunner() | |
| async def run_discovery_only(base_url: str) -> list: | |
| """Run only the discovery phase (for testing).""" | |
| from agents.crawler import PageDiscoverer | |
| discoverer = PageDiscoverer() | |
| return await discoverer.discover(base_url) | |
| async def run_extraction_only(pages: list[str], viewport: Viewport) -> dict: | |
| """Run only the extraction phase (for testing).""" | |
| from agents.extractor import TokenExtractor | |
| extractor = TokenExtractor(viewport=viewport) | |
| return await extractor.extract(pages) | |