import ast
import glob
from typing import Tuple

import numpy as np
import pandas as pd
import torch


class AlphaDataProcessor:
    """
    Processes raw market data (Parquet) into PyTorch tensors for Alpha Agent training.

    Upgraded for deep optimization: rolling robust scaling (leakage-free),
    dynamic triple-barrier labels, price/volume channel separation, and
    Order Flow Imbalance (OFI) features.
    """

    def __init__(self, data_dir: str = "./data"):
        # Root directory holding raw_trade/<coin>/ and order_book_snapshot/ parquet files.
        self.data_dir = data_dir

    # ------------------------------------------------------------------ #
    # Scaling
    # ------------------------------------------------------------------ #
    def _rolling_robust_scale(self, data: np.ndarray, window: int = 2000) -> np.ndarray:
        """
        Rolling robust scaling using median and IQR along axis 0.

        Each row is normalized with statistics from the trailing window only,
        so no look-ahead bias (leakage) is introduced.

        Args:
            data: (N, F) feature matrix.
            window: trailing window length for the median/IQR estimates.

        Returns:
            (N, F) scaled array; the unstable initial window is filled with 0.
        """
        df = pd.DataFrame(data)
        # min_periods = window // 10 keeps early rows usable instead of NaN.
        rolling = df.rolling(window=window, min_periods=window // 10)
        median = rolling.median()
        iqr = rolling.quantile(0.75) - rolling.quantile(0.25)
        iqr = iqr.replace(0, 1.0)  # guard flat columns against div-by-zero
        scaled = (df - median) / iqr
        return scaled.fillna(0.0).values

    def _robust_scale(self, data: np.ndarray) -> np.ndarray:
        """
        Legacy whole-batch robust scale (global median/IQR).

        WARNING: uses statistics of the full array, which leaks future
        information; prefer _rolling_robust_scale when building training data.
        Kept for backward compatibility with external callers.
        """
        median = np.median(data, axis=0)
        q75 = np.percentile(data, 75, axis=0)
        q25 = np.percentile(data, 25, axis=0)
        iqr = q75 - q25
        iqr[iqr == 0] = 1.0
        return (data - median) / iqr

    # ------------------------------------------------------------------ #
    # Labeling
    # ------------------------------------------------------------------ #
    def _get_triple_barrier_labels(self, mid_prices: np.ndarray, T: int,
                                   horizon: int, volatility: np.ndarray = None,
                                   pt: float = 1.0, sl: float = 1.0) -> np.ndarray:
        """
        Triple-barrier labeling (Marcos Lopez de Prado).

        Barriers sit at entry * (1 +/- vol * multiplier); the first barrier
        touched within `horizon` steps determines the label.

        Labels: 0 = stop-loss hit first, 1 = time limit (vertical barrier),
                2 = take-profit hit first.

        Args:
            mid_prices: mid-price path.
            T: warm-up offset; labeling starts at index T.
            horizon: vertical barrier (max holding period in steps).
            volatility: per-step volatility estimate; constant 0.002 fallback.
            pt / sl: profit-taking / stop-loss multipliers (x volatility).
        """
        if volatility is None:
            # Simple constant fallback when no volatility estimate is supplied.
            volatility = np.ones(len(mid_prices)) * 0.002

        labels = []
        no_touch = horizon + 1  # sentinel: barrier never touched inside horizon
        for i in range(T, len(mid_prices) - horizon):
            entry = mid_prices[i - 1]  # decision price = last observed mid
            vol = volatility[i]
            upper_barrier = entry * (1 + vol * pt)
            lower_barrier = entry * (1 - vol * sl)
            path = mid_prices[i:i + horizon]
            hit_up = np.where(path >= upper_barrier)[0]
            hit_dn = np.where(path <= lower_barrier)[0]
            t_up = hit_up[0] if len(hit_up) > 0 else no_touch
            t_dn = hit_dn[0] if len(hit_dn) > 0 else no_touch
            if t_up == no_touch and t_dn == no_touch:
                labels.append(1)  # vertical barrier (time limit)
            elif t_up < t_dn:
                labels.append(2)  # TP hit first
            else:
                labels.append(0)  # SL hit first
        return np.array(labels)

    # ------------------------------------------------------------------ #
    # Microstructure features
    # ------------------------------------------------------------------ #
    def _compute_ofi(self, df: pd.DataFrame, levels: int = 5) -> pd.Series:
        """
        Best-level Order Flow Imbalance (Cont & Kukanov style).

        Per tick:
            bid flow = +q       if bid price rose
                       -q_prev  if bid price fell
                       q-q_prev otherwise
            ask flow = -q_prev  if ask price rose
                       +q       if ask price fell
                       q-q_prev otherwise
            OFI = bid flow - ask flow  (positive = net buying pressure)

        `levels` is kept for interface compatibility; only level 1 is used here
        for speed. Requires best_bid / best_ask / best_bid_sz / best_ask_sz
        columns. Returns a Series aligned to df.index; the first (undefined)
        tick is filled with 0.
        """
        prev = df.shift(1)
        bid_ofi = np.where(df['best_bid'] > prev['best_bid'], df['best_bid_sz'],
                  np.where(df['best_bid'] < prev['best_bid'], -prev['best_bid_sz'],
                           df['best_bid_sz'] - prev['best_bid_sz']))
        ask_ofi = np.where(df['best_ask'] > prev['best_ask'], -prev['best_ask_sz'],
                  np.where(df['best_ask'] < prev['best_ask'], df['best_ask_sz'],
                           df['best_ask_sz'] - prev['best_ask_sz']))
        # Fix: carry df.index explicitly -- the original returned a RangeIndex
        # Series, which misaligned when assigned back to a DataFrame whose
        # index is not the default RangeIndex (as produced by concat+sort).
        return pd.Series(bid_ofi - ask_ofi, index=df.index).fillna(0)

    # ------------------------------------------------------------------ #
    # Loaders
    # ------------------------------------------------------------------ #
    def load_trades(self, coin: str = "ETH") -> pd.DataFrame:
        """
        Loads and time-sorts trade prints for `coin`; adds 'signed_vol'
        (+sz for buys, -sz for sells). Returns an empty frame on failure.
        """
        files = glob.glob(f"{self.data_dir}/raw_trade/{coin}/*.parquet")
        if not files:
            return pd.DataFrame()
        try:
            df = pd.concat([pd.read_parquet(f) for f in files])
            df = df.sort_values("time")
            if 'side' in df.columns:
                # Vectorized replacement for the original row-wise apply.
                df['signed_vol'] = np.where(df['side'] == 'B', df['sz'], -df['sz'])
            else:
                df['signed_vol'] = 0
            return df
        except Exception as e:
            print(f"Error loading trades: {e}")
            return pd.DataFrame()

    def load_l2_snapshots(self, coin: str = "ETH", limit: int = 10000) -> pd.DataFrame:
        """
        Loads L2 order-book snapshots for `coin`, sorted by ts_event and
        capped at `limit` rows. 'bids'/'asks' are parsed from stringified
        [[price, size], ...] lists when necessary.
        """
        files = glob.glob(f"{self.data_dir}/order_book_snapshot/*.parquet")
        if not files:
            return pd.DataFrame()
        df_list = []
        for f in files:
            try:
                chunk = pd.read_parquet(f)
                chunk = chunk[chunk['instrument_id'].str.contains(coin)]
                if not chunk.empty:
                    df_list.append(chunk)
            except Exception:
                # Best-effort load: skip unreadable/malformed files.
                continue
        if not df_list:
            return pd.DataFrame()
        df = pd.concat(df_list).sort_values("ts_event").head(limit)
        for col in ('bids', 'asks'):
            df[col] = df[col].apply(lambda x: ast.literal_eval(x) if isinstance(x, str) else [])
        return df

    # ------------------------------------------------------------------ #
    # DeepLOB pipeline
    # ------------------------------------------------------------------ #
    def _mid_and_vol(self, df: pd.DataFrame, T: int) -> Tuple[np.ndarray, np.ndarray]:
        """Mid-price series (forward-filled) and rolling return volatility for labeling."""
        best_bids = df['bids'].apply(lambda x: x[0][0] if len(x) > 0 else 0)
        best_asks = df['asks'].apply(lambda x: x[0][0] if len(x) > 0 else 0)
        mids = ((best_bids + best_asks) / 2).replace(0, np.nan).ffill().fillna(0)
        returns = np.concatenate(([0], np.diff(np.log(mids.values + 1e-9))))
        volatility = pd.Series(returns).rolling(window=T).std().fillna(0.001).values
        return mids.values, volatility

    def _lob_feature_arrays(self, df: pd.DataFrame, levels: int) -> Tuple[np.ndarray, np.ndarray]:
        """Flatten book rows into (N, 2*levels) price and volume matrices (ask/bid interleaved, zero-padded)."""
        prices_list, volumes_list = [], []
        for _, row in df.iterrows():
            bids, asks = row['bids'], row['asks']
            p_feat, v_feat = [], []
            for i in range(levels):
                pa, va = asks[i] if i < len(asks) else (0, 0)
                pb, vb = bids[i] if i < len(bids) else (0, 0)
                p_feat.extend([pa, pb])
                v_feat.extend([va, vb])
            prices_list.append(p_feat)
            volumes_list.append(v_feat)
        return np.array(prices_list), np.array(volumes_list)

    def _build_deeplob_tensors(self, df: pd.DataFrame, T: int, levels: int,
                               k: int = 100) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Shared DeepLOB pipeline: scale features, triple-barrier label, and
        window into (B, 2, T, 2*levels) / (B,) tensors. `k` is the label horizon.
        """
        mid_prices_arr, volatility = self._mid_and_vol(df, T)
        prices_data, volumes_data = self._lob_feature_arrays(df, levels)

        # Rolling robust scaling (leakage-free). Fix: the previously effective
        # implementation used the whole-batch _robust_scale, which leaks future
        # statistics into past samples, contradicting the class's stated design.
        prices_data = self._rolling_robust_scale(prices_data, window=2000)
        volumes_data = self._rolling_robust_scale(np.log1p(volumes_data), window=2000)

        # Triple-barrier labels with PT = SL = 2x volatility.
        y_all = self._get_triple_barrier_labels(mid_prices_arr, T, k, volatility,
                                                pt=2.0, sl=2.0)

        X, y = [], []
        for idx, i in enumerate(range(T, len(mid_prices_arr) - k)):
            # Channel separation: stack price/volume windows -> (2, T, 2*levels).
            X.append(np.stack([prices_data[i - T:i], volumes_data[i - T:i]], axis=0))
            y.append(y_all[idx])
        return torch.FloatTensor(np.array(X)), torch.LongTensor(np.array(y))

    def get_deeplob_tensors(self, coin: str = "ETH", T: int = 100,
                            levels: int = 20) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        DeepLOB tensors with channel separation and triple-barrier labeling.
        Falls back to a random dummy batch when no snapshots are on disk.
        """
        df = self.load_l2_snapshots(coin)
        if df.empty:
            return self._generate_dummy_deeplob(T, levels)
        return self._build_deeplob_tensors(df, T, levels)

    def get_deeplob_tensors_from_df(self, df: pd.DataFrame, T: int = 100,
                                    levels: int = 20) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Streaming variant: process a pre-loaded snapshot chunk into DeepLOB
        tensors. The caller (e.g. a StreamingDataLoader) is responsible for
        overlapping chunks so the first T rows are not lost.
        """
        if df.empty:
            return torch.empty(0), torch.empty(0)
        return self._build_deeplob_tensors(df, T, levels)

    def _generate_dummy_deeplob(self, T, levels):
        """Random placeholder batch (32 samples, 3 classes) for when no data exists."""
        return torch.randn(32, 2, T, 2 * levels), torch.randint(0, 3, (32,))

    # ------------------------------------------------------------------ #
    # TRM pipeline
    # ------------------------------------------------------------------ #
    def compute_trm_features(self, df: pd.DataFrame, coin: str = "ETH") -> pd.DataFrame:
        """
        Computes TRM features: volatility, imbalance, CVD, spread, momentum,
        OFI, plus the mid price (mid is used downstream for labeling only).

        NOTE: mutates `df` in place (adds columns / drops NaN rows), matching
        the original behavior.

        Args:
            df: L2 snapshot frame with 'bids'/'asks' list columns and 'ts_event'.
            coin: asset whose trade tape feeds the CVD feature (previously
                  hard-coded to ETH; parameterized backward-compatibly).
        """
        df['best_bid'] = df['bids'].apply(lambda x: x[0][0] if len(x) > 0 else np.nan)
        df['best_ask'] = df['asks'].apply(lambda x: x[0][0] if len(x) > 0 else np.nan)
        df['best_bid_sz'] = df['bids'].apply(lambda x: x[0][1] if len(x) > 0 else np.nan)
        df['best_ask_sz'] = df['asks'].apply(lambda x: x[0][1] if len(x) > 0 else np.nan)
        df.dropna(subset=['best_bid', 'best_ask'], inplace=True)
        df['mid'] = (df['best_bid'] + df['best_ask']) / 2

        df['ofi'] = self._compute_ofi(df)
        df['spread'] = df['best_ask'] - df['best_bid']
        df['imbalance'] = (df['best_bid_sz'] - df['best_ask_sz']) / (df['best_bid_sz'] + df['best_ask_sz'])
        df['momentum'] = df['mid'].pct_change(periods=5)
        df['returns'] = df['mid'].pct_change()
        df['volatility'] = df['returns'].rolling(10).std()

        # Real CVD: as-of join of cumulative signed trade volume onto book time.
        trades = self.load_trades(coin=coin)
        if not trades.empty:
            trades['cumulative_vol'] = trades['signed_vol'].cumsum()
            df = df.sort_values("ts_event")
            trades = trades.sort_values("time")
            df['ts_merge'] = df['ts_event']
            trades['ts_merge'] = trades['time']
            merged = pd.merge_asof(df, trades[['ts_merge', 'cumulative_vol']],
                                   on='ts_merge', direction='backward')
            # Fix: merge_asof returns a fresh RangeIndex frame; assign by
            # position (.values) to avoid index misalignment against df.
            df['cvd'] = merged['cumulative_vol'].ffill().fillna(0).values
        else:
            df['cvd'] = 0

        df.dropna(inplace=True)
        # 6 features (vol, imbal, cvd, spread, mom, ofi) + mid.
        return df[['volatility', 'imbalance', 'cvd', 'spread', 'momentum', 'ofi', 'mid']]

    def _build_trm_tensors(self, feat_df: pd.DataFrame, T: int,
                           horizon: int = 60) -> Tuple[torch.Tensor, torch.Tensor]:
        """Shared TRM pipeline: rolling-scale the 6 features, label, window into (B, T, 6)."""
        data = feat_df[['volatility', 'imbalance', 'cvd', 'spread', 'momentum', 'ofi']].values
        mid = feat_df['mid'].values

        # Rolling robust scale (leakage-free).
        data = self._rolling_robust_scale(data, window=2000)

        rets = np.concatenate(([0], np.diff(np.log(mid + 1e-9))))
        vol = pd.Series(rets).rolling(window=T).std().fillna(0.001).values
        y_all = self._get_triple_barrier_labels(mid, T, horizon=horizon,
                                                volatility=vol, pt=2.0, sl=2.0)

        X, y = [], []
        for idx, i in enumerate(range(T, len(data) - horizon)):
            X.append(data[i - T:i])
            y.append(y_all[idx])
        return torch.FloatTensor(np.array(X)), torch.LongTensor(np.array(y))

    def get_trm_tensors(self, coin: str = "ETH", T: int = 60) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        TRM tensors: input size 6 (volatility, imbalance, cvd, spread,
        momentum, OFI), triple-barrier labels. Random dummy batch when no data.
        """
        df = self.load_l2_snapshots(coin, limit=5000)
        if df.empty:
            return torch.randn(32, T, 6), torch.randint(0, 3, (32,))
        feat_df = self.compute_trm_features(df, coin=coin)
        if feat_df.empty:
            # Robustness fix: features can be empty even when snapshots exist.
            return torch.randn(32, T, 6), torch.randint(0, 3, (32,))
        return self._build_trm_tensors(feat_df, T)

    def get_trm_tensors_from_df(self, df: pd.DataFrame, T: int = 60) -> Tuple[torch.Tensor, torch.Tensor]:
        """Streaming variant: process a pre-loaded snapshot chunk into TRM tensors."""
        if df.empty:
            return torch.empty(0), torch.empty(0)
        feat_df = self.compute_trm_features(df)
        if feat_df.empty:
            return torch.empty(0), torch.empty(0)
        return self._build_trm_tensors(feat_df, T)

    # ------------------------------------------------------------------ #
    # LSTM (bar data) pipeline
    # ------------------------------------------------------------------ #
    def get_lstm_tensors_from_df(self, df: pd.DataFrame, T: int = 60,
                                 forecast_horizon: int = 1) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Process bar data (OHLCV) into LSTM tensors.

        Features: log return, log volume, high-low range, close-open range,
        rolling volatility. Target: next log return * 100 (so a 1% move maps
        to 1.0, matching a Tanh output head).

        Returns:
            X of shape (B, T, 5) and y of shape (B, 1); empty tensors when
            there is not enough data.
        """
        if df.empty or len(df) < T + forecast_horizon:
            return torch.empty(0), torch.empty(0)

        cols = ['open', 'high', 'low', 'close', 'volume']
        present = [c for c in cols if c in df.columns]
        for c in present:
            df[c] = pd.to_numeric(df[c], errors='coerce')
        # Fix: drop only on columns that exist (the original raised KeyError
        # when any OHLCV column was missing from the frame).
        df.dropna(subset=present, inplace=True)

        # Feature engineering (all scale-invariant ratios / log transforms).
        df['log_ret'] = np.log(df['close'] / df['close'].shift(1)).fillna(0)
        df['log_vol'] = np.log1p(df['volume'])
        df['hl_range'] = (df['high'] - df['low']) / df['close']
        df['co_range'] = (df['close'] - df['open']) / df['open']
        df['volatility'] = df['log_ret'].rolling(window=20).std().fillna(0)

        feature_cols = ['log_ret', 'log_vol', 'hl_range', 'co_range', 'volatility']
        # Rolling robust scaling (leakage-free).
        data = self._rolling_robust_scale(df[feature_cols].values, window=2000)

        # Target: next log return, scaled by 100 for Tanh-range regression.
        target = df['log_ret'].shift(-forecast_horizon).fillna(0).values * 100

        X, y = [], []
        for i in range(T, len(data) - forecast_horizon):
            X.append(data[i - T:i])  # (T, 5)
            y.append(target[i])
        return torch.FloatTensor(np.array(X)), torch.FloatTensor(np.array(y)).unsqueeze(1)