# Medium-MCP / src / error_handling.py
# Nikhil Pravin Pise
# feat: comprehensive migration - merge Scraper + MCP Server
# ae588db
"""
Error Handling Utilities for Medium Scraper
Provides a standardized exception hierarchy and error handling utilities
with context preservation for better debugging and observability.
"""
import logging
from datetime import datetime
from typing import Optional, Dict, Any
from enum import Enum
# ============================================================================
# Exception Hierarchy
# ============================================================================
class ScraperError(Exception):
    """Base exception for all scraper errors.

    Attributes:
        message: Human-readable description; also passed to ``Exception``,
            so ``str(error)`` returns it.
        context: Dict of structured debugging data attached to the error.
            Always a dict owned by this instance (never ``None``).
        timestamp: ISO-8601 string of the local time the error was created.
    """

    def __init__(self, message: str, context: Optional[Dict[str, Any]] = None):
        """Create the error.

        Args:
            message: Description of what went wrong.
            context: Optional structured data describing the failure. It is
                copied, so the caller's dict is left untouched when more
                context is added to the error later.
        """
        super().__init__(message)
        self.message = message
        # Shallow-copy instead of aliasing: code that enriches
        # ``self.context`` in place must not leak keys into the caller's dict.
        self.context = dict(context) if context else {}
        self.timestamp = datetime.now().isoformat()
class NetworkError(ScraperError):
    """Raised for network-level failures.

    Covers timeouts, connection failures, and DNS issues.
    """
class ParseError(ScraperError):
    """Raised when content cannot be parsed.

    Covers invalid HTML, missing data, and malformed JSON.
    """
class RateLimitError(ScraperError):
    """Raised when a rate limit is exceeded.

    Covers exhausted API quotas and throttling.
    """
class AuthenticationError(ScraperError):
    """Raised for authentication or authorization failures.

    Covers invalid cookies and expired tokens.
    """
class CircuitBreakerError(ScraperError):
    """Raised when the circuit breaker is open and requests are blocked."""
class CacheError(ScraperError):
    """Raised for cache failures.

    Covers a corrupted or unavailable cache.
    """
class DatabaseError(ScraperError):
    """Raised when a database operation fails.

    Covers connection, query, and integrity errors.
    """
class ValidationError(ScraperError):
    """Raised when input fails validation.

    Covers invalid URLs and missing required fields.
    """
# ============================================================================
# Error Severity Levels
# ============================================================================
class ErrorSeverity(Enum):
    """Severity buckets used to categorize errors, from mildest to worst."""

    # Recoverable; expected in some cases.
    LOW = "low"
    # Unexpected, but handled gracefully.
    MEDIUM = "medium"
    # Critical; needs immediate attention.
    HIGH = "high"
    # System failure; service degraded.
    CRITICAL = "critical"
# ============================================================================
# Error Handler
# ============================================================================
def handle_error(
    error: Exception,
    context: str,
    logger: logging.Logger,
    severity: ErrorSeverity = ErrorSeverity.MEDIUM,
    additional_context: Optional[Dict[str, Any]] = None
) -> None:
    """
    Log an exception with structured context at a level chosen by severity.

    The structured fields are handed to the logger through ``extra`` and so
    become attributes of the emitted ``LogRecord``: ``context``,
    ``error_type``, ``error_message``, ``severity`` and ``timestamp``, plus
    ``scraper_context`` / ``error_timestamp`` when ``error`` is a
    ``ScraperError`` and ``additional_context`` when one is supplied.

    Severity mapping:
        CRITICAL -> ``logger.critical`` with traceback
        HIGH     -> ``logger.error`` with traceback
        MEDIUM   -> ``logger.warning``
        LOW      -> ``logger.info``

    Args:
        error: The exception that occurred
        context: Contextual string describing where error occurred
        logger: Logger instance to use
        severity: Error severity level
        additional_context: Additional context data

    Returns:
        None. The error is only logged; it is neither re-raised nor
        suppressed on the caller's behalf.

    Example:
        try:
            article = await scrape_article(url)
        except Exception as e:
            handle_error(
                e,
                context="scrape_article",
                logger=logger,
                severity=ErrorSeverity.HIGH,
                additional_context={"url": url, "tier": "graphql"}
            )
    """
    error_class = type(error).__name__
    error_message = str(error)

    # Build structured error data.
    # NOTE: the text is stored under "error_message", not "message". The
    # logging module reserves "message" (and "asctime") on LogRecord and
    # raises KeyError if ``extra`` tries to overwrite them.
    error_data = {
        "context": context,
        "error_type": error_class,
        "error_message": error_message,
        "severity": severity.value,
        "timestamp": datetime.now().isoformat(),
    }

    # Add custom context carried by ScraperError instances
    if isinstance(error, ScraperError):
        error_data["scraper_context"] = error.context
        error_data["error_timestamp"] = error.timestamp

    # Add additional context
    if additional_context:
        error_data["additional_context"] = additional_context

    log_message = f"[{context}] {error_class}: {error_message}"

    # Log based on severity. For the two highest levels attach the traceback
    # of ``error`` itself (rather than ``exc_info=True``, which reports
    # whatever exception happens to be in flight, or nothing at all when
    # called outside an ``except`` block).
    if severity == ErrorSeverity.CRITICAL:
        logger.critical(log_message, extra=error_data, exc_info=error)
    elif severity == ErrorSeverity.HIGH:
        logger.error(log_message, extra=error_data, exc_info=error)
    elif severity == ErrorSeverity.MEDIUM:
        logger.warning(log_message, extra=error_data)
    else:  # LOW
        logger.info(log_message, extra=error_data)
