# Author: sbasu2512
# Commit 91bd1f8 — "Fix: Properly fetching and re-uploading complete model weights via Git LFS"
import os
import pandas as pd
import numpy as np
import torch
from torch.nn import CrossEntropyLoss
from torch.utils.data import WeightedRandomSampler, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
from sklearn.utils.class_weight import compute_class_weight
from datasets import Dataset
from transformers import (
AutoTokenizer,
AutoModelForSequenceClassification,
TrainingArguments,
Trainer,
DataCollatorWithPadding
)
from augment import augment_class
# -----------------------------
# CONFIG
# -----------------------------
# Base checkpoint name, input CSV location, and directory for saved artifacts
MODEL_NAME = "ProsusAI/finbert"
DATASET_PATH = "/home/sayantan/Desktop/sentiment_data.csv"
OUTPUT_DIR = "./finbert_financial_model"

# Training hyperparameters: epochs, per-device batch size, tokenizer max length
EPOCHS, BATCH_SIZE, MAX_LENGTH = 3, 16, 128

# -----------------------------
# LABEL MAPPING
# -----------------------------
# Class ids are assigned in the order negative (0), neutral (1), positive (2);
# label2id is the exact inverse of id2label.
id2label = dict(enumerate(("negative", "neutral", "positive")))
label2id = {name: idx for idx, name in id2label.items()}
# -----------------------------
# LOAD DATASET
# -----------------------------
# Reads the CSV at DATASET_PATH and produces `df` with two columns:
#   text  -> str
#   label -> int class id (via label2id)
# Rows with a missing text or a label that is not a key of label2id are dropped.
print("Loading dataset...")
df = pd.read_csv(DATASET_PATH, encoding='latin1')

# Drop missing texts BEFORE the string cast: astype(str) would otherwise turn
# NaN into the literal string "nan" and keep it as a training example.
# dropna/astype return new frames, so the column assignment below never
# targets a view of the raw CSV frame.
df = df[["text", "label"]].dropna(subset=["text"]).astype({"text": str})

# Map label names to class ids; names missing from label2id become NaN
df["label"] = df["label"].map(label2id)
df = df.dropna(subset=["label"]).astype({"label": int})  # drop any rows with unmapped/unknown labels

if df.empty:
    # Fail here with a clear message instead of an obscure error in the split step
    raise ValueError(
        f"No usable rows in {DATASET_PATH}: every row had a missing text "
        f"or a label outside {sorted(label2id)}"
    )

print(f"Dataset size: {len(df)}")
print(df["label"].value_counts())
# -----------------------------
# TRAIN / TEST SPLIT
# -----------------------------
# Stratified 90/10 split with a fixed seed; both parts get a fresh 0..n-1 index.
train_df, test_df = (
    part.reset_index(drop=True)
    for part in train_test_split(
        df,
        test_size=0.1,
        random_state=42,
        stratify=df["label"],
    )
)

# -----------------------------
# AUGMENT — must happen on pandas BEFORE converting to Dataset
# -----------------------------
# Only the training split is passed to augment_class (label id 0 = negative);
# the test split is left as-is.
print("Augmenting negative class...")
print(f"Before augmentation: {train_df['label'].value_counts().to_dict()}")
train_df = augment_class(train_df, label_id=0, target_count=1000)
print(f"After augmentation: {train_df['label'].value_counts().to_dict()}")

# Re-assert column dtypes on both splits (int labels, str texts)
column_types = {"label": int, "text": str}
train_df = train_df.astype(column_types)
test_df = test_df.astype(column_types)
# -----------------------------
# CLASS WEIGHTS & SAMPLER
# Computed AFTER augmentation so sizes match the actual training data
# -----------------------------
# NOTE(review): the same "balanced" weights feed both the loss (weights_tensor)
# and the sampler below, and both are handed to WeightedTrainer — so class
# imbalance is compensated twice. Confirm this is intended.
train_labels = train_df["label"].values
class_weights = compute_class_weight(
    class_weight="balanced",
    classes=np.array([0, 1, 2]),
    y=train_labels,
)
weights_tensor = torch.tensor(class_weights, dtype=torch.float)
print(f"Class weights → negative: {class_weights[0]:.2f}, neutral: {class_weights[1]:.2f}, positive: {class_weights[2]:.2f}")

