# ==============================================================================
# Final script: final_ensemble_stacking.py (meta-model upgrade)
#
# Strategy: stack the predicted probabilities (meta-features) from Model A
# and Model B, and train an AdaBoostClassifier (from the DM-01 lecture
# slides) as the final classifier on top of them.
#
# Goal: improve Precision for the Fake (1) class.
# ==============================================================================
import pandas as pd
import numpy as np
import torch
import joblib
from sklearn.ensemble import AdaBoostClassifier  # meta-model
from sklearn.tree import DecisionTreeClassifier  # AdaBoost typically uses decision trees as base learners
from sklearn.metrics import classification_report, f1_score
from datasets import Dataset
from transformers import (
    DebertaV2Tokenizer,
    AutoModelForSequenceClassification,
    TrainingArguments,
    Trainer
)
import os
import time
import warnings
# --- 1. Define paths and constants (unchanged from the previous scripts) ---
warnings.filterwarnings("ignore")
os.environ["TOKENIZERS_PARALLELISM"] = "false"
TRAIN_FILE_PATH = "/tmp/home/wzh/file/train_data.csv"
VALID_FILE_PATH = "/tmp/home/wzh/file/val_data.csv"
# Paths to the trained base models
MODEL_A_DIR = "./final_model_tfidf"
VECTORIZER_PATH = os.path.join(MODEL_A_DIR, "vectorizer.joblib")
MODEL_A_PATH = os.path.join(MODEL_A_DIR, "model_A.joblib")
DEBERTA_MODEL_PATH = "./final_model_deberta_weighted"
# --- 2. Load the data (unchanged) ---
print("--- Approach 3: Ensemble Learning (Stacking) ---")
print("Loading data...")
train_df = pd.read_csv(TRAIN_FILE_PATH)
eval_df = pd.read_csv(VALID_FILE_PATH)
label_map = {"real": 0, "fake": 1}
train_df['label'] = train_df['label'].map(label_map)
eval_df['label'] = eval_df['label'].map(label_map)
X_train_text = train_df['text']
y_train = train_df['label']
X_eval_text = eval_df['text']
y_eval = eval_df['label']  # the final ground-truth labels for evaluation
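# (Added sketch) pandas' Series.map silently turns any label string outside
# label_map into NaN; fail fast here rather than crash later. This check only
# uses the columns defined above.
for name, df in (("train", train_df), ("eval", eval_df)):
    if df['label'].isna().any():
        raise ValueError(f"{name} split contains labels outside {set(label_map)}")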
# --- 3/4: Load Model A and Model B and collect their predicted probabilities ---
# (This section reuses the loading/prediction logic from the earlier Model A/B
# scripts; the full code is inlined below.)
# Model A: load the TF-IDF vectorizer and classifier, then predict.
print(f"\n--- 1/3:加载 Model A ({MODEL_A_DIR}) 并获取预测概率 ---")
start_time = time.time()
try:
    vectorizer = joblib.load(VECTORIZER_PATH)
    model_A = joblib.load(MODEL_A_PATH)
except FileNotFoundError:
    print(f"!!! ERROR: Model A files not found in {MODEL_A_DIR} !!!")
    exit()
X_train_tfidf = vectorizer.transform(X_train_text)
X_eval_tfidf = vectorizer.transform(X_eval_text)
probs_A_train = model_A.predict_proba(X_train_tfidf)[:, 1]  # column 1 = P(fake)
probs_A_eval = model_A.predict_proba(X_eval_tfidf)[:, 1]
print(f"Model A 运行完毕. 耗时: {time.time() - start_time:.2f} 秒")
# Model B: load the fine-tuned DeBERTa model, then predict.
print(f"\n--- 2/3: Loading Model B ({DEBERTA_MODEL_PATH}) and computing predicted probabilities ---")
start_time = time.time()
try:
    model_B = AutoModelForSequenceClassification.from_pretrained(DEBERTA_MODEL_PATH)
    tokenizer_B = DebertaV2Tokenizer.from_pretrained(DEBERTA_MODEL_PATH)
except Exception as e:
    print(f"!!! ERROR: failed to load Model B. Details: {e} !!!")
    exit()
trainer_B = Trainer(
    model=model_B,
    args=TrainingArguments(output_dir="./tmp_ensemble_eval", per_device_eval_batch_size=32, eval_strategy="no", fp16=True)
)
def tokenize_function(examples):
    return tokenizer_B(examples["text"], padding="max_length", truncation=True, max_length=512)
train_dataset_hf = Dataset.from_pandas(train_df)
eval_dataset_hf = Dataset.from_pandas(eval_df)
# Drop the raw columns so the Trainer only receives token ids and masks.
tokenized_train_dataset = train_dataset_hf.map(tokenize_function, batched=True, num_proc=4).remove_columns(["id", "text", "label"])
tokenized_eval_dataset = eval_dataset_hf.map(tokenize_function, batched=True, num_proc=4).remove_columns(["id", "text", "label"])
predictions_B_train = trainer_B.predict(tokenized_train_dataset)
logits_B_train = predictions_B_train.predictions
softmax = torch.nn.Softmax(dim=1)  # convert logits to class probabilities
probs_B_train = softmax(torch.from_numpy(logits_B_train))[:, 1].numpy()  # column 1 = P(fake)
predictions_B_eval = trainer_B.predict(tokenized_eval_dataset)
logits_B_eval = predictions_B_eval.predictions
probs_B_eval = softmax(torch.from_numpy(logits_B_eval))[:, 1].numpy()
print(f"Model B 运行完毕. 耗时: {time.time() - start_time:.2f} 秒")
# --- 5: Train the meta-model (!!! the core change is here !!!) ---
print("\n--- 3/3: Training the meta-model (AdaBoostClassifier) ---")
# Build the meta-dataset from the base models' probabilities
X_train_meta = np.column_stack((probs_A_train, probs_B_train))
X_eval_meta = np.column_stack((probs_A_eval, probs_B_eval))
print(f"元训练集维度: {X_train_meta.shape}")
# (Core change) AdaBoost as the meta-model.
# AdaBoostClassifier and DecisionTreeClassifier are already imported at the top.
# Base classifier (weak learner): a depth-1 decision stump.
base_learner = DecisionTreeClassifier(max_depth=1, random_state=42)
# (Key fix: AdaBoostClassifier's base_estimator parameter was renamed to
# estimator in scikit-learn 1.2 and removed in 1.4.)
meta_model = AdaBoostClassifier(
    estimator=base_learner,  # corrected parameter name (formerly base_estimator)
    n_estimators=50,
    learning_rate=1.0,
    random_state=42
)
# Note: AdaBoostClassifier has no class_weight='balanced' option; it balances
# implicitly by re-weighting misclassified samples at each boosting iteration,
# so we train directly on the imbalanced data.
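# (Alternative sketch, not used here) If explicit balancing is still desired,
# AdaBoost's fit() accepts initial sample weights, and sklearn's
# compute_sample_weight("balanced", y) mirrors the class_weight='balanced'
# heuristic:
#   from sklearn.utils.class_weight import compute_sample_weight
#   meta_model.fit(X_train_meta, y_train,
#                  sample_weight=compute_sample_weight("balanced", y_train))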
meta_model.fit(X_train_meta, y_train)
print("元模型 (AdaBoost) 训练完毕!")
# --- 6: Final evaluation ---
print("\n--- Final evaluation: ensemble performance on the validation set ---")
preds_meta = meta_model.predict(X_eval_meta)
# Print a detailed per-class report
report = classification_report(
    y_eval,
    preds_meta,
    target_names=['real (0)', 'fake (1)'],
    digits=4,
    zero_division=0
)
print(report)
f1_ensemble = f1_score(y_eval, preds_meta, pos_label=1)
print(f"--- 最终集成 F1 分数 (Fake): {f1_ensemble:.4f} ---")
print("--- 脚本运行结束 ---")