"""Factor Decomposition Engine - Break returns into style factors."""

import warnings
from typing import Dict, Optional

import numpy as np
import pandas as pd

# NOTE(review): this silences every warning for the whole process, not just
# this module.  It is kept so importing the module behaves as before, but it
# previously hid pandas' chained-assignment warnings in this very file;
# consider scoping it with ``warnings.catch_warnings()`` instead.
warnings.filterwarnings('ignore')


class FactorDecomposition:
    """Decompose returns into style factors.

    Typical flow:

    1. ``compute_factor_returns`` turns a (dates x assets) return panel into
       daily factor returns via one cross-sectional regression per date.
    2. ``compute_exposures`` estimates rolling betas of one return series to
       each factor.
    3. ``attribution`` splits a return series into factor and residual parts.

    The results of steps 1 and 2 are also cached on ``self.factor_returns``
    and ``self.exposures``.
    """

    # Order of the regression coefficients produced for every date.
    _STYLE_FACTORS = ('momentum', 'value', 'size', 'volatility', 'quality')
    # Number of prior rows (current row excluded) used for trailing statistics.
    _LOOKBACK = 20
    # Dates with fewer non-NaN assets than this get zero style-factor returns.
    _MIN_ASSETS = 5
    # Fill value for a missing trailing volatility; also added to the
    # denominator of the size score so it can never divide by zero.
    _VOL_FLOOR = 0.01
    # L2 penalty of the univariate rolling ridge regression.
    _RIDGE_ALPHA = 1.0

    def __init__(self):
        self.factor_names = ['momentum', 'value', 'size', 'volatility', 'quality', 'market']
        self.factor_returns = None
        self.exposures = None

    def compute_factor_returns(self, returns_df: pd.DataFrame) -> pd.DataFrame:
        """Estimate daily factor returns from a panel of asset returns.

        Args:
            returns_df: One row per date, one column per asset. NaN marks an
                asset that has no return on that date.

        Returns:
            DataFrame indexed like ``returns_df`` with columns ``market``,
            ``momentum``, ``value``, ``size``, ``volatility``, ``quality``
            (in that order).  ``market`` is the cross-sectional mean return;
            the others are the least-squares coefficients of that date's
            returns on the standardised style scores.  Dates with fewer than
            five valid assets get 0.0 for every style factor.  The frame is
            also stored on ``self.factor_returns``.
        """
        factors = pd.DataFrame(index=returns_df.index)
        factors['market'] = returns_df.mean(axis=1)

        n_dates = len(returns_df)
        premia = np.zeros((n_dates, len(self._STYLE_FACTORS)))
        # Iterate by position rather than by label so that duplicate or
        # unsorted index labels cannot change which rows are selected.
        for pos in range(n_dates):
            day_returns = returns_df.iloc[pos].dropna()
            if len(day_returns) < self._MIN_ASSETS:
                continue  # row stays at zero
            # Trailing window ends *before* the current row: no look-ahead.
            history = returns_df.iloc[max(0, pos - self._LOOKBACK):pos]
            premia[pos] = self._cross_sectional_premia(day_returns, history)

        for col, name in enumerate(self._STYLE_FACTORS):
            factors[name] = premia[:, col]

        self.factor_returns = factors
        return factors

    def _cross_sectional_premia(self, day_returns: pd.Series, history: pd.DataFrame) -> np.ndarray:
        """Regress one date's returns on standardised style scores.

        Args:
            day_returns: Non-NaN returns of the current date, indexed by asset.
            history: The rows of the return panel preceding the current date
                (at most ``_LOOKBACK`` of them; may be empty).

        Returns:
            Array of five coefficients ordered like ``_STYLE_FACTORS``; zeros
            if the least-squares solve fails.
        """
        assets = day_returns.index
        momentum = history.mean().reindex(assets).fillna(0)
        # Per-asset volatility over time.  The previous implementation called
        # ``rolling(21).std()`` on the cross-section itself, i.e. it rolled
        # over neighbouring *assets*, which made the size and volatility
        # scores meaningless (and constant whenever fewer than 21 assets
        # were present).
        trailing_vol = history.std().reindex(assets).fillna(self._VOL_FLOOR)

        # NOTE(review): the value and quality scores are functions of the same
        # day's returns that serve as the regression target.  That is how the
        # scores were defined originally and is left untouched here, but it
        # looks circular - confirm it is intended.
        scores = pd.DataFrame({
            'momentum': momentum,
            'value': 1.0 / (1 + day_returns.abs()),
            'size': 1.0 / (trailing_vol + self._VOL_FLOOR),
            'volatility': -trailing_vol,
            'quality': (day_returns > 0).astype(float),
        }).fillna(0)
        scores = scores[list(self._STYLE_FACTORS)]  # pin coefficient order
        # Z-score each column; the epsilon keeps constant columns at 0
        # instead of dividing by zero.
        scores = (scores - scores.mean()) / (scores.std() + 1e-8)

        try:
            coefs = np.linalg.lstsq(
                scores.to_numpy(dtype=float),
                day_returns.to_numpy(dtype=float),
                rcond=None,
            )[0]
        except (np.linalg.LinAlgError, ValueError):
            # Best effort, as before: an unsolvable date contributes zeros.
            return np.zeros(len(self._STYLE_FACTORS))
        return coefs

    def compute_exposures(self, asset_returns: pd.Series, factor_returns: pd.DataFrame,
                          window: int = 63) -> pd.DataFrame:
        """Estimate rolling univariate ridge betas of an asset to each factor.

        For row ``i`` the beta is fitted on the ``window`` observations
        immediately before ``i`` (rows ``i - window`` .. ``i - 1``).  The two
        inputs are matched by position, not by index label.

        The estimate is the closed form of a one-feature ridge regression
        with an unpenalised intercept and ``alpha = 1.0``:
        ``beta = Sxy / (Sxx + alpha)`` where ``Sxy`` and ``Sxx`` are the
        centred cross- and self-products over the window.  This matches the
        scikit-learn ``Ridge(alpha=1.0)`` fit that was used before, up to
        floating-point rounding, without needing scikit-learn.

        Args:
            asset_returns: Return series of the asset or portfolio.
            factor_returns: One column per factor.
            window: Number of trailing observations per fit.

        Returns:
            Float DataFrame indexed like ``asset_returns`` with one column
            per factor.  Rows without a full window, and windows containing
            NaN or infinite values, have exposure 0.0.  The frame is also
            stored on ``self.exposures``.
        """
        y = asset_returns.to_numpy(dtype=float)
        n_factors = factor_returns.shape[1]
        betas = np.zeros((len(y), n_factors))
        for col in range(n_factors):
            x = factor_returns.iloc[:, col].to_numpy(dtype=float)
            betas[:, col] = self._rolling_ridge_beta(x, y, window, self._RIDGE_ALPHA)

        # Build the frame in one go.  The previous element-wise
        # ``exposures.iloc[i][factor] = value`` was chained assignment, which
        # does not write through under pandas copy-on-write, and it produced
        # an object-dtype frame.
        exposures = pd.DataFrame(betas, index=asset_returns.index, columns=factor_returns.columns)
        self.exposures = exposures
        return exposures

    @staticmethod
    def _rolling_ridge_beta(x: np.ndarray, y: np.ndarray, window: int, alpha: float = 1.0) -> np.ndarray:
        """Return the rolling one-feature ridge slope of ``y`` on ``x``.

        Element ``i`` is fitted on ``x[i - window:i]`` and ``y[i - window:i]``.
        Positions without a full window in both arrays, and non-finite
        results, are 0.0.
        """
        betas = np.zeros(len(y))
        if window < 1:
            return betas
        # A full window ending at ``end`` needs ``end <= len(x)``; results
        # are only reported for existing rows of ``y``.
        last_end = min(len(y) - 1, len(x))
        for end in range(window, last_end + 1):
            x_centred = x[end - window:end] - x[end - window:end].mean()
            y_centred = y[end - window:end] - y[end - window:end].mean()
            beta = (x_centred @ y_centred) / (x_centred @ x_centred + alpha)
            if np.isfinite(beta):
                betas[end] = beta
        return betas

    def attribution(self, portfolio_returns: pd.Series, factor_returns: pd.DataFrame,
                    exposures: pd.DataFrame) -> Dict:
        """Split portfolio returns into factor-explained and residual parts.

        Args:
            portfolio_returns: Return series to explain.
            factor_returns: Factor returns, one column per factor; reindexed
                onto ``exposures.index`` with missing rows treated as 0.
            exposures: Factor exposures, one column per factor.

        Returns:
            Dict with keys ``total_return``, ``factor_return`` and
            ``residual_return`` (plain sums over time, not compounded) and
            ``r_squared`` (``1 - var(residual) / var(portfolio)``, or 0 when
            the portfolio variance is not positive).
        """
        aligned_factors = factor_returns.reindex(exposures.index).fillna(0)
        # Coerce to float so object-dtype exposure frames still multiply
        # and sum numerically.
        factor_contrib = exposures.astype(float).multiply(aligned_factors).sum(axis=1)
        residual = portfolio_returns - factor_contrib

        total_var = portfolio_returns.var()
        # ``NaN > 0`` is False, so a single-observation series yields 0.0.
        r_squared = 1 - residual.var() / total_var if total_var > 0 else 0.0
        return {
            'total_return': portfolio_returns.sum(),
            'factor_return': factor_contrib.sum(),
            'residual_return': residual.sum(),
            'r_squared': r_squared,
        }