""" DGA-FANCI: Random Forest with 27 FANCI features for DGA detection. Trained on 54 DGA families. File size: ~1 GB (Random Forest with many estimators). IMPORTANT: FANCIFeatureExtractor and FANCIRandomForest must be defined here so that joblib can deserialize the saved model object. """ import re import math from collections import Counter import numpy as np import pandas as pd import tldextract class FANCIFeatureExtractor: def __init__(self): self.valid_tlds = {'com', 'org', 'net', 'edu', 'gov', 'mil', 'int', 'arpa', 'de', 'uk', 'fr', 'it', 'es', 'ru', 'cn', 'jp', 'br', 'au', 'ca'} self.vowels = set('aeiouAEIOU') self.consonants = set('bcdfghjklmnpqrstvwxyzBCDFGHJKLMNPQRSTVWXYZ') def extract_e2ld(self, domain): try: extracted = tldextract.extract(domain) if extracted.suffix in ['dyndns.org', 'ddns.net']: return f"{extracted.subdomain}.{extracted.domain}" if extracted.subdomain else extracted.domain return extracted.domain except: return domain def get_dot_free_public_suffix_free(self, domain): e2ld = self.extract_e2ld(domain) return re.sub(r'[^a-zA-Z0-9]', '', e2ld) def _is_prefix_repetition(self, domain): parts = domain.split('.') if len(parts) < 2: return 0 base = parts[0] for i in range(1, len(parts) - 1): if not parts[i].startswith(base): return 0 return 1 def _is_hex(self, s): try: int(s, 16) return len(s) > 0 except: return False def _contains_ip(self, domain): return bool(re.search(r'\b(?:\d{1,3}\.){3}\d{1,3}\b', domain)) def _consecutive_ratio(self, text, char_set): if not text: return 0 res, cur = [], 0 for c in text: if c in char_set: cur += 1 else: if cur >= 2: res.append(cur) cur = 0 if cur >= 2: res.append(cur) return sum(res) / max(1, len(text)) def _ngram_features(self, text): if not text: return {k: 0 for k in ['ngram_mean', 'ngram_std', 'ngram_min', 'ngram_max', 'ngram_median', 'ngram_q25', 'ngram_q75']} freqs = list(Counter(text).values()) return { 'ngram_mean': np.mean(freqs), 'ngram_std': np.std(freqs), 'ngram_min': np.min(freqs), 'ngram_max': np.max(freqs), 'ngram_median': np.median(freqs), 'ngram_q25': np.percentile(freqs, 25), 'ngram_q75': np.percentile(freqs, 75), } def _calculate_entropy(self, text): if not text: return 0 probs = [c / len(text) for c in Counter(text).values()] return -sum(p * math.log2(p) for p in probs) def extract_features(self, domain): domain = domain.lower().strip() e2ld = self.extract_e2ld(domain) parts = domain.split('.') ddsf = self.get_dot_free_public_suffix_free(domain) f = {} f['domain_length'] = len(domain) f['num_subdomains'] = max(0, len(parts) - 2) f['subdomain_length_mean'] = np.mean([len(p) for p in parts[:-1]]) if len(parts) > 1 else len(e2ld) f['has_www_prefix'] = 1 if domain.startswith('www.') else 0 f['has_valid_tld'] = 1 if parts[-1] in self.valid_tlds else 0 f['contains_single_char_subdomain'] = 1 if any(len(p) == 1 for p in parts[:-1]) else 0 f['is_exclusive_prefix_repetition'] = self._is_prefix_repetition(domain) f['contains_tld_as_subdomain'] = 1 if any(p in self.valid_tlds for p in parts[:-1]) else 0 total_sub = max(1, len(parts) - 1) f['ratio_digit_exclusive_subdomains'] = sum(1 for p in parts[:-1] if p.isdigit()) / total_sub f['ratio_hex_exclusive_subdomains'] = sum(1 for p in parts[:-1] if self._is_hex(p)) / total_sub f['underscore_ratio'] = ddsf.count('_') / max(1, len(ddsf)) f['contains_ip_address'] = 1 if self._contains_ip(domain) else 0 f['contains_digits'] = 1 if any(c.isdigit() for c in ddsf) else 0 f['vowel_ratio'] = sum(1 for c in ddsf if c in self.vowels) / max(1, len(ddsf)) f['digit_ratio'] = sum(1 for c in ddsf if c.isdigit()) / max(1, len(ddsf)) f['alphabet_cardinality'] = len(set(ddsf)) counts = Counter(ddsf) f['ratio_repeated_characters'] = sum(1 for c in counts.values() if c > 1) / max(1, len(set(ddsf))) f['ratio_consecutive_consonants'] = self._consecutive_ratio(ddsf, self.consonants) f['ratio_consecutive_digits'] = self._consecutive_ratio(ddsf, set('0123456789')) f.update(self._ngram_features(ddsf)) f['entropy'] = self._calculate_entropy(ddsf) return f class FANCIRandomForest: def __init__(self): pass def extract_features_from_dataframe(self, df): feature_list = [self.feature_extractor.extract_features(d) for d in df['domain']] feature_df = pd.DataFrame(feature_list) return feature_df.reindex(columns=self.feature_names, fill_value=0) def predict(self, domains): temp_df = pd.DataFrame({'domain': domains}) X = self.extract_features_from_dataframe(temp_df) preds = self.rf.predict(X) probs = self.rf.predict_proba(X) return [ {'domain': d, 'prediction': 'DGA' if preds[i] == 1 else 'Benign', 'dga_probability': probs[i][1]} for i, d in enumerate(domains) ] def load_model(model_path: str): """Load FANCI Random Forest from joblib file. The model was saved from a Colab notebook (__main__ context), so joblib looks for FANCIFeatureExtractor and FANCIRandomForest in __main__. We inject both classes there before calling joblib.load(). """ import sys import joblib main = sys.modules['__main__'] main.FANCIFeatureExtractor = FANCIFeatureExtractor main.FANCIRandomForest = FANCIRandomForest return joblib.load(model_path) def predict(model, domains): """ Predict DGA vs legit for a list of domain strings. model: FANCIRandomForest instance loaded from joblib. Returns list of dicts: [{"domain": ..., "label": "dga"/"legit", "score": float}] """ if isinstance(domains, str): domains = [domains] results = model.predict(domains) return [ { "domain": r["domain"], "label": "dga" if r["prediction"] == "DGA" else "legit", "score": round(float(r["dga_probability"]), 4), } for r in results ]