"""HTTP helpers for downloading OHLCV candles and instrument lists.

Endpoints are built from ``OKX_BASE_URL``; pacing, retry count and timeouts
come from the constants imported from ``config``.
"""

import logging
import time
from typing import Callable, Dict, List, Optional

import numpy as np
import pandas as pd
import requests

from config import (
    OKX_BASE_URL,
    TIMEFRAME,
    CANDLE_LIMIT,
    REQUEST_DELAY,
    REQUEST_RETRIES,
    REQUEST_TIMEOUT,
)

logger = logging.getLogger(__name__)


def _parse_candle(bar: list) -> dict:
    """Convert one raw candle row into a typed record.

    Args:
        bar: Positional candle fields. Index 0 is read as an epoch timestamp
            in milliseconds; indexes 1-5 are read as open, high, low, close
            and volume; index 6, when present, is read as quote volume.

    Returns:
        A dict with a timezone-aware (UTC) ``timestamp`` and float values for
        the remaining keys. ``quote_volume`` is NaN when the row has fewer
        than seven fields.

    Raises:
        ValueError, TypeError, IndexError: If a field is missing or cannot be
            converted to a number.
    """
    return {
        "timestamp": pd.to_datetime(int(bar[0]), unit="ms", utc=True),
        "open": float(bar[1]),
        "high": float(bar[2]),
        "low": float(bar[3]),
        "close": float(bar[4]),
        "volume": float(bar[5]),
        "quote_volume": float(bar[6]) if len(bar) > 6 else np.nan,
    }


def _candles_to_frame(symbol: str, payload: object) -> Optional[pd.DataFrame]:
    """Validate a decoded candles response and build the OHLCV frame.

    Args:
        symbol: Instrument id, used only to prefix log messages.
        payload: The decoded JSON body of the candles response.

    Returns:
        A DataFrame indexed by ascending ``timestamp``, or ``None`` (after
        logging a warning) when the payload is not a JSON object, carries a
        non-"0" ``code``, has no ``data``, or contains a malformed row.
    """
    if not isinstance(payload, dict):
        logger.warning(f"{symbol}: API error — unexpected response format")
        return None
    if payload.get("code") != "0":
        logger.warning(f"{symbol}: API error — {payload.get('msg', 'unknown')}")
        return None

    raw = payload.get("data")
    if not raw:
        logger.warning(f"{symbol}: empty data returned")
        return None

    try:
        # NOTE(review): rows are presumably delivered newest-first, in which
        # case reversing them makes the sort below a no-op — confirm against
        # the API documentation.
        records = [_parse_candle(bar) for bar in reversed(raw)]
    except (ValueError, TypeError, LookupError) as exc:
        # A single bad row must not propagate and abort a multi-symbol batch.
        logger.warning(f"{symbol}: malformed candle data — {exc}")
        return None

    return pd.DataFrame(records).set_index("timestamp").sort_index()


def fetch_ohlcv(
    symbol: str,
    timeframe: str = TIMEFRAME,
    limit: int = CANDLE_LIMIT,
) -> Optional[pd.DataFrame]:
    """Download candles for one instrument, retrying on transport errors.

    Up to ``REQUEST_RETRIES`` attempts are made. Transport-level failures
    (timeouts, HTTP error statuses, undecodable bodies) are retried with a
    linearly growing pause; an application-level failure (non-"0" code, empty
    or malformed data) is not retried.

    Args:
        symbol: Instrument id sent as ``instId``.
        timeframe: Bar size sent as ``bar``.
        limit: Maximum number of candles requested.

    Returns:
        A DataFrame indexed by ascending UTC ``timestamp`` with ``open``,
        ``high``, ``low``, ``close``, ``volume`` and ``quote_volume``
        columns, or ``None`` if the data could not be obtained.
    """
    url = f"{OKX_BASE_URL}/api/v5/market/candles"
    params = {"instId": symbol, "bar": timeframe, "limit": str(limit)}

    for attempt in range(REQUEST_RETRIES):
        try:
            resp = requests.get(url, params=params, timeout=REQUEST_TIMEOUT)
            resp.raise_for_status()
            payload = resp.json()
        except requests.exceptions.Timeout:
            logger.warning(f"{symbol}: timeout on attempt {attempt + 1}")
        except (requests.exceptions.RequestException, ValueError) as exc:
            # ValueError covers requests releases whose ``Response.json()``
            # raises a plain JSON decode error instead of a RequestException.
            logger.warning(f"{symbol}: request error on attempt {attempt + 1}: {exc}")
        else:
            df = _candles_to_frame(symbol, payload)
            if df is not None:
                # Pause after a successful fetch so back-to-back calls are
                # spaced by REQUEST_DELAY.
                time.sleep(REQUEST_DELAY)
            return df

        # Back off before the next attempt only; sleeping after the final
        # failure would just delay the None return.
        if attempt < REQUEST_RETRIES - 1:
            time.sleep(REQUEST_DELAY * (attempt + 2))

    logger.error(f"{symbol}: all {REQUEST_RETRIES} fetch attempts failed")
    return None


