# Hugging Face Space application file (7,402 bytes).
# Space status when this copy was captured: Runtime error.
# Revisions seen in the page's blame gutter: ce1e937, e6aa77e, 16ce86d,
# a176b34, 6b0183e, 5f085da, aeac5dc.
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.mixture import GaussianMixture
import plotly.graph_objects as go
import yfinance as yf
from datetime import datetime, timedelta
import random
import gradio as gr
def fetch_data(symbol, start_date, end_date):
    """Download price history for one ticker through yfinance.

    Args:
        symbol: Ticker symbol handed to ``yf.download``.
        start_date: Start of the requested window (passed as ``start``).
        end_date: End of the requested window (passed as ``end``).

    Returns:
        Whatever ``yf.download`` returns for the request.
    """
    price_history = yf.download(symbol, start=start_date, end=end_date)
    return price_history
def calculate_beta(stock_returns, market_returns):
    """Estimate a stock's beta from paired stock and market returns.

    Beta is the covariance between the two series divided by the variance
    of the market series; both values are read from ``np.cov``.

    Args:
        stock_returns: Sequence of stock returns.
        market_returns: Sequence of market returns, same length as
            ``stock_returns``.

    Returns:
        A ``(beta, covariance, market_variance)`` tuple. All three are NaN
        when either input has fewer than two observations. ``beta`` alone
        is NaN when the market variance is exactly zero.
    """
    if len(stock_returns) < 2 or len(market_returns) < 2:
        return np.nan, np.nan, np.nan

    covariance_matrix = np.cov(stock_returns, market_returns)
    covariance = covariance_matrix[0, 1]
    market_variance = covariance_matrix[1, 1]

    # A flat market series has zero variance, so beta is undefined. Return
    # NaN explicitly instead of dividing 0 by 0, which yields the same NaN
    # but also emits a RuntimeWarning.
    if market_variance == 0:
        return np.nan, covariance, market_variance

    return covariance / market_variance, covariance, market_variance
def calculate_r_squared(stock_returns, market_returns):
    """Return the squared correlation between stock and market returns.

    Args:
        stock_returns: Sequence of stock returns.
        market_returns: Sequence of market returns.

    Returns:
        The square of the off-diagonal entry of ``np.corrcoef`` for the two
        series, or NaN when either series has fewer than two observations.
    """
    too_short = min(len(stock_returns), len(market_returns)) < 2
    if too_short:
        return np.nan

    pearson_r = np.corrcoef(stock_returns, market_returns)[0, 1]
    return pearson_r ** 2
def align_data(stock_data, index_data):
    """Inner-join the stock frame with the index frame on their index.

    Only index labels present in both frames are kept. Column names that
    occur in both inputs get the ``_stock`` / ``_index`` suffixes.

    Args:
        stock_data: Left-hand frame (stock side).
        index_data: Right-hand frame (market index side).

    Returns:
        The joined frame.
    """
    return stock_data.join(
        index_data,
        how='inner',
        lsuffix='_stock',
        rsuffix='_index',
    )
def risk_level(beta):
    """Map a beta value to a descriptive risk label.

    Each band is bounded above by an exclusive threshold; a value that is
    not below any threshold falls into the last band.

    Args:
        beta: Beta value to classify.

    Returns:
        One of "Very Low Risk", "Low Risk", "Moderate Risk", "High Risk"
        or "Very High Risk".
    """
    bands = (
        (0.5, "Very Low Risk"),
        (1, "Low Risk"),
        (1.5, "Moderate Risk"),
        (2, "High Risk"),
    )
    for upper_bound, label in bands:
        if beta < upper_bound:
            return label
    return "Very High Risk"
def _extract_close(price_data, column_name):
    """Return the 'Close' prices of *price_data* as a one-column frame."""
    no_prices = pd.DataFrame(columns=[column_name])
    if price_data is None or price_data.empty or 'Close' not in price_data.columns:
        return no_prices

    close = price_data['Close']
    # With MultiIndex columns, selecting 'Close' yields a DataFrame rather
    # than a Series (presumably what newer yfinance releases return -
    # verify against the installed version). Keep its first column.
    if isinstance(close, pd.DataFrame):
        if close.shape[1] == 0:
            return no_prices
        close = close.iloc[:, 0]
    return close.to_frame(name=column_name)


