# NOTE: removed file-hosting page residue that preceded the module docstring
# ("riazmo's picture / Upload 2 files / 9131d5e verified") — it was not Python.
"""
Agent 1: Token Extractor
Design System Extractor v2
Persona: Meticulous Design Archaeologist
Responsibilities:
- Crawl pages at specified viewport
- Extract computed styles from all elements
- Collect colors, typography, spacing, radius, shadows
- Track frequency and context for each token
"""
import asyncio
import re
from typing import Optional, Callable
from datetime import datetime
from collections import defaultdict
from playwright.async_api import async_playwright, Browser, Page, BrowserContext
from core.token_schema import (
Viewport,
ExtractedTokens,
ColorToken,
TypographyToken,
SpacingToken,
RadiusToken,
ShadowToken,
FontFamily,
TokenSource,
Confidence,
)
from core.color_utils import (
normalize_hex,
parse_color,
get_contrast_with_white,
get_contrast_with_black,
check_wcag_compliance,
)
from config.settings import get_settings
class TokenExtractor:
    """
    Extracts design tokens from web pages.

    This is the second part of Agent 1's job — after pages are confirmed,
    we crawl and extract all CSS values.

    The class is an async context manager: the browser is launched on enter
    and fully torn down (context, browser, and the Playwright driver itself)
    on exit. `extract()` manages this lifecycle internally.
    """

    def __init__(self, viewport: Viewport = Viewport.DESKTOP):
        self.settings = get_settings()
        self.viewport = viewport
        # Playwright driver handle. It must be kept so it can be stopped on
        # close — previously async_playwright().start() was never stopped,
        # leaking the driver process on every extraction run.
        self._playwright = None
        self.browser: Optional[Browser] = None
        self.context: Optional[BrowserContext] = None
        # Token collections, keyed by normalized value for de-duplication.
        self.colors: dict[str, ColorToken] = {}
        self.typography: dict[str, TypographyToken] = {}
        self.spacing: dict[str, SpacingToken] = {}
        self.radius: dict[str, RadiusToken] = {}
        self.shadows: dict[str, ShadowToken] = {}
        # Font tracking (primary family name -> FontFamily with fallbacks).
        self.font_families: dict[str, FontFamily] = {}
        # Statistics accumulated across all crawled pages.
        self.total_elements = 0
        self.errors: list[str] = []
        self.warnings: list[str] = []

    async def __aenter__(self):
        """Async context manager entry: launch browser and context."""
        await self._init_browser()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit: tear everything down."""
        await self._close_browser()

    async def _init_browser(self):
        """Initialize the Playwright driver, browser, and browser context."""
        self._playwright = await async_playwright().start()
        self.browser = await self._playwright.chromium.launch(
            headless=self.settings.browser.headless
        )
        # Set viewport based on extraction mode (desktop vs mobile widths
        # come from settings so both runs are reproducible).
        if self.viewport == Viewport.DESKTOP:
            width = self.settings.viewport.desktop_width
            height = self.settings.viewport.desktop_height
        else:
            width = self.settings.viewport.mobile_width
            height = self.settings.viewport.mobile_height
        self.context = await self.browser.new_context(
            viewport={"width": width, "height": height},
            user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"
        )

    async def _close_browser(self):
        """Close context and browser, then stop the Playwright driver."""
        if self.context:
            await self.context.close()
        if self.browser:
            await self.browser.close()
        # Stop the driver process (fixes a leak: start() without stop()).
        if self._playwright:
            await self._playwright.stop()
            self._playwright = None

    async def _scroll_page(self, page: Page):
        """Scroll page to the bottom and back to trigger lazy-loaded content."""
        await page.evaluate("""
            async () => {
                const delay = ms => new Promise(resolve => setTimeout(resolve, ms));
                const height = document.body.scrollHeight;
                const step = window.innerHeight;
                for (let y = 0; y < height; y += step) {
                    window.scrollTo(0, y);
                    await delay(100);
                }
                // Scroll back to top
                window.scrollTo(0, 0);
            }
        """)
        # Wait for network idle after scrolling. Pages with long-polling or
        # analytics beacons may never go idle — that used to raise and abort
        # the whole page's extraction; treat it as a warning instead, since
        # the rendered styles are still extractable.
        try:
            await page.wait_for_load_state(
                "networkidle",
                timeout=self.settings.browser.network_idle_timeout,
            )
        except Exception:
            self.warnings.append(
                f"Network did not go idle for {page.url}; extracting rendered content"
            )

    async def _extract_styles_from_page(self, page: Page) -> dict:
        """
        Extract computed styles from all elements on the page.

        This is the core extraction logic — we run getComputedStyle for every
        visible element and bucket raw values into colors / typography /
        spacing / radius / shadows for later aggregation in Python.
        """
        styles_data = await page.evaluate("""
            () => {
                const elements = document.querySelectorAll('*');
                const results = {
                    colors: [],
                    typography: [],
                    spacing: [],
                    radius: [],
                    shadows: [],
                    elements_count: elements.length,
                };
                const colorProperties = [
                    'color', 'background-color', 'border-color',
                    'border-top-color', 'border-right-color',
                    'border-bottom-color', 'border-left-color',
                    'outline-color', 'text-decoration-color',
                ];
                const spacingProperties = [
                    'margin-top', 'margin-right', 'margin-bottom', 'margin-left',
                    'padding-top', 'padding-right', 'padding-bottom', 'padding-left',
                    'gap', 'row-gap', 'column-gap',
                ];
                elements.forEach(el => {
                    const tag = el.tagName.toLowerCase();
                    const styles = window.getComputedStyle(el);
                    // Skip invisible elements
                    if (styles.display === 'none' || styles.visibility === 'hidden') {
                        return;
                    }
                    // --- COLORS ---
                    colorProperties.forEach(prop => {
                        const value = styles.getPropertyValue(prop);
                        if (value && value !== 'rgba(0, 0, 0, 0)' && value !== 'transparent') {
                            results.colors.push({
                                value: value,
                                property: prop,
                                element: tag,
                                context: prop.includes('background') ? 'background' :
                                         prop.includes('border') ? 'border' : 'text',
                            });
                        }
                    });
                    // --- TYPOGRAPHY ---
                    const fontFamily = styles.getPropertyValue('font-family');
                    const fontSize = styles.getPropertyValue('font-size');
                    const fontWeight = styles.getPropertyValue('font-weight');
                    const lineHeight = styles.getPropertyValue('line-height');
                    const letterSpacing = styles.getPropertyValue('letter-spacing');
                    if (fontSize && fontFamily) {
                        results.typography.push({
                            fontFamily: fontFamily,
                            fontSize: fontSize,
                            fontWeight: fontWeight,
                            lineHeight: lineHeight,
                            letterSpacing: letterSpacing,
                            element: tag,
                        });
                    }
                    // --- SPACING ---
                    spacingProperties.forEach(prop => {
                        const value = styles.getPropertyValue(prop);
                        if (value && value !== '0px' && value !== 'auto' && value !== 'normal') {
                            const px = parseFloat(value);
                            if (!isNaN(px) && px > 0 && px < 500) {
                                results.spacing.push({
                                    value: value,
                                    valuePx: Math.round(px),
                                    property: prop,
                                    context: prop.includes('margin') ? 'margin' :
                                             prop.includes('padding') ? 'padding' : 'gap',
                                });
                            }
                        }
                    });
                    // --- BORDER RADIUS ---
                    const radiusProps = [
                        'border-radius', 'border-top-left-radius',
                        'border-top-right-radius', 'border-bottom-left-radius',
                        'border-bottom-right-radius',
                    ];
                    radiusProps.forEach(prop => {
                        const value = styles.getPropertyValue(prop);
                        if (value && value !== '0px') {
                            results.radius.push({
                                value: value,
                                element: tag,
                            });
                        }
                    });
                    // --- BOX SHADOW ---
                    const shadow = styles.getPropertyValue('box-shadow');
                    if (shadow && shadow !== 'none') {
                        results.shadows.push({
                            value: shadow,
                            element: tag,
                        });
                    }
                });
                return results;
            }
        """)
        return styles_data

    def _process_color(self, color_data: dict) -> Optional[str]:
        """Parse a raw CSS color value; return its normalized hex or None."""
        value = color_data.get("value", "")
        parsed = parse_color(value)
        if not parsed:
            return None
        return parsed.hex

    def _aggregate_colors(self, raw_colors: list[dict]):
        """Aggregate raw color samples into de-duplicated ColorTokens."""
        for color_data in raw_colors:
            hex_value = self._process_color(color_data)
            if not hex_value:
                continue
            if hex_value not in self.colors:
                # First sighting: compute contrast/WCAG metadata once.
                contrast_white = get_contrast_with_white(hex_value)
                contrast_black = get_contrast_with_black(hex_value)
                compliance = check_wcag_compliance(hex_value, "#ffffff")
                self.colors[hex_value] = ColorToken(
                    value=hex_value,
                    frequency=0,
                    contexts=[],
                    elements=[],
                    css_properties=[],
                    contrast_white=round(contrast_white, 2),
                    contrast_black=round(contrast_black, 2),
                    wcag_aa_large_text=compliance["aa_large_text"],
                    wcag_aa_small_text=compliance["aa_normal_text"],
                )
            # Update frequency and usage context on every sighting.
            token = self.colors[hex_value]
            token.frequency += 1
            context = color_data.get("context", "")
            if context and context not in token.contexts:
                token.contexts.append(context)
            element = color_data.get("element", "")
            if element and element not in token.elements:
                token.elements.append(element)
            prop = color_data.get("property", "")
            if prop and prop not in token.css_properties:
                token.css_properties.append(prop)

    def _aggregate_typography(self, raw_typography: list[dict]):
        """Aggregate raw typography samples into de-duplicated tokens."""
        for typo_data in raw_typography:
            # Key on size|weight|family (family truncated to bound key size).
            font_family = typo_data.get("fontFamily", "")
            font_size = typo_data.get("fontSize", "")
            font_weight = typo_data.get("fontWeight", "400")
            line_height = typo_data.get("lineHeight", "normal")
            key = f"{font_size}|{font_weight}|{font_family[:50]}"
            if key not in self.typography:
                # Parse font size to px (computed styles normally report px).
                font_size_px = None
                if font_size.endswith("px"):
                    try:
                        font_size_px = float(font_size.replace("px", ""))
                    except ValueError:
                        pass
                # Parse line height: px values become a unitless ratio
                # relative to font size; bare numbers are already ratios.
                line_height_computed = None
                if line_height and line_height != "normal":
                    if line_height.endswith("px") and font_size_px:
                        try:
                            lh_px = float(line_height.replace("px", ""))
                            line_height_computed = round(lh_px / font_size_px, 2)
                        except ValueError:
                            pass
                    else:
                        try:
                            line_height_computed = float(line_height)
                        except ValueError:
                            pass
                self.typography[key] = TypographyToken(
                    font_family=font_family.split(",")[0].strip().strip('"\''),
                    font_size=font_size,
                    font_size_px=font_size_px,
                    font_weight=int(font_weight) if font_weight.isdigit() else 400,
                    line_height=line_height,
                    line_height_computed=line_height_computed,
                    letter_spacing=typo_data.get("letterSpacing"),
                    frequency=0,
                    elements=[],
                )
            # Update frequency / element usage.
            token = self.typography[key]
            token.frequency += 1
            element = typo_data.get("element", "")
            if element and element not in token.elements:
                token.elements.append(element)
            # Track font families (primary name + stack fallbacks).
            primary_font = token.font_family
            if primary_font not in self.font_families:
                self.font_families[primary_font] = FontFamily(
                    name=primary_font,
                    fallbacks=[f.strip().strip('"\'') for f in font_family.split(",")[1:]],
                    frequency=0,
                )
            self.font_families[primary_font].frequency += 1

    def _aggregate_spacing(self, raw_spacing: list[dict]):
        """Aggregate raw spacing samples, keyed by rounded pixel value."""
        for space_data in raw_spacing:
            value = space_data.get("value", "")
            value_px = space_data.get("valuePx", 0)
            key = str(value_px)
            if key not in self.spacing:
                self.spacing[key] = SpacingToken(
                    value=f"{value_px}px",
                    value_px=value_px,
                    frequency=0,
                    contexts=[],
                    properties=[],
                    fits_base_4=value_px % 4 == 0,
                    fits_base_8=value_px % 8 == 0,
                )
            token = self.spacing[key]
            token.frequency += 1
            context = space_data.get("context", "")
            if context and context not in token.contexts:
                token.contexts.append(context)
            prop = space_data.get("property", "")
            if prop and prop not in token.properties:
                token.properties.append(prop)

    def _aggregate_radius(self, raw_radius: list[dict]):
        """Aggregate border-radius samples, collapsing uniform shorthands."""
        for radius_data in raw_radius:
            value = radius_data.get("value", "")
            # Normalize uniform shorthand: "8px 8px 8px 8px" -> "8px".
            parts = value.split()
            if len(set(parts)) == 1:
                value = parts[0]
            if value not in self.radius:
                value_px = None
                if value.endswith("px"):
                    try:
                        value_px = int(float(value.replace("px", "")))
                    except ValueError:
                        # Multi-value or non-numeric radius (e.g. "8px 4px",
                        # "50%"): keep the raw string, no px equivalent.
                        pass
                self.radius[value] = RadiusToken(
                    value=value,
                    value_px=value_px,
                    frequency=0,
                    elements=[],
                    fits_base_4=value_px % 4 == 0 if value_px else False,
                    fits_base_8=value_px % 8 == 0 if value_px else False,
                )
            token = self.radius[value]
            token.frequency += 1
            element = radius_data.get("element", "")
            if element and element not in token.elements:
                token.elements.append(element)

    def _aggregate_shadows(self, raw_shadows: list[dict]):
        """Aggregate box-shadow samples keyed by the raw shadow string."""
        for shadow_data in raw_shadows:
            value = shadow_data.get("value", "")
            if value not in self.shadows:
                self.shadows[value] = ShadowToken(
                    value=value,
                    frequency=0,
                    elements=[],
                )
            token = self.shadows[value]
            token.frequency += 1
            element = shadow_data.get("element", "")
            if element and element not in token.elements:
                token.elements.append(element)

    def _calculate_confidence(self, frequency: int) -> Confidence:
        """Map an observation count to a confidence level (10+/3+/else)."""
        if frequency >= 10:
            return Confidence.HIGH
        elif frequency >= 3:
            return Confidence.MEDIUM
        return Confidence.LOW

    def _detect_spacing_base(self) -> Optional[int]:
        """Detect the base spacing unit (8 preferred over 4, 80% threshold)."""
        fits_4 = sum(1 for s in self.spacing.values() if s.fits_base_4)
        fits_8 = sum(1 for s in self.spacing.values() if s.fits_base_8)
        total = len(self.spacing)
        if total == 0:
            return None
        # If 80%+ values fit base 8, use 8; base 8 implies base 4, so check
        # the stricter grid first.
        if fits_8 / total >= 0.8:
            return 8
        elif fits_4 / total >= 0.8:
            return 4
        return None

    async def extract(
        self,
        pages: list[str],
        progress_callback: Optional[Callable[[float], None]] = None
    ) -> ExtractedTokens:
        """
        Extract tokens from a list of pages.

        Args:
            pages: List of URLs to crawl
            progress_callback: Optional callback for progress updates
                (called with a fraction in (0, 1] after each page)

        Returns:
            ExtractedTokens with all discovered tokens. Per-URL failures are
            recorded in ``errors`` rather than raised.
        """
        start_time = datetime.now()
        pages_crawled = []
        async with self:
            for i, url in enumerate(pages):
                try:
                    page = await self.context.new_page()
                    try:
                        # Navigate with fallback strategy: DOMContentLoaded
                        # first (fast), then the full load event.
                        try:
                            await page.goto(
                                url,
                                wait_until="domcontentloaded",
                                timeout=60000  # 60 seconds
                            )
                            # Give client-side JS a moment to render.
                            await page.wait_for_timeout(2000)
                        except Exception as nav_error:
                            try:
                                await page.goto(
                                    url,
                                    wait_until="load",
                                    timeout=60000
                                )
                                await page.wait_for_timeout(3000)
                            except Exception:
                                # Best effort: extract whatever rendered.
                                self.warnings.append(f"Slow load for {url}, extracting partial content")
                        # Scroll to load lazy content.
                        await self._scroll_page(page)
                        # Extract and aggregate styles.
                        styles = await self._extract_styles_from_page(page)
                        self._aggregate_colors(styles.get("colors", []))
                        self._aggregate_typography(styles.get("typography", []))
                        self._aggregate_spacing(styles.get("spacing", []))
                        self._aggregate_radius(styles.get("radius", []))
                        self._aggregate_shadows(styles.get("shadows", []))
                        self.total_elements += styles.get("elements_count", 0)
                        pages_crawled.append(url)
                    finally:
                        # Always release the page, even when extraction fails
                        # mid-way (previously a failure leaked the page).
                        await page.close()
                    # Progress callback (success path only, as before).
                    if progress_callback:
                        progress_callback((i + 1) / len(pages))
                    # Rate limiting between pages.
                    await asyncio.sleep(self.settings.crawl.crawl_delay_ms / 1000)
                except Exception as e:
                    self.errors.append(f"Error extracting {url}: {str(e)}")
        # Calculate confidence for all tokens.
        for token in self.colors.values():
            token.confidence = self._calculate_confidence(token.frequency)
        for token in self.typography.values():
            token.confidence = self._calculate_confidence(token.frequency)
        for token in self.spacing.values():
            token.confidence = self._calculate_confidence(token.frequency)
        # Detect spacing base and mark values off the grid as outliers.
        spacing_base = self._detect_spacing_base()
        if spacing_base:
            for token in self.spacing.values():
                if spacing_base == 8 and not token.fits_base_8:
                    token.is_outlier = True
                elif spacing_base == 4 and not token.fits_base_4:
                    token.is_outlier = True
        # Determine primary font (most frequently observed family).
        if self.font_families:
            primary_font = max(self.font_families.values(), key=lambda f: f.frequency)
            primary_font.usage = "primary"
        # Build result.
        end_time = datetime.now()
        duration_ms = int((end_time - start_time).total_seconds() * 1000)
        return ExtractedTokens(
            viewport=self.viewport,
            source_url=pages[0] if pages else "",
            pages_crawled=pages_crawled,
            colors=list(self.colors.values()),
            typography=list(self.typography.values()),
            spacing=list(self.spacing.values()),
            radius=list(self.radius.values()),
            shadows=list(self.shadows.values()),
            font_families=list(self.font_families.values()),
            spacing_base=spacing_base,
            extraction_timestamp=start_time,
            extraction_duration_ms=duration_ms,
            total_elements_analyzed=self.total_elements,
            unique_colors=len(self.colors),
            unique_font_sizes=len(set(t.font_size for t in self.typography.values())),
            unique_spacing_values=len(self.spacing),
            errors=self.errors,
            warnings=self.warnings,
        )
# =============================================================================
# CONVENIENCE FUNCTIONS
# =============================================================================
async def extract_from_pages(
    pages: list[str],
    viewport: Viewport = Viewport.DESKTOP
) -> ExtractedTokens:
    """Run a one-shot token extraction over *pages* at the given viewport."""
    return await TokenExtractor(viewport=viewport).extract(pages)
async def extract_both_viewports(pages: list[str]) -> tuple[ExtractedTokens, ExtractedTokens]:
    """Extract tokens sequentially at the desktop and then mobile viewport."""
    results = []
    for vp in (Viewport.DESKTOP, Viewport.MOBILE):
        # Each viewport gets a fresh extractor (and therefore a fresh browser).
        results.append(await TokenExtractor(viewport=vp).extract(pages))
    desktop_result, mobile_result = results
    return desktop_result, mobile_result