# StockPredict — core/train_eval.py
# Training / evaluation utilities for the stock-prediction models.
# (revision 5e3c87c, author aromidvar1355)
import numpy as np
import pandas as pd
import torch
from torch import nn, optim
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from torch.utils.data import DataLoader, TensorDataset
import matplotlib.pyplot as plt
import os
def create_sequences(data, window_size, horizon=1):
    """Slice *data* into overlapping (window, target) supervised pairs.

    Returns ``(X, y)`` where ``X[i]`` is the window of length
    ``window_size`` starting at index ``i`` and ``y[i]`` is the next
    ``horizon`` values, flattened to 1-D.
    """
    n_pairs = len(data) - window_size - horizon + 1
    windows = [data[start:start + window_size] for start in range(n_pairs)]
    targets = [
        data[start + window_size:start + window_size + horizon].flatten()
        for start in range(n_pairs)
    ]
    return np.array(windows), np.array(targets)
def mean_absolute_percentage_error(y_true, y_pred):
    """Return the MAPE (in percent), skipping entries whose true value is zero.

    If every true value is zero there is nothing to divide by, so NaN is
    returned instead of raising.
    """
    actual = np.array(y_true)
    predicted = np.array(y_pred)
    mask = np.abs(actual) > 0
    if not mask.any():
        return np.nan  # all targets are zero — MAPE is undefined
    relative_errors = np.abs((actual[mask] - predicted[mask]) / actual[mask])
    return relative_errors.mean() * 100
