"""Portfolio Optimizer - Risk-aware allocation engine.""" import numpy as np import pandas as pd from scipy.optimize import minimize from typing import Dict, List, Optional, Tuple import warnings warnings.filterwarnings('ignore') class PortfolioOptimizer: """Portfolio optimizer with constraints and robust optimization""" def __init__(self, max_weight: float = 0.20, min_weight: float = 0.0, target_return: Optional[float] = None, risk_free_rate: float = 0.04, transaction_cost: float = 0.0003, turnover_penalty: float = 0.001, risk_aversion: float = 1.0): self.max_weight = max_weight self.min_weight = min_weight self.target_return = target_return self.risk_free_rate = risk_free_rate self.transaction_cost = transaction_cost self.turnover_penalty = turnover_penalty self.risk_aversion = risk_aversion def optimize_mean_variance(self, mu: np.ndarray, Sigma: np.ndarray, current_weights: Optional[np.ndarray] = None, long_only: bool = True) -> Dict: """ Mean-variance optimization with transaction costs Args: mu: Expected returns vector (n_assets,) Sigma: Covariance matrix (n_assets, n_assets) current_weights: Current portfolio weights (n_assets,) long_only: If True, weights must be >= 0 Returns: Dict with weights, expected_return, volatility, sharpe """ n_assets = len(mu) # Objective: maximize utility = return - risk_aversion * variance - transaction_costs def objective(w): port_return = np.dot(w, mu) port_variance = np.dot(w, np.dot(Sigma, w)) # Transaction cost penalty if current_weights is not None: turnover = np.sum(np.abs(w - current_weights)) tc_penalty = self.turnover_penalty * turnover else: tc_penalty = 0 # Negative utility (for minimization) return -(port_return - self.risk_aversion * port_variance - tc_penalty) # Constraints constraints = [{'type': 'eq', 'fun': lambda w: np.sum(w) - 1.0}] # Fully invested if self.target_return is not None: constraints.append( {'type': 'eq', 'fun': lambda w: np.dot(w, mu) - self.target_return} ) # Bounds if long_only: bounds = [(self.min_weight, self.max_weight) for _ in range(n_assets)] else: bounds = [(-self.max_weight, self.max_weight) for _ in range(n_assets)] # Initial guess: equal weight w0 = np.ones(n_assets) / n_assets # Optimize result = minimize( objective, w0, method='SLSQP', bounds=bounds, constraints=constraints, options={'maxiter': 1000, 'ftol': 1e-9} ) if not result.success: print(f"Optimization warning: {result.message}") weights = result.x weights = np.maximum(weights, 0) # Clean small negatives weights /= np.sum(weights) # Renormalize # Compute portfolio metrics port_return = np.dot(weights, mu) port_vol = np.sqrt(np.dot(weights, np.dot(Sigma, weights))) sharpe = (port_return - self.risk_free_rate) / port_vol if port_vol > 0 else 0 return { 'weights': weights, 'expected_return': port_return, 'volatility': port_vol, 'sharpe_ratio': sharpe, 'success': result.success } def optimize_max_sharpe(self, mu: np.ndarray, Sigma: np.ndarray, current_weights: Optional[np.ndarray] = None) -> Dict: """Optimize for maximum Sharpe ratio""" n_assets = len(mu) def neg_sharpe(w): port_return = np.dot(w, mu) port_vol = np.sqrt(np.dot(w, np.dot(Sigma, w))) if current_weights is not None: turnover = np.sum(np.abs(w - current_weights)) port_return -= self.turnover_penalty * turnover return -(port_return - self.risk_free_rate) / (port_vol + 1e-8) constraints = [{'type': 'eq', 'fun': lambda w: np.sum(w) - 1.0}] bounds = [(self.min_weight, self.max_weight) for _ in range(n_assets)] w0 = np.ones(n_assets) / n_assets result = minimize( neg_sharpe, w0, method='SLSQP', bounds=bounds, constraints=constraints, options={'maxiter': 1000} ) weights = result.x weights = np.maximum(weights, 0) weights /= np.sum(weights) port_return = np.dot(weights, mu) port_vol = np.sqrt(np.dot(weights, np.dot(Sigma, weights))) sharpe = (port_return - self.risk_free_rate) / port_vol return { 'weights': weights, 'expected_return': port_return, 'volatility': port_vol, 'sharpe_ratio': sharpe, 'success': result.success } def optimize_min_volatility(self, mu: np.ndarray, Sigma: np.ndarray, min_return: Optional[float] = None) -> Dict: """Optimize for minimum volatility with optional return constraint""" n_assets = len(mu) def variance(w): return np.dot(w, np.dot(Sigma, w)) constraints = [{'type': 'eq', 'fun': lambda w: np.sum(w) - 1.0}] if min_return is not None: constraints.append( {'type': 'ineq', 'fun': lambda w: np.dot(w, mu) - min_return} ) bounds = [(self.min_weight, self.max_weight) for _ in range(n_assets)] w0 = np.ones(n_assets) / n_assets result = minimize( variance, w0, method='SLSQP', bounds=bounds, constraints=constraints, options={'maxiter': 1000} ) weights = result.x weights = np.maximum(weights, 0) weights /= np.sum(weights) port_return = np.dot(weights, mu) port_vol = np.sqrt(np.dot(weights, np.dot(Sigma, weights))) sharpe = (port_return - self.risk_free_rate) / port_vol if port_vol > 0 else 0 return { 'weights': weights, 'expected_return': port_return, 'volatility': port_vol, 'sharpe_ratio': sharpe, 'success': result.success } def robust_optimization(self, mu: np.ndarray, Sigma: np.ndarray, mu_uncertainty: Optional[np.ndarray] = None, Sigma_uncertainty: Optional[float] = None) -> Dict: """ Robust optimization with uncertainty sets Uses worst-case approach: optimize for worst-case mu within uncertainty ellipsoid """ n_assets = len(mu) if mu_uncertainty is None: # Default: 20% uncertainty on expected returns mu_uncertainty = np.abs(mu) * 0.2 # Worst-case return: mu - uncertainty mu_worst = mu - mu_uncertainty # Add covariance uncertainty if Sigma_uncertainty is not None: Sigma_robust = Sigma + np.eye(n_assets) * Sigma_uncertainty else: Sigma_robust = Sigma return self.optimize_mean_variance(mu_worst, Sigma_robust) def black_litterman(self, market_caps: np.ndarray, Sigma: np.ndarray, risk_aversion: float = 2.5, views: Optional[List[Dict]] = None, view_confidence: float = 0.5) -> Dict: """ Black-Litterman model for incorporating investor views Args: market_caps: Market capitalization weights Sigma: Covariance matrix risk_aversion: Risk aversion parameter views: List of view dicts with 'assets', 'direction', 'magnitude' view_confidence: Confidence in views (0-1) """ n_assets = len(market_caps) # Implied equilibrium returns Pi = risk_aversion * np.dot(Sigma, market_caps) if views is None or len(views) == 0: # No views: use market equilibrium return self.optimize_mean_variance(Pi, Sigma) # Build view matrix P and view vector Q P = [] Q = [] Omega_diag = [] for view in views: assets = view['assets'] direction = view['direction'] # 'overweight' or 'underweight' magnitude = view['magnitude'] p_row = np.zeros(n_assets) for asset in assets: p_row[asset] = 1.0 / len(assets) P.append(p_row) Q.append(magnitude if direction == 'overweight' else -magnitude) Omega_diag.append(view_confidence) P = np.array(P) Q = np.array(Q) Omega = np.diag(Omega_diag) # Black-Litterman formula tau = 0.05 # Uncertainty scaling M_inverse = np.linalg.inv(tau * Sigma) middle = np.linalg.inv(np.dot(np.dot(P, tau * Sigma), P.T) + Omega) BL_mu = Pi + np.dot( np.dot(tau * Sigma, P.T), np.dot(middle, Q - np.dot(P, Pi)) ) BL_Sigma = Sigma + tau * Sigma - np.dot( np.dot(tau * Sigma, P.T), np.dot(middle, np.dot(P, tau * Sigma)) ) return self.optimize_mean_variance(BL_mu, BL_Sigma) def compute_efficient_frontier(self, mu: np.ndarray, Sigma: np.ndarray, n_points: int = 50) -> pd.DataFrame: """Compute efficient frontier points""" min_vol_result = self.optimize_min_volatility(mu, Sigma) max_ret_result = self.optimize_mean_variance(mu, Sigma, target_return=np.max(mu)) min_ret = min_vol_result['expected_return'] max_ret = max_ret_result['expected_return'] target_returns = np.linspace(min_ret, max_ret, n_points) frontier = [] for target in target_returns: result = self.optimize_mean_variance(mu, Sigma, target_return=target) frontier.append({ 'target_return': target, 'volatility': result['volatility'], 'sharpe': result['sharpe_ratio'] }) return pd.DataFrame(frontier)