|
|
"""
|
|
|
Model architectures for emotion recognition.
|
|
|
"""
|
|
|
|
|
|
import torch
|
|
|
import torch.nn as nn
|
|
|
import torch.nn.functional as F
|
|
|
from transformers import AutoModel, AutoConfig, AutoModelForSequenceClassification
|
|
|
|
|
|
|
|
|
class BaseEmotionModel(nn.Module):
    """
    Shared building blocks for transformer-based emotion classifiers.

    The constructor loads a pretrained configuration and encoder and
    attaches a dropout layer plus a linear classification head.  No
    ``forward`` is defined here; subclasses supply their own.
    """

    def __init__(self, model_name: str, num_labels: int):
        """Load the pretrained encoder and create the classification head.

        Args:
            model_name: Identifier handed to ``AutoConfig.from_pretrained``
                and ``AutoModel.from_pretrained``.
            num_labels: Output width of the linear classifier.
        """
        super().__init__()

        pretrained_config = AutoConfig.from_pretrained(
            model_name,
            ignore_mismatched_sizes=True,
        )
        self.config = pretrained_config

        # Submodules are registered in the order encoder -> dropout ->
        # classifier.
        self.encoder = AutoModel.from_pretrained(
            model_name,
            config=pretrained_config,
            ignore_mismatched_sizes=True,
        )
        self.dropout = nn.Dropout(p=0.1)
        self.classifier = nn.Linear(
            in_features=pretrained_config.hidden_size,
            out_features=num_labels,
        )
|
|
|
|
|
|
|
|
|
class TransformerForEmotion(BaseEmotionModel):
    """
    Transformer encoder with a linear head for emotion classification.

    The sentence representation is the encoder's ``pooler_output`` when
    the encoder provides one; otherwise it is the hidden state at
    sequence position 0 (CLS-token pooling).
    """

    def forward(self, input_ids, attention_mask, labels=None):
        """Encode a batch, classify it and optionally compute the loss.

        Args:
            input_ids: Token ids forwarded to the encoder.
            attention_mask: Attention mask forwarded to the encoder.
            labels: Optional targets for ``nn.CrossEntropyLoss``.

        Returns:
            A dict with ``"loss"`` (``None`` when ``labels`` is ``None``)
            and ``"logits"`` (the classifier output).
        """
        encoded = self.encoder(input_ids=input_ids, attention_mask=attention_mask)

        # A missing attribute and an explicit None are treated the same
        # way: fall back to the first position of the sequence output.
        sentence_repr = getattr(encoded, 'pooler_output', None)
        if sentence_repr is None:
            sentence_repr = encoded.last_hidden_state[:, 0]

        logits = self.classifier(self.dropout(sentence_repr))

        loss = nn.CrossEntropyLoss()(logits, labels) if labels is not None else None
        return {"loss": loss, "logits": logits}
|
|
|
|
|
|
|
|
|
class SPhoBERTModel(BaseEmotionModel):
    """
    SPhoBERT - Specialized PhoBERT variant for emotion recognition.

    Uses mean pooling over the encoder's sequence output instead of the
    CLS token.  Positions whose ``attention_mask`` entry is 0 are left out
    of the average.
    """

    @staticmethod
    def _masked_mean(hidden_states, attention_mask):
        """Average ``hidden_states`` over dim 1, skipping masked-out positions.

        With ``attention_mask=None`` every position is averaged.
        """
        if attention_mask is None:
            return hidden_states.mean(dim=1)
        weights = attention_mask.unsqueeze(-1).to(hidden_states.dtype)
        # clamp avoids a division by zero for a row whose mask is all zeros.
        return (hidden_states * weights).sum(dim=1) / weights.sum(dim=1).clamp(min=1.0)

    def forward(self, input_ids, attention_mask, labels=None):
        """Forward pass with mean pooling.

        Args:
            input_ids: Token ids forwarded to the encoder.
            attention_mask: Mask forwarded to the encoder and reused as the
                pooling weights (1 = keep, 0 = ignore).
            labels: Optional targets for ``nn.CrossEntropyLoss``.

        Returns:
            A dict with ``"loss"`` (``None`` when ``labels`` is ``None``)
            and ``"logits"``.
        """
        outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)

        # NOTE: ``pooler_output`` is deliberately not used.  The earlier
        # implementation returned it whenever the encoder produced one, so
        # the mean-pooling path this class is documented to use was skipped
        # for such encoders.  Weights fine-tuned against that behavior
        # presumably need re-tuning - confirm before loading old checkpoints.
        pooled_output = self._masked_mean(outputs.last_hidden_state, attention_mask)

        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            loss = nn.CrossEntropyLoss()(logits, labels)
        return {"loss": loss, "logits": logits}
