# Streamlitstock / app.py
# Method314's picture
# Update app.py
# 8d93a7c verified
# raw
# history blame
# 24 kB
import os
import warnings
from datetime import datetime, timedelta

import matplotlib.pyplot as plt
import numpy as np
import openai
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import requests
import shap
import streamlit as st
import ta
import yfinance as yf
from catboost import CatBoostRegressor
from plotly.subplots import make_subplots
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
# Silence every warning category for the whole process.
warnings.filterwarnings('ignore')

# --- External service credentials -------------------------------------------
# SECURITY: credentials are read from the environment instead of being
# embedded in the source. Any key that was ever committed to this file must
# be treated as leaked and rotated with the provider.
# When a variable is unset the value is an empty string; the corresponding
# API calls will then fail with an authentication error from the provider.

# OpenAI API key, also installed as the default key of the `openai` module.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
openai.api_key = OPENAI_API_KEY

# Alpha Vantage API key used by the fundamentals / earnings fetchers.
ALPHA_VANTAGE_API_KEY = os.environ.get("ALPHA_VANTAGE_API_KEY", "")

# GPT Assistant ID (an identifier, not a credential); overridable via env.
ASSISTANT_ID = os.environ.get("OPENAI_ASSISTANT_ID", "asst_Fl3rRrRijb8FJDpqjBexfUBp")
# Custom CSS: dark gradient background, green accent colour for headings and
# buttons, and dark panels for inputs, charts, and tables. The stylesheet is
# kept in a named constant and injected once at import time.
_CUSTOM_CSS = """
<style>
.reportview-container {
background: linear-gradient(to bottom right, #10161e, #1f2937);
}
.main .block-container {
padding-top: 2rem;
padding-bottom: 2rem;
}
h1, h2, h3 {
color: #3db892;
}
.stButton > button {
color: white;
background-color: #3db892;
border-radius: 5px;
border: none;
padding: 0.5rem 1rem;
font-weight: bold;
transition: all 0.3s ease 0s;
}
.stButton > button:hover {
background-color: #2c8d6f;
}
.stTextInput > div > div > input,
.stDateInput > div > div > input {
background-color: #1f2937;
color: white;
border: 1px solid #3db892;
}
.stPlotlyChart {
background-color: #1f2937;
border-radius: 5px;
padding: 10px;
}
.css-1d391kg {
background-color: #1f2937;
}
.stDataFrame {
background-color: #1f2937;
}
.stTable {
background-color: #1f2937;
}
.css-1s0xp3b {
background-color: #1f2937;
border: 1px solid #3db892;
border-radius: 5px;
}
</style>
"""

# unsafe_allow_html is required so the <style> element is emitted as raw HTML.
st.markdown(_CUSTOM_CSS, unsafe_allow_html=True)
def get_financial_data(ticker, end_date):
    """Fetch income statement, balance sheet, and cash flow from Alpha Vantage.

    Args:
        ticker: Stock symbol passed to the API as ``symbol``.
        end_date: ``datetime.date`` (or ``datetime``) cut-off; reports whose
            ``fiscalDateEnding`` is later than this are dropped.

    Returns:
        Dict keyed by ``'INCOME_STATEMENT'``, ``'BALANCE_SHEET'``, and
        ``'CASH_FLOW'``, each holding the decoded JSON payload with its
        ``quarterlyReports`` / ``annualReports`` lists filtered by date.

    Raises:
        Exception: On a non-200 response, or when the payload carries an
            API-level message instead of any report list.
        requests.RequestException: On network failure or timeout.
    """
    base_url = "https://www.alphavantage.co/query"
    functions = ['INCOME_STATEMENT', 'BALANCE_SHEET', 'CASH_FLOW']
    report_keys = ('quarterlyReports', 'annualReports')

    # A datetime cannot be ordered against a date; normalise once up front.
    if isinstance(end_date, datetime):
        end_date = end_date.date()

    def ends_on_or_before_cutoff(report):
        fiscal_end = datetime.strptime(report['fiscalDateEnding'], '%Y-%m-%d').date()
        return fiscal_end <= end_date

    data = {}
    for function in functions:
        params = {
            "function": function,
            "symbol": ticker,
            "apikey": ALPHA_VANTAGE_API_KEY
        }
        # Without a timeout a stalled connection would block the app forever.
        response = requests.get(base_url, params=params, timeout=30)
        if response.status_code != 200:
            raise Exception(f"Failed to fetch {function} data: {response.status_code}")
        content = response.json()

        # The service can answer HTTP 200 with only a message (bad symbol,
        # invalid key, rate limit) and no reports. Surface that message here
        # rather than failing later with an unrelated error.
        if not any(key in content for key in report_keys):
            for message_key in ('Error Message', 'Note', 'Information'):
                if message_key in content:
                    raise Exception(
                        f"Alpha Vantage returned no {function} data: {content[message_key]}"
                    )
        data[function] = content

    # Drop every report that ends after the requested cut-off date.
    for content in data.values():
        for key in report_keys:
            if key in content:
                content[key] = [
                    report for report in content[key]
                    if ends_on_or_before_cutoff(report)
                ]
    return data
