# gnn-model / app.py
# Hugging Face Space scaffold by KTAparna (commit c5049b0, "Rename main.py to app.py").
# Tags: Scikit-learn, English.
import os
from typing import Tuple, List
import numpy as np
import pandas as pd
import torch
from torch import nn
from torch_geometric.data import Data, DataLoader
from torch_geometric.nn import GCNConv
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error
import matplotlib.pyplot as plt
import datetime as dt
import time
# -----------------------------------------------------
# 1. Data loading and preprocessing
# -----------------------------------------------------
def load_it_sector_data_from_csvs(
    infy_csv: str,
    tcs_csv: str,
    nifty_it_csv: str,
) -> Tuple[np.ndarray, np.ndarray, List[pd.Timestamp], List[str]]:
    """Load IT sector data from separate CSV files and build cleaned feature + target tensors.

    Methodology alignment
    ---------------------
    - Data Collection: uses OHLCV-style fields from the NSE IT sector files.
    - Preprocessing / Cleaning:
        * Parse dates and sort.
        * Filter to equity series (EQ).
        * Remove duplicates and rows with missing / invalid key values.
        * Filter out non-trading days (zero / negative volume).
        * Forward-fill remaining gaps.
    - Derived Indicators:
        * Daily returns.
        * 5-day moving average of close.
        * 20-day rolling volatility of returns.

    Parameters
    ----------
    infy_csv, tcs_csv, nifty_it_csv : str
        Paths to the INFY stock, TCS stock and NIFTY-IT index CSVs. Each
        must carry at least Date, Open, High, Low, Close, Volume columns
        (the stock files additionally carry a Series column).

    Returns
    -------
    features : np.ndarray
        Shape [num_dates, num_companies, num_features]. Per company per
        day: [scaled close, scaled volume, raw return, scaled 5-day MA,
        scaled 20-day volatility].
    targets : np.ndarray
        Shape [num_dates, num_companies]. Daily returns per company
        (the prediction target).
    dates : list of pd.Timestamp
        Trading dates (index of the pivoted panel).
    companies : list of str
        Sorted company tickers (node names in the graph).
    """
    # ---------------------------------
    # Load individual CSVs and tag each with its node identifier
    # ---------------------------------
    infy_df = pd.read_csv(infy_csv)
    tcs_df = pd.read_csv(tcs_csv)
    index_df = pd.read_csv(nifty_it_csv)
    infy_df["Company"] = "INFY"
    tcs_df["Company"] = "TCS"
    index_df["Company"] = "NIFTY_IT"
    for frame in (infy_df, tcs_df, index_df):
        frame["Date"] = pd.to_datetime(frame["Date"])
    # The index file has no equity Series column; mimic one so the index
    # survives the "EQ"-only filter applied below.
    if "Series" not in index_df.columns:
        index_df["Series"] = "EQ"
    # Unify all three frames to a single common schema (same column order).
    common_cols = ["Date", "Company", "Series", "Open", "High", "Low", "Close", "Volume"]
    infy_df = infy_df[common_cols]
    tcs_df = tcs_df[common_cols]
    index_df = index_df[common_cols]
    # Concatenate into one panel-like table.
    df = pd.concat([infy_df, tcs_df, index_df], ignore_index=True)
    # -------------------------
    # Basic cleaning steps
    # -------------------------
    # Keep only equity series rows (the index was tagged EQ above).
    df = df[df["Series"] == "EQ"]
    # Drop rows with critical missing values.
    df = df.dropna(subset=["Company", "Close", "Volume", "Open", "High", "Low"])
    # Remove zero / negative volume (non-trading or bad records).
    df = df[df["Volume"] > 0]
    # Drop exact duplicates on (Date, Company).
    df = df.drop_duplicates(subset=["Date", "Company"])
    # Sort by date then company for deterministic ordering.
    df = df.sort_values(["Date", "Company"])
    # The "Company" column is the canonical ticker / graph-node name.
    companies = sorted(df["Company"].unique().tolist())
    # Pivot to Date x Company panels with a consistent column order.
    close = df.pivot_table(index="Date", columns="Company", values="Close")[companies]
    volume = df.pivot_table(index="Date", columns="Company", values="Volume")[companies]
    # Forward-fill missing values along time for each company.
    close = close.ffill()
    volume = volume.ffill()
    # -------------------------
    # Derived indicators
    # -------------------------
    # 1-day simple returns (percentage change); scrub inf/NaN from gaps.
    returns = close.pct_change().replace([np.inf, -np.inf], np.nan).fillna(0.0)
    # 5-day moving average of closing price (trend).
    ma5 = close.rolling(window=5, min_periods=1).mean().ffill()
    # 20-day rolling volatility of returns (risk).
    vol20 = (
        returns.rolling(window=20, min_periods=1)
        .std()
        .replace([np.inf, -np.inf], np.nan)
        .fillna(0.0)
        .ffill()
    )
    # -------------------------
    # Normalization per company
    # -------------------------
    # NOTE(review): scalers are fit on the FULL history, leaking test-period
    # statistics into the features (look-ahead bias). Fitting on the
    # training window only would be methodologically cleaner.
    def _scale(panel: pd.DataFrame) -> pd.DataFrame:
        # Column-wise (per-company) z-scoring with a fresh StandardScaler.
        return pd.DataFrame(
            StandardScaler().fit_transform(panel.values),
            index=panel.index,
            columns=panel.columns,
        )

    close_scaled = _scale(close)
    volume_scaled = _scale(volume)
    ma5_scaled = _scale(ma5)
    vol20_scaled = _scale(vol20)
    dates = close.index.to_list()
    num_dates = len(dates)
    num_companies = len(companies)
    # Features per node per day:
    # [normalized close, normalized volume, raw return, normalized MA5, normalized VOL20]
    num_features = 5
    features = np.zeros((num_dates, num_companies, num_features), dtype=np.float32)
    for j, company in enumerate(companies):
        features[:, j, 0] = close_scaled[company].values
        features[:, j, 1] = volume_scaled[company].values
        features[:, j, 2] = returns[company].values
        features[:, j, 3] = ma5_scaled[company].values
        features[:, j, 4] = vol20_scaled[company].values
    targets = returns.values.astype(np.float32)  # predict daily returns
    return features, targets, dates, companies
