Spaces:
Sleeping
Sleeping
| import numpy as np | |
| import pandas as pd | |
| from scipy.linalg import cholesky | |
| import copy | |
| from config import Color, logger, DEFAULT_CONFIG | |
| from core_types import PortfolioState, LotManager, CovarianceResult | |
| from models import regime_stress_covariance | |
| from solver import build_and_optimize | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| # MODULE-LEVEL IMPORTS | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| # Note: Moved import to module level so runtime errors in execution.py aren't masked | |
| try: | |
| from execution import estimate_market_impact | |
| _HAS_EXECUTION = True | |
| except ImportError: | |
| _HAS_EXECUTION = False | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| # UTILITY & METRIC FUNCTIONS | |
| from utils.metrics import israelsen_sharpe, portfolio_gross_metrics, liquidity_score, annual_returns | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| # CORE BACKTESTING ENGINES | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| def backtest(returns_df, weights, capital, rfr, spy_rets, spread_map, cfg, state: PortfolioState = None, betas: pd.Series = None): | |
| """ | |
| Standard historical backtest with transaction costs, Almgren-Chriss market impact, | |
| and heuristic state-driven tax-drag modeling (for single-period projections). | |
| """ | |
| trading_days = cfg.get("trading_days_per_year", 252) | |
| adv_proxy = cfg.get("default_adv_proxy", 50000000.0) | |
| w_risky = weights.drop(labels=['CASH'], errors='ignore') | |
| w_arr = w_risky.reindex(returns_df.columns).fillna(0.0).values | |
| cash_w = float(weights.get('CASH', 0.0)) | |
| if isinstance(rfr, pd.Series): | |
| rfr_aligned = rfr.reindex(returns_df.index).ffill().bfill().fillna(0.04) | |
| daily_rfr = (rfr_aligned / trading_days).values | |
| cash_growth = (1 + daily_rfr).cumprod() | |
| else: | |
| daily_rfr = rfr / trading_days | |
| cash_growth = (1 + daily_rfr) ** np.arange(1, len(returns_df) + 1) | |
| # True Buy-and-Hold Return Computation (Instead of Daily Rebalancing Approximation) | |
| asset_paths = (1 + returns_df.fillna(0)).cumprod().values | |
| allocated_capital_path = capital * (asset_paths @ w_arr) | |
| cash_path = capital * cash_w * cash_growth | |
| total_path = allocated_capital_path + cash_path | |
| port_daily_rets = np.diff(total_path, prepend=capital) / np.concatenate(([capital], total_path[:-1])) | |
| port_rets_series = pd.Series(port_daily_rets.copy(), index=returns_df.index) | |
| spy_aligned = spy_rets.reindex(returns_df.index).fillna(0.0) | |
| n = len(w_arr) | |
| # Note: Ensure state weights are identically shaped and aligned before subtracting | |
| if state and state.current_weights is not None and state.current_weights.size > 0: | |
| current_w_arr = pd.Series(state.current_weights, index=state.tickers).reindex(returns_df.columns).fillna(0.0).values | |
| else: | |
| current_w_arr = np.zeros(n) | |
| delta = w_arr - current_w_arr | |
| # 1. Friction Cost (Bid-Ask Spread + Brokerage) | |
| spreads = np.array([spread_map.get(t, 0.0008) for t in returns_df.columns]) if spread_map else np.full(n, 0.0008) | |
| trade_cost = cfg.get("transaction_cost", 0.001) | |
| total_friction_rate = np.sum(np.abs(delta) * (spreads + trade_cost), axis=0) | |
| # 2. Market Impact (Almgren-Chriss Square Root Model) | |
| impact_hit_rate = 0.0 | |
| if _HAS_EXECUTION: | |
| vols = returns_df.std().values | |
| for i, t_val in enumerate(delta): | |
| if abs(t_val) > 1e-4: | |
| trade_dollars = abs(t_val) * capital | |
| asset_vol = vols[i] if i < len(vols) else 0.015 | |
| impact_pct = estimate_market_impact(trade_dollars, adv_proxy, asset_vol) | |
| impact_hit_rate += impact_pct * abs(t_val) | |
| # 3. Precision Tax Drag (Heuristic aggregate since single-period lacks time-series prices) | |
| tax_hit_rate = 0.0 | |
| if cfg.get('tax_enabled', False) and state and current_w_arr.size > 0: | |
| if getattr(state, 'gain_fractions', None) is not None and getattr(state, 'tax_rates', None) is not None: | |
| if len(state.gain_fractions) == len(state.tickers) and len(state.tax_rates) == len(state.tickers): | |
| sells = np.maximum(current_w_arr - w_arr, 0.0) | |
| gain_fracs = pd.Series(state.gain_fractions, index=state.tickers).reindex(returns_df.columns).fillna(0.0).values | |
| tax_rates_aligned = pd.Series(state.tax_rates, index=state.tickers).reindex(returns_df.columns).fillna(0.0).values | |
| tax_hit_rate = np.sum(sells * gain_fracs * tax_rates_aligned) | |
| port_rets_series.iloc[0] -= (total_friction_rate + impact_hit_rate + tax_hit_rate) | |
| equity_curve = capital * (1 + port_rets_series).cumprod() | |
| bench_curve = capital * (1 + spy_aligned).cumprod() | |
| # Prepend the baseline (t=0) capital to ensure charting starts exactly at the baseline | |
| first_date = port_rets_series.index[0] - pd.Timedelta(days=1) | |
| equity_curve.loc[first_date] = capital | |
| bench_curve.loc[first_date] = capital | |
| equity_curve = equity_curve.sort_index() | |
| bench_curve = bench_curve.sort_index() | |
| total_days = len(port_rets_series) | |
| n_yrs = total_days / trading_days if total_days > 0 else 1.0 | |
| total_ret = float(equity_curve.iloc[-1] / capital - 1.0) | |
| ann_ret = (1 + total_ret) ** (1 / max(n_yrs, 0.01)) - 1.0 | |
| ann_vol = port_rets_series.std() * np.sqrt(trading_days) | |
| if isinstance(rfr, pd.Series): | |
| rfr_full = rfr.reindex(equity_curve.index).ffill().bfill().fillna(0.04) | |
| daily_rfr_full = (rfr_full / trading_days).values[1:] # drop t=0 | |
| else: | |
| daily_rfr_full = rfr / trading_days | |
| daily_excess = port_rets_series - daily_rfr_full | |
| ann_excess = daily_excess.mean() * trading_days | |
| sharpe = israelsen_sharpe(ann_excess, ann_vol) | |
| roll_max = equity_curve.cummax() | |
| drawdowns = (equity_curve - roll_max) / roll_max | |
| max_dd = float(drawdowns.min()) if not drawdowns.empty else 0.0 | |
| max_dd_date = drawdowns.idxmin() if not drawdowns.empty else None | |
| optimizer_failures = 0 | |
| total_rebalances = 0 | |
| is_dd = drawdowns < 0 | |
| dd_days = int(is_dd.groupby((~is_dd).cumsum()).sum().max()) if is_dd.any() else 0 | |
| # Note: Use semi-deviation instead of the standard deviation of negative subset | |
| sortino = 0.0 | |
| downside_sq = np.minimum(port_rets_series.values - daily_rfr_full, 0.0) ** 2 | |
| downside_vol = np.sqrt(downside_sq.mean()) * np.sqrt(trading_days) | |
| if downside_vol > 0: | |
| sortino = (ann_ret - (rfr.mean() if isinstance(rfr, pd.Series) else rfr)) / downside_vol | |
| calmar = ann_ret / abs(max_dd) if abs(max_dd) > 0.001 else 0.0 | |
| roll_mean = port_rets_series.rolling(trading_days).mean() * trading_days | |
| roll_std = port_rets_series.rolling(trading_days).std() * np.sqrt(trading_days) | |
| rolling_sharpe = (roll_mean - (rfr.mean() if isinstance(rfr, pd.Series) else rfr)) / roll_std | |
| stats = { | |
| "total_ret": total_ret, | |
| "ann_ret": ann_ret, | |
| "ann_vol": ann_vol, | |
| "sharpe": sharpe, | |
| "sortino": sortino, | |
| "calmar": calmar, | |
| "max_dd": max_dd, | |
| "dd_days": dd_days, | |
| "friction_paid": total_friction_rate * capital, | |
| "friction_rate": round(total_friction_rate * 100, 4), | |
| "impact_paid": impact_hit_rate * capital, | |
| "tax_paid": tax_hit_rate * capital, | |
| "max_dd_date": max_dd_date.date() if isinstance(max_dd_date, pd.Timestamp) else max_dd_date, | |
| "is_historical": True, | |
| "optimizer_failures": optimizer_failures, | |
| "optimizer_failure_rate": optimizer_failures / max(1, total_rebalances), | |
| # Note: Compute annual returns purely on daily return series, not on the equity_curve | |
| # which contains a T-0 prepend that distorts first-year geometry. | |
| "ann_rets": annual_returns(port_rets_series), | |
| "rolling_sharpe": rolling_sharpe | |
| } | |
| return equity_curve, bench_curve, port_rets_series, stats | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| # SYSTEMIC STRESS & SENSITIVITY TESTING | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| def portfolio_stress_test(weights, returns_df, raw_data, betas, durations=None): | |
| """ | |
| Parametric Scenario Generation (Phase 2). | |
| Evaluates portfolio impact across synthetic market and yield curve shocks. | |
| """ | |
| w = weights.drop(labels=['CASH'], errors='ignore') | |
| w_arr = w.reindex(returns_df.columns).fillna(0.0).values | |
| port_beta = float(w @ betas.reindex(w.index).fillna(0.0)) | |
| port_duration = float(w @ durations.reindex(w.index).fillna(0.0)) if durations is not None else 0.0 | |
| scenarios = [ | |
| {"name": "2008 Financial Crisis (Simulated)", "spy_drop": -0.55, "rate_shift": -0.04}, | |
| {"name": "2020 COVID Crash (Simulated)", "spy_drop": -0.33, "rate_shift": -0.015}, | |
| {"name": "Equity Market Shock (Moderate)", "spy_drop": -0.10, "rate_shift": 0.0}, | |
| {"name": "Equity Market Shock (Severe)", "spy_drop": -0.25, "rate_shift": 0.0}, | |
| {"name": "Interest Rate Spike (+100 bps)", "spy_drop": 0.0, "rate_shift": 0.01}, | |
| {"name": "Interest Rate Cut (-100 bps)", "spy_drop": 0.0, "rate_shift": -0.01}, | |
| {"name": "Stagflation (Equities Down, Rates Up)", "spy_drop": -0.15, "rate_shift": 0.015} | |
| ] | |
| results = [] | |
| for sc in scenarios: | |
| # Equity impact via Beta | |
| eq_impact = port_beta * sc["spy_drop"] | |
| # Fixed income impact via Duration: dP/P β -Duration * dY | |
| fi_impact = -port_duration * sc["rate_shift"] | |
| total_impact = eq_impact + fi_impact | |
| trigger_desc = [] | |
| if sc["spy_drop"] != 0: | |
| trigger_desc.append(f"SPY {sc['spy_drop']*100:+.0f}%") | |
| if sc["rate_shift"] != 0: | |
| trigger_desc.append(f"Rates {sc['rate_shift']*10000:+.0f} bps") | |
| results.append({ | |
| "scenario": sc["name"], | |
| "trigger": " & ".join(trigger_desc) if trigger_desc else "No Shock", | |
| "impact": total_impact | |
| }) | |
| return results | |
| def liquidity_adjusted_var(weights, exp_rets, cov_mat, capital, spread_map, cfg=None, adv_proxy=50000000.0, conf_level=0.95, days=21): | |
| """ | |
| Computes Liquidity-Adjusted Value at Risk (LVaR). | |
| Standard VaR is adjusted by the exogenous liquidity cost of liquidation (half-spread + market impact). | |
| """ | |
| import scipy.stats as st | |
| w_risky = weights.drop(labels=['CASH'], errors='ignore') | |
| w_arr = w_risky.reindex(cov_mat.columns).fillna(0.0).values | |
| ac_gamma = cfg.get("tc_volume_profile", 0.10) if cfg else 0.10 | |
| # Standard Parametric VaR | |
| mu_p = float(w_arr @ exp_rets.reindex(cov_mat.columns).fillna(0.0)) | |
| vol_p = float(np.sqrt(w_arr @ cov_mat.values @ w_arr)) | |
| mu_h = mu_p * (days / 252.0) | |
| vol_h = vol_p * np.sqrt(days / 252.0) | |
| z_score = st.norm.ppf(conf_level) | |
| standard_var_pct = (z_score * vol_h) - mu_h | |
| # Liquidity Adjustment | |
| liquidity_cost_pct = 0.0 | |
| vols = np.sqrt(np.diag(cov_mat.values)) | |
| spreads = np.array([spread_map.get(t, 0.0008) for t in cov_mat.columns]) | |
| for i, t_val in enumerate(w_arr): | |
| if abs(t_val) > 1e-4: | |
| trade_dollars = abs(t_val * capital) | |
| spread_cost = (spreads[i] / 2.0) * abs(t_val) | |
| impact_pct = ac_gamma * vols[i] * np.sqrt(trade_dollars / adv_proxy) | |
| liquidity_cost_pct += spread_cost + (impact_pct * abs(t_val)) | |
| lvar_pct = standard_var_pct + liquidity_cost_pct | |
| return lvar_pct * capital | |
| def portfolio_sensitivity(weights, returns_df, benchmark_rets, exp_rets, cov_mat, risk_factor, risk_input, cfg, betas, spread_map, yield_df=None): | |
| """ | |
| Measures allocation stability by introducing noise into expected returns. | |
| Passes the true historical dataframe and shifts the specific ticker's mean to allow CAPM | |
| to calculate real covariance beta profiles against the shock. | |
| """ | |
| report = {} | |
| tickers = list(exp_rets.index) | |
| original_w = weights.drop(labels=['CASH'], errors='ignore') | |
| empty_state = PortfolioState.empty(tickers) | |
| trading_days = cfg.get("trading_days_per_year", 252) | |
| for t in tickers: | |
| w_orig = float(original_w.get(t, 0.0)) | |
| if abs(w_orig) < 0.01: | |
| continue | |
| w_min, w_max = w_orig, w_orig | |
| for shock in [-0.10, 0.10]: | |
| # Directly shock the annualized expected returns | |
| shocked_exp_rets = exp_rets.copy() | |
| shocked_exp_rets[t] += shock | |
| try: | |
| temp_cfg = copy.deepcopy(cfg) | |
| temp_cfg.garch_enabled = False | |
| temp_cfg.cvar_enabled = False | |
| opt_res = build_and_optimize( | |
| returns_df=returns_df, | |
| benchmark_rets=benchmark_rets, | |
| risk_input=risk_input, | |
| risk_factor=risk_factor, | |
| state=empty_state, | |
| cfg=temp_cfg, | |
| model=1, | |
| allocation_engine=1, | |
| ff_df=None, | |
| spread_map=spread_map, | |
| silent=True, | |
| yield_df=yield_df, | |
| override_exp_rets=shocked_exp_rets | |
| ) | |
| nw = float(opt_res.weights.get(t, 0.0)) | |
| w_min = min(w_min, nw) | |
| w_max = max(w_max, nw) | |
| except Exception as e: | |
| logger.error(f"Sensitivity optimization failed for {t}: {e}", exc_info=True) | |
| raise RuntimeError(f"Sensitivity optimization failed for {t}: {e}") from e | |
| report[t] = { | |
| "optimal": w_orig, | |
| "min": w_min, | |
| "max": w_max, | |
| "spread": w_max - w_min | |
| } | |
| jacobian = None | |
| try: | |
| import torch | |
| from differentiable_optimizer import DifferentiablePortfolioLayer | |
| n = len(tickers) | |
| # Note: the true bounds constraint uses allow_short=cfg.get("allow_short", False) | |
| layer = DifferentiablePortfolioLayer(n_assets=n, risk_factor=risk_factor, allow_short=cfg.get("allow_short", False)) | |
| Sigma = cov_mat.reindex(index=tickers, columns=tickers).fillna(0.0).values | |
| # Ridge for Cholesky stability | |
| L_val = np.linalg.cholesky(Sigma + np.eye(n)*1e-6) | |
| mu_tensor = torch.tensor(exp_rets.reindex(tickers).fillna(0.0).values, dtype=torch.float32, requires_grad=True) | |
| L_tensor = torch.tensor(L_val, dtype=torch.float32) | |
| def _f(mu_t): | |
| # forward expects (batch, n), returns (batch, n) | |
| w_out = layer(mu_t.unsqueeze(0), L_tensor.unsqueeze(0)) | |
| return w_out.squeeze(0) | |
| J = torch.autograd.functional.jacobian(_f, mu_tensor) | |
| jacobian = J.detach().numpy() | |
| except Exception as e: | |
| logger.warning(f"Could not compute gradient-based sensitivity jacobian: {e}") | |
| return { | |
| "report": report, | |
| "jacobian": jacobian, | |
| "tickers": tickers | |
| } | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| # CONTEXT & DIAGNOSTIC HELPERS | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| def build_macro(prices, raw, rfr, display_df, w_arr, vix_raw, cfg): | |
| """Constructs a dictionary of market indicators (VIX, yield curve, Benchmark trend).""" | |
| import pandas as pd | |
| rfr_scalar = rfr.iloc[-1] if isinstance(rfr, pd.Series) else rfr | |
| macro = {"vix_val": 0.0, "vix_high": False, "tnx_val": rfr_scalar * 100, "curve_inverted": False, "spy_trend": "UNKNOWN"} | |
| benchmarks = cfg.get("benchmarks", {}) | |
| vol_ticker = benchmarks.get("volatility", "^VIX") | |
| eq_ticker = benchmarks.get("equity", "SPY") | |
| rfr_ticker = benchmarks.get("risk_free", "^TNX") | |
| short_rate_ticker = benchmarks.get("short_term_rate", "^IRX") | |
| if vix_raw is not None and not vix_raw.empty: | |
| val = float(vix_raw.iloc[-1]) | |
| macro["vix_val"] = val | |
| macro["vix_high"] = val > 20.0 | |
| if eq_ticker in raw: | |
| spy_px = raw[eq_ticker] | |
| if len(spy_px) > 200: | |
| sma200 = spy_px.iloc[-200:].mean() | |
| sma50 = spy_px.iloc[-50:].mean() | |
| if sma50 > sma200 and spy_px.iloc[-1] > sma200: | |
| macro["spy_trend"] = "BULL" | |
| elif sma50 < sma200 and spy_px.iloc[-1] < sma200: | |
| macro["spy_trend"] = "BEAR" | |
| else: | |
| macro["spy_trend"] = "CHOP" | |
| if rfr_ticker in prices and short_rate_ticker in prices: | |
| macro["curve_inverted"] = prices[rfr_ticker] < prices[short_rate_ticker] | |
| return macro | |
| def behavioral_diagnostics(weights, display_df, cov_mat, risk_input, max_dd): | |
| """Flags potential conflicts between portfolio behavior and user risk settings.""" | |
| diags = [] | |
| w_risky = weights.drop(labels=['CASH'], errors='ignore') | |
| w_arr = w_risky.reindex(cov_mat.columns).fillna(0.0).values | |
| vol = float(np.sqrt(w_arr @ cov_mat.values @ w_arr)) | |
| if max_dd < -0.20 and risk_input >= 7: | |
| diags.append(f"Portfolio suffered a {max_dd:.0%} historical drawdown despite a Conservative (Level {risk_input}) setting.") | |
| if vol > 0.25 and risk_input >= 6: | |
| diags.append(f"High annualized volatility ({vol:.1%}) conflicts with Preservation objectives.") | |
| if weights.get("CASH", 0.0) > 0.40 and risk_input <= 4: | |
| diags.append("Large cash drag (>40%) is severely hampering your Aggressive growth objectives.") | |
| return diags |