File size: 13,281 Bytes
"""Market Microstructure Features

Based on the work of Marcos Lopez de Prado and the mlfinlab library.
This is what separates retail technical analysis from institutional quant:
order flow, liquidity, and market impact contain genuine alpha.
"""
import numpy as np
import pandas as pd
from typing import Dict, List, Optional, Tuple
import warnings
warnings.filterwarnings('ignore')
class MicrostructureFeatures:
    """
    Extract market microstructure features from tick-level data.

    Key insight: the market is not a continuous price stream. It is a
    series of discrete transactions driven by informed vs. uninformed
    traders, and microstructure features try to detect this.

    Every method is a ``@staticmethod``; the class is only a namespace.
    Unless stated otherwise the inputs are equally-indexed ``pd.Series``
    and the result is a ``pd.Series`` on the same index.
    """

    @staticmethod
    def bid_ask_spread(bid: pd.Series, ask: pd.Series) -> pd.Series:
        """
        Raw (quoted) bid-ask spread, ``ask - bid``.

        Wider spreads = lower liquidity, higher execution cost.
        """
        return ask - bid

    @staticmethod
    def relative_spread(bid: pd.Series, ask: pd.Series,
                        mid: Optional[pd.Series] = None) -> pd.Series:
        """
        Quoted spread as a fraction of the mid price.

        Args:
            bid: Best bid prices.
            ask: Best ask prices.
            mid: Optional mid prices; defaults to ``(bid + ask) / 2``.

        Returns:
            ``(ask - bid) / mid``.
        """
        if mid is None:
            mid = (bid + ask) / 2
        return (ask - bid) / mid

    @staticmethod
    def effective_spread(price: pd.Series, bid: pd.Series,
                         ask: pd.Series) -> pd.Series:
        """
        Relative effective spread: ``2 * |trade_price - mid| / mid``.

        Measures actual execution cost vs. the quoted spread. Note that the
        value is normalised by the mid price (a fraction, not a currency
        amount).
        """
        mid = (bid + ask) / 2
        return 2 * np.abs(price - mid) / mid

    @staticmethod
    def realized_spread(price: pd.Series, bid: pd.Series, ask: pd.Series,
                        future_mid: pd.Series) -> pd.Series:
        """
        Relative realized spread: ``2 * |trade_price - future_mid| / mid``.

        ``mid`` is the contemporaneous ``(bid + ask) / 2`` and is used only
        for normalisation; ``future_mid`` is the mid price some horizon
        after the trade and must be supplied by the caller. The distance is
        unsigned (absolute value), as in the effective spread.
        """
        mid = (bid + ask) / 2
        return 2 * np.abs(price - future_mid) / mid

    @staticmethod
    def price_impact(price: pd.Series, volume: pd.Series,
                     bid: pd.Series, ask: pd.Series,
                     window: int = 100) -> pd.Series:
        """
        Rolling Kyle's-lambda style price impact coefficient.

        Model: ``delta_mid = lambda * signed_volume + noise``. Lambda is
        estimated per window as ``Cov(signed_volume, delta_mid) /
        Var(signed_volume)``. Higher lambda = less liquid market.

        Trades are signed by comparing the trade price with the *previous*
        mid quote: above -> +volume, below -> -volume, otherwise 0 (this
        includes the first row, which has no previous mid).

        Args:
            price: Trade prices.
            volume: Trade sizes.
            bid: Best bid prices.
            ask: Best ask prices.
            window: Rolling window length in ticks (default 100).

        Returns:
            Series of lambda estimates on ``price.index``; NaN until a full
            window is available or where the signed-volume variance is 0.
        """
        mid = (bid + ask) / 2
        mid_change = mid.diff()
        prev_mid = mid.shift(1)

        # Keep the caller's index so the rolling covariance aligns with
        # ``mid_change`` instead of a fresh RangeIndex.
        signed_vol = pd.Series(
            np.where(price > prev_mid, volume,
                     np.where(price < prev_mid, -volume, 0)),
            index=price.index,
            dtype=float,
        )

        # OLS slope via rolling covariance / variance. ``cov`` takes the
        # other *Series*, not another Rolling object.
        roller = signed_vol.rolling(window)
        return roller.cov(mid_change) / roller.var().replace(0, np.nan)

    @staticmethod
    def order_flow_imbalance(bid_size: pd.Series,
                             ask_size: pd.Series) -> pd.Series:
        """
        OFI = (bid_size - ask_size) / (bid_size + ask_size).

        Positive = more resting size on the bid than on the ask. A tiny
        epsilon (1e-10) in the denominator avoids division by zero when
        both sides are empty.
        """
        return (bid_size - ask_size) / (bid_size + ask_size + 1e-10)

    @staticmethod
    def volume_imbalance(buy_volume: pd.Series,
                         sell_volume: pd.Series) -> pd.Series:
        """
        Volume imbalance = (buy_vol - sell_vol) / (buy_vol + sell_vol).

        The buy/sell split must be produced by the caller (e.g. via a tick
        test or quote test). A 1e-10 epsilon guards the denominator.
        """
        return (buy_volume - sell_volume) / (buy_volume + sell_volume + 1e-10)

    @staticmethod
    def trade_sign_classification(price: pd.Series,
                                  bid: pd.Series,
                                  ask: pd.Series) -> pd.Series:
        """
        Lee-Ready style trade direction classification.

        1. Quote test: price > mid -> +1 (buy), price < mid -> -1 (sell).
        2. Tick test for trades at the mid: sign of the change from the
           previous trade price.
        3. If still unclassified (zero tick), carry the last known sign
           forward; leading trades with no history stay 0.

        Returns:
            Integer Series of {-1, 0, +1} on ``price.index``.
        """
        mid = (bid + ask) / 2

        # Quote test
        quote_sign = np.where(price > mid, 1.0,
                              np.where(price < mid, -1.0, 0.0))

        # Tick test, used only where the quote test is inconclusive
        price_change = price.diff()
        tick_sign = np.where(price_change > 0, 1.0,
                             np.where(price_change < 0, -1.0, 0.0))

        sign = pd.Series(
            np.where(quote_sign == 0, tick_sign, quote_sign),
            index=price.index,
        )

        # Zeros are "unknown": turn them into NaN so ffill really carries
        # the previous classification forward (ffill on zeros is a no-op).
        return sign.replace(0, np.nan).ffill().fillna(0).astype(int)

    @staticmethod
    def amihud_illiquidity(price: pd.Series, volume: pd.Series,
                           window: int = 21) -> pd.Series:
        """
        Amihud illiquidity: rolling mean of ``|return| / (price * volume)``.

        Higher = less liquid. The result is scaled by 1e6.

        Args:
            price: Prices.
            volume: Traded volume per observation.
            window: Rolling window length (default 21).

        Returns:
            Rolling mean of the ratio times 1e6; observations with zero
            dollar volume are treated as missing.
        """
        abs_returns = price.pct_change().abs()
        # Zero dollar volume would give +/-inf; mark it missing explicitly.
        dollar_volume = (price * volume).replace(0, np.nan)
        return (abs_returns / dollar_volume).rolling(window).mean() * 1e6

    @staticmethod
    def kyles_lambda(price: pd.Series, volume: pd.Series,
                     trade_sign: pd.Series, window: int = 100) -> pd.Series:
        """
        Kyle's Lambda - price impact per unit of signed order flow.

        ``Lambda = Cov(delta_price, signed_volume) / Var(signed_volume)``
        over a rolling window, with ``signed_volume = trade_sign * volume``.

        Returns:
            Series of lambda estimates; NaN where the variance is 0 or the
            window is incomplete.
        """
        delta_price = price.diff()
        signed_volume = trade_sign * volume
        cov = delta_price.rolling(window).cov(signed_volume)
        var = signed_volume.rolling(window).var()
        return cov / var.replace(0, np.nan)

    @staticmethod
    def vpin_approximation(price: pd.Series, volume: pd.Series,
                           bucket_vol: float = 10000) -> float:
        """
        VPIN - Volume-Synchronized Probability of Informed Trading.

        Simplified approximation: each tick is classified as a buy when its
        price exceeds the mean of the two preceding prices (sell otherwise,
        including the first two ticks), ticks are grouped into equal-width
        buckets of cumulative volume, and VPIN is the mean over buckets of
        ``|buy - sell| / (buy + sell)``. Whole ticks are assigned to a
        bucket (they are not split), so a bucket can be empty and then
        contributes 0.

        Args:
            price: Trade prices.
            volume: Trade sizes (same length as ``price``).
            bucket_vol: Target volume per bucket; must be positive.

        Returns:
            The VPIN estimate, or NaN when the input is empty, the total
            volume is not a positive finite number, or fewer than 10
            buckets can be formed.

        Raises:
            ValueError: If ``bucket_vol`` is not positive.
        """
        if bucket_vol <= 0:
            raise ValueError("bucket_vol must be positive")

        # Work on plain arrays: positional [-1] on a Series is a label
        # lookup and fails for most indexes.
        vol = np.asarray(volume, dtype=float)
        if vol.size == 0:
            return np.nan

        # Classify trades against the lagged two-tick average price.
        reference = price.rolling(2).mean().shift(1)
        is_buy = np.asarray(price > reference, dtype=bool)
        buy_volume = np.where(is_buy, vol, 0.0)
        sell_volume = np.where(is_buy, 0.0, vol)

        cumulative = np.cumsum(vol)
        total = cumulative[-1]
        if not np.isfinite(total) or total <= 0:
            return np.nan

        n_buckets = int(total / bucket_vol)
        if n_buckets < 10:
            return np.nan

        # Bucket j holds ticks with boundaries[j] <= cumulative <
        # boundaries[j + 1]; the final tick (cumulative == total) is
        # clipped into the last bucket instead of being dropped.
        boundaries = np.linspace(0, total, n_buckets + 1)
        bucket_idx = np.searchsorted(boundaries, cumulative, side='right') - 1
        bucket_idx = np.clip(bucket_idx, 0, n_buckets - 1)

        bucket_buy = np.bincount(bucket_idx, weights=buy_volume,
                                 minlength=n_buckets)
        bucket_sell = np.bincount(bucket_idx, weights=sell_volume,
                                  minlength=n_buckets)
        bucket_volume = bucket_buy + bucket_sell

        # VPIN = average |buy - sell| / volume
        vpin_values = np.abs(bucket_buy - bucket_sell) / (bucket_volume + 1e-10)
        return float(np.mean(vpin_values))

    @staticmethod
    def roll_measure(price: pd.Series, window: int = 20) -> pd.Series:
        """
        Roll's measure - bid-ask spread implied by serial covariance.

        ``Spread = 2 * sqrt(-Cov(delta_price_t, delta_price_{t-1}))``.
        The estimator is only defined for negative covariance; windows with
        non-negative covariance yield 0.
        """
        delta = price.diff()
        cov = delta.rolling(window).cov(delta.shift(1))
        # Clamp at zero so sqrt never sees a negative argument.
        return 2 * np.sqrt((-cov).clip(lower=0))

    @staticmethod
    def hasbrouck_lambda(price: pd.Series, volume: pd.Series,
                         window: int = 100) -> pd.Series:
        """
        Simplified Hasbrouck-style lambda.

        Implemented as the rolling correlation between returns and the
        previous tick's signed volume, where the trade sign is the sign of
        the price change (0 for the first tick and for unchanged prices).
        """
        returns = price.pct_change()
        trade_sign = np.sign(price.diff().fillna(0))
        signed_volume = trade_sign * volume
        return returns.rolling(window).corr(signed_volume.shift(1))
