Spaces:
Sleeping
Sleeping
| import numpy as np | |
| import pandas as pd | |
| from config import Color, logger | |
def _invert_or_pinv(matrix, rcond=1e-8):
    """Return the inverse of ``matrix``, or its pseudo-inverse if it is singular."""
    try:
        return np.linalg.inv(matrix)
    except np.linalg.LinAlgError:
        return np.linalg.pinv(matrix, rcond=rcond)


def compute_bl_posterior(prior_rets, view_sets, cov_mat, tau=0.05, silent=False, return_cov=False):
    """
    Computes the Universal Black-Litterman posterior expected returns.

    Accepts a structural prior (e.g., CAPM, Fama-French) and an arbitrary
    number of absolute view sets. Independent forecasts are combined by
    precision-weighted blending:

        Omega_combined^(-1) = sum_i Omega_i^(-1)
        Q_combined = [sum_i Omega_i^(-1)]^(-1) * sum_i [Omega_i^(-1) * Q_i]

    and the posterior mean is

        mu_BL = [(tau*Sigma)^(-1) + Omega_combined^(-1)]^(-1)
                * [(tau*Sigma)^(-1) * Pi + Omega_combined^(-1) * Q_combined]

    Args:
        prior_rets (pd.Series): The prior (Pi) - expected returns from a structural model.
            Tickers missing from the prior are treated as 0.0.
        view_sets (list): A list of tuples (views_series, uncertainties) representing (Q_i, Omega_i).
            `uncertainties` can be a 1D Series (diagonal variance) OR
            a full 2D DataFrame (covariance of forecast errors).
            Entries where either element is None are skipped. ``None`` or an
            empty list means "no views".
        cov_mat (pd.DataFrame): The covariance matrix (Sigma). Its columns define the
            ticker universe and the ordering of every output.
        tau (float): Confidence in the prior vs the views (default 0.05).
        silent (bool): If True, suppresses terminal output and warnings.
        return_cov (bool): If True, also return the posterior covariance
            Sigma + [(tau*Sigma)^(-1) + Omega_combined^(-1)]^(-1).

    Returns:
        pd.Series: The posterior expected returns (mu_BL) aligned with the tickers of
        ``cov_mat``. When ``return_cov`` is True, a tuple ``(mu_BL, sigma_BL)`` is
        returned instead, where ``sigma_BL`` is a pd.DataFrame. The return shape
        depends only on ``return_cov``, including when no valid view is supplied.
    """
    tickers = cov_mat.columns.tolist()
    n = len(tickers)

    # 1. Base setup
    pi_vector = prior_rets.reindex(tickers).fillna(0.0).to_numpy(dtype=float)
    sigma_matrix = np.asarray(cov_mat.values, dtype=float)

    # Minimum uncertainty floor = tau * diag(Sigma). This prevents a view from
    # having (near-)infinite precision, which would completely override the prior.
    min_uncertainty = tau * np.maximum(np.diag(sigma_matrix), 1e-8)

    tau_sigma_inv = _invert_or_pinv(tau * sigma_matrix)

    # 2. Precision-weighted blending of multiple view sets
    omega_inv_sum = np.zeros((n, n))
    omega_inv_q_sum = np.zeros(n)
    valid_views_count = 0

    for view_idx, (q_series, omega_data) in enumerate(view_sets or []):
        if q_series is None or omega_data is None:
            continue

        # Tickers without a view are filled with a 0.0 view (original behaviour).
        q_vector = q_series.reindex(tickers).fillna(0.0).to_numpy(dtype=float)

        if isinstance(omega_data, pd.Series):
            # Diagonal case: independent forecast errors. Missing entries get variance 1.0.
            omega_diag = omega_data.reindex(tickers).fillna(1.0).to_numpy(dtype=float)
            # Floor each element so view certainty cannot exceed prior certainty.
            omega_diag = np.maximum(omega_diag, min_uncertainty)
            omega_inv = np.diag(1.0 / omega_diag)
        elif isinstance(omega_data, pd.DataFrame):
            # Full covariance structure of forecast errors. An explicit copy is
            # taken because the diagonal is overwritten in place below and the
            # array handed out by pandas may be read-only.
            omega_matrix = (
                omega_data.reindex(index=tickers, columns=tickers)
                .fillna(0.0)
                .to_numpy(dtype=float, copy=True)
            )
            # Same tau * sigma^2 floor, applied to the diagonal only.
            np.fill_diagonal(omega_matrix, np.maximum(np.diag(omega_matrix), min_uncertainty))
            omega_inv = _invert_or_pinv(omega_matrix)
        else:
            if not silent:
                logger.warning(f"Unrecognized omega format for view {view_idx}. Skipping.")
            continue

        # Accumulate precisions
        omega_inv_sum += omega_inv
        omega_inv_q_sum += omega_inv @ q_vector
        valid_views_count += 1

    # 3. Compute the posterior
    if valid_views_count == 0:
        if not silent:
            print(f" {Color.DIM}βΉ Black-Litterman Bridge: No valid views provided. Reverting to structural prior.{Color.RESET}")
        prior_series = pd.Series(pi_vector, index=tickers)
        if return_cov:
            # Without views the uncertainty of the mean is tau * Sigma, so the
            # posterior covariance is Sigma + tau * Sigma. Returning a tuple here
            # keeps the return shape consistent for callers that unpack it.
            sigma_prior = pd.DataFrame(
                sigma_matrix + tau * sigma_matrix, index=tickers, columns=tickers
            )
            return prior_series, sigma_prior
        return prior_series

    # Left term: [(tau*Sigma)^(-1) + Omega_combined^(-1)]^(-1)
    left_term = _invert_or_pinv(tau_sigma_inv + omega_inv_sum)

    # Right term: [(tau*Sigma)^(-1) * Pi + Omega_combined^(-1) * Q_combined]
    right_term = tau_sigma_inv @ pi_vector + omega_inv_q_sum

    # Final posterior mean
    mu_bl = left_term @ right_term
    posterior_mean = pd.Series(mu_bl, index=tickers)

    if not silent:
        # How far the blended views moved the prior, for diagnostic reporting
        avg_shift = np.mean(np.abs(mu_bl - pi_vector)) * 10000  # in basis points
        print(f" {Color.DIM}βΉ Black-Litterman Bridge: Integrated {valid_views_count} view set(s) with structural prior (Avg shift: {avg_shift:.1f} bps).{Color.RESET}")

    if return_cov:
        sigma_bl = pd.DataFrame(sigma_matrix + left_term, index=tickers, columns=tickers)
        return posterior_mean, sigma_bl
    return posterior_mean
