# (removed non-source scrape artifact: "Spaces: Sleeping" hosting-status banner)
| """ | |
| Common Utilities Module | |
| Shared utility functions used across the application. | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import hashlib | |
| import re | |
| import time | |
| from datetime import datetime | |
| from functools import wraps | |
| from typing import Any, Callable, Optional, TypeVar | |
| from urllib.parse import urlparse, urljoin | |
| T = TypeVar("T") | |
| # ============================================================================= | |
| # PLAYWRIGHT UTILITIES | |
| # ============================================================================= | |
async def block_resources(route) -> None:
    """
    Playwright route handler that skips downloads of heavy static assets.

    Register with page.route("**/*", block_resources): requests for images,
    stylesheets, fonts and media are aborted; everything else continues.
    """
    heavy_types = ("image", "stylesheet", "font", "media")
    if route.request.resource_type in heavy_types:
        await route.abort()
    else:
        await route.continue_()
| # ============================================================================= | |
| # TIMING UTILITIES | |
| # ============================================================================= | |
class Timer:
    """
    Context-manager stopwatch based on time.perf_counter().

    Usage:
        with Timer() as t:
            do_work()
        print(t.elapsed_ms())
    """

    def __init__(self) -> None:
        # 0.0 sentinels mean "not started" / "not stopped yet".
        self.start_time: float = 0.0
        self.end_time: float = 0.0

    def __enter__(self) -> "Timer":
        self.start_time = time.perf_counter()
        return self

    def __exit__(self, *args) -> None:
        self.end_time = time.perf_counter()

    def elapsed(self) -> float:
        """Elapsed time in seconds (live reading while the timer is running)."""
        if self.end_time:
            return self.end_time - self.start_time
        return time.perf_counter() - self.start_time

    def elapsed_ms(self) -> float:
        """Elapsed time in milliseconds."""
        # BUG FIX: the original evaluated `self.elapsed * 1000`, multiplying
        # the bound method object itself and raising TypeError. elapsed()
        # must be *called*.
        return self.elapsed() * 1000
def timed(func: Callable[..., T]) -> Callable[..., T]:
    """
    Decorator that logs the wrapped callable's execution time at DEBUG level.

    Works for both sync and async callables; the right wrapper is chosen via
    asyncio.iscoroutinefunction(). The wrapped function's return value is
    passed through unchanged.
    """
    import logging
    logger = logging.getLogger(func.__module__)

    # BUG FIX: @wraps was missing, so decorated callables lost __name__ /
    # __doc__ — the log line itself reports func.__name__, and introspection
    # (help(), functools caches, pickling by name) broke.
    @wraps(func)
    async def async_wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = await func(*args, **kwargs)
        duration = (time.perf_counter() - start) * 1000
        logger.debug(f"{func.__name__} took {duration:.2f}ms")
        return result

    @wraps(func)
    def sync_wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        duration = (time.perf_counter() - start) * 1000
        logger.debug(f"{func.__name__} took {duration:.2f}ms")
        return result

    if asyncio.iscoroutinefunction(func):
        return async_wrapper
    return sync_wrapper
| # ============================================================================= | |
| # STRING UTILITIES | |
| # ============================================================================= | |
def slugify(text: str, max_length: int = 100) -> str:
    """Turn *text* into a lowercase, hyphen-separated URL slug of at most
    *max_length* characters; empty input yields ""."""
    if not text:
        return ""
    candidate = re.sub(r"\s+", "-", text.lower().strip())
    candidate = re.sub(r"[^a-z0-9-]", "", candidate)
    candidate = re.sub(r"-+", "-", candidate).strip("-")
    return candidate[:max_length]
def truncate(text: str, max_length: int = 100, suffix: str = "...") -> str:
    """
    Truncate *text* so the result (including *suffix*) is at most
    *max_length* characters; text already short enough is returned as-is.
    """
    if not text or len(text) <= max_length:
        return text
    # BUG FIX: when max_length <= len(suffix) the original computed a
    # negative slice index and returned output LONGER than max_length
    # (e.g. truncate("hello world", 2) -> "hello worl..."). A plain hard
    # cut is the only sensible result in that degenerate case.
    if max_length <= len(suffix):
        return text[:max_length]
    return text[:max_length - len(suffix)].rstrip() + suffix
def strip_html(html: str) -> str:
    """Return plain text from an HTML fragment: tags removed, entities
    decoded, runs of whitespace collapsed to single spaces."""
    if not html:
        return ""
    import html as html_module
    without_tags = re.sub(r"<[^>]+>", "", html)
    decoded = html_module.unescape(without_tags)
    return re.sub(r"\s+", " ", decoded).strip()
def word_count(text: str) -> int:
    """Number of whitespace-separated words in *text* (0 for empty input)."""
    return len(text.split()) if text else 0
def reading_time(text: str, wpm: int = 200) -> int:
    """Estimated reading time in whole minutes (floor division, minimum 1)."""
    # Word counting inlined: whitespace-separated tokens, 0 for empty input.
    total_words = len(text.split()) if text else 0
    return max(1, total_words // wpm)
| # ============================================================================= | |
| # URL UTILITIES | |
| # ============================================================================= | |
def extract_domain(url: str) -> str:
    """Return the host of *url*, lowercased and without a leading "www.";
    "" for empty or unparseable input."""
    if not url:
        return ""
    try:
        host = urlparse(url).netloc.lower()
        return host[4:] if host.startswith("www.") else host
    except Exception:
        # urlparse can raise ValueError on malformed input (e.g. bad ports)
        return ""
def normalize_url(url: str) -> str:
    """
    Canonicalize *url* for equality comparison: lowercase the whole string,
    drop trailing slashes, and strip common tracking query parameters
    (utm_*, ref, source). Non-tracking query params are kept.
    """
    if not url:
        return ""
    url = url.strip().lower().rstrip("/")
    parsed = urlparse(url)
    if not parsed.query:
        return url
    from urllib.parse import parse_qs, urlencode
    tracking = {"utm_source", "utm_medium", "utm_campaign", "ref", "source"}
    kept = {name: values
            for name, values in parse_qs(parsed.query).items()
            if name not in tracking}
    rebuilt = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
    query = urlencode(kept, doseq=True)
    return f"{rebuilt}?{query}" if query else rebuilt
def is_absolute_url(url: str) -> bool:
    """True only for URLs carrying an explicit http:// or https:// scheme
    (protocol-relative "//host" URLs are treated as relative)."""
    return bool(url) and url.startswith(("http://", "https://"))
def make_absolute_url(url: str, base_url: str) -> str:
    """Resolve *url* against *base_url*; an already-absolute http(s) URL
    is returned untouched, empty input yields ""."""
    if not url:
        return ""
    # Inline absolute-scheme check (matches is_absolute_url's definition).
    if url.startswith(("http://", "https://")):
        return url
    return urljoin(base_url, url)
# Default high-resolution width (px) used when upgrading Medium CDN images.
MEDIUM_IMAGE_DEFAULT_WIDTH = 1400


def upgrade_medium_image_url(url: str, target_width: int = MEDIUM_IMAGE_DEFAULT_WIDTH) -> str:
    """
    Upgrade a Medium image URL to a higher resolution.

    Medium uses multiple CDN URL patterns:
    - https://miro.medium.com/v2/resize:fit:320/{image_id}
    - https://miro.medium.com/v2/resize:fill:88:88/{image_id}
    - https://miro.medium.com/max/320/{image_id} (older format)
    - https://cdn-images-1.medium.com/max/320/{image_id} (alternate CDN)
    - https://miro.medium.com/freeze/fit/320/240/{image_id} (animated)

    BUG FIX vs. original: where the current width is visible in the URL, the
    URL is returned unchanged when it is already at or above target_width —
    the original unconditionally rewrote it and would *downscale* e.g. a
    2000px image to 1400px.

    Args:
        url: The original image URL
        target_width: Target width in pixels (default 1400 for high-res)

    Returns:
        Upgraded URL with higher resolution, or the original URL if it is
        not a recognizable Medium image or is already large enough.
    """
    if not url:
        return url

    # Only touch known Medium CDN hosts.
    if not any(cdn in url for cdn in (
        "miro.medium.com",
        "cdn-images-1.medium.com",
        "cdn-images-2.medium.com",
        "cdn.medium.com",
    )):
        return url

    # All rewrites land on the current miro endpoint at the target width.
    replacement = f"miro.medium.com/v2/resize:fit:{target_width}"

    # Pattern 1: v2/resize:fit:WIDTH or v2/resize:fill:WIDTH:HEIGHT
    pattern_v2 = r"(miro\.medium\.com/v2/resize:)(fit|fill):(\d+)(?::(\d+))?"
    match = re.search(pattern_v2, url)
    if match:
        if int(match.group(3)) >= target_width:
            return url  # already high-res; don't downscale
        return re.sub(pattern_v2, replacement, url)

    # Pattern 2: older max/WIDTH format (both miro and cdn-images hosts).
    pattern_max = r"((?:miro|cdn-images-\d+)\.medium\.com/max/)(\d+)"
    match = re.search(pattern_max, url)
    if match:
        if int(match.group(2)) >= target_width:
            return url
        return re.sub(pattern_max, replacement, url)

    # Pattern 3: freeze format with WIDTH/HEIGHT (animated images).
    pattern_freeze = r"(miro\.medium\.com/freeze/)(fit|fill)/(\d+)/(\d+)"
    match = re.search(pattern_freeze, url)
    if match:
        if int(match.group(3)) >= target_width:
            return url
        return re.sub(pattern_freeze, replacement, url)

    # Pattern 4: cdn-images-X.medium.com/fit/... (or /c/...) — take the
    # image ID that follows the WIDTH/HEIGHT pair and rebuild a miro URL.
    pattern_cdn_fit = r"(cdn-images-\d+\.medium\.com/)(?:fit|c)/(?:t/)?(\d+)/(\d+)"
    match = re.search(pattern_cdn_fit, url)
    if match:
        id_match = re.search(r"(\d+)/(\d+)/(.+)$", url)
        if id_match:
            return f"https://{replacement}/{id_match.group(3)}"

    # Pattern 5: image ID (contains '*' and a file extension) anywhere in
    # the path of any Medium host.
    pattern_id = r"(?:miro|cdn-images-\d+|cdn)\.medium\.com/.*?/([01]\*[a-zA-Z0-9_-]+\.[a-zA-Z0-9]+)"
    match = re.search(pattern_id, url)
    if match:
        return f"https://{replacement}/{match.group(1)}"

    # Pattern 6: bare image hash with no extension, e.g.
    # .../v2/da:true/resize:fit:320/abc123def456
    pattern_hash = r"(?:miro|cdn)\.medium\.com/.*?resize:(?:fit|fill):(\d+)/([a-zA-Z0-9]+)$"
    match = re.search(pattern_hash, url)
    if match:
        if int(match.group(1)) >= target_width:
            return url
        return f"https://{replacement}/{match.group(2)}"

    # Pattern 7: generic fallback — last path segment that looks like an
    # image ID (starts with 0* or 1*).
    match = re.search(r"/([01]\*[^\s/]+)(?:\?.*)?$", url)
    if match:
        return f"https://{replacement}/{match.group(1)}"

    # Unrecognized layout: return the original (low-res beats broken).
    return url
def get_medium_image_url(image_id: str, width: int = MEDIUM_IMAGE_DEFAULT_WIDTH) -> str:
    """
    Build a high-resolution Medium CDN URL from an image ID.

    Args:
        image_id: The Medium image ID (e.g. "1*abc123.png")
        width: Target width in pixels (default 1400 for high-res)

    Returns:
        Full miro.medium.com URL, or "" when image_id is empty.
    """
    return f"https://miro.medium.com/v2/resize:fit:{width}/{image_id}" if image_id else ""
| # ============================================================================= | |
| # HASH UTILITIES | |
| # ============================================================================= | |
def md5_hash(data: str | bytes) -> str:
    """
    Hex MD5 digest of *data*; str input is UTF-8 encoded first.

    NOTE: MD5 is used here for cache keys / deduplication, not security.
    """
    payload = data.encode("utf-8") if isinstance(data, str) else data
    return hashlib.md5(payload).hexdigest()
def content_hash(content: str) -> str:
    """Short (8 hex chars) MD5-based fingerprint of *content* for cache keys."""
    # Same computation md5_hash performs, inlined: UTF-8 encode, MD5, hex.
    return hashlib.md5(content.encode("utf-8")).hexdigest()[:8]
| # ============================================================================= | |
| # DATE UTILITIES | |
| # ============================================================================= | |
def parse_iso_date(date_str: str) -> Optional[datetime]:
    """
    Parse an ISO-8601 timestamp; returns None for empty or invalid input.

    A trailing 'Z' (Zulu/UTC) is rewritten to '+00:00' because
    datetime.fromisoformat() did not accept 'Z' before Python 3.11.
    """
    if not date_str:
        return None
    try:
        return datetime.fromisoformat(date_str.replace("Z", "+00:00"))
    except ValueError:
        return None
def format_relative_time(dt: datetime) -> str:
    """
    Render *dt* as a human-friendly relative time ("5 minutes ago",
    "just now"); anything older than a week falls back to "Mon DD, YYYY".

    Naive datetimes are compared against naive UTC now; timezone-aware ones
    against aware UTC now, so the subtraction never mixes the two.
    """
    if dt.tzinfo:
        from datetime import timezone
        reference = datetime.now(timezone.utc)
    else:
        reference = datetime.utcnow()
    age_seconds = (reference - dt).total_seconds()

    if age_seconds < 60:
        return "just now"
    if age_seconds < 3600:
        minutes = int(age_seconds / 60)
        return f"{minutes} minute{'s' if minutes != 1 else ''} ago"
    if age_seconds < 86400:
        hours = int(age_seconds / 3600)
        return f"{hours} hour{'s' if hours != 1 else ''} ago"
    if age_seconds < 604800:
        days = int(age_seconds / 86400)
        return f"{days} day{'s' if days != 1 else ''} ago"
    return dt.strftime("%b %d, %Y")
| # ============================================================================= | |
| # ASYNC UTILITIES | |
| # ============================================================================= | |
async def gather_with_exceptions(*coros, return_exceptions: bool = False) -> list[Any]:
    """Thin wrapper over asyncio.gather(); with return_exceptions=True,
    raised exceptions are collected as results instead of propagating."""
    results = await asyncio.gather(*coros, return_exceptions=return_exceptions)
    return results
def run_sync(coro):
    """
    Run an async coroutine to completion from synchronous code and return
    its result.

    Strategy:
    - If a loop exists but is already running (we're being called from
      inside async code), asyncio.run() would raise — so the coroutine is
      handed to a throwaway thread, where asyncio.run() can own a fresh loop.
    - If a loop exists and is idle, it is driven directly.
    - If get_event_loop() raises RuntimeError (no current loop in this
      thread), fall back to asyncio.run().

    NOTE(review): asyncio.get_event_loop() is deprecated for this usage in
    Python 3.10+ and its behavior varies by version — the RuntimeError
    fallback below is what keeps this working on newer runtimes; confirm
    against the targeted Python versions before refactoring.
    """
    try:
        loop = asyncio.get_event_loop()
        if loop.is_running():
            # Already inside a running loop: block this thread on a worker
            # thread that runs the coroutine in its own fresh event loop.
            import concurrent.futures
            with concurrent.futures.ThreadPoolExecutor() as pool:
                return pool.submit(asyncio.run, coro).result()
        return loop.run_until_complete(coro)
    except RuntimeError:
        # No usable event loop in this thread — let asyncio.run create one.
        return asyncio.run(coro)
| # ============================================================================= | |
| # RETRY UTILITIES | |
| # ============================================================================= | |
def retry(max_attempts: int = 3, delay: float = 1.0, backoff: float = 2.0, exceptions: tuple = (Exception,)):
    """
    Retry decorator with exponential backoff, for sync and async callables.

    Args:
        max_attempts: Total number of calls before giving up.
        delay: Initial pause (seconds) between attempts.
        backoff: Multiplier applied to the pause after each failure.
        exceptions: Exception types that trigger a retry; anything else
            propagates immediately.

    The last caught exception is re-raised once attempts are exhausted.
    """
    def decorator(func: Callable) -> Callable:
        # BUG FIX: @wraps was missing, so retried callables lost their
        # __name__/__doc__ (the file imports functools.wraps for this).
        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            current_delay = delay
            last_exception = None
            for attempt in range(max_attempts):
                try:
                    return await func(*args, **kwargs)
                except exceptions as e:
                    last_exception = e
                    if attempt < max_attempts - 1:
                        await asyncio.sleep(current_delay)
                        current_delay *= backoff
            raise last_exception

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            current_delay = delay
            last_exception = None
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    last_exception = e
                    if attempt < max_attempts - 1:
                        time.sleep(current_delay)
                        current_delay *= backoff
            raise last_exception

        if asyncio.iscoroutinefunction(func):
            return async_wrapper
        return sync_wrapper
    return decorator