# -----------------------------------------------------
# 2. Graph construction (correlation-based)
# -----------------------------------------------------
def build_correlation_graph(returns: np.ndarray, threshold: float = 0.2) -> torch.Tensor:
    """Build an undirected company graph from pairwise return correlations.

    Parameters
    ----------
    returns : np.ndarray
        Array of shape [num_dates, num_companies] with daily returns.
    threshold : float
        Minimum absolute correlation for an edge to be created.

    Returns
    -------
    edge_index : torch.Tensor
        Shape [2, num_edges], COO format for PyTorch Geometric. Falls back
        to a fully-connected graph (minus self-loops) when no pair clears
        the threshold.
    """
    # Pairwise correlation across companies: [num_companies, num_companies].
    corr = np.corrcoef(returns.T)
    n = corr.shape[0]
    edges = [
        [src, dst]
        for src in range(n)
        for dst in range(n)
        if src != dst and abs(corr[src, dst]) >= threshold
    ]
    # Fallback: fully-connected (no self-loops) if the threshold pruned everything.
    if not edges:
        edges = [[src, dst] for src in range(n) for dst in range(n) if src != dst]
    return torch.tensor(edges, dtype=torch.long).t().contiguous()
# -----------------------------------------------------
# 3. Dataset for time-windowed graph snapshots
# -----------------------------------------------------
class TimeSeriesGraphDataset(torch.utils.data.Dataset):
    """Windowed graph snapshots over a multi-stock time series.

    Each item packages the previous `window_size` days of node features
    together with the static correlation graph and that day's returns:

    - x: node features [num_nodes, window, num_features]
    - edge_index: static company correlation graph
    - y: target returns [num_nodes]
    """

    def __init__(
        self,
        features: np.ndarray,
        targets: np.ndarray,
        edge_index: torch.Tensor,
        window_size: int,
        start_t: int,
        end_t: int,
    ) -> None:
        super().__init__()
        # Raw panel data: features [T, N, F] and targets [T, N].
        self.features = features
        self.targets = targets
        # Shared static graph, reused for every snapshot.
        self.edge_index = edge_index
        self.window_size = window_size
        # Half-open time range [start_t, end_t) of predicted days.
        self.start_t = start_t
        self.end_t = end_t

    def __len__(self) -> int:
        return self.end_t - self.start_t

    def __getitem__(self, idx: int) -> Data:
        # Predict day `day` from the `window_size` days that precede it.
        day = self.start_t + idx
        history = self.features[day - self.window_size : day]  # [W, N, F]
        # Node-major layout so the LSTM sees one temporal sequence per node.
        node_sequences = history.transpose(1, 0, 2)  # [N, W, F]
        return Data(
            x=torch.from_numpy(node_sequences),  # [num_nodes, window, num_feat]
            edge_index=self.edge_index,
            y=torch.from_numpy(self.targets[day]),  # [num_nodes]
        )
