from dataclasses import dataclass
from typing import Optional, Union, Tuple, List

import torch
import torch.nn as nn
from torch.nn import CrossEntropyLoss
from transformers import BertConfig, BertModel
from transformers.modeling_outputs import SequenceClassifierOutput


class HydraConfig(BertConfig):
    """BertConfig extended with `label_groups`: a list of dicts, one per
    classification head, each mapping a label name to its index within
    that group."""

    model_type = "hydra"

    def __init__(self, label_groups=None, **kwargs):
        # Assign before super().__init__ so the derived `num_labels`
        # property below is usable while the base class initialises.
        self.label_groups = label_groups if label_groups is not None else []
        super().__init__(**kwargs)

    @property
    def num_labels(self):
        # Width of the shared classifier head: one logit per label in
        # every group.
        return sum(len(group) for group in self.label_groups)

    @num_labels.setter
    def num_labels(self, value):
        # Derived entirely from `label_groups`; the default value that
        # PretrainedConfig.__init__ tries to assign is ignored.
        pass

    def bert_config(self):
        # Down-cast to a plain BertConfig; PretrainedConfig keeps its
        # values as instance attributes, so `__dict__` round-trips them.
        return BertConfig(**self.__dict__)
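# A minimal configuration sketch (these group names are illustrative, not
# from the original code): each group is a dict mapping a label name to its
# index within that group, so the two heads below are a 2-way and a 3-way
# classifier and the shared logits layer is 5 wide.
#
#   config = HydraConfig(
#       label_groups=[
#           {"negative": 0, "positive": 1},
#           {"sports": 0, "politics": 1, "tech": 2},
#       ],
#   )
#   assert config.num_labels == 5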
@dataclass
class HydraSequenceClassifierOutput(SequenceClassifierOutput):
    """SequenceClassifierOutput plus `classifications`: per group, a list of
    (label name, probability) pairs sorted by descending probability."""

    classifications: Optional[List[list]] = None
class Hydra(BertModel):
    config_class = HydraConfig

    def __init__(self, config: HydraConfig):
        super().__init__(config)
        self.pre_classifier = nn.Linear(config.hidden_size, config.hidden_size)
        # One shared linear head; its logits are sliced per label group in
        # forward().
        self.classifier = nn.Linear(config.hidden_size, config.num_labels)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

        # Freeze the embeddings; only the encoder and the classification
        # head are fine-tuned.
        self.embeddings.requires_grad_(False)

        self.post_init()
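    # A hedged sketch of typical construction (checkpoint name illustrative):
    # load a pretrained BERT encoder into the Hydra wrapper; the embeddings
    # frozen above stay fixed while the encoder and heads fine-tune.
    #
    #   config = HydraConfig.from_pretrained(
    #       "bert-base-uncased",
    #       label_groups=[{"negative": 0, "positive": 1}],
    #   )
    #   model = Hydra.from_pretrained("bert-base-uncased", config=config)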
    def forward(
        self,
        input_ids: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.Tensor] = None,
        head_mask: Optional[torch.Tensor] = None,
        inputs_embeds: Optional[torch.Tensor] = None,
        labels: Optional[torch.LongTensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[HydraSequenceClassifierOutput, Tuple[torch.Tensor, ...]]:
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict
        # Run the underlying BERT encoder.
        bert_output = super().forward(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        hidden_state = bert_output[0]            # (batch, seq_len, hidden)
        pooled_output = hidden_state[:, 0]       # hidden state of [CLS]
        pooled_output = self.pre_classifier(pooled_output)
        pooled_output = torch.relu(pooled_output)
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)  # (batch, total label count)
        loss = None
        if labels is not None:
            # Each label group is an independent classification head, so the
            # loss is the sum of one cross entropy per group; `labels` is
            # expected to have shape (batch, num_groups), one class index
            # per group.
            loss_fct = CrossEntropyLoss()
            loss, offset = 0.0, 0
            for i, group in enumerate(self.config.label_groups):
                loss = loss + loss_fct(logits[:, offset:offset + len(group)], labels[:, i])
                offset += len(group)
        if not return_dict:
            output = (logits,) + bert_output[1:]
            return ((loss,) + output) if loss is not None else output
        # Human-readable output: one ranked (label, probability) list per
        # group. Only computed for single-example batches.
        classifications = []
        if logits.shape[0] == 1:
            offset = 0
            for group in self.config.label_groups:
                # `group` maps label name -> index; invert it to map softmax
                # positions back to names.
                inverted = {index: name for name, index in group.items()}
                probs = torch.softmax(logits[:, offset:offset + len(group)], dim=1)
                classification = [(inverted[i], p.item()) for i, p in enumerate(probs[0])]
                classification.sort(key=lambda x: x[1], reverse=True)
                classifications.append(classification)
                offset += len(group)
        return HydraSequenceClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=bert_output.hidden_states,
            attentions=bert_output.attentions,
            classifications=classifications,
        )
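    # Inference sketch (tokenizer and text are illustrative): with a batch
    # of one, `classifications` carries one ranked list per label group.
    #
    #   from transformers import BertTokenizer
    #   tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
    #   enc = tokenizer("great match last night", return_tensors="pt")
    #   out = model(**enc)
    #   for ranking in out.classifications:
    #       print(ranking[0])  # top (label, probability) pair for the group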
    # No `to()` override is needed: `pre_classifier`, `classifier` and
    # `dropout` are registered submodules, so the inherited nn.Module.to()
    # already moves them together with the rest of the model.
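# Training-step sketch under the label layout assumed in forward(): `labels`
# holds one class index per group, shape (batch, num_groups). Optimizer and
# learning rate are illustrative.
#
#   import torch.optim as optim
#   optimizer = optim.AdamW(
#       (p for p in model.parameters() if p.requires_grad), lr=2e-5)
#   labels = torch.tensor([[1, 0]])  # one class index per label group
#   out = model(**enc, labels=labels)
#   out.loss.backward()
#   optimizer.step()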