Spaces:
Runtime error
Runtime error
| from transformers import RobertaTokenizer, T5Config, T5EncoderModel | |
| from statement_t5 import StatementT5 | |
| import torch | |
| import pickle | |
| import numpy as np | |
| import onnxruntime | |
def to_numpy(tensor):
    """Convert a torch tensor into a NumPy array usable as ONNX Runtime input.

    A tensor that tracks gradients is detached first, because ``.numpy()``
    cannot be called on a tensor that requires grad. The tensor is then
    brought to the CPU and converted.
    """
    if tensor.requires_grad:
        tensor = tensor.detach()
    return tensor.cpu().numpy()
def predict_vul_lines(code: list, gpu: bool = False) -> dict:
    """Generate statement-level and function-level vulnerability predictions.

    Parameters
    ----------
    code : :obj:`list`
        A list of String functions.
    gpu : bool
        Defines if CUDA inference is enabled

    Returns
    -------
    :obj:`dict`
        A dictionary with four keys: "batch_func_pred", "batch_func_pred_prob",
        "batch_statement_pred" and "batch_statement_pred_prob". The values are
        returned exactly as torch produces them (they are not converted to
        Python lists).
        "batch_func_pred" stores the function-level prediction, the argmax of
        "batch_func_pred_prob" over its last dimension, where 0 means
        non-vulnerable and 1 means vulnerable
        "batch_func_pred_prob" stores the function-level output of the model
        "batch_statement_pred" stores the statement-level prediction: 1 where
        "batch_statement_pred_prob" is greater than 0.5, otherwise 0
        "batch_statement_pred_prob" stores the statement-level output of the
        model
    """
    MAX_STATEMENTS = 155
    MAX_STATEMENT_LENGTH = 20
    DEVICE = 'cuda' if gpu else 'cpu'
    # load tokenizer
    tokenizer = RobertaTokenizer.from_pretrained("./utils/statement_t5_tokenizer")
    # load model
    config = T5Config.from_pretrained("./utils/t5_config.json")
    model = T5EncoderModel(config=config)
    model = StatementT5(model, tokenizer, device=DEVICE)
    weights_path = "./models/statement_t5_model.bin"
    # NOTE: torch.load unpickles the file, so the weights file must be trusted.
    model.load_state_dict(torch.load(weights_path, map_location=DEVICE))
    model.to(DEVICE)
    model.eval()
    input_ids, statement_mask = statement_tokenization(
        code, MAX_STATEMENTS, MAX_STATEMENT_LENGTH, tokenizer)
    # The tokenizer output is created on the CPU; put it on the same device
    # as the model so that CUDA inference does not mix devices. This is a
    # no-op when DEVICE is 'cpu'.
    input_ids = input_ids.to(DEVICE)
    statement_mask = statement_mask.to(DEVICE)
    with torch.no_grad():
        statement_probs, func_probs = model(input_ids=input_ids,
                                            statement_mask=statement_mask)
    func_preds = torch.argmax(func_probs, dim=-1)
    statement_preds = torch.where(statement_probs > 0.5, 1, 0)
    return {"batch_func_pred": func_preds,
            "batch_func_pred_prob": func_probs,
            "batch_statement_pred": statement_preds,
            "batch_statement_pred_prob": statement_probs}
