""" Context-aware API adapter for MorphGuard. This module provides a context-aware API adapter that enhances the existing API client with: - Intelligent request routing based on context - Dynamic endpoint selection - Adaptive timeout and retry strategies - Resource-aware request throttling - Integrated circuit breaking and fallbacks """ import time import threading import logging from enum import Enum from typing import Dict, Any, List, Optional, Union, Callable, Tuple, TypeVar # Import from existing modules from src.telemetry import get_telemetry, EventCategory from src.error_handling import MGError, ErrorCode, ErrorSeverity, ErrorCategory from src.api_utils import get_api_utils, FallbackStrategy, CircuitBreaker from src.api_fallback_registry import ( get_fallback_registry, FallbackTier, FallbackStrategy as FallbackRegistryStrategy ) # Type definitions T = TypeVar('T') class EndpointPriority(Enum): """Priority levels for API endpoints.""" CRITICAL = 0 # Highest priority, must succeed HIGH = 1 # High priority, should prioritize NORMAL = 2 # Normal priority LOW = 3 # Low priority, can be delayed BACKGROUND = 4 # Lowest priority, can be dropped class ResourceConsumption(Enum): """Resource consumption levels for requests.""" MINIMAL = 0 # Very light requests LIGHT = 1 # Light requests MODERATE = 2 # Moderate resource usage HEAVY = 3 # Heavy resource usage INTENSIVE = 4 # Very intensive resource usage class EndpointConfiguration: """Configuration for an API endpoint.""" def __init__( self, priority: EndpointPriority = EndpointPriority.NORMAL, resource_consumption: ResourceConsumption = ResourceConsumption.MODERATE, timeout: float = 30.0, retries: int = 3, retry_delay: float = 1.0, circuit_breaker_enabled: bool = True, failure_threshold: int = 5, recovery_timeout: float = 30.0, cache_enabled: bool = True, cache_duration: int = 300, fallback_strategy: FallbackRegistryStrategy = FallbackRegistryStrategy.STATIC, requires_authentication: bool = False, allows_batching: bool = True, idempotent: bool = False, tags: Optional[Dict[str, str]] = None ): """ Initialize endpoint configuration. Args: priority: Priority level for the endpoint resource_consumption: Resource consumption level timeout: Request timeout in seconds retries: Number of retries retry_delay: Delay between retries in seconds circuit_breaker_enabled: Whether to enable circuit breaker failure_threshold: Number of failures before opening circuit recovery_timeout: Time before trying to recover in seconds cache_enabled: Whether to enable caching cache_duration: Cache duration in seconds fallback_strategy: Strategy for selecting fallbacks requires_authentication: Whether authentication is required allows_batching: Whether the endpoint can be batched idempotent: Whether the endpoint is idempotent tags: Tags for the endpoint """ self.priority = priority self.resource_consumption = resource_consumption self.timeout = timeout self.retries = retries self.retry_delay = retry_delay self.circuit_breaker_enabled = circuit_breaker_enabled self.failure_threshold = failure_threshold self.recovery_timeout = recovery_timeout self.cache_enabled = cache_enabled self.cache_duration = cache_duration self.fallback_strategy = fallback_strategy self.requires_authentication = requires_authentication self.allows_batching = allows_batching self.idempotent = idempotent self.tags = tags or {} def to_dict(self) -> Dict[str, Any]: """ Convert to dictionary. Returns: Dictionary representation """ return { "priority": self.priority.name, "resource_consumption": self.resource_consumption.name, "timeout": self.timeout, "retries": self.retries, "retry_delay": self.retry_delay, "circuit_breaker_enabled": self.circuit_breaker_enabled, "failure_threshold": self.failure_threshold, "recovery_timeout": self.recovery_timeout, "cache_enabled": self.cache_enabled, "cache_duration": self.cache_duration, "fallback_strategy": self.fallback_strategy.value, "requires_authentication": self.requires_authentication, "allows_batching": self.allows_batching, "idempotent": self.idempotent, "tags": self.tags } class ContextAwareAPIAdapter: """ Context-aware API adapter that enhances the existing API client. This adapter provides: - Intelligent request routing based on context - Dynamic endpoint selection - Adaptive timeout and retry strategies - Resource-aware request throttling - Integrated circuit breaking and fallbacks """ def __init__(self, options: Dict[str, Any] = None): """ Initialize the API adapter. Args: options: Configuration options """ # Default configuration self.config = { "default_timeout": 30.0, "default_retries": 3, "default_retry_delay": 1.0, "default_priority": EndpointPriority.NORMAL, "default_resource_consumption": ResourceConsumption.MODERATE, "default_circuit_breaker_enabled": True, "default_cache_enabled": True, "default_cache_duration": 300, "default_fallback_strategy": FallbackRegistryStrategy.STATIC, "endpoint_config_path": None, "load_from_storage": False, "save_to_storage": False, "resource_monitoring_enabled": True, "resource_threshold_cpu": 80.0, # Percent "resource_threshold_memory": 80.0, # Percent "enable_telemetry": True, "enable_adaptive_timeouts": True, "adaptive_timeout_factor": 1.5, "adaptive_retry_factor": 0.8, "max_concurrent_requests": 100, "request_throttling_enabled": True } # Update with user-provided options if options: self.config.update(options) # Initialize telemetry self.telemetry = get_telemetry() # Get API utilities instance self.api_utils = get_api_utils() # Get fallback registry instance self.fallback_registry = get_fallback_registry() # Endpoint configurations self.endpoint_configs: Dict[str, EndpointConfiguration] = {} # Circuit breakers by endpoint self.circuit_breakers: Dict[str, CircuitBreaker] = {} # Request stats by endpoint self.request_stats: Dict[str, Dict[str, Any]] = {} # Resource usage stats self.resource_usage = { "cpu": 0.0, "memory": 0.0, "last_updated": 0.0 } # Locks self.config_lock = threading.RLock() self.stats_lock = threading.RLock() self.resource_lock = threading.RLock() # Semaphore for limiting concurrent requests self.request_semaphore = threading.Semaphore(self.config["max_concurrent_requests"]) # Load endpoint configurations if enabled if self.config["load_from_storage"] and self.config["endpoint_config_path"]: self._load_endpoint_configs() # Initialize resource monitoring if enabled if self.config["resource_monitoring_enabled"]: self._start_resource_monitoring() def _start_resource_monitoring(self) -> None: """Initialize resource monitoring.""" try: import psutil # Initial resource check self._update_resource_usage() # Schedule periodic updates through telemetry's system metrics # This leverages existing system metrics collection except ImportError: self.telemetry.warning( "psutil not available, resource monitoring disabled", category=EventCategory.API ) self.config["resource_monitoring_enabled"] = False def _update_resource_usage(self) -> None: """Update resource usage statistics.""" try: import psutil with self.resource_lock: self.resource_usage["cpu"] = psutil.cpu_percent(interval=0.1) self.resource_usage["memory"] = psutil.virtual_memory().percent self.resource_usage["last_updated"] = time.time() except Exception as e: self.telemetry.error( f"Error updating resource usage: {e}", category=EventCategory.API, exc_info=True ) def _load_endpoint_configs(self) -> None: """Load endpoint configurations from storage.""" import os import json try: if not os.path.exists(self.config["endpoint_config_path"]): return with open(self.config["endpoint_config_path"], "r") as f: config_data = json.load(f) with self.config_lock: for endpoint, config in config_data.items(): self.endpoint_configs[endpoint] = EndpointConfiguration( priority=EndpointPriority[config["priority"]], resource_consumption=ResourceConsumption[config["resource_consumption"]], timeout=config["timeout"], retries=config["retries"], retry_delay=config["retry_delay"], circuit_breaker_enabled=config["circuit_breaker_enabled"], failure_threshold=config["failure_threshold"], recovery_timeout=config["recovery_timeout"], cache_enabled=config["cache_enabled"], cache_duration=config["cache_duration"], fallback_strategy=FallbackRegistryStrategy(config["fallback_strategy"]), requires_authentication=config["requires_authentication"], allows_batching=config["allows_batching"], idempotent=config["idempotent"], tags=config["tags"] ) self.telemetry.info( "Loaded endpoint configurations from storage", category=EventCategory.API, context={"path": self.config["endpoint_config_path"]} ) except Exception as e: self.telemetry.error( f"Failed to load endpoint configurations: {e}", category=EventCategory.API, exc_info=True ) def _save_endpoint_configs(self) -> None: """Save endpoint configurations to storage.""" import os import json try: if not self.config["endpoint_config_path"]: return # Create directory if it doesn't exist os.makedirs(os.path.dirname(self.config["endpoint_config_path"]), exist_ok=True) config_data = {} with self.config_lock: for endpoint, config in self.endpoint_configs.items(): config_data[endpoint] = config.to_dict() with open(self.config["endpoint_config_path"], "w") as f: json.dump(config_data, f, indent=2) self.telemetry.debug( "Saved endpoint configurations to storage", category=EventCategory.API, context={"path": self.config["endpoint_config_path"]} ) except Exception as e: self.telemetry.error( f"Failed to save endpoint configurations: {e}", category=EventCategory.API, exc_info=True ) def set_endpoint_config(self, endpoint: str, config: EndpointConfiguration) -> None: """ Set configuration for an endpoint. Args: endpoint: API endpoint config: Endpoint configuration """ with self.config_lock: self.endpoint_configs[endpoint] = config # Create circuit breaker if needed if config.circuit_breaker_enabled and endpoint not in self.circuit_breakers: self.circuit_breakers[endpoint] = CircuitBreaker( failure_threshold=config.failure_threshold, recovery_timeout=config.recovery_timeout ) if self.config["save_to_storage"] and self.config["endpoint_config_path"]: self._save_endpoint_configs() def get_endpoint_config(self, endpoint: str) -> EndpointConfiguration: """ Get configuration for an endpoint. Args: endpoint: API endpoint Returns: Endpoint configuration """ with self.config_lock: # Check for exact match if endpoint in self.endpoint_configs: return self.endpoint_configs[endpoint] # Check for pattern matches for pattern, config in self.endpoint_configs.items(): if "*" in pattern: # Convert pattern to simpler matching logic if pattern.startswith("*") and pattern.endswith("*"): # *pattern* - contains pattern_part = pattern.strip("*") if pattern_part in endpoint: return config elif pattern.startswith("*"): # *pattern - ends with pattern_part = pattern[1:] if endpoint.endswith(pattern_part): return config elif pattern.endswith("*"): # pattern* - starts with pattern_part = pattern[:-1] if endpoint.startswith(pattern_part): return config # Return default configuration return EndpointConfiguration( priority=self.config["default_priority"], resource_consumption=self.config["default_resource_consumption"], timeout=self.config["default_timeout"], retries=self.config["default_retries"], retry_delay=self.config["default_retry_delay"], circuit_breaker_enabled=self.config["default_circuit_breaker_enabled"], cache_enabled=self.config["default_cache_enabled"], cache_duration=self.config["default_cache_duration"], fallback_strategy=self.config["default_fallback_strategy"] ) def _get_circuit_breaker(self, endpoint: str) -> Optional[CircuitBreaker]: """ Get circuit breaker for an endpoint. Args: endpoint: API endpoint Returns: Circuit breaker or None if not enabled """ with self.config_lock: # Check for exact match if endpoint in self.circuit_breakers: return self.circuit_breakers[endpoint] # Get endpoint configuration config = self.get_endpoint_config(endpoint) if not config.circuit_breaker_enabled: return None # Create new circuit breaker circuit_breaker = CircuitBreaker( failure_threshold=config.failure_threshold, recovery_timeout=config.recovery_timeout ) self.circuit_breakers[endpoint] = circuit_breaker return circuit_breaker def _update_request_stats(self, endpoint: str, success: bool, duration: float) -> None: """ Update request statistics for an endpoint. Args: endpoint: API endpoint success: Whether the request was successful duration: Request duration in seconds """ with self.stats_lock: if endpoint not in self.request_stats: self.request_stats[endpoint] = { "total_requests": 0, "successful_requests": 0, "failed_requests": 0, "total_duration": 0.0, "avg_duration": 0.0, "last_request_time": 0.0, "min_duration": float('inf'), "max_duration": 0.0 } stats = self.request_stats[endpoint] stats["total_requests"] += 1 stats["last_request_time"] = time.time() stats["total_duration"] += duration stats["avg_duration"] = stats["total_duration"] / stats["total_requests"] stats["min_duration"] = min(stats["min_duration"], duration) stats["max_duration"] = max(stats["max_duration"], duration) if success: stats["successful_requests"] += 1 else: stats["failed_requests"] += 1 def _should_throttle_request(self, endpoint: str) -> bool: """ Check if a request should be throttled based on resource usage. Args: endpoint: API endpoint Returns: Whether the request should be throttled """ if not self.config["request_throttling_enabled"]: return False # Get endpoint configuration config = self.get_endpoint_config(endpoint) # Don't throttle critical requests if config.priority == EndpointPriority.CRITICAL: return False # Check resource usage with self.resource_lock: # If resource usage data is stale, update it if time.time() - self.resource_usage["last_updated"] > 5.0: self._update_resource_usage() cpu_usage = self.resource_usage["cpu"] memory_usage = self.resource_usage["memory"] # Check if we're over thresholds cpu_threshold = self.config["resource_threshold_cpu"] memory_threshold = self.config["resource_threshold_memory"] # Apply more aggressive throttling for resource-intensive requests if config.resource_consumption == ResourceConsumption.INTENSIVE: cpu_threshold *= 0.8 memory_threshold *= 0.8 elif config.resource_consumption == ResourceConsumption.HEAVY: cpu_threshold *= 0.9 memory_threshold *= 0.9 # Only throttle low priority requests initially if config.priority == EndpointPriority.LOW or config.priority == EndpointPriority.BACKGROUND: if cpu_usage > cpu_threshold * 0.8 or memory_usage > memory_threshold * 0.8: return True # Throttle normal priority requests only when closer to thresholds if config.priority == EndpointPriority.NORMAL: if cpu_usage > cpu_threshold * 0.9 or memory_usage > memory_threshold * 0.9: return True # Throttle high priority requests only when at thresholds if config.priority == EndpointPriority.HIGH: if cpu_usage > cpu_threshold or memory_usage > memory_threshold: return True return False def _adjust_timeout(self, endpoint: str, base_timeout: float) -> float: """ Adjust timeout based on request statistics. Args: endpoint: API endpoint base_timeout: Base timeout in seconds Returns: Adjusted timeout in seconds """ if not self.config["enable_adaptive_timeouts"]: return base_timeout with self.stats_lock: if endpoint not in self.request_stats: return base_timeout stats = self.request_stats[endpoint] if stats["total_requests"] < 5: return base_timeout # Calculate timeout based on average duration plus buffer avg_duration = stats["avg_duration"] max_duration = stats["max_duration"] # Use a higher factor of the average for more consistent endpoints # and a lower factor of the max for less consistent endpoints if stats["max_duration"] > stats["avg_duration"] * 3: # High variance, use max as reference adjusted_timeout = max_duration * 1.2 else: # Low variance, use average as reference adjusted_timeout = avg_duration * self.config["adaptive_timeout_factor"] # Don't go below base timeout return max(base_timeout, adjusted_timeout) def _adjust_retries(self, endpoint: str, base_retries: int) -> int: """ Adjust retry count based on request statistics. Args: endpoint: API endpoint base_retries: Base retry count Returns: Adjusted retry count """ if not self.config["enable_adaptive_timeouts"]: return base_retries with self.stats_lock: if endpoint not in self.request_stats: return base_retries stats = self.request_stats[endpoint] if stats["total_requests"] < 5: return base_retries # Calculate success rate success_rate = stats["successful_requests"] / stats["total_requests"] # Adjust retries based on success rate if success_rate > 0.95: # Very reliable, reduce retries return max(1, int(base_retries * self.config["adaptive_retry_factor"])) elif success_rate < 0.5: # Unreliable, increase retries return min(10, int(base_retries / self.config["adaptive_retry_factor"])) return base_retries def request( self, method: str, endpoint: str, data: Any = None, context: Optional[Dict[str, Any]] = None, options: Optional[Dict[str, Any]] = None ) -> Any: """ Make a context-aware API request. Args: method: HTTP method endpoint: API endpoint data: Request data context: Request context options: Request options Returns: Response data Raises: MGError: If the request fails """ context = context or {} options = options or {} # Get endpoint configuration config = self.get_endpoint_config(endpoint) # Create request options based on configuration request_options = { "timeout": config.timeout, "retries": config.retries, "retry_delay": config.retry_delay, "cache": config.cache_enabled, "cache_duration": config.cache_duration, "circuit_breaker": config.circuit_breaker_enabled, "critical": config.priority == EndpointPriority.CRITICAL, "headers": options.get("headers", {}) } # Apply adaptive timeouts and retries if self.config["enable_adaptive_timeouts"]: request_options["timeout"] = self._adjust_timeout(endpoint, config.timeout) request_options["retries"] = self._adjust_retries(endpoint, config.retries) # Override with explicit options if "timeout" in options: request_options["timeout"] = options["timeout"] if "retries" in options: request_options["retries"] = options["retries"] if "retry_delay" in options: request_options["retry_delay"] = options["retry_delay"] if "cache" in options: request_options["cache"] = options["cache"] if "cache_duration" in options: request_options["cache_duration"] = options["cache_duration"] if "circuit_breaker" in options: request_options["circuit_breaker"] = options["circuit_breaker"] if "critical" in options: request_options["critical"] = options["critical"] # Set priority based on endpoint configuration if config.priority == EndpointPriority.CRITICAL: request_options["priority"] = "high" elif config.priority == EndpointPriority.HIGH: request_options["priority"] = "high" elif config.priority == EndpointPriority.LOW: request_options["priority"] = "low" elif config.priority == EndpointPriority.BACKGROUND: request_options["priority"] = "low" # Set up fallback fallback_value = None if not options.get("no_fallback", False): fallback_strategy = options.get("fallback_strategy", config.fallback_strategy) # Map fallback strategy to APIUtils strategy if fallback_strategy == FallbackRegistryStrategy.STATIC: api_fallback_strategy = FallbackStrategy.STATIC elif fallback_strategy == FallbackRegistryStrategy.RANDOM: api_fallback_strategy = FallbackStrategy.RANDOM elif fallback_strategy == FallbackRegistryStrategy.WEIGHTED: api_fallback_strategy = FallbackStrategy.WEIGHTED else: api_fallback_strategy = FallbackStrategy.STATIC # Try to get fallback from registry fallback_value = self.fallback_registry.get_fallback( endpoint, strategy=fallback_strategy, context=context, args=[method, endpoint, data, request_options], kwargs={} ) if fallback_value is not None: request_options["fallback"] = fallback_value # Check if we should throttle this request if self._should_throttle_request(endpoint): if config.priority == EndpointPriority.BACKGROUND: # Drop background requests when throttling self.telemetry.debug( f"Dropping background request {method} {endpoint} due to resource constraints", category=EventCategory.API, context={"priority": config.priority.name} ) # Use fallback if available if fallback_value is not None: return fallback_value # Otherwise, raise throttled error raise MGError( message="Request throttled due to resource constraints", code=ErrorCode.THROTTLED, category=ErrorCategory.SYSTEM, severity=ErrorSeverity.WARNING ) self.telemetry.debug( f"Throttling request {method} {endpoint} due to resource constraints", category=EventCategory.API, context={"priority": config.priority.name} ) # Check circuit breaker circuit_breaker = self._get_circuit_breaker(endpoint) if circuit_breaker and not circuit_breaker.allow_request(): self.telemetry.warning( f"Circuit breaker open for {endpoint}", category=EventCategory.API, context={"endpoint": endpoint} ) # Use fallback if available if fallback_value is not None: return fallback_value # Otherwise, raise circuit open error raise MGError( message=f"Circuit breaker open for {endpoint}", code=ErrorCode.SERVICE_UNAVAILABLE, category=ErrorCategory.API, severity=ErrorSeverity.WARNING ) # Record start time start_time = time.time() try: # Acquire semaphore if limiting concurrent requests if self.config["max_concurrent_requests"] > 0: self.request_semaphore.acquire() # Make the request response = self.api_utils.request(method, endpoint, data, request_options) # Record success if circuit_breaker: circuit_breaker.record_success() duration = time.time() - start_time self._update_request_stats(endpoint, True, duration) return response except Exception as error: # Record failure if circuit_breaker: circuit_breaker.record_failure() duration = time.time() - start_time self._update_request_stats(endpoint, False, duration) # Log error with context self.telemetry.error( f"Error in API request {method} {endpoint}: {error}", category=EventCategory.API, context={ "method": method, "endpoint": endpoint, "duration": duration, "error": str(error) } ) raise error finally: # Release semaphore if acquired if self.config["max_concurrent_requests"] > 0: self.request_semaphore.release() def get( self, endpoint: str, data: Any = None, context: Optional[Dict[str, Any]] = None, options: Optional[Dict[str, Any]] = None ) -> Any: """ Make a GET request. Args: endpoint: API endpoint data: Query parameters context: Request context options: Request options Returns: Response data """ return self.request("GET", endpoint, data, context, options) def post( self, endpoint: str, data: Any = None, context: Optional[Dict[str, Any]] = None, options: Optional[Dict[str, Any]] = None ) -> Any: """ Make a POST request. Args: endpoint: API endpoint data: Request data context: Request context options: Request options Returns: Response data """ return self.request("POST", endpoint, data, context, options) def put( self, endpoint: str, data: Any = None, context: Optional[Dict[str, Any]] = None, options: Optional[Dict[str, Any]] = None ) -> Any: """ Make a PUT request. Args: endpoint: API endpoint data: Request data context: Request context options: Request options Returns: Response data """ return self.request("PUT", endpoint, data, context, options) def patch( self, endpoint: str, data: Any = None, context: Optional[Dict[str, Any]] = None, options: Optional[Dict[str, Any]] = None ) -> Any: """ Make a PATCH request. Args: endpoint: API endpoint data: Request data context: Request context options: Request options Returns: Response data """ return self.request("PATCH", endpoint, data, context, options) def delete( self, endpoint: str, context: Optional[Dict[str, Any]] = None, options: Optional[Dict[str, Any]] = None ) -> Any: """ Make a DELETE request. Args: endpoint: API endpoint context: Request context options: Request options Returns: Response data """ return self.request("DELETE", endpoint, None, context, options) def get_request_stats(self, endpoint: Optional[str] = None) -> Dict[str, Any]: """ Get request statistics. Args: endpoint: Optional endpoint to get stats for Returns: Request statistics """ with self.stats_lock: if endpoint: return self.request_stats.get(endpoint, {}).copy() else: return { endpoint: stats.copy() for endpoint, stats in self.request_stats.items() } def get_resource_usage(self) -> Dict[str, Any]: """ Get current resource usage. Returns: Resource usage statistics """ with self.resource_lock: return self.resource_usage.copy() def reset_circuit_breaker(self, endpoint: str) -> None: """ Reset circuit breaker for an endpoint. Args: endpoint: API endpoint """ with self.config_lock: if endpoint in self.circuit_breakers: self.circuit_breakers[endpoint].reset() def reset_all_circuit_breakers(self) -> None: """Reset all circuit breakers.""" with self.config_lock: for circuit_breaker in self.circuit_breakers.values(): circuit_breaker.reset() def is_in_degraded_mode(self) -> bool: """ Check if the system is in degraded mode. Returns: Whether the system is in degraded mode """ return self.fallback_registry.is_degraded_mode() def register_fallback( self, endpoint: str, key: str, fallback: Union[Any, Callable], is_function: bool = False, tier: FallbackTier = FallbackTier.PRIMARY, context_rules: Optional[Dict[str, Any]] = None, tags: Optional[Dict[str, str]] = None ) -> None: """ Register a fallback for an endpoint. Args: endpoint: API endpoint key: Fallback key fallback: Fallback value or function is_function: Whether the fallback is a function tier: Fallback tier context_rules: Rules for context-based selection tags: Tags for the fallback """ self.fallback_registry.register_fallback( endpoint=endpoint, key=key, fallback=fallback, is_function=is_function, tier=tier, context_rules=context_rules, tags=tags ) def shutdown(self) -> None: """Shutdown the adapter and save configurations.""" # Save endpoint configurations if enabled if self.config["save_to_storage"] and self.config["endpoint_config_path"]: self._save_endpoint_configs() # Shutdown fallback registry self.fallback_registry.shutdown() # Singleton instance _instance = None def get_context_aware_api(options: Dict[str, Any] = None) -> ContextAwareAPIAdapter: """ Get the global context-aware API adapter instance. Args: options: Configuration options Returns: ContextAwareAPIAdapter instance """ global _instance if _instance is None: _instance = ContextAwareAPIAdapter(options) return _instance