import json
import os
import pickle
from itertools import product

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import DataLoader, Dataset, Subset


# -------------------------
# Dataset
# -------------------------
class StockDataset(Dataset):
    """Sliding-window dataset over a 1-D price series.

    Item ``idx`` is the pair ``(window, target)`` where ``window`` holds
    ``series[idx : idx + seq_length]`` with a leading channel axis, i.e.
    shape ``(1, seq_length)``, and ``target`` is the scalar
    ``series[idx + seq_length]``. Both are returned as float32 tensors.
    """

    def __init__(self, series, seq_length):
        self.series = series
        self.seq_length = seq_length

    def __len__(self):
        # Clamp at zero: a series shorter than seq_length has no windows, and
        # a negative value would make the built-in len() raise ValueError.
        return max(0, len(self.series) - self.seq_length)

    def __getitem__(self, idx):
        window = self.series[idx:idx + self.seq_length]  # (seq_length,)
        target = self.series[idx + self.seq_length]  # scalar
        window = np.expand_dims(window, axis=0)  # (1, seq_length)
        return (
            torch.tensor(window, dtype=torch.float32),
            torch.tensor(target, dtype=torch.float32),
        )


# -------------------------
# TCN Blocks
# -------------------------
class TemporalBlock(nn.Module):
    """Residual block made of two causal dilated 1-D convolutions.

    Each convolution is padded by ``(kernel_size - 1) * dilation`` and its
    output is cut back to the input length, so position ``t`` of the output
    only depends on input positions ``<= t``. A 1x1 convolution matches the
    channel count of the residual path when ``in_channels != out_channels``.

    The trimming and the residual sum are written for ``stride=1``, which is
    the only value used in this file.
    """

    def __init__(self, in_channels, out_channels, kernel_size, stride, dilation, dropout=0.2):
        super().__init__()
        padding = (kernel_size - 1) * dilation
        self.conv1 = nn.Conv1d(in_channels, out_channels, kernel_size,
                               stride=stride, padding=padding, dilation=dilation)
        self.relu1 = nn.ReLU()
        self.dropout1 = nn.Dropout(dropout)
        self.conv2 = nn.Conv1d(out_channels, out_channels, kernel_size,
                               stride=stride, padding=padding, dilation=dilation)
        self.relu2 = nn.ReLU()
        self.dropout2 = nn.Dropout(dropout)
        self.downsample = nn.Conv1d(in_channels, out_channels, 1) if in_channels != out_channels else None
        self.relu = nn.ReLU()

    def forward(self, x):
        seq_len = x.size(2)

        out = self.conv1(x)
        # Conv1d pads both ends; keeping the first seq_len steps drops the
        # outputs that were computed from the right-hand (future) padding.
        out = out[:, :, :seq_len]
        out = self.relu1(out)
        out = self.dropout1(out)

        out = self.conv2(out)
        out = out[:, :, :seq_len]
        out = self.relu2(out)
        out = self.dropout2(out)

        res = x if self.downsample is None else self.downsample(x)
        return self.relu(out + res)


class TCN(nn.Module):
    """Stack of TemporalBlocks followed by a linear head.

    Block ``i`` uses dilation ``2 ** i`` and ``num_channels[i]`` output
    channels. The linear layer is applied to the features of the last time
    step only, producing ``output_size`` values per sample.

    Raises:
        ValueError: if ``num_channels`` is empty.
    """

    def __init__(self, input_size, output_size, num_channels, kernel_size=3, dropout=0.2):
        super().__init__()
        if len(num_channels) == 0:
            raise ValueError("num_channels must contain at least one layer width")

        layers = []
        in_channels = input_size
        for level, out_channels in enumerate(num_channels):
            layers.append(
                TemporalBlock(in_channels, out_channels, kernel_size,
                              stride=1, dilation=2 ** level, dropout=dropout)
            )
            in_channels = out_channels
        self.network = nn.Sequential(*layers)
        self.linear = nn.Linear(num_channels[-1], output_size)

    def forward(self, x):
        out = self.network(x)
        out = out[:, :, -1]  # features at the final time step
        return self.linear(out)