# -----------------------------------------------------
# 4. GNN model definition (GCN for regression)
# -----------------------------------------------------
class GNNTimeSeriesModel(nn.Module):
    """LSTM + GCN hybrid for multi-node time-series regression.

    Methodology alignment
    ---------------------
    - Temporal Feature Extraction: shared LSTM encodes each stock's past W days.
    - GNN Application: GCN layers propagate information over the inter-stock graph.
    - Prediction: per-node regression head outputs next-day return.
    """

    def __init__(
        self,
        window_size: int,
        num_features: int,
        hidden_lstm: int = 64,
        hidden_gnn: int = 64,
        dropout: float = 0.2,
    ) -> None:
        """Build the temporal encoder, graph layers and regression head.

        Parameters
        ----------
        window_size : int
            Number of past days (W) each input sequence covers.
        num_features : int
            Number of per-day features per stock (F).
        hidden_lstm : int
            Hidden size of the shared LSTM temporal encoder.
        hidden_gnn : int
            Hidden size of both GCN layers.
        dropout : float
            Dropout probability applied after each GCN layer.
        """
        super().__init__()
        self.window_size = window_size
        self.num_features = num_features
        # Temporal encoder: one LSTM shared across all stocks.
        self.lstm = nn.LSTM(
            input_size=num_features,
            hidden_size=hidden_lstm,
            num_layers=1,
            batch_first=False,  # forward() feeds sequences as [W, N, F]
        )
        # Graph convolution layers operating on LSTM embeddings.
        self.conv1 = GCNConv(hidden_lstm, hidden_gnn)
        self.conv2 = GCNConv(hidden_gnn, hidden_gnn)
        self.lin = nn.Linear(hidden_gnn, 1)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x: torch.Tensor, edge_index: torch.Tensor) -> torch.Tensor:
        """Forward pass.

        Parameters
        ----------
        x : torch.Tensor
            Node sequences, shape [num_nodes_total_in_batch, window, num_features].
        edge_index : torch.Tensor
            COO edge list for the batched graph.

        Returns
        -------
        torch.Tensor
            Predicted next-day return per node, shape [num_nodes_total].

        Raises
        ------
        ValueError
            If the input window/feature dims do not match the configuration.
        """
        num_nodes_total, window, num_feat = x.shape
        # Explicit validation instead of `assert`, which is stripped under -O.
        if window != self.window_size or num_feat != self.num_features:
            raise ValueError(
                "Input window/feature dims do not match model configuration."
            )
        # -----------------------------
        # Temporal feature extraction
        # -----------------------------
        # LSTM expects [seq_len, batch, input_size].
        x_seq = x.permute(1, 0, 2)  # [window, num_nodes_total, num_features]
        _, (h_n, _) = self.lstm(x_seq)
        # Final hidden state of the last LSTM layer: [num_nodes_total, hidden_lstm]
        h_last = h_n[-1]
        # -----------------------------
        # Graph convolution over stocks
        # -----------------------------
        x_g = self.conv1(h_last, edge_index)
        x_g = torch.relu(x_g)
        x_g = self.dropout(x_g)
        x_g = self.conv2(x_g, edge_index)
        x_g = torch.relu(x_g)
        x_g = self.dropout(x_g)
        out = self.lin(x_g).squeeze(-1)  # [num_nodes_total]
        return out
# -----------------------------------------------------
# 5. Training and evaluation utilities
# -----------------------------------------------------
def train_one_epoch(
    model: nn.Module,
    loader: DataLoader,
    optimizer: torch.optim.Optimizer,
    device: torch.device,
) -> float:
    """Run one optimization pass over `loader` and return the average MSE loss.

    The per-batch loss is weighted by `batch.num_graphs` and normalized by
    the total dataset size, so batches of unequal size average correctly.
    """
    model.train()
    loss_fn = nn.MSELoss()
    running = 0.0
    for batch in loader:
        batch = batch.to(device)
        optimizer.zero_grad()
        preds = model(batch.x, batch.edge_index)
        batch_loss = loss_fn(preds, batch.y)
        batch_loss.backward()
        optimizer.step()
        running += batch_loss.item() * batch.num_graphs
    return running / len(loader.dataset)
