"""Error monitoring helpers: error records, per-operation health metrics and circuit breakers."""

import time
from collections import Counter
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from functools import wraps
from traceback import format_exc
from typing import Any, Callable, Dict, List, Optional

from .. import LOGGER


class ErrorSeverity(Enum):
    """Severity level attached to each :class:`ErrorRecord`."""

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


class CircuitState(Enum):
    """States a :class:`CircuitBreaker` moves through."""

    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitBreakerOpenError(Exception):
    """Raised when a call is rejected because its circuit breaker is open.

    Subclasses ``Exception`` so existing ``except Exception`` handlers keep
    catching it exactly as they caught the previous bare ``Exception``.
    """


@dataclass
class ErrorRecord:
    """One recorded error together with the context it occurred in."""

    timestamp: datetime
    error_type: str
    message: str
    severity: ErrorSeverity
    context: Dict[str, Any] = field(default_factory=dict)
    traceback: Optional[str] = None  # formatted traceback text, when available


@dataclass
class HealthMetrics:
    """Running success/failure counts and mean latency for one operation."""

    total_operations: int = 0
    successful_operations: int = 0
    failed_operations: int = 0
    error_rate: float = 0.0  # failed_operations / total_operations
    # Running mean of the response_time values passed to update_health_metrics
    # (error_handler passes elapsed seconds).
    avg_response_time: float = 0.0
    last_updated: datetime = field(default_factory=datetime.now)


class CircuitBreaker:
    """Closed / open / half-open circuit breaker.

    After ``failure_threshold`` failures without an intervening success the
    breaker opens and rejects calls until ``recovery_timeout`` seconds have
    passed since the last failure; it then moves to half-open and admits
    calls (every call is admitted while half-open, there is no single-probe
    limit). A success closes the breaker and resets the count; a failure
    while half-open re-opens it because the count is not reset on the
    OPEN -> HALF_OPEN transition.
    """

    def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time: Optional[datetime] = None
        self.state = CircuitState.CLOSED

    def can_execute(self) -> bool:
        """Return True if a call may proceed.

        Side effect: an OPEN breaker whose recovery timeout has elapsed is
        moved to HALF_OPEN.
        """
        if self.state != CircuitState.OPEN:
            return True
        if self.last_failure_time is None:
            # OPEN without a recorded failure (state set externally): nothing
            # to time against, so allow a trial call.
            self.state = CircuitState.HALF_OPEN
            return True
        # total_seconds(), not .seconds: .seconds is only the 0..86399 seconds
        # *component* of the timedelta and wraps around every day.
        elapsed = (datetime.now() - self.last_failure_time).total_seconds()
        if elapsed >= self.recovery_timeout:
            self.state = CircuitState.HALF_OPEN
            return True
        return False

    def on_success(self):
        """Reset the failure count and close the breaker."""
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def on_failure(self):
        """Count a failure and open the breaker once the threshold is reached."""
        self.failure_count += 1
        self.last_failure_time = datetime.now()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN


