# dga-logit / model.py — uploaded via huggingface_hub (commit df826c3)
"""
DGA-Logit: TF-IDF + Lexical Features + Logistic Regression for DGA detection.
Trained on 54 DGA families (~845K samples).
artifacts.joblib contains: {'model': LR, 'vectorizer': TF-IDF, 'scaler': StandardScaler}
"""
import re
import math
import numpy as np
import pandas as pd
from collections import Counter
from urllib.parse import urlparse
# ASCII character classes for the vowel/consonant ratio features.
VOWELS = set('aeiou')
CONSONANTS = set('bcdfghjklmnpqrstvwxyz')
# A fully normalized domain: lowercase alphanumerics, dots and hyphens only.
DOMAIN_RE = re.compile(r'^[a-z0-9.-]+$')
# Single-character classes used by _max_run to measure longest letter/digit runs.
LETTER_RE = re.compile(r'[a-z]')
DIGIT_RE = re.compile(r'[0-9]')
def _hostname_from_url(value: str) -> str:
parsed = urlparse(value)
if parsed.netloc:
return parsed.netloc
if parsed.scheme and parsed.path:
return parsed.path
return value
def normalize_domain(value: str) -> str:
    """Canonicalize a raw value (URL, email, host:port) to a bare domain.

    Lowercases, strips URL scheme/path, userinfo and port, removes
    whitespace and a trailing dot, then drops any character outside
    the [a-z0-9.-] domain alphabet.
    """
    cleaned = _hostname_from_url(str(value).strip().lower())
    # Drop userinfo ("user@host"), any path remainder, then the port.
    cleaned = cleaned.split('@')[-1]
    cleaned = cleaned.split('/')[0]
    cleaned = cleaned.split(':')[0]
    cleaned = cleaned.rstrip('.')
    cleaned = re.sub(r'\s+', '', cleaned)
    if DOMAIN_RE.match(cleaned):
        return cleaned
    # Residual characters outside the domain alphabet are stripped.
    return re.sub(r'[^a-z0-9.-]', '', cleaned)
def shannon_entropy(value: str) -> float:
    """Return the Shannon entropy (bits per character) of *value*; 0.0 if empty."""
    if not value:
        return 0.0
    size = len(value)
    entropy = 0.0
    for count in Counter(value).values():
        p = count / size
        entropy -= p * math.log2(p)
    return entropy
def _max_run(value: str, matcher) -> int:
best = current = 0
for ch in value:
if matcher.match(ch):
current += 1
best = max(best, current)
else:
current = 0
return best
def _split_parts(domain: str):
parts = [p for p in domain.split('.') if p]
if not parts:
return '', ''
sld = parts[-2] if len(parts) >= 2 else parts[-1]
return sld, parts[-1]
def _extract_lexical_features(domains: pd.Series) -> np.ndarray:
    """Compute the 15 hand-crafted lexical features per domain.

    Column order: full length, SLD length, TLD length, subdomain count,
    digit ratio, vowel ratio, consonant ratio, unique alnum-char ratio,
    hyphen count, dot count, longest digit run, longest letter run,
    Shannon entropy, leading-digit flag, trailing-digit flag.
    """
    feature_rows = []
    for raw in domains:
        domain = normalize_domain(raw)
        sld, tld = _split_parts(domain)
        letters = [ch for ch in domain if ch.isalpha()]
        alnum = [ch for ch in domain if ch.isalnum()]
        digit_count = sum(ch.isdigit() for ch in domain)
        vowel_total = sum(ch in VOWELS for ch in letters)
        consonant_total = sum(ch in CONSONANTS for ch in letters)
        # Guard against division by zero on a fully-stripped (empty) domain.
        denom = max(len(domain), 1)
        feature_rows.append([
            len(domain),
            len(sld),
            len(tld),
            max(domain.count('.') - 1, 0),  # labels beyond sld.tld
            digit_count / denom,
            vowel_total / denom,
            consonant_total / denom,
            len(set(alnum)) / max(len(alnum), 1) if alnum else 0.0,
            domain.count('-'),
            domain.count('.'),
            _max_run(domain, DIGIT_RE),
            _max_run(domain, LETTER_RE),
            shannon_entropy(domain),
            float(domain[:1].isdigit()),
            float(domain[-1:].isdigit()),
        ])
    return np.asarray(feature_rows, dtype=float)
def load_model(artifacts_path: str):
    """Deserialize and return the artifacts dict from a joblib file.

    Per the module docstring, the dict holds 'model', 'vectorizer'
    and 'scaler'.
    """
    # Deferred import: joblib is only needed when artifacts are loaded.
    import joblib
    artifacts = joblib.load(artifacts_path)
    return artifacts
def predict(artifacts, domains):
    """
    Score domain strings as DGA vs legit.

    artifacts: dict with keys 'model', 'vectorizer', 'scaler'
    domains: a single domain string or an iterable of domain strings
    Returns list of dicts: [{"domain": ..., "label": "dga"/"legit", "score": float}]
    """
    from scipy import sparse
    if isinstance(domains, str):
        domains = [domains]
    series = pd.Series(domains)
    normalized = series.map(normalize_domain)
    # Feature matrix: TF-IDF block plus scaled lexical block, stacked sparse.
    tfidf_block = artifacts['vectorizer'].transform(normalized)
    lexical_block = artifacts['scaler'].transform(_extract_lexical_features(series))
    features = sparse.hstack(
        [tfidf_block, sparse.csr_matrix(lexical_block)], format='csr'
    )
    # Column 1 of predict_proba is the positive (DGA) class probability;
    # 0.5 is the decision threshold.
    dga_probs = artifacts['model'].predict_proba(features)[:, 1]
    results = []
    for domain, prob in zip(domains, dga_probs):
        label = "dga" if prob >= 0.5 else "legit"
        results.append(
            {"domain": domain, "label": label, "score": round(float(prob), 4)}
        )
    return results