def get_earnings_dates(ticker):
    """Map each fiscal quarter end to the date its earnings were reported.

    Args:
        ticker: Stock symbol passed to the Alpha Vantage ``EARNINGS`` endpoint.

    Returns:
        Dict of ``fiscalDateEnding`` -> ``reportedDate`` (both as the strings
        returned by the API). Empty when the payload has no
        ``quarterlyEarnings`` list.

    Raises:
        Exception: On a non-200 response (same style as get_financial_data).
        requests.RequestException: On network failure or timeout.
    """
    # Passing params lets requests URL-encode the symbol; the timeout keeps a
    # stalled connection from hanging the app.
    response = requests.get(
        "https://www.alphavantage.co/query",
        params={
            "function": "EARNINGS",
            "symbol": ticker,
            "apikey": ALPHA_VANTAGE_API_KEY,
        },
        timeout=30,
    )
    if response.status_code != 200:
        raise Exception(f"Failed to fetch EARNINGS data: {response.status_code}")
    data = response.json()

    earnings_dates = {}
    for report in data.get('quarterlyEarnings', []):
        fiscal_date = report.get('fiscalDateEnding')
        reported_date = report.get('reportedDate')
        # Skip incomplete entries instead of aborting the whole analysis.
        if fiscal_date and reported_date:
            earnings_dates[fiscal_date] = reported_date
    return earnings_dates
def get_earnings_data(ticker):
    """Return quarterly earnings as a DataFrame indexed by report date.

    Args:
        ticker: Stock symbol passed to the Alpha Vantage ``EARNINGS`` endpoint.

    Returns:
        DataFrame indexed by ``reportedDate`` (datetime) with
        ``fiscalDateEnding`` as datetime and ``reportedEPS``,
        ``estimatedEPS``, ``surprise``, ``surprisePercentage`` coerced to
        numeric (unparseable values become NaN). When the API returns no
        quarterly earnings the frame is empty but still has these columns.

    Raises:
        Exception: On a non-200 response (same style as get_financial_data).
        requests.RequestException: On network failure or timeout.
    """
    response = requests.get(
        "https://www.alphavantage.co/query",
        params={
            "function": "EARNINGS",
            "symbol": ticker,
            "apikey": ALPHA_VANTAGE_API_KEY,
        },
        timeout=30,
    )
    if response.status_code != 200:
        raise Exception(f"Failed to fetch EARNINGS data: {response.status_code}")
    quarterly_earnings = response.json().get('quarterlyEarnings', [])

    date_columns = ['fiscalDateEnding', 'reportedDate']
    numeric_columns = ['reportedEPS', 'estimatedEPS', 'surprise', 'surprisePercentage']

    df = pd.DataFrame(quarterly_earnings)
    # An empty or partial payload would otherwise raise KeyError below;
    # make sure every column the pipeline relies on exists.
    for col in date_columns + numeric_columns:
        if col not in df.columns:
            df[col] = np.nan

    for col in date_columns:
        df[col] = pd.to_datetime(df[col])
    df = df.set_index('reportedDate')
    for col in numeric_columns:
        df[col] = pd.to_numeric(df[col], errors='coerce')
    return df
def process_financial_data(data, earnings_dates, earnings_data):
    """Combine quarterly statements and earnings into one numeric table.

    Args:
        data: Mapping of statement name -> payload dict; only payloads that
            contain a ``'quarterlyReports'`` list contribute rows.
        earnings_dates: Mapping of fiscal period end -> release date. A
            period missing from the mapping is keyed by its own end date.
        earnings_data: DataFrame left-joined onto the result by index.

    Returns:
        DataFrame with a DatetimeIndex sorted ascending. Statement fields are
        named ``"<statement>_<field>"``; every column is passed through
        ``pd.to_numeric(errors='coerce')``, so non-numeric text becomes NaN.
    """
    rows_by_release = {}
    for prefix, payload in data.items():
        if 'quarterlyReports' not in payload:
            continue
        for quarter in payload['quarterlyReports']:
            period_end = quarter['fiscalDateEnding']
            released = earnings_dates.get(period_end, period_end)
            # Statements released on the same date share a single row.
            row = rows_by_release.setdefault(released, {})
            row.update({f"{prefix}_{k}": v for k, v in quarter.items()})

    frame = pd.DataFrame.from_dict(rows_by_release, orient='index')
    frame.index = pd.to_datetime(frame.index)
    frame = frame.sort_index().join(earnings_data, how='left')

    for name in frame.columns:
        frame[name] = pd.to_numeric(frame[name], errors='coerce')
    return frame
