|
|
|
|
|
|
|
|
|
|
|
|
|
|
import os |
|
|
import pandas as pd |
|
|
import numpy as np |
|
|
import torch |
|
|
from torch import nn |
|
|
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score |
|
|
from sklearn.utils.class_weight import compute_class_weight |
|
|
from datasets import Dataset |
|
|
from transformers import ( |
|
|
AutoTokenizer, |
|
|
AutoModelForSequenceClassification, |
|
|
TrainingArguments, |
|
|
Trainer, |
|
|
EvalPrediction |
|
|
) |
|
|
import optuna |
|
|
|
|
|
|
|
|
# Route Hugging Face Hub traffic through a mirror and keep the hub cache on the
# data volume. These must be set before transformers/datasets make any network
# call, which is why they sit directly below the imports.
os.environ['HF_ENDPOINT'] = 'https://hf-mirror.com'
os.environ['HF_HUB_CACHE'] = '/root/autodl-tmp/huggingface_cache'

# Backbone checkpoint fine-tuned for sequence classification below.
MODEL_NAME_OR_PATH = "microsoft/deberta-v3-base"

# CSV inputs; downstream code reads columns "id", "text" and "label"
# (label values "real"/"fake").
TRAIN_FILE_PATH = "/tmp/home/wzh/file/train_data.csv"
VALID_FILE_PATH = "/tmp/home/wzh/file/val_data.csv"
|
|
|
|
|
|
|
|
print(f"加载训练集: {TRAIN_FILE_PATH}")
train_df = pd.read_csv(TRAIN_FILE_PATH)
print(f"加载验证集: {VALID_FILE_PATH}")
eval_df = pd.read_csv(VALID_FILE_PATH)

# Encode the string labels as class ids (must stay in sync with id2label /
# label2id used when building the model).
label_map = {"real": 0, "fake": 1}

# Fail fast on unexpected label values: Series.map() silently produces NaN for
# anything outside the mapping, which would only surface as a cryptic error (or
# corrupted training signal) much later.
for _split_name, _df in (("train", train_df), ("eval", eval_df)):
    _unknown = set(_df["label"].unique()) - set(label_map)
    if _unknown:
        raise ValueError(f"Unexpected label values in {_split_name} set: {list(_unknown)}")

train_df['label'] = train_df['label'].map(label_map)
eval_df['label'] = eval_df['label'].map(label_map)
|
|
|
|
|
|
|
|
# Compute balanced class weights to counteract label imbalance; the tensor is
# consumed later by the weighted cross-entropy inside CustomTrainer.
print("\n正在计算类别权重...")

train_labels = train_df["label"].to_numpy()
class_weights = compute_class_weight(
    class_weight='balanced',
    classes=np.unique(train_labels),
    y=train_labels,
)

# Keep the weight tensor on the same device the model will train on.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
class_weights_tensor = torch.tensor(class_weights, dtype=torch.float).to(device)

print(f"计算出的类别权重: {class_weights}")
|
|
|
|
|
|
|
|
# Wrap the DataFrames as Hugging Face Datasets so we can use .map() /
# .remove_columns() / .rename_column() below.
train_dataset = Dataset.from_pandas(train_df)
eval_dataset = Dataset.from_pandas(eval_df)

print(f"\n正在下载/加载模型: {MODEL_NAME_OR_PATH} ...")

# Tokenizer matching the backbone checkpoint (fetched via the mirror endpoint
# configured at the top of the file).
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME_OR_PATH)
|
|
|
|
|
def tokenize_function(batch):
    """Tokenize a batch of examples, padded/truncated to a fixed 512-token length."""
    return tokenizer(
        batch["text"],
        max_length=512,
        padding="max_length",
        truncation=True,
    )
|
|
|
|
|
# Tokenize both splits in batches; each example gains input_ids / attention_mask.
tokenized_train_dataset = train_dataset.map(tokenize_function, batched=True)
tokenized_eval_dataset = eval_dataset.map(tokenize_function, batched=True)


