"""Advanced Feature Engineering Part 1 - Microstructure & Cross-Sectional"""
import numpy as np
import pandas as pd
from typing import Dict, List, Optional
import warnings

# NOTE(review): this silences every warning for the whole process, not only
# the ones raised by this module. Kept as-is because importers may rely on it.
warnings.filterwarnings('ignore')


class MicrostructureFeatures:
    """Market microstructure features from OHLCV data"""

    @staticmethod
    def amihud_illiquidity(close: pd.Series, volume: pd.Series,
                           window: int = 21) -> pd.Series:
        """Amihud (2002) illiquidity: rolling mean of |return| / dollar volume.

        Args:
            close: Close prices.
            volume: Traded volume, aligned with ``close``.
            window: Number of observations in the rolling mean.

        Returns:
            Series of rolling illiquidity values. Observations with zero
            dollar volume are treated as missing rather than producing
            ``inf``.
        """
        # A zero denominator would yield inf and poison the rolling mean;
        # mask it the same way vwap() and kyle_lambda() guard their divisors.
        dollar_vol = (close * volume).replace(0, np.nan)
        abs_ret = close.pct_change().abs()
        return (abs_ret / dollar_vol).rolling(window).mean()

    @staticmethod
    def kyle_lambda(close: pd.Series, volume: pd.Series,
                    window: int = 21) -> pd.Series:
        """Kyle's lambda: price impact per unit of signed volume.

        Computed as the rolling regression slope of the return on signed
        volume, ``cov(ret, signed_vol) / var(signed_vol)``, where volume is
        signed by the direction of the same-period return.

        Args:
            close: Close prices.
            volume: Traded volume, aligned with ``close``.
            window: Number of observations in the rolling covariance/variance.

        Returns:
            Series of rolling slope estimates; NaN where the signed-volume
            variance is zero.
        """
        ret = close.pct_change()
        signed_vol = volume * np.sign(ret)
        # The slope must use the *signed* return: regressing |return| on
        # signed volume mixes a non-negative response with a signed
        # regressor and does not measure price impact.
        cov = ret.rolling(window).cov(signed_vol)
        var = signed_vol.rolling(window).var()
        return cov / var.replace(0, np.nan)

    @staticmethod
    def bid_ask_spread_proxy(high: pd.Series, low: pd.Series, close: pd.Series,
                             window: int = 21) -> pd.Series:
        """Corwin & Schultz (2012) high-low spread estimator.

        For every pair of consecutive observations:

        * ``beta``  = sum of the two squared log high/low ranges,
        * ``gamma`` = squared log range of the two-period high and low,
        * ``alpha`` = (sqrt(2*beta) - sqrt(beta)) / (3 - 2*sqrt(2))
          - sqrt(gamma / (3 - 2*sqrt(2))),
        * ``spread`` = 2 * (exp(alpha) - 1) / (1 + exp(alpha)).

        Negative ``alpha`` values are floored at zero before the two-period
        spreads are averaged over ``window``.

        Args:
            high: High prices.
            low: Low prices, aligned with ``high``.
            close: Unused; accepted so the signature stays unchanged.
            window: Number of two-period spread estimates to average.

        Returns:
            Series of rolling mean spread estimates (as a fraction of price).
        """
        # Log ranges make the estimator independent of the price level.
        log_range_sq = np.log(high / low) ** 2
        beta = log_range_sq.rolling(2).sum()
        gamma = np.log(high.rolling(2).max() / low.rolling(2).min()) ** 2

        denom = 3 - 2 * np.sqrt(2)
        # sqrt(2*beta) - sqrt(beta) == (sqrt(2) - 1) * sqrt(beta)
        alpha = (np.sqrt(2) - 1) * np.sqrt(beta) / denom - np.sqrt(gamma / denom)
        alpha = alpha.clip(lower=0)  # NaN stays NaN; negatives become 0

        exp_alpha = np.exp(alpha)
        two_period_spread = 2 * (exp_alpha - 1) / (1 + exp_alpha)
        return two_period_spread.rolling(window).mean()

    @staticmethod
    def vwap(close: pd.Series, high: pd.Series, low: pd.Series,
             volume: pd.Series, window: int = 14) -> pd.Series:
        """Rolling volume-weighted average of the typical price.

        Args:
            close: Close prices.
            high: High prices.
            low: Low prices.
            volume: Traded volume.
            window: Number of observations summed in numerator and denominator.

        Returns:
            Series of rolling VWAP values; NaN where the summed volume is zero.
        """
        typical_price = (high + low + close) / 3
        weighted_sum = (typical_price * volume).rolling(window).sum()
        volume_sum = volume.rolling(window).sum()
        return weighted_sum / volume_sum.replace(0, np.nan)

    @staticmethod
    def roll_spread(close: pd.Series, window: int = 20) -> pd.Series:
        """Roll (1984) effective spread estimator.

        Uses the covariance between consecutive price changes inside each
        rolling window: ``2 * sqrt(-cov)`` when the covariance is negative,
        and 0 when it is positive.

        Args:
            close: Close prices.
            window: Number of price changes per rolling window.

        Returns:
            Series of rolling spread estimates in price units.
        """
        delta_p = close.diff()
        # raw=True hands each window to the lambda as an ndarray, avoiding the
        # construction of a Series per window; the values are the same.
        serial_cov = delta_p.rolling(window).apply(
            lambda x: np.cov(x[:-1], x[1:])[0, 1], raw=True
        )
        # Positive covariances are clipped to 0 so the square root is defined.
        return 2 * np.sqrt(-serial_cov.clip(upper=0))

    @staticmethod
    def compute_all(close: pd.Series, high: pd.Series, low: pd.Series,
                    volume: pd.Series) -> pd.DataFrame:
        """Compute all microstructure features with their default windows.

        Args:
            close: Close prices.
            high: High prices.
            low: Low prices.
            volume: Traded volume.

        Returns:
            DataFrame indexed like ``close`` with one column per feature.
        """
        features = pd.DataFrame(index=close.index)
        features['amihud_illiquidity'] = MicrostructureFeatures.amihud_illiquidity(close, volume)
        features['kyle_lambda'] = MicrostructureFeatures.kyle_lambda(close, volume)
        features['bid_ask_spread'] = MicrostructureFeatures.bid_ask_spread_proxy(high, low, close)
        features['vwap_ratio'] = close / MicrostructureFeatures.vwap(close, high, low, volume)
        features['roll_spread'] = MicrostructureFeatures.roll_spread(close)

        # Dollar volume features
        dollar_vol = close * volume
        features['dollar_vol_rank'] = dollar_vol.rolling(63).rank(pct=True)
        features['volume_trend'] = volume.rolling(21).mean() / volume.rolling(63).mean() - 1

        # Volume on up-closes minus volume on down-closes, scaled by the
        # 21-observation average volume.
        prev_close = close.shift(1)
        up_volume = (close > prev_close).astype(float) * volume
        down_volume = (close < prev_close).astype(float) * volume
        features['volume_delta'] = (up_volume - down_volume) / volume.rolling(21).mean()
        return features