# One weight per training row: the weight of that row's class
sample_weights = class_weights[train_labels].tolist()
sampler = WeightedRandomSampler(
    weights=torch.tensor(sample_weights, dtype=torch.float),
    num_samples=len(sample_weights),
    replacement=True,
)
# -----------------------------
# CONVERT TO HUGGINGFACE DATASET
# -----------------------------
from datasets import Dataset, Features, Value, ClassLabel

# Explicit schema: free text plus a 3-way class label whose names follow the
# id order 0=negative, 1=neutral, 2=positive.
features = Features({
    "text": Value("string"),
    "label": ClassLabel(num_classes=3, names=["negative", "neutral", "positive"])
})

# preserve_index=False: train_df is not re-indexed after augmentation, and a
# non-Range pandas index would otherwise be stored as an extra
# "__index_level_0__" column that does not match `features`.
train_dataset = Dataset.from_pandas(
    train_df[["text", "label"]],
    features=features,
    preserve_index=False,
)
test_dataset = Dataset.from_pandas(
    test_df[["text", "label"]],
    features=features,
    preserve_index=False,
)
# -----------------------------
# TOKENIZER
# -----------------------------
print("Loading tokenizer...")
# A config.json inside OUTPUT_DIR is the marker for "a previous run saved here";
# the model-loading step further down checks the same path.
checkpoint = os.path.join(OUTPUT_DIR, "config.json")
if not os.path.exists(checkpoint):
    # No earlier run found: start from the base checkpoint's tokenizer
    print("Loading base FinBERT tokenizer...")
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
else:
    # Reuse the tokenizer saved alongside the earlier run
    print("Loading existing tokenizer...")
    tokenizer = AutoTokenizer.from_pretrained(OUTPUT_DIR)
def tokenize(batch):
    """Tokenize the "text" column of a batch with the module-level tokenizer.

    None entries become empty strings and non-string entries are passed
    through str(). Every sequence is truncated and padded to MAX_LENGTH.

    Args:
        batch: Mapping with a "text" entry holding an iterable of examples.

    Returns:
        Whatever the tokenizer returns for the cleaned list of texts.
    """
    def _as_text(value):
        # Normalise a single cell to a plain string
        if value is None:
            return ""
        return value if isinstance(value, str) else str(value)

    cleaned = [_as_text(item) for item in batch["text"]]
    return tokenizer(
        cleaned,
        truncation=True,
        padding="max_length",
        max_length=MAX_LENGTH,
    )
# Tokenize each split in batches, dropping the raw "text" column, then expose
# only the tensor columns the model consumes as torch tensors.
train_dataset = train_dataset.map(
    tokenize,
    batched=True,
    remove_columns=["text"],
)
train_dataset.set_format(
    type="torch",
    columns=["input_ids", "attention_mask", "label"],
)

test_dataset = test_dataset.map(
    tokenize,
    batched=True,
    remove_columns=["text"],
)
test_dataset.set_format(
    type="torch",
    columns=["input_ids", "attention_mask", "label"],
)
# -----------------------------
# MODEL
# -----------------------------
print("Loading FinBERT model...")
# `checkpoint` is the OUTPUT_DIR/config.json path computed in the tokenizer step
if not os.path.exists(checkpoint):
    # Fresh start: base checkpoint with a 3-class head and our label maps
    print("Loading base FinBERT model...")
    model = AutoModelForSequenceClassification.from_pretrained(
        MODEL_NAME,
        num_labels=3,
        id2label=id2label,
        label2id=label2id,
    )
else:
    # Continue from the weights saved by an earlier run
    print("Loading previously trained model...")
    model = AutoModelForSequenceClassification.from_pretrained(OUTPUT_DIR)