|
|
|
|
|
|
|
|
|
class RoBERTaGRUModel(nn.Module):
    """
    RoBERTa + GRU Hybrid model for emotion recognition.

    A pretrained encoder produces per-token hidden states, a 2-layer
    bidirectional GRU runs over them, and the GRU outputs are mean-pooled
    (ignoring masked-out positions) before dropout and a linear classifier.
    """

    def __init__(self, model_name: str, num_labels: int, hidden_size: int = 256):
        """Load the pretrained encoder and build the GRU head.

        Args:
            model_name: Identifier handed to ``AutoConfig.from_pretrained``
                and ``AutoModel.from_pretrained``.
            num_labels: Output width of the linear classifier.
            hidden_size: Hidden size of each GRU direction; the classifier
                input width is ``hidden_size * 2`` because the GRU is
                bidirectional.
        """
        super().__init__()
        self.config = AutoConfig.from_pretrained(model_name, ignore_mismatched_sizes=True)
        self.encoder = AutoModel.from_pretrained(
            model_name, config=self.config, ignore_mismatched_sizes=True
        )
        self.gru = nn.GRU(
            input_size=self.config.hidden_size,
            hidden_size=hidden_size,
            num_layers=2,
            batch_first=True,
            dropout=0.1,
            bidirectional=True,
        )
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(hidden_size * 2, num_labels)

    @staticmethod
    def _masked_mean(sequence, attention_mask):
        """Average ``sequence`` over dim 1, skipping masked-out positions.

        With ``attention_mask=None`` every position is averaged.
        """
        if attention_mask is None:
            return sequence.mean(dim=1)
        weights = attention_mask.unsqueeze(-1).to(sequence.dtype)
        # clamp avoids a division by zero for a row whose mask is all zeros.
        return (sequence * weights).sum(dim=1) / weights.sum(dim=1).clamp(min=1.0)

    def forward(self, input_ids, attention_mask, labels=None):
        """Encode, run the GRU, pool and classify.

        Args:
            input_ids: Token ids forwarded to the encoder.
            attention_mask: Mask forwarded to the encoder and reused as the
                pooling weights (1 = keep, 0 = ignore).
            labels: Optional targets for ``nn.CrossEntropyLoss``.

        Returns:
            A dict with ``"loss"`` (``None`` when ``labels`` is ``None``)
            and ``"logits"``.
        """
        outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
        gru_output, _ = self.gru(outputs.last_hidden_state)

        # A plain mean over dim 1 would also average the GRU outputs at
        # padded positions, making the pooled vector depend on how much
        # padding the batch happens to contain.
        pooled_output = self._masked_mean(gru_output, attention_mask)

        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            loss = nn.CrossEntropyLoss()(logits, labels)
        return {"loss": loss, "logits": logits}
