""" Model architectures for emotion recognition. """ import torch import torch.nn as nn import torch.nn.functional as F from transformers import AutoModel, AutoConfig, AutoModelForSequenceClassification class BaseEmotionModel(nn.Module): """ Base class for emotion classification models. """ def __init__(self, model_name: str, num_labels: int): super().__init__() self.config = AutoConfig.from_pretrained(model_name, ignore_mismatched_sizes=True) self.encoder = AutoModel.from_pretrained(model_name, config=self.config, ignore_mismatched_sizes=True) self.dropout = nn.Dropout(0.1) self.classifier = nn.Linear(self.config.hidden_size, num_labels) class TransformerForEmotion(BaseEmotionModel): """ Standard transformer model for emotion classification. Uses CLS token pooling. """ def forward(self, input_ids, attention_mask, labels=None): """Forward pass.""" outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask) # Try to get pooled output, fallback to CLS token if hasattr(outputs, 'pooler_output') and outputs.pooler_output is not None: pooled_output = outputs.pooler_output else: pooled_output = outputs.last_hidden_state[:, 0] # CLS token pooled_output = self.dropout(pooled_output) logits = self.classifier(pooled_output) loss = None if labels is not None: loss_fn = nn.CrossEntropyLoss() loss = loss_fn(logits, labels) return {"loss": loss, "logits": logits} class SPhoBERTModel(BaseEmotionModel): """ SPhoBERT - Specialized PhoBERT variant for emotion recognition. Uses mean pooling over sequence output instead of CLS token. """ def forward(self, input_ids, attention_mask, labels=None): """Forward pass with mean pooling.""" outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask) # Try pooler_output first, then use mean pooling if hasattr(outputs, 'pooler_output') and outputs.pooler_output is not None: pooled_output = outputs.pooler_output else: # Mean pooling over sequence length pooled_output = outputs.last_hidden_state.mean(dim=1) pooled_output = self.dropout(pooled_output) logits = self.classifier(pooled_output) loss = None if labels is not None: loss_fn = nn.CrossEntropyLoss() loss = loss_fn(logits, labels) return {"loss": loss, "logits": logits} class RoBERTaGRUModel(nn.Module): """ RoBERTa + GRU Hybrid model for emotion recognition. """ def __init__(self, model_name: str, num_labels: int, hidden_size: int = 256): super().__init__() self.config = AutoConfig.from_pretrained(model_name, ignore_mismatched_sizes=True) self.encoder = AutoModel.from_pretrained(model_name, config=self.config, ignore_mismatched_sizes=True) self.gru = nn.GRU( input_size=self.config.hidden_size, hidden_size=hidden_size, num_layers=2, batch_first=True, dropout=0.1, bidirectional=True ) self.dropout = nn.Dropout(0.1) self.classifier = nn.Linear(hidden_size * 2, num_labels) # *2 for bidirectional def forward(self, input_ids, attention_mask, labels=None): outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask) hidden_states = outputs.last_hidden_state # [batch_size, seq_len, hidden_size] # GRU processing gru_output, _ = self.gru(hidden_states) # [batch_size, seq_len, hidden_size*2] # Global average pooling pooled_output = gru_output.mean(dim=1) # [batch_size, hidden_size*2] pooled_output = self.dropout(pooled_output) logits = self.classifier(pooled_output) loss = None if labels is not None: loss_fn = nn.CrossEntropyLoss() loss = loss_fn(logits, labels) return {"loss": loss, "logits": logits} class TextCNNModel(nn.Module): """ TextCNN model for emotion recognition. 
""" def __init__(self, vocab_size: int, embedding_dim: int = 128, num_labels: int = 7, num_filters: int = 100, filter_sizes: list = [3, 4, 5], dropout: float = 0.5): super().__init__() self.embedding = nn.Embedding(vocab_size, embedding_dim) self.convs = nn.ModuleList([ nn.Conv2d(1, num_filters, (filter_size, embedding_dim)) for filter_size in filter_sizes ]) self.dropout = nn.Dropout(dropout) self.classifier = nn.Linear(num_filters * len(filter_sizes), num_labels) def forward(self, input_ids, attention_mask, labels=None): # Embedding embedded = self.embedding(input_ids) # [batch_size, seq_len, embedding_dim] # Add channel dimension for Conv2d embedded = embedded.unsqueeze(1) # [batch_size, 1, seq_len, embedding_dim] # Convolutional layers conv_outputs = [] for conv in self.convs: conv_out = F.relu(conv(embedded)) # [batch_size, num_filters, seq_len', 1] conv_out = conv_out.squeeze(3) # [batch_size, num_filters, seq_len'] pooled = F.max_pool1d(conv_out, conv_out.size(2)) # [batch_size, num_filters, 1] pooled = pooled.squeeze(2) # [batch_size, num_filters] conv_outputs.append(pooled) # Concatenate all conv outputs concatenated = torch.cat(conv_outputs, dim=1) # [batch_size, num_filters * len(filter_sizes)] # Classification concatenated = self.dropout(concatenated) logits = self.classifier(concatenated) loss = None if labels is not None: loss_fn = nn.CrossEntropyLoss() loss = loss_fn(logits, labels) return {"loss": loss, "logits": logits} class BiLSTMModel(nn.Module): """ BiLSTM model for emotion recognition. """ def __init__(self, vocab_size: int, embedding_dim: int = 128, hidden_size: int = 256, num_labels: int = 7, num_layers: int = 2, dropout: float = 0.5): super().__init__() self.embedding = nn.Embedding(vocab_size, embedding_dim) self.lstm = nn.LSTM( input_size=embedding_dim, hidden_size=hidden_size, num_layers=num_layers, batch_first=True, dropout=dropout if num_layers > 1 else 0, bidirectional=True ) self.dropout = nn.Dropout(dropout) self.classifier = nn.Linear(hidden_size * 2, num_labels) # *2 for bidirectional def forward(self, input_ids, attention_mask, labels=None): # Embedding embedded = self.embedding(input_ids) # [batch_size, seq_len, embedding_dim] # BiLSTM lstm_output, (hidden, cell) = self.lstm(embedded) # [batch_size, seq_len, hidden_size*2] # Global average pooling pooled_output = lstm_output.mean(dim=1) # [batch_size, hidden_size*2] pooled_output = self.dropout(pooled_output) logits = self.classifier(pooled_output) loss = None if labels is not None: loss_fn = nn.CrossEntropyLoss() loss = loss_fn(logits, labels) return {"loss": loss, "logits": logits} def get_model(model_name: str, num_labels: int, use_custom: bool = False, model_type: str = "standard", **kwargs): """ Factory function to get a model instance. 
def get_model(model_name: str, num_labels: int, use_custom: bool = False,
              model_type: str = "standard", **kwargs):
    """
    Factory function to get a model instance.

    Args:
        model_name: HuggingFace model identifier
        num_labels: Number of classification labels
        use_custom: Whether to use the custom implementation
        model_type: Type of model - "standard", "sphobert", "roberta-gru", "textcnn", "bilstm"
        **kwargs: Additional model arguments (e.g. hidden_size, vocab_size, embedding_dim)
    """
    if model_type == "sphobert":
        return SPhoBERTModel(model_name, num_labels)
    elif model_type == "roberta-gru":
        hidden_size = kwargs.get('hidden_size', 256)
        return RoBERTaGRUModel(model_name, num_labels, hidden_size)
    elif model_type == "textcnn":
        vocab_size = kwargs.get('vocab_size', 32000)
        embedding_dim = kwargs.get('embedding_dim', 128)
        return TextCNNModel(vocab_size, embedding_dim, num_labels)
    elif model_type == "bilstm":
        vocab_size = kwargs.get('vocab_size', 32000)
        embedding_dim = kwargs.get('embedding_dim', 128)
        hidden_size = kwargs.get('hidden_size', 256)
        return BiLSTMModel(vocab_size, embedding_dim, hidden_size, num_labels)
    elif use_custom:
        return TransformerForEmotion(model_name, num_labels)
    else:
        # Use HuggingFace AutoModelForSequenceClassification
        try:
            config = AutoConfig.from_pretrained(model_name)
            config.num_labels = num_labels
            model = AutoModelForSequenceClassification.from_pretrained(
                model_name,
                config=config,
                **{k: v for k, v in kwargs.items() if k in ['ignore_mismatched_sizes']}
            )
            return model
        except Exception as e:
            print(f"Warning: Failed to use AutoModelForSequenceClassification: {e}")
            return TransformerForEmotion(model_name, num_labels)
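

# Illustrative usage of the factory (a sketch, not part of the original training code).
# "vinai/phobert-base" is a placeholder checkpoint name and the label / vocab counts are
# example values, not values prescribed by this module; model_name is ignored for the
# vocab-based model types.
if __name__ == "__main__":
    # Lightweight, download-free path: a BiLSTM built from an assumed tokenizer vocab size.
    bilstm = get_model("vinai/phobert-base", num_labels=7, model_type="bilstm", vocab_size=32000)
    print(type(bilstm).__name__)

    # Pretrained-encoder path (requires downloading the checkpoint):
    # sphobert = get_model("vinai/phobert-base", num_labels=7, model_type="sphobert")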