# LightingProduct / model.py
# (Hugging Face upload metadata removed: uploaded by hari31416, commit ff6d1a9, "Upload 2 files")
from transformers import AutoTokenizer, AutoModel
from datasets import load_dataset, Dataset, concatenate_datasets
import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
classification_report,
confusion_matrix,
accuracy_score,
precision_score,
)
from sklearn.ensemble import RandomForestClassifier
from xgboost import XGBClassifier
import torch.nn as nn
import torchmetrics
from torch.optim.lr_scheduler import CosineAnnealingLR
import numpy as np
import pandas as pd
import os
import pickle
import argparse
from torch_train import TorchTrain
from utilities import get_simple_logger
FILE_DIR = os.path.dirname(os.path.realpath(__file__))
DATA_DIR = os.path.join(FILE_DIR, "data")
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
random_state = 42
# set random state
np.random.seed(random_state)
torch.manual_seed(random_state)
class PDFDataLoader:
    """A class that can be used to load the data to torch model. This will be used in the `PDFDataSet` class to create the final datasets."""

    def __init__(self, df):
        self.df = df

    def __getitem__(self, idx):
        """Gets the `idx` embedding and labels, converts them to the required format and returns them."""
        record = self.df[idx]
        # Embeddings become a float tensor; the label gets an extra leading
        # dimension so it matches the (1,)-shaped targets expected by BCELoss.
        features = torch.from_numpy(np.array(record["embeddings"])).float()
        target = np.expand_dims(np.array(record["label"]), axis=0)
        return features.to(device), torch.from_numpy(target).to(device).float()

    def __len__(self):
        return len(self.df)
class PDFDataSet:
    """Builds train/validation/test datasets of sentence embeddings.

    Loads ``train.csv`` and ``test.csv`` from ``data_dir``, optionally moves a
    fraction of the test split into the training data (the comments elsewhere
    in this file note the test distribution differs from the training one),
    and encodes the ``content`` column with a pretrained transformer encoder.
    """

    def __init__(
        self,
        data_dir=DATA_DIR,
        fraction_test_data_in_train=0.2,
        model_ckpt="encoder",
    ) -> None:
        """
        Parameters
        ----------
        data_dir : str, optional
            Directory containing ``train.csv`` and ``test.csv``.
        fraction_test_data_in_train : float, optional
            Fraction of the test split merged into the training data.
            Any falsy value disables the merge.
        model_ckpt : str, optional
            Checkpoint name/path for the Hugging Face tokenizer and encoder.
        """
        self.data_dir = data_dir
        self.fraction_test_data_in_train = fraction_test_data_in_train
        self.model_ckpt = model_ckpt
        tokenizer = AutoTokenizer.from_pretrained(model_ckpt)
        encoding_model = AutoModel.from_pretrained(model_ckpt)
        # The encoder is inference-only: move it to the active device and
        # switch to eval mode (disables dropout/batch-norm updates).
        encoding_model = encoding_model.to(device)
        encoding_model = encoding_model.eval()
        self.encoding_model = encoding_model
        self.tokenizer = tokenizer
        self.logger = get_simple_logger("pdf_dataset")

    def create_datasets(self):
        """Load the CSVs and return ``(train, validation, test)`` Datasets."""
        train_data_path = os.path.join(FILE_DIR, self.data_dir, "train.csv")
        test_data_path = os.path.join(FILE_DIR, self.data_dir, "test.csv")
        df = pd.read_csv(train_data_path)
        test_df = pd.read_csv(test_data_path)
        # Use the module-level seed constant for consistency with the rest of
        # the file (was a hard-coded 42 with the same value).
        train_df, validation_df = train_test_split(
            df, test_size=0.3, random_state=random_state
        )
        if self.fraction_test_data_in_train:
            self.logger.info(
                f"Adding {self.fraction_test_data_in_train} fraction of test dataset to the training set."
            )
            test_df, test_df_for_training = train_test_split(
                test_df,
                test_size=self.fraction_test_data_in_train,
                random_state=random_state,
            )
            train_df = pd.concat([train_df, test_df_for_training])
        train_dataset = Dataset.from_pandas(train_df)
        validation_dataset = Dataset.from_pandas(validation_df)
        test_dataset = Dataset.from_pandas(test_df)
        return train_dataset, validation_dataset, test_dataset

    def mean_pooling(self, model_output, attention_mask):
        """Mask-aware mean of the token embeddings.

        Parameters
        ----------
        model_output : tuple/ModelOutput
            Transformer output; element 0 holds all token embeddings.
        attention_mask : torch.Tensor
            Mask of shape (batch, seq_len); padded positions are 0.
        """
        token_embeddings = model_output[
            0
        ]  # First element of model_output contains all token embeddings
        input_mask_expanded = (
            attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
        )
        # clamp avoids a division by zero for fully-masked rows
        return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(
            input_mask_expanded.sum(1), min=1e-9
        )

    def sentences_to_embedding(self, sentences):
        """Encode sentences into L2-normalized sentence embeddings."""
        # Tokenize sentences
        encoded_input = self.tokenizer(
            sentences, padding=True, truncation=True, return_tensors="pt"
        )
        # BUG FIX: the encoder lives on `device` but the tokenizer returns CPU
        # tensors, which raises a device-mismatch error on CUDA machines.
        encoded_input = encoded_input.to(device)
        # Inference only — skip building the autograd graph.
        with torch.no_grad():
            model_output = self.encoding_model(**encoded_input)
        sentence_embeddings = self.mean_pooling(
            model_output, encoded_input["attention_mask"]
        )
        sentence_embeddings = F.normalize(sentence_embeddings, p=2, dim=1)
        # remove last dimension (drops the batch dim for a single sentence)
        sentence_embeddings = sentence_embeddings.squeeze()
        return sentence_embeddings.detach()

    def get_embeddings(self, row):
        """``Dataset.map`` callback: embed the ``content`` field of a row."""
        return {
            "embeddings": self.sentences_to_embedding(
                sentences=row["content"],
            )
        }

    def create_embeddings(self):
        """Create the three splits and attach an ``embeddings`` column to each."""
        train_dataset, validation_dataset, test_dataset = self.create_datasets()
        train_dataset = train_dataset.map(self.get_embeddings)
        validation_dataset = validation_dataset.map(self.get_embeddings)
        test_dataset = test_dataset.map(self.get_embeddings)
        return train_dataset, validation_dataset, test_dataset
