Spaces:
Sleeping
Sleeping
| import os | |
| import asyncio | |
| import logging | |
| import time | |
| from typing import Optional, Callable, Any, Awaitable | |
| from dataclasses import dataclass, field | |
| from datetime import datetime, timezone | |
| from enum import Enum | |
| logger = logging.getLogger(__name__) | |
| class HealthStatus(Enum): | |
| HEALTHY = "healthy" | |
| DEGRADED = "degraded" | |
| UNHEALTHY = "unhealthy" | |
| class HealthCheck: | |
| component: str | |
| status: HealthStatus | |
| latency_ms: float | |
| message: str = "" | |
| checked_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat()) | |
| class CircuitBreaker: | |
| def __init__( | |
| self, | |
| failure_threshold: int = 5, | |
| recovery_timeout: float = 60.0, | |
| expected_exception: type = Exception, | |
| ): | |
| self.failure_threshold = failure_threshold | |
| self.recovery_timeout = recovery_timeout | |
| self.expected_exception = expected_exception | |
| self._failure_count = 0 | |
| self._last_failure_time: Optional[float] = None | |
| self._state = "closed" | |
| self._lock = asyncio.Lock() | |
| def state(self) -> str: | |
| return self._state | |
| async def call(self, fn: Callable, *args, **kwargs): | |
| async with self._lock: | |
| if self._state == "open": | |
| if ( | |
| self._last_failure_time | |
| and time.time() - self._last_failure_time > self.recovery_timeout | |
| ): | |
| logger.info(f"CircuitBreaker: transitioning open -> half-open") | |
| self._state = "half-open" | |
| self._failure_count = 0 | |
| else: | |
| raise Exception(f"CircuitBreaker is open, call rejected") | |
| try: | |
| result = await fn(*args, **kwargs) | |
| async with self._lock: | |
| if self._state == "half-open": | |
| logger.info(f"CircuitBreaker: half-open -> closed") | |
| self._state = "closed" | |
| self._failure_count = 0 | |
| return result | |
| except self.expected_exception as e: | |
| async with self._lock: | |
| self._failure_count += 1 | |
| self._last_failure_time = time.time() | |
| if self._failure_count >= self.failure_threshold: | |
| logger.warning(f"CircuitBreaker: closed -> open (failures={self._failure_count})") | |
| self._state = "open" | |
| raise | |
| class FallbackChain: | |
| def __init__(self, providers: list[tuple[str, Callable]]): | |
| self.providers = providers | |
| async def execute(self, *args, **kwargs): | |
| errors = [] | |
| for name, fn in self.providers: | |
| try: | |
| logger.info(f"[FallbackChain] Trying provider: {name}") | |
| return await fn(*args, **kwargs) | |
| except Exception as e: | |
| logger.warning(f"[FallbackChain] {name} failed: {e}") | |
| errors.append(f"{name}: {str(e)}") | |
| continue | |
| raise Exception(f"All providers failed: {'; '.join(errors)}") | |
| class AdaptiveConcurrency: | |
| def __init__(self, initial: int = 3, min_val: int = 1, max_val: int = 10): | |
| self.current = initial | |
| self.min = min_val | |
| self.max = max_val | |
| self._successes = 0 | |
| self._failures = 0 | |
| self._lock = asyncio.Lock() | |
| async def acquire(self) -> asyncio.Semaphore: | |
| async with self._lock: | |
| sem = asyncio.Semaphore(self.current) | |
| return sem | |
| def record_success(self): | |
| self._successes += 1 | |
| if self._successes >= 5 and self.current < self.max: | |
| self.current = min(self.max, self.current + 1) | |
| self._successes = 0 | |
| logger.debug(f"[Concurrency] Increased to {self.current}") | |
| def record_failure(self): | |
| self._failures += 1 | |
| if self._failures >= 2 and self.current > self.min: | |
| self.current = max(self.min, self.current - 1) | |
| self._failures = 0 | |
| logger.debug(f"[Concurrency] Decreased to {self.current}") | |
| class PersistentCache: | |
| def __init__(self, cache_dir: str = ".cache", max_age_seconds: float = 3600.0): | |
| self.cache_dir = cache_dir | |
| self.max_age = max_age_seconds | |
| self._memory: dict[str, tuple[Any, float]] = {} | |
| self._hits = 0 | |
| self._misses = 0 | |
| def _cache_path(self, key: str) -> str: | |
| import hashlib | |
| hashed = hashlib.md5(key.encode()).hexdigest() | |
| return os.path.join(self.cache_dir, f"{hashed}.json") | |
| async def get(self, key: str) -> Optional[Any]: | |
| if key in self._memory: | |
| value, timestamp = self._memory[key] | |
| if time.time() - timestamp < self.max_age: | |
| self._hits += 1 | |
| return value | |
| else: | |
| del self._memory[key] | |
| cache_file = self._cache_path(key) | |
| if os.path.exists(cache_file): | |
| try: | |
| import json | |
| stat = os.stat(cache_file) | |
| if time.time() - stat.st_mtime < self.max_age: | |
| with open(cache_file) as f: | |
| value = json.load(f) | |
| self._memory[key] = (value, time.time()) | |
| self._hits += 1 | |
| return value | |
| else: | |
| os.remove(cache_file) | |
| except Exception as e: | |
| logger.debug(f"[Cache] Failed to read cache file: {e}") | |
| self._misses += 1 | |
| return None | |
| async def set(self, key: str, value: Any): | |
| self._memory[key] = (value, time.time()) | |
| try: | |
| import json | |
| os.makedirs(self.cache_dir, exist_ok=True) | |
| cache_file = self._cache_path(key) | |
| with open(cache_file, "w") as f: | |
| json.dump(value, f) | |
| except Exception as e: | |
| logger.debug(f"[Cache] Failed to write cache file: {e}") | |
| def stats(self) -> dict: | |
| total = self._hits + self._misses | |
| hit_rate = (self._hits / total * 100) if total > 0 else 0 | |
| return {"hits": self._hits, "misses": self._misses, "hit_rate_pct": round(hit_rate, 1)} | |
| class HealthMonitor: | |
| def __init__(self): | |
| self._checks: dict[str, HealthCheck] = {} | |
| self._lock = asyncio.Lock() | |
| async def register( | |
| self, | |
| component: str, | |
| check_fn: Callable[[], Awaitable[HealthCheck]], | |
| ): | |
| async with self._lock: | |
| self._checks[component] = check_fn | |
| async def run_check(self, component: str) -> HealthCheck: | |
| if component not in self._checks: | |
| return HealthCheck(component, HealthStatus.UNHEALTHY, 0, "Unknown component") | |
| try: | |
| start = time.perf_counter() | |
| result = await self._checks[component]() | |
| latency = (time.perf_counter() - start) * 1000 | |
| result.latency_ms = round(latency, 1) | |
| return result | |
| except Exception as e: | |
| return HealthCheck(component, HealthStatus.UNHEALTHY, 0, str(e)) | |
| async def run_all_checks(self) -> dict[str, HealthCheck]: | |
| results = {} | |
| for component in list(self._checks.keys()): | |
| results[component] = await self.run_check(component) | |
| return results | |
| def get_overall_status(self, checks: dict[str, HealthCheck]) -> HealthStatus: | |
| if not checks: | |
| return HealthStatus.UNHEALTHY | |
| statuses = [c.status for c in checks.values()] | |
| if any(s == HealthStatus.UNHEALTHY for s in statuses): | |
| return HealthStatus.UNHEALTHY | |
| if any(s == HealthStatus.DEGRADED for s in statuses): | |
| return HealthStatus.DEGRADED | |
| return HealthStatus.HEALTHY | |
| _global_health_monitor = HealthMonitor() | |
| def get_health_monitor() -> HealthMonitor: | |
| return _global_health_monitor | |