def statement_tokenization(code: list, max_statements: int, max_statement_length: int, tokenizer):
    """Tokenize functions statement by statement into fixed-size id grids.

    Each function is split on newlines, empty lines are dropped and at most
    ``max_statements`` statements are kept. Every kept statement is encoded
    with truncation and padding to ``max_statement_length``, and the function
    is then filled up with all-padding rows until it has ``max_statements``
    rows.

    Parameters
    ----------
    code : :obj:`list`
        A list of String functions.
    max_statements : int
        Number of statement rows produced per function.
    max_statement_length : int
        Number of token ids per statement row.
    tokenizer
        Object exposing ``pad_token_id`` and an ``encode`` method that accepts
        the ``truncation``, ``max_length``, ``padding`` and
        ``add_special_tokens`` keyword arguments used below.

    Returns
    -------
    tuple
        ``(input_ids, statement_mask)`` as torch tensors. ``statement_mask``
        holds one entry per statement row: 0 when the row equals the
        all-padding row, otherwise 1. Assumes ``tokenizer.encode`` returns
        exactly ``max_statement_length`` ids per statement; otherwise the rows
        are ragged and tensor construction fails.
    """
    # The padding row must be as long as an encoded statement. It used to be
    # hard-coded to 20 ids, which only worked when max_statement_length == 20.
    padding_statement = [tokenizer.pad_token_id] * max_statement_length
    batch_input_ids = []
    batch_statement_mask = []
    for c in code:
        statements = [line for line in c.split("\n") if line != ""]
        statements = statements[:max_statements]
        input_ids = [
            tokenizer.encode(str(stat),
                             truncation=True,
                             max_length=max_statement_length,
                             padding='max_length',
                             add_special_tokens=False)
            for stat in statements
        ]
        # Fill the function up to max_statements rows with padding rows.
        missing_rows = max_statements - len(input_ids)
        input_ids.extend(padding_statement for _ in range(missing_rows))
        # A row identical to the padding row is masked out; this also covers
        # a statement whose encoding consists of padding ids only.
        statement_mask = [0 if row == padding_statement else 1 for row in input_ids]
        batch_input_ids.append(input_ids)
        batch_statement_mask.append(statement_mask)
    return torch.tensor(batch_input_ids), torch.tensor(batch_statement_mask)
def predict_cweid(code: list, gpu: bool = False) -> dict:
    """Generate CWE-IDs and CWE Abstract Types Predictions.

    Parameters
    ----------
    code : :obj:`list`
        A list of String functions.
    gpu : bool
        Defines if CUDA inference is enabled

    Returns
    -------
    :obj:`dict`
        A dictionary with four keys, "cwe_id", "cwe_id_prob", "cwe_type", "cwe_type_prob"
        "cwe_id" stores a list of CWE-ID predictions: [CWE-787, CWE-119, ...]
        "cwe_id_prob" stores a list of confidence scores of CWE-ID predictions [0.9, 0.7, ...]
        "cwe_type" stores a list of CWE abstract types predictions: ["Base", "Class", ...]
        "cwe_type_prob" stores a list of confidence scores of CWE abstract types predictions [0.9, 0.7, ...]
        All four lists are empty when ``code`` is empty.
    """
    # Nothing to predict: skip loading the tokenizer and the model, which
    # cannot be run on an empty batch anyway.
    if not code:
        return {"cwe_id": [], "cwe_id_prob": [], "cwe_type": [], "cwe_type_prob": []}
    max_length = 512
    provider = ["CUDAExecutionProvider", "CPUExecutionProvider"] if gpu else ["CPUExecutionProvider"]
    # NOTE: pickle.load can execute arbitrary code; the label map file must
    # come from a trusted source.
    with open("./utils/label_map.pkl", "rb") as f:
        cwe_id_map, cwe_type_map = pickle.load(f)
    # load tokenizer
    tokenizer = RobertaTokenizer.from_pretrained("./utils/tokenizer")
    tokenizer.add_tokens(["<cls_type>"])
    tokenizer.cls_type_token = "<cls_type>"
    model_input = []
    for c in code:
        # leave room for the three special tokens added below
        code_tokens = tokenizer.tokenize(str(c))[:max_length - 3]
        source_tokens = [tokenizer.cls_token] + code_tokens + [tokenizer.cls_type_token] + [tokenizer.sep_token]
        input_ids = tokenizer.convert_tokens_to_ids(source_tokens)
        padding_length = max_length - len(input_ids)
        input_ids += [tokenizer.pad_token_id] * padding_length
        model_input.append(input_ids)
    # ONNX Runtime is fed NumPy arrays, so build the int64 array directly
    # instead of creating a torch tensor on the GPU and copying it back.
    # Device placement is left to the execution providers chosen above.
    model_input = np.asarray(model_input, dtype=np.int64)
    # onnx runtime session
    ort_session = onnxruntime.InferenceSession("./models/cwe_model.onnx", providers=provider)
    # compute ONNX Runtime output prediction
    ort_inputs = {ort_session.get_inputs()[0].name: model_input}
    cwe_id_prob, cwe_type_prob = ort_session.run(None, ort_inputs)
    # batch_cwe_id (1D list with shape of [batch size]): [pred_1, pred_2, ..., pred_n]
    batch_cwe_id = np.argmax(cwe_id_prob, axis=-1).tolist()
    # map predicted idx back to CWE-ID
    batch_cwe_id_pred = [cwe_id_map[str(idx)] for idx in batch_cwe_id]
    # confidence of each predicted CWE-ID: the score at the predicted index
    batch_cwe_id_pred_prob = [row[idx].item() for row, idx in zip(cwe_id_prob, batch_cwe_id)]
    # batch_cwe_type (1D list with shape of [batch size]): [pred_1, pred_2, ..., pred_n]
    batch_cwe_type = np.argmax(cwe_type_prob, axis=-1).tolist()
    # map predicted idx back to CWE-Type
    batch_cwe_type_pred = [cwe_type_map[str(idx)] for idx in batch_cwe_type]
    # confidence of each predicted CWE-Type: the score at the predicted index
    batch_cwe_type_pred_prob = [row[idx].item() for row, idx in zip(cwe_type_prob, batch_cwe_type)]
    return {"cwe_id": batch_cwe_id_pred,
            "cwe_id_prob": batch_cwe_id_pred_prob,
            "cwe_type": batch_cwe_type_pred,
            "cwe_type_prob": batch_cwe_type_pred_prob}
