| """ |
| DGA-Bilbo: CNN + LSTM parallel architecture for DGA detection. |
| Based on Highnam et al. 2021 ("Real-Time Detection of Dictionary DGA Network Traffic Using Deep Learning"); trained on 54 DGA families.
| """ |
| import string |
| import torch |
| import torch.nn as nn |
|
|
# Character vocabulary: lowercase letters, digits, hyphen, dot, underscore.
# Index 0 is reserved for both padding AND out-of-vocabulary characters.
CHARS = string.ascii_lowercase + string.digits + "-._"
CHAR2IDX = {c: i + 1 for i, c in enumerate(CHARS)}
VOCAB_SIZE = len(CHARS) + 1  # +1 for the shared padding/unknown index 0
MAXLEN = 75                  # encoded domain length; longer input is truncated
EMBED_DIM = 32               # character embedding dimension
LSTM_SIZE = 256              # hidden size of the LSTM branch
CNN_FILTERS = [2, 3, 4, 5, 6]  # convolution kernel widths (CNN branch)
N_FILTERS = 60               # filters per kernel width
ANN_HIDDEN = 100             # hidden units in the final classifier head


def encode_domain(domain: str, maxlen: int = MAXLEN) -> list:
    """Encode a domain string as a fixed-length list of character indices.

    The input is coerced to str, lowercased, stripped, truncated to
    *maxlen* characters, mapped through CHAR2IDX, and left-padded with
    zeros to exactly *maxlen* entries.

    Args:
        domain: Domain name to encode.
        maxlen: Output length (defaults to MAXLEN, matching the model).

    Returns:
        List of ints of length *maxlen*.
    """
    domain = str(domain).lower().strip()
    # NOTE(review): unknown characters map to 0, the same index used for
    # padding; kept as-is because the trained weights depend on this encoding.
    encoded = [CHAR2IDX.get(c, 0) for c in domain[:maxlen]]
    return [0] * (maxlen - len(encoded)) + encoded
|
|
|
|
class BilboModel(nn.Module):
    """Parallel CNN + LSTM ("bagging") DGA classifier.

    Two branches read the same character embeddings:
      * an LSTM whose final hidden state yields a 256-dim summary, and
      * five 1-D convolutions (kernel widths 2-6, 60 filters each),
        each followed by global max pooling over the sequence axis.
    The branch outputs are concatenated and passed through a small
    feed-forward head producing one logit per domain.
    """

    def __init__(self):
        super().__init__()
        self.embedding = nn.Embedding(VOCAB_SIZE, EMBED_DIM, padding_idx=0)
        self.lstm = nn.LSTM(EMBED_DIM, LSTM_SIZE, batch_first=True)
        self.convs = nn.ModuleList(
            nn.Conv1d(EMBED_DIM, N_FILTERS, kernel_size=width, padding=width // 2)
            for width in CNN_FILTERS
        )
        self.ann = nn.Sequential(
            nn.Linear(LSTM_SIZE + N_FILTERS * len(CNN_FILTERS), ANN_HIDDEN),
            nn.ReLU(),
            nn.Linear(ANN_HIDDEN, 1),
        )

    def forward(self, x):
        """Map a (batch, seq) tensor of char indices to (batch,) logits."""
        embedded = self.embedding(x)            # (B, T, E)

        # LSTM branch: final hidden state of the single layer.
        _, (hidden, _) = self.lstm(embedded)
        lstm_out = hidden.squeeze(0)            # (B, LSTM_SIZE)

        # CNN branch: conv -> ReLU -> global max pool, per kernel width.
        channels_first = embedded.transpose(1, 2)  # (B, E, T)
        pooled = [
            torch.relu(conv(channels_first)).max(dim=2)[0]
            for conv in self.convs
        ]

        features = torch.cat([lstm_out] + pooled, dim=1)
        return self.ann(features).squeeze(1)
|
|
|
|
def load_model(weights_path: str, device: str = None):
    """Load a trained BilboModel from a local state_dict file.

    Args:
        weights_path: Path to a saved ``state_dict`` checkpoint.
        device: Target device string; when None, picks CUDA if available.

    Returns:
        A BilboModel moved to *device* and set to eval mode.
    """
    if device is None:
        device = "cuda" if torch.cuda.is_available() else "cpu"
    model = BilboModel()
    # weights_only=True restricts unpickling to tensors/primitives: safe for
    # state_dicts and avoids arbitrary code execution from untrusted files.
    state = torch.load(weights_path, map_location=device, weights_only=True)
    model.load_state_dict(state)
    model.to(device)
    model.eval()
    return model
|
|
|
|
def predict(model, domains, device: str = None, batch_size: int = 256):
    """Classify domain strings as DGA-generated or legitimate.

    Args:
        model: Trained BilboModel (single-logit output per domain).
        domains: One domain string, or a list of them.
        device: Device for input tensors; defaults to the model's device.
        batch_size: Number of domains scored per forward pass.

    Returns:
        One dict per input domain:
        ``{"domain": ..., "label": "dga" | "legit", "score": float}``.
    """
    if device is None:
        device = next(model.parameters()).device
    if isinstance(domains, str):
        domains = [domains]

    results = []
    for start in range(0, len(domains), batch_size):
        chunk = domains[start : start + batch_size]
        batch_tensor = torch.tensor(
            [encode_domain(d) for d in chunk], dtype=torch.long
        ).to(device)
        # Inference only: no autograd graph needed.
        with torch.no_grad():
            probs = torch.sigmoid(model(batch_tensor)).cpu().tolist()
        for name, prob in zip(chunk, probs):
            results.append({
                "domain": name,
                "label": "dga" if prob >= 0.5 else "legit",
                "score": round(prob, 4),
            })
    return results
|
|