|
|
|
|
|
|
|
|
|
class TextCNNModel(nn.Module):
    """
    TextCNN model for emotion recognition.

    Token embeddings are convolved with one ``Conv2d`` per filter size,
    each feature map is ReLU-activated and max-pooled over the sequence
    axis, and the pooled features are concatenated, passed through dropout
    and classified by a linear layer.
    """

    def __init__(self, vocab_size: int, embedding_dim: int = 128, num_labels: int = 7,
                 num_filters: int = 100, filter_sizes: "list[int] | None" = None,
                 dropout: float = 0.5):
        """Build the embedding, convolution and classifier layers.

        Args:
            vocab_size: Number of rows in the embedding table.
            embedding_dim: Width of each token embedding.
            num_labels: Output width of the linear classifier.
            num_filters: Output channels of every convolution.
            filter_sizes: Kernel heights (in tokens), one convolution each.
                ``None`` selects ``[3, 4, 5]``.
            dropout: Dropout probability applied before the classifier.
        """
        super().__init__()
        # None sentinel instead of a mutable list default shared across calls.
        if filter_sizes is None:
            filter_sizes = [3, 4, 5]

        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.convs = nn.ModuleList([
            nn.Conv2d(1, num_filters, (filter_size, embedding_dim))
            for filter_size in filter_sizes
        ])
        self.dropout = nn.Dropout(dropout)
        self.classifier = nn.Linear(num_filters * len(filter_sizes), num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        """Classify a batch of token-id sequences.

        Args:
            input_ids: Token ids; assumed to be shaped (batch, seq_len) -
                TODO confirm against callers.
            attention_mask: Accepted so the signature matches the other
                models in this file; not used by this model.
            labels: Optional targets for ``nn.CrossEntropyLoss``.

        Returns:
            A dict with ``"loss"`` (``None`` when ``labels`` is ``None``)
            and ``"logits"``.
        """
        # Add a singleton channel axis so Conv2d sees one input channel.
        embedded = self.embedding(input_ids).unsqueeze(1)

        # A convolution fails when its kernel is taller than the input, so
        # zero-pad the end of the sequence axis up to the largest kernel.
        largest_kernel = max(conv.kernel_size[0] for conv in self.convs)
        shortfall = largest_kernel - embedded.size(2)
        if shortfall > 0:
            embedded = F.pad(embedded, (0, 0, 0, shortfall))

        conv_outputs = []
        for conv in self.convs:
            # The kernel spans the full embedding width, so the last axis
            # has size 1 after the convolution and can be squeezed away.
            feature_map = F.relu(conv(embedded)).squeeze(3)
            # Max over the whole remaining sequence axis.
            pooled = F.max_pool1d(feature_map, feature_map.size(2)).squeeze(2)
            conv_outputs.append(pooled)

        concatenated = self.dropout(torch.cat(conv_outputs, dim=1))
        logits = self.classifier(concatenated)

        loss = None
        if labels is not None:
            loss = nn.CrossEntropyLoss()(logits, labels)
        return {"loss": loss, "logits": logits}
|
|
|
|
|
|
|
|
|
class BiLSTMModel(nn.Module):
    """
    BiLSTM model for emotion recognition.

    Token embeddings are fed through a bidirectional LSTM, the per-step
    outputs are mean-pooled (ignoring masked-out positions), and the
    result goes through dropout and a linear classifier.
    """

    def __init__(self, vocab_size: int, embedding_dim: int = 128, hidden_size: int = 256,
                 num_labels: int = 7, num_layers: int = 2, dropout: float = 0.5):
        """Build the embedding, LSTM and classifier layers.

        Args:
            vocab_size: Number of rows in the embedding table.
            embedding_dim: Width of each token embedding.
            hidden_size: Hidden size of each LSTM direction; the classifier
                input width is ``hidden_size * 2``.
            num_labels: Output width of the linear classifier.
            num_layers: Number of stacked LSTM layers.
            dropout: Dropout probability before the classifier, and between
                LSTM layers when ``num_layers > 1``.
        """
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            # Inter-layer dropout is only passed for stacked LSTMs.
            dropout=dropout if num_layers > 1 else 0,
            bidirectional=True,
        )
        self.dropout = nn.Dropout(dropout)
        self.classifier = nn.Linear(hidden_size * 2, num_labels)

    @staticmethod
    def _masked_mean(sequence, attention_mask):
        """Average ``sequence`` over dim 1, skipping masked-out positions.

        With ``attention_mask=None`` every position is averaged.
        """
        if attention_mask is None:
            return sequence.mean(dim=1)
        weights = attention_mask.unsqueeze(-1).to(sequence.dtype)
        # clamp avoids a division by zero for a row whose mask is all zeros.
        return (sequence * weights).sum(dim=1) / weights.sum(dim=1).clamp(min=1.0)

    def forward(self, input_ids, attention_mask, labels=None):
        """Classify a batch of token-id sequences.

        Args:
            input_ids: Token ids looked up in the embedding table.
            attention_mask: Pooling weights (1 = keep, 0 = ignore); ``None``
                averages every position.
            labels: Optional targets for ``nn.CrossEntropyLoss``.

        Returns:
            A dict with ``"loss"`` (``None`` when ``labels`` is ``None``)
            and ``"logits"``.
        """
        embedded = self.embedding(input_ids)
        lstm_output, _ = self.lstm(embedded)

        # A plain mean over dim 1 would also average the outputs at padded
        # steps, diluting the representation of short sequences.
        pooled_output = self._masked_mean(lstm_output, attention_mask)

        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            loss = nn.CrossEntropyLoss()(logits, labels)
        return {"loss": loss, "logits": logits}
