"""Fine-tune FinBERT for three-way financial sentiment classification.

Pipeline: load a CSV with ``text`` and ``label`` columns, make a stratified
train/test split, augment the negative class of the training split, then
train with a class-weighted loss and a class-weighted sampler, selecting the
best checkpoint by macro F1.
"""
import os

import numpy as np
import pandas as pd
import torch
from datasets import ClassLabel, Dataset, Features, Value
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from torch.nn import CrossEntropyLoss
from torch.utils.data import DataLoader, WeightedRandomSampler
from transformers import (
    AutoModelForSequenceClassification,
    AutoTokenizer,
    DataCollatorWithPadding,
    Trainer,
    TrainingArguments,
)

from augment import augment_class

# -----------------------------
# CONFIG
# -----------------------------
MODEL_NAME = "ProsusAI/finbert"
DATASET_PATH = "/home/sayantan/Desktop/sentiment_data.csv"
OUTPUT_DIR = "./finbert_financial_model"

EPOCHS = 3
BATCH_SIZE = 16
MAX_LENGTH = 128

# -----------------------------
# LABEL MAPPING
# -----------------------------
id2label = {0: "negative", 1: "neutral", 2: "positive"}
label2id = {"negative": 0, "neutral": 1, "positive": 2}

# -----------------------------
# LOAD DATASET
# -----------------------------
print("Loading dataset...")

df = pd.read_csv(DATASET_PATH, encoding="latin1")

# .copy() gives an independent frame, so the column assignments below do not
# operate on a slice of the original (avoids SettingWithCopyWarning).
df = df[["text", "label"]].copy()

# Drop rows with missing text BEFORE the string cast: astype(str) would turn
# NaN into the literal string "nan" and train on it.
df = df.dropna(subset=["text"])
df["text"] = df["text"].astype(str)

df["label"] = df["label"].map(label2id)
df = df.dropna(subset=["label"])  # drop any rows with unmapped/unknown labels
df["label"] = df["label"].astype(int)

if df.empty:
    raise ValueError(
        f"No usable rows in {DATASET_PATH!r}: every row had missing text "
        f"or a label outside {sorted(label2id)}"
    )

print(f"Dataset size: {len(df)}")
print(df["label"].value_counts())

# -----------------------------
# TRAIN / TEST SPLIT
# -----------------------------
train_df, test_df = train_test_split(
    df,
    test_size=0.1,
    random_state=42,
    stratify=df["label"],
)

# -----------------------------
# AUGMENT — must happen on pandas BEFORE converting to Dataset
# -----------------------------
train_df = train_df.reset_index(drop=True)
test_df = test_df.reset_index(drop=True)

# Only the training split is augmented; the test split stays untouched so
# evaluation reflects the real data.
print("Augmenting negative class...")
print(f"Before augmentation: {train_df['label'].value_counts().to_dict()}")
train_df = augment_class(train_df, label_id=0, target_count=1000)
print(f"After augmentation: {train_df['label'].value_counts().to_dict()}")

# Re-assert dtypes: augment_class is defined elsewhere, so the dtypes of the
# frame it returns are not guaranteed from here.
train_df["label"] = train_df["label"].astype(int)
train_df["text"] = train_df["text"].astype(str)
test_df["label"] = test_df["label"].astype(int)
test_df["text"] = test_df["text"].astype(str)

# -----------------------------
# CLASS WEIGHTS & SAMPLER
# Computed AFTER augmentation so sizes match the actual training data
# -----------------------------
train_labels = train_df["label"].to_numpy()

class_weights = compute_class_weight(
    class_weight="balanced",
    classes=np.array([0, 1, 2]),
    y=train_labels,
)
weights_tensor = torch.tensor(class_weights, dtype=torch.float)
print(f"Class weights → negative: {class_weights[0]:.2f}, neutral: {class_weights[1]:.2f}, positive: {class_weights[2]:.2f}")

# One weight per training row, looked up by that row's class id.
sample_weights = class_weights[train_labels]
# NOTE(review): the same class weights drive both this sampler and the loss
# in WeightedTrainer, so imbalance is compensated twice — confirm intended.
sampler = WeightedRandomSampler(
    weights=torch.as_tensor(sample_weights, dtype=torch.float),
    num_samples=len(sample_weights),
    replacement=True,
)

# -----------------------------
# CONVERT TO HUGGINGFACE DATASET
# -----------------------------
features = Features({
    "text": Value("string"),
    "label": ClassLabel(num_classes=3, names=["negative", "neutral", "positive"]),
})

# preserve_index=False: if augmentation leaves a non-default index, pandas'
# index must not become an extra column that conflicts with `features`.
train_dataset = Dataset.from_pandas(
    train_df[["text", "label"]], features=features, preserve_index=False
)
test_dataset = Dataset.from_pandas(
    test_df[["text", "label"]], features=features, preserve_index=False
)

# -----------------------------
# TOKENIZER
# -----------------------------
print("Loading tokenizer...")

# A config.json in OUTPUT_DIR marks a previously saved model; when present,
# both tokenizer and model are resumed from there.
checkpoint = os.path.join(OUTPUT_DIR, "config.json")

if os.path.exists(checkpoint):
    print("Loading existing tokenizer...")
    tokenizer = AutoTokenizer.from_pretrained(OUTPUT_DIR)
else:
    print("Loading base FinBERT tokenizer...")
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)


