"""
Provider Fallback Manager.

Manages fallback to external providers when HF cannot provide data.
Uses /mnt/data/api-config-complete.txt as the authoritative configuration source.
"""
import asyncio
import hashlib
import json
import os
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple

import aiohttp

from ..enhanced_logger import logger
# ====================
# CONFIGURATION
# ====================

# Local configuration file; it is consulted before the optional URL below.
FALLBACK_CONFIG_PATH = "/mnt/data/api-config-complete.txt"
# Optional remote copy of the configuration; None when the variable is unset.
FALLBACK_CONFIG_URL = os.getenv("FALLBACK_CONFIG_URL")
HF_PRIORITY = True  # Always try HF first
MAX_RETRIES = 3
TIMEOUT_SECONDS = 10  # total timeout given to the aiohttp session
# Number of consecutive failures after which a provider's circuit opens.
CIRCUIT_BREAKER_THRESHOLD = 5
CIRCUIT_BREAKER_TIMEOUT = 5 * 60  # seconds (5 minutes)
# ====================
# ENUMS & MODELS
# ====================


class ProviderStatus(Enum):
    """Availability states tracked for each external provider."""

    # Provider is usable.
    AVAILABLE = "available"
    # Provider has started failing but is still tried.
    DEGRADED = "degraded"
    # Provider is not usable.
    UNAVAILABLE = "unavailable"
    # Circuit breaker tripped; provider is skipped until the breaker expires.
    CIRCUIT_OPEN = "circuit_open"
@dataclass
class Provider:
    """Configuration and runtime health state of one external provider.

    The first six fields describe the provider (where it lives, how to
    authenticate, how its endpoints are named). The remaining fields are
    mutable health state maintained by ``record_success`` and
    ``record_failure`` and read by ``is_available``.
    """

    name: str
    base_url: str
    api_key: Optional[str] = None
    priority: int = 100  # lower number = tried earlier
    endpoints: Optional[Dict[str, str]] = None
    rate_limit: Optional[int] = None
    status: ProviderStatus = ProviderStatus.AVAILABLE
    failures: int = 0  # consecutive failures since the last success
    last_success: Optional[datetime] = None
    last_failure: Optional[datetime] = None
    circuit_open_until: Optional[datetime] = None  # timezone-aware (UTC)

    def is_available(self) -> bool:
        """Return True when the provider may be called.

        Side effect: when the circuit is open and its deadline has passed,
        the provider is switched back to AVAILABLE and its failure counter
        is cleared, so the next request acts as a fresh attempt.
        """
        if self.status == ProviderStatus.CIRCUIT_OPEN:
            reopen_at = self.circuit_open_until
            if reopen_at and datetime.now(timezone.utc) > reopen_at:
                # Circuit breaker timeout expired, try again
                self.status = ProviderStatus.AVAILABLE
                self.failures = 0
                return True
            return False
        return self.status in (ProviderStatus.AVAILABLE, ProviderStatus.DEGRADED)

    def record_success(self):
        """Record a successful request and restore the AVAILABLE state."""
        self.failures = 0
        self.last_success = datetime.now(timezone.utc)
        self.status = ProviderStatus.AVAILABLE

    def record_failure(self):
        """Record a failed request and update the health state.

        Two consecutive failures mark the provider DEGRADED; reaching
        CIRCUIT_BREAKER_THRESHOLD opens the circuit for
        CIRCUIT_BREAKER_TIMEOUT seconds.
        """
        now = datetime.now(timezone.utc)
        self.failures += 1
        self.last_failure = now
        if self.failures >= CIRCUIT_BREAKER_THRESHOLD:
            # Open circuit breaker. The deadline is stored as an aware
            # datetime (not a float timestamp) so that is_available() can
            # compare it against datetime.now(timezone.utc).
            self.status = ProviderStatus.CIRCUIT_OPEN
            self.circuit_open_until = now + timedelta(seconds=CIRCUIT_BREAKER_TIMEOUT)
            logger.warning(f"Circuit breaker opened for {self.name} until {self.circuit_open_until}")
        elif self.failures >= 2:
            self.status = ProviderStatus.DEGRADED
