"""Wrapper module for fine-tuning microsoft/deberta-v3-xsmall on
multi-label code-comment classification (Java / Python / Pharo)."""

import json
import os
import shutil
import warnings

from loguru import logger
import mlflow
import numpy as np
from sklearn.metrics import (
    accuracy_score,
    classification_report,
    f1_score,
    precision_score,
    recall_score,
)
import torch
from torch.utils.data import Dataset
from transformers import (
    AutoModelForSequenceClassification,
    AutoTokenizer,
    EarlyStoppingCallback,
    Trainer,
    TrainingArguments,
)

from turing.config import MODELS_DIR

from ..baseModel import BaseModel

warnings.filterwarnings("ignore")

# Decision threshold applied to sigmoid probabilities. Shared by
# training-time metric computation and inference so the two can never
# drift apart (it was previously hard-coded in both places).
PREDICTION_THRESHOLD = 0.35


def compute_metrics(eval_pred):
    """Compute micro-averaged multi-label metrics for the HF Trainer.

    Args:
        eval_pred: ``(predictions, labels)`` pair; ``predictions`` are raw
            logits of shape (n_samples, n_labels), ``labels`` are multi-hot.

    Returns:
        dict with ``f1``, ``accuracy``, ``precision`` and ``recall``.
    """
    predictions, labels = eval_pred
    # Sigmoid converts independent per-label logits to probabilities.
    probs = 1 / (1 + np.exp(-predictions))
    preds = (probs > PREDICTION_THRESHOLD).astype(int)
    return {
        "f1": f1_score(labels, preds, average="micro"),
        "accuracy": accuracy_score(labels, preds),
        "precision": precision_score(labels, preds, average="micro"),
        "recall": recall_score(labels, preds, average="micro"),
    }


class DebertaDataset(Dataset):
    """Internal Dataset class for DeBERTa.

    Wraps tokenizer encodings (and optional labels) as tensors. Integer
    class-index labels are expanded to one-hot float vectors when
    ``num_labels`` is given; already multi-hot labels are used as-is.
    """

    def __init__(self, encodings, labels=None, num_labels=None):
        self.encodings = {key: torch.tensor(val) for key, val in encodings.items()}
        if labels is not None:
            if not isinstance(labels, (np.ndarray, torch.Tensor)):
                labels = np.array(labels)
            # Handle standard label list or flattened format: a 1-D vector
            # (or an (n, 1) column) of class indices is expanded to one-hot.
            is_index_format = num_labels is not None and (
                len(labels.shape) == 1
                or (len(labels.shape) == 2 and labels.shape[1] == 1)
            )
            if is_index_format:
                # Cast to int so fancy indexing works even if the indices
                # arrive as floats (e.g. from a pandas column with NaNs).
                labels_flat = np.asarray(labels).flatten().astype(int)
                one_hot = np.zeros((len(labels_flat), num_labels), dtype=np.float32)
                # Guard BOTH bounds: a negative index would silently wrap
                # around and set the wrong label instead of being dropped.
                valid_indices = (labels_flat >= 0) & (labels_flat < num_labels)
                one_hot[valid_indices, labels_flat[valid_indices]] = 1.0
                self.labels = torch.tensor(one_hot, dtype=torch.float)
            else:
                self.labels = torch.tensor(labels, dtype=torch.float)
        else:
            self.labels = None

    def __getitem__(self, idx):
        item = {key: val[idx] for key, val in self.encodings.items()}
        if self.labels is not None:
            item['labels'] = self.labels[idx]
        return item

    def __len__(self):
        return len(self.encodings['input_ids'])


class WeightedTrainer(Trainer):
    """Trainer variant using BCE-with-logits with up-weighted positives.

    Positive labels are weighted 4x to counter their sparsity in the
    multi-hot targets.
    """

    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        # **kwargs absorbs extra arguments newer transformers versions pass
        # (e.g. num_items_in_batch) without breaking the override.
        labels = inputs.get("labels")
        outputs = model(**inputs)
        logits = outputs.get("logits")
        # One weight per label head; 4.0 chosen to offset class imbalance.
        pos_weight = torch.ones([logits.shape[1]]).to(logits.device) * 4.0
        loss_fct = torch.nn.BCEWithLogitsLoss(pos_weight=pos_weight)
        loss = loss_fct(logits, labels.float())
        return (loss, outputs) if return_outputs else loss


