import os

import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import (
    AutoConfig,
    AutoModel,
    T5Config,
    T5ForConditionalGeneration,
)
from transformers.utils import WEIGHTS_NAME, SAFE_WEIGHTS_NAME, logging

logger = logging.get_logger(__name__)


class BaseHateSpeechModel(nn.Module):
    """Base class for all hate speech detection models."""

    def __init__(self, model_name: str, num_labels: int = 3):
        super().__init__()
        self.num_labels = num_labels
        self.model_name = model_name

    def forward(self, input_ids, attention_mask, labels=None):
        raise NotImplementedError

    @classmethod
    def from_pretrained(cls, pretrained_model_name_or_path, *model_args, **kwargs):
        """
        Load the model from a pretrained checkpoint.

        This only builds the architecture; the surrounding Transformers
        pipeline is expected to load the state_dict after the model is
        initialized.
        """
        config = kwargs.pop("config", None)

        if config is None:
            try:
                config = AutoConfig.from_pretrained(pretrained_model_name_or_path)
            except Exception:
                config = {}

        num_labels = kwargs.pop("num_labels", None)
        if num_labels is None:
            if hasattr(config, "num_labels"):
                num_labels = config.num_labels
            elif isinstance(config, dict) and "num_labels" in config:
                num_labels = config["num_labels"]
            else:
                num_labels = 3

        # Recover the base encoder name recorded in the checkpoint config, if any.
        base_model_name = None
        if hasattr(config, "_name_or_path"):
            base_model_name = config._name_or_path
        elif isinstance(config, dict) and "_name_or_path" in config:
            base_model_name = config["_name_or_path"]

        if base_model_name:
            model = cls(model_name=base_model_name, num_labels=num_labels, **kwargs)
        else:
            # Fall back to the subclass's default model_name.
            model = cls(num_labels=num_labels, **kwargs)

        return model
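

# Reload sketch (a minimal example; the checkpoint directory is hypothetical):
# because from_pretrained above only rebuilds the architecture, a caller that
# wants the fine-tuned weights loads the state_dict itself, e.g.:
#
#   model = PhoBERTV2Model.from_pretrained("checkpoints/phobert-v2-run1")
#   state_dict = torch.load(
#       os.path.join("checkpoints/phobert-v2-run1", WEIGHTS_NAME),
#       map_location="cpu",
#   )
#   model.load_state_dict(state_dict)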


class PhoBERTV2Model(BaseHateSpeechModel):
    """PhoBERT-v2 for hate speech detection."""

    def __init__(self, model_name: str = "vinai/phobert-base-v2", num_labels: int = 3):
        super().__init__(model_name, num_labels)
        self.config = AutoConfig.from_pretrained(model_name)
        self.encoder = AutoModel.from_pretrained(
            model_name, config=self.config, ignore_mismatched_sizes=True
        )
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(self.config.hidden_size, num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
        pooled_output = outputs.pooler_output
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits, labels)
        return {"loss": loss, "logits": logits}
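

# Inference sketch (assumes the vinai/phobert-base-v2 tokenizer; PhoBERT also
# expects word-segmented input, which is omitted here for brevity):
#
#   from transformers import AutoTokenizer
#   tokenizer = AutoTokenizer.from_pretrained("vinai/phobert-base-v2")
#   model = PhoBERTV2Model()
#   batch = tokenizer(["xin chào"], return_tensors="pt", padding=True)
#   out = model(batch["input_ids"], batch["attention_mask"])
#   pred = out["logits"].argmax(dim=-1)  # 0=CLEAN, 1=OFFENSIVE, 2=HATE (assumed label order)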


class BartPhoModel(BaseHateSpeechModel):
    """BARTpho for hate speech detection."""

    def __init__(self, model_name: str = "vinai/bartpho-syllable-base", num_labels: int = 3):
        super().__init__(model_name, num_labels)
        self.config = AutoConfig.from_pretrained(model_name)
        self.encoder = AutoModel.from_pretrained(
            model_name, config=self.config, ignore_mismatched_sizes=True
        )
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(self.config.d_model, num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)

        # BART has no pooler; mean-pool the last hidden states instead.
        # Note that padding positions are included in the mean.
        last_hidden_states = outputs.last_hidden_state
        pooled_output = last_hidden_states.mean(dim=1)
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits, labels)
        return {"loss": loss, "logits": logits}


class ViSoBERTModel(BaseHateSpeechModel):
    """ViSoBERT for hate speech detection."""

    def __init__(self, model_name: str = "uitnlp/visobert", num_labels: int = 3):
        super().__init__(model_name, num_labels)
        self.config = AutoConfig.from_pretrained(model_name)
        self.encoder = AutoModel.from_pretrained(
            model_name, config=self.config, ignore_mismatched_sizes=True
        )
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(self.config.hidden_size, num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)

        # Prefer the pooler output when the backbone provides one; otherwise
        # fall back to mean pooling over the last hidden states.
        if hasattr(outputs, "pooler_output") and outputs.pooler_output is not None:
            pooled_output = outputs.pooler_output
        else:
            pooled_output = outputs.last_hidden_state.mean(dim=1)

        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits, labels)
        return {"loss": loss, "logits": logits}


class PhoBERTV1Model(BaseHateSpeechModel):
    """PhoBERT-v1 for hate speech detection."""

    def __init__(self, model_name: str = "vinai/phobert-base", num_labels: int = 3):
        super().__init__(model_name, num_labels)
        self.config = AutoConfig.from_pretrained(model_name)
        self.encoder = AutoModel.from_pretrained(
            model_name, config=self.config, ignore_mismatched_sizes=True
        )
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(self.config.hidden_size, num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)

        if hasattr(outputs, "pooler_output") and outputs.pooler_output is not None:
            pooled_output = outputs.pooler_output
        else:
            pooled_output = outputs.last_hidden_state.mean(dim=1)

        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits, labels)
        return {"loss": loss, "logits": logits}


class MBERTModel(BaseHateSpeechModel):
    """mBERT (bert-base-multilingual-cased) for hate speech detection."""

    def __init__(self, model_name: str = "bert-base-multilingual-cased", num_labels: int = 3):
        super().__init__(model_name, num_labels)
        self.config = AutoConfig.from_pretrained(model_name)
        self.encoder = AutoModel.from_pretrained(
            model_name, config=self.config, ignore_mismatched_sizes=True
        )
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(self.config.hidden_size, num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)

        if hasattr(outputs, "pooler_output") and outputs.pooler_output is not None:
            pooled_output = outputs.pooler_output
        else:
            pooled_output = outputs.last_hidden_state.mean(dim=1)

        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits, labels)
        return {"loss": loss, "logits": logits}


class SPhoBERTModel(BaseHateSpeechModel):
    """SPhoBERT (a syllable-level PhoBERT variant) for hate speech detection."""

    def __init__(self, model_name: str = "vinai/phobert-base", num_labels: int = 3):
        super().__init__(model_name, num_labels)
        self.config = AutoConfig.from_pretrained(model_name)
        self.encoder = AutoModel.from_pretrained(
            model_name, config=self.config, ignore_mismatched_sizes=True
        )
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(self.config.hidden_size, num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)

        if hasattr(outputs, "pooler_output") and outputs.pooler_output is not None:
            pooled_output = outputs.pooler_output
        else:
            pooled_output = outputs.last_hidden_state.mean(dim=1)

        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits, labels)
        return {"loss": loss, "logits": logits}


class ViHateT5Model(BaseHateSpeechModel):
    """ViHateT5 for hate speech detection (initialized from a ViT5 backbone by default)."""

    def __init__(self, model_name: str = "VietAI/vit5-base", num_labels: int = 3):
        super().__init__(model_name, num_labels)
        self.config = T5Config.from_pretrained(model_name)
        self.encoder = T5ForConditionalGeneration.from_pretrained(model_name, config=self.config)
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(self.config.d_model, num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        # Run only the T5 encoder stack; the decoder is not needed for
        # sequence classification.
        outputs = self.encoder.encoder(input_ids=input_ids, attention_mask=attention_mask)

        # Mean-pool the encoder states (padding positions are included).
        last_hidden_states = outputs.last_hidden_state
        pooled_output = last_hidden_states.mean(dim=1)
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits, labels)
        return {"loss": loss, "logits": logits}
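

# Note: only the T5 encoder stack is used in forward(), but
# T5ForConditionalGeneration still instantiates (and saves) the decoder and
# LM head. A lighter alternative, sketched here under the assumption that the
# decoder weights are never needed, would load just the encoder:
#
#   from transformers import T5EncoderModel
#   encoder = T5EncoderModel.from_pretrained("VietAI/vit5-base")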


class XLMRModel(BaseHateSpeechModel):
    """XLM-R Large for hate speech detection."""

    def __init__(self, model_name: str = "xlm-roberta-large", num_labels: int = 3):
        super().__init__(model_name, num_labels)
        self.config = AutoConfig.from_pretrained(model_name)
        self.encoder = AutoModel.from_pretrained(
            model_name, config=self.config, ignore_mismatched_sizes=True
        )
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(self.config.hidden_size, num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
        pooled_output = outputs.pooler_output
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits, labels)
        return {"loss": loss, "logits": logits}


class RoBERTaGRUModel(BaseHateSpeechModel):
    """RoBERTa + GRU hybrid model."""

    def __init__(self, model_name: str = "vinai/phobert-base-v2", num_labels: int = 3, hidden_size: int = 256):
        super().__init__(model_name, num_labels)
        self.config = AutoConfig.from_pretrained(model_name)
        self.encoder = AutoModel.from_pretrained(
            model_name, config=self.config, ignore_mismatched_sizes=True
        )
        self.gru = nn.GRU(
            input_size=self.config.hidden_size,
            hidden_size=hidden_size,
            num_layers=2,
            batch_first=True,
            dropout=0.1,
            bidirectional=True,
        )
        self.dropout = nn.Dropout(0.1)
        # Bidirectional: forward and backward states are concatenated.
        self.classifier = nn.Linear(hidden_size * 2, num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
        hidden_states = outputs.last_hidden_state

        # Feed the encoder states through the bidirectional GRU, then
        # mean-pool over the time dimension.
        gru_output, _ = self.gru(hidden_states)
        pooled_output = gru_output.mean(dim=1)
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits, labels)
        return {"loss": loss, "logits": logits}
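

# Shape walkthrough for the hybrid head (batch B, sequence length T, default
# hidden_size=256):
#   encoder last_hidden_state : (B, T, config.hidden_size)
#   bidirectional GRU output  : (B, T, 2 * 256)
#   mean over T               : (B, 2 * 256)
#   classifier                : (B, num_labels)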


class TextCNNModel(BaseHateSpeechModel):
    """TextCNN for hate speech detection."""

    def __init__(self, vocab_size: int, embedding_dim: int = 128, num_labels: int = 3,
                 num_filters: int = 100, filter_sizes: list = None, dropout: float = 0.5):
        super().__init__("textcnn", num_labels)
        if filter_sizes is None:  # avoid a mutable default argument
            filter_sizes = [3, 4, 5]
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.convs = nn.ModuleList([
            nn.Conv2d(1, num_filters, (filter_size, embedding_dim))
            for filter_size in filter_sizes
        ])
        self.dropout = nn.Dropout(dropout)
        self.classifier = nn.Linear(num_filters * len(filter_sizes), num_labels)

    @classmethod
    def from_pretrained(cls, pretrained_model_name_or_path, *model_args, **kwargs):
        """Override to detect vocab_size from the checkpoint's state_dict."""
        vocab_size = kwargs.pop("vocab_size", None)
        config = kwargs.pop("config", None)

        if vocab_size is None:
            state_dict = None

            if os.path.isdir(pretrained_model_name_or_path):
                if os.path.isfile(os.path.join(pretrained_model_name_or_path, SAFE_WEIGHTS_NAME)):
                    try:
                        from safetensors.torch import load_file
                        state_dict = load_file(os.path.join(pretrained_model_name_or_path, SAFE_WEIGHTS_NAME))
                    except Exception:
                        pass
                elif os.path.isfile(os.path.join(pretrained_model_name_or_path, WEIGHTS_NAME)):
                    try:
                        state_dict = torch.load(
                            os.path.join(pretrained_model_name_or_path, WEIGHTS_NAME),
                            map_location="cpu",
                        )
                    except Exception:
                        pass

            # The embedding matrix's first dimension is the vocabulary size.
            if state_dict is not None and "embedding.weight" in state_dict:
                vocab_size = state_dict["embedding.weight"].shape[0]
            else:
                vocab_size = 30000  # fallback default

        num_labels = kwargs.pop("num_labels", None)
        if num_labels is None:
            if config is not None and hasattr(config, "num_labels"):
                num_labels = config.num_labels
            elif isinstance(config, dict) and "num_labels" in config:
                num_labels = config["num_labels"]
            else:
                num_labels = 3

        model = cls(vocab_size=vocab_size, num_labels=num_labels, **kwargs)
        return model

    def forward(self, input_ids, attention_mask, labels=None):
        # attention_mask is accepted for interface compatibility but not used.
        embedded = self.embedding(input_ids)   # (B, T, embedding_dim)
        embedded = embedded.unsqueeze(1)       # (B, 1, T, embedding_dim): channel dim for Conv2d

        conv_outputs = []
        for conv in self.convs:
            conv_out = F.relu(conv(embedded))  # (B, num_filters, T - k + 1, 1)
            conv_out = conv_out.squeeze(3)     # (B, num_filters, T - k + 1)
            pooled = F.max_pool1d(conv_out, conv_out.size(2))  # max over time
            pooled = pooled.squeeze(2)         # (B, num_filters)
            conv_outputs.append(pooled)

        # Concatenate the pooled features from all filter sizes.
        concatenated = torch.cat(conv_outputs, dim=1)
        concatenated = self.dropout(concatenated)
        logits = self.classifier(concatenated)

        loss = None
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits, labels)
        return {"loss": loss, "logits": logits}
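

# Reload sketch (hypothetical checkpoint directory): vocab_size does not need
# to be passed, because from_pretrained reads it back from the first dimension
# of embedding.weight in the saved state_dict:
#
#   model = TextCNNModel.from_pretrained("checkpoints/textcnn-run1")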


class BiLSTMModel(BaseHateSpeechModel):
    """BiLSTM for hate speech detection."""

    def __init__(self, vocab_size: int, embedding_dim: int = 128, hidden_size: int = 256,
                 num_labels: int = 3, num_layers: int = 2, dropout: float = 0.5):
        super().__init__("bilstm", num_labels)
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
            bidirectional=True,
        )
        self.dropout = nn.Dropout(dropout)
        self.classifier = nn.Linear(hidden_size * 2, num_labels)

    @classmethod
    def from_pretrained(cls, pretrained_model_name_or_path, *model_args, **kwargs):
        """Override to detect vocab_size from the checkpoint's state_dict."""
        vocab_size = kwargs.pop("vocab_size", None)
        config = kwargs.pop("config", None)

        if vocab_size is None:
            state_dict = None

            if os.path.isdir(pretrained_model_name_or_path):
                if os.path.isfile(os.path.join(pretrained_model_name_or_path, SAFE_WEIGHTS_NAME)):
                    try:
                        from safetensors.torch import load_file
                        state_dict = load_file(os.path.join(pretrained_model_name_or_path, SAFE_WEIGHTS_NAME))
                    except Exception:
                        pass
                elif os.path.isfile(os.path.join(pretrained_model_name_or_path, WEIGHTS_NAME)):
                    try:
                        state_dict = torch.load(
                            os.path.join(pretrained_model_name_or_path, WEIGHTS_NAME),
                            map_location="cpu",
                        )
                    except Exception:
                        pass

            # The embedding matrix's first dimension is the vocabulary size.
            if state_dict is not None and "embedding.weight" in state_dict:
                vocab_size = state_dict["embedding.weight"].shape[0]
            else:
                vocab_size = 30000  # fallback default

        num_labels = kwargs.pop("num_labels", None)
        if num_labels is None:
            if config is not None and hasattr(config, "num_labels"):
                num_labels = config.num_labels
            elif isinstance(config, dict) and "num_labels" in config:
                num_labels = config["num_labels"]
            else:
                num_labels = 3

        model = cls(vocab_size=vocab_size, num_labels=num_labels, **kwargs)
        return model

    def forward(self, input_ids, attention_mask, labels=None):
        # attention_mask is accepted for interface compatibility but not used.
        embedded = self.embedding(input_ids)               # (B, T, embedding_dim)
        lstm_output, (hidden, cell) = self.lstm(embedded)  # (B, T, 2 * hidden_size)

        # Mean-pool the BiLSTM outputs over the time dimension (padding
        # positions are included in the mean).
        pooled_output = lstm_output.mean(dim=1)
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits, labels)
        return {"loss": loss, "logits": logits}


class EnsembleModel(BaseHateSpeechModel):
    """Ensemble model combining several deep learning models."""

    def __init__(self, models: list, num_labels: int = 3, weights: list = None):
        super().__init__("ensemble", num_labels)
        self.models = nn.ModuleList(models)
        self.num_models = len(models)
        weights = weights if weights else [1.0] * self.num_models
        weights = torch.tensor(weights, dtype=torch.float32)
        # Register as a non-persistent buffer so the weights follow the module
        # across .to(device) calls without entering the state_dict.
        self.register_buffer("weights", weights / weights.sum(), persistent=False)

    def forward(self, input_ids, attention_mask, labels=None):
        all_logits = []
        loss = None

        for i, model in enumerate(self.models):
            model_output = model(input_ids, attention_mask, labels)
            all_logits.append(model_output["logits"])

            if model_output["loss"] is not None:
                weighted_loss = self.weights[i] * model_output["loss"]
                loss = weighted_loss if loss is None else loss + weighted_loss

        # Weighted average of the member logits.
        ensemble_logits = torch.zeros_like(all_logits[0])
        for i, logits in enumerate(all_logits):
            ensemble_logits += self.weights[i] * logits

        return {
            "loss": loss,
            "logits": ensemble_logits,
        }
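

# Usage sketch (a minimal example; assumes the member models share one
# tokenizer, since the ensemble feeds the same input_ids to every member):
#
#   members = [PhoBERTV2Model(), PhoBERTV1Model()]
#   ensemble = EnsembleModel(members, num_labels=3, weights=[0.6, 0.4])
#   out = ensemble(batch["input_ids"], batch["attention_mask"])
#
# Mixing models with different tokenizers (e.g. PhoBERT and XLM-R) would
# require per-model inputs, which this class does not support.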


def get_model(model_name: str, num_labels: int = 3, **kwargs):
    """
    Factory function that builds a model from its name.

    Args:
        model_name: Model name ("phobert-v1", "phobert-v2", "bartpho",
            "visobert", "vihate-t5", "xlm-r", "mbert", "sphobert",
            "roberta-gru", "textcnn", "bilstm", "ensemble")
        num_labels: Number of labels (3 for hate speech: CLEAN, OFFENSIVE, HATE)
        **kwargs: Extra model-specific arguments ("textcnn" and "bilstm"
            require vocab_size; "ensemble" requires models)

    Returns:
        Model instance
    """
    model_mapping = {
        "phobert-v1": PhoBERTV1Model,
        "phobert-v2": PhoBERTV2Model,
        "bartpho": BartPhoModel,
        "visobert": ViSoBERTModel,
        "vihate-t5": ViHateT5Model,
        "xlm-r": XLMRModel,
        "mbert": MBERTModel,
        "sphobert": SPhoBERTModel,
        "roberta-gru": RoBERTaGRUModel,
        "textcnn": TextCNNModel,
        "bilstm": BiLSTMModel,
        "ensemble": EnsembleModel,
    }

    if model_name not in model_mapping:
        raise ValueError(f"Unknown model: {model_name}. Available models: {list(model_mapping.keys())}")

    model_class = model_mapping[model_name]
    return model_class(num_labels=num_labels, **kwargs)
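

# Quick smoke test (a minimal sketch, not part of the library surface): build
# the one architecture that needs no pretrained download and run a dummy batch
# through it.
if __name__ == "__main__":
    dummy_ids = torch.randint(0, 100, (2, 16))  # (batch, seq_len)
    dummy_mask = torch.ones_like(dummy_ids)

    model = get_model("textcnn", vocab_size=30000)
    out = model(dummy_ids, dummy_mask)
    print("textcnn logits:", out["logits"].shape)  # expected: torch.Size([2, 3])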