def get_stock_data(ticker, start_date, end_date):
    """Download price history and append technical indicators.

    Args:
        ticker: Symbol passed to ``yf.download``.
        start_date: Start of the download window.
        end_date: End of the download window.

    Returns:
        The downloaded DataFrame with added columns: ``Price_Pct_Change``,
        ``RSI``, ``WILLR``, ``BB_upper``/``BB_middle``/``BB_lower``, ``OBV``,
        ``ATR``, ``MACD``, ``ADX``, ``CCI``, plus a ``<name>_ROC`` percentage
        change column for each indicator.

    Raises:
        ValueError: If the download returns no rows.
    """
    df = yf.download(ticker, start=start_date, end=end_date)
    if df is None or df.empty:
        raise ValueError(
            f"No price data returned for {ticker} between {start_date} and {end_date}."
        )

    # Some yfinance versions return two-level columns even for one ticker;
    # keep only the first level so df['Close'] is a Series, which the
    # indicator classes below are given.
    if isinstance(df.columns, pd.MultiIndex):
        df.columns = df.columns.get_level_values(0)

    close, high, low, volume = df['Close'], df['High'], df['Low'], df['Volume']

    df['Price_Pct_Change'] = close.pct_change()
    df['RSI'] = ta.momentum.RSIIndicator(close).rsi()
    df['WILLR'] = ta.momentum.WilliamsRIndicator(high, low, close).williams_r()
    bb = ta.volatility.BollingerBands(close)
    df['BB_upper'] = bb.bollinger_hband()
    df['BB_middle'] = bb.bollinger_mavg()
    df['BB_lower'] = bb.bollinger_lband()
    df['OBV'] = ta.volume.OnBalanceVolumeIndicator(close, volume).on_balance_volume()
    df['ATR'] = ta.volatility.AverageTrueRange(high, low, close).average_true_range()
    df['MACD'] = ta.trend.MACD(close).macd()
    df['ADX'] = ta.trend.ADXIndicator(high, low, close).adx()
    df['CCI'] = ta.trend.CCIIndicator(high, low, close).cci()

    indicator_columns = ['RSI', 'WILLR', 'BB_upper', 'BB_middle', 'BB_lower', 'OBV', 'ATR', 'MACD', 'ADX', 'CCI']
    # Rate of change of each indicator relative to the previous row.
    for column in indicator_columns:
        df[f'{column}_ROC'] = df[column].pct_change()
    return df
def add_financial_ratios(X):
    """Append five balance-sheet / income ratios to ``X`` in place.

    Each ratio is numerator / denominator, with NaN wherever the denominator
    equals zero. ``X`` is modified and also returned.

    NOTE(review): ``PE_Ratio`` is computed as shareholder equity / net income
    and ``PB_Ratio`` as total assets / shareholder equity; neither uses a
    share price, so the names do not describe what is calculated.
    """
    equity = 'BALANCE_SHEET_totalShareholderEquity'
    assets = 'BALANCE_SHEET_totalAssets'
    liabilities = 'BALANCE_SHEET_totalLiabilities'
    net_income = 'INCOME_STATEMENT_netIncome'

    # (new column, numerator column, denominator column), in creation order.
    ratio_specs = (
        ('PE_Ratio', equity, net_income),
        ('PB_Ratio', assets, equity),
        ('Debt_to_Equity', liabilities, equity),
        ('ROE', net_income, equity),
        ('ROA', net_income, assets),
    )
    for ratio_name, top, bottom in ratio_specs:
        numerator = X[top]
        denominator = X[bottom]
        X[ratio_name] = np.where(denominator != 0, numerator / denominator, np.nan)
    return X
def prepare_data(quarterly_df, stock_df, end_date):
    """Align fundamentals with daily closes and build scaled model inputs.

    Both frames are re-indexed to every calendar day from the earliest
    available date through ``end_date`` and forward-filled, then joined on
    the close price. Remaining gaps in the features are filled with the
    column mean, EPS surprise features and financial ratios are appended,
    and features and target are standardised.

    Args:
        quarterly_df: Fundamentals indexed by date-like values.
        stock_df: Price history with a ``'Close'`` column, indexed by date.
        end_date: ``datetime.date`` cut-off (inclusive).

    Returns:
        Tuple ``(X_scaled, y_scaled, dates, scaler_X, scaler_y)``.

    Raises:
        ValueError: If either input has no rows on or before ``end_date``,
            or if no row has a close price after alignment.
    """
    # Work on copies: re-assigning the index below must not alter the
    # caller's DataFrames.
    quarterly_df = quarterly_df.copy()
    stock_df = stock_df.copy()

    quarterly_df.index = pd.to_datetime(quarterly_df.index).date
    stock_df.index = pd.to_datetime(stock_df.index).date
    quarterly_df = quarterly_df[quarterly_df.index <= end_date]
    stock_df = stock_df[stock_df.index <= end_date]

    # min() over an empty index would fail with an unrelated error.
    if quarterly_df.empty or stock_df.empty:
        raise ValueError("No overlapping data between stock prices and financial statements.")

    start_date = min(quarterly_df.index.min(), stock_df.index.min())
    all_dates = pd.date_range(start=start_date, end=end_date, freq='D').date

    # Carry the most recent known values forward to every calendar day.
    quarterly_df_reindexed = quarterly_df.reindex(all_dates).ffill()
    stock_df_reindexed = stock_df.reindex(all_dates).ffill()

    merged_df = pd.concat([stock_df_reindexed['Close'], quarterly_df_reindexed], axis=1)
    merged_df = merged_df.dropna(subset=['Close'])
    if merged_df.empty:
        raise ValueError("No overlapping data between stock prices and financial statements.")

    X = merged_df.drop('Close', axis=1)
    y = merged_df['Close']
    X = X.fillna(X.mean())
    X['EPS_Surprise'] = X['reportedEPS'] - X['estimatedEPS']
    X['EPS_Surprise_Percentage'] = X['surprisePercentage']
    X = add_financial_ratios(X)

    # Both scalers are fit on every row passed in; callers that split into
    # train/test afterwards should be aware of that.
    scaler_X = StandardScaler()
    scaler_y = StandardScaler()
    X_scaled = pd.DataFrame(scaler_X.fit_transform(X), columns=X.columns, index=X.index)
    y_scaled = pd.Series(scaler_y.fit_transform(y.values.reshape(-1, 1)).flatten(), index=y.index)
    return X_scaled, y_scaled, merged_df.index, scaler_X, scaler_y
