bilstm-hsd / models.py
AnnyNguyen's picture
Upload models.py with huggingface_hub
1ec7fd9 verified
raw
history blame
20.4 kB
import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import (
AutoModel, AutoConfig, AutoTokenizer,
T5ForConditionalGeneration, T5Config,
AutoModelForSequenceClassification
)
from torchcrf import CRF
import numpy as np
class BaseHateSpeechModel(nn.Module):
"""Base class cho tất cả các mô hình hate speech detection"""
def __init__(self, model_name: str, num_labels: int = 3):
super().__init__()
self.num_labels = num_labels
self.model_name = model_name
def forward(self, input_ids, attention_mask, labels=None):
raise NotImplementedError
class PhoBERTV2Model(BaseHateSpeechModel):
"""PhoBERT-V2 cho hate speech detection"""
def __init__(self, model_name: str = "vinai/phobert-base-v2", num_labels: int = 3):
super().__init__(model_name, num_labels)
self.config = AutoConfig.from_pretrained(model_name, ignore_mismatched_sizes=True)
self.encoder = AutoModel.from_pretrained(model_name, config=self.config, ignore_mismatched_sizes=True)
self.dropout = nn.Dropout(0.1)
self.classifier = nn.Linear(self.config.hidden_size, num_labels)
def forward(self, input_ids, attention_mask, labels=None):
outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
pooled_output = outputs.pooler_output
pooled_output = self.dropout(pooled_output)
logits = self.classifier(pooled_output)
loss = None
if labels is not None:
loss_fn = nn.CrossEntropyLoss()
loss = loss_fn(logits, labels)
return {"loss": loss, "logits": logits}
class BartPhoModel(BaseHateSpeechModel):
"""BART Pho cho hate speech detection"""
def __init__(self, model_name: str = "vinai/bartpho-syllable-base", num_labels: int = 3):
super().__init__(model_name, num_labels)
self.config = AutoConfig.from_pretrained(model_name, ignore_mismatched_sizes=True)
self.encoder = AutoModel.from_pretrained(model_name, config=self.config, ignore_mismatched_sizes=True)
self.dropout = nn.Dropout(0.1)
self.classifier = nn.Linear(self.config.d_model, num_labels)
def forward(self, input_ids, attention_mask, labels=None):
outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
# Sử dụng hidden state của token cuối cùng
last_hidden_states = outputs.last_hidden_state
pooled_output = last_hidden_states.mean(dim=1) # Mean pooling
pooled_output = self.dropout(pooled_output)
logits = self.classifier(pooled_output)
loss = None
if labels is not None:
loss_fn = nn.CrossEntropyLoss()
loss = loss_fn(logits, labels)
return {"loss": loss, "logits": logits}
class ViSoBERTModel(BaseHateSpeechModel):
"""ViSoBERT cho hate speech detection"""
def __init__(self, model_name: str = "uitnlp/visobert", num_labels: int = 3):
super().__init__(model_name, num_labels)
self.config = AutoConfig.from_pretrained(model_name, ignore_mismatched_sizes=True)
self.encoder = AutoModel.from_pretrained(model_name, config=self.config, ignore_mismatched_sizes=True)
self.dropout = nn.Dropout(0.1)
self.classifier = nn.Linear(self.config.hidden_size, num_labels)
def forward(self, input_ids, attention_mask, labels=None):
outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
# Kiểm tra xem có pooler_output không, nếu không thì dùng last_hidden_state
if hasattr(outputs, 'pooler_output') and outputs.pooler_output is not None:
pooled_output = outputs.pooler_output
else:
# Fallback: sử dụng mean pooling của last_hidden_state
pooled_output = outputs.last_hidden_state.mean(dim=1)
pooled_output = self.dropout(pooled_output)
logits = self.classifier(pooled_output)
loss = None
if labels is not None:
loss_fn = nn.CrossEntropyLoss()
loss = loss_fn(logits, labels)
return {"loss": loss, "logits": logits}
class PhoBERTV1Model(BaseHateSpeechModel):
"""PhoBERT-V1 cho hate speech detection"""
def __init__(self, model_name: str = "vinai/phobert-base", num_labels: int = 3):
super().__init__(model_name, num_labels)
self.config = AutoConfig.from_pretrained(model_name, ignore_mismatched_sizes=True)
self.encoder = AutoModel.from_pretrained(model_name, config=self.config, ignore_mismatched_sizes=True)
self.dropout = nn.Dropout(0.1)
self.classifier = nn.Linear(self.config.hidden_size, num_labels)
def forward(self, input_ids, attention_mask, labels=None):
outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
# Một số encoder không có pooler_output
if hasattr(outputs, 'pooler_output') and outputs.pooler_output is not None:
pooled_output = outputs.pooler_output
else:
pooled_output = outputs.last_hidden_state.mean(dim=1)
pooled_output = self.dropout(pooled_output)
logits = self.classifier(pooled_output)
loss = None
if labels is not None:
loss_fn = nn.CrossEntropyLoss()
loss = loss_fn(logits, labels)
return {"loss": loss, "logits": logits}
class MBERTModel(BaseHateSpeechModel):
"""mBERT (bert-base-multilingual-cased) cho hate speech detection"""
def __init__(self, model_name: str = "bert-base-multilingual-cased", num_labels: int = 3):
super().__init__(model_name, num_labels)
self.config = AutoConfig.from_pretrained(model_name, ignore_mismatched_sizes=True)
self.encoder = AutoModel.from_pretrained(model_name, config=self.config, ignore_mismatched_sizes=True)
self.dropout = nn.Dropout(0.1)
self.classifier = nn.Linear(self.config.hidden_size, num_labels)
def forward(self, input_ids, attention_mask, labels=None):
outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
if hasattr(outputs, 'pooler_output') and outputs.pooler_output is not None:
pooled_output = outputs.pooler_output
else:
pooled_output = outputs.last_hidden_state.mean(dim=1)
pooled_output = self.dropout(pooled_output)
logits = self.classifier(pooled_output)
loss = None
if labels is not None:
loss_fn = nn.CrossEntropyLoss()
loss = loss_fn(logits, labels)
return {"loss": loss, "logits": logits}
class SPhoBERTModel(BaseHateSpeechModel):
"""SPhoBERT (biến thể PhoBERT syllable-level) cho hate speech detection"""
def __init__(self, model_name: str = "vinai/phobert-base", num_labels: int = 3):
super().__init__(model_name, num_labels)
self.config = AutoConfig.from_pretrained(model_name, ignore_mismatched_sizes=True)
self.encoder = AutoModel.from_pretrained(model_name, config=self.config, ignore_mismatched_sizes=True)
self.dropout = nn.Dropout(0.1)
self.classifier = nn.Linear(self.config.hidden_size, num_labels)
def forward(self, input_ids, attention_mask, labels=None):
outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
if hasattr(outputs, 'pooler_output') and outputs.pooler_output is not None:
pooled_output = outputs.pooler_output
else:
pooled_output = outputs.last_hidden_state.mean(dim=1)
pooled_output = self.dropout(pooled_output)
logits = self.classifier(pooled_output)
loss = None
if labels is not None:
loss_fn = nn.CrossEntropyLoss()
loss = loss_fn(logits, labels)
return {"loss": loss, "logits": logits}
class ViHateT5Model(BaseHateSpeechModel):
"""ViHateT5 cho hate speech detection"""
def __init__(self, model_name: str = "VietAI/vit5-base", num_labels: int = 3):
super().__init__(model_name, num_labels)
self.config = T5Config.from_pretrained(model_name)
self.encoder = T5ForConditionalGeneration.from_pretrained(model_name, config=self.config)
self.dropout = nn.Dropout(0.1)
self.classifier = nn.Linear(self.config.d_model, num_labels)
def forward(self, input_ids, attention_mask, labels=None):
outputs = self.encoder.encoder(input_ids=input_ids, attention_mask=attention_mask)
# Sử dụng hidden state của token cuối cùng
last_hidden_states = outputs.last_hidden_state
pooled_output = last_hidden_states.mean(dim=1) # Mean pooling
pooled_output = self.dropout(pooled_output)
logits = self.classifier(pooled_output)
loss = None
if labels is not None:
loss_fn = nn.CrossEntropyLoss()
loss = loss_fn(logits, labels)
return {"loss": loss, "logits": logits}
class XLMRModel(BaseHateSpeechModel):
"""XLM-R Large cho hate speech detection"""
def __init__(self, model_name: str = "xlm-roberta-large", num_labels: int = 3):
super().__init__(model_name, num_labels)
self.config = AutoConfig.from_pretrained(model_name, ignore_mismatched_sizes=True)
self.encoder = AutoModel.from_pretrained(model_name, config=self.config, ignore_mismatched_sizes=True)
self.dropout = nn.Dropout(0.1)
self.classifier = nn.Linear(self.config.hidden_size, num_labels)
def forward(self, input_ids, attention_mask, labels=None):
outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
pooled_output = outputs.pooler_output
pooled_output = self.dropout(pooled_output)
logits = self.classifier(pooled_output)
loss = None
if labels is not None:
loss_fn = nn.CrossEntropyLoss()
loss = loss_fn(logits, labels)
return {"loss": loss, "logits": logits}
class RoBERTaGRUModel(BaseHateSpeechModel):
"""RoBERTa + GRU Hybrid model"""
def __init__(self, model_name: str = "vinai/phobert-base-v2", num_labels: int = 3, hidden_size: int = 256):
super().__init__(model_name, num_labels)
self.config = AutoConfig.from_pretrained(model_name, ignore_mismatched_sizes=True)
self.encoder = AutoModel.from_pretrained(model_name, config=self.config, ignore_mismatched_sizes=True)
self.gru = nn.GRU(
input_size=self.config.hidden_size,
hidden_size=hidden_size,
num_layers=2,
batch_first=True,
dropout=0.1,
bidirectional=True
)
self.dropout = nn.Dropout(0.1)
self.classifier = nn.Linear(hidden_size * 2, num_labels) # *2 for bidirectional
def forward(self, input_ids, attention_mask, labels=None):
outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
hidden_states = outputs.last_hidden_state # [batch_size, seq_len, hidden_size]
# GRU processing
gru_output, _ = self.gru(hidden_states) # [batch_size, seq_len, hidden_size*2]
# Global average pooling
pooled_output = gru_output.mean(dim=1) # [batch_size, hidden_size*2]
pooled_output = self.dropout(pooled_output)
logits = self.classifier(pooled_output)
loss = None
if labels is not None:
loss_fn = nn.CrossEntropyLoss()
loss = loss_fn(logits, labels)
return {"loss": loss, "logits": logits}
class TextCNNModel(BaseHateSpeechModel):
"""TextCNN cho hate speech detection"""
def __init__(self, vocab_size: int, embedding_dim: int = 128, num_labels: int = 3,
num_filters: int = 100, filter_sizes: list = [3, 4, 5], dropout: float = 0.5):
super().__init__("textcnn", num_labels)
self.embedding = nn.Embedding(vocab_size, embedding_dim)
self.convs = nn.ModuleList([
nn.Conv2d(1, num_filters, (filter_size, embedding_dim))
for filter_size in filter_sizes
])
self.dropout = nn.Dropout(dropout)
self.classifier = nn.Linear(num_filters * len(filter_sizes), num_labels)
def forward(self, input_ids, attention_mask, labels=None):
# Embedding
embedded = self.embedding(input_ids) # [batch_size, seq_len, embedding_dim]
# Add channel dimension for Conv2d
embedded = embedded.unsqueeze(1) # [batch_size, 1, seq_len, embedding_dim]
# Convolutional layers
conv_outputs = []
for conv in self.convs:
conv_out = F.relu(conv(embedded)) # [batch_size, num_filters, seq_len', 1]
conv_out = conv_out.squeeze(3) # [batch_size, num_filters, seq_len']
pooled = F.max_pool1d(conv_out, conv_out.size(2)) # [batch_size, num_filters, 1]
pooled = pooled.squeeze(2) # [batch_size, num_filters]
conv_outputs.append(pooled)
# Concatenate all conv outputs
concatenated = torch.cat(conv_outputs, dim=1) # [batch_size, num_filters * len(filter_sizes)]
# Classification
concatenated = self.dropout(concatenated)
logits = self.classifier(concatenated)
loss = None
if labels is not None:
loss_fn = nn.CrossEntropyLoss()
loss = loss_fn(logits, labels)
return {"loss": loss, "logits": logits}
class BiLSTMModel(BaseHateSpeechModel):
"""BiLSTM cho hate speech detection"""
def __init__(self, vocab_size: int, embedding_dim: int = 128, hidden_size: int = 256,
num_labels: int = 3, num_layers: int = 2, dropout: float = 0.5):
super().__init__("bilstm", num_labels)
self.embedding = nn.Embedding(vocab_size, embedding_dim)
self.lstm = nn.LSTM(
input_size=embedding_dim,
hidden_size=hidden_size,
num_layers=num_layers,
batch_first=True,
dropout=dropout if num_layers > 1 else 0,
bidirectional=True
)
self.dropout = nn.Dropout(dropout)
self.classifier = nn.Linear(hidden_size * 2, num_labels) # *2 for bidirectional
def forward(self, input_ids, attention_mask, labels=None):
# Embedding
embedded = self.embedding(input_ids) # [batch_size, seq_len, embedding_dim]
# BiLSTM
lstm_output, (hidden, cell) = self.lstm(embedded) # [batch_size, seq_len, hidden_size*2]
# Global average pooling (có thể thay bằng max pooling hoặc last hidden state)
# Option 1: Global average pooling
pooled_output = lstm_output.mean(dim=1) # [batch_size, hidden_size*2]
# Option 2: Last hidden state (uncomment if preferred)
# pooled_output = lstm_output[:, -1, :] # [batch_size, hidden_size*2]
# Option 3: Max pooling (uncomment if preferred)
# pooled_output = torch.max(lstm_output, dim=1)[0] # [batch_size, hidden_size*2]
pooled_output = self.dropout(pooled_output)
logits = self.classifier(pooled_output)
loss = None
if labels is not None:
loss_fn = nn.CrossEntropyLoss()
loss = loss_fn(logits, labels)
return {"loss": loss, "logits": logits}
class BiLSTMCRFModel(BaseHateSpeechModel):
"""BiLSTM + CRF cho hate speech detection"""
def __init__(self, vocab_size: int, embedding_dim: int = 128, hidden_size: int = 256,
num_labels: int = 3, num_layers: int = 2):
super().__init__("bilstm-crf", num_labels)
self.embedding = nn.Embedding(vocab_size, embedding_dim)
self.lstm = nn.LSTM(
input_size=embedding_dim,
hidden_size=hidden_size,
num_layers=num_layers,
batch_first=True,
dropout=0.1,
bidirectional=True
)
self.dropout = nn.Dropout(0.1)
self.classifier = nn.Linear(hidden_size * 2, num_labels)
self.crf = CRF(num_labels, batch_first=True)
def forward(self, input_ids, attention_mask, labels=None):
# Embedding
embedded = self.embedding(input_ids) # [batch_size, seq_len, embedding_dim]
# BiLSTM
lstm_output, _ = self.lstm(embedded) # [batch_size, seq_len, hidden_size*2]
lstm_output = self.dropout(lstm_output)
# Classification
emissions = self.classifier(lstm_output) # [batch_size, seq_len, num_labels]
if labels is not None:
# CRF loss - labels phải có cùng shape với emissions
if labels.dim() == 1:
# Nếu labels là 1D, reshape thành 2D để match với emissions
labels = labels.unsqueeze(1).expand(-1, emissions.size(1))
loss = -self.crf(emissions, labels, mask=attention_mask.bool())
return {"loss": loss, "logits": emissions}
else:
# CRF prediction
predictions = self.crf.decode(emissions, mask=attention_mask.bool())
return {"loss": None, "logits": emissions, "predictions": predictions}
class EnsembleModel(BaseHateSpeechModel):
"""Ensemble model kết hợp các mô hình deep learning"""
def __init__(self, models: list, num_labels: int = 3, weights: list = None):
super().__init__("ensemble", num_labels)
self.models = nn.ModuleList(models)
self.num_models = len(models)
self.weights = weights if weights else [1.0] * self.num_models
self.weights = torch.tensor(self.weights, dtype=torch.float32)
self.weights = self.weights / self.weights.sum() # Normalize weights
def forward(self, input_ids, attention_mask, labels=None):
all_logits = []
total_loss = 0
for i, model in enumerate(self.models):
model_output = model(input_ids, attention_mask, labels)
all_logits.append(model_output["logits"])
if model_output["loss"] is not None:
total_loss += self.weights[i] * model_output["loss"]
# Weighted average of logits
ensemble_logits = torch.zeros_like(all_logits[0])
for i, logits in enumerate(all_logits):
ensemble_logits += self.weights[i] * logits
return {
"loss": total_loss if total_loss > 0 else None,
"logits": ensemble_logits
}
def get_model(model_name: str, num_labels: int = 3, **kwargs):
"""
Factory function để tạo model dựa trên tên
Args:
model_name: Tên model ("phobert-v2", "bartpho", "visobert", "vihate-t5",
"xlm-r", "roberta-gru", "textcnn", "bilstm", "bilstm-crf", "ensemble")
num_labels: Số lượng nhãn (3 cho hate speech: CLEAN, OFFENSIVE, HATE)
**kwargs: Các tham số bổ sung cho model
Returns:
Model instance
"""
model_mapping = {
"phobert-v1": PhoBERTV1Model,
"phobert-v2": PhoBERTV2Model,
"bartpho": BartPhoModel,
"visobert": ViSoBERTModel,
"vihate-t5": ViHateT5Model,
"xlm-r": XLMRModel,
"mbert": MBERTModel,
"sphobert": SPhoBERTModel,
"roberta-gru": RoBERTaGRUModel,
"textcnn": TextCNNModel,
"bilstm": BiLSTMModel,
"bilstm-crf": BiLSTMCRFModel,
"ensemble": EnsembleModel
}
if model_name not in model_mapping:
raise ValueError(f"Unknown model: {model_name}. Available models: {list(model_mapping.keys())}")
model_class = model_mapping[model_name]
return model_class(num_labels=num_labels, **kwargs)