def evaluate(
    model: nn.Module,
    loader: DataLoader,
    device: torch.device,
):
    """Evaluate `model` on `loader` and compute regression metrics.

    Returns
    -------
    tuple
        (avg_loss, mse, mae, directional_accuracy, y_true, y_pred) where
        avg_loss is the dataset-weighted mean MSE loss, directional
        accuracy is the fraction of correctly-predicted return signs, and
        y_true / y_pred are the finite-only flattened arrays the metrics
        were computed on. When the loader is empty or no finite pairs
        exist, the metrics are NaN so training can continue.
    """
    model.eval()
    criterion = nn.MSELoss()
    total_loss = 0.0
    all_y_true = []
    all_y_pred = []
    with torch.no_grad():
        for batch in loader:
            batch = batch.to(device)
            out = model(batch.x, batch.edge_index)
            total_loss += criterion(out, batch.y).item() * batch.num_graphs
            all_y_true.append(batch.y.cpu().numpy())
            all_y_pred.append(out.cpu().numpy())
    # Guard: an empty loader yields no predictions; np.concatenate would raise.
    if not all_y_true:
        nan = float("nan")
        return nan, nan, nan, nan, np.array([]), np.array([])
    y_true = np.concatenate(all_y_true)
    y_pred = np.concatenate(all_y_pred)
    # Average loss over the dataset (guarded denominator for safety).
    avg_loss = total_loss / max(len(loader.dataset), 1)
    # -------------------------------------------------
    # Guard against NaN/Inf in predictions or targets
    # -------------------------------------------------
    mask = np.isfinite(y_true) & np.isfinite(y_pred)
    if mask.sum() == 0:
        # Fallback: avoid crashing; metrics will be NaN but training can continue
        nan = float("nan")
        return avg_loss, nan, nan, nan, y_true, y_pred
    y_true_clean = y_true[mask]
    y_pred_clean = y_pred[mask]
    # Plain numpy equivalents of sklearn's mean_squared_error / mean_absolute_error.
    err = y_true_clean - y_pred_clean
    mse = float(np.mean(err ** 2))
    mae = float(np.mean(np.abs(err)))
    # Directional accuracy: how often the sign of return is predicted correctly
    directional_accuracy = float((np.sign(y_true_clean) == np.sign(y_pred_clean)).mean())
    return avg_loss, mse, mae, directional_accuracy, y_true_clean, y_pred_clean
# -----------------------------------------------------
# 6. Baseline (before GNN) and real-time helpers
# -----------------------------------------------------
def compute_naive_baseline_metrics(
    targets: np.ndarray,
    train_start: int,
    train_end: int,
    val_start: int,
    val_end: int,
    test_start: int,
    test_end: int,
):
    """Report a naive "before GNN" baseline that always predicts zero return.

    Computes the MSE of an all-zeros prediction on the train / val / test
    slices of `targets`, saves a predicted-vs-actual scatter plot for the
    test slice, and prints the three MSE values.
    """
    # Flatten each split across all nodes.
    y_train = targets[train_start:train_end].reshape(-1)
    y_val = targets[val_start:val_end].reshape(-1)
    y_test = targets[test_start:test_end].reshape(-1)
    # The naive model always predicts "no change" (zero return).
    zeros_train = np.zeros_like(y_train)
    zeros_val = np.zeros_like(y_val)
    zeros_test = np.zeros_like(y_test)
    train_mse = mean_squared_error(y_train, zeros_train)
    val_mse = mean_squared_error(y_val, zeros_val)
    test_mse = mean_squared_error(y_test, zeros_test)
    # Scatter plot of baseline predictions against actual test returns.
    plt.figure(figsize=(6, 6))
    plt.scatter(y_test, zeros_test, alpha=0.3, s=10)
    plt.xlabel("Actual returns")
    plt.ylabel("Predicted returns (baseline: 0)")
    plt.title("Baseline (No GNN) Predicted vs Actual Returns")
    lims = [min(y_test.min(), zeros_test.min()), max(y_test.max(), zeros_test.max())]
    plt.plot(lims, lims, "r--", linewidth=1)
    plt.tight_layout()
    plt.savefig("baseline_pred_vs_actual.png", dpi=200)
    plt.close()
    print(f"Baseline Train MSE: {train_mse:.6f}, Val MSE: {val_mse:.6f}, Test MSE: {test_mse:.6f}")
    print("Saved baseline scatter plot to baseline_pred_vs_actual.png")