# -----------------------------
# WEIGHTED TRAINER
# -----------------------------
class WeightedTrainer(Trainer):
    """Trainer with a class-weighted cross-entropy loss and a custom sampler.

    Args:
        *args, **kwargs: Forwarded unchanged to ``Trainer``.
        weights_tensor: 1-D tensor of per-class loss weights (keyword-only).
        sampler: Sampler used to draw training examples (keyword-only).
    """

    def __init__(self, *args, weights_tensor, sampler, **kwargs):
        super().__init__(*args, **kwargs)
        self.weights_tensor = weights_tensor
        self.sampler = sampler

    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        """Return the class-weighted cross-entropy loss for one batch.

        Args:
            model: Model to run the forward pass on.
            inputs: Batch dict; must contain "labels".
            return_outputs: When true, return ``(loss, outputs)``.
            **kwargs: Extra arguments passed by the Trainer; accepted and ignored.

        Returns:
            The loss tensor, or ``(loss, outputs)`` if ``return_outputs`` is set.
        """
        # Pop (not get) the labels: leaving them in `inputs` makes the model
        # compute its own unweighted loss as well, which is then thrown away.
        labels = inputs.pop("labels")
        outputs = model(**inputs)
        logits = outputs.get("logits")
        # Weights are moved to wherever the logits live (CPU or accelerator)
        loss_fn = CrossEntropyLoss(weight=self.weights_tensor.to(logits.device))
        loss = loss_fn(logits, labels)
        return (loss, outputs) if return_outputs else loss

    def get_train_dataloader(self):
        """Build the training DataLoader around the caller-supplied sampler.

        Worker count, pin-memory and drop-last follow ``self.args`` instead of
        being hard-coded, so the values set in ``TrainingArguments`` apply.

        Raises:
            ValueError: If the trainer was created without a train dataset.
        """
        if self.train_dataset is None:
            raise ValueError("Trainer: training requires a train_dataset.")
        return DataLoader(
            self.train_dataset,
            batch_size=self.args.per_device_train_batch_size,
            sampler=self.sampler,
            collate_fn=self.data_collator,
            num_workers=self.args.dataloader_num_workers,
            pin_memory=self.args.dataloader_pin_memory,
            drop_last=self.args.dataloader_drop_last,
        )
# -----------------------------
# TRAINING ARGS
# -----------------------------
training_args = TrainingArguments(
    # Where checkpoints are written
    output_dir=OUTPUT_DIR,
    # Run length and batch sizes
    num_train_epochs=EPOCHS,
    per_device_train_batch_size=BATCH_SIZE,
    per_device_eval_batch_size=BATCH_SIZE,
    # Optimiser and schedule
    learning_rate=2e-5,
    weight_decay=0.01,
    lr_scheduler_type="cosine",
    warmup_ratio=0.1,
    # Evaluate and checkpoint once per epoch, then restore the best checkpoint
    eval_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
    metric_for_best_model="f1_macro",  # Save best by macro F1, not loss
    # Logging
    logging_dir="./logs",
    logging_steps=10,
    report_to="none",
)
# -----------------------------
# METRICS
# -----------------------------
def compute_metrics(eval_pred):
    """Compute accuracy plus weighted/macro classification metrics.

    Args:
        eval_pred: Pair ``(logits, labels)``; the predicted class is the
            argmax of ``logits`` along axis 1.

    Returns:
        Dict with keys "accuracy", "f1_weighted", "f1_macro",
        "precision_macro" and "recall_macro".
    """
    logits, labels = eval_pred
    preds = logits.argmax(axis=1)
    acc = accuracy_score(labels, preds)
    # zero_division=0 keeps the value sklearn already uses for an undefined
    # precision/recall (0) but stops it warning when a class is never predicted.
    _, _, f1_w, _ = precision_recall_fscore_support(
        labels, preds, average="weighted", zero_division=0
    )
    precision_m, recall_m, f1_m, _ = precision_recall_fscore_support(
        labels, preds, average="macro", zero_division=0
    )
    return {
        "accuracy": acc,
        "f1_weighted": f1_w,
        "f1_macro": f1_m,
        "precision_macro": precision_m,
        "recall_macro": recall_m
    }
# -----------------------------
# DATA COLLATOR
# -----------------------------
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

# -----------------------------
# TRAIN
# -----------------------------
# NOTE(review): eval_dataset is the held-out test split, and training_args asks
# for the best checkpoint to be restored at the end — so checkpoint selection
# uses the same data that is reported on. Confirm this is acceptable.
trainer = WeightedTrainer(
    model=model,
    args=training_args,
    data_collator=data_collator,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    compute_metrics=compute_metrics,
    sampler=sampler,                # passed explicitly, not global
    weights_tensor=weights_tensor,  # passed explicitly, not global
)

print("Starting training...")
trainer.train()

# -----------------------------
# SAVE
# -----------------------------
# Model and tokenizer are written to the same directory
print("Saving model...")
trainer.save_model(OUTPUT_DIR)
tokenizer.save_pretrained(OUTPUT_DIR)

print("Training complete.")