""" API resilience system for MorphGuard. This module provides a comprehensive API resilience system with: - Intelligent fallbacks - Context-aware request handling - Dynamic routing - Health-aware service selection - Degraded mode operations - Resource-aware request throttling Usage: from src.api_resilience import get_resilient_api # Get the resilient API client api = get_resilient_api() # Make requests with context response = api.get("/users", context={"user_id": "123"}) """ import os import json import logging from typing import Dict, Any, Optional, List, Union, Callable # Import modules from src.telemetry import get_telemetry, EventCategory from src.error_handling import MGError, ErrorCode, ErrorSeverity, ErrorCategory from src.api_fallback_registry import ( get_fallback_registry, FallbackTier, FallbackStrategy, HealthStatus ) from src.context_aware_api import ( get_context_aware_api, EndpointPriority, ResourceConsumption, EndpointConfiguration ) from src.api_utils import get_api_utils class ResilientAPIClient: """ Resilient API client for MorphGuard. This client integrates: - The core API utilities - The fallback registry - The context-aware API adapter Providing a comprehensive resilient API client with: - Intelligent fallbacks - Context-aware request handling - Dynamic routing - Health-aware service selection - Degraded mode operations - Resource-aware request throttling """ def __init__(self, options: Dict[str, Any] = None): """ Initialize the resilient API client. Args: options: Configuration options """ # Default configuration self.config = { "base_url": "http://localhost:5000/api", "cache_dir": ".mg_api_cache", "fallbacks_dir": ".mg_fallbacks", "timeout": 30.0, "retries": 3, "enable_telemetry": True, "enable_fallbacks": True, "enable_circuit_breaker": True, "enable_context_aware": True, "enable_resource_monitoring": True, "endpoint_config_path": None, "fallback_storage_path": None, "static_fallbacks": {}, "register_default_fallbacks": True, "auto_register_fallbacks": True } # Update with user-provided options if options: self.config.update(options) # Create directories if needed os.makedirs(self.config["cache_dir"], exist_ok=True) os.makedirs(self.config["fallbacks_dir"], exist_ok=True) # Initialize telemetry self.telemetry = get_telemetry() # Configure paths if not provided if not self.config["endpoint_config_path"]: self.config["endpoint_config_path"] = os.path.join( self.config["fallbacks_dir"], "endpoint_configs.json" ) if not self.config["fallback_storage_path"]: self.config["fallback_storage_path"] = os.path.join( self.config["fallbacks_dir"], "fallbacks.json" ) # Get API utilities api_utils_options = { "base_url": self.config["base_url"], "cache_dir": self.config["cache_dir"], "timeout": self.config["timeout"], "retries": self.config["retries"], "circuit_breaker_enabled": self.config["enable_circuit_breaker"] } self.api_utils = get_api_utils(api_utils_options) # Get fallback registry fallback_registry_options = { "fallback_storage_path": self.config["fallback_storage_path"], "load_from_storage": True, "save_to_storage": True, "enable_telemetry": self.config["enable_telemetry"] } self.fallback_registry = get_fallback_registry(fallback_registry_options) # Get context-aware API adapter context_api_options = { "endpoint_config_path": self.config["endpoint_config_path"], "load_from_storage": True, "save_to_storage": True, "resource_monitoring_enabled": self.config["enable_resource_monitoring"], "enable_telemetry": self.config["enable_telemetry"] } self.context_api = get_context_aware_api(context_api_options) # Register default fallbacks if enabled if self.config["register_default_fallbacks"]: self._register_default_fallbacks() # Register static fallbacks from config if self.config["static_fallbacks"]: self._register_static_fallbacks() # Log initialization self.telemetry.info( "Resilient API client initialized", category=EventCategory.API, context={ "base_url": self.config["base_url"], "fallbacks_enabled": self.config["enable_fallbacks"], "circuit_breaker_enabled": self.config["enable_circuit_breaker"], "context_aware_enabled": self.config["enable_context_aware"] } ) def _register_default_fallbacks(self): """Register default fallbacks for common endpoints.""" # User profile fallback self.fallback_registry.register_fallback( endpoint="/users/profile", key="default_profile", fallback={"id": None, "name": "Guest", "email": None, "role": "guest"}, tier=FallbackTier.PRIMARY, tags={"type": "static", "category": "user"} ) # User settings fallback self.fallback_registry.register_fallback( endpoint="/users/settings", key="default_settings", fallback={"theme": "light", "notifications": True, "language": "en"}, tier=FallbackTier.PRIMARY, tags={"type": "static", "category": "user"} ) # Stats fallback self.fallback_registry.register_fallback( endpoint="/stats/*", key="empty_stats", fallback={}, tier=FallbackTier.PRIMARY, tags={"type": "static", "category": "stats"} ) # Configuration fallback for empty response self.fallback_registry.register_fallback( endpoint="/config/*", key="empty_config", fallback={}, tier=FallbackTier.PRIMARY, tags={"type": "static", "category": "config"} ) # Function to generate fallback data def generate_empty_list(): return [] # Register dynamic fallbacks self.fallback_registry.register_fallback( endpoint="/items/*", key="empty_items", fallback=generate_empty_list, is_function=True, tier=FallbackTier.PRIMARY, tags={"type": "dynamic", "category": "items"} ) def _register_static_fallbacks(self): """Register static fallbacks from configuration.""" for endpoint, fallbacks in self.config["static_fallbacks"].items(): for key, fallback_data in fallbacks.items(): value = fallback_data.get("value") is_function = fallback_data.get("is_function", False) tier_name = fallback_data.get("tier", "PRIMARY") tier = getattr(FallbackTier, tier_name, FallbackTier.PRIMARY) context_rules = fallback_data.get("context_rules") tags = fallback_data.get("tags") self.fallback_registry.register_fallback( endpoint=endpoint, key=key, fallback=value, is_function=is_function, tier=tier, context_rules=context_rules, tags=tags ) def _should_use_context_api(self, options: Dict[str, Any]) -> bool: """ Check if the context-aware API should be used. Args: options: Request options Returns: Whether to use the context-aware API """ # Check if explicitly disabled in options if "use_context_api" in options: return options["use_context_api"] # Check if globally enabled return self.config["enable_context_aware"] def request( self, method: str, endpoint: str, data: Any = None, context: Optional[Dict[str, Any]] = None, options: Optional[Dict[str, Any]] = None ) -> Any: """ Make a resilient API request. Args: method: HTTP method endpoint: API endpoint data: Request data context: Request context options: Request options Returns: Response data Raises: MGError: If the request fails """ options = options or {} try: # Use context-aware API if enabled if self._should_use_context_api(options): return self.context_api.request(method, endpoint, data, context, options) else: # Otherwise, use base API utilities return self.api_utils.request(method, endpoint, data, options) except Exception as error: # Try to use fallback if enabled if self.config["enable_fallbacks"] and not options.get("no_fallback", False): fallback = self.fallback_registry.get_fallback( endpoint, context=context, args=[method, endpoint, data, options], kwargs={} ) if fallback is not None: self.telemetry.info( f"Using fallback for {method} {endpoint}", category=EventCategory.API, context={"method": method, "endpoint": endpoint} ) return fallback # Re-raise the error raise error def get( self, endpoint: str, data: Any = None, context: Optional[Dict[str, Any]] = None, options: Optional[Dict[str, Any]] = None ) -> Any: """ Make a GET request. Args: endpoint: API endpoint data: Query parameters context: Request context options: Request options Returns: Response data """ return self.request("GET", endpoint, data, context, options) def post( self, endpoint: str, data: Any = None, context: Optional[Dict[str, Any]] = None, options: Optional[Dict[str, Any]] = None ) -> Any: """ Make a POST request. Args: endpoint: API endpoint data: Request data context: Request context options: Request options Returns: Response data """ return self.request("POST", endpoint, data, context, options) def put( self, endpoint: str, data: Any = None, context: Optional[Dict[str, Any]] = None, options: Optional[Dict[str, Any]] = None ) -> Any: """ Make a PUT request. Args: endpoint: API endpoint data: Request data context: Request context options: Request options Returns: Response data """ return self.request("PUT", endpoint, data, context, options) def patch( self, endpoint: str, data: Any = None, context: Optional[Dict[str, Any]] = None, options: Optional[Dict[str, Any]] = None ) -> Any: """ Make a PATCH request. Args: endpoint: API endpoint data: Request data context: Request context options: Request options Returns: Response data """ return self.request("PATCH", endpoint, data, context, options) def delete( self, endpoint: str, context: Optional[Dict[str, Any]] = None, options: Optional[Dict[str, Any]] = None ) -> Any: """ Make a DELETE request. Args: endpoint: API endpoint context: Request context options: Request options Returns: Response data """ return self.request("DELETE", endpoint, None, context, options) def register_fallback( self, endpoint: str, key: str, fallback: Union[Any, Callable], is_function: bool = False, tier: FallbackTier = FallbackTier.PRIMARY, context_rules: Optional[Dict[str, Any]] = None, tags: Optional[Dict[str, str]] = None ) -> None: """ Register a fallback for an endpoint. Args: endpoint: API endpoint key: Fallback key fallback: Fallback value or function is_function: Whether the fallback is a function tier: Fallback tier context_rules: Rules for context-based selection tags: Tags for the fallback """ self.fallback_registry.register_fallback( endpoint=endpoint, key=key, fallback=fallback, is_function=is_function, tier=tier, context_rules=context_rules, tags=tags ) def configure_endpoint( self, endpoint: str, priority: Optional[EndpointPriority] = None, resource_consumption: Optional[ResourceConsumption] = None, timeout: Optional[float] = None, retries: Optional[int] = None, retry_delay: Optional[float] = None, circuit_breaker_enabled: Optional[bool] = None, failure_threshold: Optional[int] = None, recovery_timeout: Optional[float] = None, cache_enabled: Optional[bool] = None, cache_duration: Optional[int] = None, fallback_strategy: Optional[FallbackStrategy] = None, requires_authentication: Optional[bool] = None, allows_batching: Optional[bool] = None, idempotent: Optional[bool] = None, tags: Optional[Dict[str, str]] = None ) -> None: """ Configure an endpoint. Args: endpoint: API endpoint priority: Priority level for the endpoint resource_consumption: Resource consumption level timeout: Request timeout in seconds retries: Number of retries retry_delay: Delay between retries in seconds circuit_breaker_enabled: Whether to enable circuit breaker failure_threshold: Number of failures before opening circuit recovery_timeout: Time before trying to recover in seconds cache_enabled: Whether to enable caching cache_duration: Cache duration in seconds fallback_strategy: Strategy for selecting fallbacks requires_authentication: Whether authentication is required allows_batching: Whether the endpoint can be batched idempotent: Whether the endpoint is idempotent tags: Tags for the endpoint """ # Get existing configuration if any config = self.context_api.get_endpoint_config(endpoint) # Update with provided values if priority is not None: config.priority = priority if resource_consumption is not None: config.resource_consumption = resource_consumption if timeout is not None: config.timeout = timeout if retries is not None: config.retries = retries if retry_delay is not None: config.retry_delay = retry_delay if circuit_breaker_enabled is not None: config.circuit_breaker_enabled = circuit_breaker_enabled if failure_threshold is not None: config.failure_threshold = failure_threshold if recovery_timeout is not None: config.recovery_timeout = recovery_timeout if cache_enabled is not None: config.cache_enabled = cache_enabled if cache_duration is not None: config.cache_duration = cache_duration if fallback_strategy is not None: config.fallback_strategy = fallback_strategy if requires_authentication is not None: config.requires_authentication = requires_authentication if allows_batching is not None: config.allows_batching = allows_batching if idempotent is not None: config.idempotent = idempotent if tags is not None: config.tags.update(tags) # Set updated configuration self.context_api.set_endpoint_config(endpoint, config) def get_endpoint_configuration(self, endpoint: str) -> Dict[str, Any]: """ Get configuration for an endpoint. Args: endpoint: API endpoint Returns: Endpoint configuration as dictionary """ config = self.context_api.get_endpoint_config(endpoint) return config.to_dict() def reset_circuit_breaker(self, endpoint: str) -> None: """ Reset circuit breaker for an endpoint. Args: endpoint: API endpoint """ self.context_api.reset_circuit_breaker(endpoint) def reset_all_circuit_breakers(self) -> None: """Reset all circuit breakers.""" self.context_api.reset_all_circuit_breakers() def clear_cache(self, endpoint: Optional[str] = None) -> None: """ Clear the cache. Args: endpoint: Optional endpoint to clear (clears all if None) """ self.api_utils.clear_cache(endpoint) def is_in_degraded_mode(self) -> bool: """ Check if the system is in degraded mode. Returns: Whether the system is in degraded mode """ return self.context_api.is_in_degraded_mode() def get_request_stats(self, endpoint: Optional[str] = None) -> Dict[str, Any]: """ Get request statistics. Args: endpoint: Optional endpoint to get stats for Returns: Request statistics """ return self.context_api.get_request_stats(endpoint) def get_fallbacks(self, endpoint: Optional[str] = None) -> Dict[str, Any]: """ Get registered fallbacks. Args: endpoint: Optional endpoint to get fallbacks for Returns: Fallback information """ if endpoint: return self.fallback_registry.get_fallbacks_for_endpoint(endpoint) else: return self.fallback_registry.get_all_fallbacks() def shutdown(self) -> None: """Shutdown the client and save state.""" # Shutdown context-aware API self.context_api.shutdown() # Shutdown fallback registry self.fallback_registry.shutdown() # Singleton instance _instance = None def get_resilient_api(options: Dict[str, Any] = None) -> ResilientAPIClient: """ Get the global resilient API client instance. Args: options: Configuration options Returns: ResilientAPIClient instance """ global _instance if _instance is None: _instance = ResilientAPIClient(options) return _instance