"""Gradio app that clusters S&P 500 stocks by beta and R-squared against an index."""

from datetime import datetime, timedelta
import random

import gradio as gr
import numpy as np
import pandas as pd
import plotly.graph_objects as go
from sklearn.mixture import GaussianMixture
from sklearn.preprocessing import StandardScaler
import yfinance as yf


def fetch_data(symbol, start_date, end_date):
    """Download price history for *symbol* between the two dates via yfinance."""
    return yf.download(symbol, start=start_date, end=end_date)


def _close_series(price_data, name):
    """Return the 'Close' prices of a downloaded frame as a Series called *name*.

    Accepts both flat columns and two-level (field, ticker) columns; some
    yfinance releases return the latter even for a single ticker, in which
    case ``frame['Close']`` is a DataFrame rather than a Series.  An empty
    float Series is returned when there is no close data at all.
    """
    empty = pd.Series(dtype=float, name=name)
    if price_data is None or price_data.empty:
        return empty
    if 'Close' not in price_data.columns.get_level_values(0):
        return empty
    close = price_data['Close']
    if isinstance(close, pd.DataFrame):
        if close.shape[1] == 0:
            return empty
        # Single-ticker download: the only column is the ticker itself.
        close = close.iloc[:, 0]
    return close.rename(name)


def calculate_beta(stock_returns, market_returns):
    """Return ``(beta, covariance, market_variance)`` for two return series.

    Beta is cov(stock, market) / var(market).  All three values are NaN when
    either series has fewer than two observations; beta alone is NaN when the
    market variance is zero (the ratio is undefined).
    """
    if len(stock_returns) < 2 or len(market_returns) < 2:
        return np.nan, np.nan, np.nan
    covariance_matrix = np.cov(stock_returns, market_returns)
    covariance = covariance_matrix[0, 1]
    market_variance = covariance_matrix[1, 1]
    if market_variance == 0:
        return np.nan, covariance, market_variance
    return covariance / market_variance, covariance, market_variance


def calculate_r_squared(stock_returns, market_returns):
    """Return the squared Pearson correlation of the two return series.

    NaN is returned when either series has fewer than two observations.
    """
    if len(stock_returns) < 2 or len(market_returns) < 2:
        return np.nan
    correlation_xy = np.corrcoef(stock_returns, market_returns)[0, 1]
    return correlation_xy ** 2


def align_data(stock_data, index_data):
    """Inner-join the two frames on their index so only shared rows remain.

    Overlapping column names get the ``_stock`` / ``_index`` suffixes.
    """
    return stock_data.join(index_data, how='inner', lsuffix='_stock', rsuffix='_index')


def risk_level(beta):
    """Map a beta value to a descriptive risk bucket."""
    if beta < 0.5:
        return "Very Low Risk"
    if beta < 1:
        return "Low Risk"
    if beta < 1.5:
        return "Moderate Risk"
    if beta < 2:
        return "High Risk"
    return "Very High Risk"