|
|
|
|
|
|
|
|
|
def get_model(model_name: str, num_labels: int, use_custom: bool = False,
              model_type: str = "standard", **kwargs):
    """
    Factory function to get a model instance.

    Args:
        model_name: HuggingFace model identifier
        num_labels: Number of classification labels
        use_custom: Whether to use the custom ``TransformerForEmotion``
            implementation; only consulted when ``model_type`` is not one
            of the specialised types below
        model_type: Type of model - "standard", "sphobert", "roberta-gru", "textcnn", "bilstm"
        **kwargs: Additional model arguments.  Recognised keys are
            ``hidden_size`` ("roberta-gru", "bilstm"), ``vocab_size`` and
            ``embedding_dim`` ("textcnn", "bilstm"), and
            ``ignore_mismatched_sizes`` (standard path).  Other keys are
            ignored.

    Returns:
        The constructed model.  On the standard path this is an
        ``AutoModelForSequenceClassification`` instance, or a
        ``TransformerForEmotion`` if loading that fails.
    """
    if model_type == "sphobert":
        return SPhoBERTModel(model_name, num_labels)

    if model_type == "roberta-gru":
        hidden_size = kwargs.get('hidden_size', 256)
        return RoBERTaGRUModel(model_name, num_labels, hidden_size)

    if model_type == "textcnn":
        vocab_size = kwargs.get('vocab_size', 32000)
        embedding_dim = kwargs.get('embedding_dim', 128)
        return TextCNNModel(vocab_size, embedding_dim, num_labels)

    if model_type == "bilstm":
        vocab_size = kwargs.get('vocab_size', 32000)
        embedding_dim = kwargs.get('embedding_dim', 128)
        hidden_size = kwargs.get('hidden_size', 256)
        return BiLSTMModel(vocab_size, embedding_dim, hidden_size, num_labels)

    if use_custom:
        # TransformerForEmotion's constructor takes only (model_name,
        # num_labels); forwarding **kwargs raised TypeError for any extra
        # argument, so they are intentionally not passed on.
        return TransformerForEmotion(model_name, num_labels)

    try:
        config = AutoConfig.from_pretrained(model_name)
        config.num_labels = num_labels

        # Only the loader options known to be safe are forwarded.
        loader_options = {
            k: v for k, v in kwargs.items() if k in ['ignore_mismatched_sizes']
        }
        return AutoModelForSequenceClassification.from_pretrained(
            model_name,
            config=config,
            **loader_options,
        )
    except Exception as e:
        # Deliberate best-effort fallback to the custom implementation.
        print(f"Warning: Failed to use AutoModelForSequenceClassification: {e}")
        # kwargs are not forwarded here either: doing so turned the fallback
        # into a TypeError (e.g. for ignore_mismatched_sizes) that hid the
        # original failure.
        return TransformerForEmotion(model_name, num_labels)