def train_and_evaluate(
    df,
    model_cls,
    horizon=1,
    hidden=64,
    layers=1,
    epochs=50,
    lr=0.001,
    beta1=0.9,
    beta2=0.999,
    weight_decay=0.01,
    dropout=0.2,
    window=30,
    test_split=0.2,
    device="cuda" if torch.cuda.is_available() else "cpu",
    verbose=True
):
    """Train ``model_cls`` on ``df['value']`` and evaluate on a held-out test set.

    The series is standardized, sliced into (window -> horizon) pairs and
    split chronologically: the last ``test_split`` fraction is the test set
    and the last 10% of the remainder is validation. Training uses AdamW +
    MSE with early stopping (patience 5) on validation loss; the best
    validation checkpoint is restored before evaluation.

    Returns a dict with:
        train_loss / val_loss : per-epoch loss curves
        metrics               : R2, RMSE, MAE, MAPE on the test set
                                (in original units; MAPE is None when undefined)
        forecast / predicted  : inverse-scaled test predictions
        actual                : inverse-scaled test targets
        latest_prediction     : forecast from the most recent window
    """
    result = {}

    # Standardize the series; metrics are computed after inverse-transform.
    original_values = df['value'].values.astype(np.float32)
    scaler = StandardScaler()
    scaled_data = scaler.fit_transform(original_values.reshape(-1, 1))

    X, y = create_sequences(scaled_data, window, horizon)
    if verbose:
        print(f"X shape: {X.shape}, y shape: {y.shape}")

    # Chronological split (no shuffling across the split boundaries).
    split = int(len(X) * (1 - test_split))
    val_split = int(split * 0.9)
    X_train, X_val, X_test = X[:val_split], X[val_split:split], X[split:]
    y_train, y_val, y_test = y[:val_split], y[val_split:split], y[split:]
    if verbose:
        print(f"X_train shape: {X_train.shape}, y_train shape: {y_train.shape}")
        print(f"X_val shape: {X_val.shape}, y_val shape: {y_val.shape}")
        print(f"X_test shape: {X_test.shape}, y_test shape: {y_test.shape}")

    def _loader(features, targets, shuffle):
        # Wrap numpy arrays as float32 tensors in a batched DataLoader.
        dataset = TensorDataset(
            torch.tensor(features, dtype=torch.float32),
            torch.tensor(targets, dtype=torch.float32),
        )
        return DataLoader(dataset, batch_size=32, shuffle=shuffle)

    train_loader = _loader(X_train, y_train, shuffle=True)
    val_loader = _loader(X_val, y_val, shuffle=False)
    test_loader = _loader(X_test, y_test, shuffle=False)

    input_dim = X_train.shape[2] if X_train.ndim == 3 else 1
    model = model_cls(
        input_size=input_dim,
        hidden_size=hidden,
        num_layers=layers,
        output_size=horizon,
        dropout=dropout,
    ).to(device)
    optimizer = torch.optim.AdamW(
        model.parameters(), lr=lr, betas=(beta1, beta2), weight_decay=weight_decay
    )
    loss_fn = nn.MSELoss()

    train_losses = []
    val_losses = []
    best_val_loss = float('inf')
    patience = 5
    counter = 0
    best_model_state = None

    for epoch in range(epochs):
        # BUGFIX: switch back to train mode every epoch. The validation pass
        # below calls model.eval(), and the original code set model.train()
        # only once before the loop — so dropout was silently disabled from
        # the second epoch onward.
        model.train()
        epoch_loss = 0.0
        for xb, yb in train_loader:
            xb, yb = xb.to(device), yb.to(device)
            optimizer.zero_grad()
            out = model(xb)
            loss = loss_fn(out, yb)
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
        train_losses.append(epoch_loss / len(train_loader))

        # Validation pass (no gradients, eval mode).
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for xb, yb in val_loader:
                xb, yb = xb.to(device), yb.to(device)
                val_loss += loss_fn(model(xb), yb).item()
        val_loss /= len(val_loader)
        val_losses.append(val_loss)

        if verbose and (epoch + 1) % 10 == 0:
            print(f"Epoch {epoch+1}/{epochs} - Train Loss: {train_losses[-1]:.4f}, Val Loss: {val_losses[-1]:.4f}")

        # Early stopping on validation loss.
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            counter = 0
            # BUGFIX: state_dict() returns live references that later
            # optimizer steps keep mutating — clone each tensor so the best
            # checkpoint is actually preserved.
            best_model_state = {
                k: v.detach().clone() for k, v in model.state_dict().items()
            }
        else:
            counter += 1
            if counter >= patience:
                if verbose:
                    print(f"Early stopping at epoch {epoch+1}")
                break

    # Restore the best validation checkpoint before evaluating.
    if best_model_state is not None:
        model.load_state_dict(best_model_state)

    result["train_loss"] = train_losses
    result["val_loss"] = val_losses

    # Test-set predictions (no gradients, eval mode).
    model.eval()
    preds, targets = [], []
    with torch.no_grad():
        for xb, yb in test_loader:
            preds.append(model(xb.to(device)).cpu().numpy())
            targets.append(yb.numpy())
    preds = np.concatenate(preds, axis=0)
    targets = np.concatenate(targets, axis=0)
    if verbose:
        print(f"Preds shape: {preds.shape}, Targets shape: {targets.shape}")

    # Undo the scaling so metrics are reported in original units.
    preds_inv = scaler.inverse_transform(preds.reshape(-1, 1)).reshape(preds.shape)
    targets_inv = scaler.inverse_transform(targets.reshape(-1, 1)).reshape(targets.shape)

    mse = mean_squared_error(targets_inv, preds_inv)
    mape = mean_absolute_percentage_error(targets_inv, preds_inv)
    result["metrics"] = {
        "R2": round(r2_score(targets_inv, preds_inv), 4),
        "RMSE": round(np.sqrt(mse), 4),
        "MAE": round(mean_absolute_error(targets_inv, preds_inv), 4),
        "MAPE": round(mape, 4) if not np.isnan(mape) else None,
    }
    result["forecast"] = preds_inv
    result["actual"] = targets_inv
    result["predicted"] = result["forecast"]

    # One final forecast from the most recent window of the series.
    latest_window = scaled_data[-window:].reshape(1, window, 1)
    latest_input = torch.tensor(latest_window, dtype=torch.float32).to(device)
    with torch.no_grad():
        future_pred = model(latest_input).cpu().numpy()
    future_pred_inv = scaler.inverse_transform(
        future_pred.reshape(-1, 1)
    ).reshape(future_pred.shape)
    result["latest_prediction"] = future_pred_inv[0].tolist()
    return result