import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import (
    AutoModel,
    AutoConfig,
    AutoTokenizer,
    T5ForConditionalGeneration,
    T5Config,
)
from transformers.utils import WEIGHTS_NAME, SAFE_WEIGHTS_NAME, logging
import os
logger = logging.get_logger(__name__)
class BaseHateSpeechModel(nn.Module):
"""Base class cho tất cả các mô hình hate speech detection"""
def __init__(self, model_name: str, num_labels: int = 3):
super().__init__()
self.num_labels = num_labels
self.model_name = model_name
def forward(self, input_ids, attention_mask, labels=None):
raise NotImplementedError
def load_state_dict(self, state_dict, strict=True):
"""
        Override load_state_dict to bypass transformers' key renaming.
        Loads the state_dict directly into the model without any key mapping.
        """
        # Load directly, without transformers' key renaming
missing_keys, unexpected_keys = super().load_state_dict(state_dict, strict=False)
if missing_keys and strict:
logger.warning(f"Missing keys when loading state_dict: {missing_keys}")
if unexpected_keys:
logger.warning(f"Unexpected keys when loading state_dict: {unexpected_keys}")
return missing_keys, unexpected_keys
@classmethod
def from_pretrained(cls, pretrained_model_name_or_path, *model_args, **kwargs):
"""
        Load a model from a pretrained checkpoint.
        Transformers loads the state_dict automatically after the model is initialized.
        """
        # Extract the config from kwargs (transformers passes the config here)
config = kwargs.pop("config", None)
        # Load the config if not provided
if config is None:
try:
config = AutoConfig.from_pretrained(pretrained_model_name_or_path)
except Exception:
config = {}
        # Get num_labels from the config or kwargs
num_labels = kwargs.pop("num_labels", None)
if num_labels is None:
if hasattr(config, "num_labels"):
num_labels = config.num_labels
elif isinstance(config, dict) and "num_labels" in config:
num_labels = config["num_labels"]
else:
num_labels = 3
        # Get the base model name from the config
base_model_name = None
if hasattr(config, "_name_or_path"):
base_model_name = config._name_or_path
elif isinstance(config, dict) and "_name_or_path" in config:
base_model_name = config["_name_or_path"]
        # Initialize the model with the base model name
if base_model_name:
model = cls(model_name=base_model_name, num_labels=num_labels, **kwargs)
else:
            # Fallback: use the class's default model_name
model = cls(num_labels=num_labels, **kwargs)
return model
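# Usage sketch (illustrative only): from_pretrained() builds the architecture,
# after which the fine-tuned weights are loaded through the load_state_dict()
# override above. The checkpoint directory below is a hypothetical example path,
# and XLMRModel is one of the concrete subclasses defined further down.
def _example_load_checkpoint(checkpoint_dir: str = "./checkpoints/xlm-r-hsd"):
    model = XLMRModel.from_pretrained(checkpoint_dir, num_labels=3)
    state_dict = torch.load(os.path.join(checkpoint_dir, WEIGHTS_NAME), map_location="cpu")
    missing, unexpected = model.load_state_dict(state_dict, strict=False)
    return model, missing, unexpected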
class PhoBERTV2Model(BaseHateSpeechModel):
"""PhoBERT-V2 cho hate speech detection"""
def __init__(self, model_name: str = "vinai/phobert-base-v2", num_labels: int = 3):
super().__init__(model_name, num_labels)
        self.config = AutoConfig.from_pretrained(model_name)
self.encoder = AutoModel.from_pretrained(model_name, config=self.config, ignore_mismatched_sizes=True)
self.dropout = nn.Dropout(0.1)
self.classifier = nn.Linear(self.config.hidden_size, num_labels)
def forward(self, input_ids, attention_mask, labels=None):
outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
pooled_output = outputs.pooler_output
pooled_output = self.dropout(pooled_output)
logits = self.classifier(pooled_output)
loss = None
if labels is not None:
loss_fn = nn.CrossEntropyLoss()
loss = loss_fn(logits, labels)
return {"loss": loss, "logits": logits}
class BartPhoModel(BaseHateSpeechModel):
"""BART Pho cho hate speech detection"""
def __init__(self, model_name: str = "vinai/bartpho-syllable-base", num_labels: int = 3):
super().__init__(model_name, num_labels)
        self.config = AutoConfig.from_pretrained(model_name)
self.encoder = AutoModel.from_pretrained(model_name, config=self.config, ignore_mismatched_sizes=True)
self.dropout = nn.Dropout(0.1)
self.classifier = nn.Linear(self.config.d_model, num_labels)
def forward(self, input_ids, attention_mask, labels=None):
outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
        # Mean-pool the encoder's last hidden states across the sequence
last_hidden_states = outputs.last_hidden_state
pooled_output = last_hidden_states.mean(dim=1) # Mean pooling
pooled_output = self.dropout(pooled_output)
logits = self.classifier(pooled_output)
loss = None
if labels is not None:
loss_fn = nn.CrossEntropyLoss()
loss = loss_fn(logits, labels)
return {"loss": loss, "logits": logits}
class ViSoBERTModel(BaseHateSpeechModel):
"""ViSoBERT cho hate speech detection"""
def __init__(self, model_name: str = "uitnlp/visobert", num_labels: int = 3):
super().__init__(model_name, num_labels)
        self.config = AutoConfig.from_pretrained(model_name)
self.encoder = AutoModel.from_pretrained(model_name, config=self.config, ignore_mismatched_sizes=True)
self.dropout = nn.Dropout(0.1)
self.classifier = nn.Linear(self.config.hidden_size, num_labels)
def forward(self, input_ids, attention_mask, labels=None):
outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
        # Check for pooler_output; otherwise fall back to last_hidden_state
if hasattr(outputs, 'pooler_output') and outputs.pooler_output is not None:
pooled_output = outputs.pooler_output
else:
            # Fallback: mean-pool the last_hidden_state
pooled_output = outputs.last_hidden_state.mean(dim=1)
pooled_output = self.dropout(pooled_output)
logits = self.classifier(pooled_output)
loss = None
if labels is not None:
loss_fn = nn.CrossEntropyLoss()
loss = loss_fn(logits, labels)
return {"loss": loss, "logits": logits}
class PhoBERTV1Model(BaseHateSpeechModel):
"""PhoBERT-V1 cho hate speech detection"""
def __init__(self, model_name: str = "vinai/phobert-base", num_labels: int = 3):
super().__init__(model_name, num_labels)
        self.config = AutoConfig.from_pretrained(model_name)
self.encoder = AutoModel.from_pretrained(model_name, config=self.config, ignore_mismatched_sizes=True)
self.dropout = nn.Dropout(0.1)
self.classifier = nn.Linear(self.config.hidden_size, num_labels)
def forward(self, input_ids, attention_mask, labels=None):
outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
        # Some encoders do not expose pooler_output
if hasattr(outputs, 'pooler_output') and outputs.pooler_output is not None:
pooled_output = outputs.pooler_output
else:
pooled_output = outputs.last_hidden_state.mean(dim=1)
pooled_output = self.dropout(pooled_output)
logits = self.classifier(pooled_output)
loss = None
if labels is not None:
loss_fn = nn.CrossEntropyLoss()
loss = loss_fn(logits, labels)
return {"loss": loss, "logits": logits}
class MBERTModel(BaseHateSpeechModel):
"""mBERT (bert-base-multilingual-cased) cho hate speech detection"""
def __init__(self, model_name: str = "bert-base-multilingual-cased", num_labels: int = 3):
super().__init__(model_name, num_labels)
        self.config = AutoConfig.from_pretrained(model_name)
self.encoder = AutoModel.from_pretrained(model_name, config=self.config, ignore_mismatched_sizes=True)
self.dropout = nn.Dropout(0.1)
self.classifier = nn.Linear(self.config.hidden_size, num_labels)
def forward(self, input_ids, attention_mask, labels=None):
outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
if hasattr(outputs, 'pooler_output') and outputs.pooler_output is not None:
pooled_output = outputs.pooler_output
else:
pooled_output = outputs.last_hidden_state.mean(dim=1)
pooled_output = self.dropout(pooled_output)
logits = self.classifier(pooled_output)
loss = None
if labels is not None:
loss_fn = nn.CrossEntropyLoss()
loss = loss_fn(logits, labels)
return {"loss": loss, "logits": logits}
class SPhoBERTModel(BaseHateSpeechModel):
"""SPhoBERT (biến thể PhoBERT syllable-level) cho hate speech detection"""
def __init__(self, model_name: str = "vinai/phobert-base", num_labels: int = 3):
super().__init__(model_name, num_labels)
        self.config = AutoConfig.from_pretrained(model_name)
self.encoder = AutoModel.from_pretrained(model_name, config=self.config, ignore_mismatched_sizes=True)
self.dropout = nn.Dropout(0.1)
self.classifier = nn.Linear(self.config.hidden_size, num_labels)
def forward(self, input_ids, attention_mask, labels=None):
outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
if hasattr(outputs, 'pooler_output') and outputs.pooler_output is not None:
pooled_output = outputs.pooler_output
else:
pooled_output = outputs.last_hidden_state.mean(dim=1)
pooled_output = self.dropout(pooled_output)
logits = self.classifier(pooled_output)
loss = None
if labels is not None:
loss_fn = nn.CrossEntropyLoss()
loss = loss_fn(logits, labels)
return {"loss": loss, "logits": logits}
class ViHateT5Model(BaseHateSpeechModel):
"""ViHateT5 cho hate speech detection"""
def __init__(self, model_name: str = "VietAI/vit5-base", num_labels: int = 3):
super().__init__(model_name, num_labels)
self.config = T5Config.from_pretrained(model_name)
self.encoder = T5ForConditionalGeneration.from_pretrained(model_name, config=self.config)
self.dropout = nn.Dropout(0.1)
self.classifier = nn.Linear(self.config.d_model, num_labels)
def forward(self, input_ids, attention_mask, labels=None):
outputs = self.encoder.encoder(input_ids=input_ids, attention_mask=attention_mask)
        # Mean-pool the encoder's last hidden states across the sequence
last_hidden_states = outputs.last_hidden_state
pooled_output = last_hidden_states.mean(dim=1) # Mean pooling
pooled_output = self.dropout(pooled_output)
logits = self.classifier(pooled_output)
loss = None
if labels is not None:
loss_fn = nn.CrossEntropyLoss()
loss = loss_fn(logits, labels)
return {"loss": loss, "logits": logits}
class XLMRModel(BaseHateSpeechModel):
"""XLM-R Large cho hate speech detection"""
def __init__(self, model_name: str = "xlm-roberta-large", num_labels: int = 3):
super().__init__(model_name, num_labels)
        self.config = AutoConfig.from_pretrained(model_name)
self.encoder = AutoModel.from_pretrained(model_name, config=self.config, ignore_mismatched_sizes=True)
self.dropout = nn.Dropout(0.1)
self.classifier = nn.Linear(self.config.hidden_size, num_labels)
def forward(self, input_ids, attention_mask, labels=None):
outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
pooled_output = outputs.pooler_output
pooled_output = self.dropout(pooled_output)
logits = self.classifier(pooled_output)
loss = None
if labels is not None:
loss_fn = nn.CrossEntropyLoss()
loss = loss_fn(logits, labels)
return {"loss": loss, "logits": logits}
class RoBERTaGRUModel(BaseHateSpeechModel):
"""RoBERTa + GRU Hybrid model"""
def __init__(self, model_name: str = "vinai/phobert-base-v2", num_labels: int = 3, hidden_size: int = 256):
super().__init__(model_name, num_labels)
        self.config = AutoConfig.from_pretrained(model_name)
self.encoder = AutoModel.from_pretrained(model_name, config=self.config, ignore_mismatched_sizes=True)
self.gru = nn.GRU(
input_size=self.config.hidden_size,
hidden_size=hidden_size,
num_layers=2,
batch_first=True,
dropout=0.1,
bidirectional=True
)
self.dropout = nn.Dropout(0.1)
self.classifier = nn.Linear(hidden_size * 2, num_labels) # *2 for bidirectional
def forward(self, input_ids, attention_mask, labels=None):
outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
hidden_states = outputs.last_hidden_state # [batch_size, seq_len, hidden_size]
# GRU processing
gru_output, _ = self.gru(hidden_states) # [batch_size, seq_len, hidden_size*2]
# Global average pooling
pooled_output = gru_output.mean(dim=1) # [batch_size, hidden_size*2]
pooled_output = self.dropout(pooled_output)
logits = self.classifier(pooled_output)
loss = None
if labels is not None:
loss_fn = nn.CrossEntropyLoss()
loss = loss_fn(logits, labels)
return {"loss": loss, "logits": logits}
class TextCNNModel(BaseHateSpeechModel):
"""TextCNN cho hate speech detection"""
    def __init__(self, vocab_size: int, embedding_dim: int = 128, num_labels: int = 3,
                 num_filters: int = 100, filter_sizes: tuple = (3, 4, 5), dropout: float = 0.5):
super().__init__("textcnn", num_labels)
self.embedding = nn.Embedding(vocab_size, embedding_dim)
self.convs = nn.ModuleList([
nn.Conv2d(1, num_filters, (filter_size, embedding_dim))
for filter_size in filter_sizes
])
self.dropout = nn.Dropout(dropout)
self.classifier = nn.Linear(num_filters * len(filter_sizes), num_labels)
@classmethod
def from_pretrained(cls, pretrained_model_name_or_path, *model_args, **kwargs):
"""Override để detect vocab_size từ state_dict hoặc checkpoint file"""
# Get vocab_size từ kwargs hoặc config
vocab_size = kwargs.pop("vocab_size", None)
config = kwargs.pop("config", None)
        # If vocab_size was not given, try to detect it from a checkpoint file
        if vocab_size is None:
            state_dict = None
            # Try to load the state_dict from a local path to detect vocab_size
if os.path.isdir(pretrained_model_name_or_path):
if os.path.isfile(os.path.join(pretrained_model_name_or_path, SAFE_WEIGHTS_NAME)):
try:
from safetensors.torch import load_file
state_dict = load_file(os.path.join(pretrained_model_name_or_path, SAFE_WEIGHTS_NAME))
except Exception:
pass
elif os.path.isfile(os.path.join(pretrained_model_name_or_path, WEIGHTS_NAME)):
try:
state_dict = torch.load(os.path.join(pretrained_model_name_or_path, WEIGHTS_NAME), map_location="cpu")
except Exception:
pass
            # Detect vocab_size from embedding.weight
if state_dict is not None and "embedding.weight" in state_dict:
vocab_size = state_dict["embedding.weight"].shape[0]
else:
vocab_size = 30000 # Default
# Get num_labels
num_labels = kwargs.pop("num_labels", None)
if num_labels is None:
if config and hasattr(config, "num_labels"):
num_labels = config.num_labels
elif config and isinstance(config, dict) and "num_labels" in config:
num_labels = config["num_labels"]
else:
num_labels = 3
        # Initialize the model
model = cls(vocab_size=vocab_size, num_labels=num_labels, **kwargs)
return model
def forward(self, input_ids, attention_mask, labels=None):
# Embedding
embedded = self.embedding(input_ids) # [batch_size, seq_len, embedding_dim]
# Add channel dimension for Conv2d
embedded = embedded.unsqueeze(1) # [batch_size, 1, seq_len, embedding_dim]
# Convolutional layers
conv_outputs = []
for conv in self.convs:
conv_out = F.relu(conv(embedded)) # [batch_size, num_filters, seq_len', 1]
conv_out = conv_out.squeeze(3) # [batch_size, num_filters, seq_len']
pooled = F.max_pool1d(conv_out, conv_out.size(2)) # [batch_size, num_filters, 1]
pooled = pooled.squeeze(2) # [batch_size, num_filters]
conv_outputs.append(pooled)
# Concatenate all conv outputs
concatenated = torch.cat(conv_outputs, dim=1) # [batch_size, num_filters * len(filter_sizes)]
# Classification
concatenated = self.dropout(concatenated)
logits = self.classifier(concatenated)
loss = None
if labels is not None:
loss_fn = nn.CrossEntropyLoss()
loss = loss_fn(logits, labels)
return {"loss": loss, "logits": logits}
class BiLSTMModel(BaseHateSpeechModel):
"""BiLSTM cho hate speech detection"""
def __init__(self, vocab_size: int, embedding_dim: int = 128, hidden_size: int = 256,
num_labels: int = 3, num_layers: int = 2, dropout: float = 0.5):
super().__init__("bilstm", num_labels)
self.embedding = nn.Embedding(vocab_size, embedding_dim)
self.lstm = nn.LSTM(
input_size=embedding_dim,
hidden_size=hidden_size,
num_layers=num_layers,
batch_first=True,
dropout=dropout if num_layers > 1 else 0,
bidirectional=True
)
self.dropout = nn.Dropout(dropout)
self.classifier = nn.Linear(hidden_size * 2, num_labels) # *2 for bidirectional
@classmethod
def from_pretrained(cls, pretrained_model_name_or_path, *model_args, **kwargs):
"""Override để detect vocab_size từ state_dict hoặc checkpoint file"""
# Get vocab_size từ kwargs hoặc config
vocab_size = kwargs.pop("vocab_size", None)
config = kwargs.pop("config", None)
        # If vocab_size was not given, try to detect it from a checkpoint file
        if vocab_size is None:
            state_dict = None
            # Try to load the state_dict from a local path to detect vocab_size
if os.path.isdir(pretrained_model_name_or_path):
if os.path.isfile(os.path.join(pretrained_model_name_or_path, SAFE_WEIGHTS_NAME)):
try:
from safetensors.torch import load_file
state_dict = load_file(os.path.join(pretrained_model_name_or_path, SAFE_WEIGHTS_NAME))
except Exception:
pass
elif os.path.isfile(os.path.join(pretrained_model_name_or_path, WEIGHTS_NAME)):
try:
state_dict = torch.load(os.path.join(pretrained_model_name_or_path, WEIGHTS_NAME), map_location="cpu")
except Exception:
pass
            # Detect vocab_size from embedding.weight
if state_dict is not None and "embedding.weight" in state_dict:
vocab_size = state_dict["embedding.weight"].shape[0]
else:
vocab_size = 30000 # Default
# Get num_labels
num_labels = kwargs.pop("num_labels", None)
if num_labels is None:
if config and hasattr(config, "num_labels"):
num_labels = config.num_labels
elif config and isinstance(config, dict) and "num_labels" in config:
num_labels = config["num_labels"]
else:
num_labels = 3
        # Initialize the model
model = cls(vocab_size=vocab_size, num_labels=num_labels, **kwargs)
return model
def forward(self, input_ids, attention_mask, labels=None):
# Embedding
embedded = self.embedding(input_ids) # [batch_size, seq_len, embedding_dim]
# BiLSTM
lstm_output, (hidden, cell) = self.lstm(embedded) # [batch_size, seq_len, hidden_size*2]
        # Global average pooling (could instead use max pooling or the last hidden state)
# Option 1: Global average pooling
pooled_output = lstm_output.mean(dim=1) # [batch_size, hidden_size*2]
# Option 2: Last hidden state (uncomment if preferred)
# pooled_output = lstm_output[:, -1, :] # [batch_size, hidden_size*2]
# Option 3: Max pooling (uncomment if preferred)
# pooled_output = torch.max(lstm_output, dim=1)[0] # [batch_size, hidden_size*2]
pooled_output = self.dropout(pooled_output)
logits = self.classifier(pooled_output)
loss = None
if labels is not None:
loss_fn = nn.CrossEntropyLoss()
loss = loss_fn(logits, labels)
return {"loss": loss, "logits": logits}
class EnsembleModel(BaseHateSpeechModel):
"""Ensemble model kết hợp các mô hình deep learning"""
def __init__(self, models: list, num_labels: int = 3, weights: list = None):
super().__init__("ensemble", num_labels)
self.models = nn.ModuleList(models)
self.num_models = len(models)
        weights = weights if weights else [1.0] * self.num_models
        weights = torch.tensor(weights, dtype=torch.float32)
        self.weights = weights / weights.sum()  # Normalize weights
def forward(self, input_ids, attention_mask, labels=None):
all_logits = []
        total_loss = None
for i, model in enumerate(self.models):
model_output = model(input_ids, attention_mask, labels)
all_logits.append(model_output["logits"])
if model_output["loss"] is not None:
total_loss += self.weights[i] * model_output["loss"]
# Weighted average of logits
ensemble_logits = torch.zeros_like(all_logits[0])
for i, logits in enumerate(all_logits):
ensemble_logits += self.weights[i] * logits
return {
"loss": total_loss if total_loss > 0 else None,
"logits": ensemble_logits
}
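# Ensemble usage sketch (hypothetical member models and weights; instantiating
# the members downloads their pretrained encoders):
def _example_build_ensemble():
    members = [PhoBERTV2Model(num_labels=3), ViSoBERTModel(num_labels=3)]
    ensemble = EnsembleModel(models=members, num_labels=3, weights=[0.6, 0.4])
    return ensemble  # forward() returns the weighted average of the members' logits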
def get_model(model_name: str, num_labels: int = 3, **kwargs):
"""
    Factory function that creates a model by name.
    Args:
        model_name: Model name ("phobert-v1", "phobert-v2", "bartpho", "visobert", "vihate-t5",
                    "xlm-r", "mbert", "sphobert", "roberta-gru", "textcnn", "bilstm", "ensemble")
        num_labels: Number of labels (3 for hate speech: CLEAN, OFFENSIVE, HATE)
        **kwargs: Additional model-specific keyword arguments
Returns:
Model instance
"""
model_mapping = {
"phobert-v1": PhoBERTV1Model,
"phobert-v2": PhoBERTV2Model,
"bartpho": BartPhoModel,
"visobert": ViSoBERTModel,
"vihate-t5": ViHateT5Model,
"xlm-r": XLMRModel,
"mbert": MBERTModel,
"sphobert": SPhoBERTModel,
"roberta-gru": RoBERTaGRUModel,
"textcnn": TextCNNModel,
"bilstm": BiLSTMModel,
"ensemble": EnsembleModel
}
if model_name not in model_mapping:
raise ValueError(f"Unknown model: {model_name}. Available models: {list(model_mapping.keys())}")
model_class = model_mapping[model_name]
return model_class(num_labels=num_labels, **kwargs)
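# Factory usage sketch (illustrative; TextCNN is chosen here because it needs no
# pretrained download, only a vocab_size):
if __name__ == "__main__":
    model = get_model("textcnn", num_labels=3, vocab_size=100)
    dummy_ids = torch.randint(0, 100, (2, 20))
    output = model(dummy_ids, attention_mask=torch.ones_like(dummy_ids))
    print(output["logits"].shape)  # torch.Size([2, 3])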