Spaces:
Sleeping
Sleeping
| """ | |
| Orderbook feature extraction for ARB-MAX. 80 features, hardcoded order. | |
| Column convention (produced by data_loader.build_window_frame): | |
| pm_up_bid_px_{1..5}, pm_up_bid_sz_{1..5}, | |
| pm_up_ask_px_{1..5}, pm_up_ask_sz_{1..5}, | |
| pm_dn_bid_px_{1..5}, pm_dn_bid_sz_{1..5}, | |
| pm_dn_ask_px_{1..5}, pm_dn_ask_sz_{1..5} | |
| """ | |
| from __future__ import annotations | |
| from typing import List | |
| import numpy as np | |
| import pandas as pd | |
| _LAGS = [30, 60, 180] | |
| def _build_feature_names() -> List[str]: | |
| names: List[str] = [] | |
| # --- Immediate per side (18 = 9 * 2) --- | |
| for side in ("up", "dn"): | |
| names.append(f"{side}_best_bid") | |
| names.append(f"{side}_best_ask") | |
| names.append(f"{side}_mid") | |
| names.append(f"{side}_spread") | |
| names.append(f"{side}_bid_sum_L1_L5") | |
| names.append(f"{side}_ask_sum_L1_L5") | |
| names.append(f"{side}_L1_imb") | |
| names.append(f"{side}_L1_L5_w_imb") | |
| names.append(f"{side}_walked_cost_500") | |
| # --- Time-lagged per side (24 = 12 * 2) --- | |
| for side in ("up", "dn"): | |
| for lag in _LAGS: | |
| names.append(f"{side}_ask_t_minus_{lag}s") | |
| for lag in _LAGS: | |
| names.append(f"{side}_ask_mean_{lag}s") | |
| for lag in _LAGS: | |
| names.append(f"{side}_ask_std_{lag}s") | |
| for lag in _LAGS: | |
| names.append(f"{side}_cum_vol_best_{lag}s") | |
| # --- Cross-side (7) --- | |
| names.append("cross_up_ask_plus_dn_ask") | |
| names.append("cross_min_combined_60s") | |
| names.append("cross_min_combined_180s") | |
| names.append("cross_min_combined_600s") | |
| names.append("cross_combined_pct_rank_in_window") | |
| names.append("cross_corr_up_dn_ask_180s") | |
| names.append("cross_mom_mismatch_60s") | |
| # So far 18 + 24 + 7 = 49 | |
| # --- Padded derived features (target total = 80) --- | |
| # Level-depth ratios per side (5 levels-2) * 2 sides = 8? Use 10 | |
| for side in ("up", "dn"): | |
| for lvl in range(1, 6): | |
| names.append(f"{side}_bid_sz_lvl{lvl}_frac") | |
| # +10 => 59 | |
| for side in ("up", "dn"): | |
| for lvl in range(1, 6): | |
| names.append(f"{side}_ask_sz_lvl{lvl}_frac") | |
| # +10 => 69 | |
| # Mid-price velocity per side at 3 lags (6) | |
| for side in ("up", "dn"): | |
| for lag in _LAGS: | |
| names.append(f"{side}_mid_ret_{lag}s") | |
| # +6 => 75 | |
| # Spread stats per side (2 sides * 2 = 4) | |
| for side in ("up", "dn"): | |
| names.append(f"{side}_spread_mean_60s") | |
| names.append(f"{side}_spread_std_60s") | |
| # +4 => 79 | |
| # one more: cross-side mid sum | |
| names.append("cross_mid_up_plus_dn") | |
| # +1 => 80 | |
| return names | |
| FEATURE_NAMES: List[str] = _build_feature_names() | |
| assert len(FEATURE_NAMES) == 80, f"expected 80, got {len(FEATURE_NAMES)}" | |
| # --------------------------------------------------------------------------- | |
| # Helpers | |
| # --------------------------------------------------------------------------- | |
| def _col(df: pd.DataFrame, name: str) -> np.ndarray: | |
| if name in df.columns: | |
| a = df[name].to_numpy(dtype=np.float64) | |
| else: | |
| a = np.full(len(df), np.nan, dtype=np.float64) | |
| return a | |
| def _ff(a: np.ndarray) -> np.ndarray: | |
| out = a.copy() | |
| last = np.nan | |
| for i, v in enumerate(out): | |
| if np.isfinite(v): | |
| last = v | |
| else: | |
| out[i] = last | |
| # backfill any leading NaNs with first finite | |
| if not np.isfinite(out[0]): | |
| first = np.nan | |
| for v in out: | |
| if np.isfinite(v): | |
| first = v | |
| break | |
| if np.isfinite(first): | |
| for i in range(len(out)): | |
| if np.isfinite(out[i]): | |
| break | |
| out[i] = first | |
| return np.nan_to_num(out, nan=0.0, posinf=0.0, neginf=0.0) | |
| def _walked_ask_cost( | |
| ask_px, ask_sz, notional: float | |
| ) -> float: | |
| """Cost per share to buy $notional walking the ask levels at this tick. | |
| ask_px[i] / ask_sz[i] are per-level scalars (price / size at this tick). | |
| Accepts numpy scalars, python floats, or 0-d arrays. Returns effective | |
| price per share in [0, 1]. Penalize with best*1.02 if thin. | |
| """ | |
| def _as_scalar(v): | |
| # tolerate numpy 0-d arrays, 1-elem 1-d arrays, scalars, or None | |
| try: | |
| a = np.asarray(v, dtype=np.float64) | |
| if a.ndim == 0: | |
| return float(a) | |
| return float(a.reshape(-1)[0]) | |
| except Exception: | |
| return float("nan") | |
| n_levels = len(ask_px) | |
| filled_shares = 0.0 | |
| total_cost = 0.0 | |
| for i in range(n_levels): | |
| p = _as_scalar(ask_px[i]) | |
| s = _as_scalar(ask_sz[i]) | |
| if not np.isfinite(p) or not np.isfinite(s) or p <= 0 or s <= 0: | |
| continue | |
| remaining_dollars = notional - total_cost | |
| if remaining_dollars <= 0: | |
| break | |
| dollars_this = p * s | |
| if dollars_this >= remaining_dollars: | |
| shares_this = remaining_dollars / p | |
| total_cost += shares_this * p | |
| filled_shares += shares_this | |
| break | |
| total_cost += dollars_this | |
| filled_shares += s | |
| if filled_shares > 0 and total_cost >= notional * 0.99: | |
| return total_cost / filled_shares | |
| best = _as_scalar(ask_px[0]) if n_levels else 1.0 | |
| if not np.isfinite(best) or best <= 0: | |
| best = 1.0 | |
| return min(1.0, best * 1.02) | |
| # --------------------------------------------------------------------------- | |
_SIDES = ("up", "dn")
_LEVELS = range(1, 6)
# Depth weights for the L1..L5 weighted imbalance (L1 weighted most).
_LEVEL_WEIGHTS = np.array([5, 4, 3, 2, 1], dtype=np.float64)
# Dollar notional used for the walked-cost feature.
_WALK_NOTIONAL = 500.0


