""" Agent 1: Token Extractor Design System Extractor v2 Persona: Meticulous Design Archaeologist Responsibilities: - Crawl pages at specified viewport - Extract computed styles from all elements - Collect colors, typography, spacing, radius, shadows - Track frequency and context for each token """ import asyncio import re from typing import Optional, Callable from datetime import datetime from collections import defaultdict from playwright.async_api import async_playwright, Browser, Page, BrowserContext from core.token_schema import ( Viewport, ExtractedTokens, ColorToken, TypographyToken, SpacingToken, RadiusToken, ShadowToken, FontFamily, TokenSource, Confidence, ) from core.color_utils import ( normalize_hex, parse_color, get_contrast_with_white, get_contrast_with_black, check_wcag_compliance, ) from config.settings import get_settings class TokenExtractor: """ Extracts design tokens from web pages. This is the second part of Agent 1's job — after pages are confirmed, we crawl and extract all CSS values. """ def __init__(self, viewport: Viewport = Viewport.DESKTOP): self.settings = get_settings() self.viewport = viewport self.browser: Optional[Browser] = None self.context: Optional[BrowserContext] = None # Token collection self.colors: dict[str, ColorToken] = {} self.typography: dict[str, TypographyToken] = {} self.spacing: dict[str, SpacingToken] = {} self.radius: dict[str, RadiusToken] = {} self.shadows: dict[str, ShadowToken] = {} # Font tracking self.font_families: dict[str, FontFamily] = {} # Statistics self.total_elements = 0 self.errors: list[str] = [] self.warnings: list[str] = [] async def __aenter__(self): """Async context manager entry.""" await self._init_browser() return self async def __aexit__(self, exc_type, exc_val, exc_tb): """Async context manager exit.""" await self._close_browser() async def _init_browser(self): """Initialize Playwright browser.""" playwright = await async_playwright().start() self.browser = await playwright.chromium.launch( headless=self.settings.browser.headless ) # Set viewport based on extraction mode if self.viewport == Viewport.DESKTOP: width = self.settings.viewport.desktop_width height = self.settings.viewport.desktop_height else: width = self.settings.viewport.mobile_width height = self.settings.viewport.mobile_height self.context = await self.browser.new_context( viewport={"width": width, "height": height}, user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36" ) async def _close_browser(self): """Close browser and cleanup.""" if self.context: await self.context.close() if self.browser: await self.browser.close() async def _scroll_page(self, page: Page): """Scroll page to load lazy content.""" await page.evaluate(""" async () => { const delay = ms => new Promise(resolve => setTimeout(resolve, ms)); const height = document.body.scrollHeight; const step = window.innerHeight; for (let y = 0; y < height; y += step) { window.scrollTo(0, y); await delay(100); } // Scroll back to top window.scrollTo(0, 0); } """) # Wait for network idle after scrolling await page.wait_for_load_state("networkidle", timeout=self.settings.browser.network_idle_timeout) async def _extract_styles_from_page(self, page: Page) -> dict: """ Extract computed styles from all elements on the page. This is the core extraction logic — we get getComputedStyle for every element. """ styles_data = await page.evaluate(""" () => { const elements = document.querySelectorAll('*'); const results = { colors: [], typography: [], spacing: [], radius: [], shadows: [], elements_count: elements.length, }; const colorProperties = [ 'color', 'background-color', 'border-color', 'border-top-color', 'border-right-color', 'border-bottom-color', 'border-left-color', 'outline-color', 'text-decoration-color', ]; const spacingProperties = [ 'margin-top', 'margin-right', 'margin-bottom', 'margin-left', 'padding-top', 'padding-right', 'padding-bottom', 'padding-left', 'gap', 'row-gap', 'column-gap', ]; elements.forEach(el => { const tag = el.tagName.toLowerCase(); const styles = window.getComputedStyle(el); // Skip invisible elements if (styles.display === 'none' || styles.visibility === 'hidden') { return; } // --- COLORS --- colorProperties.forEach(prop => { const value = styles.getPropertyValue(prop); if (value && value !== 'rgba(0, 0, 0, 0)' && value !== 'transparent') { results.colors.push({ value: value, property: prop, element: tag, context: prop.includes('background') ? 'background' : prop.includes('border') ? 'border' : 'text', }); } }); // --- TYPOGRAPHY --- const fontFamily = styles.getPropertyValue('font-family'); const fontSize = styles.getPropertyValue('font-size'); const fontWeight = styles.getPropertyValue('font-weight'); const lineHeight = styles.getPropertyValue('line-height'); const letterSpacing = styles.getPropertyValue('letter-spacing'); if (fontSize && fontFamily) { results.typography.push({ fontFamily: fontFamily, fontSize: fontSize, fontWeight: fontWeight, lineHeight: lineHeight, letterSpacing: letterSpacing, element: tag, }); } // --- SPACING --- spacingProperties.forEach(prop => { const value = styles.getPropertyValue(prop); if (value && value !== '0px' && value !== 'auto' && value !== 'normal') { const px = parseFloat(value); if (!isNaN(px) && px > 0 && px < 500) { results.spacing.push({ value: value, valuePx: Math.round(px), property: prop, context: prop.includes('margin') ? 'margin' : prop.includes('padding') ? 'padding' : 'gap', }); } } }); // --- BORDER RADIUS --- const radiusProps = [ 'border-radius', 'border-top-left-radius', 'border-top-right-radius', 'border-bottom-left-radius', 'border-bottom-right-radius', ]; radiusProps.forEach(prop => { const value = styles.getPropertyValue(prop); if (value && value !== '0px') { results.radius.push({ value: value, element: tag, }); } }); // --- BOX SHADOW --- const shadow = styles.getPropertyValue('box-shadow'); if (shadow && shadow !== 'none') { results.shadows.push({ value: shadow, element: tag, }); } }); return results; } """) return styles_data def _process_color(self, color_data: dict) -> Optional[str]: """Process and normalize a color value.""" value = color_data.get("value", "") # Parse and normalize parsed = parse_color(value) if not parsed: return None return parsed.hex def _aggregate_colors(self, raw_colors: list[dict]): """Aggregate color data from extraction.""" for color_data in raw_colors: hex_value = self._process_color(color_data) if not hex_value: continue if hex_value not in self.colors: # Calculate contrast ratios contrast_white = get_contrast_with_white(hex_value) contrast_black = get_contrast_with_black(hex_value) compliance = check_wcag_compliance(hex_value, "#ffffff") self.colors[hex_value] = ColorToken( value=hex_value, frequency=0, contexts=[], elements=[], css_properties=[], contrast_white=round(contrast_white, 2), contrast_black=round(contrast_black, 2), wcag_aa_large_text=compliance["aa_large_text"], wcag_aa_small_text=compliance["aa_normal_text"], ) # Update frequency and context token = self.colors[hex_value] token.frequency += 1 context = color_data.get("context", "") if context and context not in token.contexts: token.contexts.append(context) element = color_data.get("element", "") if element and element not in token.elements: token.elements.append(element) prop = color_data.get("property", "") if prop and prop not in token.css_properties: token.css_properties.append(prop) def _aggregate_typography(self, raw_typography: list[dict]): """Aggregate typography data from extraction.""" for typo_data in raw_typography: # Create unique key font_family = typo_data.get("fontFamily", "") font_size = typo_data.get("fontSize", "") font_weight = typo_data.get("fontWeight", "400") line_height = typo_data.get("lineHeight", "normal") key = f"{font_size}|{font_weight}|{font_family[:50]}" if key not in self.typography: # Parse font size to px font_size_px = None if font_size.endswith("px"): try: font_size_px = float(font_size.replace("px", "")) except ValueError: pass # Parse line height line_height_computed = None if line_height and line_height != "normal": if line_height.endswith("px") and font_size_px: try: lh_px = float(line_height.replace("px", "")) line_height_computed = round(lh_px / font_size_px, 2) except ValueError: pass else: try: line_height_computed = float(line_height) except ValueError: pass self.typography[key] = TypographyToken( font_family=font_family.split(",")[0].strip().strip('"\''), font_size=font_size, font_size_px=font_size_px, font_weight=int(font_weight) if font_weight.isdigit() else 400, line_height=line_height, line_height_computed=line_height_computed, letter_spacing=typo_data.get("letterSpacing"), frequency=0, elements=[], ) # Update token = self.typography[key] token.frequency += 1 element = typo_data.get("element", "") if element and element not in token.elements: token.elements.append(element) # Track font families primary_font = token.font_family if primary_font not in self.font_families: self.font_families[primary_font] = FontFamily( name=primary_font, fallbacks=[f.strip().strip('"\'') for f in font_family.split(",")[1:]], frequency=0, ) self.font_families[primary_font].frequency += 1 def _aggregate_spacing(self, raw_spacing: list[dict]): """Aggregate spacing data from extraction.""" for space_data in raw_spacing: value = space_data.get("value", "") value_px = space_data.get("valuePx", 0) key = str(value_px) if key not in self.spacing: self.spacing[key] = SpacingToken( value=f"{value_px}px", value_px=value_px, frequency=0, contexts=[], properties=[], fits_base_4=value_px % 4 == 0, fits_base_8=value_px % 8 == 0, ) token = self.spacing[key] token.frequency += 1 context = space_data.get("context", "") if context and context not in token.contexts: token.contexts.append(context) prop = space_data.get("property", "") if prop and prop not in token.properties: token.properties.append(prop) def _aggregate_radius(self, raw_radius: list[dict]): """Aggregate border radius data.""" for radius_data in raw_radius: value = radius_data.get("value", "") # Normalize to simple format # "8px 8px 8px 8px" -> "8px" parts = value.split() if len(set(parts)) == 1: value = parts[0] if value not in self.radius: value_px = None if value.endswith("px"): try: value_px = int(float(value.replace("px", ""))) except ValueError: pass self.radius[value] = RadiusToken( value=value, value_px=value_px, frequency=0, elements=[], fits_base_4=value_px % 4 == 0 if value_px else False, fits_base_8=value_px % 8 == 0 if value_px else False, ) token = self.radius[value] token.frequency += 1 element = radius_data.get("element", "") if element and element not in token.elements: token.elements.append(element) def _aggregate_shadows(self, raw_shadows: list[dict]): """Aggregate box shadow data.""" for shadow_data in raw_shadows: value = shadow_data.get("value", "") if value not in self.shadows: self.shadows[value] = ShadowToken( value=value, frequency=0, elements=[], ) token = self.shadows[value] token.frequency += 1 element = shadow_data.get("element", "") if element and element not in token.elements: token.elements.append(element) def _calculate_confidence(self, frequency: int) -> Confidence: """Calculate confidence level based on frequency.""" if frequency >= 10: return Confidence.HIGH elif frequency >= 3: return Confidence.MEDIUM return Confidence.LOW def _detect_spacing_base(self) -> Optional[int]: """Detect the base spacing unit (4 or 8).""" fits_4 = sum(1 for s in self.spacing.values() if s.fits_base_4) fits_8 = sum(1 for s in self.spacing.values() if s.fits_base_8) total = len(self.spacing) if total == 0: return None # If 80%+ values fit base 8, use 8 if fits_8 / total >= 0.8: return 8 # If 80%+ values fit base 4, use 4 elif fits_4 / total >= 0.8: return 4 return None async def extract( self, pages: list[str], progress_callback: Optional[Callable[[float], None]] = None ) -> ExtractedTokens: """ Extract tokens from a list of pages. Args: pages: List of URLs to crawl progress_callback: Optional callback for progress updates Returns: ExtractedTokens with all discovered tokens """ start_time = datetime.now() pages_crawled = [] async with self: for i, url in enumerate(pages): try: page = await self.context.new_page() # Navigate with fallback strategy try: await page.goto( url, wait_until="domcontentloaded", timeout=60000 # 60 seconds ) # Wait for JS to render await page.wait_for_timeout(2000) except Exception as nav_error: # Fallback to load event try: await page.goto( url, wait_until="load", timeout=60000 ) await page.wait_for_timeout(3000) except Exception: self.warnings.append(f"Slow load for {url}, extracting partial content") # Scroll to load lazy content await self._scroll_page(page) # Extract styles styles = await self._extract_styles_from_page(page) # Aggregate self._aggregate_colors(styles.get("colors", [])) self._aggregate_typography(styles.get("typography", [])) self._aggregate_spacing(styles.get("spacing", [])) self._aggregate_radius(styles.get("radius", [])) self._aggregate_shadows(styles.get("shadows", [])) self.total_elements += styles.get("elements_count", 0) pages_crawled.append(url) await page.close() # Progress callback if progress_callback: progress_callback((i + 1) / len(pages)) # Rate limiting await asyncio.sleep(self.settings.crawl.crawl_delay_ms / 1000) except Exception as e: self.errors.append(f"Error extracting {url}: {str(e)}") # Calculate confidence for all tokens for token in self.colors.values(): token.confidence = self._calculate_confidence(token.frequency) for token in self.typography.values(): token.confidence = self._calculate_confidence(token.frequency) for token in self.spacing.values(): token.confidence = self._calculate_confidence(token.frequency) # Detect spacing base spacing_base = self._detect_spacing_base() # Mark outliers in spacing if spacing_base: for token in self.spacing.values(): if spacing_base == 8 and not token.fits_base_8: token.is_outlier = True elif spacing_base == 4 and not token.fits_base_4: token.is_outlier = True # Determine primary font if self.font_families: primary_font = max(self.font_families.values(), key=lambda f: f.frequency) primary_font.usage = "primary" # Build result end_time = datetime.now() duration_ms = int((end_time - start_time).total_seconds() * 1000) return ExtractedTokens( viewport=self.viewport, source_url=pages[0] if pages else "", pages_crawled=pages_crawled, colors=list(self.colors.values()), typography=list(self.typography.values()), spacing=list(self.spacing.values()), radius=list(self.radius.values()), shadows=list(self.shadows.values()), font_families=list(self.font_families.values()), spacing_base=spacing_base, extraction_timestamp=start_time, extraction_duration_ms=duration_ms, total_elements_analyzed=self.total_elements, unique_colors=len(self.colors), unique_font_sizes=len(set(t.font_size for t in self.typography.values())), unique_spacing_values=len(self.spacing), errors=self.errors, warnings=self.warnings, ) # ============================================================================= # CONVENIENCE FUNCTIONS # ============================================================================= async def extract_from_pages( pages: list[str], viewport: Viewport = Viewport.DESKTOP ) -> ExtractedTokens: """Convenience function to extract tokens from pages.""" extractor = TokenExtractor(viewport=viewport) return await extractor.extract(pages) async def extract_both_viewports(pages: list[str]) -> tuple[ExtractedTokens, ExtractedTokens]: """Extract tokens from both desktop and mobile viewports.""" desktop_extractor = TokenExtractor(viewport=Viewport.DESKTOP) mobile_extractor = TokenExtractor(viewport=Viewport.MOBILE) desktop_result = await desktop_extractor.extract(pages) mobile_result = await mobile_extractor.extract(pages) return desktop_result, mobile_result