File size: 3,902 Bytes
d4fa2ee
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
"""Factor Decomposition Engine - Break returns into style factors."""
import numpy as np
import pandas as pd
from typing import Dict, Optional
import warnings
warnings.filterwarnings('ignore')


class FactorDecomposition:
    """Decompose returns into style factors.

    Typical flow: ``compute_factor_returns`` derives daily factor returns from
    a panel of asset returns, ``compute_exposures`` estimates rolling betas of
    one return series to each factor, and ``attribution`` splits a return
    series into factor-explained and residual parts.
    """

    def __init__(self) -> None:
        self.factor_names = ['momentum', 'value', 'size', 'volatility', 'quality', 'market']
        # Caches of the most recent results of the two compute_* methods.
        self.factor_returns = None
        self.exposures = None

    def compute_factor_returns(self, returns_df: pd.DataFrame, lookback: int = 21) -> pd.DataFrame:
        """Estimate daily factor returns by cross-sectional regression.

        For every row (date) of ``returns_df`` the assets with a non-NaN
        return are regressed on five standardised style proxies; the fitted
        coefficients are reported as that date's style-factor returns. The
        ``market`` factor is the equal-weighted mean return of the row.

        Proxies used per asset on a given date:

        * momentum   - mean return over the ``lookback - 1`` rows before the date
        * volatility - minus the std of the ``lookback`` rows before the date
        * size       - ``1 / (trailing std + 0.01)``
        * value      - ``1 / (1 + |same-day return|)``
        * quality    - 1.0 if the same-day return is positive, else 0.0

        NOTE(review): the value and quality proxies are functions of the very
        return being regressed, so their coefficients are not out-of-sample
        factor returns. Kept as in the original model; confirm intent.

        Args:
            returns_df: Rows are dates, columns are assets, values are returns.
            lookback: History length in rows for the trailing statistics.
                Must be at least 2.

        Returns:
            DataFrame indexed like ``returns_df`` with columns ``market``,
            ``momentum``, ``value``, ``size``, ``volatility``, ``quality``.
            Style columns are 0 on dates with fewer than five observed assets.
            The result is also stored on ``self.factor_returns``.

        Raises:
            ValueError: If ``lookback`` is smaller than 2.
        """
        if lookback < 2:
            raise ValueError("lookback must be at least 2")

        style_names = ['momentum', 'value', 'size', 'volatility', 'quality']
        min_assets = 5
        n_dates, n_assets = returns_df.shape

        factors = pd.DataFrame(index=returns_df.index)
        factors['market'] = returns_df.mean(axis=1)

        style_returns = np.zeros((n_dates, len(style_names)))
        if n_dates > 0 and n_assets >= min_assets:
            numeric = returns_df.astype(float)
            day_values = numeric.to_numpy()
            # Trailing statistics are computed per asset through time and
            # shifted one row so that only data strictly before each date is
            # used. (Rolling over a single day's cross-section would make the
            # result depend on column order.)
            trailing_mean = (
                numeric.rolling(lookback - 1, min_periods=1).mean().shift(1).to_numpy()
            )
            trailing_vol = numeric.rolling(lookback).std().shift(1).to_numpy()

            # Positional iteration keeps this correct for duplicate or
            # unsorted index labels.
            for pos in range(n_dates):
                style_returns[pos] = self._cross_sectional_fit(
                    day_values[pos], trailing_mean[pos], trailing_vol[pos], min_assets
                )

        for col, name in enumerate(style_names):
            factors[name] = style_returns[:, col]

        self.factor_returns = factors
        return factors

    @staticmethod
    def _cross_sectional_fit(day_row: np.ndarray, momentum_row: np.ndarray,
                             vol_row: np.ndarray, min_assets: int) -> np.ndarray:
        """Return the five style coefficients for one date (zeros if unusable)."""
        coefs = np.zeros(5)
        observed = ~np.isnan(day_row)
        if observed.sum() < min_assets:
            return coefs

        y = day_row[observed]
        momentum = momentum_row[observed]
        momentum = np.where(np.isnan(momentum), 0.0, momentum)
        vol = vol_row[observed]
        # 0.01 is the fallback volatility when an asset lacks enough history.
        vol = np.where(np.isnan(vol), 0.01, vol)

        design = np.column_stack([
            momentum,
            1.0 / (1.0 + np.abs(y)),
            1.0 / (vol + 0.01),
            -vol,
            (y > 0).astype(float),
        ])
        centered = design - design.mean(axis=0)
        spread = design.std(axis=0, ddof=1)

        # A column without cross-sectional dispersion carries no information.
        # Dividing its rounding noise by the 1e-8 guard would create a tiny
        # constant column that behaves like an unscaled intercept, so such
        # columns are dropped and their coefficient stays 0.
        active = np.isfinite(spread) & (spread > 1e-12)
        if not active.any():
            return coefs

        scaled = centered[:, active] / (spread[active] + 1e-8)
        try:
            coefs[active] = np.linalg.lstsq(scaled, y, rcond=None)[0]
        except (np.linalg.LinAlgError, ValueError):
            # Best-effort: an unsolvable day contributes zero factor returns.
            return np.zeros(5)
        return coefs

    def compute_exposures(self, asset_returns: pd.Series, factor_returns: pd.DataFrame,
                          window: int = 63, alpha: float = 1.0) -> pd.DataFrame:
        """Estimate rolling ridge exposures of one return series to each factor.

        Each factor is fitted on its own (one regressor plus an unpenalised
        intercept) over the ``window`` observations immediately before each
        row, so the exposure at row ``i`` never uses row ``i`` itself. The
        slope is the closed-form ridge solution
        ``sum((x - mean(x)) * (y - mean(y))) / (sum((x - mean(x))**2) + alpha)``,
        which is what an intercept-fitting ridge regression with a single
        feature computes, without building a model object per window.

        ``asset_returns`` and ``factor_returns`` are matched by position, not
        by index label.

        Args:
            asset_returns: Return series of the asset or portfolio.
            factor_returns: One column of returns per factor.
            window: Number of trailing observations per fit.
            alpha: Ridge penalty on the slope; must be non-negative.

        Returns:
            Float DataFrame indexed like ``asset_returns`` with one column per
            factor. Rows without a full window, and windows containing
            non-finite values or mismatched lengths, have exposure 0. The
            result is also stored on ``self.exposures``.

        Raises:
            ValueError: If ``alpha`` is negative.
        """
        if alpha < 0:
            raise ValueError("alpha must be non-negative")

        n_obs = len(asset_returns)
        y_all = asset_returns.to_numpy(dtype=float)
        betas = np.zeros((n_obs, factor_returns.shape[1]))

        first_row = max(window, 0)
        for col in range(factor_returns.shape[1]):
            x_all = factor_returns.iloc[:, col].to_numpy(dtype=float)
            for i in range(first_row, n_obs):
                betas[i, col] = self._ridge_slope(
                    x_all[i - window:i], y_all[i - window:i], alpha
                )

        # Building the frame from an array avoids chained ``iloc[i][col] = ...``
        # assignment, which writes to a temporary under pandas Copy-on-Write,
        # and gives a float dtype instead of object.
        exposures = pd.DataFrame(betas, index=asset_returns.index,
                                 columns=factor_returns.columns)
        self.exposures = exposures
        return exposures

    @staticmethod
    def _ridge_slope(x: np.ndarray, y: np.ndarray, alpha: float) -> float:
        """Return the single-regressor ridge slope, or 0.0 for unusable windows."""
        if x.size == 0 or x.size != y.size:
            return 0.0
        if not (np.isfinite(x).all() and np.isfinite(y).all()):
            return 0.0
        x_centered = x - x.mean()
        y_centered = y - y.mean()
        denom = float(x_centered @ x_centered) + alpha
        if denom <= 0:
            return 0.0
        return float(x_centered @ y_centered) / denom

    def attribution(self, portfolio_returns: pd.Series, factor_returns: pd.DataFrame,
                    exposures: pd.DataFrame) -> Dict[str, float]:
        """Split portfolio returns into factor-explained and residual parts.

        The factor contribution on each row is the sum over factors of
        ``exposure * factor_return``; factor returns are aligned to the
        exposure index and missing values count as 0.

        Args:
            portfolio_returns: Return series to explain.
            factor_returns: One column of returns per factor.
            exposures: Exposures per row and factor (same columns as
                ``factor_returns``). Values must be convertible to float.

        Returns:
            Dict with ``total_return``, ``factor_return`` and
            ``residual_return`` (sums over rows) and ``r_squared``
            (``1 - var(residual) / var(portfolio)``, or 0 when the portfolio
            variance is not positive).
        """
        # Exposure frames may arrive with object dtype; force numeric math.
        numeric_exposures = exposures.astype(float)
        aligned_factors = factor_returns.reindex(numeric_exposures.index).fillna(0)
        factor_contrib = numeric_exposures.multiply(aligned_factors).sum(axis=1)
        residual = portfolio_returns - factor_contrib

        total_var = portfolio_returns.var()
        r_squared = 1 - residual.var() / total_var if total_var > 0 else 0
        return {
            'total_return': float(portfolio_returns.sum()),
            'factor_return': float(factor_contrib.sum()),
            'residual_return': float(residual.sum()),
            'r_squared': float(r_squared),
        }