File size: 3,373 Bytes
f0a5506
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
"""
DGA-Bilbo: CNN + LSTM parallel architecture for DGA detection.
Based on Higham et al. 2021, trained on 54 DGA families.
"""
import string
import torch
import torch.nn as nn

# Character vocabulary: lowercase letters, digits, and the three symbols
# seen in hostnames ("-", ".", "_"). Index 0 is reserved for both padding
# and out-of-vocabulary characters.
CHARS = string.ascii_lowercase + string.digits + "-._"
CHAR2IDX = {c: i for i, c in enumerate(CHARS, start=1)}
VOCAB_SIZE = len(CHARS) + 1  # 39 real characters + 1 padding/OOV slot = 40
MAXLEN = 75
EMBED_DIM = 32
LSTM_SIZE = 256
CNN_FILTERS = [2, 3, 4, 5, 6]
N_FILTERS = 60
ANN_HIDDEN = 100


def encode_domain(domain: str) -> list:
    """Map a domain name to a fixed-length (MAXLEN) list of character indices.

    The input is lowercased, stripped, and truncated to MAXLEN characters;
    unknown characters map to 0, and the sequence is left-padded with zeros.
    """
    text = str(domain).lower().strip()
    indices = [CHAR2IDX.get(ch, 0) for ch in text[:MAXLEN]]
    return [0] * (MAXLEN - len(indices)) + indices  # pad on the left


class BilboModel(nn.Module):
    """
    Parallel ("bagging") CNN + LSTM architecture (Highnam et al. 2021):
    - LSTM(256) branch over shared character embeddings
    - CNN branch: kernel sizes {2,3,4,5,6}, 60 filters each, global max pool
    - Concatenated features -> ANN(100) -> one raw logit per input
      (the sigmoid is applied by the caller).
    """

    def __init__(self):
        super().__init__()
        # Shared character embedding; index 0 is the padding slot.
        self.embedding = nn.Embedding(VOCAB_SIZE, EMBED_DIM, padding_idx=0)
        self.lstm = nn.LSTM(EMBED_DIM, LSTM_SIZE, batch_first=True)
        # One Conv1d per kernel size, each producing N_FILTERS channels.
        self.convs = nn.ModuleList([
            nn.Conv1d(EMBED_DIM, N_FILTERS, kernel_size=k, padding=k // 2)
            for k in CNN_FILTERS
        ])
        # 60 filters x 5 kernel sizes = 300 CNN features, plus 256 LSTM
        # features, gives a 556-dim combined representation.
        self.ann = nn.Sequential(
            nn.Linear(LSTM_SIZE + N_FILTERS * len(CNN_FILTERS), ANN_HIDDEN),
            nn.ReLU(),
            nn.Linear(ANN_HIDDEN, 1),
        )

    def forward(self, x):
        """Return a (batch,) tensor of raw logits for (batch, MAXLEN) indices."""
        emb = self.embedding(x)
        # LSTM branch: keep only the final hidden state of the single layer.
        _, (hidden, _) = self.lstm(emb)
        lstm_out = hidden.squeeze(0)
        # CNN branch: Conv1d wants (batch, channels, seq); global max pool
        # over the sequence axis after ReLU.
        channels_first = emb.transpose(1, 2)
        pooled = [
            torch.relu(conv(channels_first)).max(dim=2)[0]
            for conv in self.convs
        ]
        combined = torch.cat([lstm_out] + pooled, dim=1)
        return self.ann(combined).squeeze(1)


def load_model(weights_path: str, device: str = None):
    """Load a trained BilboModel from a local state-dict file.

    Args:
        weights_path: Path to a ``torch.save``-ed state dict.
        device: Target device string ("cuda"/"cpu"); when None, CUDA is
            used if available, otherwise CPU.

    Returns:
        The model moved to ``device`` and set to eval mode.
    """
    if device is None:
        device = "cuda" if torch.cuda.is_available() else "cpu"
    model = BilboModel()
    # weights_only=True prevents unpickling of arbitrary objects from the
    # checkpoint file; a state dict contains only tensors, so this is a
    # pure security hardening with no behavior change for valid weights.
    state = torch.load(weights_path, map_location=device, weights_only=True)
    model.load_state_dict(state)
    model.to(device)
    model.eval()
    return model


def predict(model, domains, device: str = None, batch_size: int = 256):
    """
    Score domain strings as DGA vs legit.

    Accepts a single domain string or a list of them. Returns one dict per
    domain: [{"domain": ..., "label": "dga"/"legit", "score": float}],
    where score is the sigmoid probability rounded to 4 places and the
    label is "dga" when score >= 0.5.
    """
    if device is None:
        device = next(model.parameters()).device
    if isinstance(domains, str):
        domains = [domains]

    results = []
    for start in range(0, len(domains), batch_size):
        chunk = domains[start : start + batch_size]
        batch_tensor = torch.tensor(
            [encode_domain(d) for d in chunk], dtype=torch.long
        ).to(device)
        with torch.no_grad():
            probs = torch.sigmoid(model(batch_tensor)).cpu().tolist()
        for name, prob in zip(chunk, probs):
            results.append({
                "domain": name,
                "label": "dga" if prob >= 0.5 else "legit",
                "score": round(prob, 4),
            })
    return results