Spaces:
Runtime error
Runtime error
| import pandas as pd | |
| import numpy as np | |
| from sklearn.preprocessing import StandardScaler | |
| from sklearn.mixture import GaussianMixture | |
| import plotly.graph_objects as go | |
| import yfinance as yf | |
| from datetime import datetime, timedelta | |
| import random | |
| import gradio as gr | |
def fetch_data(symbol, start_date, end_date):
    """Download price history for one ticker through yfinance.

    Args:
        symbol: Ticker symbol handed to ``yf.download``.
        start_date: Forwarded as the ``start`` keyword.
        end_date: Forwarded as the ``end`` keyword.

    Returns:
        Whatever ``yf.download`` returns for this request.
    """
    history = yf.download(symbol, start=start_date, end=end_date)
    return history
def calculate_beta(stock_returns, market_returns):
    """Compute beta of a stock return series relative to a market series.

    Beta is the off-diagonal entry of ``np.cov(stock, market)`` divided by
    the market's variance (the ``[1, 1]`` entry of the same matrix).

    Args:
        stock_returns: Sequence of stock returns.
        market_returns: Sequence of market returns, same length.

    Returns:
        Tuple ``(beta, covariance, market_variance)``. All three are NaN
        when either input holds fewer than two observations.
    """
    too_short = min(len(stock_returns), len(market_returns)) < 2
    if too_short:
        return np.nan, np.nan, np.nan

    cov = np.cov(stock_returns, market_returns)
    covariance = cov[0, 1]
    market_variance = cov[1, 1]
    return covariance / market_variance, covariance, market_variance
def calculate_r_squared(stock_returns, market_returns):
    """Return the squared Pearson correlation between two return series.

    Args:
        stock_returns: Sequence of stock returns.
        market_returns: Sequence of market returns, same length.

    Returns:
        The square of the ``[0, 1]`` entry of ``np.corrcoef``; NaN when
        either input holds fewer than two observations.
    """
    if len(stock_returns) >= 2 and len(market_returns) >= 2:
        pearson_r = np.corrcoef(stock_returns, market_returns)[0, 1]
        return pearson_r ** 2
    return np.nan
def align_data(stock_data, index_data):
    """Inner-join two frames on their index.

    Only index values present in both frames are kept. Columns whose names
    collide receive the suffixes ``_stock`` (left) and ``_index`` (right).

    Args:
        stock_data: Left-hand DataFrame.
        index_data: Right-hand DataFrame.

    Returns:
        The joined DataFrame.
    """
    return stock_data.join(
        index_data,
        how='inner',
        lsuffix='_stock',
        rsuffix='_index',
    )
def risk_level(beta):
    """Translate a beta value into a qualitative risk label.

    Each band is bounded above by a strict ``<`` comparison; any value that
    is not below 2 (including NaN, for which every comparison is false)
    falls through to "Very High Risk".

    Args:
        beta: Numeric beta value.

    Returns:
        One of "Very Low Risk", "Low Risk", "Moderate Risk", "High Risk"
        or "Very High Risk".
    """
    bands = (
        (0.5, "Very Low Risk"),
        (1, "Low Risk"),
        (1.5, "Moderate Risk"),
        (2, "High Risk"),
    )
    for upper_bound, label in bands:
        if beta < upper_bound:
            return label
    return "Very High Risk"
def _close_series(price_data, name):
    """Extract the 'Close' prices of a yfinance download as a Series named ``name``.

    Returns an empty float Series when the download is empty or has no
    'Close' column, so callers can test ``.empty`` instead of catching errors.
    """
    if price_data is None or price_data.empty or 'Close' not in price_data.columns:
        return pd.Series(dtype=float, name=name)
    close = price_data['Close']
    if isinstance(close, pd.DataFrame):
        # With MultiIndex columns (field, ticker), selecting 'Close' yields a
        # one-column DataFrame rather than a Series; take that single column.
        if close.shape[1] == 0:
            return pd.Series(dtype=float, name=name)
        close = close.iloc[:, 0]
    return close.rename(name)


