import numpy as np
import pandas as pd

from config import Color, logger

# Relative cutoff used whenever a singular matrix forces a pseudo-inverse.
_PINV_RCOND = 1e-8
# Tau returned when calibration cannot be performed.
_DEFAULT_TAU = 0.05
# Number of most recent return rows used to score candidate taus.
_CALIBRATION_WINDOW = 63
# Divisor/multiplier used to convert between annualized and per-row (daily) figures.
_TRADING_DAYS = 252


def _safe_inverse(matrix):
    """Invert ``matrix``, falling back to the pseudo-inverse if it is singular."""
    try:
        return np.linalg.inv(matrix)
    except np.linalg.LinAlgError:
        return np.linalg.pinv(matrix, rcond=_PINV_RCOND)


def _view_precision(omega_data, tickers, min_uncertainty):
    """Build the precision matrix Ω⁻¹ of one view set, or None for an unknown format."""
    if isinstance(omega_data, pd.Series):
        # 1D input: independent forecast errors, i.e. a diagonal Ω.
        omega_diag = omega_data.reindex(tickers).fillna(1.0).to_numpy(dtype=float)
        # Floor each variance against τ × σ²_i so that view certainty
        # cannot exceed prior certainty.
        omega_diag = np.maximum(omega_diag, min_uncertainty)
        return np.diag(1.0 / omega_diag)

    if isinstance(omega_data, pd.DataFrame):
        # 2D input: full covariance structure of the forecast errors.
        # An explicit copy is required because the diagonal is overwritten
        # below and pandas may hand out read-only arrays (Copy-on-Write).
        omega_matrix = (
            omega_data.reindex(index=tickers, columns=tickers)
            .fillna(0.0)
            .to_numpy(dtype=float, copy=True)
        )
        # Same τ × σ² floor, applied to the diagonal only.
        np.fill_diagonal(
            omega_matrix, np.maximum(np.diag(omega_matrix), min_uncertainty)
        )
        return _safe_inverse(omega_matrix)

    return None


