"""
Agent 1: Website Crawler
Design System Extractor v2

Persona: Meticulous Design Archaeologist

Responsibilities:
- Auto-discover pages from base URL
- Classify page types (homepage, listing, detail, etc.)
- Prepare page list for user confirmation
"""

import asyncio
import re
from urllib.parse import urljoin, urlparse
from typing import Optional, Callable
from datetime import datetime

from playwright.async_api import async_playwright, Browser, Page, BrowserContext

from core.token_schema import DiscoveredPage, PageType, Viewport
from config.settings import get_settings


class PageDiscoverer:
    """
    Discovers pages from a website for design system extraction.

    This is the first part of Agent 1's job — finding pages before
    the human confirms which ones to crawl.
    """

    def __init__(self):
        self.settings = get_settings()
        self.playwright = None  # Playwright driver handle, set in _init_browser()
        self.browser: Optional[Browser] = None
        self.context: Optional[BrowserContext] = None
        self.visited_urls: set[str] = set()
        self.discovered_pages: list[DiscoveredPage] = []

    async def __aenter__(self):
        """Async context manager entry."""
        await self._init_browser()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        await self._close_browser()

    async def _init_browser(self):
        """Initialize Playwright and launch a Chromium browser context."""
        self.playwright = await async_playwright().start()
        self.browser = await self.playwright.chromium.launch(
            headless=self.settings.browser.headless
        )
        self.context = await self.browser.new_context(
            viewport={
                "width": self.settings.viewport.desktop_width,
                "height": self.settings.viewport.desktop_height,
            },
            user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
        )

    async def _close_browser(self):
        """Close the browser context, the browser, and the Playwright driver."""
        if self.context:
            await self.context.close()
        if self.browser:
            await self.browser.close()
        if self.playwright:
            await self.playwright.stop()

    def _normalize_url(self, url: str, base_url: str) -> Optional[str]:
        """Normalize a URL; return None if it points to another domain."""
        # Resolve relative links against the base URL.
        if not url.startswith(('http://', 'https://')):
            url = urljoin(base_url, url)

        parsed = urlparse(url)
        base_parsed = urlparse(base_url)

        # Only crawl pages on the same domain as the base URL.
        if parsed.netloc != base_parsed.netloc:
            return None

        # Drop query strings and fragments.
        normalized = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"

        # Strip the trailing slash everywhere except the site root.
        if normalized.endswith('/') and len(normalized) > len(f"{parsed.scheme}://{parsed.netloc}/"):
            normalized = normalized.rstrip('/')

        return normalized

    def _classify_page_type(self, url: str, title: str = "") -> PageType:
        """
        Classify page type based on URL patterns and title.

        This is a heuristic — not perfect, but good enough for discovery.
        """
        url_lower = url.lower()
        title_lower = title.lower() if title else ""

        # URL patterns are checked first; the first matching type wins, so order
        # matters (e.g. "/product/blue-shirt" classifies as DETAIL, "/contact" as FORM).
        patterns = {
            PageType.HOMEPAGE: [r'/$', r'/home$', r'/index'],
            PageType.LISTING: [r'/products', r'/catalog', r'/list', r'/category', r'/collection', r'/search'],
            PageType.DETAIL: [r'/product/', r'/item/', r'/detail/', r'/p/', r'/[a-z-]+/\d+'],
            PageType.FORM: [r'/contact', r'/form', r'/apply', r'/submit', r'/register'],
            PageType.AUTH: [r'/login', r'/signin', r'/signup', r'/auth', r'/account'],
            PageType.CHECKOUT: [r'/cart', r'/checkout', r'/basket', r'/payment'],
            PageType.MARKETING: [r'/landing', r'/promo', r'/campaign', r'/offer'],
            PageType.ABOUT: [r'/about', r'/team', r'/company', r'/story'],
            PageType.CONTACT: [r'/contact', r'/support', r'/help'],
        }

        for page_type, url_patterns in patterns.items():
            for pattern in url_patterns:
                if re.search(pattern, url_lower):
                    return page_type

        # Fall back to keywords in the page title when the URL is uninformative.
        title_patterns = {
            PageType.HOMEPAGE: ['home', 'welcome'],
            PageType.LISTING: ['products', 'catalog', 'collection', 'browse'],
            PageType.DETAIL: ['product', 'item'],
            PageType.AUTH: ['login', 'sign in', 'sign up', 'register'],
            PageType.ABOUT: ['about', 'our story', 'team'],
            PageType.CONTACT: ['contact', 'get in touch', 'support'],
        }

        for page_type, keywords in title_patterns.items():
            for keyword in keywords:
                if keyword in title_lower:
                    return page_type

        return PageType.OTHER

    async def _extract_links(self, page: Page, base_url: str) -> list[str]:
        """Extract all internal links from a page."""
        links = await page.evaluate("""
            () => {
                const links = Array.from(document.querySelectorAll('a[href]'));
                return links.map(a => a.href).filter(href =>
                    href &&
                    !href.startsWith('javascript:') &&
                    !href.startsWith('mailto:') &&
                    !href.startsWith('tel:') &&
                    !href.includes('#')
                );
            }
        """)

        # Keep only same-domain links that have not been visited yet.
        valid_links = []
        for link in links:
            normalized = self._normalize_url(link, base_url)
            if normalized and normalized not in self.visited_urls:
                valid_links.append(normalized)

        return list(set(valid_links))

    async def _get_page_title(self, page: Page) -> str:
        """Get page title."""
        try:
            return await page.title()
        except Exception:
            return ""

    async def discover(
        self,
        base_url: str,
        max_pages: Optional[int] = None,
        progress_callback: Optional[Callable[[float], None]] = None
    ) -> list[DiscoveredPage]:
        """
        Discover pages from a website.

        Args:
            base_url: The starting URL
            max_pages: Maximum pages to discover (default from settings)
            progress_callback: Optional callback for progress updates

        Returns:
            List of discovered pages
        """
        max_pages = max_pages or self.settings.crawl.max_pages

        async with self:
            # Validate the starting point before crawling.
            normalized_base = self._normalize_url(base_url, base_url)
            if not normalized_base:
                raise ValueError(f"Invalid base URL: {base_url}")

            queue = [normalized_base]
            self.visited_urls = set()
            self.discovered_pages = []

            # Breadth-first crawl until the queue empties or the page limit is hit.
            while queue and len(self.discovered_pages) < max_pages:
                current_url = queue.pop(0)

                if current_url in self.visited_urls:
                    continue

                self.visited_urls.add(current_url)

                page = None
                try:
                    page = await self.context.new_page()

                    # First attempt: wait for the DOM, then give scripts a moment to run.
                    try:
                        await page.goto(
                            current_url,
                            wait_until="domcontentloaded",
                            timeout=60000
                        )
                        await page.wait_for_timeout(2000)
                    except Exception:
                        # Retry waiting for the full "load" event; if that also fails,
                        # continue with whatever content is available.
                        try:
                            await page.goto(
                                current_url,
                                wait_until="load",
                                timeout=60000
                            )
                            await page.wait_for_timeout(3000)
                        except Exception:
                            pass

                    # Record what was found on this page.
                    title = await self._get_page_title(page)
                    page_type = self._classify_page_type(current_url, title)
                    depth = len(urlparse(current_url).path.split('/')) - 1

                    discovered = DiscoveredPage(
                        url=current_url,
                        title=title,
                        page_type=page_type,
                        depth=depth,
                        selected=True,
                    )
                    self.discovered_pages.append(discovered)

                    # Queue new links, visiting likely template pages first.
                    new_links = await self._extract_links(page, base_url)

                    priority_patterns = ['/product', '/listing', '/category', '/about', '/contact']
                    priority_links = [l for l in new_links if any(p in l.lower() for p in priority_patterns)]
                    other_links = [l for l in new_links if l not in priority_links]

                    for link in priority_links + other_links:
                        if link not in self.visited_urls and link not in queue:
                            queue.append(link)

                    await page.close()

                    if progress_callback:
                        progress = len(self.discovered_pages) / max_pages
                        progress_callback(min(progress, 1.0))

                    # Be polite: pause between requests.
                    await asyncio.sleep(self.settings.crawl.crawl_delay_ms / 1000)

                except Exception as e:
                    # Record the failure but keep crawling the rest of the queue.
                    if page is not None:
                        try:
                            await page.close()
                        except Exception:
                            pass
                    discovered = DiscoveredPage(
                        url=current_url,
                        title="",
                        page_type=PageType.OTHER,
                        depth=0,
                        selected=False,
                        error=str(e),
                    )
                    self.discovered_pages.append(discovered)

        return self.discovered_pages

    def get_pages_by_type(self) -> dict[PageType, list[DiscoveredPage]]:
        """Group discovered pages by type."""
        grouped: dict[PageType, list[DiscoveredPage]] = {}
        for page in self.discovered_pages:
            if page.page_type not in grouped:
                grouped[page.page_type] = []
            grouped[page.page_type].append(page)
        return grouped

    def get_suggested_pages(self, min_pages: Optional[int] = None) -> list[DiscoveredPage]:
        """
        Get suggested pages for extraction.

        Ensures diversity of page types and prioritizes key templates.
        """
        min_pages = min_pages or self.settings.crawl.min_pages

        # Page types ordered by how much design-system signal they usually carry.
        priority_types = [
            PageType.HOMEPAGE,
            PageType.LISTING,
            PageType.DETAIL,
            PageType.FORM,
            PageType.MARKETING,
            PageType.AUTH,
            PageType.ABOUT,
            PageType.CONTACT,
            PageType.OTHER,
        ]

        selected = []
        grouped = self.get_pages_by_type()

        # Pick the shallowest page of each priority type first.
        for page_type in priority_types:
            if page_type in grouped and grouped[page_type]:
                page = sorted(grouped[page_type], key=lambda p: p.depth)[0]
                if page not in selected:
                    selected.append(page)

        # Top up with the shallowest remaining error-free pages until min_pages is reached.
        remaining = [p for p in self.discovered_pages if p not in selected and not p.error]
        remaining.sort(key=lambda p: p.depth)

        while len(selected) < min_pages and remaining:
            selected.append(remaining.pop(0))

        # Mark the chosen pages as selected for the confirmation step.
        for page in selected:
            page.selected = True

        return selected


async def discover_pages(base_url: str, max_pages: int = 20) -> list[DiscoveredPage]:
    """Convenience function to discover pages."""
    discoverer = PageDiscoverer()
    return await discoverer.discover(base_url, max_pages)


async def quick_discover(base_url: str) -> dict:
    """Quick discovery returning summary dict."""
    pages = await discover_pages(base_url)

    return {
        "total_found": len(pages),
        "by_type": {
            pt.value: len([p for p in pages if p.page_type == pt])
            for pt in PageType
        },
        "pages": [
            {
                "url": p.url,
                "title": p.title,
                "type": p.page_type.value,
                "selected": p.selected,
            }
            for p in pages
        ],
    }
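

# Minimal manual test (illustrative sketch, not part of the crawl pipeline): run
# this module directly to crawl a site and print the discovery summary. The
# fallback URL below is a placeholder; real crawl limits come from config.settings.
if __name__ == "__main__":
    import json
    import sys

    target_url = sys.argv[1] if len(sys.argv) > 1 else "https://example.com"
    summary = asyncio.run(quick_discover(target_url))
    print(json.dumps(summary, indent=2))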