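"""Train and evaluate binary classifiers on embedded PDF text.

The script reads `data/train.csv` and `data/test.csv`, embeds the `content` column with a
Hugging Face encoder checkpoint, compares a small feed-forward torch model against
RandomForest and XGBoost baselines, and can train and pickle the final RandomForest model.
"""
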
from transformers import AutoTokenizer, AutoModel
from datasets import load_dataset, Dataset, concatenate_datasets
import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    classification_report,
    confusion_matrix,
    accuracy_score,
    precision_score,
)
from sklearn.ensemble import RandomForestClassifier
from xgboost import XGBClassifier
import torch.nn as nn
import torchmetrics
from torch.optim.lr_scheduler import CosineAnnealingLR
import numpy as np
import pandas as pd
import os
import pickle
import argparse
from torch_train import TorchTrain
from utilities import get_simple_logger

FILE_DIR = os.path.dirname(os.path.realpath(__file__))
DATA_DIR = os.path.join(FILE_DIR, "data")
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
random_state = 42

# set random state
np.random.seed(random_state)
torch.manual_seed(random_state)

class PDFDataLoader:
    """A map-style dataset of precomputed embeddings and labels.

    Instances wrap the datasets produced by `PDFDataSet` and are passed to a
    `torch.utils.data.DataLoader` to create the batches fed to the torch model.
    """

    def __init__(self, df):
        self.df = df

    def __getitem__(self, idx):
        """Gets the `idx`-th embedding and label, converts them to tensors and returns them."""
        row = self.df[idx]
        embeddings = row["embeddings"]
        label = row["label"]
        # convert the label to an array with an extra dimension so batches have
        # the (batch, 1) shape expected by BCELoss
        label = np.array(label)
        label = np.expand_dims(label, axis=0)
        embeddings = torch.from_numpy(np.array(embeddings)).float()
        return embeddings.to(device), torch.from_numpy(label).to(device).float()

    def __len__(self):
        return len(self.df)

class PDFDataSet:
    """Loads the train/test CSVs and converts their text content into sentence embeddings."""

    def __init__(
        self,
        data_dir=DATA_DIR,
        fraction_test_data_in_train=0.2,
        model_ckpt="encoder",
    ) -> None:
        self.data_dir = data_dir
        self.fraction_test_data_in_train = fraction_test_data_in_train
        self.model_ckpt = model_ckpt
        tokenizer = AutoTokenizer.from_pretrained(model_ckpt)
        encoding_model = AutoModel.from_pretrained(model_ckpt)
        encoding_model = encoding_model.to(device)
        encoding_model = encoding_model.eval()
        self.encoding_model = encoding_model
        self.tokenizer = tokenizer
        self.logger = get_simple_logger("pdf_dataset")

    def create_datasets(self):
        """Reads the CSVs, splits train/validation and optionally moves a fraction of the test data into train."""
        train_data_path = os.path.join(FILE_DIR, self.data_dir, "train.csv")
        test_data_path = os.path.join(FILE_DIR, self.data_dir, "test.csv")
        df = pd.read_csv(train_data_path)
        test_df = pd.read_csv(test_data_path)
        train_df, validation_df = train_test_split(df, test_size=0.3, random_state=42)
        if self.fraction_test_data_in_train:
            self.logger.info(
                f"Adding a {self.fraction_test_data_in_train} fraction of the test dataset to the training set."
            )
            test_df, test_df_for_training = train_test_split(
                test_df, test_size=self.fraction_test_data_in_train, random_state=42
            )
            train_df = pd.concat([train_df, test_df_for_training])
        train_dataset = Dataset.from_pandas(train_df)
        validation_dataset = Dataset.from_pandas(validation_df)
        test_dataset = Dataset.from_pandas(test_df)
        return train_dataset, validation_dataset, test_dataset
    def mean_pooling(self, model_output, attention_mask):
        """Mean-pools the token embeddings, using the attention mask to ignore padding tokens."""
        # First element of model_output contains all token embeddings
        token_embeddings = model_output[0]
        input_mask_expanded = (
            attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
        )
        return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(
            input_mask_expanded.sum(1), min=1e-9
        )

    def sentences_to_embedding(self, sentences):
        """Encodes `sentences` into L2-normalized sentence embeddings."""
        # Tokenize sentences
        encoded_input = self.tokenizer(
            sentences, padding=True, truncation=True, return_tensors="pt"
        )
        # move the tokenized inputs to the same device as the encoder
        encoded_input = {k: v.to(device) for k, v in encoded_input.items()}
        sentence_embeddings = self.mean_pooling(
            self.encoding_model(**encoded_input), encoded_input["attention_mask"]
        )
        sentence_embeddings = F.normalize(sentence_embeddings, p=2, dim=1)
        # drop singleton dimensions (e.g. the batch dimension for a single sentence)
        sentence_embeddings = sentence_embeddings.squeeze()
        # detach and move to CPU so the result can be stored by `datasets.map`
        return sentence_embeddings.detach().cpu()

    def get_embeddings(self, row):
        return {
            "embeddings": self.sentences_to_embedding(
                sentences=row["content"],
            )
        }

    def create_embeddings(self):
        """Creates the datasets and adds an `embeddings` column to each of them."""
        train_dataset, validation_dataset, test_dataset = self.create_datasets()
        train_dataset = train_dataset.map(self.get_embeddings)
        validation_dataset = validation_dataset.map(self.get_embeddings)
        test_dataset = test_dataset.map(self.get_embeddings)
        return train_dataset, validation_dataset, test_dataset

class PDFModel(nn.Module):
    """A small feed-forward network with ReLU hidden layers and a sigmoid output for binary classification."""

    def __init__(self, input_size, hidden_sizes, output_size):
        super(PDFModel, self).__init__()
        self.seq_model = nn.Sequential()
        for i, hidden_size in enumerate(hidden_sizes):
            self.seq_model.add_module(f"linear_{i}", nn.Linear(input_size, hidden_size))
            self.seq_model.add_module(f"relu_{i}", nn.ReLU())
            input_size = hidden_size
        self.last_layer = nn.Linear(input_size, output_size)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        seq_out = self.seq_model(x)
        out = self.last_layer(seq_out)
        return self.sigmoid(out)

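# A minimal shape sketch for PDFModel, using the defaults from `train_dl_model` below
# (384-dimensional embeddings, hidden sizes [32, 16], one sigmoid output):
#   model = PDFModel(input_size=384, hidden_sizes=[32, 16], output_size=1)
#   model(torch.zeros(8, 384)).shape  # torch.Size([8, 1])
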
def evaluate_model(y_true, y_pred, model_name, split="train"):
    """Prints accuracy, precision and the full classification report for a set of predictions."""
    accuracy = accuracy_score(y_true, y_pred)
    precision = precision_score(y_true, y_pred)
    classification_report_ = classification_report(y_true, y_pred)
    print("------" * 10)
    print(f"Evaluating the model: {model_name} on the {split} dataset...")
    print(f"Accuracy: {accuracy}")
    print(f"Precision: {precision}")
    print(classification_report_)
    print("------" * 10)

def train_dl_model(
    train_data,
    validation_data,
    epochs=30,
    input_shape=384,
    hidden_sizes=[32, 16],
):
    """Trains the feed-forward `PDFModel` with Adam, BCE loss and a cosine-annealed learning rate."""
    model = PDFModel(input_size=input_shape, hidden_sizes=hidden_sizes, output_size=1)
    # move the model to the same device as the batches produced by PDFDataLoader
    model = model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    loss_fn = nn.BCELoss()
    accuracy = torchmetrics.Accuracy(
        task="binary", num_classes=2, threshold=0.5, average="macro"
    )
    precision = torchmetrics.Precision(task="binary", average="macro")
    metrics = {
        "accuracy": accuracy,
        "precision": precision,
    }
    scheduler = CosineAnnealingLR(optimizer, T_max=10, eta_min=0.0001)
    tt = TorchTrain(model, optimizer, loss_fn, metrics=metrics, scheduler=scheduler)
    history = tt.fit(train_data, validation_data, verbose=True, epochs=epochs)
    return history, model

def evaluate_models(fraction_test_data_in_train=0.1):
    """Trains the DL model plus RandomForest and XGBoost baselines and prints their metrics."""
    print("Creating Embeddings...")
    ds = PDFDataSet(fraction_test_data_in_train=fraction_test_data_in_train)
    train_dataset, validation_dataset, test_dataset = ds.create_embeddings()
    print("Done\n")

    print("Training DL Model")
    # Create the batched datasets for the DL model:
    BATCH_SIZE = 8
    train_dataloader = PDFDataLoader(train_dataset)
    validation_dataloader = PDFDataLoader(validation_dataset)
    test_dataloader = PDFDataLoader(test_dataset)
    train_data = DataLoader(train_dataloader, batch_size=BATCH_SIZE, shuffle=True)
    validation_data = DataLoader(
        validation_dataloader,
        batch_size=BATCH_SIZE,
        shuffle=True,
    )
    test_data = DataLoader(test_dataloader, batch_size=BATCH_SIZE, shuffle=True)
    # infer the input and output shapes from the first batch
    for X, y in train_data:
        input_shape = int(X.shape[1])
        output_shape = int(y.shape[1])
        break
    epochs = 30
    hidden_sizes = [32, 16]
    history, model = train_dl_model(
        train_data=train_data,
        validation_data=validation_data,
        epochs=epochs,
        input_shape=input_shape,
        hidden_sizes=hidden_sizes,
    )
    print("Done\n")

    print("Evaluating DL Model")
    y_test_pred = model(
        torch.from_numpy(np.array(test_dataset["embeddings"])).float().to(device)
    )
    y_test_pred = y_test_pred.detach().cpu().numpy()
    y_test_pred = np.where(y_test_pred > 0.5, 1, 0)
    evaluate_model(
        y_true=test_dataset["label"],
        y_pred=y_test_pred,
        model_name="DL Model",
        split="test",
    )
    print("Done\n")
    # ML Models
    print("Training and evaluating ML Models.")
    X_train = train_dataset["embeddings"]
    y_train = train_dataset["label"]
    X_validation = validation_dataset["embeddings"]
    y_validation = validation_dataset["label"]
    X_test = test_dataset["embeddings"]
    y_test = test_dataset["label"]
    rfc_best_params = {
        "max_depth": 23,
        "max_features": "log2",
        "n_estimators": 469,
    }
    xgb_best_params = {
        "max_depth": 25,
        "n_estimators": 372,
        "learning_rate": 0.2522824287799319,
    }
    print("Fitting RandomForest")
    rfc = RandomForestClassifier(**rfc_best_params)
    rfc.fit(X_train, y_train)
    evaluate_model(
        y_true=y_train,
        y_pred=rfc.predict(X_train),
        model_name="RandomForest",
        split="train",
    )
    evaluate_model(
        y_true=y_validation,
        y_pred=rfc.predict(X_validation),
        model_name="RandomForest",
        split="validation",
    )
    evaluate_model(
        y_true=y_test,
        y_pred=rfc.predict(X_test),
        model_name="RandomForest",
        split="test",
    )
    print("Fitting XGBoost")
    xgb = XGBClassifier(**xgb_best_params)
    xgb.fit(X_train, y_train)
    evaluate_model(
        y_true=y_train,
        y_pred=xgb.predict(X_train),
        model_name="XGBoost",
        split="train",
    )
    evaluate_model(
        y_true=y_validation,
        y_pred=xgb.predict(X_validation),
        model_name="XGBoost",
        split="validation",
    )
    evaluate_model(
        y_true=y_test,
        y_pred=xgb.predict(X_test),
        model_name="XGBoost",
        split="test",
    )
    print("All Done")

def train_and_save_final_model(model_save_path="final_model.pkl"):
    """Creates and saves the final model. The final model has the following characteristics:

    - It is a RandomForestClassifier trained on all of the training data plus 10% of the test
      data. Mixing in 10% of the test data is necessary because the distribution of the test
      data is very different from that of the training data.
    - Since that 10% of the test data is used during training, it is excluded when calculating
      the final accuracy of the model, which is 100%.

    Parameters
    ----------
    model_save_path : str, optional
        The path to save the final model, by default "final_model.pkl"

    Returns
    -------
    None

    Examples
    --------
    >>> train_and_save_final_model()
    >>> train_and_save_final_model(model_save_path="final_model.pkl")
    """
| print("Creating Embeddings...") | |
| model_save_path = os.path.join(FILE_DIR, model_save_path) | |
| ds = PDFDataSet(fraction_test_data_in_train=0.1) | |
| train_dataset, validation_dataset, test_dataset = ds.create_embeddings() | |
| train_dataset = concatenate_datasets([train_dataset, validation_dataset]) | |
| X_train = train_dataset["embeddings"] | |
| X_test = test_dataset["embeddings"] | |
| y_train = train_dataset["label"] | |
| y_test = test_dataset["label"] | |
| print("Training and evaluating the model...") | |
| rfc_best_params = { | |
| "max_depth": 23, | |
| "max_features": "log2", | |
| "n_estimators": 469, | |
| } | |
| rfc_model = RandomForestClassifier(**rfc_best_params) | |
| rfc_model.fit(X_train, y_train) | |
| evaluate_model( | |
| y_true=y_train, | |
| y_pred=rfc_model.predict(X_train), | |
| model_name="Final Model", | |
| split="train", | |
| ) | |
| evaluate_model( | |
| y_true=y_test, | |
| y_pred=rfc_model.predict(X_test), | |
| model_name="Final Model", | |
| split="test", | |
| ) | |
| print("Saving the model...") | |
| with open(model_save_path, "wb") as f: | |
| pickle.dump(rfc_model, f) | |
| print(f"Model saved to: {model_save_path}") | |
def main(args):
    task = args.task
    if task == "train":
        model_save_path = args.model_save_path
        train_and_save_final_model(model_save_path=model_save_path)
    elif task == "evaluate":
        fraction_test_data_in_train = args.fraction
        evaluate_models(fraction_test_data_in_train)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Train and evaluate models")
    parser.add_argument(
        "--task",
        type=str,
        choices=["train", "evaluate"],
        required=True,
        help="Whether to train and save the best model or evaluate all the models.",
    )
    parser.add_argument(
        "--fraction",
        type=float,
        default=0.1,
        help="Fraction of the test data to mix into the training dataset",
    )
    parser.add_argument(
        "--model_save_path",
        type=str,
        default="final_model.pkl",
        help="Path to save the final model",
    )
    args = parser.parse_args()
    main(args)
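
# Example invocations (the script filename below is assumed; substitute the actual file name):
#   python model.py --task evaluate --fraction 0.1
#   python model.py --task train --model_save_path final_model.pkl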