Spaces:
Sleeping
Sleeping
| import pandas as pd | |
| import numpy as np | |
| import glob | |
| import torch | |
| import ast | |
| from typing import Tuple | |
class AlphaDataProcessor:
    """
    Turns raw market data (Parquet files) into PyTorch tensors for Alpha Agent training.

    Three model families are served:

    * DeepLOB - two-channel (price / volume) order-book windows with
      triple-barrier labels.
    * TRM     - six engineered features (volatility, imbalance, CVD, spread,
      momentum, OFI) with triple-barrier labels.
    * LSTM    - OHLCV bar features with a scaled next-return regression target.

    Features are normalised with a rolling median/IQR scaler so that a sample
    never sees statistics computed from rows that come after it.
    """

    # Column order of the six TRM model inputs.
    _TRM_FEATURES = ['volatility', 'imbalance', 'cvd', 'spread', 'momentum', 'ofi']

    def __init__(self, data_dir: str = "./data"):
        """Remember the root directory under which the Parquet files are searched."""
        self.data_dir = data_dir

    # ------------------------------------------------------------------
    # Scaling
    # ------------------------------------------------------------------
    def _rolling_robust_scale(self, data: np.ndarray, window: int = 2000) -> np.ndarray:
        """
        Scale every column by its trailing rolling median and IQR.

        For row t the statistics are taken over the last `window` rows ending
        at t (inclusive), so no later row influences the result.

        Args:
            data: 2-D array, rows are time steps and columns are features.
            window: Length of the trailing window.

        Returns:
            Array of the same shape holding (x - median) / IQR. Rows with fewer
            than max(1, window // 10) observations in their window are 0.0,
            as is any position whose result is NaN.
        """
        frame = pd.DataFrame(data)
        # Floor at 1 so that very small windows still get a valid min_periods.
        roller = frame.rolling(window=window, min_periods=max(1, window // 10))
        median = roller.median()
        # A zero IQR (constant column) would divide by zero; scale by 1 instead.
        iqr = (roller.quantile(0.75) - roller.quantile(0.25)).replace(0, 1.0)
        return ((frame - median) / iqr).fillna(0.0).values

    def _robust_scale(self, data):
        """
        Non-rolling robust scaling: (x - median) / IQR along axis 0.

        The statistics are taken over the whole input, so every row is scaled
        with information from all other rows. Use `_rolling_robust_scale` when
        that is not acceptable.

        Args:
            data: Array-like, 1-D or 2-D.

        Returns:
            Float array of the same shape; columns with zero IQR are only centred.
        """
        data = np.asarray(data, dtype=float)
        if data.size == 0:
            return data
        median = np.median(data, axis=0)
        iqr = np.percentile(data, 75, axis=0) - np.percentile(data, 25, axis=0)
        # np.where also works when iqr is 0-d (1-D input), unlike item assignment.
        iqr = np.where(iqr == 0, 1.0, iqr)
        return (data - median) / iqr

    # ------------------------------------------------------------------
    # Labelling
    # ------------------------------------------------------------------
    def _get_triple_barrier_labels(self, mid_prices: np.ndarray, T: int, horizon: int, volatility: np.ndarray = None, pt: float = 1.0, sl: float = 1.0) -> np.ndarray:
        """
        Triple Barrier Labeling Method (Marcos Lopez de Prado).

        One label is produced for every i in range(T, len(mid_prices) - horizon).
        The entry price is mid_prices[i - 1]; the barriers are placed at
        entry * (1 + volatility[i] * pt) and entry * (1 - volatility[i] * sl);
        the path inspected is mid_prices[i : i + horizon].

        Labels:
            0 - lower barrier (stop loss) touched first.
            1 - neither barrier touched within the horizon, or both touched on
                the same step (only possible when the barriers coincide, e.g.
                zero volatility).
            2 - upper barrier (take profit) touched first.

        Args:
            mid_prices: 1-D array of mid prices.
            T: Index of the first labelled row (the lookback length).
            horizon: Number of future rows inspected per label.
            volatility: Per-row barrier width; defaults to a constant 0.002.
            pt: Profit-taking multiplier applied to the volatility.
            sl: Stop-loss multiplier applied to the volatility.

        Returns:
            int64 array of labels, empty when there are not enough rows.
        """
        mid_prices = np.asarray(mid_prices, dtype=float)
        if volatility is None:
            # Simple fallback when the caller supplies no volatility estimate.
            volatility = np.full(len(mid_prices), 0.002)

        no_touch = horizon + 1  # sentinel larger than any index inside the path
        labels = []
        for i in range(T, len(mid_prices) - horizon):
            entry = mid_prices[i - 1]
            width = volatility[i]
            upper_barrier = entry * (1 + width * pt)
            lower_barrier = entry * (1 - width * sl)
            path = mid_prices[i:i + horizon]

            # np.where lists the touching indices in ascending order.
            touch_upper = np.where(path >= upper_barrier)[0]
            touch_lower = np.where(path <= lower_barrier)[0]
            t_upper = touch_upper[0] if len(touch_upper) > 0 else no_touch
            t_lower = touch_lower[0] if len(touch_lower) > 0 else no_touch

            if t_upper < t_lower:
                labels.append(2)
            elif t_lower < t_upper:
                labels.append(0)
            else:
                # Time limit reached, or a degenerate simultaneous touch.
                labels.append(1)
        return np.array(labels, dtype=np.int64)

    # ------------------------------------------------------------------
    # Order-flow imbalance
    # ------------------------------------------------------------------
    def _compute_ofi(self, df: pd.DataFrame, levels: int = 5) -> pd.Series:
        """
        Order Flow Imbalance on the best bid / best ask.

        With P, q the current price and size and P', q' the previous row's:

            bid flow = q        if P_bid > P'_bid
                       -q'      if P_bid < P'_bid
                       q - q'   otherwise
            ask flow = -q'      if P_ask > P'_ask
                       q        if P_ask < P'_ask
                       q - q'   otherwise
            OFI      = bid flow - ask flow

        Args:
            df: Frame with columns best_bid, best_bid_sz, best_ask, best_ask_sz.
            levels: Accepted for interface compatibility; only the top level
                is used.

        Returns:
            Series indexed like `df`; the first row (no predecessor) is 0.
        """
        prev = df.shift(1)

        bid_p, bid_q = df['best_bid'], df['best_bid_sz']
        prev_bid_p, prev_bid_q = prev['best_bid'], prev['best_bid_sz']
        ask_p, ask_q = df['best_ask'], df['best_ask_sz']
        prev_ask_p, prev_ask_q = prev['best_ask'], prev['best_ask_sz']

        bid_flow = np.where(bid_p > prev_bid_p, bid_q,
                            np.where(bid_p < prev_bid_p, -prev_bid_q, bid_q - prev_bid_q))
        ask_flow = np.where(ask_p > prev_ask_p, -prev_ask_q,
                            np.where(ask_p < prev_ask_p, ask_q, ask_q - prev_ask_q))

        # Re-attach the caller's index so that `df['ofi'] = ...` aligns row by row.
        return pd.Series(bid_flow - ask_flow, index=df.index).fillna(0)

    # ------------------------------------------------------------------
    # Loading
    # ------------------------------------------------------------------
    def load_trades(self, coin: str = "ETH") -> pd.DataFrame:
        """
        Load all trade files for `coin`, sorted by `time`.

        Reads `{data_dir}/raw_trade/{coin}/*.parquet` and adds a `signed_vol`
        column: +sz where side == 'B', -sz otherwise (0 when there is no
        `side` column).

        Returns:
            The trade frame, or an empty frame when no file exists or loading
            fails (the error is printed).
        """
        files = glob.glob(f"{self.data_dir}/raw_trade/{coin}/*.parquet")
        if not files:
            return pd.DataFrame()
        try:
            df = pd.concat([pd.read_parquet(f) for f in files]).sort_values("time")
            if 'side' in df.columns:
                # Sizes may arrive as strings; unparseable ones count as zero.
                size = pd.to_numeric(df['sz'], errors='coerce').fillna(0.0)
                df['signed_vol'] = np.where(df['side'] == 'B', size, -size)
            else:
                df['signed_vol'] = 0
            return df
        except Exception as e:
            # Best effort: callers treat an empty frame as "no trades available".
            print(f"Error loading trades: {e}")
            return pd.DataFrame()

    @staticmethod
    def _parse_levels(raw) -> list:
        """Turn one raw bids/asks cell into a list of levels; [] when unusable."""
        if isinstance(raw, str):
            try:
                parsed = ast.literal_eval(raw)
            except (ValueError, SyntaxError, TypeError):
                return []
            return list(parsed) if isinstance(parsed, (list, tuple)) else []
        if isinstance(raw, (list, tuple, np.ndarray)):
            # Already structured - keep it rather than discarding the book.
            return list(raw)
        return []

    def load_l2_snapshots(self, coin: str = "ETH", limit: int = 10000) -> pd.DataFrame:
        """
        Load L2 order-book snapshots whose `instrument_id` contains `coin`.

        Reads `{data_dir}/order_book_snapshot/*.parquet`, keeps the first
        `limit` rows by `ts_event`, and parses the `bids` / `asks` columns
        into lists of levels.

        Returns:
            The snapshot frame with a fresh 0..n-1 index, or an empty frame
            when nothing matches. Files that fail to load are reported and
            skipped.
        """
        files = sorted(glob.glob(f"{self.data_dir}/order_book_snapshot/*.parquet"))
        if not files:
            return pd.DataFrame()

        chunks = []
        for f in files:
            try:
                chunk = pd.read_parquet(f)
                # Literal substring match; rows without an instrument id are dropped.
                mask = chunk['instrument_id'].str.contains(coin, regex=False, na=False)
                chunk = chunk[mask]
            except Exception as e:
                print(f"Error loading snapshot file {f}: {e}")
                continue
            if not chunk.empty:
                chunks.append(chunk)
        if not chunks:
            return pd.DataFrame()

        df = pd.concat(chunks).sort_values("ts_event").head(limit).reset_index(drop=True)
        df['bids'] = df['bids'].apply(self._parse_levels)
        df['asks'] = df['asks'].apply(self._parse_levels)
        return df

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _as_tensor_pair(samples, labels) -> Tuple[torch.Tensor, torch.Tensor]:
        """Pack samples as a float32 tensor and labels as an int64 tensor."""
        X = torch.from_numpy(np.asarray(samples, dtype=np.float32))
        y = torch.from_numpy(np.asarray(labels, dtype=np.int64))
        return X, y

    @staticmethod
    def _mid_and_volatility(df: pd.DataFrame, T: int) -> Tuple[np.ndarray, np.ndarray]:
        """
        Return (mid prices, rolling std of log returns over T rows).

        A row whose book is missing either side has no valid mid; it takes the
        previous valid mid (0 when there is none yet). Volatility entries that
        are undefined (the first T - 1 rows) are set to 0.001.
        """
        best_bid = df['bids'].apply(lambda x: x[0][0] if len(x) > 0 else np.nan).astype(float)
        best_ask = df['asks'].apply(lambda x: x[0][0] if len(x) > 0 else np.nan).astype(float)
        mids = ((best_bid + best_ask) / 2).replace(0, np.nan).ffill().fillna(0)
        mid_arr = mids.to_numpy(dtype=float)

        returns = np.concatenate(([0.0], np.diff(np.log(mid_arr + 1e-9))))
        volatility = pd.Series(returns).rolling(window=T).std().fillna(0.001).to_numpy()
        return mid_arr, volatility

    @staticmethod
    def _book_to_arrays(df: pd.DataFrame, levels: int) -> Tuple[np.ndarray, np.ndarray]:
        """
        Flatten the top `levels` of each book into price and volume matrices.

        Both matrices have shape (rows, 2 * levels) with the column layout
        [ask_0, bid_0, ask_1, bid_1, ...]; missing levels stay 0.
        """
        n_rows = len(df)
        prices = np.zeros((n_rows, 2 * levels), dtype=float)
        volumes = np.zeros((n_rows, 2 * levels), dtype=float)
        for r, (bids, asks) in enumerate(zip(df['bids'], df['asks'])):
            for lvl, (pa, va) in enumerate(asks[:levels]):
                prices[r, 2 * lvl] = pa
                volumes[r, 2 * lvl] = va
            for lvl, (pb, vb) in enumerate(bids[:levels]):
                prices[r, 2 * lvl + 1] = pb
                volumes[r, 2 * lvl + 1] = vb
        return prices, volumes

    def _build_deeplob_tensors(self, df: pd.DataFrame, T: int, levels: int, horizon: int, rolling: bool) -> Tuple[torch.Tensor, torch.Tensor]:
        """Build DeepLOB samples/labels from a non-empty snapshot frame."""
        mid_arr, volatility = self._mid_and_volatility(df, T)
        prices, volumes = self._book_to_arrays(df, levels)
        volumes = np.log1p(volumes)

        if rolling:
            prices = self._rolling_robust_scale(prices, window=2000)
            volumes = self._rolling_robust_scale(volumes, window=2000)
        else:
            prices = self._robust_scale(prices)
            volumes = self._robust_scale(volumes)

        # PT = SL = 2 x volatility.
        labels = self._get_triple_barrier_labels(mid_arr, T, horizon, volatility, pt=2.0, sl=2.0)

        # Sample i covers rows i-T .. i-1; the label iterates the same range of i.
        samples = [
            np.stack([prices[i - T:i], volumes[i - T:i]], axis=0)  # (2, T, 2*levels)
            for i in range(T, len(mid_arr) - horizon)
        ]
        return self._as_tensor_pair(samples, labels)

    def _build_trm_tensors(self, feat_df: pd.DataFrame, T: int, horizon: int) -> Tuple[torch.Tensor, torch.Tensor]:
        """Build TRM samples/labels from the output of `compute_trm_features`."""
        data = feat_df[self._TRM_FEATURES].values
        mid = feat_df['mid'].values

        data = self._rolling_robust_scale(data, window=2000)

        rets = np.concatenate(([0.0], np.diff(np.log(mid + 1e-9))))
        vol = pd.Series(rets).rolling(window=T).std().fillna(0.001).values

        labels = self._get_triple_barrier_labels(mid, T, horizon=horizon, volatility=vol, pt=2.0, sl=2.0)
        samples = [data[i - T:i] for i in range(T, len(data) - horizon)]
        return self._as_tensor_pair(samples, labels)

    # ------------------------------------------------------------------
    # DeepLOB
    # ------------------------------------------------------------------
    def get_deeplob_tensors(self, coin: str = "ETH", T: int = 100, levels: int = 20, horizon: int = 100) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        DeepLOB tensors with channel separation and triple-barrier labels.

        Prices and log1p(volumes) are normalised with the rolling robust
        scaler (window 2000).

        Args:
            coin: Substring of `instrument_id` to load.
            T: Number of snapshots per sample.
            levels: Book depth per side.
            horizon: Number of future snapshots used for labelling.

        Returns:
            X of shape (N, 2, T, 2 * levels) (float32) and y of shape (N,)
            (int64, values 0/1/2). When no snapshot is found a random dummy
            batch of 32 samples is returned instead.
        """
        df = self.load_l2_snapshots(coin)
        if df.empty:
            return self._generate_dummy_deeplob(T, levels)
        return self._build_deeplob_tensors(df, T, levels, horizon, rolling=True)

    def get_deeplob_tensors_from_df(self, df: pd.DataFrame, T: int = 100, levels: int = 20, horizon: int = 100) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Process a pre-loaded DataFrame (chunk) into DeepLOB tensors.
        Used for streaming.

        The first T rows of the chunk only serve as lookback, so the caller is
        responsible for overlapping consecutive chunks.

        NOTE(review): scaling here is per chunk (`_robust_scale`), i.e. each
        row is scaled with statistics of the whole chunk. The rolling scaler
        would zero out the first 200 rows of every chunk; confirm which
        behaviour the streaming loader expects.

        Returns:
            X of shape (N, 2, T, 2 * levels) and y of shape (N,); two empty
            tensors when `df` is empty.
        """
        if df.empty:
            return torch.empty(0), torch.empty(0)
        return self._build_deeplob_tensors(df, T, levels, horizon, rolling=False)

    def _generate_dummy_deeplob(self, T, levels):
        """Random stand-in batch: 32 samples of shape (2, T, 2 * levels), labels in {0, 1, 2}."""
        return torch.randn(32, 2, T, 2 * levels), torch.randint(0, 3, (32,))

    # ------------------------------------------------------------------
    # TRM
    # ------------------------------------------------------------------
    def compute_trm_features(self, df: pd.DataFrame, coin: str = "ETH") -> pd.DataFrame:
        """
        Compute the TRM features, including OFI and trade-based CVD.

        The input frame is not modified.

        Args:
            df: Snapshot frame with `bids` / `asks` columns holding lists of
                [price, size] levels, and a `ts_event` column when trades are
                available for `coin`.
            coin: Coin whose trades feed the CVD feature.

        Returns:
            Frame with the columns volatility, imbalance, cvd, spread,
            momentum, ofi and mid (the six model inputs plus the mid price
            used for labelling). Rows where any of these is missing are
            dropped.
        """
        out_cols = self._TRM_FEATURES + ['mid']
        if df.empty:
            return pd.DataFrame(columns=out_cols)

        # Work on a positionally indexed copy so that no assignment below
        # depends on the caller's index.
        work = df.copy()
        if 'ts_event' in work.columns:
            work = work.sort_values('ts_event')
        work = work.reset_index(drop=True)

        work['best_bid'] = work['bids'].apply(lambda x: x[0][0] if len(x) > 0 else np.nan).astype(float)
        work['best_ask'] = work['asks'].apply(lambda x: x[0][0] if len(x) > 0 else np.nan).astype(float)
        work['best_bid_sz'] = work['bids'].apply(lambda x: x[0][1] if len(x) > 0 else np.nan).astype(float)
        work['best_ask_sz'] = work['asks'].apply(lambda x: x[0][1] if len(x) > 0 else np.nan).astype(float)
        work = work.dropna(subset=['best_bid', 'best_ask']).reset_index(drop=True)
        if work.empty:
            return pd.DataFrame(columns=out_cols)

        work['mid'] = (work['best_bid'] + work['best_ask']) / 2
        work['ofi'] = self._compute_ofi(work)
        work['spread'] = work['best_ask'] - work['best_bid']
        work['imbalance'] = (work['best_bid_sz'] - work['best_ask_sz']) / (work['best_bid_sz'] + work['best_ask_sz'])
        # Simple returns over 5 rows and 1 row respectively.
        work['momentum'] = work['mid'] / work['mid'].shift(5) - 1
        work['returns'] = work['mid'] / work['mid'].shift(1) - 1
        work['volatility'] = work['returns'].rolling(10).std()

        # CVD: cumulative signed trade volume as of each snapshot.
        trades = self.load_trades(coin=coin)
        if not trades.empty:
            trades = trades.sort_values("time")
            # NOTE(review): assumes `ts_event` and `time` share dtype and unit,
            # as merge_asof requires - confirm against the data.
            left = pd.DataFrame({'ts_merge': work['ts_event'].to_numpy()})
            right = pd.DataFrame({
                'ts_merge': trades['time'].to_numpy(),
                'cumulative_vol': trades['signed_vol'].cumsum().to_numpy(),
            })
            merged = pd.merge_asof(left, right, on='ts_merge', direction='backward')
            # merge_asof keeps the order of the left rows, so assign by position.
            work['cvd'] = merged['cumulative_vol'].ffill().fillna(0).to_numpy()
        else:
            work['cvd'] = 0

        # Only the returned columns decide whether a row is usable.
        return work.dropna(subset=out_cols)[out_cols]

    def get_trm_tensors(self, coin: str = "ETH", T: int = 60, horizon: int = 60) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        TRM tensors with triple-barrier labels.

        Args:
            coin: Coin to load snapshots and trades for.
            T: Number of rows per sample.
            horizon: Number of future rows used for labelling.

        Returns:
            X of shape (N, T, 6) (float32) and y of shape (N,) (int64). When
            no snapshot is found a random dummy batch of 32 samples is
            returned instead.
        """
        df = self.load_l2_snapshots(coin, limit=5000)
        if df.empty:
            return torch.randn(32, T, 6), torch.randint(0, 3, (32,))
        feat_df = self.compute_trm_features(df, coin=coin)
        if feat_df.empty:
            return self._as_tensor_pair([], [])
        return self._build_trm_tensors(feat_df, T, horizon)

    def get_trm_tensors_from_df(self, df: pd.DataFrame, T: int = 60, horizon: int = 60, coin: str = "ETH") -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Process a pre-loaded DataFrame (chunk) into TRM tensors.
        Used for streaming.

        Returns:
            X of shape (N, T, 6) and y of shape (N,); two empty tensors when
            `df` or its feature frame is empty.
        """
        if df.empty:
            return torch.empty(0), torch.empty(0)
        feat_df = self.compute_trm_features(df, coin=coin)
        if feat_df.empty:
            return torch.empty(0), torch.empty(0)
        return self._build_trm_tensors(feat_df, T, horizon)

    # ------------------------------------------------------------------
    # LSTM
    # ------------------------------------------------------------------
    def get_lstm_tensors_from_df(self, df: pd.DataFrame, T: int = 60, forecast_horizon: int = 1) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Process bar data (OHLCV) into LSTM tensors.

        Features: log return, log1p(volume), (high - low) / close,
        (close - open) / open, and the 20-bar rolling std of log returns,
        all passed through the rolling robust scaler. The input frame is not
        modified.

        Target: the one-bar log return `forecast_horizon` bars after index i,
        multiplied by 100 (a 1% move maps to 1.0).

        Args:
            df: Frame with open, high, low, close and volume columns.
            T: Number of bars per sample.
            forecast_horizon: Offset of the target bar.

        Returns:
            X of shape (N, T, 5) and y of shape (N, 1), both float32; two
            empty tensors when there are fewer than T + forecast_horizon rows.

        Raises:
            KeyError: If any OHLCV column is missing.
        """
        if df.empty or len(df) < T + forecast_horizon:
            return torch.empty(0), torch.empty(0)

        cols = ['open', 'high', 'low', 'close', 'volume']
        missing = [c for c in cols if c not in df.columns]
        if missing:
            raise KeyError(f"Missing OHLCV columns: {missing}")

        # Selecting the columns yields a new frame, so the caller's is untouched.
        bars = df[cols].apply(pd.to_numeric, errors='coerce').dropna()

        close = bars['close']
        feats = pd.DataFrame({
            'log_ret': np.log(close / close.shift(1)),
            'log_vol': np.log1p(bars['volume']),
            'hl_range': (bars['high'] - bars['low']) / close,
            'co_range': (close - bars['open']) / bars['open'],
        })
        # Zero prices produce +/-inf; treat those like missing values.
        feats = feats.replace([np.inf, -np.inf], np.nan)
        feats['log_ret'] = feats['log_ret'].fillna(0)
        feats['volatility'] = feats['log_ret'].rolling(window=20).std().fillna(0)

        feature_cols = ['log_ret', 'log_vol', 'hl_range', 'co_range', 'volatility']
        data = self._rolling_robust_scale(feats[feature_cols].values, window=2000)

        target = feats['log_ret'].shift(-forecast_horizon).fillna(0).values * 100

        # NOTE(review): the window ends at bar i-1 while the label is the
        # return of bar i + forecast_horizon, so the return of bar i itself is
        # neither an input nor the target. Confirm this offset is intended.
        indices = range(T, len(data) - forecast_horizon)
        windows = [data[i - T:i] for i in indices]
        labels = [target[i] for i in indices]

        X = torch.from_numpy(np.asarray(windows, dtype=np.float32))
        y = torch.from_numpy(np.asarray(labels, dtype=np.float32)).unsqueeze(1)
        return X, y