def train_catboost_model(X_train, X_test, y_train, y_test):
    """Fit a CatBoost regressor, using the test split as the evaluation set.

    Training stops early after 50 rounds without improvement on
    ``(X_test, y_test)``. Progress is logged every 100 iterations.

    Returns:
        The fitted ``CatBoostRegressor``.
    """
    hyperparameters = {
        'iterations': 1000,
        'learning_rate': 0.1,
        'depth': 6,
        'loss_function': 'RMSE',
        'random_state': 42,
        'verbose': 100,
    }
    regressor = CatBoostRegressor(**hyperparameters)
    regressor.fit(
        X_train,
        y_train,
        eval_set=(X_test, y_test),
        early_stopping_rounds=50,
    )
    return regressor
def evaluate_model(model, X_test, y_test, scaler_y):
    """Return the R-squared of the model on the test split, in price units.

    Predictions and targets are mapped back through ``scaler_y`` before the
    score is computed.

    Args:
        model: Fitted estimator exposing ``predict``.
        X_test: Scaled test features.
        y_test: Scaled test target (pandas Series).
        scaler_y: Fitted scaler used to invert the target scaling.

    Returns:
        The R-squared score as returned by ``r2_score``.
    """
    y_pred_scaled = model.predict(X_test)
    y_pred = scaler_y.inverse_transform(y_pred_scaled.reshape(-1, 1)).flatten()
    y_test_unscaled = scaler_y.inverse_transform(y_test.values.reshape(-1, 1)).flatten()
    # Only R-squared is reported; the previously computed MSE was never used.
    return r2_score(y_test_unscaled, y_pred)
def conformal_prediction(model, X_train, y_train, X_test, scaler_y, alpha=0.1):
    """Predict prices with a symmetric relative-error band.

    The model is re-fit on ``(X_train, y_train)``. The band half-width is
    the ``(1 - alpha)`` percentile of the absolute relative errors on the
    training rows, applied to the magnitude of each test prediction.

    NOTE(review): the errors are measured on the same rows the model was
    just fit on, not on a held-out calibration set.

    Args:
        model: Estimator exposing ``fit`` and ``predict``; it is re-fit here.
        X_train: Scaled training features.
        y_train: Scaled training target (pandas Series).
        X_test: Scaled features to predict for.
        scaler_y: Fitted scaler used to map values back to price units.
        alpha: Miscoverage level; 0.1 selects the 90th percentile error.

    Returns:
        Tuple ``(predictions, lower_bound, upper_bound)`` of 1-D arrays in
        price units, with ``lower_bound <= upper_bound`` element-wise.
    """
    def to_price_units(values):
        return scaler_y.inverse_transform(np.asarray(values).reshape(-1, 1)).flatten()

    model.fit(X_train, y_train)
    train_pred = to_price_units(model.predict(X_train))
    train_actual = to_price_units(y_train.values)

    # A relative error is undefined where the prediction is exactly zero;
    # leaving those rows in would push the threshold to infinity.
    usable = train_pred != 0
    if usable.any():
        relative_errors = np.abs(
            (train_actual[usable] - train_pred[usable]) / train_pred[usable]
        )
        error_threshold = np.percentile(relative_errors, (1 - alpha) * 100)
    else:
        error_threshold = 0.0

    test_pred = to_price_units(model.predict(X_test))
    # Scale by |prediction| so a negative prediction cannot swap the bounds.
    half_width = np.abs(test_pred) * error_threshold
    return test_pred, test_pred - half_width, test_pred + half_width
