Spaces:
Running
Running
| """ | |
| Enhanced API fallback registry for MorphGuard. | |
| This module provides a centralized registry for API fallbacks with: | |
| - Context-aware fallback selection | |
| - Tiered fallback strategies | |
| - Dynamic fallback generation | |
| - Health-aware routing | |
| - Degraded mode operations | |
| """ | |
| import os | |
| import json | |
| import time | |
| import logging | |
| import threading | |
| import random | |
| from enum import Enum | |
| from typing import Dict, Any, List, Optional, Union, Callable, Tuple, TypeVar | |
| # Import error handling for proper error classification | |
| from src.error_handling import MGError, ErrorCode, ErrorSeverity, ErrorCategory | |
| from src.telemetry import get_telemetry, EventCategory | |
# Typing aliases shared by the fallback classes below.
T = TypeVar("T")  # result type produced by a fallback callable
FallbackFunc = Callable[..., T]  # callable used as a dynamic fallback
FallbackValue = Any  # static fallback payloads are unconstrained
class FallbackTier(Enum):
    """Tiers of fallbacks, ordered by preference.

    A lower value means a more preferred tier. Members define the rich
    comparison operators on their values so that tiers can be passed
    directly to ``sorted()`` / ``min()``; a plain ``Enum`` would raise
    ``TypeError`` there.
    """

    PRIMARY = 0  # First choice fallback
    SECONDARY = 1  # Second choice fallback
    TERTIARY = 2  # Third choice fallback
    EMERGENCY = 3  # Last resort fallback

    def __lt__(self, other):
        """Return True when this tier is preferred over ``other``."""
        if not isinstance(other, FallbackTier):
            return NotImplemented
        return self.value < other.value

    def __le__(self, other):
        """Return True when this tier is at least as preferred as ``other``."""
        if not isinstance(other, FallbackTier):
            return NotImplemented
        return self.value <= other.value

    def __gt__(self, other):
        """Return True when ``other`` is preferred over this tier."""
        if not isinstance(other, FallbackTier):
            return NotImplemented
        return self.value > other.value

    def __ge__(self, other):
        """Return True when ``other`` is at least as preferred as this tier."""
        if not isinstance(other, FallbackTier):
            return NotImplemented
        return self.value >= other.value
class FallbackStrategy(Enum):
    """Available policies for choosing one fallback among several."""

    # Always use the same fallback
    STATIC = "static"
    # Randomly select from available fallbacks
    RANDOM = "random"
    # Select based on weights
    WEIGHTED = "weighted"
    # Select least recently used fallback
    LEAST_RECENTLY_USED = "lru"
    # Cycle through fallbacks
    ROUND_ROBIN = "round_robin"
    # Select based on request context
    CONTEXT_BASED = "context"
    # Select based on health checks
    HEALTH_BASED = "health"
class HealthStatus(Enum):
    """Possible health states recorded for a service, endpoint or fallback."""

    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"
    # Initial state before any health check has been recorded
    UNKNOWN = "unknown"
