Spaces:
Running
Running
File size: 6,139 Bytes
cafdd88 c3aab0c cafdd88 2750cce cafdd88 c3aab0c cafdd88 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 | import cvxpy as cp
import pandas as pd
import numpy as np
import logging
from typing import List, Dict, Optional
from core.schema import OptimizationResult
from config import settings
logger = logging.getLogger(__name__)
class PortfolioOptimizer:
"""
Quantitative Optimization Engine using CVXPY.
Objective: Minimize Tracking Error against a Benchmark.
Constraints:
1. Full Investment (Sum w = 1)
2. Long Only (w >= 0)
3. Sector Exclusions (w[excluded] = 0)
"""
def __init__(self):
pass
def optimize_portfolio(self,
covariance_matrix: pd.DataFrame,
tickers: List[str],
benchmark_weights: pd.DataFrame,
sector_map: Dict[str, str],
excluded_sectors: List[str],
excluded_tickers: List[str] = None,
max_weight: float = None) -> OptimizationResult:
"""
Solves the tracking error minimization problem.
Args:
covariance_matrix: (N x N) Ledoit-Wolf shrunk covariance matrix.
tickers: List of N tickers.
benchmark_weights: (N x 1) Weights of the benchmark (e.g. S&P 500).
Un-held assets should have 0 weight.
sector_map: Dictionary mapping ticker -> sector.
excluded_sectors: List of sectors to exclude.
excluded_tickers: List of specific tickers to exclude.
Returns:
OptimizationResult containing weights and status.
"""
excluded_tickers = excluded_tickers or []
n_assets = len(tickers)
if covariance_matrix.shape != (n_assets, n_assets):
raise ValueError(f"Covariance matrix shape {covariance_matrix.shape} does not match tickers count {n_assets}")
logger.info(f"Setting up CVXPY optimization for {n_assets} assets...")
# Variables
w = cp.Variable(n_assets)
# Benchmark Weights Vector (aligned to tickers)
if isinstance(benchmark_weights, (pd.Series, pd.DataFrame)):
w_b = benchmark_weights.reindex(tickers).fillna(0).values.flatten()
else:
w_b = np.array(benchmark_weights)
# Objective
active_weights = w - w_b
tracking_error_variance = cp.quad_form(active_weights, covariance_matrix.values)
objective = cp.Minimize(tracking_error_variance)
# 1. Identify Exclusions FIRST to adjust constraints
excluded_indices = []
mask_vector = np.zeros(n_assets)
# Sector Exclusions
if excluded_sectors:
logger.info(f"Applying Sector Exclusion Validation for: {excluded_sectors}")
for i, ticker in enumerate(tickers):
sector = sector_map.get(ticker, "Unknown")
if sector in excluded_sectors:
excluded_indices.append(i)
mask_vector[i] = 1
# Ticker Exclusions (NEW)
if excluded_tickers:
logger.info(f"Applying Ticker Exclusion Validation for: {excluded_tickers}")
for i, ticker in enumerate(tickers):
if ticker in excluded_tickers:
excluded_indices.append(i)
mask_vector[i] = 1
excluded_indices = list(set(excluded_indices)) # Dedupe
logger.info(f"DEBUG: Excluded Mask Sum = {mask_vector.sum()} assets out of {n_assets}")
if len(excluded_indices) == n_assets:
raise ValueError("All assets excluded! Cannot optimize.")
# 2. Dynamic Constraints
n_active = n_assets - len(excluded_indices)
if n_active == 0: n_active = 1
min_avg_weight = 1.0 / n_active
dynamic_max = max(0.20, min_avg_weight * 1.5)
if max_weight and max_weight > min_avg_weight:
logger.info(f"Applying User-Defined Max Weight: {max_weight}")
MAX_WEIGHT_LIMIT = max_weight
else:
MAX_WEIGHT_LIMIT = dynamic_max
logger.info(f"DEBUG: Active Assets={n_active}, Min Avg={min_avg_weight:.4f}, Dynamic Max Limit={MAX_WEIGHT_LIMIT:.4f}")
constraints = [
cp.sum(w) == 1,
w >= 0,
w <= MAX_WEIGHT_LIMIT
]
# Apply Exclusions
if excluded_indices:
constraints.append(w[excluded_indices] == 0)
# Problem
prob = cp.Problem(objective, constraints)
try:
logger.info("Solving quadratic programming problem...")
# verbose=True to see solver output in logs
prob.solve(verbose=True)
except Exception as e:
logger.error(f"Optimization CRASHED: {e}")
raise e
# CHECK SOLVER STATUS
if prob.status not in [cp.OPTIMAL, cp.OPTIMAL_INACCURATE]:
logger.error(f"Optimization FAILED with status: {prob.status}")
raise ValueError(f"Solver failed: {prob.status}")
# Extract weights
optimal_weights = w.value
if optimal_weights is None:
raise ValueError("Solver returned None for weights.")
# Add small tolerance cleanup
optimal_weights[optimal_weights < 1e-4] = 0
# Normalize just in case (solver precision)
# optimal_weights = optimal_weights / optimal_weights.sum()
# Format Result
weight_dict = {
tickers[i]: float(optimal_weights[i])
for i in range(n_assets)
if optimal_weights[i] > 0
}
# Calculate resulting Tracking Error (volatility of active returns)
# TE = sqrt(variance)
te = np.sqrt(prob.value) if prob.value > 0 else 0.0
logger.info(f"Optimization Solved. Tracking Error: {te:.4f}")
return OptimizationResult(
weights=weight_dict,
tracking_error=te,
status=prob.status
)
|