Goshawk_Hedge_Pro / data_fetcher.py
GoshawkVortexAI's picture
Update data_fetcher.py
cb3d9ec verified
import time
import logging
from typing import Dict, List, Optional
import requests
import numpy as np
import pandas as pd
from config import (
OKX_BASE_URL,
TIMEFRAME,
CANDLE_LIMIT,
REQUEST_DELAY,
REQUEST_RETRIES,
REQUEST_TIMEOUT,
)
# Module-level logger named after this module, so its output can be filtered
# and configured independently through the standard logging hierarchy.
logger = logging.getLogger(__name__)
def _parse_candle(bar: list) -> dict:
    """Convert one raw candle row from the OKX candles endpoint into a record.

    ``bar`` is read positionally:
      * ``bar[0]``   – open time in epoch milliseconds,
      * ``bar[1:6]`` – open, high, low, close, volume,
      * ``bar[6]``   – optional; becomes ``quote_volume`` (NaN when absent).
    Entries may be strings; they are coerced with ``int``/``float``. Any
    entries after index 6 are ignored.

    NOTE(review): ``bar[6]`` appears to be OKX's ``volCcy`` field, which is
    quote-currency volume for SPOT/MARGIN but is reported in other units for
    derivatives — confirm before relying on ``quote_volume`` for SWAP data.

    Returns:
        Dict with keys ``timestamp`` (tz-aware UTC ``pd.Timestamp``),
        ``open``, ``high``, ``low``, ``close``, ``volume``, ``quote_volume``.

    Raises:
        IndexError: if the row has fewer than six entries.
        ValueError: if a field cannot be converted to a number.
    """
    opened_at = pd.to_datetime(int(bar[0]), unit="ms", utc=True)
    price_volume = {
        field: float(bar[pos])
        for pos, field in enumerate(("open", "high", "low", "close", "volume"), start=1)
    }
    quote = np.nan if len(bar) <= 6 else float(bar[6])
    return {"timestamp": opened_at, **price_volume, "quote_volume": quote}
def _candles_payload_to_frame(symbol: str, payload: dict) -> Optional[pd.DataFrame]:
    """Turn a decoded OKX candles response into a DataFrame, or None (logged)."""
    if payload.get("code") != "0":
        logger.warning("%s: API error — %s", symbol, payload.get("msg", "unknown"))
        return None
    raw = payload.get("data")
    if not raw:
        logger.warning("%s: empty data returned", symbol)
        return None
    try:
        # OKX lists candles newest-first; reverse, then sort the index so it is
        # chronological regardless of the order the API used.
        records = [_parse_candle(bar) for bar in reversed(raw)]
    except (ValueError, TypeError, IndexError) as exc:
        # A malformed row is not a transient failure, so it is not retried;
        # returning None keeps batch callers such as fetch_multiple running.
        logger.warning("%s: malformed candle data: %s", symbol, exc)
        return None
    return pd.DataFrame(records).set_index("timestamp").sort_index()


def fetch_ohlcv(
    symbol: str,
    timeframe: str = TIMEFRAME,
    limit: int = CANDLE_LIMIT,
) -> Optional[pd.DataFrame]:
    """Fetch recent OHLCV candles for one instrument from the OKX REST API.

    Calls ``GET {OKX_BASE_URL}/api/v5/market/candles``. The ``data`` array is
    converted into a DataFrame indexed by UTC ``timestamp`` in ascending
    order, with columns ``open``, ``high``, ``low``, ``close``, ``volume`` and
    ``quote_volume``.

    Transport failures are retried up to ``REQUEST_RETRIES`` times: timeouts,
    connection errors, HTTP error statuses and bodies that are not valid
    JSON. The pause between attempts grows linearly, and there is no pause
    after the last attempt. These outcomes return None immediately without a
    retry:
      * an API-level error (``code`` other than ``"0"``),
      * an empty ``data`` array,
      * malformed candle rows.
    After a successful fetch the call sleeps ``REQUEST_DELAY`` seconds so that
    back-to-back calls are spaced out.

    NOTE(review): the OKX docs cap ``limit`` per request for this endpoint
    (300 at time of writing); larger values are not paginated here. The
    newest candle may also still be forming — verify whether callers should
    drop it.

    Args:
        symbol: OKX instrument id (``instId``).
        timeframe: OKX ``bar`` value.
        limit: Number of candles to request.

    Returns:
        The candle DataFrame, or ``None`` if usable data could not be obtained.
    """
    url = f"{OKX_BASE_URL}/api/v5/market/candles"
    params = {"instId": symbol, "bar": timeframe, "limit": str(limit)}

    for attempt in range(1, REQUEST_RETRIES + 1):
        try:
            resp = requests.get(url, params=params, timeout=REQUEST_TIMEOUT)
            resp.raise_for_status()
            payload = resp.json()
        except requests.exceptions.Timeout:
            logger.warning("%s: timeout on attempt %s", symbol, attempt)
        except requests.exceptions.RequestException as exc:
            logger.warning("%s: request error on attempt %s: %s", symbol, attempt, exc)
        except ValueError as exc:
            # Non-JSON body (e.g. an HTML error page from a proxy) on requests
            # versions whose decode error is not a RequestException.
            logger.warning("%s: invalid JSON on attempt %s: %s", symbol, attempt, exc)
        else:
            df = _candles_payload_to_frame(symbol, payload)
            if df is not None:
                time.sleep(REQUEST_DELAY)
            return df

        # Back off before the next try; skip the pointless wait after the last one.
        if attempt < REQUEST_RETRIES:
            time.sleep(REQUEST_DELAY * (attempt + 1))

    logger.error("%s: all %s fetch attempts failed", symbol, REQUEST_RETRIES)
    return None
