Add cross-sectional factors - Fama-French, momentum, quality, low-vol style factors
cee7654 verified | """cross_sectional_factors.py — Cross-Sectional Factor Construction | |
| Implements classic and modern equity style factors: Fama-French 5-factor, | |
| momentum (Carhart), quality (profitability + low accruals + low leverage), low volatility, | |
| value (book-to-market, earnings yield), size (SMB), and liquidity. | |
| References: | |
| - Fama & French 2015: "A Five-Factor Asset Pricing Model" | |
| - Carhart 1997: "On Persistence in Mutual Fund Performance" | |
| - Asness & Frazzini 2013: "The Devil in HML's Details" (value / HML construction) | |
| - Asness, Frazzini & Pedersen 2013: "Quality Minus Junk" (quality factor) | |
| - Blitz & van Vliet 2007: "The Volatility Effect" | |
| """ | |
| import numpy as np, pandas as pd | |
class CrossSectionalFactorModel:
    """Constructs and scores equity style factors cross-sectionally.

    Every ``*_factor`` method turns a set of per-asset scores into a
    dollar-neutral long-short weight vector (see ``_long_short_rank``).
    Scores may be a Series indexed by asset, or a DataFrame whose columns
    are assets; for a DataFrame the last row is used as the cross-section.
    """

    # Factor labels. Only HML, SMB, MOM, QUAL, BAB and LIQ have a
    # constructor method in this class.
    FACTORS = ['MKT', 'SMB', 'HML', 'RMW', 'CMA', 'MOM', 'QUAL', 'BAB', 'LIQ']

    def __init__(self, lookback=252, n_quantiles=10):
        """Store the lookback length and the number of quantile buckets."""
        self.lookback = lookback
        self.n_q = n_quantiles

    @staticmethod
    def _latest_cross_section(scores):
        """Return a 1-D score vector: the last row of a DataFrame, else *scores*."""
        if isinstance(scores, pd.DataFrame):
            if len(scores) == 0:
                return pd.Series(np.nan, index=scores.columns)
            return scores.iloc[-1]
        return scores

    def value_factor(self, prices, book_values):
        """HML: long high book-to-price, short low book-to-price.

        ``book_values`` is forward-filled onto ``prices.index`` and then
        divided by ``prices``.
        """
        bv = book_values.reindex(prices.index, method='ffill')
        btm = bv / prices
        return self._long_short_rank(btm, 'high')

    def size_factor(self, prices, market_caps):
        """SMB: long the smallest market caps, short the largest.

        ``prices`` is used only for its index, onto which ``market_caps``
        is forward-filled.
        """
        mc = market_caps.reindex(prices.index, method='ffill')
        return self._long_short_rank(mc, 'low')

    def momentum_factor(self, prices, window=252, skip=21):
        """MOM: long high past return, short low past return.

        The score is the ``window``-period percentage change, lagged by
        ``skip`` periods so the most recent ``skip`` periods are excluded.
        """
        mom = prices.pct_change(window).shift(skip)
        return self._long_short_rank(mom, 'high')

    def quality_factor(self, prices, roe, accruals, leverage):
        """QUAL: high ROE, low accruals and low leverage, equally weighted.

        Each input is z-scored across assets, accruals and leverage are
        sign-flipped, and the three z-scores are averaged. ``prices`` is
        accepted for signature symmetry and is not used.
        """
        roe_s = self._zscore(self._latest_cross_section(roe))
        acc_s = -self._zscore(self._latest_cross_section(accruals))  # low accruals = good
        lev_s = -self._zscore(self._latest_cross_section(leverage))  # low leverage = good
        qual = (roe_s + acc_s + lev_s) / 3.0
        return self._long_short_rank(qual, 'high')

    def low_vol_factor(self, prices, window=63):
        """BAB-style low-volatility factor: long low vol, short high vol.

        The score is the rolling standard deviation of simple returns over
        ``window`` periods, scaled by sqrt(252). No beta is estimated.
        """
        vol = prices.pct_change().rolling(window).std() * np.sqrt(252)
        return self._long_short_rank(vol, 'low')

    def liquidity_factor(self, prices, volumes, window=63):
        """LIQ: Amihud illiquidity; long liquid names, short illiquid ones.

        Illiquidity is the rolling mean of |return| / dollar volume, where
        dollar volume is share volume times price.
        """
        abs_ret = prices.pct_change().abs()
        # Dollar volume is price * shares traded; dividing volume by price
        # would rank high-priced stocks as illiquid.
        dollar_volume = volumes.reindex(prices.index) * prices
        illiq = (abs_ret / dollar_volume).rolling(window).mean()
        return self._long_short_rank(illiq, 'low')  # Long liquid, short illiquid

    def _zscore(self, x):
        """Standardise *x* by its mean and standard deviation (epsilon-guarded)."""
        return (x - x.mean()) / (x.std() + 1e-10)

    def _long_short_rank(self, scores, direction='high'):
        """Form a long-short portfolio from cross-sectional scores.

        Non-missing scores are bucketed into ``self.n_q`` quantiles. The
        long leg is equally weighted and sums to +1, the short leg to -1.
        ``direction='high'`` goes long the top bucket; any other value
        goes long the bottom bucket.

        Returns an empty float Series when every score is missing, and an
        all-zero Series when the scores cannot be split into two distinct
        buckets (for example, all scores equal).
        """
        scores = self._latest_cross_section(scores)
        valid = scores.dropna()
        if valid.empty:
            return pd.Series(dtype=float)

        weights = pd.Series(0.0, index=scores.index)
        if valid.nunique() < 2:
            # No dispersion: there is no meaningful long or short leg.
            return weights

        buckets = pd.qcut(valid, self.n_q, labels=False, duplicates='drop').dropna()
        if buckets.empty:
            return weights
        top, bottom = buckets.max(), buckets.min()
        if top == bottom:
            # A single bucket would make the legs overlap.
            return weights

        top_names = buckets[buckets == top].index
        bottom_names = buckets[buckets == bottom].index
        if direction == 'high':
            long_names, short_names = top_names, bottom_names
        else:
            long_names, short_names = bottom_names, top_names

        weights.loc[long_names] = 1.0 / len(long_names)
        weights.loc[short_names] = -1.0 / len(short_names)
        return weights

    def factor_returns(self, prices, factors_dict):
        """Compute factor returns from price series and factor portfolios.

        ``factors_dict`` maps a factor name to a weight Series indexed by
        asset. Each row of the result is the weighted sum of next-period
        asset returns; assets missing from the weights get weight 0. The
        final row has no forward return and therefore sums to 0.0.
        """
        ret = prices.pct_change().shift(-1)  # t+1 return
        factor_rets = {}
        for name, weights in factors_dict.items():
            w = weights.reindex(ret.columns, fill_value=0)
            factor_rets[name] = (ret * w).sum(axis=1)
        return pd.DataFrame(factor_rets)

    def factor_exposures(self, returns, factor_returns, min_obs=30):
        """Estimate factor betas with one full-sample OLS per asset.

        Each column of ``returns`` is regressed on an intercept plus the
        columns of ``factor_returns`` over their common non-missing dates.
        Assets with fewer than ``min_obs`` common observations are skipped.

        Returns a DataFrame with one row per asset and the columns
        ``alpha`` followed by the factor names.
        """
        coef_names = ['alpha'] + list(factor_returns.columns)
        betas = {}
        for col in returns.columns:
            y = returns[col].dropna()
            X = factor_returns.reindex(y.index).dropna()
            common = y.index.intersection(X.index)
            if len(common) < min_obs:
                continue
            design = np.column_stack([np.ones(len(common)), X.loc[common].values])
            coefs = np.linalg.lstsq(design, y.loc[common].values, rcond=None)[0]
            betas[col] = dict(zip(coef_names, coefs))
        return pd.DataFrame(betas).T

    def factor_report(self, prices, book=None, mc=None, volumes=None):
        """Generate a summary report for a single asset's price Series.

        Always reports momentum, 3-month volatility, 1-year Sharpe ratio,
        maximum drawdown, skewness and kurtosis. ``book``, ``mc`` and
        ``volumes`` add book-to-market, size and volume entries when given.
        """
        ret = prices.pct_change().dropna()
        ret_1y = ret.tail(252)
        last_price = prices.iloc[-1]

        # Drawdown is measured against the running price peak, so a decline
        # from the very first price is counted too.
        clean = prices.dropna()
        running_peak = clean.cummax()
        drawdown = (running_peak - clean) / running_peak

        report = {
            "momentum_12m": float(last_price / prices.iloc[-min(252, len(prices))] - 1),
            "volatility_3m": float(ret.tail(63).std() * np.sqrt(252)),
            "sharpe_1y": float(ret_1y.mean() * 252 / (ret_1y.std() * np.sqrt(252) + 1e-10)),
            "max_drawdown": float(drawdown.max()),
            "skewness": float(ret.skew()),
            "kurtosis": float(ret.kurtosis()),
        }
        if book is not None:
            report["book_to_market"] = float(book.iloc[-1] / last_price) if last_price > 0 else 0.0
        if mc is not None:
            report["market_cap"] = float(mc.iloc[-1])
            # Decile of the latest market cap within the supplied mc history.
            report["size_decile"] = int(pd.qcut(mc, 10, labels=False, duplicates='drop').iloc[-1]) + 1
        if volumes is not None:
            report["avg_volume"] = float(volumes.tail(20).mean())
            report["dollar_volume"] = float(volumes.tail(20).mean() * prices.tail(20).mean())
        return report
if __name__ == '__main__':
    # Demo: report on one simulated price path (seeded for reproducibility).
    np.random.seed(42)
    n_days = 500
    daily_returns = np.random.normal(0.0005, 0.015, n_days)
    business_days = pd.date_range('2022-01-01', periods=n_days, freq='B')
    prices = pd.Series(np.cumprod(1 + daily_returns), index=business_days)

    model = CrossSectionalFactorModel()
    print(model.factor_report(prices))