"""Market Microstructure Features Based on Marcos Lopez de Prado and the mlfinlab library. This is what separates retail technical analysis from institutional quant. Order flow, liquidity, and market impact contain genuine alpha. """ import numpy as np import pandas as pd from typing import Dict, List, Optional, Tuple import warnings warnings.filterwarnings('ignore') class MicrostructureFeatures: """ Extract market microstructure features from tick-level data. Key insight: The market is not a continuous price stream. It is a series of discrete transactions driven by informed vs. uninformed traders. Microstructure features detect this. """ @staticmethod def bid_ask_spread(bid: pd.Series, ask: pd.Series) -> pd.Series: """ Raw bid-ask spread. Wider spreads = lower liquidity, higher execution cost. """ return ask - bid @staticmethod def relative_spread(bid: pd.Series, ask: pd.Series, mid: Optional[pd.Series] = None) -> pd.Series: """ Spread as percentage of mid price. """ if mid is None: mid = (bid + ask) / 2 return (ask - bid) / mid @staticmethod def effective_spread(price: pd.Series, bid: pd.Series, ask: pd.Series) -> pd.Series: """ Effective spread = 2 * |trade_price - mid_price|. Measures actual execution cost vs. quoted spread. """ mid = (bid + ask) / 2 return 2 * np.abs(price - mid) / mid @staticmethod def realized_spread(price: pd.Series, bid: pd.Series, ask: pd.Series, future_mid: pd.Series) -> pd.Series: """ Realized spread = 2 * |trade_price - future_mid|. Measures adverse selection. If realized spread > effective spread, your trade moved the market against you. """ mid = (bid + ask) / 2 return 2 * np.abs(price - future_mid) / mid @staticmethod def price_impact(price: pd.Series, volume: pd.Series, bid: pd.Series, ask: pd.Series) -> pd.Series: """ Kyle's Lambda — price impact coefficient. delta_price = lambda * signed_volume + noise Higher lambda = less liquid market, your orders move prices more. """ mid = (bid + ask) / 2 mid_change = mid.diff() # Signed volume: Lee-Ready tick test signed_vol = np.where( price > mid.shift(1), volume, np.where(price < mid.shift(1), -volume, 0) ) # Rolling regression via covariance/variance ratio return pd.Series(signed_vol).rolling(100).cov( pd.Series(mid_change).rolling(100) ) / pd.Series(signed_vol).rolling(100).var().replace(0, np.nan) @staticmethod def order_flow_imbalance(bid_size: pd.Series, ask_size: pd.Series) -> pd.Series: """ OFI = (bid_size - ask_size) / (bid_size + ask_size). Positive = more buying pressure = bullish. This is genuine short-term alpha in liquid markets. """ return (bid_size - ask_size) / (bid_size + ask_size + 1e-10) @staticmethod def volume_imbalance(buy_volume: pd.Series, sell_volume: pd.Series) -> pd.Series: """ Volume imbalance = (buy_vol - sell_vol) / (buy_vol + sell_vol). Classification via tick test or quote test. """ return (buy_volume - sell_volume) / (buy_volume + sell_volume + 1e-10) @staticmethod def trade_sign_classification(price: pd.Series, bid: pd.Series, ask: pd.Series) -> pd.Series: """ Lee-Ready tick test for trade direction classification. If trade price > mid → buy (aggressor is buyer) If trade price < mid → sell (aggressor is seller) If trade price = mid → use tick test (compare to previous trade) """ mid = (bid + ask) / 2 # Quote test sign = np.where(price > mid, 1, np.where(price < mid, -1, 0)) # Tick test for mid-trades price_change = price.diff() tick_sign = np.where( price_change > 0, 1, np.where(price_change < 0, -1, 0) ) # Use tick test where quote test is inconclusive sign = np.where(sign == 0, tick_sign, sign) # If still 0, carry forward sign = pd.Series(sign).fillna(method='ffill').fillna(0).values return pd.Series(sign, index=price.index) @staticmethod def amihud_illiquidity(price: pd.Series, volume: pd.Series, window: int = 21) -> pd.Series: """ Amihud illiquidity = |return| / (price * volume). Higher = less liquid. Used in academic literature to measure market quality. Predicts returns (illiquid stocks earn premium). """ returns = price.pct_change().abs() dollar_volume = price * volume return (returns / dollar_volume).rolling(window).mean() * 1e6 @staticmethod def kyles_lambda(price: pd.Series, volume: pd.Series, trade_sign: pd.Series, window: int = 100) -> pd.Series: """ Kyle's Lambda — price impact per unit of order flow. Lambda = Cov(delta_price, signed_volume) / Var(signed_volume) Proxy for adverse selection and market depth. """ delta_price = price.diff() signed_volume = trade_sign * volume cov = delta_price.rolling(window).cov(signed_volume) var = signed_volume.rolling(window).var() return cov / var.replace(0, np.nan) @staticmethod def vpin_approximation(price: pd.Series, volume: pd.Series, bucket_vol: float = 10000) -> float: """ VPIN — Volume-Synchronized Probability of Informed Trading. Simplified approximation using equal-volume buckets. High VPIN = high probability of informed trading = adverse selection risk. """ # Classify trades mid = price.rolling(2).mean() trade_sign = np.where(price > mid.shift(1), 1, -1) signed_volume = trade_sign * volume buy_volume = np.where(signed_volume > 0, volume, 0) sell_volume = np.where(signed_volume < 0, volume, 0) # Create volume buckets cumulative = np.cumsum(volume) n_buckets = int(cumulative[-1] / bucket_vol) if n_buckets < 10: return np.nan bucket_boundaries = np.linspace(0, cumulative[-1], n_buckets + 1) bucket_buy = [] bucket_sell = [] for i in range(n_buckets): mask = (cumulative >= bucket_boundaries[i]) & (cumulative < bucket_boundaries[i+1]) bucket_buy.append(np.sum(buy_volume[mask])) bucket_sell.append(np.sum(sell_volume[mask])) bucket_buy = np.array(bucket_buy) bucket_sell = np.array(bucket_sell) bucket_volume = bucket_buy + bucket_sell # VPIN = average |buy - sell| / volume vpin_values = np.abs(bucket_buy - bucket_sell) / (bucket_volume + 1e-10) return np.mean(vpin_values) @staticmethod def roll_measure(price: pd.Series, window: int = 20) -> pd.Series: """ Roll's measure — estimate bid-ask spread from serial covariance. Spread = 2 * sqrt(-Cov(delta_price_t, delta_price_{t-1})) Only valid when covariance is negative. """ delta = price.diff() cov = delta.rolling(window).cov(delta.shift(1)) # Roll's measure roll = 2 * np.sqrt(np.maximum(-cov, 0)) return roll @staticmethod def hasbrouck_lambda(price: pd.Series, volume: pd.Series, window: int = 100) -> pd.Series: """ Hasbrouck's Lambda — information-based price impact. Measures how much of the price change is due to information vs. liquidity demand. """ # Simplified: correlation of returns with lagged signed volume returns = price.pct_change() trade_sign = np.sign(price.diff().fillna(0)) signed_volume = trade_sign * volume return returns.rolling(window).corr(signed_volume.shift(1)) def compute_all_microstructure_features(df: pd.DataFrame) -> pd.DataFrame: """ Compute all microstructure features from a tick DataFrame. Required columns: price, volume, bid, ask, bid_size, ask_size """ required = ['price', 'volume', 'bid', 'ask', 'bid_size', 'ask_size'] for col in required: if col not in df.columns: raise ValueError(f"Missing required column: {col}") features = pd.DataFrame(index=df.index) # Basic spread features['spread'] = MicrostructureFeatures.bid_ask_spread(df['bid'], df['ask']) features['relative_spread'] = MicrostructureFeatures.relative_spread( df['bid'], df['ask'] ) # Effective spread features['effective_spread'] = MicrostructureFeatures.effective_spread( df['price'], df['bid'], df['ask'] ) # Order flow imbalance features['ofi'] = MicrostructureFeatures.order_flow_imbalance( df['bid_size'], df['ask_size'] ) # Trade sign classification features['trade_sign'] = MicrostructureFeatures.trade_sign_classification( df['price'], df['bid'], df['ask'] ) # Signed volume features['signed_volume'] = features['trade_sign'] * df['volume'] features['volume_imbalance'] = MicrostructureFeatures.volume_imbalance( np.where(features['trade_sign'] > 0, df['volume'], 0), np.where(features['trade_sign'] < 0, df['volume'], 0) ) # Amihud illiquidity (using daily approximation from intraday) features['amihud_illiquidity'] = MicrostructureFeatures.amihud_illiquidity( df['price'], df['volume'] ) # Kyle's lambda features['kyle_lambda'] = MicrostructureFeatures.kyles_lambda( df['price'], df['volume'], features['trade_sign'] ) # Roll's measure features['roll_measure'] = MicrostructureFeatures.roll_measure(df['price']) # Hasbrouck lambda features['hasbrouck_lambda'] = MicrostructureFeatures.hasbrouck_lambda( df['price'], df['volume'] ) # VPIN (computed once, broadcast) vpin = MicrostructureFeatures.vpin_approximation(df['price'], df['volume']) features['vpin'] = vpin return features.replace([np.inf, -np.inf], np.nan).fillna(method='ffill').fillna(0) def generate_synthetic_tick_data(n_ticks: int = 10000, base_price: float = 100.0, volatility: float = 0.001, spread_bps: float = 1.0) -> pd.DataFrame: """ Generate synthetic tick-level data for testing microstructure features. """ np.random.seed(42) # Price process: random walk with slight mean reversion prices = [base_price] for _ in range(n_ticks - 1): # Small random walk change = np.random.randn() * volatility * base_price # Mean reversion change -= 0.01 * (prices[-1] - base_price) prices.append(max(prices[-1] + change, 0.01)) prices = np.array(prices) # Bid-ask spread half_spread = prices * spread_bps / 20000 # bps to dollars bid = prices - half_spread ask = prices + half_spread # Sizes (power law: few large orders, many small) bid_size = np.random.lognormal(8, 1.5, n_ticks).astype(int) ask_size = np.random.lognormal(8, 1.5, n_ticks).astype(int) # Volume (trades happen at mid mostly) volume = np.random.lognormal(6, 1.2, n_ticks).astype(int) # Timestamp times = pd.date_range('2024-01-01 09:30', periods=n_ticks, freq='1s') return pd.DataFrame({ 'timestamp': times, 'price': prices, 'bid': bid, 'ask': ask, 'bid_size': bid_size, 'ask_size': ask_size, 'volume': volume }).set_index('timestamp') if __name__ == '__main__': # Test microstructure features tick_data = generate_synthetic_tick_data(n_ticks=5000) features = compute_all_microstructure_features(tick_data) print("Market Microstructure Features") print("=" * 60) print(f"\nDataset: {len(tick_data)} ticks") print(f"Features computed: {len(features.columns)}") print(f"\nFeature Summary:") print(features.describe().round(6)) print(f"\nVPIN (Volume-Synchronized Probability of Informed Trading):") print(f" {features['vpin'].iloc[0]:.4f}") print(f"\nSample Features (last 5 ticks):") print(features[['spread', 'relative_spread', 'ofi', 'kyle_lambda', 'amihud_illiquidity']].tail().round(6))