def tokenize(batch):
    """Tokenize one batch of examples for ``Dataset.map(batched=True)``.

    Args:
        batch: Mapping with a ``"text"`` entry holding the batch's texts.

    Returns:
        The tokenizer output, truncated to ``MAX_LENGTH`` tokens. Sequences
        are left unpadded; the data collator pads each batch to its own
        longest sequence, which is cheaper than padding every example to
        ``MAX_LENGTH``.
    """
    # None becomes an empty string; any other non-str value is stringified.
    texts = ["" if x is None else str(x) for x in batch["text"]]
    return tokenizer(texts, truncation=True, max_length=MAX_LENGTH)


train_dataset = train_dataset.map(tokenize, batched=True, remove_columns=["text"])
test_dataset = test_dataset.map(tokenize, batched=True, remove_columns=["text"])

train_dataset.set_format(type="torch", columns=["input_ids", "attention_mask", "label"])
test_dataset.set_format(type="torch", columns=["input_ids", "attention_mask", "label"])

# -----------------------------
# MODEL
# -----------------------------
print("Loading FinBERT model...")

if os.path.exists(checkpoint):
    print("Loading previously trained model...")
    model = AutoModelForSequenceClassification.from_pretrained(OUTPUT_DIR)
else:
    print("Loading base FinBERT model...")
    model = AutoModelForSequenceClassification.from_pretrained(
        MODEL_NAME,
        num_labels=3,
        id2label=id2label,
        label2id=label2id,
    )


# -----------------------------
# WEIGHTED TRAINER
# -----------------------------
class WeightedTrainer(Trainer):
    """Trainer with a class-weighted loss and a caller-supplied train sampler.

    Args:
        weights_tensor: 1-D float tensor of per-class loss weights, indexed
            by class id (keyword-only).
        sampler: Sampler used to draw training examples (keyword-only).
        *args, **kwargs: Forwarded unchanged to ``Trainer``.
    """

    def __init__(self, *args, weights_tensor, sampler, **kwargs):
        super().__init__(*args, **kwargs)
        self.weights_tensor = weights_tensor
        self.sampler = sampler

    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        """Return class-weighted cross-entropy for one batch.

        Args:
            model: The model being trained or evaluated.
            inputs: Batch mapping that includes ``"labels"``.
            return_outputs: When true, return ``(loss, outputs)``.
            **kwargs: Extra arguments passed by Trainer; accepted and ignored.

        Returns:
            The loss tensor, or ``(loss, outputs)`` if ``return_outputs``.
        """
        labels = inputs.get("labels")
        # Forward without labels so the model does not also compute its own
        # unweighted loss; build a new dict to leave `inputs` unmodified.
        model_inputs = {k: v for k, v in inputs.items() if k != "labels"}
        outputs = model(**model_inputs)
        logits = outputs.get("logits")
        loss_fn = CrossEntropyLoss(weight=self.weights_tensor.to(logits.device))
        loss = loss_fn(logits, labels)
        return (loss, outputs) if return_outputs else loss

    def get_train_dataloader(self):
        """Build the training DataLoader around the weighted sampler.

        Batch size, worker count and pin-memory come from the training
        arguments rather than being hard-coded.
        """
        return DataLoader(
            self.train_dataset,
            batch_size=self.args.train_batch_size,
            sampler=self.sampler,
            collate_fn=self.data_collator,
            num_workers=self.args.dataloader_num_workers,
            pin_memory=self.args.dataloader_pin_memory,
        )


# -----------------------------
# TRAINING ARGS
# -----------------------------
training_args = TrainingArguments(
    output_dir=OUTPUT_DIR,
    learning_rate=2e-5,
    lr_scheduler_type="cosine",
    warmup_ratio=0.1,
    per_device_train_batch_size=BATCH_SIZE,
    per_device_eval_batch_size=BATCH_SIZE,
    num_train_epochs=EPOCHS,
    eval_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
    metric_for_best_model="f1_macro",  # Save best by macro F1, not loss
    greater_is_better=True,
    weight_decay=0.01,
    logging_dir="./logs",
    logging_steps=10,
    report_to="none",
)


# -----------------------------
# METRICS
# -----------------------------
def compute_metrics(eval_pred):
    """Compute accuracy plus weighted and macro precision/recall/F1.

    Args:
        eval_pred: ``(logits, labels)`` pair as supplied by Trainer.

    Returns:
        Dict with ``accuracy``, ``f1_weighted``, ``f1_macro``,
        ``precision_macro`` and ``recall_macro``.
    """
    logits, labels = eval_pred
    preds = logits.argmax(axis=1)

    acc = accuracy_score(labels, preds)
    # zero_division=0 gives the same values as the default but without a
    # warning whenever a class is never predicted.
    _, _, f1_w, _ = precision_recall_fscore_support(
        labels, preds, average="weighted", zero_division=0
    )
    precision_m, recall_m, f1_m, _ = precision_recall_fscore_support(
        labels, preds, average="macro", zero_division=0
    )

    return {
        "accuracy": acc,
        "f1_weighted": f1_w,
        "f1_macro": f1_m,
        "precision_macro": precision_m,
        "recall_macro": recall_m,
    }


# -----------------------------
# DATA COLLATOR
# -----------------------------
# Pads each batch to the length of its longest sequence.
data_collator = DataCollatorWithPadding(tokenizer)

# -----------------------------
# TRAIN
# -----------------------------
trainer = WeightedTrainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    compute_metrics=compute_metrics,
    data_collator=data_collator,
    weights_tensor=weights_tensor,  # passed explicitly, not global
    sampler=sampler,                # passed explicitly, not global
)

print("Starting training...")
trainer.train()

# -----------------------------
# SAVE
# -----------------------------
print("Saving model...")
trainer.save_model(OUTPUT_DIR)
tokenizer.save_pretrained(OUTPUT_DIR)

print("Training complete.")