File size: 5,253 Bytes
c2479f9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108

import torch
import torch.nn as nn
import numpy as np
from transformers import AutoTokenizer, AutoModel
from torchcrf import CRF

class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position encodings to a batch-first sequence.

    A table of shape ``(1, max_len, d_model)`` is precomputed once and stored
    as the buffer ``pe``. Even feature columns hold sines and odd feature
    columns hold cosines of ``position * div_term``.

    Args:
        d_model: Size of the feature dimension. Both even and odd sizes are
            supported.
        dropout: Dropout probability applied after the encodings are added.
        max_len: Number of positions precomputed in the table.
    """

    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        angles = position * div_term
        pe[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one fewer odd column than even column,
        # so the cosine half must be trimmed to fit; for even d_model the
        # slice is a no-op and the table is identical to the untrimmed one.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        pe = pe.unsqueeze(0)
        # Registered as a buffer so it follows .to(device) and is saved in the
        # state dict without being a trainable parameter.
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Return ``dropout(x + pe)`` for ``x`` indexed as (batch, seq, feature).

        The table is sliced to the first ``x.size(1)`` positions and broadcast
        over the batch dimension.
        """
        x = x + self.pe[:, :x.size(1), :]
        return self.dropout(x)

class VanillaTransformer(nn.Module):
    """Positional encoding followed by a stack of standard encoder layers.

    The encoder layers use GELU activations and are built with
    ``batch_first=True``, so inputs are indexed as (batch, seq, feature).

    Args:
        d_model: Feature width of the input and output.
        nhead: Number of attention heads per layer.
        num_layers: Number of stacked encoder layers.
        dim_feedforward: Hidden width of each layer's feed-forward sublayer.
        dropout: Dropout probability shared by the positional encoder and the
            encoder layers.
    """

    def __init__(self, d_model=768, nhead=8, num_layers=3, dim_feedforward=2048, dropout=0.1):
        super().__init__()
        self.d_model = d_model
        self.pos_encoder = PositionalEncoding(d_model, dropout)
        layer_config = {
            "d_model": d_model,
            "nhead": nhead,
            "dim_feedforward": dim_feedforward,
            "dropout": dropout,
            "activation": 'gelu',
            "batch_first": True,
        }
        self.transformer = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(**layer_config),
            num_layers=num_layers,
        )

    def forward(self, src, src_mask=None, src_key_padding_mask=None):
        """Encode ``src`` after adding position information.

        ``src_mask`` and ``src_key_padding_mask`` are forwarded unchanged to
        the underlying ``nn.TransformerEncoder``.
        """
        positioned = self.pos_encoder(src)
        return self.transformer(
            positioned,
            mask=src_mask,
            src_key_padding_mask=src_key_padding_mask,
        )

class HierarchicalLegalSegModel(nn.Module):
    """Sentence-level sequence labeller: encoder -> transformer -> linear -> CRF.

    Each sentence is encoded by ``longformer_model`` and represented by the
    hidden state of its first token. The per-document sequence of sentence
    vectors is contextualised by a ``VanillaTransformer``, projected to
    ``num_labels`` emission scores, and scored or decoded by a CRF.

    Args:
        longformer_model: Encoder called as
            ``model(input_ids=..., attention_mask=...)`` whose result exposes
            ``last_hidden_state``.
        num_labels: Number of output tags.
        hidden_dim: Width of the sentence vectors; must match the encoder's
            hidden size, since the embeddings are reshaped to this width.
        transformer_layers: Number of layers in the sentence-level transformer.
        transformer_heads: Number of attention heads in that transformer.
        dropout: Dropout probability for the sentence vectors and transformer.
    """

    def __init__(self, longformer_model, num_labels, hidden_dim=768, transformer_layers=3, transformer_heads=8, dropout=0.1):
        super(HierarchicalLegalSegModel, self).__init__()
        self.longformer = longformer_model
        self.hidden_dim = hidden_dim
        self.vanilla_transformer = VanillaTransformer(d_model=hidden_dim, nhead=transformer_heads, num_layers=transformer_layers, dim_feedforward=hidden_dim*4, dropout=dropout)
        self.classifier = nn.Linear(hidden_dim, num_labels)
        self.crf = CRF(num_labels, batch_first=True)
        self.dropout = nn.Dropout(dropout)
        self.num_labels = num_labels

    def encode_sentences(self, input_ids, attention_mask):
        """Encode every sentence and return one vector per sentence.

        Args:
            input_ids: Tensor of shape (batch, num_sentences, max_seq_len).
            attention_mask: Tensor with the same shape as ``input_ids``.

        Returns:
            Tensor of shape (batch, num_sentences, hidden_dim) holding the
            first-token hidden state of each sentence.
        """
        batch_size, num_sentences, max_seq_len = input_ids.shape
        # reshape (rather than view) also accepts non-contiguous inputs such
        # as permuted or sliced tensors, copying only when it has to.
        input_ids_flat = input_ids.reshape(-1, max_seq_len)
        attention_mask_flat = attention_mask.reshape(-1, max_seq_len)
        outputs = self.longformer(input_ids=input_ids_flat, attention_mask=attention_mask_flat)
        cls_embeddings = outputs.last_hidden_state[:, 0, :]
        sentence_embeddings = cls_embeddings.reshape(batch_size, num_sentences, self.hidden_dim)
        return sentence_embeddings

    def forward(self, input_ids, attention_mask, labels=None, sentence_mask=None):
        """Compute the CRF loss (training) or the decoded tag sequences.

        Args:
            input_ids: Tensor of shape (batch, num_sentences, max_seq_len).
            attention_mask: Tensor with the same shape as ``input_ids``.
            labels: Optional tag tensor passed to the CRF; when given, the
                loss is returned instead of predictions.
            sentence_mask: Optional (batch, num_sentences) mask that is truthy
                for real sentences and falsy for padding. Boolean and 0/1
                integer masks are both accepted.

        Returns:
            The negated mean CRF log-likelihood when ``labels`` is given,
            otherwise the result of ``CRF.decode`` (a list of tag-id lists,
            one per document).
        """
        if sentence_mask is not None:
            # `~` on an integer mask is a bitwise NOT (0 -> -1, 1 -> -2), not a
            # logical inversion, so normalise to bool before deriving the
            # padding mask.
            sentence_mask = sentence_mask.bool()
            padding_mask = ~sentence_mask
        else:
            padding_mask = None

        sentence_embeddings = self.encode_sentences(input_ids, attention_mask)
        sentence_embeddings = self.dropout(sentence_embeddings)
        transformer_output = self.vanilla_transformer(sentence_embeddings, src_key_padding_mask=padding_mask)
        emissions = self.classifier(transformer_output)
        if labels is not None:
            # The CRF returns a log-likelihood; negate it to obtain a loss.
            loss = -self.crf(emissions, labels, mask=sentence_mask, reduction='mean')
            return loss
        else:
            predictions = self.crf.decode(emissions, mask=sentence_mask)
            return predictions

