# "Spaces: Running" — HuggingFace Spaces status banner captured when this
# file was exported from the hosting page; not part of the source code.
| import json | |
| import os | |
| import shutil | |
| import warnings | |
| from loguru import logger | |
| import mlflow | |
| import numpy as np | |
| from sklearn.metrics import ( | |
| accuracy_score, | |
| classification_report, | |
| f1_score, | |
| precision_score, | |
| recall_score, | |
| ) | |
| import torch | |
| from torch.utils.data import Dataset | |
| from transformers import ( | |
| AutoModelForSequenceClassification, | |
| AutoTokenizer, | |
| EarlyStoppingCallback, | |
| Trainer, | |
| TrainingArguments, | |
| ) | |
| from turing.config import MODELS_DIR | |
| from ..baseModel import BaseModel | |
| warnings.filterwarnings("ignore") | |
def compute_metrics(eval_pred, threshold=0.35):
    """
    Compute multi-label classification metrics for a HF Trainer eval step.

    Args:
        eval_pred: tuple ``(logits, labels)`` as supplied by
            ``transformers.Trainer`` (both numpy arrays).
        threshold: sigmoid-probability cutoff used to binarize predictions.
            Defaults to 0.35 to match ``DebertaXSmall.predict``.

    Returns:
        dict with micro-averaged ``f1``/``precision``/``recall`` and subset
        ``accuracy``.
    """
    predictions, labels = eval_pred
    # Per-label sigmoid: labels are independent in the multi-label setting.
    probs = 1 / (1 + np.exp(-predictions))
    preds = (probs > threshold).astype(int)
    return {
        "f1": f1_score(labels, preds, average="micro"),
        "accuracy": accuracy_score(labels, preds),
        "precision": precision_score(labels, preds, average="micro"),
        "recall": recall_score(labels, preds, average="micro"),
    }
class DebertaDataset(Dataset):
    """
    Internal torch Dataset wrapping tokenizer encodings and optional labels.

    Labels may be passed either as a 2D multi-hot array (used as-is) or as a
    1D array / single-column 2D array of class indices, which is expanded to
    one-hot rows when ``num_labels`` is provided.
    """

    def __init__(self, encodings, labels=None, num_labels=None):
        self.encodings = {key: torch.tensor(val) for key, val in encodings.items()}
        if labels is None:
            self.labels = None
            return
        if not isinstance(labels, (np.ndarray, torch.Tensor)):
            labels = np.array(labels)
        # Index format: flat list of class ids (or an (N, 1) column of them).
        index_format = num_labels is not None and (
            len(labels.shape) == 1 or (len(labels.shape) == 2 and labels.shape[1] == 1)
        )
        if index_format:
            # Cast to int so float-typed index arrays don't break fancy
            # indexing. Reject BOTH out-of-range directions: a negative index
            # would otherwise silently one-hot the *last* column via numpy's
            # negative indexing. Invalid rows stay all-zero, consistent with
            # the `0 <= label_idx < num_labels` check in DebertaXSmall.evaluate.
            idx = np.asarray(labels).flatten().astype(np.int64)
            one_hot = np.zeros((len(idx), num_labels), dtype=np.float32)
            keep = (idx >= 0) & (idx < num_labels)
            one_hot[np.nonzero(keep)[0], idx[keep]] = 1.0
            self.labels = torch.tensor(one_hot, dtype=torch.float)
        else:
            self.labels = torch.tensor(labels, dtype=torch.float)

    def __getitem__(self, idx):
        item = {key: val[idx] for key, val in self.encodings.items()}
        if self.labels is not None:
            item['labels'] = self.labels[idx]
        return item

    def __len__(self):
        return len(self.encodings['input_ids'])
class WeightedTrainer(Trainer):
    """
    Trainer whose loss is a positive-class-weighted BCEWithLogitsLoss,
    compensating for label sparsity in the multi-label targets.
    """

    # Weight applied to positive targets; values > 1 penalize missed
    # positives more, boosting recall on rare labels.
    POS_WEIGHT = 4.0

    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        """Weighted multi-label BCE over the model's raw logits."""
        labels = inputs.get("labels")
        outputs = model(**inputs)
        logits = outputs.get("logits")
        # One weight per label, on the same device as the logits.
        pos_weight = torch.ones([logits.shape[1]]).to(logits.device) * self.POS_WEIGHT
        loss_fct = torch.nn.BCEWithLogitsLoss(pos_weight=pos_weight)
        loss = loss_fct(logits, labels.float())
        return (loss, outputs) if return_outputs else loss