def compute_bl_posterior(prior_rets, view_sets, cov_mat, tau=0.05, silent=False, return_cov=False):
    """
    Computes the Universal Black-Litterman posterior expected returns.

    Accepts a dynamic structural prior (e.g., CAPM, Fama-French) and an arbitrary
    number of absolute view sets. Uses precision-weighted blending to combine
    multiple independent forecasts:

        Ω_combined^(-1) = Σ Ω_i^(-1)
        Q_combined      = [Σ Ω_i^(-1)]^(-1) * Σ [ Ω_i^(-1) * Q_i ]

    Args:
        prior_rets (pd.Series): The Prior (Π) - Expected returns from a structural model.
            Tickers missing from the prior are filled with 0.0.
        view_sets (list): A list of tuples (views_series, uncertainties) representing
            (Q_i, Ω_i). `uncertainties` can be a 1D Series (diagonal variance) OR a
            full 2D DataFrame (covariance of forecast errors). Tuples containing
            None are skipped; `None` for the whole argument means "no views".
        cov_mat (pd.DataFrame): The Covariance Matrix (Σ). Its columns define the
            ticker universe and the ordering of every output.
        tau (float): Confidence in the prior vs the views (default 0.05).
        silent (bool): If True, suppresses terminal output.
        return_cov (bool): If True, also return the posterior covariance
            Σ + [(τΣ)^(-1) + Ω_combined^(-1)]^(-1).

    Returns:
        pd.Series: The Posterior Expected Returns (μ_BL) aligned with the input tickers.
        When `return_cov` is True, a tuple `(μ_BL, Σ_BL)` is returned instead, where
        Σ_BL is a pd.DataFrame indexed by the tickers on both axes. This holds on
        every path, including the fallback to the prior when no view is usable.
    """
    tickers = cov_mat.columns.tolist()
    n = len(tickers)

    # 1. Base Setup
    pi_vector = prior_rets.reindex(tickers).fillna(0.0).values
    sigma_matrix = cov_mat.values

    # Minimum uncertainty floor = τ × diag(Σ).
    # This prevents ML views from having infinite precision (near-zero Ω),
    # which would completely override the equilibrium prior.
    min_uncertainty = tau * np.maximum(np.diag(sigma_matrix), 1e-8)

    tau_sigma_inv = _safe_inverse(tau * sigma_matrix)

    # 2. Precision-Weighted Blending of Multiple View Sets
    omega_inv_sum = np.zeros((n, n))
    omega_inv_q_sum = np.zeros(n)
    valid_views_count = 0

    for view_idx, (q_series, omega_data) in enumerate(view_sets or ()):
        if q_series is None or omega_data is None:
            continue

        # NOTE(review): a ticker absent from the view set becomes a view of 0.0
        # (not "no view"); confirm this pull toward zero is intended.
        q_vector = q_series.reindex(tickers).fillna(0.0).values

        omega_inv = _view_precision(omega_data, tickers, min_uncertainty)
        if omega_inv is None:
            if not silent:
                logger.warning(f"Unrecognized omega format for view {view_idx}. Skipping.")
            continue

        # Accumulate precisions
        omega_inv_sum += omega_inv
        omega_inv_q_sum += omega_inv @ q_vector
        valid_views_count += 1

    # 3. Compute the Posterior
    if valid_views_count == 0:
        if not silent:
            print(f" {Color.DIM}ℹ Black-Litterman Bridge: No valid views provided. Reverting to structural prior.{Color.RESET}")
        prior_series = pd.Series(pi_vector, index=tickers)
        if return_cov:
            # With zero view precision the general formula reduces to
            # Σ + τΣ; returning it keeps the (mean, cov) contract intact.
            sigma_prior = pd.DataFrame(
                (1.0 + tau) * sigma_matrix, index=tickers, columns=tickers
            )
            return prior_series, sigma_prior
        return prior_series

    # Left Term: [(τΣ)^(-1) + Ω_combined^(-1)]^(-1)
    left_term = _safe_inverse(tau_sigma_inv + omega_inv_sum)

    # Right Term: [(τΣ)^(-1)Π + Ω_combined^(-1)Q_combined]
    right_term = tau_sigma_inv @ pi_vector + omega_inv_q_sum

    # Final Posterior
    mu_bl = left_term @ right_term
    posterior = pd.Series(mu_bl, index=tickers)

    if not silent:
        # How far the blended views moved the prior, for diagnostic reporting.
        avg_shift = np.mean(np.abs(mu_bl - pi_vector)) * 10000  # in basis points
        print(f" {Color.DIM}ℹ Black-Litterman Bridge: Integrated {valid_views_count} view set(s) with structural prior (Avg shift: {avg_shift:.1f} bps).{Color.RESET}")

    if return_cov:
        sigma_bl = pd.DataFrame(sigma_matrix + left_term, index=tickers, columns=tickers)
        return posterior, sigma_bl
    return posterior