class EndpointHandler:
    """Inference entry point that labels sentences with rhetorical roles.

    On construction it loads the tokenizer from ``path``, the
    ``lexlms/legal-longformer-base`` encoder, and the fine-tuned weights from
    ``<path>/model.pth``. Calling the handler tokenizes the request text, runs
    the model without gradients, and maps the predicted tag ids to names.
    """

    def __init__(self, path=""):
        """Load tokenizer, encoder and checkpoint.

        Args:
            path: Directory containing the tokenizer files and ``model.pth``.
        """
        # Function-scope import keeps this block self-contained.
        import os

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.tokenizer = AutoTokenizer.from_pretrained(path)

        self.id2label = {
            0: "Arguments of Petitioner", 1: "Arguments of Respondent", 2: "Decision",
            3: "Facts", 4: "Issue", 5: "None", 6: "Reasoning"
        }

        longformer = AutoModel.from_pretrained("lexlms/legal-longformer-base")
        longformer = longformer.to(self.device)
        # The encoder is used for inference only; mark its weights as frozen.
        for param in longformer.parameters():
            param.requires_grad = False

        # Derive the tag count from the label table so the two cannot drift.
        self.model = HierarchicalLegalSegModel(longformer, num_labels=len(self.id2label))
        # NOTE(security): torch.load unpickles the file, so only load
        # checkpoints from a trusted source.
        # os.path.join keeps the path relative when `path` is empty, whereas
        # f"{path}/model.pth" would resolve to the filesystem root.
        checkpoint = torch.load(os.path.join(path, "model.pth"), map_location=self.device)
        # Accept either a bare state dict or a training checkpoint wrapping it.
        if isinstance(checkpoint, dict) and 'model_state_dict' in checkpoint:
            self.model.load_state_dict(checkpoint['model_state_dict'])
        else:
            self.model.load_state_dict(checkpoint)
        self.model = self.model.to(self.device)
        self.model.eval()

    def __call__(self, data):
        """Label the text in ``data["inputs"]``.

        Args:
            data: Mapping with an ``"inputs"`` entry that is either a single
                string (one sentence) or a list/tuple of strings (the
                sentences of one document, in order).

        Returns:
            A list with one ``{"label": str, "score": 1.0}`` dict per input
            sentence. The score is a constant placeholder: the model returns
            decoded tag ids only, not probabilities. An empty list of
            sentences yields an empty list.

        Raises:
            ValueError: If ``"inputs"`` is neither a string nor a list/tuple
                of strings.
        """
        text = data.get("inputs", "")
        if isinstance(text, str):
            sentences = [text]
        elif isinstance(text, (list, tuple)) and all(isinstance(s, str) for s in text):
            sentences = list(text)
        else:
            raise ValueError('"inputs" must be a string or a list of strings')
        if not sentences:
            return []

        encoded = self.tokenizer(sentences, padding="max_length", truncation=True, max_length=512, return_tensors="pt")
        # The tokenizer yields one row per sentence; add a leading document
        # axis so the model sees (1, num_sentences, seq_len).
        input_ids = encoded["input_ids"].unsqueeze(0).to(self.device)
        attention_mask = encoded["attention_mask"].unsqueeze(0).to(self.device)
        # Every sentence is real (no sentence-level padding) in a single document.
        sentence_mask = torch.ones(1, len(sentences), dtype=torch.bool, device=self.device)

        with torch.no_grad():
            predictions = self.model(input_ids, attention_mask, sentence_mask=sentence_mask)

        return [{"label": self.id2label[label_id], "score": 1.0} for label_id in predictions[0]]