reab5555's picture
Update app.py
16ce86d verified
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.mixture import GaussianMixture
import plotly.graph_objects as go
import yfinance as yf
from datetime import datetime, timedelta
import random
import gradio as gr
def fetch_data(symbol, start_date, end_date):
return yf.download(symbol, start=start_date, end=end_date)
def calculate_beta(stock_returns, market_returns):
if len(stock_returns) < 2 or len(market_returns) < 2:
return np.nan, np.nan, np.nan
covariance_matrix = np.cov(stock_returns, market_returns)
beta = covariance_matrix[0, 1] / covariance_matrix[1, 1]
return beta, covariance_matrix[0, 1], covariance_matrix[1, 1]
def calculate_r_squared(stock_returns, market_returns):
if len(stock_returns) < 2 or len(market_returns) < 2:
return np.nan
correlation_matrix = np.corrcoef(stock_returns, market_returns)
correlation_xy = correlation_matrix[0, 1]
r_squared = correlation_xy ** 2
return r_squared
def align_data(stock_data, index_data):
aligned_data = stock_data.join(index_data, how='inner', lsuffix='_stock', rsuffix='_index')
return aligned_data
def risk_level(beta):
if beta < 0.5:
return "Very Low Risk"
elif beta < 1:
return "Low Risk"
elif beta < 1.5:
return "Moderate Risk"
elif beta < 2:
return "High Risk"
else:
return "Very High Risk"
def analyze_stocks(stocks_filepath, index_symbol, years=5):
end_date = datetime.now()
start_date = end_date - timedelta(days=365 * years)
stock_symbols_df = pd.read_csv(stocks_filepath)
# Filter out USD and GBP
stock_symbols_df = stock_symbols_df[~stock_symbols_df['Symbol'].isin(['USD', 'GBP'])]
stock_symbols = stock_symbols_df['Symbol'].tolist()
index_data = fetch_data(index_symbol, start_date, end_date)
index_data = index_data['Close'].to_frame(name='Close_index')
betas = {}
r_squared_values = {}
latest_close_values = {}
valid_stocks_count = 0
for symbol in stock_symbols:
stock_data = fetch_data(symbol, start_date, end_date)
stock_data = stock_data['Close'].to_frame(name='Close_stock')
if not stock_data.empty:
stock_returns = stock_data['Close_stock'].pct_change().dropna()
market_returns = index_data['Close_index'].pct_change().dropna()
aligned_data = align_data(stock_returns.to_frame(), market_returns.to_frame())
if not aligned_data['Close_stock'].empty and not aligned_data['Close_index'].empty:
beta, _, _ = calculate_beta(aligned_data['Close_stock'].dropna(), aligned_data['Close_index'].dropna())
if np.isfinite(beta):
betas[symbol] = round(beta, 3)
r_squared_values[symbol] = round(calculate_r_squared(aligned_data['Close_stock'].dropna(), aligned_data['Close_index'].dropna()), 3)
latest_close_values[symbol] = round(stock_data['Close_stock'].iloc[-1], 3)
valid_stocks_count += 1
results_df = pd.DataFrame({
'Symbol': list(betas.keys()),
'Name': [stock_symbols_df[stock_symbols_df['Symbol'] == symbol]['Name'].values[0] for symbol in betas.keys()],
'Beta': list(betas.values()),
'R-Squared': [r_squared_values[symbol] for symbol in betas.keys()],
'Latest Close': [latest_close_values[symbol] for symbol in betas.keys()]
}).sort_values(by='Beta')
results_df.dropna(inplace=True)
features = results_df[['Beta', 'R-Squared']].values
scaler = StandardScaler()
features = scaler.fit_transform(features)
optimal_clusters = 5 # You can implement a method to determine this dynamically
gmm = GaussianMixture(n_components=optimal_clusters, random_state=42)
cluster_labels = gmm.fit_predict(features)
results_df['Cluster'] = cluster_labels
results_df['Risk Level'] = results_df['Beta'].apply(risk_level)
custom_colors = [f'rgb({random.random()}, {random.random()}, {random.random()})' for _ in range(optimal_clusters)]
traces = []
cluster_probs = gmm.predict_proba(features)
for cluster in sorted(results_df['Cluster'].unique()):
cluster_df = results_df[results_df['Cluster'] == cluster]
trace = go.Scatter(x=cluster_df['Beta'], y=cluster_df['R-Squared'],
mode='markers', marker=dict(size=cluster_df['Latest Close'],
sizeref=2. * max(cluster_df['Latest Close']) / (40. ** 2),
sizemode='area'),
hovertext=cluster_df['Symbol'] + '<br>' + cluster_df['Name'] + '<br>Beta: ' + cluster_df['Beta'].astype(str) +
'<br>R-Squared: ' + cluster_df['R-Squared'].astype(str) +
'<br>Latest Close Price: ' + cluster_df['Latest Close'].astype(str) +
'<br>Risk Level: ' + cluster_df['Risk Level'] +
'<br>Cluster Prob: ' + cluster_probs[cluster_df.index, cluster].round(3).astype(str),
showlegend=False,
marker_color=custom_colors[cluster % len(custom_colors)])
traces.append(trace)
layout = go.Layout(title=f'S&P500 Index Stock Clustering based on Beta and R-Squared',
xaxis=dict(title='β (Risk)', tickmode='linear', dtick=0.25, range=[0, 2.5]),
yaxis=dict(title='R² (Market Dependency)'),
width=1200,
height=800)
fig = go.Figure(data=traces, layout=layout)
fig.add_trace(go.Scatter(x=[0, 0], y=[results_df['R-Squared'].min(), results_df['R-Squared'].max()],
mode="lines", line=dict(color="black", width=2), showlegend=False))
fig.add_trace(go.Scatter(x=[1, 1], y=[results_df['R-Squared'].min(), results_df['R-Squared'].max()],
mode="lines", line=dict(color="black", width=2), showlegend=False))
latest_date = end_date.strftime('%d/%m/%Y')
fig.add_annotation(x=3.4, y=results_df['R-Squared'].max(),
text=f"Valid Stocks: {valid_stocks_count} | Date: {latest_date}",
showarrow=False, font=dict(size=14, color="black"))
return fig
def gradio_interface(years):
stocks_filepath = "SP500_stock_list_Jan-1-2024.csv"
index_symbol = "^GSPC"
fig = analyze_stocks(stocks_filepath, index_symbol, years)
return fig
# Create the Gradio interface using Blocks
with gr.Blocks() as iface:
gr.Markdown("# S&P 500 Stock Analysis")
gr.Markdown("Analyze S&P 500 stocks based on Beta and R-Squared values.")
with gr.Row():
years_slider = gr.Slider(minimum=1, maximum=10, step=1, value=5, label="Years of Data")
with gr.Row():
analyze_button = gr.Button("Analyze")
with gr.Row():
output_plot = gr.Plot(elem_id="large-plot")
analyze_button.click(fn=gradio_interface, inputs=years_slider, outputs=output_plot)
# Add custom CSS to make the plot larger
gr.HTML("""
<style>
#large-plot {
height: 800px !important;
width: 1200px !important;
}
</style>
""")
# Launch the interface
iface.launch()