def calibrate_tau(prior_rets, view_sets, cov_mat, returns_df, silent=False):
    """
    Calibrates Black-Litterman tau by maximizing the log-likelihood of recent
    realized returns given the posterior distribution.

    A grid of 20 taus in [0.01, 0.20] is scored. For each candidate the posterior
    mean and posterior covariance are divided by 252 and the last 63 rows of
    `returns_df` are evaluated under the resulting multivariate normal.

    Args:
        prior_rets (pd.Series): Prior expected returns, forwarded to
            `compute_bl_posterior`.
        view_sets (list): View sets, forwarded to `compute_bl_posterior`.
        cov_mat (pd.DataFrame): Covariance matrix; its columns define the ticker
            universe that `returns_df` is aligned to.
        returns_df (pd.DataFrame): Realized returns, one column per ticker.
        silent (bool): If True, suppresses terminal output.

    Returns:
        float: The best-scoring tau clamped to [0.01, 0.50], or 0.05 when fewer
        than 63 rows are available or a ticker of `cov_mat` has no return column.
    """
    import scipy.stats

    if len(returns_df) < _CALIBRATION_WINDOW:
        return _DEFAULT_TAU

    # The likelihood is only meaningful if the return columns line up with the
    # mean/covariance ordering, which follows cov_mat's columns.
    tickers = cov_mat.columns.tolist()
    if any(ticker not in returns_df.columns for ticker in tickers):
        return _DEFAULT_TAU
    val_rets = (
        returns_df.iloc[-_CALIBRATION_WINDOW:]
        .reindex(columns=tickers)
        .fillna(0.0)
        .values
    )

    best_tau = _DEFAULT_TAU
    best_ll = -np.inf

    for tau_val in np.linspace(0.01, 0.20, 20):
        mu_bl, sigma_bl = compute_bl_posterior(
            prior_rets, view_sets, cov_mat, tau=tau_val, silent=True, return_cov=True
        )
        try:
            log_density = scipy.stats.multivariate_normal.logpdf(
                val_rets,
                mean=mu_bl.values / _TRADING_DAYS,
                cov=sigma_bl.values / _TRADING_DAYS,
                allow_singular=True,
            )
        except (ValueError, np.linalg.LinAlgError):
            # Best effort: a candidate whose distribution cannot be evaluated
            # is skipped rather than aborting the calibration.
            continue

        ll = np.sum(log_density)
        # A NaN likelihood compares False and is therefore never selected.
        if ll > best_ll:
            best_ll = ll
            best_tau = tau_val

    if not silent:
        print(f" {Color.DIM}ℹ Calibrated Black-Litterman tau via MLE: {best_tau:.3f}{Color.RESET}")

    return float(max(0.01, min(0.50, best_tau)))


def scale_uncertainty_by_regime(base_uncertainties, regime_severity):
    """
    Dynamically scales prediction uncertainties (Ω) based on the current market regime.

    If the HMM detects a high-volatility crash regime, statistical models (which are
    largely trained on low-to-medium volatility data) become less reliable. We
    increase their variance, which forces the Black-Litterman formula to lean
    heavier on the Equilibrium Prior.

    Args:
        base_uncertainties (pd.Series | pd.DataFrame | None): The raw prediction
            variance matrix/vector. `None` is passed through unchanged.
        regime_severity (float): A scalar > 1.0 indicating how severe the crash is.
            Values <= 1.0 and NaN leave the uncertainties untouched.

    Returns:
        pd.Series | pd.DataFrame: The regime-adjusted uncertainties (float dtype
        when scaling was applied).
    """
    # `not (x > 1.0)` also catches NaN, which would otherwise turn every
    # uncertainty into NaN and poison the posterior.
    if base_uncertainties is None or not regime_severity > 1.0:
        return base_uncertainties

    # regime_severity is a volatility multiplier, so variance scales by its square.
    variance_scalar = float(regime_severity ** 2)
    scaled = base_uncertainties * variance_scalar

    if isinstance(scaled, (pd.Series, pd.DataFrame)):
        return scaled.astype(float)
    return scaled


# ─────────────────────────────────────────────
# QUANTITATIVE SIGNAL VIEWS (PHASE 2)
# ─────────────────────────────────────────────

def _cross_sectional_z_scores(tickers, returns_df, lookback_days):
    """Return (recent_rets, z_scores) of cumulative returns over the lookback, or None if unusable."""
    if returns_df.empty or len(returns_df) < lookback_days:
        return None

    # Restrict to requested tickers that actually have history so that every
    # output is labelled by the asset it was computed from.
    available = [ticker for ticker in tickers if ticker in returns_df.columns]
    if not available:
        return None

    recent_rets = returns_df.iloc[-lookback_days:][available]
    cum_returns = (1 + recent_rets).prod() - 1

    dispersion = cum_returns.std()
    # Catches both zero dispersion and the NaN produced by a single asset.
    if not dispersion > 0:
        return None

    z_scores = (cum_returns - cum_returns.mean()) / dispersion
    return recent_rets, z_scores


def _conviction_uncertainties(z_scores, lower, upper):
    """Map |z| to an uncertainty that shrinks for extreme scores, clipped to [lower, upper]."""
    return (1.0 / (z_scores.abs() + 0.1)).clip(lower=lower, upper=upper)