class CrossSectionalFeatures:
    """Cross-sectional ranking and momentum features"""

    @staticmethod
    def _assemble(index: pd.Index, named: Dict[str, pd.DataFrame]) -> pd.DataFrame:
        """Join per-feature frames side by side under their feature names.

        With several columns per frame the result has two-level columns
        ``(feature, original column)``. When every frame has exactly one
        column the feature names alone are used as flat column labels.
        """
        if not named:
            return pd.DataFrame(index=index)
        frames: List[pd.DataFrame] = list(named.values())
        # keys= is passed explicitly so the column order follows insertion
        # order regardless of how pandas orders mapping inputs.
        combined = pd.concat(frames, axis=1, keys=list(named.keys()))
        if all(frame.shape[1] == 1 for frame in frames):
            combined.columns = combined.columns.get_level_values(0)
        return combined

    @staticmethod
    def momentum_score(returns: pd.DataFrame,
                       periods=(5, 21, 63, 126, 252)) -> pd.DataFrame:
        """Cross-sectional momentum ranking.

        For each look-back ``p`` the rolling ``p``-row sum of returns is
        ranked across columns (``axis=1``) as a percentile.

        Args:
            returns: Frame of returns with one column per asset.
            periods: Look-back lengths, in rows.

        Returns:
            DataFrame with columns ``(f'cs_mom_{p}d', asset)``; flat
            ``f'cs_mom_{p}d'`` columns when ``returns`` has a single column.
        """
        ranked: Dict[str, pd.DataFrame] = {}
        for p in periods:
            cum_ret = returns.rolling(p).sum()
            ranked[f'cs_mom_{p}d'] = cum_ret.rank(axis=1, pct=True)
        return CrossSectionalFeatures._assemble(returns.index, ranked)

    @staticmethod
    def mean_reversion(returns: pd.DataFrame, short: int = 5,
                       long: int = 63) -> pd.DataFrame:
        """Short-term reversal vs medium-term momentum.

        Args:
            returns: Frame of returns with one column per asset.
            short: Rows in the short look-back sum.
            long: Rows in the long look-back sum.

        Returns:
            DataFrame with ``mr_signal`` (short-window percentile rank minus
            long-window percentile rank) and ``mr_short`` (negated
            short-window percentile rank), laid out as ``(feature, asset)``
            columns; flat feature columns when ``returns`` has a single
            column.
        """
        short_rank = returns.rolling(short).sum().rank(axis=1, pct=True)
        long_rank = returns.rolling(long).sum().rank(axis=1, pct=True)
        signals = {
            'mr_signal': short_rank - long_rank,
            'mr_short': -short_rank,
        }
        return CrossSectionalFeatures._assemble(returns.index, signals)

    @staticmethod
    def dispersion(returns: pd.DataFrame, window: int = 21) -> pd.DataFrame:
        """Cross-sectional return dispersion.

        Each statistic is first computed across columns for every row
        (``axis=1``) and then smoothed with a rolling mean over ``window``
        rows. Rolling-window reducers do not accept an ``axis`` argument, so
        the cross-sectional step has to be done on the frame itself.

        Args:
            returns: Frame of returns with one column per asset.
            window: Rows in the smoothing window.

        Returns:
            DataFrame indexed like ``returns`` with columns ``cs_std``,
            ``cs_range``, ``cs_skew`` and ``cs_kurt``.
        """
        per_date = {
            'cs_std': returns.std(axis=1),
            'cs_range': returns.max(axis=1) - returns.min(axis=1),
            'cs_skew': returns.skew(axis=1),
            'cs_kurt': returns.kurt(axis=1),
        }
        features = pd.DataFrame(index=returns.index)
        for name, stat in per_date.items():
            features[name] = stat.rolling(window).mean()
        return features