def _build_cluster_figure(results_df, palette, footer_text):
    """Draw one bubble trace per cluster, two reference lines and a footer."""
    traces = []
    for cluster in sorted(results_df['Cluster'].unique()):
        cluster_df = results_df[results_df['Cluster'] == cluster]
        hover_text = (
            cluster_df['Symbol'] + '<br>' + cluster_df['Name']
            + '<br>Beta: ' + cluster_df['Beta'].astype(str)
            + '<br>R-Squared: ' + cluster_df['R-Squared'].astype(str)
            + '<br>Latest Close Price: ' + cluster_df['Latest Close'].astype(str)
            + '<br>Risk Level: ' + cluster_df['Risk Level']
            + '<br>Cluster Prob: ' + cluster_df['Cluster Prob'].astype(str)
        )
        traces.append(go.Scatter(
            x=cluster_df['Beta'],
            y=cluster_df['R-Squared'],
            mode='markers',
            marker=dict(
                size=cluster_df['Latest Close'],
                # NOTE(review): sizeref is derived per cluster, so bubble
                # areas are only comparable within one cluster.
                sizeref=2. * max(cluster_df['Latest Close']) / (40. ** 2),
                sizemode='area',
                color=palette[cluster % len(palette)],
            ),
            hovertext=hover_text,
            showlegend=False,
        ))

    layout = go.Layout(
        title='S&P500 Index Stock Clustering based on Beta and R-Squared',
        xaxis=dict(title='β (Risk)', tickmode='linear', dtick=0.25, range=[0, 2.5]),
        yaxis=dict(title='R² (Market Dependency)'),
        width=1200,
        height=800,
    )
    fig = go.Figure(data=traces, layout=layout)

    # Vertical reference lines at beta = 0 and beta = 1.
    r_squared_span = [results_df['R-Squared'].min(), results_df['R-Squared'].max()]
    for beta_mark in (0, 1):
        fig.add_trace(go.Scatter(
            x=[beta_mark, beta_mark], y=r_squared_span,
            mode="lines", line=dict(color="black", width=2), showlegend=False,
        ))

    # Anchor the footer in paper coordinates: the x-axis range is fixed to
    # [0, 2.5], so a data-space anchor at x=3.4 lies outside the plot area.
    fig.add_annotation(
        xref='paper', yref='paper', x=1, y=1,
        xanchor='right', yanchor='bottom',
        text=footer_text,
        showarrow=False, font=dict(size=14, color="black"),
    )
    return fig


