| |
| |
| |
| |
| @@ -1,23 +1,18 @@ |
| -""" |
| -WebSocket Data Broadcaster |
| -Broadcasts real-time cryptocurrency data from database to connected clients |
| -""" |
| - |
| import asyncio |
| import logging |
| from datetime import datetime |
| from typing import Dict, Any |
| |
| -from database.db_manager import db_manager |
| +from backend.orchestration.provider_manager import provider_manager |
| from backend.services.ws_service_manager import ws_manager, ServiceType |
| from utils.logger import setup_logger |
| |
| logger = setup_logger("ws_data_broadcaster") |
| |
| - |
| class DataBroadcaster: |
| """ |
| Broadcasts cryptocurrency data updates to WebSocket clients |
| + using the Provider Orchestrator for data fetching. |
| """ |
| |
| def __init__(self): |
| @@ -37,7 +32,6 @@ class DataBroadcaster: |
| self.broadcast_market_data(), |
| self.broadcast_news(), |
| self.broadcast_sentiment(), |
| - self.broadcast_whales(), |
| self.broadcast_gas_prices() |
| ] |
| |
| @@ -59,25 +53,49 @@ class DataBroadcaster: |
| |
| while self.is_running: |
| try: |
| - prices = db_manager.get_latest_prices(limit=50) |
| - |
| - if prices: |
| + # Use Orchestrator to fetch market data |
| + # Using 30s TTL to prevent provider spam, but broadcast often |
| + response = await provider_manager.fetch_data( |
| + "market", |
| + params={"ids": "bitcoin,ethereum,tron,solana,binancecoin,ripple", "vs_currency": "usd"}, |
| + use_cache=True, |
| + ttl=10 # Short TTL for live prices if provider allows |
| + ) |
| + |
| + if response["success"] and response["data"]: |
| + coins = response["data"] |
| + |
| # Format data for broadcast |
| + prices = {} |
| + price_changes = {} |
| + volumes = {} |
| + market_caps = {} |
| + |
| + for coin in coins: |
| + symbol = coin.get("symbol", "").upper() |
| + prices[symbol] = coin.get("current_price") |
| + price_changes[symbol] = coin.get("price_change_percentage_24h") |
| + volumes[symbol] = coin.get("total_volume") |
| + market_caps[symbol] = coin.get("market_cap") |
| + |
| data = { |
| "type": "market_data", |
| "data": { |
| - "prices": {p.symbol: p.price_usd for p in prices}, |
| - "volumes": {p.symbol: p.volume_24h for p in prices if p.volume_24h}, |
| - "market_caps": {p.symbol: p.market_cap for p in prices if p.market_cap}, |
| - "price_changes": {p.symbol: p.price_change_24h for p in prices if p.price_change_24h} |
| + "prices": prices, |
| + "volumes": volumes, |
| + "market_caps": market_caps, |
| + "price_changes": price_changes |
| }, |
| - "count": len(prices), |
| - "timestamp": datetime.utcnow().isoformat() |
| + "count": len(coins), |
| + "timestamp": datetime.utcnow().isoformat(), |
| + "source": response["source"] |
| } |
| |
| + # Diff check could be here (optimization) |
| + |
| # Broadcast to subscribed clients |
| await ws_manager.broadcast_to_service(ServiceType.MARKET_DATA, data) |
| - logger.debug(f"Broadcasted {len(prices)} price updates") |
| + logger.debug(f"Broadcasted {len(coins)} price updates from {response['source']}") |
| |
| except Exception as e: |
| logger.error(f"Error broadcasting market data: {e}", exc_info=True) |
| @@ -87,113 +105,98 @@ class DataBroadcaster: |
| async def broadcast_news(self): |
| """Broadcast news updates""" |
| logger.info("Starting news broadcast...") |
| - last_news_id = 0 |
| - |
| + |
| while self.is_running: |
| try: |
| - news = db_manager.get_latest_news(limit=10) |
| - |
| - if news and (not last_news_id or news[0].id != last_news_id): |
| - # New news available |
| - last_news_id = news[0].id |
| - |
| - data = { |
| - "type": "news", |
| - "data": { |
| - "articles": [ |
| - { |
| - "id": article.id, |
| - "title": article.title, |
| - "source": article.source, |
| - "url": article.url, |
| - "published_at": article.published_at.isoformat(), |
| - "sentiment": article.sentiment |
| - } |
| - for article in news[:5] # Only send 5 latest |
| - ] |
| - }, |
| - "count": len(news[:5]), |
| - "timestamp": datetime.utcnow().isoformat() |
| - } |
| - |
| - await ws_manager.broadcast_to_service(ServiceType.NEWS, data) |
| - logger.info(f"Broadcasted {len(news[:5])} news articles") |
| + response = await provider_manager.fetch_data( |
| + "news", |
| + params={"filter": "hot"}, |
| + use_cache=True, |
| + ttl=300 |
| + ) |
| + |
| + if response["success"] and response["data"]: |
| + # Transform/Normalize |
| + data = response["data"] |
| + articles = [] |
| + |
| + if "results" in data: # CryptoPanic |
| + for post in data.get('results', [])[:5]: |
| + articles.append({ |
| + "id": str(post.get('id')), |
| + "title": post.get('title', ''), |
| + "source": post.get('source', {}).get('title', 'Unknown'), |
| + "url": post.get('url', ''), |
| + "published_at": post.get('published_at', datetime.now().isoformat()) |
| + }) |
| + elif "articles" in data: # NewsAPI |
| + for post in data.get('articles', [])[:5]: |
| + articles.append({ |
| + "id": str(hash(post.get('url', ''))), |
| + "title": post.get('title', ''), |
| + "source": post.get('source', {}).get('name', 'Unknown'), |
| + "url": post.get('url', ''), |
| + "published_at": post.get('publishedAt', datetime.now().isoformat()) |
| + }) |
| + |
| + if articles: |
| + payload = { |
| + "type": "news", |
| + "data": {"articles": articles}, |
| + "count": len(articles), |
| + "timestamp": datetime.utcnow().isoformat(), |
| + "source": response["source"] |
| + } |
| + |
| + await ws_manager.broadcast_to_service(ServiceType.NEWS, payload) |
| + logger.info(f"Broadcasted {len(articles)} news articles from {response['source']}") |
| |
| except Exception as e: |
| logger.error(f"Error broadcasting news: {e}", exc_info=True) |
| |
| - await asyncio.sleep(30) # Check every 30 seconds |
| + await asyncio.sleep(60) |
| |
| async def broadcast_sentiment(self): |
| """Broadcast sentiment updates""" |
| logger.info("Starting sentiment broadcast...") |
| - last_sentiment_value = None |
| |
| while self.is_running: |
| try: |
| - sentiment = db_manager.get_latest_sentiment() |
| - |
| - if sentiment and sentiment.value != last_sentiment_value: |
| - last_sentiment_value = sentiment.value |
| - |
| - data = { |
| + response = await provider_manager.fetch_data( |
| + "sentiment", |
| + params={"limit": 1}, |
| + use_cache=True, |
| + ttl=3600 |
| + ) |
| + |
| + if response["success"] and response["data"]: |
| + data = response["data"] |
| + fng_value = 50 |
| + classification = "Neutral" |
| + |
| + if data.get('data'): |
| + item = data['data'][0] |
| + fng_value = int(item.get('value', 50)) |
| + classification = item.get('value_classification', 'Neutral') |
| + |
| + payload = { |
| "type": "sentiment", |
| "data": { |
| - "fear_greed_index": sentiment.value, |
| - "classification": sentiment.classification, |
| - "metric_name": sentiment.metric_name, |
| - "source": sentiment.source, |
| - "timestamp": sentiment.timestamp.isoformat() |
| + "fear_greed_index": fng_value, |
| + "classification": classification, |
| + "timestamp": datetime.utcnow().isoformat() |
| }, |
| - "timestamp": datetime.utcnow().isoformat() |
| + "timestamp": datetime.utcnow().isoformat(), |
| + "source": response["source"] |
| } |
| |
| - await ws_manager.broadcast_to_service(ServiceType.SENTIMENT, data) |
| - logger.info(f"Broadcasted sentiment: {sentiment.value} ({sentiment.classification})") |
| + await ws_manager.broadcast_to_service(ServiceType.SENTIMENT, payload) |
| + logger.info(f"Broadcasted sentiment: {fng_value} from {response['source']}") |
| |
| except Exception as e: |
| logger.error(f"Error broadcasting sentiment: {e}", exc_info=True) |
| |
| - await asyncio.sleep(60) # Check every minute |
| - |
| - async def broadcast_whales(self): |
| - """Broadcast whale transaction updates""" |
| - logger.info("Starting whale transaction broadcast...") |
| - last_whale_id = 0 |
| - |
| - while self.is_running: |
| - try: |
| - whales = db_manager.get_whale_transactions(limit=5) |
| - |
| - if whales and (not last_whale_id or whales[0].id != last_whale_id): |
| - last_whale_id = whales[0].id |
| - |
| - data = { |
| - "type": "whale_transaction", |
| - "data": { |
| - "transactions": [ |
| - { |
| - "id": tx.id, |
| - "blockchain": tx.blockchain, |
| - "amount_usd": tx.amount_usd, |
| - "from_address": tx.from_address[:20] + "...", |
| - "to_address": tx.to_address[:20] + "...", |
| - "timestamp": tx.timestamp.isoformat() |
| - } |
| - for tx in whales |
| - ] |
| - }, |
| - "count": len(whales), |
| - "timestamp": datetime.utcnow().isoformat() |
| - } |
| - |
| - await ws_manager.broadcast_to_service(ServiceType.WHALE_TRACKING, data) |
| - logger.info(f"Broadcasted {len(whales)} whale transactions") |
| - |
| - except Exception as e: |
| - logger.error(f"Error broadcasting whales: {e}", exc_info=True) |
| - |
| - await asyncio.sleep(15) # Check every 15 seconds |
| + await asyncio.sleep(60) |
| |
| async def broadcast_gas_prices(self): |
| """Broadcast gas price updates""" |
| @@ -201,23 +204,37 @@ class DataBroadcaster: |
| |
| while self.is_running: |
| try: |
| - gas_prices = db_manager.get_latest_gas_prices() |
| - |
| - if gas_prices: |
| - data = { |
| - "type": "gas_prices", |
| - "data": gas_prices, |
| - "timestamp": datetime.utcnow().isoformat() |
| - } |
| - |
| - # Broadcast to RPC_NODES service type (gas prices are blockchain-related) |
| - await ws_manager.broadcast_to_service(ServiceType.RPC_NODES, data) |
| - logger.debug("Broadcasted gas prices") |
| + response = await provider_manager.fetch_data( |
| + "onchain", |
| + params={}, |
| + use_cache=True, |
| + ttl=15 |
| + ) |
| + |
| + if response["success"] and response["data"]: |
| + data = response["data"] |
| + result = data.get("result", {}) |
| + |
| + if result: |
| + payload = { |
| + "type": "gas_prices", |
| + "data": { |
| + "fast": result.get("FastGasPrice"), |
| + "standard": result.get("ProposeGasPrice"), |
| + "slow": result.get("SafeGasPrice") |
| + }, |
| + "timestamp": datetime.utcnow().isoformat(), |
| + "source": response["source"] |
| + } |
| + |
| + # Broadcast to RPC_NODES service type (gas prices are blockchain-related) |
| + await ws_manager.broadcast_to_service(ServiceType.RPC_NODES, payload) |
| + logger.debug(f"Broadcasted gas prices from {response['source']}") |
| |
| except Exception as e: |
| logger.error(f"Error broadcasting gas prices: {e}", exc_info=True) |
| |
| - await asyncio.sleep(30) # Every 30 seconds |
| + await asyncio.sleep(30) |
| |
| |
| # Global broadcaster instance |
| |
| |
| |
| |
| @@ -4,125 +4,264 @@ import os |
| import asyncio |
| from typing import Dict, List, Optional, Any |
| from datetime import datetime |
| +from backend.orchestration.provider_manager import provider_manager, ProviderConfig |
| |
| logger = logging.getLogger(__name__) |
| |
| -class BaseProvider: |
| - def __init__(self, name: str, base_url: str): |
| - self.name = name |
| - self.base_url = base_url |
| - self.session = None |
| - |
| - async def _get_session(self): |
| - if self.session is None or self.session.closed: |
| - self.session = aiohttp.ClientSession() |
| - return self.session |
| - |
| - async def close(self): |
| - if self.session and not self.session.closed: |
| - await self.session.close() |
| - |
| - async def _get(self, endpoint: str, params: Optional[Dict] = None, headers: Optional[Dict] = None) -> Any: |
| - try: |
| - session = await self._get_session() |
| - url = f"{self.base_url}{endpoint}" |
| - async with session.get(url, params=params, headers=headers, timeout=aiohttp.ClientTimeout(total=10)) as response: |
| - response.raise_for_status() |
| - return await response.json() |
| - except Exception as e: |
| - logger.error(f"Error fetching from {self.name}: {e}") |
| - raise |
| - |
| -class CoinGeckoProvider(BaseProvider): |
| - def __init__(self): |
| - super().__init__("CoinGecko", "https://api.coingecko.com/api/v3") |
| - self.api_key = os.getenv("COINGECKO_API_KEY") |
| - |
| - async def get_market_data(self, vs_currency: str = "usd", ids: str = "bitcoin,ethereum") -> List[Dict]: |
| - params = { |
| - "vs_currency": vs_currency, |
| - "ids": ids, |
| - "order": "market_cap_desc", |
| - "per_page": 100, |
| - "page": 1, |
| - "sparkline": "false", |
| - "price_change_percentage": "24h" |
| - } |
| - if self.api_key: |
| - params["x_cg_demo_api_key"] = self.api_key |
| - |
| - return await self._get("/coins/markets", params=params) |
| - |
| - async def get_coin_price(self, coin_id: str, vs_currencies: str = "usd") -> Dict: |
| - params = {"ids": coin_id, "vs_currencies": vs_currencies} |
| - return await self._get("/simple/price", params=params) |
| - |
| -class BinanceProvider(BaseProvider): |
| - def __init__(self): |
| - super().__init__("Binance", "https://api.binance.com/api/v3") |
| - |
| - async def get_ticker_price(self, symbol: str) -> Dict: |
| - # Symbol example: BTCUSDT |
| - return await self._get("/ticker/price", params={"symbol": symbol.upper()}) |
| - |
| - async def get_klines(self, symbol: str, interval: str = "1h", limit: int = 100) -> List[List]: |
| - params = { |
| - "symbol": symbol.upper(), |
| - "interval": interval, |
| - "limit": limit |
| - } |
| - return await self._get("/klines", params=params) |
| - |
| -class CryptoPanicProvider(BaseProvider): |
| - def __init__(self): |
| - super().__init__("CryptoPanic", "https://cryptopanic.com/api/v1") |
| - self.api_key = os.getenv("CRYPTOPANIC_API_KEY") |
| - |
| - async def get_news(self, filter_type: str = "hot") -> Dict: |
| - if not self.api_key: |
| - logger.warning("CryptoPanic API key not set") |
| - # Fallback to public RSS feed logic elsewhere or return empty |
| - return {"results": []} |
| - |
| - params = { |
| - "auth_token": self.api_key, |
| - "filter": filter_type, |
| - "public": "true" |
| - } |
| - return await self._get("/posts/", params=params) |
| - |
| -class AlternativeMeProvider(BaseProvider): |
| - def __init__(self): |
| - super().__init__("Alternative.me", "https://api.alternative.me") |
| - |
| - async def get_fear_and_greed(self, limit: int = 1) -> Dict: |
| - return await self._get("/fng/", params={"limit": limit}) |
| - |
| -# Singleton instances |
| -coingecko_provider = CoinGeckoProvider() |
| -binance_provider = BinanceProvider() |
| -cryptopanic_provider = CryptoPanicProvider() |
| -alternative_me_provider = AlternativeMeProvider() |
| - |
| -async def get_all_providers_status(): |
| - results = {} |
| - # Simple check |
| - try: |
| - await coingecko_provider.get_coin_price("bitcoin") |
| - results["coingecko"] = "online" |
| - except: |
| - results["coingecko"] = "offline" |
| - |
| - try: |
| - await binance_provider.get_ticker_price("BTCUSDT") |
| - results["binance"] = "online" |
| - except: |
| - results["binance"] = "offline" |
| - |
| - try: |
| - await alternative_me_provider.get_fear_and_greed() |
| - results["alternative_me"] = "online" |
| - except: |
| - results["alternative_me"] = "offline" |
| +# ============================================================================== |
| +# FETCH IMPLEMENTATIONS |
| +# ============================================================================== |
| + |
| +async def fetch_coingecko_market(config: ProviderConfig, **kwargs) -> Any: |
| + ids = kwargs.get("ids", "bitcoin,ethereum") |
| + vs_currency = kwargs.get("vs_currency", "usd") |
| + |
| + url = f"{config.base_url}/coins/markets" |
| + params = { |
| + "vs_currency": vs_currency, |
| + "ids": ids, |
| + "order": "market_cap_desc", |
| + "per_page": 100, |
| + "page": 1, |
| + "sparkline": "false", |
| + "price_change_percentage": "24h" |
| + } |
| + |
| + # Pro API key support |
| + if config.api_key: |
| + params["x_cg_pro_api_key"] = config.api_key |
| |
| - return results |
| + async with aiohttp.ClientSession() as session: |
| + async with session.get(url, params=params, timeout=config.timeout) as response: |
| + if response.status == 429: |
| + raise Exception("Rate limit exceeded (429)") |
| + response.raise_for_status() |
| + return await response.json() |
| + |
| +async def fetch_coingecko_price(config: ProviderConfig, **kwargs) -> Any: |
| + coin_id = kwargs.get("coin_id", "bitcoin") |
| + vs_currencies = kwargs.get("vs_currencies", "usd") |
| + |
| + url = f"{config.base_url}/simple/price" |
| + params = {"ids": coin_id, "vs_currencies": vs_currencies} |
| + |
| + if config.api_key: |
| + params["x_cg_pro_api_key"] = config.api_key |
| + |
| + async with aiohttp.ClientSession() as session: |
| + async with session.get(url, params=params, timeout=config.timeout) as response: |
| + response.raise_for_status() |
| + return await response.json() |
| + |
| +async def fetch_binance_ticker(config: ProviderConfig, **kwargs) -> Any: |
| + symbol = kwargs.get("symbol", "BTCUSDT").upper() |
| + url = f"{config.base_url}/ticker/price" |
| + params = {"symbol": symbol} |
| + |
| + async with aiohttp.ClientSession() as session: |
| + async with session.get(url, params=params, timeout=config.timeout) as response: |
| + if response.status == 451: |
| + raise Exception("Geo-blocked (451)") |
| + response.raise_for_status() |
| + data = await response.json() |
| + # Normalize to look somewhat like CoinGecko for generic usage if needed |
| + return {"price": float(data.get("price", 0)), "symbol": data.get("symbol")} |
| + |
| +async def fetch_binance_klines(config: ProviderConfig, **kwargs) -> Any: |
| + symbol = kwargs.get("symbol", "BTCUSDT").upper() |
| + interval = kwargs.get("interval", "1h") |
| + limit = kwargs.get("limit", 100) |
| + |
| + url = f"{config.base_url}/klines" |
| + params = { |
| + "symbol": symbol, |
| + "interval": interval, |
| + "limit": limit |
| + } |
| + |
| + async with aiohttp.ClientSession() as session: |
| + async with session.get(url, params=params, timeout=config.timeout) as response: |
| + if response.status == 451: |
| + raise Exception("Geo-blocked (451)") |
| + response.raise_for_status() |
| + return await response.json() |
| + |
| +async def fetch_cryptopanic_news(config: ProviderConfig, **kwargs) -> Any: |
| + filter_type = kwargs.get("filter", "hot") |
| + url = f"{config.base_url}/posts/" |
| + |
| + params = { |
| + "auth_token": config.api_key, |
| + "filter": filter_type, |
| + "public": "true" |
| + } |
| + |
| + async with aiohttp.ClientSession() as session: |
| + async with session.get(url, params=params, timeout=config.timeout) as response: |
| + response.raise_for_status() |
| + return await response.json() |
| + |
| +async def fetch_newsapi(config: ProviderConfig, **kwargs) -> Any: |
| + query = kwargs.get("query", "crypto") |
| + url = f"{config.base_url}/everything" |
| + |
| + params = { |
| + "q": query, |
| + "apiKey": config.api_key, |
| + "sortBy": "publishedAt", |
| + "language": "en" |
| + } |
| + |
| + async with aiohttp.ClientSession() as session: |
| + async with session.get(url, params=params, timeout=config.timeout) as response: |
| + response.raise_for_status() |
| + return await response.json() |
| + |
| +async def fetch_alternative_me_fng(config: ProviderConfig, **kwargs) -> Any: |
| + limit = kwargs.get("limit", 1) |
| + url = f"{config.base_url}/fng/" |
| + params = {"limit": limit} |
| + |
| + async with aiohttp.ClientSession() as session: |
| + async with session.get(url, params=params, timeout=config.timeout) as response: |
| + response.raise_for_status() |
| + return await response.json() |
| + |
| +async def fetch_etherscan_gas(config: ProviderConfig, **kwargs) -> Any: |
| + url = config.base_url |
| + params = { |
| + "module": "gastracker", |
| + "action": "gasoracle", |
| + "apikey": config.api_key |
| + } |
| + |
| + async with aiohttp.ClientSession() as session: |
| + async with session.get(url, params=params, timeout=config.timeout) as response: |
| + response.raise_for_status() |
| + return await response.json() |
| + |
| +# ============================================================================== |
| +# REGISTRATION |
| +# ============================================================================== |
| + |
| +def initialize_providers(): |
| + # Market Data Providers |
| + provider_manager.register_provider( |
| + "market", |
| + ProviderConfig( |
| + name="coingecko_free", |
| + category="market", |
| + base_url="https://api.coingecko.com/api/v3", |
| + rate_limit_per_min=30, # Conservative for free tier |
| + weight=100 |
| + ), |
| + fetch_coingecko_market |
| + ) |
| + |
| + provider_manager.register_provider( |
| + "market_pro", |
| + ProviderConfig( |
| + name="coingecko_pro", |
| + category="market", |
| + base_url="https://pro-api.coingecko.com/api/v3", # Assuming Pro URL |
| + api_key=os.getenv("COINGECKO_PRO_API_KEY", "04cf4b5b-9868-465c-8ba0-9f2e78c92eb1"), |
| + rate_limit_per_min=500, |
| + weight=200 |
| + ), |
| + fetch_coingecko_market |
| + ) |
| + |
| + provider_manager.register_provider( |
| + "market", |
| + ProviderConfig( |
| + name="binance", |
| + category="market", |
| + base_url="https://api.binance.com/api/v3", |
| + rate_limit_per_min=1200, |
| + weight=90 |
| + ), |
| + fetch_binance_ticker # Note: This fetch function behaves differently (ticker vs market list), router needs to handle |
| + ) |
| + |
| + # OHLC Providers |
| + provider_manager.register_provider( |
| + "ohlc", |
| + ProviderConfig( |
| + name="binance_ohlc", |
| + category="ohlc", |
| + base_url="https://api.binance.com/api/v3", |
| + rate_limit_per_min=1200, |
| + weight=100 |
| + ), |
| + fetch_binance_klines |
| + ) |
| + |
| + # News Providers |
| + provider_manager.register_provider( |
| + "news", |
| + ProviderConfig( |
| + name="cryptopanic", |
| + category="news", |
| + base_url="https://cryptopanic.com/api/v1", |
| + api_key=os.getenv("CRYPTOPANIC_API_KEY", "7832690f05026639556837583758"), # Placeholder if env not set |
| + rate_limit_per_min=60, |
| + weight=100 |
| + ), |
| + fetch_cryptopanic_news |
| + ) |
| + |
| + provider_manager.register_provider( |
| + "news", |
| + ProviderConfig( |
| + name="newsapi", |
| + category="news", |
| + base_url="https://newsapi.org/v2", |
| + api_key=os.getenv("NEWS_API_KEY", "968a5e25552b4cb5ba3280361d8444ab"), |
| + rate_limit_per_min=100, |
| + weight=90 |
| + ), |
| + fetch_newsapi |
| + ) |
| + |
| + # Sentiment |
| + provider_manager.register_provider( |
| + "sentiment", |
| + ProviderConfig( |
| + name="alternative_me", |
| + category="sentiment", |
| + base_url="https://api.alternative.me", |
| + rate_limit_per_min=60, |
| + weight=100 |
| + ), |
| + fetch_alternative_me_fng |
| + ) |
| + |
| + # OnChain / RPC |
| + provider_manager.register_provider( |
| + "onchain", |
| + ProviderConfig( |
| + name="etherscan", |
| + category="onchain", |
| + base_url="https://api.etherscan.io/api", |
| + api_key=os.getenv("ETHERSCAN_API_KEY", "SZHYFZK2RR8H9TIMJBVW54V4H81K2Z2KR2"), |
| + rate_limit_per_min=5, # Free tier limit |
| + weight=100 |
| + ), |
| + fetch_etherscan_gas |
| + ) |
| + |
| + provider_manager.register_provider( |
| + "onchain", |
| + ProviderConfig( |
| + name="etherscan_backup", |
| + category="onchain", |
| + base_url="https://api.etherscan.io/api", |
| + api_key=os.getenv("ETHERSCAN_API_KEY_2", "T6IR8VJHX2NE6ZJW2S3FDVN1TYG4PYYI45"), |
| + rate_limit_per_min=5, |
| + weight=90 |
| + ), |
| + fetch_etherscan_gas |
| + ) |
| + |
| +# Auto-initialize |
| +initialize_providers() |
| |
| |
| |
| |
| @@ -1,7 +1,7 @@ |
| """ |
| HF Space Complete API Router |
| Implements all required endpoints for Hugging Face Space deployment |
| -using REAL data providers. |
| +using REAL data providers managed by the Orchestrator. |
| """ |
| from fastapi import APIRouter, HTTPException, Query, Body, Depends |
| from fastapi.responses import JSONResponse |
| @@ -14,14 +14,8 @@ import json |
| import os |
| from pathlib import Path |
| |
| -# Import Real Data Providers |
| -from backend.live_data.providers import ( |
| - coingecko_provider, |
| - binance_provider, |
| - cryptopanic_provider, |
| - alternative_me_provider |
| -) |
| -from backend.cache.cache_manager import cache_manager |
| +# Import Orchestrator |
| +from backend.orchestration.provider_manager import provider_manager |
| |
| logger = logging.getLogger(__name__) |
| |
| @@ -36,6 +30,7 @@ class MetaInfo(BaseModel): |
| cache_ttl_seconds: int = Field(default=30, description="Cache TTL in seconds") |
| generated_at: str = Field(default_factory=lambda: datetime.now().isoformat()) |
| source: str = Field(default="live", description="Data source") |
| + latency_ms: Optional[float] = None |
| |
| class MarketItem(BaseModel): |
| """Market ticker item""" |
| @@ -94,39 +89,42 @@ class GasResponse(BaseModel): |
| async def get_market_snapshot(): |
| """ |
| Get current market snapshot with prices, changes, and volumes. |
| - Uses CoinGecko API. |
| + Uses Provider Orchestrator (CoinGecko, Binance, etc.) |
| """ |
| - cache_key = "market_snapshot" |
| - cached = await cache_manager.get(cache_key) |
| - if cached: |
| - return cached |
| - |
| - try: |
| - data = await coingecko_provider.get_market_data(ids="bitcoin,ethereum,tron,solana,binancecoin,ripple") |
| + response = await provider_manager.fetch_data( |
| + "market", |
| + params={"ids": "bitcoin,ethereum,tron,solana,binancecoin,ripple", "vs_currency": "usd"}, |
| + use_cache=True, |
| + ttl=60 |
| + ) |
| + |
| + if not response["success"]: |
| + raise HTTPException(status_code=503, detail=response["error"]) |
| |
| - items = [] |
| + data = response["data"] |
| + items = [] |
| + |
| + # Handle different provider formats if needed, but fetch functions should normalize |
| + # Assuming coingecko format for "market" category list |
| + if isinstance(data, list): |
| for coin in data: |
| items.append(MarketItem( |
| symbol=coin.get('symbol', '').upper(), |
| price=coin.get('current_price', 0), |
| change_24h=coin.get('price_change_percentage_24h', 0), |
| volume_24h=coin.get('total_volume', 0), |
| - source="coingecko" |
| + source=response["source"] |
| )) |
| - |
| - response = MarketResponse( |
| - last_updated=datetime.now().isoformat(), |
| - items=items, |
| - meta=MetaInfo(cache_ttl_seconds=60, source="coingecko") |
| + |
| + return MarketResponse( |
| + last_updated=response["timestamp"], |
| + items=items, |
| + meta=MetaInfo( |
| + cache_ttl_seconds=60, |
| + source=response["source"], |
| + latency_ms=response.get("latency_ms") |
| ) |
| - |
| - await cache_manager.set(cache_key, response, ttl=60) |
| - return response |
| - |
| - except Exception as e: |
| - logger.error(f"Error in get_market_snapshot: {e}") |
| - # Return empty list or cached stale data if available, but NEVER fake data |
| - raise HTTPException(status_code=503, detail="Market data unavailable") |
| + ) |
| |
| @router.get("/api/market/ohlc") |
| async def get_ohlc( |
| @@ -134,55 +132,61 @@ async def get_ohlc( |
| interval: int = Query(60, description="Interval in minutes"), |
| limit: int = Query(100, description="Number of candles") |
| ): |
| - """Get OHLC candlestick data from Binance""" |
| - cache_key = f"ohlc_{symbol}_{interval}_{limit}" |
| - cached = await cache_manager.get(cache_key) |
| - if cached: |
| - return cached |
| + """Get OHLC candlestick data via Orchestrator""" |
| + |
| + # Map minutes to common string format if needed by providers, |
| + # but fetch_binance_klines handles it. |
| + interval_str = "1h" |
| + if interval < 60: |
| + interval_str = f"{interval}m" |
| + elif interval == 60: |
| + interval_str = "1h" |
| + elif interval == 240: |
| + interval_str = "4h" |
| + elif interval == 1440: |
| + interval_str = "1d" |
| |
| - try: |
| - # Map minutes to Binance intervals |
| - binance_interval = "1h" |
| - if interval == 1: binance_interval = "1m" |
| - elif interval == 5: binance_interval = "5m" |
| - elif interval == 15: binance_interval = "15m" |
| - elif interval == 60: binance_interval = "1h" |
| - elif interval == 240: binance_interval = "4h" |
| - elif interval == 1440: binance_interval = "1d" |
| + response = await provider_manager.fetch_data( |
| + "ohlc", |
| + params={ |
| + "symbol": symbol, |
| + "interval": interval_str, |
| + "limit": limit |
| + }, |
| + use_cache=True, |
| + ttl=60 |
| + ) |
| |
| - # Binance symbol needs to be e.g., BTCUSDT |
| - formatted_symbol = symbol.upper() |
| - if not formatted_symbol.endswith("USDT") and not formatted_symbol.endswith("USD"): |
| - formatted_symbol += "USDT" |
| - |
| - klines = await binance_provider.get_klines(formatted_symbol, interval=binance_interval, limit=limit) |
| - |
| - ohlc_data = [] |
| + if not response["success"]: |
| + raise HTTPException(status_code=503, detail=response["error"]) |
| + |
| + # Transform Binance Klines to standard OHLC |
| + # [time, open, high, low, close, volume, ...] |
| + klines = response["data"] |
| + ohlc_data = [] |
| + |
| + if isinstance(klines, list): |
| for k in klines: |
| - # Binance kline: [open_time, open, high, low, close, volume, ...] |
| - ohlc_data.append({ |
| - "ts": int(k[0] / 1000), |
| - "open": float(k[1]), |
| - "high": float(k[2]), |
| - "low": float(k[3]), |
| - "close": float(k[4]), |
| - "volume": float(k[5]) |
| - }) |
| - |
| - response = { |
| - "symbol": symbol, |
| - "interval": interval, |
| - "data": ohlc_data, |
| - "meta": MetaInfo(cache_ttl_seconds=60, source="binance").dict() |
| - } |
| - |
| - await cache_manager.set(cache_key, response, ttl=60) |
| - return response |
| + if isinstance(k, list) and len(k) >= 6: |
| + ohlc_data.append({ |
| + "ts": int(k[0] / 1000), |
| + "open": float(k[1]), |
| + "high": float(k[2]), |
| + "low": float(k[3]), |
| + "close": float(k[4]), |
| + "volume": float(k[5]) |
| + }) |
| |
| - except Exception as e: |
| - logger.error(f"Error in get_ohlc: {e}") |
| - # Try fallbacks? For now, fail gracefully. |
| - raise HTTPException(status_code=503, detail="OHLC data unavailable") |
| + return { |
| + "symbol": symbol, |
| + "interval": interval, |
| + "data": ohlc_data, |
| + "meta": MetaInfo( |
| + cache_ttl_seconds=60, |
| + source=response["source"], |
| + latency_ms=response.get("latency_ms") |
| + ).dict() |
| + } |
| |
| # |
| # News & Sentiment Endpoints |
| @@ -193,19 +197,24 @@ async def get_news( |
| limit: int = Query(20, description="Number of articles"), |
| source: Optional[str] = Query(None, description="Filter by source") |
| ): |
| - """Get cryptocurrency news from CryptoPanic""" |
| - cache_key = f"news_{limit}_{source}" |
| - cached = await cache_manager.get(cache_key) |
| - if cached: |
| - return cached |
| + """Get cryptocurrency news via Orchestrator""" |
| + |
| + response = await provider_manager.fetch_data( |
| + "news", |
| + params={"filter": "hot", "query": "crypto"}, # Params for different providers |
| + use_cache=True, |
| + ttl=300 |
| + ) |
| + |
| + if not response["success"]: |
| + return NewsResponse(articles=[], meta=MetaInfo(source="error")) |
| |
| - try: |
| - data = await cryptopanic_provider.get_news() |
| - |
| - articles = [] |
| - results = data.get('results', [])[:limit] |
| - |
| - for post in results: |
| + data = response["data"] |
| + articles = [] |
| + |
| + # Normalize CryptoPanic / NewsAPI formats |
| + if "results" in data: # CryptoPanic |
| + for post in data.get('results', [])[:limit]: |
| articles.append(NewsArticle( |
| id=str(post.get('id')), |
| title=post.get('title', ''), |
| @@ -214,49 +223,60 @@ async def get_news( |
| summary=post.get('slug', ''), |
| published_at=post.get('published_at', datetime.now().isoformat()) |
| )) |
| - |
| - response = NewsResponse( |
| - articles=articles, |
| - meta=MetaInfo(cache_ttl_seconds=300, source="cryptopanic") |
| + elif "articles" in data: # NewsAPI |
| + for post in data.get('articles', [])[:limit]: |
| + articles.append(NewsArticle( |
| + id=str(hash(post.get('url', ''))), |
| + title=post.get('title', ''), |
| + url=post.get('url', ''), |
| + source=post.get('source', {}).get('name', 'Unknown'), |
| + summary=post.get('description', ''), |
| + published_at=post.get('publishedAt', datetime.now().isoformat()) |
| + )) |
| + |
| + return NewsResponse( |
| + articles=articles, |
| + meta=MetaInfo( |
| + cache_ttl_seconds=300, |
| + source=response["source"], |
| + latency_ms=response.get("latency_ms") |
| ) |
| - |
| - await cache_manager.set(cache_key, response, ttl=300) |
| - return response |
| - |
| - except Exception as e: |
| - logger.error(f"Error in get_news: {e}") |
| - return NewsResponse(articles=[], meta=MetaInfo(source="error")) |
| + ) |
| |
| |
| @router.get("/api/sentiment/global") |
| async def get_global_sentiment(): |
| - """Get global market sentiment (Fear & Greed Index)""" |
| - cache_key = "sentiment_global" |
| - cached = await cache_manager.get(cache_key) |
| - if cached: |
| - return cached |
| - |
| - try: |
| - data = await alternative_me_provider.get_fear_and_greed() |
| - fng_value = 50 |
| - classification = "Neutral" |
| + """Get global market sentiment via Orchestrator""" |
| + |
| + response = await provider_manager.fetch_data( |
| + "sentiment", |
| + params={"limit": 1}, |
| + use_cache=True, |
| + ttl=3600 |
| + ) |
| + |
| + if not response["success"]: |
| + raise HTTPException(status_code=503, detail=response["error"]) |
| |
| - if data.get('data'): |
| - item = data['data'][0] |
| - fng_value = int(item.get('value', 50)) |
| - classification = item.get('value_classification', 'Neutral') |
| - |
| - result = { |
| - "score": fng_value, |
| - "label": classification, |
| - "meta": MetaInfo(cache_ttl_seconds=3600, source="alternative.me").dict() |
| - } |
| + data = response["data"] |
| + fng_value = 50 |
| + classification = "Neutral" |
| + |
| + # Alternative.me format |
| + if data.get('data'): |
| + item = data['data'][0] |
| + fng_value = int(item.get('value', 50)) |
| + classification = item.get('value_classification', 'Neutral') |
| |
| - await cache_manager.set(cache_key, result, ttl=3600) |
| - return result |
| - except Exception as e: |
| - logger.error(f"Error in get_global_sentiment: {e}") |
| - raise HTTPException(status_code=503, detail="Sentiment data unavailable") |
| + return { |
| + "score": fng_value, |
| + "label": classification, |
| + "meta": MetaInfo( |
| + cache_ttl_seconds=3600, |
| + source=response["source"], |
| + latency_ms=response.get("latency_ms") |
| + ).dict() |
| + } |
| |
| # |
| # Blockchain Endpoints |
| @@ -264,14 +284,56 @@ async def get_global_sentiment(): |
| |
| @router.get("/api/crypto/blockchain/gas", response_model=GasResponse) |
| async def get_gas_prices(chain: str = Query("ethereum", description="Blockchain network")): |
| - """Get gas prices - Placeholder for real implementation""" |
| - # TODO: Implement Etherscan or similar provider |
| - # For now, return empty/null to indicate no data rather than fake data |
| + """Get gas prices via Orchestrator""" |
| + |
| + if chain.lower() != "ethereum": |
| + # Fallback or implement other chains |
| + return GasResponse( |
| + chain=chain, |
| + gas_prices=None, |
| + timestamp=datetime.now().isoformat(), |
| + meta=MetaInfo(source="unavailable") |
| + ) |
| + |
| + response = await provider_manager.fetch_data( |
| + "onchain", |
| + params={}, |
| + use_cache=True, |
| + ttl=15 |
| + ) |
| + |
| + if not response["success"]: |
| + return GasResponse( |
| + chain=chain, |
| + gas_prices=None, |
| + timestamp=datetime.now().isoformat(), |
| + meta=MetaInfo(source="unavailable") |
| + ) |
| + |
| + data = response["data"] |
| + result = data.get("result", {}) |
| + |
| + gas_price = None |
| + if result: |
| + # Etherscan returns data in result |
| + try: |
| + gas_price = GasPrice( |
| + fast=float(result.get("FastGasPrice", 0)), |
| + standard=float(result.get("ProposeGasPrice", 0)), |
| + slow=float(result.get("SafeGasPrice", 0)) |
| + ) |
| + except: |
| + pass |
| + |
| return GasResponse( |
| chain=chain, |
| - gas_prices=None, |
| + gas_prices=gas_price, |
| timestamp=datetime.now().isoformat(), |
| - meta=MetaInfo(source="unavailable") |
| + meta=MetaInfo( |
| + cache_ttl_seconds=15, |
| + source=response["source"], |
| + latency_ms=response.get("latency_ms") |
| + ) |
| ) |
| |
| # |
| @@ -281,14 +343,12 @@ async def get_gas_prices(chain: str = Query("ethereum", description="Blockchain |
| @router.get("/api/status") |
| async def get_system_status(): |
| """Get overall system status""" |
| - from backend.live_data.providers import get_all_providers_status |
| - |
| - provider_status = await get_all_providers_status() |
| + stats = provider_manager.get_stats() |
| |
| return { |
| 'status': 'operational', |
| 'timestamp': datetime.now().isoformat(), |
| - 'providers': provider_status, |
| - 'version': '1.0.0', |
| + 'providers': stats, |
| + 'version': '2.0.0', |
| 'meta': MetaInfo(source="system").dict() |
| } |
|
|