# DualModernBERT-ecommerce-finetuned — modeling_dualmodernbert.py
# (Hugging Face Hub upload by Terence709, "Upload 6 files", revision 2e664e6 verified)
# Placeholder for Hugging Face compatible DualModernBERT model code
import torch
import torch.nn as nn
from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss
import torch.nn.functional as F
from transformers import PreTrainedModel, AutoModel, PretrainedConfig
from transformers.modeling_outputs import SequenceClassifierOutput  # standard HF output format
# --- The Config class is defined first ---
class DualModernBERTConfig(PretrainedConfig):
    """Configuration for the dual-encoder ModernBERT ordinal-regression model.

    Holds the hyper-parameters for the two base encoders, the fusion head,
    the ordinal classification head, and the (simplified) ordinal loss.
    """

    model_type = "dual-modernbert"  # identifies this config/model pair in the Auto* registry

    def __init__(
        self,
        base_model_name="answerdotai/ModernBERT-base",
        fusion_hidden_dim=512,
        ordinal_dropout=0.5,
        encoder_dropout=0.35,
        # NOTE: None sentinels instead of mutable defaults — a shared dict/list
        # default would be aliased across every config instance.
        fusion_dropout_rates=None,
        num_labels=5,  # number of original rating levels
        num_ordinal_labels=4,  # number of ordinal boundaries
        freeze_base_encoder_layers=5,
        problem_type="ordinal_regression",  # declares the task type
        # EnhancedOrdinalLoss parameters (configurable if needed)
        loss_beta=0.9999,
        loss_base_boundary_weight=0.1,
        loss_boundary_weights=None,
        loss_smoothing=0.1,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.base_model_name = base_model_name
        self.fusion_hidden_dim = fusion_hidden_dim
        self.ordinal_dropout = ordinal_dropout
        self.encoder_dropout = encoder_dropout
        # Build a fresh dict per instance when no override is given.
        if fusion_dropout_rates is None:
            fusion_dropout_rates = {
                'cross_attn': 0.3,
                'transform_dropout': 0.4,
                'fusion_dropout1': 0.5,
                'fusion_dropout2': 0.45,
                'gate_dropout': 0.3,
                'output_dropout': 0.45,
            }
        self.fusion_dropout_rates = fusion_dropout_rates
        self.num_labels = num_labels
        self.num_ordinal_labels = num_ordinal_labels
        self.freeze_base_encoder_layers = freeze_base_encoder_layers
        self.problem_type = problem_type
        # Loss config
        self.loss_beta = loss_beta
        self.loss_base_boundary_weight = loss_base_boundary_weight
        # Fresh list per instance when no override is given.
        self.loss_boundary_weights = (
            [1.0, 1.2, 1.2, 1.0] if loss_boundary_weights is None else loss_boundary_weights
        )
        self.loss_smoothing = loss_smoothing
        # Parts of the ModernBERT-base config (e.g. hidden_size=768) can be
        # inherited dynamically at load time if needed.
# --- End of Config definition ---
# Feature fusion head (mostly ported from the original code; dropout values come from config)
class EnhancedFusion(nn.Module):
    """Fuse pooled title and text encoder features into one vector.

    Pipeline: cross/self multi-head attention between the two branches,
    per-branch transform MLPs, cross-layer linear connections, a multi-scale
    projection, a deep fusion MLP, a gated residual mix, and a final output
    projection. Inputs are assumed to be (batch, hidden_size) pooled encoder
    features — TODO confirm against the caller (DualModernBERTModel passes
    the first-token features).
    """

    def __init__(self, config: DualModernBERTConfig):
        super().__init__()
        hidden_dim = config.fusion_hidden_dim
        dropout_rates = config.fusion_dropout_rates
        # Encoder width: read from config when present, else ModernBERT-base default.
        base_hidden_size = getattr(config, 'hidden_size', 768)
        # Multi-head cross/self attention layers
        self.cross_attention = nn.ModuleDict({
            'title2text': nn.MultiheadAttention(embed_dim=base_hidden_size, num_heads=12, dropout=dropout_rates['cross_attn']),
            'text2title': nn.MultiheadAttention(embed_dim=base_hidden_size, num_heads=12, dropout=dropout_rates['cross_attn']),
            'self_title': nn.MultiheadAttention(embed_dim=base_hidden_size, num_heads=12, dropout=dropout_rates['cross_attn']),
            'self_text': nn.MultiheadAttention(embed_dim=base_hidden_size, num_heads=12, dropout=dropout_rates['cross_attn'])
        })
        # Multi-scale projections (NOTE: only 'scale1' is consumed in forward below)
        self.scale_projections = nn.ModuleDict({
            'scale1': nn.Linear(base_hidden_size, hidden_dim),
            'scale2': nn.Linear(base_hidden_size, hidden_dim // 2),
            'scale3': nn.Linear(base_hidden_size, hidden_dim // 4)
        })
        # Per-branch feature transform networks
        self.feature_transform = nn.ModuleDict({
            'title': nn.Sequential(
                nn.Linear(base_hidden_size, hidden_dim), nn.LayerNorm(hidden_dim), nn.GELU(),
                nn.Dropout(dropout_rates['transform_dropout']),
                nn.Linear(hidden_dim, hidden_dim), nn.LayerNorm(hidden_dim), nn.GELU(),
                nn.Dropout(dropout_rates['transform_dropout'])
            ),
            'text': nn.Sequential(
                nn.Linear(base_hidden_size, hidden_dim), nn.LayerNorm(hidden_dim), nn.GELU(),
                nn.Dropout(dropout_rates['transform_dropout']),
                nn.Linear(hidden_dim, hidden_dim), nn.LayerNorm(hidden_dim), nn.GELU(),
                nn.Dropout(dropout_rates['transform_dropout'])
            )
        })
        # Deep feature fusion network
        self.fusion_network = nn.Sequential(
            nn.Linear(hidden_dim * 4, hidden_dim * 3), nn.LayerNorm(hidden_dim * 3), nn.GELU(),
            nn.Dropout(dropout_rates['fusion_dropout1']),
            nn.Linear(hidden_dim * 3, hidden_dim * 2), nn.LayerNorm(hidden_dim * 2), nn.GELU(),
            nn.Dropout(dropout_rates['fusion_dropout2']),
            nn.Linear(hidden_dim * 2, hidden_dim), nn.LayerNorm(hidden_dim)
        )
        # Cross-layer feature connections
        self.cross_connections = nn.ModuleDict({
            'title': nn.Linear(hidden_dim * 2, hidden_dim),
            'text': nn.Linear(hidden_dim * 2, hidden_dim)
        })
        # Enhanced residual connection (projects the raw concatenated inputs)
        self.residual_proj = nn.Sequential(
            nn.Linear(base_hidden_size * 2, hidden_dim), nn.LayerNorm(hidden_dim), nn.GELU()
        )
        # Dynamic feature gating (sigmoid gate mixing fused vs. residual paths)
        self.gate = nn.Sequential(
            nn.Linear(hidden_dim * 2, hidden_dim), nn.LayerNorm(hidden_dim), nn.GELU(),
            nn.Dropout(dropout_rates['gate_dropout']),
            nn.Linear(hidden_dim, hidden_dim), nn.Sigmoid()
        )
        # Output layer
        self.output_layer = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim), nn.LayerNorm(hidden_dim), nn.GELU(),
            nn.Dropout(dropout_rates['output_dropout']),
            nn.Linear(hidden_dim, hidden_dim), nn.LayerNorm(hidden_dim)
        )

    def forward(self, title, text):
        """Return the fused (batch, fusion_hidden_dim) feature vector."""
        # --- FUSION LOGIC (copied and slightly adapted from original) ---
        # unsqueeze(0) adds a length-1 sequence dim; nn.MultiheadAttention
        # defaults to (seq, batch, embed) layout.
        title_q = title.unsqueeze(0)
        text_q = text.unsqueeze(0)
        # Multi-head attention interaction
        # NOTE(review): for 'title2text' the query is text_q and key/value are
        # title_q (and vice versa for 'text2title') — the naming looks swapped
        # relative to the dict keys, but the original ordering is preserved here.
        title2text, _ = self.cross_attention['title2text'](text_q, title_q, title_q)
        text2title, _ = self.cross_attention['text2title'](title_q, text_q, text_q)
        title_self, _ = self.cross_attention['self_title'](title_q, title_q, title_q)
        text_self, _ = self.cross_attention['self_text'](text_q, text_q, text_q)
        # Feature transformation (squeeze(0) drops the length-1 sequence dim)
        title_feats = self.feature_transform['title'](title2text.squeeze(0))
        text_feats = self.feature_transform['text'](text2title.squeeze(0))
        title_self_feats = self.feature_transform['title'](title_self.squeeze(0))
        text_self_feats = self.feature_transform['text'](text_self.squeeze(0))
        # Cross-layer feature connection (cross-attended + self-attended per branch)
        title_cross = self.cross_connections['title'](torch.cat([title_feats, title_self_feats], dim=-1))
        text_cross = self.cross_connections['text'](torch.cat([text_feats, text_self_feats], dim=-1))
        # Multi-scale feature extraction (all scales computed; only 'scale1' used below)
        title_scales = {scale: proj(title) for scale, proj in self.scale_projections.items()}
        text_scales = {scale: proj(text) for scale, proj in self.scale_projections.items()}
        # Feature fusion: concat to (batch, hidden_dim * 4) for the fusion MLP
        fused_features = torch.cat([
            title_cross, text_cross,
            title_scales['scale1'], text_scales['scale1']
        ], dim=-1)
        fused = self.fusion_network(fused_features)
        # Residual connection from the raw inputs
        residual = self.residual_proj(torch.cat([title, text], dim=-1))
        # Dynamic feature gating: convex mix of fused and residual paths
        gate_input = torch.cat([fused, residual], dim=-1)
        gate = self.gate(gate_input)
        gated_fusion = gate * fused + (1 - gate) * residual
        # Final output
        output = self.output_layer(gated_fusion)
        return output
# Ordinal classification head (mostly ported from the original; dropout and output size come from config)
class OrdinalLayer(nn.Module):
    """Map fused features to one logit per ordinal boundary."""

    def __init__(self, config: DualModernBERTConfig):
        super().__init__()
        # Input comes from the fusion head, so its width is fusion_hidden_dim;
        # the output width is the configured number of ordinal boundaries.
        in_features = config.fusion_hidden_dim
        drop = nn.Dropout(config.ordinal_dropout)
        proj = nn.Linear(in_features, config.num_ordinal_labels)
        self.ordinal = nn.Sequential(drop, proj)

    def forward(self, x):
        """Return boundary logits of shape (..., num_ordinal_labels)."""
        return self.ordinal(x)
# Enhanced ordinal loss (ported from the original; training-state update logic removed to simplify integration)
# NOTE: To integrate cleanly with the Hugging Face Trainer, the EMA and dynamic
# class-weight updates that depended on training-loop state were removed.
# Only the core BCE loss with boundary penalty and label smoothing is kept.
# If the full stateful logic is needed, use a custom Trainer or callbacks.
class SimpleEnhancedOrdinalLoss(nn.Module):
def __init__(self, config: DualModernBERTConfig):
super().__init__()
self.num_ordinal_labels = config.num_ordinal_labels
self.smoothing = config.loss_smoothing
self.base_boundary_weight = config.loss_base_boundary_weight
# 将边界权重列表转换为tensor
self.boundary_weights = torch.tensor(config.loss_boundary_weights, dtype=torch.float)
# 确保权重张量在正确的设备上
self.register_buffer('boundary_weights_tensor', self.boundary_weights)
def get_boundary_weight(self, pos):
"""获取边界权重"""
# 确保访问张量
if pos < len(self.boundary_weights_tensor):
return self.base_boundary_weight * self.boundary_weights_tensor[pos]
else:
# 如果索引超出范围,返回基础权重或0
return self.base_boundary_weight # 或者可以返回0
def forward(self, predictions, targets):
# 标签平滑
smoothed_targets = targets * (1 - self.smoothing) + 0.5 * self.smoothing
# 基础BCE损失
bce_loss = F.binary_cross_entropy_with_logits(predictions, smoothed_targets, reduction='none')
# 计算边界惩罚
probs = torch.sigmoid(predictions)
boundary_penalty = torch.zeros_like(bce_loss)
# 确保 boundary_weights_tensor 在正确的设备上
current_device = predictions.device
self.boundary_weights_tensor = self.boundary_weights_tensor.to(current_device)
for i in range(predictions.size(1) - 1):
diff = torch.abs(probs[:, i] - probs[:, i + 1])
penalty = torch.exp(-diff) * 0.5
adaptive_weight = self.get_boundary_weight(i)
boundary_penalty[:, i] = adaptive_weight * penalty
# 这里省略了原版的类别权重 (self.weight_tensor), 因为它依赖于训练过程中的状态更新
final_loss = bce_loss + boundary_penalty
return final_loss.mean()
# Full model (modified to use two independent encoders and separated title/text inputs)
class DualModernBERTModel(PreTrainedModel):
    """Dual-encoder ModernBERT for ordinal rating prediction.

    Encodes title and text with two independent base encoders, fuses the
    first-token features with EnhancedFusion, and emits one logit per ordinal
    boundary via OrdinalLayer. Loss is SimpleEnhancedOrdinalLoss.
    """

    config_class = DualModernBERTConfig  # <-- re-added so from_pretrained resolves the config

    def __init__(self, config: DualModernBERTConfig):
        super().__init__(config)
        self.config = config
        # -- Change: create two independent encoders --
        print(f"Initializing title encoder from: {config.base_model_name}")
        self.title_encoder = AutoModel.from_pretrained(
            config.base_model_name,
            add_pooling_layer=False,
            trust_remote_code=True,
            # config=config,  # passing config may cause problems; let AutoModel handle it
        )
        print(f"Initializing text encoder from: {config.base_model_name}")
        self.text_encoder = AutoModel.from_pretrained(
            config.base_model_name,
            add_pooling_layer=False,
            trust_remote_code=True,
            # config=config,
        )
        # -- End change --
        # Get the base model's hidden_size (from either encoder) if not already set
        if not hasattr(config, 'hidden_size'):
            self.config.hidden_size = self.title_encoder.config.hidden_size
        self.title_dropout = nn.Dropout(config.encoder_dropout)
        self.text_dropout = nn.Dropout(config.encoder_dropout)
        self.fusion = EnhancedFusion(config)
        self.ordinal_layer = OrdinalLayer(config)
        # -- Change: instantiate the custom loss --
        # Uses the simplified, stateless version
        self.criterion = SimpleEnhancedOrdinalLoss(config)
        # -- End change --
        # Freeze lower layers (safer to do after weights are loaded)
        self._freeze_encoder_layers(config.freeze_base_encoder_layers)
        self.post_init()

    def _freeze_encoder_layers(self, num_layers_to_freeze):
        """Freeze the lowest ``num_layers_to_freeze`` layers of both encoders.

        Handles two module layouts: a flat ``encoder.layers`` list and the
        classic BERT-style ``encoder.encoder.layer``; skips (with a warning)
        any encoder whose layout matches neither.
        """
        if num_layers_to_freeze > 0:
            print(f"Freezing first {num_layers_to_freeze} layers of both encoders.")
            for encoder in [self.title_encoder, self.text_encoder]:
                if hasattr(encoder, 'layers'):
                    num_actual_layers = len(encoder.layers)
                    layers_to_freeze_count = min(num_layers_to_freeze, num_actual_layers)
                    for i in range(layers_to_freeze_count):
                        for param in encoder.layers[i].parameters():
                            param.requires_grad = False
                elif hasattr(encoder, 'encoder') and hasattr(encoder.encoder, 'layer'):  # compatible with other BERT-variant layouts
                    num_actual_layers = len(encoder.encoder.layer)
                    layers_to_freeze_count = min(num_layers_to_freeze, num_actual_layers)
                    for i in range(layers_to_freeze_count):
                        for param in encoder.encoder.layer[i].parameters():
                            param.requires_grad = False
                else:
                    print(f"Warning: Could not find layers attribute typical for freezing in {encoder.__class__.__name__}. Freezing skipped for this encoder.")

    # -- Change: forward signature updated to accept separated title/text inputs --
    def forward(
        self,
        title_input_ids=None,
        title_attention_mask=None,
        title_token_type_ids=None,  # ModernBERT may not need this — TODO confirm
        text_input_ids=None,
        text_attention_mask=None,
        text_token_type_ids=None,  # ModernBERT may not need this — TODO confirm
        position_ids=None,  # usually not passed explicitly
        head_mask=None,  # usually not passed explicitly
        inputs_embeds=None,  # usually not passed explicitly (unless custom embeddings)
        labels=None,  # ordinal labels, expected shape (batch_size, num_ordinal_labels)
        output_attentions=None,
        output_hidden_states=None,
        return_dict=None,
    ):
        """Run both encoders, fuse the first-token features, return ordinal logits.

        Returns a SequenceClassifierOutput (or a tuple when return_dict is
        False) with ``loss`` computed only when ``labels`` is provided.
        """
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict
        # Validate required inputs
        if title_input_ids is None or text_input_ids is None:
            raise ValueError("Both title_input_ids and text_input_ids must be provided.")
        if title_attention_mask is None:
            title_attention_mask = torch.ones_like(title_input_ids)
        if text_attention_mask is None:
            text_attention_mask = torch.ones_like(text_input_ids)
        # --- Encoding (with the two independent encoders) ---
        title_outputs = self.title_encoder(
            input_ids=title_input_ids,
            attention_mask=title_attention_mask,
            token_type_ids=title_token_type_ids,
            # position_ids=position_ids,  # separate processing likely shouldn't share position_ids
            head_mask=head_mask,
            # inputs_embeds=inputs_embeds,  # handled separately
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        # Extract title features (first token of last_hidden_state, i.e. [CLS]-style)
        title_features = title_outputs[0][:, 0]
        text_outputs = self.text_encoder(
            input_ids=text_input_ids,
            attention_mask=text_attention_mask,
            token_type_ids=text_token_type_ids,
            # position_ids=position_ids,
            head_mask=head_mask,
            # inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        # Extract text features (first token of last_hidden_state, i.e. [CLS]-style)
        text_features = text_outputs[0][:, 0]
        # -- End change --
        title_features_dropped = self.title_dropout(title_features)
        text_features_dropped = self.text_dropout(text_features)
        fused_features = self.fusion(title_features_dropped, text_features_dropped)
        logits = self.ordinal_layer(fused_features)
        loss = None
        if labels is not None:
            # -- Change: use the instantiated custom loss --
            # Labels must be float for BCE-with-logits
            loss = self.criterion(logits, labels.float())
            # -- End change --
        # Handle the non-dict return convention
        if not return_dict:
            # For simplicity only the core outputs are returned here. To expose the
            # encoders' hidden states etc., merge them from title_outputs/text_outputs.
            output = (logits,)
            return ((loss,) + output) if loss is not None else output
        # Merging hidden_states and attentions from both encoders is left unimplemented
        merged_hidden_states = None
        merged_attentions = None
        # (optional) add merge logic here, e.g. concatenation or selective return
        return SequenceClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=merged_hidden_states,  # or title_outputs.hidden_states / text_outputs.hidden_states
            attentions=merged_attentions,  # or title_outputs.attentions / text_outputs.attentions
        )
# Register the Config and Model with the Auto classes so that
# AutoModelForSequenceClassification.from_pretrained can find them.
from transformers import AutoConfig, AutoModelForSequenceClassification
AutoConfig.register("dual-modernbert", DualModernBERTConfig)
# NOTE: registered under SequenceClassification because the downstream task is
# classification; adjust/register differently if another Auto class (e.g. AutoModel) is needed.
AutoModelForSequenceClassification.register(DualModernBERTConfig, DualModernBERTModel)