def calibrate_tau(prior_rets, view_sets, cov_mat, returns_df, silent=False):
    """
    Calibrates Black-Litterman tau by maximizing the log-likelihood of
    recent realized returns given the posterior distribution.

    A grid of 20 tau values in [0.01, 0.20] is evaluated. For each candidate the
    posterior mean and posterior covariance from ``compute_bl_posterior`` are
    divided by 252 and used as the parameters of a multivariate normal, whose
    summed log-density over the last 63 rows of ``returns_df`` is the score.

    Args:
        prior_rets (pd.Series): Prior expected returns, passed through to
            ``compute_bl_posterior``.
        view_sets (list): View sets, passed through to ``compute_bl_posterior``.
        cov_mat (pd.DataFrame): Covariance matrix; its columns define the asset order.
        returns_df (pd.DataFrame): Realized returns, one column per asset. Columns
            are aligned to ``cov_mat`` before scoring; missing values count as 0.0.
        silent (bool): If True, suppresses terminal output.

    Returns:
        float: The calibrated tau, clamped to [0.01, 0.50]. Returns 0.05 when fewer
        than 63 rows are available, when there is no view set to calibrate
        against, or when no candidate could be scored.
    """
    import scipy.stats

    default_tau = 0.05
    window = 63
    if len(returns_df) < window:
        return default_tau

    # Without views the posterior mean equals the prior for every tau, so there
    # is nothing to calibrate against.
    has_views = any(
        q is not None and omega is not None for q, omega in (view_sets or [])
    )
    if not has_views:
        return default_tau

    # Align the validation sample to the asset order used by the posterior;
    # otherwise the likelihood would compare mismatched columns.
    tickers = cov_mat.columns.tolist()
    val_rets = returns_df.reindex(columns=tickers).iloc[-window:].fillna(0.0).values

    best_tau = default_tau
    best_ll = -np.inf

    for tau_val in np.linspace(0.01, 0.20, 20):
        posterior = compute_bl_posterior(
            prior_rets, view_sets, cov_mat, tau=tau_val, silent=True, return_cov=True
        )
        if not isinstance(posterior, tuple):
            # Only the mean came back, i.e. no view set was usable.
            return default_tau
        mu_bl, sigma_bl = posterior

        mu_daily = mu_bl.values / 252.0
        # Use the posterior covariance (the distribution the docstring refers to),
        # symmetrized to remove round-off asymmetry from the matrix inverse.
        cov_daily = sigma_bl.values / 252.0
        cov_daily = (cov_daily + cov_daily.T) / 2.0

        try:
            ll = np.sum(
                scipy.stats.multivariate_normal.logpdf(
                    val_rets, mean=mu_daily, cov=cov_daily, allow_singular=True
                )
            )
        except (ValueError, np.linalg.LinAlgError):
            # Candidate produced an unusable covariance; skip it.
            continue

        if ll > best_ll:
            best_ll = ll
            best_tau = tau_val

    if not silent:
        print(f" {Color.DIM}βΉ Calibrated Black-Litterman tau via MLE: {best_tau:.3f}{Color.RESET}")

    return float(max(0.01, min(0.50, best_tau)))
