"""Bayesian Probabilistic Forecasting Layer."""
import numpy as np
import pandas as pd
from scipy import stats
from typing import Dict, Tuple, Optional
import warnings
warnings.filterwarnings('ignore')
class BayesianForecaster:
    """Probabilistic forecasting with Bayesian methods.

    Maintains a Normal posterior over the mean return via sequential
    conjugate Normal-Normal updates, and produces probabilistic forecasts
    with confidence intervals and sign probabilities.
    """

    def __init__(self, prior_mean: float = 0.0, prior_std: float = 0.2):
        """
        Args:
            prior_mean: Prior belief about the mean return.
            prior_std: Prior standard deviation (uncertainty) of that mean.
        """
        self.prior_mean = prior_mean
        self.prior_std = prior_std
        # Posterior starts at the prior and is refined by update().
        self.posterior_mean = prior_mean
        self.posterior_std = prior_std
        self.update_count = 0

    def update(self, new_returns: np.ndarray) -> None:
        """Bayesian update with new observations.

        Sequential conjugate Normal-Normal update for the mean: the
        current posterior acts as the prior for the incoming batch.

        Args:
            new_returns: 1-D array of observed returns. An empty array is
                a no-op (previously it poisoned the posterior with NaN).
        """
        n = len(new_returns)
        if n == 0:
            # No observations -- leave the posterior untouched.
            return
        sample_mean = float(np.mean(new_returns))
        # Unbiased sample variance (ddof=1) when we have >1 observation;
        # with a single point, fall back to the current posterior variance.
        sample_var = float(np.var(new_returns, ddof=1)) if n > 1 else self.posterior_std ** 2
        if sample_var <= 0:
            # Degenerate (constant) batch: avoid division by zero below.
            sample_var = self.posterior_std ** 2
        # Conjugate update (Normal-Normal for the mean), in precision form.
        prior_precision = 1.0 / (self.posterior_std ** 2)
        likelihood_precision = n / sample_var
        posterior_precision = prior_precision + likelihood_precision
        self.posterior_std = 1.0 / np.sqrt(posterior_precision)
        # Precision-weighted average of the prior mean and the sample mean.
        self.posterior_mean = (
            (prior_precision * self.posterior_mean + likelihood_precision * sample_mean) /
            posterior_precision
        )
        self.update_count += 1

    def forecast(self, horizon: int = 1, alpha: float = 0.05) -> Dict:
        """Generate a probabilistic forecast over ``horizon`` periods.

        Assumes i.i.d. per-period returns: the mean scales linearly with
        the horizon and the std with its square root.

        Args:
            horizon: Number of periods ahead.
            alpha: Two-sided significance level for the confidence
                interval (default 0.05 -> 95% CI, the previous hard-coded
                behavior).

        Returns:
            Dict with forecast mean/std, CI bounds, probabilities of a
            positive/negative outcome, and a scalar posterior confidence
            (inverse posterior std).
        """
        forecast_mean = self.posterior_mean * horizon
        forecast_std = self.posterior_std * np.sqrt(horizon)
        z = stats.norm.ppf(1 - alpha / 2)
        # Single cdf evaluation; prob_positive is its complement.
        prob_negative = stats.norm.cdf(0, forecast_mean, forecast_std)
        return {
            'mean': forecast_mean,
            'std': forecast_std,
            'ci_lower': forecast_mean - z * forecast_std,
            'ci_upper': forecast_mean + z * forecast_std,
            'prob_positive': 1 - prob_negative,
            'prob_negative': prob_negative,
            'posterior_confidence': 1.0 / self.posterior_std
        }

    def ensemble_forecast(self,
                          predictions: Dict[str, float],
                          uncertainties: Dict[str, float]) -> Dict:
        """Combine multiple model predictions with Bayesian weighting.

        Each model is weighted by its precision (inverse variance); a
        model with no reported uncertainty defaults to std 0.1.

        Args:
            predictions: Model name -> point prediction.
            uncertainties: Model name -> prediction std.

        Returns:
            Dict with the precision-weighted mean, combined std,
            normalized weights, and a 95% confidence interval.

        Raises:
            ValueError: If ``predictions`` is empty (previously a bare
                ZeroDivisionError).
        """
        if not predictions:
            raise ValueError("predictions must not be empty")
        weights: Dict[str, float] = {}
        total_precision = 0.0
        for model in predictions:
            uncertainty = uncertainties.get(model, 0.1)
            precision = 1.0 / (uncertainty ** 2)
            weights[model] = precision
            total_precision += precision
        # Normalize precisions into convex combination weights.
        weights = {k: v / total_precision for k, v in weights.items()}
        ensemble_mean = sum(w * predictions[m] for m, w in weights.items())
        # Std of a precision-weighted average of independent estimates.
        ensemble_std = 1.0 / np.sqrt(total_precision)
        return {
            'ensemble_mean': ensemble_mean,
            'ensemble_std': ensemble_std,
            'weights': weights,
            'ci_lower': ensemble_mean - 1.96 * ensemble_std,
            'ci_upper': ensemble_mean + 1.96 * ensemble_std
        }
