alphaforge-quant-system / advanced_features_part1.py
Premchan369's picture
Upload advanced_features_part1.py
7370e85 verified
"""Advanced Feature Engineering Part 1 - Microstructure & Cross-Sectional"""
import numpy as np
import pandas as pd
from typing import Dict, List, Optional
import warnings
warnings.filterwarnings('ignore')
class MicrostructureFeatures:
    """Market microstructure features derived from OHLCV data.

    All methods are stateless static transforms. Inputs are expected to be
    pandas Series sharing one index (``compute_all`` assigns each result to a
    single DataFrame column); outputs are aligned to that index and are NaN
    until enough history is available for the requested window.
    """

    @staticmethod
    def amihud_illiquidity(close: pd.Series, volume: pd.Series,
                           window: int = 21) -> pd.Series:
        """Amihud (2002) illiquidity: rolling mean of |return| / dollar volume.

        Args:
            close: Close prices.
            volume: Traded volume in units (shares/contracts).
            window: Number of observations averaged.

        Returns:
            Rolling illiquidity ratio. Bars with zero dollar volume are
            treated as missing rather than producing ``inf``.
        """
        # Zero dollar volume would give +/-inf and poison the rolling mean;
        # mark it as missing, consistent with the other zero-denominator guards.
        dollar_vol = (close * volume).replace(0, np.nan)
        abs_ret = close.pct_change().abs()
        return (abs_ret / dollar_vol).rolling(window).mean()

    @staticmethod
    def kyle_lambda(close: pd.Series, volume: pd.Series,
                    window: int = 21) -> pd.Series:
        """Kyle's lambda: rolling price impact per unit of signed volume.

        Estimated as the rolling OLS slope of the return on signed volume,
        ``cov(ret, signed_vol) / var(signed_vol)``, where volume is signed by
        the direction of the same bar's return.

        Args:
            close: Close prices.
            volume: Traded volume.
            window: Number of observations in each regression window.

        Returns:
            Rolling slope; NaN where the signed-volume variance is zero.
        """
        ret = close.pct_change()
        signed_vol = volume * np.sign(ret)
        # Regress the *signed* return on signed flow: using |return| here
        # would not measure a price-impact slope.
        cov = ret.rolling(window).cov(signed_vol)
        var = signed_vol.rolling(window).var()
        return cov / var.replace(0, np.nan)

    @staticmethod
    def bid_ask_spread_proxy(high: pd.Series, low: pd.Series, close: pd.Series,
                             window: int = 21) -> pd.Series:
        """Corwin & Schultz (2012) high-low spread estimator.

        For each pair of consecutive bars:

        * ``beta``  = sum over the two bars of ``ln(H/L)**2``
        * ``gamma`` = ``ln(H2/L2)**2`` with ``H2``/``L2`` the two-bar high/low
        * ``alpha`` = ``(sqrt(2*beta) - sqrt(beta)) / (3 - 2*sqrt(2))
          - sqrt(gamma / (3 - 2*sqrt(2)))``
        * ``spread`` = ``2 * (exp(alpha) - 1) / (1 + exp(alpha))``

        Negative two-bar estimates are set to zero and the result is averaged
        over ``window`` observations.

        Args:
            high: High prices (must be positive for the logarithm).
            low: Low prices (must be positive for the logarithm).
            close: Accepted for interface compatibility; not used by the
                estimator.
            window: Number of two-bar estimates averaged.

        Returns:
            Rolling mean proportional spread estimate (non-negative).
        """
        denom = 3.0 - 2.0 * np.sqrt(2.0)

        log_hl_sq = np.log(high / low) ** 2
        beta = log_hl_sq.rolling(2).sum()

        two_bar_high = high.rolling(2).max()
        two_bar_low = low.rolling(2).min()
        gamma = np.log(two_bar_high / two_bar_low) ** 2

        alpha = (np.sqrt(2.0 * beta) - np.sqrt(beta)) / denom - np.sqrt(gamma / denom)
        # The spread is increasing in alpha and zero at alpha == 0, so flooring
        # alpha at zero floors the two-bar spread estimate at zero.
        alpha = alpha.clip(lower=0)
        exp_alpha = np.exp(alpha)
        spread = 2.0 * (exp_alpha - 1.0) / (1.0 + exp_alpha)
        return spread.rolling(window).mean()

    @staticmethod
    def vwap(close: pd.Series, high: pd.Series, low: pd.Series,
             volume: pd.Series, window: int = 14) -> pd.Series:
        """Rolling volume-weighted average of the typical price.

        Args:
            close: Close prices.
            high: High prices.
            low: Low prices.
            volume: Traded volume used as the weight.
            window: Number of observations in the rolling sums.

        Returns:
            ``sum(typical * volume) / sum(volume)`` over the window; NaN where
            the window's total volume is zero.
        """
        typical_price = (high + low + close) / 3
        weighted = (typical_price * volume).rolling(window).sum()
        total_vol = volume.rolling(window).sum()
        return weighted / total_vol.replace(0, np.nan)

    @staticmethod
    def roll_spread(close: pd.Series, window: int = 20) -> pd.Series:
        """Roll (1984) effective spread estimator.

        Computes ``2 * sqrt(-cov)`` where ``cov`` is the sample covariance
        between consecutive price changes inside each rolling window.
        Positive covariances (for which the estimator is undefined) yield 0.

        Args:
            close: Close prices.
            window: Number of price changes per window.

        Returns:
            Rolling spread estimate in price units.
        """
        delta_p = close.diff()
        # raw=True hands the callback a plain ndarray, avoiding the cost of
        # building a Series for every window.
        serial_cov = delta_p.rolling(window).apply(
            lambda x: np.cov(x[:-1], x[1:])[0, 1], raw=True
        )
        return 2 * np.sqrt(-serial_cov.clip(upper=0))

    @staticmethod
    def compute_all(close: pd.Series, high: pd.Series, low: pd.Series,
                    volume: pd.Series) -> pd.DataFrame:
        """Compute every microstructure feature with default windows.

        Args:
            close: Close prices.
            high: High prices.
            low: Low prices.
            volume: Traded volume.

        Returns:
            DataFrame indexed like ``close`` with columns
            ``amihud_illiquidity``, ``kyle_lambda``, ``bid_ask_spread``,
            ``vwap_ratio``, ``roll_spread``, ``dollar_vol_rank``,
            ``volume_trend`` and ``volume_delta``.
        """
        features = pd.DataFrame(index=close.index)
        features['amihud_illiquidity'] = MicrostructureFeatures.amihud_illiquidity(close, volume)
        features['kyle_lambda'] = MicrostructureFeatures.kyle_lambda(close, volume)
        features['bid_ask_spread'] = MicrostructureFeatures.bid_ask_spread_proxy(high, low, close)
        features['vwap_ratio'] = close / MicrostructureFeatures.vwap(close, high, low, volume)
        features['roll_spread'] = MicrostructureFeatures.roll_spread(close)

        # Dollar volume features
        dollar_vol = close * volume
        features['dollar_vol_rank'] = dollar_vol.rolling(63).rank(pct=True)

        mean_vol_21 = volume.rolling(21).mean()
        mean_vol_63 = volume.rolling(63).mean()
        # Guard the denominators so an all-zero volume window gives NaN, not inf.
        features['volume_trend'] = mean_vol_21 / mean_vol_63.replace(0, np.nan) - 1

        # +1 on up bars, -1 on down bars, 0 when unchanged (or no prior bar).
        prev_close = close.shift(1)
        direction = (close > prev_close).astype(float) - (close < prev_close).astype(float)
        features['volume_delta'] = direction * volume / mean_vol_21.replace(0, np.nan)
        return features
