File size: 3,401 Bytes
939baa1 cb3d9ec 939baa1 cb3d9ec 939baa1 cb3d9ec 939baa1 cb3d9ec 939baa1 cb3d9ec 939baa1 cb3d9ec 939baa1 cb3d9ec 939baa1 cb3d9ec 939baa1 cb3d9ec 939baa1 cb3d9ec 939baa1 cb3d9ec 939baa1 cb3d9ec 939baa1 cb3d9ec 939baa1 cb3d9ec 939baa1 cb3d9ec 939baa1 cb3d9ec 939baa1 cb3d9ec 939baa1 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 | import time
import logging
from typing import Dict, List, Optional
import requests
import numpy as np
import pandas as pd
from config import (
OKX_BASE_URL,
TIMEFRAME,
CANDLE_LIMIT,
REQUEST_DELAY,
REQUEST_RETRIES,
REQUEST_TIMEOUT,
)
logger = logging.getLogger(__name__)
def _parse_candle(bar: list) -> dict:
return {
"timestamp": pd.to_datetime(int(bar[0]), unit="ms", utc=True),
"open": float(bar[1]),
"high": float(bar[2]),
"low": float(bar[3]),
"close": float(bar[4]),
"volume": float(bar[5]),
"quote_volume": float(bar[6]) if len(bar) > 6 else np.nan,
}
def fetch_ohlcv(
symbol: str,
timeframe: str = TIMEFRAME,
limit: int = CANDLE_LIMIT,
) -> Optional[pd.DataFrame]:
url = f"{OKX_BASE_URL}/api/v5/market/candles"
params = {"instId": symbol, "bar": timeframe, "limit": str(limit)}
for attempt in range(REQUEST_RETRIES):
try:
resp = requests.get(url, params=params, timeout=REQUEST_TIMEOUT)
resp.raise_for_status()
payload = resp.json()
if payload.get("code") != "0":
logger.warning(f"{symbol}: API error — {payload.get('msg', 'unknown')}")
return None
raw = payload.get("data")
if not raw:
logger.warning(f"{symbol}: empty data returned")
return None
records = [_parse_candle(bar) for bar in reversed(raw)]
df = pd.DataFrame(records).set_index("timestamp").sort_index()
time.sleep(REQUEST_DELAY)
return df
except requests.exceptions.Timeout:
logger.warning(f"{symbol}: timeout on attempt {attempt + 1}")
except requests.exceptions.RequestException as exc:
logger.warning(f"{symbol}: request error on attempt {attempt + 1}: {exc}")
time.sleep(REQUEST_DELAY * (attempt + 2))
logger.error(f"{symbol}: all {REQUEST_RETRIES} fetch attempts failed")
return None
def fetch_instruments(inst_type: str = "SPOT") -> List[str]:
url = f"{OKX_BASE_URL}/api/v5/public/instruments"
params = {"instType": inst_type}
try:
resp = requests.get(url, params=params, timeout=REQUEST_TIMEOUT)
resp.raise_for_status()
payload = resp.json()
if payload.get("code") != "0":
return []
return [
item["instId"]
for item in payload.get("data", [])
if item.get("quoteCcy") == "USDT" and item.get("state") == "live"
]
except requests.exceptions.RequestException as exc:
logger.error(f"Failed to fetch instruments: {exc}")
return []
def fetch_multiple(
symbols: List[str],
timeframe: str = TIMEFRAME,
limit: int = CANDLE_LIMIT,
min_bars: int = 50,
progress_callback=None,
) -> Dict[str, pd.DataFrame]:
results: Dict[str, pd.DataFrame] = {}
total = len(symbols)
for i, sym in enumerate(symbols, 1):
if progress_callback:
progress_callback(i, total, sym)
else:
logger.info(f"Fetching {sym} ({i}/{total})")
df = fetch_ohlcv(sym, timeframe=timeframe, limit=limit)
if df is not None and len(df) >= min_bars:
results[sym] = df
else:
logger.warning(f"{sym}: skipped (insufficient data)")
return results
|