"""
Enhanced API fallback registry for MorphGuard.
This module provides a centralized registry for API fallbacks with:
- Context-aware fallback selection
- Tiered fallback strategies
- Dynamic fallback generation
- Health-aware routing
- Degraded mode operations
"""
import json
import logging
import os
import random
import re
import threading
import time
from enum import Enum
from typing import Dict, Any, List, Optional, Union, Callable, Tuple, TypeVar

# Import error handling for proper error classification
from src.error_handling import MGError, ErrorCode, ErrorSeverity, ErrorCategory
from src.telemetry import get_telemetry, EventCategory
# Type definitions
# Generic type variable standing for the return type of a fallback callable.
T = TypeVar('T')
# A fallback callable: accepts arbitrary arguments and returns a T.
FallbackFunc = Callable[..., T]
# Alias for a fallback value; deliberately unconstrained (Any).
FallbackValue = Any
class FallbackTier(Enum):
    """Tiers of fallbacks, ordered by preference.

    A lower value means a more preferred tier. Members support the rich
    comparison operators (``<``, ``<=``, ``>``, ``>=``) against other
    ``FallbackTier`` members so that they can be passed to ``sorted()``,
    ``min()`` and ``max()``; a plain ``Enum`` would raise ``TypeError`` there.
    Comparing against any other type is unsupported and raises ``TypeError``.
    """

    PRIMARY = 0  # First choice fallback
    SECONDARY = 1  # Second choice fallback
    TERTIARY = 2  # Third choice fallback
    EMERGENCY = 3  # Last resort fallback

    def __lt__(self, other):
        if isinstance(other, FallbackTier):
            return self.value < other.value
        return NotImplemented

    def __le__(self, other):
        if isinstance(other, FallbackTier):
            return self.value <= other.value
        return NotImplemented

    def __gt__(self, other):
        if isinstance(other, FallbackTier):
            return self.value > other.value
        return NotImplemented

    def __ge__(self, other):
        if isinstance(other, FallbackTier):
            return self.value >= other.value
        return NotImplemented
class FallbackStrategy(Enum):
    """How one fallback is picked when several are available."""

    # Always use the same fallback.
    STATIC = "static"
    # Randomly select from available fallbacks.
    RANDOM = "random"
    # Select based on weights.
    WEIGHTED = "weighted"
    # Select the least recently used fallback.
    LEAST_RECENTLY_USED = "lru"
    # Cycle through fallbacks.
    ROUND_ROBIN = "round_robin"
    # Select based on request context.
    CONTEXT_BASED = "context"
    # Select based on health checks.
    HEALTH_BASED = "health"
class HealthStatus(Enum):
    """Possible health states of a service, endpoint, or fallback."""

    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"
    UNKNOWN = "unknown"
