| import time | |
| import logging | |
| from typing import Dict, List, Optional | |
| import requests | |
| import numpy as np | |
| import pandas as pd | |
| from config import ( | |
| OKX_BASE_URL, | |
| TIMEFRAME, | |
| CANDLE_LIMIT, | |
| REQUEST_DELAY, | |
| REQUEST_RETRIES, | |
| REQUEST_TIMEOUT, | |
| ) | |
| logger = logging.getLogger(__name__) | |
def _parse_candle(bar: list) -> dict:
    """Convert one raw candle row into a dict of typed OHLCV fields.

    The row is read positionally: index 0 is an epoch timestamp in
    milliseconds, indices 1-5 are open, high, low, close and volume, and
    index 6 (only when the row has more than six entries) is the quote
    volume.  Numeric fields are coerced with ``float``; an absent quote
    volume is stored as NaN.

    Args:
        bar: Positional candle row as described above.

    Returns:
        Dict with keys ``timestamp`` (timezone-aware UTC timestamp),
        ``open``, ``high``, ``low``, ``close``, ``volume`` and
        ``quote_volume``, in that order.
    """
    candle = {"timestamp": pd.to_datetime(int(bar[0]), unit="ms", utc=True)}

    # Price and volume columns sit at fixed positions 1..5 of the row.
    numeric_columns = ("open", "high", "low", "close", "volume")
    for position, column in enumerate(numeric_columns, start=1):
        candle[column] = float(bar[position])

    if len(bar) > 6:
        candle["quote_volume"] = float(bar[6])
    else:
        candle["quote_volume"] = np.nan
    return candle
def fetch_ohlcv(
    symbol: str,
    timeframe: str = TIMEFRAME,
    limit: int = CANDLE_LIMIT,
) -> Optional[pd.DataFrame]:
    """Fetch candles for ``symbol`` and return them as a time-indexed DataFrame.

    Issues a GET to ``/api/v5/market/candles`` under ``OKX_BASE_URL``.
    Timeouts, other ``requests`` errors and non-JSON bodies are retried up
    to ``REQUEST_RETRIES`` times, sleeping ``REQUEST_DELAY * (attempt + 2)``
    between attempts (no sleep after the final one).  After a successful
    fetch the function sleeps ``REQUEST_DELAY`` before returning,
    presumably to pace successive calls.

    Args:
        symbol: Instrument id, sent as the ``instId`` query parameter.
        timeframe: Candle size, sent as the ``bar`` query parameter.
        limit: Number of candles requested (sent as a string).

    Returns:
        A DataFrame indexed by ``timestamp`` in ascending order with the
        columns produced by ``_parse_candle``, or ``None`` when the API
        reports a non-"0" code, returns no data, returns a payload that
        cannot be parsed, or every attempt fails.
    """
    url = f"{OKX_BASE_URL}/api/v5/market/candles"
    params = {"instId": symbol, "bar": timeframe, "limit": str(limit)}

    for attempt in range(REQUEST_RETRIES):
        try:
            resp = requests.get(url, params=params, timeout=REQUEST_TIMEOUT)
            resp.raise_for_status()
            payload = resp.json()
        except requests.exceptions.Timeout:
            logger.warning(f"{symbol}: timeout on attempt {attempt + 1}")
        except requests.exceptions.RequestException as exc:
            logger.warning(f"{symbol}: request error on attempt {attempt + 1}: {exc}")
        except ValueError as exc:
            # Older requests releases raise a plain ValueError (not a
            # RequestException) when the body is not valid JSON.
            logger.warning(f"{symbol}: invalid JSON on attempt {attempt + 1}: {exc}")
        else:
            if not isinstance(payload, dict):
                logger.warning(f"{symbol}: unexpected response format")
                return None
            if payload.get("code") != "0":
                logger.warning(f"{symbol}: API error — {payload.get('msg', 'unknown')}")
                return None
            raw = payload.get("data")
            if not raw:
                logger.warning(f"{symbol}: empty data returned")
                return None
            try:
                records = [_parse_candle(bar) for bar in reversed(raw)]
            except (IndexError, TypeError, ValueError) as exc:
                # A malformed row must not abort a multi-symbol batch;
                # report it and honour the Optional return contract.
                logger.warning(f"{symbol}: malformed candle data: {exc}")
                return None
            df = pd.DataFrame(records).set_index("timestamp").sort_index()
            time.sleep(REQUEST_DELAY)
            return df

        # Back off only when another attempt will actually follow.
        if attempt + 1 < REQUEST_RETRIES:
            time.sleep(REQUEST_DELAY * (attempt + 2))

    logger.error(f"{symbol}: all {REQUEST_RETRIES} fetch attempts failed")
    return None
