| """Market Microstructure Features |
| |
| Based on Marcos Lopez de Prado and the mlfinlab library. |
| |
| This is what separates retail technical analysis from institutional quant. |
| Order flow, liquidity, and market impact contain genuine alpha. |
| """ |
| import numpy as np |
| import pandas as pd |
| from typing import Dict, List, Optional, Tuple |
| import warnings |
| warnings.filterwarnings('ignore') |
|
|
|
|
class MicrostructureFeatures:
    """
    Extract market microstructure features from tick-level data.

    Key insight: The market is not a continuous price stream.
    It is a series of discrete transactions driven by informed vs.
    uninformed traders. Microstructure features detect this.

    Every method is a ``@staticmethod``; the Series passed to one call are
    expected to share a single index (one row per tick).
    """

    @staticmethod
    def bid_ask_spread(bid: pd.Series, ask: pd.Series) -> pd.Series:
        """
        Raw bid-ask spread (``ask - bid``).

        Wider spreads = lower liquidity, higher execution cost.
        """
        return ask - bid

    @staticmethod
    def relative_spread(bid: pd.Series, ask: pd.Series,
                        mid: Optional[pd.Series] = None) -> pd.Series:
        """
        Quoted spread as a fraction of the mid price.

        Args:
            bid: Best bid prices.
            ask: Best ask prices.
            mid: Reference price used as the denominator. When omitted, the
                quote midpoint ``(bid + ask) / 2`` is used.

        Returns:
            ``(ask - bid) / mid`` (a fraction, not scaled to percent).
        """
        if mid is None:
            mid = (bid + ask) / 2
        return (ask - bid) / mid

    @staticmethod
    def effective_spread(price: pd.Series, bid: pd.Series,
                         ask: pd.Series) -> pd.Series:
        """
        Relative effective spread = 2 * |trade_price - mid_price| / mid_price.

        Measures actual execution cost vs. quoted spread. The result is
        normalised by the quote midpoint, so it is a fraction of price
        rather than a price difference.
        """
        mid = (bid + ask) / 2
        return 2 * np.abs(price - mid) / mid

    @staticmethod
    def realized_spread(price: pd.Series, bid: pd.Series, ask: pd.Series,
                        future_mid: pd.Series) -> pd.Series:
        """
        Relative realized spread = 2 * |trade_price - future_mid| / mid.

        Args:
            price: Trade prices.
            bid: Best bid at trade time.
            ask: Best ask at trade time.
            future_mid: Midpoint some horizon after each trade; the caller
                chooses the horizon and supplies the already-aligned Series.

        Returns:
            Distance between the trade price and the later midpoint,
            doubled and normalised by the midpoint at trade time.
        """
        mid = (bid + ask) / 2
        return 2 * np.abs(price - future_mid) / mid

    @staticmethod
    def price_impact(price: pd.Series, volume: pd.Series,
                     bid: pd.Series, ask: pd.Series,
                     window: int = 100) -> pd.Series:
        """
        Kyle's Lambda — price impact coefficient estimated from quotes.

        delta_mid = lambda * signed_volume + noise

        Trades are signed against the *previous* midpoint: above it counts
        as a buy (+volume), below it as a sell (-volume), otherwise 0.
        Lambda is the rolling regression slope
        ``Cov(signed_volume, delta_mid) / Var(signed_volume)``.

        Higher lambda = less liquid market, your orders move prices more.

        Args:
            price: Trade prices.
            volume: Trade sizes.
            bid: Best bid prices.
            ask: Best ask prices.
            window: Number of ticks in the rolling estimation window.

        Returns:
            Series indexed like ``price``; NaN until a full window of valid
            observations is available or when signed volume has zero
            variance in the window.
        """
        mid = (bid + ask) / 2
        mid_change = mid.diff()
        prev_mid = mid.shift(1)

        # Keep the caller's index: np.where returns a bare ndarray, and
        # rebuilding a Series without the index would misalign the
        # covariance against mid_change.
        signed_vol = pd.Series(
            np.where(price > prev_mid, volume,
                     np.where(price < prev_mid, -volume, 0)),
            index=price.index,
        )

        rolling_signed = signed_vol.rolling(window)
        # Rolling.cov takes the *other Series*, not another Rolling object.
        cov = rolling_signed.cov(mid_change)
        var = rolling_signed.var().replace(0, np.nan)
        return cov / var

    @staticmethod
    def order_flow_imbalance(bid_size: pd.Series, ask_size: pd.Series) -> pd.Series:
        """
        OFI = (bid_size - ask_size) / (bid_size + ask_size).

        Positive = more resting size on the bid than on the ask.
        A small epsilon in the denominator avoids division by zero when
        both sides are empty.
        """
        return (bid_size - ask_size) / (bid_size + ask_size + 1e-10)

    @staticmethod
    def volume_imbalance(buy_volume: pd.Series, sell_volume: pd.Series) -> pd.Series:
        """
        Volume imbalance = (buy_vol - sell_vol) / (buy_vol + sell_vol).

        The buy/sell split must be supplied by the caller (e.g. from a
        tick test or quote test). A small epsilon in the denominator avoids
        division by zero when both volumes are zero.
        """
        return (buy_volume - sell_volume) / (buy_volume + sell_volume + 1e-10)

    @staticmethod
    def trade_sign_classification(price: pd.Series,
                                  bid: pd.Series,
                                  ask: pd.Series) -> pd.Series:
        """
        Lee-Ready style trade direction classification.

        If trade price > mid → buy  (+1, aggressor is buyer)
        If trade price < mid → sell (-1, aggressor is seller)
        If trade price = mid → tick test: sign of the change from the
        previous trade price; on a zero tick the most recent non-zero tick
        direction is carried forward.

        Returns:
            Integer Series (+1 / -1 / 0) indexed like ``price``. 0 is only
            emitted when neither the quote test nor any earlier tick
            provides a direction (e.g. the very first trade at the mid).
        """
        mid = (bid + ask) / 2

        # Quote test. NaN comparisons are treated as "undecided" (0).
        quote_sign = np.sign(price - mid).fillna(0)

        # Tick test with zero-tick carry-forward: zeros become NaN so that
        # ffill() propagates the last real up/down tick.
        tick_sign = np.sign(price.diff()).replace(0, np.nan).ffill()

        sign = quote_sign.where(quote_sign != 0, tick_sign).fillna(0)

        return pd.Series(sign.to_numpy().astype(int), index=price.index)

    @staticmethod
    def amihud_illiquidity(price: pd.Series, volume: pd.Series,
                           window: int = 21) -> pd.Series:
        """
        Amihud illiquidity = |return| / (price * volume).

        Higher = less liquid.

        Args:
            price: Trade (or bar close) prices.
            volume: Traded volume per observation.
            window: Rolling window over which the ratio is averaged.

        Returns:
            Rolling mean of the ratio, scaled by 1e6. Observations with
            zero dollar volume are treated as missing rather than infinite,
            so windows containing them yield NaN instead of inf.
        """
        returns = price.pct_change().abs()
        # Zero dollar volume would give inf and poison the rolling mean.
        dollar_volume = (price * volume).replace(0, np.nan)

        return (returns / dollar_volume).rolling(window).mean() * 1e6

    @staticmethod
    def kyles_lambda(price: pd.Series, volume: pd.Series,
                     trade_sign: pd.Series, window: int = 100) -> pd.Series:
        """
        Kyle's Lambda — price impact per unit of order flow.

        Lambda = Cov(delta_price, signed_volume) / Var(signed_volume)

        Args:
            price: Trade prices.
            volume: Trade sizes.
            trade_sign: Trade direction per tick (+1 buy / -1 sell / 0).
            window: Number of ticks in the rolling estimation window.

        Returns:
            Rolling slope; NaN where signed volume has zero variance.
        """
        delta_price = price.diff()
        signed_volume = trade_sign * volume

        cov = delta_price.rolling(window).cov(signed_volume)
        var = signed_volume.rolling(window).var()

        return cov / var.replace(0, np.nan)

    @staticmethod
    def vpin_approximation(price: pd.Series, volume: pd.Series,
                           bucket_vol: float = 10000) -> float:
        """
        VPIN — Volume-Synchronized Probability of Informed Trading.

        Simplified approximation using equal-volume buckets. A trade is
        labelled a buy when its price exceeds the mean of the two preceding
        prices, otherwise a sell. VPIN is the mean over buckets of
        ``|buy - sell| / (buy + sell)``.

        Args:
            price: Trade prices.
            volume: Trade sizes (non-negative; NaN is treated as 0).
            bucket_vol: Target volume per bucket. The number of buckets is
                ``int(total_volume / bucket_vol)``.

        Returns:
            The VPIN estimate in [0, 1], or NaN when the input is empty or
            yields fewer than 10 buckets.

        Raises:
            ValueError: If ``bucket_vol`` is not positive.
        """
        if bucket_vol <= 0:
            raise ValueError("bucket_vol must be positive")

        # Work on plain arrays: indexing a Series with [-1] is a label
        # lookup and fails (or is deprecated) depending on the index type.
        vol = np.nan_to_num(np.asarray(volume, dtype=float), nan=0.0)
        if vol.size == 0:
            return np.nan

        reference = price.rolling(2).mean().shift(1)
        is_buy = np.asarray(price > reference)

        buy_volume = np.where(is_buy, vol, 0.0)
        sell_volume = np.where(is_buy, 0.0, vol)

        cumulative = np.cumsum(vol)
        total = cumulative[-1]
        n_buckets = int(total / bucket_vol)

        if n_buckets < 10:
            return np.nan

        bucket_boundaries = np.linspace(0, total, n_buckets + 1)

        # Bucket b holds ticks with boundaries[b] <= cumulative < boundaries[b+1].
        # Clipping keeps the final tick (cumulative == total) in the last
        # bucket instead of dropping it.
        bucket_idx = np.searchsorted(bucket_boundaries, cumulative, side='right') - 1
        bucket_idx = np.clip(bucket_idx, 0, n_buckets - 1)

        bucket_buy = np.bincount(bucket_idx, weights=buy_volume, minlength=n_buckets)
        bucket_sell = np.bincount(bucket_idx, weights=sell_volume, minlength=n_buckets)
        bucket_volume = bucket_buy + bucket_sell

        # Epsilon guards buckets that received no trades.
        vpin_values = np.abs(bucket_buy - bucket_sell) / (bucket_volume + 1e-10)

        return float(np.mean(vpin_values))

    @staticmethod
    def roll_measure(price: pd.Series, window: int = 20) -> pd.Series:
        """
        Roll's measure — estimate bid-ask spread from serial covariance.

        Spread = 2 * sqrt(-Cov(delta_price_t, delta_price_{t-1}))

        Only valid when covariance is negative; a non-negative covariance
        is clamped so the estimate is 0 instead of a complex number.
        """
        delta = price.diff()
        cov = delta.rolling(window).cov(delta.shift(1))

        roll = 2 * np.sqrt(np.maximum(-cov, 0))

        return roll

    @staticmethod
    def hasbrouck_lambda(price: pd.Series, volume: pd.Series,
                         window: int = 100) -> pd.Series:
        """
        Hasbrouck-style lambda proxy.

        Computes the rolling correlation between returns and the previous
        tick's signed volume, where the sign is taken from the direction of
        the price change (tick rule).

        Args:
            price: Trade prices.
            volume: Trade sizes.
            window: Number of ticks in the rolling correlation window.

        Returns:
            Rolling correlation in [-1, 1]; NaN until a full window exists.
        """
        returns = price.pct_change()
        trade_sign = np.sign(price.diff().fillna(0))
        signed_volume = trade_sign * volume

        return returns.rolling(window).corr(signed_volume.shift(1))