class PDFModel(nn.Module):
    """MLP binary classifier over sentence embeddings.

    A stack of Linear+ReLU hidden layers followed by a single Linear output
    layer and a sigmoid, producing values in [0, 1].
    """

    def __init__(self, input_size, hidden_sizes, output_size):
        super(PDFModel, self).__init__()
        # Build the hidden stack; each layer feeds the next one's input size.
        self.seq_model = nn.Sequential()
        in_features = input_size
        for idx, width in enumerate(hidden_sizes):
            self.seq_model.add_module(f"linear_{idx}", nn.Linear(in_features, width))
            self.seq_model.add_module(f"relu_{idx}", nn.ReLU())
            in_features = width
        self.last_layer = nn.Linear(in_features, output_size)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Map a batch of embeddings to per-sample probabilities."""
        hidden = self.seq_model(x)
        return self.sigmoid(self.last_layer(hidden))
def evaluate_model(y_true, y_pred, model_name, split="train"):
    """Print accuracy, precision and a full classification report.

    Parameters
    ----------
    y_true, y_pred : array-like
        Ground-truth and predicted labels.
    model_name : str
        Name shown in the printed header.
    split : str, optional
        Which split is being evaluated, by default "train".
    """
    separator = "------" * 10
    print(separator)
    print(f"Evaluating for the model: {model_name} for {split} dataset...")
    print(f"Accuracy: {accuracy_score(y_true, y_pred)}")
    print(f"Precision: {precision_score(y_true, y_pred)}")
    print(classification_report(y_true, y_pred))
    print(separator)
def train_dl_model(
    train_data,
    validation_data,
    epochs=30,
    input_shape=384,
    hidden_sizes=(32, 16),
):
    """Build and fit the ``PDFModel`` binary classifier.

    Parameters
    ----------
    train_data, validation_data : DataLoader
        Batched (embedding, label) pairs.
    epochs : int, optional
        Number of training epochs, by default 30.
    input_shape : int, optional
        Embedding dimensionality, by default 384.
    hidden_sizes : sequence of int, optional
        Hidden-layer widths, by default (32, 16).
        NOTE: changed from a mutable list default to an immutable tuple —
        same values, avoids the shared-mutable-default pitfall.

    Returns
    -------
    tuple
        ``(history, model)`` — the training history and the fitted model.
    """
    model = PDFModel(input_size=input_shape, hidden_sizes=hidden_sizes, output_size=1)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    loss_fn = nn.BCELoss()
    metrics = {
        "accuracy": torchmetrics.Accuracy(
            task="binary", num_classes=2, threshold=0.5, average="macro"
        ),
        "precision": torchmetrics.Precision(task="binary", average="macro"),
    }
    # Cosine decay of the learning rate from 0.001 down to 0.0001.
    scheduler = CosineAnnealingLR(optimizer, T_max=10, eta_min=0.0001)
    tt = TorchTrain(model, optimizer, loss_fn, metrics=metrics, scheduler=scheduler)
    history = tt.fit(train_data, validation_data, verbose=True, epochs=epochs)
    return history, model
def evaluate_models(fraction_test_data_in_train=0.1):
    """Train and evaluate the DL model plus RandomForest and XGBoost baselines.

    Parameters
    ----------
    fraction_test_data_in_train : float, optional
        Fraction of the test split merged into the training data, by
        default 0.1.
    """
    print("Creating Embeddings...")
    ds = PDFDataSet(fraction_test_data_in_train=fraction_test_data_in_train)
    train_dataset, validation_dataset, test_dataset = ds.create_embeddings()
    print("Done\n")
    print("Training DL Model")
    # Create dataset for DL models:
    BATCH_SIZE = 8
    train_dataloader = PDFDataLoader(train_dataset)
    validation_dataloader = PDFDataLoader(validation_dataset)
    train_data = DataLoader(train_dataloader, batch_size=BATCH_SIZE, shuffle=True)
    validation_data = DataLoader(
        validation_dataloader,
        batch_size=BATCH_SIZE,
        shuffle=True,
    )
    # NOTE(review): the original also built a DataLoader over the test split
    # and an unused `output_shape`; both were dead code and are removed.
    # Peek at one batch to infer the embedding dimensionality.
    X, _ = next(iter(train_data))
    input_shape = int(X.shape[1])
    epochs = 30
    hidden_sizes = [32, 16]
    history, model = train_dl_model(
        train_data=train_data,
        validation_data=validation_data,
        epochs=epochs,
        # BUG FIX: input_shape was computed but never passed, so training
        # silently fell back to the hard-coded default of 384 and would
        # crash for any other embedding size.
        input_shape=input_shape,
        hidden_sizes=hidden_sizes,
    )
    print("Done\n")
    print("Evaluating DL Model")
    # Predict on the whole test split at once, then threshold at 0.5.
    y_test_pred = model(torch.from_numpy(np.array(test_dataset["embeddings"])).float())
    y_test_pred = y_test_pred.detach().numpy()
    y_test_pred = np.where(y_test_pred > 0.5, 1, 0)
    evaluate_model(
        y_true=test_dataset["label"],
        y_pred=y_test_pred,
        model_name="DL Model",
        split="test",
    )
    print("Done\n")
    # ML Models
    print("Training and evaluating ML Models.")
    X_train = train_dataset["embeddings"]
    y_train = train_dataset["label"]
    X_validation = validation_dataset["embeddings"]
    y_validation = validation_dataset["label"]
    X_test = test_dataset["embeddings"]
    y_test = test_dataset["label"]
    # Hyperparameters found by an earlier tuning run (source not in this file).
    rfc_best_params = {
        "max_depth": 23,
        "max_features": "log2",
        "n_estimators": 469,
    }
    xgb_best_params = {
        "max_depth": 25,
        "n_estimators": 372,
        "learning_rate": 0.2522824287799319,
    }
    print("Fitting RandomForest")
    rfc = RandomForestClassifier(**rfc_best_params)
    rfc.fit(X_train, y_train)
    evaluate_model(
        y_true=y_train,
        y_pred=rfc.predict(X_train),
        model_name="RandomForest",
        split="train",
    )
    evaluate_model(
        y_true=y_validation,
        y_pred=rfc.predict(X_validation),
        model_name="RandomForest",
        split="validation",
    )
    evaluate_model(
        y_true=y_test,
        y_pred=rfc.predict(X_test),
        model_name="RandomForest",
        split="test",
    )
    print("Fitting XGBoost")
    xgb = XGBClassifier(**xgb_best_params)
    xgb.fit(X_train, y_train)
    evaluate_model(
        y_true=y_train,
        y_pred=xgb.predict(X_train),
        model_name="XGBoost",
        split="train",
    )
    evaluate_model(
        y_true=y_validation,
        y_pred=xgb.predict(X_validation),
        model_name="XGBoost",
        split="validation",
    )
    evaluate_model(
        y_true=y_test,
        y_pred=xgb.predict(X_test),
        model_name="XGBoost",
        split="test",
    )
    print("All Done")
def train_and_save_final_model(model_save_path="final_model.pkl"):
    """Create and save the final model. The final model has the following characteristics:
    - It is a RandomForestClassifier trained on all the training data plus 10% of the test data. The 10% of test data is necessary as the distribution of the test data is very different from the training data.
    - Since 10% of the test data is used while training, this data is not used while calculating the final accuracy of the model, which is 100%.
    Parameters
    ----------
    model_save_path : str, optional
        The path to save the final model, by default "final_model.pkl"
    Returns
    -------
    None
    Examples
    --------
    >>> train_and_save_final_model()
    >>> train_and_save_final_model(model_save_path="final_model.pkl")
    """
    print("Creating Embeddings...")
    # Resolve the save path relative to this file's directory.
    model_save_path = os.path.join(FILE_DIR, model_save_path)
    ds = PDFDataSet(fraction_test_data_in_train=0.1)
    train_dataset, validation_dataset, test_dataset = ds.create_embeddings()
    # The final model trains on train + validation combined.
    train_dataset = concatenate_datasets([train_dataset, validation_dataset])
    X_train = train_dataset["embeddings"]
    X_test = test_dataset["embeddings"]
    y_train = train_dataset["label"]
    y_test = test_dataset["label"]
    print("Training and evaluating the model...")
    # Hyperparameters found by an earlier tuning run (source not in this file).
    rfc_best_params = {
        "max_depth": 23,
        "max_features": "log2",
        "n_estimators": 469,
    }
    rfc_model = RandomForestClassifier(**rfc_best_params)
    rfc_model.fit(X_train, y_train)
    evaluate_model(
        y_true=y_train,
        y_pred=rfc_model.predict(X_train),
        model_name="Final Model",
        split="train",
    )
    evaluate_model(
        y_true=y_test,
        y_pred=rfc_model.predict(X_test),
        model_name="Final Model",
        split="test",
    )
    print("Saving the model...")
    with open(model_save_path, "wb") as f:
        pickle.dump(rfc_model, f)
    print(f"Model saved to: {model_save_path}")
def main(args):
    """Dispatch the requested CLI task.

    ``task == "train"`` trains and saves the final model;
    ``task == "evaluate"`` runs the full model comparison.
    """
    if args.task == "train":
        train_and_save_final_model(model_save_path=args.model_save_path)
    elif args.task == "evaluate":
        evaluate_models(args.fraction)
if __name__ == "__main__":
    # Command-line entry point: parse the arguments and hand off to main().
    arg_parser = argparse.ArgumentParser(description="Train and evaluate models")
    arg_parser.add_argument(
        "--task",
        type=str,
        choices=["train", "evaluate"],
        required=True,
        help="Whether to train and save the best model or evaluate all the models.",
    )
    arg_parser.add_argument(
        "--fraction",
        type=float,
        default=0.1,
        help="Fraction of test data in train dataset",
    )
    arg_parser.add_argument(
        "--model_save_path",
        type=str,
        default="final_model.pkl",
        help="Path to save the final model",
    )
    main(arg_parser.parse_args())