Spaces:
Sleeping
Sleeping
| # portfolio_optimization.py | |
| from typing import Dict, List | |
| import numpy as np | |
| import pandas as pd | |
| from scipy.optimize import minimize | |
| import plotly.graph_objects as go # Add this import | |
| import streamlit as st # Add this import | |
| class PortfolioOptimizer: | |
| def __init__(self, returns_df: pd.DataFrame): | |
| self.returns = returns_df | |
| self.mean_returns = returns_df.mean() | |
| self.cov_matrix = returns_df.cov() | |
| def validate_data(self) -> None: | |
| """Validate input data and handle missing values.""" | |
| if self.returns.empty: | |
| raise ValueError("Returns data is empty") | |
| # Fill missing values | |
| self.returns.fillna(method='ffill', inplace=True) | |
| self.returns.fillna(method='bfill', inplace=True) | |
| # Check for remaining NaN values | |
| if self.returns.isna().any().any(): | |
| raise ValueError("Unable to fill all missing values in returns data") | |
| # Check for zero variance | |
| if (self.returns.std() == 0).any(): | |
| raise ValueError("One or more assets have zero variance") | |
| # Check for insufficient data | |
| if len(self.returns) < 30: # Minimum required data points | |
| raise ValueError("Insufficient data points for optimization (minimum 30 required)") | |
| def portfolio_performance(self, weights: np.array) -> tuple: | |
| """ | |
| Calculate portfolio performance metrics. | |
| Args: | |
| weights (np.array): Array of portfolio weights | |
| Returns: | |
| tuple: (returns, volatility, sharpe) | |
| """ | |
| try: | |
| returns = np.sum(self.mean_returns * weights) | |
| volatility = np.sqrt(np.dot(weights.T, np.dot(self.cov_matrix, weights))) | |
| sharpe = returns / volatility if volatility > 0 else 0 | |
| return returns, volatility, sharpe | |
| except Exception as e: | |
| raise ValueError(f"Error calculating portfolio performance: {str(e)}") | |
| def optimize_portfolio(self, target_return: float, risk_free_rate: float = 0.0) -> Dict: | |
| """Optimize portfolio weights for target return.""" | |
| try: | |
| num_assets = len(self.returns.columns) | |
| if num_assets < 2: | |
| return { | |
| 'success': False, | |
| 'message': 'Need at least 2 assets for optimization' | |
| } | |
| # Initial guess (equal weights) | |
| init_weights = np.array([1.0/num_assets] * num_assets) | |
| # Constraints | |
| bounds = tuple((0, 1) for _ in range(num_assets)) | |
| constraints = [ | |
| {'type': 'eq', 'fun': lambda x: np.sum(x) - 1} # weights sum to 1 | |
| ] | |
| # Optimize | |
| result = minimize( | |
| lambda w: self._portfolio_volatility(w), | |
| init_weights, | |
| method='SLSQP', | |
| bounds=bounds, | |
| constraints=constraints | |
| ) | |
| if not result.success: | |
| return { | |
| 'success': False, | |
| 'message': 'Optimization failed to converge' | |
| } | |
| optimal_weights = result.x | |
| portfolio_return = np.sum(self.mean_returns * optimal_weights) * 252 | |
| portfolio_volatility = self._portfolio_volatility(optimal_weights) | |
| sharpe_ratio = (portfolio_return - risk_free_rate) / portfolio_volatility | |
| return { | |
| 'success': True, | |
| 'weights': dict(zip(self.returns.columns, optimal_weights)), | |
| 'return': portfolio_return, | |
| 'volatility': portfolio_volatility, | |
| 'sharpe_ratio': sharpe_ratio | |
| } | |
| except Exception as e: | |
| return { | |
| 'success': False, | |
| 'message': str(e) | |
| } | |
| def _portfolio_volatility(self, weights: np.ndarray) -> float: | |
| """Calculate portfolio volatility.""" | |
| return np.sqrt(np.dot(weights.T, np.dot(self.cov_matrix * 252, weights))) | |
| def get_efficient_frontier(self, num_portfolios: int = 100) -> List[Dict]: | |
| """Generate efficient frontier points.""" | |
| efficient_portfolios = [] | |
| min_ret = self.mean_returns.min() * 252 | |
| max_ret = self.mean_returns.max() * 252 | |
| for target in np.linspace(min_ret, max_ret, num_portfolios): | |
| result = self.optimize_portfolio(target) | |
| if result['success']: | |
| efficient_portfolios.append({ | |
| 'return': result['return'], | |
| 'volatility': result['volatility'], | |
| 'weights': result['weights'] | |
| }) | |
| return efficient_portfolios | |
| def _portfolio_return(self, weights: np.array) -> float: | |
| """Calculate portfolio return.""" | |
| return np.sum(self.mean_returns * weights) | |
| def _negative_sharpe(self, weights: np.array) -> float: | |
| """Calculate negative Sharpe ratio for minimization.""" | |
| returns, volatility, sharpe = self.portfolio_performance(weights) | |
| return -sharpe if volatility > 0 else 0 | |
| def plot_signals(data: pd.DataFrame, signals: pd.DataFrame, asset: str) -> go.Figure: | |
| """Create a plotly figure with price and signals.""" | |
| # Make sure data is properly indexed | |
| if not isinstance(data.index, pd.RangeIndex): | |
| data = data.reset_index() | |
| fig = go.Figure() | |
| # Add candlestick chart | |
| fig.add_trace(go.Candlestick( | |
| x=data['Date'], | |
| open=data['Open'], | |
| high=data['High'], | |
| low=data['Low'], | |
| close=data['Close'], | |
| name='Price' | |
| )) | |
| # Add buy signals | |
| buy_points = data[signals['Position'] == 1] | |
| if not buy_points.empty: | |
| fig.add_trace(go.Scatter( | |
| x=buy_points['Date'], | |
| y=buy_points['High'], | |
| mode='markers', | |
| name='Buy Signal', | |
| marker=dict( | |
| symbol='triangle-up', | |
| size=15, | |
| color='green' | |
| ) | |
| )) | |
| # Add sell signals | |
| sell_points = data[signals['Position'] == -1] | |
| if not sell_points.empty: | |
| fig.add_trace(go.Scatter( | |
| x=sell_points['Date'], | |
| y=sell_points['Low'], | |
| mode='markers', | |
| name='Sell Signal', | |
| marker=dict( | |
| symbol='triangle-down', | |
| size=15, | |
| color='red' | |
| ) | |
| )) | |
| # Update layout | |
| fig.update_layout( | |
| title=f'{asset} Trading Signals', | |
| xaxis_title='Date', | |
| yaxis_title='Price', | |
| template='plotly_dark' if st.session_state.theme == 'dark' else 'plotly_white', | |
| xaxis_rangeslider_visible=False, | |
| height=600, | |
| hovermode='x unified' | |
| ) | |
| return fig |