| import os |
| import pandas as pd |
| import numpy as np |
| import torch |
| from torch.nn import CrossEntropyLoss |
| from torch.utils.data import WeightedRandomSampler, DataLoader |
| from sklearn.model_selection import train_test_split |
| from sklearn.metrics import accuracy_score, precision_recall_fscore_support |
| from sklearn.utils.class_weight import compute_class_weight |
| from datasets import Dataset |
| from transformers import ( |
| AutoTokenizer, |
| AutoModelForSequenceClassification, |
| TrainingArguments, |
| Trainer, |
| DataCollatorWithPadding |
| ) |
|
|
| from augment import augment_class |
|
|
| |
| |
| |
|
|
| MODEL_NAME = "ProsusAI/finbert" |
| DATASET_PATH = "/home/sayantan/Desktop/sentiment_data.csv" |
| OUTPUT_DIR = "./finbert_financial_model" |
|
|
| EPOCHS = 3 |
| BATCH_SIZE = 16 |
| MAX_LENGTH = 128 |
|
|
| |
| |
| |
|
|
| id2label = {0: "negative", 1: "neutral", 2: "positive"} |
| label2id = {"negative": 0, "neutral": 1, "positive": 2} |
|
|
| |
| |
| |
|
|
| print("Loading dataset...") |
|
|
| df = pd.read_csv(DATASET_PATH, encoding='latin1') |
| |
| df = df[["text", "label"]] |
| df["text"] = df["text"].astype(str) |
| df["label"] = df["label"].map(label2id) |
| df = df.dropna(subset=["label"]) |
| df["label"] = df["label"].astype(int) |
|
|
| print(f"Dataset size: {len(df)}") |
| print(df["label"].value_counts()) |
|
|
| |
| |
| |
|
|
| train_df, test_df = train_test_split( |
| df, |
| test_size=0.1, |
| random_state=42, |
| stratify=df["label"] |
| ) |
|
|
| |
| |
| |
|
|
| train_df = train_df.reset_index(drop=True) |
| test_df = test_df.reset_index(drop=True) |
|
|
| print("Augmenting negative class...") |
| print(f"Before augmentation: {train_df['label'].value_counts().to_dict()}") |
|
|
| train_df = augment_class(train_df, label_id=0, target_count=1000) |
|
|
| print(f"After augmentation: {train_df['label'].value_counts().to_dict()}") |
|
|
| train_df["label"] = train_df["label"].astype(int) |
| train_df["text"] = train_df["text"].astype(str) |
| test_df["label"] = test_df["label"].astype(int) |
| test_df["text"] = test_df["text"].astype(str) |
|
|
| |
| |
| |
| |
|
|
| class_weights = compute_class_weight( |
| class_weight="balanced", |
| classes=np.array([0, 1, 2]), |
| y=train_df["label"].values |
| ) |
| weights_tensor = torch.tensor(class_weights, dtype=torch.float) |
| print(f"Class weights → negative: {class_weights[0]:.2f}, neutral: {class_weights[1]:.2f}, positive: {class_weights[2]:.2f}") |
|
|
| sample_weights = [class_weights[label] for label in train_df["label"].values] |
| sampler = WeightedRandomSampler( |
| weights=torch.tensor(sample_weights, dtype=torch.float), |
| num_samples=len(sample_weights), |
| replacement=True |
| ) |
|
|
| |
| |
| |
|
|
| from datasets import Dataset, Features, Value, ClassLabel |
|
|
| features = Features({ |
| "text": Value("string"), |
| "label": ClassLabel(num_classes=3, names=["negative", "neutral", "positive"]) |
| }) |
|
|
| train_dataset = Dataset.from_pandas(train_df[["text", "label"]], features=features) |
| test_dataset = Dataset.from_pandas(test_df[["text", "label"]], features=features) |
|
|
| |
| |
| |
|
|
| print("Loading tokenizer...") |
|
|
| checkpoint = os.path.join(OUTPUT_DIR, "config.json") |
|
|
| if os.path.exists(checkpoint): |
| print("Loading existing tokenizer...") |
| tokenizer = AutoTokenizer.from_pretrained(OUTPUT_DIR) |
| else: |
| print("Loading base FinBERT tokenizer...") |
| tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME) |
|
|
| def tokenize(batch): |
| texts = [] |
|
|
| for x in batch["text"]: |
| if x is None: |
| texts.append("") |
| elif isinstance(x, str): |
| texts.append(x) |
| else: |
| texts.append(str(x)) |
|
|
| return tokenizer( |
| texts, |
| truncation=True, |
| padding="max_length", |
| max_length=MAX_LENGTH, |
| ) |
|
|
| train_dataset = train_dataset.map(tokenize, batched=True, remove_columns=["text"]) |
| test_dataset = test_dataset.map(tokenize, batched=True, remove_columns=["text"]) |
|
|
| train_dataset.set_format(type="torch", columns=["input_ids", "attention_mask", "label"]) |
| test_dataset.set_format(type="torch", columns=["input_ids", "attention_mask", "label"]) |
|
|
| |
| |
| |
|
|
| print("Loading FinBERT model...") |
|
|
| if os.path.exists(checkpoint): |
| print("Loading previously trained model...") |
| model = AutoModelForSequenceClassification.from_pretrained(OUTPUT_DIR) |
| else: |
| print("Loading base FinBERT model...") |
| model = AutoModelForSequenceClassification.from_pretrained( |
| MODEL_NAME, |
| num_labels=3, |
| id2label=id2label, |
| label2id=label2id |
| ) |
|
|
| |
| |
| |
|
|
| class WeightedTrainer(Trainer): |
| def __init__(self, *args, weights_tensor, sampler, **kwargs): |
| super().__init__(*args, **kwargs) |
| self.weights_tensor = weights_tensor |
| self.sampler = sampler |
|
|
| def compute_loss(self, model, inputs, return_outputs=False, **kwargs): |
| labels = inputs.get("labels") |
| outputs = model(**inputs) |
| logits = outputs.get("logits") |
| loss = CrossEntropyLoss( |
| weight=self.weights_tensor.to(logits.device) |
| )(logits, labels) |
| return (loss, outputs) if return_outputs else loss |
|
|
| def get_train_dataloader(self): |
| return DataLoader( |
| self.train_dataset, |
| batch_size=self.args.per_device_train_batch_size, |
| sampler=self.sampler, |
| collate_fn=self.data_collator, |
| pin_memory=True |
| ) |
|
|
| |
| |
| |
|
|
| training_args = TrainingArguments( |
| output_dir=OUTPUT_DIR, |
| learning_rate=2e-5, |
| lr_scheduler_type="cosine", |
| warmup_ratio=0.1, |
| per_device_train_batch_size=BATCH_SIZE, |
| per_device_eval_batch_size=BATCH_SIZE, |
| num_train_epochs=EPOCHS, |
| eval_strategy="epoch", |
| save_strategy="epoch", |
| load_best_model_at_end=True, |
| metric_for_best_model="f1_macro", |
| weight_decay=0.01, |
| logging_dir="./logs", |
| logging_steps=10, |
| report_to="none" |
| ) |
|
|
| |
| |
| |
|
|
| def compute_metrics(eval_pred): |
| logits, labels = eval_pred |
| preds = logits.argmax(axis=1) |
|
|
| acc = accuracy_score(labels, preds) |
| precision_w, recall_w, f1_w, _ = precision_recall_fscore_support(labels, preds, average="weighted") |
| precision_m, recall_m, f1_m, _ = precision_recall_fscore_support(labels, preds, average="macro") |
|
|
| return { |
| "accuracy": acc, |
| "f1_weighted": f1_w, |
| "f1_macro": f1_m, |
| "precision_macro": precision_m, |
| "recall_macro": recall_m |
| } |
|
|
| |
| |
| |
|
|
| data_collator = DataCollatorWithPadding(tokenizer) |
|
|
| |
| |
| |
|
|
| trainer = WeightedTrainer( |
| model=model, |
| args=training_args, |
| train_dataset=train_dataset, |
| eval_dataset=test_dataset, |
| compute_metrics=compute_metrics, |
| data_collator=data_collator, |
| weights_tensor=weights_tensor, |
| sampler=sampler |
| ) |
|
|
| print("Starting training...") |
| trainer.train() |
|
|
| |
| |
| |
|
|
| print("Saving model...") |
| trainer.save_model(OUTPUT_DIR) |
| tokenizer.save_pretrained(OUTPUT_DIR) |
|
|
| print("Training complete.") |