def _drop_non_feature_columns(ds):
    """Remove raw-text/bookkeeping columns the model must not receive.

    Bug fix: the original code checked for "__index_level_0__" in
    ``train_df.columns`` — but that column is added by ``Dataset.from_pandas``
    and never exists on the DataFrame, so the check could not fire. It also
    applied one shared column list to both splits and assumed "id" is always
    present. Checking each Dataset's own ``column_names`` fixes all three.
    """
    extras = [c for c in ("id", "text", "__index_level_0__") if c in ds.column_names]
    return ds.remove_columns(extras)


tokenized_train_dataset = _drop_non_feature_columns(tokenized_train_dataset)
tokenized_eval_dataset = _drop_non_feature_columns(tokenized_eval_dataset)

# Trainer expects the target column to be named "labels".
tokenized_train_dataset = tokenized_train_dataset.rename_column("label", "labels")
tokenized_eval_dataset = tokenized_eval_dataset.rename_column("label", "labels")
|
|
|
|
|
|
|
|
class CustomTrainer(Trainer):
    """Trainer variant that swaps the model's default loss for a class-weighted
    cross-entropy (weights taken from the module-level ``class_weights_tensor``)
    to counter label imbalance."""

    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        # Pop the targets so the forward pass does not compute its own loss.
        labels = inputs.pop("labels")
        outputs = model(**inputs)
        logits = outputs.get("logits")

        criterion = nn.CrossEntropyLoss(weight=class_weights_tensor)
        num_labels = self.model.config.num_labels
        loss = criterion(logits.view(-1, num_labels), labels.view(-1))

        if return_outputs:
            return loss, outputs
        return loss
|
|
|
|
|
|
|
|
# Human-readable label mappings stored in the model config (must mirror the
# `label_map` used to encode the CSV labels above).
id2label = {0: "real", 1: "fake"}
label2id = {"real": 0, "fake": 1}
|
|
|
|
|
def model_init(trial):
    """Build a fresh DeBERTa classifier head; called once per search trial.

    The ``trial`` argument is required by Trainer's model_init protocol but
    is not used here.
    """
    return AutoModelForSequenceClassification.from_pretrained(
        MODEL_NAME_OR_PATH,
        num_labels=2,
        label2id=label2id,
        id2label=id2label,
    )
|
|
|
|
|
def compute_metrics_macro(p: EvalPrediction):
    """Compute accuracy plus macro-averaged F1/precision/recall for Trainer eval."""
    labels = p.label_ids
    preds = np.argmax(p.predictions, axis=1)

    # Macro averaging weights both classes equally, which is the point under
    # class imbalance; zero_division=0 avoids warnings on degenerate predictions.
    return {
        "accuracy": accuracy_score(labels, preds),
        "f1_macro": f1_score(labels, preds, average='macro', zero_division=0),
        "precision_macro": precision_score(labels, preds, average='macro', zero_division=0),
        "recall_macro": recall_score(labels, preds, average='macro', zero_division=0),
    }
|
|
|
|
|
def compute_objective(metrics):
    """Search objective: the eval-set macro-F1 (maximized by the study)."""
    return metrics["eval_f1_macro"]
|
|
def my_hp_space(trial):
    """Define the Optuna search space handed to Trainer.hyperparameter_search.

    Batch size is fixed at 16 via a single-choice categorical so it is still
    recorded as a trial parameter.
    """
    learning_rate = trial.suggest_float("learning_rate", 1e-6, 5e-5, log=True)
    num_train_epochs = trial.suggest_int("num_train_epochs", 3, 8)
    seed = trial.suggest_int("seed", 1, 40)
    batch_size = trial.suggest_categorical("per_device_train_batch_size", [16])
    weight_decay = trial.suggest_float("weight_decay", 0.0, 0.3)
    warmup_ratio = trial.suggest_float("warmup_ratio", 0.0, 0.2)

    return {
        "learning_rate": learning_rate,
        "num_train_epochs": num_train_epochs,
        "seed": seed,
        "per_device_train_batch_size": batch_size,
        "weight_decay": weight_decay,
        "warmup_ratio": warmup_ratio,
    }
