GoshawkVortexAI commited on
Commit
939baa1
·
verified ·
1 Parent(s): 79d5764

Update data_fetcher.py

Browse files
Files changed (1) hide show
  1. data_fetcher.py +91 -0
data_fetcher.py CHANGED
@@ -0,0 +1,91 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import requests
2
+ import pandas as pd
3
+ import numpy as np
4
+ import time
5
+ import logging
6
+ from typing import Optional, List
7
+ from config import OKX_BASE_URL, TIMEFRAME, CANDLE_LIMIT
8
+
9
+ logger = logging.getLogger(__name__)
10
+
11
+
12
+ def _bar_to_series(bar: list) -> dict:
13
+ return {
14
+ "timestamp": pd.to_datetime(int(bar[0]), unit="ms", utc=True),
15
+ "open": float(bar[1]),
16
+ "high": float(bar[2]),
17
+ "low": float(bar[3]),
18
+ "close": float(bar[4]),
19
+ "volume": float(bar[5]),
20
+ "quote_volume": float(bar[6]) if len(bar) > 6 else np.nan,
21
+ }
22
+
23
+
24
+ def fetch_ohlcv(
25
+ symbol: str,
26
+ timeframe: str = TIMEFRAME,
27
+ limit: int = CANDLE_LIMIT,
28
+ retries: int = 3,
29
+ sleep_between: float = 0.2,
30
+ ) -> Optional[pd.DataFrame]:
31
+ url = f"{OKX_BASE_URL}/api/v5/market/candles"
32
+ params = {"instId": symbol, "bar": timeframe, "limit": str(limit)}
33
+
34
+ for attempt in range(retries):
35
+ try:
36
+ resp = requests.get(url, params=params, timeout=10)
37
+ resp.raise_for_status()
38
+ data = resp.json()
39
+ if data.get("code") != "0" or not data.get("data"):
40
+ logger.warning(f"{symbol}: API error or empty data: {data.get('msg')}")
41
+ return None
42
+ records = [_bar_to_series(bar) for bar in reversed(data["data"])]
43
+ df = pd.DataFrame(records)
44
+ df.set_index("timestamp", inplace=True)
45
+ df.sort_index(inplace=True)
46
+ time.sleep(sleep_between)
47
+ return df
48
+ except requests.RequestException as e:
49
+ logger.warning(f"{symbol}: Attempt {attempt+1} failed: {e}")
50
+ time.sleep(1.0 * (attempt + 1))
51
+ return None
52
+
53
+
54
+ def fetch_instruments(inst_type: str = "SPOT") -> List[str]:
55
+ url = f"{OKX_BASE_URL}/api/v5/public/instruments"
56
+ params = {"instType": inst_type}
57
+ try:
58
+ resp = requests.get(url, params=params, timeout=10)
59
+ resp.raise_for_status()
60
+ data = resp.json()
61
+ if data.get("code") != "0":
62
+ return []
63
+ symbols = [
64
+ item["instId"]
65
+ for item in data["data"]
66
+ if item.get("quoteCcy") == "USDT" and item.get("state") == "live"
67
+ ]
68
+ return symbols
69
+ except requests.RequestException as e:
70
+ logger.error(f"Failed to fetch instruments: {e}")
71
+ return []
72
+
73
+
74
+ def fetch_multiple(
75
+ symbols: List[str],
76
+ timeframe: str = TIMEFRAME,
77
+ limit: int = CANDLE_LIMIT,
78
+ max_symbols: Optional[int] = None,
79
+ ) -> dict:
80
+ if max_symbols:
81
+ symbols = symbols[:max_symbols]
82
+ results = {}
83
+ total = len(symbols)
84
+ for i, sym in enumerate(symbols, 1):
85
+ logger.info(f"Fetching {sym} ({i}/{total})")
86
+ df = fetch_ohlcv(sym, timeframe=timeframe, limit=limit)
87
+ if df is not None and len(df) >= 50:
88
+ results[sym] = df
89
+ else:
90
+ logger.warning(f"{sym}: insufficient data, skipping")
91
+ return results