Spaces:
Build error
Build error
| import streamlit as st | |
| import pandas as pd | |
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| import seaborn as sns | |
| from datetime import datetime, timedelta | |
| import pickle | |
| import joblib | |
| from sklearn.preprocessing import MinMaxScaler | |
| from sklearn.metrics import mean_squared_error, mean_absolute_error | |
| import warnings | |
| warnings.filterwarnings('ignore') | |
# Configure the page before anything else touches the Streamlit API.
# Streamlit documents set_page_config() as having to be the first Streamlit
# command on a page; the ImportError handler below calls st.error(), so the
# configuration has to run ahead of the optional-dependency check.
st.set_page_config(
    page_title="Stock Price Forecasting: ARIMA vs LSTM",
    # NOTE(review): the icon arrived as mojibake ("π"); the chart emoji is
    # assumed to be the original glyph - confirm against the upstream file.
    page_icon="📈",
    layout="wide",
)

# Try to import the modelling libraries. They are optional at import time:
# when either statsmodels or tensorflow is missing, MODELS_AVAILABLE is set to
# False and an error banner is shown instead of raising.
try:
    from statsmodels.tsa.arima.model import ARIMA
    import tensorflow as tf
    from tensorflow.keras.models import Sequential, load_model
    from tensorflow.keras.layers import LSTM, Dense, Dropout
    from tensorflow.keras.optimizers import Adam
    from tensorflow.keras.callbacks import EarlyStopping
    MODELS_AVAILABLE = True
except ImportError as e:
    MODELS_AVAILABLE = False
    st.error(f"Required libraries not installed. Please install statsmodels and tensorflow. Error: {e}")
# ---- Page heading -------------------------------------------------------
# Static title plus a short description of what the app does.
_INTRO_MARKDOWN = """
This application demonstrates stock price forecasting using traditional statistical methods (ARIMA)
and deep learning approaches (LSTM). Upload your stock data to get predictions and performance comparisons.
"""
st.title("Stock Price Forecasting: ARIMA vs LSTM")
st.markdown(_INTRO_MARKDOWN)

# ---- Sidebar controls ---------------------------------------------------
# model_choice and forecast_days are read later by the forecasting section.
_sidebar = st.sidebar
_sidebar.header("Model Configuration")
model_choice = _sidebar.selectbox(
    "Select Forecasting Model",
    options=["ARIMA", "LSTM", "Both Models"],
)
forecast_days = _sidebar.slider(
    "Forecast Days",
    min_value=1,
    max_value=60,
    value=30,
)

# ---- CSV upload ---------------------------------------------------------
# uploaded_file is None until the user provides a file.
_UPLOAD_HELP = "Expected format: columns should include 'date', 'close', 'open', 'high', 'low', 'volume', 'Name'"
st.header(" Data Upload")
uploaded_file = st.file_uploader(
    "Upload your stock data CSV file",
    type=['csv'],
    help=_UPLOAD_HELP,
)
# ---------------------------------------------------------------------------
# Forecasting workflow: load the uploaded CSV, let the user pick a company,
# fit ARIMA and/or an LSTM on the closing price, then plot, tabulate and
# export the forecasts. Private helpers come first; the top-level flow that
# drives them is at the bottom of this section.
# ---------------------------------------------------------------------------


def _describe_date_range(df):
    """Return the first and last calendar date found in ``df['date']``.

    Values that cannot be parsed are ignored here so the overview metric never
    raises; "n/a" is returned when the column is absent or holds no usable
    date.
    """
    if 'date' not in df.columns:
        return "n/a"
    dates = pd.to_datetime(df['date'], errors='coerce').dropna()
    if dates.empty:
        return "n/a"
    first = dates.min().strftime('%Y-%m-%d')
    last = dates.max().strftime('%Y-%m-%d')
    return f"{first} to {last}"


def _plot_price_history(stock_data, selected_company):
    """Draw the closing-price line chart for the selected company."""
    fig, ax = plt.subplots(figsize=(12, 6))
    ax.plot(stock_data.index, stock_data['close'], linewidth=2)
    ax.set_title(f"{selected_company} Closing Price")
    ax.set_xlabel("Date")
    ax.set_ylabel("Price ($)")
    ax.grid(True, alpha=0.3)
    st.pyplot(fig)
    # pyplot keeps every figure it creates registered until it is closed;
    # without this each Streamlit rerun would leave another figure behind.
    plt.close(fig)


