Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import pandas as pd | |
| import numpy as np | |
| import plotly.graph_objects as go | |
| import plotly.express as px | |
| import yfinance as yf | |
| from datetime import datetime, timedelta | |
| import pickle | |
| import warnings | |
| warnings.filterwarnings('ignore') | |
| # Custom unpickler to handle numpy version issues | |
class SafeUnpickler(pickle.Unpickler):
    """Unpickler that tolerates NumPy's ``numpy.core`` / ``numpy._core`` rename.

    A pickle records the import path of every global it references. NumPy
    objects reference helpers inside NumPy's private core package, whose name
    differs between NumPy releases, so a file written under one release may
    name a module that does not exist under another. This class retries such
    lookups under both spellings and always resolves the *requested* attribute
    (for example ``_reconstruct`` for arrays and ``scalar`` for NumPy scalars).
    """

    # Both spellings of NumPy's private core package. Whichever one the pickle
    # recorded, each spelling is tried until one of them imports.
    _NUMPY_CORE_PACKAGES = ("numpy._core", "numpy.core")

    def find_class(self, module, name):
        """Resolve ``module.name``, remapping NumPy core modules if necessary.

        Args:
            module: Dotted module path recorded in the pickle stream.
            name: Attribute to fetch from that module.

        Returns:
            The object the pickle stream refers to.

        Raises:
            ImportError, AttributeError: If the global cannot be resolved under
                any candidate module path.
        """
        for recorded in self._NUMPY_CORE_PACKAGES:
            if module == recorded or module.startswith(recorded + "."):
                submodule = module[len(recorded):]
                for package in self._NUMPY_CORE_PACKAGES:
                    try:
                        return super().find_class(package + submodule, name)
                    except (ImportError, AttributeError):
                        # This spelling is unavailable in the installed NumPy.
                        continue
                break
        # Non-NumPy-core globals, or a NumPy global that failed under both
        # spellings: defer to the standard lookup so the usual error surfaces.
        return super().find_class(module, name)
def load_model():
    """Return the risk model, preferring the bundled pickle over a rebuild.

    Reads ``portfolio_risk_model.pkl`` from the working directory through
    ``SafeUnpickler``. Any exception raised while opening or decoding the file
    is reported on stdout, and the result of ``create_model_from_scratch()``
    is returned instead.
    """
    pickle_path = 'portfolio_risk_model.pkl'
    try:
        with open(pickle_path, 'rb') as handle:
            loaded = SafeUnpickler(handle).load()
        print("β Model loaded successfully!")
        return loaded
    except Exception as err:
        print(f"β Model loading error: {err!s}")
        # The pickle is unusable, so rebuild the model from downloaded data.
        return create_model_from_scratch()
