File size: 4,377 Bytes
7370e85
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
"""Advanced Feature Engineering Part 1 - Microstructure & Cross-Sectional"""
import numpy as np
import pandas as pd
from typing import Dict, List, Optional
import warnings
warnings.filterwarnings('ignore')


class MicrostructureFeatures:
    """Market microstructure features derived from OHLCV series.

    Every method is a static method that takes index-aligned pandas Series
    and returns a Series (``compute_all`` returns a DataFrame) on the same
    index. Rolling statistics are NaN until a full window of observations
    is available. Zero denominators are mapped to NaN so that ratios never
    produce infinities.
    """

    @staticmethod
    def amihud_illiquidity(close: pd.Series, volume: pd.Series, window=21) -> pd.Series:
        """Amihud (2002) illiquidity: rolling mean of |return| / dollar volume.

        Args:
            close: Closing prices.
            volume: Traded volume, aligned with ``close``.
            window: Number of rows in the rolling average.

        Returns:
            Rolling mean of absolute simple return per unit of dollar volume.
        """
        # A zero dollar volume carries no price-impact information; mark it
        # as missing instead of dividing by zero.
        dollar_vol = (close * volume).replace(0, np.nan)
        abs_ret = close.pct_change().abs()
        return (abs_ret / dollar_vol).rolling(window).mean()

    @staticmethod
    def kyle_lambda(close: pd.Series, volume: pd.Series, window=21) -> pd.Series:
        """Kyle's lambda: rolling slope of return on signed volume.

        Volume is signed with the direction of the same-row price change,
        and lambda is the rolling regression slope
        ``cov(return, signed_volume) / var(signed_volume)``.

        Args:
            close: Closing prices.
            volume: Traded volume, aligned with ``close``.
            window: Number of rows in the rolling regression.

        Returns:
            Rolling price impact per unit of signed volume; NaN where the
            signed volume has zero variance in the window.
        """
        ret = close.pct_change()
        signed_vol = volume * np.sign(ret)
        # The signed return (not its absolute value) must be paired with
        # the signed volume for the slope to measure price impact.
        cov = ret.rolling(window).cov(signed_vol)
        var = signed_vol.rolling(window).var()
        return cov / var.replace(0, np.nan)

    @staticmethod
    def bid_ask_spread_proxy(high: pd.Series, low: pd.Series, close: pd.Series,
                             window=21) -> pd.Series:
        """Corwin & Schultz (2012) high-low bid-ask spread estimator.

        For each pair of consecutive rows:

            beta  = ln(H_t / L_t)^2 + ln(H_{t-1} / L_{t-1})^2
            gamma = ln(max(H_t, H_{t-1}) / min(L_t, L_{t-1}))^2
            alpha = (sqrt(2*beta) - sqrt(beta)) / (3 - 2*sqrt(2))
                    - sqrt(gamma / (3 - 2*sqrt(2)))
            S     = 2 * (exp(alpha) - 1) / (1 + exp(alpha))

        Negative two-row estimates are set to zero and the result is
        averaged over ``window`` rows.

        Args:
            high: High prices.
            low: Low prices, aligned with ``high``.
            close: Unused; kept so the signature stays compatible with
                existing callers.
            window: Number of two-row estimates in the rolling average.

        Returns:
            Rolling mean proportional spread, always >= 0 where defined.
        """
        # Logarithms are only defined for strictly positive prices.
        high = high.where(high > 0)
        low = low.where(low > 0)

        log_hl_sq = np.log(high / low) ** 2
        beta = log_hl_sq + log_hl_sq.shift(1)
        two_row_high = high.rolling(2).max()
        two_row_low = low.rolling(2).min()
        gamma = np.log(two_row_high / two_row_low) ** 2

        denom = 3 - 2 * np.sqrt(2)
        alpha = (np.sqrt(2 * beta) - np.sqrt(beta)) / denom - np.sqrt(gamma / denom)
        exp_alpha = np.exp(alpha)
        spread = 2 * (exp_alpha - 1) / (1 + exp_alpha)
        # A negative estimate is not a meaningful spread; floor it at zero
        # before averaging.
        return spread.clip(lower=0).rolling(window).mean()

    @staticmethod
    def vwap(close: pd.Series, high: pd.Series, low: pd.Series, volume: pd.Series,
             window=14) -> pd.Series:
        """Rolling volume-weighted average of the typical price.

        Args:
            close: Closing prices.
            high: High prices.
            low: Low prices.
            volume: Traded volume.
            window: Number of rows in the rolling sums.

        Returns:
            ``sum(typical_price * volume) / sum(volume)`` over the window,
            where the typical price is ``(high + low + close) / 3``; NaN
            when the window's total volume is zero.
        """
        typical_price = (high + low + close) / 3
        weighted_sum = (typical_price * volume).rolling(window).sum()
        volume_sum = volume.rolling(window).sum()
        return weighted_sum / volume_sum.replace(0, np.nan)

    @staticmethod
    def roll_spread(close: pd.Series, window=20) -> pd.Series:
        """Roll (1984) effective spread estimator.

        Computes ``2 * sqrt(-cov)`` where ``cov`` is the covariance between
        consecutive price changes inside each rolling window. Positive
        covariances are clipped to zero, giving a spread of zero.

        Args:
            close: Closing prices.
            window: Number of price changes per rolling window.

        Returns:
            Estimated spread, in the same units as ``close``.
        """
        def _lag1_cov(changes):
            # Covariance between each change and the one that follows it.
            return np.cov(changes[:-1], changes[1:])[0, 1]

        delta_p = close.diff()
        # raw=True hands plain ndarrays to the callback, so the slices are
        # unambiguously positional and no per-window Series is built.
        cov = delta_p.rolling(window).apply(_lag1_cov, raw=True)
        return 2 * np.sqrt(-cov.clip(upper=0))

    @staticmethod
    def compute_all(close: pd.Series, high: pd.Series, low: pd.Series,
                    volume: pd.Series) -> pd.DataFrame:
        """Compute every microstructure feature with default windows.

        Args:
            close: Closing prices.
            high: High prices.
            low: Low prices.
            volume: Traded volume.

        Returns:
            DataFrame indexed like ``close`` with columns
            ``amihud_illiquidity``, ``kyle_lambda``, ``bid_ask_spread``,
            ``vwap_ratio``, ``roll_spread``, ``dollar_vol_rank``,
            ``volume_trend`` and ``volume_delta``.
        """
        features = pd.DataFrame(index=close.index)
        features['amihud_illiquidity'] = MicrostructureFeatures.amihud_illiquidity(close, volume)
        features['kyle_lambda'] = MicrostructureFeatures.kyle_lambda(close, volume)
        features['bid_ask_spread'] = MicrostructureFeatures.bid_ask_spread_proxy(high, low, close)
        features['vwap_ratio'] = close / MicrostructureFeatures.vwap(close, high, low, volume)
        features['roll_spread'] = MicrostructureFeatures.roll_spread(close)

        # Dollar volume features
        dollar_vol = close * volume
        features['dollar_vol_rank'] = dollar_vol.rolling(63).rank(pct=True)

        avg_vol_21 = volume.rolling(21).mean()
        avg_vol_63 = volume.rolling(63).mean()
        # Guard the denominators so illiquid stretches give NaN, not inf.
        features['volume_trend'] = avg_vol_21 / avg_vol_63.replace(0, np.nan) - 1

        # +1 on up rows, -1 on down rows, 0 when unchanged or when there is
        # no previous close to compare against.
        prev_close = close.shift(1)
        direction = (close > prev_close).astype(float) - (close < prev_close).astype(float)
        features['volume_delta'] = direction * volume / avg_vol_21.replace(0, np.nan)
        return features


