MorphGuard / src /api_resilience.py
juanquy's picture
Initial clean commit of modular MorphGuard
2978bba
Raw
History Blame Contribute Delete
20.3 kB
"""
API resilience system for MorphGuard.
This module provides a comprehensive API resilience system with:
- Intelligent fallbacks
- Context-aware request handling
- Dynamic routing
- Health-aware service selection
- Degraded mode operations
- Resource-aware request throttling
Usage:
    from src.api_resilience import get_resilient_api

    # Get the resilient API client
    api = get_resilient_api()

    # Make requests with context
    response = api.get("/users", context={"user_id": "123"})
"""
import os
import json
import logging
from typing import Dict, Any, Optional, List, Union, Callable
# Import modules
from src.telemetry import get_telemetry, EventCategory
from src.error_handling import MGError, ErrorCode, ErrorSeverity, ErrorCategory
from src.api_fallback_registry import (
get_fallback_registry, FallbackTier, FallbackStrategy, HealthStatus
)
from src.context_aware_api import (
get_context_aware_api, EndpointPriority, ResourceConsumption, EndpointConfiguration
)
from src.api_utils import get_api_utils
class ResilientAPIClient:
    """
    Resilient API client for MorphGuard.

    This client integrates:
    - The core API utilities
    - The fallback registry
    - The context-aware API adapter

    Providing a comprehensive resilient API client with:
    - Intelligent fallbacks
    - Context-aware request handling
    - Dynamic routing
    - Health-aware service selection
    - Degraded mode operations
    - Resource-aware request throttling
    """

    def __init__(self, options: Optional[Dict[str, Any]] = None):
        """
        Initialize the resilient API client.

        Builds the effective configuration (defaults overlaid with
        ``options``), creates the cache and fallback directories, obtains the
        telemetry, API-utility, fallback-registry and context-aware-API
        collaborators, registers fallbacks, and logs an initialization event.

        Args:
            options: Configuration options. Keys given here replace the
                defaults below; keys not listed in the defaults are stored
                in ``self.config`` unchanged.
        """
        # Default configuration
        self.config: Dict[str, Any] = {
            "base_url": "http://localhost:5000/api",
            "cache_dir": ".mg_api_cache",
            "fallbacks_dir": ".mg_fallbacks",
            "timeout": 30.0,
            "retries": 3,
            "enable_telemetry": True,
            "enable_fallbacks": True,
            "enable_circuit_breaker": True,
            "enable_context_aware": True,
            "enable_resource_monitoring": True,
            "endpoint_config_path": None,
            "fallback_storage_path": None,
            "static_fallbacks": {},
            "register_default_fallbacks": True,
            "auto_register_fallbacks": True,
        }

        # Update with user-provided options
        if options:
            self.config.update(options)

        # Create directories if needed
        os.makedirs(self.config["cache_dir"], exist_ok=True)
        os.makedirs(self.config["fallbacks_dir"], exist_ok=True)

        # Initialize telemetry
        self.telemetry = get_telemetry()

        # Storage paths default to files inside the fallbacks directory
        if not self.config["endpoint_config_path"]:
            self.config["endpoint_config_path"] = os.path.join(
                self.config["fallbacks_dir"], "endpoint_configs.json"
            )
        if not self.config["fallback_storage_path"]:
            self.config["fallback_storage_path"] = os.path.join(
                self.config["fallbacks_dir"], "fallbacks.json"
            )

        # Get API utilities
        self.api_utils = get_api_utils({
            "base_url": self.config["base_url"],
            "cache_dir": self.config["cache_dir"],
            "timeout": self.config["timeout"],
            "retries": self.config["retries"],
            "circuit_breaker_enabled": self.config["enable_circuit_breaker"],
        })

        # Get fallback registry
        self.fallback_registry = get_fallback_registry({
            "fallback_storage_path": self.config["fallback_storage_path"],
            "load_from_storage": True,
            "save_to_storage": True,
            "enable_telemetry": self.config["enable_telemetry"],
        })

        # Get context-aware API adapter
        self.context_api = get_context_aware_api({
            "endpoint_config_path": self.config["endpoint_config_path"],
            "load_from_storage": True,
            "save_to_storage": True,
            "resource_monitoring_enabled": self.config["enable_resource_monitoring"],
            "enable_telemetry": self.config["enable_telemetry"],
        })

        # Register default fallbacks if enabled
        if self.config["register_default_fallbacks"]:
            self._register_default_fallbacks()

        # Register static fallbacks from config
        if self.config["static_fallbacks"]:
            self._register_static_fallbacks()

        # Log initialization
        self.telemetry.info(
            "Resilient API client initialized",
            category=EventCategory.API,
            context={
                "base_url": self.config["base_url"],
                "fallbacks_enabled": self.config["enable_fallbacks"],
                "circuit_breaker_enabled": self.config["enable_circuit_breaker"],
                "context_aware_enabled": self.config["enable_context_aware"],
            },
        )

    def _register_default_fallbacks(self):
        """Register default fallbacks for common endpoints."""
        # (endpoint, key, fallback value, category tag) for the static
        # defaults: guest profile, default settings, empty stats/config.
        static_defaults = (
            (
                "/users/profile",
                "default_profile",
                {"id": None, "name": "Guest", "email": None, "role": "guest"},
                "user",
            ),
            (
                "/users/settings",
                "default_settings",
                {"theme": "light", "notifications": True, "language": "en"},
                "user",
            ),
            ("/stats/*", "empty_stats", {}, "stats"),
            ("/config/*", "empty_config", {}, "config"),
        )
        for endpoint, key, value, category in static_defaults:
            self.fallback_registry.register_fallback(
                endpoint=endpoint,
                key=key,
                fallback=value,
                tier=FallbackTier.PRIMARY,
                tags={"type": "static", "category": category},
            )

        # Function to generate fallback data
        def generate_empty_list():
            return []

        # Register dynamic fallbacks
        self.fallback_registry.register_fallback(
            endpoint="/items/*",
            key="empty_items",
            fallback=generate_empty_list,
            is_function=True,
            tier=FallbackTier.PRIMARY,
            tags={"type": "dynamic", "category": "items"},
        )

    @staticmethod
    def _resolve_tier(tier_value: Any) -> Any:
        """
        Turn a configured tier into a ``FallbackTier`` attribute.

        Args:
            tier_value: A tier name (looked up on ``FallbackTier`` exactly,
                then upper-cased), ``None``, or an already-resolved tier.

        Returns:
            The matching ``FallbackTier`` attribute; ``FallbackTier.PRIMARY``
            when the value is ``None`` or the name is unknown; the value
            itself when it is not a string.
        """
        if tier_value is None:
            return FallbackTier.PRIMARY
        if not isinstance(tier_value, str):
            # Not a name: assume the caller supplied a tier directly.
            # getattr() would raise TypeError on a non-string name.
            return tier_value
        # Exact name first so every name that resolved before still resolves
        # to the same attribute; upper-case is a convenience retry.
        for candidate in (tier_value, tier_value.upper()):
            resolved = getattr(FallbackTier, candidate, None)
            if resolved is not None:
                return resolved
        return FallbackTier.PRIMARY

    def _register_static_fallbacks(self):
        """
        Register static fallbacks from configuration.

        Reads ``self.config["static_fallbacks"]`` as a mapping of
        ``endpoint -> {key -> spec}``, where each spec may carry ``value``,
        ``is_function``, ``tier``, ``context_rules`` and ``tags``.
        """
        for endpoint, fallbacks in self.config["static_fallbacks"].items():
            for key, spec in fallbacks.items():
                self.fallback_registry.register_fallback(
                    endpoint=endpoint,
                    key=key,
                    fallback=spec.get("value"),
                    is_function=spec.get("is_function", False),
                    tier=self._resolve_tier(spec.get("tier", "PRIMARY")),
                    context_rules=spec.get("context_rules"),
                    tags=spec.get("tags"),
                )

    def _should_use_context_api(self, options: Dict[str, Any]) -> bool:
        """
        Check if the context-aware API should be used.

        Args:
            options: Request options

        Returns:
            The truthiness of ``options["use_context_api"]`` when that key is
            present, otherwise of the ``enable_context_aware`` config value.
        """
        return bool(
            options.get("use_context_api", self.config["enable_context_aware"])
        )

    def _lookup_fallback(
        self,
        method: str,
        endpoint: str,
        data: Any,
        context: Optional[Dict[str, Any]],
        options: Dict[str, Any]
    ) -> Any:
        """
        Ask the fallback registry for a fallback for a failed request.

        Returns:
            Whatever the registry returns, or ``None`` when the lookup itself
            raises (the failure is logged with its traceback).
        """
        try:
            return self.fallback_registry.get_fallback(
                endpoint,
                context=context,
                args=[method, endpoint, data, options],
                kwargs={},
            )
        except Exception:
            # A broken fallback must not replace the request error the
            # caller is about to receive, so it is logged and treated as
            # "no fallback available".
            logging.getLogger(__name__).warning(
                "Fallback lookup failed for %s %s",
                method,
                endpoint,
                exc_info=True,
            )
            return None

    def request(
        self,
        method: str,
        endpoint: str,
        data: Any = None,
        context: Optional[Dict[str, Any]] = None,
        options: Optional[Dict[str, Any]] = None
    ) -> Any:
        """
        Make a resilient API request.

        The request goes through the context-aware API when
        ``_should_use_context_api`` says so, otherwise through the base API
        utilities (which do not receive ``context``). If the request raises
        and fallbacks are enabled (config ``enable_fallbacks`` and no
        ``options["no_fallback"]``), a non-``None`` registry fallback is
        returned instead.

        Args:
            method: HTTP method
            endpoint: API endpoint
            data: Request data
            context: Request context
            options: Request options

        Returns:
            Response data, or the fallback value when one is used

        Raises:
            Exception: The original request error, when no fallback applies.
                NOTE(review): the original docstring names MGError - confirm
                against the underlying clients.
        """
        options = options or {}
        try:
            if self._should_use_context_api(options):
                return self.context_api.request(
                    method, endpoint, data, context, options
                )
            return self.api_utils.request(method, endpoint, data, options)
        except Exception:
            fallbacks_allowed = (
                self.config["enable_fallbacks"]
                and not options.get("no_fallback", False)
            )
            if fallbacks_allowed:
                fallback = self._lookup_fallback(
                    method, endpoint, data, context, options
                )
                if fallback is not None:
                    self.telemetry.info(
                        f"Using fallback for {method} {endpoint}",
                        category=EventCategory.API,
                        context={"method": method, "endpoint": endpoint},
                    )
                    return fallback
            # Bare raise re-raises the request error with its traceback.
            raise

    def get(
        self,
        endpoint: str,
        data: Any = None,
        context: Optional[Dict[str, Any]] = None,
        options: Optional[Dict[str, Any]] = None
    ) -> Any:
        """
        Make a GET request.

        Args:
            endpoint: API endpoint
            data: Query parameters
            context: Request context
            options: Request options

        Returns:
            Response data
        """
        return self.request("GET", endpoint, data, context, options)

    def post(
        self,
        endpoint: str,
        data: Any = None,
        context: Optional[Dict[str, Any]] = None,
        options: Optional[Dict[str, Any]] = None
    ) -> Any:
        """
        Make a POST request.

        Args:
            endpoint: API endpoint
            data: Request data
            context: Request context
            options: Request options

        Returns:
            Response data
        """
        return self.request("POST", endpoint, data, context, options)

    def put(
        self,
        endpoint: str,
        data: Any = None,
        context: Optional[Dict[str, Any]] = None,
        options: Optional[Dict[str, Any]] = None
    ) -> Any:
        """
        Make a PUT request.

        Args:
            endpoint: API endpoint
            data: Request data
            context: Request context
            options: Request options

        Returns:
            Response data
        """
        return self.request("PUT", endpoint, data, context, options)

    def patch(
        self,
        endpoint: str,
        data: Any = None,
        context: Optional[Dict[str, Any]] = None,
        options: Optional[Dict[str, Any]] = None
    ) -> Any:
        """
        Make a PATCH request.

        Args:
            endpoint: API endpoint
            data: Request data
            context: Request context
            options: Request options

        Returns:
            Response data
        """
        return self.request("PATCH", endpoint, data, context, options)

    def delete(
        self,
        endpoint: str,
        context: Optional[Dict[str, Any]] = None,
        options: Optional[Dict[str, Any]] = None
    ) -> Any:
        """
        Make a DELETE request (always sent with no request data).

        Args:
            endpoint: API endpoint
            context: Request context
            options: Request options

        Returns:
            Response data
        """
        return self.request("DELETE", endpoint, None, context, options)

    def register_fallback(
        self,
        endpoint: str,
        key: str,
        fallback: Union[Any, Callable],
        is_function: bool = False,
        tier: FallbackTier = FallbackTier.PRIMARY,
        context_rules: Optional[Dict[str, Any]] = None,
        tags: Optional[Dict[str, str]] = None
    ) -> None:
        """
        Register a fallback for an endpoint.

        All arguments are forwarded unchanged to the fallback registry.

        Args:
            endpoint: API endpoint
            key: Fallback key
            fallback: Fallback value or function
            is_function: Whether the fallback is a function
            tier: Fallback tier
            context_rules: Rules for context-based selection
            tags: Tags for the fallback
        """
        self.fallback_registry.register_fallback(
            endpoint=endpoint,
            key=key,
            fallback=fallback,
            is_function=is_function,
            tier=tier,
            context_rules=context_rules,
            tags=tags,
        )

    def configure_endpoint(
        self,
        endpoint: str,
        priority: Optional[EndpointPriority] = None,
        resource_consumption: Optional[ResourceConsumption] = None,
        timeout: Optional[float] = None,
        retries: Optional[int] = None,
        retry_delay: Optional[float] = None,
        circuit_breaker_enabled: Optional[bool] = None,
        failure_threshold: Optional[int] = None,
        recovery_timeout: Optional[float] = None,
        cache_enabled: Optional[bool] = None,
        cache_duration: Optional[int] = None,
        fallback_strategy: Optional[FallbackStrategy] = None,
        requires_authentication: Optional[bool] = None,
        allows_batching: Optional[bool] = None,
        idempotent: Optional[bool] = None,
        tags: Optional[Dict[str, str]] = None
    ) -> None:
        """
        Configure an endpoint.

        Fetches the endpoint's configuration object from the context-aware
        API, overwrites every attribute whose argument is not ``None``,
        merges ``tags`` into the existing tags, and stores the result back.

        Args:
            endpoint: API endpoint
            priority: Priority level for the endpoint
            resource_consumption: Resource consumption level
            timeout: Request timeout in seconds
            retries: Number of retries
            retry_delay: Delay between retries in seconds
            circuit_breaker_enabled: Whether to enable circuit breaker
            failure_threshold: Number of failures before opening circuit
            recovery_timeout: Time before trying to recover in seconds
            cache_enabled: Whether to enable caching
            cache_duration: Cache duration in seconds
            fallback_strategy: Strategy for selecting fallbacks
            requires_authentication: Whether authentication is required
            allows_batching: Whether the endpoint can be batched
            idempotent: Whether the endpoint is idempotent
            tags: Tags for the endpoint (merged, not replaced)
        """
        # Get existing configuration if any
        config = self.context_api.get_endpoint_config(endpoint)

        # Attribute name -> requested value, in the order they are applied.
        overrides = {
            "priority": priority,
            "resource_consumption": resource_consumption,
            "timeout": timeout,
            "retries": retries,
            "retry_delay": retry_delay,
            "circuit_breaker_enabled": circuit_breaker_enabled,
            "failure_threshold": failure_threshold,
            "recovery_timeout": recovery_timeout,
            "cache_enabled": cache_enabled,
            "cache_duration": cache_duration,
            "fallback_strategy": fallback_strategy,
            "requires_authentication": requires_authentication,
            "allows_batching": allows_batching,
            "idempotent": idempotent,
        }
        for attribute, value in overrides.items():
            # None means "leave this setting as it is".
            if value is not None:
                setattr(config, attribute, value)

        if tags is not None:
            config.tags.update(tags)

        # Set updated configuration
        self.context_api.set_endpoint_config(endpoint, config)

    def get_endpoint_configuration(self, endpoint: str) -> Dict[str, Any]:
        """
        Get configuration for an endpoint.

        Args:
            endpoint: API endpoint

        Returns:
            Endpoint configuration as dictionary
        """
        return self.context_api.get_endpoint_config(endpoint).to_dict()

    def reset_circuit_breaker(self, endpoint: str) -> None:
        """
        Reset circuit breaker for an endpoint.

        Args:
            endpoint: API endpoint
        """
        self.context_api.reset_circuit_breaker(endpoint)

    def reset_all_circuit_breakers(self) -> None:
        """Reset all circuit breakers."""
        self.context_api.reset_all_circuit_breakers()

    def clear_cache(self, endpoint: Optional[str] = None) -> None:
        """
        Clear the cache held by the base API utilities.

        Args:
            endpoint: Optional endpoint to clear (clears all if None)
        """
        self.api_utils.clear_cache(endpoint)

    def is_in_degraded_mode(self) -> bool:
        """
        Check if the system is in degraded mode.

        Returns:
            Whether the context-aware API reports degraded mode
        """
        return self.context_api.is_in_degraded_mode()

    def get_request_stats(self, endpoint: Optional[str] = None) -> Dict[str, Any]:
        """
        Get request statistics from the context-aware API.

        Args:
            endpoint: Optional endpoint to get stats for

        Returns:
            Request statistics
        """
        return self.context_api.get_request_stats(endpoint)

    def get_fallbacks(self, endpoint: Optional[str] = None) -> Dict[str, Any]:
        """
        Get registered fallbacks.

        Args:
            endpoint: Optional endpoint to get fallbacks for; a falsy value
                (``None`` or ``""``) returns the fallbacks of all endpoints

        Returns:
            Fallback information
        """
        if endpoint:
            return self.fallback_registry.get_fallbacks_for_endpoint(endpoint)
        return self.fallback_registry.get_all_fallbacks()

    def shutdown(self) -> None:
        """
        Shut down the context-aware API and the fallback registry.

        The fallback registry is shut down even when the context-aware API's
        shutdown raises; that exception still propagates afterwards.
        """
        try:
            # Shutdown context-aware API
            self.context_api.shutdown()
        finally:
            # Shutdown fallback registry
            self.fallback_registry.shutdown()
# Singleton instance
_instance = None


def get_resilient_api(options: Optional[Dict[str, Any]] = None) -> ResilientAPIClient:
    """
    Get the global resilient API client instance.

    The client is created on the first call. Later calls return that same
    instance; ``options`` passed to them are not applied, and a warning is
    logged so the mismatch is visible.

    Args:
        options: Configuration options, used only when the instance is
            created

    Returns:
        ResilientAPIClient instance
    """
    global _instance
    if _instance is None:
        _instance = ResilientAPIClient(options)
    elif options:
        # The existing client is already configured; say so instead of
        # dropping the caller's options silently.
        logging.getLogger(__name__).warning(
            "get_resilient_api() received options after the client was "
            "created; they are ignored"
        )
    return _instance