Premchan369 commited on
Commit
72c80ab
·
verified ·
1 Parent(s): a50d1b8

Upload portfolio_optimizer.py

Browse files
Files changed (1) hide show
  1. portfolio_optimizer.py +315 -0
portfolio_optimizer.py ADDED
@@ -0,0 +1,315 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Portfolio Optimizer - Risk-aware allocation engine."""
2
+ import numpy as np
3
+ import pandas as pd
4
+ from scipy.optimize import minimize
5
+ from typing import Dict, List, Optional, Tuple
6
+ import warnings
7
+ warnings.filterwarnings('ignore')
8
+
9
+
10
+ class PortfolioOptimizer:
11
+ """Portfolio optimizer with constraints and robust optimization"""
12
+
13
+ def __init__(self,
14
+ max_weight: float = 0.20,
15
+ min_weight: float = 0.0,
16
+ target_return: Optional[float] = None,
17
+ risk_free_rate: float = 0.04,
18
+ transaction_cost: float = 0.0003,
19
+ turnover_penalty: float = 0.001,
20
+ risk_aversion: float = 1.0):
21
+ self.max_weight = max_weight
22
+ self.min_weight = min_weight
23
+ self.target_return = target_return
24
+ self.risk_free_rate = risk_free_rate
25
+ self.transaction_cost = transaction_cost
26
+ self.turnover_penalty = turnover_penalty
27
+ self.risk_aversion = risk_aversion
28
+
29
+ def optimize_mean_variance(self,
30
+ mu: np.ndarray,
31
+ Sigma: np.ndarray,
32
+ current_weights: Optional[np.ndarray] = None,
33
+ long_only: bool = True) -> Dict:
34
+ """
35
+ Mean-variance optimization with transaction costs
36
+
37
+ Args:
38
+ mu: Expected returns vector (n_assets,)
39
+ Sigma: Covariance matrix (n_assets, n_assets)
40
+ current_weights: Current portfolio weights (n_assets,)
41
+ long_only: If True, weights must be >= 0
42
+
43
+ Returns:
44
+ Dict with weights, expected_return, volatility, sharpe
45
+ """
46
+ n_assets = len(mu)
47
+
48
+ # Objective: maximize utility = return - risk_aversion * variance - transaction_costs
49
+ def objective(w):
50
+ port_return = np.dot(w, mu)
51
+ port_variance = np.dot(w, np.dot(Sigma, w))
52
+
53
+ # Transaction cost penalty
54
+ if current_weights is not None:
55
+ turnover = np.sum(np.abs(w - current_weights))
56
+ tc_penalty = self.turnover_penalty * turnover
57
+ else:
58
+ tc_penalty = 0
59
+
60
+ # Negative utility (for minimization)
61
+ return -(port_return - self.risk_aversion * port_variance - tc_penalty)
62
+
63
+ # Constraints
64
+ constraints = [{'type': 'eq', 'fun': lambda w: np.sum(w) - 1.0}] # Fully invested
65
+
66
+ if self.target_return is not None:
67
+ constraints.append(
68
+ {'type': 'eq', 'fun': lambda w: np.dot(w, mu) - self.target_return}
69
+ )
70
+
71
+ # Bounds
72
+ if long_only:
73
+ bounds = [(self.min_weight, self.max_weight) for _ in range(n_assets)]
74
+ else:
75
+ bounds = [(-self.max_weight, self.max_weight) for _ in range(n_assets)]
76
+
77
+ # Initial guess: equal weight
78
+ w0 = np.ones(n_assets) / n_assets
79
+
80
+ # Optimize
81
+ result = minimize(
82
+ objective,
83
+ w0,
84
+ method='SLSQP',
85
+ bounds=bounds,
86
+ constraints=constraints,
87
+ options={'maxiter': 1000, 'ftol': 1e-9}
88
+ )
89
+
90
+ if not result.success:
91
+ print(f"Optimization warning: {result.message}")
92
+
93
+ weights = result.x
94
+ weights = np.maximum(weights, 0) # Clean small negatives
95
+ weights /= np.sum(weights) # Renormalize
96
+
97
+ # Compute portfolio metrics
98
+ port_return = np.dot(weights, mu)
99
+ port_vol = np.sqrt(np.dot(weights, np.dot(Sigma, weights)))
100
+ sharpe = (port_return - self.risk_free_rate) / port_vol if port_vol > 0 else 0
101
+
102
+ return {
103
+ 'weights': weights,
104
+ 'expected_return': port_return,
105
+ 'volatility': port_vol,
106
+ 'sharpe_ratio': sharpe,
107
+ 'success': result.success
108
+ }
109
+
110
+ def optimize_max_sharpe(self,
111
+ mu: np.ndarray,
112
+ Sigma: np.ndarray,
113
+ current_weights: Optional[np.ndarray] = None) -> Dict:
114
+ """Optimize for maximum Sharpe ratio"""
115
+ n_assets = len(mu)
116
+
117
+ def neg_sharpe(w):
118
+ port_return = np.dot(w, mu)
119
+ port_vol = np.sqrt(np.dot(w, np.dot(Sigma, w)))
120
+
121
+ if current_weights is not None:
122
+ turnover = np.sum(np.abs(w - current_weights))
123
+ port_return -= self.turnover_penalty * turnover
124
+
125
+ return -(port_return - self.risk_free_rate) / (port_vol + 1e-8)
126
+
127
+ constraints = [{'type': 'eq', 'fun': lambda w: np.sum(w) - 1.0}]
128
+ bounds = [(self.min_weight, self.max_weight) for _ in range(n_assets)]
129
+ w0 = np.ones(n_assets) / n_assets
130
+
131
+ result = minimize(
132
+ neg_sharpe,
133
+ w0,
134
+ method='SLSQP',
135
+ bounds=bounds,
136
+ constraints=constraints,
137
+ options={'maxiter': 1000}
138
+ )
139
+
140
+ weights = result.x
141
+ weights = np.maximum(weights, 0)
142
+ weights /= np.sum(weights)
143
+
144
+ port_return = np.dot(weights, mu)
145
+ port_vol = np.sqrt(np.dot(weights, np.dot(Sigma, weights)))
146
+ sharpe = (port_return - self.risk_free_rate) / port_vol
147
+
148
+ return {
149
+ 'weights': weights,
150
+ 'expected_return': port_return,
151
+ 'volatility': port_vol,
152
+ 'sharpe_ratio': sharpe,
153
+ 'success': result.success
154
+ }
155
+
156
+ def optimize_min_volatility(self,
157
+ mu: np.ndarray,
158
+ Sigma: np.ndarray,
159
+ min_return: Optional[float] = None) -> Dict:
160
+ """Optimize for minimum volatility with optional return constraint"""
161
+ n_assets = len(mu)
162
+
163
+ def variance(w):
164
+ return np.dot(w, np.dot(Sigma, w))
165
+
166
+ constraints = [{'type': 'eq', 'fun': lambda w: np.sum(w) - 1.0}]
167
+
168
+ if min_return is not None:
169
+ constraints.append(
170
+ {'type': 'ineq', 'fun': lambda w: np.dot(w, mu) - min_return}
171
+ )
172
+
173
+ bounds = [(self.min_weight, self.max_weight) for _ in range(n_assets)]
174
+ w0 = np.ones(n_assets) / n_assets
175
+
176
+ result = minimize(
177
+ variance,
178
+ w0,
179
+ method='SLSQP',
180
+ bounds=bounds,
181
+ constraints=constraints,
182
+ options={'maxiter': 1000}
183
+ )
184
+
185
+ weights = result.x
186
+ weights = np.maximum(weights, 0)
187
+ weights /= np.sum(weights)
188
+
189
+ port_return = np.dot(weights, mu)
190
+ port_vol = np.sqrt(np.dot(weights, np.dot(Sigma, weights)))
191
+ sharpe = (port_return - self.risk_free_rate) / port_vol if port_vol > 0 else 0
192
+
193
+ return {
194
+ 'weights': weights,
195
+ 'expected_return': port_return,
196
+ 'volatility': port_vol,
197
+ 'sharpe_ratio': sharpe,
198
+ 'success': result.success
199
+ }
200
+
201
+ def robust_optimization(self,
202
+ mu: np.ndarray,
203
+ Sigma: np.ndarray,
204
+ mu_uncertainty: Optional[np.ndarray] = None,
205
+ Sigma_uncertainty: Optional[float] = None) -> Dict:
206
+ """
207
+ Robust optimization with uncertainty sets
208
+
209
+ Uses worst-case approach: optimize for worst-case mu within uncertainty ellipsoid
210
+ """
211
+ n_assets = len(mu)
212
+
213
+ if mu_uncertainty is None:
214
+ # Default: 20% uncertainty on expected returns
215
+ mu_uncertainty = np.abs(mu) * 0.2
216
+
217
+ # Worst-case return: mu - uncertainty
218
+ mu_worst = mu - mu_uncertainty
219
+
220
+ # Add covariance uncertainty
221
+ if Sigma_uncertainty is not None:
222
+ Sigma_robust = Sigma + np.eye(n_assets) * Sigma_uncertainty
223
+ else:
224
+ Sigma_robust = Sigma
225
+
226
+ return self.optimize_mean_variance(mu_worst, Sigma_robust)
227
+
228
+ def black_litterman(self,
229
+ market_caps: np.ndarray,
230
+ Sigma: np.ndarray,
231
+ risk_aversion: float = 2.5,
232
+ views: Optional[List[Dict]] = None,
233
+ view_confidence: float = 0.5) -> Dict:
234
+ """
235
+ Black-Litterman model for incorporating investor views
236
+
237
+ Args:
238
+ market_caps: Market capitalization weights
239
+ Sigma: Covariance matrix
240
+ risk_aversion: Risk aversion parameter
241
+ views: List of view dicts with 'assets', 'direction', 'magnitude'
242
+ view_confidence: Confidence in views (0-1)
243
+ """
244
+ n_assets = len(market_caps)
245
+
246
+ # Implied equilibrium returns
247
+ Pi = risk_aversion * np.dot(Sigma, market_caps)
248
+
249
+ if views is None or len(views) == 0:
250
+ # No views: use market equilibrium
251
+ return self.optimize_mean_variance(Pi, Sigma)
252
+
253
+ # Build view matrix P and view vector Q
254
+ P = []
255
+ Q = []
256
+ Omega_diag = []
257
+
258
+ for view in views:
259
+ assets = view['assets']
260
+ direction = view['direction'] # 'overweight' or 'underweight'
261
+ magnitude = view['magnitude']
262
+
263
+ p_row = np.zeros(n_assets)
264
+ for asset in assets:
265
+ p_row[asset] = 1.0 / len(assets)
266
+
267
+ P.append(p_row)
268
+ Q.append(magnitude if direction == 'overweight' else -magnitude)
269
+ Omega_diag.append(view_confidence)
270
+
271
+ P = np.array(P)
272
+ Q = np.array(Q)
273
+ Omega = np.diag(Omega_diag)
274
+
275
+ # Black-Litterman formula
276
+ tau = 0.05 # Uncertainty scaling
277
+
278
+ M_inverse = np.linalg.inv(tau * Sigma)
279
+ middle = np.linalg.inv(np.dot(np.dot(P, tau * Sigma), P.T) + Omega)
280
+
281
+ BL_mu = Pi + np.dot(
282
+ np.dot(tau * Sigma, P.T),
283
+ np.dot(middle, Q - np.dot(P, Pi))
284
+ )
285
+
286
+ BL_Sigma = Sigma + tau * Sigma - np.dot(
287
+ np.dot(tau * Sigma, P.T),
288
+ np.dot(middle, np.dot(P, tau * Sigma))
289
+ )
290
+
291
+ return self.optimize_mean_variance(BL_mu, BL_Sigma)
292
+
293
+ def compute_efficient_frontier(self,
294
+ mu: np.ndarray,
295
+ Sigma: np.ndarray,
296
+ n_points: int = 50) -> pd.DataFrame:
297
+ """Compute efficient frontier points"""
298
+ min_vol_result = self.optimize_min_volatility(mu, Sigma)
299
+ max_ret_result = self.optimize_mean_variance(mu, Sigma, target_return=np.max(mu))
300
+
301
+ min_ret = min_vol_result['expected_return']
302
+ max_ret = max_ret_result['expected_return']
303
+
304
+ target_returns = np.linspace(min_ret, max_ret, n_points)
305
+
306
+ frontier = []
307
+ for target in target_returns:
308
+ result = self.optimize_mean_variance(mu, Sigma, target_return=target)
309
+ frontier.append({
310
+ 'target_return': target,
311
+ 'volatility': result['volatility'],
312
+ 'sharpe': result['sharpe_ratio']
313
+ })
314
+
315
+ return pd.DataFrame(frontier)