Spaces:
Running
Running
| """ | |
| Context-aware API adapter for MorphGuard. | |
| This module provides a context-aware API adapter that enhances the existing API client with: | |
| - Intelligent request routing based on context | |
| - Dynamic endpoint selection | |
| - Adaptive timeout and retry strategies | |
| - Resource-aware request throttling | |
| - Integrated circuit breaking and fallbacks | |
| """ | |
| import time | |
| import threading | |
| import logging | |
| from enum import Enum | |
| from typing import Dict, Any, List, Optional, Union, Callable, Tuple, TypeVar | |
| # Import from existing modules | |
| from src.telemetry import get_telemetry, EventCategory | |
| from src.error_handling import MGError, ErrorCode, ErrorSeverity, ErrorCategory | |
| from src.api_utils import get_api_utils, FallbackStrategy, CircuitBreaker | |
| from src.api_fallback_registry import ( | |
| get_fallback_registry, FallbackTier, FallbackStrategy as FallbackRegistryStrategy | |
| ) | |
# Type definitions
# Generic type variable; it is declared here but not referenced by any
# definition visible in this part of the module.
T = TypeVar('T')
class EndpointPriority(Enum):
    """Scheduling priority of an API endpoint; a lower value means more important."""

    # Must succeed; the adapter never throttles these.
    CRITICAL = 0
    # Should be favoured over ordinary traffic.
    HIGH = 1
    # Default level.
    NORMAL = 2
    # May be delayed.
    LOW = 3
    # May be dropped entirely.
    BACKGROUND = 4
class ResourceConsumption(Enum):
    """How expensive a request is expected to be; a higher value means heavier."""

    # Very light requests.
    MINIMAL = 0
    # Light requests.
    LIGHT = 1
    # Moderate resource usage.
    MODERATE = 2
    # Heavy resource usage.
    HEAVY = 3
    # Very intensive resource usage.
    INTENSIVE = 4
class EndpointConfiguration:
    """Bundle of per-endpoint settings (priority, timing, breaker, cache, fallback)."""

    def __init__(
        self,
        priority: EndpointPriority = EndpointPriority.NORMAL,
        resource_consumption: ResourceConsumption = ResourceConsumption.MODERATE,
        timeout: float = 30.0,
        retries: int = 3,
        retry_delay: float = 1.0,
        circuit_breaker_enabled: bool = True,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
        cache_enabled: bool = True,
        cache_duration: int = 300,
        fallback_strategy: FallbackRegistryStrategy = FallbackRegistryStrategy.STATIC,
        requires_authentication: bool = False,
        allows_batching: bool = True,
        idempotent: bool = False,
        tags: Optional[Dict[str, str]] = None
    ):
        """
        Store the settings for one endpoint.

        Args:
            priority: Priority level for the endpoint
            resource_consumption: Expected resource cost of a request
            timeout: Request timeout in seconds
            retries: Number of retries
            retry_delay: Delay between retries in seconds
            circuit_breaker_enabled: Whether a circuit breaker guards the endpoint
            failure_threshold: Failures tolerated before the circuit opens
            recovery_timeout: Seconds to wait before attempting recovery
            cache_enabled: Whether responses may be cached
            cache_duration: Cache lifetime in seconds
            fallback_strategy: Strategy used to pick a fallback
            requires_authentication: Whether authentication is required
            allows_batching: Whether the endpoint can be batched
            idempotent: Whether the endpoint is idempotent
            tags: Free-form tags; an empty dict is used when omitted
        """
        # Scheduling inputs
        self.priority = priority
        self.resource_consumption = resource_consumption

        # Timing and retry behaviour
        self.timeout = timeout
        self.retries = retries
        self.retry_delay = retry_delay

        # Circuit breaker parameters
        self.circuit_breaker_enabled = circuit_breaker_enabled
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout

        # Caching and fallback
        self.cache_enabled = cache_enabled
        self.cache_duration = cache_duration
        self.fallback_strategy = fallback_strategy

        # Request characteristics
        self.requires_authentication = requires_authentication
        self.allows_batching = allows_batching
        self.idempotent = idempotent
        self.tags = tags if tags else {}

    def to_dict(self) -> Dict[str, Any]:
        """
        Produce a plain-dict view of the configuration.

        Enum-valued fields are flattened: ``priority`` and
        ``resource_consumption`` by member name, ``fallback_strategy`` by value.

        Returns:
            Dictionary representation
        """
        serialized: Dict[str, Any] = {
            "priority": self.priority.name,
            "resource_consumption": self.resource_consumption.name,
        }
        # Attributes copied through unchanged, in the established key order.
        for field in (
            "timeout",
            "retries",
            "retry_delay",
            "circuit_breaker_enabled",
            "failure_threshold",
            "recovery_timeout",
            "cache_enabled",
            "cache_duration",
        ):
            serialized[field] = getattr(self, field)
        serialized["fallback_strategy"] = self.fallback_strategy.value
        for field in ("requires_authentication", "allows_batching", "idempotent", "tags"):
            serialized[field] = getattr(self, field)
        return serialized