|
|
|
|
def compute_all_microstructure_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Compute all microstructure features from a tick DataFrame.

    Required columns: price, volume, bid, ask, bid_size, ask_size

    Args:
        df: One row per tick, containing at least the required columns.

    Returns:
        DataFrame indexed like ``df`` with one column per feature. Infinite
        values are converted to NaN, gaps are forward-filled, and anything
        still missing (e.g. rolling warm-up rows, or an undefined VPIN) is
        set to 0.

    Raises:
        ValueError: If a required column is missing (the first missing
            column, in the order listed above, is named in the message).
    """
    required = ['price', 'volume', 'bid', 'ask', 'bid_size', 'ask_size']
    for col in required:
        if col not in df.columns:
            raise ValueError(f"Missing required column: {col}")

    price, volume = df['price'], df['volume']
    bid, ask = df['bid'], df['ask']

    features = pd.DataFrame(index=df.index)

    # Quoted and effective spreads.
    features['spread'] = MicrostructureFeatures.bid_ask_spread(bid, ask)
    features['relative_spread'] = MicrostructureFeatures.relative_spread(bid, ask)
    features['effective_spread'] = MicrostructureFeatures.effective_spread(
        price, bid, ask
    )

    # Book imbalance.
    features['ofi'] = MicrostructureFeatures.order_flow_imbalance(
        df['bid_size'], df['ask_size']
    )

    # Trade direction and the volume measures derived from it.
    trade_sign = MicrostructureFeatures.trade_sign_classification(price, bid, ask)
    features['trade_sign'] = trade_sign
    features['signed_volume'] = trade_sign * volume

    # Series.where keeps the index, so volume_imbalance receives the
    # Series its signature promises instead of bare ndarrays.
    features['volume_imbalance'] = MicrostructureFeatures.volume_imbalance(
        volume.where(trade_sign > 0, 0),
        volume.where(trade_sign < 0, 0)
    )

    # Liquidity and price-impact estimators.
    features['amihud_illiquidity'] = MicrostructureFeatures.amihud_illiquidity(
        price, volume
    )
    features['kyle_lambda'] = MicrostructureFeatures.kyles_lambda(
        price, volume, trade_sign
    )
    features['roll_measure'] = MicrostructureFeatures.roll_measure(price)
    features['hasbrouck_lambda'] = MicrostructureFeatures.hasbrouck_lambda(
        price, volume
    )

    # VPIN is a single scalar for the whole sample; it is broadcast to
    # every row.
    features['vpin'] = MicrostructureFeatures.vpin_approximation(price, volume)

    # .ffill() replaces fillna(method='ffill'), which is deprecated in
    # pandas >= 2.1.
    return features.replace([np.inf, -np.inf], np.nan).ffill().fillna(0)
|
|
|
|
def generate_synthetic_tick_data(n_ticks: int = 10000,
                                 base_price: float = 100.0,
                                 volatility: float = 0.001,
                                 spread_bps: float = 1.0,
                                 seed: Optional[int] = 42) -> pd.DataFrame:
    """
    Generate synthetic tick-level data for testing microstructure features.

    Prices follow a Gaussian random walk with a weak pull back towards
    ``base_price`` and are floored at 0.01. Quotes are placed symmetrically
    around each price; sizes and volumes are log-normal draws truncated to
    integers.

    Args:
        n_ticks: Number of ticks to generate (must be >= 1).
        base_price: Starting price and mean-reversion level.
        volatility: Per-tick shock standard deviation as a fraction of
            ``base_price``.
        spread_bps: Full quoted spread in basis points of the price.
        seed: Seed for the private random generator. The default (42)
            reproduces the historical output; ``None`` gives a fresh,
            non-deterministic sample.

    Returns:
        DataFrame indexed by a 1-second ``timestamp`` index with columns
        price, bid, ask, bid_size, ask_size, volume.

    Raises:
        ValueError: If ``n_ticks`` is less than 1.
    """
    if n_ticks < 1:
        raise ValueError("n_ticks must be at least 1")

    # A private legacy RandomState yields the same stream as
    # np.random.seed(seed) without clobbering the caller's global RNG.
    rng = np.random.RandomState(seed)

    # Draw every shock up front; only the mean-reverting recursion needs
    # a Python loop.
    shocks = rng.randn(n_ticks - 1) * volatility * base_price

    prices = np.empty(n_ticks, dtype=float)
    prices[0] = base_price
    for i, shock in enumerate(shocks, start=1):
        previous = prices[i - 1]
        change = shock - 0.01 * (previous - base_price)
        prices[i] = max(previous + change, 0.01)

    # spread_bps is the full spread, hence 1e4 (bps) * 2 (half) = 20000.
    half_spread = prices * spread_bps / 20000
    bid = prices - half_spread
    ask = prices + half_spread

    bid_size = rng.lognormal(8, 1.5, n_ticks).astype(int)
    ask_size = rng.lognormal(8, 1.5, n_ticks).astype(int)

    volume = rng.lognormal(6, 1.2, n_ticks).astype(int)

    times = pd.date_range('2024-01-01 09:30', periods=n_ticks, freq='1s')

    return pd.DataFrame({
        'timestamp': times,
        'price': prices,
        'bid': bid,
        'ask': ask,
        'bid_size': bid_size,
        'ask_size': ask_size,
        'volume': volume
    }).set_index('timestamp')
|
|
|
|
if __name__ == '__main__':
    # Demo run: build a reproducible synthetic tape, derive every feature,
    # and print a short report.
    tick_data = generate_synthetic_tick_data(n_ticks=5000)
    features = compute_all_microstructure_features(tick_data)

    header_lines = (
        "Market Microstructure Features",
        "=" * 60,
        f"\nDataset: {len(tick_data)} ticks",
        f"Features computed: {len(features.columns)}",
        "\nFeature Summary:",
    )
    for line in header_lines:
        print(line)
    print(features.describe().round(6))

    # VPIN is one scalar broadcast down the column, so the first row is
    # enough to display it.
    print("\nVPIN (Volume-Synchronized Probability of Informed Trading):")
    print(f"  {features['vpin'].iloc[0]:.4f}")

    sample_columns = ['spread', 'relative_spread', 'ofi', 'kyle_lambda',
                      'amihud_illiquidity']
    print("\nSample Features (last 5 ticks):")
    print(features[sample_columns].tail().round(6))
|
|