def plot_results(dates, y, fair_values, lower_bound, upper_bound, scaler_y):
    """Build a two-row figure: prices with fair-value band, then percent error.

    Row 1 shows the actual price (``y`` mapped back through ``scaler_y``),
    the fair value, and the band between ``upper_bound`` and ``lower_bound``.
    Row 2 shows ``(fair value - actual) / actual`` in percent.

    Returns:
        The assembled Plotly figure.
    """
    actual_prices = scaler_y.inverse_transform(y.values.reshape(-1, 1)).flatten()
    error_pct = ((fair_values - actual_prices) / actual_prices) * 100

    # Order matters: the lower bound fills up to the trace added just before
    # it ('tonexty'), which must therefore be the upper bound.
    price_traces = [
        go.Scatter(x=dates, y=actual_prices, mode='lines', name='Actual Price',
                   line=dict(color='blue')),
        go.Scatter(x=dates, y=fair_values, mode='lines', name='Fair Value',
                   line=dict(color='red')),
        go.Scatter(x=dates, y=upper_bound, mode='lines', name='Upper Bound',
                   line=dict(color='gray', width=0)),
        go.Scatter(x=dates, y=lower_bound, mode='lines', name='Lower Bound',
                   line=dict(color='gray', width=0), fill='tonexty'),
    ]

    fig = make_subplots(
        rows=2,
        cols=1,
        shared_xaxes=True,
        vertical_spacing=0.02,
        row_heights=[0.7, 0.3],
    )
    for trace in price_traces:
        fig.add_trace(trace, row=1, col=1)
    fig.add_trace(
        go.Scatter(x=dates, y=error_pct, mode='lines', name='Percent Error',
                   line=dict(color='purple')),
        row=2, col=1,
    )

    fig.update_layout(height=800, title_text="Stock Price, Fair Value, and Percent Error")
    fig.update_xaxes(title_text="Date", row=2, col=1)
    fig.update_yaxes(title_text="Price", row=1, col=1)
    fig.update_yaxes(title_text="Percent Error", row=2, col=1)
    return fig
def get_monthly_seasonality(ticker, start_date, end_date):
    """Summarise month-over-month returns by calendar month.

    Args:
        ticker: Symbol passed to ``yf.download``.
        start_date: Start of the download window.
        end_date: End of the download window.

    Returns:
        DataFrame indexed by month number 1..12 (always all twelve rows) with
        columns ``'Mean Change%'``, ``'Median Change%'``, ``'Count'`` and
        ``'Positive Periods'`` (fraction of that month's returns above
        zero). Months with no observations hold NaN.

    Raises:
        ValueError: If the download returns no rows.
    """
    data = yf.download(ticker, start=start_date, end=end_date)
    if data is None or data.empty:
        raise ValueError(
            f"No price data returned for {ticker} between {start_date} and {end_date}."
        )

    # Some yfinance versions return two-level columns even for one ticker.
    if isinstance(data.columns, pd.MultiIndex):
        data.columns = data.columns.get_level_values(0)

    # 'Adj Close' is absent when the download is already price-adjusted;
    # 'Close' is the adjusted series in that case.
    price_column = 'Adj Close' if 'Adj Close' in data.columns else 'Close'
    prices = data[price_column]

    try:
        monthly_data = prices.resample('ME').last()
    except ValueError:
        # Older pandas only knows the month-end alias 'M'.
        monthly_data = prices.resample('M').last()

    monthly_returns = monthly_data.pct_change().to_frame(name='Return')
    monthly_returns['Month'] = monthly_returns.index.month

    seasonality = monthly_returns.groupby('Month')['Return'].agg(
        ['mean', 'median', 'count', lambda x: (x > 0).mean()]
    )
    seasonality.columns = ['Mean Change%', 'Median Change%', 'Count', 'Positive Periods']

    # Guarantee a row for every calendar month so label-based lookups
    # downstream (e.g. .loc[month]) cannot fail on short histories.
    seasonality = seasonality.reindex(pd.RangeIndex(1, 13, name='Month'))
    return seasonality
def plot_monthly_seasonality(seasonality, ticker, start_date, end_date):
    """Plot per-month win rate (bars) and mean return (line).

    Args:
        seasonality: DataFrame indexed by month number with
            ``'Positive Periods'`` and ``'Mean Change%'`` columns holding
            fractions (they are multiplied by 100 for display).
        ticker: Symbol shown in the title.
        start_date: Start of the period shown in the title.
        end_date: End of the period shown in the title.

    Returns:
        The assembled Plotly figure.
    """
    months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']

    # Align to all twelve months so the data always matches the x labels;
    # a month absent from the input becomes NaN instead of raising KeyError.
    seasonality = seasonality.reindex(range(1, 13))
    win_rate = seasonality['Positive Periods']
    mean_change = seasonality['Mean Change%']

    fig = go.Figure()
    fig.add_trace(go.Bar(
        x=months,
        y=win_rate * 100,
        name='Positive Periods',
        marker_color=['green' if x > 0.5 else 'red' for x in win_rate],
        text=[f"{w*100:.1f}%<br>{m*100:.2f}%" for w, m in zip(win_rate, mean_change)],
        textposition='auto'
    ))
    fig.add_trace(go.Scatter(
        x=months,
        y=mean_change * 100,
        name='Mean Change%',
        mode='lines+markers',
        line=dict(color='yellow', width=2)
    ))
    fig.update_layout(
        title=f'Monthly Seasonality for {ticker}<br>{start_date} to {end_date}',
        xaxis_title='Month',
        yaxis_title='Percentage',
        template='plotly_dark',
        showlegend=True,
        legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1),
        height=600,
        margin=dict(l=50, r=50, t=100, b=50)
    )
    fig.add_hline(y=50, line_dash="dash", line_color="gray")
    fig.add_hline(y=0, line_dash="dash", line_color="gray")
    # NOTE(review): the axis is fixed to 0..100, so negative mean-change
    # values fall outside the visible range.
    fig.update_yaxes(ticksuffix="%", range=[0, 100])
    return fig