class ContextAwareAPIAdapter:
    """
    Context-aware API adapter that enhances the existing API client.

    The adapter wraps ``api_utils.request`` and adds:
    - per-endpoint configuration with simple ``*`` wildcard lookup
    - adaptive timeout and retry selection driven by observed request statistics
    - resource-aware request throttling (CPU / memory, sampled through psutil)
    - per-endpoint circuit breakers and registry-provided fallbacks
    """

    def __init__(self, options: Optional[Dict[str, Any]] = None):
        """
        Initialize the API adapter.

        Args:
            options: Configuration options; keys override the defaults below
        """
        # Default configuration
        self.config = {
            "default_timeout": 30.0,
            "default_retries": 3,
            "default_retry_delay": 1.0,
            "default_priority": EndpointPriority.NORMAL,
            "default_resource_consumption": ResourceConsumption.MODERATE,
            "default_circuit_breaker_enabled": True,
            "default_cache_enabled": True,
            "default_cache_duration": 300,
            "default_fallback_strategy": FallbackRegistryStrategy.STATIC,
            "endpoint_config_path": None,
            "load_from_storage": False,
            "save_to_storage": False,
            "resource_monitoring_enabled": True,
            "resource_threshold_cpu": 80.0,  # Percent
            "resource_threshold_memory": 80.0,  # Percent
            "enable_telemetry": True,
            "enable_adaptive_timeouts": True,
            "adaptive_timeout_factor": 1.5,
            "adaptive_retry_factor": 0.8,
            "max_concurrent_requests": 100,
            "request_throttling_enabled": True
        }

        # Update with user-provided options
        if options:
            self.config.update(options)

        # Collaborators provided by the surrounding project
        self.telemetry = get_telemetry()
        self.api_utils = get_api_utils()
        self.fallback_registry = get_fallback_registry()

        # Endpoint configurations
        self.endpoint_configs: Dict[str, EndpointConfiguration] = {}

        # Circuit breakers by endpoint
        self.circuit_breakers: Dict[str, CircuitBreaker] = {}

        # Request stats by endpoint
        self.request_stats: Dict[str, Dict[str, Any]] = {}

        # Resource usage stats
        self.resource_usage = {
            "cpu": 0.0,
            "memory": 0.0,
            "last_updated": 0.0
        }

        # Locks
        self.config_lock = threading.RLock()
        self.stats_lock = threading.RLock()
        self.resource_lock = threading.RLock()

        # Semaphore for limiting concurrent requests. A non-positive limit
        # means "no limit": threading.Semaphore rejects negative values, and a
        # zero-permit semaphore could never be acquired.
        concurrency_limit = self.config["max_concurrent_requests"]
        self.request_semaphore = (
            threading.Semaphore(concurrency_limit) if concurrency_limit > 0 else None
        )

        # Load endpoint configurations if enabled
        if self.config["load_from_storage"] and self.config["endpoint_config_path"]:
            self._load_endpoint_configs()

        # Initialize resource monitoring if enabled
        if self.config["resource_monitoring_enabled"]:
            self._start_resource_monitoring()

    def _start_resource_monitoring(self) -> None:
        """
        Take an initial resource sample, or disable monitoring without psutil.

        No periodic refresh is scheduled here; later samples are taken lazily
        by ``_should_throttle_request`` once the stored sample is stale.
        """
        try:
            # Imported only to probe availability; sampling itself happens in
            # _update_resource_usage.
            import psutil  # noqa: F401
        except ImportError:
            self.telemetry.warning(
                "psutil not available, resource monitoring disabled",
                category=EventCategory.API
            )
            self.config["resource_monitoring_enabled"] = False
            return

        # Initial resource check
        self._update_resource_usage()

    def _update_resource_usage(self) -> None:
        """
        Refresh the cached CPU and memory percentages.

        Any failure is logged through telemetry and otherwise ignored; the
        previously stored sample is left in place.
        """
        try:
            import psutil

            with self.resource_lock:
                # cpu_percent(interval=0.1) blocks for about 0.1 s while sampling.
                self.resource_usage["cpu"] = psutil.cpu_percent(interval=0.1)
                self.resource_usage["memory"] = psutil.virtual_memory().percent
                self.resource_usage["last_updated"] = time.time()
        except Exception as e:
            self.telemetry.error(
                f"Error updating resource usage: {e}",
                category=EventCategory.API,
                exc_info=True
            )

    def _load_endpoint_configs(self) -> None:
        """
        Load endpoint configurations from the JSON file at ``endpoint_config_path``.

        A missing file is a no-op. Any error is logged and swallowed; entries
        are applied only after the whole file has parsed, so a malformed entry
        no longer leaves a partially loaded configuration behind.
        """
        import json
        import os

        path = self.config["endpoint_config_path"]
        try:
            if not os.path.exists(path):
                return

            with open(path, "r", encoding="utf-8") as f:
                config_data = json.load(f)

            # Build everything first, then publish under the lock in one step.
            loaded: Dict[str, EndpointConfiguration] = {}
            for endpoint, entry in config_data.items():
                loaded[endpoint] = EndpointConfiguration(
                    priority=EndpointPriority[entry["priority"]],
                    resource_consumption=ResourceConsumption[entry["resource_consumption"]],
                    timeout=entry["timeout"],
                    retries=entry["retries"],
                    retry_delay=entry["retry_delay"],
                    circuit_breaker_enabled=entry["circuit_breaker_enabled"],
                    failure_threshold=entry["failure_threshold"],
                    recovery_timeout=entry["recovery_timeout"],
                    cache_enabled=entry["cache_enabled"],
                    cache_duration=entry["cache_duration"],
                    fallback_strategy=FallbackRegistryStrategy(entry["fallback_strategy"]),
                    requires_authentication=entry["requires_authentication"],
                    allows_batching=entry["allows_batching"],
                    idempotent=entry["idempotent"],
                    tags=entry["tags"]
                )

            with self.config_lock:
                self.endpoint_configs.update(loaded)

            self.telemetry.info(
                "Loaded endpoint configurations from storage",
                category=EventCategory.API,
                context={"path": path}
            )
        except Exception as e:
            self.telemetry.error(
                f"Failed to load endpoint configurations: {e}",
                category=EventCategory.API,
                exc_info=True
            )

    def _save_endpoint_configs(self) -> None:
        """
        Write all endpoint configurations to ``endpoint_config_path`` as JSON.

        Does nothing when no path is configured. Errors are logged and swallowed.
        """
        import json
        import os

        path = self.config["endpoint_config_path"]
        try:
            if not path:
                return

            # Create the parent directory if there is one. A bare filename has
            # an empty dirname, and os.makedirs("") would raise.
            directory = os.path.dirname(path)
            if directory:
                os.makedirs(directory, exist_ok=True)

            with self.config_lock:
                config_data = {
                    endpoint: config.to_dict()
                    for endpoint, config in self.endpoint_configs.items()
                }

            with open(path, "w", encoding="utf-8") as f:
                json.dump(config_data, f, indent=2)

            self.telemetry.debug(
                "Saved endpoint configurations to storage",
                category=EventCategory.API,
                context={"path": path}
            )
        except Exception as e:
            self.telemetry.error(
                f"Failed to save endpoint configurations: {e}",
                category=EventCategory.API,
                exc_info=True
            )

    def set_endpoint_config(self, endpoint: str, config: EndpointConfiguration) -> None:
        """
        Set configuration for an endpoint and keep its circuit breaker in sync.

        The breaker is dropped when the new configuration disables it and is
        rebuilt when the breaker thresholds changed, so a stale breaker does
        not outlive the configuration it was created from.

        Args:
            endpoint: API endpoint (or wildcard pattern)
            config: Endpoint configuration
        """
        with self.config_lock:
            # Effective configuration before the change (exact, pattern or default).
            previous = self.get_endpoint_config(endpoint)
            self.endpoint_configs[endpoint] = config

            if not config.circuit_breaker_enabled:
                self.circuit_breakers.pop(endpoint, None)
            else:
                thresholds_changed = (
                    previous.failure_threshold != config.failure_threshold
                    or previous.recovery_timeout != config.recovery_timeout
                )
                if thresholds_changed or endpoint not in self.circuit_breakers:
                    self.circuit_breakers[endpoint] = CircuitBreaker(
                        failure_threshold=config.failure_threshold,
                        recovery_timeout=config.recovery_timeout
                    )

        if self.config["save_to_storage"] and self.config["endpoint_config_path"]:
            self._save_endpoint_configs()

    def get_endpoint_config(self, endpoint: str) -> EndpointConfiguration:
        """
        Get configuration for an endpoint.

        Lookup order: exact key, then the first registered wildcard pattern
        that matches, then a configuration built from the adapter defaults.
        Only a leading and/or trailing ``*`` is understood; a pattern whose
        only ``*`` is in the middle never matches.

        Args:
            endpoint: API endpoint

        Returns:
            Endpoint configuration
        """
        with self.config_lock:
            # Check for exact match
            if endpoint in self.endpoint_configs:
                return self.endpoint_configs[endpoint]

            # Check for pattern matches
            for pattern, config in self.endpoint_configs.items():
                if "*" not in pattern:
                    continue
                leading = pattern.startswith("*")
                trailing = pattern.endswith("*")
                if leading and trailing:
                    # *pattern* - contains
                    matched = pattern.strip("*") in endpoint
                elif leading:
                    # *pattern - ends with
                    matched = endpoint.endswith(pattern[1:])
                elif trailing:
                    # pattern* - starts with
                    matched = endpoint.startswith(pattern[:-1])
                else:
                    matched = False
                if matched:
                    return config

            # Return default configuration
            return EndpointConfiguration(
                priority=self.config["default_priority"],
                resource_consumption=self.config["default_resource_consumption"],
                timeout=self.config["default_timeout"],
                retries=self.config["default_retries"],
                retry_delay=self.config["default_retry_delay"],
                circuit_breaker_enabled=self.config["default_circuit_breaker_enabled"],
                cache_enabled=self.config["default_cache_enabled"],
                cache_duration=self.config["default_cache_duration"],
                fallback_strategy=self.config["default_fallback_strategy"]
            )

    def _get_circuit_breaker(self, endpoint: str) -> Optional[CircuitBreaker]:
        """
        Get (creating on first use) the circuit breaker for an endpoint.

        The endpoint configuration is consulted first, so a breaker created
        earlier is not returned once circuit breaking has been disabled.

        Args:
            endpoint: API endpoint

        Returns:
            Circuit breaker or None if not enabled
        """
        with self.config_lock:
            config = self.get_endpoint_config(endpoint)
            if not config.circuit_breaker_enabled:
                return None

            breaker = self.circuit_breakers.get(endpoint)
            if breaker is None:
                breaker = CircuitBreaker(
                    failure_threshold=config.failure_threshold,
                    recovery_timeout=config.recovery_timeout
                )
                self.circuit_breakers[endpoint] = breaker
            return breaker

    def _update_request_stats(self, endpoint: str, success: bool, duration: float) -> None:
        """
        Update request statistics for an endpoint.

        Args:
            endpoint: API endpoint
            success: Whether the request was successful
            duration: Request duration in seconds
        """
        with self.stats_lock:
            if endpoint not in self.request_stats:
                self.request_stats[endpoint] = {
                    "total_requests": 0,
                    "successful_requests": 0,
                    "failed_requests": 0,
                    "total_duration": 0.0,
                    "avg_duration": 0.0,
                    "last_request_time": 0.0,
                    "min_duration": float('inf'),
                    "max_duration": 0.0
                }

            stats = self.request_stats[endpoint]
            stats["total_requests"] += 1
            stats["last_request_time"] = time.time()
            stats["total_duration"] += duration
            stats["avg_duration"] = stats["total_duration"] / stats["total_requests"]
            stats["min_duration"] = min(stats["min_duration"], duration)
            stats["max_duration"] = max(stats["max_duration"], duration)

            if success:
                stats["successful_requests"] += 1
            else:
                stats["failed_requests"] += 1

    def _should_throttle_request(self, endpoint: str) -> bool:
        """
        Check if a request should be throttled based on resource usage.

        Critical endpoints are never throttled. Lower priorities are throttled
        at a smaller fraction of the CPU/memory thresholds, and heavy or
        intensive endpoints have their thresholds lowered further.

        Args:
            endpoint: API endpoint

        Returns:
            Whether the request should be throttled
        """
        if not self.config["request_throttling_enabled"]:
            return False

        # Without resource monitoring there is no usage data to act on.
        # Skipping here also avoids re-running (and re-logging) a failing
        # psutil sample on every request after monitoring was disabled.
        if not self.config["resource_monitoring_enabled"]:
            return False

        config = self.get_endpoint_config(endpoint)

        # Don't throttle critical requests
        if config.priority == EndpointPriority.CRITICAL:
            return False

        with self.resource_lock:
            # If resource usage data is stale (older than 5 s), update it
            if time.time() - self.resource_usage["last_updated"] > 5.0:
                self._update_resource_usage()
            cpu_usage = self.resource_usage["cpu"]
            memory_usage = self.resource_usage["memory"]

        cpu_threshold = self.config["resource_threshold_cpu"]
        memory_threshold = self.config["resource_threshold_memory"]

        # Apply more aggressive throttling for resource-intensive requests
        if config.resource_consumption == ResourceConsumption.INTENSIVE:
            cpu_threshold *= 0.8
            memory_threshold *= 0.8
        elif config.resource_consumption == ResourceConsumption.HEAVY:
            cpu_threshold *= 0.9
            memory_threshold *= 0.9

        # Fraction of the threshold at which each priority starts being throttled.
        if config.priority in (EndpointPriority.LOW, EndpointPriority.BACKGROUND):
            margin = 0.8
        elif config.priority == EndpointPriority.NORMAL:
            margin = 0.9
        elif config.priority == EndpointPriority.HIGH:
            margin = 1.0
        else:
            return False

        return (
            cpu_usage > cpu_threshold * margin
            or memory_usage > memory_threshold * margin
        )

    def _adjust_timeout(self, endpoint: str, base_timeout: float) -> float:
        """
        Adjust timeout based on request statistics.

        Needs at least five recorded requests; the result is never lower than
        ``base_timeout``.

        Args:
            endpoint: API endpoint
            base_timeout: Base timeout in seconds

        Returns:
            Adjusted timeout in seconds
        """
        if not self.config["enable_adaptive_timeouts"]:
            return base_timeout

        with self.stats_lock:
            stats = self.request_stats.get(endpoint)
            if stats is None or stats["total_requests"] < 5:
                return base_timeout

            avg_duration = stats["avg_duration"]
            max_duration = stats["max_duration"]

            if max_duration > avg_duration * 3:
                # High variance, use max as reference
                adjusted_timeout = max_duration * 1.2
            else:
                # Low variance, use average as reference
                adjusted_timeout = avg_duration * self.config["adaptive_timeout_factor"]

            # Don't go below base timeout
            return max(base_timeout, adjusted_timeout)

    def _adjust_retries(self, endpoint: str, base_retries: int) -> int:
        """
        Adjust retry count based on request statistics.

        Needs at least five recorded requests. A success rate above 95 % can
        only lower the count (to no less than 1, and never above the base); a
        success rate below 50 % can only raise it (to no more than 10, and
        never below the base).

        Args:
            endpoint: API endpoint
            base_retries: Base retry count

        Returns:
            Adjusted retry count
        """
        # Retry adaptation shares the adaptive-timeout switch.
        if not self.config["enable_adaptive_timeouts"]:
            return base_retries

        with self.stats_lock:
            stats = self.request_stats.get(endpoint)
            if stats is None or stats["total_requests"] < 5:
                return base_retries

            success_rate = stats["successful_requests"] / stats["total_requests"]
            factor = self.config["adaptive_retry_factor"]

            if success_rate > 0.95:
                # Very reliable, reduce retries. The outer min() keeps a base
                # of 0 from being raised to 1.
                reduced = max(1, int(base_retries * factor))
                return min(base_retries, reduced)

            if success_rate < 0.5:
                # Unreliable, increase retries. A non-positive factor cannot
                # be divided by, so leave the count alone in that case.
                if factor <= 0:
                    return base_retries
                increased = min(10, int(base_retries / factor))
                # The outer max() keeps a base above 10 from being lowered.
                return max(base_retries, increased)

            return base_retries

    def request(
        self,
        method: str,
        endpoint: str,
        data: Any = None,
        context: Optional[Dict[str, Any]] = None,
        options: Optional[Dict[str, Any]] = None
    ) -> Any:
        """
        Make a context-aware API request.

        Args:
            method: HTTP method
            endpoint: API endpoint
            data: Request data
            context: Request context, passed to the fallback registry
            options: Request options. ``timeout``, ``retries``, ``retry_delay``,
                ``cache``, ``cache_duration``, ``circuit_breaker`` and
                ``critical`` override the endpoint configuration; ``headers``,
                ``no_fallback`` and ``fallback_strategy`` are also read.

        Returns:
            Response data, or the registered fallback value when a background
            request is dropped or the circuit is open

        Raises:
            MGError: If a background request is throttled or the circuit is
                open and no fallback is available
            Exception: Whatever ``api_utils.request`` raises is logged and
                re-raised unchanged
        """
        context = context or {}
        options = options or {}

        # Get endpoint configuration
        config = self.get_endpoint_config(endpoint)

        # Create request options based on configuration
        request_options = {
            "timeout": config.timeout,
            "retries": config.retries,
            "retry_delay": config.retry_delay,
            "cache": config.cache_enabled,
            "cache_duration": config.cache_duration,
            "circuit_breaker": config.circuit_breaker_enabled,
            "critical": config.priority == EndpointPriority.CRITICAL,
            "headers": options.get("headers", {})
        }

        # Apply adaptive timeouts and retries
        if self.config["enable_adaptive_timeouts"]:
            request_options["timeout"] = self._adjust_timeout(endpoint, config.timeout)
            request_options["retries"] = self._adjust_retries(endpoint, config.retries)

        # Override with explicit options
        for key in (
            "timeout",
            "retries",
            "retry_delay",
            "cache",
            "cache_duration",
            "circuit_breaker",
            "critical",
        ):
            if key in options:
                request_options[key] = options[key]

        # Set priority based on endpoint configuration (NORMAL sets nothing)
        if config.priority in (EndpointPriority.CRITICAL, EndpointPriority.HIGH):
            request_options["priority"] = "high"
        elif config.priority in (EndpointPriority.LOW, EndpointPriority.BACKGROUND):
            request_options["priority"] = "low"

        # Set up fallback
        fallback_value = None
        if not options.get("no_fallback", False):
            fallback_strategy = options.get("fallback_strategy", config.fallback_strategy)
            # NOTE(review): a mapping of this strategy onto the api_utils
            # FallbackStrategy enum used to be computed here but was never
            # passed anywhere, so it was removed as dead code. Confirm whether
            # api_utils.request expects it in request_options.
            fallback_value = self.fallback_registry.get_fallback(
                endpoint,
                strategy=fallback_strategy,
                context=context,
                args=[method, endpoint, data, request_options],
                kwargs={}
            )
            if fallback_value is not None:
                request_options["fallback"] = fallback_value

        # Check if we should throttle this request
        if self._should_throttle_request(endpoint):
            if config.priority == EndpointPriority.BACKGROUND:
                # Drop background requests when throttling
                self.telemetry.debug(
                    f"Dropping background request {method} {endpoint} due to resource constraints",
                    category=EventCategory.API,
                    context={"priority": config.priority.name}
                )
                # Use fallback if available
                if fallback_value is not None:
                    return fallback_value
                # Otherwise, raise throttled error
                raise MGError(
                    message="Request throttled due to resource constraints",
                    code=ErrorCode.THROTTLED,
                    category=ErrorCategory.SYSTEM,
                    severity=ErrorSeverity.WARNING
                )

            # Non-background requests are only logged here; they still proceed.
            self.telemetry.debug(
                f"Throttling request {method} {endpoint} due to resource constraints",
                category=EventCategory.API,
                context={"priority": config.priority.name}
            )

        # Check circuit breaker; a per-request circuit_breaker=False override
        # bypasses the adapter's own breaker as well.
        circuit_breaker = (
            self._get_circuit_breaker(endpoint)
            if request_options["circuit_breaker"]
            else None
        )
        if circuit_breaker is not None and not circuit_breaker.allow_request():
            self.telemetry.warning(
                f"Circuit breaker open for {endpoint}",
                category=EventCategory.API,
                context={"endpoint": endpoint}
            )
            # Use fallback if available
            if fallback_value is not None:
                return fallback_value
            # Otherwise, raise circuit open error
            raise MGError(
                message=f"Circuit breaker open for {endpoint}",
                code=ErrorCode.SERVICE_UNAVAILABLE,
                category=ErrorCategory.API,
                severity=ErrorSeverity.WARNING
            )

        # Only limit concurrency when a semaphore exists and the limit is
        # still positive (the config may be changed after construction).
        semaphore = self.request_semaphore
        if self.config["max_concurrent_requests"] <= 0:
            semaphore = None
        acquired = False

        # Record start time
        start_time = time.time()
        try:
            if semaphore is not None:
                semaphore.acquire()
                acquired = True

            # Make the request
            response = self.api_utils.request(method, endpoint, data, request_options)
        except Exception as error:
            # Record failure
            if circuit_breaker is not None:
                circuit_breaker.record_failure()
            duration = time.time() - start_time
            self._update_request_stats(endpoint, False, duration)

            # Log error with context
            self.telemetry.error(
                f"Error in API request {method} {endpoint}: {error}",
                category=EventCategory.API,
                context={
                    "method": method,
                    "endpoint": endpoint,
                    "duration": duration,
                    "error": str(error)
                }
            )
            # Bare raise keeps the original traceback intact.
            raise
        else:
            # Record success
            if circuit_breaker is not None:
                circuit_breaker.record_success()
            duration = time.time() - start_time
            self._update_request_stats(endpoint, True, duration)
            return response
        finally:
            # Release only what was actually acquired.
            if acquired:
                semaphore.release()

    def get(
        self,
        endpoint: str,
        data: Any = None,
        context: Optional[Dict[str, Any]] = None,
        options: Optional[Dict[str, Any]] = None
    ) -> Any:
        """
        Make a GET request.

        Args:
            endpoint: API endpoint
            data: Query parameters
            context: Request context
            options: Request options

        Returns:
            Response data
        """
        return self.request("GET", endpoint, data, context, options)

    def post(
        self,
        endpoint: str,
        data: Any = None,
        context: Optional[Dict[str, Any]] = None,
        options: Optional[Dict[str, Any]] = None
    ) -> Any:
        """
        Make a POST request.

        Args:
            endpoint: API endpoint
            data: Request data
            context: Request context
            options: Request options

        Returns:
            Response data
        """
        return self.request("POST", endpoint, data, context, options)

    def put(
        self,
        endpoint: str,
        data: Any = None,
        context: Optional[Dict[str, Any]] = None,
        options: Optional[Dict[str, Any]] = None
    ) -> Any:
        """
        Make a PUT request.

        Args:
            endpoint: API endpoint
            data: Request data
            context: Request context
            options: Request options

        Returns:
            Response data
        """
        return self.request("PUT", endpoint, data, context, options)

    def patch(
        self,
        endpoint: str,
        data: Any = None,
        context: Optional[Dict[str, Any]] = None,
        options: Optional[Dict[str, Any]] = None
    ) -> Any:
        """
        Make a PATCH request.

        Args:
            endpoint: API endpoint
            data: Request data
            context: Request context
            options: Request options

        Returns:
            Response data
        """
        return self.request("PATCH", endpoint, data, context, options)

    def delete(
        self,
        endpoint: str,
        context: Optional[Dict[str, Any]] = None,
        options: Optional[Dict[str, Any]] = None
    ) -> Any:
        """
        Make a DELETE request (no request data is sent).

        Args:
            endpoint: API endpoint
            context: Request context
            options: Request options

        Returns:
            Response data
        """
        return self.request("DELETE", endpoint, None, context, options)

    def get_request_stats(self, endpoint: Optional[str] = None) -> Dict[str, Any]:
        """
        Get request statistics.

        Args:
            endpoint: Optional endpoint to get stats for

        Returns:
            A copy of the stats for ``endpoint`` (empty if none recorded), or a
            mapping of every endpoint to a copy of its stats when omitted
        """
        with self.stats_lock:
            if endpoint:
                return self.request_stats.get(endpoint, {}).copy()
            return {
                name: stats.copy()
                for name, stats in self.request_stats.items()
            }

    def get_resource_usage(self) -> Dict[str, Any]:
        """
        Get the most recently sampled resource usage.

        Returns:
            Copy of the resource usage statistics
        """
        with self.resource_lock:
            return self.resource_usage.copy()

    def reset_circuit_breaker(self, endpoint: str) -> None:
        """
        Reset circuit breaker for an endpoint, if one exists.

        Args:
            endpoint: API endpoint
        """
        with self.config_lock:
            breaker = self.circuit_breakers.get(endpoint)
            if breaker is not None:
                breaker.reset()

    def reset_all_circuit_breakers(self) -> None:
        """Reset all circuit breakers."""
        with self.config_lock:
            for circuit_breaker in self.circuit_breakers.values():
                circuit_breaker.reset()

    def is_in_degraded_mode(self) -> bool:
        """
        Check if the system is in degraded mode, as reported by the fallback registry.

        Returns:
            Whether the system is in degraded mode
        """
        return self.fallback_registry.is_degraded_mode()

    def register_fallback(
        self,
        endpoint: str,
        key: str,
        fallback: Union[Any, Callable],
        is_function: bool = False,
        tier: FallbackTier = FallbackTier.PRIMARY,
        context_rules: Optional[Dict[str, Any]] = None,
        tags: Optional[Dict[str, str]] = None
    ) -> None:
        """
        Register a fallback for an endpoint with the fallback registry.

        Args:
            endpoint: API endpoint
            key: Fallback key
            fallback: Fallback value or function
            is_function: Whether the fallback is a function
            tier: Fallback tier
            context_rules: Rules for context-based selection
            tags: Tags for the fallback
        """
        self.fallback_registry.register_fallback(
            endpoint=endpoint,
            key=key,
            fallback=fallback,
            is_function=is_function,
            tier=tier,
            context_rules=context_rules,
            tags=tags
        )

    def shutdown(self) -> None:
        """Save configurations (when enabled) and shut down the fallback registry."""
        # Save endpoint configurations if enabled
        if self.config["save_to_storage"] and self.config["endpoint_config_path"]:
            self._save_endpoint_configs()

        # Shutdown fallback registry
        self.fallback_registry.shutdown()
# Singleton instance, created lazily by get_context_aware_api()
_instance = None
# Serializes first-time creation so concurrent callers share one adapter
_instance_lock = threading.Lock()


def get_context_aware_api(options: Optional[Dict[str, Any]] = None) -> "ContextAwareAPIAdapter":
    """
    Get the global context-aware API adapter instance.

    The adapter is created on the first call. ``options`` is used only for
    that first creation; on later calls it is ignored and the existing
    instance is returned unchanged.

    Args:
        options: Configuration options for the first creation

    Returns:
        ContextAwareAPIAdapter instance
    """
    global _instance
    if _instance is None:
        with _instance_lock:
            # Re-check under the lock: another thread may have created the
            # instance while this one was waiting.
            if _instance is None:
                _instance = ContextAwareAPIAdapter(options)
    return _instance