File size: 3,401 Bytes
939baa1
 
cb3d9ec
 
 
 
 
 
 
 
 
 
 
 
 
 
939baa1
 
 
 
cb3d9ec
939baa1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cb3d9ec
939baa1
cb3d9ec
939baa1
cb3d9ec
 
 
 
939baa1
cb3d9ec
 
 
 
 
 
 
 
 
939baa1
cb3d9ec
 
 
 
 
 
 
 
 
939baa1
 
 
 
 
 
 
cb3d9ec
939baa1
cb3d9ec
 
939baa1
cb3d9ec
939baa1
cb3d9ec
939baa1
 
cb3d9ec
 
939baa1
 
 
 
 
 
 
cb3d9ec
 
 
 
939baa1
cb3d9ec
939baa1
cb3d9ec
 
 
 
 
939baa1
cb3d9ec
939baa1
 
cb3d9ec
 
939baa1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import time
import logging
from typing import Dict, List, Optional

import requests
import numpy as np
import pandas as pd

from config import (
    OKX_BASE_URL,
    TIMEFRAME,
    CANDLE_LIMIT,
    REQUEST_DELAY,
    REQUEST_RETRIES,
    REQUEST_TIMEOUT,
)

logger = logging.getLogger(__name__)


def _parse_candle(bar: list) -> dict:
    return {
        "timestamp": pd.to_datetime(int(bar[0]), unit="ms", utc=True),
        "open": float(bar[1]),
        "high": float(bar[2]),
        "low": float(bar[3]),
        "close": float(bar[4]),
        "volume": float(bar[5]),
        "quote_volume": float(bar[6]) if len(bar) > 6 else np.nan,
    }


def fetch_ohlcv(
    symbol: str,
    timeframe: str = TIMEFRAME,
    limit: int = CANDLE_LIMIT,
) -> Optional[pd.DataFrame]:
    """Fetch OHLCV candles for *symbol* from the OKX v5 REST API.

    Args:
        symbol: OKX instrument id (e.g. "BTC-USDT").
        timeframe: Bar size passed as the ``bar`` query parameter.
        limit: Maximum number of candles to request.

    Returns:
        DataFrame indexed by ascending UTC timestamp, or ``None`` when the
        API reports an error, returns no data, or all retries fail.
    """
    url = f"{OKX_BASE_URL}/api/v5/market/candles"
    params = {"instId": symbol, "bar": timeframe, "limit": str(limit)}

    for attempt in range(REQUEST_RETRIES):
        try:
            resp = requests.get(url, params=params, timeout=REQUEST_TIMEOUT)
            resp.raise_for_status()
            payload = resp.json()
        except requests.exceptions.Timeout:
            logger.warning("%s: timeout on attempt %s", symbol, attempt + 1)
        except requests.exceptions.RequestException as exc:
            logger.warning(
                "%s: request error on attempt %s: %s", symbol, attempt + 1, exc
            )
        else:
            # A non-"0" code is a definitive API-level rejection; retrying
            # the identical request will not help, so give up immediately.
            if payload.get("code") != "0":
                logger.warning(
                    "%s: API error — %s", symbol, payload.get("msg", "unknown")
                )
                return None

            raw = payload.get("data")
            if not raw:
                logger.warning("%s: empty data returned", symbol)
                return None

            # OKX returns newest-first; reverse so records are chronological.
            records = [_parse_candle(bar) for bar in reversed(raw)]
            df = pd.DataFrame(records).set_index("timestamp").sort_index()
            time.sleep(REQUEST_DELAY)  # polite pacing between API calls
            return df

        # Back off only when another attempt remains: the previous version
        # also slept after the FINAL failure, delaying the None return for
        # REQUEST_DELAY * (REQUEST_RETRIES + 1) seconds for no benefit.
        if attempt + 1 < REQUEST_RETRIES:
            time.sleep(REQUEST_DELAY * (attempt + 2))

    logger.error("%s: all %s fetch attempts failed", symbol, REQUEST_RETRIES)
    return None


def fetch_instruments(inst_type: str = "SPOT") -> List[str]:
    """Return live, USDT-quoted instrument ids of *inst_type* from OKX.

    Args:
        inst_type: OKX instrument type (``"SPOT"``, ``"SWAP"``, ...).

    Returns:
        List of instrument ids; empty on any API or transport error
        (best-effort — callers treat an empty list as "nothing to do").
    """
    url = f"{OKX_BASE_URL}/api/v5/public/instruments"
    params = {"instType": inst_type}
    try:
        resp = requests.get(url, params=params, timeout=REQUEST_TIMEOUT)
        resp.raise_for_status()
        payload = resp.json()
        if payload.get("code") != "0":
            # Previously this failure was swallowed silently, unlike
            # fetch_ohlcv which logs API-level errors — log it for parity.
            logger.warning(
                "Instruments API error — %s", payload.get("msg", "unknown")
            )
            return []
        return [
            item["instId"]
            for item in payload.get("data", [])
            if item.get("quoteCcy") == "USDT" and item.get("state") == "live"
        ]
    except requests.exceptions.RequestException as exc:
        logger.error("Failed to fetch instruments: %s", exc)
        return []


def fetch_multiple(
    symbols: List[str],
    timeframe: str = TIMEFRAME,
    limit: int = CANDLE_LIMIT,
    min_bars: int = 50,
    progress_callback=None,
) -> Dict[str, pd.DataFrame]:
    """Fetch candles for each symbol, keeping only sufficiently long series.

    Args:
        symbols: Instrument ids to fetch.
        timeframe: Bar size forwarded to :func:`fetch_ohlcv`.
        limit: Candle count forwarded to :func:`fetch_ohlcv`.
        min_bars: Minimum rows a DataFrame must have to be kept.
        progress_callback: Optional ``fn(index, total, symbol)`` invoked
            before each fetch; when absent, progress is logged instead.

    Returns:
        Mapping of symbol -> DataFrame for every symbol that yielded at
        least ``min_bars`` rows; failures and short series are skipped.
    """
    collected: Dict[str, pd.DataFrame] = {}
    count = len(symbols)

    for position, sym in enumerate(symbols, start=1):
        if progress_callback is None:
            logger.info(f"Fetching {sym} ({position}/{count})")
        else:
            progress_callback(position, count, sym)

        frame = fetch_ohlcv(sym, timeframe=timeframe, limit=limit)
        if frame is None or len(frame) < min_bars:
            logger.warning(f"{sym}: skipped (insufficient data)")
            continue
        collected[sym] = frame

    return collected