def _tail(a: np.ndarray, k: int) -> np.ndarray:
    """Last ``k`` entries of ``a``, or the whole array when it is shorter."""
    return a[-k:] if len(a) >= k else a


def _load_series(df: pd.DataFrame) -> dict:
    """Filled book arrays keyed ``{side}_{bid|ask}_{px|sz}_{lvl}``."""
    series: dict = {}
    for side in _SIDES:
        for lvl in _LEVELS:
            for quantity in ("bid_px", "bid_sz", "ask_px", "ask_sz"):
                key = f"{side}_{quantity}_{lvl}"
                series[key] = _ff(_col(df, f"pm_{key}"))
    return series


def _immediate_features(series: dict, side: str) -> List[float]:
    """9 snapshot features for one side at the latest row."""
    best_bid = series[f"{side}_bid_px_1"][-1]
    best_ask = series[f"{side}_ask_px_1"][-1]
    quoted = best_bid > 0 and best_ask > 0
    mid = (best_bid + best_ask) / 2.0 if quoted else 0.0
    spread = (best_ask - best_bid) if quoted else 0.0
    bid_sz = [series[f"{side}_bid_sz_{lvl}"][-1] for lvl in _LEVELS]
    ask_sz = [series[f"{side}_ask_sz_{lvl}"][-1] for lvl in _LEVELS]
    b1, a1 = bid_sz[0], ask_sz[0]
    l1_imb = (b1 - a1) / (b1 + a1) if (b1 + a1) > 0 else 0.0
    w_bid = sum(w * s for w, s in zip(_LEVEL_WEIGHTS, bid_sz))
    w_ask = sum(w * s for w, s in zip(_LEVEL_WEIGHTS, ask_sz))
    w_imb = (w_bid - w_ask) / (w_bid + w_ask) if (w_bid + w_ask) > 0 else 0.0
    ask_px = [series[f"{side}_ask_px_{lvl}"][-1] for lvl in _LEVELS]
    walked = _walked_ask_cost(ask_px, ask_sz, _WALK_NOTIONAL)
    return [
        best_bid, best_ask, mid, spread,
        sum(bid_sz), sum(ask_sz), l1_imb, w_imb, walked,
    ]


def _lagged_features(series: dict, side: str, volume: np.ndarray) -> List[float]:
    """12 look-back features for one side.

    The features are: L1 ask at t-lag, rolling L1 ask mean and std, and the
    summed ``volume`` column, each over ``_LAGS`` rows.
    """
    ask1 = series[f"{side}_ask_px_1"]
    n = len(ask1)
    feats = [float(ask1[max(0, n - 1 - lag)]) for lag in _LAGS]
    for lag in _LAGS:
        w = _tail(ask1, lag)
        feats.append(float(np.nanmean(w)) if len(w) else 0.0)
    for lag in _LAGS:
        w = _tail(ask1, lag)
        feats.append(float(np.nanstd(w)) if len(w) > 1 else 0.0)
    # Volume proxy: the same side-agnostic "volume" column feeds both sides.
    feats.extend(float(np.sum(_tail(volume, lag))) for lag in _LAGS)
    return feats