def _severity_class(score: float) -> str:
    """Map one predicted severity score to its severity class label."""
    if score == 0:
        return "None"
    if score < 4:
        return "Low"
    if score < 7:
        return "Medium"
    if score < 9:
        return "High"
    return "Critical"


def predict_sev(code: list, gpu: bool = False) -> dict:
    """Generate CVSS severity score predictions.

    Parameters
    ----------
    code : :obj:`list`
        A list of String functions.
    gpu : bool
        Defines if CUDA inference is enabled

    Returns
    -------
    :obj:`dict`
        A dictionary with two keys, "batch_sev_score", "batch_sev_class"
        "batch_sev_score" stores a list of severity score prediction: [1.0, 5.0, 9.0 ...]
        "batch_sev_class" stores a list of severity class based on predicted severity score ["Medium", "Critical"...]
        Both lists are empty when ``code`` is empty.
    """
    # Nothing to predict: skip loading the tokenizer and the model, which
    # cannot be run on an empty batch anyway.
    if not code:
        return {"batch_sev_score": [], "batch_sev_class": []}
    provider = ["CUDAExecutionProvider", "CPUExecutionProvider"] if gpu else ["CPUExecutionProvider"]
    # load tokenizer
    tokenizer = RobertaTokenizer.from_pretrained("./utils/tokenizer")
    # Coerce every entry to str, as the other predict_* functions do, so a
    # non-string entry is tokenized rather than rejected.
    model_input = tokenizer([str(c) for c in code], truncation=True, max_length=512,
                            padding='max_length', return_tensors="pt").input_ids
    # onnx runtime session
    ort_session = onnxruntime.InferenceSession("./models/sev_model.onnx", providers=provider)
    # compute ONNX Runtime output prediction
    ort_inputs = {ort_session.get_inputs()[0].name: to_numpy(model_input)}
    cvss_score = ort_session.run(None, ort_inputs)
    # the first model output, flattened to one score per input function
    batch_sev_score = cvss_score[0].flatten().tolist()
    batch_sev_class = [_severity_class(score) for score in batch_sev_score]
    return {"batch_sev_score": batch_sev_score, "batch_sev_class": batch_sev_class}
if __name__ == "__main__":
    import pandas as pd

    # Manual smoke run: read the test split and print the function-level
    # prediction for each function, one function per call.
    test_frame = pd.read_csv("./data/processed_test.csv")
    for func_source in test_frame["func_before"].tolist():
        prediction = predict_vul_lines([func_source])
        print(prediction["batch_func_pred"][0])