|
|
import os |
|
|
import torch |
|
|
import random |
|
|
from bitstring import BitArray |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def generate_file_with_bits(file_path, num_bits):
    """Write a file of random bytes just large enough to hold ``num_bits`` bits.

    The byte count is ``num_bits`` rounded up to a whole byte. When
    ``num_bits`` is not a multiple of 8, only the low ``num_bits % 8`` bits of
    the final byte are kept; its remaining high bits are cleared.

    :param file_path: path of the file to create (overwritten if it exists)
    :param num_bits: number of random bits requested
    :return: None
    """
    byte_count = (num_bits + 7) // 8
    print("Byte Num:", byte_count)

    # One getrandbits(8) call per byte, in order, so a seeded ``random``
    # module yields a reproducible file.
    payload = bytearray([random.getrandbits(8) for _ in range(byte_count)])

    leftover_bits = num_bits % 8
    if leftover_bits:
        # Clear the bits of the last byte that lie beyond the requested count.
        payload[-1] &= (1 << leftover_bits) - 1

    with open(file_path, 'wb') as out_file:
        out_file.write(payload)

    print(f"File '{file_path}' generated with {num_bits} bits.")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class swin:
    """Bit-level utilities for the float32 tensors stored in a ``.pth`` checkpoint.

    The checkpoint at ``path`` is expected to load as a mapping from layer name
    to tensor. Only ``torch.float32`` entries are ever read or modified; every
    other entry is passed through unchanged.

    Bit layout used by the inject/extract pair: float32 layers are visited in
    checkpoint order and each element carries ``bit_n`` payload bits in its
    lowest ``bit_n`` bits, most significant payload bit first. If the payload
    length is not a multiple of ``bit_n``, the final partial chunk occupies the
    lowest bits of the last element that is used.
    """

    _WORD_MASK = 0xFFFFFFFF  # all 32 bits of one float32 word
    _IO_STEP = 1 << 20       # bytes converted per write call when dumping output

    def __init__(self, path):
        """
        Initialise the helper with the checkpoint path.

        :param path: path of the ``.pth`` parameter file to work on
        """
        self.path = path

    # ------------------------------------------------------------------
    # internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _load_checkpoint(path):
        """Load a checkpoint onto the CPU."""
        # NOTE: torch.load unpickles the file; only open trusted checkpoints.
        return torch.load(path, map_location=torch.device("cpu"))

    @staticmethod
    def _float32_keys(para):
        """Return the keys of ``para`` whose tensors are float32, in order."""
        return [key for key in para if para[key].data.dtype == torch.float32]

    @staticmethod
    def _check_bit_width(bit_n, lowest=1):
        """Raise ValueError unless ``lowest <= bit_n <= 32``."""
        if not lowest <= bit_n <= 32:
            raise ValueError(
                f"bit_n must be between {lowest} and 32, got {bit_n!r}")

    @staticmethod
    def _unpack_bits(values, width):
        """Expand each integer into ``width`` bits (MSB first) as a flat uint8 tensor."""
        values = values.to(torch.int64)
        grid = torch.empty((values.numel(), width), dtype=torch.uint8)
        for column in range(width):
            grid[:, column] = (values >> (width - 1 - column)) & 1
        return grid.reshape(-1)

    @staticmethod
    def _pack_bits(bits, width):
        """Fold consecutive groups of ``width`` bits (MSB first) into int64 values."""
        grid = bits.reshape(bits.numel() // width, width)
        values = torch.zeros(grid.shape[0], dtype=torch.int64)
        for column in range(width):
            values = values * 2 + grid[:, column]
        return values

    def _payload_bits(self, payload, start, stop):
        """Return bits ``[start, stop)`` of the ``bytearray`` payload, MSB first."""
        first_byte = start // 8
        last_byte = (stop + 7) // 8
        raw = torch.frombuffer(payload, dtype=torch.uint8,
                               count=last_byte - first_byte, offset=first_byte)
        bits = self._unpack_bits(raw, 8)
        return bits[start - first_byte * 8: stop - first_byte * 8]

    @classmethod
    def _write_bytes(cls, handle, byte_values):
        """Write a tensor of 0..255 values to ``handle`` in bounded slices."""
        for offset in range(0, byte_values.numel(), cls._IO_STEP):
            handle.write(bytes(byte_values[offset:offset + cls._IO_STEP].tolist()))

    # ------------------------------------------------------------------
    # public API
    # ------------------------------------------------------------------
    def get_pth_keys(self):
        """
        Return the keys of the checkpoint.

        :return: keys view of the loaded checkpoint mapping
        """
        return self._load_checkpoint(self.path).keys()

    def get_pth_keys_float32(self):
        """
        Return the keys of the checkpoint whose tensors are float32.

        :return: list of keys, in checkpoint order
        """
        return self._float32_keys(self._load_checkpoint(self.path))

    def get_file_bit_num(self):
        """
        Return the size of the checkpoint file in bits.

        :return: bit size
        """
        return os.path.getsize(self.path) * 8

    def layer_low_n_bit_fLip(self, flip_path, bit_n, *layers):
        """
        Flip the lowest ``bit_n`` bits of every element of the given layers.

        Layers that are zero-dimensional or not float32 are left untouched.
        The checkpoint is loaded onto the CPU, like in every other method.

        :param flip_path: where the modified checkpoint is saved
        :param bit_n: how many low bits to flip (0..32)
        :param layers: keys of the layers to modify
        :return: None
        :raises ValueError: if ``bit_n`` is outside 0..32
        """
        self._check_bit_width(bit_n, lowest=0)
        para = self._load_checkpoint(self.path)
        mask = (1 << bit_n) - 1
        if mask >= 2 ** 31:
            # Re-express the mask as a signed 32-bit value (bit_n == 32 -> -1)
            # so it fits the int32 view of the data.
            mask -= 2 ** 32
        for layer in layers:
            layer_tensor = para[layer].data
            if layer_tensor.dim() < 1 or layer_tensor.dtype != torch.float32:
                continue
            flipped = layer_tensor.view(torch.int32) ^ mask
            para[layer].data = flipped.view(torch.float32)
        torch.save(para, flip_path)

    def all_layers_low_n_bit_fLip(self, flip_path, bit_n):
        """
        Flip the lowest ``bit_n`` bits of every float32 layer.

        :param flip_path: where the modified checkpoint is saved
        :param bit_n: how many low bits to flip
        :return: None
        """
        self.layer_low_n_bit_fLip(flip_path, bit_n, *self.get_pth_keys_float32())

    def get_layers_low_n_bit_size(self, layers, bit_n):
        """
        Compute the capacity, in bits, of the lowest ``bit_n`` bits of the layers.

        usage:
            size, size_list = agent.get_layers_low_n_bit_size(agent.get_pth_keys(), 16)

        Layers that are not float32 are skipped and contribute no entry to the
        per-layer list.

        :param layers: iterable of layer keys
        :param bit_n: number of low bits counted per element
        :return: (total size, list with the size of each float32 layer)
        """
        para = self._load_checkpoint(self.path)
        size_with_layers_list = [
            para[layer].data.numel() * bit_n
            for layer in layers
            if para[layer].data.dtype == torch.float32
        ]
        return sum(size_with_layers_list), size_with_layers_list

    def all_layers_low_n_bit_inject(self, inject_path, bit_n, malware, malware_len):
        """
        Write the bits of a file into the lowest ``bit_n`` bits of the float32 layers.

        Layers are filled in checkpoint order. At most
        ``min(malware_len, file size in bits)`` bits are written; bits that do
        not fit into the available layers are silently left out. The bits of
        each element above the written ones are preserved, including for the
        final partial chunk.

        :param inject_path: where the modified checkpoint is saved
        :param bit_n: number of low bits used per element (1..32)
        :param malware: path of the file whose bits are written
        :param malware_len: number of bits of that file to write
        :return: None
        :raises ValueError: if ``bit_n`` is outside 1..32
        """
        self._check_bit_width(bit_n)
        paras = self._load_checkpoint(self.path)
        with open(malware, 'rb') as handle:
            # bytearray: torch.frombuffer wants a writable buffer.
            payload = bytearray(handle.read())
        total_bits = max(0, min(malware_len, len(payload) * 8))
        low_mask = (1 << bit_n) - 1

        mal_index = 0
        for layer in self._float32_keys(paras):
            if mal_index >= total_bits:
                break
            tensor = paras[layer].data
            size = tensor.numel() * bit_n
            stop = min(mal_index + size, total_bits)
            if stop == mal_index:
                continue  # empty tensor: nothing to carry
            print(layer, size)
            print(mal_index, mal_index + size)

            flat = tensor.reshape(-1).clone()
            print(flat.size(), size // bit_n)

            bits = self._payload_bits(payload, mal_index, stop)
            full, tail = divmod(stop - mal_index, bit_n)
            used = full + (1 if tail else 0)

            # int32 alias of the floats that will carry data; widened to
            # int64 so the 32-bit patterns can be handled as unsigned values.
            raw_words = flat[:used].view(torch.int32)
            words = raw_words.to(torch.int64) & self._WORD_MASK
            if full:
                chunk_values = self._pack_bits(bits[:full * bit_n], bit_n)
                words[:full] = (words[:full] & (self._WORD_MASK ^ low_mask)) | chunk_values
            if tail:
                # Partial last chunk: overwrite only its own ``tail`` low bits.
                tail_value = self._pack_bits(bits[full * bit_n:], tail)
                tail_mask = (1 << tail) - 1
                words[full:] = (words[full:] & (self._WORD_MASK ^ tail_mask)) | tail_value

            # Back to signed 32-bit, written through the alias into ``flat``.
            signed = torch.where(words >= 2 ** 31, words - 2 ** 32, words)
            raw_words.copy_(signed.to(torch.int32))

            paras[layer] = flat.reshape(tensor.shape)
            mal_index += size
        torch.save(paras, inject_path)

    def all_layers_low_n_bit_extract(self, inject_path, bit_n, extract_malware, malware_len):
        """
        Read back ``malware_len`` bits from the lowest ``bit_n`` bits of the layers.

        The float32 layers of the checkpoint at ``inject_path`` are visited in
        order, mirroring ``all_layers_low_n_bit_inject``. The output is padded
        with zero bits up to a whole byte. If the layers hold fewer than
        ``malware_len`` bits, only the available bits are written.

        :param inject_path: path of the checkpoint that carries the data
        :param bit_n: number of low bits used per element (1..32)
        :param extract_malware: path of the file to write the bits to
        :param malware_len: number of bits to read back
        :return: None
        :raises ValueError: if ``bit_n`` is outside 1..32
        """
        self._check_bit_width(bit_n)
        paras = self._load_checkpoint(inject_path)
        low_mask = (1 << bit_n) - 1

        carry = torch.empty(0, dtype=torch.uint8)  # bits not yet forming a byte
        idx = 0
        with open(extract_malware, 'wb') as handle:
            for layer in self._float32_keys(paras):
                if idx >= malware_len:
                    break
                tensor = paras[layer].data
                stop = min(idx + tensor.numel() * bit_n, malware_len)
                if stop == idx:
                    continue
                full, tail = divmod(stop - idx, bit_n)
                used = full + (1 if tail else 0)
                words = tensor.reshape(-1)[:used].view(torch.int32).to(torch.int64)

                pieces = [carry]
                if full:
                    pieces.append(self._unpack_bits(words[:full] & low_mask, bit_n))
                if tail:
                    # The partial last chunk sits in the lowest ``tail`` bits.
                    pieces.append(
                        self._unpack_bits(words[full:] & ((1 << tail) - 1), tail))
                bits = torch.cat(pieces)

                whole = bits.numel() // 8 * 8
                if whole:
                    self._write_bytes(handle, self._pack_bits(bits[:whole], 8))
                carry = bits[whole:].clone()
                idx = stop

            if carry.numel():
                padding = torch.zeros(8 - carry.numel(), dtype=torch.uint8)
                self._write_bytes(
                    handle, self._pack_bits(torch.cat([carry, padding]), 8))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Input checkpoint, output checkpoint and the two scratch files.
    path = "../parameters/classification/swin_face/swin_face.pth"
    inject_path = "../parametersProcess/swin_face/swin_evilfiles_16.pth"
    malware = "../malwares/generated_malware"
    extract_malware = "../malwares/generated_malware_extracted"

    # NOTE(review): capacity is measured with 16 low bits per element while
    # embedding/extraction use 20 — kept as in the original; confirm whether
    # the two widths are meant to differ.
    capacity_bit_n = 16
    embed_bit_n = 20

    agent = swin(path)
    float32_layers = agent.get_pth_keys_float32()
    size, size_list = agent.get_layers_low_n_bit_size(float32_layers, capacity_bit_n)

    # Generate a random file sized to fill every layer (simplified flow).
    generate_file_with_bits(malware, size)

    # The file is not modified below, so its size is read once.
    malware_bits = os.path.getsize(malware) * 8
    print("malware bit size: ", malware_bits)

    # Embed.
    agent.all_layers_low_n_bit_inject(inject_path, embed_bit_n, malware, malware_bits)

    # Extract.
    agent.all_layers_low_n_bit_extract(inject_path, embed_bit_n, extract_malware, malware_bits)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|