# Goshawk_Hedge_Pro / data_fetcher.py
# Source: author GoshawkVortexAI, commit "Update data_fetcher.py" (939baa1, verified), 2.93 kB.
import requests
import pandas as pd
import numpy as np
import time
import logging
from typing import Optional, List
from config import OKX_BASE_URL, TIMEFRAME, CANDLE_LIMIT
# Module-level logger named after this module; shared by the fetch helpers below.
logger = logging.getLogger(__name__)
def _bar_to_series(bar: list) -> dict:
return {
"timestamp": pd.to_datetime(int(bar[0]), unit="ms", utc=True),
"open": float(bar[1]),
"high": float(bar[2]),
"low": float(bar[3]),
"close": float(bar[4]),
"volume": float(bar[5]),
"quote_volume": float(bar[6]) if len(bar) > 6 else np.nan,
}
def fetch_ohlcv(
    symbol: str,
    timeframe: str = TIMEFRAME,
    limit: int = CANDLE_LIMIT,
    retries: int = 3,
    sleep_between: float = 0.2,
) -> Optional[pd.DataFrame]:
    """Fetch recent candles for one instrument from the OKX candles endpoint.

    Args:
        symbol: Instrument id, sent as ``instId``.
        timeframe: Candle size, sent as ``bar``.
        limit: Number of candles requested (sent as a string).
        retries: Maximum number of HTTP attempts; 0 or less means no request
            is made and ``None`` is returned.
        sleep_between: Pause in seconds after a successful fetch, throttling
            consecutive calls (e.g. from ``fetch_multiple``).

    Returns:
        DataFrame indexed by UTC ``timestamp`` in ascending order with columns
        open, high, low, close, volume and quote_volume, or ``None`` when the
        API reports a non-"0" code or empty data, a candle row cannot be
        parsed, or every attempt fails at the transport/JSON level.
    """
    url = f"{OKX_BASE_URL}/api/v5/market/candles"
    params = {"instId": symbol, "bar": timeframe, "limit": str(limit)}
    for attempt in range(retries):
        try:
            resp = requests.get(url, params=params, timeout=10)
            resp.raise_for_status()
            data = resp.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers non-JSON bodies on requests versions whose
            # JSON decode error is not a RequestException subclass.
            logger.warning("%s: Attempt %d failed: %s", symbol, attempt + 1, e)
            if attempt < retries - 1:
                # Linear backoff (1s, 2s, ...); skipped after the last attempt
                # so a definitive failure is reported without an extra wait.
                time.sleep(1.0 * (attempt + 1))
            continue

        if data.get("code") != "0" or not data.get("data"):
            logger.warning("%s: API error or empty data: %s", symbol, data.get("msg"))
            return None

        try:
            # Reverse first so rows are (presumably) already ascending; the
            # sort below guarantees the order regardless.
            records = [_bar_to_series(bar) for bar in reversed(data["data"])]
        except (ValueError, TypeError, IndexError) as e:
            # A malformed row must not escape as an exception: callers treat
            # None as "no data" and continue with other symbols.
            logger.warning("%s: malformed candle data: %s", symbol, e)
            return None

        df = pd.DataFrame(records)
        df.set_index("timestamp", inplace=True)
        df.sort_index(inplace=True)
        time.sleep(sleep_between)
        return df
    return None
def fetch_instruments(inst_type: str = "SPOT", quote_ccy: str = "USDT") -> List[str]:
    """Return ids of live OKX instruments of one type priced in ``quote_ccy``.

    Args:
        inst_type: OKX ``instType`` filter sent to ``/api/v5/public/instruments``.
        quote_ccy: Currency the instrument must be quoted (or, for derivatives,
            settled) in. Defaults to ``"USDT"``, the value previously
            hard-coded here.

    Returns:
        List of ``instId`` strings in API order whose ``state`` is ``"live"``.
        An empty list is returned, and an error logged, when the request
        fails, the body is not JSON, or the API reports a non-"0" code.
    """
    url = f"{OKX_BASE_URL}/api/v5/public/instruments"
    params = {"instType": inst_type}
    try:
        resp = requests.get(url, params=params, timeout=10)
        resp.raise_for_status()
        data = resp.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers non-JSON bodies on older requests versions.
        logger.error("Failed to fetch instruments: %s", e)
        return []

    if data.get("code") != "0":
        # Previously returned silently; log so an empty universe is diagnosable.
        logger.error(
            "Failed to fetch instruments: API code %s: %s",
            data.get("code"),
            data.get("msg"),
        )
        return []

    return [
        item["instId"]
        for item in data.get("data") or []
        # OKX fills quoteCcy only for SPOT/MARGIN; derivative instruments
        # leave it empty and carry the currency in settleCcy instead.
        if (item.get("quoteCcy") or item.get("settleCcy")) == quote_ccy
        and item.get("state") == "live"
    ]
def fetch_multiple(
    symbols: List[str],
    timeframe: str = TIMEFRAME,
    limit: int = CANDLE_LIMIT,
    max_symbols: Optional[int] = None,
    min_candles: int = 50,
) -> dict:
    """Fetch candles for several symbols sequentially, keeping usable results.

    Args:
        symbols: Instrument ids, fetched in the given order.
        timeframe: Candle size forwarded to ``fetch_ohlcv``.
        limit: Candles requested per symbol, forwarded to ``fetch_ohlcv``.
        max_symbols: Cap on how many leading symbols are fetched. A falsy value
            (``None`` or ``0``) means no cap.
        min_candles: Minimum rows a symbol's DataFrame needs to be kept
            (previously hard-coded to 50). NOTE: if ``limit`` is below this,
            every symbol will be skipped.

    Returns:
        Dict mapping symbol -> DataFrame from ``fetch_ohlcv`` for every symbol
        that returned data with at least ``min_candles`` rows.
    """
    if max_symbols:
        symbols = symbols[:max_symbols]
    results = {}
    total = len(symbols)
    for i, sym in enumerate(symbols, 1):
        logger.info("Fetching %s (%d/%d)", sym, i, total)
        df = fetch_ohlcv(sym, timeframe=timeframe, limit=limit)
        if df is None or len(df) < min_candles:
            logger.warning("%s: insufficient data, skipping", sym)
            continue
        results[sym] = df
    return results