File size: 1,992 Bytes
095f796
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
"""
DGA-LABin: BiLSTM + Self-Attention (Keras) for DGA detection.
Based on Labin architecture, trained on 54 DGA families.
Requires: keras, keras_self_attention
"""
import numpy as np

# Charset: '*' + digits + lowercase + ['-', '_', '.']
charset = (
    ['*']
    + [chr(x) for x in range(0x30, 0x30 + 10)]
    + [chr(x) for x in range(0x61, 0x61 + 26)]
    + ['-', '_', '.']
)
stoi = {k: charset.index(k) for k in charset}
VOCAB_SIZE = len(charset)  # 40
MAX_LEN = 64  # left-padded


def encode_domain(domain: str) -> list:
    domain = str(domain).lower().strip()
    encoded = []
    for ch in domain:
        encoded.append(stoi.get(ch, stoi['*']))
    # Left-pad / truncate to MAX_LEN
    if len(encoded) >= MAX_LEN:
        return encoded[:MAX_LEN]
    return [0] * (MAX_LEN - len(encoded)) + encoded


def load_model(weights_path: str):
    """Load trained Keras model from .keras file."""
    from keras.models import load_model as keras_load
    from keras_self_attention import SeqSelfAttention, SeqWeightedAttention

    custom_objects = {
        'SeqSelfAttention': SeqSelfAttention,
        'SeqWeightedAttention': SeqWeightedAttention,
    }
    model = keras_load(weights_path, custom_objects=custom_objects)
    return model


def predict(model, domains, batch_size: int = 256):
    """
    Predict DGA vs legit for a list of domain strings.
    Returns list of dicts: [{"domain": ..., "label": "dga"/"legit", "score": float}]
    """
    if isinstance(domains, str):
        domains = [domains]

    results = []
    for i in range(0, len(domains), batch_size):
        batch = domains[i : i + batch_size]
        X = np.array([encode_domain(d) for d in batch])
        scores = model.predict(X, verbose=0).flatten()
        for domain, score in zip(batch, scores):
            results.append({
                "domain": domain,
                "label": "dga" if score > 0.5 else "legit",
                "score": round(float(score), 4),
            })
    return results