def analyze_stocks(stocks_filepath, index_symbol, years=5):
    """Cluster stocks by beta and R-squared against an index and plot the result.

    Reads a CSV with ``Symbol`` and ``Name`` columns, downloads closing prices
    for each symbol and for ``index_symbol`` over the last ``years`` years,
    computes each stock's beta and R-squared against the index, clusters the
    standardized (beta, R-squared) pairs with a Gaussian mixture, and builds a
    Plotly bubble chart (bubble area follows the latest close price).

    Args:
        stocks_filepath: Path of the CSV listing the stocks to analyze.
        index_symbol: Ticker of the market index used as the benchmark.
        years: Length of the look-back window in years (365 days each).

    Returns:
        A ``plotly.graph_objects.Figure`` with one scatter trace per cluster,
        two vertical reference lines at beta 0 and 1, and an annotation with
        the number of plotted stocks and the analysis date.

    Raises:
        ValueError: If no stock yields a finite beta (for example when the
            index download is empty), since there is nothing to cluster.
    """
    end_date = datetime.now()
    start_date = end_date - timedelta(days=365 * years)

    stock_symbols_df = pd.read_csv(stocks_filepath)
    # Filter out USD and GBP
    stock_symbols_df = stock_symbols_df[~stock_symbols_df['Symbol'].isin(['USD', 'GBP'])]
    # One Symbol -> Name lookup (first occurrence wins) instead of filtering
    # the whole frame once per symbol; also avoids downloading duplicates twice.
    name_by_symbol = stock_symbols_df.drop_duplicates('Symbol').set_index('Symbol')['Name']

    index_close = _close_series(fetch_data(index_symbol, start_date, end_date), 'Close_index')
    # The market return series is the same for every stock: compute it once.
    market_returns = index_close.pct_change().dropna()

    rows = []
    for symbol, name in name_by_symbol.items():
        stock_close = _close_series(fetch_data(symbol, start_date, end_date), 'Close_stock')
        if stock_close.empty:
            continue
        stock_returns = stock_close.pct_change().dropna()
        aligned = align_data(stock_returns.to_frame(), market_returns.to_frame()).dropna()
        if aligned.empty:
            continue
        beta, _, _ = calculate_beta(aligned['Close_stock'], aligned['Close_index'])
        if not np.isfinite(beta):
            continue
        r_squared = calculate_r_squared(aligned['Close_stock'], aligned['Close_index'])
        rows.append({
            'Symbol': symbol,
            'Name': name,
            'Beta': round(beta, 3),
            'R-Squared': round(r_squared, 3),
            'Latest Close': round(stock_close.iloc[-1], 3),
        })

    results_df = (
        pd.DataFrame(rows, columns=['Symbol', 'Name', 'Beta', 'R-Squared', 'Latest Close'])
        .sort_values(by='Beta')
        .dropna()
        # Positional index 0..n-1 so rows line up with the feature matrix
        # and with the arrays returned by the mixture model.
        .reset_index(drop=True)
    )
    if results_df.empty:
        raise ValueError("No stock produced a finite beta; nothing to cluster or plot.")
    valid_stocks_count = len(results_df)

    features = StandardScaler().fit_transform(results_df[['Beta', 'R-Squared']].values)
    # GaussianMixture needs at least as many samples as components.
    optimal_clusters = min(5, len(results_df))  # You can implement a method to determine this dynamically
    gmm = GaussianMixture(n_components=optimal_clusters, random_state=42)
    cluster_labels = gmm.fit_predict(features)
    cluster_probs = gmm.predict_proba(features)
    results_df['Cluster'] = cluster_labels
    # Probability of the cluster each row was assigned to, selected row by row.
    results_df['Cluster Prob'] = cluster_probs[np.arange(len(results_df)), cluster_labels].round(3)
    results_df['Risk Level'] = results_df['Beta'].apply(risk_level)

    # Plotly reads rgb() channels on a 0-255 scale; fractions in [0, 1) would
    # all render as near-black.
    custom_colors = [
        'rgb({}, {}, {})'.format(*(random.randint(0, 255) for _ in range(3)))
        for _ in range(optimal_clusters)
    ]

    traces = []
    for cluster in sorted(results_df['Cluster'].unique()):
        cluster_df = results_df[results_df['Cluster'] == cluster]
        hovertext = (
            cluster_df['Symbol'] + '<br>' + cluster_df['Name']
            + '<br>Beta: ' + cluster_df['Beta'].astype(str)
            + '<br>R-Squared: ' + cluster_df['R-Squared'].astype(str)
            + '<br>Latest Close Price: ' + cluster_df['Latest Close'].astype(str)
            + '<br>Risk Level: ' + cluster_df['Risk Level']
            + '<br>Cluster Prob: ' + cluster_df['Cluster Prob'].astype(str)
        )
        traces.append(go.Scatter(
            x=cluster_df['Beta'],
            y=cluster_df['R-Squared'],
            mode='markers',
            marker=dict(
                size=cluster_df['Latest Close'],
                sizeref=2. * cluster_df['Latest Close'].max() / (40. ** 2),
                sizemode='area',
                color=custom_colors[int(cluster) % len(custom_colors)],
            ),
            hovertext=hovertext,
            showlegend=False,
        ))

    layout = go.Layout(
        title='S&P500 Index Stock Clustering based on Beta and R-Squared',
        xaxis=dict(title='β (Risk)', tickmode='linear', dtick=0.25, range=[0, 2.5]),
        yaxis=dict(title='R² (Market Dependency)'),
        width=1200,
        height=800,
    )
    fig = go.Figure(data=traces, layout=layout)

    # Vertical reference lines at beta = 0 and beta = 1, spanning the data's R² range.
    r2_span = [results_df['R-Squared'].min(), results_df['R-Squared'].max()]
    for x_ref in (0, 1):
        fig.add_trace(go.Scatter(x=[x_ref, x_ref], y=r2_span,
                                 mode="lines", line=dict(color="black", width=2), showlegend=False))

    latest_date = end_date.strftime('%d/%m/%Y')
    # Anchor to the plot area ("paper" coordinates): a data-space x beyond the
    # fixed x-axis range [0, 2.5] would put the annotation off-screen.
    fig.add_annotation(xref='paper', yref='paper', x=1, y=1,
                       xanchor='right', yanchor='bottom',
                       text=f"Valid Stocks: {valid_stocks_count} | Date: {latest_date}",
                       showarrow=False, font=dict(size=14, color="black"))
    return fig
