import json
import os
import shutil
import warnings
from loguru import logger
import mlflow
import numpy as np
from sklearn.metrics import (
    accuracy_score,
    classification_report,
    f1_score,
    precision_score,
    recall_score,
)
import torch
from torch.utils.data import Dataset
from transformers import (
    AutoModelForSequenceClassification,
    AutoTokenizer,
    EarlyStoppingCallback,
    Trainer,
    TrainingArguments,
)
from turing.config import MODELS_DIR
from ..baseModel import BaseModel
warnings.filterwarnings("ignore")


def compute_metrics(eval_pred):
    predictions, labels = eval_pred
    # Convert logits to independent per-label probabilities with a sigmoid
    probs = 1 / (1 + np.exp(-predictions))
    preds = (probs > 0.35).astype(int)
    # Micro-averaged metrics suit the multi-label setting
    f1 = f1_score(labels, preds, average="micro")
    accuracy = accuracy_score(labels, preds)
    precision = precision_score(labels, preds, average="micro")
    recall = recall_score(labels, preds, average="micro")
    return {
        "f1": f1,
        "accuracy": accuracy,
        "precision": precision,
        "recall": recall,
    }
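
# A quick sanity check of the 0.35 threshold above, with hypothetical logits:
# a logit of 0.0 maps to probability 0.5 and clears the threshold, while a
# logit of -1.0 maps to ~0.27 and does not.
#   >>> probs = 1 / (1 + np.exp(-np.array([-1.0, 0.0, 2.0])))
#   >>> probs.round(2)
#   array([0.27, 0.5 , 0.88])
#   >>> (probs > 0.35).astype(int)
#   array([0, 1, 1])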


class DebertaDataset(Dataset):
    """
    Internal Dataset wrapper for DeBERTa.

    Accepts tokenizer encodings plus optional labels; single-index labels
    are expanded to one-hot vectors so the Trainer sees multi-label targets.
    """
    def __init__(self, encodings, labels=None, num_labels=None):
        self.encodings = {key: torch.tensor(val) for key, val in encodings.items()}
        if labels is not None:
            if not isinstance(labels, (np.ndarray, torch.Tensor)):
                labels = np.array(labels)
            # Handle single-index labels (1D, or 2D with a single column) by
            # expanding them to one-hot vectors
            if num_labels is not None and (len(labels.shape) == 1 or (len(labels.shape) == 2 and labels.shape[1] == 1)):
                labels_flat = np.asarray(labels).flatten().astype(int)
                one_hot = np.zeros((len(labels_flat), num_labels), dtype=np.float32)
                # Guard both bounds: negative indices would silently wrap
                # around under numpy indexing
                valid_indices = (labels_flat >= 0) & (labels_flat < num_labels)
                one_hot[valid_indices, labels_flat[valid_indices]] = 1.0
                self.labels = torch.tensor(one_hot, dtype=torch.float)
            else:
                self.labels = torch.tensor(labels, dtype=torch.float)
        else:
            self.labels = None

    def __getitem__(self, idx):
        item = {key: val[idx] for key, val in self.encodings.items()}
        if self.labels is not None:
            item['labels'] = self.labels[idx]
        return item

    def __len__(self):
        return len(self.encodings['input_ids'])
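
# Example of the one-hot expansion performed above, with hypothetical
# two-sample encodings and single-index labels (num_labels=5):
#   >>> ds = DebertaDataset({"input_ids": [[1, 2], [3, 4]],
#   ...                      "attention_mask": [[1, 1], [1, 1]]},
#   ...                     labels=[2, 4], num_labels=5)
#   >>> ds[0]["labels"]
#   tensor([0., 0., 1., 0., 0.])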


class WeightedTrainer(Trainer):
    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        labels = inputs.get("labels")
        outputs = model(**inputs)
        logits = outputs.get("logits")
        # Up-weight positive targets 4x: in multi-label targets most entries
        # are 0, so this counteracts the negative-class dominance
        pos_weight = torch.ones([logits.shape[1]]).to(logits.device) * 4.0
        loss_fct = torch.nn.BCEWithLogitsLoss(pos_weight=pos_weight)
        loss = loss_fct(logits, labels.float())
        return (loss, outputs) if return_outputs else loss
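
# A minimal sketch of what pos_weight=4.0 changes, with hypothetical values:
# the positive term of the BCE loss is scaled up, so a missed positive costs
# four times as much as under the unweighted loss.
#   >>> weighted = torch.nn.BCEWithLogitsLoss(pos_weight=torch.tensor([4.0]))
#   >>> unweighted = torch.nn.BCEWithLogitsLoss()
#   >>> x, y = torch.tensor([0.0]), torch.tensor([1.0])
#   >>> (weighted(x, y) / unweighted(x, y)).item()   # -> 4.0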


class DebertaXSmall(BaseModel):
    """
    Wrapper around Microsoft DeBERTa-v3-xsmall for multi-label sequence
    classification, with per-language hyperparameters.
    """
    def __init__(self, language, path=None):
        epochs = 10 if language == "java" else 20
        lr = 2e-5 if language == "java" else 3e-5
        self.params = {
            "model_name_hf": "microsoft/deberta-v3-xsmall",
            # Label counts per language: Java 7, Python 5, Pharo 6
            "num_labels": 7 if language == "java" else 5 if language == "python" else 6,
            "max_length": 128,
            "epochs": epochs,
            "batch_size_train": 32,
            "batch_size_eval": 64,
            "learning_rate": lr,
            "weight_decay": 0.01,
            "train_size": 0.8,
            "early_stopping_patience": 3,
            "early_stopping_threshold": 0.005,
            "warmup_steps": 100
        }
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.tokenizer = None
        super().__init__(language, path)
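
    # As a concrete reading of the defaults above: a hypothetical
    # DebertaXSmall("python") resolves to num_labels=5, epochs=20 and
    # learning_rate=3e-5, while DebertaXSmall("java") gets num_labels=7,
    # epochs=10 and learning_rate=2e-5.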

    def setup_model(self):
        logger.info(f"Initializing {self.params['model_name_hf']} on {self.device}...")
        # use_fast=False: DeBERTa-v3 ships a sentencepiece tokenizer whose
        # fast-tokenizer conversion can fail in some environments
        self.tokenizer = AutoTokenizer.from_pretrained(self.params["model_name_hf"], use_fast=False)
        self.model = AutoModelForSequenceClassification.from_pretrained(
            self.params["model_name_hf"],
            num_labels=self.params["num_labels"],
            problem_type="multi_label_classification"
        ).to(self.device)
        logger.success("DeBERTa-v3-xsmall model initialized.")

    def _tokenize(self, texts):
        safe_texts = []
        for t in texts:
            # Guard against None and NaN (NaN != NaN, so `t == t` is False
            # only for NaN); everything else is coerced to str
            safe_texts.append(str(t) if t is not None and t == t else "")
        return self.tokenizer(
            safe_texts,
            truncation=True,
            padding=True,
            max_length=self.params["max_length"]
        )
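
    # Example of the guard above with hypothetical inputs: None and NaN are
    # replaced with empty strings before tokenization.
    #   >>> texts = ["reads the config file", None, float("nan")]
    #   >>> [str(t) if t is not None and t == t else "" for t in texts]
    #   ['reads the config file', '', '']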

    def train(self, X_train, y_train) -> dict:
        if self.model is None:
            raise ValueError("Model not initialized.")
        params_to_log = {k: v for k, v in self.params.items() if k != "model_name_hf"}
        logger.info(f"Starting training for: {self.language.upper()}")
        train_encodings = self._tokenize(X_train)
        full_dataset = DebertaDataset(train_encodings, y_train, num_labels=self.params["num_labels"])
        # Hold out a validation split for early stopping and best-model selection
        train_len = int(self.params["train_size"] * len(full_dataset))
        val_len = len(full_dataset) - train_len
        train_ds, val_ds = torch.utils.data.random_split(full_dataset, [train_len, val_len])
        temp_ckpt_dir = os.path.join(MODELS_DIR, "temp_deberta_ckpt")
        training_args = TrainingArguments(
            output_dir=temp_ckpt_dir,
            num_train_epochs=self.params["epochs"],
            per_device_train_batch_size=self.params["batch_size_train"],
            per_device_eval_batch_size=self.params["batch_size_eval"],
            learning_rate=self.params["learning_rate"],
            weight_decay=self.params["weight_decay"],
            warmup_steps=self.params["warmup_steps"],
            eval_strategy="epoch",
            save_strategy="epoch",
            load_best_model_at_end=True,
            metric_for_best_model="f1",
            greater_is_better=True,
            save_total_limit=1,
            logging_dir="./logs",
            report_to="none",
            fp16=torch.cuda.is_available()
        )
        trainer = WeightedTrainer(
            model=self.model,
            args=training_args,
            train_dataset=train_ds,
            eval_dataset=val_ds,
            compute_metrics=compute_metrics,
            callbacks=[EarlyStoppingCallback(
                early_stopping_patience=self.params["early_stopping_patience"],
                early_stopping_threshold=self.params["early_stopping_threshold"]
            )]
        )
        trainer.train()
        # Checkpoints only serve best-model selection; clean them up afterwards
        if os.path.exists(temp_ckpt_dir):
            shutil.rmtree(temp_ckpt_dir)
        return params_to_log
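
    # Hypothetical end-to-end sketch of the training path (X_train and y_train
    # are placeholder names; whether setup_model is invoked automatically
    # depends on BaseModel):
    #   >>> clf = DebertaXSmall("java")
    #   >>> clf.setup_model()
    #   >>> logged = clf.train(X_train, y_train)  # X: texts, y: label indices or one-hot rows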

    def evaluate(self, X_test, y_test) -> dict:
        y_pred = self.predict(X_test)
        y_test_np = np.array(y_test) if not isinstance(y_test, np.ndarray) else y_test
        # Expand single-index labels to one-hot rows so they match y_pred's shape
        if y_test_np.ndim == 1 or (y_test_np.ndim == 2 and y_test_np.shape[1] == 1):
            y_test_expanded = np.zeros((y_test_np.shape[0], self.params["num_labels"]), dtype=int)
            indices = y_test_np.flatten()
            for i, label_idx in enumerate(indices):
                if 0 <= label_idx < self.params["num_labels"]:
                    y_test_expanded[i, int(label_idx)] = 1
            y_test_np = y_test_expanded
        report = classification_report(y_test_np, y_pred, zero_division=0)
        print(f"\n[DeBERTa {self.language}] Classification Report:\n{report}")
        metrics = {
            "accuracy": accuracy_score(y_test_np, y_pred),
            "f1_score_micro": f1_score(y_test_np, y_pred, average="micro"),
            "f1_score_weighted": f1_score(y_test_np, y_pred, average="weighted"),
        }
        mlflow.log_metrics(metrics)
        return metrics

    def predict(self, X) -> np.ndarray:
        if self.model is None:
            raise ValueError("Model not trained.")
        self.model.eval()
        encodings = self._tokenize(X)
        dataset = DebertaDataset(encodings, labels=None)
        # Trainer needs an output_dir even for pure inference; use a throwaway one
        training_args = TrainingArguments(
            output_dir="./pred_temp_deberta",
            per_device_eval_batch_size=self.params["batch_size_eval"],
            fp16=torch.cuda.is_available(),
            report_to="none"
        )
        trainer = Trainer(model=self.model, args=training_args)
        output = trainer.predict(dataset)
        if os.path.exists("./pred_temp_deberta"):
            shutil.rmtree("./pred_temp_deberta")
        logits = output.predictions
        # Same sigmoid + 0.35 threshold as compute_metrics, so training-time
        # and inference-time decisions stay consistent
        probs = 1 / (1 + np.exp(-logits))
        return (probs > 0.35).astype(int)

    def save(self, path, model_name):
        """
        Save the model, tokenizer, and a custom config locally,
        then log the artifacts to MLflow.
        """
        if self.model is None:
            raise ValueError("Model not trained.")
        complete_path = os.path.join(path, self.language, model_name)
        # Overwrite any previous export at the same path
        if os.path.exists(complete_path):
            shutil.rmtree(complete_path)
        logger.info(f"Saving model to: {complete_path}")
        self.model.save_pretrained(complete_path)
        self.tokenizer.save_pretrained(complete_path)
        config_data = {
            "language": self.language,
            "num_labels": self.params["num_labels"],
            "model_name": model_name
        }
        with open(os.path.join(complete_path, "config_custom.json"), "w") as f:
            json.dump(config_data, f)
        logger.info("Model saved locally.")
        try:
            # Log artifacts to MLflow; failures here should not crash the run
            logger.info("Logging artifacts to MLflow...")
            mlflow.log_artifacts(local_dir=complete_path, artifact_path=f"{self.language}/{model_name}")
        except Exception as e:
            logger.error(f"Failed to log model artifacts to MLflow: {e}")
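
# For reference, the config_custom.json written by save() would look like this
# for a hypothetical Java export named "deberta_v1":
#   {"language": "java", "num_labels": 7, "model_name": "deberta_v1"}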