stockprediction / fastapi_app.py
kpranav22's picture
Create fastapi_app.py
e99527b verified
import numpy as np
import pandas as pd
from fastapi import FastAPI, HTTPException, Request
from keras.layers import LSTM, Dense, Dropout
from keras.models import Sequential
from pydantic import BaseModel
from sklearn.preprocessing import MinMaxScaler
import json
import os
import tempfile
app = FastAPI()
class StockRequest(BaseModel):
stock_name: str
stock_data: list = None # Allow passing stock data directly
# We'll use in-memory storage instead of file paths
STOCK_DATA = {}
@app.post("/LSTM_Predict")
async def predict(request: Request):
# Parse the request body manually
body = await request.json()
# Create StockRequest object
if isinstance(body, dict):
stock_request = StockRequest(**body)
else:
raise HTTPException(status_code=422, detail="Invalid request format")
stock_name = stock_request.stock_name
# Check if data was passed directly in the request
if hasattr(stock_request, 'stock_data') and stock_request.stock_data:
# Use the data provided in the request
data = pd.DataFrame(stock_request.stock_data)
else:
# Try to get data from the request body directly
try:
data = pd.DataFrame(body.get('stock_data', []))
if data.empty:
raise HTTPException(status_code=422, detail="No stock data provided")
except Exception:
raise HTTPException(status_code=422, detail="Invalid stock data format")
# Convert columns to numeric, handling any non-numeric values
if 'Close' not in data.columns:
raise HTTPException(status_code=422, detail="Missing 'Close' column in data")
data['Close'] = pd.to_numeric(data['Close'], errors='coerce')
# Drop any rows with NaN values after conversion
data.dropna(subset=['Close'], inplace=True)
# Ensure we have enough data
if len(data) < 100:
raise HTTPException(status_code=400, detail="Not enough data for prediction (need at least 100 points)")
dataset = data[['Close']].values
train_len = int(np.ceil(len(dataset) * 0.8))
# Normalize the data
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data = scaler.fit_transform(dataset)
# Prepare training data
train_data = scaled_data[:train_len, :]
seq_len = min(60, len(train_data) // 2) # Make sequence length adaptive
x_train, y_train = [], []
for i in range(seq_len, len(train_data)):
x_train.append(train_data[i - seq_len : i, 0])
y_train.append(train_data[i, 0])
x_train, y_train = np.array(x_train), np.array(y_train)
x_train = np.reshape(x_train, (x_train.shape[0], x_train.shape[1], 1))
# Build LSTM model
model = Sequential()
model.add(LSTM(64, return_sequences=True, input_shape=(x_train.shape[1], 1)))
model.add(Dropout(0.2))
model.add(LSTM(32, return_sequences=False))
model.add(Dropout(0.2))
model.add(Dense(1, activation="linear"))
model.compile(
optimizer="adam", loss="mean_squared_error", metrics=["mean_squared_error"]
)
# Train the model (reduced epochs for faster response)
model.fit(x_train, y_train, batch_size=32, epochs=5, verbose=0)
# Prepare data for prediction (we'll predict for the entire dataset)
x_all = []
for i in range(seq_len, len(scaled_data)):
x_all.append(scaled_data[i - seq_len : i, 0])
x_all = np.array(x_all)
x_all = np.reshape(x_all, (x_all.shape[0], x_all.shape[1], 1))
# Make predictions
predictions = model.predict(x_all)
predictions = scaler.inverse_transform(predictions)
# Create full predictions array (including the initial sequence length)
full_predictions = np.zeros((len(dataset), 1))
# Fill the first seq_len positions with actual values
full_predictions[:seq_len, 0] = dataset[:seq_len, 0]
# Fill the rest with predictions
full_predictions[seq_len:, 0] = predictions.flatten()
# Convert to list of floats for JSON response
predict_prices = [float(price) for price in full_predictions.flatten()]
return {"prediction": predict_prices}
# For testing locally
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=8000)