""" DGA-Bilbo: CNN + LSTM parallel architecture for DGA detection. Based on Higham et al. 2021, trained on 54 DGA families. """ import string import torch import torch.nn as nn CHARS = string.ascii_lowercase + string.digits + "-._" CHAR2IDX = {c: i + 1 for i, c in enumerate(CHARS)} VOCAB_SIZE = len(CHARS) + 1 # 40 MAXLEN = 75 EMBED_DIM = 32 LSTM_SIZE = 256 CNN_FILTERS = [2, 3, 4, 5, 6] N_FILTERS = 60 ANN_HIDDEN = 100 def encode_domain(domain: str) -> list: domain = str(domain).lower().strip() encoded = [CHAR2IDX.get(c, 0) for c in domain[:MAXLEN]] pad_len = MAXLEN - len(encoded) return [0] * pad_len + encoded # left-padding class BilboModel(nn.Module): """ Bagging architecture (Higham et al. 2021): - LSTM(256) branch over character embeddings - CNN with filters {2,3,4,5,6} x 60 + Global Max Pooling - Concatenation -> ANN(100) -> sigmoid """ def __init__(self): super().__init__() self.embedding = nn.Embedding(VOCAB_SIZE, EMBED_DIM, padding_idx=0) self.lstm = nn.LSTM(EMBED_DIM, LSTM_SIZE, batch_first=True) self.convs = nn.ModuleList([ nn.Conv1d(EMBED_DIM, N_FILTERS, kernel_size=k, padding=k // 2) for k in CNN_FILTERS ]) cnn_out_dim = N_FILTERS * len(CNN_FILTERS) # 300 combined_dim = LSTM_SIZE + cnn_out_dim # 556 self.ann = nn.Sequential( nn.Linear(combined_dim, ANN_HIDDEN), nn.ReLU(), nn.Linear(ANN_HIDDEN, 1), ) def forward(self, x): emb = self.embedding(x) _, (h, _) = self.lstm(emb) lstm_feat = h.squeeze(0) emb_t = emb.transpose(1, 2) cnn_feats = [] for conv in self.convs: c = torch.relu(conv(emb_t)) c = c.max(dim=2)[0] cnn_feats.append(c) cnn_feat = torch.cat(cnn_feats, dim=1) combined = torch.cat([lstm_feat, cnn_feat], dim=1) return self.ann(combined).squeeze(1) def load_model(weights_path: str, device: str = None): """Load trained model from a local weights path.""" if device is None: device = "cuda" if torch.cuda.is_available() else "cpu" model = BilboModel() model.load_state_dict(torch.load(weights_path, map_location=device)) model.to(device) model.eval() return model def predict(model, domains, device: str = None, batch_size: int = 256): """ Predict DGA vs legit for a list of domain strings. Returns list of dicts: [{"domain": ..., "label": "dga"/"legit", "score": float}] """ if device is None: device = next(model.parameters()).device if isinstance(domains, str): domains = [domains] results = [] for i in range(0, len(domains), batch_size): batch = domains[i : i + batch_size] encoded = [encode_domain(d) for d in batch] x = torch.tensor(encoded, dtype=torch.long).to(device) with torch.no_grad(): logits = model(x) scores = torch.sigmoid(logits).cpu().tolist() preds = [1 if s >= 0.5 else 0 for s in scores] for domain, pred, score in zip(batch, preds, scores): results.append({ "domain": domain, "label": "dga" if pred == 1 else "legit", "score": round(score, 4), }) return results