|
|
# Base arguments for the search phase; learning_rate / epochs / seed /
# weight_decay / warmup_ratio are overridden per trial via my_hp_space.
training_args = TrainingArguments(
    output_dir="./results_hyper_search_DEBERTA",

    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,

    weight_decay=0.01,
    # Evaluate / checkpoint / log every 1000 optimizer steps.
    # NOTE(review): the kwarg is `eval_strategy` only in newer transformers
    # (older releases call it `evaluation_strategy`) — confirm the pinned version.
    eval_strategy="steps",
    eval_steps=1000,
    save_strategy="steps",
    save_steps=1000,
    logging_strategy="steps",
    logging_steps=1000,
    # Track the checkpoint with the best macro-F1 (reported as "eval_f1_macro"
    # by compute_metrics_macro) and restore it when training ends.
    load_best_model_at_end=True,
    metric_for_best_model="f1_macro",
    greater_is_better=True,
    save_total_limit=1,  # keep disk usage down: only one checkpoint retained
)
|
|
|
|
|
# Trainer used for the hyperparameter search. `model=None` plus `model_init`
# lets hyperparameter_search construct a fresh model for every trial.
trainer = CustomTrainer(
    model=None,
    args=training_args,
    model_init=model_init,
    train_dataset=tokenized_train_dataset,
    eval_dataset=tokenized_eval_dataset,
    # NOTE(review): `tokenizer=` is deprecated in favor of `processing_class`
    # in recent transformers releases — confirm against the pinned version.
    tokenizer=tokenizer,
    compute_metrics=compute_metrics_macro,
)
|
|
|
|
|
|
|
|
print("\n" + "="*50)
print("🚀 [DeBERTa-v3] 开始自动超参数搜索 (Target: Macro-F1)...")
print("="*50)

# Run 20 Optuna trials; each trial trains a fresh model (via model_init) and is
# scored by compute_objective (eval macro-F1), which the study maximizes.
best_run = trainer.hyperparameter_search(
    direction="maximize",
    n_trials=20,
    compute_objective=compute_objective,
    backend="optuna",
    hp_space=my_hp_space
)
|
|
|
|
|
print("\n" + "="*50)
print("🎉 搜索完成!")
print("="*50)
print(f"最佳 Macro-F1: {best_run.objective:.4f}")
print("最佳参数组合:", best_run.hyperparameters)

print("\n" + "="*50)
print("🚀 [DeBERTa-v3] 使用最佳参数进行最终训练...")
print("="*50)

# Copy the winning hyperparameters onto the existing TrainingArguments object.
# NOTE(review): plain setattr bypasses TrainingArguments.__post_init__, so any
# field derived from these values is not recomputed — confirm this is acceptable
# for the fields being searched here.
for k, v in best_run.hyperparameters.items():
    setattr(training_args, k, v)

# The final run writes to its own directory and logs more frequently.
training_args.output_dir = "./results_final_best_DEBERTA"
training_args.logging_steps = 200
|
|
|
|
|
# Final trainer: reuses the (now-updated) training_args and trains once with
# the best hyperparameters found by the search.
trainer = CustomTrainer(
    model_init=model_init,
    args=training_args,
    train_dataset=tokenized_train_dataset,
    eval_dataset=tokenized_eval_dataset,
    # Consistency fix: the search trainer passed the tokenizer but this one did
    # not, so checkpoints written during the final run lacked tokenizer files.
    tokenizer=tokenizer,
    compute_metrics=compute_metrics_macro,
)

trainer.train()
|
|
print("\n" + "="*50)
print("🎉 最终训练完成!")
print("="*50)

# Persist the best model (restored in memory via load_best_model_at_end) plus
# the tokenizer, so the directory can be reloaded with from_pretrained().
final_model_path = "./final_model_deberta_macro"
trainer.save_model(final_model_path)
tokenizer.save_pretrained(final_model_path)
print(f"\nDeBERTa 最优模型已保存至: {final_model_path}")
|
|
|
|
|
print("\n--- 最终成绩单 (验证集) ---")
final_metrics = trainer.evaluate()

# Pretty-print the metrics: strip the "eval_" prefix Trainer prepends, and
# format floats to four decimal places.
for metric_name, metric_value in final_metrics.items():
    display_name = metric_name[5:] if metric_name.startswith("eval_") else metric_name
    if isinstance(metric_value, float):
        print(f" - {display_name}: {metric_value:.4f}")
    else:
        print(f" - {display_name}: {metric_value}")