def fetch_instruments(inst_type: str = "SPOT") -> List[str]:
    """List the ids of live, USDT-quoted instruments of the given type.

    Args:
        inst_type: Value sent as the ``instType`` query parameter.

    Returns:
        The ``instId`` of every returned instrument whose ``quoteCcy`` is
        ``"USDT"`` and whose ``state`` is ``"live"``. An empty list is
        returned (and an error logged) when the request fails or the API
        reports a non-"0" code.
    """
    url = f"{OKX_BASE_URL}/api/v5/public/instruments"
    params = {"instType": inst_type}

    try:
        resp = requests.get(url, params=params, timeout=REQUEST_TIMEOUT)
        resp.raise_for_status()
        payload = resp.json()
    except (requests.exceptions.RequestException, ValueError) as exc:
        logger.error(f"Failed to fetch instruments: {exc}")
        return []

    if not isinstance(payload, dict):
        logger.error("Failed to fetch instruments: unexpected response format")
        return []
    if payload.get("code") != "0":
        # Report the API-level failure instead of returning [] silently.
        logger.error(
            f"Failed to fetch instruments: API error — {payload.get('msg', 'unknown')}"
        )
        return []

    # ``or []`` also covers an explicit ``"data": null`` in the response.
    return [
        item["instId"]
        for item in payload.get("data") or []
        if item.get("quoteCcy") == "USDT"
        and item.get("state") == "live"
        and "instId" in item
    ]


def fetch_multiple(
    symbols: List[str],
    timeframe: str = TIMEFRAME,
    limit: int = CANDLE_LIMIT,
    min_bars: int = 50,
    progress_callback: Optional[Callable[[int, int, str], object]] = None,
) -> Dict[str, pd.DataFrame]:
    """Fetch candles for several instruments, keeping only usable results.

    Args:
        symbols: Instrument ids to fetch, processed in order.
        timeframe: Bar size forwarded to ``fetch_ohlcv``.
        limit: Candle count forwarded to ``fetch_ohlcv``.
        min_bars: Minimum number of rows a frame needs to be kept.
        progress_callback: Optional callable invoked before each fetch as
            ``progress_callback(position, total, symbol)`` with a 1-based
            position. When omitted, progress is logged at INFO level instead.

    Returns:
        A dict mapping each symbol to its DataFrame. Symbols whose fetch
        returned ``None`` or fewer than ``min_bars`` rows are omitted and a
        warning is logged for each.
    """
    results: Dict[str, pd.DataFrame] = {}
    total = len(symbols)

    for position, sym in enumerate(symbols, 1):
        if progress_callback:
            progress_callback(position, total, sym)
        else:
            logger.info(f"Fetching {sym} ({position}/{total})")

        df = fetch_ohlcv(sym, timeframe=timeframe, limit=limit)
        if df is None or len(df) < min_bars:
            logger.warning(f"{sym}: skipped (insufficient data)")
            continue
        results[sym] = df

    return results