def create_model_from_scratch():
    """Build the risk model from freshly downloaded price history.

    Downloads five years of prices for a fixed list of tickers via yfinance,
    converts them to daily returns and derives per-asset statistics plus
    metrics for an equally weighted portfolio.

    Returns:
        dict | None: A dictionary with the keys ``mean_returns``,
        ``covariance_matrix``, ``volatilities``, ``risk_metrics``,
        ``loss_probabilities``, ``tickers``, ``training_date`` and
        ``data_period``. Per-asset lists follow the order of ``tickers``.
        ``None`` is returned when the download or any computation fails
        (the error is printed).
    """
    print("π Creating model from scratch...")
    stocks = ['AAPL', 'MSFT', 'GOOGL', 'AMZN', 'TSLA']
    try:
        # Request unadjusted columns explicitly: when yfinance auto-adjusts
        # prices it does not return an 'Adj Close' column, and the lookup
        # below would raise KeyError.
        prices = yf.download(stocks, period='5y', auto_adjust=False)['Adj Close']
        # The downloaded columns are not guaranteed to follow the order of
        # `stocks`; pin the order so every per-asset list lines up with the
        # 'tickers' entry stored in the model.
        prices = prices[stocks]
        returns = prices.pct_change().dropna()
        if returns.empty:
            # Without this guard every statistic below would silently be NaN.
            raise ValueError("no usable price history was downloaded")

        # Per-asset parameters (daily mean/covariance, annualised volatility).
        mean_returns = returns.mean().tolist()
        cov_matrix = returns.cov().values.tolist()
        volatilities = (returns.std() * np.sqrt(252)).tolist()

        # Equally weighted portfolio, sized from the ticker list.
        weights = np.full(len(stocks), 1.0 / len(stocks))
        portfolio_returns = returns.dot(weights)

        var_cutoff = np.percentile(portfolio_returns, 5)
        annual_mean = portfolio_returns.mean() * 252
        annual_vol = portfolio_returns.std() * np.sqrt(252)

        model = {
            'mean_returns': mean_returns,
            'covariance_matrix': cov_matrix,
            'volatilities': volatilities,
            'risk_metrics': {
                'historical_var_95': float(var_cutoff),
                # Mean of the daily returns at or below the 5th percentile.
                'cvar_95': float(portfolio_returns[portfolio_returns <= var_cutoff].mean()),
                'annual_mean_return': float(annual_mean),
                'annual_volatility': float(annual_vol),
                # No risk-free rate is subtracted.
                'sharpe_ratio': float(annual_mean / annual_vol)
            },
            # NOTE: these values are hard-coded placeholders; they are not
            # derived from the downloaded data.
            'loss_probabilities': {
                'probability_10_percent_loss': 0.15,  # Placeholder
                'probability_20_percent_loss': 0.08,  # Placeholder
                'probability_30_percent_loss': 0.03,  # Placeholder
                'expected_portfolio_value': 110.0,  # Placeholder
                'median_portfolio_value': 108.5,  # Placeholder
                'worst_case_5th_percentile': 85.0,  # Placeholder
                'best_case_95th_percentile': 135.0  # Placeholder
            },
            'tickers': stocks,
            'training_date': datetime.now().strftime("%Y-%m-%d"),
            'data_period': '5 years'
        }
        print("β Model created from scratch!")
        return model
    except Exception as e:
        # Best-effort boundary: callers treat None as "no model available".
        print(f"β Error creating model: {str(e)}")
        return None
