"""Training utilities for transformer-based multi-label classification.

This module contains a small training harness around HuggingFace
`AutoModelForSequenceClassification` specialized for the project's
multi-label code-comment classification task. It provides:

- `TransformerConfig` dataclass for configurable training runs.
- `CommentDataset` to wrap tokenization of pandas DataFrames.
- `TransformerTrainer` which runs the training loop, evaluation and
  model export (with MLflow logging hooks).

The helpers are intended for experimental, small-scale training and
instrumentation rather than production-grade distributed training.
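
Example
-------
A minimal sketch of a typical run (paths are illustrative placeholders and the
data layout is assumed to match what `load_or_prepare_data` expects; runs are
typically wrapped in an active MLflow run)::

    cfg = TransformerConfig(
        lang="java",
        raw_data_dir="data/raw",
        processed_data_dir="data/processed",
        model_output_path="models/java_transformer",
    )
    trainer = TransformerTrainer(cfg)
    metrics = trainer.run()  # {"micro_f1": ..., "macro_f1": ...}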
""" |
|
|
|
|
|
from dataclasses import asdict, dataclass |
|
|
import logging |
|
|
import os |
|
|
from typing import Dict, List, Tuple |
|
|
|
|
|
import mlflow |
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
from sklearn.metrics import ( |
|
|
accuracy_score, |
|
|
classification_report, |
|
|
f1_score, |
|
|
precision_score, |
|
|
recall_score, |
|
|
) |
|
|
import torch |
|
|
from torch.utils.data import DataLoader, Dataset |
|
|
from tqdm.auto import tqdm |
|
|
from transformers import ( |
|
|
AutoModelForSequenceClassification, |
|
|
AutoTokenizer, |
|
|
get_linear_schedule_with_warmup, |
|
|
) |
|
|
|
|
|
from .preprocessing import load_or_prepare_data |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu") |
|
|
print(f"Using device: {DEVICE}") |


LABELS: Dict[str, Tuple[str, ...]] = {
    "java": (
        "summary",
        "Ownership",
        "Expand",
        "usage",
        "Pointer",
        "deprecation",
        "rational",
    ),
    "python": (
        "Usage",
        "Parameters",
        "DevelopmentNotes",
        "Expand",
        "Summary",
    ),
    "pharo": (
        "Keyimplementationpoints",
        "Example",
        "Responsibilities",
        "Intent",
        "Keymessages",
        "Collaborators",
    ),
}
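
# Note: the `labels_array` rows produced by `load_or_prepare_data` are assumed to
# be multi-hot vectors aligned with the tuples above, e.g. for "python" a comment
# tagged as Usage and Summary would correspond to [1, 0, 0, 0, 1].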


@dataclass
class TransformerConfig:
    """Configuration for transformer training runs.

    Attributes are intentionally simple dataclass fields and map directly to
    CLI/YAML configuration keys used by the training harness.
    """

    lang: str
    raw_data_dir: str
    processed_data_dir: str
    model_output_path: str
    pretrained_model_name: str = "microsoft/codebert-base"
    max_length: int = 128
    batch_size: int = 16
    lr: float = 2e-5
    num_epochs: int = 5
    warmup_ratio: float = 0.1
    pos_weight_cap: float = 30.0
    threshold: float = 0.5
    preprocessing: bool = False
    preprocessing_factor: float = 1.0

    def __post_init__(self) -> None:
        """Force correct types even if YAML provides strings."""
        self.max_length = int(self.max_length)
        self.batch_size = int(self.batch_size)
        self.lr = float(self.lr)
        self.num_epochs = int(self.num_epochs)
        self.warmup_ratio = float(self.warmup_ratio)
        self.pos_weight_cap = float(self.pos_weight_cap)
        self.threshold = float(self.threshold)
        self.preprocessing_factor = float(self.preprocessing_factor)

        if isinstance(self.preprocessing, str):
            self.preprocessing = self.preprocessing.lower() == "true"
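
# For reference, this config is typically populated from a YAML file whose keys
# mirror the field names above (illustrative snippet only; the exact file layout
# is project-specific). Values may arrive as strings and are coerced in
# __post_init__:
#
#   lang: java
#   raw_data_dir: data/raw
#   processed_data_dir: data/processed
#   model_output_path: models/java_transformer
#   lr: "2e-5"
#   preprocessing: "true"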


class CommentDataset(Dataset):
    """Simple Dataset wrapper around a pandas DataFrame with 'combo' and 'labels_array'."""

    def __init__(self, df: pd.DataFrame, tokenizer: AutoTokenizer, max_length: int):
        """Create a dataset that tokenizes rows on demand.

        Parameters
        ----------
        df : pandas.DataFrame
            Input frame containing at least `combo` and `labels_array` columns.
        tokenizer : transformers.AutoTokenizer
            Tokenizer used to encode text into model inputs.
        max_length : int
            Maximum tokenization length (used for padding/truncation).

        """
        self.df = df.reset_index(drop=True)
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self) -> int:
        """Return the number of examples in the dataset."""
        return len(self.df)

    def __getitem__(self, idx: int):
        """Return a single tokenized example and its labels as tensors.

        The returned dict contains tokenized inputs (PyTorch tensors) and a
        `labels` tensor suitable for BCEWithLogitsLoss for multi-label tasks.
        """
        row = self.df.iloc[idx]
        text = str(row["combo"])
        labels = np.asarray(row["labels_array"], dtype=np.float32)

        enc = self.tokenizer(
            text,
            truncation=True,
            max_length=self.max_length,
            padding="max_length",
            return_tensors="pt",
        )

        item = {k: v.squeeze(0) for k, v in enc.items()}
        item["labels"] = torch.from_numpy(labels)
        return item
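
# Each item returned by CommentDataset is expected to look roughly like
# {"input_ids": LongTensor[max_length], "attention_mask": LongTensor[max_length],
#  "labels": FloatTensor[num_labels]} (plus "token_type_ids" for tokenizers that
# emit them), which is the shape BCEWithLogitsLoss and the model forward expect.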


class TransformerTrainer:
    """End-to-end transformer trainer for the code comment multi-label task."""

    def __init__(self, cfg: TransformerConfig) -> None:
        """Initialize training state, data loaders, model and optimizer.

        Parameters
        ----------
        cfg : TransformerConfig
            Training configuration containing data paths and hyperparameters.

        """
        self.cfg = cfg
        if cfg.lang not in LABELS:
            raise ValueError(f"No LABELS defined for language '{cfg.lang}'.")

        self.label_names = LABELS[cfg.lang]
        self.num_labels = len(self.label_names)

        logger.info("Initializing TransformerTrainer for language '%s'.", cfg.lang)
        logger.info("Raw data directory: %s", cfg.raw_data_dir)
        logger.info("Processed data directory: %s", cfg.processed_data_dir)
        logger.info("Model output path: %s", cfg.model_output_path)

        self.train_df, self.eval_df, self.preprocessing_used = load_or_prepare_data(
            lang=cfg.lang,
            raw_data_dir=cfg.raw_data_dir,
            processed_data_dir=cfg.processed_data_dir,
            preprocessing_enabled=cfg.preprocessing,
            preprocessing_factor=cfg.preprocessing_factor,
            random_state=42,
        )

        logger.info("Preprocessing used for this run: %s", self.preprocessing_used)
        logger.info("Using device: %s", DEVICE)
        logger.info(
            "Train size: %d rows, Eval size: %d rows",
            len(self.train_df),
            len(self.eval_df),
        )

        try:
            cfg_dict = asdict(self.cfg)
            mlflow.log_params({f"cfg_{k}": v for k, v in cfg_dict.items()})
            mlflow.log_param("num_labels", self.num_labels)
            mlflow.log_param("label_names", ",".join(self.label_names))
            mlflow.log_param("train_samples", len(self.train_df))
            mlflow.log_param("eval_samples", len(self.eval_df))
            mlflow.log_param("preprocessing_used", self.preprocessing_used)
        except Exception as e:
            logger.warning("Could not log transformer config to MLflow: %s", e)

        logger.info("Loading tokenizer '%s'.", cfg.pretrained_model_name)
        self.tokenizer = AutoTokenizer.from_pretrained(cfg.pretrained_model_name)

        y_train = np.stack(self.train_df["labels_array"].to_numpy())
        self.pos_weight = self._compute_pos_weight(y_train)

        train_dataset = CommentDataset(self.train_df, self.tokenizer, cfg.max_length)
        eval_dataset = CommentDataset(self.eval_df, self.tokenizer, cfg.max_length)

        self.train_loader = DataLoader(
            train_dataset,
            batch_size=cfg.batch_size,
            shuffle=True,
        )
        self.eval_loader = DataLoader(
            eval_dataset,
            batch_size=cfg.batch_size,
            shuffle=False,
        )

        logger.info(
            "Hyperparameters – lr=%s (type=%s), batch_size=%s, num_epochs=%s",
            self.cfg.lr,
            type(self.cfg.lr),
            self.cfg.batch_size,
            self.cfg.num_epochs,
        )

        logger.info("Loading base model '%s'.", cfg.pretrained_model_name)
        self.model = AutoModelForSequenceClassification.from_pretrained(
            cfg.pretrained_model_name,
            num_labels=self.num_labels,
            problem_type="multi_label_classification",
        ).to(DEVICE)

        self.loss_fn = torch.nn.BCEWithLogitsLoss(pos_weight=self.pos_weight.to(DEVICE))
        self.optimizer = torch.optim.AdamW(self.model.parameters(), lr=self.cfg.lr)

        num_training_steps = cfg.num_epochs * len(self.train_loader)
        num_warmup_steps = int(cfg.warmup_ratio * num_training_steps)
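        # For illustration (values depend on the dataset): with num_epochs=5 and,
        # say, 100 train batches per epoch, num_training_steps = 500, so
        # warmup_ratio=0.1 gives 50 warmup steps; the linear schedule then ramps
        # the LR up over those 50 steps and decays it linearly to zero afterwards.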
        logger.info(
            "Total training steps: %d, warmup steps: %d.",
            num_training_steps,
            num_warmup_steps,
        )

        self.scheduler = get_linear_schedule_with_warmup(
            self.optimizer,
            num_warmup_steps=num_warmup_steps,
            num_training_steps=num_training_steps,
        )

        self.best_state_dict = None
        self.best_val_macro_f1 = 0.0

    def _compute_pos_weight(self, y: np.ndarray) -> torch.Tensor:
        """Compute per-label positive class weights for BCEWithLogitsLoss.

        Weights are the negative/positive frequency ratio per label, clipped to
        the range [1.0, cfg.pos_weight_cap] so that rare labels do not produce
        extreme values.
        """
        if y.ndim == 1:
            y = y[:, None]
        freq = y.sum(axis=0).astype(np.float64)
        num_samples = y.shape[0]

        pos_weight = (num_samples - freq) / np.clip(freq, 1.0, None)
        pos_weight = np.clip(pos_weight, 1.0, self.cfg.pos_weight_cap)

        logger.info("Positive class weights (clipped): %s", pos_weight.tolist())
        return torch.tensor(pos_weight, dtype=torch.float32)
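
    # Worked example (illustrative numbers): with 1000 training rows and a label
    # that is positive in 50 of them, pos_weight = (1000 - 50) / 50 = 19.0, which
    # is kept as-is because it is below the default pos_weight_cap of 30.0; a
    # label positive in only 20 rows would give 49.0 and be clipped to 30.0.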

    def _step_batch(self, batch, train: bool):
        """Run a forward pass on one batch and, if `train`, a backward/optimizer step.

        Returns the loss, the raw logits and the label tensor for the batch.
        """
        batch = {k: v.to(DEVICE) for k, v in batch.items()}
        labels = batch.pop("labels")

        outputs = self.model(**batch)
        logits = outputs.logits
        loss = self.loss_fn(logits, labels)

        if train:
            loss.backward()
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
            self.optimizer.step()
            self.scheduler.step()
            self.optimizer.zero_grad()

        return loss, logits, labels

    def train_one_epoch(self, epoch: int) -> float:
        """Run a single training epoch over `self.train_loader`.

        Returns
        -------
        float
            The average training loss over the epoch.

        """
        self.model.train()
        total_loss = 0.0
        n_samples = 0

        num_batches = len(self.train_loader)
        logger.info("Starting epoch %d training. Number of batches: %d", epoch, num_batches)

        progress_bar = tqdm(
            self.train_loader,
            desc=f"Epoch {epoch} [train]",
            total=num_batches,
            leave=False,
        )

        for step, batch in enumerate(progress_bar, start=1):
            loss, _, _ = self._step_batch(batch, train=True)
            batch_size = batch["input_ids"].size(0)
            total_loss += loss.item() * batch_size
            n_samples += batch_size

            avg_loss_so_far = total_loss / max(n_samples, 1)
            progress_bar.set_postfix({"loss": f"{avg_loss_so_far:.4f}"})

        avg_loss = total_loss / max(n_samples, 1)
        logger.info("Epoch %d training completed. Average loss: %.4f.", epoch, avg_loss)

        mlflow.log_metric("train_loss", avg_loss, step=epoch)

        return avg_loss

    def evaluate(
        self,
        epoch: int,
        split_name: str = "eval",
    ) -> Tuple[float, float, float, np.ndarray, np.ndarray]:
        """Evaluate the model on `self.eval_loader` and compute metrics.

        Parameters
        ----------
        epoch : int
            Current epoch number (used for logging).
        split_name : str
            Name of the evaluation split used for MLflow metric keys.

        Returns
        -------
        tuple
            (avg_loss, micro_f1, macro_f1, y_true, y_pred)

        """
        self.model.eval()
        total_loss = 0.0
        n_samples = 0
        all_preds: List[np.ndarray] = []
        all_labels: List[np.ndarray] = []

        logger.info("Starting evaluation for epoch %d on split '%s'.", epoch, split_name)

        num_batches = len(self.eval_loader)
        progress_bar = tqdm(
            self.eval_loader,
            desc=f"Epoch {epoch} [{split_name}]",
            total=num_batches,
            leave=False,
        )

        with torch.no_grad():
            for batch in progress_bar:
                loss, logits, labels = self._step_batch(batch, train=False)
                batch_size = logits.size(0)
                total_loss += loss.item() * batch_size
                n_samples += batch_size

                probs = torch.sigmoid(logits)
                preds = (probs > self.cfg.threshold).long()

                all_preds.append(preds.cpu().numpy())
                all_labels.append(labels.cpu().numpy())

                avg_loss_so_far = total_loss / max(n_samples, 1)
                progress_bar.set_postfix({"loss": f"{avg_loss_so_far:.4f}"})

        avg_loss = total_loss / max(n_samples, 1)
        y_pred = np.concatenate(all_preds, axis=0)
        y_true = np.concatenate(all_labels, axis=0)

        micro_f1 = f1_score(y_true, y_pred, average="micro", zero_division=0)
        macro_f1 = f1_score(y_true, y_pred, average="macro", zero_division=0)

        micro_precision = precision_score(y_true, y_pred, average="micro", zero_division=0)
        macro_precision = precision_score(y_true, y_pred, average="macro", zero_division=0)

        micro_recall = recall_score(y_true, y_pred, average="micro", zero_division=0)
        macro_recall = recall_score(y_true, y_pred, average="macro", zero_division=0)

        subset_accuracy = accuracy_score(y_true, y_pred)

        micro_accuracy = accuracy_score(y_true.flatten(), y_pred.flatten())
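        # Note the difference between the two accuracies: for y_true = [[1, 0], [0, 1]]
        # and y_pred = [[1, 0], [1, 1]], subset accuracy is 0.5 (only the first row
        # matches exactly), while micro accuracy over the flattened labels is 0.75
        # (3 of the 4 individual label decisions are correct).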

        logger.info(
            "Eval results [%s] - loss: %.4f | "
            "micro-F1: %.4f, macro-F1: %.4f | "
            "micro-P: %.4f, macro-P: %.4f | "
            "micro-R: %.4f, macro-R: %.4f | "
            "subset-acc: %.4f, micro-acc: %.4f",
            split_name,
            avg_loss,
            micro_f1,
            macro_f1,
            micro_precision,
            macro_precision,
            micro_recall,
            macro_recall,
            subset_accuracy,
            micro_accuracy,
        )

        mlflow.log_metric(f"{split_name}_loss", avg_loss, step=epoch)
        mlflow.log_metric(f"{split_name}_micro_f1", micro_f1, step=epoch)
        mlflow.log_metric(f"{split_name}_macro_f1", macro_f1, step=epoch)
        mlflow.log_metric(f"{split_name}_micro_precision", micro_precision, step=epoch)
        mlflow.log_metric(f"{split_name}_macro_precision", macro_precision, step=epoch)
        mlflow.log_metric(f"{split_name}_micro_recall", micro_recall, step=epoch)
        mlflow.log_metric(f"{split_name}_macro_recall", macro_recall, step=epoch)
        mlflow.log_metric(f"{split_name}_subset_accuracy", subset_accuracy, step=epoch)
        mlflow.log_metric(f"{split_name}_micro_accuracy", micro_accuracy, step=epoch)

        return avg_loss, micro_f1, macro_f1, y_true, y_pred

    def run(self) -> Dict[str, float]:
        """Execute the full training loop and save the best model.

        Returns
        -------
        dict
            Summary metrics from the final evaluation (micro/macro F1).

        """
        logger.info("Starting training loop for %d epochs.", self.cfg.num_epochs)
        for epoch in range(1, self.cfg.num_epochs + 1):
            train_loss = self.train_one_epoch(epoch)
            val_loss, val_micro_f1, val_macro_f1, _, _ = self.evaluate(epoch, split_name="eval")

            logger.info(
                "[%s] epoch=%d train_loss=%.4f val_loss=%.4f val_micro_f1=%.4f val_macro_f1=%.4f",
                self.cfg.lang,
                epoch,
                train_loss,
                val_loss,
                val_micro_f1,
                val_macro_f1,
            )

            if val_macro_f1 > self.best_val_macro_f1:
                logger.info(
                    "New best macro-F1: %.4f (previous: %.4f). Saving current model state.",
                    val_macro_f1,
                    self.best_val_macro_f1,
                )
                self.best_val_macro_f1 = val_macro_f1
                self.best_state_dict = {k: v.cpu() for k, v in self.model.state_dict().items()}

        if self.best_state_dict is not None:
            logger.info("Loading best model weights (macro-F1 = %.4f).", self.best_val_macro_f1)
            self.model.load_state_dict(self.best_state_dict)

        _, micro_f1, macro_f1, y_true, y_pred = self.evaluate(
            epoch=self.cfg.num_epochs,
            split_name="eval",
        )

        logger.info(
            "[%s] FINAL micro-F1 = %.4f, macro-F1 = %.4f.",
            self.cfg.lang,
            micro_f1,
            macro_f1,
        )
        logger.info(
            "Per-label classification report:\n%s",
            classification_report(y_true, y_pred, target_names=self.label_names, zero_division=0),
        )

        os.makedirs(self.cfg.model_output_path, exist_ok=True)
        logger.info("Saving model and tokenizer to '%s'.", self.cfg.model_output_path)
        self.model.save_pretrained(self.cfg.model_output_path)
        self.tokenizer.save_pretrained(self.cfg.model_output_path)

        logger.info("Logging final model artifacts to MLflow.")
        mlflow.log_artifacts(
            self.cfg.model_output_path,
            artifact_path=f"{self.cfg.lang}_transformer_model",
        )

        logger.info("Logging HF transformers model to MLflow via mlflow.transformers.log_model.")
        model_info = mlflow.transformers.log_model(
            transformers_model=self.cfg.model_output_path,
            artifact_path=f"{self.cfg.lang}_transformer_model",
            task="text-classification",
        )

        logger.info(
            "Logged transformers model to MLflow with URI: %s",
            model_info.model_uri,
        )
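        # The returned model_info.model_uri can later be used to reload the model,
        # e.g. via mlflow.transformers.load_model(model_info.model_uri) (a sketch;
        # details depend on the MLflow tracking setup used for this run).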

        return {
            "micro_f1": float(micro_f1),
            "macro_f1": float(macro_f1),
        }