MorphGuard / src /context_aware_api.py
juanquy's picture
Initial clean commit of modular MorphGuard
2978bba
Raw
History Blame Contribute Delete
37.1 kB
"""
Context-aware API adapter for MorphGuard.
This module provides a context-aware API adapter that enhances the existing API client with:
- Intelligent request routing based on context
- Dynamic endpoint selection
- Adaptive timeout and retry strategies
- Resource-aware request throttling
- Integrated circuit breaking and fallbacks
"""
import time
import threading
import logging
from enum import Enum
from typing import Dict, Any, List, Optional, Union, Callable, Tuple, TypeVar
# Import from existing modules
from src.telemetry import get_telemetry, EventCategory
from src.error_handling import MGError, ErrorCode, ErrorSeverity, ErrorCategory
from src.api_utils import get_api_utils, FallbackStrategy, CircuitBreaker
from src.api_fallback_registry import (
get_fallback_registry, FallbackTier, FallbackStrategy as FallbackRegistryStrategy
)
# Type definitions
T = TypeVar('T')
class EndpointPriority(Enum):
    """Priority levels for API endpoints (0 is the most important, 4 the least)."""

    # Highest priority: the request must succeed.
    CRITICAL = 0
    # High priority: should be prioritized.
    HIGH = 1
    # Normal priority.
    NORMAL = 2
    # Low priority: the request can be delayed.
    LOW = 3
    # Lowest priority: the request can be dropped.
    BACKGROUND = 4
class ResourceConsumption(Enum):
    """Resource consumption levels for requests, from lightest (0) to heaviest (4)."""

    # Very light requests.
    MINIMAL = 0
    # Light requests.
    LIGHT = 1
    # Moderate resource usage.
    MODERATE = 2
    # Heavy resource usage.
    HEAVY = 3
    # Very intensive resource usage.
    INTENSIVE = 4