class GradioRiskApp:
    """Monte Carlo risk analysis backend for the Gradio interface.

    The instance holds the model dictionary returned by ``load_model()`` and
    the list of tickers the UI offers. ``analyze_portfolio`` is the entry
    point wired to the UI; the remaining methods are its building blocks.
    """

    # Starting value of every simulated portfolio path.
    _INITIAL_VALUE = 100.0
    # Smallest eigenvalue accepted before the covariance matrix is regularised.
    _MIN_EIGENVALUE = 1e-10

    def __init__(self):
        """Load the model and record the tickers offered in the UI."""
        self.model = load_model()
        self.available_stocks = ['AAPL', 'MSFT', 'GOOGL', 'AMZN', 'TSLA']
        print(f"π― Available stocks: {self.available_stocks}")

    def run_monte_carlo(self, selected_stocks, days=252, simulations=5000):
        """Simulate equally weighted portfolio paths from the model parameters.

        Daily asset returns are drawn from a multivariate normal distribution
        defined by the model's ``mean_returns`` and ``covariance_matrix``, and
        compounded from a starting value of 100. A fixed seed makes repeated
        calls reproducible; a private generator is used so the global NumPy
        random state is left untouched.

        Args:
            selected_stocks: Tickers chosen by the user; must be the same set
                as ``self.available_stocks``.
            days: Number of simulated days (coerced to ``int``).
            simulations: Number of simulated paths (coerced to ``int``).

        Returns:
            numpy.ndarray | None: Array of shape ``(simulations, days + 1)``
            whose first column is the starting value, or ``None`` when no
            model is loaded, the selection does not match, or an error occurs
            (the error is printed).
        """
        try:
            if self.model is None:
                print("β No model available")
                return None
            if set(selected_stocks) != set(self.available_stocks):
                print("β Stocks don't match trained stocks")
                return None
            print("π― Running Monte Carlo simulation...")

            # UI widgets may deliver numbers as floats; range/shape need ints.
            n_days = int(days)
            n_sims = int(simulations)

            # Extract parameters from model
            mean_returns = np.asarray(self.model['mean_returns'], dtype=float)
            cov_matrix = np.asarray(self.model['covariance_matrix'], dtype=float)
            # Cholesky requires a positive definite matrix.
            cov_matrix = self.make_positive_definite(cov_matrix)
            chol = np.linalg.cholesky(cov_matrix)

            # Size everything from the model instead of assuming five assets.
            n_assets = mean_returns.shape[0]
            weights = np.full(n_assets, 1.0 / n_assets)

            # Seeded private generator: same stream as np.random.seed(42)
            # without clobbering the process-wide random state.
            rng = np.random.RandomState(42)

            paths = np.empty((n_sims, n_days + 1))
            paths[:, 0] = self._INITIAL_VALUE
            for sim in range(n_sims):
                # Correlate independent normal draws via the Cholesky factor.
                shocks = rng.normal(0, 1, size=(n_days, n_assets))
                daily_returns = (shocks @ chol.T + mean_returns) @ weights
                # Compound the daily portfolio returns in one vectorised step.
                paths[sim, 1:] = self._INITIAL_VALUE * np.cumprod(1.0 + daily_returns)

            print(f"β Simulation completed with {len(paths)} paths")
            return paths
        except Exception as e:
            # Boundary for the UI: failures are reported as None to the caller.
            print(f"β Simulation error: {str(e)}")
            return None

    def make_positive_definite(self, matrix):
        """Return a positive definite version of a symmetric matrix.

        When the smallest eigenvalue is below ``_MIN_EIGENVALUE`` (negative,
        zero or numerically negligible), a multiple of the identity is added
        to the diagonal. The shift is ten times the magnitude of a negative
        eigenvalue, and never less than ``_MIN_EIGENVALUE``. The input is
        never modified; a new array is returned.

        Args:
            matrix: Square, symmetric array-like (e.g. a covariance matrix).

        Returns:
            numpy.ndarray: Float array suitable for ``np.linalg.cholesky``.
        """
        adjusted = np.array(matrix, dtype=float)  # copy; never mutate the caller's data
        min_eig = np.min(np.linalg.eigvalsh(adjusted))
        if min_eig < self._MIN_EIGENVALUE:
            shift = max(-10.0 * min_eig, self._MIN_EIGENVALUE)
            adjusted = adjusted + shift * np.eye(adjusted.shape[0])
        return adjusted

    def calculate_metrics(self, simulation_results):
        """Summarise the final simulated portfolio values.

        Args:
            simulation_results: Array of shape ``(paths, steps)`` or ``None``.

        Returns:
            dict | None: ``expected_value`` (mean final value), ``prob_10_loss``
            / ``prob_20_loss`` / ``prob_30_loss`` (share of paths ending below
            90 / 80 / 70), ``var_95`` and ``worst_case`` (both the 5th
            percentile of the final value, i.e. a portfolio value rather than
            a loss amount) and ``best_case`` (95th percentile). ``None`` when
            the input is ``None``.
        """
        if simulation_results is None:
            return None
        final_values = simulation_results[:, -1]
        low, high = np.percentile(final_values, [5, 95])
        return {
            'expected_value': float(np.mean(final_values)),
            'prob_10_loss': float(np.mean(final_values < 90)),
            'prob_20_loss': float(np.mean(final_values < 80)),
            'prob_30_loss': float(np.mean(final_values < 70)),
            'var_95': float(low),
            'best_case': float(high),
            'worst_case': float(low)
        }

    def create_simulation_plot(self, simulation_results):
        """Plot up to 50 simulated paths plus the mean path across all paths.

        Returns a Plotly figure, or ``None`` when there are no results.
        """
        if simulation_results is None:
            return None
        fig = go.Figure()
        # Sample paths (only the first 50 to keep the figure light).
        for path in simulation_results[:50]:
            fig.add_trace(go.Scatter(
                y=path,
                mode='lines',
                line=dict(width=1, color='lightblue'),
                opacity=0.1,
                showlegend=False
            ))
        # Mean path over every simulation, not just the plotted sample.
        mean_path = np.mean(simulation_results, axis=0)
        fig.add_trace(go.Scatter(
            y=mean_path,
            mode='lines',
            line=dict(width=3, color='red'),
            name='Average Path'
        ))
        fig.update_layout(
            title="π Monte Carlo Simulation Paths",
            xaxis_title="Trading Days",
            yaxis_title="Portfolio Value ($)",
            height=400
        )
        return fig

    def create_distribution_plot(self, simulation_results, metrics):
        """Plot a histogram of final portfolio values.

        When ``metrics`` is provided, dashed vertical lines mark the mean and
        the 5th percentile. Returns ``None`` when there are no results.
        """
        if simulation_results is None:
            return None
        final_values = simulation_results[:, -1]
        fig = px.histogram(
            x=final_values,
            nbins=50,
            title="π Portfolio Value Distribution",
            color_discrete_sequence=['#667eea']
        )
        if metrics:
            fig.add_vline(x=metrics['expected_value'], line_dash="dash",
                          line_color="red", annotation_text=f"Mean: ${metrics['expected_value']:.2f}")
            fig.add_vline(x=metrics['var_95'], line_dash="dash",
                          line_color="orange", annotation_text=f"5% VaR: ${metrics['var_95']:.2f}")
        fig.update_layout(height=400, showlegend=False)
        return fig

    def create_risk_gauge(self, metrics):
        """Build a gauge showing the probability (in percent) of a >10% loss.

        Returns ``None`` when ``metrics`` is ``None``.
        """
        if metrics is None:
            return None
        risk_prob = metrics['prob_10_loss'] * 100
        fig = go.Figure(go.Indicator(
            mode="gauge+number",
            value=risk_prob,
            domain={'x': [0, 1], 'y': [0, 1]},
            title={'text': "Probability of >10% Loss"},
            gauge={
                'axis': {'range': [0, 50]},
                'bar': {'color': "darkblue"},
                'steps': [
                    {'range': [0, 15], 'color': "lightgreen"},
                    {'range': [15, 30], 'color': "yellow"},
                    {'range': [30, 50], 'color': "red"}
                ]
            }
        ))
        fig.update_layout(height=300)
        return fig

    def analyze_portfolio(self, selected_stocks, simulation_days, num_simulations):
        """Run the full analysis for the UI.

        Args:
            selected_stocks: Tickers selected in the UI.
            simulation_days: Simulation horizon in days.
            num_simulations: Number of Monte Carlo paths.

        Returns:
            tuple: ``(summary_html, simulation_plot, distribution_plot,
            gauge)``. On any failure the first element is an error panel and
            the three figures are ``None``.
        """
        print(f"π Analyzing: {selected_stocks}")
        if not selected_stocks:
            return self.create_error_html("Please select at least one stock"), None, None, None
        if set(selected_stocks) != set(self.available_stocks):
            required = ', '.join(self.available_stocks)
            return self.create_error_html(
                f"Please select exactly these {len(self.available_stocks)} stocks: {required}"
            ), None, None, None
        try:
            # Coerce here so a non-numeric value becomes an error panel.
            simulation_days = int(simulation_days)
            num_simulations = int(num_simulations)
            simulations = self.run_monte_carlo(selected_stocks, simulation_days, num_simulations)
            if simulations is None:
                return self.create_error_html("Simulation failed"), None, None, None
            metrics = self.calculate_metrics(simulations)
            if metrics is None:
                return self.create_error_html("Metrics calculation failed"), None, None, None
            summary = self.create_summary_html(metrics, selected_stocks, simulation_days, num_simulations)
            sim_plot = self.create_simulation_plot(simulations)
            dist_plot = self.create_distribution_plot(simulations, metrics)
            gauge = self.create_risk_gauge(metrics)
            return summary, sim_plot, dist_plot, gauge
        except Exception as e:
            return self.create_error_html(f"Analysis error: {str(e)}"), None, None, None

    def create_error_html(self, message):
        """Return an HTML error panel containing ``message``.

        The message is HTML-escaped because it can carry arbitrary exception
        text, which must not be interpreted as markup.
        """
        import html  # local import: keeps this class self-contained

        safe_message = html.escape(str(message))
        return f"""
        <div style='text-align: center; padding: 40px; background: #f8d7da; border-radius: 10px;'>
        <h3 style='color: #721c24;'>β Error</h3>
        <p>{safe_message}</p>
        </div>
        """

    def create_summary_html(self, metrics, stocks, days, sims):
        """Render the HTML summary card for a completed analysis.

        The risk level is LOW below a 10% chance of a >10% loss, MEDIUM below
        20% and HIGH otherwise.

        Args:
            metrics: Dictionary produced by ``calculate_metrics``.
            stocks: Tickers included in the portfolio.
            days: Simulation horizon shown in the report.
            sims: Number of simulations shown in the report.
        """
        risk_level = "LOW" if metrics['prob_10_loss'] < 0.1 else "MEDIUM" if metrics['prob_10_loss'] < 0.2 else "HIGH"
        risk_color = "#28a745" if risk_level == "LOW" else "#ffc107" if risk_level == "MEDIUM" else "#dc3545"
        return f"""
        <div style="padding: 25px; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); border-radius: 15px; color: white;">
        <h2 style="text-align: center;">π Risk Analysis Report</h2>
        <div style="background: rgba(255,255,255,0.1); padding: 20px; border-radius: 10px; margin: 15px 0;">
        <h3>Portfolio: {', '.join(stocks)}</h3>
        <p>Weights: 20% each | Period: {days} days | Simulations: {sims}</p>
        </div>
        <div style="display: grid; grid-template-columns: 1fr 1fr; gap: 15px;">
        <div style="background: rgba(255,255,255,0.2); padding: 15px; border-radius: 8px;">
        <h4>Expected Value</h4>
        <p style="font-size: 24px; font-weight: bold;">${metrics['expected_value']:.2f}</p>
        </div>
        <div style="background: rgba(255,255,255,0.2); padding: 15px; border-radius: 8px;">
        <h4>Risk Level</h4>
        <p style="font-size: 24px; font-weight: bold; color: {risk_color};">{risk_level}</p>
        </div>
        <div style="background: rgba(255,255,255,0.2); padding: 15px; border-radius: 8px;">
        <h4>10% Loss Probability</h4>
        <p style="font-size: 24px; font-weight: bold;">{metrics['prob_10_loss']*100:.1f}%</p>
        </div>
        <div style="background: rgba(255,255,255,0.2); padding: 15px; border-radius: 8px;">
        <h4>Worst Case (5%)</h4>
        <p style="font-size: 24px; font-weight: bold;">${metrics['var_95']:.2f}</p>
        </div>
        </div>
        </div>
        """
