| """Data aggregator with multi-provider fallback""" |
| from __future__ import annotations |
| from typing import List, Optional |
| from datetime import datetime |
| import time |
| import logging |
| import sys |
| import os |
| sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) |
|
|
| from providers import BinanceProvider, CoinGeckoProvider, KrakenProvider, CoinCapProvider |
| from core.models import ( |
| OHLCV, Price, SentimentData, FearGreedIndex, NewsSentiment, |
| OverallSentiment, MarketOverview, ProviderHealth |
| ) |
| from core.config import settings |
| from core.cache import cache, cache_key, get_or_set |
| import httpx |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
class DataAggregator:
    """Aggregate market data from several providers, falling back on failure.

    Holds three groups of provider instances:

    * ``ohlcv_providers`` -- tried in order by :meth:`fetch_ohlcv`.
    * ``price_providers`` -- all queried by :meth:`fetch_prices`.
    * ``market_provider`` -- used by :meth:`fetch_market_overview`.
    """

    # (inclusive lower bound, label) pairs, checked from highest to lowest.
    # Any value below the last bound is labelled "extreme_fear".
    _SENTIMENT_BANDS = (
        (75, "extreme_greed"),
        (55, "bullish"),
        (45, "neutral"),
        (25, "bearish"),
    )

    def __init__(self):
        """Instantiate the providers and record the start time for uptime."""
        self.ohlcv_providers = [
            BinanceProvider(),
            KrakenProvider(),
        ]

        self.price_providers = [
            CoinGeckoProvider(api_key=settings.COINGECKO_API_KEY),
            CoinCapProvider(),
            BinanceProvider(),
        ]

        self.market_provider = CoinGeckoProvider(api_key=settings.COINGECKO_API_KEY)

        self.start_time = time.time()

    async def close(self):
        """Close every provider, including the market-overview provider.

        All providers are attempted even if one of them fails to close; the
        first error encountered is re-raised once the loop has finished.

        Raises:
            Exception: the first error raised by any provider's ``close()``.
        """
        first_error = None
        providers = [*self.ohlcv_providers, *self.price_providers, self.market_provider]
        for provider in providers:
            try:
                await provider.close()
            except Exception as exc:
                logger.warning(
                    "Failed to close provider %s: %s",
                    getattr(provider, "name", provider),
                    exc,
                )
                if first_error is None:
                    first_error = exc

        if first_error is not None:
            raise first_error

    async def fetch_ohlcv(
        self,
        symbol: str,
        interval: str = "1h",
        limit: int = 100,
    ) -> tuple[List[OHLCV], str]:
        """Fetch OHLCV candles from the first provider that returns any.

        Providers in ``ohlcv_providers`` are tried in order. A provider that
        raises or returns an empty result is skipped.

        Args:
            symbol: Symbol passed through to the provider.
            interval: Candle interval passed through to the provider.
            limit: Candle count passed through to the provider.

        Returns:
            A ``(candles, provider_name)`` tuple.

        Raises:
            Exception: if no provider returned data. The last provider error,
                if any, is attached as ``__cause__``.
        """
        last_error = None

        for provider in self.ohlcv_providers:
            try:
                logger.info("Trying %s for OHLCV data: %s %s", provider.name, symbol, interval)
                data = await provider.fetch_ohlcv(symbol, interval, limit)

                if data and len(data) > 0:
                    logger.info("Successfully fetched %s candles from %s", len(data), provider.name)
                    return data, provider.name

                logger.warning("Provider %s returned no OHLCV data", provider.name)
            except Exception as e:
                logger.warning("Provider %s failed: %s", provider.name, e)
                last_error = e

        raise Exception("All OHLCV providers failed") from last_error

    async def fetch_prices(self, symbols: List[str]) -> tuple[List[Price], str]:
        """Fetch prices from every price provider and keep one per symbol.

        Every provider in ``price_providers`` is queried in order. For each
        symbol, the first price seen wins; later providers only fill in
        symbols that are still missing.

        Args:
            symbols: Symbols passed through to each provider.

        Returns:
            A ``(prices, source)`` tuple. ``source`` joins with ``"+"`` the
            names of all providers that completed without raising, even when
            none of their prices were kept.

        Raises:
            Exception: if no provider yielded any price.
        """
        first_price_by_symbol = {}
        sources_used = []

        for provider in self.price_providers:
            try:
                logger.info("Fetching prices from %s", provider.name)
                prices = await provider.fetch_prices(symbols)

                for price in prices:
                    # Earlier providers take precedence for a given symbol.
                    first_price_by_symbol.setdefault(price.symbol, price)

                sources_used.append(provider.name)
            except Exception as e:
                logger.warning("Provider %s failed for prices: %s", provider.name, e)

        if not first_price_by_symbol:
            raise Exception("All price providers failed")

        # sources_used can be empty here if a provider raised part-way through
        # iteration after contributing some prices.
        source_str = "+".join(sources_used) if sources_used else "multi-provider"

        return list(first_price_by_symbol.values()), source_str

    async def fetch_fear_greed_index(self) -> FearGreedIndex:
        """Fetch the Fear & Greed Index from Alternative.me.

        Never raises: on any failure (network error, HTTP error status,
        malformed or empty payload) a neutral index with value 50 is returned.
        The ``timestamp`` is the local time of this call, not a value taken
        from the API response.
        """
        try:
            async with httpx.AsyncClient(timeout=10) as client:
                response = await client.get("https://api.alternative.me/fng/")
                # Treat HTTP error statuses as failures rather than parsing
                # whatever body came back with them.
                response.raise_for_status()
                payload = response.json()

            if "data" in payload and len(payload["data"]) > 0:
                latest = payload["data"][0]
                return FearGreedIndex(
                    value=int(latest["value"]),
                    classification=latest["value_classification"],
                    timestamp=datetime.now().isoformat(),
                )

            logger.warning("Fear & Greed Index response contained no data")
        except Exception as e:
            logger.error("Failed to fetch Fear & Greed Index: %s", e)

        # Neutral fallback so callers always receive a usable value.
        return FearGreedIndex(
            value=50,
            classification="Neutral",
            timestamp=datetime.now().isoformat(),
        )

    async def fetch_sentiment(self) -> SentimentData:
        """Build sentiment data from the Fear & Greed Index.

        The index value is mapped to a label via ``_SENTIMENT_BANDS`` and is
        also used unchanged as the overall score. News sentiment is reported
        with ``total=0`` and the confidence is fixed at 0.8.
        """
        fear_greed = await self.fetch_fear_greed_index()
        score = fear_greed.value

        sentiment = next(
            (label for floor, label in self._SENTIMENT_BANDS if score >= floor),
            "extreme_fear",
        )

        return SentimentData(
            fearGreed=fear_greed,
            news=NewsSentiment(total=0),
            overall=OverallSentiment(
                sentiment=sentiment,
                score=score,
                confidence=0.8,
            ),
        )

    async def fetch_market_overview(self) -> MarketOverview:
        """Fetch the market overview from ``market_provider``.

        Missing keys in the provider's response default to 0. Never raises:
        on any failure an overview with every field set to 0 is returned.
        """
        try:
            market_data = await self.market_provider.fetch_market_data()
            dominance = market_data.get("market_cap_percentage", {})

            return MarketOverview(
                totalMarketCap=market_data.get("total_market_cap", {}).get("usd", 0),
                totalVolume24h=market_data.get("total_volume", {}).get("usd", 0),
                btcDominance=dominance.get("btc", 0),
                ethDominance=dominance.get("eth", 0),
                activeCoins=market_data.get("active_cryptocurrencies", 0),
            )
        except Exception as e:
            logger.error("Failed to fetch market overview: %s", e)

            return MarketOverview(
                totalMarketCap=0,
                totalVolume24h=0,
                btcDominance=0,
                ethDominance=0,
                activeCoins=0,
            )

    async def get_all_provider_health(self) -> List[ProviderHealth]:
        """Return the health of every distinct provider, in a stable order.

        Providers are de-duplicated by hash/equality and reported in the order
        OHLCV providers, price providers, market provider.
        """
        # dict.fromkeys de-duplicates like a set but keeps insertion order,
        # so the result order is the same on every call.
        unique_providers = dict.fromkeys(
            [*self.ohlcv_providers, *self.price_providers, self.market_provider]
        )

        return [await provider.get_health() for provider in unique_providers]

    def get_uptime(self) -> int:
        """Return whole seconds elapsed since this aggregator was created."""
        return int(time.time() - self.start_time)
|
|
|
|
| |
# Module-wide aggregator instance; stays None until get_aggregator() is first called.
aggregator: Optional[DataAggregator] = None


def get_aggregator() -> DataAggregator:
    """Return the shared DataAggregator, creating it on the first call."""
    global aggregator
    if aggregator is not None:
        return aggregator

    aggregator = DataAggregator()
    return aggregator
|
|