def analyze_stocks(stocks_filepath, index_symbol, years=5):
    """Cluster stocks by beta and R-squared against an index and plot them.

    For every symbol listed in the CSV, daily close prices are fetched via
    ``fetch_data``, converted to percentage returns and aligned with the
    index returns. Beta and R-squared are computed from the aligned
    returns, standardized, and clustered with a Gaussian mixture model.

    Args:
        stocks_filepath: Path to a CSV file with 'Symbol' and 'Name'
            columns. Rows whose symbol is 'USD' or 'GBP' are skipped.
        index_symbol: Ticker of the market index used as the benchmark.
        years: Length of the look-back window, in 365-day years.

    Returns:
        A ``plotly.graph_objects.Figure`` with one bubble trace per cluster
        (x = beta, y = R-squared, bubble area = latest close).

    Raises:
        ValueError: If no close prices are available for the index, or no
            stock yields a complete row of metrics.
    """
    end_date = datetime.now()
    start_date = end_date - timedelta(days=365 * years)

    stock_symbols_df = pd.read_csv(stocks_filepath)
    # Filter out USD and GBP
    stock_symbols_df = stock_symbols_df[~stock_symbols_df['Symbol'].isin(['USD', 'GBP'])]
    # Build the symbol -> name lookup once; the first row wins on duplicates.
    names = (
        stock_symbols_df.drop_duplicates('Symbol')
        .set_index('Symbol')['Name']
        .to_dict()
    )

    index_close = _extract_close(fetch_data(index_symbol, start_date, end_date), 'Close_index')
    if index_close.empty:
        raise ValueError(f"No close prices available for index {index_symbol!r}")
    # The market returns do not depend on the stock, so compute them once.
    market_returns = index_close['Close_index'].pct_change().dropna().to_frame()

    metrics = {}
    for symbol in names:
        stock_close = _extract_close(fetch_data(symbol, start_date, end_date), 'Close_stock')
        if stock_close.empty:
            continue
        stock_returns = stock_close['Close_stock'].pct_change().dropna().to_frame()
        # Drop incomplete rows as pairs so both series stay aligned.
        aligned = align_data(stock_returns, market_returns).dropna()
        if aligned.empty:
            continue
        beta, _, _ = calculate_beta(aligned['Close_stock'], aligned['Close_index'])
        if not np.isfinite(beta):
            continue
        r_squared = calculate_r_squared(aligned['Close_stock'], aligned['Close_index'])
        metrics[symbol] = {
            'Symbol': symbol,
            'Name': names[symbol],
            'Beta': round(beta, 3),
            'R-Squared': round(r_squared, 3),
            'Latest Close': round(stock_close['Close_stock'].iloc[-1], 3),
        }

    results_df = (
        pd.DataFrame(
            list(metrics.values()),
            columns=['Symbol', 'Name', 'Beta', 'R-Squared', 'Latest Close'],
        )
        .sort_values(by='Beta')
        .dropna()
        # Make labels equal to row positions so the positional arrays
        # produced by the mixture model line up with the frame's rows.
        .reset_index(drop=True)
    )
    if results_df.empty:
        raise ValueError("No stock produced usable beta / R-squared values")

    features = StandardScaler().fit_transform(results_df[['Beta', 'R-Squared']].values)

    optimal_clusters = 5  # You can implement a method to determine this dynamically
    # The mixture model cannot have more components than there are samples.
    n_components = min(optimal_clusters, len(results_df))
    gmm = GaussianMixture(n_components=n_components, random_state=42)
    cluster_labels = gmm.fit_predict(features)
    cluster_probs = gmm.predict_proba(features)

    results_df['Cluster'] = cluster_labels
    # Probability of each row belonging to the cluster it was assigned to.
    results_df['Cluster Prob'] = cluster_probs[
        np.arange(len(results_df)), cluster_labels
    ].round(3)
    results_df['Risk Level'] = results_df['Beta'].apply(risk_level)

    # CSS rgb() channels are 0-255 integers, not 0-1 fractions.
    custom_colors = [
        'rgb({}, {}, {})'.format(*(random.randint(0, 255) for _ in range(3)))
        for _ in range(n_components)
    ]

    latest_date = end_date.strftime('%d/%m/%Y')
    footer_text = f"Valid Stocks: {len(results_df)} | Date: {latest_date}"
    return _build_cluster_figure(results_df, custom_colors, footer_text)
def gradio_interface(years):
    """Gradio callback: run the S&P 500 analysis for the chosen window.

    Args:
        years: Number of years of history to analyze.

    Returns:
        The figure produced by ``analyze_stocks`` for the bundled S&P 500
        constituent list, benchmarked against the ``^GSPC`` index.
    """
    return analyze_stocks(
        "SP500_stock_list_Jan-1-2024.csv",
        "^GSPC",
        years,
    )
# Create the Gradio interface using Blocks: a slider for the look-back
# window, a button that triggers the analysis, and a plot for the result.
with gr.Blocks() as iface:
    gr.Markdown("# S&P 500 Stock Analysis")
    gr.Markdown("Analyze S&P 500 stocks based on Beta and R-Squared values.")
    with gr.Row():
        years_slider = gr.Slider(minimum=1, maximum=10, step=1, value=5, label="Years of Data")
    with gr.Row():
        analyze_button = gr.Button("Analyze")
    with gr.Row():
        output_plot = gr.Plot(elem_id="large-plot")
    # Clicking the button passes the slider value to gradio_interface and
    # shows the returned figure in the plot component.
    analyze_button.click(fn=gradio_interface, inputs=years_slider, outputs=output_plot)
    # Add custom CSS to make the plot larger. The rule targets the plot via
    # its elem_id; the literal's lines are kept flush left so its content is
    # unchanged.
    gr.HTML("""
<style>
#large-plot {
height: 800px !important;
width: 1200px !important;
}
</style>
""")

# Launch the interface
iface.launch()