import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import AutoModel, AutoConfig, T5ForConditionalGeneration, T5Config
from transformers.utils import WEIGHTS_NAME, SAFE_WEIGHTS_NAME, logging
import os

logger = logging.get_logger(__name__)


class BaseHateSpeechModel(nn.Module):
    """Base class for all hate speech detection models."""

    def __init__(self, model_name: str, num_labels: int = 3):
        super().__init__()
        self.num_labels = num_labels
        self.model_name = model_name

    def forward(self, input_ids, attention_mask, labels=None):
        raise NotImplementedError

    def load_state_dict(self, state_dict, strict=True):
        """
        Override load_state_dict to bypass transformers' key renaming.
        Loads the state_dict directly into the model without any key mapping.
        """
        # Load directly, skipping transformers' key renaming
        missing_keys, unexpected_keys = super().load_state_dict(state_dict, strict=False)
        if missing_keys and strict:
            logger.warning(f"Missing keys when loading state_dict: {missing_keys}")
        if unexpected_keys:
            logger.warning(f"Unexpected keys when loading state_dict: {unexpected_keys}")
        return missing_keys, unexpected_keys

    @classmethod
    def from_pretrained(cls, pretrained_model_name_or_path, *model_args, **kwargs):
        """
        Load the model from a pretrained checkpoint.
        Transformers loads the state_dict automatically after the model is instantiated.
        """
        # Extract config from kwargs (transformers passes the config in here)
        config = kwargs.pop("config", None)

        # Load the config if it was not provided
        if config is None:
            try:
                config = AutoConfig.from_pretrained(pretrained_model_name_or_path)
            except Exception:
                config = {}

        # Get num_labels from the config or kwargs
        num_labels = kwargs.pop("num_labels", None)
        if num_labels is None:
            if hasattr(config, "num_labels"):
                num_labels = config.num_labels
            elif isinstance(config, dict) and "num_labels" in config:
                num_labels = config["num_labels"]
            else:
                num_labels = 3

        # Get the base model name from the config
        base_model_name = None
        if hasattr(config, "_name_or_path"):
            base_model_name = config._name_or_path
        elif isinstance(config, dict) and "_name_or_path" in config:
            base_model_name = config["_name_or_path"]

        # Instantiate the model with the base model name
        if base_model_name:
            model = cls(model_name=base_model_name, num_labels=num_labels, **kwargs)
        else:
            # Fallback: use the class's default model_name
            model = cls(num_labels=num_labels, **kwargs)
        return model


class PhoBERTV2Model(BaseHateSpeechModel):
    """PhoBERT-V2 for hate speech detection."""

    def __init__(self, model_name: str = "vinai/phobert-base-v2", num_labels: int = 3):
        super().__init__(model_name, num_labels)
        self.config = AutoConfig.from_pretrained(model_name)
        self.encoder = AutoModel.from_pretrained(model_name, config=self.config, ignore_mismatched_sizes=True)
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(self.config.hidden_size, num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
        pooled_output = outputs.pooler_output
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits, labels)
        return {"loss": loss, "logits": logits}
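# Quick usage sketch for the encoder + linear-head pattern above (a minimal
# example, not part of the training pipeline; assumes the public
# "vinai/phobert-base-v2" checkpoint is reachable):
#
#   from transformers import AutoTokenizer
#   tokenizer = AutoTokenizer.from_pretrained("vinai/phobert-base-v2")
#   model = PhoBERTV2Model(num_labels=3)
#   batch = tokenizer(["câu ví dụ"], return_tensors="pt", padding=True, truncation=True)
#   out = model(batch["input_ids"], batch["attention_mask"])
#   assert out["logits"].shape == (1, 3)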
class BartPhoModel(BaseHateSpeechModel):
    """BARTpho for hate speech detection."""

    def __init__(self, model_name: str = "vinai/bartpho-syllable-base", num_labels: int = 3):
        super().__init__(model_name, num_labels)
        self.config = AutoConfig.from_pretrained(model_name)
        self.encoder = AutoModel.from_pretrained(model_name, config=self.config, ignore_mismatched_sizes=True)
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(self.config.d_model, num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
        # Mean-pool the last hidden states over the sequence dimension
        last_hidden_states = outputs.last_hidden_state
        pooled_output = last_hidden_states.mean(dim=1)
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits, labels)
        return {"loss": loss, "logits": logits}


class ViSoBERTModel(BaseHateSpeechModel):
    """ViSoBERT for hate speech detection."""

    def __init__(self, model_name: str = "uitnlp/visobert", num_labels: int = 3):
        super().__init__(model_name, num_labels)
        self.config = AutoConfig.from_pretrained(model_name)
        self.encoder = AutoModel.from_pretrained(model_name, config=self.config, ignore_mismatched_sizes=True)
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(self.config.hidden_size, num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
        # Use pooler_output if available, otherwise fall back to last_hidden_state
        if hasattr(outputs, "pooler_output") and outputs.pooler_output is not None:
            pooled_output = outputs.pooler_output
        else:
            # Fallback: mean-pool last_hidden_state
            pooled_output = outputs.last_hidden_state.mean(dim=1)
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits, labels)
        return {"loss": loss, "logits": logits}


class PhoBERTV1Model(BaseHateSpeechModel):
    """PhoBERT-V1 for hate speech detection."""

    def __init__(self, model_name: str = "vinai/phobert-base", num_labels: int = 3):
        super().__init__(model_name, num_labels)
        self.config = AutoConfig.from_pretrained(model_name)
        self.encoder = AutoModel.from_pretrained(model_name, config=self.config, ignore_mismatched_sizes=True)
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(self.config.hidden_size, num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
        # Some encoders do not expose pooler_output
        if hasattr(outputs, "pooler_output") and outputs.pooler_output is not None:
            pooled_output = outputs.pooler_output
        else:
            pooled_output = outputs.last_hidden_state.mean(dim=1)
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits, labels)
        return {"loss": loss, "logits": logits}
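# Note: the mean pooling used above averages over padding positions as well.
# A mask-aware variant (a sketch only, not wired into the models above) would
# weight the average by the attention mask:
#
#   def masked_mean_pool(last_hidden_state, attention_mask):
#       # Zero out padded positions, then divide by the number of real tokens.
#       mask = attention_mask.unsqueeze(-1).float()     # [B, T, 1]
#       summed = (last_hidden_state * mask).sum(dim=1)  # [B, H]
#       counts = mask.sum(dim=1).clamp(min=1e-9)        # [B, 1]
#       return summed / counts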
class MBERTModel(BaseHateSpeechModel):
    """mBERT (bert-base-multilingual-cased) for hate speech detection."""

    def __init__(self, model_name: str = "bert-base-multilingual-cased", num_labels: int = 3):
        super().__init__(model_name, num_labels)
        self.config = AutoConfig.from_pretrained(model_name)
        self.encoder = AutoModel.from_pretrained(model_name, config=self.config, ignore_mismatched_sizes=True)
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(self.config.hidden_size, num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
        if hasattr(outputs, "pooler_output") and outputs.pooler_output is not None:
            pooled_output = outputs.pooler_output
        else:
            pooled_output = outputs.last_hidden_state.mean(dim=1)
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits, labels)
        return {"loss": loss, "logits": logits}


class SPhoBERTModel(BaseHateSpeechModel):
    """SPhoBERT (syllable-level PhoBERT variant) for hate speech detection."""

    def __init__(self, model_name: str = "vinai/phobert-base", num_labels: int = 3):
        super().__init__(model_name, num_labels)
        self.config = AutoConfig.from_pretrained(model_name)
        self.encoder = AutoModel.from_pretrained(model_name, config=self.config, ignore_mismatched_sizes=True)
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(self.config.hidden_size, num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
        if hasattr(outputs, "pooler_output") and outputs.pooler_output is not None:
            pooled_output = outputs.pooler_output
        else:
            pooled_output = outputs.last_hidden_state.mean(dim=1)
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits, labels)
        return {"loss": loss, "logits": logits}


class ViHateT5Model(BaseHateSpeechModel):
    """ViHateT5 for hate speech detection."""

    def __init__(self, model_name: str = "VietAI/vit5-base", num_labels: int = 3):
        super().__init__(model_name, num_labels)
        self.config = T5Config.from_pretrained(model_name)
        self.encoder = T5ForConditionalGeneration.from_pretrained(model_name, config=self.config)
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(self.config.d_model, num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        # Run only the T5 encoder, then mean-pool its last hidden states
        outputs = self.encoder.encoder(input_ids=input_ids, attention_mask=attention_mask)
        last_hidden_states = outputs.last_hidden_state
        pooled_output = last_hidden_states.mean(dim=1)
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits, labels)
        return {"loss": loss, "logits": logits}


class XLMRModel(BaseHateSpeechModel):
    """XLM-R Large for hate speech detection."""

    def __init__(self, model_name: str = "xlm-roberta-large", num_labels: int = 3):
        super().__init__(model_name, num_labels)
        self.config = AutoConfig.from_pretrained(model_name)
        self.encoder = AutoModel.from_pretrained(model_name, config=self.config, ignore_mismatched_sizes=True)
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(self.config.hidden_size, num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
        pooled_output = outputs.pooler_output
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits, labels)
        return {"loss": loss, "logits": logits}
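# Design note: ViHateT5Model loads the full T5ForConditionalGeneration but only
# ever calls its encoder, so the decoder weights sit unused in memory. If that
# matters, transformers provides an encoder-only class (a possible swap, not
# necessarily what the checkpoint above was trained with):
#
#   from transformers import T5EncoderModel
#   encoder = T5EncoderModel.from_pretrained("VietAI/vit5-base")
#   hidden = encoder(input_ids=input_ids, attention_mask=attention_mask).last_hidden_state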
class RoBERTaGRUModel(BaseHateSpeechModel):
    """RoBERTa + GRU hybrid model."""

    def __init__(self, model_name: str = "vinai/phobert-base-v2", num_labels: int = 3, hidden_size: int = 256):
        super().__init__(model_name, num_labels)
        self.config = AutoConfig.from_pretrained(model_name)
        self.encoder = AutoModel.from_pretrained(model_name, config=self.config, ignore_mismatched_sizes=True)
        self.gru = nn.GRU(
            input_size=self.config.hidden_size,
            hidden_size=hidden_size,
            num_layers=2,
            batch_first=True,
            dropout=0.1,
            bidirectional=True,
        )
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(hidden_size * 2, num_labels)  # *2 for bidirectional

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
        hidden_states = outputs.last_hidden_state  # [batch_size, seq_len, hidden_size]

        # GRU processing
        gru_output, _ = self.gru(hidden_states)  # [batch_size, seq_len, hidden_size*2]

        # Global average pooling
        pooled_output = gru_output.mean(dim=1)  # [batch_size, hidden_size*2]
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits, labels)
        return {"loss": loss, "logits": logits}


class TextCNNModel(BaseHateSpeechModel):
    """TextCNN for hate speech detection."""

    def __init__(self, vocab_size: int, embedding_dim: int = 128, num_labels: int = 3,
                 num_filters: int = 100, filter_sizes: tuple = (3, 4, 5), dropout: float = 0.5):
        super().__init__("textcnn", num_labels)
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.convs = nn.ModuleList([
            nn.Conv2d(1, num_filters, (filter_size, embedding_dim))
            for filter_size in filter_sizes
        ])
        self.dropout = nn.Dropout(dropout)
        self.classifier = nn.Linear(num_filters * len(filter_sizes), num_labels)

    @classmethod
    def from_pretrained(cls, pretrained_model_name_or_path, *model_args, **kwargs):
        """Override to detect vocab_size from the state_dict or checkpoint file."""
        # Get vocab_size from kwargs or the config
        vocab_size = kwargs.pop("vocab_size", None)
        config = kwargs.pop("config", None)

        # If vocab_size is still unknown, try to detect it from the checkpoint file
        if vocab_size is None:
            state_dict = None
            # Try to load the state_dict from a local path to detect vocab_size
            if os.path.isdir(pretrained_model_name_or_path):
                if os.path.isfile(os.path.join(pretrained_model_name_or_path, SAFE_WEIGHTS_NAME)):
                    try:
                        from safetensors.torch import load_file
                        state_dict = load_file(os.path.join(pretrained_model_name_or_path, SAFE_WEIGHTS_NAME))
                    except Exception:
                        pass
                elif os.path.isfile(os.path.join(pretrained_model_name_or_path, WEIGHTS_NAME)):
                    try:
                        state_dict = torch.load(os.path.join(pretrained_model_name_or_path, WEIGHTS_NAME), map_location="cpu")
                    except Exception:
                        pass
            # Detect vocab_size from embedding.weight
            if state_dict is not None and "embedding.weight" in state_dict:
                vocab_size = state_dict["embedding.weight"].shape[0]
            else:
                vocab_size = 30000  # Default

        # Get num_labels
        num_labels = kwargs.pop("num_labels", None)
        if num_labels is None:
            if config is not None and hasattr(config, "num_labels"):
                num_labels = config.num_labels
            elif isinstance(config, dict) and "num_labels" in config:
                num_labels = config["num_labels"]
            else:
                num_labels = 3

        # Instantiate the model
        return cls(vocab_size=vocab_size, num_labels=num_labels, **kwargs)

    def forward(self, input_ids, attention_mask, labels=None):
        # Embedding
        embedded = self.embedding(input_ids)  # [batch_size, seq_len, embedding_dim]
        # Add a channel dimension for Conv2d
        embedded = embedded.unsqueeze(1)  # [batch_size, 1, seq_len, embedding_dim]

        # Convolutional layers
        conv_outputs = []
        for conv in self.convs:
            conv_out = F.relu(conv(embedded))  # [batch_size, num_filters, seq_len', 1]
            conv_out = conv_out.squeeze(3)  # [batch_size, num_filters, seq_len']
            pooled = F.max_pool1d(conv_out, conv_out.size(2))  # [batch_size, num_filters, 1]
            pooled = pooled.squeeze(2)  # [batch_size, num_filters]
            conv_outputs.append(pooled)

        # Concatenate all conv outputs
        concatenated = torch.cat(conv_outputs, dim=1)  # [batch_size, num_filters * len(filter_sizes)]

        # Classification
        concatenated = self.dropout(concatenated)
        logits = self.classifier(concatenated)

        loss = None
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits, labels)
        return {"loss": loss, "logits": logits}
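# Usage sketch for the vocab_size auto-detection above (the checkpoint path is
# hypothetical; the directory is assumed to contain model.safetensors or
# pytorch_model.bin with an "embedding.weight" tensor):
#
#   model = TextCNNModel.from_pretrained("./checkpoints/textcnn")  # vocab_size read from embedding.weight
#   model = TextCNNModel(vocab_size=20000, num_labels=3)           # or construct directly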
class BiLSTMModel(BaseHateSpeechModel):
    """BiLSTM for hate speech detection."""

    def __init__(self, vocab_size: int, embedding_dim: int = 128, hidden_size: int = 256,
                 num_labels: int = 3, num_layers: int = 2, dropout: float = 0.5):
        super().__init__("bilstm", num_labels)
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
            bidirectional=True,
        )
        self.dropout = nn.Dropout(dropout)
        self.classifier = nn.Linear(hidden_size * 2, num_labels)  # *2 for bidirectional

    @classmethod
    def from_pretrained(cls, pretrained_model_name_or_path, *model_args, **kwargs):
        """Override to detect vocab_size from the state_dict or checkpoint file."""
        # Get vocab_size from kwargs or the config
        vocab_size = kwargs.pop("vocab_size", None)
        config = kwargs.pop("config", None)

        # If vocab_size is still unknown, try to detect it from the checkpoint file
        if vocab_size is None:
            state_dict = None
            # Try to load the state_dict from a local path to detect vocab_size
            if os.path.isdir(pretrained_model_name_or_path):
                if os.path.isfile(os.path.join(pretrained_model_name_or_path, SAFE_WEIGHTS_NAME)):
                    try:
                        from safetensors.torch import load_file
                        state_dict = load_file(os.path.join(pretrained_model_name_or_path, SAFE_WEIGHTS_NAME))
                    except Exception:
                        pass
                elif os.path.isfile(os.path.join(pretrained_model_name_or_path, WEIGHTS_NAME)):
                    try:
                        state_dict = torch.load(os.path.join(pretrained_model_name_or_path, WEIGHTS_NAME), map_location="cpu")
                    except Exception:
                        pass
            # Detect vocab_size from embedding.weight
            if state_dict is not None and "embedding.weight" in state_dict:
                vocab_size = state_dict["embedding.weight"].shape[0]
            else:
                vocab_size = 30000  # Default

        # Get num_labels
        num_labels = kwargs.pop("num_labels", None)
        if num_labels is None:
            if config is not None and hasattr(config, "num_labels"):
                num_labels = config.num_labels
            elif isinstance(config, dict) and "num_labels" in config:
                num_labels = config["num_labels"]
            else:
                num_labels = 3

        # Instantiate the model
        return cls(vocab_size=vocab_size, num_labels=num_labels, **kwargs)

    def forward(self, input_ids, attention_mask, labels=None):
        # Embedding
        embedded = self.embedding(input_ids)  # [batch_size, seq_len, embedding_dim]

        # BiLSTM
        lstm_output, (hidden, cell) = self.lstm(embedded)  # [batch_size, seq_len, hidden_size*2]

        # Option 1: global average pooling (options 2 and 3 below are alternatives)
        pooled_output = lstm_output.mean(dim=1)  # [batch_size, hidden_size*2]
        # Option 2: last hidden state (uncomment if preferred)
        # pooled_output = lstm_output[:, -1, :]  # [batch_size, hidden_size*2]
        # Option 3: max pooling (uncomment if preferred)
        # pooled_output = torch.max(lstm_output, dim=1)[0]  # [batch_size, hidden_size*2]

        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits, labels)
        return {"loss": loss, "logits": logits}
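# Note: the BiLSTM above also runs over padding tokens. A mask-aware sketch
# using pack_padded_sequence (an alternative, not used by BiLSTMModel as-is):
#
#   from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
#   lengths = attention_mask.sum(dim=1).cpu()
#   packed = pack_padded_sequence(embedded, lengths, batch_first=True, enforce_sorted=False)
#   packed_out, _ = self.lstm(packed)
#   lstm_output, _ = pad_packed_sequence(packed_out, batch_first=True)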
class EnsembleModel(BaseHateSpeechModel):
    """Ensemble model combining several deep learning models."""

    def __init__(self, models: list, num_labels: int = 3, weights: list = None):
        super().__init__("ensemble", num_labels)
        self.models = nn.ModuleList(models)
        self.num_models = len(models)
        weights = torch.tensor(weights if weights else [1.0] * self.num_models, dtype=torch.float32)
        # Normalize and register as a buffer so the weights follow .to(device)
        self.register_buffer("weights", weights / weights.sum())

    def forward(self, input_ids, attention_mask, labels=None):
        all_logits = []
        total_loss = 0.0
        for i, model in enumerate(self.models):
            model_output = model(input_ids, attention_mask, labels)
            all_logits.append(model_output["logits"])
            if model_output["loss"] is not None:
                total_loss += self.weights[i] * model_output["loss"]

        # Weighted average of the logits
        ensemble_logits = torch.zeros_like(all_logits[0])
        for i, logits in enumerate(all_logits):
            ensemble_logits += self.weights[i] * logits

        return {
            "loss": total_loss if labels is not None else None,
            "logits": ensemble_logits,
        }


def get_model(model_name: str, num_labels: int = 3, **kwargs):
    """
    Factory function that builds a model from its name.

    Args:
        model_name: Model name ("phobert-v1", "phobert-v2", "bartpho", "visobert",
            "vihate-t5", "xlm-r", "mbert", "sphobert", "roberta-gru", "textcnn",
            "bilstm", "ensemble")
        num_labels: Number of labels (3 for hate speech: CLEAN, OFFENSIVE, HATE)
        **kwargs: Extra model-specific arguments (e.g. "ensemble" requires models=[...])

    Returns:
        Model instance
    """
    model_mapping = {
        "phobert-v1": PhoBERTV1Model,
        "phobert-v2": PhoBERTV2Model,
        "bartpho": BartPhoModel,
        "visobert": ViSoBERTModel,
        "vihate-t5": ViHateT5Model,
        "xlm-r": XLMRModel,
        "mbert": MBERTModel,
        "sphobert": SPhoBERTModel,
        "roberta-gru": RoBERTaGRUModel,
        "textcnn": TextCNNModel,
        "bilstm": BiLSTMModel,
        "ensemble": EnsembleModel,
    }

    if model_name not in model_mapping:
        raise ValueError(f"Unknown model: {model_name}. Available models: {list(model_mapping.keys())}")

    model_class = model_mapping[model_name]
    return model_class(num_labels=num_labels, **kwargs)
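if __name__ == "__main__":
    # Minimal smoke test for the factory (a sketch only; downloads the
    # PhoBERT-V2 checkpoint on first run).
    from transformers import AutoTokenizer

    model = get_model("phobert-v2", num_labels=3)
    tokenizer = AutoTokenizer.from_pretrained("vinai/phobert-base-v2")
    batch = tokenizer(["một câu ví dụ"], return_tensors="pt", padding=True, truncation=True)
    with torch.no_grad():
        out = model(batch["input_ids"], batch["attention_mask"])
    print(out["logits"].shape)  # expected: torch.Size([1, 3])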