| """Factor Decomposition Engine - Break returns into style factors.""" |
| import numpy as np |
| import pandas as pd |
| from typing import Dict, Optional |
| import warnings |
| warnings.filterwarnings('ignore') |
|
|
|
|
class FactorDecomposition:
    """Decompose returns into style factors.

    Workflow: ``compute_factor_returns`` builds daily factor returns from a
    panel of asset returns, ``compute_exposures`` estimates rolling betas of
    one return series to each factor, and ``attribution`` splits a return
    series into factor-explained and residual parts.
    """

    # Style factors estimated by the daily cross-sectional regression, in the
    # order their coefficients are stored.
    _STYLE_FACTORS = ('momentum', 'value', 'size', 'volatility', 'quality')
    # Minimum number of assets with a return on a day to run the regression.
    _MIN_CROSS_SECTION = 5
    # Momentum score: mean return over the previous 20 rows (current excluded).
    _MOMENTUM_LOOKBACK = 20
    # Volatility score: std of returns over the previous 21 rows.
    _VOLATILITY_LOOKBACK = 21
    # Score columns whose cross-sectional std is below this are treated as
    # constant and dropped from the regression (set to zero).
    _MIN_SPREAD = 1e-12

    def __init__(self):
        self.factor_names = ['momentum', 'value', 'size', 'volatility', 'quality', 'market']
        # Results of the most recent compute_* calls (None until computed).
        self.factor_returns: Optional[pd.DataFrame] = None
        self.exposures: Optional[pd.DataFrame] = None

    def compute_factor_returns(self, returns_df: pd.DataFrame) -> pd.DataFrame:
        """Estimate daily factor returns from a panel of asset returns.

        The market factor is the equal-weighted mean across columns. For each
        row with at least five non-missing returns, the day's returns are
        regressed on five standardized per-asset scores; the fitted
        coefficients are that day's style factor returns. Rows with fewer
        than five returns get zeros for every style factor.

        Args:
            returns_df: Returns with one row per period and one column per
                asset. Rows are treated as chronologically ordered by
                position.

        Returns:
            DataFrame indexed like ``returns_df`` with columns ``market``,
            ``momentum``, ``value``, ``size``, ``volatility``, ``quality``.
            The same object is stored on ``self.factor_returns``.
        """
        factors = pd.DataFrame(index=returns_df.index)
        factors['market'] = returns_df.mean(axis=1)

        # Trailing statistics are computed once, per asset and through time,
        # then lagged one row so a day's scores never use that day's return.
        trailing_mean = (
            returns_df.rolling(self._MOMENTUM_LOOKBACK, min_periods=1).mean().shift(1)
        )
        trailing_vol = returns_df.rolling(self._VOLATILITY_LOOKBACK).std().shift(1)

        coefs_by_day = np.zeros((len(returns_df), len(self._STYLE_FACTORS)))

        for pos, (_, row) in enumerate(returns_df.iterrows()):
            day_returns = row.dropna()
            if len(day_returns) < self._MIN_CROSS_SECTION:
                continue  # too few assets: leave this day's style returns at 0

            assets = day_returns.index
            # Assets without a full volatility history get a 0.01 placeholder.
            vol = trailing_vol.iloc[pos].reindex(assets).fillna(0.01)

            # NOTE(review): 'value' and 'quality' are functions of the same-day
            # return that is also the regression target, so their coefficients
            # are partly mechanical. Kept as in the original design.
            scores = pd.DataFrame({
                'momentum': trailing_mean.iloc[pos].reindex(assets).fillna(0),
                'value': 1.0 / (1 + day_returns.abs()),
                'size': 1.0 / (vol + 0.01),
                'volatility': -vol,
                'quality': (day_returns > 0).astype(float),
            }).fillna(0)

            design = self._standardize(scores.to_numpy(dtype=float))
            target = day_returns.to_numpy(dtype=float)

            # The design columns are mean-zero, so omitting an intercept does
            # not change the slope estimates.
            try:
                coefs_by_day[pos] = np.linalg.lstsq(design, target, rcond=None)[0]
            except np.linalg.LinAlgError:
                # Solver did not converge: keep zeros for this day.
                continue

        for col, name in enumerate(self._STYLE_FACTORS):
            factors[name] = coefs_by_day[:, col]

        self.factor_returns = factors
        return factors

    def _standardize(self, scores: np.ndarray) -> np.ndarray:
        """Z-score each column; columns with no cross-sectional spread become 0."""
        centered = scores - scores.mean(axis=0)
        spread = scores.std(axis=0, ddof=1)
        # Without the explicit zeroing, a constant column can turn rounding
        # noise into a regressor once divided by (spread + 1e-8).
        return np.where(spread > self._MIN_SPREAD, centered / (spread + 1e-8), 0.0)

    def compute_exposures(self, asset_returns: pd.Series, factor_returns: pd.DataFrame,
                          window: int = 63, alpha: float = 1.0) -> pd.DataFrame:
        """Estimate rolling single-factor ridge betas of an asset to each factor.

        For row ``i`` the beta to each factor is the slope of a ridge
        regression (with intercept) of the asset's returns on that factor
        alone, fitted on the ``window`` rows strictly before ``i``. Rows are
        matched by position, not by index label.

        The exposure is 0 when fewer than ``window`` prior rows exist, when
        the factor history does not cover the window, or when the window
        contains non-finite values.

        Args:
            asset_returns: Return series of the asset or portfolio.
            factor_returns: Factor returns, one column per factor.
            window: Number of trailing rows per fit. Values below 1 yield
                all-zero exposures.
            alpha: Ridge penalty added to the factor's sum of squares.

        Returns:
            Float DataFrame indexed like ``asset_returns`` with one column per
            factor. The same object is stored on ``self.exposures``.
        """
        n_obs = len(asset_returns)
        betas = np.zeros((n_obs, factor_returns.shape[1]))

        if window >= 1:
            y_all = asset_returns.to_numpy(dtype=float)
            x_all = factor_returns.to_numpy(dtype=float)
            # Row i needs factor rows [i - window, i), so i may not exceed the
            # factor history length.
            stop = min(n_obs, len(x_all) + 1)

            with np.errstate(invalid='ignore', divide='ignore'):
                for i in range(window, stop):
                    y_win = y_all[i - window:i]
                    x_win = x_all[i - window:i]
                    y_centered = y_win - y_win.mean()
                    x_centered = x_win - x_win.mean(axis=0)
                    # Closed-form ridge slope for one centered regressor:
                    # Sxy / (Sxx + alpha), evaluated for all factors at once.
                    cross = (x_centered * y_centered[:, None]).sum(axis=0)
                    penalized = (x_centered ** 2).sum(axis=0) + alpha
                    beta = cross / penalized
                    betas[i] = np.where(np.isfinite(beta), beta, 0.0)

        # Build the frame in one step; writing cell-by-cell through chained
        # indexing does not update the frame under pandas Copy-on-Write.
        exposures = pd.DataFrame(betas, index=asset_returns.index,
                                 columns=factor_returns.columns)
        self.exposures = exposures
        return exposures

    def attribution(self, portfolio_returns: pd.Series, factor_returns: pd.DataFrame,
                    exposures: pd.DataFrame) -> Dict:
        """Split portfolio returns into factor-explained and residual parts.

        Each period's factor contribution is the sum over factors of
        exposure times factor return. Factor returns are aligned to the
        exposure index, with missing periods counted as 0.

        Args:
            portfolio_returns: Return series being explained.
            factor_returns: Factor returns, one column per factor.
            exposures: Per-period exposures with matching factor columns.

        Returns:
            Dict with ``total_return``, ``factor_return`` and
            ``residual_return`` (sums over periods) and ``r_squared``
            (1 - var(residual) / var(portfolio), or 0 when the portfolio
            variance is not positive).
        """
        aligned_factors = factor_returns.reindex(exposures.index).fillna(0)
        # astype(float) accepts object-dtype exposure frames as well.
        factor_contrib = exposures.astype(float).multiply(aligned_factors).sum(axis=1)
        residual = portfolio_returns - factor_contrib

        total_var = portfolio_returns.var()
        r_squared = 1 - residual.var() / total_var if total_var > 0 else 0.0

        return {
            'total_return': portfolio_returns.sum(),
            'factor_return': factor_contrib.sum(),
            'residual_return': residual.sum(),
            'r_squared': r_squared,
        }
|
|