""" Error Handling Utilities for Medium Scraper Provides a standardized exception hierarchy and error handling utilities with context preservation for better debugging and observability. """ import logging from datetime import datetime from typing import Optional, Dict, Any from enum import Enum # ============================================================================ # Exception Hierarchy # ============================================================================ class ScraperError(Exception): """Base exception for all scraper errors.""" def __init__(self, message: str, context: Optional[Dict[str, Any]] = None): super().__init__(message) self.message = message self.context = context or {} self.timestamp = datetime.now().isoformat() class NetworkError(ScraperError): """Network-related errors (timeouts, connection failures, DNS issues).""" pass class ParseError(ScraperError): """Content parsing errors (invalid HTML, missing data, malformed JSON).""" pass class RateLimitError(ScraperError): """Rate limit exceeded (API quota, throttling).""" pass class AuthenticationError(ScraperError): """Authentication/authorization errors (invalid cookies, expired tokens).""" pass class CircuitBreakerError(ScraperError): """Circuit breaker is open, preventing requests.""" pass class CacheError(ScraperError): """Cache-related errors (corruption, unavailable).""" pass class DatabaseError(ScraperError): """Database operation errors (connection, query, integrity).""" pass class ValidationError(ScraperError): """Input validation errors (invalid URL, missing required fields).""" pass # ============================================================================ # Error Severity Levels # ============================================================================ class ErrorSeverity(Enum): """Error severity levels for categorization.""" LOW = "low" # Recoverable, expected in some cases MEDIUM = "medium" # Unexpected but handled gracefully HIGH = "high" # Critical, needs immediate attention CRITICAL = "critical" # System failure, service degraded # ============================================================================ # Error Handler # ============================================================================ def handle_error( error: Exception, context: str, logger: logging.Logger, severity: ErrorSeverity = ErrorSeverity.MEDIUM, additional_context: Optional[Dict[str, Any]] = None ) -> None: """ Standardized error handler with context preservation. Args: error: The exception that occurred context: Contextual string describing where error occurred logger: Logger instance to use severity: Error severity level additional_context: Additional context data Example: try: article = await scrape_article(url) except Exception as e: handle_error( e, context="scrape_article", logger=logger, severity=ErrorSeverity.HIGH, additional_context={"url": url, "tier": "graphql"} ) """ error_class = type(error).__name__ error_message = str(error) # Build structured error data error_data = { "context": context, "error_type": error_class, "message": error_message, "severity": severity.value, "timestamp": datetime.now().isoformat() } # Add custom context from ScraperError instances if isinstance(error, ScraperError): error_data["scraper_context"] = error.context error_data["error_timestamp"] = error.timestamp # Add additional context if additional_context: error_data["additional_context"] = additional_context # Log based on severity log_message = f"[{context}] {error_class}: {error_message}" if severity == ErrorSeverity.CRITICAL: logger.critical(log_message, extra=error_data, exc_info=True) elif severity == ErrorSeverity.HIGH: logger.error(log_message, extra=error_data, exc_info=True) elif severity == ErrorSeverity.MEDIUM: logger.warning(log_message, extra=error_data) else: # LOW logger.info(log_message, extra=error_data) # ============================================================================ # Error Context Manager # ============================================================================ class ErrorContext: """ Context manager for adding context to errors. Usage: with ErrorContext("scraping_article", url=url) as ctx: article = await scrape(url) ctx.add_data("tier", "graphql") """ def __init__(self, operation: str, **initial_context): self.operation = operation self.context_data = initial_context def add_data(self, key: str, value: Any) -> None: """Add additional context data.""" self.context_data[key] = value def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): if exc_type is not None and isinstance(exc_val, ScraperError): # Add context data to ScraperError exc_val.context.update(self.context_data) return False # Don't suppress exceptions # ============================================================================ # Retry Decorator with Error Handling # ============================================================================ def with_retry( max_retries: int = 3, backoff_base: float = 1.0, backoff_multiplier: float = 2.0, retry_on: tuple = (NetworkError, RateLimitError), logger: Optional[logging.Logger] = None ): """ Decorator for automatic retries with exponential backoff. Args: max_retries: Maximum number of retry attempts backoff_base: Initial delay in seconds backoff_multiplier: Multiplier for exponential backoff retry_on: Tuple of exception types to retry on logger: Optional logger for retry events Example: @with_retry(max_retries=3, retry_on=(NetworkError,)) async def fetch_data(url): return await httpx.get(url) """ import asyncio from functools import wraps def decorator(func): @wraps(func) async def async_wrapper(*args, **kwargs): delay = backoff_base last_exception = None for attempt in range(max_retries + 1): try: return await func(*args, **kwargs) except retry_on as e: last_exception = e if attempt == max_retries: if logger: logger.error( f"Max retries ({max_retries}) exceeded for {func.__name__}", extra={"attempts": attempt + 1, "error": str(e)} ) raise if logger: logger.warning( f"Retry {attempt + 1}/{max_retries} for {func.__name__}", extra={"delay": delay, "error": str(e)} ) await asyncio.sleep(delay) delay *= backoff_multiplier except Exception as e: # Don't retry on non-retryable errors raise # Should never reach here raise last_exception @wraps(func) def sync_wrapper(*args, **kwargs): import time delay = backoff_base last_exception = None for attempt in range(max_retries + 1): try: return func(*args, **kwargs) except retry_on as e: last_exception = e if attempt == max_retries: if logger: logger.error( f"Max retries ({max_retries}) exceeded for {func.__name__}", extra={"attempts": attempt + 1, "error": str(e)} ) raise if logger: logger.warning( f"Retry {attempt + 1}/{max_retries} for {func.__name__}", extra={"delay": delay, "error": str(e)} ) time.sleep(delay) delay *= backoff_multiplier except Exception as e: # Don't retry on non-retryable errors raise raise last_exception # Return appropriate wrapper based on function type import inspect if inspect.iscoroutinefunction(func): return async_wrapper else: return sync_wrapper return decorator