# alphaforge-quant-system / market_microstructure.py
# Uploaded by Premchan369
# Add market microstructure features: Kyle's lambda, VPIN, Roll measure, OFI, Amihud
# Commit 96e73d5 (verified)
"""Market Microstructure Features
Based on the work of Marcos Lopez de Prado and the mlfinlab library.
This is what separates retail technical analysis from institutional quant.
Order flow, liquidity, and market impact contain genuine alpha.
"""
import numpy as np
import pandas as pd
from typing import Dict, List, Optional, Tuple
import warnings
warnings.filterwarnings('ignore')
class MicrostructureFeatures:
    """
    Extract market microstructure features from tick-level data.

    Key insight: the market is not a continuous price stream. It is a
    series of discrete transactions driven by informed vs. uninformed
    traders. Microstructure features detect this.

    Every method is a static method. Series arguments passed to the same
    call are expected to share one index; Series results are returned on
    that index.
    """

    @staticmethod
    def bid_ask_spread(bid: pd.Series, ask: pd.Series) -> pd.Series:
        """
        Raw (quoted) bid-ask spread: ask - bid.

        Wider spreads = lower liquidity, higher execution cost.
        """
        return ask - bid

    @staticmethod
    def relative_spread(bid: pd.Series, ask: pd.Series,
                        mid: Optional[pd.Series] = None) -> pd.Series:
        """
        Quoted spread as a fraction of the mid price: (ask - bid) / mid.

        Args:
            bid: Best bid prices.
            ask: Best ask prices.
            mid: Optional mid prices; defaults to (bid + ask) / 2.
        """
        if mid is None:
            mid = (bid + ask) / 2
        return (ask - bid) / mid

    @staticmethod
    def effective_spread(price: pd.Series, bid: pd.Series,
                         ask: pd.Series) -> pd.Series:
        """
        Relative effective spread = 2 * |trade_price - mid_price| / mid_price.

        Measures actual execution cost vs. quoted spread. The result is
        normalised by the mid, so it is a fraction of price, not a
        currency amount.
        """
        mid = (bid + ask) / 2
        return 2 * np.abs(price - mid) / mid

    @staticmethod
    def realized_spread(price: pd.Series, bid: pd.Series, ask: pd.Series,
                        future_mid: pd.Series) -> pd.Series:
        """
        Relative realized spread = 2 * |trade_price - future_mid| / mid.

        Compares the trade price with a later mid quote supplied by the
        caller, normalised by the mid prevailing at trade time. The value
        is unsigned: the absolute distance is used, not a trade-direction
        signed difference.
        """
        mid = (bid + ask) / 2
        return 2 * np.abs(price - future_mid) / mid

    @staticmethod
    def price_impact(price: pd.Series, volume: pd.Series,
                     bid: pd.Series, ask: pd.Series,
                     window: int = 100) -> pd.Series:
        """
        Kyle's Lambda from quotes — rolling price impact coefficient.

        Model: delta_mid = lambda * signed_volume + noise, estimated as
        rolling Cov(signed_volume, delta_mid) / Var(signed_volume).
        Higher lambda = less liquid market, orders move prices more.

        Trades are signed by comparing the trade price with the previous
        mid quote: above -> buy (+volume), below -> sell (-volume),
        equal or undefined -> 0.

        Args:
            price: Trade prices.
            volume: Trade sizes.
            bid: Best bid prices.
            ask: Best ask prices.
            window: Rolling window length in ticks.

        Returns:
            Rolling lambda on ``price.index``; NaN while the window is not
            full or where the signed-volume variance is zero.
        """
        mid = (bid + ask) / 2
        mid_change = mid.diff()
        prev_mid = mid.shift(1)
        # Keep the caller's index so the rolling covariance aligns row by
        # row with mid_change instead of against a fresh RangeIndex.
        signed_vol = pd.Series(
            np.where(price > prev_mid, volume,
                     np.where(price < prev_mid, -volume, 0)),
            index=price.index,
            dtype=float,
        )
        # Rolling regression slope via covariance / variance ratio.
        cov = signed_vol.rolling(window).cov(mid_change)
        var = signed_vol.rolling(window).var()
        return cov / var.replace(0, np.nan)

    @staticmethod
    def order_flow_imbalance(bid_size: pd.Series, ask_size: pd.Series) -> pd.Series:
        """
        OFI = (bid_size - ask_size) / (bid_size + ask_size).

        Positive = more resting size on the bid than on the ask.
        A 1e-10 term in the denominator avoids division by zero when
        both sizes are zero.
        """
        return (bid_size - ask_size) / (bid_size + ask_size + 1e-10)

    @staticmethod
    def volume_imbalance(buy_volume: pd.Series, sell_volume: pd.Series) -> pd.Series:
        """
        Volume imbalance = (buy_vol - sell_vol) / (buy_vol + sell_vol).

        The buy/sell split must be supplied by the caller (e.g. from a
        tick test or quote test). A 1e-10 term in the denominator avoids
        division by zero.
        """
        return (buy_volume - sell_volume) / (buy_volume + sell_volume + 1e-10)

    @staticmethod
    def trade_sign_classification(price: pd.Series,
                                  bid: pd.Series,
                                  ask: pd.Series) -> pd.Series:
        """
        Lee-Ready style trade direction classification.

        If trade price > mid -> +1 (aggressor is buyer)
        If trade price < mid -> -1 (aggressor is seller)
        If trade price = mid -> tick test (compare to previous trade)
        If still undecided   -> carry the last known sign forward

        Returns:
            Integer Series of +1 / -1 on ``price.index``; 0 only for
            leading trades that precede any classifiable trade.
        """
        mid = (bid + ask) / 2
        # Quote test
        quote_sign = np.where(price > mid, 1, np.where(price < mid, -1, 0))
        # Tick test for trades at the mid
        price_change = price.diff()
        tick_sign = np.where(price_change > 0, 1,
                             np.where(price_change < 0, -1, 0))
        combined = np.where(quote_sign != 0, quote_sign, tick_sign)
        sign = pd.Series(combined, index=price.index, dtype=float)
        # Unresolved trades are 0, not NaN, so they must be masked before
        # forward-filling; otherwise nothing is ever carried forward.
        sign = sign.replace(0, np.nan).ffill().fillna(0)
        return sign.astype(int)

    @staticmethod
    def amihud_illiquidity(price: pd.Series, volume: pd.Series,
                           window: int = 21) -> pd.Series:
        """
        Amihud illiquidity = mean(|return| / (price * volume)) * 1e6.

        Higher = less liquid. The ratio is averaged over a rolling
        ``window``. Observations with zero dollar volume are treated as
        missing rather than producing infinities.
        """
        abs_return = price.pct_change().abs()
        dollar_volume = (price * volume).replace(0, np.nan)
        return (abs_return / dollar_volume).rolling(window).mean() * 1e6

    @staticmethod
    def kyles_lambda(price: pd.Series, volume: pd.Series,
                     trade_sign: pd.Series, window: int = 100) -> pd.Series:
        """
        Kyle's Lambda — price impact per unit of order flow.

        Lambda = Cov(delta_price, signed_volume) / Var(signed_volume),
        both computed over a rolling ``window``. Zero variance yields NaN.
        """
        delta_price = price.diff()
        signed_volume = trade_sign * volume
        rolling_cov = delta_price.rolling(window).cov(signed_volume)
        rolling_var = signed_volume.rolling(window).var()
        return rolling_cov / rolling_var.replace(0, np.nan)

    @staticmethod
    def vpin_approximation(price: pd.Series, volume: pd.Series,
                           bucket_vol: float = 10000) -> float:
        """
        VPIN — Volume-Synchronized Probability of Informed Trading.

        Simplified approximation using equal-volume buckets:
        VPIN = mean over buckets of |buy - sell| / (buy + sell).

        A trade is classed as a buy when its price exceeds the mean of
        the two preceding prices, otherwise as a sell (this includes the
        first two trades, which have no reference). A trade belongs to
        the bucket in which its cumulative volume ends; a trade is never
        split across buckets.

        Args:
            price: Trade prices.
            volume: Trade sizes; NaN is treated as zero.
            bucket_vol: Target volume per bucket; must be positive.

        Returns:
            The VPIN estimate, or NaN for empty input or when fewer than
            10 buckets can be formed.

        Raises:
            ValueError: If ``bucket_vol`` is not positive.
        """
        if bucket_vol <= 0:
            raise ValueError("bucket_vol must be positive")
        # Work on plain arrays so positional indexing does not depend on
        # the Series index labels.
        vol = np.nan_to_num(np.asarray(volume, dtype=float), nan=0.0)
        if vol.size == 0:
            return np.nan

        # Classify trades
        reference = price.rolling(2).mean().shift(1)
        is_buy = np.asarray(price > reference, dtype=bool)
        buy_volume = np.where(is_buy, vol, 0.0)
        sell_volume = np.where(is_buy, 0.0, vol)

        # Create volume buckets
        cumulative = np.cumsum(vol)
        total = cumulative[-1]
        n_buckets = int(total / bucket_vol)
        if n_buckets < 10:
            return np.nan
        boundaries = np.linspace(0, total, n_buckets + 1)
        # Right-closed buckets (lo, hi]: a trade that exactly completes a
        # bucket stays in it, and the final trade (cum == total) lands in
        # the last bucket instead of being dropped.
        bucket_idx = np.searchsorted(boundaries, cumulative, side='left') - 1
        bucket_idx = np.clip(bucket_idx, 0, n_buckets - 1)
        bucket_buy = np.bincount(bucket_idx, weights=buy_volume,
                                 minlength=n_buckets)
        bucket_sell = np.bincount(bucket_idx, weights=sell_volume,
                                  minlength=n_buckets)

        # VPIN = average |buy - sell| / volume
        bucket_volume = bucket_buy + bucket_sell
        vpin_values = np.abs(bucket_buy - bucket_sell) / (bucket_volume + 1e-10)
        return float(np.mean(vpin_values))

    @staticmethod
    def roll_measure(price: pd.Series, window: int = 20) -> pd.Series:
        """
        Roll's measure — estimate bid-ask spread from serial covariance.

        Spread = 2 * sqrt(-Cov(delta_price_t, delta_price_{t-1})).
        Only defined when the covariance is negative; a non-negative
        covariance yields 0.
        """
        delta = price.diff()
        serial_cov = delta.rolling(window).cov(delta.shift(1))
        return 2 * np.sqrt(np.maximum(-serial_cov, 0))

    @staticmethod
    def hasbrouck_lambda(price: pd.Series, volume: pd.Series,
                         window: int = 100) -> pd.Series:
        """
        Simplified Hasbrouck-style price impact proxy.

        Returns the rolling correlation between returns and the previous
        tick's signed volume, where each trade is signed by the direction
        of its own price change. This is a correlation in [-1, 1], not a
        regression coefficient.
        """
        returns = price.pct_change()
        trade_sign = np.sign(price.diff().fillna(0))
        signed_volume = trade_sign * volume
        return returns.rolling(window).corr(signed_volume.shift(1))