@dataclass
class FallbackResult:
    """Result from a fallback attempt.

    Attributes are populated by keyword at construction time, which is why
    the class must be a dataclass.
    """

    data: Optional[Any]  # payload returned by the winning source, or None
    source: str  # identifier of the source that produced ``data``
    attempted: List[str]  # every source tried, in order
    success: bool
    error: Optional[str] = None
    latency_ms: Optional[int] = None
# ====================
# PROVIDER FALLBACK MANAGER
# ====================


class ProviderFallbackManager:
    """Manages fallback to external providers with circuit breaker pattern.

    Providers are loaded once at construction time (local file, then the
    optional URL, then built-in defaults) and are tried in ascending
    ``priority`` order by ``fetch_with_fallback`` after the HF attempt.
    """

    def __init__(self):
        self.providers: List["Provider"] = []
        self.hf_handler = None
        # Created lazily by _get_session() on first use.
        self._session: Optional["aiohttp.ClientSession"] = None
        self._load_providers()

    # ------------------------------------------------------------------
    # Configuration loading
    # ------------------------------------------------------------------

    def _load_providers(self):
        """Load provider configurations from file or URL.

        Falls back to the built-in default providers when neither source
        yields any provider.
        """
        config_data = self._read_local_config()

        # Try URL if configured
        if not config_data and FALLBACK_CONFIG_URL:
            config_data = self._fetch_remote_config()

        # Parse providers
        if isinstance(config_data, dict):
            # ``or []`` tolerates an explicit ``"providers": null`` entry.
            for idx, provider_config in enumerate(config_data.get('providers') or []):
                if not isinstance(provider_config, dict):
                    logger.warning(f"Skipping malformed provider entry at index {idx}")
                    continue
                name = provider_config.get('name') or f'provider_{idx}'
                self.providers.append(Provider(
                    name=name,
                    base_url=provider_config.get('base_url', ''),
                    # An empty or missing key falls back to <NAME>_API_KEY.
                    api_key=provider_config.get('api_key') or os.getenv(f"{name.upper()}_API_KEY"),
                    priority=provider_config.get('priority', 100),
                    endpoints=provider_config.get('endpoints', {}),
                    rate_limit=provider_config.get('rate_limit'),
                ))

        # Sort by priority (lower number = higher priority)
        self.providers.sort(key=lambda p: p.priority)

        # Add default providers if none loaded
        if not self.providers:
            self._add_default_providers()

    def _read_local_config(self) -> Optional[Dict]:
        """Read FALLBACK_CONFIG_PATH; return None if absent or unreadable."""
        config_path = Path(FALLBACK_CONFIG_PATH)
        if not config_path.exists():
            return None
        try:
            content = config_path.read_text(encoding="utf-8")
            # Handle both JSON and text format
            if content.strip().startswith('{'):
                config_data = json.loads(content)
            else:
                config_data = self._parse_text_config(content)
            logger.info(f"Loaded {len(config_data.get('providers', []))} providers from local file")
            return config_data
        except Exception as e:
            # Best effort: a broken file must not prevent start-up.
            logger.error(f"Failed to load local config: {e}")
            return None

    def _fetch_remote_config(self) -> Optional[Dict]:
        """Download the configuration from FALLBACK_CONFIG_URL (blocking)."""
        try:
            import requests
            response = requests.get(FALLBACK_CONFIG_URL, timeout=5)
            if response.status_code == 200:
                config_data = response.json()
                logger.info(f"Loaded {len(config_data.get('providers', []))} providers from URL")
                return config_data
        except Exception as e:
            # Best effort: fall through to the default providers.
            logger.error(f"Failed to load config from URL: {e}")
        return None

    def _parse_text_config(self, content: str) -> Dict:
        """Parse text format config into JSON structure.

        Each non-empty, non-comment line has the form
        ``name,base_url[,api_key[,priority]]``. Lines with fewer than two
        fields are ignored. Comment lines start with ``#``, optionally
        preceded by whitespace. A missing or empty priority defaults to
        100; a non-numeric one is reported and also defaults to 100 so that
        one bad line does not discard the whole file.

        Returns:
            ``{'providers': [ {name, base_url, api_key, priority}, ... ]}``
        """
        providers = []
        for raw_line in content.splitlines():
            line = raw_line.strip()
            if not line or line.startswith('#'):
                continue
            parts = [part.strip() for part in line.split(',')]
            if len(parts) < 2:
                continue
            priority = 100
            if len(parts) > 3 and parts[3]:
                try:
                    priority = int(parts[3])
                except ValueError:
                    logger.warning(f"Invalid priority {parts[3]!r} for provider {parts[0]}; using 100")
            providers.append({
                'name': parts[0],
                'base_url': parts[1],
                'api_key': (parts[2] or None) if len(parts) > 2 else None,
                'priority': priority,
            })
        return {'providers': providers}

    def _add_default_providers(self):
        """Add default fallback providers."""
        defaults = [
            Provider(
                name="coingecko",
                base_url="https://api.coingecko.com/api/v3",
                priority=10,
                endpoints={
                    "rate": "/simple/price",
                    "market": "/coins/markets",
                    "history": "/coins/{id}/market_chart",
                },
            ),
            Provider(
                name="binance",
                base_url="https://api.binance.com/api/v3",
                priority=20,
                endpoints={
                    "rate": "/ticker/price",
                    "history": "/klines",
                    "depth": "/depth",
                },
            ),
            Provider(
                name="coinmarketcap",
                base_url="https://pro-api.coinmarketcap.com/v1",
                api_key=os.getenv("CMC_API_KEY"),
                priority=30,
                endpoints={
                    "rate": "/cryptocurrency/quotes/latest",
                    "market": "/cryptocurrency/listings/latest",
                },
            ),
        ]
        self.providers.extend(defaults)
        logger.info(f"Added {len(defaults)} default providers")

    # ------------------------------------------------------------------
    # HTTP session handling
    # ------------------------------------------------------------------

    async def _get_session(self) -> "aiohttp.ClientSession":
        """Get or create the shared aiohttp session.

        A session that was closed elsewhere is replaced instead of being
        handed out again.
        """
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession(
                timeout=aiohttp.ClientTimeout(total=TIMEOUT_SECONDS)
            )
        return self._session

    async def close(self):
        """Close the aiohttp session, if one was created."""
        if self._session:
            await self._session.close()
            self._session = None

    @staticmethod
    def _elapsed_ms(started: datetime) -> int:
        """Return whole milliseconds elapsed since ``started`` (aware UTC)."""
        return int((datetime.now(timezone.utc) - started).total_seconds() * 1000)

    # ------------------------------------------------------------------
    # Data sources
    # ------------------------------------------------------------------

    async def _call_hf(self, endpoint: str, params: Optional[Dict] = None) -> Tuple[Optional[Any], Optional[str]]:
        """Try to get data from HF first.

        Returns:
            ``(data, error)``. HF integration is not implemented in this
            method yet, so it always yields no data, which triggers the
            fallback to external providers.
        """
        if not HF_PRIORITY:
            return None, None
        # This would call actual HF models/datasets.
        return None, "HF integration pending"

    async def _call_provider(
        self,
        provider: "Provider",
        endpoint: str,
        params: Optional[Dict] = None,
        method: str = "GET",
    ) -> Tuple[Optional[Any], Optional[str]]:
        """Call a specific provider.

        Args:
            provider: Provider to call; its health state is updated.
            endpoint: Path appended verbatim to ``provider.base_url``.
            params: Query parameters (GET) or JSON body (POST). The dict is
                never modified.
            method: ``"GET"`` or ``"POST"`` (case-insensitive).

        Returns:
            ``(data, None)`` on HTTP 200, otherwise ``(None, error_text)``.
        """
        if not provider.is_available():
            return None, f"Provider {provider.name} unavailable (circuit open)"

        verb = method.upper()
        if verb not in ("GET", "POST"):
            # A caller mistake, not a provider fault: leave the breaker alone.
            return None, f"Unsupported HTTP method: {method}"

        # Work on a copy so that provider-specific credentials added below
        # cannot leak to the next provider through the caller's dict.
        request_params = dict(params) if params is not None else None

        # Add API key if needed; different providers use different auth methods
        headers: Dict[str, str] = {}
        if provider.api_key:
            provider_id = provider.name.lower()
            if "coinmarketcap" in provider_id:
                headers["X-CMC_PRO_API_KEY"] = provider.api_key
            elif "alphavantage" in provider_id:
                request_params = {**(request_params or {}), "apikey": provider.api_key}
            else:
                headers["Authorization"] = f"Bearer {provider.api_key}"

        try:
            session = await self._get_session()
            url = f"{provider.base_url}{endpoint}"
            if verb == "GET":
                send, payload = session.get, {"params": request_params}
            else:
                send, payload = session.post, {"json": request_params}

            start_time = datetime.now(timezone.utc)
            async with send(url, headers=headers, **payload) as response:
                latency_ms = self._elapsed_ms(start_time)
                if response.status != 200:
                    provider.record_failure()
                    return None, f"HTTP {response.status}"
                data = await response.json()
                provider.record_success()
                logger.debug(f"Provider {provider.name} succeeded in {latency_ms}ms")
                return data, None
        except asyncio.TimeoutError:
            provider.record_failure()
            return None, "Timeout"
        except Exception as e:
            provider.record_failure()
            logger.error(f"Provider {provider.name} error: {e}")
            return None, str(e)

    async def fetch_with_fallback(
        self,
        endpoint: str,
        params: Optional[Dict] = None,
        method: str = "GET",
        transform_func: Optional[Callable[[Any, str], Any]] = None,
    ) -> "FallbackResult":
        """
        Fetch data with HF-first then fallback strategy

        Args:
            endpoint: API endpoint path
            params: Query parameters
            method: HTTP method
            transform_func: Function to transform provider response to standard
                format; called as ``transform_func(data, provider_name)``. If it
                raises, the next provider is tried.

        Returns:
            FallbackResult with data, source, and metadata
        """
        attempted = []
        start_time = datetime.now(timezone.utc)

        # 1. Try HF first
        if HF_PRIORITY:
            attempted.append("hf")
            hf_data, hf_error = await self._call_hf(endpoint, params)
            if hf_data:
                return FallbackResult(
                    data=hf_data,
                    source="hf",
                    attempted=attempted,
                    success=True,
                    latency_ms=self._elapsed_ms(start_time),
                )

        # 2. Try fallback providers in priority order
        for provider in self.providers:
            if not provider.is_available():
                logger.debug(f"Skipping unavailable provider {provider.name}")
                continue
            attempted.append(provider.base_url)

            # Map endpoint to provider-specific endpoint if configured:
            # the first configured key that occurs in ``endpoint`` wins.
            provider_endpoint = endpoint
            if provider.endpoints:
                for key, value in provider.endpoints.items():
                    if key in endpoint:
                        provider_endpoint = value
                        break

            data, error = await self._call_provider(
                provider,
                provider_endpoint,
                params,
                method,
            )
            if not data:
                logger.debug(f"Provider {provider.name} returned no data: {error}")
                continue

            # Transform data if function provided
            if transform_func:
                try:
                    data = transform_func(data, provider.name)
                except Exception as e:
                    logger.error(f"Transform failed for {provider.name}: {e}")
                    continue

            return FallbackResult(
                data=data,
                source=provider.base_url,
                attempted=attempted,
                success=True,
                latency_ms=self._elapsed_ms(start_time),
            )

        # All failed
        return FallbackResult(
            data=None,
            source="none",
            attempted=attempted,
            success=False,
            error="All providers failed",
            latency_ms=self._elapsed_ms(start_time),
        )

    # ------------------------------------------------------------------
    # Introspection and manual control
    # ------------------------------------------------------------------

    def get_provider_status(self) -> Dict[str, Any]:
        """Get current status of all providers.

        Datetime fields are rendered as ISO-8601 strings. Note that querying
        availability may close an expired circuit breaker as a side effect.
        """
        status = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "providers": [],
        }
        for provider in self.providers:
            reopen_at = provider.circuit_open_until
            if isinstance(reopen_at, datetime):
                reopen_at = reopen_at.isoformat()
            status["providers"].append({
                "name": provider.name,
                "base_url": provider.base_url,
                "priority": provider.priority,
                "status": provider.status.value,
                "failures": provider.failures,
                "is_available": provider.is_available(),
                "last_success": provider.last_success.isoformat() if provider.last_success else None,
                "last_failure": provider.last_failure.isoformat() if provider.last_failure else None,
                "circuit_open_until": reopen_at or None,
            })

        # Count available providers
        status["available_providers"] = sum(
            1 for entry in status["providers"] if entry["is_available"]
        )
        status["total_providers"] = len(self.providers)
        status["hf_priority"] = HF_PRIORITY
        return status

    @staticmethod
    def _clear_breaker(provider: "Provider") -> None:
        """Put ``provider`` back into the AVAILABLE state with no failures."""
        provider.status = ProviderStatus.AVAILABLE
        provider.failures = 0
        provider.circuit_open_until = None

    def reset_provider(self, provider_name: str) -> bool:
        """Reset a specific provider's circuit breaker.

        Returns:
            True when a provider with that name was found, else False.
        """
        for provider in self.providers:
            if provider.name == provider_name:
                self._clear_breaker(provider)
                logger.info(f"Reset provider {provider_name}")
                return True
        return False

    def reset_all_providers(self):
        """Reset all providers' circuit breakers."""
        for provider in self.providers:
            self._clear_breaker(provider)
        logger.info("Reset all providers")