# -------------------------
# Forecaster
# -------------------------
class StockPriceForecaster:
    """Train and evaluate a TCN on the 'Close' column of a CSV file."""

    def __init__(self, dataset_path, seq_length=30, batch_size=32, lr=0.001, epochs=20,
                 kernel_size=3, num_channels=None, dropout=0.2, test_split=0.2):
        self.dataset_path = dataset_path
        self.seq_length = seq_length
        self.batch_size = batch_size
        self.lr = lr
        self.epochs = epochs
        self.kernel_size = kernel_size
        # None stands for the default [32, 64, 64]; a fresh list is built per
        # instance so no list object is shared through a mutable default.
        self.num_channels = [32, 64, 64] if num_channels is None else list(num_channels)
        self.dropout = dropout
        self.test_split = test_split
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.scaler = MinMaxScaler()

    def load_data(self):
        """Read the CSV, scale the prices and build chronological loaders.

        The windows are split in time order: the first
        ``int(n_windows * (1 - test_split))`` windows are used for training
        and the remaining ones for testing. The scaler is fitted only on the
        prices that the training windows touch, then applied to the whole
        series, so no information from the test period reaches training.

        Returns:
            ``(train_loader, test_loader)``. The training loader shuffles its
            windows; the test loader keeps them in time order.

        Raises:
            FileNotFoundError: if ``dataset_path`` does not exist.
            ValueError: if the CSV has no 'Close' column, or if there are too
                few rows to form at least one training window.
        """
        if not os.path.exists(self.dataset_path):
            raise FileNotFoundError(f"Dataset file not found at: {self.dataset_path}")

        df = pd.read_csv(self.dataset_path)
        if "Close" not in df.columns:
            raise ValueError("CSV file must contain a 'Close' column")

        prices = df["Close"].values.reshape(-1, 1)

        n_windows = len(prices) - self.seq_length
        train_size = int(n_windows * (1 - self.test_split)) if n_windows > 0 else 0
        if train_size < 1:
            raise ValueError(
                f"Not enough data: {len(prices)} rows cannot provide a training "
                f"window of length {self.seq_length}"
            )

        # Training window i reads series[i : i + seq_length + 1], so the
        # training windows together cover the first train_size + seq_length prices.
        self.scaler.fit(prices[:train_size + self.seq_length])
        prices_scaled = self.scaler.transform(prices).flatten()

        dataset = StockDataset(prices_scaled, self.seq_length)
        train_dataset = Subset(dataset, range(train_size))
        test_dataset = Subset(dataset, range(train_size, n_windows))

        train_loader = DataLoader(train_dataset, batch_size=self.batch_size, shuffle=True)
        test_loader = DataLoader(test_dataset, batch_size=self.batch_size, shuffle=False)
        return train_loader, test_loader

    def train(self, model, train_loader):
        """Fit ``model`` with Adam and MSE loss for ``self.epochs`` epochs.

        Prints the mean batch loss after every epoch and returns the same
        model object.
        """
        criterion = nn.MSELoss()
        optimizer = torch.optim.Adam(model.parameters(), lr=self.lr)
        model.train()
        for epoch in range(self.epochs):
            epoch_loss = 0
            for x, y in train_loader:
                x, y = x.to(self.device), y.to(self.device)
                optimizer.zero_grad()
                output = model(x)
                # squeeze(-1) keeps the batch axis even for a batch of one, so
                # prediction and target always have the same shape.
                loss = criterion(output.squeeze(-1), y)
                loss.backward()
                optimizer.step()
                epoch_loss += loss.item()
            print(f"Epoch [{epoch+1}/{self.epochs}], Loss: {epoch_loss/len(train_loader):.6f}")
        return model

    def evaluate(self, model, test_loader):
        """Score ``model`` on ``test_loader`` in the original price scale.

        Returns:
            ``(mae, rmse, mape, r2, actuals, predictions)`` where the last two
            are 1-D arrays of inverse-transformed prices and ``mape`` is a
            percentage.
        """
        model.eval()
        predictions, actuals = [], []
        with torch.no_grad():
            for x, y in test_loader:
                x, y = x.to(self.device), y.to(self.device)
                output = model(x)
                # squeeze(-1) rather than squeeze(): a final batch of size one
                # must stay 1-D, otherwise extend() fails on a 0-d array.
                predictions.extend(output.squeeze(-1).cpu().numpy())
                actuals.extend(y.cpu().numpy())

        predictions = self.scaler.inverse_transform(np.array(predictions).reshape(-1, 1)).flatten()
        actuals = self.scaler.inverse_transform(np.array(actuals).reshape(-1, 1)).flatten()

        mae = mean_absolute_error(actuals, predictions)
        # The `squared=False` keyword is no longer accepted by recent
        # scikit-learn releases; the explicit square root works everywhere.
        rmse = float(np.sqrt(mean_squared_error(actuals, predictions)))
        # The small constant guards against division by zero.
        mape = np.mean(np.abs((actuals - predictions) / (actuals + 1e-10))) * 100
        r2 = r2_score(actuals, predictions)
        return mae, rmse, mape, r2, actuals, predictions

    def run(self):
        """Load the data, train a new TCN and evaluate it.

        Returns:
            ``(trained_model, metrics)`` where ``metrics`` is the tuple
            returned by :meth:`evaluate`.
        """
        train_loader, test_loader = self.load_data()
        model = TCN(input_size=1, output_size=1, num_channels=self.num_channels,
                    kernel_size=self.kernel_size, dropout=self.dropout).to(self.device)
        trained_model = self.train(model, train_loader)
        return trained_model, self.evaluate(model, test_loader)