def prepare_financial_data_for_gpt(financial_data):
    """Render up to five annual reports per statement as plain text.

    For each of ``INCOME_STATEMENT``, ``BALANCE_SHEET`` and ``CASH_FLOW`` a
    header line is emitted, followed (when the statement is present) by one
    paragraph per annual report: the fiscal end date, then every other field
    as ``key: value``. The three sections are separated by a newline.

    Args:
        financial_data: Mapping of statement name -> payload dict that may
            contain an ``'annualReports'`` list of dicts.

    Returns:
        The combined text.
    """
    def render_statement(statement_key):
        pieces = [f"{statement_key} (Last 5 Years):\n"]
        if statement_key in financial_data:
            annual_reports = financial_data[statement_key].get('annualReports', [])
            for annual in annual_reports[:5]:
                pieces.append(
                    f"Fiscal Date Ending: {annual.get('fiscalDateEnding', 'N/A')}\n"
                )
                pieces.extend(
                    f"{field}: {amount}\n"
                    for field, amount in annual.items()
                    if field != 'fiscalDateEnding'
                )
                # Blank line closes each report paragraph.
                pieces.append("\n")
        return "".join(pieces)

    sections = [
        render_statement(statement_key)
        for statement_key in ('INCOME_STATEMENT', 'BALANCE_SHEET', 'CASH_FLOW')
    ]
    return "\n".join(sections)
def get_gpt_analysis(ticker, financial_data):
    """Ask a GPT-4 chat model to comment on the company's annual statements.

    Args:
        ticker: Symbol mentioned in the prompt.
        financial_data: Statement payloads, rendered to text with
            ``prepare_financial_data_for_gpt``.

    Returns:
        The model's reply with surrounding whitespace removed, or a fixed
        fallback message when the request fails (the error is also shown in
        the Streamlit UI).
    """
    formatted_data = prepare_financial_data_for_gpt(financial_data)
    prompt = f"Analyze the following financial data for {ticker} and provide insights:\n\n{formatted_data}"
    request = dict(
        model="gpt-4",
        messages=[
            {"role": "system", "content": "You are a financial analyst."},
            {"role": "user", "content": prompt}
        ],
        max_tokens=500,
        n=1,
        stop=None,
        temperature=0.5,
    )
    try:
        # openai>=1.0 exposes chat completions under `openai.chat`; the
        # older `openai.ChatCompletion` entry point only works before 1.0.
        if hasattr(openai, "chat"):
            response = openai.chat.completions.create(**request)
        else:
            response = openai.ChatCompletion.create(**request)
        # Attribute access works for the response objects of both API
        # generations; content can be None, hence the `or ""`.
        content = response.choices[0].message.content
        return (content or "").strip()
    except Exception as e:
        # UI boundary: report the failure and degrade to a fixed message.
        st.error(f"OpenAI API error: {e}")
        return "GPT Assistant analysis failed. Please check the API integration."