def create_sequences(data, seq_length):
    """Split ``data`` into sliding windows and the value that follows each.

    Returns ``(X, y)`` where ``X[i]`` is ``data[i:i + seq_length]`` and
    ``y[i]`` is ``data[i + seq_length]``.
    """
    n_windows = len(data) - seq_length
    X = [data[i:i + seq_length] for i in range(n_windows)]
    y = [data[i + seq_length] for i in range(n_windows)]
    return np.array(X), np.array(y)


def _train_lstm(scaled_data, sequence_length, scaler):
    """Fit the stacked LSTM and return ``(model, rmse)``.

    The windows are split chronologically 80/20. The held-out 20% is used both
    as the validation set for early stopping and for the reported RMSE, which
    is expressed in original price units via ``scaler.inverse_transform``.
    """
    # Set seeds for reproducibility
    np.random.seed(42)
    tf.random.set_seed(42)

    X, y = create_sequences(scaled_data, sequence_length)
    train_size = int(len(X) * 0.8)
    X_train, X_test = X[:train_size], X[train_size:]
    y_train, y_test = y[:train_size], y[train_size:]

    model = Sequential([
        LSTM(50, return_sequences=True, input_shape=(sequence_length, 1)),
        Dropout(0.2),
        LSTM(50, return_sequences=False),
        Dropout(0.2),
        Dense(25),
        Dense(1),
    ])
    model.compile(optimizer=Adam(learning_rate=0.001), loss='mse')
    early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)

    with st.spinner("Training LSTM neural network..."):
        model.fit(
            X_train, y_train,
            batch_size=32,
            epochs=50,
            validation_data=(X_test, y_test),
            callbacks=[early_stopping],
            verbose=0,
        )

    test_predictions = scaler.inverse_transform(model.predict(X_test, verbose=0))
    y_test_actual = scaler.inverse_transform(y_test)
    rmse = np.sqrt(mean_squared_error(y_test_actual, test_predictions))
    return model, rmse


def _roll_forecast(model, scaler, scaled_data, sequence_length, steps):
    """Forecast ``steps`` values by feeding each prediction back as input.

    Starts from the last ``sequence_length`` scaled observations; after every
    prediction the window is shifted left by one and the new value is written
    into the final slot. The result is returned in original price units.
    """
    window = scaled_data[-sequence_length:].reshape(1, sequence_length, 1).copy()
    scaled_preds = []
    for _ in range(steps):
        next_pred = model.predict(window, verbose=0)[0, 0]
        scaled_preds.append(next_pred)
        window = np.roll(window, -1, axis=1)
        window[0, -1, 0] = next_pred
    scaled_preds = np.array(scaled_preds).reshape(-1, 1)
    return scaler.inverse_transform(scaled_preds).flatten()


def _run_arima(ts_data, forecast_days):
    """Fit ARIMA(1, 1, 1) on ``ts_data`` and forecast ``forecast_days`` steps."""
    st.info("Training ARIMA model...")
    # Simple ARIMA parameters (for demo)
    fitted_arima = ARIMA(ts_data, order=(1, 1, 1)).fit()
    # Store a plain 1-D array: the plots, summary table and CSV export pair the
    # values positionally with their own forecast dates, so the index that
    # statsmodels attaches to the returned Series is not needed downstream.
    arima_forecast = np.asarray(fitted_arima.forecast(steps=forecast_days), dtype=float)
    return {'forecast': arima_forecast, 'model': fitted_arima}


