Datasourceforcryptocurrency-2 / backend /services /enhanced_provider_manager.py
Cursor Agent
feat: Implement enhanced provider manager for load balancing
6358ba6
#!/usr/bin/env python3
"""
Enhanced Provider Manager - Universal Load Balancing for All Data Types
Extends the existing intelligent provider service to support:
- Market data (prices, charts, OHLCV)
- News (crypto news feeds)
- Sentiment (Fear & Greed, social sentiment)
- AI/Predictions (analysis, sentiment analysis)
- Technical data (indicators, correlations)
- Metadata (exchanges, coin lists)
Integrates:
- Binance DNS connector (multi-endpoint failover)
- Render.com backup service
- CoinGecko, CoinPaprika, CoinCap
- CryptoCompare, Alternative.me
- RSS feeds
"""
import asyncio
import logging
import time
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import httpx
from backend.services.binance_dns_connector import get_binance_connector, BinanceDNSConnector
from backend.services.crypto_dt_source_client import get_crypto_dt_source_service
logger = logging.getLogger(__name__)
class ProviderStatus(Enum):
"""Provider health status"""
HEALTHY = "healthy"
DEGRADED = "degraded"
DOWN = "down"
class DataCategory(Enum):
"""Data category types"""
MARKET_PRICE = "market_price" # Real-time prices
MARKET_OHLCV = "market_ohlcv" # Candlestick/historical data
MARKET_VOLUME = "market_volume" # Trading volume data
MARKET_ORDERBOOK = "market_orderbook" # Order book depth
MARKET_METADATA = "market_metadata" # Exchanges, coins list
NEWS = "news" # Crypto news
SENTIMENT = "sentiment" # Fear & Greed, sentiment
AI_PREDICTION = "ai_prediction" # Price predictions
TECHNICAL = "technical" # Technical indicators
SOCIAL = "social" # Social media data
@dataclass
class Provider:
"""Provider configuration and health tracking"""
name: str
category: DataCategory
priority: int # 1=highest, 2=backup, 3=fallback, 4=ultimate fallback
fetch_func: Callable # Async function to fetch data
status: ProviderStatus = ProviderStatus.HEALTHY
last_check: Optional[datetime] = None
response_time: float = 0.0
success_count: int = 0
failure_count: int = 0
consecutive_failures: int = 0
backoff_until: float = 0
cache_duration: int = 30 # seconds
@property
def success_rate(self) -> float:
"""Calculate success rate percentage"""
total = self.success_count + self.failure_count
if total == 0:
return 100.0
return (self.success_count / total) * 100
@property
def is_available(self) -> bool:
"""Check if provider is available (not in backoff)"""
return time.time() >= self.backoff_until
@property
def load_score(self) -> float:
"""Calculate load score (lower is better)"""
now = time.time()
score = 100 - self.success_rate
# Penalty for recent failures
score += self.consecutive_failures * 10
# Penalty for being in backoff
if not self.is_available:
score += 1000
return score
class EnhancedProviderManager:
"""
Universal provider manager with intelligent load balancing
Features:
- Category-based provider registration
- Round-robin with health-based selection
- Circuit breaker pattern
- Exponential backoff
- Multi-provider failover
- Binance DNS failover integration
- Render.com ultimate fallback
"""
def __init__(self):
self.providers: Dict[DataCategory, List[Provider]] = {
category: [] for category in DataCategory
}
self.circuit_breaker_threshold = 3
self.binance_connector = get_binance_connector(use_us=False)
self.render_service = get_crypto_dt_source_service()
logger.info("🚀 Enhanced Provider Manager initialized")
# Auto-register all providers
self._register_all_providers()
def _register_all_providers(self):
"""Register all available providers for each category"""
# ===== MARKET PRICE PROVIDERS =====
self.register_provider(Provider(
name="Binance",
category=DataCategory.MARKET_PRICE,
priority=1,
fetch_func=self._fetch_binance_price,
cache_duration=10
))
self.register_provider(Provider(
name="CoinCap",
category=DataCategory.MARKET_PRICE,
priority=2,
fetch_func=self._fetch_coincap_price,
cache_duration=30
))
self.register_provider(Provider(
name="CoinGecko",
category=DataCategory.MARKET_PRICE,
priority=2,
fetch_func=self._fetch_coingecko_price,
cache_duration=60
))
self.register_provider(Provider(
name="Render-Backup",
category=DataCategory.MARKET_PRICE,
priority=4,
fetch_func=self._fetch_render_price,
cache_duration=30
))
# ===== MARKET OHLCV PROVIDERS =====
self.register_provider(Provider(
name="Binance",
category=DataCategory.MARKET_OHLCV,
priority=1,
fetch_func=self._fetch_binance_ohlcv,
cache_duration=60
))
self.register_provider(Provider(
name="CryptoCompare",
category=DataCategory.MARKET_OHLCV,
priority=2,
fetch_func=self._fetch_cryptocompare_ohlcv,
cache_duration=60
))
self.register_provider(Provider(
name="Render-Backup",
category=DataCategory.MARKET_OHLCV,
priority=4,
fetch_func=self._fetch_render_ohlcv,
cache_duration=60
))
# ===== MARKET VOLUME PROVIDERS =====
self.register_provider(Provider(
name="Binance",
category=DataCategory.MARKET_VOLUME,
priority=1,
fetch_func=self._fetch_binance_volume,
cache_duration=30
))
# ===== MARKET ORDERBOOK PROVIDERS =====
self.register_provider(Provider(
name="Binance",
category=DataCategory.MARKET_ORDERBOOK,
priority=1,
fetch_func=self._fetch_binance_orderbook,
cache_duration=5
))
# ===== MARKET METADATA PROVIDERS =====
self.register_provider(Provider(
name="CoinGecko",
category=DataCategory.MARKET_METADATA,
priority=1,
fetch_func=self._fetch_coingecko_metadata,
cache_duration=3600 # 1 hour
))
self.register_provider(Provider(
name="CoinPaprika",
category=DataCategory.MARKET_METADATA,
priority=2,
fetch_func=self._fetch_coinpaprika_metadata,
cache_duration=3600
))
# ===== NEWS PROVIDERS =====
self.register_provider(Provider(
name="CryptoCompare",
category=DataCategory.NEWS,
priority=1,
fetch_func=self._fetch_cryptocompare_news,
cache_duration=300 # 5 min
))
self.register_provider(Provider(
name="Render-Backup",
category=DataCategory.NEWS,
priority=3,
fetch_func=self._fetch_render_news,
cache_duration=300
))
# ===== SENTIMENT PROVIDERS =====
self.register_provider(Provider(
name="Alternative.me",
category=DataCategory.SENTIMENT,
priority=1,
fetch_func=self._fetch_alternative_sentiment,
cache_duration=3600 # 1 hour
))
self.register_provider(Provider(
name="Render-Backup",
category=DataCategory.SENTIMENT,
priority=3,
fetch_func=self._fetch_render_sentiment,
cache_duration=3600
))
logger.info(f"✅ Registered providers for {len(self.providers)} categories")
def register_provider(self, provider: Provider):
"""Register a provider for a specific category"""
self.providers[provider.category].append(provider)
logger.debug(f"📝 Registered {provider.name} for {provider.category.value} (priority {provider.priority})")
async def fetch_data(
self,
category: DataCategory,
**kwargs
) -> Optional[Dict[str, Any]]:
"""
Fetch data with intelligent provider selection and failover
Args:
category: Data category to fetch
**kwargs: Parameters to pass to provider fetch function
Returns:
Data dict with provider info or None if all failed
"""
providers = self.providers.get(category, [])
if not providers:
logger.error(f"❌ No providers registered for {category.value}")
return None
# Sort by priority and load score
sorted_providers = sorted(
[p for p in providers if p.is_available],
key=lambda p: (p.priority, p.load_score)
)
if not sorted_providers:
logger.warning(f"⚠️ All providers for {category.value} in backoff!")
sorted_providers = sorted(providers, key=lambda p: p.backoff_until)
# Try each provider in order
for provider in sorted_providers:
start_time = time.time()
try:
logger.debug(f"🔄 Trying {provider.name} for {category.value}...")
result = await provider.fetch_func(**kwargs)
if result:
response_time = time.time() - start_time
self._record_success(provider, response_time)
return {
"success": True,
"data": result,
"provider": provider.name,
"category": category.value,
"response_time": response_time,
"timestamp": datetime.now().isoformat()
}
except Exception as e:
logger.error(f"❌ {provider.name} failed for {category.value}: {e}")
self._record_failure(provider, str(e))
logger.error(f"❌ All providers failed for {category.value}")
return {
"success": False,
"data": None,
"error": "All providers failed",
"category": category.value,
"timestamp": datetime.now().isoformat()
}
def _record_success(self, provider: Provider, response_time: float):
"""Record successful request"""
provider.consecutive_failures = 0
provider.success_count += 1
provider.status = ProviderStatus.HEALTHY
provider.response_time = response_time
provider.last_check = datetime.now()
provider.backoff_until = 0
logger.info(
f"✅ {provider.name} ({provider.category.value}): "
f"{response_time*1000:.0f}ms, {provider.success_rate:.1f}% success"
)
def _record_failure(self, provider: Provider, error: str):
"""Record failed request with exponential backoff"""
provider.consecutive_failures += 1
provider.failure_count += 1
provider.last_check = datetime.now()
# Exponential backoff: 2^failures seconds (max 300s)
backoff_duration = min(2 ** provider.consecutive_failures, 300)
provider.backoff_until = time.time() + backoff_duration
if provider.consecutive_failures >= self.circuit_breaker_threshold:
provider.status = ProviderStatus.DOWN
else:
provider.status = ProviderStatus.DEGRADED
logger.warning(
f"❌ {provider.name} ({provider.category.value}): {error} "
f"(failures: {provider.consecutive_failures}, backoff: {backoff_duration}s)"
)
# ===== PROVIDER FETCH FUNCTIONS =====
async def _fetch_binance_price(self, symbol: str = "BTCUSDT") -> Optional[Dict]:
"""Fetch price from Binance with DNS failover"""
result = await self.binance_connector.get(
"/api/v3/ticker/price",
params={"symbol": symbol}
)
return result
async def _fetch_binance_ohlcv(
self,
symbol: str = "BTCUSDT",
interval: str = "1h",
limit: int = 100
) -> Optional[Dict]:
"""Fetch OHLCV from Binance"""
result = await self.binance_connector.get(
"/api/v3/klines",
params={"symbol": symbol, "interval": interval, "limit": limit}
)
return result
async def _fetch_binance_volume(self, symbol: Optional[str] = None) -> Optional[Dict]:
"""Fetch volume data from Binance"""
params = {"symbol": f"{symbol}USDT"} if symbol else {}
result = await self.binance_connector.get("/api/v3/ticker/24hr", params=params)
return result
async def _fetch_binance_orderbook(
self,
symbol: str = "BTCUSDT",
limit: int = 100
) -> Optional[Dict]:
"""Fetch orderbook from Binance"""
result = await self.binance_connector.get(
"/api/v3/depth",
params={"symbol": symbol, "limit": limit}
)
return result
async def _fetch_coincap_price(self, coin_id: str = "bitcoin") -> Optional[Dict]:
"""Fetch price from CoinCap"""
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(f"https://api.coincap.io/v2/assets/{coin_id}")
response.raise_for_status()
return response.json()
async def _fetch_coingecko_price(self, coin_id: str = "bitcoin") -> Optional[Dict]:
"""Fetch price from CoinGecko"""
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(
f"https://api.coingecko.com/api/v3/simple/price",
params={"ids": coin_id, "vs_currencies": "usd"}
)
response.raise_for_status()
return response.json()
async def _fetch_coingecko_metadata(self, data_type: str = "exchanges") -> Optional[Dict]:
"""Fetch metadata from CoinGecko"""
async with httpx.AsyncClient(timeout=15.0) as client:
response = await client.get(f"https://api.coingecko.com/api/v3/{data_type}")
response.raise_for_status()
return response.json()
async def _fetch_coinpaprika_metadata(self, data_type: str = "coins") -> Optional[Dict]:
"""Fetch metadata from CoinPaprika"""
async with httpx.AsyncClient(timeout=15.0) as client:
response = await client.get(f"https://api.coinpaprika.com/v1/{data_type}")
response.raise_for_status()
return response.json()
async def _fetch_cryptocompare_ohlcv(
self,
symbol: str = "BTC",
interval: str = "hour",
limit: int = 100
) -> Optional[Dict]:
"""Fetch OHLCV from CryptoCompare"""
async with httpx.AsyncClient(timeout=10.0) as client:
endpoint = f"histo{interval}" if interval in ["day", "hour", "minute"] else "histohour"
response = await client.get(
f"https://min-api.cryptocompare.com/data/v2/{endpoint}",
params={"fsym": symbol, "tsym": "USD", "limit": limit}
)
response.raise_for_status()
return response.json()
async def _fetch_cryptocompare_news(self, limit: int = 50) -> Optional[Dict]:
"""Fetch news from CryptoCompare"""
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(
"https://min-api.cryptocompare.com/data/v2/news/",
params={"lang": "EN"}
)
response.raise_for_status()
return response.json()
async def _fetch_alternative_sentiment(self, limit: int = 1) -> Optional[Dict]:
"""Fetch Fear & Greed Index from Alternative.me"""
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(
"https://api.alternative.me/fng/",
params={"limit": limit}
)
response.raise_for_status()
return response.json()
async def _fetch_render_price(self, coin_id: str = "bitcoin") -> Optional[Dict]:
"""Fetch price from Render backup service"""
result = await self.render_service.get_coingecko_price(ids=coin_id)
return result.get("data") if result.get("success") else None
async def _fetch_render_ohlcv(
self,
symbol: str = "BTCUSDT",
interval: str = "1h",
limit: int = 100
) -> Optional[Dict]:
"""Fetch OHLCV from Render backup service"""
result = await self.render_service.get_binance_klines(
symbol=symbol,
interval=interval,
limit=limit
)
return result.get("data") if result.get("success") else None
async def _fetch_render_news(self, feed_name: str = "coindesk", limit: int = 20) -> Optional[Dict]:
"""Fetch news from Render backup service"""
result = await self.render_service.get_rss_feed(feed_name=feed_name, limit=limit)
return result.get("data") if result.get("success") else None
async def _fetch_render_sentiment(self, limit: int = 1) -> Optional[Dict]:
"""Fetch sentiment from Render backup service"""
result = await self.render_service.get_fear_greed_index(limit=limit)
return result.get("data") if result.get("success") else None
def get_provider_health(self) -> Dict[str, Any]:
"""Get health status of all providers"""
health_data = {}
for category, providers in self.providers.items():
health_data[category.value] = [
{
"name": p.name,
"priority": p.priority,
"status": p.status.value,
"available": p.is_available,
"success_rate": f"{p.success_rate:.1f}%",
"consecutive_failures": p.consecutive_failures,
"response_time_ms": f"{p.response_time*1000:.0f}",
"last_check": p.last_check.isoformat() if p.last_check else None
}
for p in sorted(providers, key=lambda x: x.priority)
]
# Add Binance connector health
binance_health = self.binance_connector.get_health_status()
health_data["binance_dns"] = binance_health
return health_data
# ===== GLOBAL INSTANCE =====
_enhanced_provider_manager: Optional[EnhancedProviderManager] = None
def get_enhanced_provider_manager() -> EnhancedProviderManager:
"""Get singleton instance of enhanced provider manager"""
global _enhanced_provider_manager
if _enhanced_provider_manager is None:
_enhanced_provider_manager = EnhancedProviderManager()
return _enhanced_provider_manager
# ===== CONVENIENCE FUNCTIONS =====
async def fetch_market_price(symbol: str = "BTCUSDT") -> Optional[Dict]:
"""Fetch market price with intelligent failover"""
manager = get_enhanced_provider_manager()
return await manager.fetch_data(DataCategory.MARKET_PRICE, symbol=symbol)
async def fetch_market_ohlcv(symbol: str = "BTCUSDT", interval: str = "1h", limit: int = 100) -> Optional[Dict]:
"""Fetch OHLCV data with intelligent failover"""
manager = get_enhanced_provider_manager()
return await manager.fetch_data(
DataCategory.MARKET_OHLCV,
symbol=symbol,
interval=interval,
limit=limit
)
async def fetch_news(limit: int = 50) -> Optional[Dict]:
"""Fetch news with intelligent failover"""
manager = get_enhanced_provider_manager()
return await manager.fetch_data(DataCategory.NEWS, limit=limit)
async def fetch_sentiment(limit: int = 1) -> Optional[Dict]:
"""Fetch sentiment with intelligent failover"""
manager = get_enhanced_provider_manager()
return await manager.fetch_data(DataCategory.SENTIMENT, limit=limit)