# NOTE(review): the following two lines are Hugging Face "Spaces" status-page
# residue from extraction, not source code — kept as comments so the file parses:
# Spaces: Sleeping / Sleeping
import json
import os
import tempfile
from typing import Optional

import numpy as np
import pandas as pd
from fastapi import FastAPI, HTTPException, Request
from keras.layers import LSTM, Dense, Dropout
from keras.models import Sequential
from pydantic import BaseModel
from sklearn.preprocessing import MinMaxScaler
app = FastAPI()


class StockRequest(BaseModel):
    """Request payload for the prediction endpoint.

    Attributes:
        stock_name: Name/ticker of the stock the prediction is for.
        stock_data: Optional list of row dicts (each must carry a 'Close'
            key) passed directly in the request body instead of a file path.
    """

    stock_name: str
    # Explicit Optional: the original implicit `list = None` is rejected by
    # pydantic v2 and deprecated as a typing practice (PEP 484).
    stock_data: Optional[list] = None  # Allow passing stock data directly


# In-memory storage instead of file paths.
# NOTE(review): never written to by the code visible in this file — confirm
# whether another module populates it, otherwise it is dead state.
STOCK_DATA = {}
def _extract_close_data(body: dict, stock_request: "StockRequest") -> pd.DataFrame:
    """Build a DataFrame of numeric 'Close' prices from the request payload.

    Prefers `stock_request.stock_data`; falls back to `body['stock_data']`.
    Rows whose 'Close' value cannot be coerced to a number are dropped.

    Raises:
        HTTPException: 422 when the data is missing, unparseable, or lacks
            a 'Close' column.
    """
    if stock_request.stock_data:
        # Data was passed directly in the validated request model.
        data = pd.DataFrame(stock_request.stock_data)
    else:
        try:
            data = pd.DataFrame(body.get('stock_data', []))
        except Exception:
            raise HTTPException(status_code=422, detail="Invalid stock data format")
        if data.empty:
            # Checked OUTSIDE the try block: in the original this raise sat
            # inside it, so `except Exception` swallowed the HTTPException and
            # re-labelled it "Invalid stock data format", hiding the real cause.
            raise HTTPException(status_code=422, detail="No stock data provided")
    if 'Close' not in data.columns:
        raise HTTPException(status_code=422, detail="Missing 'Close' column in data")
    # Convert to numeric, turning non-numeric values into NaN, then drop them.
    data['Close'] = pd.to_numeric(data['Close'], errors='coerce')
    data.dropna(subset=['Close'], inplace=True)
    return data


def _make_windows(series: np.ndarray, seq_len: int) -> np.ndarray:
    """Stack sliding windows series[i-seq_len:i] into an (N, seq_len, 1) array."""
    windows = np.array([series[i - seq_len:i, 0] for i in range(seq_len, len(series))])
    return np.reshape(windows, (windows.shape[0], windows.shape[1], 1))


def _build_model(seq_len: int) -> Sequential:
    """Compile a two-layer LSTM regressor over seq_len-step price windows."""
    model = Sequential()
    model.add(LSTM(64, return_sequences=True, input_shape=(seq_len, 1)))
    model.add(Dropout(0.2))
    model.add(LSTM(32, return_sequences=False))
    model.add(Dropout(0.2))
    model.add(Dense(1, activation="linear"))
    model.compile(
        optimizer="adam", loss="mean_squared_error", metrics=["mean_squared_error"]
    )
    return model


# NOTE(review): the original file had NO route decorator, so this endpoint was
# never registered with `app` and the service exposed no routes. The "/predict"
# path is an assumption — confirm against the client.
@app.post("/predict")
async def predict(request: Request):
    """Train a small LSTM on the posted closing prices and return predictions.

    The response's "prediction" list is len(data) long: the first `seq_len`
    entries are the actual close prices (no window exists for them), the rest
    are model outputs in the original price scale.

    Raises:
        HTTPException: 422 for malformed payloads, 400 when fewer than 100
            usable data points remain after cleaning.
    """
    body = await request.json()
    if not isinstance(body, dict):
        raise HTTPException(status_code=422, detail="Invalid request format")
    stock_request = StockRequest(**body)

    data = _extract_close_data(body, stock_request)

    # Ensure we have enough data for a meaningful train/test split.
    if len(data) < 100:
        raise HTTPException(status_code=400, detail="Not enough data for prediction (need at least 100 points)")

    dataset = data[['Close']].values
    train_len = int(np.ceil(len(dataset) * 0.8))

    # Normalize to [0, 1]; the scaler is reused to invert predictions below.
    scaler = MinMaxScaler(feature_range=(0, 1))
    scaled_data = scaler.fit_transform(dataset)

    train_data = scaled_data[:train_len, :]
    seq_len = min(60, len(train_data) // 2)  # adaptive window size

    x_train = _make_windows(train_data, seq_len)
    y_train = train_data[seq_len:, 0]

    # Few epochs, silent training: keeps the HTTP response latency low.
    model = _build_model(seq_len)
    model.fit(x_train, y_train, batch_size=32, epochs=5, verbose=0)

    # Predict over every window of the full (train + test) series.
    x_all = _make_windows(scaled_data, seq_len)
    predictions = scaler.inverse_transform(model.predict(x_all))

    # Splice actual prices over the first seq_len slots, predictions after.
    full_predictions = np.zeros((len(dataset), 1))
    full_predictions[:seq_len, 0] = dataset[:seq_len, 0]
    full_predictions[seq_len:, 0] = predictions.flatten()

    return {"prediction": [float(price) for price in full_predictions.flatten()]}
| # For testing locally | |
| if __name__ == "__main__": | |
| import uvicorn | |
| uvicorn.run(app, host="127.0.0.1", port=8000) |