Spaces:
Sleeping
Sleeping
| import numpy as np | |
| import pandas as pd | |
| import cvxpy as cp | |
| import warnings | |
| from typing import Optional, List, Dict, Any, Tuple | |
| from dataclasses import dataclass | |
| from config import Color, logger | |
| from core_types import PortfolioState, ForecastResult | |
| class TaggedConstraint: | |
| name: str | |
| constraint: cp.Constraint | |
| display: bool = True | |
| class CVXPYResult: | |
| weights: Optional[pd.Series] | |
| display_constraints: Dict[str, float] | |
| binding_constraints: Dict[str, float] | |
| relaxation_log: List[str] | |
| error: Optional[str] = None | |
| class CVXPYOptimizationEngine: | |
| def __init__(self, | |
| tickers: List[str], | |
| returns_df: pd.DataFrame, | |
| forecast: ForecastResult, | |
| state: PortfolioState, | |
| cfg: dict, | |
| macro: Optional[dict], | |
| spread_map: Optional[dict], | |
| risk_input: int, | |
| risk_factor: float, | |
| capital: float, | |
| adv_proxy: float, | |
| safe_min: float, | |
| asset_max: float, | |
| sector_limit: float, | |
| allow_shorts: bool, | |
| durations: np.ndarray, | |
| b_min: float, | |
| b_max: float, | |
| has_basis: bool, | |
| max_turnover: float, | |
| stability_spreads: np.ndarray, | |
| stab_lambda: float, | |
| silent: bool = False, | |
| opt_params: Any = None): | |
| self.tickers = tickers | |
| self.n = len(tickers) | |
| self.returns_df = returns_df | |
| self.forecast = forecast | |
| self.state = state | |
| self.cfg = cfg | |
| self.macro = macro | |
| self.spread_map = spread_map | |
| self.risk_input = risk_input | |
| self.risk_factor = risk_factor | |
| self.capital = capital | |
| self.adv_proxy = adv_proxy | |
| self.safe_min = safe_min | |
| self.asset_max = asset_max | |
| self.sector_limit = sector_limit | |
| self.allow_shorts = allow_shorts | |
| self.durations = durations | |
| self.b_min = b_min | |
| self.b_max = b_max | |
| self.has_basis = has_basis | |
| self.max_turnover = max_turnover | |
| self.stability_spreads = stability_spreads | |
| self.stab_lambda = stab_lambda | |
| self.silent = silent | |
| self.opt_params = opt_params | |
| self.max_assets = self.cfg.get("max_assets", 0) | |
| self.dropped_indices = [] | |
| self.solver_feas_tol = 5e-4 | |
| # Prepare core matrices | |
| self.Sigma = self._prepare_covariance() | |
| self.exp_rets_arr = np.nan_to_num(self.forecast.expected_returns.values, nan=0.0, posinf=0.0, neginf=0.0) | |
| self.ret_matrix = self._prepare_return_matrix() | |
| self.vol_arr = np.nan_to_num(self.forecast.covariance_result.volatility.values, nan=0.0, posinf=0.0, neginf=0.0) | |
| self.beta_vals = np.nan_to_num(self.forecast.betas.values if self.forecast.betas is not None else np.ones(self.n), nan=1.0) | |
| # Base variables | |
| self.w = cp.Variable(self.n) | |
| self.dw = cp.Variable(self.n) | |
| self.short_w = cp.Variable(self.n) | |
| self.sell_w = cp.Variable(self.n) | |
| self.t_impact = None | |
| if isinstance(self.state.current_weights, pd.Series): | |
| self.prev_w = self.state.current_weights.drop(labels=['CASH'], errors='ignore').reindex(self.tickers).fillna(0.0).values | |
| elif self.state.current_weights is not None and len(self.state.current_weights) > 0: | |
| self.prev_w = np.asarray(self.state.current_weights)[:self.n] | |
| else: | |
| self.prev_w = np.zeros(self.n) | |
| def _prepare_covariance(self) -> np.ndarray: | |
| Sigma_raw = self.forecast.covariance_result.covariance.values | |
| Sigma_raw = np.nan_to_num(Sigma_raw, nan=0.0, posinf=0.0, neginf=0.0) | |
| Sigma = (Sigma_raw + Sigma_raw.T) / 2.0 | |
| Sigma = Sigma + np.eye(self.n) * 1e-8 | |
| if getattr(self.opt_params, 'use_fast_ewm_cov', False): | |
| return Sigma | |
| eigvals_psd, eigvecs_psd = np.linalg.eigh(Sigma) | |
| Sigma = (eigvecs_psd * np.maximum(eigvals_psd, 1e-8)) @ eigvecs_psd.T | |
| Sigma = (Sigma + Sigma.T) / 2.0 | |
| return Sigma | |
| def _prepare_return_matrix(self) -> np.ndarray: | |
| ret_matrix = np.nan_to_num(self.returns_df.values, nan=0.0, posinf=0.0, neginf=0.0) | |
| if hasattr(self.forecast, 'garch_info') and self.forecast.garch_info: | |
| for i, ticker in enumerate(self.tickers): | |
| multiplier = np.sqrt(self.forecast.garch_info.get(ticker, {}).get('scale', 1.0)) | |
| ret_matrix[:, i] *= multiplier | |
| return ret_matrix | |
| def _apply_warm_start(self): | |
| e2e_ws = self.cfg.get('_e2e_warm_start') | |
| if e2e_ws is not None: | |
| try: | |
| ws_arr = np.asarray(e2e_ws, dtype=float).flatten()[:self.n] | |
| if ws_arr.shape == (self.n,) and np.all(np.isfinite(ws_arr)): | |
| self.w.value = ws_arr | |
| if not self.silent: | |
| logger.info("E2E warm-start applied to CVXPY solver.") | |
| except Exception as e: | |
| logger.warning(f"E2E warm-start failed: {e}") | |
| def _build_risk_measure(self): | |
| risk_measure = self.cfg.get('_risk_measure', 'Mean-Variance') | |
| T_obs_daily = len(self.returns_df) | |
| baseline_rf = self.cfg.get('baseline_risk_factor', 3.0) | |
| risk_tilt = baseline_rf / max(self.risk_factor, 0.5) | |
| cvar_alpha = self.cfg.get('cvar_alpha', 0.95) | |
| cvar_lambda = self.cfg.get('cvar_lambda', 0.5) * (0.2 + 0.8 * risk_tilt) | |
| regime_severity = self.macro.get("hmm_regime", {}).get("severity_score", 1.0) if self.macro else 1.0 | |
| if regime_severity > 1.5: | |
| cvar_alpha = 0.99 | |
| cvar_lambda *= regime_severity | |
| if cvar_alpha >= 0.999: | |
| cvar_alpha = 0.99 | |
| risk_penalty = 0.0 | |
| risk_measure_constraints = [] | |
| VOL_TARGETS = { | |
| 1: 0.03, 2: 0.05, 3: 0.07, 4: 0.09, 5: 0.12, | |
| 6: 0.15, 7: 0.18, 8: 0.21, 9: 0.24, 10: 0.28 | |
| } | |
| if risk_measure == 'Mean-Variance': | |
| target_vol = VOL_TARGETS.get(int(round(self.risk_factor)), 0.15) | |
| vol_slack = cp.Variable(nonneg=True) | |
| risk_penalty = 100.0 * vol_slack | |
| portfolio_variance = cp.quad_form(self.w, cp.psd_wrap(self.Sigma)) | |
| risk_measure_constraints = [portfolio_variance <= (target_vol**2) + vol_slack] | |
| elif risk_measure == 'CVaR': | |
| VaR = cp.Variable(1) | |
| u_var = cp.Variable(T_obs_daily) | |
| risk_penalty = cvar_lambda * (VaR[0] + (1.0 / (T_obs_daily * (1.0 - cvar_alpha))) * cp.sum(u_var)) | |
| risk_measure_constraints = [u_var >= -(self.ret_matrix @ self.w) - VaR[0], u_var >= 0] | |
| elif risk_measure == 'CDaR': | |
| C_rets = self.ret_matrix @ self.w | |
| C = cp.cumsum(C_rets) | |
| M = cp.Variable(T_obs_daily) | |
| cdar_cons = [M >= C, M[0] >= 0] | |
| for i in range(1, T_obs_daily): | |
| cdar_cons.append(M[i] >= M[i-1]) | |
| D = M - C | |
| VaR_CDaR = cp.Variable(1) | |
| u_cdar = cp.Variable(T_obs_daily) | |
| risk_penalty = cvar_lambda * (VaR_CDaR[0] + (1.0 / (T_obs_daily * (1.0 - cvar_alpha))) * cp.sum(u_cdar)) | |
| risk_measure_constraints = cdar_cons + [u_cdar >= D - VaR_CDaR[0], u_cdar >= 0] | |
| elif risk_measure == 'Max Loss': | |
| max_loss_var = cp.Variable(1) | |
| risk_penalty = self.risk_factor * max_loss_var[0] | |
| risk_measure_constraints = [max_loss_var >= -(self.ret_matrix @ self.w)] | |
| elif risk_measure == 'MAD': | |
| mu_port = self.w.T @ self.exp_rets_arr | |
| abs_dev = cp.Variable(T_obs_daily) | |
| risk_penalty = self.risk_factor * cp.sum(abs_dev) / T_obs_daily | |
| risk_measure_constraints = [abs_dev >= (self.ret_matrix @ self.w) - mu_port, abs_dev >= mu_port - (self.ret_matrix @ self.w)] | |
| elif risk_measure == 'Semi-Variance': | |
| downside_dev = cp.Variable(T_obs_daily) | |
| risk_penalty = (self.risk_factor / 2) * cp.sum_squares(downside_dev) / T_obs_daily | |
| risk_measure_constraints = [downside_dev >= -(self.ret_matrix @ self.w), downside_dev >= 0] | |
| else: | |
| portfolio_variance = cp.quad_form(self.w, cp.psd_wrap(self.Sigma)) | |
| risk_penalty = (self.risk_factor / 2) * portfolio_variance | |
| # Conditional CVaR Constraint Formulation | |
| use_cvar = self.cfg.get('cvar_enabled', True) | |
| cvar_cost = 0.0 | |
| cvar_constraints = [] | |
| if use_cvar and risk_measure == 'Mean-Variance': | |
| cvar_ret_matrix = self.ret_matrix | |
| if regime_severity > 1.5 and self.macro: | |
| hmm_details = self.macro.get("hmm_regime", {}).get("details", {}) | |
| state_seq = hmm_details.get("state_sequence") | |
| if state_seq is not None: | |
| crash_mask = np.array(state_seq) == 2 | |
| if len(crash_mask) == self.ret_matrix.shape[0]: | |
| slice_mask = crash_mask | |
| elif len(crash_mask) > self.ret_matrix.shape[0]: | |
| slice_mask = crash_mask[-self.ret_matrix.shape[0]:] | |
| else: | |
| pad = np.zeros(self.ret_matrix.shape[0] - len(crash_mask), dtype=bool) | |
| slice_mask = np.concatenate([pad, crash_mask]) | |
| n_crash = int(np.sum(slice_mask)) | |
| if n_crash >= 21: | |
| cvar_ret_matrix = self.ret_matrix[slice_mask] | |
| if not self.silent: | |
| print(f" {Color.DIM}ℹ CVaR conditioned on {n_crash} crash-regime days (of {self.ret_matrix.shape[0]} total).{Color.RESET}") | |
| T_cvar = cvar_ret_matrix.shape[0] | |
| VaR = cp.Variable(1) | |
| u = cp.Variable(T_cvar) | |
| cvar_cost = cvar_lambda * (VaR[0] + (1.0 / (T_cvar * (1.0 - cvar_alpha))) * cp.sum(u)) | |
| cvar_constraints = [u >= -(cvar_ret_matrix @ self.w) - VaR[0], u >= 0] | |
| return risk_penalty, risk_measure_constraints, cvar_cost, cvar_constraints | |
| def _get_problem(self, use_beta, widen_beta, use_dur, cur_min, cur_max, cur_sec, cur_turn, use_factors=True, active_cvar=None): | |
| cons_dict = {} | |
| def add_cons(name, cons, display=True): | |
| cons_dict[name] = TaggedConstraint(name, cons, display) | |
| add_cons("Fully Invested", (cp.sum(self.w) == 1.0), False) | |
| base_gross_cap = self.cfg.get("gross_leverage_cap", 1.0) | |
| adj_gross_cap = min(base_gross_cap * 1.5, base_gross_cap + (abs(cur_min) * 1.5)) if self.allow_shorts else base_gross_cap | |
| add_cons("Gross Leverage Cap", (cp.norm(self.w, 1) <= adj_gross_cap), False) | |
| add_cons(f"Single Asset Floor ({cur_min:.1%})", (self.w >= cur_min), False) | |
| add_cons(f"Single Asset Cap ({cur_max:.1%})", (self.w <= cur_max), False) | |
| if self.t_impact is not None: | |
| impact_cons = [cp.norm(cp.vstack([2 * self.dw[i], self.t_impact[i] - 1]), 2) <= self.t_impact[i] + 1 for i in range(self.n)] | |
| add_cons("Market Impact SOCP", impact_cons, False) | |
| add_cons("Short Position Bounds", (self.short_w >= -self.w), False) | |
| add_cons("DW Nonneg", (self.dw >= 0), False) | |
| add_cons("Short W Nonneg", (self.short_w >= 0), False) | |
| add_cons("Sell W Nonneg", (self.sell_w >= 0), False) | |
| if self.dropped_indices: | |
| add_cons(f"Cardinality Pruning", (self.w[self.dropped_indices] == 0), True) | |
| if np.any(self.prev_w): | |
| add_cons("Trade Size Tracking (Buys)", (self.dw >= self.w - self.prev_w), False) | |
| add_cons("Trade Size Tracking (Sells)", (self.dw >= self.prev_w - self.w), False) | |
| add_cons("Sell Volume Tracking", (self.sell_w >= self.prev_w - self.w), False) | |
| if cur_turn < 10.0: | |
| add_cons(f"Max Turnover ({cur_turn:.0%})", (cp.sum(self.dw) <= cur_turn), False) | |
| else: | |
| add_cons("Trade Size Tracking (Buys)", (self.dw >= self.w), False) | |
| add_cons("Trade Size Tracking (Sells)", (self.dw >= -self.w), False) | |
| add_cons("Sell Volume Tracking", (self.sell_w >= -self.w), False) | |
| if cur_turn < 10.0: | |
| add_cons(f"Max Turnover ({cur_turn:.0%})", (cp.sum(self.dw) <= cur_turn), False) | |
| sector_map = self.cfg.get("sector_map", {}) | |
| for sector in set(sector_map.values()): | |
| idx = [i for i, t in enumerate(self.tickers) if sector_map.get(t) == sector] | |
| if idx: | |
| if self.allow_shorts: | |
| max_gross_allowed = max(0.80, abs(cur_min) * len(idx)) | |
| add_cons(f"Sector Gross Exposure ({sector})", (cp.norm(self.w[idx], 1) <= min(cur_sec * 2.0, max_gross_allowed)), True) | |
| else: | |
| add_cons(f"Sector Concentration Limit ({sector})", (cp.sum(self.w[idx]) <= cur_sec), True) | |
| custom_constraints = self.cfg.get("custom_constraints") or [] | |
| for cc in custom_constraints: | |
| asset = cc.get("asset") | |
| limit = float(cc.get("limit", 0)) | |
| direction = cc.get("direction", "max") | |
| if asset in self.tickers: | |
| idx = self.tickers.index(asset) | |
| if direction == "max": | |
| add_cons(f"Custom Max ({asset} <= {limit:.1%})", (self.w[idx] <= limit), True) | |
| elif direction == "min": | |
| add_cons(f"Custom Min ({asset} >= {limit:.1%})", (self.w[idx] >= limit), True) | |
| elif asset and asset.startswith("Sector:"): | |
| sector = asset.split(":", 1)[1].strip() | |
| idx = [i for i, t in enumerate(self.tickers) if sector_map.get(t) == sector] | |
| if idx: | |
| if direction == "max": | |
| add_cons(f"Custom Sector Max ({sector} <= {limit:.1%})", (cp.sum(self.w[idx]) <= limit), True) | |
| elif direction == "min": | |
| add_cons(f"Custom Sector Min ({sector} >= {limit:.1%})", (cp.sum(self.w[idx]) >= limit), True) | |
| if active_cvar is None: | |
| active_cvar = [] | |
| ff_betas = self.forecast.ff_betas | |
| if use_factors and self.cfg.get("factor_neutrality", False) and ff_betas is not None: | |
| if 'HML' in ff_betas.columns: | |
| hml_loadings = np.nan_to_num(ff_betas['HML'].values) | |
| add_cons("Factor Neutrality (HML)", (cp.abs(self.w.T @ hml_loadings) <= 0.05), True) | |
| if 'SMB' in ff_betas.columns: | |
| smb_loadings = np.nan_to_num(ff_betas['SMB'].values) | |
| add_cons("Factor Neutrality (SMB)", (cp.abs(self.w.T @ smb_loadings) <= 0.05), True) | |
| if use_factors and self.cfg.get("risk_budgeting", False) and np.any(self.prev_w): | |
| port_var_prev = self.prev_w.T @ self.Sigma @ self.prev_w | |
| if port_var_prev > 1e-6: | |
| marginal_risk = (self.Sigma @ self.prev_w) / port_var_prev | |
| add_cons("Risk Budgeting (Max 15%)", (cp.multiply(self.w, marginal_risk) <= 0.15), True) | |
| flat_cons = [] | |
| for tc in cons_dict.values(): | |
| if isinstance(tc.constraint, list): | |
| flat_cons.extend(tc.constraint) | |
| else: | |
| flat_cons.append(tc.constraint) | |
| cons_list = flat_cons + active_cvar + self.risk_measure_constraints | |
| if use_beta: | |
| eff_b_min = max(0.0, self.b_min - 0.3) if widen_beta else self.b_min | |
| eff_b_max = self.b_max + 0.3 if widen_beta else self.b_max | |
| beta_cons_labeled = { | |
| f"Beta Target Min ({eff_b_min:.1f})": (self.w.T @ self.beta_vals >= eff_b_min), | |
| f"Beta Target Max ({eff_b_max:.1f})": (self.w.T @ self.beta_vals <= eff_b_max) | |
| } | |
| for k, v in beta_cons_labeled.items(): | |
| add_cons(k, v, True) | |
| cons_list += list(beta_cons_labeled.values()) | |
| if use_dur and any(self.durations > 0): | |
| min_dur = self.cfg.get("min_duration", 0.0) | |
| max_dur = self.cfg.get("max_duration", 50.0) | |
| port_duration = self.w.T @ self.durations | |
| if min_dur > 0: | |
| add_cons(f"Min Duration ({min_dur}y)", (port_duration >= min_dur), True) | |
| if max_dur < 50.0: | |
| add_cons(f"Max Duration ({max_dur}y)", (port_duration <= max_dur), True) | |
| cons_list += [cons_dict[k].constraint for k in cons_dict if "Duration" in k] | |
| if active_cvar: | |
| dynamic_obj = cp.Maximize(self.base_objective_term - self.cvar_cost) | |
| else: | |
| dynamic_obj = cp.Maximize(self.base_objective_term) | |
| return cp.Problem(dynamic_obj, cons_list), cons_dict, cons_list | |
| def _solution_violations(self, w_val, use_beta, widen_beta, use_dur, cur_min, cur_max, cur_sec, cur_turn): | |
| vals = np.asarray(w_val, dtype=float) | |
| violations = [] | |
| if vals.shape != (self.n,) or not np.all(np.isfinite(vals)): | |
| return ["solution contains non-finite or mis-shaped weights"] | |
| total_weight = float(np.sum(vals)) | |
| gross_weight = float(np.sum(np.abs(vals))) | |
| base_gross_cap = self.cfg.get("gross_leverage_cap", 1.0) | |
| adj_gross_cap = min(base_gross_cap * 1.5, base_gross_cap + (abs(cur_min) * 1.5)) if self.allow_shorts else base_gross_cap | |
| if total_weight > 1.0 + self.solver_feas_tol: | |
| violations.append(f"sum(w)={total_weight:.4f} exceeds 100% budget") | |
| if gross_weight > adj_gross_cap + self.solver_feas_tol: | |
| violations.append(f"gross={gross_weight:.4f} exceeds cap {adj_gross_cap:.4f}") | |
| if float(np.min(vals)) < cur_min - self.solver_feas_tol: | |
| violations.append(f"min weight {float(np.min(vals)):.4f} below {cur_min:.4f}") | |
| if float(np.max(vals)) > cur_max + self.solver_feas_tol: | |
| violations.append(f"max weight {float(np.max(vals)):.4f} above {cur_max:.4f}") | |
| if cur_turn < 10.0: | |
| prev = self.prev_w if np.any(self.prev_w) else np.zeros(self.n) | |
| turnover = float(np.sum(np.abs(vals - prev))) | |
| if turnover > cur_turn + self.solver_feas_tol: | |
| violations.append(f"turnover={turnover:.4f} exceeds cap {cur_turn:.4f}") | |
| sector_map = self.cfg.get("sector_map", {}) | |
| for sector in set(sector_map.values()): | |
| idx = [i for i, t in enumerate(self.tickers) if sector_map.get(t) == sector] | |
| if not idx: | |
| continue | |
| if self.allow_shorts: | |
| sector_gross = float(np.sum(np.abs(vals[idx]))) | |
| sector_gross_cap = min(cur_sec * 2.0, 0.80) | |
| if sector_gross > sector_gross_cap + self.solver_feas_tol: | |
| violations.append(f"{sector} sector gross={sector_gross:.4f} exceeds cap {sector_gross_cap:.4f}") | |
| else: | |
| sector_weight = float(np.sum(vals[idx])) | |
| if sector_weight > cur_sec + self.solver_feas_tol: | |
| violations.append(f"{sector} sector={sector_weight:.4f} exceeds cap {cur_sec:.4f}") | |
| if use_beta: | |
| eff_b_min = max(0.0, self.b_min - 0.3) if widen_beta else self.b_min | |
| eff_b_max = self.b_max + 0.3 if widen_beta else self.b_max | |
| beta = float(vals @ self.beta_vals) | |
| if beta < eff_b_min - self.solver_feas_tol or beta > eff_b_max + self.solver_feas_tol: | |
| violations.append(f"beta={beta:.4f} outside target {eff_b_min:.4f}-{eff_b_max:.4f}") | |
| if use_dur and any(self.durations > 0): | |
| port_duration = float(vals @ self.durations) | |
| min_dur = self.cfg.get("min_duration", 0.0) | |
| max_dur = self.cfg.get("max_duration", 50.0) | |
| if min_dur > 0 and port_duration < min_dur - self.solver_feas_tol: | |
| violations.append(f"duration={port_duration:.4f} below {min_dur:.4f}") | |
| if max_dur < 50.0 and port_duration > max_dur + self.solver_feas_tol: | |
| violations.append(f"duration={port_duration:.4f} above {max_dur:.4f}") | |
| return violations | |
| def _solve_prob(self, prob, use_beta, widen_beta, use_dur, cur_min, cur_max, cur_sec, cur_turn, use_factors=True, is_deep_relaxation=False): | |
| if np.any(self.prev_w): | |
| self.w.value = self.prev_w | |
| try: | |
| solvers_to_try = [cp.CLARABEL] if is_deep_relaxation else [cp.CLARABEL, cp.HIGHS] | |
| for solver in solvers_to_try: | |
| if solver not in cp.installed_solvers(): | |
| continue | |
| try: | |
| solve_opts = {"warm_start": True} | |
| if solver == cp.SCS: | |
| solve_opts.update({"eps_abs": 1e-5, "eps_rel": 1e-5, "eps_gap": 1e-5}) | |
| if solver == cp.OSQP: | |
| solve_opts.update({"max_iter": 100000}) | |
| if not self.silent: | |
| logger.debug(f"Attempting solver {solver}...") | |
| prob.solve(solver=solver, **solve_opts) | |
| if not self.silent: | |
| logger.debug(f"Solver {solver} finished with status {prob.status}.") | |
| if self.w.value is not None: | |
| logger.debug(f"Solver {solver} returned weights shape {np.shape(self.w.value)}") | |
| if prob.status in [cp.OPTIMAL, cp.OPTIMAL_INACCURATE]: | |
| break | |
| except Exception as exc: | |
| if not self.silent: | |
| logger.warning(f"Solver {solver} failed: {exc}") | |
| continue | |
| if prob.status not in [cp.OPTIMAL, cp.OPTIMAL_INACCURATE]: | |
| if not self.silent: | |
| logger.warning(f"No viable solution found. Final status {prob.status}.") | |
| return False, f"Solver status: {prob.status}" | |
| if self.w.value is not None: | |
| w_val = np.asarray(self.w.value).flatten() | |
| w_val = np.nan_to_num(w_val, nan=0.0, posinf=0.0, neginf=0.0) | |
| if w_val.shape != (self.n,) or not np.all(np.isfinite(w_val)): | |
| return False, "Non-finite or mis-shaped weights" | |
| self.w.value = w_val | |
| violations = self._solution_violations(self.w.value, use_beta, widen_beta, use_dur, cur_min, cur_max, cur_sec, cur_turn) | |
| if not violations: | |
| if prob.status == cp.OPTIMAL_INACCURATE and not self.silent: | |
| logger.warning(f"Solver converged to OPTIMAL_INACCURATE. Constraint violations were within accepted tolerances ({self.solver_feas_tol}).") | |
| return True, "" | |
| old_tol = self.solver_feas_tol | |
| self.solver_feas_tol = 1e-3 | |
| violations_1e3 = self._solution_violations(self.w.value, use_beta, widen_beta, use_dur, cur_min, cur_max, cur_sec, cur_turn) | |
| self.solver_feas_tol = old_tol | |
| if not violations_1e3 and violations: | |
| if not self.silent: | |
| logger.warning(f"Solution had violations in the [5e-4, 1e-3] band and was rejected: {violations}") | |
| else: | |
| if not self.silent: | |
| logger.warning(f"Solution rejected due to specific violations: {violations}") | |
| return False, f"Constraint violations: {violations}" | |
| return False, "Weights are None" | |
| except Exception as exc: | |
| logger.error(f"Optimization solver exception: {exc}") | |
| raise | |
| def solve(self) -> CVXPYResult: | |
| self._apply_warm_start() | |
| # Build base objective components | |
| expected_portfolio_return = self.w.T @ self.exp_rets_arr | |
| rfr = self.cfg.get("risk_free_rate", 0.0) | |
| cash_yield = (1.0 - cp.sum(self.w)) * rfr | |
| from execution import calibrate_gamma | |
| ac_gamma = self.cfg.get("tc_volume_profile", None) | |
| if ac_gamma is None: | |
| # Dynamically calculate gamma for each asset | |
| gammas = np.array([calibrate_gamma(self.adv_proxy, v) for v in self.vol_arr]) | |
| impact_coeffs = gammas * self.vol_arr * np.sqrt(self.capital / self.adv_proxy) | |
| else: | |
| impact_coeffs = ac_gamma * self.vol_arr * np.sqrt(self.capital / self.adv_proxy) | |
| market_impact_penalty = 0.0 | |
| if ac_gamma is None or ac_gamma > 0: | |
| self.t_impact = cp.Variable(self.n, nonneg=True) | |
| market_impact_penalty = cp.sum(cp.multiply(impact_coeffs, self.t_impact)) | |
| spreads_arr = np.array([self.spread_map.get(t, 0.0008) for t in self.tickers]) if self.spread_map else np.full(self.n, 0.0008) | |
| tc = self.cfg.get("transaction_cost", 0.001) | |
| friction_arr = spreads_arr + tc | |
| tc_lambda = 1.0 / max(self.risk_factor, 0.1) | |
| tc_cost = tc_lambda * cp.sum(cp.multiply(self.dw, friction_arr)) | |
| borrow_cost = self.cfg.get('short_borrow_cost', 0.0) | |
| borrow_drag = borrow_cost * cp.sum(self.short_w) | |
| stab_penalty = self.stab_lambda * cp.sum(cp.multiply(self.stability_spreads, cp.square(self.w))) if self.stab_lambda > 0 else 0.0 | |
| tax_cost = 0.0 | |
| if self.has_basis: | |
| tax_cost = cp.sum(cp.multiply(self.sell_w, self.state.gain_fractions * self.state.tax_rates)) | |
| risk_penalty, self.risk_measure_constraints, self.cvar_cost, cvar_constraints = self._build_risk_measure() | |
| self.base_objective_term = ( | |
| expected_portfolio_return | |
| + cash_yield | |
| - risk_penalty | |
| - tc_cost | |
| - market_impact_penalty | |
| - tax_cost | |
| - borrow_drag | |
| - stab_penalty | |
| ) | |
| use_cvar = self.cfg.get('cvar_enabled', True) | |
| def run_relaxation_loop(): | |
| prob = None | |
| relaxation_log = [] | |
| _ss = self.safe_min if not self.allow_shorts else -1.0 | |
| stage_params = [ | |
| ("Base", True, False, True, True, self.safe_min, self.asset_max, self.sector_limit, self.max_turnover, None), | |
| ("Widen Beta", True, True, True, True, self.safe_min, self.asset_max, self.sector_limit, self.max_turnover, "Widened portfolio beta targets."), | |
| ("Drop Beta", False, False, True, True, self.safe_min, self.asset_max, self.sector_limit, self.max_turnover, "Dropped portfolio beta target constraints."), | |
| ("Drop Duration", False, False, False, True, self.safe_min, self.asset_max, self.sector_limit, self.max_turnover, "Dropped portfolio duration targets."), | |
| ("Drop Factors", False, False, False, False, self.safe_min, self.asset_max, self.sector_limit, self.max_turnover, "Dropped Factor Neutrality/Risk Budgeting."), | |
| ("Remove Min Weights", False, False, False, False, _ss, self.asset_max, self.sector_limit, self.max_turnover, "Removed minimum asset weight limits."), | |
| ("Relax Sector Caps", False, False, False, False, _ss, self.asset_max, min(1.0,self.sector_limit*2), self.max_turnover, "Doubled sector concentration limits."), | |
| ("Remove Turnover", False, False, False, False, _ss, self.asset_max, min(1.0,self.sector_limit*2), 2.0, "Removed maximum turnover constraint."), | |
| ("Unconstrained", False, False, False, False, _ss, 1.0, 1.0, 2.0, "Removed maximum single-asset weight limits."), | |
| ] | |
| for cvar_pass, cvar_active in enumerate([True, False]): | |
| relaxation_log = [] | |
| active_cvar = list(cvar_constraints) if (use_cvar and cvar_active) else [] | |
| for stage_idx, (name, use_beta, widen_beta, use_dur, use_factors, cur_min, cur_max, cur_sec, cur_turn, log_msg) in enumerate(stage_params): | |
| prob, cons_dict, cons_list = self._get_problem(use_beta, widen_beta, use_dur, cur_min, cur_max, cur_sec, cur_turn, use_factors, active_cvar) | |
| is_deep = stage_idx >= 5 | |
| solved, reason = self._solve_prob(prob, use_beta, widen_beta, use_dur, cur_min, cur_max, cur_sec, cur_turn, use_factors, is_deep_relaxation=is_deep) | |
| if solved: | |
| if log_msg: | |
| relaxation_log.append(log_msg) | |
| if stage_idx >= 7: | |
| print(f"\n {Color.YELLOW}⚠ WARNING: Optimization dropped into 'Unconstrained' mode. Original constraints abandoned due to mathematical infeasibility.{Color.RESET}") | |
| if not self.silent: | |
| logger.warning("Optimization dropped into 'Unconstrained' mode.") | |
| elif stage_idx >= 4: | |
| print(f"\n {Color.YELLOW}⚠ WARNING: Optimization hit deep relaxation (Stage {stage_idx}). Constraints heavily modified.{Color.RESET}") | |
| if not self.silent: | |
| logger.warning(f"Optimization hit deep relaxation (Stage {stage_idx}).") | |
| return prob, cons_dict, cons_list, relaxation_log | |
| relaxation_log.append(f"Stage '{name}' failed: {reason}") | |
| if log_msg: | |
| relaxation_log.append(log_msg) | |
| if prob is not None and not self.silent: | |
| logger.debug(f"All relaxations failed. Final prob.status was: {prob.status}") | |
| return None, None, None, relaxation_log | |
| if not self.silent: | |
| print(f" Solving convex optimization (β range {self.b_min:.1f}–{self.b_max:.1f})...", end="", flush=True) | |
| prob, final_cons_dict, final_cons_list, relaxation_log = run_relaxation_loop() | |
| # --- Cardinality Constraint Heuristic --- | |
| if prob is not None and prob.status in [cp.OPTIMAL, cp.OPTIMAL_INACCURATE] and self.max_assets > 0: | |
| w_val = np.asarray(self.w.value).flatten() | |
| active_mask = np.abs(w_val) > 1e-4 | |
| if np.sum(active_mask) > self.max_assets: | |
| num_to_drop = int(np.sum(active_mask)) - self.max_assets | |
| active_indices = np.where(active_mask)[0] | |
| sorted_by_weight = active_indices[np.argsort(np.abs(w_val[active_indices]))] | |
| drop_candidates = list(sorted_by_weight[:num_to_drop]) | |
| feasible_drop = False | |
| while len(drop_candidates) > 0 and not feasible_drop: | |
| self.dropped_indices = drop_candidates | |
| prob_card, cons_dict_card, cons_list_card, log_card = run_relaxation_loop() | |
| if prob_card is not None and prob_card.status in [cp.OPTIMAL, cp.OPTIMAL_INACCURATE]: | |
| prob = prob_card | |
| final_cons_dict = cons_dict_card | |
| final_cons_list = cons_list_card | |
| relaxation_log.extend(log_card) | |
| relaxation_log.append(f"Applied Cardinality Constraint (max {self.max_assets} assets), pruned {len(drop_candidates)} assets.") | |
| feasible_drop = True | |
| else: | |
| drop_candidates.pop() | |
| if not feasible_drop: | |
| relaxation_log.append("Failed to safely apply cardinality constraint (infeasible bounds). Maintaining continuous weights.") | |
| self.dropped_indices = [] | |
| prob, final_cons_dict, final_cons_list, log_card = run_relaxation_loop() | |
| # --------------------------------------- | |
| if prob is None: | |
| return CVXPYResult(None, {}, {}, relaxation_log, "All relaxation stages exhausted.") | |
| if prob.status not in [cp.OPTIMAL, cp.OPTIMAL_INACCURATE] or self.w.value is None: | |
| return CVXPYResult(None, {}, {}, relaxation_log, "Solver failed to find optimal status.") | |
| best_w = pd.Series(self.w.value.copy(), index=self.tickers) | |
| display_constraints: Dict[str, float] = {} | |
| binding_constraints: Dict[str, float] = {} | |
| if prob.status in [cp.OPTIMAL, cp.OPTIMAL_INACCURATE] and len(final_cons_list) > 0: | |
| display_constraints = self._diagnose_binding_constraints(final_cons_dict) | |
| binding_constraints = display_constraints | |
| return CVXPYResult(best_w, display_constraints, binding_constraints, relaxation_log) | |
| def _diagnose_binding_constraints(self, cons_dict, tolerance=1e-4): | |
| binding = {} | |
| if cons_dict is None: | |
| return binding | |
| for name, tc in cons_dict.items(): | |
| if not tc.display: | |
| continue | |
| c = tc.constraint | |
| if isinstance(c, list): | |
| duals = [] | |
| for sub_c in c: | |
| if hasattr(sub_c, 'dual_value') and sub_c.dual_value is not None: | |
| duals.extend(np.array(sub_c.dual_value).flatten()) | |
| if duals: | |
| vals = np.array(duals) | |
| sig_vals = vals[np.abs(vals) > tolerance] | |
| if len(sig_vals) > 0: | |
| binding[name] = float(np.mean(sig_vals)) | |
| continue | |
| if hasattr(c, 'dual_value') and c.dual_value is not None: | |
| vals = np.array(c.dual_value).flatten() | |
| sig_vals = vals[np.abs(vals) > tolerance] | |
| if len(sig_vals) > 0: | |
| binding[name] = float(np.mean(sig_vals)) | |
| return binding | |