Spaces:
Sleeping
Sleeping
File size: 2,968 Bytes
4204769 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
# inference.py (final robust)
import pandas as pd
import numpy as np
from statsmodels.tsa.arima.model import ARIMA
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
def arima_forecast(ts_data, order=(5, 1, 0), steps=5):
    """Fit an ARIMA model to ``ts_data`` and forecast the next ``steps`` values.

    Args:
        ts_data: Observations, oldest first; wrapped in a ``pd.Series``
            before being handed to the model.
        order: ARIMA ``(p, d, q)`` order passed straight to ``ARIMA``.
        steps: Number of future points requested from the fitted model.

    Returns:
        The forecast converted to a plain Python list.
    """
    # A fresh model is fitted on every call; nothing is cached between calls.
    fitted = ARIMA(pd.Series(ts_data), order=order).fit()
    predicted = fitted.forecast(steps=steps)
    return predicted.tolist()
def lstm_forecast(ts_data, look_back=60, steps=5, epochs=20):
    """Forecast future values by training a small stacked LSTM on ``ts_data``.

    The series is min-max scaled to [0, 1] and cut into sliding windows of
    ``look_back`` points, each paired with the point that follows it. A fresh
    two-layer LSTM is trained on those pairs and then rolled forward
    ``steps`` times, feeding every prediction back in as the newest input.

    Args:
        ts_data: Numeric observations, oldest first.
        look_back: Window length used as model input. Clamped to the range
            ``[1, len(ts_data) - 1]`` so at least one training pair exists.
        steps: Number of future points to predict.
        epochs: Number of training epochs.

    Returns:
        A list of ``steps`` floats in the original (unscaled) units. An empty
        list when ``steps`` is not positive. When only one observation is
        supplied there is nothing to train on, so that value is repeated
        ``steps`` times (naive persistence forecast).

    Raises:
        ValueError: If ``ts_data`` is empty.
    """
    series = np.asarray(ts_data, dtype=float).reshape(-1)
    if series.size == 0:
        raise ValueError("ts_data must contain at least one value.")
    if steps <= 0:
        # Nothing requested: skip the (expensive) training entirely.
        return []
    if series.size == 1:
        # A single point yields no (window, target) pair; the previous
        # fallback built a zero-timestep sample that did not match the
        # forecast window. Best effort instead: repeat the last known value.
        return [float(series[0])] * steps

    # Keep the window short enough to leave at least one training target,
    # and never shorter than one timestep.
    look_back = max(1, min(look_back, series.size - 1))

    # Normalize to [0, 1]; the same scaler maps predictions back at the end.
    scaler = MinMaxScaler(feature_range=(0, 1))
    scaled = scaler.fit_transform(series.reshape(-1, 1))

    # Sliding windows: X[i] holds scaled[i : i + look_back] and y[i] is the
    # value immediately after that window.
    n_windows = len(scaled) - look_back
    X = np.stack([scaled[i:i + look_back] for i in range(n_windows)])
    y = scaled[look_back:, 0]

    # Build the LSTM.
    # NOTE(review): newer Keras releases reportedly warn about `input_shape`
    # and recommend an explicit Input layer; kept as-is to avoid a new import.
    model = Sequential()
    model.add(LSTM(50, return_sequences=True, input_shape=(look_back, 1)))
    model.add(LSTM(50))
    model.add(Dense(1))
    model.compile(optimizer='adam', loss='mean_squared_error')

    # Train
    model.fit(X, y, epochs=epochs, batch_size=32, verbose=0)

    # Roll the window forward one step at a time, appending each prediction.
    window = scaled[-look_back:].reshape(1, look_back, 1)
    scaled_preds = []
    for _ in range(steps):
        raw = model.predict(window, verbose=0)  # output shape can vary by TF version
        value = float(np.asarray(raw).reshape(-1)[0])  # extract scalar safely
        scaled_preds.append(value)
        newest = np.full((1, 1, 1), value)
        window = np.concatenate([window[:, 1:, :], newest], axis=1)

    restored = scaler.inverse_transform(np.array(scaled_preds).reshape(-1, 1))
    return restored.flatten().tolist()
def infer(model_type: str, input_data: list, steps: int = 5, epochs: int = 20):
    """Dispatch a forecast request to the selected model.

    model_type: 'arima' or 'lstm' (matched case-insensitively)
    input_data: list of recent stock prices
    steps: number of future days to forecast
    epochs: LSTM training epochs (ignored for ARIMA)

    Returns the forecast produced by the chosen model, or a dict with an
    "error" key when ``model_type`` is not recognised.
    """
    kind = model_type.lower()

    if kind == 'arima':
        return arima_forecast(input_data, steps=steps)

    if kind == 'lstm':
        return lstm_forecast(input_data, steps=steps, epochs=epochs)

    # Unknown model: report the problem as data rather than raising.
    return {"error": "Invalid model_type. Use 'arima' or 'lstm'."}
|