def _cross_features(series: dict) -> List[float]:
    """7 features on the combined (up + dn) L1 ask and up/dn co-movement."""
    up_ask = series["up_ask_px_1"]
    dn_ask = series["dn_ask_px_1"]
    n = len(up_ask)
    combined = up_ask + dn_ask
    feats = [float(combined[-1])]
    for lag in (60, 180, 600):
        w = _tail(combined, lag)
        feats.append(float(np.nanmin(w)) if len(w) else 0.0)
    # Fraction of rows so far whose combined ask is <= the latest one.
    feats.append(float((combined <= combined[-1]).mean()) if n > 1 else 0.5)
    corr = 0.0
    if n >= 10:
        w_u = _tail(up_ask, 180)
        w_d = _tail(dn_ask, 180)
        if w_u.std() > 0 and w_d.std() > 0:
            corr = float(np.corrcoef(w_u, w_d)[0, 1])
    feats.append(corr)

    def _ret60(a: np.ndarray) -> float:
        # Simple return over 60 rows; 0 when history is short or base <= 0.
        if len(a) < 61 or a[-61] <= 0:
            return 0.0
        return float(a[-1] / a[-61] - 1.0)

    # Momentum mismatch: up_ret_60 - (-dn_ret_60) = up_ret_60 + dn_ret_60.
    feats.append(_ret60(up_ask) + _ret60(dn_ask))
    return feats


def _padded_features(series: dict) -> List[float]:
    """31 derived features.

    The features are: per-level size fractions, mid returns, 60-row spread
    stats, and the cross-side mid sum.
    """
    feats: List[float] = []
    # Size fraction per level: all bid fractions (both sides) before all ask.
    for book in ("bid", "ask"):
        for side in _SIDES:
            sizes = [series[f"{side}_{book}_sz_{lvl}"][-1] for lvl in _LEVELS]
            total = sum(sizes)
            feats.extend(v / total if total > 0 else 0.0 for v in sizes)
    for side in _SIDES:
        mid_s = (series[f"{side}_bid_px_1"] + series[f"{side}_ask_px_1"]) / 2.0
        n = len(mid_s)
        for lag in _LAGS:
            if n > lag and mid_s[-lag - 1] > 0:
                feats.append(float(mid_s[-1] / mid_s[-lag - 1] - 1.0))
            else:
                feats.append(0.0)
    for side in _SIDES:
        spr = series[f"{side}_ask_px_1"] - series[f"{side}_bid_px_1"]
        w = _tail(spr, 60)
        feats.append(float(np.nanmean(w)) if len(w) else 0.0)
        feats.append(float(np.nanstd(w)) if len(w) > 1 else 0.0)
    # NOTE(review): unlike the immediate `mid`, these mids are not zeroed
    # when one side of the book is missing; left unchanged so feature values
    # match previous outputs.
    up_mid = (series["up_bid_px_1"][-1] + series["up_ask_px_1"][-1]) / 2.0
    dn_mid = (series["dn_bid_px_1"][-1] + series["dn_ask_px_1"][-1]) / 2.0
    feats.append(float(up_mid + dn_mid))
    return feats


def extract(window_frame: pd.DataFrame, at_tick: int = 120) -> np.ndarray:
    """Compute the 80-feature vector for ``window_frame`` as of row ``at_tick``.

    Only positional rows ``0..at_tick`` (inclusive) are used. Each book column
    ``pm_{side}_{bid|ask}_{px|sz}_{lvl}`` is filled by ``_ff``: forward fill,
    head back-fill, and all-missing -> 0. Windows labelled "Ns" span the last
    N rows.

    Args:
        window_frame: Frame holding the book columns and optionally "volume".
        at_tick: Last positional row to include.

    Returns:
        float32 array of length 80 in ``FEATURE_NAMES`` order. Non-finite
        values are replaced by 0. An empty slice yields all zeros.
    """
    df = window_frame.iloc[: at_tick + 1]
    if len(df) == 0:
        # Every feature reads the latest row; with no rows, emit zeros
        # instead of failing on an empty-array index.
        return np.zeros(len(FEATURE_NAMES), dtype=np.float32)

    series = _load_series(df)
    volume = _col(df, "volume")
    volume = np.where(np.isfinite(volume), volume, 0.0)

    out: List[float] = []
    for side in _SIDES:
        out.extend(_immediate_features(series, side))      # 18
    for side in _SIDES:
        out.extend(_lagged_features(series, side, volume))  # 24
    out.extend(_cross_features(series))                    # 7
    out.extend(_padded_features(series))                   # 31

    arr = np.asarray(out, dtype=np.float64)
    assert arr.shape[0] == 80, f"produced {arr.shape[0]} features, expected 80"
    return np.where(np.isfinite(arr), arr, 0.0).astype(np.float32)
# Names exported by `from <module> import *`.
__all__ = [
    "FEATURE_NAMES",
    "extract",
]