import math

import torch
import torch.nn as nn
from torchcrf import CRF
from transformers import AutoModel, AutoTokenizer
class PositionalEncoding(nn.Module):
    """Sinusoidal positional encoding for inputs of shape (batch, seq_len, d_model)."""

    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)  # (1, max_len, d_model) so it broadcasts over the batch
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:, :x.size(1), :]
        return self.dropout(x)
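# Shape check (illustrative sketch, not part of the deployed model): the
# buffer is registered as (1, max_len, d_model), so the encoding broadcasts
# over the batch dimension and the input shape is preserved.
#
#   enc = PositionalEncoding(d_model=768)
#   x = torch.zeros(2, 16, 768)          # (batch, seq_len, d_model)
#   assert enc(x).shape == (2, 16, 768)  # positions added, same shape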
class VanillaTransformer(nn.Module):
    """Sentence-level context encoder: nn.TransformerEncoder with sinusoidal positions."""

    def __init__(self, d_model=768, nhead=8, num_layers=3, dim_feedforward=2048, dropout=0.1):
        super().__init__()
        self.pos_encoder = PositionalEncoding(d_model, dropout)
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model, nhead=nhead, dim_feedforward=dim_feedforward,
            dropout=dropout, activation='gelu', batch_first=True
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.d_model = d_model

    def forward(self, src, src_mask=None, src_key_padding_mask=None):
        src = self.pos_encoder(src)
        return self.transformer(src, mask=src_mask, src_key_padding_mask=src_key_padding_mask)
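# Mask semantics (illustrative sketch): nn.TransformerEncoder treats True in
# `src_key_padding_mask` as "ignore this position" — the opposite of the
# Hugging Face attention-mask convention used elsewhere in this file.
#
#   vt = VanillaTransformer(d_model=768)
#   x = torch.zeros(1, 4, 768)                        # 4 sentence slots
#   pad = torch.tensor([[False, False, True, True]])  # last two are padding
#   out = vt(x, src_key_padding_mask=pad)             # -> (1, 4, 768)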
class HierarchicalLegalSegModel(nn.Module):
    """Hierarchical segmenter: Longformer encodes each sentence, a sentence-level
    transformer contextualizes the document, and a CRF decodes rhetorical roles."""

    def __init__(self, longformer_model, num_labels, hidden_dim=768,
                 transformer_layers=3, transformer_heads=8, dropout=0.1):
        super().__init__()
        self.longformer = longformer_model
        self.hidden_dim = hidden_dim
        self.vanilla_transformer = VanillaTransformer(
            d_model=hidden_dim, nhead=transformer_heads, num_layers=transformer_layers,
            dim_feedforward=hidden_dim * 4, dropout=dropout
        )
        self.classifier = nn.Linear(hidden_dim, num_labels)
        self.crf = CRF(num_labels, batch_first=True)
        self.dropout = nn.Dropout(dropout)
        self.num_labels = num_labels

    def encode_sentences(self, input_ids, attention_mask):
        # input_ids, attention_mask: (batch, num_sentences, max_seq_len)
        batch_size, num_sentences, max_seq_len = input_ids.shape
        # Flatten so each sentence is encoded independently.
        input_ids_flat = input_ids.view(-1, max_seq_len)
        attention_mask_flat = attention_mask.view(-1, max_seq_len)
        outputs = self.longformer(input_ids=input_ids_flat, attention_mask=attention_mask_flat)
        # Take the <s>/[CLS] embedding as the sentence representation.
        cls_embeddings = outputs.last_hidden_state[:, 0, :]
        return cls_embeddings.view(batch_size, num_sentences, self.hidden_dim)

    def forward(self, input_ids, attention_mask, labels=None, sentence_mask=None):
        sentence_embeddings = self.encode_sentences(input_ids, attention_mask)
        sentence_embeddings = self.dropout(sentence_embeddings)
        # nn.TransformerEncoder expects True at *padded* positions, so invert the mask.
        padding_mask = ~sentence_mask if sentence_mask is not None else None
        transformer_output = self.vanilla_transformer(sentence_embeddings,
                                                      src_key_padding_mask=padding_mask)
        emissions = self.classifier(transformer_output)
        if labels is not None:
            # torchcrf returns the log-likelihood; negate it to get a loss.
            return -self.crf(emissions, labels, mask=sentence_mask, reduction='mean')
        # Viterbi decoding: a list of label-id lists, one per document.
        return self.crf.decode(emissions, mask=sentence_mask)
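# Training sketch (illustrative; all shapes assumed, and the training loop
# itself is not part of this handler file). Given
#   model = HierarchicalLegalSegModel(longformer, num_labels=7)
# and one document of N sentences, each tokenized to 512 ids:
#
#   input_ids      = torch.randint(0, 1000, (1, N, 512))
#   attention_mask = torch.ones(1, N, 512, dtype=torch.long)
#   labels         = torch.randint(0, 7, (1, N))
#   sentence_mask  = torch.ones(1, N, dtype=torch.bool)
#   loss = model(input_ids, attention_mask, labels=labels, sentence_mask=sentence_mask)
#   loss.backward()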
class EndpointHandler:
    """Inference handler: loads the frozen Longformer encoder plus the trained
    segmentation head and labels each input sentence with a rhetorical role."""

    def __init__(self, path=""):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.tokenizer = AutoTokenizer.from_pretrained(path)
        longformer = AutoModel.from_pretrained("lexlms/legal-longformer-base")
        longformer = longformer.to(self.device)
        # The Longformer encoder stays frozen; only the head was trained.
        for param in longformer.parameters():
            param.requires_grad = False
        self.model = HierarchicalLegalSegModel(longformer, num_labels=7)
        checkpoint = torch.load(f"{path}/model.pth", map_location=self.device)
        if isinstance(checkpoint, dict) and 'model_state_dict' in checkpoint:
            self.model.load_state_dict(checkpoint['model_state_dict'])
        else:
            self.model.load_state_dict(checkpoint)
        self.model = self.model.to(self.device)
        self.model.eval()
        self.id2label = {
            0: "Arguments of Petitioner", 1: "Arguments of Respondent", 2: "Decision",
            3: "Facts", 4: "Issue", 5: "None", 6: "Reasoning"
        }

    def __call__(self, data):
        text = data.get("inputs", "")
        # Accept either a single string or a list of sentence strings; the model
        # labels a *sequence* of sentences, so a list becomes one document.
        sentences = [text] if isinstance(text, str) else list(text)
        encoded = self.tokenizer(sentences, padding="max_length", truncation=True,
                                 max_length=512, return_tensors="pt")
        # (num_sentences, seq_len) -> (1 document, num_sentences, seq_len)
        input_ids = encoded["input_ids"].unsqueeze(0).to(self.device)
        attention_mask = encoded["attention_mask"].unsqueeze(0).to(self.device)
        sentence_mask = torch.ones(1, len(sentences), dtype=torch.bool, device=self.device)
        with torch.no_grad():
            predictions = self.model(input_ids, attention_mask, sentence_mask=sentence_mask)
        # CRF Viterbi decoding yields a single best path rather than per-label
        # probabilities, so the score is reported as a constant 1.0.
        return [{"label": self.id2label[label_id], "score": 1.0}
                for label_id in predictions[0]]
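# Usage sketch (hypothetical payload, following the Hugging Face Inference
# Endpoints convention of an {"inputs": ...} dict; sentence strings invented):
#
#   handler = EndpointHandler(path=".")
#   out = handler({"inputs": ["The petitioner contends that ...",
#                             "The court held that ..."]})
#   # -> list of {"label": <rhetorical role>, "score": 1.0},
#   #    one entry per input sentence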