Spaces:
Sleeping
Sleeping
| import cvxpy as cp | |
| import pandas as pd | |
| import numpy as np | |
| import logging | |
| from typing import List, Dict, Optional | |
| from core.schema import OptimizationResult | |
| from config import settings | |
| logger = logging.getLogger(__name__) | |
| class PortfolioOptimizer: | |
| """ | |
| Quantitative Optimization Engine using CVXPY. | |
| Objective: Minimize Tracking Error against a Benchmark. | |
| Constraints: | |
| 1. Full Investment (Sum w = 1) | |
| 2. Long Only (w >= 0) | |
| 3. Sector Exclusions (w[excluded] = 0) | |
| """ | |
| def __init__(self): | |
| pass | |
| def optimize_portfolio(self, | |
| covariance_matrix: pd.DataFrame, | |
| tickers: List[str], | |
| benchmark_weights: pd.DataFrame, | |
| sector_map: Dict[str, str], | |
| excluded_sectors: List[str], | |
| excluded_tickers: List[str] = None, | |
| max_weight: float = None) -> OptimizationResult: | |
| """ | |
| Solves the tracking error minimization problem. | |
| Args: | |
| covariance_matrix: (N x N) Ledoit-Wolf shrunk covariance matrix. | |
| tickers: List of N tickers. | |
| benchmark_weights: (N x 1) Weights of the benchmark (e.g. S&P 500). | |
| Un-held assets should have 0 weight. | |
| sector_map: Dictionary mapping ticker -> sector. | |
| excluded_sectors: List of sectors to exclude. | |
| excluded_tickers: List of specific tickers to exclude. | |
| Returns: | |
| OptimizationResult containing weights and status. | |
| """ | |
| excluded_tickers = excluded_tickers or [] | |
| n_assets = len(tickers) | |
| if covariance_matrix.shape != (n_assets, n_assets): | |
| raise ValueError(f"Covariance matrix shape {covariance_matrix.shape} does not match tickers count {n_assets}") | |
| logger.info(f"Setting up CVXPY optimization for {n_assets} assets...") | |
| # Variables | |
| w = cp.Variable(n_assets) | |
| # Benchmark Weights Vector (aligned to tickers) | |
| if isinstance(benchmark_weights, (pd.Series, pd.DataFrame)): | |
| w_b = benchmark_weights.reindex(tickers).fillna(0).values.flatten() | |
| else: | |
| w_b = np.array(benchmark_weights) | |
| # Objective | |
| active_weights = w - w_b | |
| tracking_error_variance = cp.quad_form(active_weights, covariance_matrix.values) | |
| objective = cp.Minimize(tracking_error_variance) | |
| # 1. Identify Exclusions FIRST to adjust constraints | |
| excluded_indices = [] | |
| mask_vector = np.zeros(n_assets) | |
| # Sector Exclusions | |
| if excluded_sectors: | |
| logger.info(f"Applying Sector Exclusion Validation for: {excluded_sectors}") | |
| for i, ticker in enumerate(tickers): | |
| sector = sector_map.get(ticker, "Unknown") | |
| if sector in excluded_sectors: | |
| excluded_indices.append(i) | |
| mask_vector[i] = 1 | |
| # Ticker Exclusions (NEW) | |
| if excluded_tickers: | |
| logger.info(f"Applying Ticker Exclusion Validation for: {excluded_tickers}") | |
| for i, ticker in enumerate(tickers): | |
| if ticker in excluded_tickers: | |
| excluded_indices.append(i) | |
| mask_vector[i] = 1 | |
| excluded_indices = list(set(excluded_indices)) # Dedupe | |
| logger.info(f"DEBUG: Excluded Mask Sum = {mask_vector.sum()} assets out of {n_assets}") | |
| if len(excluded_indices) == n_assets: | |
| raise ValueError("All assets excluded! Cannot optimize.") | |
| # 2. Dynamic Constraints | |
| n_active = n_assets - len(excluded_indices) | |
| if n_active == 0: n_active = 1 | |
| min_avg_weight = 1.0 / n_active | |
| dynamic_max = max(0.20, min_avg_weight * 1.5) | |
| if max_weight and max_weight > min_avg_weight: | |
| logger.info(f"Applying User-Defined Max Weight: {max_weight}") | |
| MAX_WEIGHT_LIMIT = max_weight | |
| else: | |
| MAX_WEIGHT_LIMIT = dynamic_max | |
| logger.info(f"DEBUG: Active Assets={n_active}, Min Avg={min_avg_weight:.4f}, Dynamic Max Limit={MAX_WEIGHT_LIMIT:.4f}") | |
| constraints = [ | |
| cp.sum(w) == 1, | |
| w >= 0, | |
| w <= MAX_WEIGHT_LIMIT | |
| ] | |
| # Apply Exclusions | |
| if excluded_indices: | |
| constraints.append(w[excluded_indices] == 0) | |
| # Problem | |
| prob = cp.Problem(objective, constraints) | |
| try: | |
| logger.info("Solving quadratic programming problem...") | |
| # verbose=True to see solver output in logs | |
| prob.solve(verbose=True) | |
| except Exception as e: | |
| logger.error(f"Optimization CRASHED: {e}") | |
| raise e | |
| # CHECK SOLVER STATUS | |
| if prob.status not in [cp.OPTIMAL, cp.OPTIMAL_INACCURATE]: | |
| logger.error(f"Optimization FAILED with status: {prob.status}") | |
| raise ValueError(f"Solver failed: {prob.status}") | |
| # Extract weights | |
| optimal_weights = w.value | |
| if optimal_weights is None: | |
| raise ValueError("Solver returned None for weights.") | |
| # Add small tolerance cleanup | |
| optimal_weights[optimal_weights < 1e-4] = 0 | |
| # Normalize just in case (solver precision) | |
| # optimal_weights = optimal_weights / optimal_weights.sum() | |
| # Format Result | |
| weight_dict = { | |
| tickers[i]: float(optimal_weights[i]) | |
| for i in range(n_assets) | |
| if optimal_weights[i] > 0 | |
| } | |
| # Calculate resulting Tracking Error (volatility of active returns) | |
| # TE = sqrt(variance) | |
| te = np.sqrt(prob.value) if prob.value > 0 else 0.0 | |
| logger.info(f"Optimization Solved. Tracking Error: {te:.4f}") | |
| return OptimizationResult( | |
| weights=weight_dict, | |
| tracking_error=te, | |
| status=prob.status | |
| ) | |