| """ |
| DGA-Logit: TF-IDF + Lexical Features + Logistic Regression for DGA detection. |
| Trained on 54 DGA families (~845K samples). |
| artifacts.joblib contains: {'model': LR, 'vectorizer': TF-IDF, 'scaler': StandardScaler} |
| """ |
| import re |
| import math |
| import numpy as np |
| import pandas as pd |
| from collections import Counter |
| from urllib.parse import urlparse |
|
|
# Character classes used by the lexical feature extractor.
VOWELS = set('aeiou')
CONSONANTS = set('bcdfghjklmnpqrstvwxyz')
# A fully normalized domain: lowercase alphanumerics, dots, and hyphens only.
DOMAIN_RE = re.compile(r'^[a-z0-9.-]+$')
# Single-character matchers used to measure the longest letter/digit runs.
LETTER_RE = re.compile(r'[a-z]')
DIGIT_RE = re.compile(r'[0-9]')
|
|
|
|
| def _hostname_from_url(value: str) -> str: |
| parsed = urlparse(value) |
| if parsed.netloc: |
| return parsed.netloc |
| if parsed.scheme and parsed.path: |
| return parsed.path |
| return value |
|
|
|
|
def normalize_domain(value: str) -> str:
    """Lowercase *value*, strip URL/userinfo/port/path decoration and
    whitespace, and drop any character outside [a-z0-9.-]."""
    cleaned = _hostname_from_url(str(value).strip().lower())
    # Drop userinfo, path, port, and a trailing root dot, in that order.
    cleaned = cleaned.split('@')[-1]
    cleaned = cleaned.split('/')[0]
    cleaned = cleaned.split(':')[0]
    cleaned = cleaned.rstrip('.')
    cleaned = re.sub(r'\s+', '', cleaned)
    if DOMAIN_RE.match(cleaned):
        return cleaned
    # Last resort: strip every character outside the allowed alphabet.
    return re.sub(r'[^a-z0-9.-]', '', cleaned)
|
|
|
|
def shannon_entropy(value: str) -> float:
    """Return the Shannon entropy (bits/char) of the string's character
    distribution; 0.0 for the empty string."""
    if not value:
        return 0.0
    total = len(value)
    entropy = 0.0
    for count in Counter(value).values():
        p = count / total
        entropy -= p * math.log2(p)
    return entropy
|
|
|
|
| def _max_run(value: str, matcher) -> int: |
| best = current = 0 |
| for ch in value: |
| if matcher.match(ch): |
| current += 1 |
| best = max(best, current) |
| else: |
| current = 0 |
| return best |
|
|
|
|
| def _split_parts(domain: str): |
| parts = [p for p in domain.split('.') if p] |
| if not parts: |
| return '', '' |
| sld = parts[-2] if len(parts) >= 2 else parts[-1] |
| return sld, parts[-1] |
|
|
|
|
def _extract_lexical_features(domains: pd.Series) -> np.ndarray:
    """Build the hand-crafted lexical feature matrix, one row per domain.

    The column order is fixed — it must match the StandardScaler fitted at
    training time (see the artifacts described in the module docstring).
    """
    feature_rows = []
    for raw in domains:
        domain = normalize_domain(raw)
        sld, tld = _split_parts(domain)
        alnum = [c for c in domain if c.isalnum()]
        digit_count = sum(1 for c in domain if c.isdigit())
        # VOWELS/CONSONANTS contain only letters, so non-letters never count.
        vowel_count = sum(1 for c in domain if c in VOWELS)
        consonant_count = sum(1 for c in domain if c in CONSONANTS)
        denom = len(domain) or 1  # guard against an empty domain
        unique_ratio = len(set(alnum)) / len(alnum) if alnum else 0.0
        feature_rows.append([
            len(domain),                       # total length
            len(sld),                          # second-level label length
            len(tld),                          # TLD length
            max(domain.count('.') - 1, 0),     # subdomain depth
            digit_count / denom,               # digit ratio
            vowel_count / denom,               # vowel ratio
            consonant_count / denom,           # consonant ratio
            unique_ratio,                      # distinct-alnum ratio
            domain.count('-'),                 # hyphen count
            domain.count('.'),                 # dot count
            _max_run(domain, DIGIT_RE),        # longest digit run
            _max_run(domain, LETTER_RE),       # longest letter run
            shannon_entropy(domain),           # character entropy
            float(domain[:1].isdigit()),       # starts with a digit
            float(domain[-1:].isdigit()),      # ends with a digit
        ])
    return np.asarray(feature_rows, dtype=float)
|
|
|
|
def load_model(artifacts_path: str):
    """Deserialize and return the artifacts dict ('model', 'vectorizer',
    'scaler') from the joblib file at *artifacts_path*."""
    import joblib  # deferred so importing this module never requires joblib

    return joblib.load(artifacts_path)
|
|
|
|
def predict(artifacts, domains):
    """
    Score domains as DGA vs legit.

    artifacts: dict with keys 'model', 'vectorizer', 'scaler'
    domains: a single domain string or an iterable of domain strings
    Returns list of dicts: [{"domain": ..., "label": "dga"/"legit", "score": float}]
    """
    from scipy import sparse

    if isinstance(domains, str):
        domains = [domains]

    raw_series = pd.Series(domains)

    # TF-IDF view of the normalized hostnames.
    tfidf_part = artifacts['vectorizer'].transform(raw_series.map(normalize_domain))
    # Scaled lexical features (normalization happens inside the extractor).
    lexical_part = artifacts['scaler'].transform(_extract_lexical_features(raw_series))
    features = sparse.hstack(
        [tfidf_part, sparse.csr_matrix(lexical_part)], format='csr'
    )

    # Column 1 of predict_proba is the positive ("dga") class probability.
    probs = artifacts['model'].predict_proba(features)[:, 1]

    results = []
    for domain, prob in zip(domains, probs):
        label = "dga" if prob >= 0.5 else "legit"
        results.append(
            {"domain": domain, "label": label, "score": round(float(prob), 4)}
        )
    return results
|
|