def fetch_instruments(inst_type: str = "SPOT", quote_ccy: str = "USDT") -> List[str]:
    """Return ids of live OKX instruments of one type denominated in ``quote_ccy``.

    Calls ``GET {OKX_BASE_URL}/api/v5/public/instruments`` once, with no
    retries. It keeps only items whose ``state`` is ``"live"`` and whose
    currency matches ``quote_ccy``.

    OKX fills ``quoteCcy`` only for SPOT/MARGIN instruments. Derivatives
    report their margin currency in ``settleCcy`` instead, so that field is
    used when ``quoteCcy`` is empty. For SPOT this is identical to matching
    ``quoteCcy`` alone.

    Args:
        inst_type: OKX ``instType`` value, e.g. ``"SPOT"`` or ``"SWAP"``.
        quote_ccy: Currency to match; defaults to ``"USDT"``.

    Returns:
        ``instId`` strings in API order. The list is empty on any transport,
        decode or API error; each of these is logged.
    """
    url = f"{OKX_BASE_URL}/api/v5/public/instruments"
    params = {"instType": inst_type}
    try:
        resp = requests.get(url, params=params, timeout=REQUEST_TIMEOUT)
        resp.raise_for_status()
        payload = resp.json()
    except requests.exceptions.RequestException as exc:
        logger.error("Failed to fetch instruments: %s", exc)
        return []
    except ValueError as exc:
        # Non-JSON body on requests versions whose decode error is not a
        # RequestException.
        logger.error("Failed to decode instruments response: %s", exc)
        return []

    if payload.get("code") != "0":
        # Previously swallowed silently; surface it like fetch_ohlcv does.
        logger.error("Instruments API error — %s", payload.get("msg", "unknown"))
        return []

    return [
        item["instId"]
        for item in payload.get("data") or []
        if (item.get("quoteCcy") or item.get("settleCcy")) == quote_ccy
        and item.get("state") == "live"
    ]
def fetch_multiple(
    symbols: List[str],
    timeframe: str = TIMEFRAME,
    limit: int = CANDLE_LIMIT,
    min_bars: int = 50,
    progress_callback=None,
) -> Dict[str, pd.DataFrame]:
    """Fetch candles for several symbols sequentially via ``fetch_ohlcv``.

    Symbols whose fetch fails, or which return fewer than ``min_bars`` rows,
    are left out of the result. The two cases are logged separately.

    Args:
        symbols: Instrument ids to fetch, processed in order.
        timeframe: Passed through to ``fetch_ohlcv``.
        limit: Passed through to ``fetch_ohlcv``.
        min_bars: Minimum number of rows a symbol needs to be kept.
        progress_callback: Optional callable invoked as
            ``progress_callback(index, total, symbol)`` before each fetch, with
            ``index`` starting at 1. When it is falsy, progress is logged at
            INFO level instead.

    Returns:
        Mapping of symbol to its candle DataFrame for every kept symbol. If a
        symbol appears twice, the later fetch wins.
    """
    results: Dict[str, pd.DataFrame] = {}
    total = len(symbols)
    for i, sym in enumerate(symbols, 1):
        if progress_callback:
            progress_callback(i, total, sym)
        else:
            logger.info("Fetching %s (%d/%d)", sym, i, total)

        df = fetch_ohlcv(sym, timeframe=timeframe, limit=limit)
        if df is None:
            # fetch_ohlcv has already logged the underlying cause.
            logger.warning("%s: skipped (fetch failed)", sym)
        elif len(df) < min_bars:
            logger.warning(
                "%s: skipped (insufficient data: %d < %d bars)", sym, len(df), min_bars
            )
        else:
            results[sym] = df
    return results