Spaces:
Sleeping
Sleeping
| import requests | |
| import pandas as pd | |
| import numpy as np | |
| import time | |
| import logging | |
| from typing import Optional, List | |
| from config import OKX_BASE_URL, TIMEFRAME, CANDLE_LIMIT | |
| logger = logging.getLogger(__name__) | |
| def _bar_to_series(bar: list) -> dict: | |
| return { | |
| "timestamp": pd.to_datetime(int(bar[0]), unit="ms", utc=True), | |
| "open": float(bar[1]), | |
| "high": float(bar[2]), | |
| "low": float(bar[3]), | |
| "close": float(bar[4]), | |
| "volume": float(bar[5]), | |
| "quote_volume": float(bar[6]) if len(bar) > 6 else np.nan, | |
| } | |
| def fetch_ohlcv( | |
| symbol: str, | |
| timeframe: str = TIMEFRAME, | |
| limit: int = CANDLE_LIMIT, | |
| retries: int = 3, | |
| sleep_between: float = 0.2, | |
| ) -> Optional[pd.DataFrame]: | |
| url = f"{OKX_BASE_URL}/api/v5/market/candles" | |
| params = {"instId": symbol, "bar": timeframe, "limit": str(limit)} | |
| for attempt in range(retries): | |
| try: | |
| resp = requests.get(url, params=params, timeout=10) | |
| resp.raise_for_status() | |
| data = resp.json() | |
| if data.get("code") != "0" or not data.get("data"): | |
| logger.warning(f"{symbol}: API error or empty data: {data.get('msg')}") | |
| return None | |
| records = [_bar_to_series(bar) for bar in reversed(data["data"])] | |
| df = pd.DataFrame(records) | |
| df.set_index("timestamp", inplace=True) | |
| df.sort_index(inplace=True) | |
| time.sleep(sleep_between) | |
| return df | |
| except requests.RequestException as e: | |
| logger.warning(f"{symbol}: Attempt {attempt+1} failed: {e}") | |
| time.sleep(1.0 * (attempt + 1)) | |
| return None | |
| def fetch_instruments(inst_type: str = "SPOT") -> List[str]: | |
| url = f"{OKX_BASE_URL}/api/v5/public/instruments" | |
| params = {"instType": inst_type} | |
| try: | |
| resp = requests.get(url, params=params, timeout=10) | |
| resp.raise_for_status() | |
| data = resp.json() | |
| if data.get("code") != "0": | |
| return [] | |
| symbols = [ | |
| item["instId"] | |
| for item in data["data"] | |
| if item.get("quoteCcy") == "USDT" and item.get("state") == "live" | |
| ] | |
| return symbols | |
| except requests.RequestException as e: | |
| logger.error(f"Failed to fetch instruments: {e}") | |
| return [] | |
| def fetch_multiple( | |
| symbols: List[str], | |
| timeframe: str = TIMEFRAME, | |
| limit: int = CANDLE_LIMIT, | |
| max_symbols: Optional[int] = None, | |
| ) -> dict: | |
| if max_symbols: | |
| symbols = symbols[:max_symbols] | |
| results = {} | |
| total = len(symbols) | |
| for i, sym in enumerate(symbols, 1): | |
| logger.info(f"Fetching {sym} ({i}/{total})") | |
| df = fetch_ohlcv(sym, timeframe=timeframe, limit=limit) | |
| if df is not None and len(df) >= 50: | |
| results[sym] = df | |
| else: | |
| logger.warning(f"{sym}: insufficient data, skipping") | |
| return results | |