class FallbackMetadata:
    """Selection hints and usage/health bookkeeping for one fallback."""

    def __init__(
        self,
        tier: FallbackTier = FallbackTier.PRIMARY,
        weight: float = 1.0,
        context_rules: Optional[Dict[str, Any]] = None,
        last_used: float = 0,
        use_count: int = 0,
        success_count: int = 0,
        failure_count: int = 0,
        health_status: HealthStatus = HealthStatus.UNKNOWN,
        last_health_check: float = 0,
        tags: Optional[Dict[str, str]] = None
    ):
        """
        Store the metadata fields.

        Args:
            tier: Fallback tier
            weight: Weight for weighted selection (higher = more likely to be selected)
            context_rules: Rules for context-based selection; a falsy value
                is replaced by a new empty dict
            last_used: Timestamp of last use
            use_count: Number of times used
            success_count: Number of successful uses
            failure_count: Number of failed uses
            health_status: Current health status
            last_health_check: Timestamp of last health check
            tags: Tags for categorizing fallbacks; a falsy value is replaced
                by a new empty dict
        """
        # Selection hints.
        self.tier = tier
        self.weight = weight
        self.context_rules = context_rules or {}
        self.tags = tags or {}
        # Usage counters.
        self.last_used = last_used
        self.use_count = use_count
        self.success_count = success_count
        self.failure_count = failure_count
        # Health bookkeeping.
        self.health_status = health_status
        self.last_health_check = last_health_check

    def update_usage(self, success: bool = True) -> None:
        """
        Record one use of the fallback at the current time.

        Args:
            success: Whether the fallback was successful
        """
        self.last_used = time.time()
        self.use_count += 1
        if not success:
            self.failure_count += 1
            return
        self.success_count += 1

    def update_health(self, status: HealthStatus) -> None:
        """
        Set the health status and stamp the check time.

        Args:
            status: New health status
        """
        self.last_health_check = time.time()
        self.health_status = status

    def success_rate(self) -> float:
        """
        Compute the fraction of uses that succeeded.

        Returns:
            Success rate (0-1) or 0 if never used
        """
        return 0 if self.use_count == 0 else self.success_count / self.use_count

    def to_dict(self) -> Dict[str, Any]:
        """
        Build a plain-dict snapshot of the metadata.

        Returns:
            Dictionary representation; the tier is given by name, the health
            status by value, and the computed success rate is included
        """
        snapshot: Dict[str, Any] = {
            "tier": self.tier.name,
            "weight": self.weight,
            "context_rules": self.context_rules,
            "last_used": self.last_used,
            "use_count": self.use_count,
            "success_count": self.success_count,
            "failure_count": self.failure_count,
            "health_status": self.health_status.value,
            "last_health_check": self.last_health_check,
            "tags": self.tags,
        }
        snapshot["success_rate"] = self.success_rate()
        return snapshot
class FallbackEntry:
    """One registered fallback: a static value or a callable, plus metadata."""

    def __init__(
        self,
        key: str,
        value: Union[Any, FallbackFunc],
        is_function: bool = False,
        metadata: Optional[FallbackMetadata] = None
    ):
        """
        Create the entry.

        Args:
            key: Unique identifier for the fallback
            value: Fallback value or function
            is_function: Whether the value is a function
            metadata: Fallback metadata; a fresh default FallbackMetadata is
                created when omitted
        """
        self.key = key
        self.value = value
        self.is_function = is_function
        self.metadata = metadata or FallbackMetadata()

    def get_value(self, *args, **kwargs) -> Any:
        """
        Produce the fallback value and record the outcome in the metadata.

        Args:
            *args: Arguments to pass to the function
            **kwargs: Keyword arguments to pass to the function

        Returns:
            The stored value, or the function's result when ``is_function``
            is set

        Raises:
            Exception: Whatever the fallback function raises; the failure is
                counted in the metadata before the exception propagates
        """
        try:
            produced = self.value(*args, **kwargs) if self.is_function else self.value
            self.metadata.update_usage(success=True)
        except Exception:
            self.metadata.update_usage(success=False)
            raise
        return produced

    def to_dict(self) -> Dict[str, Any]:
        """
        Describe the entry without exposing the value itself.

        Returns:
            Dictionary representation with the key, the function flag, the
            metadata snapshot and the value's type name ("function" for
            function fallbacks)
        """
        if self.is_function:
            type_label = "function"
        else:
            type_label = type(self.value).__name__
        return {
            "key": self.key,
            "is_function": self.is_function,
            "metadata": self.metadata.to_dict(),
            "value_type": type_label
        }
