Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import pandas as pd | |
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| from sklearn.preprocessing import MinMaxScaler | |
| from sklearn.metrics import mean_squared_error, mean_absolute_error | |
| from tensorflow.keras.models import Sequential | |
| from tensorflow.keras.layers import Dense, LSTM, Dropout | |
| from datetime import timedelta | |
| # Title and description | |
| st.title("Stock Price Prediction with LSTM") | |
| st.write("This application uses LSTM (Long Short-Term Memory) neural networks to predict stock prices.") | |
| # Load the data directly (replace 'AAPL_dataset_copied.csv' with your actual file path) | |
| data = pd.read_csv('AAPL_dataset_copied.csv') | |
| # Convert 'date' column to datetime and set as index | |
| data['date'] = pd.to_datetime(data['date']) | |
| data.set_index('date', inplace=True) | |
| # Select only the 'Close' column | |
| data = data[['close']] | |
| # Show the first few rows of the dataset | |
| st.subheader("Dataset Preview") | |
| st.write(data.head()) | |
| # Normalize the data for faster convergence | |
| scaler = MinMaxScaler(feature_range=(0, 1)) | |
| data['close_scaled'] = scaler.fit_transform(data[['close']]) | |
| # Split data into training (70%), validation (15%), and testing (15%) sets | |
| train_size = int(len(data) * 0.7) | |
| val_size = int(len(data) * 0.15) | |
| train_data = data['close_scaled'][:train_size].values.reshape(-1, 1) | |
| val_data = data['close_scaled'][train_size:train_size + val_size].values.reshape(-1, 1) | |
| test_data = data['close_scaled'][train_size + val_size:].values.reshape(-1, 1) | |
| # Function to create sequences for LSTM | |
| def create_sequences(dataset, time_step=60): | |
| X, Y = [], [] | |
| for i in range(len(dataset) - time_step): | |
| X.append(dataset[i:(i + time_step), 0]) | |
| Y.append(dataset[i + time_step, 0]) | |
| return np.array(X), np.array(Y) | |
| # Define time step (e.g., 60 days) | |
| time_step = 60 | |
| X_train, y_train = create_sequences(train_data, time_step) | |
| X_val, y_val = create_sequences(val_data, time_step) | |
| X_test, y_test = create_sequences(test_data, time_step) | |
| # Reshape input to be [samples, time steps, features] for LSTM | |
| X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], 1) | |
| X_val = X_val.reshape(X_val.shape[0], X_val.shape[1], 1) | |
| X_test = X_test.reshape(X_test.shape[0], X_test.shape[1], 1) | |
| # Build the LSTM model with Dropout for regularization | |
| model = Sequential([ | |
| LSTM(100, return_sequences=True, input_shape=(X_train.shape[1], 1)), | |
| Dropout(0.2), | |
| LSTM(50, return_sequences=True), | |
| Dropout(0.2), | |
| LSTM(50, return_sequences=False), | |
| Dropout(0.2), | |
| Dense(25), | |
| Dense(1) | |
| ]) | |
| # Compile the model with Adam optimizer and mean squared error loss | |
| model.compile(optimizer='adam', loss='mean_squared_error', metrics=['mean_absolute_error']) | |
| # Train the model without EarlyStopping | |
| st.write("Training the LSTM model...") | |
| history = model.fit(X_train, y_train, validation_data=(X_val, y_val), | |
| epochs=50, batch_size=64, verbose=1) | |
| # Evaluate on the test data | |
| test_loss, test_mae = model.evaluate(X_test, y_test, verbose=0) | |
| # Make predictions on the test data | |
| train_predict = model.predict(X_train) | |
| val_predict = model.predict(X_val) | |
| test_predict = model.predict(X_test) | |
| # Inverse transform the predictions and actual values to original scale | |
| train_predict = scaler.inverse_transform(train_predict) | |
| val_predict = scaler.inverse_transform(val_predict) | |
| test_predict = scaler.inverse_transform(test_predict) | |
| y_train = scaler.inverse_transform([y_train]) | |
| y_val = scaler.inverse_transform([y_val]) | |
| y_test = scaler.inverse_transform([y_test]) | |
| # Calculate evaluation metrics | |
| train_rmse = np.sqrt(mean_squared_error(y_train[0], train_predict[:,0])) | |
| val_rmse = np.sqrt(mean_squared_error(y_val[0], val_predict[:,0])) | |
| test_rmse = np.sqrt(mean_squared_error(y_test[0], test_predict[:,0])) | |
| train_mae = mean_absolute_error(y_train[0], train_predict[:,0]) | |
| val_mae = mean_absolute_error(y_val[0], val_predict[:,0]) | |
| test_mae = mean_absolute_error(y_test[0], test_predict[:,0]) | |
| # Mean Absolute Percentage Error (MAPE) as accuracy | |
| mape = np.mean(np.abs((y_test[0] - test_predict[:, 0]) / y_test[0])) * 100 | |
| accuracy = 100 - mape | |
| st.write(f"LSTM Model - Train RMSE: {train_rmse:.2f}, Train MAE: {train_mae:.2f}") | |
| st.write(f"LSTM Model - Validation RMSE: {val_rmse:.2f}, Validation MAE: {val_mae:.2f}") | |
| st.write(f"LSTM Model - Test RMSE: {test_rmse:.2f}, Test MAE: {test_mae:.2f}") | |
| st.write(f"LSTM Model - Test Accuracy: {accuracy:.2f}%") | |
| # Plot the results | |
| st.subheader("Prediction Results") | |
| plt.figure(figsize=(14,6)) | |
| plt.plot(data.index[:train_size], scaler.inverse_transform(train_data), label='Training Data') | |
| plt.plot(data.index[train_size + time_step:train_size + time_step + len(val_predict)], val_predict, label='Validation Predictions') | |
| plt.plot(data.index[train_size + val_size + time_step:], test_predict, label='Test Predictions') | |
| plt.plot(data.index[train_size + val_size + time_step:], y_test[0], label='Actual Test Data') | |
| plt.xlabel('Date') | |
| plt.ylabel('Stock Price') | |
| plt.legend(['Training Data', 'Validation Predictions', 'Test Predictions', 'Actual Test Data'], loc='upper left') | |
| st.pyplot(plt) | |
| # User-defined future prediction days | |
| num_days_to_predict = st.slider("Select the number of days to predict into the future", min_value=1, max_value=30, value=10) | |
| # Predict future prices for the next 'num_days_to_predict' days | |
| temp_input = np.array(test_data[-time_step:]).reshape(-1).tolist() | |
| lst_output = [] | |
| for i in range(num_days_to_predict): | |
| if len(temp_input) > time_step: | |
| x_input = np.array(temp_input[-time_step:]) | |
| x_input = x_input.reshape((1, time_step, 1)) | |
| yhat = model.predict(x_input, verbose=0) | |
| temp_input.append(yhat[0][0]) | |
| lst_output.append(yhat[0][0]) | |
| else: | |
| x_input = np.array(temp_input).reshape((1, time_step, 1)) | |
| yhat = model.predict(x_input, verbose=0) | |
| temp_input.append(yhat[0][0]) | |
| lst_output.append(yhat[0][0]) | |
| # Inverse transform future predictions to the original scale | |
| future_predictions = scaler.inverse_transform(np.array(lst_output).reshape(-1, 1)) | |
| # Generate dates for future predictions | |
| last_date = data.index[-1] | |
| future_dates = [last_date + timedelta(days=i) for i in range(1, num_days_to_predict + 1)] | |
| # Display future predictions with dates | |
| st.subheader(f"Future Predictions for the next {num_days_to_predict} days:") | |
| future_df = pd.DataFrame({'Date': future_dates, 'Predicted Price (LSTM)': future_predictions.flatten()}) | |
| st.write(future_df) | |