github-actions[bot]
Sync turing folder from GitHub
5ecd2f9
import os
import shutil
from typing import Any
import warnings

from loguru import logger
import mlflow
import numpy as np
from numpy import ndarray
from sklearn.metrics import (
    accuracy_score,
    classification_report,
    f1_score,
    precision_score,
    recall_score,
)
import torch
from torch.utils.data import Dataset
from transformers import (
    AutoModelForSequenceClassification,
    AutoTokenizer,
    EarlyStoppingCallback,
    Trainer,
    TrainingArguments,
)

from turing.config import MODELS_DIR

from ..baseModel import BaseModel
# Silence every warning category process-wide. The filter is global, so once
# this module is imported it also hides warnings raised by other modules.
warnings.filterwarnings("ignore")
def compute_metrics(eval_pred):
    """
    Compute macro-averaged multi-label metrics from raw logits.

    Passed to the HF ``Trainer`` as its ``compute_metrics`` callback.

    Args:
        eval_pred (tuple): Pair ``(predictions, labels)``. ``predictions`` are
            raw logits; ``labels`` are the binary targets with the same shape.

    Returns:
        dict: Macro-averaged ``f1``, ``precision`` and ``recall``.
    """
    predictions, labels = eval_pred
    logits = np.asarray(predictions)

    # Numerically stable sigmoid: only a non-positive value is ever
    # exponentiated, so large-magnitude logits cannot overflow np.exp.
    decay = np.exp(-np.abs(logits))
    probs = np.where(logits >= 0, 1.0 / (1.0 + decay), decay / (1.0 + decay))

    # Apply threshold of 0.5 (becomes 1 if > 0.5, otherwise 0)
    preds = (probs > 0.5).astype(int)

    # zero_division=0 on every metric so that a class with no predicted or no
    # true samples scores 0 consistently across f1, precision and recall.
    return {
        'f1': f1_score(labels, preds, average='macro', zero_division=0),
        'precision': precision_score(labels, preds, average='macro', zero_division=0),
        'recall': recall_score(labels, preds, average='macro', zero_division=0),
    }
class CodeBERTaDataset(Dataset):
    """
    Internal Dataset class for CodeBERTa.

    Holds tokenized encodings as tensors and, when labels are supplied, a float
    label matrix. Class-index labels are expanded to one-hot rows; labels that
    are already vectors (one-hot / multi-hot) are stored as given.
    """

    def __init__(self, encodings, labels=None, num_labels=None):
        """
        Initialize the dataset.

        Args:
            encodings (dict): Tokenized encodings.
            labels (list, np.ndarray or torch.Tensor, optional): Corresponding labels.
            num_labels (int, optional): Total number of classes. Required for
                auto-converting indices to one-hot.
        """
        self.encodings = {key: torch.tensor(val) for key, val in encodings.items()}
        self.labels = None
        if labels is None:
            return

        # Normalise every accepted container to a NumPy array so the shape
        # checks and fancy indexing below behave the same for all of them.
        if isinstance(labels, torch.Tensor):
            labels = labels.detach().cpu().numpy()
        elif not isinstance(labels, np.ndarray):
            labels = np.array(labels)

        is_index_vector = labels.ndim == 1 or (labels.ndim == 2 and labels.shape[1] == 1)
        if num_labels is not None and is_index_vector:
            # Case A: labels are class indices -> expand to one-hot rows.
            one_hot = self._one_hot(labels, num_labels)
            self.labels = torch.tensor(one_hot, dtype=torch.float)
        else:
            # Case B: labels are already vectors (e.g., One-Hot or Multi-Hot).
            self.labels = torch.tensor(labels, dtype=torch.float)

    @staticmethod
    def _one_hot(labels, num_labels):
        """
        Expand an array of class indices into a float32 one-hot matrix.

        Indices outside ``[0, num_labels)`` produce an all-zero row. Negative
        values are rejected explicitly; otherwise NumPy's negative indexing
        would silently mark a class counted from the end.
        """
        indices = labels.reshape(-1).astype(np.int64)
        one_hot = np.zeros((indices.shape[0], num_labels), dtype=np.float32)
        in_range = (indices >= 0) & (indices < num_labels)
        one_hot[np.flatnonzero(in_range), indices[in_range]] = 1.0
        return one_hot

    def __getitem__(self, idx):
        """
        Retrieve item at index idx.

        Args:
            idx (int): Index of the item to retrieve.

        Returns:
            dict: One entry per encoding key, plus ``labels`` when the dataset
            was built with labels.
        """
        item = {key: val[idx] for key, val in self.encodings.items()}
        if self.labels is not None:
            item['labels'] = self.labels[idx]
        return item

    def __len__(self):
        """
        Return the length of the dataset.

        Returns:
            int: Number of rows in the ``input_ids`` encoding.
        """
        return len(self.encodings['input_ids'])