class APIFallbackRegistry:
    """
    Registry for API fallbacks with intelligent selection.

    This registry provides:
    - Endpoint-specific fallbacks (exact names or ``*`` wildcard patterns)
    - Tiered fallback selection
    - Multiple selection strategies
    - Health-aware routing
    - Degraded mode operations
    """

    def __init__(self, options: Optional[Dict[str, Any]] = None):
        """
        Initialize the fallback registry.

        Depending on the configuration this loads persisted fallbacks and
        starts a daemon thread that periodically runs health checks.

        Args:
            options: Configuration options overriding the defaults below
        """
        # Default configuration
        self.config = {
            "default_strategy": FallbackStrategy.STATIC,
            "health_check_interval": 60,  # seconds
            "auto_health_checks": True,
            "fallback_storage_path": None,
            "load_from_storage": False,
            "save_to_storage": False,
            "context_rules_path": None,
            "degraded_mode_threshold": 0.5,  # 50% failure rate
            "enable_telemetry": True
        }
        # Update with user-provided options
        if options:
            self.config.update(options)

        # Initialize telemetry
        self.telemetry = get_telemetry()
        # Fallback entries by endpoint pattern
        self.fallbacks: Dict[str, List[FallbackEntry]] = {}
        # Current selection indexes for round-robin
        self.round_robin_indexes: Dict[str, int] = {}
        # System health status
        self.system_health = HealthStatus.HEALTHY
        # Health check thread
        self.health_check_thread = None
        # Degraded mode flag
        self.degraded_mode = False
        # Locks
        self.fallbacks_lock = threading.RLock()
        self.health_lock = threading.RLock()
        # Set by shutdown() so the health check worker can exit its loop.
        self._stop_event = threading.Event()

        # Load fallbacks from storage if enabled
        if self.config["load_from_storage"] and self.config["fallback_storage_path"]:
            self._load_from_storage()
        # Start health check thread if enabled
        if self.config["auto_health_checks"]:
            self._start_health_check_thread()

    def _start_health_check_thread(self) -> None:
        """Start the background health check thread (daemon)."""
        self.health_check_thread = threading.Thread(
            target=self._health_check_worker,
            daemon=True,
            name="fallback_health_checker"
        )
        self.health_check_thread.start()

    def _health_check_worker(self) -> None:
        """Background worker to check health of fallbacks.

        Runs one cycle per ``health_check_interval`` seconds until
        ``shutdown()`` sets the stop event. Errors inside a cycle are
        reported through telemetry and do not end the loop.
        """
        while not self._stop_event.is_set():
            try:
                # Wait for the check interval; wakes up early on shutdown.
                if self._stop_event.wait(self.config["health_check_interval"]):
                    break
                # Run health checks
                self._check_all_health()
                # Update degraded mode flag
                self._update_degraded_mode()
                # Save to storage if enabled
                if self.config["save_to_storage"] and self.config["fallback_storage_path"]:
                    self._save_to_storage()
            except Exception as e:
                self.telemetry.error(
                    f"Error in fallback health check worker: {e}",
                    category=EventCategory.API,
                    exc_info=True
                )

    def _check_all_health(self) -> None:
        """Check health of all function fallbacks.

        A function fallback is healthy when its value is callable and
        unhealthy otherwise. Static value entries are left untouched.
        """
        with self.fallbacks_lock:
            for entries in self.fallbacks.values():
                for entry in entries:
                    if not entry.is_function:
                        continue
                    # Simple health check - just make sure the function is callable
                    status = (
                        HealthStatus.HEALTHY
                        if callable(entry.value)
                        else HealthStatus.UNHEALTHY
                    )
                    entry.metadata.update_health(status)

    def _update_degraded_mode(self) -> None:
        """Update the degraded mode flag based on health checks.

        An endpoint counts as unhealthy when it has at least one fallback and
        every fallback is UNHEALTHY. Degraded mode is on when the share of
        unhealthy endpoints reaches ``degraded_mode_threshold``. With no
        registered endpoints the flag is left unchanged.
        """
        with self.health_lock:
            with self.fallbacks_lock:
                total_endpoints = len(self.fallbacks)
                # bool(entries) guards against all([]) being vacuously true
                # for an endpoint whose entry list is empty.
                unhealthy_endpoints = sum(
                    1
                    for entries in self.fallbacks.values()
                    if entries and all(
                        entry.metadata.health_status == HealthStatus.UNHEALTHY
                        for entry in entries
                    )
                )

            if total_endpoints == 0:
                return

            # Calculate percentage of unhealthy endpoints
            unhealthy_ratio = unhealthy_endpoints / total_endpoints
            # Update degraded mode flag
            old_degraded_mode = self.degraded_mode
            self.degraded_mode = unhealthy_ratio >= self.config["degraded_mode_threshold"]

            # Log degraded mode changes
            if self.degraded_mode == old_degraded_mode:
                return
            if self.degraded_mode:
                self.telemetry.warning(
                    "System entered degraded mode",
                    category=EventCategory.API,
                    context={
                        "unhealthy_ratio": unhealthy_ratio,
                        "threshold": self.config["degraded_mode_threshold"]
                    }
                )
            else:
                self.telemetry.info(
                    "System exited degraded mode",
                    category=EventCategory.API,
                    context={
                        "unhealthy_ratio": unhealthy_ratio,
                        "threshold": self.config["degraded_mode_threshold"]
                    }
                )

    def _load_from_storage(self) -> None:
        """Load static fallbacks from the JSON file at ``fallback_storage_path``.

        A missing file is not an error. Any failure while reading or decoding
        is reported through telemetry and otherwise ignored.
        """
        try:
            if not os.path.exists(self.config["fallback_storage_path"]):
                return
            with open(self.config["fallback_storage_path"], "r", encoding="utf-8") as f:
                data = json.load(f)

            with self.fallbacks_lock:
                for endpoint, entries in data.items():
                    self.fallbacks[endpoint] = []
                    for entry_data in entries:
                        # Function fallbacks can't be serialized, so only load static values
                        if entry_data.get("is_function", False):
                            continue
                        stored = entry_data["metadata"]
                        metadata = FallbackMetadata(
                            tier=FallbackTier[stored["tier"]],
                            weight=stored["weight"],
                            context_rules=stored["context_rules"],
                            last_used=stored["last_used"],
                            use_count=stored["use_count"],
                            success_count=stored["success_count"],
                            failure_count=stored["failure_count"],
                            health_status=HealthStatus(stored["health_status"]),
                            last_health_check=stored["last_health_check"],
                            tags=stored["tags"]
                        )
                        self.fallbacks[endpoint].append(FallbackEntry(
                            key=entry_data["key"],
                            value=entry_data["value"],
                            is_function=False,
                            metadata=metadata
                        ))

            self.telemetry.info(
                "Loaded fallbacks from storage",
                category=EventCategory.API,
                context={"path": self.config["fallback_storage_path"]}
            )
        except Exception as e:
            self.telemetry.error(
                f"Failed to load fallbacks from storage: {e}",
                category=EventCategory.API,
                exc_info=True
            )

    def _save_to_storage(self) -> None:
        """Save static fallbacks to the JSON file at ``fallback_storage_path``.

        Function fallbacks are skipped because they cannot be serialized.
        Failures are reported through telemetry and otherwise ignored.
        """
        try:
            path = self.config["fallback_storage_path"]
            if not path:
                return

            # Create directory if it doesn't exist. A bare file name has an
            # empty dirname, which os.makedirs would reject.
            directory = os.path.dirname(path)
            if directory:
                os.makedirs(directory, exist_ok=True)

            data = {}
            with self.fallbacks_lock:
                for endpoint, entries in self.fallbacks.items():
                    data[endpoint] = []
                    for entry in entries:
                        # Skip function fallbacks as they can't be serialized
                        if entry.is_function:
                            continue
                        entry_data = entry.to_dict()
                        entry_data["value"] = entry.value  # Add actual value
                        data[endpoint].append(entry_data)

            # Serialize before opening the file so that a non-serializable
            # value does not truncate the previously saved file.
            payload = json.dumps(data, indent=2)
            with open(path, "w", encoding="utf-8") as f:
                f.write(payload)

            self.telemetry.debug(
                "Saved fallbacks to storage",
                category=EventCategory.API,
                context={"path": path}
            )
        except Exception as e:
            self.telemetry.error(
                f"Failed to save fallbacks to storage: {e}",
                category=EventCategory.API,
                exc_info=True
            )

    def register_fallback(
        self,
        endpoint: str,
        key: str,
        fallback: Union[Any, FallbackFunc],
        is_function: bool = False,
        tier: FallbackTier = FallbackTier.PRIMARY,
        weight: float = 1.0,
        context_rules: Optional[Dict[str, Any]] = None,
        tags: Optional[Dict[str, str]] = None
    ) -> None:
        """
        Register a fallback for an endpoint.

        Registering a key that already exists for the endpoint replaces the
        entry but keeps its usage and health statistics (and its tags when
        no new tags are given).

        Args:
            endpoint: API endpoint pattern (can use wildcards like * for pattern matching)
            key: Unique identifier for the fallback
            fallback: Fallback value or function
            is_function: Whether the fallback is a function
            tier: Fallback tier
            weight: Weight for weighted selection
            context_rules: Rules for context-based selection
            tags: Tags for categorizing fallbacks
        """
        with self.fallbacks_lock:
            # Create entry list if it doesn't exist
            entries = self.fallbacks.setdefault(endpoint, [])

            # Check if entry with same key already exists
            for i, entry in enumerate(entries):
                if entry.key != key:
                    continue
                # Replace existing entry, carrying over its statistics
                previous = entry.metadata
                metadata = FallbackMetadata(
                    tier=tier,
                    weight=weight,
                    context_rules=context_rules,
                    last_used=previous.last_used,
                    use_count=previous.use_count,
                    success_count=previous.success_count,
                    failure_count=previous.failure_count,
                    health_status=previous.health_status,
                    last_health_check=previous.last_health_check,
                    tags=tags or previous.tags
                )
                entries[i] = FallbackEntry(
                    key=key,
                    value=fallback,
                    is_function=is_function,
                    metadata=metadata
                )
                self.telemetry.debug(
                    f"Updated fallback {key} for endpoint {endpoint}",
                    category=EventCategory.API
                )
                return

            # Add new entry
            metadata = FallbackMetadata(
                tier=tier,
                weight=weight,
                context_rules=context_rules,
                tags=tags
            )
            entries.append(FallbackEntry(
                key=key,
                value=fallback,
                is_function=is_function,
                metadata=metadata
            ))
            self.telemetry.debug(
                f"Registered fallback {key} for endpoint {endpoint}",
                category=EventCategory.API
            )

    def unregister_fallback(self, endpoint: str, key: str) -> bool:
        """
        Unregister a fallback.

        Args:
            endpoint: API endpoint (the exact string used at registration)
            key: Fallback key

        Returns:
            Whether the fallback was found and unregistered
        """
        with self.fallbacks_lock:
            if endpoint not in self.fallbacks:
                return False
            # Find entry with matching key
            for i, entry in enumerate(self.fallbacks[endpoint]):
                if entry.key == key:
                    # Remove entry
                    self.fallbacks[endpoint].pop(i)
                    # Remove endpoint if empty
                    if not self.fallbacks[endpoint]:
                        del self.fallbacks[endpoint]
                    self.telemetry.debug(
                        f"Unregistered fallback {key} for endpoint {endpoint}",
                        category=EventCategory.API
                    )
                    return True
            return False

    @staticmethod
    def _endpoint_matches(pattern: str, endpoint: str) -> bool:
        """Return whether ``endpoint`` matches the registered ``pattern``.

        A pattern matches when it equals the endpoint, or when it contains
        ``*`` and the whole endpoint fits the pattern with each ``*`` standing
        for any run of characters (including none). All other characters are
        matched literally.
        """
        if pattern == endpoint:
            return True
        if "*" not in pattern:
            return False
        regex = ".*".join(re.escape(part) for part in pattern.split("*"))
        return re.fullmatch(regex, endpoint, flags=re.DOTALL) is not None

    def get_fallback(
        self,
        endpoint: str,
        strategy: Optional[FallbackStrategy] = None,
        context: Optional[Dict[str, Any]] = None,
        args: Optional[List[Any]] = None,
        kwargs: Optional[Dict[str, Any]] = None
    ) -> Optional[Any]:
        """
        Get a fallback value for an endpoint.

        Entries from every matching endpoint pattern are pooled. If the
        selected fallback raises, the error is reported through telemetry and
        the next candidate is tried until one succeeds or none remain.

        Args:
            endpoint: API endpoint
            strategy: Selection strategy (defaults to ``default_strategy``)
            context: Request context for context-based selection
            args: Arguments to pass to function fallbacks
            kwargs: Keyword arguments to pass to function fallbacks

        Returns:
            Fallback value or None if no fallback is available
        """
        with self.fallbacks_lock:
            # Find matching endpoints (support wildcard patterns)
            matching_endpoints = [
                pattern for pattern in self.fallbacks
                if self._endpoint_matches(pattern, endpoint)
            ]
            if not matching_endpoints:
                self.telemetry.debug(
                    f"No fallbacks found for endpoint {endpoint}",
                    category=EventCategory.API
                )
                return None

            # Get entries for all matching endpoints
            candidates = [
                entry
                for pattern in matching_endpoints
                for entry in self.fallbacks[pattern]
            ]
            if not candidates:
                return None

            # Use provided strategy or default
            if strategy is None:
                strategy = FallbackStrategy(self.config["default_strategy"])

            call_args = args or []
            call_kwargs = kwargs or {}
            is_alternative = False

            while candidates:
                selected_entry = self._select_fallback(
                    endpoint, candidates, strategy, context
                )
                if selected_entry is None:
                    return None
                try:
                    result = selected_entry.get_value(*call_args, **call_kwargs)
                except Exception as e:
                    self.telemetry.error(
                        f"Error using fallback {selected_entry.key} for endpoint {endpoint}: {e}",
                        category=EventCategory.API,
                        exc_info=True
                    )
                    # Try another fallback. Filter by identity: keys are only
                    # unique within one pattern, not across the pooled entries.
                    candidates = [
                        entry for entry in candidates
                        if entry is not selected_entry
                    ]
                    is_alternative = True
                    continue

                label = "alternative fallback" if is_alternative else "fallback"
                self.telemetry.debug(
                    f"Used {label} {selected_entry.key} for endpoint {endpoint}",
                    category=EventCategory.API,
                    context={"strategy": strategy.value}
                )
                return result

            return None

    def _select_fallback(
        self,
        endpoint: str,
        entries: List[FallbackEntry],
        strategy: FallbackStrategy,
        context: Optional[Dict[str, Any]] = None
    ) -> Optional[FallbackEntry]:
        """
        Select a fallback using the specified strategy.

        Only entries of the most preferred tier present (lowest tier value)
        are considered; the strategy decides among them.

        Args:
            endpoint: API endpoint (used as the round-robin cursor key)
            entries: Available fallback entries
            strategy: Selection strategy
            context: Request context for context-based selection

        Returns:
            Selected fallback entry or None if no fallback is available
        """
        if not entries:
            return None

        # Filter by health status if using health-based strategy
        if strategy == FallbackStrategy.HEALTH_BASED:
            healthy_entries = [
                entry for entry in entries
                if entry.metadata.health_status == HealthStatus.HEALTHY
            ]
            if healthy_entries:
                entries = healthy_entries
            else:
                # Fall back to degraded entries if no healthy ones
                degraded_entries = [
                    entry for entry in entries
                    if entry.metadata.health_status == HealthStatus.DEGRADED
                ]
                if degraded_entries:
                    entries = degraded_entries

        # Pick the highest priority tier. Compare by value: plain Enum
        # members do not define an ordering of their own.
        highest_tier = min(
            (entry.metadata.tier for entry in entries),
            key=lambda tier: tier.value
        )
        tier_entries = [
            entry for entry in entries if entry.metadata.tier == highest_tier
        ]
        if len(tier_entries) == 1:
            return tier_entries[0]

        # Apply strategy within the tier
        if strategy == FallbackStrategy.RANDOM:
            # Randomly select an entry
            return random.choice(tier_entries)

        if strategy == FallbackStrategy.WEIGHTED:
            # Select based on weights
            weights = [entry.metadata.weight for entry in tier_entries]
            if sum(weights) <= 0:
                # random.choices rejects a non-positive total weight.
                return random.choice(tier_entries)
            return random.choices(tier_entries, weights=weights, k=1)[0]

        if strategy == FallbackStrategy.LEAST_RECENTLY_USED:
            # Select least recently used entry
            return min(tier_entries, key=lambda e: e.metadata.last_used)

        if strategy == FallbackStrategy.ROUND_ROBIN:
            # Cycle through entries
            index = self.round_robin_indexes.get(endpoint, 0)
            selected = tier_entries[index % len(tier_entries)]
            # Update index for next time
            self.round_robin_indexes[endpoint] = (index + 1) % len(tier_entries)
            return selected

        if strategy == FallbackStrategy.CONTEXT_BASED and context:
            # Select based on context rules
            for entry in tier_entries:
                if self._matches_context(entry, context):
                    return entry

        # STATIC, unmatched context, or any other strategy: use first entry
        return tier_entries[0]

    @staticmethod
    def _rule_satisfied(rule_value: Any, context_value: Any) -> bool:
        """Evaluate one context rule against one context value.

        A dict rule with an ``"operator"`` key is a condition on its
        ``"value"``; an unrecognized operator imposes no constraint. Any
        other rule is a plain equality check.
        """
        if not (isinstance(rule_value, dict) and "operator" in rule_value):
            # Simple equality check
            return context_value == rule_value

        operator = rule_value["operator"]
        value = rule_value["value"]
        is_number = isinstance(context_value, (int, float))

        if operator == "eq":
            return context_value == value
        if operator == "ne":
            return context_value != value
        if operator == "gt":
            return is_number and context_value > value
        if operator == "lt":
            return is_number and context_value < value
        if operator == "gte":
            return is_number and context_value >= value
        if operator == "lte":
            return is_number and context_value <= value
        if operator == "contains":
            return (
                isinstance(context_value, (str, list, tuple))
                and value in context_value
            )
        if operator == "in":
            return context_value in value
        return True

    def _matches_context(self, entry: FallbackEntry, context: Dict[str, Any]) -> bool:
        """
        Check if an entry matches the given context.

        Every rule of the entry must be satisfied. An entry without rules
        never matches, and a rule whose key is absent from the context fails.
        A malformed rule (missing ``"value"`` or operands that cannot be
        compared) counts as not matching instead of raising.

        Args:
            entry: Fallback entry
            context: Request context

        Returns:
            Whether the entry matches the context
        """
        rules = entry.metadata.context_rules
        if not rules:
            return False

        # Check all rules
        for key, rule_value in rules.items():
            if key not in context:
                return False
            try:
                if not self._rule_satisfied(rule_value, context[key]):
                    return False
            except (TypeError, KeyError):
                return False
        return True

    def is_degraded_mode(self) -> bool:
        """
        Check if the system is in degraded mode.

        Returns:
            Whether the system is in degraded mode
        """
        return self.degraded_mode

    def get_fallbacks_for_endpoint(self, endpoint: str) -> List[Dict[str, Any]]:
        """
        Get all fallbacks registered under exactly this endpoint string.

        Args:
            endpoint: API endpoint

        Returns:
            List of fallback entries as dictionaries
        """
        with self.fallbacks_lock:
            return [entry.to_dict() for entry in self.fallbacks.get(endpoint, [])]

    def get_all_fallbacks(self) -> Dict[str, List[Dict[str, Any]]]:
        """
        Get all registered fallbacks.

        Returns:
            Dictionary of fallbacks by endpoint
        """
        with self.fallbacks_lock:
            return {
                endpoint: [entry.to_dict() for entry in entries]
                for endpoint, entries in self.fallbacks.items()
            }

    def clear_fallbacks(self, endpoint: Optional[str] = None) -> None:
        """
        Clear fallbacks.

        Args:
            endpoint: Optional endpoint to clear (clears all if None)
        """
        with self.fallbacks_lock:
            if endpoint:
                if endpoint in self.fallbacks:
                    del self.fallbacks[endpoint]
                    self.telemetry.debug(
                        f"Cleared fallbacks for endpoint {endpoint}",
                        category=EventCategory.API
                    )
            else:
                self.fallbacks.clear()
                self.telemetry.debug(
                    "Cleared all fallbacks",
                    category=EventCategory.API
                )

    def shutdown(self) -> None:
        """Shutdown the registry and save to storage if enabled.

        Also signals the background health check worker to stop.
        """
        self._stop_event.set()
        # Save to storage if enabled
        if self.config["save_to_storage"] and self.config["fallback_storage_path"]:
            self._save_to_storage()
# Process-wide registry, created on first request
_instance = None


def get_fallback_registry(options: Dict[str, Any] = None) -> APIFallbackRegistry:
    """
    Return the shared fallback registry, creating it on first use.

    Args:
        options: Configuration options; only used by the call that creates
            the instance

    Returns:
        APIFallbackRegistry instance
    """
    global _instance
    if _instance is not None:
        return _instance
    _instance = APIFallbackRegistry(options)
    return _instance