import os
import json

import numpy as np
import torch
from torch import nn

from datasets import load_dataset
from sklearn.metrics import (
    accuracy_score,
    f1_score,
    jaccard_score,
    precision_score,
    recall_score,
)
from transformers import (
    BertModel,
    BertPreTrainedModel,
    BertTokenizerFast,
    Trainer,
    TrainingArguments,
)

# --- Configuration ---
DATA_DIR = "./processed_data_task2_fixed"

TOKENIZER_NAME = "valuesimplex-ai-lab/FinBERT2-base"
MODEL_NAME = "/home/hsichen/part_time/BERT_finetune/outputs/finbert2_dapt_model"

NUM_LABELS = 5
OUTPUT_DIR = "/home/hsichen/part_time/BERT_finetune/outputs/finbert2_multilabel_model_finetuned_from_dapt"
EPOCHS = 5
BATCH_SIZE = 16
LEARNING_RATE = 2e-5
SEED = 42


class BertForMultiLabelClassification(BertPreTrainedModel):
    """Multi-label classification model based on BERT, using BCEWithLogitsLoss."""

    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels
        self.bert = BertModel(config)

        classifier_dropout = config.hidden_dropout_prob
        self.dropout = nn.Dropout(classifier_dropout)
        self.classifier = nn.Linear(config.hidden_size, self.num_labels)

        self.post_init()
        self.loss_fct = nn.BCEWithLogitsLoss()

    def forward(self,
                input_ids=None,
                attention_mask=None,
                token_type_ids=None,
                labels=None):
        outputs = self.bert(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
        )

        # Classify from the pooled [CLS] representation.
        pooled_output = outputs.pooler_output
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            # BCEWithLogitsLoss expects float targets.
            loss = self.loss_fct(logits, labels.float())

        return (loss, logits) if loss is not None else (logits,)
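

# A minimal shape sanity check for the model above: an illustrative sketch,
# not part of the training pipeline. It builds a tiny random BertConfig and
# verifies that one forward pass yields (batch, NUM_LABELS) logits and a
# scalar BCE loss. The helper name _smoke_test_model is hypothetical; call
# it manually if desired.
def _smoke_test_model():
    from transformers import BertConfig

    cfg = BertConfig(hidden_size=32, num_hidden_layers=1, num_attention_heads=2,
                     intermediate_size=64, num_labels=NUM_LABELS)
    model = BertForMultiLabelClassification(cfg)
    input_ids = torch.randint(0, cfg.vocab_size, (2, 8))
    labels = torch.randint(0, 2, (2, NUM_LABELS))
    loss, logits = model(input_ids=input_ids, labels=labels)
    assert logits.shape == (2, NUM_LABELS)
    assert loss.dim() == 0  # BCEWithLogitsLoss reduces to a scalar by default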


# Tag names, in the order of the label columns, and the sigmoid cutoff used
# to binarise predictions.
TAG_COLS = ['Data', 'Action', 'Gain', 'Regu', 'Vague']
PREDICTION_THRESHOLD = 0.5


def compute_metrics(p):
    """Compute evaluation metrics for multi-label classification, both global
    and per class.

    Required: loss, F1, precision, accuracy, recall (the eval loss itself is
    reported by the Trainer).
    """
    logits = p.predictions[0] if isinstance(p.predictions, tuple) else p.predictions
    labels = p.label_ids

    # Sigmoid the raw logits, then binarise at the prediction threshold.
    probs = 1 / (1 + np.exp(-logits))
    preds = (probs > PREDICTION_THRESHOLD).astype(int)

    metrics = {}

    # Global multi-label metrics.
    metrics["f1_micro"] = f1_score(labels, preds, average='micro')
    metrics["f1_macro"] = f1_score(labels, preds, average='macro')
    metrics["jaccard_samples"] = jaccard_score(labels, preds, average='samples')

    # Per-tag binary metrics.
    for i, tag in enumerate(TAG_COLS):
        y_true_class = labels[:, i]
        y_pred_class = preds[:, i]

        metrics[f"{tag}_f1"] = f1_score(y_true_class, y_pred_class, average='binary', zero_division=0)
        metrics[f"{tag}_precision"] = precision_score(y_true_class, y_pred_class, average='binary', zero_division=0)
        metrics[f"{tag}_recall"] = recall_score(y_true_class, y_pred_class, average='binary', zero_division=0)
        metrics[f"{tag}_accuracy"] = accuracy_score(y_true_class, y_pred_class)

    return metrics
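

# Illustrative usage of compute_metrics above: a sketch assuming only that the
# Trainer hands it an EvalPrediction-like object with raw logits under
# .predictions and the 0/1 label matrix under .label_ids. The helper name
# _demo_compute_metrics and the toy inputs are hypothetical.
def _demo_compute_metrics():
    from types import SimpleNamespace

    fake_logits = np.array([[2.0, -1.0, 0.5, -2.0, 1.5],
                            [-0.5, 1.0, -1.5, 0.2, -2.0]])
    fake_labels = np.array([[1, 0, 1, 0, 1],
                            [0, 1, 0, 1, 0]])
    p = SimpleNamespace(predictions=fake_logits, label_ids=fake_labels)
    # The logits agree in sign with the labels, so every metric comes out 1.0;
    # expected keys: f1_micro, f1_macro, jaccard_samples, Data_f1, ...
    print(compute_metrics(p))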


def finetune_multilabel_bert():
    print("--- 1. Loading the datasets ---")
    data_files = {
        "train": os.path.join(DATA_DIR, "train.csv"),
        "validation": os.path.join(DATA_DIR, "validation.csv"),
        "test": os.path.join(DATA_DIR, "test.csv")
    }
    raw_datasets = load_dataset("csv", data_files=data_files)

    print("--- 2. Loading the tokenizer and custom model ---")
    tokenizer = BertTokenizerFast.from_pretrained(TOKENIZER_NAME)

    # ignore_mismatched_sizes tolerates any head weights in the checkpoint
    # whose shape differs from the new NUM_LABELS-way classifier.
    model = BertForMultiLabelClassification.from_pretrained(
        MODEL_NAME,
        num_labels=NUM_LABELS,
        ignore_mismatched_sizes=True
    )

    def tokenize_function(examples):
        tokenized = tokenizer(examples["text"], padding="max_length", truncation=True, max_length=512)

        # Each 'labels' cell is a Python-style list string (single quotes);
        # normalise the quotes so json.loads can parse it into a list of ints.
        label_list = json.loads(examples["labels"].replace("'", "\""))
        tokenized["labels"] = label_list
        return tokenized

    # batched=False: the label string is parsed one row at a time.
    tokenized_datasets = raw_datasets.map(tokenize_function, batched=False)
    tokenized_datasets = tokenized_datasets.remove_columns(["text"])
    tokenized_datasets.set_format("torch", columns=['input_ids', 'attention_mask', 'labels'])

    train_dataset = tokenized_datasets["train"]
    eval_dataset = tokenized_datasets["validation"]
    test_dataset = tokenized_datasets["test"]

    print("--- 3. Setting up training arguments and the Trainer ---")
    training_args = TrainingArguments(
        output_dir=OUTPUT_DIR,
        num_train_epochs=EPOCHS,
        per_device_train_batch_size=BATCH_SIZE,
        per_device_eval_batch_size=BATCH_SIZE,
        warmup_steps=200,
        weight_decay=0.01,
        logging_steps=50,
        eval_strategy="steps",
        eval_steps=50,
        save_strategy="steps",
        save_steps=500,
        load_best_model_at_end=True,
        metric_for_best_model="f1_micro",
        seed=SEED,
        learning_rate=LEARNING_RATE,
        report_to="wandb"
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        compute_metrics=compute_metrics,
    )

    print("--- 4. Starting training ---")
    trainer.train()

    print("--- 5. Evaluating on the test set ---")
    results = trainer.evaluate(test_dataset)
    print(f"Test set evaluation results: {results}")

    # Save the final (best) model together with the tokenizer for inference.
    trainer.save_model(os.path.join(OUTPUT_DIR, "final"))
    tokenizer.save_pretrained(os.path.join(OUTPUT_DIR, "final"))
    print(f"Model and tokenizer saved to: {os.path.join(OUTPUT_DIR, 'final')}")
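

# An inference sketch under the assumption that training has completed and the
# "final" directory saved above exists; the helper name _predict_example is
# hypothetical and is not called by this script.
def _predict_example(texts):
    final_dir = os.path.join(OUTPUT_DIR, "final")
    tokenizer = BertTokenizerFast.from_pretrained(final_dir)
    model = BertForMultiLabelClassification.from_pretrained(final_dir)
    model.eval()
    enc = tokenizer(texts, padding=True, truncation=True, max_length=512,
                    return_tensors="pt")
    with torch.no_grad():
        (logits,) = model(input_ids=enc["input_ids"],
                          attention_mask=enc["attention_mask"],
                          token_type_ids=enc.get("token_type_ids"))
    probs = torch.sigmoid(logits).numpy()
    # One 0/1 row per input text, with columns ordered as in TAG_COLS.
    return (probs > PREDICTION_THRESHOLD).astype(int)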


if __name__ == "__main__":
    finetune_multilabel_bert()