def plot_interactive_logarithmic_stock_chart(ticker, start_date, end_date):
    """Plot the close price on a log axis with an exponential trend and bands.

    A straight line is fit to log(close) against elapsed calendar days and
    extended ten years (3650 days) past the last observation. Inner bands
    are the trend times / divided by 2, outer bands times / divided by 4.

    Args:
        ticker: Symbol passed to ``yf.Ticker``.
        start_date: Start of the history window.
        end_date: End of the history window.

    Returns:
        The assembled Plotly figure.

    Raises:
        ValueError: If fewer than two price rows are available (a line
            cannot be fit).
    """
    stock = yf.Ticker(ticker)
    data = stock.history(start=start_date, end=end_date)
    if len(data) < 2:
        raise ValueError(f"Not enough price history for {ticker} to fit a trend line.")

    # Elapsed calendar days since the first observation.
    x = (data.index - data.index[0]).days
    y = np.log(data['Close'])
    slope, intercept = np.polyfit(x, y, 1)

    future_days = 365 * 10
    # The fit is in calendar days, so the projection must span the full
    # calendar length of the history (not the number of rows) plus the
    # future horizon; otherwise the trend ends short of ten years ahead.
    last_day = int(x[-1])
    all_days = np.arange(last_day + future_days + 1)
    log_trend = np.exp(intercept + slope * all_days)
    extended_dates = pd.date_range(start=data.index[0], periods=len(all_days), freq='D')

    # (legend name, series, colour), in the order the traces are drawn.
    trend_traces = [
        ('Log Trend', log_trend, 'red'),
        ('Inner Upper Band', log_trend * 2, 'green'),
        ('Inner Lower Band', log_trend / 2, 'green'),
        ('Outer Upper Band', log_trend * 4, 'orange'),
        ('Outer Lower Band', log_trend / 4, 'orange'),
    ]

    fig = go.Figure()
    fig.add_trace(go.Scatter(x=data.index, y=data['Close'], mode='lines', name='Close Price', line=dict(color='blue')))
    for trace_name, series, color in trend_traces:
        fig.add_trace(go.Scatter(x=extended_dates, y=series, mode='lines', name=trace_name, line=dict(color=color)))

    fig.update_layout(
        title=f'{ticker} Stock Price (Logarithmic Scale) with Extended Trend Lines and Outer Bands',
        xaxis_title='Date',
        yaxis_title='Price (Log Scale)',
        yaxis_type="log",
        legend=dict(x=0.01, y=0.99, bgcolor='rgba(255, 255, 255, 0.8)'),
        hovermode='x unified',
        height=800
    )
    fig.update_xaxes(
        rangeslider_visible=True,
        rangeselector=dict(
            buttons=list([
                dict(count=1, label="1m", step="month", stepmode="backward"),
                dict(count=6, label="6m", step="month", stepmode="backward"),
                dict(count=1, label="YTD", step="year", stepmode="todate"),
                dict(count=1, label="1y", step="year", stepmode="backward"),
                dict(step="all")
            ])
        )
    )
    return fig
def analyze_stock(ticker, start_date, end_date, use_ai_assistant):
    """Run the full analysis pipeline for one ticker.

    Steps: fetch fundamentals and earnings, fetch prices, build features,
    train and score the model, compute the fair-value band, feature
    importances and SHAP plot, seasonality, the log-trend chart, and
    (optionally) the GPT commentary.

    Args:
        ticker: Stock symbol.
        start_date: Start of the price window.
        end_date: End of the price window / cut-off for fundamentals.
        use_ai_assistant: When true, request the GPT commentary.

    Returns:
        Dict with keys ``fair_price_html``, ``fig``, ``shap_fig``,
        ``seasonality_fig``, ``seasonality_html``, ``gpt_analysis``,
        ``log_chart``, ``feature_importance_df`` and ``percentage_change``;
        or ``None`` when no financial data is available or any step raises
        (the error is shown in the Streamlit UI).
    """
    try:
        financial_data = get_financial_data(ticker, end_date)
        earnings_dates = get_earnings_dates(ticker)
        earnings_data = get_earnings_data(ticker)
        quarterly_df = process_financial_data(financial_data, earnings_dates, earnings_data)
        # Bail out before downloading prices when there is nothing to model.
        if quarterly_df.empty:
            st.error("No financial data available for processing.")
            return None
        stock_df = get_stock_data(ticker, start_date, end_date)

        # --- Model training and evaluation ---
        X_scaled, y_scaled, dates, scaler_X, scaler_y = prepare_data(quarterly_df, stock_df, end_date)
        X_train, X_test, y_train, y_test = train_test_split(X_scaled, y_scaled, test_size=0.2, random_state=42)
        model = train_catboost_model(X_train, X_test, y_train, y_test)
        r2 = evaluate_model(model, X_test, y_test, scaler_y)
        if r2 < 0.5:
            st.warning("Model performance is poor. Results may not be reliable.")
        fair_values, lower_bound, upper_bound = conformal_prediction(model, X_train, y_train, X_scaled, scaler_y)
        fig = plot_results(dates, y_scaled, fair_values, lower_bound, upper_bound, scaler_y)

        # --- Feature importance and SHAP ---
        feature_importance = model.feature_importances_
        feature_importance_df = pd.DataFrame({'feature': X_scaled.columns, 'importance': feature_importance})
        feature_importance_df = feature_importance_df.sort_values('importance', ascending=False)
        explainer = shap.TreeExplainer(model)
        shap_values = explainer.shap_values(X_scaled)
        shap_fig, ax = plt.subplots(figsize=(10, 6))
        shap.summary_plot(shap_values, X_scaled, plot_type="bar", show=False)
        plt.title("SHAP Feature Importance")
        plt.tight_layout()

        # --- Seasonality, log chart, optional GPT commentary ---
        seasonality = get_monthly_seasonality(ticker, start_date, end_date)
        seasonality_fig = plot_monthly_seasonality(seasonality, ticker, start_date, end_date)
        log_chart = plot_interactive_logarithmic_stock_chart(ticker, start_date, end_date)
        gpt_analysis = get_gpt_analysis(ticker, financial_data) if use_ai_assistant else "AI assistant analysis not requested."

        # --- Fair price summary ---
        latest_close = stock_df['Close'].iloc[-1]
        latest_fair_value = fair_values[-1]
        latest_lower_bound = lower_bound[-1]
        latest_upper_bound = upper_bound[-1]
        percentage_change = ((latest_fair_value - latest_close) / latest_close) * 100
        fair_price_html = f"""
<h2 style="margin-bottom: 15px;">Fair Price Analysis</h2>
<p><strong>Current Price:</strong> ${latest_close:.2f}</p>
<p><strong>Estimated Fair Value:</strong> ${latest_fair_value:.2f}</p>
<p><strong>Price Prediction Range:</strong> ${latest_lower_bound:.2f} to ${latest_upper_bound:.2f}</p>
<p><strong>R-squared Score:</strong> {r2:.4f}</p>
<h3 style="margin-top: 20px;">Top 10 most important features for fair value prediction:</h3>
<pre>{feature_importance_df.head(10).to_string(index=False)}</pre>
"""

        # --- Seasonality summary ---
        today = datetime.now()
        current_month = today.month
        next_month = (current_month % 12) + 1
        # Derive the label from the month number itself: adding 31 days to
        # "now" skips a month near month end (e.g. Jan 31 lands in March).
        next_month_name = datetime(2000, next_month, 1).strftime('%B')
        current_month_return = seasonality.loc[current_month, 'Mean Change%'] * 100
        next_month_return = seasonality.loc[next_month, 'Mean Change%'] * 100
        current_month_win_rate = seasonality.loc[current_month, 'Positive Periods'] * 100
        next_month_win_rate = seasonality.loc[next_month, 'Positive Periods'] * 100
        seasonality_html = f"""
<h2 style="margin-bottom: 15px;">Seasonality Analysis ({start_date} to {end_date})</h2>
<h3>Current month ({today.strftime('%B')}):</h3>
<p>Average return: {current_month_return:.2f}%</p>
<p>Probability of positive return: {current_month_win_rate:.1f}%</p>
<h3>Next month ({next_month_name}):</h3>
<p>Average return: {next_month_return:.2f}%</p>
<p>Probability of positive return: {next_month_win_rate:.1f}%</p>
"""
        return {
            'fair_price_html': fair_price_html,
            'fig': fig,
            'shap_fig': shap_fig,
            'seasonality_fig': seasonality_fig,
            'seasonality_html': seasonality_html,
            'gpt_analysis': gpt_analysis,
            'log_chart': log_chart,
            'feature_importance_df': feature_importance_df.head(10),
            'percentage_change': percentage_change
        }
    except Exception as e:
        # UI boundary: show the failure to the user instead of crashing.
        st.error(f"An error occurred: {str(e)}")
        return None