class CodeBERTa(BaseModel):
    """
    HuggingFace implementation of BaseModel for Code Comment Classification.
    Uses CodeBERTa-small-v1 for efficient inference.
    """

    def __init__(self, language, path=None):
        """
        Initialize the CodeBERTa model with configuration parameters.

        Args:
            language (str): Language for the model. Selects the number of
                labels: 7 for "java", 5 for "python", 6 otherwise.
            path (str, optional): Path to load a pre-trained model. Defaults to None.
        """
        self.params = {
            "model_name_hf": "huggingface/CodeBERTa-small-v1",
            "num_labels": 7 if language == "java" else 5 if language == "python" else 6,
            "max_length": 128,
            "epochs": 15,
            "batch_size_train": 16,
            "batch_size_eval": 64,
            "learning_rate": 1e-5,
            "weight_decay": 0.02,
            "train_size": 0.8,
            "early_stopping_patience": 3,
            "early_stopping_threshold": 0.005,
        }
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.tokenizer = None
        # params/device/tokenizer are assigned before delegating to the base
        # initializer, preserving the original statement order.
        super().__init__(language, path)

    def setup_model(self):
        """
        Initialize the CodeBERTa tokenizer and model.
        """
        logger.info(f"Initializing {self.params['model_name_hf']} on {self.device}...")
        self.tokenizer = AutoTokenizer.from_pretrained(self.params["model_name_hf"])
        self.model = AutoModelForSequenceClassification.from_pretrained(
            self.params["model_name_hf"],
            num_labels=self.params["num_labels"],
            problem_type="multi_label_classification"
        ).to(self.device)
        logger.info("CodeBERTa model initialized.")

    @staticmethod
    def _to_text(value):
        """Convert one raw input to a string; None and int/float NaN become ""."""
        if value is None:
            return ""
        # NaN is the only value that is not equal to itself.
        if isinstance(value, (int, float)) and value != value:
            return ""
        return str(value)

    def _tokenize(self, texts):
        """
        Helper to tokenize list of texts efficiently.

        Args:
            texts (list): List of text strings to tokenize. Non-string entries
                are stringified; None/NaN entries become empty strings.

        Returns:
            dict: Tokenized encodings.
        """
        safe_texts = [self._to_text(t) for t in texts]
        return self.tokenizer(
            safe_texts,
            truncation=True,
            padding=True,
            max_length=self.params["max_length"]
        )

    def train(self, X_train, y_train) -> dict[str, Any]:
        """
        Train the model using HF Trainer with a random train/validation split.

        Args:
            X_train (list): Training input texts.
            y_train (list or np.ndarray): Training labels.

        Returns:
            dict[str, Any]: Training parameters (without ``model_name_hf`` and
            ``num_labels``) for the caller to log.

        Raises:
            ValueError: If the model has not been initialized.
        """
        if self.model is None:
            raise ValueError("Model is not initialized. Call setup_model() before training.")

        # Parameters handed back for logging, without model_name_hf / num_labels
        params_to_log = {
            k: v for k, v in self.params.items() if k not in ("model_name_hf", "num_labels")
        }

        logger.info(f"Starting training for: {self.language.upper()}")

        # Prepare dataset (train/val split)
        train_encodings = self._tokenize(X_train)
        full_dataset = CodeBERTaDataset(train_encodings, y_train, num_labels=self.params["num_labels"])
        train_size = int(self.params["train_size"] * len(full_dataset))
        val_size = len(full_dataset) - train_size
        train_dataset, val_dataset = torch.utils.data.random_split(full_dataset, [train_size, val_size])

        temp_ckpt_dir = os.path.join(MODELS_DIR, "temp_checkpoints")
        cuda_available = torch.cuda.is_available()

        training_args = TrainingArguments(
            output_dir=temp_ckpt_dir,
            num_train_epochs=self.params["epochs"],
            per_device_train_batch_size=self.params["batch_size_train"],
            per_device_eval_batch_size=self.params["batch_size_eval"],
            learning_rate=self.params["learning_rate"],
            weight_decay=self.params["weight_decay"],
            eval_strategy="epoch",
            save_strategy="epoch",
            load_best_model_at_end=True,
            metric_for_best_model="f1",
            greater_is_better=True,
            save_total_limit=2,
            logging_dir='./logs',
            logging_steps=50,
            fp16=cuda_available,
            optim="adamw_torch",
            report_to="none",
            # `use_cpu` supersedes the deprecated `no_cuda` flag.
            use_cpu=not cuda_available
        )

        trainer = Trainer(
            model=self.model,
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=val_dataset,
            compute_metrics=compute_metrics,
            callbacks=[
                EarlyStoppingCallback(
                    early_stopping_patience=self.params["early_stopping_patience"],
                    early_stopping_threshold=self.params["early_stopping_threshold"],
                )
            ]
        )

        try:
            trainer.train()
            logger.info(f"Training for {self.language.upper()} completed.")
        finally:
            # Remove temporary checkpoints even when training fails, so an
            # aborted run does not leave them on disk.
            if os.path.exists(temp_ckpt_dir):
                shutil.rmtree(temp_ckpt_dir)

        return params_to_log

    def evaluate(self, X_test, y_test) -> dict[str, Any]:
        """
        Evaluate model on test data, return metrics and log to MLflow.
        Handles automatic conversion of y_test to match multi-label prediction shape.

        Args:
            X_test (list): Input test data.
            y_test (list, np.ndarray or torch.Tensor): True labels for test data.

        Returns:
            dict[str, Any]: Dictionary of evaluation metrics
            (accuracy, precision, recall, f1_score).
        """
        # Obtain predictions
        y_pred = self.predict(X_test)

        # Convert y_test to numpy array if needed
        if isinstance(y_test, torch.Tensor):
            y_test_np = y_test.cpu().numpy()
        elif isinstance(y_test, np.ndarray):
            y_test_np = y_test
        else:
            y_test_np = np.array(y_test)

        num_labels = self.params["num_labels"]
        is_multilabel_pred = (y_pred.ndim == 2 and y_pred.shape[1] > 1)
        is_flat_truth = (y_test_np.ndim == 1) or (y_test_np.ndim == 2 and y_test_np.shape[1] == 1)

        if is_multilabel_pred and is_flat_truth:
            # Ground truth holds class indices: expand it to a one-hot matrix
            # so it is comparable with the multi-hot predictions.
            y_test_expanded = np.zeros((y_test_np.shape[0], num_labels), dtype=int)
            for i, label_idx in enumerate(y_test_np.flatten()):
                idx = int(label_idx)
                # Out-of-range indices leave the row all zeros.
                if 0 <= idx < num_labels:
                    y_test_expanded[i, idx] = 1
            y_test_np = y_test_expanded

        # Generate classification report
        report = classification_report(y_test_np, y_pred, zero_division=0)
        print("\n" + "=" * 50)
        print("CLASSIFICATION REPORT")
        print(report)
        print("=" * 50 + "\n")

        metrics = {
            "accuracy": accuracy_score(y_test_np, y_pred),
            "precision": precision_score(y_test_np, y_pred, average="macro", zero_division=0),
            "recall": recall_score(y_test_np, y_pred, average="macro", zero_division=0),
            # zero_division=0 keeps f1 consistent with precision/recall above.
            "f1_score": f1_score(y_test_np, y_pred, average="macro", zero_division=0),
        }

        mlflow.log_metrics(metrics)
        logger.info(f"Evaluation completed — Accuracy: {metrics['accuracy']:.3f}, F1: {metrics['f1_score']:.3f}")
        return metrics

    def predict(self, X) -> ndarray:
        """
        Make predictions for Multi-Label classification.
        Returns Binary Matrix (Multi-Hot) where multiple classes can be 1.

        Inputs are processed in chunks of ``batch_size_eval`` so that memory use
        does not grow with the total number of texts.

        Args:
            X (list): Input texts for prediction.

        Returns:
            np.ndarray: Multi-Hot Encoded predictions (e.g., [[0, 1, 1, 0], ...]).
            An empty input yields an array of shape ``(0, num_labels)``.

        Raises:
            ValueError: If no model is available.
        """
        if self.model is None:
            raise ValueError("Model is not trained. Call train() or load() before prediction.")

        texts = list(X)
        if not texts:
            return np.zeros((0, self.params["num_labels"]), dtype=int)

        # Set model to evaluation mode
        self.model.eval()

        batch_size = self.params["batch_size_eval"]
        batch_preds = []

        # Inference (no gradients, lightweight)
        with torch.no_grad():
            for start in range(0, len(texts), batch_size):
                encodings = self._tokenize(texts[start:start + batch_size])
                # Convert lists to tensors and move to device
                inputs = {key: torch.tensor(val).to(self.device) for key, val in encodings.items()}
                logits = self.model(**inputs).logits
                # Sigmoid + threshold of 0.5 (if prob > 0.5, predict 1 else 0).
                # torch.sigmoid stays finite for large-magnitude logits.
                probs = torch.sigmoid(logits)
                batch_preds.append((probs > 0.5).cpu().numpy().astype(int))

        return np.concatenate(batch_preds, axis=0)

    def save(self, path, model_name):
        """
        Save model locally and log to MLflow as artifact.

        A failure while logging to MLflow is reported through the logger and
        does not raise; the local copy is kept either way.

        Args:
            path (str): Directory path to save the model.
            model_name (str): Name for the saved model.

        Raises:
            ValueError: If no model is available.
        """
        if self.model is None:
            raise ValueError("Model is not trained. Cannot save uninitialized model.")

        # Local Saving
        artifact_name = f"{model_name}_{self.language}"
        complete_path = os.path.join(path, artifact_name)

        # Remove existing directory if it exists
        if os.path.exists(complete_path) and os.path.isdir(complete_path):
            shutil.rmtree(complete_path)

        # Save model and tokenizer
        logger.info(f"Saving model to: {complete_path}")
        self.model.save_pretrained(complete_path)
        self.tokenizer.save_pretrained(complete_path)
        logger.info("Model saved locally.")

        try:
            # Log to MLflow
            logger.info("Logging artifacts to MLflow...")
            mlflow.log_artifacts(local_dir=complete_path, artifact_path=artifact_name)
        except Exception as e:
            # Best-effort: the local save above already succeeded.
            logger.error(f"Failed to log model artifacts to MLflow: {e}")

    def load(self, model_path):
        """
        Load model from a local path OR an MLflow URI.

        Args:
            model_path (str): Local path or MLflow URI (``models:/`` or
                ``runs:/``) to load the model from.

        Raises:
            FileNotFoundError: If the resolved local path does not exist.
            Exception: Any error from the MLflow download or the HF loaders is
                logged and re-raised.
        """
        logger.info(f"Loading model from: {model_path}")
        local_model_path = model_path

        # Downloading model from MLflow and saving to local path
        if model_path.startswith(("models:/", "runs:/")):
            try:
                logger.info("Detected MLflow model URI. Attempting to load from MLflow...")
                local_model_path = os.path.join(MODELS_DIR, "mlflow_temp_models")
                local_model_path = mlflow.artifacts.download_artifacts(
                    artifact_uri=model_path, dst_path=local_model_path
                )
                logger.info(f"Model downloaded from MLflow to: {local_model_path}")
            except Exception as e:
                logger.error(f"Failed to load from MLflow: {e}")
                # Bare raise keeps the original traceback intact.
                raise

        # Loading from local path
        try:
            if not os.path.exists(local_model_path):
                raise FileNotFoundError(f"Model path not found: {local_model_path}")

            # Load tokenizer and model from local path
            self.tokenizer = AutoTokenizer.from_pretrained(local_model_path)
            self.model = AutoModelForSequenceClassification.from_pretrained(
                local_model_path,
                low_cpu_mem_usage=False
            ).to(self.device)
            logger.info("Model loaded from local path successfully.")
        except Exception as e:
            logger.error(f"Failed to load model from local path: {e}")
            raise

        # Set model to evaluation mode
        self.model.eval()