| | import subprocess |
| | import sys |
| | import json |
| |
|
print("🔥 Installing required packages...")

# Stream pip's merged stdout/stderr live so a long install doesn't look hung.
process = subprocess.Popen(
    [sys.executable, "-m", "pip", "install", "-q",
     "transformers", "retrying", "bitsandbytes", "accelerate", "peft", "torch"],
    stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True,
)
for line in process.stdout:
    print(line, end='')
# Fail fast with a clear error instead of letting the imports below crash
# confusingly when pip did not actually install the packages.
if process.wait() != 0:
    raise RuntimeError("pip install failed; see output above")
print("🕵️ Packages seems to be fine, now loading model...")
| | |
| | import ast |
| | import re |
| | from peft import PeftModel |
| | from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig |
| | from retrying import retry |
| |
|
# 8-bit quantization settings; only actually applied when running on Colab
# (see the model load below).
bnb_config = BitsAndBytesConfig(load_in_8bit=True)
# Detect Google Colab by its runtime module. The original
# get_ipython()/__builtins__ probe is fragile: __builtins__ is a plain dict
# in any non-__main__ module (so hasattr() checks the wrong object), and
# get_ipython NameErrors outside IPython.
RunningInCOLAB = 'google.colab' in sys.modules
| |
|
| | |
| | _tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen2.5-Coder-7B-Instruct") |
| |
|
| | |
# Load the base causal LM. 8-bit quantization is enabled only on Colab
# (tight GPU RAM); elsewhere the model loads at its default precision.
_quantization = bnb_config if RunningInCOLAB else None
base_model = AutoModelForCausalLM.from_pretrained(
    "Qwen/Qwen2.5-Coder-7B-Instruct",
    quantization_config=_quantization,
    device_map="auto",
)
| |
|
| | |
# Stack the fine-tuned vulnerability LoRA adapter on top of the base model.
_model = PeftModel.from_pretrained(
    base_model, "SushantGautam/vulnerability_ativ0.1", device_map="auto"
)
| |
|
def extract_dict(text):
    """Parse the first ```python fenced block in *text* as a Python literal.

    Returns the parsed object, or {} when no fenced block is present.
    Raises ValueError/SyntaxError (from ast.literal_eval) if the fenced
    content is not a valid literal — callers handle that.
    """
    fenced = re.search(r"```python\n(.*?)\n```", text, re.DOTALL)
    if fenced is None:
        return {}
    return ast.literal_eval(fenced.group(1))
| |
|
@retry(stop_max_attempt_number=5)
def generate(prompt, max_new_tokens=4000):
    """Run one chat turn through the fine-tuned model and parse its JSON reply.

    Args:
        prompt: User prompt, sent after the fixed cybersecurity system prompt.
        max_new_tokens: Generation budget passed to ``model.generate``.

    Returns:
        dict with keys ``"raw"`` (full decoded response string) and
        ``"extracted"`` (the dict parsed from the fenced ```python block).

    Raises:
        Exception: when the response cannot be parsed into the expected
            shape; ``@retry`` re-invokes up to five times before the
            exception propagates.
    """
    model, tokenizer = _model, _tokenizer
    messages = [
        {"role": "system", "content": "You are a cybersecurity expert specializing in CWE vulnerabilities in codes. Your responses must be accompanied by a python JSON."},
        {"role": "user", "content": prompt},
    ]

    text = tokenizer.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=True
    )

    model_inputs = tokenizer([text], return_tensors="pt").to(model.device)
    generated_ids = model.generate(
        **model_inputs,
        max_new_tokens=max_new_tokens,
    )
    # Drop the echoed prompt tokens so only the newly generated tail is decoded.
    generated_ids = [
        output_ids[len(input_ids):]
        for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
    ]

    response = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]

    try:
        response_formatted = extract_dict(response)
        # Explicit raises instead of the original asserts: asserts disappear
        # under ``python -O``, and the original bare ``except:`` also
        # swallowed KeyboardInterrupt/SystemExit.
        if "code" not in response_formatted and "vulnerability" not in response_formatted:
            raise ValueError("response dict missing 'code'/'vulnerability' keys")
        if "vulnerability" in response_formatted and not isinstance(response_formatted["vulnerability"], list):
            raise ValueError("'vulnerability' must be a list")
    except Exception:
        print('⚠️ Failed to extract dict properly from response. Retrying...')
        raise Exception(f"❌ Failed to extract dict properly from response after five tries: {response}")
    return {"raw": response, "extracted": response_formatted}