class BayesianOptimizer:
    """Bayesian portfolio optimization with parameter uncertainty."""

    def __init__(self, risk_aversion: float = 2.0):
        """
        Args:
            risk_aversion: Penalty multiplier on portfolio variance in the
                mean-variance objective.
        """
        self.risk_aversion = risk_aversion

    def optimize_with_uncertainty(self,
                                  mu: np.ndarray,
                                  Sigma: np.ndarray,
                                  mu_uncertainty: np.ndarray,
                                  max_weight: float = 0.25) -> Dict:
        """
        Robust long-only mean-variance optimization under parameter uncertainty.

        Uses Bayesian shrinkage: expected returns are shrunk toward the
        prior (zero alpha) in proportion to their uncertainty, and the
        estimation variance is added to the covariance diagonal.

        Args:
            mu: Expected returns, shape (n,).
            Sigma: Return covariance matrix, shape (n, n).
            mu_uncertainty: Std of each return estimate, shape (n,).
            max_weight: Per-asset weight cap (default 0.25, matching the
                previously hard-coded bound).

        Returns:
            Dict with optimized weights, raw and shrunk expected returns,
            per-asset shrinkage factors, the portfolio expected return,
            and its robust uncertainty (std).
        """
        from scipy.optimize import minimize

        # Per-asset precision; epsilon guards zero-uncertainty entries.
        precision = 1.0 / (mu_uncertainty ** 2 + 1e-8)
        # Same epsilon guard here (previously missing: all-zero
        # uncertainties crashed with a division by zero).
        prior_precision = 1.0 / (np.mean(mu_uncertainty) ** 2 + 1e-8)
        # Shrinkage in (0, 1): noisier estimates are pulled harder to zero.
        shrinkage = prior_precision / (precision + prior_precision)
        mu_shrunk = (1 - shrinkage) * mu  # prior alpha is zero

        # Inflate the covariance diagonal by the estimation variance.
        Sigma_robust = Sigma + np.diag(mu_uncertainty ** 2)

        n = len(mu)

        def objective(w):
            # Negative utility: maximize return minus risk penalty.
            return -(np.dot(w, mu_shrunk) -
                     self.risk_aversion * np.dot(w, np.dot(Sigma_robust, w)))

        constraints = [{'type': 'eq', 'fun': lambda w: np.sum(w) - 1.0}]
        bounds = [(0.0, max_weight) for _ in range(n)]
        w0 = np.ones(n) / n

        result = minimize(objective, w0, method='SLSQP',
                          bounds=bounds, constraints=constraints)
        # Guard against solver failure (previously result.x was used
        # unconditionally); fall back to equal weights.
        weights = result.x if result.success else w0
        # Clip tiny SLSQP negatives and renormalize to a valid simplex point.
        weights = np.maximum(weights, 0)
        total = np.sum(weights)
        weights = weights / total if total > 0 else w0

        return {
            'weights': weights,
            'mu_raw': mu,
            'mu_shrunk': mu_shrunk,
            'shrinkage_factors': shrinkage,
            'expected_return': np.dot(weights, mu_shrunk),
            'uncertainty': np.sqrt(np.dot(weights, np.dot(Sigma_robust, weights)))
        }
|