class EndpointConfiguration:
    """Per-endpoint request policy: priority, timing, breaker, cache and fallback settings."""

    def __init__(
        self,
        priority: EndpointPriority = EndpointPriority.NORMAL,
        resource_consumption: ResourceConsumption = ResourceConsumption.MODERATE,
        timeout: float = 30.0,
        retries: int = 3,
        retry_delay: float = 1.0,
        circuit_breaker_enabled: bool = True,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
        cache_enabled: bool = True,
        cache_duration: int = 300,
        fallback_strategy: FallbackRegistryStrategy = FallbackRegistryStrategy.STATIC,
        requires_authentication: bool = False,
        allows_batching: bool = True,
        idempotent: bool = False,
        tags: Optional[Dict[str, str]] = None
    ):
        """
        Store the policy values for one endpoint.

        Args:
            priority: Priority level for the endpoint
            resource_consumption: Resource consumption level
            timeout: Request timeout in seconds
            retries: Number of retries
            retry_delay: Delay between retries in seconds
            circuit_breaker_enabled: Whether to enable circuit breaker
            failure_threshold: Number of failures before opening circuit
            recovery_timeout: Time before trying to recover in seconds
            cache_enabled: Whether to enable caching
            cache_duration: Cache duration in seconds
            fallback_strategy: Strategy for selecting fallbacks
            requires_authentication: Whether authentication is required
            allows_batching: Whether the endpoint can be batched
            idempotent: Whether the endpoint is idempotent
            tags: Tags for the endpoint; a falsy value becomes a new empty dict
        """
        # Scheduling hints
        self.priority = priority
        self.resource_consumption = resource_consumption

        # Timing and retry behaviour
        self.timeout = timeout
        self.retries = retries
        self.retry_delay = retry_delay

        # Circuit breaker settings
        self.circuit_breaker_enabled = circuit_breaker_enabled
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout

        # Caching and fallback selection
        self.cache_enabled = cache_enabled
        self.cache_duration = cache_duration
        self.fallback_strategy = fallback_strategy

        # Descriptive flags and free-form tags
        self.requires_authentication = requires_authentication
        self.allows_batching = allows_batching
        self.idempotent = idempotent
        self.tags = tags or {}

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert the configuration to a plain dictionary.

        The two level enums are written by member name and the fallback
        strategy by its value; every other attribute is copied as-is.

        Returns:
            Dictionary representation
        """
        payload: Dict[str, Any] = {
            "priority": self.priority.name,
            "resource_consumption": self.resource_consumption.name,
        }
        for attr in (
            "timeout",
            "retries",
            "retry_delay",
            "circuit_breaker_enabled",
            "failure_threshold",
            "recovery_timeout",
            "cache_enabled",
            "cache_duration",
        ):
            payload[attr] = getattr(self, attr)
        payload["fallback_strategy"] = self.fallback_strategy.value
        for attr in ("requires_authentication", "allows_batching", "idempotent", "tags"):
            payload[attr] = getattr(self, attr)
        return payload
class ContextAwareAPIAdapter:
"""
Context-aware API adapter that enhances the existing API client.
This adapter provides:
- Intelligent request routing based on context
- Dynamic endpoint selection
- Adaptive timeout and retry strategies
- Resource-aware request throttling
- Integrated circuit breaking and fallbacks
"""
def __init__(self, options: Optional[Dict[str, Any]] = None):
    """
    Initialize the API adapter.

    Builds the default configuration, overlays ``options`` on top of it,
    grabs the shared telemetry / API-utils / fallback-registry instances and
    creates the per-endpoint registries, locks and the concurrency semaphore.
    Endpoint configurations are loaded from disk and resource monitoring is
    started when the corresponding options are enabled.

    Args:
        options: Configuration options overriding the defaults below
    """
    # Default configuration
    self.config = {
        "default_timeout": 30.0,
        "default_retries": 3,
        "default_retry_delay": 1.0,
        "default_priority": EndpointPriority.NORMAL,
        "default_resource_consumption": ResourceConsumption.MODERATE,
        "default_circuit_breaker_enabled": True,
        "default_cache_enabled": True,
        "default_cache_duration": 300,
        "default_fallback_strategy": FallbackRegistryStrategy.STATIC,
        "endpoint_config_path": None,
        "load_from_storage": False,
        "save_to_storage": False,
        "resource_monitoring_enabled": True,
        "resource_threshold_cpu": 80.0,  # Percent
        "resource_threshold_memory": 80.0,  # Percent
        "enable_telemetry": True,
        "enable_adaptive_timeouts": True,
        "adaptive_timeout_factor": 1.5,
        "adaptive_retry_factor": 0.8,
        "max_concurrent_requests": 100,
        "request_throttling_enabled": True
    }

    # Update with user-provided options
    if options:
        self.config.update(options)

    # Shared collaborators
    self.telemetry = get_telemetry()
    self.api_utils = get_api_utils()
    self.fallback_registry = get_fallback_registry()

    # Endpoint configurations
    self.endpoint_configs: Dict[str, EndpointConfiguration] = {}

    # Circuit breakers by endpoint
    self.circuit_breakers: Dict[str, CircuitBreaker] = {}

    # Request stats by endpoint
    self.request_stats: Dict[str, Dict[str, Any]] = {}

    # Resource usage stats
    self.resource_usage = {
        "cpu": 0.0,
        "memory": 0.0,
        "last_updated": 0.0
    }

    # Locks
    self.config_lock = threading.RLock()
    self.stats_lock = threading.RLock()
    self.resource_lock = threading.RLock()

    # Semaphore for limiting concurrent requests. request() only uses it
    # when the limit is positive, so a zero or negative limit means
    # "unlimited"; clamp the initial value because threading.Semaphore
    # raises ValueError for negative values.
    self.request_semaphore = threading.Semaphore(
        max(1, self.config["max_concurrent_requests"])
    )

    # Load endpoint configurations if enabled
    if self.config["load_from_storage"] and self.config["endpoint_config_path"]:
        self._load_endpoint_configs()

    # Initialize resource monitoring if enabled
    if self.config["resource_monitoring_enabled"]:
        self._start_resource_monitoring()
def _start_resource_monitoring(self) -> None:
    """
    Take an initial resource sample, or disable monitoring without psutil.

    When ``psutil`` cannot be imported a warning is sent to telemetry and
    ``resource_monitoring_enabled`` is switched off in the configuration.
    """
    try:
        # Imported here only to find out whether psutil is installed.
        import psutil  # noqa: F401

        # First sample; nothing in this method schedules later ones.
        self._update_resource_usage()
    except ImportError:
        self.telemetry.warning(
            "psutil not available, resource monitoring disabled",
            category=EventCategory.API,
        )
        self.config["resource_monitoring_enabled"] = False
def _update_resource_usage(self) -> None:
    """
    Refresh the cached CPU / memory usage percentages.

    The sample is taken before ``resource_lock`` is acquired, because
    ``psutil.cpu_percent(interval=0.1)`` blocks for the whole interval, and
    the three fields are then published together so readers never see a
    half-updated snapshot. Any failure (including a missing psutil) is
    reported to telemetry and leaves the previous snapshot in place.
    """
    try:
        import psutil

        cpu_percent = psutil.cpu_percent(interval=0.1)
        memory_percent = psutil.virtual_memory().percent
        sampled_at = time.time()

        with self.resource_lock:
            self.resource_usage["cpu"] = cpu_percent
            self.resource_usage["memory"] = memory_percent
            self.resource_usage["last_updated"] = sampled_at
    except Exception as e:
        self.telemetry.error(
            f"Error updating resource usage: {e}",
            category=EventCategory.API,
            exc_info=True
        )
def _load_endpoint_configs(self) -> None:
    """
    Load endpoint configurations from the JSON file at ``endpoint_config_path``.

    Does nothing when the file does not exist. Keys missing from an entry
    fall back to the ``EndpointConfiguration`` constructor defaults. All
    entries are parsed before any of them is published, so a malformed file
    leaves the current registry untouched. Failures are reported to
    telemetry and never raised.
    """
    import os
    import json

    # Values stored verbatim; the three enum fields are converted below.
    plain_fields = (
        "timeout",
        "retries",
        "retry_delay",
        "circuit_breaker_enabled",
        "failure_threshold",
        "recovery_timeout",
        "cache_enabled",
        "cache_duration",
        "requires_authentication",
        "allows_batching",
        "idempotent",
        "tags",
    )

    try:
        path = self.config["endpoint_config_path"]
        if not os.path.exists(path):
            return

        with open(path, "r", encoding="utf-8") as f:
            config_data = json.load(f)

        loaded = {}
        for endpoint, raw in config_data.items():
            kwargs = {name: raw[name] for name in plain_fields if name in raw}
            if "priority" in raw:
                # Stored by member name (see EndpointConfiguration.to_dict)
                kwargs["priority"] = EndpointPriority[raw["priority"]]
            if "resource_consumption" in raw:
                kwargs["resource_consumption"] = ResourceConsumption[raw["resource_consumption"]]
            if "fallback_strategy" in raw:
                # Stored by enum value, not by name
                kwargs["fallback_strategy"] = FallbackRegistryStrategy(raw["fallback_strategy"])
            loaded[endpoint] = EndpointConfiguration(**kwargs)

        # Publish in one step, holding the lock only briefly
        with self.config_lock:
            self.endpoint_configs.update(loaded)

        self.telemetry.info(
            "Loaded endpoint configurations from storage",
            category=EventCategory.API,
            context={"path": path}
        )
    except Exception as e:
        self.telemetry.error(
            f"Failed to load endpoint configurations: {e}",
            category=EventCategory.API,
            exc_info=True
        )
def _save_endpoint_configs(self) -> None:
    """
    Write all endpoint configurations to ``endpoint_config_path`` as JSON.

    Does nothing when no path is configured. The parent directory is created
    when the path has one; a bare filename is written to the current working
    directory. Failures are reported to telemetry and never raised.
    """
    import os
    import json

    try:
        path = self.config["endpoint_config_path"]
        if not path:
            return

        # os.path.dirname() returns "" for a bare filename and
        # os.makedirs("") raises, so only create a directory if there is one.
        parent_dir = os.path.dirname(path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        # Snapshot under the lock, write the file outside it
        with self.config_lock:
            config_data = {
                endpoint: config.to_dict()
                for endpoint, config in self.endpoint_configs.items()
            }

        with open(path, "w", encoding="utf-8") as f:
            json.dump(config_data, f, indent=2)

        self.telemetry.debug(
            "Saved endpoint configurations to storage",
            category=EventCategory.API,
            context={"path": path}
        )
    except Exception as e:
        self.telemetry.error(
            f"Failed to save endpoint configurations: {e}",
            category=EventCategory.API,
            exc_info=True
        )
def set_endpoint_config(self, endpoint: str, config: "EndpointConfiguration") -> None:
    """
    Set configuration for an endpoint.

    Keeps the circuit-breaker registry in step with the new configuration:
    a breaker is created when breakers are enabled and none exists yet, and
    an existing breaker is removed when the new configuration disables
    them. An already-registered breaker is kept as-is when breakers stay
    enabled. The configurations are persisted when ``save_to_storage`` and
    ``endpoint_config_path`` are both set.

    Args:
        endpoint: API endpoint
        config: Endpoint configuration
    """
    with self.config_lock:
        self.endpoint_configs[endpoint] = config

        if not config.circuit_breaker_enabled:
            # Drop any breaker left over from an earlier configuration so it
            # cannot keep being handed out for this endpoint.
            self.circuit_breakers.pop(endpoint, None)
        elif endpoint not in self.circuit_breakers:
            self.circuit_breakers[endpoint] = CircuitBreaker(
                failure_threshold=config.failure_threshold,
                recovery_timeout=config.recovery_timeout
            )

        if self.config["save_to_storage"] and self.config["endpoint_config_path"]:
            self._save_endpoint_configs()
def get_endpoint_config(self, endpoint: str) -> EndpointConfiguration:
    """
    Resolve the configuration that applies to an endpoint.

    Lookup order: an exact key match, then the first registered key
    containing ``*`` that matches (``*x*`` = contains, ``*x`` = ends with,
    ``x*`` = starts with; a ``*`` only in the middle never matches), and
    finally a new configuration built from the adapter defaults.

    Args:
        endpoint: API endpoint

    Returns:
        Endpoint configuration
    """
    with self.config_lock:
        # 1. Exact match
        try:
            return self.endpoint_configs[endpoint]
        except KeyError:
            pass

        # 2. Wildcard keys, in registration order
        for pattern, candidate in self.endpoint_configs.items():
            if "*" not in pattern:
                continue

            leading_star = pattern.startswith("*")
            trailing_star = pattern.endswith("*")

            if leading_star and trailing_star:
                matched = pattern.strip("*") in endpoint
            elif leading_star:
                matched = endpoint.endswith(pattern[1:])
            elif trailing_star:
                matched = endpoint.startswith(pattern[:-1])
            else:
                matched = False

            if matched:
                return candidate

        # 3. Fall back to the adapter-wide defaults
        defaults = self.config
        return EndpointConfiguration(
            priority=defaults["default_priority"],
            resource_consumption=defaults["default_resource_consumption"],
            timeout=defaults["default_timeout"],
            retries=defaults["default_retries"],
            retry_delay=defaults["default_retry_delay"],
            circuit_breaker_enabled=defaults["default_circuit_breaker_enabled"],
            cache_enabled=defaults["default_cache_enabled"],
            cache_duration=defaults["default_cache_duration"],
            fallback_strategy=defaults["default_fallback_strategy"]
        )
def _get_circuit_breaker(self, endpoint: str) -> Optional["CircuitBreaker"]:
    """
    Get (creating on first use) the circuit breaker for an endpoint.

    The endpoint configuration is consulted first, so a breaker registered
    earlier is not returned once the configuration disables breakers.

    Args:
        endpoint: API endpoint

    Returns:
        Circuit breaker or None if not enabled
    """
    with self.config_lock:
        config = self.get_endpoint_config(endpoint)
        if not config.circuit_breaker_enabled:
            return None

        breaker = self.circuit_breakers.get(endpoint)
        if breaker is None:
            breaker = CircuitBreaker(
                failure_threshold=config.failure_threshold,
                recovery_timeout=config.recovery_timeout
            )
            self.circuit_breakers[endpoint] = breaker
        return breaker
def _update_request_stats(self, endpoint: str, success: bool, duration: float) -> None:
    """
    Fold one finished request into the per-endpoint statistics.

    Creates the statistics entry on first use, then updates the counters,
    the running total / average and the min / max durations.

    Args:
        endpoint: API endpoint
        success: Whether the request was successful
        duration: Request duration in seconds
    """
    with self.stats_lock:
        entry = self.request_stats.setdefault(
            endpoint,
            {
                "total_requests": 0,
                "successful_requests": 0,
                "failed_requests": 0,
                "total_duration": 0.0,
                "avg_duration": 0.0,
                "last_request_time": 0.0,
                "min_duration": float('inf'),
                "max_duration": 0.0
            },
        )

        entry["total_requests"] += 1
        entry["last_request_time"] = time.time()
        entry["total_duration"] += duration
        entry["avg_duration"] = entry["total_duration"] / entry["total_requests"]
        entry["min_duration"] = min(entry["min_duration"], duration)
        entry["max_duration"] = max(entry["max_duration"], duration)

        outcome_key = "successful_requests" if success else "failed_requests"
        entry[outcome_key] += 1
def _should_throttle_request(self, endpoint: str) -> bool:
    """
    Check if a request should be throttled based on resource usage.

    Never throttles when throttling or resource monitoring is disabled, or
    for CRITICAL endpoints. Otherwise CPU / memory usage is compared with
    the configured thresholds, which are tightened for HEAVY (x0.9) and
    INTENSIVE (x0.8) endpoints and scaled again by priority: LOW and
    BACKGROUND trip at 80% of the threshold, NORMAL at 90%, HIGH at 100%.

    Args:
        endpoint: API endpoint

    Returns:
        Whether the request should be throttled
    """
    if not self.config["request_throttling_enabled"]:
        return False

    # With monitoring off (it is switched off when psutil is missing) there
    # is no usage data to act on; sampling here would only fail and log an
    # error on every single request.
    if not self.config.get("resource_monitoring_enabled", True):
        return False

    config = self.get_endpoint_config(endpoint)

    # Don't throttle critical requests
    if config.priority == EndpointPriority.CRITICAL:
        return False

    # Refresh stale data outside the lock: taking a sample blocks briefly
    # and must not hold up other readers.
    with self.resource_lock:
        is_stale = time.time() - self.resource_usage["last_updated"] > 5.0
    if is_stale:
        self._update_resource_usage()
    with self.resource_lock:
        cpu_usage = self.resource_usage["cpu"]
        memory_usage = self.resource_usage["memory"]

    cpu_threshold = self.config["resource_threshold_cpu"]
    memory_threshold = self.config["resource_threshold_memory"]

    # Apply more aggressive throttling for resource-intensive requests
    if config.resource_consumption == ResourceConsumption.INTENSIVE:
        cpu_threshold *= 0.8
        memory_threshold *= 0.8
    elif config.resource_consumption == ResourceConsumption.HEAVY:
        cpu_threshold *= 0.9
        memory_threshold *= 0.9

    # Fraction of the threshold at which each priority starts being throttled
    priority_factors = {
        EndpointPriority.LOW: 0.8,
        EndpointPriority.BACKGROUND: 0.8,
        EndpointPriority.NORMAL: 0.9,
        EndpointPriority.HIGH: 1.0,
    }
    factor = priority_factors.get(config.priority)
    if factor is None:
        return False

    return (
        cpu_usage > cpu_threshold * factor
        or memory_usage > memory_threshold * factor
    )
def _adjust_timeout(self, endpoint: str, base_timeout: float) -> float:
    """
    Derive a timeout from the durations observed for an endpoint.

    ``base_timeout`` is returned unchanged when adaptive timeouts are
    disabled or fewer than 5 requests have been recorded. Otherwise the
    candidate is 1.2x the maximum duration when the maximum exceeds three
    times the average, or ``adaptive_timeout_factor`` times the average.
    The result is never lower than ``base_timeout``.

    Args:
        endpoint: API endpoint
        base_timeout: Base timeout in seconds

    Returns:
        Adjusted timeout in seconds
    """
    if not self.config["enable_adaptive_timeouts"]:
        return base_timeout

    with self.stats_lock:
        if endpoint not in self.request_stats:
            return base_timeout

        history = self.request_stats[endpoint]
        if history["total_requests"] < 5:
            # Too few samples to learn from
            return base_timeout

        mean = history["avg_duration"]
        peak = history["max_duration"]

        if peak > mean * 3:
            # High variance: size the timeout from the slowest request seen
            candidate = peak * 1.2
        else:
            # Low variance: the average plus a configurable margin is enough
            candidate = mean * self.config["adaptive_timeout_factor"]

        # Don't go below base timeout
        return max(base_timeout, candidate)
def _adjust_retries(self, endpoint: str, base_retries: int) -> int:
    """
    Adjust retry count based on the observed success rate of an endpoint.

    ``base_retries`` is returned unchanged when adaptive behaviour is
    disabled (this shares the ``enable_adaptive_timeouts`` switch) or fewer
    than 5 requests have been recorded. A success rate above 95% scales the
    count down by ``adaptive_retry_factor`` (never below 1 unless the base
    is already lower); a rate below 50% scales it up by dividing by the
    factor (up to 10, unless the base is already higher).

    The adjustment only ever moves in the intended direction: a reliable
    endpoint never gets more retries than ``base_retries`` and an unreliable
    one never gets fewer. A non-positive factor disables the increase
    instead of dividing by zero.

    Args:
        endpoint: API endpoint
        base_retries: Base retry count

    Returns:
        Adjusted retry count
    """
    if not self.config["enable_adaptive_timeouts"]:
        return base_retries

    with self.stats_lock:
        stats = self.request_stats.get(endpoint)
        if not stats or stats["total_requests"] < 5:
            return base_retries
        success_rate = stats["successful_requests"] / stats["total_requests"]

    if success_rate > 0.95:
        # Very reliable, reduce retries (but never raise them, e.g. 0 -> 1)
        factor = self.config["adaptive_retry_factor"]
        reduced = max(1, int(base_retries * factor))
        return min(base_retries, reduced)

    if success_rate < 0.5:
        # Unreliable, increase retries (but never cut a base above the cap)
        factor = self.config["adaptive_retry_factor"]
        if factor <= 0:
            return base_retries
        increased = min(10, int(base_retries / factor))
        return max(base_retries, increased)

    return base_retries
def request(
    self,
    method: str,
    endpoint: str,
    data: Any = None,
    context: Optional[Dict[str, Any]] = None,
    options: Optional[Dict[str, Any]] = None
) -> Any:
    """
    Make a context-aware API request.

    Steps, in order: build the request options from the endpoint
    configuration (adaptive timeout / retries, then explicit overrides from
    ``options``), look up a fallback, apply resource throttling, consult the
    circuit breaker, and finally delegate to ``api_utils.request`` while
    recording the outcome in the breaker and the request statistics.

    NOTE(review): when throttling triggers for a non-BACKGROUND endpoint the
    request is only logged, not delayed or rejected -- confirm that is the
    intended behaviour.

    Args:
        method: HTTP method
        endpoint: API endpoint
        data: Request data
        context: Request context
        options: Request options. ``timeout``, ``retries``, ``retry_delay``,
            ``cache``, ``cache_duration``, ``circuit_breaker`` and
            ``critical`` override the configured values; ``headers``,
            ``no_fallback`` and ``fallback_strategy`` are also read.

    Returns:
        Response data, or the registered fallback value when a BACKGROUND
        request is dropped or the circuit is open

    Raises:
        MGError: If a BACKGROUND request is throttled or the circuit is
            open and no fallback is available
        Exception: Whatever the underlying request raises is re-raised
            after being recorded
    """
    context = context or {}
    options = options or {}

    # Get endpoint configuration
    config = self.get_endpoint_config(endpoint)

    # Create request options based on configuration
    request_options = {
        "timeout": config.timeout,
        "retries": config.retries,
        "retry_delay": config.retry_delay,
        "cache": config.cache_enabled,
        "cache_duration": config.cache_duration,
        "circuit_breaker": config.circuit_breaker_enabled,
        "critical": config.priority == EndpointPriority.CRITICAL,
        "headers": options.get("headers", {})
    }

    # Apply adaptive timeouts and retries
    if self.config["enable_adaptive_timeouts"]:
        request_options["timeout"] = self._adjust_timeout(endpoint, config.timeout)
        request_options["retries"] = self._adjust_retries(endpoint, config.retries)

    # Explicit per-call options win over configured / adaptive values
    for key in (
        "timeout",
        "retries",
        "retry_delay",
        "cache",
        "cache_duration",
        "circuit_breaker",
        "critical",
    ):
        if key in options:
            request_options[key] = options[key]

    # Priority hint for the underlying client; NORMAL sets none
    if config.priority in (EndpointPriority.CRITICAL, EndpointPriority.HIGH):
        request_options["priority"] = "high"
    elif config.priority in (EndpointPriority.LOW, EndpointPriority.BACKGROUND):
        request_options["priority"] = "low"

    # Set up fallback
    fallback_value = None
    if not options.get("no_fallback", False):
        fallback_strategy = options.get("fallback_strategy", config.fallback_strategy)
        fallback_value = self.fallback_registry.get_fallback(
            endpoint,
            strategy=fallback_strategy,
            context=context,
            args=[method, endpoint, data, request_options],
            kwargs={}
        )
        if fallback_value is not None:
            request_options["fallback"] = fallback_value

    # Check if we should throttle this request
    if self._should_throttle_request(endpoint):
        if config.priority == EndpointPriority.BACKGROUND:
            # Drop background requests when throttling
            self.telemetry.debug(
                f"Dropping background request {method} {endpoint} due to resource constraints",
                category=EventCategory.API,
                context={"priority": config.priority.name}
            )
            # Use fallback if available
            if fallback_value is not None:
                return fallback_value
            # Otherwise, raise throttled error
            raise MGError(
                message="Request throttled due to resource constraints",
                code=ErrorCode.THROTTLED,
                category=ErrorCategory.SYSTEM,
                severity=ErrorSeverity.WARNING
            )
        self.telemetry.debug(
            f"Throttling request {method} {endpoint} due to resource constraints",
            category=EventCategory.API,
            context={"priority": config.priority.name}
        )

    # Check circuit breaker
    circuit_breaker = self._get_circuit_breaker(endpoint)
    if circuit_breaker and not circuit_breaker.allow_request():
        self.telemetry.warning(
            f"Circuit breaker open for {endpoint}",
            category=EventCategory.API,
            context={"endpoint": endpoint}
        )
        # Use fallback if available
        if fallback_value is not None:
            return fallback_value
        # Otherwise, raise circuit open error
        raise MGError(
            message=f"Circuit breaker open for {endpoint}",
            code=ErrorCode.SERVICE_UNAVAILABLE,
            category=ErrorCategory.API,
            severity=ErrorSeverity.WARNING
        )

    # A non-positive limit means "no concurrency limit"
    limit_concurrency = self.config["max_concurrent_requests"] > 0
    # Track the acquisition so the semaphore is released only if it was
    # actually taken (e.g. not after an interrupt during acquire()).
    acquired = False

    # Record start time (includes any wait for the semaphore)
    start_time = time.time()
    try:
        if limit_concurrency:
            self.request_semaphore.acquire()
            acquired = True

        # Make the request
        response = self.api_utils.request(method, endpoint, data, request_options)

        # Record success
        if circuit_breaker:
            circuit_breaker.record_success()
        duration = time.time() - start_time
        self._update_request_stats(endpoint, True, duration)
        return response
    except Exception as error:
        # Record failure
        if circuit_breaker:
            circuit_breaker.record_failure()
        duration = time.time() - start_time
        self._update_request_stats(endpoint, False, duration)

        # Log error with context
        self.telemetry.error(
            f"Error in API request {method} {endpoint}: {error}",
            category=EventCategory.API,
            context={
                "method": method,
                "endpoint": endpoint,
                "duration": duration,
                "error": str(error)
            }
        )
        # Bare raise keeps the original traceback intact
        raise
    finally:
        if acquired:
            self.request_semaphore.release()
def get(
    self,
    endpoint: str,
    data: Any = None,
    context: Optional[Dict[str, Any]] = None,
    options: Optional[Dict[str, Any]] = None
) -> Any:
    """
    Issue a GET request through ``request``.

    Args:
        endpoint: API endpoint
        data: Query parameters
        context: Request context
        options: Request options

    Returns:
        Response data
    """
    http_method = "GET"
    return self.request(http_method, endpoint, data, context, options)
def post(
    self,
    endpoint: str,
    data: Any = None,
    context: Optional[Dict[str, Any]] = None,
    options: Optional[Dict[str, Any]] = None
) -> Any:
    """
    Issue a POST request through ``request``.

    Args:
        endpoint: API endpoint
        data: Request data
        context: Request context
        options: Request options

    Returns:
        Response data
    """
    http_method = "POST"
    return self.request(http_method, endpoint, data, context, options)
def put(
    self,
    endpoint: str,
    data: Any = None,
    context: Optional[Dict[str, Any]] = None,
    options: Optional[Dict[str, Any]] = None
) -> Any:
    """
    Issue a PUT request through ``request``.

    Args:
        endpoint: API endpoint
        data: Request data
        context: Request context
        options: Request options

    Returns:
        Response data
    """
    http_method = "PUT"
    return self.request(http_method, endpoint, data, context, options)
def patch(
    self,
    endpoint: str,
    data: Any = None,
    context: Optional[Dict[str, Any]] = None,
    options: Optional[Dict[str, Any]] = None
) -> Any:
    """
    Issue a PATCH request through ``request``.

    Args:
        endpoint: API endpoint
        data: Request data
        context: Request context
        options: Request options

    Returns:
        Response data
    """
    http_method = "PATCH"
    return self.request(http_method, endpoint, data, context, options)
def delete(
    self,
    endpoint: str,
    context: Optional[Dict[str, Any]] = None,
    options: Optional[Dict[str, Any]] = None
) -> Any:
    """
    Issue a DELETE request through ``request``; no request data is sent.

    Args:
        endpoint: API endpoint
        context: Request context
        options: Request options

    Returns:
        Response data
    """
    http_method = "DELETE"
    no_payload = None
    return self.request(http_method, endpoint, no_payload, context, options)
def get_request_stats(self, endpoint: Optional[str] = None) -> Dict[str, Any]:
    """
    Return a copy of the collected request statistics.

    Args:
        endpoint: Optional endpoint to get stats for; when omitted (or
            empty) the statistics of every endpoint are returned

    Returns:
        A shallow copy of one endpoint's statistics (empty if nothing was
        recorded for it), or a mapping of endpoint to such copies
    """
    with self.stats_lock:
        if not endpoint:
            return {
                name: entry.copy()
                for name, entry in self.request_stats.items()
            }
        return self.request_stats.get(endpoint, {}).copy()
def get_resource_usage(self) -> Dict[str, Any]:
    """
    Return a copy of the most recently recorded resource usage.

    Returns:
        Resource usage statistics
    """
    with self.resource_lock:
        snapshot = dict(self.resource_usage)
    return snapshot
def reset_circuit_breaker(self, endpoint: str) -> None:
    """
    Reset the circuit breaker registered for an endpoint, if there is one.

    Args:
        endpoint: API endpoint
    """
    with self.config_lock:
        try:
            breaker = self.circuit_breakers[endpoint]
        except KeyError:
            # Nothing registered for this endpoint
            return
        breaker.reset()
def reset_all_circuit_breakers(self) -> None:
    """Reset every registered circuit breaker while holding the config lock."""
    with self.config_lock:
        for breaker in self.circuit_breakers.values():
            breaker.reset()
def is_in_degraded_mode(self) -> bool:
    """
    Report the degraded-mode flag of the fallback registry.

    Returns:
        Whether the system is in degraded mode
    """
    degraded = self.fallback_registry.is_degraded_mode()
    return degraded
def register_fallback(
    self,
    endpoint: str,
    key: str,
    fallback: Union[Any, Callable],
    is_function: bool = False,
    tier: FallbackTier = FallbackTier.PRIMARY,
    context_rules: Optional[Dict[str, Any]] = None,
    tags: Optional[Dict[str, str]] = None
) -> None:
    """
    Register a fallback for an endpoint with the fallback registry.

    All arguments are forwarded unchanged, by keyword.

    Args:
        endpoint: API endpoint
        key: Fallback key
        fallback: Fallback value or function
        is_function: Whether the fallback is a function
        tier: Fallback tier
        context_rules: Rules for context-based selection
        tags: Tags for the fallback
    """
    registration = dict(
        endpoint=endpoint,
        key=key,
        fallback=fallback,
        is_function=is_function,
        tier=tier,
        context_rules=context_rules,
        tags=tags,
    )
    self.fallback_registry.register_fallback(**registration)
def shutdown(self) -> None:
    """Persist endpoint configurations when enabled, then shut down the fallback registry."""
    persist = self.config["save_to_storage"] and self.config["endpoint_config_path"]
    if persist:
        self._save_endpoint_configs()

    self.fallback_registry.shutdown()
# Singleton instance and the lock guarding its lazy creation
_instance = None
_instance_lock = threading.Lock()


def get_context_aware_api(options: Optional[Dict[str, Any]] = None) -> "ContextAwareAPIAdapter":
    """
    Get the global context-aware API adapter instance.

    The adapter is created on the first call. Creation is serialized so that
    concurrent first calls cannot build two adapters.

    Args:
        options: Configuration options; only used by the call that actually
            creates the instance and ignored afterwards

    Returns:
        ContextAwareAPIAdapter instance
    """
    global _instance
    if _instance is None:
        with _instance_lock:
            # Re-check: another thread may have created it while we waited
            if _instance is None:
                _instance = ContextAwareAPIAdapter(options)
    return _instance