| """ |
| OHLC Data Background Worker - REAL DATA FROM MULTIPLE FREE APIs |
| |
| CRITICAL RULES: |
| - MUST fetch REAL candlestick data from multiple sources with automatic fallback |
| - MUST store actual OHLC values, not fake data |
| - MUST use actual timestamps from API responses |
| - NEVER generate or interpolate candles |
| - If primary API fails, automatically try alternative sources |
| |
| SUPPORTED DATA SOURCES (in priority order): |
| 1. CoinGecko (FREE, no API key, 365-day history) |
| 2. Kraken (FREE, no API key, up to 720 candles) |
| 3. Coinbase Pro (FREE, no API key, up to 300 candles) |
| 4. Binance (FREE, but may be geo-restricted in some regions) |
| 5. CoinPaprika (FREE, no API key, 366-day history) |
| """ |
|
|
| import asyncio |
| import time |
| import logging |
| import os |
| from datetime import datetime |
| from typing import List, Dict, Any, Optional |
| import httpx |
|
|
| from database.cache_queries import get_cache_queries |
| from database.db_manager import db_manager |
| from utils.logger import setup_logger |
|
|
| logger = setup_logger("ohlc_worker") |
|
|
| |
| cache = get_cache_queries(db_manager) |
|
|
| |
| HF_UPLOAD_ENABLED = bool(os.getenv("HF_TOKEN") or os.getenv("HF_API_TOKEN")) |
| if HF_UPLOAD_ENABLED: |
| try: |
| from hf_dataset_uploader import get_dataset_uploader |
| hf_uploader = get_dataset_uploader() |
| logger.info("✅ HuggingFace Dataset upload ENABLED for OHLC data") |
| except Exception as e: |
| logger.warning(f"HuggingFace Dataset upload disabled: {e}") |
| HF_UPLOAD_ENABLED = False |
| hf_uploader = None |
| else: |
| logger.info("ℹ️ HuggingFace Dataset upload DISABLED (no HF_TOKEN)") |
| hf_uploader = None |
|
|
| |
| SYMBOLS = ["BTC", "ETH", "BNB", "XRP", "ADA", "SOL", "DOT", "DOGE", "MATIC", "AVAX", |
| "LINK", "LTC", "UNI", "ALGO", "XLM", "ATOM", "TRX", "XMR", "ETC", "XTZ"] |
|
|
| |
| INTERVALS = ["1h", "4h", "1d"] |
|
|
| |
| SYMBOL_MAP = { |
| "coingecko": { |
| "BTC": "bitcoin", "ETH": "ethereum", "BNB": "binancecoin", "XRP": "ripple", |
| "ADA": "cardano", "SOL": "solana", "DOT": "polkadot", "DOGE": "dogecoin", |
| "MATIC": "matic-network", "AVAX": "avalanche-2", "LINK": "chainlink", |
| "LTC": "litecoin", "UNI": "uniswap", "ALGO": "algorand", "XLM": "stellar", |
| "ATOM": "cosmos", "TRX": "tron", "XMR": "monero", "ETC": "ethereum-classic", |
| "XTZ": "tezos" |
| }, |
| "kraken": { |
| "BTC": "XXBTZUSD", "ETH": "XETHZUSD", "XRP": "XXRPZUSD", "ADA": "ADAUSD", |
| "SOL": "SOLUSD", "DOT": "DOTUSD", "DOGE": "XDGUSD", "LINK": "LINKUSD", |
| "LTC": "XLTCZUSD", "UNI": "UNIUSD", "ALGO": "ALGOUSD", "XLM": "XXLMZUSD", |
| "ATOM": "ATOMUSD", "TRX": "TRXUSD", "ETC": "XETCZUSD", "XTZ": "XTZUSD" |
| }, |
| "coinbase": { |
| "BTC": "BTC-USD", "ETH": "ETH-USD", "XRP": "XRP-USD", "ADA": "ADA-USD", |
| "SOL": "SOL-USD", "DOT": "DOT-USD", "DOGE": "DOGE-USD", "LINK": "LINK-USD", |
| "LTC": "LTC-USD", "UNI": "UNI-USD", "ALGO": "ALGO-USD", "XLM": "XLM-USD", |
| "ATOM": "ATOM-USD", "MATIC": "MATIC-USD", "AVAX": "AVAX-USD" |
| }, |
| "binance": { |
| "BTC": "BTCUSDT", "ETH": "ETHUSDT", "BNB": "BNBUSDT", "XRP": "XRPUSDT", |
| "ADA": "ADAUSDT", "SOL": "SOLUSDT", "DOT": "DOTUSDT", "DOGE": "DOGEUSDT", |
| "MATIC": "MATICUSDT", "AVAX": "AVAXUSDT", "LINK": "LINKUSDT", "LTC": "LTCUSDT", |
| "UNI": "UNIUSDT", "ALGO": "ALGOUSDT", "XLM": "XLMUSDT", "ATOM": "ATOMUSDT", |
| "TRX": "TRXUSDT", "XMR": "XMRUSDT", "ETC": "ETCUSDT", "XTZ": "XTZUSDT" |
| } |
| } |
|
|
|
|
| async def fetch_from_coingecko(symbol: str, interval: str, limit: int) -> List[Dict[str, Any]]: |
| """ |
| Fetch OHLC data from CoinGecko (FREE, no API key required) |
| |
| Args: |
| symbol: Base symbol (e.g., 'BTC') |
| interval: Interval (only '1d' supported by CoinGecko) |
| limit: Number of days to fetch (max 365) |
| |
| Returns: |
| List of OHLC candles |
| """ |
| try: |
| coin_id = SYMBOL_MAP["coingecko"].get(symbol) |
| if not coin_id: |
| logger.debug(f"CoinGecko: No mapping for {symbol}") |
| return [] |
| |
| |
| if interval not in ["1d", "4h", "1h"]: |
| return [] |
| |
| |
| days = min(limit if interval == "1d" else limit // 6 if interval == "4h" else limit // 24, 365) |
| |
| url = f"https://api.coingecko.com/api/v3/coins/{coin_id}/ohlc" |
| params = {"vs_currency": "usd", "days": days} |
| |
| logger.debug(f"Fetching from CoinGecko: {coin_id} ({symbol})") |
| |
| async with httpx.AsyncClient(timeout=15.0) as client: |
| response = await client.get(url, params=params) |
| response.raise_for_status() |
| data = response.json() |
| |
| if not data or not isinstance(data, list): |
| return [] |
| |
| ohlc_data = [] |
| for candle in data: |
| try: |
| |
| ohlc_data.append({ |
| "symbol": symbol, |
| "interval": interval, |
| "timestamp": datetime.fromtimestamp(candle[0] / 1000), |
| "open": float(candle[1]), |
| "high": float(candle[2]), |
| "low": float(candle[3]), |
| "close": float(candle[4]), |
| "volume": 0.0, |
| "provider": "coingecko" |
| }) |
| except Exception as e: |
| logger.debug(f"Error parsing CoinGecko candle: {e}") |
| continue |
| |
| logger.info(f"✅ CoinGecko: Fetched {len(ohlc_data)} candles for {symbol}") |
| return ohlc_data |
| |
| except httpx.HTTPStatusError as e: |
| logger.debug(f"CoinGecko HTTP error for {symbol}: {e.response.status_code}") |
| return [] |
| except Exception as e: |
| logger.debug(f"CoinGecko error for {symbol}: {e}") |
| return [] |
|
|
|
|
| async def fetch_from_kraken(symbol: str, interval: str, limit: int) -> List[Dict[str, Any]]: |
| """ |
| Fetch OHLC data from Kraken (FREE, no API key required) |
| |
| Args: |
| symbol: Base symbol (e.g., 'BTC') |
| interval: Interval |
| limit: Number of candles |
| |
| Returns: |
| List of OHLC candles |
| """ |
| try: |
| pair = SYMBOL_MAP["kraken"].get(symbol) |
| if not pair: |
| logger.debug(f"Kraken: No mapping for {symbol}") |
| return [] |
| |
| |
| interval_map = {"1h": "60", "4h": "240", "1d": "1440"} |
| kraken_interval = interval_map.get(interval) |
| if not kraken_interval: |
| return [] |
| |
| url = "https://api.kraken.com/0/public/OHLC" |
| params = {"pair": pair, "interval": kraken_interval} |
| |
| logger.debug(f"Fetching from Kraken: {pair} ({symbol})") |
| |
| async with httpx.AsyncClient(timeout=15.0) as client: |
| response = await client.get(url, params=params) |
| response.raise_for_status() |
| data = response.json() |
| |
| if data.get("error") and len(data["error"]) > 0: |
| logger.debug(f"Kraken error for {symbol}: {data['error']}") |
| return [] |
| |
| result = data.get("result", {}) |
| candles = result.get(pair, []) |
| |
| if not candles: |
| return [] |
| |
| ohlc_data = [] |
| for candle in candles[:limit]: |
| try: |
| |
| ohlc_data.append({ |
| "symbol": symbol, |
| "interval": interval, |
| "timestamp": datetime.fromtimestamp(int(candle[0])), |
| "open": float(candle[1]), |
| "high": float(candle[2]), |
| "low": float(candle[3]), |
| "close": float(candle[4]), |
| "volume": float(candle[6]), |
| "provider": "kraken" |
| }) |
| except Exception as e: |
| logger.debug(f"Error parsing Kraken candle: {e}") |
| continue |
| |
| logger.info(f"✅ Kraken: Fetched {len(ohlc_data)} candles for {symbol}") |
| return ohlc_data |
| |
| except Exception as e: |
| logger.debug(f"Kraken error for {symbol}: {e}") |
| return [] |
|
|
|
|
| async def fetch_from_coinbase(symbol: str, interval: str, limit: int) -> List[Dict[str, Any]]: |
| """ |
| Fetch OHLC data from Coinbase Pro (FREE, no API key required) |
| |
| Args: |
| symbol: Base symbol (e.g., 'BTC') |
| interval: Interval |
| limit: Number of candles (max 300) |
| |
| Returns: |
| List of OHLC candles |
| """ |
| try: |
| pair = SYMBOL_MAP["coinbase"].get(symbol) |
| if not pair: |
| logger.debug(f"Coinbase: No mapping for {symbol}") |
| return [] |
| |
| |
| interval_map = {"1h": "3600", "4h": "21600", "1d": "86400"} |
| granularity = interval_map.get(interval) |
| if not granularity: |
| return [] |
| |
| url = f"https://api.exchange.coinbase.com/products/{pair}/candles" |
| params = {"granularity": granularity} |
| |
| logger.debug(f"Fetching from Coinbase: {pair} ({symbol})") |
| |
| async with httpx.AsyncClient(timeout=15.0) as client: |
| response = await client.get(url, params=params) |
| response.raise_for_status() |
| data = response.json() |
| |
| if not data or not isinstance(data, list): |
| return [] |
| |
| ohlc_data = [] |
| for candle in data[:limit]: |
| try: |
| |
| ohlc_data.append({ |
| "symbol": symbol, |
| "interval": interval, |
| "timestamp": datetime.fromtimestamp(int(candle[0])), |
| "open": float(candle[3]), |
| "high": float(candle[2]), |
| "low": float(candle[1]), |
| "close": float(candle[4]), |
| "volume": float(candle[5]), |
| "provider": "coinbase" |
| }) |
| except Exception as e: |
| logger.debug(f"Error parsing Coinbase candle: {e}") |
| continue |
| |
| logger.info(f"✅ Coinbase: Fetched {len(ohlc_data)} candles for {symbol}") |
| return ohlc_data |
| |
| except Exception as e: |
| logger.debug(f"Coinbase error for {symbol}: {e}") |
| return [] |
|
|
|
|
| async def fetch_from_binance(symbol: str, interval: str, limit: int) -> List[Dict[str, Any]]: |
| """ |
| Fetch OHLC data from Binance (FREE, may be geo-restricted) |
| |
| Args: |
| symbol: Base symbol (e.g., 'BTC') |
| interval: Interval |
| limit: Number of candles |
| |
| Returns: |
| List of OHLC candles |
| """ |
| try: |
| pair = SYMBOL_MAP["binance"].get(symbol) |
| if not pair: |
| logger.debug(f"Binance: No mapping for {symbol}") |
| return [] |
| |
| url = "https://api.binance.com/api/v3/klines" |
| params = {"symbol": pair, "interval": interval, "limit": limit} |
| |
| logger.debug(f"Fetching from Binance: {pair} ({symbol})") |
| |
| async with httpx.AsyncClient(timeout=10.0) as client: |
| response = await client.get(url, params=params) |
| response.raise_for_status() |
| data = response.json() |
| |
| if not data or not isinstance(data, list): |
| return [] |
| |
| ohlc_data = [] |
| for candle in data: |
| try: |
| |
| ohlc_data.append({ |
| "symbol": symbol, |
| "interval": interval, |
| "timestamp": datetime.fromtimestamp(int(candle[0]) / 1000), |
| "open": float(candle[1]), |
| "high": float(candle[2]), |
| "low": float(candle[3]), |
| "close": float(candle[4]), |
| "volume": float(candle[5]), |
| "provider": "binance" |
| }) |
| except Exception as e: |
| logger.debug(f"Error parsing Binance candle: {e}") |
| continue |
| |
| logger.info(f"✅ Binance: Fetched {len(ohlc_data)} candles for {symbol}") |
| return ohlc_data |
| |
| except httpx.HTTPStatusError as e: |
| if e.response.status_code == 451: |
| logger.debug(f"Binance geo-restricted for {symbol}") |
| else: |
| logger.debug(f"Binance HTTP error for {symbol}: {e.response.status_code}") |
| return [] |
| except Exception as e: |
| logger.debug(f"Binance error for {symbol}: {e}") |
| return [] |
|
|
|
|
| async def fetch_ohlc_with_fallback(symbol: str, interval: str, limit: int = 100) -> List[Dict[str, Any]]: |
| """ |
| Fetch OHLC data with automatic fallback across multiple sources |
| |
| Priority order: |
| 1. CoinGecko (most reliable, no auth, no geo-restrictions) |
| 2. Kraken (reliable, no auth) |
| 3. Coinbase (reliable, no auth) |
| 4. Binance (may be geo-restricted) |
| |
| Args: |
| symbol: Base symbol (e.g., 'BTC') |
| interval: Interval ('1h', '4h', '1d') |
| limit: Number of candles to fetch |
| |
| Returns: |
| List of OHLC candles from first successful source |
| """ |
| sources = [ |
| ("CoinGecko", fetch_from_coingecko), |
| ("Kraken", fetch_from_kraken), |
| ("Coinbase", fetch_from_coinbase), |
| ("Binance", fetch_from_binance), |
| ] |
| |
| for source_name, fetch_func in sources: |
| try: |
| data = await fetch_func(symbol, interval, limit) |
| if data and len(data) > 0: |
| logger.debug(f"✅ Successfully fetched {len(data)} candles from {source_name} for {symbol}") |
| return data |
| except Exception as e: |
| logger.debug(f"❌ {source_name} failed for {symbol}: {e}") |
| continue |
| |
| logger.warning(f"⚠️ All sources failed for {symbol} {interval}") |
| return [] |
|
|
|
|
| async def save_ohlc_data_to_cache(ohlc_data: List[Dict[str, Any]]) -> int: |
| """ |
| Save REAL OHLC data to database cache AND upload to HuggingFace Datasets |
| |
| Data Flow: |
| 1. Save to SQLite cache (local persistence) |
| 2. Upload to HuggingFace Datasets (cloud storage & hub) |
| 3. Clients can fetch from HuggingFace Datasets |
| |
| Args: |
| ohlc_data: List of REAL OHLC data dictionaries |
| |
| Returns: |
| int: Number of candles saved |
| """ |
| saved_count = 0 |
|
|
| |
| for data in ohlc_data: |
| try: |
| success = cache.save_ohlc_candle( |
| symbol=data["symbol"], |
| interval=data["interval"], |
| timestamp=data["timestamp"], |
| open_price=data["open"], |
| high=data["high"], |
| low=data["low"], |
| close=data["close"], |
| volume=data["volume"], |
| provider=data["provider"] |
| ) |
|
|
| if success: |
| saved_count += 1 |
|
|
| except Exception as e: |
| logger.error(f"Error saving OHLC data for {data.get('symbol')}: {e}") |
| continue |
|
|
| |
| if HF_UPLOAD_ENABLED and hf_uploader and ohlc_data: |
| try: |
| |
| upload_data = [] |
| for data in ohlc_data: |
| upload_record = data.copy() |
| if isinstance(upload_record.get("timestamp"), datetime): |
| upload_record["timestamp"] = upload_record["timestamp"].isoformat() + "Z" |
| upload_data.append(upload_record) |
|
|
| logger.info(f"📤 Uploading {len(upload_data)} OHLC records to HuggingFace Datasets...") |
| upload_success = await hf_uploader.upload_ohlc_data( |
| upload_data, |
| append=True |
| ) |
|
|
| if upload_success: |
| logger.info(f"✅ Successfully uploaded OHLC data to HuggingFace Datasets") |
| else: |
| logger.warning(f"⚠️ Failed to upload OHLC data to HuggingFace Datasets") |
|
|
| except Exception as e: |
| logger.error(f"Error uploading OHLC to HuggingFace Datasets: {e}") |
| |
|
|
| return saved_count |
|
|
|
|
| async def fetch_and_cache_ohlc_for_symbol(symbol: str, interval: str) -> int: |
| """ |
| Fetch and cache OHLC data for a single symbol and interval using multi-source fallback |
| |
| Args: |
| symbol: Base symbol (e.g., 'BTC') |
| interval: Candle interval ('1h', '4h', '1d') |
| |
| Returns: |
| int: Number of candles saved |
| """ |
| try: |
| |
| limit = 100 if interval == "1d" else 100 |
| |
| |
| ohlc_data = await fetch_ohlc_with_fallback(symbol, interval, limit) |
| |
| if not ohlc_data or len(ohlc_data) == 0: |
| logger.debug(f"No OHLC data received for {symbol} {interval}") |
| return 0 |
| |
| |
| saved_count = await save_ohlc_data_to_cache(ohlc_data) |
| |
| if saved_count > 0: |
| logger.debug(f"Saved {saved_count}/{len(ohlc_data)} candles for {symbol} {interval}") |
| return saved_count |
| |
| except Exception as e: |
| logger.error(f"Error fetching OHLC for {symbol} {interval}: {e}") |
| return 0 |
|
|
|
|
| async def ohlc_data_worker_loop(): |
| """ |
| Background worker loop - Fetch REAL OHLC data periodically with multi-source fallback |
| |
| CRITICAL RULES: |
| 1. Run continuously in background |
| 2. Fetch REAL data from multiple sources with automatic fallback |
| 3. Store REAL data in database |
| 4. NEVER generate fake candles as fallback |
| 5. If all sources fail, log error and retry on next iteration |
| """ |
| |
| logger.info("Starting OHLC data background worker with multi-source fallback") |
| logger.info("📊 Data sources: CoinGecko, Kraken, Coinbase, Binance") |
| iteration = 0 |
| |
| while True: |
| try: |
| iteration += 1 |
| start_time = time.time() |
| |
| logger.info(f"[Iteration {iteration}] Fetching REAL OHLC data from multiple sources...") |
| |
| total_saved = 0 |
| total_combinations = len(SYMBOLS) * len(INTERVALS) |
| successful_fetches = 0 |
| |
| |
| for symbol in SYMBOLS: |
| for interval in INTERVALS: |
| try: |
| saved = await fetch_and_cache_ohlc_for_symbol(symbol, interval) |
| total_saved += saved |
| if saved > 0: |
| successful_fetches += 1 |
| |
| |
| await asyncio.sleep(0.5) |
| |
| except Exception as e: |
| logger.error(f"Error processing {symbol} {interval}: {e}") |
| continue |
| |
| elapsed = time.time() - start_time |
| logger.info( |
| f"[Iteration {iteration}] Successfully saved {total_saved} REAL OHLC candles " |
| f"({successful_fetches}/{total_combinations} symbol-intervals) in {elapsed:.2f}s" |
| ) |
| |
| |
| await asyncio.sleep(300) |
| |
| except Exception as e: |
| logger.error(f"[Iteration {iteration}] Worker error: {e}", exc_info=True) |
| |
| await asyncio.sleep(300) |
|
|
|
|
| async def start_ohlc_data_worker(): |
| """ |
| Start OHLC data background worker with multi-source support |
| |
| This should be called during application startup |
| """ |
| try: |
| logger.info("Initializing OHLC data worker with multi-source fallback...") |
| logger.info("📊 Supported sources: CoinGecko, Kraken, Coinbase, Binance") |
| |
| |
| logger.info("Running initial OHLC data fetch...") |
| total_saved = 0 |
| |
| for symbol in SYMBOLS[:5]: |
| for interval in INTERVALS: |
| saved = await fetch_and_cache_ohlc_for_symbol(symbol, interval) |
| total_saved += saved |
| await asyncio.sleep(0.5) |
| |
| logger.info(f"Initial fetch: Saved {total_saved} REAL OHLC candles") |
| |
| |
| asyncio.create_task(ohlc_data_worker_loop()) |
| logger.info("OHLC data worker started successfully") |
| |
| except Exception as e: |
| logger.error(f"Failed to start OHLC data worker: {e}", exc_info=True) |
|
|
|
|
| |
| if __name__ == "__main__": |
| import sys |
| sys.path.append("/workspace") |
| |
| async def test(): |
| """Test the worker with multi-source fallback""" |
| logger.info("Testing OHLC data worker with multi-source fallback...") |
| |
| |
| test_symbols = ["BTC", "ETH"] |
| interval = "1h" |
| |
| for symbol in test_symbols: |
| logger.info(f"\n{'='*60}") |
| logger.info(f"Testing {symbol}") |
| logger.info(f"{'='*60}") |
| |
| data = await fetch_ohlc_with_fallback(symbol, interval, limit=10) |
| logger.info(f"Fetched {len(data)} candles for {symbol} {interval}") |
| |
| if data: |
| |
| logger.info(f"Provider: {data[0].get('provider')}") |
| for candle in data[:3]: |
| logger.info( |
| f" {candle['timestamp']}: O={candle['open']:.2f} " |
| f"H={candle['high']:.2f} L={candle['low']:.2f} C={candle['close']:.2f}" |
| ) |
| |
| |
| saved = await save_ohlc_data_to_cache(data) |
| logger.info(f"Saved {saved} candles to database") |
| else: |
| logger.warning(f"No data retrieved for {symbol}") |
| |
| asyncio.run(test()) |
|
|