class DebertaXSmall(BaseModel):
    """
    Wrapper for Microsoft DeBERTa-v3-xsmall.

    Fine-tunes the model for multi-label comment classification with
    per-language hyperparameters (java / python / pharo).
    """

    # Sigmoid-probability cutoff used to binarize predictions.
    PRED_THRESHOLD = 0.35

    def __init__(self, language, path=None):
        """
        Args:
            language: "java", "python", or anything else (treated as pharo);
                selects the label count and training hyperparameters.
            path: optional saved-model path, forwarded to BaseModel.
        """
        # Java converges faster here, so fewer epochs and a smaller LR.
        epochs = 10 if language == "java" else 20
        lr = 2e-5 if language == "java" else 3e-5
        self.params = {
            "model_name_hf": "microsoft/deberta-v3-xsmall",
            # Java: 7, Python: 5, Pharo: 6
            "num_labels": 7 if language == "java" else 5 if language == "python" else 6,
            "max_length": 128,
            "epochs": epochs,
            "batch_size_train": 32,
            "batch_size_eval": 64,
            "learning_rate": lr,
            "weight_decay": 0.01,
            "train_size": 0.8,
            "early_stopping_patience": 3,
            "early_stopping_threshold": 0.005,
            "warmup_steps": 100
        }
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.tokenizer = None
        super().__init__(language, path)

    def setup_model(self):
        """Load the HF tokenizer and multi-label classification head."""
        logger.info(f"Initializing {self.params['model_name_hf']} on {self.device}...")
        self.tokenizer = AutoTokenizer.from_pretrained(self.params["model_name_hf"], use_fast=False)
        self.model = AutoModelForSequenceClassification.from_pretrained(
            self.params["model_name_hf"],
            num_labels=self.params["num_labels"],
            problem_type="multi_label_classification"
        ).to(self.device)
        logger.success("DeBERTa-v3-xsmall model initialized.")

    def _tokenize(self, texts):
        """Tokenize texts, mapping None/NaN entries to the empty string."""
        safe_texts = []
        for t in texts:
            # `t == t` is False only for NaN, so this filters NaN and None.
            safe_texts.append(str(t) if t is not None and t == t else "")
        return self.tokenizer(
            safe_texts,
            truncation=True,
            padding=True,
            max_length=self.params["max_length"]
        )

    def train(self, X_train, y_train) -> dict:
        """
        Fine-tune on (X_train, y_train) with an internal train/val split and
        early stopping on micro-F1.

        Returns:
            The hyperparameter dict to log (everything except the HF id).

        Raises:
            ValueError: if setup_model() has not been called.
        """
        if self.model is None:
            raise ValueError("Model not initialized.")
        params_to_log = {k: v for k, v in self.params.items() if k != "model_name_hf"}
        logger.info(f"Starting training for: {self.language.upper()}")
        train_encodings = self._tokenize(X_train)
        full_dataset = DebertaDataset(train_encodings, y_train, num_labels=self.params["num_labels"])
        train_len = int(self.params["train_size"] * len(full_dataset))
        val_len = len(full_dataset) - train_len
        train_ds, val_ds = torch.utils.data.random_split(full_dataset, [train_len, val_len])
        temp_ckpt_dir = os.path.join(MODELS_DIR, "temp_deberta_ckpt")
        training_args = TrainingArguments(
            output_dir=temp_ckpt_dir,
            num_train_epochs=self.params["epochs"],
            per_device_train_batch_size=self.params["batch_size_train"],
            per_device_eval_batch_size=self.params["batch_size_eval"],
            learning_rate=self.params["learning_rate"],
            weight_decay=self.params["weight_decay"],
            eval_strategy="epoch",
            save_strategy="epoch",
            load_best_model_at_end=True,
            metric_for_best_model="f1",
            greater_is_better=True,
            save_total_limit=1,
            logging_dir='./logs',
            report_to="none",
            fp16=torch.cuda.is_available()
        )
        trainer = WeightedTrainer(
            model=self.model,
            args=training_args,
            train_dataset=train_ds,
            eval_dataset=val_ds,
            compute_metrics=compute_metrics,
            callbacks=[EarlyStoppingCallback(
                early_stopping_patience=self.params["early_stopping_patience"],
                early_stopping_threshold=self.params["early_stopping_threshold"]
            )]
        )
        try:
            trainer.train()
        finally:
            # Remove checkpoints even if training fails or is interrupted;
            # the best model is already loaded in memory (load_best_model_at_end).
            if os.path.exists(temp_ckpt_dir):
                shutil.rmtree(temp_ckpt_dir)
        return params_to_log

    def evaluate(self, X_test, y_test) -> dict:
        """
        Evaluate on a held-out set; prints a classification report and logs
        accuracy / micro-F1 / weighted-F1 to MLflow.
        """
        y_pred = self.predict(X_test)
        y_test_np = np.array(y_test) if not isinstance(y_test, np.ndarray) else y_test
        # Expand class-index labels to multi-hot so they match y_pred's shape.
        if y_test_np.ndim == 1 or (y_test_np.ndim == 2 and y_test_np.shape[1] == 1):
            y_test_expanded = np.zeros((y_test_np.shape[0], self.params["num_labels"]), dtype=int)
            indices = y_test_np.flatten()
            for i, label_idx in enumerate(indices):
                if 0 <= label_idx < self.params["num_labels"]:
                    y_test_expanded[i, int(label_idx)] = 1
            y_test_np = y_test_expanded
        report = classification_report(y_test_np, y_pred, zero_division=0)
        print(f"\n[DeBERTa {self.language}] Classification Report:\n{report}")
        metrics = {
            "accuracy": accuracy_score(y_test_np, y_pred),
            "f1_score_micro": f1_score(y_test_np, y_pred, average="micro"),
            "f1_score_weighted": f1_score(y_test_np, y_pred, average="weighted"),
        }
        mlflow.log_metrics(metrics)
        return metrics

    def predict(self, X) -> np.ndarray:
        """
        Predict multi-hot labels for X.

        Returns:
            int ndarray of shape (len(X), num_labels) with sigmoid
            probabilities thresholded at PRED_THRESHOLD.

        Raises:
            ValueError: if the model has not been trained/loaded.
        """
        if self.model is None:
            raise ValueError("Model not trained.")
        self.model.eval()
        encodings = self._tokenize(X)
        dataset = DebertaDataset(encodings, labels=None)
        training_args = TrainingArguments(
            output_dir="./pred_temp_deberta",
            per_device_eval_batch_size=self.params["batch_size_eval"],
            fp16=torch.cuda.is_available(),
            report_to="none"
        )
        trainer = Trainer(model=self.model, args=training_args)
        try:
            output = trainer.predict(dataset)
        finally:
            # Clean up the scratch dir even when prediction raises.
            if os.path.exists("./pred_temp_deberta"):
                shutil.rmtree("./pred_temp_deberta")
        logits = output.predictions
        probs = 1 / (1 + np.exp(-logits))
        return (probs > self.PRED_THRESHOLD).astype(int)

    def save(self, path, model_name):
        """
        Save model, tokenizer and a small custom config under
        <path>/<language>/<model_name>, then best-effort log the artifacts
        to MLflow.

        Raises:
            ValueError: if the model has not been trained/loaded.
        """
        if self.model is None:
            raise ValueError("Model not trained.")
        complete_path = os.path.join(path, self.language, model_name)
        # Overwrite any previous save at this location.
        if os.path.exists(complete_path):
            shutil.rmtree(complete_path)
        logger.info(f"Saving model to: {complete_path}")
        self.model.save_pretrained(complete_path)
        self.tokenizer.save_pretrained(complete_path)
        config_data = {
            "language": self.language,
            "num_labels": self.params["num_labels"],
            "model_name": model_name
        }
        with open(os.path.join(complete_path, "config_custom.json"), "w") as f:
            json.dump(config_data, f)
        logger.info("Model saved locally.")
        try:
            # Log on MLflow
            logger.info("Logging artifacts to MLflow...")
            mlflow.log_artifacts(local_dir=complete_path, artifact_path=f"{self.language}/{model_name}")
        except Exception as e:
            # Best-effort: a local save should not fail because MLflow is down.
            logger.error(f"Failed to log model artifacts to MLflow: {e}")