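# Spaces: Sleeping -- appears to be a status banner from the hosting page captured with this listing; not part of the program.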
import colorsys
import os
import warnings
from datetime import datetime, timedelta

import matplotlib.pyplot as plt
import numpy as np
import openai
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import requests
import shap
import streamlit as st
import ta
import yfinance as yf
from catboost import CatBoostRegressor
from plotly.subplots import make_subplots
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
warnings.filterwarnings('ignore')

# SECURITY: credentials are read from the environment instead of being
# hard-coded. Any key that was ever committed to this file must be treated as
# leaked and revoked/rotated with the provider.
# An unset variable yields an empty string; the corresponding API calls will
# then be rejected by the remote service rather than at import time.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
openai.api_key = OPENAI_API_KEY

# Alpha Vantage API key
ALPHA_VANTAGE_API_KEY = os.environ.get("ALPHA_VANTAGE_API_KEY", "")
def adjust_color_intensity(base_color, percentage):
    """Return ``base_color`` darkened in proportion to ``abs(percentage)``.

    Args:
        base_color: Colour as a ``#rrggbb`` hex string.
        percentage: Signed magnitude; only its absolute value is used. Every
            100 points lower the HLS lightness by 0.5.

    Returns:
        The adjusted colour as a lowercase ``#rrggbb`` string.
    """
    # Characters 1-2, 3-4 and 5-6 hold the red, green and blue channels.
    channels = [int(base_color[start:start + 2], 16) / 255.0 for start in (1, 3, 5)]
    hue, lightness, saturation = colorsys.rgb_to_hls(*channels)

    darkening = abs(percentage) / 100 * 0.5
    # Clamp so very large percentages bottom out at black instead of wrapping.
    lightness = max(0, min(1, lightness - darkening))

    darkened = colorsys.hls_to_rgb(hue, lightness, saturation)
    return "#" + "".join(f"{int(channel * 255):02x}" for channel in darkened)
def create_color_box(text, background_color, percentage):
    """Build an HTML card showing ``text`` beside a large signed percentage.

    Args:
        text: HTML fragment placed in the left part of the card (inserted
            verbatim, without escaping).
        background_color: Base ``#rrggbb`` colour; it is darkened according to
            ``percentage`` via ``adjust_color_intensity``.
        percentage: Value rendered with two decimals; non-negative values get
            a leading ``+`` and dark green text, negative values dark red.

    Returns:
        The HTML markup as a string.
    """
    box_color = adjust_color_intensity(background_color, percentage)
    is_gain = percentage >= 0
    figure_color = '#006400' if is_gain else '#8B0000'
    sign = '+' if is_gain else ''
    return f"""
<div style="background-color: {box_color}; padding: 20px; border-radius: 10px; margin-bottom: 20px; font-size: 16px; line-height: 1.6; display: flex; justify-content: space-between; align-items: center;">
<div style="flex: 1;">
{text}
</div>
<div style="flex: 0 0 40%; display: flex; justify-content: center; align-items: center;">
<div style="font-size: 72px; font-weight: bold; color: {figure_color};">
{sign}{percentage:.2f}%
</div>
</div>
</div>
"""
def create_gradient_box(text, start_color, end_color, start_percentage, end_percentage):
    """Build an HTML card whose background fades left-to-right between two colours.

    Each end colour is first darkened by ``adjust_color_intensity`` using its
    own percentage.

    Args:
        text: HTML fragment shown inside the card (inserted verbatim).
        start_color: Base ``#rrggbb`` colour of the left edge.
        end_color: Base ``#rrggbb`` colour of the right edge.
        start_percentage: Magnitude used to darken ``start_color``.
        end_percentage: Magnitude used to darken ``end_color``.

    Returns:
        The HTML markup as a string.
    """
    left = adjust_color_intensity(start_color, start_percentage)
    right = adjust_color_intensity(end_color, end_percentage)
    gradient = f"linear-gradient(to right, {left}, {right})"
    return f"""
<div style="background: {gradient}; padding: 20px; border-radius: 10px; margin-bottom: 20px; font-size: 16px; line-height: 1.6;">
{text}
</div>
"""
def get_financial_data(ticker, end_date):
    """Download income statement, balance sheet and cash flow data from Alpha Vantage.

    Args:
        ticker: Stock symbol sent as the ``symbol`` query parameter.
        end_date: Cut-off compared against each report's ``fiscalDateEnding``;
            must be comparable with ``datetime.date`` values.

    Returns:
        Dict keyed by ``'INCOME_STATEMENT'``, ``'BALANCE_SHEET'`` and
        ``'CASH_FLOW'``. Each value is the decoded JSON payload with its
        ``quarterlyReports`` / ``annualReports`` lists (when present) reduced
        to reports whose fiscal period ended on or before ``end_date``.

    Raises:
        Exception: If any request returns a non-200 status code.
    """
    base_url = "https://www.alphavantage.co/query"
    data = {}
    for function in ('INCOME_STATEMENT', 'BALANCE_SHEET', 'CASH_FLOW'):
        params = {
            "function": function,
            "symbol": ticker,
            "apikey": ALPHA_VANTAGE_API_KEY,
        }
        # Without a timeout a stalled connection would block the app forever.
        response = requests.get(base_url, params=params, timeout=30)
        if response.status_code != 200:
            raise Exception(f"Failed to fetch {function} data: {response.status_code}")
        data[function] = response.json()

    def ended_on_or_before_cutoff(report):
        fiscal_end = datetime.strptime(report['fiscalDateEnding'], '%Y-%m-%d').date()
        return fiscal_end <= end_date

    # NOTE(review): a 200 response that carries no report lists (for example an
    # error or rate-limit payload) passes through unchanged -- confirm whether
    # callers should be told about that explicitly.
    for content in data.values():
        for section in ('quarterlyReports', 'annualReports'):
            if section in content:
                content[section] = [
                    report for report in content[section]
                    if ended_on_or_before_cutoff(report)
                ]
    return data
def get_earnings_dates(ticker):
    """Map each fiscal quarter end to the date its earnings were reported.

    Args:
        ticker: Stock symbol sent as the ``symbol`` query parameter.

    Returns:
        Dict of ``fiscalDateEnding`` -> ``reportedDate`` strings taken from
        the ``quarterlyEarnings`` list of the Alpha Vantage EARNINGS payload;
        empty when that list is absent.

    Raises:
        Exception: If the request returns a non-200 status code.
    """
    # Passing the query via ``params`` lets requests URL-encode the
    # user-supplied ticker instead of splicing it into the URL verbatim.
    response = requests.get(
        "https://www.alphavantage.co/query",
        params={
            "function": "EARNINGS",
            "symbol": ticker,
            "apikey": ALPHA_VANTAGE_API_KEY,
        },
        timeout=30,  # avoid hanging forever on a stalled connection
    )
    if response.status_code != 200:
        raise Exception(f"Failed to fetch EARNINGS data: {response.status_code}")
    data = response.json()
    return {
        report['fiscalDateEnding']: report['reportedDate']
        for report in data.get('quarterlyEarnings', [])
    }
def get_earnings_data(ticker):
    """Fetch quarterly earnings from Alpha Vantage as a DataFrame.

    Args:
        ticker: Stock symbol sent as the ``symbol`` query parameter.

    Returns:
        DataFrame indexed by ``reportedDate`` (datetime) with
        ``fiscalDateEnding`` parsed to datetime and ``reportedEPS``,
        ``estimatedEPS``, ``surprise`` and ``surprisePercentage`` coerced to
        numeric (unparseable values become NaN).

    Raises:
        Exception: If the request returns a non-200 status code.
        ValueError: If the payload contains no quarterly earnings.
    """
    # Passing the query via ``params`` lets requests URL-encode the
    # user-supplied ticker instead of splicing it into the URL verbatim.
    response = requests.get(
        "https://www.alphavantage.co/query",
        params={
            "function": "EARNINGS",
            "symbol": ticker,
            "apikey": ALPHA_VANTAGE_API_KEY,
        },
        timeout=30,  # avoid hanging forever on a stalled connection
    )
    if response.status_code != 200:
        raise Exception(f"Failed to fetch EARNINGS data: {response.status_code}")
    data = response.json()

    quarterly_earnings = data.get('quarterlyEarnings', [])
    if not quarterly_earnings:
        # An empty frame has none of the columns used below; fail with a
        # readable message instead of an opaque KeyError.
        raise ValueError(f"No quarterly earnings data returned for {ticker}.")

    df = pd.DataFrame(quarterly_earnings)
    df['fiscalDateEnding'] = pd.to_datetime(df['fiscalDateEnding'])
    df['reportedDate'] = pd.to_datetime(df['reportedDate'])
    df = df.set_index('reportedDate')
    numeric_columns = ['reportedEPS', 'estimatedEPS', 'surprise', 'surprisePercentage']
    for col in numeric_columns:
        df[col] = pd.to_numeric(df[col], errors='coerce')
    return df
def process_financial_data(data, earnings_dates, earnings_data):
    """Combine quarterly statement reports and earnings into one numeric table.

    Args:
        data: Mapping of statement type -> payload dict; only the
            ``quarterlyReports`` list of each payload is read.
        earnings_dates: Mapping of fiscal period end -> earnings release date.
            A report whose fiscal date is not in the mapping is keyed by the
            fiscal date itself.
        earnings_data: DataFrame left-joined onto the result by index.

    Returns:
        DataFrame with one row per release date (datetime index, ascending).
        Statement fields are named ``"<statement type>_<field>"``; every
        column is passed through ``pd.to_numeric(errors='coerce')``, so
        non-numeric values become NaN.
    """
    rows_by_release = {}
    for statement_type, statement in data.items():
        for report in statement.get('quarterlyReports', ()):
            fiscal_end = report['fiscalDateEnding']
            released_on = earnings_dates.get(fiscal_end, fiscal_end)
            prefixed = {f"{statement_type}_{field}": value for field, value in report.items()}
            # Reports from different statements that share a release date are
            # merged into the same row.
            rows_by_release.setdefault(released_on, {}).update(prefixed)

    frame = pd.DataFrame.from_dict(rows_by_release, orient='index')
    frame.index = pd.to_datetime(frame.index)
    frame = frame.sort_index().join(earnings_data, how='left')
    for name in frame.columns:
        frame[name] = pd.to_numeric(frame[name], errors='coerce')
    return frame
def get_stock_data(ticker, start_date, end_date):
    """Download price history and append technical indicators.

    Args:
        ticker: Symbol passed to ``yf.download``.
        start_date: Start of the download window.
        end_date: End of the download window.

    Returns:
        The downloaded DataFrame with these added columns:
        ``Price_Pct_Change``, ``RSI``, ``WILLR``, ``BB_upper``, ``BB_middle``,
        ``BB_lower``, ``OBV``, ``ATR``, ``MACD``, ``ADX``, ``CCI`` (all with
        the ``ta`` library's default parameters) and a ``<name>_ROC`` column
        holding the ``pct_change`` of each indicator.

    Raises:
        ValueError: If the download returns no rows.
    """
    df = yf.download(ticker, start=start_date, end=end_date)
    if df.empty:
        raise ValueError(
            f"No price data returned for {ticker} between {start_date} and {end_date}."
        )
    # Some yfinance releases return two-level (field, ticker) columns even for
    # a single ticker; keep only the field level so df['Close'] is a Series.
    if isinstance(df.columns, pd.MultiIndex):
        df.columns = df.columns.get_level_values(0)

    close, high, low, volume = df['Close'], df['High'], df['Low'], df['Volume']

    df['Price_Pct_Change'] = close.pct_change()
    df['RSI'] = ta.momentum.RSIIndicator(close).rsi()
    df['WILLR'] = ta.momentum.WilliamsRIndicator(high, low, close).williams_r()
    bb = ta.volatility.BollingerBands(close)
    df['BB_upper'] = bb.bollinger_hband()
    df['BB_middle'] = bb.bollinger_mavg()
    df['BB_lower'] = bb.bollinger_lband()
    df['OBV'] = ta.volume.OnBalanceVolumeIndicator(close, volume).on_balance_volume()
    df['ATR'] = ta.volatility.AverageTrueRange(high, low, close).average_true_range()
    df['MACD'] = ta.trend.MACD(close).macd()
    df['ADX'] = ta.trend.ADXIndicator(high, low, close).adx()
    df['CCI'] = ta.trend.CCIIndicator(high, low, close).cci()

    indicator_columns = ['RSI', 'WILLR', 'BB_upper', 'BB_middle', 'BB_lower', 'OBV', 'ATR', 'MACD', 'ADX', 'CCI']
    # NOTE(review): pct_change of indicators that cross zero (e.g. MACD, CCI,
    # OBV) produces +/-inf values; confirm consumers tolerate that.
    for column in indicator_columns:
        df[f'{column}_ROC'] = df[column].pct_change()
    return df
def add_financial_ratios(X):
    """Append five ratio columns to ``X`` in place and return it.

    Each ratio is ``numerator / denominator`` and is NaN wherever the
    denominator equals zero.

    NOTE(review): ``PE_Ratio`` is computed as shareholder equity / net income
    and ``PB_Ratio`` as total assets / shareholder equity; neither uses a
    price, so the names may not match the conventional definitions -- confirm
    intent before relying on them.

    Args:
        X: DataFrame containing ``BALANCE_SHEET_totalShareholderEquity``,
            ``BALANCE_SHEET_totalAssets``, ``BALANCE_SHEET_totalLiabilities``
            and ``INCOME_STATEMENT_netIncome``.

    Returns:
        The same ``X`` object with ``PE_Ratio``, ``PB_Ratio``,
        ``Debt_to_Equity``, ``ROE`` and ``ROA`` added.
    """
    print("Adding financial ratios...")

    equity = 'BALANCE_SHEET_totalShareholderEquity'
    assets = 'BALANCE_SHEET_totalAssets'
    liabilities = 'BALANCE_SHEET_totalLiabilities'
    net_income = 'INCOME_STATEMENT_netIncome'

    # (new column, numerator column, denominator column)
    ratio_specs = (
        ('PE_Ratio', equity, net_income),
        ('PB_Ratio', assets, equity),
        ('Debt_to_Equity', liabilities, equity),
        ('ROE', net_income, equity),
        ('ROA', net_income, assets),
    )
    for ratio_name, numerator, denominator in ratio_specs:
        top = X[numerator]
        bottom = X[denominator]
        X[ratio_name] = np.where(bottom != 0, top / bottom, np.nan)

    print("Financial ratios added.")
    return X
def prepare_data(quarterly_df, stock_df, end_date):
    """Align fundamentals with daily prices and standardise features and target.

    Both frames are re-indexed onto a daily calendar (forward-filled), joined
    on date, and rows without a ``Close`` price are dropped. Missing feature
    values are replaced by column means, EPS-surprise and ratio features are
    added, and features and target are each scaled with a ``StandardScaler``.

    NOTE(review): both scalers are fitted on the complete dataset, before any
    train/test split performed by the caller -- confirm this is acceptable.

    Args:
        quarterly_df: Fundamentals indexed by date-like values; must contain
            ``reportedEPS``, ``estimatedEPS``, ``surprisePercentage`` and the
            columns required by ``add_financial_ratios``.
        stock_df: Price history indexed by date-like values with a ``Close``
            column.
        end_date: Last date to include; must be comparable with
            ``datetime.date`` values.

    Returns:
        Tuple ``(X_scaled, y_scaled, dates, scaler_X, scaler_y)`` where
        ``dates`` is the index of the merged frame.

    Raises:
        ValueError: If either input has no rows on or before ``end_date`` or
            the merged frame is empty.
    """
    print("Starting data preparation...")
    print(f"Initial quarterly_df shape: {quarterly_df.shape}")
    print(f"Initial stock_df shape: {stock_df.shape}")

    # Work on copies so the caller's frames keep their original index.
    quarterly_df = quarterly_df.copy()
    stock_df = stock_df.copy()
    quarterly_df.index = pd.to_datetime(quarterly_df.index).date
    stock_df.index = pd.to_datetime(stock_df.index).date

    quarterly_df = quarterly_df[quarterly_df.index <= end_date]
    stock_df = stock_df[stock_df.index <= end_date]
    if quarterly_df.empty or stock_df.empty:
        # min() over an empty index would otherwise fail with an obscure error.
        raise ValueError("No stock price or financial statement data on or before the end date.")

    start_date = min(quarterly_df.index.min(), stock_df.index.min())
    all_dates = pd.date_range(start=start_date, end=end_date, freq='D').date
    # Forward-fill so every calendar day carries the latest known values.
    quarterly_df_reindexed = quarterly_df.reindex(all_dates).ffill()
    stock_df_reindexed = stock_df.reindex(all_dates).ffill()

    merged_df = pd.concat([stock_df_reindexed['Close'], quarterly_df_reindexed], axis=1)
    merged_df = merged_df.dropna(subset=['Close'])
    print(f"Merged dataframe shape: {merged_df.shape}")
    if merged_df.empty:
        raise ValueError("No overlapping data between stock prices and financial statements.")

    X = merged_df.drop('Close', axis=1)
    y = merged_df['Close']
    X = X.fillna(X.mean())
    X['EPS_Surprise'] = X['reportedEPS'] - X['estimatedEPS']
    X['EPS_Surprise_Percentage'] = X['surprisePercentage']
    X = add_financial_ratios(X)

    scaler_X = StandardScaler()
    scaler_y = StandardScaler()
    X_scaled = pd.DataFrame(scaler_X.fit_transform(X), columns=X.columns, index=X.index)
    y_scaled = pd.Series(scaler_y.fit_transform(y.values.reshape(-1, 1)).flatten(), index=y.index)
    print(f"Final data shape: X: {X_scaled.shape}, y: {y_scaled.shape}")
    print(f"Date range: {X_scaled.index.min()} to {X_scaled.index.max()}")
    return X_scaled, y_scaled, merged_df.index, scaler_X, scaler_y
def train_catboost_model(X_train, X_test, y_train, y_test):
    """Fit a CatBoost regressor, using the test split as the early-stopping eval set.

    Args:
        X_train: Training features.
        X_test: Features of the evaluation set.
        y_train: Training targets.
        y_test: Targets of the evaluation set.

    Returns:
        The fitted ``CatBoostRegressor``.
    """
    hyperparameters = {
        'iterations': 1000,
        'learning_rate': 0.1,
        'depth': 6,
        'loss_function': 'RMSE',
        'random_state': 42,
        'verbose': 100,
    }
    regressor = CatBoostRegressor(**hyperparameters)
    regressor.fit(
        X_train,
        y_train,
        eval_set=(X_test, y_test),
        early_stopping_rounds=50,
    )
    return regressor
def evaluate_model(model, X_test, y_test, scaler_y):
    """Score ``model`` on the test split in unscaled target units.

    Predictions and targets are passed through ``scaler_y.inverse_transform``
    before the metrics are computed; MSE and R-squared are printed.

    Args:
        model: Fitted estimator exposing ``predict``.
        X_test: Test features.
        y_test: Scaled test targets (an object exposing ``.values``).
        scaler_y: Scaler used to undo the target scaling.

    Returns:
        The R-squared score.
    """
    def to_original_units(scaled):
        return scaler_y.inverse_transform(scaled.reshape(-1, 1)).flatten()

    predicted = to_original_units(model.predict(X_test))
    actual = to_original_units(y_test.values)

    mse = mean_squared_error(actual, predicted)
    r2 = r2_score(actual, predicted)
    print(f"Mean Squared Error: {mse}")
    print(f"R-squared Score: {r2}")
    return r2
def conformal_prediction(model, X_train, y_train, X_test, scaler_y, alpha=0.1):
    """Predict on ``X_test`` and attach a band derived from training errors.

    The model is (re)fitted on the training data. The ``(1 - alpha)``
    percentile of the absolute relative training error, measured in unscaled
    units, becomes a multiplicative margin around each test prediction.

    NOTE(review): ``model.fit`` is called without an eval set, so this
    replaces whatever fit the passed-in model already had; and the margin is
    calibrated on the same rows the model was trained on -- confirm both are
    intended.

    Args:
        model: Estimator exposing ``fit`` and ``predict``.
        X_train: Training features.
        y_train: Scaled training targets (an object exposing ``.values``).
        X_test: Features to predict.
        scaler_y: Scaler used to undo the target scaling.
        alpha: Miscoverage level; the margin is the ``(1 - alpha) * 100``
            percentile of the relative errors.

    Returns:
        Tuple ``(predictions, lower_bound, upper_bound)`` in unscaled units.
    """
    def unscale(values):
        return scaler_y.inverse_transform(values.reshape(-1, 1)).flatten()

    model.fit(X_train, y_train)

    train_predicted = unscale(model.predict(X_train))
    train_actual = unscale(y_train.values)
    relative_errors = np.abs((train_actual - train_predicted) / train_predicted)
    margin = np.percentile(relative_errors, (1 - alpha) * 100)

    test_predicted = unscale(model.predict(X_test))
    lower = test_predicted * (1 - margin)
    upper = test_predicted * (1 + margin)
    return test_predicted, lower, upper
def plot_results(dates, y, fair_values, lower_bound, upper_bound, scaler_y):
    """Plot actual price against fair value with its band, plus percent error.

    The top panel shows the unscaled actual price, the fair value and a
    filled band between the bounds; the bottom panel shows
    ``(fair value - actual) / actual * 100``.

    Args:
        dates: X-axis values shared by every trace.
        y: Scaled actual prices (an object exposing ``.values``).
        fair_values: Fair value estimates in unscaled units.
        lower_bound: Lower edge of the band.
        upper_bound: Upper edge of the band.
        scaler_y: Scaler used to undo the scaling of ``y``.

    Returns:
        The two-row Plotly figure.
    """
    actual_prices = scaler_y.inverse_transform(y.values.reshape(-1, 1)).flatten()

    figure = make_subplots(
        rows=2,
        cols=1,
        shared_xaxes=True,
        vertical_spacing=0.02,
        row_heights=[0.7, 0.3],
    )

    price_panel_traces = [
        go.Scatter(x=dates, y=actual_prices, mode='lines', name='Actual Price', line=dict(color='blue')),
        go.Scatter(x=dates, y=fair_values, mode='lines', name='Fair Value', line=dict(color='red')),
        # Zero-width lines: the bounds are only visible through the fill,
        # which 'tonexty' draws between the lower and the preceding upper trace.
        go.Scatter(x=dates, y=upper_bound, mode='lines', name='Upper Bound', line=dict(color='gray', width=0)),
        go.Scatter(x=dates, y=lower_bound, mode='lines', name='Lower Bound', line=dict(color='gray', width=0), fill='tonexty'),
    ]
    for trace in price_panel_traces:
        figure.add_trace(trace, row=1, col=1)

    percent_error = ((fair_values - actual_prices) / actual_prices) * 100
    figure.add_trace(
        go.Scatter(x=dates, y=percent_error, mode='lines', name='Percent Error', line=dict(color='purple')),
        row=2,
        col=1,
    )

    figure.update_layout(height=800, title_text="Stock Price, Fair Value, and Percent Error")
    figure.update_xaxes(title_text="Date", row=2, col=1)
    figure.update_yaxes(title_text="Price", row=1, col=1)
    figure.update_yaxes(title_text="Percent Error", row=2, col=1)
    return figure
def get_monthly_seasonality(ticker, start_date, end_date):
    """Summarise month-over-month returns by calendar month.

    Args:
        ticker: Symbol passed to ``yf.download``.
        start_date: Start of the download window.
        end_date: End of the download window.

    Returns:
        DataFrame indexed by month number with columns ``Mean Change%``,
        ``Median Change%``, ``Count`` and ``Positive Periods`` (the share of
        periods with a return above zero). Returns are fractions, not
        percentages.
    """
    data = yf.download(ticker, start=start_date, end=end_date)
    # Some yfinance releases return two-level (field, ticker) columns even for
    # a single ticker; keep only the field level.
    if isinstance(data.columns, pd.MultiIndex):
        data.columns = data.columns.get_level_values(0)
    # 'Adj Close' is not present in every yfinance configuration (when prices
    # are auto-adjusted only 'Close' is returned), so fall back to 'Close'.
    price_column = 'Adj Close' if 'Adj Close' in data.columns else 'Close'

    # NOTE(review): newer pandas releases prefer the 'ME' alias over 'M' for
    # month-end resampling; confirm against the pinned pandas version.
    monthly_data = data[price_column].resample('M').last()
    monthly_returns = monthly_data.pct_change().to_frame(name='Return')
    monthly_returns['Month'] = monthly_returns.index.month
    seasonality = monthly_returns.groupby('Month')['Return'].agg(
        ['mean', 'median', 'count', lambda x: (x > 0).mean()]
    )
    seasonality.columns = ['Mean Change%', 'Median Change%', 'Count', 'Positive Periods']
    return seasonality
def plot_monthly_seasonality(seasonality, ticker, start_date, end_date):
    """Chart the share of positive months (bars) and the mean return (line).

    Args:
        seasonality: DataFrame indexed by month number (1-12) with
            ``Positive Periods`` and ``Mean Change%`` columns holding
            fractions.
        ticker: Symbol shown in the title.
        start_date: Start of the analysed window, shown in the title.
        end_date: End of the analysed window, shown in the title.

    Returns:
        The Plotly figure.
    """
    months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']

    # Align to all twelve months so the values line up with the month labels
    # even when the analysed window contains no data for some month (those
    # months become NaN instead of raising KeyError or shifting the bars).
    seasonality = seasonality.reindex(range(1, 13))
    positive_share = seasonality['Positive Periods']
    mean_change = seasonality['Mean Change%']

    bar_labels = [
        "" if pd.isna(share) else f"{share*100:.1f}%<br>{change*100:.2f}%"
        for share, change in zip(positive_share, mean_change)
    ]

    fig = go.Figure()
    fig.add_trace(go.Bar(
        x=months,
        y=positive_share * 100,
        name='Positive Periods',
        marker_color=['green' if share > 0.5 else 'red' for share in positive_share],
        text=bar_labels,
        textposition='auto'
    ))
    fig.add_trace(go.Scatter(
        x=months,
        y=mean_change * 100,
        name='Mean Change%',
        mode='lines+markers',
        line=dict(color='yellow', width=2)
    ))
    fig.update_layout(
        title=f'Monthly Seasonality for {ticker}<br>{start_date} to {end_date}',
        xaxis_title='Month',
        yaxis_title='Percentage',
        template='plotly_dark',
        showlegend=True,
        legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1),
        height=600,
        margin=dict(l=50, r=50, t=100, b=50)
    )
    fig.add_hline(y=50, line_dash="dash", line_color="gray")
    fig.add_hline(y=0, line_dash="dash", line_color="gray")
    # NOTE(review): the fixed 0-100 range hides negative mean-change values
    # of the line trace -- confirm this is intended.
    fig.update_yaxes(ticksuffix="%", range=[0, 100])
    return fig
def prepare_financial_data_for_gpt(financial_data):
    """Render up to five annual reports per statement as plain text for a prompt.

    Args:
        financial_data: Mapping of statement type (``'INCOME_STATEMENT'``,
            ``'BALANCE_SHEET'``, ``'CASH_FLOW'``) -> payload dict whose
            ``annualReports`` list holds report dicts. Missing statement
            types yield only their header line.

    Returns:
        The three statement sections joined by newlines. Each section starts
        with ``"<type> (Last 5 Years):"`` and lists, per report, the fiscal
        date followed by every other ``key: value`` pair and a blank line.
    """
    def format_financial_data(data, report_type):
        pieces = [f"{report_type} (Last 5 Years):\n"]
        if report_type in data:
            # Only the first five reports of the list are included.
            for report in data[report_type].get('annualReports', [])[:5]:
                pieces.append(f"Fiscal Date Ending: {report.get('fiscalDateEnding', 'N/A')}\n")
                pieces.extend(
                    f"{key}: {value}\n"
                    for key, value in report.items()
                    if key != 'fiscalDateEnding'
                )
                pieces.append("\n")
        return "".join(pieces)

    sections = [
        format_financial_data(financial_data, statement)
        for statement in ('INCOME_STATEMENT', 'BALANCE_SHEET', 'CASH_FLOW')
    ]
    return "\n".join(sections)
def get_gpt_analysis(ticker, financial_data):
    """Ask the OpenAI chat API for a short analysis of the company's financials.

    Works with both the legacy ``openai.ChatCompletion`` interface and the
    ``openai>=1.0`` interface, where the legacy entry point no longer exists.

    Args:
        ticker: Stock symbol mentioned in the prompt.
        financial_data: Statement payloads, rendered to text by
            ``prepare_financial_data_for_gpt``.

    Returns:
        The model's reply with surrounding whitespace stripped, or a fixed
        failure message if the request raises any exception (the exception
        is printed).
    """
    formatted_data = prepare_financial_data_for_gpt(financial_data)
    prompt = f"Analyze the following financial data for {ticker} and provide insights:\n\n{formatted_data}"
    request = dict(
        model="gpt-4",
        messages=[
            {"role": "system", "content": "You are a financial analyst."},
            {"role": "user", "content": prompt}
        ],
        max_tokens=500,
        n=1,
        stop=None,
        temperature=0.5,
    )
    try:
        if hasattr(openai, "OpenAI"):
            # openai>=1.0: the module-level client uses ``openai.api_key``.
            response = openai.chat.completions.create(**request)
            content = response.choices[0].message.content
        else:
            # Legacy (pre-1.0) interface.
            response = openai.ChatCompletion.create(**request)
            content = response.choices[0].message['content']
        analysis = content.strip()
        return analysis
    except Exception as e:
        # Best effort: the rest of the analysis is still useful without it.
        print(f"OpenAI API error: {e}")
        return "GPT Assistant analysis failed. Please check the API integration."
def plot_interactive_logarithmic_stock_chart(ticker, start_date, end_date):
    """Plot the close price on a log axis with an exponential trend and bands.

    A straight line is fitted to ``log(Close)`` against the number of
    calendar days since the first row, extended ten years (3650 days) past
    the last row, and drawn together with bands at x2 / /2 (inner) and
    x4 / /4 (outer) of the trend.

    Args:
        ticker: Symbol passed to ``yf.Ticker``.
        start_date: Start of the history window.
        end_date: End of the history window.

    Returns:
        The Plotly figure with a range slider and range-selector buttons.

    Raises:
        ValueError: If no price history is returned.
    """
    stock = yf.Ticker(ticker)
    data = stock.history(start=start_date, end=end_date)
    if data.empty:
        raise ValueError(
            f"No price history returned for {ticker} between {start_date} and {end_date}."
        )

    x = (data.index - data.index[0]).days
    y = np.log(data['Close'])
    slope, intercept = np.polyfit(x, y, 1)

    future_days = 365 * 10
    # ``x`` counts calendar days, so the extended axis must run to the last
    # calendar-day offset plus the projection. Using the row count instead
    # would shorten the projection, because there are fewer price rows than
    # calendar days.
    last_day = int(x[-1])
    all_days = np.arange(last_day + 1 + future_days)
    log_trend = np.exp(intercept + slope * all_days)
    inner_upper_band = log_trend * 2
    inner_lower_band = log_trend / 2
    outer_upper_band = log_trend * 4
    outer_lower_band = log_trend / 4
    # One date per element of ``all_days``: day i maps to first date + i days.
    extended_dates = pd.date_range(start=data.index[0], periods=len(all_days), freq='D')

    fig = go.Figure()
    fig.add_trace(go.Scatter(x=data.index, y=data['Close'], mode='lines', name='Close Price', line=dict(color='blue')))
    fig.add_trace(go.Scatter(x=extended_dates, y=log_trend, mode='lines', name='Log Trend', line=dict(color='red')))
    fig.add_trace(go.Scatter(x=extended_dates, y=inner_upper_band, mode='lines', name='Inner Upper Band', line=dict(color='green')))
    fig.add_trace(go.Scatter(x=extended_dates, y=inner_lower_band, mode='lines', name='Inner Lower Band', line=dict(color='green')))
    fig.add_trace(go.Scatter(x=extended_dates, y=outer_upper_band, mode='lines', name='Outer Upper Band', line=dict(color='orange')))
    fig.add_trace(go.Scatter(x=extended_dates, y=outer_lower_band, mode='lines', name='Outer Lower Band', line=dict(color='orange')))
    fig.update_layout(
        title=f'{ticker} Stock Price (Logarithmic Scale) with Extended Trend Lines and Outer Bands',
        xaxis_title='Date',
        yaxis_title='Price (Log Scale)',
        yaxis_type="log",
        legend=dict(x=0.01, y=0.99, bgcolor='rgba(255, 255, 255, 0.8)'),
        hovermode='x unified',
        height=800
    )
    fig.update_xaxes(
        rangeslider_visible=True,
        rangeselector=dict(
            buttons=list([
                dict(count=1, label="1m", step="month", stepmode="backward"),
                dict(count=6, label="6m", step="month", stepmode="backward"),
                dict(count=1, label="YTD", step="year", stepmode="todate"),
                dict(count=1, label="1y", step="year", stepmode="backward"),
                dict(step="all")
            ])
        )
    )
    return fig
def analyze_stock(ticker, start_date, end_date, use_ai_assistant):
    """Run the whole analysis for one ticker and build every display artifact.

    Steps: fetch fundamentals and earnings, download prices, prepare the
    feature matrix, train and evaluate a CatBoost model, compute fair values
    with a prediction band, derive feature importance (model and SHAP),
    monthly seasonality, the log-trend chart and, optionally, a GPT
    commentary.

    NOTE(review): the train/test split is a random shuffle over daily rows
    that were forward-filled from quarterly data -- confirm this evaluation
    design is intended.

    Args:
        ticker: Stock symbol.
        start_date: Start of the price window.
        end_date: End of the price window; also the cut-off for fundamentals.
        use_ai_assistant: When truthy, request the GPT commentary.

    Returns:
        A 7-tuple ``(fair_price_html, fig, shap_fig, seasonality_fig,
        seasonality_html, gpt_analysis, log_chart)``. On any failure the
        first element is an error message and the remaining six are ``None``.
    """
    def failure(message):
        # Every failure path returns the same arity as the success tuple so
        # callers can always unpack seven values.
        return message, None, None, None, None, None, None

    try:
        print(f"Starting analysis for {ticker} from {start_date} to {end_date}")
        end_date_dt = end_date
        print("Fetching financial data...")
        financial_data = get_financial_data(ticker, end_date_dt)
        print("Fetching earnings dates...")
        earnings_dates = get_earnings_dates(ticker)
        print("Fetching earnings data...")
        earnings_data = get_earnings_data(ticker)
        print("Processing financial data...")
        quarterly_df = process_financial_data(financial_data, earnings_dates, earnings_data)
        print("Downloading stock data...")
        stock_df = get_stock_data(ticker, start_date, end_date)
        if quarterly_df.empty:
            return failure("No financial data available for processing.")
        print(f"Quarterly data shape: {quarterly_df.shape}")
        print(f"Stock data shape: {stock_df.shape}")

        print("Preparing data for analysis...")
        X_scaled, y_scaled, dates, scaler_X, scaler_y = prepare_data(quarterly_df, stock_df, end_date_dt)
        if X_scaled is None or y_scaled is None:
            return failure("Not enough data for model training.")
        print(f"Prepared data shape: X: {X_scaled.shape}, y: {y_scaled.shape}")
        print(f"X column names: {X_scaled.columns.tolist()}")
        print(f"Final number of features: {X_scaled.shape[1]}")

        print("Data prepared successfully. Starting model training...")
        X_train, X_test, y_train, y_test = train_test_split(X_scaled, y_scaled, test_size=0.2, random_state=42)
        print("Training CatBoost model...")
        model = train_catboost_model(X_train, X_test, y_train, y_test)
        print("Evaluating model performance...")
        r2 = evaluate_model(model, X_test, y_test, scaler_y)
        if r2 < 0.5:
            return failure("Model performance is poor. Re-evaluate features or model parameters.")

        print("Model trained successfully. Calculating fair values with conformal prediction...")
        fair_values, lower_bound, upper_bound = conformal_prediction(model, X_train, y_train, X_scaled, scaler_y)
        print("Plotting results...")
        fig = plot_results(dates, y_scaled, fair_values, lower_bound, upper_bound, scaler_y)

        print("Calculating feature importance...")
        feature_importance = model.feature_importances_
        feature_importance_df = pd.DataFrame({'feature': X_scaled.columns, 'importance': feature_importance})
        feature_importance_df = feature_importance_df.sort_values('importance', ascending=False)
        print("\nTop 10 most important features:")
        print(feature_importance_df.head(10))

        print("\nCalculating SHAP values for feature importance...")
        explainer = shap.TreeExplainer(model)
        shap_values = explainer.shap_values(X_scaled)
        shap_fig = plt.figure(figsize=(10, 6))
        shap.summary_plot(shap_values, X_scaled, plot_type="bar", show=False)
        plt.title("SHAP Feature Importance")
        plt.tight_layout()

        seasonality = get_monthly_seasonality(ticker, start_date, end_date)
        seasonality_fig = plot_monthly_seasonality(seasonality, ticker, start_date, end_date)

        now = datetime.now()
        current_month = now.month
        next_month = (current_month % 12) + 1
        # Name the month from its number: adding 31 days to "now" can skip a
        # month (e.g. Jan 31 -> Mar 3) and mislabel the statistics shown.
        next_month_name = datetime(2000, next_month, 1).strftime('%B')
        current_month_return = seasonality.loc[current_month, 'Mean Change%'] * 100
        next_month_return = seasonality.loc[next_month, 'Mean Change%'] * 100
        current_month_win_rate = seasonality.loc[current_month, 'Positive Periods'] * 100
        next_month_win_rate = seasonality.loc[next_month, 'Positive Periods'] * 100
        seasonality_text = f"""
<h2 style="margin-bottom: 15px;">Seasonality Analysis ({start_date} to {end_date})</h2>
<h3>Current month ({now.strftime('%B')}):</h3>
<p>Average return: {current_month_return:.2f}%</p>
<p>Probability of positive return: {current_month_win_rate:.1f}%</p>
<h3>Next month ({next_month_name}):</h3>
<p>Average return: {next_month_return:.2f}%</p>
<p>Probability of positive return: {next_month_win_rate:.1f}%</p>
"""

        latest_close = stock_df['Close'].iloc[-1]
        latest_fair_value = fair_values[-1]
        latest_lower_bound = lower_bound[-1]
        latest_upper_bound = upper_bound[-1]
        fair_price_text = f"""
<h2 style="margin-bottom: 15px;">Fair Price Analysis</h2>
<p><strong>Current Price:</strong> ${latest_close:.2f}</p>
<p><strong>Estimated Fair Value:</strong> ${latest_fair_value:.2f}</p>
<p><strong>Price Prediction Range:</strong> ${latest_lower_bound:.2f} to ${latest_upper_bound:.2f}</p>
<p><strong>R-squared Score:</strong> {r2:.4f}</p>
<h3 style="margin-top: 20px;">Top 10 most important features for fair value prediction:</h3>
<pre>{feature_importance_df.head(10).to_string(index=False)}</pre>
"""
        # Determine background color and percentage change
        percentage_change = ((latest_fair_value - latest_close) / latest_close) * 100
        background_color = "#d4edda" if percentage_change > 0 else "#f8d7da"
        fair_price_html = create_color_box(fair_price_text, background_color, percentage_change)

        # Format the seasonality analysis results
        current_month_color = "#d4edda" if current_month_return > 0 else "#f8d7da"
        next_month_color = "#d4edda" if next_month_return > 0 else "#f8d7da"
        seasonality_html = create_gradient_box(seasonality_text, current_month_color, next_month_color, current_month_return, next_month_return)

        # Generate logarithmic chart
        log_chart = plot_interactive_logarithmic_stock_chart(ticker, start_date, end_date)

        # Get GPT analysis if requested
        gpt_analysis = get_gpt_analysis(ticker, financial_data) if use_ai_assistant else "AI assistant analysis not requested."

        return fair_price_html, fig, shap_fig, seasonality_fig, seasonality_html, gpt_analysis, log_chart
    except Exception as e:
        # Top-level boundary: report any failure to the UI instead of crashing.
        error_message = f"An error occurred: {str(e)}"
        print(error_message)
        return failure(error_message)
# Streamlit app
def main():
    """Render the input form and, on submit, run and display the analysis.

    The ticker is stripped of surrounding whitespace; an empty ticker or a
    start date that is not before the end date is reported with ``st.error``
    and no analysis is started.
    """
    st.set_page_config(page_title="Advanced Stock Analysis", layout="wide")
    st.title("Advanced Stock Analysis App")
    st.markdown("Enter a stock ticker and date range to perform comprehensive stock analysis.")

    col1, col2, col3 = st.columns(3)
    with col1:
        ticker = st.text_input("Stock Ticker", value="MSFT")
    with col2:
        start_date = st.date_input("Start Date", value=datetime(2015, 1, 1))
    with col3:
        end_date = st.date_input("End Date", value=datetime.now())
    use_ai_assistant = st.checkbox("Use AI Assistant")

    if st.button("Submit", type="primary"):
        # Validate before spending time (and API quota) on remote calls.
        ticker = ticker.strip()
        if not ticker:
            st.error("Please enter a stock ticker.")
            return
        if start_date >= end_date:
            st.error("Start date must be before end date.")
            return
        with st.spinner("Analyzing..."):
            results = analyze_stock(ticker, start_date, end_date, use_ai_assistant)
        display_results(results)
def display_results(results):
    """Render the output of ``analyze_stock`` in the Streamlit page.

    Args:
        results: Either an error string, a failure tuple whose first element
            is an error message and whose remaining elements are all
            ``None``, or the 7-tuple ``(fair_price_html, fig, shap_fig,
            seasonality_fig, seasonality_html, gpt_analysis, log_chart)``.
    """
    if isinstance(results, str):  # Error occurred
        st.error(results)
        return
    # Failures also arrive as (message, None, None, ...) tuples; detect them
    # by content, not length, so they show as an error instead of failing on
    # unpacking or on rendering ``None`` figures.
    if all(item is None for item in results[1:]):
        st.error(results[0])
        return

    fair_price_html, fig, shap_fig, seasonality_fig, seasonality_html, gpt_analysis, log_chart = results

    st.subheader("Fair Price Analysis")
    st.markdown(fair_price_html, unsafe_allow_html=True)
    st.subheader("Fair Price Prediction")
    st.plotly_chart(fig, use_container_width=True)

    col1, col2 = st.columns(2)
    with col1:
        st.subheader("SHAP Feature Importance")
        st.pyplot(shap_fig)
    with col2:
        st.subheader("Monthly Seasonality")
        st.plotly_chart(seasonality_fig, use_container_width=True)
    st.markdown(seasonality_html, unsafe_allow_html=True)

    # The sentinel text means the user did not tick the AI assistant box.
    if gpt_analysis != "AI assistant analysis not requested.":
        st.subheader("GPT Assistant Analysis")
        st.text_area("Analysis", value=gpt_analysis, height=300)

    st.subheader("Logarithmic Stock Chart")
    st.plotly_chart(log_chart, use_container_width=True)
# Launch the app only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()