class CrossSectionalFeatures:
    """Cross-sectional ranking, reversal and dispersion features.

    Each method takes ``returns`` as a DataFrame and ranks or aggregates
    along ``axis=1``, i.e. across the columns of each row. This presumably
    means one row per date and one column per asset -- confirm against
    the caller.
    """

    @staticmethod
    def _combine(index, frames):
        """Join named per-asset frames into a single feature frame.

        Args:
            index: Row index used when ``frames`` is empty.
            frames: Mapping of feature name -> DataFrame shaped like the
                input returns.

        Returns:
            With one asset column, a frame with one flat column per feature
            name. Otherwise a frame whose columns are a
            ``(feature, asset)`` MultiIndex.
        """
        if not frames:
            return pd.DataFrame(index=index)
        names = list(frames)
        if all(frame.shape[1] == 1 for frame in frames.values()):
            # Single-asset input: keep plain feature-name columns.
            parts = [frame.iloc[:, 0] for frame in frames.values()]
        else:
            parts = list(frames.values())
        return pd.concat(parts, axis=1, keys=names)

    @staticmethod
    def momentum_score(returns: pd.DataFrame, periods=(5, 21, 63, 126, 252)) -> pd.DataFrame:
        """Cross-sectional percentile rank of trailing cumulative returns.

        Args:
            returns: Per-period returns, one column per asset.
            periods: Look-back lengths in rows; one feature ``cs_mom_{p}d``
                is produced per entry.

        Returns:
            Percentile ranks (0-1] of the ``p``-row summed return of each
            asset within its row. Columns are ``(feature, asset)`` pairs,
            or plain feature names when ``returns`` has a single column.
        """
        frames = {}
        for p in periods:
            cum_ret = returns.rolling(p).sum()
            frames[f'cs_mom_{p}d'] = cum_ret.rank(axis=1, pct=True)
        return CrossSectionalFeatures._combine(returns.index, frames)

    @staticmethod
    def mean_reversion(returns: pd.DataFrame, short=5, long=63) -> pd.DataFrame:
        """Short-term reversal versus medium-term momentum.

        Args:
            returns: Per-period returns, one column per asset.
            short: Look-back (rows) for the short-horizon return.
            long: Look-back (rows) for the long-horizon return.

        Returns:
            Two features per asset: ``mr_signal`` (short-horizon rank minus
            long-horizon rank) and ``mr_short`` (negated short-horizon
            rank). Columns are ``(feature, asset)`` pairs, or plain feature
            names when ``returns`` has a single column.
        """
        short_rank = returns.rolling(short).sum().rank(axis=1, pct=True)
        long_rank = returns.rolling(long).sum().rank(axis=1, pct=True)
        frames = {
            'mr_signal': short_rank - long_rank,
            'mr_short': -short_rank,
        }
        return CrossSectionalFeatures._combine(returns.index, frames)

    @staticmethod
    def dispersion(returns: pd.DataFrame, window=21) -> pd.DataFrame:
        """Cross-sectional return dispersion, smoothed over a trailing window.

        Each statistic is first computed across assets for every row and
        then averaged over the last ``window`` rows. Rolling window
        reductions take no ``axis`` argument, so the cross-sectional step
        has to be done on the DataFrame itself.

        NOTE(review): "per-row statistic, then rolling mean" is an
        interpretation of the intended feature -- confirm with the model
        owner.

        Args:
            returns: Per-period returns, one column per asset.
            window: Number of rows in the smoothing average.

        Returns:
            DataFrame with columns ``cs_std``, ``cs_range`` (max minus
            min), ``cs_skew`` and ``cs_kurt``.
        """
        per_row = {
            'cs_std': returns.std(axis=1),
            'cs_range': returns.max(axis=1) - returns.min(axis=1),
            'cs_skew': returns.skew(axis=1),
            'cs_kurt': returns.kurt(axis=1),
        }
        features = pd.DataFrame(index=returns.index)
        for name, stat in per_row.items():
            features[name] = stat.rolling(window).mean()
        return features