class CrossSectionalFeatures:
    """Cross-sectional ranking, reversal and dispersion features.

    Every method takes ``returns`` as a wide DataFrame with one row per date
    and one column per asset; ranks and dispersion statistics are taken
    across the columns of each row.
    """

    @staticmethod
    def _combine_features(frames, index, n_assets):
        """Join per-feature wide frames into a single DataFrame.

        ``frames`` maps feature name -> DataFrame (dates x assets). The result
        has two-level columns ``(feature, asset)``. When there is only one
        asset the asset level is dropped so the columns are the plain feature
        names. An empty mapping yields an empty frame on ``index``.
        """
        if not frames:
            return pd.DataFrame(index=index)
        names = list(frames)
        combined = pd.concat([frames[name] for name in names], axis=1, keys=names)
        if n_assets == 1:
            # One column per feature: keep flat feature-name columns.
            combined.columns = pd.Index(names)
        return combined

    @staticmethod
    def momentum_score(returns: pd.DataFrame,
                       periods=(5, 21, 63, 126, 252)) -> pd.DataFrame:
        """Cross-sectional momentum ranking.

        For each lookback ``p`` the trailing ``p``-row sum of returns is
        ranked across assets as a percentile in (0, 1].

        Args:
            returns: Wide frame of returns (dates x assets).
            periods: Iterable of lookback lengths in rows.

        Returns:
            DataFrame indexed like ``returns``. Columns are
            ``('cs_mom_{p}d', asset)`` pairs; for a single-asset input they
            are the flat names ``'cs_mom_{p}d'``.
        """
        # Assigning a multi-column rank frame to one column raises in pandas,
        # so each lookback is kept as its own wide frame and joined at the end.
        frames = {
            f'cs_mom_{p}d': returns.rolling(p).sum().rank(axis=1, pct=True)
            for p in periods
        }
        return CrossSectionalFeatures._combine_features(
            frames, returns.index, returns.shape[1]
        )

    @staticmethod
    def mean_reversion(returns: pd.DataFrame, short: int = 5,
                       long: int = 63) -> pd.DataFrame:
        """Short-term reversal versus medium-term momentum.

        Args:
            returns: Wide frame of returns (dates x assets).
            short: Lookback (rows) for the short-horizon return sum.
            long: Lookback (rows) for the long-horizon return sum.

        Returns:
            DataFrame indexed like ``returns`` with features ``mr_signal``
            (short-horizon percentile rank minus long-horizon percentile rank)
            and ``mr_short`` (negated short-horizon percentile rank). Columns
            are ``(feature, asset)`` pairs, or flat feature names for a
            single-asset input.
        """
        short_rank = returns.rolling(short).sum().rank(axis=1, pct=True)
        long_rank = returns.rolling(long).sum().rank(axis=1, pct=True)
        frames = {
            'mr_signal': short_rank - long_rank,
            'mr_short': -short_rank,
        }
        return CrossSectionalFeatures._combine_features(
            frames, returns.index, returns.shape[1]
        )

    @staticmethod
    def dispersion(returns: pd.DataFrame, window: int = 21) -> pd.DataFrame:
        """Cross-sectional return dispersion, smoothed over a rolling window.

        Each statistic is computed across assets for every date and then
        averaged over the trailing ``window`` dates. (pandas rolling
        aggregations do not accept an ``axis`` argument, so the cross-section
        is reduced first and the rolling mean is applied afterwards.)

        Args:
            returns: Wide frame of returns (dates x assets).
            window: Number of dates in the rolling mean.

        Returns:
            DataFrame indexed like ``returns`` with columns ``cs_std``,
            ``cs_range``, ``cs_skew`` and ``cs_kurt``.
        """
        per_date = pd.DataFrame(
            {
                'cs_std': returns.std(axis=1),
                'cs_range': returns.max(axis=1) - returns.min(axis=1),
                'cs_skew': returns.skew(axis=1),
                'cs_kurt': returns.kurt(axis=1),
            },
            index=returns.index,
        )
        return per_date.rolling(window).mean()