def compute_all_microstructure_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Compute all microstructure features from a tick DataFrame.

    Required columns: price, volume, bid, ask, bid_size, ask_size

    Args:
        df: Tick-level data, one row per trade, carrying the required
            columns listed above.

    Returns:
        DataFrame on ``df.index`` with columns: spread, relative_spread,
        effective_spread, ofi, trade_sign, signed_volume,
        volume_imbalance, amihud_illiquidity, kyle_lambda, roll_measure,
        hasbrouck_lambda, vpin. Infinite values are turned into NaN,
        gaps are forward-filled, and any remaining NaN becomes 0.

    Raises:
        ValueError: If a required column is missing.
    """
    required = ['price', 'volume', 'bid', 'ask', 'bid_size', 'ask_size']
    for col in required:
        if col not in df.columns:
            raise ValueError(f"Missing required column: {col}")

    price, volume = df['price'], df['volume']
    bid, ask = df['bid'], df['ask']
    features = pd.DataFrame(index=df.index)

    # Quoted and effective spreads
    features['spread'] = MicrostructureFeatures.bid_ask_spread(bid, ask)
    features['relative_spread'] = MicrostructureFeatures.relative_spread(bid, ask)
    features['effective_spread'] = MicrostructureFeatures.effective_spread(
        price, bid, ask
    )

    # Order flow imbalance from resting sizes
    features['ofi'] = MicrostructureFeatures.order_flow_imbalance(
        df['bid_size'], df['ask_size']
    )

    # Trade sign classification and the signed volume derived from it
    trade_sign = MicrostructureFeatures.trade_sign_classification(price, bid, ask)
    features['trade_sign'] = trade_sign
    features['signed_volume'] = features['trade_sign'] * volume
    features['volume_imbalance'] = MicrostructureFeatures.volume_imbalance(
        np.where(features['trade_sign'] > 0, volume, 0),
        np.where(features['trade_sign'] < 0, volume, 0)
    )

    # Amihud illiquidity (rolling over ticks, default window)
    features['amihud_illiquidity'] = MicrostructureFeatures.amihud_illiquidity(
        price, volume
    )

    # Kyle's lambda
    features['kyle_lambda'] = MicrostructureFeatures.kyles_lambda(
        price, volume, features['trade_sign']
    )

    # Roll's measure
    features['roll_measure'] = MicrostructureFeatures.roll_measure(price)

    # Hasbrouck lambda
    features['hasbrouck_lambda'] = MicrostructureFeatures.hasbrouck_lambda(
        price, volume
    )

    # VPIN is a single scalar for the whole sample, broadcast to every row
    features['vpin'] = MicrostructureFeatures.vpin_approximation(price, volume)

    # .ffill() replaces the deprecated fillna(method='ffill') spelling.
    cleaned = features.replace([np.inf, -np.inf], np.nan)
    return cleaned.ffill().fillna(0)
def generate_synthetic_tick_data(n_ticks: int = 10000,
                                 base_price: float = 100.0,
                                 volatility: float = 0.001,
                                 spread_bps: float = 1.0,
                                 seed: Optional[int] = 42) -> pd.DataFrame:
    """
    Generate synthetic tick-level data for testing microstructure features.

    Args:
        n_ticks: Number of ticks to generate; must be at least 1.
        base_price: Starting price and mean-reversion level.
        volatility: Per-tick shock scale as a fraction of ``base_price``.
        spread_bps: Quoted bid-ask spread in basis points of the price.
        seed: Seed for the private random generator. The default (42)
            reproduces the historical output; ``None`` gives a different
            sample on every call.

    Returns:
        DataFrame indexed by a 1-second ``timestamp`` starting at
        2024-01-01 09:30, with columns price, bid, ask, bid_size,
        ask_size, volume.

    Raises:
        ValueError: If ``n_ticks`` is less than 1.
    """
    if n_ticks < 1:
        raise ValueError("n_ticks must be at least 1")

    # A private legacy RandomState yields the same stream as
    # np.random.seed(seed) without clobbering the caller's global RNG.
    rng = np.random.RandomState(seed)

    # Price process: random walk with slight mean reversion
    prices = np.empty(n_ticks)
    prices[0] = base_price
    for i in range(1, n_ticks):
        previous = prices[i - 1]
        # Small random walk
        change = rng.randn() * volatility * base_price
        # Mean reversion
        change -= 0.01 * (previous - base_price)
        # Floor keeps the price strictly positive
        prices[i] = max(previous + change, 0.01)

    # Bid-ask spread: spread_bps / 10000 is the full relative spread,
    # so half of it is spread_bps / 20000.
    half_spread = prices * spread_bps / 20000
    bid = prices - half_spread
    ask = prices + half_spread

    # Sizes (lognormal: few large orders, many small)
    bid_size = rng.lognormal(8, 1.5, n_ticks).astype(int)
    ask_size = rng.lognormal(8, 1.5, n_ticks).astype(int)

    # Trade volume
    volume = rng.lognormal(6, 1.2, n_ticks).astype(int)

    # Timestamp
    times = pd.date_range('2024-01-01 09:30', periods=n_ticks, freq='1s')

    return pd.DataFrame({
        'timestamp': times,
        'price': prices,
        'bid': bid,
        'ask': ask,
        'bid_size': bid_size,
        'ask_size': ask_size,
        'volume': volume
    }).set_index('timestamp')
if __name__ == '__main__':
    # Demo run: build synthetic ticks, derive every feature, print a report.
    ticks = generate_synthetic_tick_data(n_ticks=5000)
    feature_frame = compute_all_microstructure_features(ticks)

    banner = "=" * 60
    preview_columns = ['spread', 'relative_spread', 'ofi', 'kyle_lambda',
                       'amihud_illiquidity']
    summary_table = feature_frame.describe().round(6)
    # VPIN is broadcast to every row, so the first row carries the value.
    vpin_value = feature_frame['vpin'].iloc[0]
    last_rows = feature_frame[preview_columns].tail().round(6)

    print("Market Microstructure Features")
    print(banner)
    print(f"\nDataset: {len(ticks)} ticks")
    print(f"Features computed: {len(feature_frame.columns)}")
    print("\nFeature Summary:")
    print(summary_table)
    print("\nVPIN (Volume-Synchronized Probability of Informed Trading):")
    print(f" {vpin_value:.4f}")
    print("\nSample Features (last 5 ticks):")
    print(last_rows)