def analyze_stocks(stocks_filepath, index_symbol, years=5):
    """Build a Plotly bubble chart clustering stocks by beta and R-squared.

    Args:
        stocks_filepath: CSV file with at least ``Symbol`` and ``Name`` columns.
        index_symbol: Ticker of the market index used as the benchmark.
        years: Length of the look-back window, in 365-day years.

    Returns:
        A ``plotly.graph_objects.Figure`` with one scatter trace per cluster,
        reference lines at beta 0 and 1, and a summary annotation.

    Raises:
        ValueError: If no stock yields a finite beta/R-squared pair.
    """
    end_date = datetime.now()
    start_date = end_date - timedelta(days=365 * years)

    stock_symbols_df = pd.read_csv(stocks_filepath)
    # Filter out USD and GBP
    stock_symbols_df = stock_symbols_df[~stock_symbols_df['Symbol'].isin(['USD', 'GBP'])]
    # One lookup table instead of a DataFrame scan per symbol; the first
    # occurrence of a duplicated symbol wins, as in a positional lookup.
    stock_symbols_df = stock_symbols_df.drop_duplicates(subset='Symbol')
    name_by_symbol = dict(zip(stock_symbols_df['Symbol'], stock_symbols_df['Name']))

    index_close = _close_series(fetch_data(index_symbol, start_date, end_date), 'Close_index')
    # Market returns do not depend on the stock, so compute them once.
    market_returns = index_close.pct_change().dropna().to_frame()

    records = []
    for symbol, name in name_by_symbol.items():
        stock_close = _close_series(fetch_data(symbol, start_date, end_date), 'Close_stock')
        if stock_close.empty:
            continue
        stock_returns = stock_close.pct_change().dropna().to_frame()
        aligned_data = align_data(stock_returns, market_returns).dropna()
        if aligned_data.empty:
            continue
        beta, _, _ = calculate_beta(aligned_data['Close_stock'], aligned_data['Close_index'])
        if not np.isfinite(beta):
            continue
        r_squared = calculate_r_squared(aligned_data['Close_stock'], aligned_data['Close_index'])
        records.append({
            'Symbol': symbol,
            'Name': name,
            'Beta': round(float(beta), 3),
            'R-Squared': round(float(r_squared), 3),
            'Latest Close': round(float(stock_close.iloc[-1]), 3),
        })

    results_df = pd.DataFrame(
        records, columns=['Symbol', 'Name', 'Beta', 'R-Squared', 'Latest Close']
    )
    # Reset the index so row labels equal row positions in the feature matrix;
    # the per-row cluster probabilities below rely on that alignment.
    results_df = results_df.dropna().sort_values(by='Beta').reset_index(drop=True)
    if results_df.empty:
        raise ValueError("No stocks with valid Beta and R-Squared values to cluster.")
    valid_stocks_count = len(results_df)

    features = StandardScaler().fit_transform(results_df[['Beta', 'R-Squared']].values)

    optimal_clusters = 5  # You can implement a method to determine this dynamically
    # A mixture cannot have more components than there are samples.
    n_clusters = min(optimal_clusters, len(results_df))
    gmm = GaussianMixture(n_components=n_clusters, random_state=42)
    cluster_labels = gmm.fit_predict(features)
    cluster_probs = gmm.predict_proba(features)

    results_df['Cluster'] = cluster_labels
    # Probability of the cluster each row was actually assigned to.
    results_df['Cluster Prob'] = cluster_probs[np.arange(len(results_df)), cluster_labels].round(3)
    results_df['Risk Level'] = results_df['Beta'].apply(risk_level)

    # Plotly rgb() channels are on a 0-255 scale.
    custom_colors = [
        f'rgb({random.randint(0, 255)}, {random.randint(0, 255)}, {random.randint(0, 255)})'
        for _ in range(n_clusters)
    ]
    # One sizeref for every trace so bubble areas are comparable across clusters.
    sizeref = 2. * results_df['Latest Close'].max() / (40. ** 2)

    traces = []
    for cluster in sorted(results_df['Cluster'].unique()):
        cluster_df = results_df[results_df['Cluster'] == cluster]
        # '<br>' is the line break Plotly renders inside hover labels.
        hovertext = (
            cluster_df['Symbol'].astype(str) + '<br>'
            + cluster_df['Name'].astype(str) + '<br>'
            + 'Beta: ' + cluster_df['Beta'].astype(str) + '<br>'
            + 'R-Squared: ' + cluster_df['R-Squared'].astype(str) + '<br>'
            + 'Latest Close Price: ' + cluster_df['Latest Close'].astype(str) + '<br>'
            + 'Risk Level: ' + cluster_df['Risk Level'] + '<br>'
            + 'Cluster Prob: ' + cluster_df['Cluster Prob'].astype(str)
        )
        traces.append(go.Scatter(
            x=cluster_df['Beta'],
            y=cluster_df['R-Squared'],
            mode='markers',
            marker=dict(
                size=cluster_df['Latest Close'],
                sizeref=sizeref,
                sizemode='area',
                sizemin=3,
                color=custom_colors[cluster % len(custom_colors)],
            ),
            hovertext=hovertext,
            showlegend=False,
        ))

    layout = go.Layout(
        title='S&P500 Index Stock Clustering based on Beta and R-Squared',
        xaxis=dict(title='β (Risk)', tickmode='linear', dtick=0.25, range=[0, 2.5]),
        yaxis=dict(title='R² (Market Dependency)'),
        width=1200,
        height=800,
    )
    fig = go.Figure(data=traces, layout=layout)

    # Vertical reference lines at beta = 0 and beta = 1 (the market itself).
    r_squared_span = [results_df['R-Squared'].min(), results_df['R-Squared'].max()]
    for beta_mark in (0, 1):
        fig.add_trace(go.Scatter(
            x=[beta_mark, beta_mark],
            y=r_squared_span,
            mode="lines",
            line=dict(color="black", width=2),
            showlegend=False,
        ))

    latest_date = end_date.strftime('%d/%m/%Y')
    # Anchor in paper coordinates: the x axis range is fixed to [0, 2.5], so a
    # data-space x beyond it would place the label outside the figure.
    fig.add_annotation(
        xref="paper", yref="paper", x=1, y=1,
        xanchor="right", yanchor="bottom",
        text=f"Valid Stocks: {valid_stocks_count} | Date: {latest_date}",
        showarrow=False,
        font=dict(size=14, color="black"),
    )
    return fig


def gradio_interface(years):
    """Gradio callback: run the S&P 500 analysis for the chosen look-back."""
    stocks_filepath = "SP500_stock_list_Jan-1-2024.csv"
    index_symbol = "^GSPC"
    fig = analyze_stocks(stocks_filepath, index_symbol, years)
    return fig


# Create the Gradio interface using Blocks
with gr.Blocks() as iface:
    gr.Markdown("# S&P 500 Stock Analysis")
    gr.Markdown("Analyze S&P 500 stocks based on Beta and R-Squared values.")
    with gr.Row():
        years_slider = gr.Slider(minimum=1, maximum=10, step=1, value=5, label="Years of Data")
    with gr.Row():
        analyze_button = gr.Button("Analyze")
    with gr.Row():
        output_plot = gr.Plot(elem_id="large-plot")
    analyze_button.click(fn=gradio_interface, inputs=years_slider, outputs=output_plot)
    # Add custom CSS to make the plot larger
    # NOTE(review): the HTML payload below is whitespace-only, so no CSS is
    # actually applied to "large-plot" - confirm whether a style rule is missing.
    gr.HTML(""" """)

# Launch the interface
iface.launch()