| """ |
| API Fallback Manager |
| Automatically switches to alternative API providers when primary fails |
| """ |
|
|
| import asyncio |
| import logging |
| from typing import Dict, List, Any, Optional, Callable |
| from datetime import datetime, timedelta |
| from enum import Enum |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| class ProviderStatus(Enum): |
| """Provider status""" |
| ACTIVE = "active" |
| DEGRADED = "degraded" |
| FAILED = "failed" |
| COOLDOWN = "cooldown" |
|
|
|
|
| class APIProvider: |
| """Represents an API provider with health tracking""" |
| |
| def __init__( |
| self, |
| name: str, |
| priority: int, |
| fetch_function: Callable, |
| cooldown_seconds: int = 300, |
| max_failures: int = 3 |
| ): |
| self.name = name |
| self.priority = priority |
| self.fetch_function = fetch_function |
| self.cooldown_seconds = cooldown_seconds |
| self.max_failures = max_failures |
| |
| self.failures = 0 |
| self.total_requests = 0 |
| self.successful_requests = 0 |
| self.status = ProviderStatus.ACTIVE |
| self.last_failure_time = None |
| self.last_success_time = None |
| |
| def record_success(self): |
| """Record successful request""" |
| self.successful_requests += 1 |
| self.total_requests += 1 |
| self.failures = 0 |
| self.status = ProviderStatus.ACTIVE |
| self.last_success_time = datetime.now() |
| logger.info(f"✅ {self.name}: Success (total: {self.successful_requests}/{self.total_requests})") |
| |
| def record_failure(self, error: Exception): |
| """Record failed request""" |
| self.failures += 1 |
| self.total_requests += 1 |
| self.last_failure_time = datetime.now() |
| |
| if self.failures >= self.max_failures: |
| self.status = ProviderStatus.COOLDOWN |
| logger.warning( |
| f"❌ {self.name}: Entering cooldown after {self.failures} failures. " |
| f"Last error: {str(error)}" |
| ) |
| else: |
| self.status = ProviderStatus.DEGRADED |
| logger.warning(f"⚠️ {self.name}: Failure {self.failures}/{self.max_failures} - {str(error)}") |
| |
| def is_available(self) -> bool: |
| """Check if provider is available""" |
| if self.status == ProviderStatus.COOLDOWN: |
| |
| if self.last_failure_time: |
| cooldown_end = self.last_failure_time + timedelta(seconds=self.cooldown_seconds) |
| if datetime.now() >= cooldown_end: |
| self.status = ProviderStatus.ACTIVE |
| self.failures = 0 |
| logger.info(f"🔄 {self.name}: Cooldown ended, provider reactivated") |
| return True |
| return False |
| |
| return self.status in [ProviderStatus.ACTIVE, ProviderStatus.DEGRADED] |
| |
| def get_health_score(self) -> float: |
| """Get health score (0-100)""" |
| if self.total_requests == 0: |
| return 100.0 |
| return (self.successful_requests / self.total_requests) * 100 |
|
|
|
|
| class APIFallbackManager: |
| """ |
| Manages API fallback across multiple providers |
| |
| Usage: |
| manager = APIFallbackManager("OHLCV") |
| manager.add_provider("Binance", 1, fetch_binance_ohlcv) |
| manager.add_provider("CoinGecko", 2, fetch_coingecko_ohlcv) |
| |
| result = await manager.fetch_with_fallback(symbol="BTC", timeframe="1h") |
| """ |
| |
| def __init__(self, service_name: str): |
| self.service_name = service_name |
| self.providers: List[APIProvider] = [] |
| logger.info(f"📡 Initialized fallback manager for {service_name}") |
| |
| def add_provider( |
| self, |
| name: str, |
| priority: int, |
| fetch_function: Callable, |
| cooldown_seconds: int = 300, |
| max_failures: int = 3 |
| ): |
| """Add a provider to the fallback chain""" |
| provider = APIProvider(name, priority, fetch_function, cooldown_seconds, max_failures) |
| self.providers.append(provider) |
| |
| self.providers.sort(key=lambda p: p.priority) |
| logger.info(f"✅ Added provider '{name}' (priority: {priority}) to {self.service_name}") |
| |
| async def fetch_with_fallback(self, **kwargs) -> Dict[str, Any]: |
| """ |
| Fetch data with automatic fallback |
| |
| Args: |
| **kwargs: Parameters to pass to fetch functions |
| |
| Returns: |
| Dict with: |
| - success: bool |
| - data: Any (if successful) |
| - provider: str (which provider succeeded) |
| - attempts: List of attempts |
| - error: str (if all failed) |
| """ |
| attempts = [] |
| last_error = None |
| |
| for provider in self.providers: |
| if not provider.is_available(): |
| attempts.append({ |
| "provider": provider.name, |
| "status": "skipped", |
| "reason": f"Provider in {provider.status.value} state" |
| }) |
| continue |
| |
| try: |
| logger.info(f"🔄 {self.service_name}: Trying {provider.name}...") |
| start_time = datetime.now() |
| |
| |
| data = await provider.fetch_function(**kwargs) |
| |
| duration = (datetime.now() - start_time).total_seconds() |
| provider.record_success() |
| |
| attempts.append({ |
| "provider": provider.name, |
| "status": "success", |
| "duration": duration |
| }) |
| |
| logger.info( |
| f"✅ {self.service_name}: {provider.name} succeeded in {duration:.2f}s" |
| ) |
| |
| return { |
| "success": True, |
| "data": data, |
| "provider": provider.name, |
| "attempts": attempts, |
| "health_score": provider.get_health_score() |
| } |
| |
| except Exception as e: |
| last_error = e |
| provider.record_failure(e) |
| |
| attempts.append({ |
| "provider": provider.name, |
| "status": "failed", |
| "error": str(e), |
| "error_type": type(e).__name__ |
| }) |
| |
| logger.warning( |
| f"❌ {self.service_name}: {provider.name} failed - {str(e)}" |
| ) |
| |
| |
| logger.error( |
| f"🚨 {self.service_name}: ALL PROVIDERS FAILED! " |
| f"Tried {len(attempts)} provider(s)" |
| ) |
| |
| return { |
| "success": False, |
| "data": None, |
| "provider": None, |
| "attempts": attempts, |
| "error": f"All providers failed. Last error: {str(last_error)}" |
| } |
| |
| def get_status(self) -> Dict[str, Any]: |
| """Get status of all providers""" |
| return { |
| "service": self.service_name, |
| "providers": [ |
| { |
| "name": p.name, |
| "priority": p.priority, |
| "status": p.status.value, |
| "health_score": p.get_health_score(), |
| "total_requests": p.total_requests, |
| "successful_requests": p.successful_requests, |
| "failures": p.failures, |
| "available": p.is_available() |
| } |
| for p in self.providers |
| ] |
| } |
|
|
|
|
| |
|
|
| async def example_ohlcv_binance(symbol: str, timeframe: str, limit: int = 100): |
| """Example: Fetch from Binance""" |
| from backend.services.binance_client import BinanceClient |
| client = BinanceClient() |
| return await client.get_ohlcv(symbol, timeframe=timeframe, limit=limit) |
|
|
|
|
| async def example_ohlcv_coingecko(symbol: str, timeframe: str, limit: int = 100): |
| """Example: Fetch from CoinGecko (would need implementation)""" |
| |
| raise NotImplementedError("CoinGecko OHLCV not implemented yet") |
|
|
|
|
| async def example_news_newsapi(q: str, **kwargs): |
| """Example: Fetch news from NewsAPI""" |
| import httpx |
| api_key = "968a5e25552b4cb5ba3280361d8444ab" |
| url = f"https://newsapi.org/v2/everything?q={q}&sortBy=publishedAt&apiKey={api_key}" |
| async with httpx.AsyncClient() as client: |
| response = await client.get(url, timeout=10.0) |
| response.raise_for_status() |
| return response.json() |
|
|
|
|
| async def example_news_cryptocompare(q: str, **kwargs): |
| """Example: Fetch news from CryptoCompare""" |
| import httpx |
| url = f"https://min-api.cryptocompare.com/data/v2/news/?categories={q}" |
| async with httpx.AsyncClient() as client: |
| response = await client.get(url, timeout=10.0) |
| response.raise_for_status() |
| return response.json() |
|
|
|
|
| |
| _managers: Dict[str, APIFallbackManager] = {} |
|
|
|
|
| def get_fallback_manager(service_name: str) -> APIFallbackManager: |
| """Get or create a fallback manager for a service""" |
| if service_name not in _managers: |
| _managers[service_name] = APIFallbackManager(service_name) |
| return _managers[service_name] |
|
|
|
|
| def get_all_managers_status() -> Dict[str, Any]: |
| """Get status of all fallback managers""" |
| return { |
| name: manager.get_status() |
| for name, manager in _managers.items() |
| } |
|
|
|
|