def compute_all_microstructure_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Compute all microstructure features from a tick DataFrame.

    Args:
        df: Tick data with the columns ``price``, ``volume``, ``bid``,
            ``ask``, ``bid_size`` and ``ask_size``.

    Returns:
        DataFrame on ``df.index`` with one column per feature. Infinite
        values are turned into NaN, gaps are forward-filled, and anything
        still missing (e.g. rolling warm-up rows) is set to 0.

    Raises:
        ValueError: If a required column is absent.
    """
    required = ['price', 'volume', 'bid', 'ask', 'bid_size', 'ask_size']
    for col in required:
        if col not in df.columns:
            raise ValueError(f"Missing required column: {col}")

    msf = MicrostructureFeatures
    price, volume = df['price'], df['volume']
    bid, ask = df['bid'], df['ask']

    features = pd.DataFrame(index=df.index)

    # Basic spread
    features['spread'] = msf.bid_ask_spread(bid, ask)
    features['relative_spread'] = msf.relative_spread(bid, ask)

    # Effective spread
    features['effective_spread'] = msf.effective_spread(price, bid, ask)

    # Order flow imbalance
    features['ofi'] = msf.order_flow_imbalance(df['bid_size'], df['ask_size'])

    # Trade sign classification
    features['trade_sign'] = msf.trade_sign_classification(price, bid, ask)

    # Signed volume and the buy/sell split derived from the trade sign
    features['signed_volume'] = features['trade_sign'] * volume
    features['volume_imbalance'] = msf.volume_imbalance(
        np.where(features['trade_sign'] > 0, volume, 0),
        np.where(features['trade_sign'] < 0, volume, 0)
    )

    # Amihud illiquidity (using daily approximation from intraday)
    features['amihud_illiquidity'] = msf.amihud_illiquidity(price, volume)

    # Kyle's lambda
    features['kyle_lambda'] = msf.kyles_lambda(
        price, volume, features['trade_sign']
    )

    # Roll's measure
    features['roll_measure'] = msf.roll_measure(price)

    # Hasbrouck lambda
    features['hasbrouck_lambda'] = msf.hasbrouck_lambda(price, volume)

    # VPIN is a single scalar for the whole sample; broadcast to every row
    features['vpin'] = msf.vpin_approximation(price, volume)

    # ``.ffill()`` replaces the deprecated ``fillna(method='ffill')``.
    return features.replace([np.inf, -np.inf], np.nan).ffill().fillna(0)
def generate_synthetic_tick_data(n_ticks: int = 10000,
                                 base_price: float = 100.0,
                                 volatility: float = 0.001,
                                 spread_bps: float = 1.0,
                                 seed: Optional[int] = 42) -> pd.DataFrame:
    """
    Generate synthetic tick-level data for testing microstructure features.

    Args:
        n_ticks: Number of ticks to generate (0 yields an empty frame).
        base_price: Starting price and mean-reversion level.
        volatility: Per-tick shock scale as a fraction of ``base_price``.
        spread_bps: Quoted bid-ask spread in basis points of the price.
        seed: Seed for a private ``np.random.RandomState``. The default 42
            keeps the output deterministic; pass ``None`` for fresh
            entropy. The global NumPy RNG state is not modified.

    Returns:
        DataFrame indexed by a 1-second ``timestamp`` range starting at
        2024-01-01 09:30 with the columns ``price``, ``bid``, ``ask``,
        ``bid_size``, ``ask_size`` and ``volume``.

    Raises:
        ValueError: If ``n_ticks`` is negative.
    """
    if n_ticks < 0:
        raise ValueError("n_ticks must be non-negative")

    # A local generator keeps this helper from reseeding np.random globally.
    rng = np.random.RandomState(seed)

    # Price process: random walk with slight mean reversion. The shocks are
    # drawn in one call; the recursion itself is sequential because each
    # step depends on the previous (floored) price.
    shocks = rng.randn(max(n_ticks - 1, 0)) * volatility * base_price
    prices = np.empty(n_ticks, dtype=float)
    if n_ticks:
        prices[0] = base_price
    for i, shock in enumerate(shocks, start=1):
        previous = prices[i - 1]
        step = shock - 0.01 * (previous - base_price)  # pull toward base_price
        prices[i] = max(previous + step, 0.01)         # floor at one cent

    # Bid-ask spread: spread_bps / 10000 of the price, half on each side
    half_spread = prices * spread_bps / 20000
    bid = prices - half_spread
    ask = prices + half_spread

    # Sizes (lognormal: few large orders, many small)
    bid_size = rng.lognormal(8, 1.5, n_ticks).astype(int)
    ask_size = rng.lognormal(8, 1.5, n_ticks).astype(int)

    # Trade volume
    volume = rng.lognormal(6, 1.2, n_ticks).astype(int)

    # Timestamp
    times = pd.date_range('2024-01-01 09:30', periods=n_ticks, freq='1s')

    return pd.DataFrame({
        'timestamp': times,
        'price': prices,
        'bid': bid,
        'ask': ask,
        'bid_size': bid_size,
        'ask_size': ask_size,
        'volume': volume
    }).set_index('timestamp')
if __name__ == '__main__':
    # Smoke test: build synthetic ticks, compute every feature, print a report.
    ticks = generate_synthetic_tick_data(n_ticks=5000)
    computed = compute_all_microstructure_features(ticks)

    rule = "=" * 60
    preview_columns = ['spread', 'relative_spread', 'ofi', 'kyle_lambda',
                       'amihud_illiquidity']
    n_features = len(computed.columns)
    vpin_value = computed['vpin'].iloc[0]  # VPIN is constant across rows

    print("Market Microstructure Features")
    print(rule)
    print(f"\nDataset: {len(ticks)} ticks")
    print(f"Features computed: {n_features}")

    print("\nFeature Summary:")
    print(computed.describe().round(6))

    print("\nVPIN (Volume-Synchronized Probability of Informed Trading):")
    print(f" {vpin_value:.4f}")

    print("\nSample Features (last 5 ticks):")
    print(computed[preview_columns].tail().round(6))
|