MarketMindP / static /pipelines /lstm_pipeline.py
AronWolverine's picture
initial
56bd117
import numpy as np
import pandas as pd
import yfinance as yf
from datetime import datetime, timedelta
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, BatchNormalization
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import Huber
from flask import jsonify
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import train_test_split
# Function to predict future stock prices
def predict_future(model, last_sequence, scaler, n_steps=1):
last_sequence_reshaped = last_sequence.reshape((1, last_sequence.shape[0], last_sequence.shape[1]))
future_predictions = []
current_input = last_sequence_reshaped
for _ in range(n_steps):
predicted_price = model.predict(current_input)[0, 0]
future_predictions.append(predicted_price)
current_input = np.roll(current_input, -1, axis=1)
current_input[0, -1, 0] = predicted_price
future_predictions = scaler.inverse_transform(
np.hstack((np.array(future_predictions).reshape(-1, 1), np.zeros((len(future_predictions), 6))))
)[:, 0]
return future_predictions
# Function to find optimal buy points
def find_optimal_buy_points(prices, window=3, threshold=0.01):
buy_indices = []
buy_signals = {"points": [], "reasons": []}
if len(prices) < 2 * window + 1:
return buy_indices, buy_signals
for i in range(window, len(prices) - window):
window_slice = prices[i - window:i + window + 1]
if prices[i] == min(window_slice):
before_diff = prices[i] - min(prices[i - window:i])
after_diff = prices[i] - min(prices[i + 1:i + window + 1])
if before_diff > threshold and after_diff > threshold:
buy_indices.append(i)
buy_signals["points"].append(prices[i])
buy_signals["reasons"].append("Local minimum with significant change before and after")
if not buy_indices and len(prices) > 0:
min_idx = np.argmin(prices)
buy_indices.append(min_idx)
buy_signals["points"].append(prices[min_idx])
buy_signals["reasons"].append("Lowest price point")
return buy_indices, buy_signals
# 6. Create Sequences for Time Series Forecasting
def create_sequences(data, seq_length=20):
sequences = []
for i in range(len(data) - seq_length):
sequences.append(data[i: i + seq_length])
return np.array(sequences)
# Main pipeline function to train LSTM model and return predictions
def run_lstm_prediction(data):
ticker = data['ticker']
prediction_date = data['prediction_date']
epochs = int(data.get('epochs', 100)) # default to 100 if not provided
# Download stock data
df = yf.download(ticker, start='2019-01-01', end=datetime.today().strftime('%Y-%m-%d'))
if df.empty:
return jsonify({'error': 'Failed to fetch stock data.'}), 400
# Download market indices
nifty = yf.download("^NSEI", start='2019-01-01', end=datetime.today().strftime('%Y-%m-%d'))[['Close']].rename(columns={'Close': 'NIFTY_Close'})
sensex = yf.download("^BSESN", start='2019-01-01', end=datetime.today().strftime('%Y-%m-%d'))[['Close']].rename(columns={'Close': 'SENSEX_Close'})
# Merge indices with stock data
df = df.merge(nifty, left_index=True, right_index=True)
df = df.merge(sensex, left_index=True, right_index=True)
df = df[['Close', 'Open', 'High', 'Low', 'Volume', 'NIFTY_Close', 'SENSEX_Close']]
# Scaling
scaler = MinMaxScaler(feature_range=(-1, 1))
df_scaled = scaler.fit_transform(df)
# 5. Convert to NumPy Array
df_array = df_scaled.astype(np.float32)
seq_length = 20 # Use past 20 days to predict next day
sequences = create_sequences(df_array, seq_length+1)
# 7. Split Data into Train (80%), Validation (10%), Test (10%)
def split_data(sequence):
train_data, test_data = train_test_split(sequence, test_size=0.2, shuffle=False)
val_data, test_data = train_test_split(test_data, test_size=0.5, shuffle=False)
return train_data, val_data, test_data
train_data, val_data, test_data = split_data(sequences)
# 8. Separate Features (X) and Target (y)
X_train, y_train = train_data[:, :-1, :], train_data[:, -1, 0]
X_valid, y_valid = val_data[:, :-1, :], val_data[:, -1, 0]
X_test, y_test = test_data[:, :-1, :], test_data[:, -1, 0]
print(f"x_train shape: {X_train.shape}, y_train shape: {y_train.shape}")
print(f"x_valid shape: {X_valid.shape}, y_valid shape: {y_valid.shape}")
print(f"x_test shape: {X_test.shape}, y_test shape: {y_test.shape}")
# Build LSTM model
model = Sequential()
model.add(LSTM(128, return_sequences=True, activation='relu', input_shape=(X_test.shape[1], X_test.shape[2])))
model.add(LSTM(64, return_sequences=False,activation='relu'))
model.add(Dense(39, activation='relu'))
model.add(BatchNormalization())
model.add(Dense(1))
model.compile(optimizer='adam', loss='mse',metrics=['mean_squared_error'])
model.fit(X_train, y_train, epochs=epochs, batch_size=32, verbose=1,shuffle=False,validation_data=(X_valid,y_valid))
# Predict and Reshape Results
predictions = model.predict(X_test)
predictions = predictions.reshape(-1, 1)
# Inverse Transform Predictions (Only Close Price)
predictions = scaler.inverse_transform(np.hstack((predictions, np.zeros((len(predictions), 6)))))[:, 0]
# Inverse Transform y_test (Only Close Price)
y_test_unscaled = scaler.inverse_transform(np.hstack((y_test.reshape(-1, 1), np.zeros((len(y_test), 6)))))[:, 0]
# Align predictions and actuals
predictions = predictions[-len(y_test_unscaled):]
rmse= np.sqrt(np.mean((predictions-y_test_unscaled)**2))
mae = mean_absolute_error(y_test_unscaled, predictions)
mse= mean_squared_error(y_test_unscaled, predictions)
r2= r2_score(y_test_unscaled, predictions)
print(f"Test RMSE: {rmse}")
print(f"Test MAE: {mae}")
print(f"Test MSE: {mse}")
print(f"Test R2: {r2}")
# Split into Train and Validation/Test Data
train_size = int(len(df) * 0.9)
train = df.iloc[:train_size].copy()
valid = df.iloc[train_size:].copy()
# Ensure valid has the same length as predictions
valid = valid.iloc[-len(predictions):].copy()
valid['Predictions'] = predictions
valid.index = df.index[train_size:][-len(valid):] # Align timestamps
# Optional: Fill missing data (if any)
valid['Predictions'] = valid['Predictions'].interpolate()
# Fix Training Data Index (if needed)
train.index = df.index[:train_size]
# Historical data for response
#test_indices = df.index[-len(X_test):]
# Historical data for response (use test_indices)
historical_data = [
{
'date': str(train.index[i].date()) if hasattr(train.index[i], 'date') else str(train.index[i]),
'price': float(train['Close'].iloc[i])
}
for i in range(len(train))
]
# Future prediction logic
last_sequence = X_test[-1]
last_date = df.index[-1]
future_start_date = datetime.strptime(prediction_date, '%Y-%m-%d')
future_days = (future_start_date - last_date).days
if future_days <= 0:
return jsonify({'error': 'Prediction date must be in the future.'}), 400
# Predict future prices
future_prices = predict_future(model, last_sequence, scaler, n_steps=future_days)
future_dates = pd.date_range(start=last_date + timedelta(days=1), periods=future_days, freq='B')
# Find buy points
buy_indices, buy_signals = find_optimal_buy_points(future_prices)
# # Validation DataFrame for actual vs predicted
# valid = pd.DataFrame({
# 'Close': y_test_unscaled,
# 'Predictions': predictions
# }, index=test_indices)
# Prepare actual vs predicted for validation/test period
actual_vs_predicted = [
{
'date': str(valid.index[i].date()),
'actual': float(valid['Close'].iloc[i]),
'predicted': float(valid['Predictions'].iloc[i])
}
for i in range(len(valid))
]
# Final result JSON
result = {
'stock': str(ticker),
'predictedPrice': float(round(predictions[-1], 2)),
'historicalData': historical_data,
'futurePrices': [float(x) for x in future_prices.tolist()],
'futureDates': [str(x) for x in future_dates.strftime('%Y-%m-%d').tolist()],
'buySignals': {
"points": [float(x) for x in buy_signals["points"]],
"reasons": [str(r) for r in buy_signals["reasons"]]
},
'buyIndices': [int(x) for x in buy_indices],
'actualVsPredicted': actual_vs_predicted
}
return jsonify(result)