eth-arb-trainer / features_orderbook.py
commanderzee's picture
fix: numpy 2.4 float(1d-array) in _walked_ask_cost
63361af verified
"""
Orderbook feature extraction for ARB-MAX: 80 features in a fixed, hardcoded
order (see FEATURE_NAMES).

Input column convention (produced by data_loader.build_window_frame):
pm_up_bid_px_{1..5}, pm_up_bid_sz_{1..5},
pm_up_ask_px_{1..5}, pm_up_ask_sz_{1..5},
pm_dn_bid_px_{1..5}, pm_dn_bid_sz_{1..5},
pm_dn_ask_px_{1..5}, pm_dn_ask_sz_{1..5}
An optional `volume` column feeds the *_cum_vol_best_* features. Any absent
column is read as all-NaN.
"""
from __future__ import annotations
from typing import List
import numpy as np
import pandas as pd
_LAGS = [30, 60, 180]
def _build_feature_names() -> List[str]:
names: List[str] = []
# --- Immediate per side (18 = 9 * 2) ---
for side in ("up", "dn"):
names.append(f"{side}_best_bid")
names.append(f"{side}_best_ask")
names.append(f"{side}_mid")
names.append(f"{side}_spread")
names.append(f"{side}_bid_sum_L1_L5")
names.append(f"{side}_ask_sum_L1_L5")
names.append(f"{side}_L1_imb")
names.append(f"{side}_L1_L5_w_imb")
names.append(f"{side}_walked_cost_500")
# --- Time-lagged per side (24 = 12 * 2) ---
for side in ("up", "dn"):
for lag in _LAGS:
names.append(f"{side}_ask_t_minus_{lag}s")
for lag in _LAGS:
names.append(f"{side}_ask_mean_{lag}s")
for lag in _LAGS:
names.append(f"{side}_ask_std_{lag}s")
for lag in _LAGS:
names.append(f"{side}_cum_vol_best_{lag}s")
# --- Cross-side (7) ---
names.append("cross_up_ask_plus_dn_ask")
names.append("cross_min_combined_60s")
names.append("cross_min_combined_180s")
names.append("cross_min_combined_600s")
names.append("cross_combined_pct_rank_in_window")
names.append("cross_corr_up_dn_ask_180s")
names.append("cross_mom_mismatch_60s")
# So far 18 + 24 + 7 = 49
# --- Padded derived features (target total = 80) ---
# Level-depth ratios per side (5 levels-2) * 2 sides = 8? Use 10
for side in ("up", "dn"):
for lvl in range(1, 6):
names.append(f"{side}_bid_sz_lvl{lvl}_frac")
# +10 => 59
for side in ("up", "dn"):
for lvl in range(1, 6):
names.append(f"{side}_ask_sz_lvl{lvl}_frac")
# +10 => 69
# Mid-price velocity per side at 3 lags (6)
for side in ("up", "dn"):
for lag in _LAGS:
names.append(f"{side}_mid_ret_{lag}s")
# +6 => 75
# Spread stats per side (2 sides * 2 = 4)
for side in ("up", "dn"):
names.append(f"{side}_spread_mean_60s")
names.append(f"{side}_spread_std_60s")
# +4 => 79
# one more: cross-side mid sum
names.append("cross_mid_up_plus_dn")
# +1 => 80
return names
FEATURE_NAMES: List[str] = _build_feature_names()
assert len(FEATURE_NAMES) == 80, f"expected 80, got {len(FEATURE_NAMES)}"
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _col(df: pd.DataFrame, name: str) -> np.ndarray:
if name in df.columns:
a = df[name].to_numpy(dtype=np.float64)
else:
a = np.full(len(df), np.nan, dtype=np.float64)
return a
def _ff(a: np.ndarray) -> np.ndarray:
out = a.copy()
last = np.nan
for i, v in enumerate(out):
if np.isfinite(v):
last = v
else:
out[i] = last
# backfill any leading NaNs with first finite
if not np.isfinite(out[0]):
first = np.nan
for v in out:
if np.isfinite(v):
first = v
break
if np.isfinite(first):
for i in range(len(out)):
if np.isfinite(out[i]):
break
out[i] = first
return np.nan_to_num(out, nan=0.0, posinf=0.0, neginf=0.0)
def _walked_ask_cost(
ask_px, ask_sz, notional: float
) -> float:
"""Cost per share to buy $notional walking the ask levels at this tick.
ask_px[i] / ask_sz[i] are per-level scalars (price / size at this tick).
Accepts numpy scalars, python floats, or 0-d arrays. Returns effective
price per share in [0, 1]. Penalize with best*1.02 if thin.
"""
def _as_scalar(v):
# tolerate numpy 0-d arrays, 1-elem 1-d arrays, scalars, or None
try:
a = np.asarray(v, dtype=np.float64)
if a.ndim == 0:
return float(a)
return float(a.reshape(-1)[0])
except Exception:
return float("nan")
n_levels = len(ask_px)
filled_shares = 0.0
total_cost = 0.0
for i in range(n_levels):
p = _as_scalar(ask_px[i])
s = _as_scalar(ask_sz[i])
if not np.isfinite(p) or not np.isfinite(s) or p <= 0 or s <= 0:
continue
remaining_dollars = notional - total_cost
if remaining_dollars <= 0:
break
dollars_this = p * s
if dollars_this >= remaining_dollars:
shares_this = remaining_dollars / p
total_cost += shares_this * p
filled_shares += shares_this
break
total_cost += dollars_this
filled_shares += s
if filled_shares > 0 and total_cost >= notional * 0.99:
return total_cost / filled_shares
best = _as_scalar(ask_px[0]) if n_levels else 1.0
if not np.isfinite(best) or best <= 0:
best = 1.0
return min(1.0, best * 1.02)
# ---------------------------------------------------------------------------
def extract(window_frame: pd.DataFrame, at_tick: int = 120) -> np.ndarray:
    """Compute the 80-element orderbook feature vector at row ``at_tick``.

    Only rows ``0..at_tick`` (inclusive, positional via ``iloc``; for a
    non-negative ``at_tick``) are used, so nothing after ``at_tick`` enters
    the features. Each ``pm_*`` book column is read with ``_col`` and filled
    with ``_ff`` before use. Values are appended in exactly the order of
    ``FEATURE_NAMES``; the append order below must stay in sync with
    ``_build_feature_names``.

    Lags and window lengths (``_LAGS``, 60, 180, 600) are counted in rows; the
    feature names call them seconds, which presumes one row per second.
    NOTE(review): confirm the row cadence against data_loader.

    Args:
        window_frame: per-tick frame with ``pm_{up,dn}_{bid,ask}_{px,sz}_{1..5}``
            columns and optionally ``volume``; absent columns are read as
            all-NaN by ``_col``.
        at_tick: positional index of the last row to include.

    Returns:
        float32 array of shape (80,); any non-finite value is replaced by 0.

    Raises:
        AssertionError: if the produced count differs from 80 (the check is
            stripped under ``python -O``).
        IndexError: for an empty window (no rows up to ``at_tick``); this case
            is not handled explicitly.

    NOTE(review): the per-side ``mid``/``spread`` features treat a
    non-positive best bid or ask as missing (0.0), but ``*_mid_ret_*``,
    ``*_spread_{mean,std}_60s`` and ``cross_mid_up_plus_dn`` compute mid and
    spread without that guard. Changing this would alter trained features.
    """
    df = window_frame.iloc[: at_tick + 1].copy()
    n = len(df)
    # Load all side/level arrays forward-filled for the window so far
    sides = ("up", "dn")
    levels = range(1, 6)
    series: dict = {}  # e.g. "up_ask_px_1" -> filled float array of length n
    for side in sides:
        for lvl in levels:
            series[f"{side}_bid_px_{lvl}"] = _ff(_col(df, f"pm_{side}_bid_px_{lvl}"))
            series[f"{side}_bid_sz_{lvl}"] = _ff(_col(df, f"pm_{side}_bid_sz_{lvl}"))
            series[f"{side}_ask_px_{lvl}"] = _ff(_col(df, f"pm_{side}_ask_px_{lvl}"))
            series[f"{side}_ask_sz_{lvl}"] = _ff(_col(df, f"pm_{side}_ask_sz_{lvl}"))
    out: List[float] = []
    # --- Immediate per side (18) ---
    # Values at the last row only; 9 per side in FEATURE_NAMES order.
    for side in sides:
        best_bid = series[f"{side}_bid_px_1"][-1]
        best_ask = series[f"{side}_ask_px_1"][-1]
        # A non-positive price is treated as a missing quote here.
        mid = (best_bid + best_ask) / 2.0 if (best_bid > 0 and best_ask > 0) else 0.0
        spread = (best_ask - best_bid) if (best_ask > 0 and best_bid > 0) else 0.0
        bid_sum = sum(series[f"{side}_bid_sz_{l}"][-1] for l in levels)
        ask_sum = sum(series[f"{side}_ask_sz_{l}"][-1] for l in levels)
        b1 = series[f"{side}_bid_sz_1"][-1]
        a1 = series[f"{side}_ask_sz_1"][-1]
        l1_imb = (b1 - a1) / (b1 + a1) if (b1 + a1) > 0 else 0.0
        # Weighted imbalance: higher levels weighted less
        weights = np.array([5, 4, 3, 2, 1], dtype=np.float64)
        bsum = sum(
            weights[i - 1] * series[f"{side}_bid_sz_{i}"][-1] for i in levels
        )
        asum = sum(
            weights[i - 1] * series[f"{side}_ask_sz_{i}"][-1] for i in levels
        )
        w_imb = (bsum - asum) / (bsum + asum) if (bsum + asum) > 0 else 0.0
        # Effective per-share price for a $500 buy walking all 5 ask levels.
        px_levels = [series[f"{side}_ask_px_{l}"][-1] for l in levels]
        sz_levels = [series[f"{side}_ask_sz_{l}"][-1] for l in levels]
        walked = _walked_ask_cost(px_levels, sz_levels, 500.0)
        out.extend([best_bid, best_ask, mid, spread, bid_sum, ask_sum, l1_imb, w_imb, walked])
    # --- Time-lagged per side (24) ---
    # 12 per side: ask at t-lag, rolling mean, rolling std, volume sum,
    # each over the lags in _LAGS.
    for side in sides:
        ask1 = series[f"{side}_ask_px_1"]
        # ask at t-lag
        for lag in _LAGS:
            # Clamped to row 0 when the window is shorter than the lag.
            idx = max(0, n - 1 - lag)
            out.append(float(ask1[idx]))
        # rolling mean over last lag seconds
        for lag in _LAGS:
            w = ask1[-lag:] if n >= lag else ask1
            out.append(float(np.nanmean(w)) if len(w) else 0.0)
        # rolling std
        for lag in _LAGS:
            w = ask1[-lag:] if n >= lag else ask1
            out.append(float(np.nanstd(w)) if len(w) > 1 else 0.0)
        # cumulative traded volume proxy — use OHLCV volume summed over lag
        # NOTE(review): `volume` does not depend on `side`, so the up and dn
        # cum_vol features are identical.
        volume = _col(df, "volume")
        volume = np.where(np.isfinite(volume), volume, 0.0)
        for lag in _LAGS:
            w = volume[-lag:] if n >= lag else volume
            out.append(float(np.sum(w)))
    # --- Cross-side (7) ---
    up_ask = series["up_ask_px_1"]
    dn_ask = series["dn_ask_px_1"]
    combined = up_ask + dn_ask
    out.append(float(combined[-1]))
    # Minimum combined best-ask over the last 60 / 180 / 600 rows.
    for lag in (60, 180, 600):
        w = combined[-lag:] if n >= lag else combined
        out.append(float(np.nanmin(w)) if len(w) else 0.0)
    # percentile rank of latest combined within window so far
    if len(combined) > 1:
        latest = combined[-1]
        out.append(float((combined <= latest).mean()))
    else:
        out.append(0.5)
    # corr over last 180s
    # Needs at least 10 rows and non-constant series on both sides.
    if n >= 10:
        w_u = up_ask[-180:] if n >= 180 else up_ask
        w_d = dn_ask[-180:] if n >= 180 else dn_ask
        if w_u.std() > 0 and w_d.std() > 0:
            out.append(float(np.corrcoef(w_u, w_d)[0, 1]))
        else:
            out.append(0.0)
    else:
        out.append(0.0)
    # momentum mismatch: up_ret_60s - (-dn_ret_60s) = up_ret_60s + dn_ret_60s
    def _ret60(a: np.ndarray) -> float:
        # Relative change over the last 60 rows; 0.0 if fewer than 61 rows or
        # the base value is non-positive.
        if len(a) < 61 or a[-61] <= 0:
            return 0.0
        return float(a[-1] / a[-61] - 1.0)
    out.append(_ret60(up_ask) + _ret60(dn_ask))
    # --- Padded (31) ---
    # bid_sz_lvl{l}_frac per side (10)
    for side in sides:
        total = sum(series[f"{side}_bid_sz_{l}"][-1] for l in levels)
        for lvl in levels:
            v = series[f"{side}_bid_sz_{lvl}"][-1]
            out.append(v / total if total > 0 else 0.0)
    # ask_sz_lvl frac per side (10)
    for side in sides:
        total = sum(series[f"{side}_ask_sz_{l}"][-1] for l in levels)
        for lvl in levels:
            v = series[f"{side}_ask_sz_{lvl}"][-1]
            out.append(v / total if total > 0 else 0.0)
    # mid ret at 3 lags per side (6)
    # Mid here is unguarded: a zero-filled bid or ask still yields a mid.
    for side in sides:
        best_bid_s = series[f"{side}_bid_px_1"]
        best_ask_s = series[f"{side}_ask_px_1"]
        mid_s = (best_bid_s + best_ask_s) / 2.0
        for lag in _LAGS:
            if n > lag and mid_s[-lag - 1] > 0:
                out.append(float(mid_s[-1] / mid_s[-lag - 1] - 1.0))
            else:
                out.append(0.0)
    # spread stats per side (4)
    # Mean/std of the unguarded ask-bid spread over the last 60 rows.
    for side in sides:
        best_bid_s = series[f"{side}_bid_px_1"]
        best_ask_s = series[f"{side}_ask_px_1"]
        spr = best_ask_s - best_bid_s
        w = spr[-60:] if n >= 60 else spr
        out.append(float(np.nanmean(w)) if len(w) else 0.0)
        out.append(float(np.nanstd(w)) if len(w) > 1 else 0.0)
    # cross mid sum (1)
    up_mid = (series["up_bid_px_1"][-1] + series["up_ask_px_1"][-1]) / 2.0
    dn_mid = (series["dn_bid_px_1"][-1] + series["dn_ask_px_1"][-1]) / 2.0
    out.append(float(up_mid + dn_mid))
    arr = np.asarray(out, dtype=np.float64)
    assert arr.shape[0] == 80, f"produced {arr.shape[0]} features, expected 80"
    # Final safety net: any NaN/inf that slipped through becomes 0.
    arr = np.where(np.isfinite(arr), arr, 0.0).astype(np.float32)
    return arr
# Public API: the ordered feature-name list and the extractor.
__all__ = [
    "FEATURE_NAMES",
    "extract",
]