File size: 5,895 Bytes
193cedd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
"""
DGA Benchmark Loader β€” use this in Colab to load any model from HuggingFace.

Usage:
    from dga_loader import load_dga_model, predict_domains

    model, mod = load_dga_model("cnn")
    results = predict_domains(mod, model, ["google.com", "xkr3f9mq.ru"])

Available models:
    "cnn"          -> Reynier/dga-cnn
    "bilbo"        -> Reynier/dga-bilbo
    "bilstm"       -> Reynier/dga-bilstm
    "labin"        -> Reynier/dga-labin
    "logit"        -> Reynier/dga-logit
    "fanci"        -> Reynier/dga-fanci
    "modernbert"   -> Reynier/modernbert-dga-detector   (HF pipeline)
    "domurlsbert"  -> Reynier/dga-domurlsbert           (PEFT/LoRA)
"""
import importlib.util
import sys

from huggingface_hub import hf_hub_download

# Maps a short model alias to (HF repo_id, weights filename, module filename).
# The downloaded module file is expected to expose
#   load_model(weights_path[, device])  and  predict(model, domains)
# -- see load_dga_model / predict_domains below.
REGISTRY = {
    "cnn":        ("Reynier/dga-cnn",        "dga_cnn_model_1M.pth", "model.py"),
    "bilbo":      ("Reynier/dga-bilbo",       "bilbo_best.pth",       "model.py"),
    "bilstm":     ("Reynier/dga-bilstm",      "bilstm_best.pth",      "model.py"),
    "labin":      ("Reynier/dga-labin",       "LABin_best_model.keras", "model.py"),
    "logit":      ("Reynier/dga-logit",       "artifacts.joblib",     "model.py"),
    "fanci":      ("Reynier/dga-fanci",       "fanci_dga_detector.joblib", "model.py"),
}


def _import_module(path: str, name: str):
    """Dynamically import a Python file as a module."""
    spec = importlib.util.spec_from_file_location(name, path)
    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)
    sys.modules[name] = mod
    return mod


def load_dga_model(model_name: str, device: str = None):
    """
    Download and load a DGA model from HuggingFace.

    Parameters
    ----------
    model_name : str
        One of: cnn, bilbo, bilstm, labin, logit, fanci, modernbert, domurlsbert
    device : str, optional
        'cpu' or 'cuda'. Auto-detected if None.

    Returns
    -------
    model : loaded model object
    mod   : the model module (call mod.predict(model, domains) to get predictions)
            For modernbert/domurlsbert, mod=None (use the pipeline/model directly).

    Raises
    ------
    ValueError
        If model_name is not one of the known aliases.
    """
    model_name = model_name.lower()

    # ── Transformer models (special handling) ─────────────────────────────
    if model_name == "modernbert":
        from transformers import pipeline
        print("Loading Reynier/modernbert-dga-detector ...")
        # Fix: honor an explicit `device` argument instead of always
        # auto-detecting (keeps this branch consistent with domurlsbert).
        dev = device or ("cuda" if _cuda_available() else "cpu")
        pipe = pipeline(
            "text-classification",
            model="Reynier/modernbert-dga-detector",
            device=0 if dev.startswith("cuda") else -1,
        )
        return pipe, None

    if model_name == "domurlsbert":
        from transformers import BertTokenizer, BertForSequenceClassification
        from peft import PeftModel
        print("Loading Reynier/dga-domurlsbert ...")
        tok = BertTokenizer.from_pretrained("Reynier/dga-domurlsbert")
        base = BertForSequenceClassification.from_pretrained("bert-base-uncased", num_labels=2)
        model = PeftModel.from_pretrained(base, "Reynier/dga-domurlsbert").eval()
        dev = device or ("cuda" if _cuda_available() else "cpu")
        model.to(dev)
        # Stash tokenizer/device on the model so predict_domains can run it
        # without a companion module.
        model._tokenizer = tok
        model._device = dev
        return model, None

    # ── Standard models ───────────────────────────────────────────────────
    if model_name not in REGISTRY:
        raise ValueError(
            f"Unknown model '{model_name}'. "
            f"Choose from: {list(REGISTRY.keys()) + ['modernbert', 'domurlsbert']}"
        )

    repo_id, weights_file, module_file = REGISTRY[model_name]
    print(f"Downloading {model_name} from {repo_id} ...")

    weights_path = hf_hub_download(repo_id, weights_file)
    module_path = hf_hub_download(repo_id, module_file)

    # Import the repo's model.py and let it construct/load the model.
    # Only pass `device` through when the caller gave one, since some
    # loaders may not accept a second positional argument.
    mod = _import_module(module_path, f"dga_{model_name}")
    model = mod.load_model(weights_path) if device is None else mod.load_model(weights_path, device)

    print(f"  {model_name} ready.")
    return model, mod


def predict_domains(mod, model, domains):
    """
    Run predictions through a single, model-agnostic entry point.

    Dispatches on how the model was loaded: standard models delegate to
    their companion module's ``predict``; transformer models (``mod`` is
    None) are driven directly here.

    Parameters
    ----------
    mod : module returned by load_dga_model, or None for transformers
    model : loaded model
    domains : str or list of str

    Returns
    -------
    list of dicts: [{"domain": ..., "label": "dga"/"legit", "score": float}]
    """
    domains = [domains] if isinstance(domains, str) else domains

    # HF pipeline (modernbert): callable, but carries no attached tokenizer.
    if mod is None and callable(model) and not hasattr(model, '_tokenizer'):
        outputs = model(domains)
        results = []
        for name, out in zip(domains, outputs):
            label = out["label"].lower().replace("label_1", "dga").replace("label_0", "legit")
            results.append({"domain": name, "label": label, "score": round(out["score"], 4)})
        return results

    # PEFT/LoRA model (domurlsbert): tokenizer/device were attached at load time.
    if mod is None and hasattr(model, '_tokenizer'):
        import torch
        tokenizer = model._tokenizer
        target = model._device
        label_names = {0: "legit", 1: "dga"}
        results = []
        for name in domains:
            encoded = tokenizer(name, return_tensors="pt", truncation=True).to(target)
            with torch.no_grad():
                logits = model(**encoded).logits
                choice = torch.argmax(logits, dim=1).item()
                dga_prob = torch.softmax(logits, dim=1)[0, 1].item()
            results.append({"domain": name, "label": label_names[choice], "score": round(dga_prob, 4)})
        return results

    # Standard models: delegate to the companion module.
    return mod.predict(model, domains)


def _cuda_available():
    try:
        import torch
        return torch.cuda.is_available()
    except ImportError:
        return False