def scale_uncertainty_by_regime(base_uncertainties, regime_severity):
    """
    Inflates prediction uncertainties (Omega) according to the market regime.

    ``regime_severity`` is treated as a volatility multiplier, so variances are
    multiplied by its square. Larger view variances make the Black-Litterman
    blend lean more heavily on the prior.

    Args:
        base_uncertainties (pd.Series | pd.DataFrame): The raw prediction variance matrix/vector.
        regime_severity (float): Volatility multiplier; values <= 1.0 leave the input untouched.

    Returns:
        pd.Series | pd.DataFrame: The regime-adjusted uncertainties. The input object
        itself is returned when ``regime_severity`` <= 1.0; pandas results are cast to float.
    """
    # Calm regime: nothing to inflate, hand back the very same object.
    if regime_severity <= 1.0:
        return base_uncertainties

    # Volatility multiplier -> variance multiplier.
    multiplier = float(regime_severity ** 2)
    inflated = base_uncertainties * multiplier

    is_pandas = isinstance(inflated, (pd.Series, pd.DataFrame))
    return inflated.astype(float) if is_pandas else inflated
# ---------------------------------------------
# QUANTITATIVE SIGNAL VIEWS (PHASE 2)
# ---------------------------------------------
def fetch_cross_asset_momentum_views(tickers, returns_df, lookback_days=126):
    """
    Computes cross-sectional momentum signals (e.g. 6-month) and translates
    them into absolute Black-Litterman views.

    Outperformers get positive views; underperformers get negative views.

    Args:
        tickers: Iterable of asset identifiers to produce views for. Only those
            present as columns of ``returns_df`` are used, in the order given.
        returns_df (pd.DataFrame): Periodic simple returns, one column per asset.
        lookback_days (int): Number of trailing rows used for the cumulative return.

    Returns:
        tuple: ``(views, uncertainties)``, two pd.Series sharing the same index
        (the requested tickers found in ``returns_df``). ``views`` is the
        cross-sectional z-score of cumulative return times 0.05; ``uncertainties``
        is ``1 / (|z| + 0.1)`` clipped to [0.01, 0.5]. ``(None, None)`` is returned
        when there is too little history, none of the tickers is available, or
        the cross-sectional dispersion is zero or undefined (e.g. a single asset).
    """
    if returns_df.empty or len(returns_df) < lookback_days:
        return None, None

    # Select columns by label so views and uncertainties share one index;
    # re-labelling values positionally would mis-assign them whenever the
    # order of `tickers` differs from the column order.
    available = [t for t in dict.fromkeys(tickers) if t in returns_df.columns]
    if not available:
        return None, None

    # Cumulative returns over the lookback period
    recent_rets = returns_df[available].iloc[-lookback_days:]
    cum_returns = (1 + recent_rets).prod() - 1

    # Cross-sectional z-score of momentum
    mean_mom = cum_returns.mean()
    std_mom = cum_returns.std()
    # std is NaN for a single asset; treat that like zero dispersion.
    if not np.isfinite(std_mom) or std_mom == 0:
        return None, None
    z_scores = (cum_returns - mean_mom) / std_mom

    # Translate the z-score into an expected-return tilt (0.05 per unit of z).
    views = z_scores * 0.05

    # Lower uncertainty for more extreme z-scores. With the [0.01, 0.5] clip,
    # every asset with |z| < 1.9 ends up at the 0.5 ceiling.
    uncertainties = (1.0 / (z_scores.abs() + 0.1)).clip(lower=0.01, upper=0.5)

    return views, uncertainties