def main():
    """Render the Streamlit page: inputs, the analyze button, and results."""
    st.title("Advanced Stock Analysis App")
    st.markdown("Enter a stock ticker and date range to perform comprehensive stock analysis.")

    col1, col2, col3, col4 = st.columns([2,2,2,1])
    with col1:
        ticker = st.text_input("Stock Ticker", value="MSFT")
    with col2:
        start_date = st.date_input("Start Date", value=datetime(2015, 1, 1))
    with col3:
        end_date = st.date_input("End Date", value=datetime.now())
    with col4:
        use_ai_assistant = st.checkbox("Use AI Assistant")

    if not st.button("Analyze Stock", key="analyze_button"):
        return

    # Validate the inputs before spending API calls on them.
    ticker = ticker.strip().upper()
    if not ticker:
        st.error("Please enter a stock ticker.")
        return
    if start_date >= end_date:
        st.error("Start date must be before end date.")
        return

    with st.spinner('Analyzing stock data...'):
        results = analyze_stock(ticker, start_date, end_date, use_ai_assistant)
    if not results:
        # analyze_stock has already reported the problem in the UI.
        return

    st.header("Fair Price Analysis")
    st.markdown(results['fair_price_html'], unsafe_allow_html=True)
    st.subheader("Fair Price Prediction")
    st.plotly_chart(results['fig'], use_container_width=True)

    col1, col2 = st.columns(2)
    with col1:
        st.subheader("SHAP Feature Importance")
        st.pyplot(results['shap_fig'])
    with col2:
        st.subheader("Top 10 Important Features")
        st.dataframe(results['feature_importance_df'], height=400)

    st.subheader("Monthly Seasonality")
    st.plotly_chart(results['seasonality_fig'], use_container_width=True)
    st.markdown(results['seasonality_html'], unsafe_allow_html=True)

    # Only show the commentary section when it was actually requested.
    if results['gpt_analysis'] != "AI assistant analysis not requested.":
        st.subheader("AI Assistant Analysis")
        st.text_area("Analysis", value=results['gpt_analysis'], height=300)

    st.subheader("Logarithmic Stock Chart")
    st.plotly_chart(results['log_chart'], use_container_width=True)


if __name__ == "__main__":
    main()