# ============================================================================
# Error Context Manager
# ============================================================================
class ErrorContext:
    """
    Context manager that enriches escaping ``ScraperError`` instances.

    Key/value pairs given at construction or via ``add_data`` are merged
    into the ``context`` dict of any ``ScraperError`` that propagates out of
    the ``with`` block. Other exception types pass through untouched, and no
    exception is ever suppressed.

    Usage:
        with ErrorContext("scraping_article", url=url) as ctx:
            article = await scrape(url)
            ctx.add_data("tier", "graphql")
    """

    def __init__(self, operation: str, **initial_context):
        # Keyword arguments seed the context that may be attached later.
        self.context_data = initial_context
        self.operation = operation

    def add_data(self, key: str, value: Any) -> None:
        """Record one more key/value pair of context."""
        self.context_data[key] = value

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        raised = exc_type is not None
        if raised and isinstance(exc_val, ScraperError):
            # Merge the collected data into the error's own context dict.
            exc_val.context.update(self.context_data)
        # Returning False lets any exception keep propagating.
        return False
# ============================================================================
# Retry Decorator with Error Handling
# ============================================================================
def with_retry(
    max_retries: int = 3,
    backoff_base: float = 1.0,
    backoff_multiplier: float = 2.0,
    retry_on: tuple = (NetworkError, RateLimitError),
    logger: Optional[logging.Logger] = None
):
    """
    Decorator for automatic retries with exponential backoff.

    Works on both coroutine functions and plain functions: the returned
    wrapper is ``async`` when the decorated function is a coroutine function
    and synchronous otherwise. The function is called up to
    ``max_retries + 1`` times. Between attempts the wrapper sleeps for
    ``backoff_base`` seconds, multiplied by ``backoff_multiplier`` after
    each failure.

    Args:
        max_retries: Maximum number of retry attempts (must be >= 0)
        backoff_base: Initial delay in seconds
        backoff_multiplier: Multiplier for exponential backoff
        retry_on: Tuple of exception types to retry on
        logger: Optional logger for retry events

    Returns:
        A decorator that wraps the target function with retry behaviour.

    Raises:
        ValueError: If ``max_retries`` is negative.
        Exception: The wrapped function's last ``retry_on`` exception once
            retries are exhausted; any exception not listed in ``retry_on``
            propagates immediately without a retry.

    Example:
        @with_retry(max_retries=3, retry_on=(NetworkError,))
        async def fetch_data(client, url):
            return await client.get(url)
    """
    import asyncio
    import inspect
    import time
    from functools import wraps

    # A negative count would mean "never call the function at all"; reject it
    # up front instead of failing obscurely on the first call.
    if max_retries < 0:
        raise ValueError(f"max_retries must be >= 0, got {max_retries}")

    def decorator(func):
        def _should_retry(attempt, delay, exc):
            """Log the failed attempt; return True if another try is allowed."""
            if attempt >= max_retries:
                if logger is not None:
                    logger.error(
                        f"Max retries ({max_retries}) exceeded for {func.__name__}",
                        extra={"attempts": attempt + 1, "error": str(exc)}
                    )
                return False
            if logger is not None:
                logger.warning(
                    f"Retry {attempt + 1}/{max_retries} for {func.__name__}",
                    extra={"delay": delay, "error": str(exc)}
                )
            return True

        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            delay = backoff_base
            attempt = 0
            while True:
                try:
                    return await func(*args, **kwargs)
                except retry_on as exc:
                    # Exceptions outside ``retry_on`` are not caught here and
                    # therefore propagate without any retry.
                    if not _should_retry(attempt, delay, exc):
                        raise
                await asyncio.sleep(delay)
                delay *= backoff_multiplier
                attempt += 1

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            delay = backoff_base
            attempt = 0
            while True:
                try:
                    return func(*args, **kwargs)
                except retry_on as exc:
                    if not _should_retry(attempt, delay, exc):
                        raise
                time.sleep(delay)
                delay *= backoff_multiplier
                attempt += 1

        # Return appropriate wrapper based on function type
        if inspect.iscoroutinefunction(func):
            return async_wrapper
        return sync_wrapper

    return decorator