def gradio_interface(years):
    """Gradio callback: run the analysis for the fixed stock list and index.

    Args:
        years: Look-back window in years, forwarded to ``analyze_stocks``.

    Returns:
        The figure returned by ``analyze_stocks`` for the CSV
        "SP500_stock_list_Jan-1-2024.csv" benchmarked against "^GSPC".
    """
    return analyze_stocks("SP500_stock_list_Jan-1-2024.csv", "^GSPC", years)
# Create the Gradio interface using Blocks
# Components are laid out in creation order: heading, description, then one
# row each for the slider, the button and the plot.
with gr.Blocks() as iface:
    gr.Markdown("# S&P 500 Stock Analysis")
    gr.Markdown("Analyze S&P 500 stocks based on Beta and R-Squared values.")
    with gr.Row():
        # Look-back window in whole years (1 to 10, default 5).
        years_slider = gr.Slider(minimum=1, maximum=10, step=1, value=5, label="Years of Data")
    with gr.Row():
        analyze_button = gr.Button("Analyze")
    with gr.Row():
        # elem_id is the hook targeted by the "#large-plot" CSS rule below.
        output_plot = gr.Plot(elem_id="large-plot")
    # Clicking the button passes the slider value to gradio_interface and
    # shows the returned figure in output_plot.
    analyze_button.click(fn=gradio_interface, inputs=years_slider, outputs=output_plot)
    # Add custom CSS to make the plot larger
    gr.HTML("""
<style>
#large-plot {
height: 800px !important;
width: 1200px !important;
}
</style>
""")
# Launch the interface
iface.launch()