import os
import torch
import numpy as np
import json
from datasets import load_dataset
from transformers import BertPreTrainedModel, BertModel, BertTokenizerFast
from transformers import TrainingArguments, Trainer
from torch import nn
from sklearn.metrics import f1_score, accuracy_score, precision_score, recall_score, jaccard_score

# --- Configuration ---
DATA_DIR = "./processed_data_task2_fixed"
# MODEL_NAME = "bert-base-chinese"
TOKENIZER_NAME = "valuesimplex-ai-lab/FinBERT2-base"
MODEL_NAME = "/home/hsichen/part_time/BERT_finetune/outputs/finbert2_dapt_model"
# MODEL_NAME = "valuesimplex-ai-lab/FinBERT2-base"
# Total number of labels: Data, Action, Gain, Regu, Vague
NUM_LABELS = 5
OUTPUT_DIR = "/home/hsichen/part_time/BERT_finetune/outputs/finbert2_multilabel_model_finetuned_from_dapt"
EPOCHS = 5
BATCH_SIZE = 16
LEARNING_RATE = 2e-5
SEED = 42


# ----------------------------------------------------
# A. BERT model with multi-label classification support
# ----------------------------------------------------
class BertForMultiLabelClassification(BertPreTrainedModel):
    """BERT-based multi-label classifier trained with BCEWithLogitsLoss."""

    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels
        self.bert = BertModel(config)
        classifier_dropout = config.hidden_dropout_prob
        self.dropout = nn.Dropout(classifier_dropout)
        # Linear head: output dimension = number of labels (5)
        self.classifier = nn.Linear(config.hidden_size, self.num_labels)
        self.post_init()
        self.loss_fct = nn.BCEWithLogitsLoss()

    def forward(self, input_ids=None, attention_mask=None, token_type_ids=None, labels=None):
        outputs = self.bert(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
        )
        # Pooled [CLS] representation (pooler output)
        pooled_output = outputs.pooler_output
        pooled_output = self.dropout(pooled_output)
        # Classifier head produces raw logits (no sigmoid applied here)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            # BCEWithLogitsLoss expects float targets
            loss = self.loss_fct(logits, labels.float())

        return (loss, logits) if loss is not None else (logits,)


# ----------------------------------------------------
# B. Evaluation metrics
# ----------------------------------------------------
# Label names used in metric reporting
TAG_COLS = ['Data', 'Action', 'Gain', 'Regu', 'Vague']
PREDICTION_THRESHOLD = 0.5


def compute_metrics(p):
    """
    Compute multi-label evaluation metrics, both aggregate and per-class.
    Required metrics: loss, F1, precision, accuracy, recall.
    """
    # Loss is logged automatically by the Trainer; this function only computes
    # the classification metrics.
    logits = p.predictions[0] if isinstance(p.predictions, tuple) else p.predictions
    labels = p.label_ids

    # Sigmoid -> probabilities
    probs = 1 / (1 + np.exp(-logits))
    # Apply the 0.5 threshold to obtain binary predictions
    preds = (probs > PREDICTION_THRESHOLD).astype(int)

    # ----------------------------------------
    # 1. Aggregate metrics (used for metric_for_best_model)
    # ----------------------------------------
    metrics = {}
    # Micro-F1 (commonly used for best-model selection)
    metrics["f1_micro"] = f1_score(labels, preds, average='micro')
    # Macro-F1 (balances the contribution of each class)
    metrics["f1_macro"] = f1_score(labels, preds, average='macro')
    # Sample-wise Jaccard (overlap between predicted and true label sets per sample)
    metrics["jaccard_samples"] = jaccard_score(labels, preds, average='samples')

    # ----------------------------------------
    # 2. Per-class metrics
    # ----------------------------------------
    # For multi-label data, per-class metrics are simply binary-classification
    # metrics computed column by column (one column per label).
    for i, tag in enumerate(TAG_COLS):
        y_true_class = labels[:, i]  # ground truth for label i
        y_pred_class = preds[:, i]   # predictions for label i

        # Naming convention: {tag}_f1, {tag}_precision, {tag}_recall, {tag}_accuracy
        metrics[f"{tag}_f1"] = f1_score(y_true_class, y_pred_class, average='binary', zero_division=0)
        metrics[f"{tag}_precision"] = precision_score(y_true_class, y_pred_class, average='binary', zero_division=0)
        metrics[f"{tag}_recall"] = recall_score(y_true_class, y_pred_class, average='binary', zero_division=0)
        # Accuracy for this label alone (correct predictions / total samples),
        # not exact-match accuracy over the full label vector.
        metrics[f"{tag}_accuracy"] = accuracy_score(y_true_class, y_pred_class)

    return metrics


# ----------------------------------------------------
# C. Main fine-tuning routine
# ----------------------------------------------------
def finetune_multilabel_bert():
    # 1. Load the datasets
    print("--- 1. Loading datasets ---")
    data_files = {
        "train": os.path.join(DATA_DIR, "train.csv"),
        "validation": os.path.join(DATA_DIR, "validation.csv"),
        "test": os.path.join(DATA_DIR, "test.csv")
    }
    raw_datasets = load_dataset("csv", data_files=data_files)

    # 2. Load the tokenizer and the custom model
    print("--- 2. Loading tokenizer and custom model ---")
    tokenizer = BertTokenizerFast.from_pretrained(TOKENIZER_NAME)
    # Use the custom BertForMultiLabelClassification defined above
    model = BertForMultiLabelClassification.from_pretrained(
        MODEL_NAME,
        num_labels=NUM_LABELS,
        ignore_mismatched_sizes=True
    )

    # 3. Tokenize the datasets
    def tokenize_function(examples):
        # The text is assumed to be in the 'text' column
        tokenized = tokenizer(examples["text"], padding="max_length", truncation=True, max_length=512)
        # Convert the stringified label list from the CSV (e.g. '[1, 0, 1, 0, 0]') into a list of ints
        label_list = json.loads(examples["labels"].replace("'", "\""))
        tokenized["labels"] = label_list
        return tokenized

    # Note: batched=False so that each row's 'labels' string is parsed individually
    tokenized_datasets = raw_datasets.map(tokenize_function, batched=False)

    # Drop the raw 'text' column
    tokenized_datasets = tokenized_datasets.remove_columns(["text"])
    # Expose the remaining columns as PyTorch tensors
    tokenized_datasets.set_format("torch", columns=['input_ids', 'attention_mask', 'labels'])

    train_dataset = tokenized_datasets["train"]
    eval_dataset = tokenized_datasets["validation"]
    test_dataset = tokenized_datasets["test"]

    # 4. Training arguments
    print("--- 3. Setting up training arguments and Trainer ---")
    training_args = TrainingArguments(
        output_dir=OUTPUT_DIR,
        num_train_epochs=EPOCHS,
        per_device_train_batch_size=BATCH_SIZE,
        per_device_eval_batch_size=BATCH_SIZE,
        warmup_steps=200,
        weight_decay=0.01,
        logging_steps=50,
        eval_strategy="steps",
        eval_steps=50,
        save_strategy="steps",
        save_steps=500,
        load_best_model_at_end=True,
        metric_for_best_model="f1_micro",
        seed=SEED,
        learning_rate=LEARNING_RATE,
        report_to="wandb"
    )

    # 5. Initialize the Trainer
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        compute_metrics=compute_metrics,
    )

    # 6. Train
    print("--- 4. Training ---")
    trainer.train()

    # 7. Evaluate on the test set
    print("--- 5. Evaluating on the test set ---")
    results = trainer.evaluate(test_dataset)
    print(f"Test set results: {results}")

    # 8. Save the final model and tokenizer
    trainer.save_model(os.path.join(OUTPUT_DIR, "final"))
    tokenizer.save_pretrained(os.path.join(OUTPUT_DIR, "final"))
    print(f"Model and tokenizer saved to: {os.path.join(OUTPUT_DIR, 'final')}")


if __name__ == "__main__":
    finetune_multilabel_bert()
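

# ----------------------------------------------------
# D. Inference sketch (illustrative only, not part of the training run)
# ----------------------------------------------------
# A minimal sketch of how the saved "final" checkpoint could be used for
# prediction. `predict_labels` is a hypothetical helper added for
# illustration; adapt the path, batching, and device handling as needed.
def predict_labels(texts, model_dir=os.path.join(OUTPUT_DIR, "final")):
    tokenizer = BertTokenizerFast.from_pretrained(model_dir)
    model = BertForMultiLabelClassification.from_pretrained(model_dir)
    model.eval()
    enc = tokenizer(texts, padding=True, truncation=True, max_length=512, return_tensors="pt")
    with torch.no_grad():
        # Without labels the model returns a one-element tuple (logits,)
        logits = model(
            input_ids=enc["input_ids"],
            attention_mask=enc["attention_mask"],
            token_type_ids=enc.get("token_type_ids"),
        )[0]
    # Same sigmoid + threshold decision rule as in compute_metrics
    probs = torch.sigmoid(logits).numpy()
    preds = (probs > PREDICTION_THRESHOLD).astype(int)
    # Map each 0/1 row back to tag names
    return [[TAG_COLS[i] for i, v in enumerate(row) if v] for row in preds]

# Example usage (illustrative input):
# print(predict_labels(["<Chinese disclosure sentence>"]))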