# ====================
# TRANSFORM FUNCTIONS
# ====================


def transform_coingecko_rate(data: Dict, provider: str) -> Dict:
    """Convert a CoinGecko simple-price payload to the standard rate format.

    CoinGecko returns ``{"bitcoin": {"usd": 50000}}``. The first
    coin/currency combination encountered is converted to
    ``{"pair", "price", "ts"}``; ``ts`` is the current UTC time.

    Args:
        data: Decoded provider response.
        provider: Provider name (unused; part of the transform signature).

    Returns:
        The standard-format dict, or ``data`` unchanged when it is empty,
        not a dict, or contains no price entry.
    """
    if not data or not isinstance(data, dict):
        return data

    for coin_id, quotes in data.items():
        for currency_code, amount in quotes.items():
            pair_label = f"{coin_id.upper()}/{currency_code.upper()}"
            return {
                "pair": pair_label,
                "price": amount,
                "ts": datetime.now(timezone.utc).isoformat(),
            }
    return data
def transform_binance_rate(data: Dict, provider: str) -> Dict:
    """Transform Binance rate response to standard format.

    Binance returns ``{"symbol": "BTCUSDT", "price": "50000.00"}`` with base
    and quote asset concatenated. The symbol is split at a known quote-asset
    suffix (longest candidates first); when none matches, the last four
    characters are taken as the quote, as a best-effort fallback.

    Args:
        data: Decoded provider response.
        provider: Provider name (unused; part of the transform signature).

    Returns:
        ``{"pair", "price", "ts"}`` with ``price`` as float and ``ts`` the
        current UTC time, or ``data`` unchanged when it is not a dict that
        contains ``"symbol"``.

    Raises:
        KeyError, ValueError: If ``"price"`` is missing or not numeric.
    """
    if not isinstance(data, dict) or "symbol" not in data:
        return data

    symbol = data["symbol"]
    # Ordered longest-first so that e.g. "TUSD" wins over "USD".
    known_quotes = (
        "FDUSD",
        "USDT", "USDC", "BUSD", "TUSD",
        "DAI", "BTC", "ETH", "BNB", "EUR", "GBP", "TRY", "USD",
    )
    split_at = max(len(symbol) - 4, 0)  # fallback: assume a 4-char quote
    for quote in known_quotes:
        # Require a non-empty base so a bare "USDT" is not split to "/USDT".
        if len(symbol) > len(quote) and symbol.endswith(quote):
            split_at = len(symbol) - len(quote)
            break

    return {
        "pair": f"{symbol[:split_at]}/{symbol[split_at:]}",
        "price": float(data["price"]),
        "ts": datetime.now(timezone.utc).isoformat(),
    }
# ====================
# SINGLETON INSTANCE
# ====================

# Shared module-level manager. It is built at import time, which means the
# provider configuration is loaded as soon as this module is imported.
fallback_manager = ProviderFallbackManager()

# Public names exported for use in routers.
__all__ = [
    'ProviderFallbackManager',
    'FallbackResult',
    'Provider',
    'ProviderStatus',
    'fallback_manager',
    'transform_coingecko_rate',
    'transform_binance_rate',
]