def fetch_cross_asset_momentum_views(tickers, returns_df, lookback_days=126):
    """
    Computes cross-sectional momentum signals (e.g. 6-month) and translates them
    into absolute Black-Litterman views. Outperformers get positive views;
    underperformers get negative views.

    Args:
        tickers: Iterable of tickers to build views for. Tickers without a column
            in `returns_df` are omitted from the result.
        returns_df (pd.DataFrame): Periodic returns, one column per ticker.
        lookback_days (int): Number of trailing rows to compound.

    Returns:
        tuple: `(views, uncertainties)` as two pd.Series sharing the same ticker
        index, or `(None, None)` when there is too little history, no requested
        ticker is available, or the cross-section has no dispersion.
    """
    signal = _cross_sectional_z_scores(tickers, returns_df, lookback_days)
    if signal is None:
        return None, None
    _, z_scores = signal

    # Translate Z-score to an annualized expected return view (5% tilt per sigma).
    views = z_scores * 0.05

    # High conviction for extreme scores, low conviction for the middle.
    uncertainties = _conviction_uncertainties(z_scores, 0.01, 0.5)

    return views, uncertainties


def fetch_mean_reversion_views(tickers, returns_df, lookback_days=21):
    """
    Computes short-term mean-reversion signals (e.g. 1-month) and translates them
    into Black-Litterman views. Assets that dropped significantly are expected
    to bounce back.

    Args:
        tickers: Iterable of tickers to build views for. Tickers without a column
            in `returns_df` are omitted from the result.
        returns_df (pd.DataFrame): Periodic returns, one column per ticker.
        lookback_days (int): Number of trailing rows to compound.

    Returns:
        tuple: `(views, uncertainties)` as two pd.Series sharing the same ticker
        index, or `(None, None)` when there is too little history, no requested
        ticker is available, or the cross-section has no dispersion.
    """
    signal = _cross_sectional_z_scores(tickers, returns_df, lookback_days)
    if signal is None:
        return None, None
    recent_rets, z_scores = signal

    # Annualized volatility per asset
    ann_vol = recent_rets.std() * np.sqrt(_TRADING_DAYS)

    # Inverse relationship: drops lead to positive bounce views, scaled by asset volatility.
    views = -z_scores * ann_vol * 0.5

    # Confidence is higher for extreme sell-offs.
    uncertainties = _conviction_uncertainties(z_scores, 0.02, 0.5)

    return views, uncertainties


# ─────────────────────────────────────────────
# DYNAMIC FX HEDGING OVERLAY (PHASE 3)
# ─────────────────────────────────────────────

def calculate_fx_hedge_overlay(weights, fx_exposure_map, hedge_ratio=1.0):
    """
    Calculates FX Forward overlay weights to hedge currency risk for international assets.

    Args:
        weights: Optimal portfolio weights (anything exposing `.items()`, e.g. a
            dict or pd.Series). Non-positive and NaN weights are ignored.
        fx_exposure_map: Dict mapping tickers to their underlying currency
            (e.g., {'VGK': 'EUR', 'EWJ': 'JPY', 'SPY': 'USD'}). Tickers missing
            from the map are treated as 'USD' and therefore not hedged.
        hedge_ratio: Fraction of the exposure to hedge (1.0 = fully hedged).

    Returns:
        A float pd.Series of FX forward overlay weights keyed by "SHORT_<CCY>"
        (e.g., {'SHORT_EUR': 0.15}); empty when nothing needs hedging.
    """
    fx_overlays = {}

    for ticker, weight in weights.items():
        # `not (w > 0)` skips zero, negative and NaN weights alike.
        if not weight > 0:
            continue

        currency = fx_exposure_map.get(ticker, 'USD')
        if currency == 'USD':
            continue

        hedge_key = f"SHORT_{currency}"
        fx_overlays[hedge_key] = fx_overlays.get(hedge_key, 0.0) + (weight * hedge_ratio)

    # Explicit dtype keeps an empty overlay numeric instead of object-typed.
    return pd.Series(fx_overlays, dtype=float)