File size: 4,774 Bytes
b92918a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
import tensorflow as tf
from tensorflow.keras import layers, Model
import numpy as np
class DeCorrelationLoss(tf.keras.layers.Layer):
"""๋
ผ๋ฌธ์ ์ ํํ DeCov ์ ๊ทํ ๊ตฌํ"""
def __init__(self, lambda_decov=1e-4, **kwargs):
super(DeCorrelationLoss, self).__init__(**kwargs)
self.lambda_decov = lambda_decov
def build(self, input_shape):
super(DeCorrelationLoss, self).build(input_shape)
def call(self, inputs):
batch_size = tf.cast(tf.shape(inputs)[0], tf.float32)
# ์ค์ฌํ
inputs_centered = inputs - tf.reduce_mean(inputs, axis=0, keepdims=True)
# ๊ณต๋ถ์ฐ ํ๋ ฌ ๊ณ์ฐ
covariance = tf.matmul(inputs_centered, inputs_centered, transpose_a=True) / (batch_size - 1)
# ๋๊ฐ์ ์ ๊ฑฐ
covariance_off_diagonal = covariance - tf.linalg.diag(tf.linalg.diag_part(covariance))
# DeCov ์์ค
decov_loss = 0.5 * tf.reduce_sum(tf.square(covariance_off_diagonal))
self.add_loss(self.lambda_decov * decov_loss)
return inputs
class MalConv(Model):
"""๋
ผ๋ฌธ ์ ํ ์ฌ์ MalConv ๋ชจ๋ธ"""
def __init__(self,
max_input_length=2_000_000,
embedding_size=8,
filter_size=500,
stride=500,
num_filters=128,
fc_size=128,
use_decov=True,
lambda_decov=1e-4,
**kwargs):
super(MalConv, self).__init__(**kwargs)
self.max_input_length = max_input_length
self.use_decov = use_decov
# ๋
ผ๋ฌธ ์ ํ ์ฌ์: 0-255 ๋ฐ์ดํธ๋ง ์ฌ์ฉ
self.embedding = layers.Embedding(
input_dim=256, # ์์ : 257โ256
output_dim=embedding_size,
input_length=None, # ๊ฐ๋ณ ๊ธธ์ด ์ง์
mask_zero=False,
name='byte_embedding'
)
# ๊ฒ์ดํธ ์ปจ๋ณผ๋ฃจ์
(๋
ผ๋ฌธ Figure 1)
self.conv_A = layers.Conv1D(
filters=num_filters,
kernel_size=filter_size,
strides=stride,
padding='valid',
activation='relu',
name='conv_A'
)
self.conv_B = layers.Conv1D(
filters=num_filters,
kernel_size=filter_size,
strides=stride,
padding='valid',
activation='sigmoid',
name='conv_B'
)
# ์ ์ญ ์ต๋ ํ๋ง
self.global_max_pool = layers.GlobalMaxPooling1D(name='global_max_pool')
# ์์ ์ฐ๊ฒฐ์ธต
self.fc = layers.Dense(fc_size, activation='relu', name='fc_layer')
# DeCov ์ ๊ทํ
if use_decov:
self.decov_layer = DeCorrelationLoss(lambda_decov=lambda_decov)
self.dropout = layers.Dropout(0.5, name='dropout')
self.output_layer = layers.Dense(1, activation='sigmoid', name='output')
def call(self, inputs, training=None):
# 1. ๋ฐ์ดํธ ์๋ฒ ๋ฉ
x = self.embedding(inputs)
# 2. ๊ฒ์ดํธ ์ปจ๋ณผ๋ฃจ์
(๋
ผ๋ฌธ ํต์ฌ)
conv_a = self.conv_A(x)
conv_b = self.conv_B(x)
gated_conv = layers.multiply([conv_a, conv_b], name='gated_conv')
# 3. ์ ์ญ ์ต๋ ํ๋ง
pooled = self.global_max_pool(gated_conv)
# 4. ์์ ์ฐ๊ฒฐ์ธต
fc_out = self.fc(pooled)
# 5. DeCov ์ ๊ทํ (penultimate layer)
if self.use_decov:
fc_out = self.decov_layer(fc_out)
# 6. ๋๋กญ์์
if training:
fc_out = self.dropout(fc_out, training=training)
# 7. ์ถ๋ ฅ
output = self.output_layer(fc_out)
return output
def create_malconv_model (max_input_length=2_000_000):
"""๋
ผ๋ฌธ ์์ ๋์ผ ์ฌ์ ๋ชจ๋ธ"""
model = MalConv(max_input_length=max_input_length)
# ๋
ผ๋ฌธ ์ ํํ ์ตํฐ๋ง์ด์ + ์ค์ผ์ค๋ฌ
initial_lr = 0.01
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
initial_learning_rate=initial_lr,
decay_steps=1000,
decay_rate=0.96, # ๋
ผ๋ฌธ์์ ์ธ๊ธ๋ ์ง์ ๊ฐ์
staircase=True
)
optimizer = tf.keras.optimizers.SGD(
learning_rate=lr_schedule,
momentum=0.9,
nesterov=True
)
model.compile(
optimizer=optimizer,
loss='binary_crossentropy',
metrics=['accuracy',
tf.keras.metrics.Precision(name='precision'),
tf.keras.metrics.Recall(name='recall'),
tf.keras.metrics.AUC(name='auc')]
)
return model
|