# -------------------------
# Save Model for Hugging Face
# -------------------------
def save_model_for_huggingface(model, scaler, config, save_dir="tcn_stock_model"):
    """Write weights, configuration and scaler into ``save_dir``.

    Creates ``pytorch_model.bin`` (state dict), ``config.json`` (architecture
    settings read from ``config`` plus fixed input/output sizes of 1) and
    ``scaler.pkl`` (the pickled scaler).

    Args:
        model: module whose ``state_dict()`` is saved.
        scaler: fitted scaler object to pickle.
        config: mapping with the keys ``num_channels``, ``kernel_size``,
            ``dropout`` and ``seq_length``.
        save_dir: target directory, created if missing.
    """
    os.makedirs(save_dir, exist_ok=True)

    # Save model weights
    torch.save(model.state_dict(), os.path.join(save_dir, "pytorch_model.bin"))

    # Save model configuration
    with open(os.path.join(save_dir, "config.json"), "w") as f:
        json.dump({
            "input_size": 1,
            "output_size": 1,
            "num_channels": config["num_channels"],
            "kernel_size": config["kernel_size"],
            "dropout": config["dropout"],
            "seq_length": config["seq_length"],
        }, f, indent=4)

    # Save scaler for preprocessing.
    # NOTE: pickle files execute code when loaded; only unpickle scaler.pkl
    # when it comes from a trusted source.
    with open(os.path.join(save_dir, "scaler.pkl"), "wb") as f:
        pickle.dump(scaler, f)

    print(f"Model saved to {save_dir}")


# -------------------------
# Experiment Loop
# -------------------------
def main():
    """Run the hyperparameter grid, then report, save and plot the best run."""
    dataset_path = "/work/GOOGL.csv"  # Update to your CSV path

    # Hyperparameter grid
    seq_lengths = [20, 50]
    batch_sizes = [16, 32]
    learning_rates = [0.001, 0.0005]
    kernel_sizes = [3, 5]
    num_channels_list = [[32, 64, 128], [64, 128, 256]]
    dropouts = [0.1, 0.2]

    results = []
    best_result = None
    best_metrics = float('inf')  # Track best RMSE
    best_model = None
    best_config = None
    best_scaler = None

    # Run experiments
    for seq, batch, lr, kernel, channels, dropout in product(
        seq_lengths, batch_sizes, learning_rates, kernel_sizes, num_channels_list, dropouts
    ):
        print(f"\nRunning: seq={seq}, batch={batch}, lr={lr}, kernel={kernel}, channels={channels}, dropout={dropout}")
        try:
            forecaster = StockPriceForecaster(
                dataset_path=dataset_path,
                seq_length=seq,
                batch_size=batch,
                lr=lr,
                epochs=20,
                kernel_size=kernel,
                num_channels=channels,
                dropout=dropout,
                test_split=0.2,
            )
            model, (mae, rmse, mape, r2, actuals, predictions) = forecaster.run()
            results.append({
                "seq_length": seq,
                "batch_size": batch,
                "lr": lr,
                "kernel_size": kernel,
                "num_channels": str(channels),
                "dropout": dropout,
                "MAE": mae,
                "RMSE": rmse,
                "MAPE": mape,
                "R2": r2,
            })
            if rmse < best_metrics:
                best_metrics = rmse
                best_result = (actuals, predictions, seq, batch, lr, kernel, channels, dropout)
                best_model = model
                # Keep the scaler that belongs to this run; the loop variable
                # `forecaster` is overwritten by every later configuration.
                best_scaler = forecaster.scaler
                best_config = {
                    "seq_length": seq,
                    "batch_size": batch,
                    "lr": lr,
                    "kernel_size": kernel,
                    "num_channels": channels,
                    "dropout": dropout,
                }
        except Exception as e:
            # Best-effort grid search: report the failing configuration and
            # carry on with the remaining ones.
            print(f"Error with config seq={seq}, batch={batch}, lr={lr}, kernel={kernel}, channels={channels}, dropout={dropout}: {e}")
            continue

    # Save results
    df_results = pd.DataFrame(results)
    df_results.to_csv("tcn_experiments_results.csv", index=False)
    print("\nAll experiments done! Results saved to 'tcn_experiments_results.csv'")

    # Display metrics table
    print("\nMetrics Table:")
    pd.set_option('display.max_columns', None)
    pd.set_option('display.width', 1000)
    pd.set_option('display.float_format', '{:.6f}'.format)
    print(df_results)

    # Save best model for Hugging Face
    if best_model is not None:
        save_model_for_huggingface(best_model, best_scaler, best_config)
        print(f"\nBest model saved with RMSE: {best_metrics:.6f}")
        print("\nBest configuration:")
        print(pd.Series(best_config))

    # Plot best combination
    if best_result is not None:
        actuals, predictions, seq, batch, lr, kernel, channels, dropout = best_result
        plt.figure(figsize=(12, 6))
        plt.plot(actuals, label="Actual Prices")
        plt.plot(predictions, label="Predicted Prices")
        plt.title(f"Best Model: seq={seq}, batch={batch}, lr={lr}, kernel={kernel}, channels={channels}, dropout={dropout}")
        plt.xlabel("Time Step")
        plt.ylabel("Price")
        plt.legend()
        plt.grid(True)
        plt.show()
    else:
        print("No successful experiments to plot.")


if __name__ == "__main__":
    main()