def realtime_predict_last_window(
    model: nn.Module,
    features: np.ndarray,
    edge_index: torch.Tensor,
    window_size: int,
    device: torch.device,
):
    """Generate a real-time style prediction for the latest available day.

    Uses the most recent `window_size` days in `features` as if they were
    "live" data and returns the model's per-node prediction.

    Parameters
    ----------
    model : nn.Module
        Trained model taking (x, edge_index) and returning per-node outputs.
    features : np.ndarray
        Panel of shape [num_dates, num_nodes, num_features].
    edge_index : torch.Tensor
        Static COO graph over the nodes.
    window_size : int
        Number of trailing days to feed the model.
    device : torch.device
        Device to run inference on.

    Returns
    -------
    np.ndarray
        Predicted next-day value per node, shape [num_nodes].

    Raises
    ------
    ValueError
        If fewer than `window_size` days of data are available.
    """
    model.eval()
    num_dates, _num_nodes, _num_feat = features.shape
    if num_dates < window_size:
        raise ValueError("Not enough data points for real-time window prediction.")
    # Most recent window, reordered node-major for the model: [N, W, F].
    window_feats = features[num_dates - window_size : num_dates]  # [W, N, F]
    x_seq = window_feats.transpose(1, 0, 2)
    # Feed tensors directly; the previous `Data(...)` wrapper was immediately
    # unwrapped and added nothing (also drops the torch_geometric dependency here).
    x = torch.from_numpy(x_seq).to(device)
    graph = edge_index.to(device)
    with torch.no_grad():
        out = model(x, graph).cpu().numpy()
    return out  # [num_nodes]
