from typing import Optional

import numpy as np
import pandas as pd
from fastapi import FastAPI, HTTPException, Request
from keras.layers import LSTM, Dense, Dropout
from keras.models import Sequential
from pydantic import BaseModel
from sklearn.preprocessing import MinMaxScaler
import json
import os
import tempfile

app = FastAPI()


class StockRequest(BaseModel):
    """Request payload for /LSTM_Predict.

    stock_name: ticker/identifier (currently informational only).
    stock_data: optional list of row dicts; each row must carry a 'Close'
        field convertible to float. Declared Optional so the annotation
        matches the None default (a bare `list = None` mis-types the field).
    """

    stock_name: str
    stock_data: Optional[list] = None  # allow passing stock data directly


# In-memory storage instead of file paths (placeholder; not yet used).
STOCK_DATA = {}


def _extract_close_frame(body: dict, stock_request: StockRequest) -> pd.DataFrame:
    """Build a DataFrame with a clean numeric 'Close' column from the request.

    Raises HTTPException 422 for missing/malformed data and 400 when there
    are fewer than 100 usable rows.
    """
    if stock_request.stock_data:
        data = pd.DataFrame(stock_request.stock_data)
    else:
        # Keep the try minimal: the original wrapped its own HTTPException
        # in this except-block, so "No stock data provided" was unreachable
        # and got re-labelled "Invalid stock data format". Only DataFrame
        # construction failures should map to the format error.
        try:
            data = pd.DataFrame(body.get("stock_data", []))
        except Exception:
            raise HTTPException(status_code=422, detail="Invalid stock data format")
        if data.empty:
            raise HTTPException(status_code=422, detail="No stock data provided")

    if "Close" not in data.columns:
        raise HTTPException(status_code=422, detail="Missing 'Close' column in data")

    # Coerce to numeric and drop rows that failed conversion.
    data["Close"] = pd.to_numeric(data["Close"], errors="coerce")
    data.dropna(subset=["Close"], inplace=True)

    # The model needs a reasonable history to form training windows.
    if len(data) < 100:
        raise HTTPException(
            status_code=400,
            detail="Not enough data for prediction (need at least 100 points)",
        )
    return data


def _make_windows(series: np.ndarray, seq_len: int):
    """Slide a seq_len window over a 1-D series.

    Returns (X, y) where X[i] is the window ending just before index
    seq_len + i and y[i] is the value at that index. X is shaped
    (samples, seq_len, 1) ready for the LSTM.
    """
    x = np.array([series[i - seq_len:i] for i in range(seq_len, len(series))])
    y = np.asarray(series[seq_len:])
    return x.reshape((x.shape[0], seq_len, 1)), y


def _build_model(seq_len: int) -> Sequential:
    """Compile a small two-layer LSTM regressor for (seq_len, 1) inputs."""
    model = Sequential()
    model.add(LSTM(64, return_sequences=True, input_shape=(seq_len, 1)))
    model.add(Dropout(0.2))
    model.add(LSTM(32, return_sequences=False))
    model.add(Dropout(0.2))
    model.add(Dense(1, activation="linear"))
    model.compile(
        optimizer="adam", loss="mean_squared_error", metrics=["mean_squared_error"]
    )
    return model


@app.post("/LSTM_Predict")
async def predict(request: Request):
    """Train an LSTM on the supplied closing prices and return in-sample
    predictions for the whole series.

    Response: {"prediction": [float, ...]} — one value per input row; the
    first seq_len entries echo the actual prices (no window exists yet),
    the remainder are model outputs.
    """
    body = await request.json()
    if not isinstance(body, dict):
        raise HTTPException(status_code=422, detail="Invalid request format")
    # Validates stock_name presence; raises pydantic ValidationError -> 422.
    stock_request = StockRequest(**body)

    data = _extract_close_frame(body, stock_request)
    dataset = data[["Close"]].values

    # Scale to [0, 1]; the scaler is reused to invert the predictions.
    scaler = MinMaxScaler(feature_range=(0, 1))
    scaled = scaler.fit_transform(dataset)

    # First 80% is the training span; window length adapts to short inputs.
    train_len = int(np.ceil(len(dataset) * 0.8))
    train_data = scaled[:train_len, :]
    seq_len = min(60, len(train_data) // 2)

    x_train, y_train = _make_windows(train_data[:, 0], seq_len)
    model = _build_model(seq_len)
    # Few epochs keeps the request latency tolerable.
    model.fit(x_train, y_train, batch_size=32, epochs=5, verbose=0)

    # Predict over every window in the full (train + holdout) series.
    x_all, _ = _make_windows(scaled[:, 0], seq_len)
    predictions = scaler.inverse_transform(model.predict(x_all))

    # Assemble a full-length series: actuals where no window exists,
    # model output elsewhere.
    full_predictions = np.zeros((len(dataset), 1))
    full_predictions[:seq_len, 0] = dataset[:seq_len, 0]
    full_predictions[seq_len:, 0] = predictions.flatten()

    return {"prediction": [float(price) for price in full_predictions.flatten()]}


# For testing locally
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="127.0.0.1", port=8000)