"""
DGA-Logit: TF-IDF + Lexical Features + Logistic Regression for DGA detection.

Trained on 54 DGA families (~845K samples).

artifacts.joblib contains:
    {'model': LR, 'vectorizer': TF-IDF, 'scaler': StandardScaler}
"""

import math
import re
from collections import Counter
from urllib.parse import urlparse

import numpy as np
import pandas as pd

VOWELS = set('aeiou')
CONSONANTS = set('bcdfghjklmnpqrstvwxyz')
DOMAIN_RE = re.compile(r'^[a-z0-9.-]+$')
LETTER_RE = re.compile(r'[a-z]')
DIGIT_RE = re.compile(r'[0-9]')

# A bare port number, used to recognise "host:port" input that urlparse
# reports as "scheme:path".
_PORT_RE = re.compile(r'[0-9]+')

# Number of columns produced by _extract_lexical_features; keep in sync with
# the feature list built there.
_N_LEXICAL_FEATURES = 15


def _hostname_from_url(value: str) -> str:
    """Return the host-bearing part of *value* (a URL, host[:port] or domain).

    The result may still carry userinfo, a port or a path; the caller
    (``normalize_domain``) strips those.
    """
    try:
        parsed = urlparse(value)
    except ValueError:
        # urlparse rejects malformed bracketed hosts (e.g. "http://[bad").
        # Fall back to the text after the "//" so one bad input does not
        # abort a whole batch.
        _, sep, rest = value.partition('//')
        return rest if sep else value

    if parsed.netloc:
        return parsed.netloc

    if parsed.scheme and parsed.path:
        # "example.com:8080" parses as scheme="example.com", path="8080" on
        # Python 3.9+. Returning the path would yield the port instead of the
        # host, so hand back the raw value and let the caller split on ':'.
        first_segment = parsed.path.split('/', 1)[0]
        if _PORT_RE.fullmatch(first_segment):
            return value
        # Genuine "scheme:rest" input such as "mailto:user@example.com".
        return parsed.path

    return value


def normalize_domain(value: str) -> str:
    """Reduce *value* to a lowercase hostname made only of ``[a-z0-9.-]``.

    Accepts bare domains, ``host:port`` strings and full URLs. Userinfo,
    port, path, trailing dots and whitespace are removed; any remaining
    character outside ``[a-z0-9.-]`` is dropped.
    """
    domain = str(value).strip().lower()
    domain = _hostname_from_url(domain)
    # Order matters: userinfo first, then path, then port.
    domain = domain.split('@')[-1].split('/')[0].split(':')[0].rstrip('.')
    domain = re.sub(r'\s+', '', domain)
    if not DOMAIN_RE.match(domain):
        domain = re.sub(r'[^a-z0-9.-]', '', domain)
    return domain


def shannon_entropy(value: str) -> float:
    """Return the Shannon entropy (bits per character) of *value*.

    An empty string has entropy 0.0.
    """
    if not value:
        return 0.0
    total = len(value)
    return -sum(
        (n / total) * math.log2(n / total)
        for n in Counter(value).values()
    )


def _max_run(value: str, matcher) -> int:
    """Return the longest run of consecutive characters accepted by *matcher*.

    *matcher* is a compiled regex; a character counts when
    ``matcher.match(ch)`` succeeds.
    """
    best = current = 0
    for ch in value:
        if matcher.match(ch):
            current += 1
            best = max(best, current)
        else:
            current = 0
    return best


def _split_parts(domain: str):
    """Return ``(sld, tld)`` taken from the last two dot-separated labels.

    Empty labels are ignored. A single-label domain is returned as both the
    SLD and the TLD; an empty domain yields ``('', '')``.
    """
    parts = [p for p in domain.split('.') if p]
    if not parts:
        return '', ''
    sld = parts[-2] if len(parts) >= 2 else parts[-1]
    return sld, parts[-1]


def _extract_lexical_features(domains: pd.Series) -> np.ndarray:
    """Build the lexical feature matrix for *domains*.

    Each value is normalized with ``normalize_domain`` first. The result has
    one row per input and ``_N_LEXICAL_FEATURES`` columns (shape ``(0, 15)``
    for empty input). Column order is fixed by the list below and must not
    change independently of the scaler applied to this matrix in ``predict``.
    """
    rows = []
    for value in domains:
        domain = normalize_domain(value)
        sld, tld = _split_parts(domain)
        letters = [c for c in domain if c.isalpha()]
        digits = [c for c in domain if c.isdigit()]
        chars = [c for c in domain if c.isalnum()]
        vowel_count = sum(1 for c in letters if c in VOWELS)
        consonant_count = sum(1 for c in letters if c in CONSONANTS)
        # Avoid division by zero for an empty domain.
        length = max(len(domain), 1)
        rows.append([
            len(domain),
            len(sld),
            len(tld),
            max(domain.count('.') - 1, 0),  # labels beyond "sld.tld"
            len(digits) / length,
            vowel_count / length,
            consonant_count / length,
            len(set(chars)) / len(chars) if chars else 0.0,  # unique ratio
            domain.count('-'),
            domain.count('.'),
            _max_run(domain, DIGIT_RE),
            _max_run(domain, LETTER_RE),
            shannon_entropy(domain),
            float(domain[:1].isdigit()),
            float(domain[-1:].isdigit()),
        ])
    # reshape keeps the result 2-D even when there are no rows.
    return np.asarray(rows, dtype=float).reshape(-1, _N_LEXICAL_FEATURES)


def load_model(artifacts_path: str):
    """Load artifacts dict from joblib file.

    NOTE: ``joblib.load`` unpickles the file, which can execute arbitrary
    code. Only load artifacts from a trusted source.
    """
    import joblib
    return joblib.load(artifacts_path)


def predict(artifacts, domains, *, threshold: float = 0.5):
    """
    Predict DGA vs legit for a list of domain strings.

    artifacts: dict with keys 'model', 'vectorizer', 'scaler'
    domains: a single string or any iterable of strings (generators are
        accepted; the iterable is materialized once)
    threshold: minimum score for the "dga" label (default 0.5)

    Returns list of dicts:
        [{"domain": ..., "label": "dga"/"legit", "score": float}]
    An empty input returns an empty list.
    """
    if isinstance(domains, str):
        domains = [domains]
    else:
        # Materialize once: the values are needed both to build the feature
        # matrix and to label the results, and a generator can only be
        # iterated a single time.
        domains = list(domains)
    if not domains:
        return []

    from scipy import sparse

    model = artifacts['model']
    vectorizer = artifacts['vectorizer']
    scaler = artifacts['scaler']

    series = pd.Series(domains)
    domains_norm = series.map(normalize_domain)

    X_tfidf = vectorizer.transform(domains_norm)
    # _extract_lexical_features normalizes internally, so it gets the raw
    # values rather than domains_norm.
    X_lex = scaler.transform(_extract_lexical_features(series))
    X = sparse.hstack([X_tfidf, sparse.csr_matrix(X_lex)], format='csr')

    # Column 1 is taken as the DGA score -- assumes the model's positive
    # class is at index 1; confirm against model.classes_.
    scores = model.predict_proba(X)[:, 1]

    return [
        {
            "domain": d,
            "label": "dga" if s >= threshold else "legit",
            "score": round(float(s), 4),
        }
        for d, s in zip(domains, scores)
    ]