class ErrorMonitor:
    """In-memory store of recent errors, per-operation metrics and breakers."""

    def __init__(self):
        self.errors: List[ErrorRecord] = []
        self.health_metrics: Dict[str, HealthMetrics] = {}
        self.circuit_breakers: Dict[str, CircuitBreaker] = {}
        self.max_errors = 1000  # oldest records are discarded beyond this

    def record_error(
        self,
        error_type: str,
        message: str,
        severity: ErrorSeverity = ErrorSeverity.MEDIUM,
        context: Optional[Dict[str, Any]] = None,
        traceback: Optional[str] = None,
    ):
        """Append an :class:`ErrorRecord`, trim to ``max_errors`` and log it."""
        error_record = ErrorRecord(
            timestamp=datetime.now(),
            error_type=error_type,
            message=message,
            severity=severity,
            context=context or {},
            traceback=traceback,
        )
        self.errors.append(error_record)
        overflow = len(self.errors) - self.max_errors
        if overflow > 0:
            # Drop every excess record at once so that lowering max_errors at
            # runtime actually shrinks the list (one pop per call would not).
            del self.errors[:overflow]
        LOGGER.error(f"Error recorded: {error_type} - {message}")

    def update_health_metrics(
        self, operation: str, success: bool, response_time: float
    ):
        """Fold one call outcome into the running metrics for ``operation``."""
        if operation not in self.health_metrics:
            self.health_metrics[operation] = HealthMetrics()
        metrics = self.health_metrics[operation]
        metrics.total_operations += 1
        if success:
            metrics.successful_operations += 1
        else:
            metrics.failed_operations += 1
        metrics.error_rate = metrics.failed_operations / metrics.total_operations
        # Incremental mean: old_mean * (n - 1) + new_value, divided by n.
        metrics.avg_response_time = (
            metrics.avg_response_time * (metrics.total_operations - 1)
            + response_time
        ) / metrics.total_operations
        metrics.last_updated = datetime.now()

    def get_circuit_breaker(self, operation: str) -> CircuitBreaker:
        """Return the breaker for ``operation``, creating a default one if needed."""
        if operation not in self.circuit_breakers:
            self.circuit_breakers[operation] = CircuitBreaker()
        return self.circuit_breakers[operation]

    def get_error_summary(self, hours: int = 24) -> Dict[str, Any]:
        """Summarise errors recorded in the last ``hours`` hours plus all health metrics."""
        cutoff_time = datetime.now() - timedelta(hours=hours)
        recent_errors = [e for e in self.errors if e.timestamp >= cutoff_time]
        # Count in one pass each instead of re-scanning per severity/type.
        severity_counts = Counter(e.severity for e in recent_errors)
        type_counts = Counter(e.error_type for e in recent_errors)
        return {
            "total_errors": len(recent_errors),
            "errors_by_severity": {
                severity.value: severity_counts[severity]
                for severity in ErrorSeverity
            },
            "errors_by_type": dict(type_counts),
            "health_metrics": {
                op: {
                    "error_rate": metrics.error_rate,
                    "avg_response_time": metrics.avg_response_time,
                    "total_operations": metrics.total_operations,
                }
                for op, metrics in self.health_metrics.items()
            },
        }


error_monitor = ErrorMonitor()


def error_handler(operation: Optional[str] = None):
    """Wrap an async function with circuit breaking, metrics and error recording.

    Uses the module-level ``error_monitor``'s breaker for the operation name
    (``operation`` or the function's ``__name__``). Failures are recorded with
    severity HIGH and re-raised unchanged.

    Raises:
        CircuitBreakerOpenError: if the operation's breaker rejects the call.
    """

    def decorator(func: Callable) -> Callable:
        op_name = operation or func.__name__

        @wraps(func)
        async def wrapper(*args, **kwargs):
            breaker = error_monitor.get_circuit_breaker(op_name)
            if not breaker.can_execute():
                raise CircuitBreakerOpenError(f"Circuit breaker open for {op_name}")
            # perf_counter is monotonic, so durations are immune to clock changes.
            start_time = time.perf_counter()
            try:
                result = await func(*args, **kwargs)
            except Exception as e:
                elapsed = time.perf_counter() - start_time
                breaker.on_failure()
                error_monitor.update_health_metrics(op_name, False, elapsed)
                error_monitor.record_error(
                    error_type=type(e).__name__,
                    message=str(e),
                    severity=ErrorSeverity.HIGH,
                    context={
                        "operation": op_name,
                        "args": str(args),
                        "kwargs": str(kwargs),
                    },
                    traceback=format_exc(),
                )
                raise
            # Success bookkeeping lives outside the try so a failure here is
            # not misreported as a failure of the wrapped function.
            breaker.on_success()
            error_monitor.update_health_metrics(
                op_name, True, time.perf_counter() - start_time
            )
            return result

        return wrapper

    return decorator


def circuit_breaker(
    operation: str, failure_threshold: int = 5, recovery_timeout: int = 60
):
    """Wrap an async function with its own dedicated circuit breaker.

    The breaker is created once per decorated function and is exposed as
    ``wrapper.circuit_breaker`` for inspection.

    Raises:
        CircuitBreakerOpenError: while the breaker rejects calls.
    """

    def decorator(func: Callable) -> Callable:
        # Created once here, not inside wrapper: a per-call breaker would
        # start at zero failures every time and could never open.
        breaker = CircuitBreaker(failure_threshold, recovery_timeout)

        @wraps(func)
        async def wrapper(*args, **kwargs):
            if not breaker.can_execute():
                raise CircuitBreakerOpenError(f"Circuit breaker open for {operation}")
            try:
                result = await func(*args, **kwargs)
            except Exception:
                breaker.on_failure()
                raise
            breaker.on_success()
            return result

        wrapper.circuit_breaker = breaker
        return wrapper

    return decorator