File size: 6,139 Bytes
cafdd88
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c3aab0c
 
cafdd88
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2750cce
 
 
cafdd88
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c3aab0c
 
 
 
 
cafdd88
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
import cvxpy as cp
import pandas as pd
import numpy as np
import logging
from typing import List, Dict, Optional
from core.schema import OptimizationResult
from config import settings

logger = logging.getLogger(__name__)

class PortfolioOptimizer:
    """
    Quantitative Optimization Engine using CVXPY.
    Objective: Minimize Tracking Error against a Benchmark.
    Constraints:
    1. Full Investment (Sum w = 1)
    2. Long Only (w >= 0)
    3. Per-asset cap (w <= max weight limit)
    4. Sector / ticker exclusions (w[excluded] = 0)
    """

    # Lower bound for the automatically derived per-asset cap.
    _DEFAULT_MAX_WEIGHT = 0.20
    # The automatic cap is never tighter than this multiple of the
    # equal-weight share, so the problem always stays feasible.
    _EQUAL_WEIGHT_HEADROOM = 1.5
    # Solved weights below this threshold are zeroed during cleanup.
    _WEIGHT_TOLERANCE = 1e-4

    def __init__(self):
        pass

    @staticmethod
    def _align_benchmark(benchmark_weights, tickers: List[str]) -> np.ndarray:
        """Return the benchmark weights as a flat vector with one entry per ticker."""
        n_assets = len(tickers)
        if isinstance(benchmark_weights, (pd.Series, pd.DataFrame)):
            # Label-based alignment; tickers missing from the benchmark get weight 0.
            raw = benchmark_weights.reindex(tickers).fillna(0).values
        else:
            raw = np.array(benchmark_weights)

        # Flatten on both paths so an (N x 1) array or list behaves like an
        # (N x 1) DataFrame.
        flat = raw.flatten()

        if flat.size == 1 and n_assets > 1:
            # A single value is applied to every asset, as plain broadcasting would.
            flat = np.full(n_assets, flat[0])

        if flat.size != n_assets:
            raise ValueError(
                f"Benchmark weights have {flat.size} entries after alignment; "
                f"expected {n_assets} (one per ticker)"
            )
        return flat

    @staticmethod
    def _excluded_positions(tickers: List[str],
                            sector_map: Dict[str, str],
                            excluded_sectors: List[str],
                            excluded_tickers: List[str]) -> List[int]:
        """Return the sorted positions in `tickers` that must be held at zero weight."""
        banned_sectors = set(excluded_sectors or ())
        banned_tickers = set(excluded_tickers or ())

        def is_excluded(ticker: str) -> bool:
            if ticker in banned_tickers:
                return True
            # The sector map is only consulted when a sector exclusion was requested.
            # Tickers missing from the map are treated as sector "Unknown".
            return bool(banned_sectors) and sector_map.get(ticker, "Unknown") in banned_sectors

        return [i for i, ticker in enumerate(tickers) if is_excluded(ticker)]

    def optimize_portfolio(self,
                           covariance_matrix: pd.DataFrame,
                           tickers: List[str],
                           benchmark_weights: pd.DataFrame,
                           sector_map: Dict[str, str],
                           excluded_sectors: List[str],
                           excluded_tickers: Optional[List[str]] = None,
                           max_weight: Optional[float] = None) -> "OptimizationResult":
        """
        Solves the tracking error minimization problem.

        Args:
            covariance_matrix: (N x N) Ledoit-Wolf shrunk covariance matrix.
                               Its values are used positionally, so rows/columns
                               must already be in the same order as `tickers`.
            tickers: List of N tickers.
            benchmark_weights: (N x 1) Weights of the benchmark (e.g. S&P 500).
                               Un-held assets should have 0 weight. A pandas
                               object is aligned to `tickers` by label; any other
                               array-like is taken positionally.
            sector_map: Dictionary mapping ticker -> sector.
            excluded_sectors: List of sectors to exclude.
            excluded_tickers: List of specific tickers to exclude.
            max_weight: Optional per-asset weight cap. It is only honoured when
                        it is larger than the equal-weight share of the
                        non-excluded assets; otherwise the automatic cap
                        max(0.20, 1.5 * equal-weight share) is used.

        Returns:
            OptimizationResult containing weights and status. Weights below
            1e-4 are dropped and the remainder is not re-normalised, so the
            returned weights may sum to slightly less than 1. The reported
            tracking error is the square root of the solver's objective value.

        Raises:
            ValueError: If the covariance shape or benchmark length does not
                        match `tickers`, if every asset is excluded, or if the
                        solver does not finish with an optimal status.
        """
        n_assets = len(tickers)
        if covariance_matrix.shape != (n_assets, n_assets):
            raise ValueError(f"Covariance matrix shape {covariance_matrix.shape} does not match tickers count {n_assets}")

        # Validate the benchmark up front so a bad input fails with a clear
        # message instead of a shape error from inside the solver.
        w_b = self._align_benchmark(benchmark_weights, tickers)

        excluded_tickers = excluded_tickers or []

        logger.info(f"Setting up CVXPY optimization for {n_assets} assets...")

        # Decision variable and objective: variance of the active (w - benchmark) weights.
        w = cp.Variable(n_assets)
        active_weights = w - w_b
        tracking_error_variance = cp.quad_form(active_weights, covariance_matrix.values)
        objective = cp.Minimize(tracking_error_variance)

        # 1. Identify exclusions first, because the weight cap depends on how
        #    many assets remain investable.
        if excluded_sectors:
            logger.info(f"Applying Sector Exclusion Validation for: {excluded_sectors}")
        if excluded_tickers:
            logger.info(f"Applying Ticker Exclusion Validation for: {excluded_tickers}")

        excluded_indices = self._excluded_positions(
            tickers, sector_map, excluded_sectors, excluded_tickers
        )

        logger.info(f"DEBUG: Excluded Mask Sum = {float(len(excluded_indices))} assets out of {n_assets}")

        if len(excluded_indices) == n_assets:
            raise ValueError("All assets excluded! Cannot optimize.")

        # 2. Dynamic constraints. n_active >= 1 is guaranteed by the guard above.
        n_active = n_assets - len(excluded_indices)
        min_avg_weight = 1.0 / n_active
        dynamic_max = max(self._DEFAULT_MAX_WEIGHT,
                          min_avg_weight * self._EQUAL_WEIGHT_HEADROOM)

        if max_weight is not None and max_weight > min_avg_weight:
            logger.info(f"Applying User-Defined Max Weight: {max_weight}")
            max_weight_limit = max_weight
        else:
            if max_weight:
                # A cap at or below the equal-weight share cannot satisfy
                # full investment, so it is replaced by the automatic cap.
                logger.warning(
                    f"Ignoring max_weight={max_weight}: not above equal-weight share "
                    f"{min_avg_weight:.4f}; using {dynamic_max:.4f} instead"
                )
            max_weight_limit = dynamic_max
        logger.info(f"DEBUG: Active Assets={n_active}, Min Avg={min_avg_weight:.4f}, Dynamic Max Limit={max_weight_limit:.4f}")

        constraints = [
            cp.sum(w) == 1,
            w >= 0,
            w <= max_weight_limit,
        ]

        # Apply exclusions
        if excluded_indices:
            constraints.append(w[excluded_indices] == 0)

        prob = cp.Problem(objective, constraints)

        logger.info("Solving quadratic programming problem...")
        try:
            # verbose=True to see solver output in logs
            prob.solve(verbose=True)
        except Exception as e:
            logger.error(f"Optimization CRASHED: {e}")
            raise

        # Check solver status
        if prob.status not in [cp.OPTIMAL, cp.OPTIMAL_INACCURATE]:
            logger.error(f"Optimization FAILED with status: {prob.status}")
            raise ValueError(f"Solver failed: {prob.status}")

        if w.value is None:
            raise ValueError("Solver returned None for weights.")

        # Work on a copy so the cleanup below does not rewrite the solver's own state.
        optimal_weights = np.array(w.value, dtype=float)

        # Small tolerance cleanup: treat tiny positions as zero.
        # NOTE: weights are intentionally not re-normalised afterwards.
        optimal_weights[optimal_weights < self._WEIGHT_TOLERANCE] = 0

        # Only assets that keep a positive weight appear in the result.
        weight_dict = {
            ticker: float(weight)
            for ticker, weight in zip(tickers, optimal_weights)
            if weight > 0
        }

        # Tracking error (volatility of active returns) = sqrt(variance).
        # Guard against a non-positive objective value from solver round-off.
        te = np.sqrt(prob.value) if prob.value > 0 else 0.0

        logger.info(f"Optimization Solved. Tracking Error: {te:.4f}")

        return OptimizationResult(
            weights=weight_dict,
            tracking_error=te,
            status=prob.status
        )