class DebertaXSmall(BaseModel):
    """Wrapper for Microsoft DeBERTa-v3-xsmall.

    Fine-tunes the model for multi-label classification of code comments,
    with per-language hyperparameters (Java vs. Python vs. Pharo).
    """

    def __init__(self, language, path=None):
        # Java converges faster, so it gets fewer epochs and a lower LR.
        epochs = 10 if language == "java" else 20
        lr = 2e-5 if language == "java" else 3e-5
        self.params = {
            "model_name_hf": "microsoft/deberta-v3-xsmall",
            # Java: 7, Python: 5, Pharo: 6
            "num_labels": 7 if language == "java" else 5 if language == "python" else 6,
            "max_length": 128,
            "epochs": epochs,
            "batch_size_train": 32,
            "batch_size_eval": 64,
            "learning_rate": lr,
            "weight_decay": 0.01,
            "train_size": 0.8,
            "early_stopping_patience": 3,
            "early_stopping_threshold": 0.005,
            "warmup_steps": 100
        }
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.tokenizer = None
        super().__init__(language, path)

    def setup_model(self):
        """Download and initialize the tokenizer and classification head."""
        logger.info(f"Initializing {self.params['model_name_hf']} on {self.device}...")
        self.tokenizer = AutoTokenizer.from_pretrained(
            self.params["model_name_hf"], use_fast=False
        )
        self.model = AutoModelForSequenceClassification.from_pretrained(
            self.params["model_name_hf"],
            num_labels=self.params["num_labels"],
            problem_type="multi_label_classification"
        ).to(self.device)
        logger.success("DeBERTa-v3-xsmall model initialized.")

    def _tokenize(self, texts):
        """Tokenize ``texts``, mapping None/NaN entries to empty strings."""
        safe_texts = []
        for t in texts:
            # Handle potential NaNs or non-strings (NaN != NaN, so the
            # self-equality test filters float('nan') values out).
            safe_texts.append(str(t) if t is not None and t == t else "")
        return self.tokenizer(
            safe_texts,
            truncation=True,
            padding=True,
            max_length=self.params["max_length"]
        )

    def train(self, X_train, y_train) -> dict:
        """Fine-tune on ``X_train``/``y_train`` with an internal val split.

        Returns:
            dict of hyperparameters suitable for experiment logging
            (``model_name_hf`` excluded).

        Raises:
            ValueError: if ``setup_model`` was never called.
        """
        if self.model is None:
            raise ValueError("Model not initialized.")

        params_to_log = {k: v for k, v in self.params.items() if k != "model_name_hf"}
        logger.info(f"Starting training for: {self.language.upper()}")

        train_encodings = self._tokenize(X_train)
        full_dataset = DebertaDataset(
            train_encodings, y_train, num_labels=self.params["num_labels"]
        )

        # Random train/validation split (80/20 by default).
        train_len = int(self.params["train_size"] * len(full_dataset))
        val_len = len(full_dataset) - train_len
        train_ds, val_ds = torch.utils.data.random_split(full_dataset, [train_len, val_len])

        temp_ckpt_dir = os.path.join(MODELS_DIR, "temp_deberta_ckpt")

        training_args = TrainingArguments(
            output_dir=temp_ckpt_dir,
            num_train_epochs=self.params["epochs"],
            per_device_train_batch_size=self.params["batch_size_train"],
            per_device_eval_batch_size=self.params["batch_size_eval"],
            learning_rate=self.params["learning_rate"],
            weight_decay=self.params["weight_decay"],
            eval_strategy="epoch",
            save_strategy="epoch",
            # Restore the checkpoint with the best validation micro-F1.
            load_best_model_at_end=True,
            metric_for_best_model="f1",
            greater_is_better=True,
            save_total_limit=1,
            logging_dir='./logs',
            report_to="none",
            fp16=torch.cuda.is_available()
        )

        trainer = WeightedTrainer(
            model=self.model,
            args=training_args,
            train_dataset=train_ds,
            eval_dataset=val_ds,
            compute_metrics=compute_metrics,
            callbacks=[EarlyStoppingCallback(
                early_stopping_patience=self.params["early_stopping_patience"],
                early_stopping_threshold=self.params["early_stopping_threshold"]
            )]
        )

        trainer.train()

        # Checkpoints are only needed during training; reclaim disk space.
        if os.path.exists(temp_ckpt_dir):
            shutil.rmtree(temp_ckpt_dir)

        return params_to_log

    def evaluate(self, X_test, y_test) -> dict:
        """Predict on ``X_test`` and log accuracy/F1 metrics to MLflow.

        ``y_test`` may be multi-hot or a vector of class indices; the
        latter is expanded to one-hot before scoring.
        """
        y_pred = self.predict(X_test)

        y_test_np = np.array(y_test) if not isinstance(y_test, np.ndarray) else y_test

        # Handle 1D array conversion for metrics if necessary
        if y_test_np.ndim == 1 or (y_test_np.ndim == 2 and y_test_np.shape[1] == 1):
            y_test_expanded = np.zeros(
                (y_test_np.shape[0], self.params["num_labels"]), dtype=int
            )
            indices = y_test_np.flatten()
            for i, label_idx in enumerate(indices):
                if 0 <= label_idx < self.params["num_labels"]:
                    y_test_expanded[i, int(label_idx)] = 1
            y_test_np = y_test_expanded

        report = classification_report(y_test_np, y_pred, zero_division=0)
        print(f"\n[DeBERTa {self.language}] Classification Report:\n{report}")

        metrics = {
            "accuracy": accuracy_score(y_test_np, y_pred),
            "f1_score_micro": f1_score(y_test_np, y_pred, average="micro"),
            "f1_score_weighted": f1_score(y_test_np, y_pred, average="weighted"),
        }
        mlflow.log_metrics(metrics)
        return metrics

    def predict(self, X) -> np.ndarray:
        """Return a multi-hot prediction matrix of shape (len(X), num_labels).

        Raises:
            ValueError: if the model has not been trained/loaded.
        """
        if self.model is None:
            raise ValueError("Model not trained.")

        self.model.eval()
        encodings = self._tokenize(X)
        dataset = DebertaDataset(encodings, labels=None)

        # Trainer.predict requires an output_dir even though we only infer;
        # name it once so creation and cleanup cannot disagree.
        pred_tmp_dir = "./pred_temp_deberta"
        training_args = TrainingArguments(
            output_dir=pred_tmp_dir,
            per_device_eval_batch_size=self.params["batch_size_eval"],
            fp16=torch.cuda.is_available(),
            report_to="none"
        )
        trainer = Trainer(model=self.model, args=training_args)
        output = trainer.predict(dataset)

        if os.path.exists(pred_tmp_dir):
            shutil.rmtree(pred_tmp_dir)

        logits = output.predictions
        probs = 1 / (1 + np.exp(-logits))
        # Same threshold as training-time metrics (see PREDICTION_THRESHOLD).
        return (probs > PREDICTION_THRESHOLD).astype(int)

    def save(self, path, model_name):
        """Save model, tokenizer and a small custom config to disk,
        then log the artifacts to MLflow (best effort)."""
        if self.model is None:
            raise ValueError("Model not trained.")

        complete_path = os.path.join(path, self.language, model_name)
        # Overwrite any previous save for this language/model pair.
        if os.path.exists(complete_path):
            shutil.rmtree(complete_path)

        logger.info(f"Saving model to: {complete_path}")
        self.model.save_pretrained(complete_path)
        self.tokenizer.save_pretrained(complete_path)

        config_data = {
            "language": self.language,
            "num_labels": self.params["num_labels"],
            "model_name": model_name
        }
        with open(os.path.join(complete_path, "config_custom.json"), "w") as f:
            json.dump(config_data, f)
        logger.info("Model saved locally.")

        try:
            # Log on MLflow; a failure here must not lose the local save.
            logger.info("Logging artifacts to MLflow...")
            mlflow.log_artifacts(
                local_dir=complete_path,
                artifact_path=f"{self.language}/{model_name}"
            )
        except Exception as e:
            logger.error(f"Failed to log model artifacts to MLflow: {e}")