# Build the analysis backend once at import time, then declare the UI.
# NOTE(review): the layout nesting below is reconstructed from source whose
# indentation was lost; confirm the placement of dist_plot and btn.click.
app = GradioRiskApp()

with gr.Blocks(title="QuantRisk Pro", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# π QuantRisk Pro - Portfolio Risk Analyzer")

    with gr.Row():
        # Left column: user inputs.
        with gr.Column(scale=1):
            gr.Markdown("### π§ Configuration")
            stocks = gr.CheckboxGroup(
                label="Select Stocks (All 5 Required)",
                value=app.available_stocks,
                choices=app.available_stocks,
            )
            days = gr.Slider(
                minimum=30,
                maximum=500,
                value=252,
                label="Time Horizon (Days)",
            )
            sims = gr.Dropdown(
                choices=[1000, 2500, 5000, 10000],
                label="Simulations",
                value=5000,
            )
            btn = gr.Button(value="π Analyze", variant="primary")

        # Right column: report and charts.
        with gr.Column(scale=2):
            summary = gr.HTML()
            with gr.Row():
                gauge = gr.Plot()
                sim_plot = gr.Plot()
            dist_plot = gr.Plot()

    # Output order must match the tuple returned by analyze_portfolio.
    btn.click(
        outputs=[summary, sim_plot, dist_plot, gauge],
        inputs=[stocks, days, sims],
        fn=app.analyze_portfolio,
    )

if __name__ == "__main__":
    demo.launch()