# NautilusTrainer / data_processor.py
# Nautilus AI
# Deploy: Trainer to Root (Retry)
# commit c5c085b
import pandas as pd
import numpy as np
import glob
import torch
import ast
from typing import Tuple
class AlphaDataProcessor:
"""
Processes raw market data (Parquet) into PyTorch Tensors for Alpha Agent training.
Upgraded for Deep Optimization (Robust Scaler, Dynamic Labels, Channel Separation, OFI, Triple Barrier).
"""
def __init__(self, data_dir: str = "./data"):
self.data_dir = data_dir
def _rolling_robust_scale(self, data: np.ndarray, window: int = 2000) -> np.ndarray:
"""
Rolling Robust Scaling using Median and IQR.
Prevents look-ahead bias (Leakage) by using only past statistics.
Computes rolling median/IQR along axis 0.
"""
# Convert to DataFrame for efficient rolling ops
df = pd.DataFrame(data)
# Min periods = window/10 to avoid NaNs at start (or ffill)
rolling = df.rolling(window=window, min_periods=window//10)
median = rolling.median()
q75 = rolling.quantile(0.75)
q25 = rolling.quantile(0.25)
iqr = q75 - q25
# Replace 0 IQR with 1 to avoid div by zero
iqr = iqr.replace(0, 1.0)
# Scale: (x_t - median_t) / iqr_t
# Note: robust scaling conventionally uses recent stats to normalize CURRENT value.
scaled = (df - median) / iqr
# Fill mean/zeros for initial unstable window
return scaled.fillna(0.0).values
def get_deeplob_tensors(self, coin: str = "ETH", T: int = 100, levels: int = 20) -> Tuple[torch.Tensor, torch.Tensor]:
"""
DeepLOB with Channel Separation and Triple Barrier Labeling.
Uses Rolling Robust Scaling.
"""
df = self.load_l2_snapshots(coin)
if df.empty:
return self._generate_dummy_deeplob(T, levels)
prices_list = []
volumes_list = []
mid_prices = []
# Precompute Volatility for Labeling
best_bids = df['bids'].apply(lambda x: x[0][0] if len(x)>0 else 0)
best_asks = df['asks'].apply(lambda x: x[0][0] if len(x)>0 else 0)
mids = (best_bids + best_asks) / 2
mids = mids.replace(0, np.nan).ffill().fillna(0)
returns = np.diff(np.log(mids.values + 1e-9))
returns = np.concatenate(([0], returns))
volatility = pd.Series(returns).rolling(window=T).std().fillna(0.001).values
mid_prices_arr = mids.values
for _, row in df.iterrows():
bids = row['bids']
asks = row['asks']
p_feat = []
v_feat = []
for i in range(levels):
if i < len(asks): pa, va = asks[i]
else: pa, va = 0, 0
if i < len(bids): pb, vb = bids[i]
else: pb, vb = 0, 0
p_feat.extend([pa, pb])
v_feat.extend([va, vb])
prices_list.append(p_feat)
volumes_list.append(v_feat)
prices_data = np.array(prices_list)
volumes_data = np.array(volumes_list)
# Rolling Robust Scaling (Leakage Free)
prices_data = self._rolling_robust_scale(prices_data, window=2000)
volumes_data = np.log1p(volumes_data)
volumes_data = self._rolling_robust_scale(volumes_data, window=2000)
k = 100
# Triple Barrier Labels
# PT=2, SL=2 (2x Volatility)
y_all = self._get_triple_barrier_labels(mid_prices_arr, T, k, volatility, pt=2.0, sl=2.0)
# ... (Rest remains same)
def _get_triple_barrier_labels(self, mid_prices: np.ndarray, T: int, horizon: int, volatility: np.ndarray = None, pt: float = 1.0, sl: float = 1.0) -> np.ndarray:
"""
Triple Barrier Labeling Method (Marcos Lopez de Prado).
Labels: 0 (SL Hit), 1 (Time Limit), 2 (TP Hit).
pt: Profit Taking multiplier (x Volatility).
sl: Stop Loss multiplier (x Volatility).
"""
labels = []
# If volatility is None, compute standard
if volatility is None:
# Simple fallback
volatility = np.ones(len(mid_prices)) * 0.002
for i in range(T, len(mid_prices) - horizon):
current_price = mid_prices[i-1]
vol = volatility[i]
# Dynamic Barriers
upper_barrier = current_price * (1 + vol * pt)
lower_barrier = current_price * (1 - vol * sl)
# Path within Horizon
path = mid_prices[i : i + horizon]
# Check First Touch
# argmax returns index of first True
touch_upper = np.where(path >= upper_barrier)[0]
touch_lower = np.where(path <= lower_barrier)[0]
t_upper = touch_upper[0] if len(touch_upper) > 0 else horizon + 1
t_lower = touch_lower[0] if len(touch_lower) > 0 else horizon + 1
if t_upper == horizon + 1 and t_lower == horizon + 1:
label = 1 # Vertical Barrier (Time Limit)
elif t_upper < t_lower:
label = 2 # TP Hit First
else:
label = 0 # SL Hit First
labels.append(label)
return np.array(labels)
def _compute_ofi(self, df: pd.DataFrame, levels: int = 5) -> pd.DataFrame:
"""
Computes Order Flow Imbalance (OFI) for top 'levels'.
OFI_i(t) = I(P > P_prev)q - I(P < P_prev)q_prev + I(P == P_prev)(q - q_prev)
Summed across levels.
"""
# Explode bids/asks for first few levels
# This is expensive on large DFs. We do vectorized check on top 1 level mainly or aggregated.
# Efficient OFI: Compute on Best Bid/Ask only for speed in this version.
# 1. Shift DataFrame
df_prev = df.shift(1)
ofi = pd.Series(0.0, index=df.index)
# Top 1 Level OFI
bb_p = df['best_bid']
bb_q = df['best_bid_sz']
prev_bb_p = df_prev['best_bid']
prev_bb_q = df_prev['best_bid_sz']
ba_p = df['best_ask']
ba_q = df['best_ask_sz']
prev_ba_p = df_prev['best_ask']
prev_ba_q = df_prev['best_ask_sz']
# Bid OFI
bid_ofi = np.where(bb_p > prev_bb_p, bb_q,
np.where(bb_p < prev_bb_p, -prev_bb_q, bb_q - prev_bb_q))
# Ask OFI (Note: Supply side usually negative impact on price? OFI definition:
# e_i = e_bid_i - e_ask_i. High Bid demand -> +, High Ask supply -> -)
ask_ofi = np.where(ba_p > prev_ba_p, -prev_ba_q,
np.where(ba_p < prev_ba_p, ba_q, ba_q - prev_ba_q)) # Logic check needed here
# Standard Definition (Cont & Kukanov 2017):
# e_ask = I(Pa > Pa_prev) * (-qa_prev) + I(Pa < Pa_prev) * qa + I(Pa=Pa_prev)*(qa - qa_prev)
# Wait, if Ask Price Increases -> Supply removed (Good for price) -> ???
# Actually OFI = Flow at Bid - Flow at Ask.
# Let's stick to standard formula for 'Flow Contribution to Price Increase'.
# Increase in Ask Size -> Resistance -> Negative pressure.
ask_flow = np.where(ba_p > prev_ba_p, 0, # Price moved up (Ask Cleared?) -> No resistance added?
np.where(ba_p < prev_ba_p, ba_q, # Price moved down -> New wall
ba_q - prev_ba_q)) # Same price -> delta size
# Improved Ask OFI (Mirroring Bid Logic):
# We want "Buying Pressure" - "Selling Pressure"
# Bid Increase/Add = Buying Pressure (+)
# Ask Decrease/Add = Selling Pressure (-)
ask_ofi = np.where(ba_p > prev_ba_p, -prev_ba_q, # Price rose, prev qty consumed/cancelled ?
np.where(ba_p < prev_ba_p, ba_q, # Price fell, new supply at lower price
ba_q - prev_ba_q)) # Same price, delta
# Total OFI
ofi = bid_ofi - ask_ofi
return pd.Series(ofi).fillna(0)
def load_trades(self, coin: str = "ETH") -> pd.DataFrame:
"""Loads trade data."""
files = glob.glob(f"{self.data_dir}/raw_trade/{coin}/*.parquet")
if not files: return pd.DataFrame()
try:
df = pd.concat([pd.read_parquet(f) for f in files])
df = df.sort_values("time")
if 'side' in df.columns:
df['signed_vol'] = df.apply(lambda x: x['sz'] if x['side'] == 'B' else -x['sz'], axis=1)
else:
df['signed_vol'] = 0
return df
except Exception as e:
print(f"Error loading trades: {e}")
return pd.DataFrame()
def load_l2_snapshots(self, coin: str = "ETH", limit: int = 10000) -> pd.DataFrame:
"""Loads L2 Orderbook Snapshots."""
files = glob.glob(f"{self.data_dir}/order_book_snapshot/*.parquet")
if not files: return pd.DataFrame()
df_list = []
for f in files:
try:
chunk = pd.read_parquet(f)
chunk = chunk[chunk['instrument_id'].str.contains(coin)]
if not chunk.empty: df_list.append(chunk)
except: pass
if not df_list: return pd.DataFrame()
df = pd.concat(df_list)
df = df.sort_values("ts_event").head(limit)
df['bids'] = df['bids'].apply(lambda x: ast.literal_eval(x) if isinstance(x, str) else [])
df['asks'] = df['asks'].apply(lambda x: ast.literal_eval(x) if isinstance(x, str) else [])
return df
def get_deeplob_tensors(self, coin: str = "ETH", T: int = 100, levels: int = 20) -> Tuple[torch.Tensor, torch.Tensor]:
"""
DeepLOB with Channel Separation and Triple Barrier Labeling.
"""
df = self.load_l2_snapshots(coin)
if df.empty:
return self._generate_dummy_deeplob(T, levels)
prices_list = []
volumes_list = []
mid_prices = []
# Precompute Volatility for Labeling
# Expand Mid Price first
best_bids = df['bids'].apply(lambda x: x[0][0] if len(x)>0 else 0)
best_asks = df['asks'].apply(lambda x: x[0][0] if len(x)>0 else 0)
mids = (best_bids + best_asks) / 2
mids = mids.replace(0, np.nan).ffill().fillna(0)
# Rolling Volatility (for Triple Barrier)
returns = np.diff(np.log(mids.values + 1e-9))
returns = np.concatenate(([0], returns))
volatility = pd.Series(returns).rolling(window=T).std().fillna(0.001).values
mid_prices_arr = mids.values
for _, row in df.iterrows():
bids = row['bids']
asks = row['asks']
p_feat = []
v_feat = []
for i in range(levels):
if i < len(asks): pa, va = asks[i]
else: pa, va = 0, 0
if i < len(bids): pb, vb = bids[i]
else: pb, vb = 0, 0
p_feat.extend([pa, pb])
v_feat.extend([va, vb])
prices_list.append(p_feat)
volumes_list.append(v_feat)
prices_data = np.array(prices_list)
volumes_data = np.array(volumes_list)
# Robust Scaling
prices_data = self._robust_scale(prices_data)
volumes_data = np.log1p(volumes_data)
volumes_data = self._robust_scale(volumes_data)
k = 100
# Triple Barrier Labels
# PT=2, SL=2 (2x Volatility)
y_all = self._get_triple_barrier_labels(mid_prices_arr, T, k, volatility, pt=2.0, sl=2.0)
X = []
y = []
valid_indices = range(T, len(mid_prices_arr) - k)
for idx, i in enumerate(valid_indices):
p_window = prices_data[i-T:i]
v_window = volumes_data[i-T:i]
sample = np.stack([p_window, v_window], axis=0) # (2, T, 2*Levels)
X.append(sample)
y.append(y_all[idx])
return torch.FloatTensor(np.array(X)), torch.LongTensor(np.array(y))
def get_deeplob_tensors_from_df(self, df: pd.DataFrame, T: int = 100, levels: int = 20) -> Tuple[torch.Tensor, torch.Tensor]:
"""
Process a pre-loaded DataFrame (chunk) into DeepLOB tensors.
Used for Streaming.
"""
if df.empty:
return torch.empty(0), torch.empty(0)
# Reuse the logic from get_deeplob_tensors, but skipping the load step.
# This duplicates some logic but ensures isolation.
prices_list = []
volumes_list = []
# Precompute Volatility for Labeling
best_bids = df['bids'].apply(lambda x: x[0][0] if len(x)>0 else 0)
best_asks = df['asks'].apply(lambda x: x[0][0] if len(x)>0 else 0)
mids = (best_bids + best_asks) / 2
mids = mids.replace(0, np.nan).ffill().fillna(0)
returns = np.diff(np.log(mids.values + 1e-9))
returns = np.concatenate(([0], returns))
volatility = pd.Series(returns).rolling(window=T).std().fillna(0.001).values
mid_prices_arr = mids.values
for _, row in df.iterrows():
bids = row['bids']
asks = row['asks']
p_feat = []
v_feat = []
for i in range(levels):
if i < len(asks): pa, va = asks[i]
else: pa, va = 0, 0
if i < len(bids): pb, vb = bids[i]
else: pb, vb = 0, 0
p_feat.extend([pa, pb])
v_feat.extend([va, vb])
prices_list.append(p_feat)
volumes_list.append(v_feat)
prices_data = np.array(prices_list)
volumes_data = np.array(volumes_list)
# Robust Scaling
prices_data = self._robust_scale(prices_data)
volumes_data = np.log1p(volumes_data)
volumes_data = self._robust_scale(volumes_data)
k = 100
# Triple Barrier Labels
y_all = self._get_triple_barrier_labels(mid_prices_arr, T, k, volatility, pt=2.0, sl=2.0)
X = []
y = []
# Since this is a chunk, we might lose the first T rows if not buffered correctly by the caller.
# The caller (StreamingDataLoader) is responsible for overlapping chunks.
valid_indices = range(T, len(mid_prices_arr) - k)
for idx, i in enumerate(valid_indices):
p_window = prices_data[i-T:i]
v_window = volumes_data[i-T:i]
sample = np.stack([p_window, v_window], axis=0)
X.append(sample)
y.append(y_all[idx])
return torch.FloatTensor(np.array(X)), torch.LongTensor(np.array(y))
def _generate_dummy_deeplob(self, T, levels):
return torch.randn(32, 2, T, 2*levels), torch.randint(0, 3, (32,))
def compute_trm_features(self, df: pd.DataFrame) -> pd.DataFrame:
"""
Computes features including OFI and Real CVD.
"""
df['best_bid'] = df['bids'].apply(lambda x: x[0][0] if len(x)>0 else np.nan)
df['best_ask'] = df['asks'].apply(lambda x: x[0][0] if len(x)>0 else np.nan)
df['best_bid_sz'] = df['bids'].apply(lambda x: x[0][1] if len(x)>0 else np.nan)
df['best_ask_sz'] = df['asks'].apply(lambda x: x[0][1] if len(x)>0 else np.nan)
df.dropna(subset=['best_bid', 'best_ask'], inplace=True)
df['mid'] = (df['best_bid'] + df['best_ask']) / 2
# OFI (New Feature)
df['ofi'] = self._compute_ofi(df)
df['spread'] = df['best_ask'] - df['best_bid']
df['imbalance'] = (df['best_bid_sz'] - df['best_ask_sz']) / (df['best_bid_sz'] + df['best_ask_sz'])
df['momentum'] = df['mid'].pct_change(periods=5)
df['returns'] = df['mid'].pct_change()
df['volatility'] = df['returns'].rolling(10).std()
# Real CVD
trades = self.load_trades(coin="ETH")
if not trades.empty:
trades['cumulative_vol'] = trades['signed_vol'].cumsum()
df = df.sort_values("ts_event")
trades = trades.sort_values("time")
df['ts_merge'] = df['ts_event']
trades['ts_merge'] = trades['time']
merged = pd.merge_asof(df, trades[['ts_merge', 'cumulative_vol']], on='ts_merge', direction='backward')
df['cvd'] = merged['cumulative_vol'].ffill().fillna(0)
else:
df['cvd'] = 0
df.dropna(inplace=True)
# Return 6 Features now: Vol, Imbal, CVD, Spread, Mom, OFI
return df[['volatility', 'imbalance', 'cvd', 'spread', 'momentum', 'ofi', 'mid']]
def get_trm_tensors(self, coin: str = "ETH", T: int = 60) -> Tuple[torch.Tensor, torch.Tensor]:
"""
Returns TRM Tensors.
Input size = 6 (Added OFI).
Labels = Triple Barrier.
"""
df = self.load_l2_snapshots(coin, limit=5000)
if df.empty:
return torch.randn(32, T, 6), torch.randint(0, 3, (32,))
feat_df = self.compute_trm_features(df)
data = feat_df[['volatility', 'imbalance', 'cvd', 'spread', 'momentum', 'ofi']].values
mid = feat_df['mid'].values
# Rolling Robust Scale Features (Leakage Free)
data = self._rolling_robust_scale(data, window=2000)
# Returns for Vol calc
rets = np.diff(np.log(mid + 1e-9))
rets = np.concatenate(([0], rets))
vol = pd.Series(rets).rolling(window=T).std().fillna(0.001).values
# Triple Barrier Labels for TRM
y_all = self._get_triple_barrier_labels(mid, T, horizon=60, volatility=vol, pt=2.0, sl=2.0)
X, y = [], []
valid_indices = range(T, len(data) - 60)
for idx, i in enumerate(valid_indices):
X.append(data[i-T:i])
y.append(y_all[idx])
return torch.FloatTensor(np.array(X)), torch.LongTensor(np.array(y))
def get_trm_tensors_from_df(self, df: pd.DataFrame, T: int = 60) -> Tuple[torch.Tensor, torch.Tensor]:
"""
Process a pre-loaded DataFrame (chunk) into TRM tensors.
Used for Streaming.
"""
if df.empty:
return torch.empty(0), torch.empty(0)
feat_df = self.compute_trm_features(df)
if feat_df.empty:
return torch.empty(0), torch.empty(0)
data = feat_df[['volatility', 'imbalance', 'cvd', 'spread', 'momentum', 'ofi']].values
mid = feat_df['mid'].values
data = self._rolling_robust_scale(data, window=2000)
rets = np.diff(np.log(mid + 1e-9))
rets = np.concatenate(([0], rets))
vol = pd.Series(rets).rolling(window=T).std().fillna(0.001).values
y_all = self._get_triple_barrier_labels(mid, T, horizon=60, volatility=vol, pt=2.0, sl=2.0)
X, y = [], []
valid_indices = range(T, len(data) - 60)
for idx, i in enumerate(valid_indices):
X.append(data[i-T:i])
y.append(y_all[idx])
return torch.FloatTensor(np.array(X)), torch.LongTensor(np.array(y))
def get_lstm_tensors_from_df(self, df: pd.DataFrame, T: int = 60, forecast_horizon: int = 1) -> Tuple[torch.Tensor, torch.Tensor]:
"""
Process Bar Data (OHLCV) into LSTM Tensors.
Features: Log Returns, Log Volume, High-Low Range, Close-Open Range.
Target: Next Log Return (scaled).
Output: X (Batch, T, Features), y (Batch, 1)
"""
if df.empty or len(df) < T + forecast_horizon:
return torch.empty(0), torch.empty(0)
# Ensure numeric
cols = ['open', 'high', 'low', 'close', 'volume']
for c in cols:
if c in df.columns:
df[c] = pd.to_numeric(df[c], errors='coerce')
df.dropna(subset=cols, inplace=True)
# 1. Feature Engineering
# Log Returns (Scale Invariant)
df['log_ret'] = np.log(df['close'] / df['close'].shift(1)).fillna(0)
# Log Volume
df['log_vol'] = np.log1p(df['volume'])
# High-Low Range (Relative to Close)
df['hl_range'] = (df['high'] - df['low']) / df['close']
# Close-Open Range (Relative to Open)
df['co_range'] = (df['close'] - df['open']) / df['open']
# Rolling Volatility (Feature)
df['volatility'] = df['log_ret'].rolling(window=20).std().fillna(0)
# Features Matrix
feature_cols = ['log_ret', 'log_vol', 'hl_range', 'co_range', 'volatility']
data = df[feature_cols].values
# 2. Robust Scaling (Leakage Free)
data = self._rolling_robust_scale(data, window=2000)
# 3. Target: Next Log Return (Scalar Regression)
# Scaled by 100 to match Tanh output range [-1, 1] for typical volatility
# e.g. 1% move = 0.01 * 100 = 1.0
target = df['log_ret'].shift(-forecast_horizon).fillna(0).values * 100
X, y = [], []
valid_indices = range(T, len(data) - forecast_horizon)
for i in valid_indices:
window = data[i-T:i] # (T, Features)
label = target[i] # (1,)
X.append(window)
y.append(label)
return torch.FloatTensor(np.array(X)), torch.FloatTensor(np.array(y)).unsqueeze(1)
def _robust_scale(self, data):
# Helper for legacy robust scale (non-rolling) if needed,
# or alias to rolling with large window for batch
# For now, implementing simple robust scale
median = np.median(data, axis=0)
q75 = np.percentile(data, 75, axis=0)
q25 = np.percentile(data, 25, axis=0)
iqr = q75 - q25
iqr[iqr == 0] = 1.0
return (data - median) / iqr