| from transformers import PreTrainedModel | |
| import torch | |
| from cybersecurity_knowledge_graph.nugget_model_utils import CustomRobertaWithPOS as NuggetModel | |
| from cybersecurity_knowledge_graph.args_model_utils import CustomRobertaWithPOS as ArgumentModel | |
| from cybersecurity_knowledge_graph.realis_model_utils import CustomRobertaWithPOS as RealisModel | |
| from cybersecurity_knowledge_graph.configuration import CybersecurityKnowledgeGraphConfig | |
| from cybersecurity_knowledge_graph.event_nugget_predict import create_dataloader as event_nugget_dataloader | |
| from cybersecurity_knowledge_graph.event_realis_predict import create_dataloader as event_realis_dataloader | |
| from cybersecurity_knowledge_graph.event_arg_predict import create_dataloader as event_argument_dataloader | |
class CybersecurityKnowledgeGraphModel(PreTrainedModel):
    """Wrap the event nugget, event argument and event realis taggers.

    The three sub-models are built with fixed class counts and their weights
    are loaded from the checkpoint paths stored on the configuration object.
    ``forward`` runs the nugget tagger first and only runs the argument and
    realis taggers when at least one row contains a non-zero nugget label.
    """

    config_class = CybersecurityKnowledgeGraphConfig

    def __init__(self, config):
        """Build the three sub-models and load their checkpoints.

        Args:
            config: configuration object exposing ``event_nugget_model_path``,
                ``event_argument_model_path`` and ``event_realis_model_path``.
        """
        super().__init__(config)

        self.event_nugget_model_path = config.event_nugget_model_path
        self.event_argument_model_path = config.event_argument_model_path
        self.event_realis_model_path = config.event_realis_model_path

        # Dataloader factories; each is called with the raw text in forward().
        self.event_nugget_dataloader = event_nugget_dataloader
        self.event_argument_dataloader = event_argument_dataloader
        self.event_realis_dataloader = event_realis_dataloader

        self.event_nugget_model = NuggetModel(num_classes=11)
        self.event_argument_model = ArgumentModel(num_classes=43)
        self.event_realis_model = RealisModel(num_classes_realis=4)

        # map_location="cpu" lets checkpoints saved on a GPU load on CPU-only
        # hosts; load_state_dict copies the values into the existing
        # parameters, so the module's own device placement is unaffected.
        self.event_nugget_model.load_state_dict(
            torch.load(self.event_nugget_model_path, map_location="cpu")
        )
        self.event_realis_model.load_state_dict(
            torch.load(self.event_realis_model_path, map_location="cpu")
        )
        self.event_argument_model.load_state_dict(
            torch.load(self.event_argument_model_path, map_location="cpu")
        )

    def forward(self, text):
        """Predict nugget, argument and realis labels for ``text``.

        Args:
            text: input handed unchanged to the three dataloader factories.

        Returns:
            dict with keys ``"nugget"``, ``"argument"`` and ``"realis"``. The
            argument and realis tensors have the same shape, dtype and device
            as the nugget tensor; rows whose nugget labels are all 0 are left
            as zeros in both.
        """
        nugget_dataloader, _ = self.event_nugget_dataloader(text)
        argument_dataloader, _ = self.event_argument_dataloader(text)
        realis_dataloader, _ = self.event_realis_dataloader(text)

        nugget_pred = self.forward_model(self.event_nugget_model, nugget_dataloader)

        # A row "has a nugget" when any of its predicted labels is non-zero.
        has_nugget = torch.any(nugget_pred != 0, dim=1)

        # zeros_like keeps the integer label dtype and the device of the
        # nugget predictions, and already encodes the "no nugget" result.
        argument_preds = torch.zeros_like(nugget_pred)
        realis_preds = torch.zeros_like(nugget_pred)

        if has_nugget.any():
            # The downstream taggers do not depend on the row being inspected,
            # so run each of them once instead of once per row.
            # NOTE(review): assumes their output is laid out like nugget_pred
            # (same rows / sequence length) - confirm against the dataloaders.
            argument_pred = self.forward_model(
                self.event_argument_model, argument_dataloader
            )
            realis_pred = self.forward_model(
                self.event_realis_model, realis_dataloader
            )
            argument_preds[has_nugget] = argument_pred[has_nugget]
            realis_preds[has_nugget] = realis_pred[has_nugget]

        # Return the accumulated tensors, not the per-iteration temporaries.
        return {
            "nugget": nugget_pred,
            "argument": argument_preds,
            "realis": realis_preds,
        }

    def forward_model(self, model, dataloader):
        """Run ``model`` over every batch and return the arg-max labels.

        Args:
            model: callable invoked as ``model(**batch)`` that returns logits.
            dataloader: iterable of keyword-argument mappings, one per batch.

        Returns:
            The per-batch ``argmax(-1)`` results concatenated along the last
            dimension.
        """
        predicted_label = []
        with torch.no_grad():
            for batch in dataloader:
                logits = model(**batch)
                predicted_label.append(logits.argmax(-1))
        # NOTE(review): batches are joined along the last dimension, as in the
        # original code; confirm this is intended rather than dim=0.
        return torch.cat(predicted_label, dim=-1)