def fetch_instruments(inst_type: str = "SPOT") -> List[str]:
    """Return the ids of live, USDT-quoted instruments of the given type.

    Issues a single GET (no retries) to ``/api/v5/public/instruments``
    under ``OKX_BASE_URL`` and keeps the entries whose ``quoteCcy`` is
    ``"USDT"`` and whose ``state`` is ``"live"``.

    Args:
        inst_type: Value sent as the ``instType`` query parameter.

    Returns:
        The ``instId`` of every matching entry, in response order.  An
        empty list is returned (and the reason logged) when the request
        fails, the body is not JSON, the payload is not a dict, or the
        API reports a non-"0" code.
    """
    url = f"{OKX_BASE_URL}/api/v5/public/instruments"
    params = {"instType": inst_type}

    try:
        resp = requests.get(url, params=params, timeout=REQUEST_TIMEOUT)
        resp.raise_for_status()
        payload = resp.json()
    except (requests.exceptions.RequestException, ValueError) as exc:
        # ValueError covers non-JSON bodies on older requests releases.
        logger.error(f"Failed to fetch instruments: {exc}")
        return []

    if not isinstance(payload, dict):
        logger.error("Failed to fetch instruments: unexpected response format")
        return []
    if payload.get("code") != "0":
        # Report the API-level error instead of returning [] silently.
        logger.warning(f"{inst_type}: API error — {payload.get('msg', 'unknown')}")
        return []

    # `or []` also covers an explicit `"data": null` in the payload.
    return [
        item["instId"]
        for item in payload.get("data") or []
        if item.get("quoteCcy") == "USDT" and item.get("state") == "live"
    ]
def fetch_multiple(
    symbols: List[str],
    timeframe: str = TIMEFRAME,
    limit: int = CANDLE_LIMIT,
    min_bars: int = 50,
    progress_callback=None,
) -> Dict[str, pd.DataFrame]:
    """Fetch candles for several symbols, keeping only those with enough bars.

    Symbols are processed sequentially via ``fetch_ohlcv``.  Before each
    fetch, progress is reported either through ``progress_callback`` or,
    when no callback is given, as an info-level log line.

    Args:
        symbols: Instrument ids to fetch, in processing order.
        timeframe: Forwarded to ``fetch_ohlcv``.
        limit: Forwarded to ``fetch_ohlcv``.
        min_bars: Minimum number of rows a frame needs to be kept.
        progress_callback: Optional callable invoked as
            ``progress_callback(position, total, symbol)`` with a 1-based
            position.

    Returns:
        Mapping from symbol to its DataFrame.  Symbols whose fetch returned
        ``None`` or fewer than ``min_bars`` rows are omitted and logged.
    """
    total = len(symbols)
    frames: Dict[str, pd.DataFrame] = {}

    for i, sym in enumerate(symbols, start=1):
        if not progress_callback:
            logger.info(f"Fetching {sym} ({i}/{total})")
        else:
            progress_callback(i, total, sym)

        frame = fetch_ohlcv(sym, timeframe=timeframe, limit=limit)

        # Skip failed fetches and frames that are too short to be useful.
        if frame is None or len(frame) < min_bars:
            logger.warning(f"{sym}: skipped (insufficient data)")
            continue
        frames[sym] = frame

    return frames