# -----------------------------------------------------
# 7. Main experiment pipeline
# -----------------------------------------------------
def main() -> None:
    """Run the full experiment: load data, train the GNN, evaluate, and plot.

    Pipeline: CSV loading/cleaning -> train/val/test split -> naive zero-return
    baseline -> correlation-graph construction (train period only) -> LSTM+GCN
    training with best-validation checkpointing -> test evaluation, scatter
    plot, and a real-time style prediction on the latest window.

    Side effects: reads three CSVs from the working directory, writes two PNG
    files, and prints progress/metrics to stdout. Raises FileNotFoundError if
    a CSV is missing and ValueError if there is too little data.
    """
    # Input CSVs are expected in the current working directory.
    infy_csv = "infy_stock.csv"
    tcs_csv = "tcs_stock.csv"
    nifty_it_csv = "nifty_it_index.csv"
    # Fail fast with a clear message if any input file is absent.
    for p in [infy_csv, tcs_csv, nifty_it_csv]:
        if not os.path.exists(p):
            raise FileNotFoundError(f"Could not find required CSV file: {p}")
    print("Loading and preprocessing data from CSVs...")
    features, targets, dates, companies = load_it_sector_data_from_csvs(
        infy_csv=infy_csv,
        tcs_csv=tcs_csv,
        nifty_it_csv=nifty_it_csv,
    )
    num_dates, num_companies, num_features = features.shape
    print(f"Num dates: {num_dates}, Num companies (nodes): {num_companies}, Num features: {num_features}")
    # Build graph from training-period correlations only (to avoid look-ahead bias)
    window_size = 20
    if num_dates <= window_size + 1:
        raise ValueError("Not enough dates to create time windows. Reduce window_size or use more data.")
    # Usable prediction days: each needs `window_size` days of history before it.
    first_t = window_size
    last_t = num_dates - 1
    total_samples = last_t - first_t + 1
    # Chronological 70/15/15 split (no shuffling across time).
    train_samples = int(total_samples * 0.7)
    val_samples = int(total_samples * 0.15)
    test_samples = total_samples - train_samples - val_samples
    # Contiguous half-open [start, end) index ranges for each split.
    train_start_t = first_t
    train_end_t = train_start_t + train_samples
    val_start_t = train_end_t
    val_end_t = val_start_t + val_samples
    test_start_t = val_end_t
    test_end_t = last_t + 1
    print(f"Total usable samples: {total_samples}")
    print(f"Train: {train_samples}, Val: {val_samples}, Test: {test_samples}")
    # -----------------------------
    # Baseline (before GNN)
    # -----------------------------
    # Zero-return baseline: prints MSEs and saves its own scatter plot.
    compute_naive_baseline_metrics(
        targets,
        train_start=train_start_t,
        train_end=train_end_t,
        val_start=val_start_t,
        val_end=val_end_t,
        test_start=test_start_t,
        test_end=test_end_t,
    )
    # Use only training period to compute correlations
    train_returns = targets[train_start_t:train_end_t]
    edge_index = build_correlation_graph(train_returns, threshold=0.2)
    print("Edge index shape:", edge_index.shape)
    # Create datasets
    # All three datasets share the same feature panel and static graph; they
    # differ only in their [start_t, end_t) time range.
    train_dataset = TimeSeriesGraphDataset(
        features=features,
        targets=targets,
        edge_index=edge_index,
        window_size=window_size,
        start_t=train_start_t,
        end_t=train_end_t,
    )
    val_dataset = TimeSeriesGraphDataset(
        features=features,
        targets=targets,
        edge_index=edge_index,
        window_size=window_size,
        start_t=val_start_t,
        end_t=val_end_t,
    )
    test_dataset = TimeSeriesGraphDataset(
        features=features,
        targets=targets,
        edge_index=edge_index,
        window_size=window_size,
        start_t=test_start_t,
        end_t=test_end_t,
    )
    # Only the training loader shuffles; val/test keep chronological order.
    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print("Using device:", device)
    model = GNNTimeSeriesModel(
        window_size=window_size,
        num_features=num_features,
        hidden_lstm=64,
        hidden_gnn=64,
        dropout=0.2,
    ).to(device)
    # Adam with mild L2 regularization via weight_decay.
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-4)
    num_epochs = 30
    # Track the best validation loss for simple early-model selection.
    best_val_loss = float("inf")
    best_state_dict = None
    print("Starting training...")
    for epoch in range(1, num_epochs + 1):
        train_loss = train_one_epoch(model, train_loader, optimizer, device)
        val_loss, val_mse, val_mae, val_dir_acc, _, _ = evaluate(model, val_loader, device)
        # Checkpoint on improved validation loss (weights copied to CPU).
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_state_dict = {k: v.cpu().clone() for k, v in model.state_dict().items()}
        print(
            f"Epoch {epoch:03d} | "
            f"Train Loss: {train_loss:.6f} | "
            f"Val Loss: {val_loss:.6f}, Val MSE: {val_mse:.6f}, Val MAE: {val_mae:.6f}, "
            f"Val DirAcc: {val_dir_acc:.4f}"
        )
    # Restore the best checkpoint before final test evaluation.
    if best_state_dict is not None:
        model.load_state_dict(best_state_dict)
    print("Evaluating on test set...")
    test_loss, test_mse, test_mae, test_dir_acc, y_true, y_pred = evaluate(model, test_loader, device)
    print(
        f"Test Loss: {test_loss:.6f}, Test MSE: {test_mse:.6f}, "
        f"Test MAE: {test_mae:.6f}, Test DirAcc: {test_dir_acc:.4f}"
    )
    # -------------------------------------------------
    # Simple visualization: predicted vs actual returns
    # -------------------------------------------------
    plt.figure(figsize=(6, 6))
    plt.scatter(y_true, y_pred, alpha=0.3, s=10)
    plt.xlabel("Actual returns")
    plt.ylabel("Predicted returns")
    plt.title("GNN Predicted vs Actual Daily Returns (All IT Stocks)")
    # Diagonal reference line spanning the joint value range.
    lims = [min(y_true.min(), y_pred.min()), max(y_true.max(), y_pred.max())]
    plt.plot(lims, lims, "r--", linewidth=1)
    plt.tight_layout()
    plt.savefig("gnn_it_sector_pred_vs_actual.png", dpi=200)
    plt.close()
    print("Saved scatter plot to gnn_it_sector_pred_vs_actual.png")
    # -------------------------------------------------
    # Real-time style prediction using latest window
    # -------------------------------------------------
    latest_pred = realtime_predict_last_window(
        model=model,
        features=features,
        edge_index=edge_index,
        window_size=window_size,
        device=device,
    )
    print("Real-time style next-day return prediction per node (order of companies):")
    for comp, val in zip(companies, latest_pred):
        print(f"  {comp}: {val:.6f}")
# Script entry point: run the full pipeline when executed directly.
if __name__ == "__main__":
    main()