File size: 11,496 Bytes
391f8df
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
import os
import torch
import random
from bitstring import BitArray

# from utils import generate_file_with_bits, showDif



def generate_file_with_bits(file_path, num_bits):
    """
    根据需要多少bit,随机生成对应大小的恶意软件
    :param file_path:
    :param num_bits:
    :return:
    """
    # 计算需要的字节数,每字节有8个bit
    num_bytes = (num_bits + 7) // 8  # 向上取整,保证比特数足够
    print("Byte Num:", num_bytes)

    # 创建一个包含随机字节的字节数组
    byte_array = bytearray(random.getrandbits(8) for _ in range(num_bytes))

    # 如果不需要最后一个字节的全部位,将多余的位清零
    if num_bits % 8 != 0:
        last_byte_bits = num_bits % 8
        # 保留最后字节所需的位数,其它位清零
        mask = (1 << last_byte_bits) - 1
        byte_array[-1] &= mask

    # 将字节数组写入文件
    with open(file_path, 'wb') as f:
        f.write(byte_array)

    print(f"File '{file_path}' generated with {num_bits} bits.")




class swin:
    def __init__(self, path):
        """
        初始化使用参数的路径进行初始化
        :param path:
        """
        self.path = path
        return


    def get_pth_keys(self):
        """
        返回参数的key
        :param paraPath: 待获得的参数pth
        :return:
        """
        return torch.load(self.path, map_location=torch.device("cpu")).keys()


    def get_pth_keys_float32(self):
        """
        返回参数的key
        :param paraPath: 待获得的参数pth
        :return:
        """
        para = torch.load(self.path, map_location=torch.device("cpu"))
        temp = para.keys()
        layers = []
        for i in temp:
            if para[i].data.dtype == torch.float32:
                layers.append(i)
        return layers


    def get_file_bit_num(self):
        """
        通过文件路径,获得文件bit数
        :return: bit size
        """
        return os.path.getSize(self.path) * 8


    def layer_low_n_bit_fLip(self, flip_path, bit_n, *layers):
        """
        翻转pth的layers层的低n bit
        :param flip_path: 翻转之后的参数pth
        :param bit_n: 翻转低多少bit
        :return: void
        """
        para = torch.load(self.path)
        mask= (1<<bit_n)-1
        for layer in layers:
            if len(para[layer].data.shape) < 1:
                continue
            layer_tensor = para[layer].data
            # 确保 Tensor 的数据类型为浮点型
            if layer_tensor.dtype == torch.float32:
                # 使用整数视图来操作比特,但应谨慎操作
                layer_tensor_view = layer_tensor.view(torch.int32)
                layer_tensor_view ^= mask  # 对低位进行翻转操作
                para[layer].data = layer_tensor_view.view(torch.float32)
        torch.save(para, flip_path)


    def all_layers_low_n_bit_fLip(self, flip_path, bit_n):
        """
        翻转所有层的低n bit,需要满足其中的数据类型是fp32
        :param flip_path: 翻转之后的参数pth
        :param bit_n: 翻转低多少bit
        :return:
        """
        self.layer_low_n_bit_fLip(flip_path, bit_n, *self.get_pth_keys_float32())


    def get_layers_low_n_bit_size(self, layers, bit_n):
        """
        usage:
        size, size_list = agent.get_layers_low_n_bit_size(agent.get_pth_keys(), 16)
        返回指数部分最大的嵌入容量,单位是字节bit
        :param layers: list
        :param interval: 每interval个中嵌入一个
        :return: 总大小,每一层的大小
        """
        para = torch.load(self.path, map_location=torch.device("cpu"))
        size_with_layers_list = []
        total_size = 0
        for layer in layers:

            if para[layer].data.dtype != torch.float32:
                continue  # 只查看是float32数据类型的数值进行嵌入

            paraTensor = para[layer].data
            paraTensor_flat = paraTensor.flatten()
            layerSize = len(paraTensor_flat) * bit_n
            size_with_layers_list.append(layerSize)
            total_size += layerSize
        return total_size, size_with_layers_list



    def all_layers_low_n_bit_inject(self, inject_path, bit_n, malware, malware_len):
        """
        随机生成一个软件、在所有层的低nbit进行嵌入
        :param inject_path: 最后的嵌入pth文件
        :param bit_n: 低nbit
        :param malware: 需要嵌入的恶意软件
        :param malware_len: 需要嵌入的恶意软件的长度,单位为bit
        :return:
        """
        paras = torch.load(self.path, map_location=torch.device("cpu"))
        malware_str = BitArray(filename=malware).bin
        mal_index = 0  # 需要访问的恶意软件的bit区间起点
        _, size_list = self.get_layers_low_n_bit_size(self.get_pth_keys_float32(), bit_n)
        for layer, size in zip(self.get_pth_keys_float32(), size_list):
            if paras[layer].data.dtype != torch.float32:
                continue # 只考虑32位浮点数
            print(layer, size)
            para_tensor_flat = paras[layer].flatten()
            # index 再恶意软件中和tensor中并不是同一个!
            print(mal_index, mal_index + size)
            print(para_tensor_flat.size(), size//bit_n)
            para_index = 0  # 写入的参数的位置
            for inject_pos in range(mal_index, min(mal_index + size, malware_len), bit_n):
                current_write_content = malware_str[inject_pos: inject_pos + bit_n]  # 将n bit数据提取出来
                para_tensor_flat_str = BitArray(int=para_tensor_flat[para_index].view(torch.int32), length=32).bin
                new_para_tensor_flat_str = para_tensor_flat_str[:32 - bit_n] + current_write_content

                if int(new_para_tensor_flat_str, 2) >= 2 ** 31:
                    newParaInt = torch.tensor(int(new_para_tensor_flat_str, 2) - 2 ** 32, dtype=torch.int32)
                    para_tensor_flat[para_index] = newParaInt.view(torch.float32)
                else:
                    newParaInt = torch.tensor(int(new_para_tensor_flat_str, 2), dtype=torch.int32)
                    para_tensor_flat[para_index] = newParaInt.view(torch.float32)
                para_index += 1  # 写入的位置往后推1bit

            if mal_index + size >= malware_len:
                break
            else:
                mal_index = mal_index + size
            paras[layer] = para_tensor_flat.reshape(paras[layer].data.shape)
        torch.save(paras, inject_path) 
        return


    def all_layers_low_n_bit_extract(self, inject_path, bit_n, extract_malware, malware_len):
        """
        :param malware_len: 需要嵌入的恶意软件的长度,单位为bit
        :param inject_path: 嵌入后的模型参数路径
        :param bit_n: 嵌入参数的后nbit
        :param extract_malware: 提取出来的恶意软件路径
        :param malware_len: 需要嵌入的恶意软件的长度,单位为bit
        :return:
        """
        paras = torch.load(inject_path, map_location="cpu"); bits, idx = BitArray(), 0;
        _, size_list = self.get_layers_low_n_bit_size(self.get_pth_keys_float32(), bit_n)
        for layer, _ in zip(self.get_pth_keys_float32(), size_list):
            p = paras[layer].data
            if p.dtype != torch.float32: continue
            for x in p.flatten()[:min(len(p.flatten()), (malware_len - idx + bit_n - 1) // bit_n)]:
                bits.append(f'0b{BitArray(int=int(x.view(torch.int32)), length=32).bin[-bit_n:]}');
                idx += bit_n
                if idx >= malware_len: break
            if idx >= malware_len: break           
        with open(extract_malware, 'wb') as f:
            bits_clip = bits[:(malware_len-(malware_len%bit_n))] + bits[-(malware_len%bit_n):]
            bits_clip[:malware_len].tofile(f)
        return


    # def all_layers_low_n_bit_extract(self, inject_path, bit_n, extract_malware, malware_len):
    #     paras = torch.load(inject_path, map_location="cpu");
    #     pl = torch.load(self.path, map_location="cpu");
    #     layers = [k for k, v in pl.items() if v.dtype == torch.float32];
    #     bits, idx = BitArray(), 0
    #     for l in layers:
    #         f = paras[l].data.flatten();
    #         r = min(len(f), (malware_len - idx + bit_n - 1) // bit_n)
    #         for x in f[:r]:
    #             bits.append(f'0b{BitArray(int=int(x.view(torch.int32)), length=32).bin[-bit_n:]}');
    #             idx += bit_n
    #             if idx >= malware_len: break
    #         if idx >= malware_len: break
    #     with open(extract_malware, 'wb') as f:
    #         bits[:malware_len].tofile(f); return


if __name__ == "__main__":
    
    
    
    path = "../parameters/classification/swin_face/swin_face.pth"
    # flip_path = "../parametersProcess/swin/swin_flip_16.pth"
    inject_path = "../parametersProcess/swin_face/swin_evilfiles_16.pth"
    malware = "../malwares/generated_malware"
    extract_malware = "../malwares/generated_malware_extracted"
    agent = swin(path)


    # print("layers name: ", agent.get_pth_keys_float32())
    # print("type: ", type(agent.get_pth_keys_float32()))
    # print("layers num: ", len(agent.get_pth_keys_float32()))

    size, size_list = agent.get_layers_low_n_bit_size(agent.get_pth_keys_float32(), 16)
    # print("all layers injection size with low-16 bits: ", size / 8000000, " MB")
    # print(size_list)
    # print(len(size_list))
    '''随机生成一个恶意软件,全部嵌入模型的层(简化流程)'''
    generate_file_with_bits(malware, size)


    print("malware bit size: ",os.path.getsize(malware) * 8)


    '''嵌入'''
    agent.all_layers_low_n_bit_inject(inject_path, 20, malware, os.path.getsize(malware) * 8)

    '''提取'''


    # def all_layers_low_n_bit_extract(ip, bn, em, ml):
    #     p = torch.load(ip, map_location="cpu");
    #     b, i = BitArray(), 0;
    #     lrs = [k for k, v in p.items() if v.dtype == torch.float32]
    #     for l in lrs:
    #         for x in p[l].data.flatten()[:min(len(p[l].data.flatten()), (ml - i + bn - 1) // bn)]:
    #             b.append(f'0b{BitArray(int=int(x.view(torch.int32)), length=32).bin[-bn:]}');
    #             i += bn
    #             if i >= ml: break
    #         if i >= ml: break
    #     with open(em, 'wb') as f:
    #         b[:ml].tofile(f);return


    agent.all_layers_low_n_bit_extract(inject_path, 20, extract_malware, os.path.getsize(malware) * 8)
    # agent.all_layers_low_n_bit_extract(inject_path, 16, extract_malware, 36272)


    # def all_layers_low_n_bit_extract(ip, bn, em, ml):
    #     p = torch.load(ip, map_location="cpu");
    #     b, i = BitArray(), 0;
    #     lrs = [k for k, v in p.items() if v.dtype == torch.float32]
    #     for l in lrs:
    #         for x in p[l].data.flatten()[:min(len(p[l].data.flatten()), (ml - i + bn - 1) // bn)]:
    #             b.append(f'0b{BitArray(int=int(x.view(torch.int32)), length=32).bin[-bn:]}');
    #             i += bn
    #             if i >= ml: break
    #         if i >= ml: break
    #     with open(em, 'wb') as f:
    #         b[:ml].tofile(f);return
    # all_layers_low_n_bit_extract("~/data/ATATK/parametersProcess/swin/swin_inject_16.pth", 16, "~/data/ATATK/malwares/Zherkov_extract.EXE", 36272)


    





    # agent.all_layers_low_n_bit_fLip(flip_path, 20)