GoshawkVortexAI commited on
Commit
cb3d9ec
·
verified ·
1 Parent(s): 3e83bce

Update data_fetcher.py

Browse files
Files changed (1) hide show
  1. data_fetcher.py +59 -37
data_fetcher.py CHANGED
@@ -1,15 +1,24 @@
1
- import requests
2
- import pandas as pd
3
- import numpy as np
4
  import time
5
  import logging
6
- from typing import Optional, List
7
- from config import OKX_BASE_URL, TIMEFRAME, CANDLE_LIMIT
 
 
 
 
 
 
 
 
 
 
 
 
8
 
9
  logger = logging.getLogger(__name__)
10
 
11
 
12
- def _bar_to_series(bar: list) -> dict:
13
  return {
14
  "timestamp": pd.to_datetime(int(bar[0]), unit="ms", utc=True),
15
  "open": float(bar[1]),
@@ -25,29 +34,38 @@ def fetch_ohlcv(
25
  symbol: str,
26
  timeframe: str = TIMEFRAME,
27
  limit: int = CANDLE_LIMIT,
28
- retries: int = 3,
29
- sleep_between: float = 0.2,
30
  ) -> Optional[pd.DataFrame]:
31
  url = f"{OKX_BASE_URL}/api/v5/market/candles"
32
  params = {"instId": symbol, "bar": timeframe, "limit": str(limit)}
33
 
34
- for attempt in range(retries):
35
  try:
36
- resp = requests.get(url, params=params, timeout=10)
37
  resp.raise_for_status()
38
- data = resp.json()
39
- if data.get("code") != "0" or not data.get("data"):
40
- logger.warning(f"{symbol}: API error or empty data: {data.get('msg')}")
 
41
  return None
42
- records = [_bar_to_series(bar) for bar in reversed(data["data"])]
43
- df = pd.DataFrame(records)
44
- df.set_index("timestamp", inplace=True)
45
- df.sort_index(inplace=True)
46
- time.sleep(sleep_between)
 
 
 
 
47
  return df
48
- except requests.RequestException as e:
49
- logger.warning(f"{symbol}: Attempt {attempt+1} failed: {e}")
50
- time.sleep(1.0 * (attempt + 1))
 
 
 
 
 
 
51
  return None
52
 
53
 
@@ -55,19 +73,18 @@ def fetch_instruments(inst_type: str = "SPOT") -> List[str]:
55
  url = f"{OKX_BASE_URL}/api/v5/public/instruments"
56
  params = {"instType": inst_type}
57
  try:
58
- resp = requests.get(url, params=params, timeout=10)
59
  resp.raise_for_status()
60
- data = resp.json()
61
- if data.get("code") != "0":
62
  return []
63
- symbols = [
64
  item["instId"]
65
- for item in data["data"]
66
  if item.get("quoteCcy") == "USDT" and item.get("state") == "live"
67
  ]
68
- return symbols
69
- except requests.RequestException as e:
70
- logger.error(f"Failed to fetch instruments: {e}")
71
  return []
72
 
73
 
@@ -75,17 +92,22 @@ def fetch_multiple(
75
  symbols: List[str],
76
  timeframe: str = TIMEFRAME,
77
  limit: int = CANDLE_LIMIT,
78
- max_symbols: Optional[int] = None,
79
- ) -> dict:
80
- if max_symbols:
81
- symbols = symbols[:max_symbols]
82
- results = {}
83
  total = len(symbols)
 
84
  for i, sym in enumerate(symbols, 1):
85
- logger.info(f"Fetching {sym} ({i}/{total})")
 
 
 
 
86
  df = fetch_ohlcv(sym, timeframe=timeframe, limit=limit)
87
- if df is not None and len(df) >= 50:
88
  results[sym] = df
89
  else:
90
- logger.warning(f"{sym}: insufficient data, skipping")
 
91
  return results
 
 
 
 
1
  import time
2
  import logging
3
+ from typing import Dict, List, Optional
4
+
5
+ import requests
6
+ import numpy as np
7
+ import pandas as pd
8
+
9
+ from config import (
10
+ OKX_BASE_URL,
11
+ TIMEFRAME,
12
+ CANDLE_LIMIT,
13
+ REQUEST_DELAY,
14
+ REQUEST_RETRIES,
15
+ REQUEST_TIMEOUT,
16
+ )
17
 
18
  logger = logging.getLogger(__name__)
19
 
20
 
21
+ def _parse_candle(bar: list) -> dict:
22
  return {
23
  "timestamp": pd.to_datetime(int(bar[0]), unit="ms", utc=True),
24
  "open": float(bar[1]),
 
34
  symbol: str,
35
  timeframe: str = TIMEFRAME,
36
  limit: int = CANDLE_LIMIT,
 
 
37
  ) -> Optional[pd.DataFrame]:
38
  url = f"{OKX_BASE_URL}/api/v5/market/candles"
39
  params = {"instId": symbol, "bar": timeframe, "limit": str(limit)}
40
 
41
+ for attempt in range(REQUEST_RETRIES):
42
  try:
43
+ resp = requests.get(url, params=params, timeout=REQUEST_TIMEOUT)
44
  resp.raise_for_status()
45
+ payload = resp.json()
46
+
47
+ if payload.get("code") != "0":
48
+ logger.warning(f"{symbol}: API error — {payload.get('msg', 'unknown')}")
49
  return None
50
+
51
+ raw = payload.get("data")
52
+ if not raw:
53
+ logger.warning(f"{symbol}: empty data returned")
54
+ return None
55
+
56
+ records = [_parse_candle(bar) for bar in reversed(raw)]
57
+ df = pd.DataFrame(records).set_index("timestamp").sort_index()
58
+ time.sleep(REQUEST_DELAY)
59
  return df
60
+
61
+ except requests.exceptions.Timeout:
62
+ logger.warning(f"{symbol}: timeout on attempt {attempt + 1}")
63
+ except requests.exceptions.RequestException as exc:
64
+ logger.warning(f"{symbol}: request error on attempt {attempt + 1}: {exc}")
65
+
66
+ time.sleep(REQUEST_DELAY * (attempt + 2))
67
+
68
+ logger.error(f"{symbol}: all {REQUEST_RETRIES} fetch attempts failed")
69
  return None
70
 
71
 
 
73
  url = f"{OKX_BASE_URL}/api/v5/public/instruments"
74
  params = {"instType": inst_type}
75
  try:
76
+ resp = requests.get(url, params=params, timeout=REQUEST_TIMEOUT)
77
  resp.raise_for_status()
78
+ payload = resp.json()
79
+ if payload.get("code") != "0":
80
  return []
81
+ return [
82
  item["instId"]
83
+ for item in payload.get("data", [])
84
  if item.get("quoteCcy") == "USDT" and item.get("state") == "live"
85
  ]
86
+ except requests.exceptions.RequestException as exc:
87
+ logger.error(f"Failed to fetch instruments: {exc}")
 
88
  return []
89
 
90
 
 
92
  symbols: List[str],
93
  timeframe: str = TIMEFRAME,
94
  limit: int = CANDLE_LIMIT,
95
+ min_bars: int = 50,
96
+ progress_callback=None,
97
+ ) -> Dict[str, pd.DataFrame]:
98
+ results: Dict[str, pd.DataFrame] = {}
 
99
  total = len(symbols)
100
+
101
  for i, sym in enumerate(symbols, 1):
102
+ if progress_callback:
103
+ progress_callback(i, total, sym)
104
+ else:
105
+ logger.info(f"Fetching {sym} ({i}/{total})")
106
+
107
  df = fetch_ohlcv(sym, timeframe=timeframe, limit=limit)
108
+ if df is not None and len(df) >= min_bars:
109
  results[sym] = df
110
  else:
111
+ logger.warning(f"{sym}: skipped (insufficient data)")
112
+
113
  return results