"""
Agent 1: Token Extractor
Design System Extractor v2

Persona: Meticulous Design Archaeologist

Responsibilities:
- Crawl pages at specified viewport
- Extract computed styles from all elements
- Collect colors, typography, spacing, radius, shadows
- Track frequency and context for each token
"""

import asyncio
import re
from typing import Optional, Callable
from datetime import datetime
from collections import defaultdict

from playwright.async_api import (
    async_playwright,
    Browser,
    Page,
    BrowserContext,
    Playwright,
    TimeoutError as PlaywrightTimeoutError,
)

from core.token_schema import (
    Viewport,
    ExtractedTokens,
    ColorToken,
    TypographyToken,
    SpacingToken,
    RadiusToken,
    ShadowToken,
    FontFamily,
    TokenSource,
    Confidence,
)
from core.color_utils import (
    normalize_hex,
    parse_color,
    get_contrast_with_white,
    get_contrast_with_black,
    check_wcag_compliance,
)
from config.settings import get_settings


class TokenExtractor:
    """
    Extracts design tokens from web pages.

    This is the second part of Agent 1's job — after pages are confirmed,
    we crawl and extract all CSS values.

    Tokens are accumulated on the instance (``colors``, ``typography``,
    ``spacing``, ``radius``, ``shadows``, ``font_families``) across every page
    handed to :meth:`extract`. The instance is also an async context manager
    that owns the Playwright driver, the browser and the browser context.
    """

    def __init__(self, viewport: Viewport = Viewport.DESKTOP):
        """Create an extractor for the given viewport; no browser is started yet."""
        self.settings = get_settings()
        self.viewport = viewport

        # Browser resources, populated by _init_browser / released by _close_browser.
        # The driver handle is kept so that it can be stopped on shutdown.
        self._playwright: Optional[Playwright] = None
        self.browser: Optional[Browser] = None
        self.context: Optional[BrowserContext] = None

        # Token collections:
        #   colors      -> keyed by the hex string produced by parse_color()
        #   typography  -> keyed by "<font-size>|<font-weight>|<font-family[:50]>"
        #   spacing     -> keyed by the rounded pixel value as a string
        #   radius      -> keyed by the (collapsed) CSS value
        #   shadows     -> keyed by the raw box-shadow value
        self.colors: dict[str, ColorToken] = {}
        self.typography: dict[str, TypographyToken] = {}
        self.spacing: dict[str, SpacingToken] = {}
        self.radius: dict[str, RadiusToken] = {}
        self.shadows: dict[str, ShadowToken] = {}

        # Font families keyed by the first family name of each font stack.
        self.font_families: dict[str, FontFamily] = {}

        # Run statistics and diagnostics.
        self.total_elements = 0
        self.errors: list[str] = []
        self.warnings: list[str] = []

    async def __aenter__(self):
        """Async context manager entry."""
        await self._init_browser()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        await self._close_browser()

    async def _init_browser(self):
        """
        Initialize the Playwright driver, a Chromium browser and a context.

        The context viewport comes from the desktop or mobile settings,
        depending on ``self.viewport``. If any step fails, everything started
        so far is torn down before the error is re-raised, because
        ``__aexit__`` does not run when ``__aenter__`` raises.
        """
        self._playwright = await async_playwright().start()
        try:
            self.browser = await self._playwright.chromium.launch(
                headless=self.settings.browser.headless
            )

            if self.viewport == Viewport.DESKTOP:
                width = self.settings.viewport.desktop_width
                height = self.settings.viewport.desktop_height
            else:
                width = self.settings.viewport.mobile_width
                height = self.settings.viewport.mobile_height

            self.context = await self.browser.new_context(
                viewport={"width": width, "height": height},
                user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"
            )
        except BaseException:
            # Includes cancellation: never leave a half-started browser behind.
            await self._close_browser()
            raise

    async def _close_browser(self):
        """
        Close the context and browser and stop the Playwright driver.

        Each resource is released even if closing an earlier one raises, and
        the attributes are reset first so a second call is a no-op.
        """
        context, self.context = self.context, None
        browser, self.browser = self.browser, None
        playwright, self._playwright = self._playwright, None

        try:
            if context:
                await context.close()
        finally:
            try:
                if browser:
                    await browser.close()
            finally:
                if playwright:
                    await playwright.stop()

    async def _scroll_page(self, page: Page):
        """
        Scroll page to load lazy content.

        Steps down the page one viewport height at a time, returns to the top,
        then waits for the network to go idle. The idle wait is best-effort:
        if it times out, a warning is recorded and extraction continues with
        whatever has loaded.
        """
        await page.evaluate("""
            async () => {
                const delay = ms => new Promise(resolve => setTimeout(resolve, ms));
                const height = document.body.scrollHeight;
                const step = window.innerHeight;

                for (let y = 0; y < height; y += step) {
                    window.scrollTo(0, y);
                    await delay(100);
                }

                // Scroll back to top
                window.scrollTo(0, 0);
            }
        """)

        try:
            await page.wait_for_load_state(
                "networkidle",
                timeout=self.settings.browser.network_idle_timeout,
            )
        except PlaywrightTimeoutError:
            self.warnings.append(
                f"Network did not become idle for {page.url}, extracting current content"
            )

    async def _extract_styles_from_page(self, page: Page) -> dict:
        """
        Extract computed styles from all elements on the page.

        This is the core extraction logic — we get getComputedStyle for every element.

        Returns:
            A dict with the lists ``colors``, ``typography``, ``spacing``,
            ``radius`` and ``shadows`` (one raw record per element/property)
            plus ``elements_count``, the number of elements matched by ``*``.
            Elements with ``display: none`` or ``visibility: hidden`` are
            counted but contribute no records.
        """
        styles_data = await page.evaluate("""
            () => {
                const elements = document.querySelectorAll('*');
                const results = {
                    colors: [],
                    typography: [],
                    spacing: [],
                    radius: [],
                    shadows: [],
                    elements_count: elements.length,
                };

                const colorProperties = [
                    'color', 'background-color', 'border-color',
                    'border-top-color', 'border-right-color',
                    'border-bottom-color', 'border-left-color',
                    'outline-color', 'text-decoration-color',
                ];

                const spacingProperties = [
                    'margin-top', 'margin-right', 'margin-bottom', 'margin-left',
                    'padding-top', 'padding-right', 'padding-bottom', 'padding-left',
                    'gap', 'row-gap', 'column-gap',
                ];

                elements.forEach(el => {
                    const tag = el.tagName.toLowerCase();
                    const styles = window.getComputedStyle(el);

                    // Skip invisible elements
                    if (styles.display === 'none' || styles.visibility === 'hidden') {
                        return;
                    }

                    // --- COLORS ---
                    colorProperties.forEach(prop => {
                        const value = styles.getPropertyValue(prop);
                        if (value && value !== 'rgba(0, 0, 0, 0)' && value !== 'transparent') {
                            results.colors.push({
                                value: value,
                                property: prop,
                                element: tag,
                                context: prop.includes('background') ? 'background' :
                                         prop.includes('border') ? 'border' : 'text',
                            });
                        }
                    });

                    // --- TYPOGRAPHY ---
                    const fontFamily = styles.getPropertyValue('font-family');
                    const fontSize = styles.getPropertyValue('font-size');
                    const fontWeight = styles.getPropertyValue('font-weight');
                    const lineHeight = styles.getPropertyValue('line-height');
                    const letterSpacing = styles.getPropertyValue('letter-spacing');

                    if (fontSize && fontFamily) {
                        results.typography.push({
                            fontFamily: fontFamily,
                            fontSize: fontSize,
                            fontWeight: fontWeight,
                            lineHeight: lineHeight,
                            letterSpacing: letterSpacing,
                            element: tag,
                        });
                    }

                    // --- SPACING ---
                    spacingProperties.forEach(prop => {
                        const value = styles.getPropertyValue(prop);
                        if (value && value !== '0px' && value !== 'auto' && value !== 'normal') {
                            const px = parseFloat(value);
                            const rounded = Math.round(px);
                            // Test the rounded value so sub-pixel sizes never become a 0px token
                            if (!isNaN(px) && rounded > 0 && px < 500) {
                                results.spacing.push({
                                    value: value,
                                    valuePx: rounded,
                                    property: prop,
                                    context: prop.includes('margin') ? 'margin' :
                                             prop.includes('padding') ? 'padding' : 'gap',
                                });
                            }
                        }
                    });

                    // --- BORDER RADIUS ---
                    const radiusProps = [
                        'border-radius', 'border-top-left-radius',
                        'border-top-right-radius', 'border-bottom-left-radius',
                        'border-bottom-right-radius',
                    ];

                    radiusProps.forEach(prop => {
                        const value = styles.getPropertyValue(prop);
                        if (value && value !== '0px') {
                            results.radius.push({
                                value: value,
                                element: tag,
                            });
                        }
                    });

                    // --- BOX SHADOW ---
                    const shadow = styles.getPropertyValue('box-shadow');
                    if (shadow && shadow !== 'none') {
                        results.shadows.push({
                            value: shadow,
                            element: tag,
                        });
                    }
                });

                return results;
            }
        """)

        return styles_data

    @staticmethod
    def _add_unique(items: list, item) -> None:
        """Append ``item`` to ``items`` unless it is falsy or already present."""
        if item and item not in items:
            items.append(item)

    def _process_color(self, color_data: dict) -> Optional[str]:
        """Return the hex form of a raw color record, or None if it cannot be parsed."""
        parsed = parse_color(color_data.get("value", ""))
        if not parsed:
            return None
        return parsed.hex

    def _aggregate_colors(self, raw_colors: list[dict]):
        """
        Aggregate color data from extraction.

        Each parseable record increments the frequency of its hex color and
        records the distinct contexts, element tags and CSS properties it was
        seen with. Contrast and WCAG data are computed once, when a color is
        first encountered.
        """
        for color_data in raw_colors:
            hex_value = self._process_color(color_data)
            if not hex_value:
                continue

            if hex_value not in self.colors:
                contrast_white = get_contrast_with_white(hex_value)
                contrast_black = get_contrast_with_black(hex_value)
                # Compliance is evaluated against a white counterpart color.
                compliance = check_wcag_compliance(hex_value, "#ffffff")

                self.colors[hex_value] = ColorToken(
                    value=hex_value,
                    frequency=0,
                    contexts=[],
                    elements=[],
                    css_properties=[],
                    contrast_white=round(contrast_white, 2),
                    contrast_black=round(contrast_black, 2),
                    wcag_aa_large_text=compliance["aa_large_text"],
                    wcag_aa_small_text=compliance["aa_normal_text"],
                )

            token = self.colors[hex_value]
            token.frequency += 1
            self._add_unique(token.contexts, color_data.get("context", ""))
            self._add_unique(token.elements, color_data.get("element", ""))
            self._add_unique(token.css_properties, color_data.get("property", ""))

    def _aggregate_typography(self, raw_typography: list[dict]):
        """
        Aggregate typography data from extraction.

        Records are grouped by font size, font weight and the first 50
        characters of the font stack. Line height and letter spacing are taken
        from the first record of each group. Font-family usage is counted per
        record under the first family name of the stack.
        """
        for typo_data in raw_typography:
            font_family = typo_data.get("fontFamily", "")
            font_size = typo_data.get("fontSize", "")
            font_weight = typo_data.get("fontWeight", "400")
            line_height = typo_data.get("lineHeight", "normal")

            # Mirrors the in-page guard: a record without size or family is unusable.
            if not font_size or not font_family:
                continue

            key = f"{font_size}|{font_weight}|{font_family[:50]}"

            if key not in self.typography:
                font_size_px = None
                if font_size.endswith("px"):
                    try:
                        font_size_px = float(font_size.replace("px", ""))
                    except ValueError:
                        pass

                # Express line height as a unitless ratio where possible.
                line_height_computed = None
                if line_height and line_height != "normal":
                    if line_height.endswith("px") and font_size_px:
                        try:
                            lh_px = float(line_height.replace("px", ""))
                            line_height_computed = round(lh_px / font_size_px, 2)
                        except ValueError:
                            pass
                    else:
                        try:
                            line_height_computed = float(line_height)
                        except ValueError:
                            pass

                self.typography[key] = TypographyToken(
                    font_family=font_family.split(",")[0].strip().strip('"\''),
                    font_size=font_size,
                    font_size_px=font_size_px,
                    # Keyword weights such as "bold" fall back to 400.
                    font_weight=int(font_weight) if font_weight.isdigit() else 400,
                    line_height=line_height,
                    line_height_computed=line_height_computed,
                    letter_spacing=typo_data.get("letterSpacing"),
                    frequency=0,
                    elements=[],
                )

            token = self.typography[key]
            token.frequency += 1
            self._add_unique(token.elements, typo_data.get("element", ""))

            primary_font = token.font_family
            if primary_font not in self.font_families:
                self.font_families[primary_font] = FontFamily(
                    name=primary_font,
                    fallbacks=[f.strip().strip('"\'') for f in font_family.split(",")[1:]],
                    frequency=0,
                )
            self.font_families[primary_font].frequency += 1

    def _aggregate_spacing(self, raw_spacing: list[dict]):
        """
        Aggregate spacing data from extraction.

        Records are grouped by their rounded pixel value; each token tracks
        the distinct contexts and CSS properties it was seen with and whether
        the value is a multiple of 4 and of 8.
        """
        for space_data in raw_spacing:
            value_px = space_data.get("valuePx", 0)
            key = str(value_px)

            if key not in self.spacing:
                self.spacing[key] = SpacingToken(
                    value=f"{value_px}px",
                    value_px=value_px,
                    frequency=0,
                    contexts=[],
                    properties=[],
                    fits_base_4=value_px % 4 == 0,
                    fits_base_8=value_px % 8 == 0,
                )

            token = self.spacing[key]
            token.frequency += 1
            self._add_unique(token.contexts, space_data.get("context", ""))
            self._add_unique(token.properties, space_data.get("property", ""))

    def _aggregate_radius(self, raw_radius: list[dict]):
        """
        Aggregate border radius data.

        A value whose space-separated parts are all identical (e.g.
        ``"8px 8px"``) is collapsed to that single part before grouping.
        """
        for radius_data in raw_radius:
            value = radius_data.get("value", "")
            if not value:
                # An empty value would create a meaningless token keyed by "".
                continue

            parts = value.split()
            if len(set(parts)) == 1:
                value = parts[0]

            if value not in self.radius:
                value_px = None
                if value.endswith("px"):
                    try:
                        value_px = int(float(value.replace("px", "")))
                    except ValueError:
                        pass

                self.radius[value] = RadiusToken(
                    value=value,
                    value_px=value_px,
                    frequency=0,
                    elements=[],
                    fits_base_4=value_px % 4 == 0 if value_px else False,
                    fits_base_8=value_px % 8 == 0 if value_px else False,
                )

            token = self.radius[value]
            token.frequency += 1
            self._add_unique(token.elements, radius_data.get("element", ""))

    def _aggregate_shadows(self, raw_shadows: list[dict]):
        """Aggregate box shadow data, grouped by the raw box-shadow value."""
        for shadow_data in raw_shadows:
            value = shadow_data.get("value", "")
            if not value:
                # An empty value would create a meaningless token keyed by "".
                continue

            if value not in self.shadows:
                self.shadows[value] = ShadowToken(
                    value=value,
                    frequency=0,
                    elements=[],
                )

            token = self.shadows[value]
            token.frequency += 1
            self._add_unique(token.elements, shadow_data.get("element", ""))

    def _calculate_confidence(self, frequency: int) -> Confidence:
        """Map a frequency to a confidence level: >=10 HIGH, >=3 MEDIUM, else LOW."""
        if frequency >= 10:
            return Confidence.HIGH
        if frequency >= 3:
            return Confidence.MEDIUM
        return Confidence.LOW

    def _detect_spacing_base(self) -> Optional[int]:
        """
        Detect the base spacing unit (4 or 8).

        Returns 8 when at least 80% of the distinct spacing values are
        multiples of 8, otherwise 4 when at least 80% are multiples of 4,
        otherwise None (also None when no spacing was collected).
        """
        total = len(self.spacing)
        if total == 0:
            return None

        fits_4 = sum(1 for s in self.spacing.values() if s.fits_base_4)
        fits_8 = sum(1 for s in self.spacing.values() if s.fits_base_8)

        # Check 8 first: every multiple of 8 is also a multiple of 4.
        if fits_8 / total >= 0.8:
            return 8
        if fits_4 / total >= 0.8:
            return 4
        return None

    async def _navigate(self, page: Page, url: str):
        """
        Load ``url`` in ``page`` on a best-effort basis.

        Tries ``domcontentloaded`` first and retries once with ``load``. If
        both attempts fail, a warning is recorded and the caller proceeds with
        whatever the page currently contains.
        """
        try:
            await page.goto(url, wait_until="domcontentloaded", timeout=60000)
            # Fixed pause to give client-side rendering a chance to run.
            await page.wait_for_timeout(2000)
        except Exception:
            try:
                await page.goto(url, wait_until="load", timeout=60000)
                await page.wait_for_timeout(3000)
            except Exception:
                self.warnings.append(f"Slow load for {url}, extracting partial content")

    async def extract(
        self,
        pages: list[str],
        progress_callback: Optional[Callable[[float], None]] = None
    ) -> ExtractedTokens:
        """
        Extract tokens from a list of pages.

        A failure on one page is recorded in ``errors`` and does not stop the
        run; the page's tab is closed whether or not extraction succeeded.

        Args:
            pages: List of URLs to crawl
            progress_callback: Optional callback for progress updates; called
                with the completed fraction (0-1] after each successful page

        Returns:
            ExtractedTokens with all discovered tokens
        """
        start_time = datetime.now()
        pages_crawled = []

        async with self:
            for i, url in enumerate(pages):
                try:
                    page = await self.context.new_page()
                    try:
                        await self._navigate(page, url)
                        await self._scroll_page(page)
                        styles = await self._extract_styles_from_page(page)
                    finally:
                        # Always release the tab, even when extraction failed.
                        await page.close()

                    self._aggregate_colors(styles.get("colors", []))
                    self._aggregate_typography(styles.get("typography", []))
                    self._aggregate_spacing(styles.get("spacing", []))
                    self._aggregate_radius(styles.get("radius", []))
                    self._aggregate_shadows(styles.get("shadows", []))

                    self.total_elements += styles.get("elements_count", 0)
                    pages_crawled.append(url)

                    if progress_callback:
                        progress_callback((i + 1) / len(pages))

                    # Politeness delay between pages.
                    await asyncio.sleep(self.settings.crawl.crawl_delay_ms / 1000)

                except Exception as e:
                    self.errors.append(f"Error extracting {url}: {str(e)}")

        # Confidence is derived from how often each token was seen.
        for token in self.colors.values():
            token.confidence = self._calculate_confidence(token.frequency)
        for token in self.typography.values():
            token.confidence = self._calculate_confidence(token.frequency)
        for token in self.spacing.values():
            token.confidence = self._calculate_confidence(token.frequency)

        spacing_base = self._detect_spacing_base()

        # Spacing values that do not fit the detected base are flagged as outliers.
        if spacing_base:
            for token in self.spacing.values():
                if spacing_base == 8 and not token.fits_base_8:
                    token.is_outlier = True
                elif spacing_base == 4 and not token.fits_base_4:
                    token.is_outlier = True

        # The most frequently seen family is marked as the primary font.
        if self.font_families:
            primary_font = max(self.font_families.values(), key=lambda f: f.frequency)
            primary_font.usage = "primary"

        end_time = datetime.now()
        duration_ms = int((end_time - start_time).total_seconds() * 1000)

        return ExtractedTokens(
            viewport=self.viewport,
            source_url=pages[0] if pages else "",
            pages_crawled=pages_crawled,
            colors=list(self.colors.values()),
            typography=list(self.typography.values()),
            spacing=list(self.spacing.values()),
            radius=list(self.radius.values()),
            shadows=list(self.shadows.values()),
            font_families=list(self.font_families.values()),
            spacing_base=spacing_base,
            extraction_timestamp=start_time,
            extraction_duration_ms=duration_ms,
            total_elements_analyzed=self.total_elements,
            unique_colors=len(self.colors),
            unique_font_sizes=len({t.font_size for t in self.typography.values()}),
            unique_spacing_values=len(self.spacing),
            errors=self.errors,
            warnings=self.warnings,
        )


async def extract_from_pages(
    pages: list[str],
    viewport: Viewport = Viewport.DESKTOP
) -> ExtractedTokens:
    """
    Extract tokens from ``pages`` with a fresh, single-use TokenExtractor.

    Args:
        pages: List of URLs to crawl
        viewport: Viewport to extract at (desktop by default)

    Returns:
        The ExtractedTokens produced by ``TokenExtractor.extract``
    """
    return await TokenExtractor(viewport=viewport).extract(pages)


async def extract_both_viewports(pages: list[str]) -> tuple[ExtractedTokens, ExtractedTokens]:
    """
    Extract tokens for the desktop viewport, then for the mobile viewport.

    Both extractors are created up front and then run one after the other
    (not concurrently), each with its own token state.

    Returns:
        A ``(desktop_result, mobile_result)`` tuple
    """
    desktop, mobile = (
        TokenExtractor(viewport=target)
        for target in [Viewport.DESKTOP, Viewport.MOBILE]
    )

    desktop_tokens = await desktop.extract(pages)
    mobile_tokens = await mobile.extract(pages)
    return desktop_tokens, mobile_tokens