def _run_lstm(ts_data, selected_company, forecast_days):
    """Return ``(label, result)`` for the LSTM forecast of ``ts_data``.

    When the series is too short for the chosen window a linear-trend
    extrapolation of the last ten points is returned instead, labelled
    'LSTM (Trend Fallback)'. Trained models are cached in
    ``st.session_state`` so repeated clicks on the same data skip training.
    """
    # The key includes a content fingerprint so that a different upload with
    # the same company name and length cannot pick up a stale model/scaler.
    # The uint64 sum may wrap around; that is fine for a cache key.
    fingerprint = int(pd.util.hash_pandas_object(ts_data, index=True).to_numpy().sum())
    cache_key = f"lstm_model_{selected_company}_{len(ts_data)}_{fingerprint}"

    scaler = MinMaxScaler()
    scaled_data = scaler.fit_transform(ts_data.values.reshape(-1, 1))
    sequence_length = min(60, len(scaled_data) // 4)

    if len(scaled_data) <= sequence_length + 10:
        st.warning("Insufficient data for LSTM training. Need at least 70 data points.")
        # Fallback to simple trend method
        last_values = ts_data.tail(10)
        trend = np.polyfit(range(len(last_values)), last_values, 1)[0]
        trend_forecast = [ts_data.iloc[-1] + trend * i for i in range(1, forecast_days + 1)]
        return 'LSTM (Trend Fallback)', {
            'forecast': np.array(trend_forecast),
            'scaler': None,
        }

    if cache_key not in st.session_state:
        st.info("Training LSTM model (this may take a minute)...")
        model, lstm_test_rmse = _train_lstm(scaled_data, sequence_length, scaler)
        st.success(f"LSTM Training Complete - Test RMSE: ${lstm_test_rmse:.2f}")
        st.session_state[cache_key] = {
            'model': model,
            'scaler': scaler,
            'test_rmse': lstm_test_rmse,
            'sequence_length': sequence_length,
        }
    else:
        st.info("Using cached LSTM model...")
        cached_data = st.session_state[cache_key]
        model = cached_data['model']
        scaler = cached_data['scaler']
        lstm_test_rmse = cached_data['test_rmse']
        sequence_length = cached_data['sequence_length']
        # Re-scale with the scaler the cached model was trained with.
        scaled_data = scaler.transform(ts_data.values.reshape(-1, 1))

    st.info("Generating LSTM predictions...")
    lstm_forecast = _roll_forecast(model, scaler, scaled_data, sequence_length, forecast_days)
    return 'LSTM', {
        'forecast': lstm_forecast,
        'model': model,
        'scaler': scaler,
        'test_rmse': lstm_test_rmse,
    }


def _render_forecasts(stock_data, results, selected_company, forecast_days):
    """Plot, summarise and offer a CSV download for every entry in ``results``.

    ``results`` maps a model label to a dict whose 'forecast' entry is a 1-D
    array of ``forecast_days`` predicted prices.
    """
    # Forecast dates are business days following the last observed date.
    last_date = stock_data.index[-1]
    forecast_dates = pd.date_range(
        start=last_date + timedelta(days=1),
        periods=forecast_days,
        freq='B',
    )

    st.subheader("Forecast Visualization")
    # Only create the side-by-side comparison panel when there is something
    # to compare; a single model gets one full-width chart.
    compare_models = len(results) > 1
    fig, axes = plt.subplots(1, 2 if compare_models else 1, figsize=(16, 6), squeeze=False)
    ax1 = axes[0, 0]

    # Historical + forecast plot (last 180 observations for context).
    historical_period = 180
    hist_data = stock_data['close'].iloc[-historical_period:]
    ax1.plot(hist_data.index, hist_data.values,
             label='Historical', color='black', linewidth=2)
    colors = ['blue', 'red']
    styles = ['--', ':']
    for i, (model_name, result) in enumerate(results.items()):
        ax1.plot(forecast_dates, result['forecast'],
                 label=f'{model_name} Forecast',
                 color=colors[i], linestyle=styles[i], linewidth=2)
    ax1.set_title(f"{selected_company} Stock Price Forecast")
    ax1.set_xlabel("Date")
    ax1.set_ylabel("Price ($)")
    ax1.legend()
    ax1.grid(True, alpha=0.3)
    ax1.tick_params(axis='x', rotation=45)

    if compare_models:
        # Grouped bars for the first (up to) ten forecast days.
        ax2 = axes[0, 1]
        x = np.arange(min(10, len(forecast_dates)))
        width = 0.35
        for i, (model_name, result) in enumerate(results.items()):
            ax2.bar(x + i * width, result['forecast'][:len(x)],
                    width, label=model_name, color=colors[i], alpha=0.7)
        ax2.set_title('First 10 Days Forecast Comparison')
        ax2.set_xlabel('Forecast Day')
        ax2.set_ylabel('Predicted Price ($)')
        ax2.set_xticks(x + width / 2)
        ax2.set_xticklabels([f'Day {i+1}' for i in range(len(x))])
        ax2.legend()
        ax2.grid(True, alpha=0.3)

    plt.tight_layout()
    st.pyplot(fig)
    plt.close(fig)  # release the figure once Streamlit has rendered it

    # Summary table: current price, then final forecast and % change per model.
    st.subheader(" Forecast Summary")
    current_price = stock_data['close'].iloc[-1]
    summary_data = [['Current Price', f'${current_price:.2f}']]
    for model_name, result in results.items():
        forecast_price = result['forecast'][-1]
        price_change = ((forecast_price - current_price) / current_price) * 100
        summary_data.append([
            f'{model_name} Forecast ({forecast_days}d)',
            f'${forecast_price:.2f}',
        ])
        summary_data.append([
            f'{model_name} Change (%)',
            f'{price_change:+.1f}%',
        ])
    summary_df = pd.DataFrame(summary_data, columns=['Metric', 'Value'])
    st.dataframe(summary_df, use_container_width=True)

    # Download forecast data
    forecast_export = pd.DataFrame({
        'Date': forecast_dates,
        **{f'{name}_Forecast': result['forecast'] for name, result in results.items()},
    })
    st.download_button(
        label="📥 Download Forecast Data",
        data=forecast_export.to_csv(index=False),
        file_name=f"{selected_company}_forecast_{datetime.now().strftime('%Y%m%d')}.csv",
        mime="text/csv",
    )


if uploaded_file is not None:
    try:
        # Load data
        df = pd.read_csv(uploaded_file)
        st.success("Data loaded successfully!")

        # Display basic info
        st.subheader("Dataset Overview")
        col1, col2, col3 = st.columns(3)
        with col1:
            st.metric("Total Records", len(df))
        with col2:
            st.metric("Companies", df['Name'].nunique() if 'Name' in df.columns else 1)
        with col3:
            # Show the actual date span rather than the row count, which
            # over-counts whenever the file holds more than one company.
            st.metric("Date Range", _describe_date_range(df))

        # Show sample data
        st.dataframe(df.head(), use_container_width=True)

        # Company selection
        if 'Name' in df.columns:
            companies = df['Name'].unique()
            selected_company = st.selectbox("Select Company", companies)
            stock_data = df[df['Name'] == selected_company].copy()
        else:
            selected_company = "Stock"
            stock_data = df.copy()

        # Data preprocessing: chronological order with the date as index.
        stock_data['date'] = pd.to_datetime(stock_data['date'])
        stock_data = stock_data.sort_values('date').set_index('date')

        # Display stock price chart
        # NOTE(review): the leading glyph arrived as mojibake ("π"); the chart
        # emoji is assumed to be the original - confirm against upstream.
        st.subheader(f"📈 {selected_company} Stock Price Over Time")
        _plot_price_history(stock_data, selected_company)

        # Model predictions section
        st.header(" Forecasting Results")
        if st.button("Generate Forecasts", type="primary"):
            results = {}
            with st.spinner("Training models and generating forecasts..."):
                # Prepare data
                ts_data = stock_data['close'].dropna()

                if model_choice in ("ARIMA", "Both Models") and MODELS_AVAILABLE:
                    try:
                        results['ARIMA'] = _run_arima(ts_data, forecast_days)
                    except Exception as e:
                        # Best effort: report the failure and carry on so the
                        # other model can still produce a forecast.
                        st.error(f"ARIMA model error: {str(e)}")

                if model_choice in ("LSTM", "Both Models") and MODELS_AVAILABLE:
                    try:
                        lstm_label, lstm_result = _run_lstm(ts_data, selected_company, forecast_days)
                        results[lstm_label] = lstm_result
                    except Exception as e:
                        st.error(f"LSTM model error: {str(e)}")

            # Display results
            if results:
                _render_forecasts(stock_data, results, selected_company, forecast_days)
    except Exception as e:
        st.error(f"Error processing data: {str(e)}")
        st.info("Please ensure your CSV has the required columns: date, close, open, high, low, volume, Name")
else:
    st.info(" Please upload a CSV file to get started")

    # Show example data format
    st.subheader(" Expected Data Format")
    example_data = pd.DataFrame({
        'date': ['2023-01-01', '2023-01-02', '2023-01-03'],
        'open': [100.0, 101.0, 99.5],
        'high': [102.0, 103.0, 101.0],
        'low': [99.0, 100.0, 98.0],
        'close': [101.0, 102.5, 100.0],
        'volume': [1000000, 1200000, 900000],
        'Name': ['AAPL', 'AAPL', 'AAPL'],
    })
    st.dataframe(example_data, use_container_width=True)
# Footer: a horizontal rule followed by the project credit line.
# The separators are bullet characters (U+2022); they had been corrupted into
# mojibake by a wrong text decoding and are restored here.
st.markdown("---")
st.markdown("""
**DataSynthis ML Job Task** - Stock Price Forecasting Application
Built with Streamlit • Models: ARIMA & LSTM • Deployed on Hugging Face Spaces
""")