| | import torch |
| | from transformers import AutoTokenizer, RobertaForSequenceClassification |
| | import logging |
| | import warnings |
| |
|
| | |
# Quiet noisy framework output at import time:
# - transformers logs a warning when loading a base checkpoint into a
#   classification model whose head weights are missing/resized (we load with
#   ignore_mismatched_sizes=True below, so this is expected, not an error);
# - FutureWarning spam from dependencies is irrelevant to end users.
logging.getLogger("transformers.modeling_utils").setLevel(logging.ERROR)
warnings.filterwarnings("ignore", category=FutureWarning)
| |
|
class MLEngine:
    """GraphCodeBERT-based vulnerability scanner.

    Loads ``microsoft/graphcodebert-base`` once at construction and scores
    source code with a sliding-window pass so files of any length can be
    scanned on CPU without exhausting memory.
    """

    def __init__(self, logger_callback=None):
        """Load the tokenizer and model (happens once, may download weights).

        Args:
            logger_callback: Optional callable taking one string argument for
                progress/error messages. Defaults to ``print``.

        Side effects:
            Sets ``self.is_ready`` to True on success, False on any failure
            (the error is logged rather than raised so callers can degrade
            gracefully).
        """
        self.log = logger_callback if logger_callback else print
        self.model_name = "microsoft/graphcodebert-base"

        self.log("π§ [ML ENGINE] Initializing Neural Engine...")
        self.log(" βββ Loading Tensor Weights (This happens once)...")

        try:
            self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
            self.model = RobertaForSequenceClassification.from_pretrained(
                self.model_name,
                use_safetensors=True,
                # The base checkpoint has no 2-class head; allow the fresh
                # classification head to be initialized without erroring.
                ignore_mismatched_sizes=True,
            )

            self.device = torch.device("cpu")
            self.model.to(self.device)
            # BUGFIX: switch to inference mode. Without eval() the dropout
            # layers stay active and predictions are nondeterministic.
            self.model.eval()

            self.log(f" βββ β Model Ready. Running on: {self.device.type.upper()}")
            self.is_ready = True

        except Exception as e:
            self.log(f"β οΈ [ML ENGINE FAILURE] {str(e)}")
            self.is_ready = False

    def predict_vulnerability(self, code_content):
        """Scan code of any length with a 512-token sliding window.

        The file is tokenized once, split into non-overlapping windows, and
        each window is scored independently, so memory stays bounded no
        matter how large the input is.

        Args:
            code_content: Source-code text to scan.

        Returns:
            Tuple ``(is_vulnerable, confidence_percent, bad_snippets)``:
            ``is_vulnerable`` is True if any window scores above 0.50 on the
            "vulnerable" class; ``confidence_percent`` is the highest window
            score * 100 rounded to 2 decimals (0.0 if nothing flagged);
            ``bad_snippets`` holds the decoded text of every flagged window.
        """
        if not self.is_ready or not code_content:
            return False, 0.0, []

        # BUGFIX: encode WITHOUT special tokens and WITHOUT an arbitrary
        # truncation cap (the old max_length=100000 silently dropped the tail
        # of very large files, contradicting the "any file size" contract).
        # Each window is wrapped in <s>...</s> below, because RoBERTa
        # classifiers expect those special tokens on every input — the old
        # raw slices of a single whole-document encoding lacked them.
        encoding = self.tokenizer(
            code_content,
            return_tensors="pt",
            add_special_tokens=False,
            truncation=False,
            padding=False,
        )
        input_ids = encoding["input_ids"][0]

        # 510 content tokens + <s> + </s> == the model's 512-token limit.
        window_size = 510
        stride = 510

        cls_id = torch.tensor([self.tokenizer.cls_token_id], dtype=input_ids.dtype)
        sep_id = torch.tensor([self.tokenizer.sep_token_id], dtype=input_ids.dtype)

        chunks = []
        for start in range(0, len(input_ids), stride):
            window = input_ids[start : start + window_size]
            if len(window) < 10:
                # Trailing sliver too small to classify meaningfully.
                continue
            chunks.append(torch.cat([cls_id, window, sep_id]))

        highest_confidence = 0.0
        is_vulnerable = False
        bad_snippets = []

        for chunk_ids in chunks:
            try:
                batch = chunk_ids.unsqueeze(0).to(self.device)

                with torch.no_grad():
                    outputs = self.model(input_ids=batch)
                    probs = torch.nn.functional.softmax(outputs.logits, dim=-1)

                # Index 1 is treated as the "vulnerable" class.
                vuln_score = probs[0][1].item()

                if vuln_score > 0.50:
                    is_vulnerable = True
                    if vuln_score > highest_confidence:
                        highest_confidence = vuln_score

                    decoded_snippet = self.tokenizer.decode(
                        batch[0], skip_special_tokens=True
                    )
                    bad_snippets.append(decoded_snippet)

                    # One near-certain hit is enough; stop scanning early.
                    if vuln_score > 0.85:
                        break

            except Exception as e:
                # Best-effort: one failing chunk should not abort the whole
                # scan, but surface the error instead of swallowing it.
                self.log(f"β οΈ [ML ENGINE] Chunk skipped: {e}")
                continue

        return is_vulnerable, round(highest_confidence * 100, 2), bad_snippets