def fetch_mean_reversion_views(tickers, returns_df, lookback_days=21):
    """
    Computes short-term mean-reversion signals (e.g. 1-month) and translates
    them into Black-Litterman views. Assets that dropped significantly are
    expected to bounce back.

    Args:
        tickers: Iterable of asset identifiers to produce views for. Only those
            present as columns of ``returns_df`` are used, in the order given.
        returns_df (pd.DataFrame): Periodic simple returns, one column per asset.
        lookback_days (int): Number of trailing rows used for the signal.

    Returns:
        tuple: ``(views, uncertainties)``, two pd.Series sharing the same index
        (the requested tickers found in ``returns_df``). ``views`` is
        ``-z * annualized_vol * 0.5`` where ``z`` is the cross-sectional z-score of
        the cumulative return; ``uncertainties`` is ``1 / (|z| + 0.1)`` clipped to
        [0.02, 0.5]. ``(None, None)`` is returned when there is too little
        history, none of the tickers is available, or the cross-sectional
        dispersion is zero or undefined (e.g. a single asset).
    """
    if returns_df.empty or len(returns_df) < lookback_days:
        return None, None

    # Select columns by label so views and uncertainties share one index;
    # re-labelling values positionally would mis-assign them whenever the
    # order of `tickers` differs from the column order.
    available = [t for t in dict.fromkeys(tickers) if t in returns_df.columns]
    if not available:
        return None, None

    recent_rets = returns_df[available].iloc[-lookback_days:]
    cum_returns = (1 + recent_rets).prod() - 1

    mean_rev = cum_returns.mean()
    std_rev = cum_returns.std()
    # std is NaN for a single asset; treat that like zero dispersion.
    if not np.isfinite(std_rev) or std_rev == 0:
        return None, None
    z_scores = (cum_returns - mean_rev) / std_rev

    # Per-asset volatility of the lookback window, scaled by sqrt(252)
    ann_vol = recent_rets.std() * np.sqrt(252)

    # Inverse relationship: drops lead to positive bounce views, scaled by asset volatility
    views = -z_scores * ann_vol * 0.5

    # Lower uncertainty for more extreme moves, clipped to [0.02, 0.5]
    uncertainties = (1.0 / (z_scores.abs() + 0.1)).clip(lower=0.02, upper=0.5)

    return views, uncertainties
# ---------------------------------------------
# DYNAMIC FX HEDGING OVERLAY (PHASE 3)
# ---------------------------------------------
def calculate_fx_hedge_overlay(weights, fx_exposure_map, hedge_ratio=1.0):
    """
    Calculates FX Forward overlay weights to hedge currency risk for international assets.

    Args:
        weights: Optimal portfolio weights (any mapping-like object with ``.items()``,
            e.g. a dict or pd.Series keyed by ticker).
        fx_exposure_map: Dict mapping tickers to their underlying currency
            (e.g., {'VGK': 'EUR', 'EWJ': 'JPY', 'SPY': 'USD'}). Tickers not in
            the map are treated as 'USD'.
        hedge_ratio: Fraction of the exposure to hedge (1.0 = fully hedged).

    Returns:
        A float pd.Series of FX forward overlay weights keyed ``SHORT_<currency>``
        (e.g., {'SHORT_EUR': 0.15}). Positions with weight <= 0 and USD exposures
        contribute nothing; the Series is empty when no hedge is required.
    """
    fx_overlays = {}
    for ticker, weight in weights.items():
        # Only long positions are hedged.
        if weight <= 0:
            continue
        currency = fx_exposure_map.get(ticker, 'USD')
        if currency == 'USD':
            continue
        hedge_key = f"SHORT_{currency}"
        # Aggregate all positions that share a currency into one overlay leg.
        fx_overlays[hedge_key] = fx_overlays.get(hedge_key, 0.0) + (weight * hedge_ratio)

    # Explicit dtype so an empty result is a float Series rather than object.
    return pd.Series(fx_overlays, dtype=float)