class FallbackMetadata:
    """Metadata for a fallback entry.

    Holds the selection parameters (tier, weight, context rules, tags) and
    the mutable bookkeeping (usage counters, health status and their
    timestamps) attached to one fallback.
    """

    def __init__(
        self,
        tier: FallbackTier = FallbackTier.PRIMARY,
        weight: float = 1.0,
        context_rules: Optional[Dict[str, Any]] = None,
        last_used: float = 0,
        use_count: int = 0,
        success_count: int = 0,
        failure_count: int = 0,
        health_status: HealthStatus = HealthStatus.UNKNOWN,
        last_health_check: float = 0,
        tags: Optional[Dict[str, str]] = None
    ):
        """
        Initialize fallback metadata.

        Args:
            tier: Fallback tier
            weight: Weight for weighted selection (higher = more likely to be selected)
            context_rules: Rules for context-based selection
            last_used: Timestamp of last use
            use_count: Number of times used
            success_count: Number of successful uses
            failure_count: Number of failed uses
            health_status: Current health status
            last_health_check: Timestamp of last health check
            tags: Tags for categorizing fallbacks
        """
        self.tier = tier
        self.weight = weight
        # A falsy argument (None or an empty dict) is replaced by a fresh dict.
        self.context_rules = context_rules or {}
        self.last_used = last_used
        self.use_count = use_count
        self.success_count = success_count
        self.failure_count = failure_count
        self.health_status = health_status
        self.last_health_check = last_health_check
        self.tags = tags or {}

    def update_usage(self, success: bool = True) -> None:
        """
        Record one use of the fallback and stamp the current time.

        Args:
            success: Whether the fallback was successful
        """
        self.last_used = time.time()
        self.use_count += 1
        if success:
            self.success_count += 1
        else:
            self.failure_count += 1

    def update_health(self, status: HealthStatus) -> None:
        """
        Store a new health status and stamp the time of the check.

        Args:
            status: New health status
        """
        self.health_status = status
        self.last_health_check = time.time()

    def success_rate(self) -> float:
        """
        Calculate success rate.

        Returns:
            Success rate (0-1) or 0.0 if never used
        """
        if self.use_count == 0:
            # Always a float, as annotated, so callers never see an int.
            return 0.0
        return self.success_count / self.use_count

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert to dictionary.

        Returns:
            Dictionary representation; the tier is stored by name and the
            health status by value, plus the derived ``success_rate``.
        """
        return {
            "tier": self.tier.name,
            "weight": self.weight,
            "context_rules": self.context_rules,
            "last_used": self.last_used,
            "use_count": self.use_count,
            "success_count": self.success_count,
            "failure_count": self.failure_count,
            "health_status": self.health_status.value,
            "last_health_check": self.last_health_check,
            "tags": self.tags,
            "success_rate": self.success_rate()
        }

    def __repr__(self) -> str:
        """Return a short debugging representation."""
        return (
            f"{type(self).__name__}(tier={self.tier.name}, weight={self.weight!r}, "
            f"health_status={self.health_status.value!r}, use_count={self.use_count}, "
            f"success_count={self.success_count}, failure_count={self.failure_count})"
        )
class FallbackEntry:
    """One registered fallback: a key, its payload and its metadata."""

    def __init__(
        self,
        key: str,
        value: Union[Any, FallbackFunc],
        is_function: bool = False,
        metadata: Optional[FallbackMetadata] = None
    ):
        """
        Create a fallback entry.

        Args:
            key: Unique identifier for the fallback
            value: Static payload, or the callable producing it
            is_function: True when ``value`` must be called to get the payload
            metadata: Metadata to attach; a default ``FallbackMetadata`` is
                created when omitted
        """
        self.key = key
        self.value = value
        self.is_function = is_function
        self.metadata = metadata or FallbackMetadata()

    def get_value(self, *args, **kwargs) -> Any:
        """
        Produce the fallback payload and record the attempt in the metadata.

        Args:
            *args: Positional arguments forwarded to a function fallback
            **kwargs: Keyword arguments forwarded to a function fallback

        Returns:
            The static value, or the result of calling the function fallback

        Raises:
            Exception: Whatever the function fallback raises; the failure is
                counted in the metadata before it propagates.
        """
        try:
            outcome = self.value(*args, **kwargs) if self.is_function else self.value
            self.metadata.update_usage(success=True)
            return outcome
        except Exception:
            self.metadata.update_usage(success=False)
            raise

    def to_dict(self) -> Dict[str, Any]:
        """
        Describe the entry as a dictionary (the payload itself is omitted).

        Returns:
            Dictionary with the key, the function flag, the metadata
            dictionary and the payload's type name
        """
        if self.is_function:
            value_type = "function"
        else:
            value_type = type(self.value).__name__
        return {
            "key": self.key,
            "is_function": self.is_function,
            "metadata": self.metadata.to_dict(),
            "value_type": value_type
        }
class APIFallbackRegistry:
    """
    Registry for API fallbacks with intelligent selection.

    This registry provides:
    - Endpoint-specific fallbacks (exact names or ``*`` wildcard patterns)
    - Tiered fallback selection
    - Multiple selection strategies
    - Health-aware routing
    - Degraded mode operations

    Registry state is guarded by ``fallbacks_lock``; the degraded-mode flag
    is updated under ``health_lock``. When ``auto_health_checks`` is enabled
    a daemon thread re-evaluates health every ``health_check_interval``
    seconds until ``shutdown()`` is called.
    """

    def __init__(self, options: Optional[Dict[str, Any]] = None):
        """
        Initialize the fallback registry.

        Args:
            options: Configuration options overriding the defaults below
        """
        # Default configuration
        self.config: Dict[str, Any] = {
            "default_strategy": FallbackStrategy.STATIC,
            "health_check_interval": 60,  # seconds
            "auto_health_checks": True,
            "fallback_storage_path": None,
            "load_from_storage": False,
            "save_to_storage": False,
            "context_rules_path": None,
            "degraded_mode_threshold": 0.5,  # 50% failure rate
            "enable_telemetry": True
        }
        # Update with user-provided options
        if options:
            self.config.update(options)

        # Initialize telemetry
        self.telemetry = get_telemetry()

        # Fallback entries by endpoint
        self.fallbacks: Dict[str, List[FallbackEntry]] = {}
        # Current selection indexes for round-robin
        self.round_robin_indexes: Dict[str, int] = {}
        # System health status
        self.system_health = HealthStatus.HEALTHY
        # Health check thread
        self.health_check_thread = None
        # Degraded mode flag
        self.degraded_mode = False

        # Locks
        self.fallbacks_lock = threading.RLock()
        self.health_lock = threading.RLock()
        # Set by shutdown() to wake and stop the health check worker
        self._stop_event = threading.Event()

        # Load fallbacks from storage if enabled
        if self.config["load_from_storage"] and self.config["fallback_storage_path"]:
            self._load_from_storage()

        # Start health check thread if enabled
        if self.config["auto_health_checks"]:
            self._start_health_check_thread()

    def _start_health_check_thread(self) -> None:
        """Start the background health check thread."""
        # Clear a previous stop request so the worker can be restarted.
        self._stop_event.clear()
        self.health_check_thread = threading.Thread(
            target=self._health_check_worker,
            daemon=True,
            name="fallback_health_checker"
        )
        self.health_check_thread.start()

    def _health_check_worker(self) -> None:
        """Background worker to check health of fallbacks.

        Waits one interval, runs the checks, and repeats until the stop
        event is set. Errors in a cycle are reported and the loop continues.
        """
        # Event.wait() doubles as an interruptible sleep: it returns True as
        # soon as shutdown() sets the event, which ends the loop.
        while not self._stop_event.wait(self.config["health_check_interval"]):
            try:
                # Run health checks
                self._check_all_health()
                # Update degraded mode flag
                self._update_degraded_mode()
                # Save to storage if enabled
                if self.config["save_to_storage"] and self.config["fallback_storage_path"]:
                    self._save_to_storage()
            except Exception as e:
                self.telemetry.error(
                    f"Error in fallback health check worker: {e}",
                    category=EventCategory.API,
                    exc_info=True
                )

    def _check_all_health(self) -> None:
        """Check health of all function fallbacks.

        The check is deliberately simple: a function fallback is healthy
        when its value is callable. Static fallbacks are left untouched.
        """
        with self.fallbacks_lock:
            for entries in self.fallbacks.values():
                for entry in entries:
                    if not entry.is_function:
                        continue
                    status = (
                        HealthStatus.HEALTHY
                        if callable(entry.value)
                        else HealthStatus.UNHEALTHY
                    )
                    entry.metadata.update_health(status)

    def _update_degraded_mode(self) -> None:
        """Update the degraded mode flag based on health checks.

        An endpoint counts as unhealthy when every one of its fallbacks is
        UNHEALTHY. Degraded mode is on when the share of unhealthy endpoints
        reaches ``degraded_mode_threshold``. Transitions are reported to
        telemetry.
        """
        with self.health_lock:
            with self.fallbacks_lock:
                # Endpoints without entries are ignored: all() over an empty
                # list is True and would wrongly count them as unhealthy.
                populated = [entries for entries in self.fallbacks.values() if entries]
                unhealthy_endpoints = sum(
                    1 for entries in populated
                    if all(
                        entry.metadata.health_status == HealthStatus.UNHEALTHY
                        for entry in entries
                    )
                )
            total_endpoints = len(populated)
            if total_endpoints == 0:
                return

            unhealthy_ratio = unhealthy_endpoints / total_endpoints
            threshold = self.config["degraded_mode_threshold"]
            old_degraded_mode = self.degraded_mode
            self.degraded_mode = unhealthy_ratio >= threshold

            # Log degraded mode changes only
            if self.degraded_mode == old_degraded_mode:
                return
            details = {"unhealthy_ratio": unhealthy_ratio, "threshold": threshold}
            if self.degraded_mode:
                self.telemetry.warning(
                    "System entered degraded mode",
                    category=EventCategory.API,
                    context=details
                )
            else:
                self.telemetry.info(
                    "System exited degraded mode",
                    category=EventCategory.API,
                    context=details
                )

    @staticmethod
    def _entry_from_dict(entry_data: Dict[str, Any]) -> FallbackEntry:
        """Rebuild a static fallback entry from its stored dictionary form."""
        stored = entry_data["metadata"]
        metadata = FallbackMetadata(
            tier=FallbackTier[stored["tier"]],
            weight=stored["weight"],
            context_rules=stored["context_rules"],
            last_used=stored["last_used"],
            use_count=stored["use_count"],
            success_count=stored["success_count"],
            failure_count=stored["failure_count"],
            health_status=HealthStatus(stored["health_status"]),
            last_health_check=stored["last_health_check"],
            tags=stored["tags"]
        )
        return FallbackEntry(
            key=entry_data["key"],
            value=entry_data["value"],
            is_function=False,
            metadata=metadata
        )

    def _load_from_storage(self) -> None:
        """Load fallbacks from storage.

        Only static fallbacks are restored; function fallbacks cannot be
        serialized. Failures are reported to telemetry and never raised.
        """
        try:
            path = self.config["fallback_storage_path"]
            if not os.path.exists(path):
                return
            with open(path, "r", encoding="utf-8") as f:
                data = json.load(f)

            with self.fallbacks_lock:
                for endpoint, stored_entries in data.items():
                    loaded = [
                        self._entry_from_dict(entry_data)
                        for entry_data in stored_entries
                        if not entry_data.get("is_function", False)
                    ]
                    # Never register an endpoint with an empty list, in line
                    # with unregister_fallback(), which drops empty endpoints.
                    if loaded:
                        self.fallbacks[endpoint] = loaded

            self.telemetry.info(
                "Loaded fallbacks from storage",
                category=EventCategory.API,
                context={"path": self.config["fallback_storage_path"]}
            )
        except Exception as e:
            self.telemetry.error(
                f"Failed to load fallbacks from storage: {e}",
                category=EventCategory.API,
                exc_info=True
            )

    def _save_to_storage(self) -> None:
        """Save static fallbacks to storage as JSON.

        Function fallbacks are skipped because they cannot be serialized.
        Failures are reported to telemetry and never raised.
        """
        try:
            path = self.config["fallback_storage_path"]
            if not path:
                return

            # Create the directory if needed. A bare file name has an empty
            # dirname, and os.makedirs("") would raise.
            directory = os.path.dirname(path)
            if directory:
                os.makedirs(directory, exist_ok=True)

            data: Dict[str, List[Dict[str, Any]]] = {}
            with self.fallbacks_lock:
                for endpoint, entries in self.fallbacks.items():
                    data[endpoint] = []
                    for entry in entries:
                        if entry.is_function:
                            continue
                        entry_data = entry.to_dict()
                        entry_data["value"] = entry.value  # Add actual value
                        data[endpoint].append(entry_data)

            # Serialize first, then write to a temporary file and swap it in,
            # so a failure never leaves a truncated store behind.
            payload = json.dumps(data, indent=2)
            tmp_path = f"{path}.tmp"
            with open(tmp_path, "w", encoding="utf-8") as f:
                f.write(payload)
            os.replace(tmp_path, path)

            self.telemetry.debug(
                "Saved fallbacks to storage",
                category=EventCategory.API,
                context={"path": self.config["fallback_storage_path"]}
            )
        except Exception as e:
            self.telemetry.error(
                f"Failed to save fallbacks to storage: {e}",
                category=EventCategory.API,
                exc_info=True
            )

    def register_fallback(
        self,
        endpoint: str,
        key: str,
        fallback: Union[Any, FallbackFunc],
        is_function: bool = False,
        tier: FallbackTier = FallbackTier.PRIMARY,
        weight: float = 1.0,
        context_rules: Optional[Dict[str, Any]] = None,
        tags: Optional[Dict[str, str]] = None
    ) -> None:
        """
        Register a fallback for an endpoint.

        Registering a key that already exists for the endpoint replaces the
        entry but keeps its usage and health history (and its tags when no
        new tags are given).

        Args:
            endpoint: API endpoint pattern (``*`` matches any run of characters)
            key: Unique identifier for the fallback
            fallback: Fallback value or function
            is_function: Whether the fallback is a function
            tier: Fallback tier
            weight: Weight for weighted selection
            context_rules: Rules for context-based selection
            tags: Tags for categorizing fallbacks
        """
        with self.fallbacks_lock:
            entries = self.fallbacks.setdefault(endpoint, [])

            for index, existing in enumerate(entries):
                if existing.key != key:
                    continue
                previous = existing.metadata
                entries[index] = FallbackEntry(
                    key=key,
                    value=fallback,
                    is_function=is_function,
                    metadata=FallbackMetadata(
                        tier=tier,
                        weight=weight,
                        context_rules=context_rules,
                        last_used=previous.last_used,
                        use_count=previous.use_count,
                        success_count=previous.success_count,
                        failure_count=previous.failure_count,
                        health_status=previous.health_status,
                        last_health_check=previous.last_health_check,
                        tags=tags or previous.tags
                    )
                )
                self.telemetry.debug(
                    f"Updated fallback {key} for endpoint {endpoint}",
                    category=EventCategory.API
                )
                return

            entries.append(FallbackEntry(
                key=key,
                value=fallback,
                is_function=is_function,
                metadata=FallbackMetadata(
                    tier=tier,
                    weight=weight,
                    context_rules=context_rules,
                    tags=tags
                )
            ))
            self.telemetry.debug(
                f"Registered fallback {key} for endpoint {endpoint}",
                category=EventCategory.API
            )

    def unregister_fallback(self, endpoint: str, key: str) -> bool:
        """
        Unregister a fallback.

        Args:
            endpoint: API endpoint (exactly as it was registered)
            key: Fallback key

        Returns:
            Whether the fallback was found and unregistered
        """
        with self.fallbacks_lock:
            entries = self.fallbacks.get(endpoint)
            if entries is None:
                return False

            for index, entry in enumerate(entries):
                if entry.key == key:
                    del entries[index]
                    # Remove endpoint if empty
                    if not entries:
                        del self.fallbacks[endpoint]
                    self.telemetry.debug(
                        f"Unregistered fallback {key} for endpoint {endpoint}",
                        category=EventCategory.API
                    )
                    return True
            return False

    @staticmethod
    def _pattern_matches(pattern: str, endpoint: str) -> bool:
        """Return True when ``endpoint`` matches a registered ``pattern``.

        A pattern without ``*`` must equal the endpoint. Otherwise each ``*``
        matches any (possibly empty) run of characters and everything else
        is literal; the whole endpoint must match.
        """
        if pattern == endpoint:
            return True
        if "*" not in pattern:
            return False
        import re  # local import: only wildcard patterns need it

        regex = ".*".join(re.escape(part) for part in pattern.split("*"))
        return re.fullmatch(regex, endpoint) is not None

    def get_fallback(
        self,
        endpoint: str,
        strategy: Optional[FallbackStrategy] = None,
        context: Optional[Dict[str, Any]] = None,
        args: Optional[List[Any]] = None,
        kwargs: Optional[Dict[str, Any]] = None
    ) -> Optional[Any]:
        """
        Get a fallback value for an endpoint.

        Entries of every registered pattern matching ``endpoint`` are pooled.
        One is chosen with the strategy; if using it raises, the failure is
        reported and the next candidate is tried until one succeeds or none
        are left.

        Args:
            endpoint: API endpoint
            strategy: Selection strategy (defaults to ``default_strategy``)
            context: Request context for context-based selection
            args: Arguments to pass to function fallbacks
            kwargs: Keyword arguments to pass to function fallbacks

        Returns:
            Fallback value or None if no fallback is available
        """
        with self.fallbacks_lock:
            matching_endpoints = [
                pattern for pattern in self.fallbacks
                if self._pattern_matches(pattern, endpoint)
            ]
            if not matching_endpoints:
                self.telemetry.debug(
                    f"No fallbacks found for endpoint {endpoint}",
                    category=EventCategory.API
                )
                return None

            candidates = [
                entry
                for pattern in matching_endpoints
                for entry in self.fallbacks[pattern]
            ]
            if not candidates:
                return None

            # Use provided strategy or default
            if strategy is None:
                strategy = FallbackStrategy(self.config["default_strategy"])

            call_args = args or []
            call_kwargs = kwargs or {}
            failed_attempts = 0

            while candidates:
                selected_entry = self._select_fallback(
                    endpoint, candidates, strategy, context
                )
                if selected_entry is None:
                    return None
                try:
                    result = selected_entry.get_value(*call_args, **call_kwargs)
                except Exception as e:
                    self.telemetry.error(
                        f"Error using fallback {selected_entry.key} for endpoint {endpoint}: {e}",
                        category=EventCategory.API,
                        exc_info=True
                    )
                    # Drop the failed entry by identity: keys are only unique
                    # per pattern, so entries pooled from different patterns
                    # may share a key.
                    candidates = [
                        entry for entry in candidates
                        if entry is not selected_entry
                    ]
                    failed_attempts += 1
                    continue

                label = "fallback" if failed_attempts == 0 else "alternative fallback"
                self.telemetry.debug(
                    f"Used {label} {selected_entry.key} for endpoint {endpoint}",
                    category=EventCategory.API,
                    context={"strategy": strategy.value}
                )
                return result

            return None

    def _select_fallback(
        self,
        endpoint: str,
        entries: List[FallbackEntry],
        strategy: FallbackStrategy,
        context: Optional[Dict[str, Any]] = None
    ) -> Optional[FallbackEntry]:
        """
        Select a fallback using the specified strategy.

        With the health-based strategy the candidates are first narrowed to
        HEALTHY entries, or to DEGRADED ones when none is healthy. Then only
        the most preferred tier present (lowest tier value) is considered
        and the strategy picks among its entries.

        Args:
            endpoint: API endpoint (used as the round-robin cursor key)
            entries: Available fallback entries
            strategy: Selection strategy
            context: Request context for context-based selection

        Returns:
            Selected fallback entry or None if no fallback is available
        """
        if not entries:
            return None

        if strategy == FallbackStrategy.HEALTH_BASED:
            for wanted in (HealthStatus.HEALTHY, HealthStatus.DEGRADED):
                preferred = [
                    entry for entry in entries
                    if entry.metadata.health_status == wanted
                ]
                if preferred:
                    entries = preferred
                    break

        # Group by tier
        tier_groups: Dict[FallbackTier, List[FallbackEntry]] = {}
        for entry in entries:
            tier_groups.setdefault(entry.metadata.tier, []).append(entry)

        # Compare tiers by value: plain Enum members do not define "<", so
        # sorting the members themselves raises TypeError.
        best_tier = min(tier_groups, key=lambda tier: tier.value)
        tier_entries = tier_groups[best_tier]

        if len(tier_entries) == 1:
            return tier_entries[0]

        if strategy == FallbackStrategy.RANDOM:
            return random.choice(tier_entries)

        if strategy == FallbackStrategy.WEIGHTED:
            # random.choices() rejects a non-positive total weight, so clamp
            # negative weights and pick uniformly when nothing is left.
            weights = [max(entry.metadata.weight, 0) for entry in tier_entries]
            if sum(weights) <= 0:
                return random.choice(tier_entries)
            return random.choices(tier_entries, weights=weights, k=1)[0]

        if strategy == FallbackStrategy.LEAST_RECENTLY_USED:
            return min(tier_entries, key=lambda e: e.metadata.last_used)

        if strategy == FallbackStrategy.ROUND_ROBIN:
            index = self.round_robin_indexes.get(endpoint, 0)
            selected = tier_entries[index % len(tier_entries)]
            # Update index for next time
            self.round_robin_indexes[endpoint] = (index + 1) % len(tier_entries)
            return selected

        if strategy == FallbackStrategy.CONTEXT_BASED and context:
            for entry in tier_entries:
                if self._matches_context(entry, context):
                    return entry

        # STATIC, HEALTH_BASED, context without a match, or an unknown
        # strategy: use the first entry of the tier.
        return tier_entries[0]

    @staticmethod
    def _evaluate_rule(operator: str, actual: Any, expected: Any) -> bool:
        """Evaluate one operator rule against a context value.

        Ordering operators require a numeric context value. Operands that
        cannot be compared make the rule fail instead of raising. Operators
        not listed here impose no constraint.
        """
        is_number = isinstance(actual, (int, float))
        try:
            if operator == "eq":
                return actual == expected
            if operator == "ne":
                return actual != expected
            if operator == "gt":
                return is_number and actual > expected
            if operator == "lt":
                return is_number and actual < expected
            if operator == "gte":
                return is_number and actual >= expected
            if operator == "lte":
                return is_number and actual <= expected
            if operator == "contains":
                return isinstance(actual, (str, list, tuple)) and expected in actual
            if operator == "in":
                return actual in expected
        except TypeError:
            # e.g. number compared with a string, or "in" on a non-container
            return False
        return True

    def _matches_context(self, entry: FallbackEntry, context: Dict[str, Any]) -> bool:
        """
        Check if an entry matches the given context.

        Every rule must hold. A rule is either a plain value (equality) or a
        dict with ``"operator"`` and ``"value"`` keys. Entries without rules
        never match.

        Args:
            entry: Fallback entry
            context: Request context

        Returns:
            Whether the entry matches the context
        """
        rules = entry.metadata.context_rules
        if not rules:
            return False

        for key, rule_value in rules.items():
            if key not in context:
                return False
            context_value = context[key]

            if isinstance(rule_value, dict) and "operator" in rule_value:
                if not self._evaluate_rule(
                    rule_value["operator"], context_value, rule_value.get("value")
                ):
                    return False
            elif context_value != rule_value:
                # Simple equality check
                return False
        return True

    def is_degraded_mode(self) -> bool:
        """
        Check if the system is in degraded mode.

        Returns:
            Whether the system is in degraded mode
        """
        return self.degraded_mode

    def get_fallbacks_for_endpoint(self, endpoint: str) -> List[Dict[str, Any]]:
        """
        Get all fallbacks registered under exactly this endpoint name.

        Args:
            endpoint: API endpoint (no wildcard matching is applied)

        Returns:
            List of fallback entries as dictionaries
        """
        with self.fallbacks_lock:
            return [entry.to_dict() for entry in self.fallbacks.get(endpoint, [])]

    def get_all_fallbacks(self) -> Dict[str, List[Dict[str, Any]]]:
        """
        Get all registered fallbacks.

        Returns:
            Dictionary of fallbacks by endpoint
        """
        with self.fallbacks_lock:
            return {
                endpoint: [entry.to_dict() for entry in entries]
                for endpoint, entries in self.fallbacks.items()
            }

    def clear_fallbacks(self, endpoint: Optional[str] = None) -> None:
        """
        Clear fallbacks.

        Args:
            endpoint: Optional endpoint to clear (clears all if None)
        """
        with self.fallbacks_lock:
            # Test against None: an empty-string endpoint must not be
            # mistaken for "clear everything".
            if endpoint is not None:
                if endpoint in self.fallbacks:
                    del self.fallbacks[endpoint]
                    self.telemetry.debug(
                        f"Cleared fallbacks for endpoint {endpoint}",
                        category=EventCategory.API
                    )
            else:
                self.fallbacks.clear()
                self.telemetry.debug(
                    "Cleared all fallbacks",
                    category=EventCategory.API
                )

    def shutdown(self) -> None:
        """Stop the health check worker and save to storage if enabled."""
        # Wake the worker so it leaves its wait and exits.
        self._stop_event.set()
        worker = self.health_check_thread
        if (
            worker is not None
            and worker.is_alive()
            and worker is not threading.current_thread()
        ):
            # Bounded wait so a stuck worker cannot hang shutdown.
            worker.join(timeout=5.0)

        # Save to storage if enabled
        if self.config["save_to_storage"] and self.config["fallback_storage_path"]:
            self._save_to_storage()
# Singleton instance and the lock guarding its lazy creation
_instance = None
_instance_lock = threading.Lock()


def get_fallback_registry(options: Optional[Dict[str, Any]] = None) -> APIFallbackRegistry:
    """
    Get the global fallback registry instance.

    The registry is created on the first call. ``options`` are only used
    by that first call; later calls return the existing instance and
    ignore them.

    Args:
        options: Configuration options for the first construction

    Returns:
        APIFallbackRegistry instance
    """
    global _instance
    if _instance is None:
        # Re-check under the lock so concurrent first callers cannot build
        # two registries (each would start its own health check thread).
        with _instance_lock:
            if _instance is None:
                _instance = APIFallbackRegistry(options)
    return _instance