| """ |
DGA Benchmark Loader — use this in Colab to load any model from HuggingFace.
| |
| Usage: |
| from dga_loader import load_dga_model, predict_domains |
| |
| model, mod = load_dga_model("cnn") |
| results = predict_domains(mod, model, ["google.com", "xkr3f9mq.ru"]) |
| |
| Available models: |
| "cnn" -> Reynier/dga-cnn |
| "bilbo" -> Reynier/dga-bilbo |
| "bilstm" -> Reynier/dga-bilstm |
| "labin" -> Reynier/dga-labin |
| "logit" -> Reynier/dga-logit |
| "fanci" -> Reynier/dga-fanci |
| "modernbert" -> Reynier/modernbert-dga-detector (HF pipeline) |
| "domurlsbert" -> Reynier/dga-domurlsbert (PEFT/LoRA) |
| """ |
| import importlib.util |
| import sys |
|
|
| from huggingface_hub import hf_hub_download |
|
|
# Registry of the self-contained benchmark models.
# Maps short name -> (HF repo_id, weights filename, loader-module filename).
# Each repo's loader module is expected to expose load_model() and
# predict() (see load_dga_model / predict_domains below).
REGISTRY = {
    "cnn": ("Reynier/dga-cnn", "dga_cnn_model_1M.pth", "model.py"),
    "bilbo": ("Reynier/dga-bilbo", "bilbo_best.pth", "model.py"),
    "bilstm": ("Reynier/dga-bilstm", "bilstm_best.pth", "model.py"),
    "labin": ("Reynier/dga-labin", "LABin_best_model.keras", "model.py"),
    "logit": ("Reynier/dga-logit", "artifacts.joblib", "model.py"),
    "fanci": ("Reynier/dga-fanci", "fanci_dga_detector.joblib", "model.py"),
}
|
|
|
|
| def _import_module(path: str, name: str): |
| """Dynamically import a Python file as a module.""" |
| spec = importlib.util.spec_from_file_location(name, path) |
| mod = importlib.util.module_from_spec(spec) |
| spec.loader.exec_module(mod) |
| sys.modules[name] = mod |
| return mod |
|
|
|
|
def load_dga_model(model_name: str, device: str = None):
    """
    Download and load a DGA model from HuggingFace.

    Parameters
    ----------
    model_name : str
        One of: cnn, bilbo, bilstm, labin, logit, fanci, modernbert, domurlsbert
    device : str, optional
        'cpu' or 'cuda'. Auto-detected if None.

    Returns
    -------
    model : loaded model object
    mod : the model module (call mod.predict(model, domains) to get predictions)
        For modernbert/domurlsbert, mod=None (use the pipeline/model directly).

    Raises
    ------
    ValueError
        If *model_name* is not a known model.
    """
    model_name = model_name.lower()

    if model_name == "modernbert":
        from transformers import pipeline
        print("Loading Reynier/modernbert-dga-detector ...")
        # Honor an explicit `device` request; previously this branch always
        # auto-detected CUDA and silently ignored the argument.
        use_cuda = (device == "cuda") if device is not None else _cuda_available()
        pipe = pipeline(
            "text-classification",
            model="Reynier/modernbert-dga-detector",
            device=0 if use_cuda else -1,
        )
        return pipe, None

    if model_name == "domurlsbert":
        import torch
        from transformers import BertTokenizer, BertForSequenceClassification
        from peft import PeftModel
        print("Loading Reynier/dga-domurlsbert ...")
        tok = BertTokenizer.from_pretrained("Reynier/dga-domurlsbert")
        base = BertForSequenceClassification.from_pretrained("bert-base-uncased", num_labels=2)
        model = PeftModel.from_pretrained(base, "Reynier/dga-domurlsbert").eval()
        dev = device or ("cuda" if _cuda_available() else "cpu")
        model.to(dev)
        # Stash tokenizer/device on the model so predict_domains() can use them.
        model._tokenizer = tok
        model._device = dev
        return model, None

    if model_name not in REGISTRY:
        raise ValueError(
            f"Unknown model '{model_name}'. "
            f"Choose from: {list(REGISTRY.keys()) + ['modernbert', 'domurlsbert']}"
        )

    repo_id, weights_file, module_file = REGISTRY[model_name]
    print(f"Downloading {model_name} from {repo_id} ...")

    weights_path = hf_hub_download(repo_id, weights_file)
    module_path = hf_hub_download(repo_id, module_file)

    # Each registry repo ships its own model.py exposing load_model()/predict().
    mod = _import_module(module_path, f"dga_{model_name}")
    model = mod.load_model(weights_path) if device is None else mod.load_model(weights_path, device)

    print(f" {model_name} ready.")
    return model, mod
|
|
|
|
def predict_domains(mod, model, domains):
    """
    Unified prediction interface.

    Works with both standard models (mod + model) and transformer pipelines.

    Parameters
    ----------
    mod : module returned by load_dga_model, or None for transformers
    model : loaded model
    domains : str or list of str

    Returns
    -------
    list of dicts: [{"domain": ..., "label": "dga"/"legit", "score": float}]
    """
    if isinstance(domains, str):
        domains = [domains]

    # HF text-classification pipeline (modernbert): a plain callable with
    # no tokenizer attached. Its "score" is that of the predicted label.
    if mod is None and callable(model) and not hasattr(model, "_tokenizer"):
        outputs = model(domains)
        results = []
        for domain, out in zip(domains, outputs):
            label = out["label"].lower().replace("label_1", "dga").replace("label_0", "legit")
            results.append({
                "domain": domain,
                "label": label,
                "score": round(out["score"], 4),
            })
        return results

    # PEFT/LoRA BERT (domurlsbert): tokenizer and device were attached by
    # load_dga_model. NOTE(review): here "score" is always the class-1
    # (dga) probability, even when the predicted label is "legit" —
    # asymmetric with the pipeline branch above.
    if mod is None and hasattr(model, "_tokenizer"):
        import torch
        tokenizer = model._tokenizer
        dev = model._device
        labels = {0: "legit", 1: "dga"}
        predictions = []
        for domain in domains:
            encoded = tokenizer(domain, return_tensors="pt", truncation=True).to(dev)
            with torch.no_grad():
                logits = model(**encoded).logits
            cls = torch.argmax(logits, dim=1).item()
            dga_prob = torch.softmax(logits, dim=1)[0, 1].item()
            predictions.append({
                "domain": domain,
                "label": labels[cls],
                "score": round(dga_prob, 4),
            })
        return predictions

    # Benchmark models: delegate to the downloaded repo's own predict().
    return mod.predict(model, domains)
|
|
|
|
| def _cuda_available(): |
| try: |
| import torch |
| return torch.cuda.is_available() |
| except ImportError: |
| return False |
|
|