| |
|
# Smoke test: one trivial round-trip proves the whole load/generate pipeline
# works before the user starts issuing real prompts.
print("🎉 Model and tokenizer loaded successfully. Now, making sure the model responses: Ping!")
_ping = generate("Return JSON with keys 'code' and 'vulnerability', where 'code' is None and 'vulnerability' is an empty list.")
print('💪🏆🎉 Pong! Model responded successfully. Use generate("<prompt>") to call the model.')
| |
|
| | |
| |
|
| | |
| |
|
| |
|
def vulnerability_injection(secure_code: str, CWE_vulnerability=None):
    """Ask the model to inject vulnerabilities into *secure_code*.

    Args:
        secure_code: Source code assumed to be secure.
        CWE_vulnerability: Optional specific CWE to inject (e.g. "CWE-89");
            when None the model chooses the weaknesses itself.

    Returns:
        The ``generate`` result dict ({"raw": ..., "extracted": ...}).
    """
    if CWE_vulnerability is None:
        # Fix: the original prompt had a stray line break inside the quoted
        # key name ("keys: '<newline>code'"), corrupting the instruction the
        # model receives; the key is rejoined as 'code' here.
        prompt = f"""
Modify the following secure code to introduce a vulnerability.
Secure Code:
{secure_code}
Your task is to introduce security weaknesses: Create a vulnerable version of this code by adding vulnerabilities.
Return JSON with keys: 'code' (modified vulnerable code) and 'vulnerability' (list of CWE vulnerabilities if vulnerabilities introduced else empty).
"""
    else:
        prompt = f"""
Modify the following secure code to introduce a {CWE_vulnerability} vulnerability.

Secure Code:
{secure_code}

Your task is to introduce the mentioned security weaknesses: Create a vulnerable version of this code by adding security risks.
Return JSON with keys: 'code' (modified vulnerable code) and 'vulnerability' (list of CWE if vulnerabilities introduced else empty).

"""
    return generate(prompt)
| |
|
| |
|
def vulnerability_detection(input_code: str):
    """Ask the model whether *input_code* already contains CWE weaknesses.

    Returns the ``generate`` result dict; its "extracted" entry carries a
    'vulnerability' list (empty when nothing was detected).
    """
    detection_prompt = f"""
Analyze the following code and detect if any CWE security vulnerabilities are already present.

Code:
{input_code}

Identify potential vulnerabilities that already exist in the code.
Return JSON with key: 'vulnerability' (list of detected CWE vulnerabilities or an empty list [] if no vulnerability found).
"""
    return generate(detection_prompt)
| |
|
| |
|
def vulnerability_fix(insecure_code: str):
    """Ask the model to repair the vulnerabilities in *insecure_code*.

    Returns the ``generate`` result dict; its "extracted" entry holds the
    fixed 'code' plus the list of addressed CWE 'vulnerability' entries.
    """
    fix_prompt = f"""
Fix the security vulnerabilities in the following code.

Vulnerable Code:
{insecure_code}

Your task is to fix the security vulnerabilities in the code.
Return JSON with keys: 'code' (secure version) and 'vulnerability' (list of fixed CWE vulnerabilities if any else empty list).
"""
    return generate(fix_prompt)
| |
|
| | print('ƒ Four functions are available to use:\n🧩vulnerability_injection(secure_code: str)\n🧩vulnerability_injection(secure_code: str, CWE_vulnerability=["CWE-89"])\n🧩vulnerability_detection(input_code: str)\n🧩vulnerability_fix(insecure_code: str)') |